使用logstash将kafka中的元数据写入到es中

目前用的是logstash7.3版本,匹配kafka2.0以上。Logstash7.3 kafka input doc

logstash7.3目前支持的元数据只有以下几项(header元数据logstash暂不支持):

通过源代码可以看出 logstash-input-kafka

if @decorate_events
	event.set("[@metadata][kafka][topic]", record.topic)
	event.set("[@metadata][kafka][consumer_group]", @group_id)
	event.set("[@metadata][kafka][partition]", record.partition)
	event.set("[@metadata][kafka][offset]", record.offset)
	event.set("[@metadata][kafka][key]", record.key)
	event.set("[@metadata][kafka][timestamp]", record.timestamp)
  • [@metadata][kafka][topic]: Original Kafka topic from where the message was consumed.
  • [@metadata][kafka][consumer_group]: Consumer group
  • [@metadata][kafka][partition]: Partition info for this message.
  • [@metadata][kafka][offset]: Original record offset for this message.
  • [@metadata][kafka][key]: Record key, if any.
  • [@metadata][kafka][timestamp]: Timestamp in the Record. Depending on your broker

配置文件如下:

  • 重点在于filter中的 mutate 属性的使用
input {
    kafka {
        bootstrap_servers => "127.0.0.1:9092"
        topics            => ["test"]
        group_id          => "test"
        # When decorate_events is enabled, the byte-array deserializers
        # below must stay disabled, otherwise the pipeline raises errors.
        #key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
        #value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
        consumer_threads  => 1
        # Defaults to false; the [@metadata][kafka][...] fields are only
        # populated when this is set to true.
        decorate_events   => true
        auto_offset_reset => "earliest"
    }
}
filter {
    mutate {
        # Split the Kafka record key on commas.
        split => ["[@metadata][kafka][key]", ","]
        add_field => {
            # Copy the first segment of the split key into a custom "index" field.
            "index" => "%{[@metadata][kafka][key][0]}"
        }
    }
}
output {
    elasticsearch {
        # Quoted strings, consistent with every other string value in this
        # config (barewords happen to parse but are undocumented behavior).
        # NOTE(review): example credentials — replace before deploying, and
        # prefer environment variables ("${ES_USER}") over literals.
        user     => "elastic"
        password => "changeme"
        pool_max => 1000
        pool_max_per_route => 200
        hosts => ["127.0.0.1:9200"]
        # Daily index, e.g. test-2019.08.01
        index => "test-%{+YYYY.MM.dd}"
    }
    # Echo each event to the console for debugging.
    stdout {
        codec => rubydebug
    }
}