增量更新
input { jdbc { jdbc_driver_library => "D:\tools\mysql\mysql-connector-java-5.1.45/mysql-connector-java-5.1.45-bin.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/canyin?characterEncoding=UTF-8&useSSL=false" jdbc_user => "root" jdbc_password => "228151" statement => "SELECT * FROM goods" jdbc_paging_enabled => "true" jdbc_page_size => "50000" schedule => "* * * * *" type => "foods" record_last_run => true last_run_metadata_path => "" clean_run => false } } filter { json { source => "message" remove_field => ["message"] } } output { stdout { codec => rubydebug } elasticsearch { hosts => "127.0.0.1:9200" index => "goods" document_type=>"foods" document_id=>"%{id}" } }
增量更新的参数介绍:
input { stdin { } jdbc { # 数据库地址 端口 数据库名 jdbc_connection_string => "jdbc:mysql://localhost:3306/shen" # 数据库用户名 jdbc_user => "root" # 数据库密码 jdbc_password => "rootroot" # mysql java驱动地址 jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-5.1.43-bin.jar" # 驱动类的名称 jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" #是否记录上次运行的结果 record_last_run => true #记录上次运行结果的文件位置 last_run_metadata_path => "" #是否使用数据库某一列的值, use_column_value => true tracking_column => "id" #numeric或者timestamp tracking_column_type => "numeric" #如果为true则会清除 last_run_metadata_path 的记录,即重新开始同步数据 clean_run => false #sql_last_value根据tracking类型,默认为0或者1970-1-1 statement => "SELECT * FROM TABLE WHERE id > :last_sql_value" # sql 语句文件,对于复杂的查询,可以放在文件中。 # statement_filepath => "filename.sql" # 设置监听间隔,语法与Linux系统Cron相同 schedule => "* * * * *" } } output { stdout { codec => json_lines } elasticsearch { hosts => "localhost:9200" index => "contacts" document_type => "contact" document_id => "%{id}" } }
chedule现在设置成每分钟都执行一次,是为了方便观察行为。statefile这一句是一定要加的。$metrics.lastexecutionstart就是这个脚本的关键所在了,这个指的是上一次脚本执行的时间,可以通过比较这个时间和数据库里的字段来判断是否要更新。
参考文献:http://www.cnblogs.com/cocowool/p/mysql_data_to_elasticsearch_via_logstash.html
http://www.cnblogs.com/zhongshengzhen/p/elasticsearch_logstash.html
配置Logstash https://www.elastic.co/guide/en/logstash/current/configuration.html#