博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
mysql准实时同步数据到Elasticsearch
阅读量:6002 次
发布时间:2019-06-20

本文共 2897 字,大约阅读时间需要 9 分钟。

4. 安装JDK8、MySQL5.6驱动以及Logstash -6.0.0

ECS中分别安装JDK8、MySQL5.6驱动以及Logstash -6.0.0。如下图:

_

安装Logstash input、output插件,此案例数据输入是MySQL,输出是ES,so相应的插件应该是logstash-input-jdbc和logstash-output-elasticsearch。

安装插件的命令分别是(在Logstash主目录下运行):

./bin/logstash-plugin install logstash-input-jdbc
./bin/logstash-plugin install logstash-output-elasticsearch
_

5. MySQL中创建数据库、测试的数据表

如下图所示

_

建表语句(其中updatetime用于记录数据更新时间戳):

create table jm_es_employee (         id varchar(10),     first_name varchar(20),     last_name varchar(20),     age int(10),     about varchar(100),     interests varchar(100),     updatetime timestamp null default current_timestamp on update current_timestamp );
6. 配置Logstash作业文件

ECS中创建Logstash作业配置文件,文件名为logstash-mysql-es.conf。

配置文件内容:

input{     jdbc {         jdbc_driver_library => "mysql-connector-java-5.1.44-bin.jar"         jdbc_driver_class => "com.mysql.jdbc.Driver"         jdbc_connection_string => "jdbc:mysql://rm-***.mysql.rds.aliyuncs.com:3306/db_name"         jdbc_user => "db_user"         jdbc_password => "db_password"         jdbc_paging_enabled => "true"         jdbc_page_size => "1000"         jdbc_default_timezone =>"Asia/Shanghai"         schedule => "* * * * *"         statement => "select * from jm_es_employee where updatetime > :sql_last_value"         use_column_value => true         tracking_column => "updatetime"         last_run_metadata_path => "./logstash_jdbc_last_run"       } } output{      elasticsearch {         hosts => "es-cn-***.elasticsearch.aliyuncs.com:9200"         user => "elastic"         password => "es_password"         index => "employee"         document_id => "%{id}"      }      stdout {         codec => json_lines     } }

其中红色字体部分要做相应的替换,input中的 schedule参数用于配置数据刷新频率,schedule => " *"表示每分钟刷新一次,这也是MySQL数据同步的最小频率。Logstash支持丰富的参数配置,。

7. 同步数据

ECS中指定参数启动Logstash服务,执行命令:

logstash -f logstash-mysql-es.conf

_

之后每分钟会去MySQL中刷新数据

_

RDS中写入几条测试数据,脚本如下:

INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('001','John','Smith', 25, 'I love to go rock climbing','[ "sports", "music" ]'); INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('002','Jane','Smith', 32, 'I like to collect rock albums','[ "music" ]'); INSERT INTO jm_es_employee(id,first_name,last_name,age,about,interests) VALUES('003','Douglas','Fir', 35, 'I like to build cabinets','[ "forestry" ]');

由于之前在Logstash配置文件中,output部分既配置了输出到ES,同时也输出到控制台。所以当检测到MySQL中有更新时,数据会输出到控制台中,如下图:

_

此时说明MySQL中的数据更新已经被Logstash推送到ES服务。通过在ECS执行命令检查ES服务中的索引是否被创建。执行命令:

curl -u elastic:es_password -XGET 'http://es-cn-***.elasticsearch.aliyuncs.com:9200/_cat/indices?v'

_

红框内的employee即我们在配置文件中指定的索引名,说明ES中的索引已经被成功创建。

8. 结果验证

通过关键字检索ES服务,验证写入Mysql的数据是否被成功索引到ES并被检索到,执行命令通过关键字“Smith “来检索数据:

curl -u elastic:es_password -XGET 'http://es-cn-***.elasticsearch.aliyuncs.com:9200/employee/_search?q=last_name:Smith&pretty'

_

至此,MySQL中的数据已经被成功索引到Elasticsearch,并也可以被准实时的检索到。

转载地址:http://xpwmx.baihongyu.com/

你可能感兴趣的文章
Android定时器,推荐ScheduledThreadPoolExecutor
查看>>
百度面试总结
查看>>
php获取当月的第一天以及最后一天
查看>>
ASP.NET Core 源码学习之 Logging[1]:Introduction
查看>>
WSGI、flup、fastcgi、web.py的关系
查看>>
leetcode_question_57 Insert Interval
查看>>
日积月累:ProguardGui进行jar包代码混淆
查看>>
布里斯班初体验
查看>>
如何在Webstorm/Phpstorm中设置连接FTP,并快速进行文件比较,上传下载,同步等操作...
查看>>
将本地jar包添加到maven中
查看>>
Effective Java:对于全部对象都通用的方法
查看>>
使用Cubic Spline通过一组2D点绘制平滑曲线
查看>>
PLM_百度百科
查看>>
对语言之争的看法
查看>>
tcpdump抓包
查看>>
java内存泄漏的定位与分析
查看>>
[置顶] JAXB Hello World
查看>>
GITC2016金山云推出重磅产品 开启云计算新格局
查看>>
TigerGraph 独家解读:图数据库的现状与未来
查看>>
天猫Tmall Discovery发布即食燕窝趋势,随手养生成新潮
查看>>