码迷,mamicode.com
首页 > 其他好文 > 详细

ELK同步kafka带有key的Message

时间:2019-01-24 21:54:28      阅读:452      评论:0      收藏:0      [点我收藏+]

标签:index   ==   match   fse   ora   str   boot   host   arc   

需求

kafka中的message带有key,带有相同key值的message后入kafka的意味着更新message,message值为null则意味着删除message。
用logstash来同步kafka中这样的数据入Elasticsearch,从而实现可以同步增删改数据。

环境

1) logstash-6.5.4 
2) kafka中topic的message带有key

解决(Logstash的配置如下)

input {
    kafka {
        bootstrap_servers=> "192.168.31.92:9092"
        group_id => "test_group"
        topics => ["test_topic"]
        type => "test_type"
        auto_offset_reset => earliest
        consumer_threads => 1
        decorate_events => true
        codec => plain {
            format => ""
        }
    }
}

filter{
    if ([message] == "") {
        mutate {
            add_field => { "@esaction" => "delete"}
        }
    mutate {
            remove_field => ["@version"]
        }
    } else {
        json {
            source => "message"
        }

        mutate {
            remove_field => ["@version","message"]
        }

        mutate {
            add_field => { "@esaction" => "index"}
        }

        date {
            match => ["updated","UNIX_MS"]
        target => "@timestamp"
        }
    }
}

output {
    elasticsearch {
        hosts => ["192.168.21.80:9200"]
        index => "test_index"
        document_id => "%{[@metadata][kafka][key]}"
        action => "%{[@esaction]}"
        codec => "json"
    }
}

ELK同步kafka带有key的Message

标签:index   ==   match   fse   ora   str   boot   host   arc   

原文地址:https://www.cnblogs.com/firstsword/p/10316855.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!