标签:报错 position bin user 驱动 存在 drive 接收 cal
jdbc默认已安装,如果没安装使用logstash-plugin安装即可(logstash-plugin在logstash的bin目录下):
logstash-plugin install logstash-input-jdbc
mysql最新版是8.0,但使用时报错,这里仍以5.1版本演示
wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.47.zip # 不推荐放/tmp目录,但这里为了与用户无关所以直接解压到/tmp目录 unzip mysql-connector-java-5.1.47.zip -d /tmp
配置如下项,如无意外mysql中数据即可同步到elasticsearch
input{ jdbc { # jdbc:mysql://数据库ip:端口/数据库名 jdbc_connection_string => "jdbc:mysql://10.10.6.91:3306/expdb" # 数据库用户名 jdbc_user => "root" # 数据库密码 jdbc_password => "root" # mysql java驱动文件位置 jdbc_driver_library => "/tmp/mysql-connector-java-5.1.47/mysql-connector-java-5.1.47.jar" # 驱动类名,这个一般不需要改 jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" # 与crontab类似的格式,* * * * *基本隔十来秒就同步一次 schedule => "* * * * *" # 同步时要执行的sql语句,语句最终的结果集同步到elasticsearch; 我这里只是简单将表中所有记录查出 # 如果语句很多,可使用statement_file来指定sql语句文件 statement => "select * from exploit_records" } } filter { } output{ elasticsearch { hosts => ["localhost:9200"] } }
elasticsearch以_id项作为主键,只要传过来的数据中_id项的值尚未存在elasticsearch会接收该数据。
而_id项的值默认其实是数据到来时elasticsearch自己生成的然后添加上去的,所以在这种情况下_id是不可能重复的,即所有数据都会被来者不拒地接收。
logstash jdbc插件对数据库的数据的操作是依照计划(schedule)执行sql语句(statement),所有查出来的数据就统统按output发过去。
在这种模式下,每执行一次计划(schedule)所有记录就会被发往elasticsearch一次,而_id是由elasticsearch生成的不会重复,所以不同次计划(schedule)的同一条记录会被elasticsearch反复收录。
同一条记录被反复收录这种情况一般都不是我们想要的,而要消除这种情况其关键是要将_id变成不是由elasticsearch自动生成,更确切地讲是要将_id变成人为可控地生成。
这种配置也不难,elasticsearch允计通过使用document_id来配置_id。
假设我们表的主键是edb_id,为了避免重复,我们要以表主键值赋值给document_id即可。此时1.3中配置修改如下:
input{ jdbc { # jdbc:mysql://数据库ip:端口/数据库名 jdbc_connection_string => "jdbc:mysql://10.10.6.91:3306/expdb" # 数据库用户名 jdbc_user => "root" # 数据库密码 jdbc_password => "root" # mysql java驱动文件位置 jdbc_driver_library => "/tmp/mysql-connector-java-5.1.47/mysql-connector-java-5.1.47.jar" # 驱动类名,这个一般不需要改 jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" # 与crontab类似的格式,* * * * *基本隔十来秒就同步一次 schedule => "* * * * *" # 同步时要执行的sql语句,语句最终的结果集同步到elasticsearch; 我这里只是简单将表中所有记录查出 # 如果语句很多,可使用statement_file来指定sql语句文件 statement => "select * from exploit_records" } } filter { } output{ elasticsearch { hosts => ["localhost:9200"] # 相比1.3只多加了这一句 # 将表主键edb_id字段的值赋值给elasticsearch的主键 document_id => "%{edb_id}" } }
由于我们这里只查询了一张表,所以2.2中的配置没有什么问题。但只果此时要多输入一张表这张表没有edb_id字段,或者要多输入的根本就不是数据库的表不会有edb_id项,那么按document_id => "%{edb_id}"取到的edb_id会为空,这些数据因_id项值都为空而被视为重复项不为elasticsearch所收录。
要解决这个问题,一种思路是在filter中进行处理,确保每条数据都生成edb_id项。但要给数据加上自己本来就不需要的项这种操作怎么看都不像是最优解。
另外一种思路就是在output中进行限制操作,只有当前这张表才执行document_id => "%{edb_id}"。这是能想到的比较好的办法,此时配置修改如下:
input{ jdbc { # jdbc:mysql://数据库ip:端口/数据库名 jdbc_connection_string => "jdbc:mysql://10.10.6.91:3306/expdb" # 数据库用户名 jdbc_user => "root" # 数据库密码 jdbc_password => "root" # mysql java驱动文件位置 jdbc_driver_library => "/tmp/mysql-connector-java-5.1.47/mysql-connector-java-5.1.47.jar" # 驱动类名,这个一般不需要改 jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" # 与crontab类似的格式,* * * * *基本隔十来秒就同步一次 schedule => "* * * * *" # 同步时要执行的sql语句,语句最终的结果集同步到elasticsearch; 我这里只是简单将表中所有记录查出 # 如果语句很多,可使用statement_file来指定sql语句文件 statement => "select * from exploit_records" } # 多读取一个文件 file { # 要读取的文件的位置 path => ["/home/ls/test.txt"] start_position => "beginning" } } filter { } output{ # 对于ebd_id项不为空的记录使用此项 if [edb_id] != "" { elasticsearch { hosts => ["localhost:9200"] document_id => "%{edb_id}" } } # 对于path匹配test.txt的记录使用此项 if [path] =~ "test.txt"{ elasticsearch { hosts => ["localhost:9200"] } } }
参考:
https://segmentfault.com/a/1190000014387486
标签:报错 position bin user 驱动 存在 drive 接收 cal
原文地址:https://www.cnblogs.com/lsdb/p/9857091.html