props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonDeserializer");
</source>
<br>
==Logstash consumer==
As we have already seen, the Logstash pipeline configuration is read from:
/usr/share/logstash/pipeline/logstash.conf
<pre>
input {
  kafka {
    decorate_events => true
    value_deserializer_class => "org.apache.kafka.common.serialization.StringDeserializer"
    topics => ["test2-topic"]
    bootstrap_servers => "kafka:29092"
    group_id => "AvroConsumerGroupId"
    client_id => "AvroConsumerClientId"
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
</pre>
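The pipeline above sets <code>decorate_events => true</code>, which makes the kafka input store the record's topic, partition, offset, key, and timestamp under the hidden <code>[@metadata][kafka]</code> field. Since <code>@metadata</code> is not included in the emitted event by default, a filter is needed to copy it into a visible field. A minimal sketch (the target field name <code>kafka_topic</code> is our own choice, not part of the original pipeline):
<pre>
filter {
  mutate {
    # copy the Kafka topic name out of the hidden @metadata field
    add_field => { "kafka_topic" => "%{[@metadata][kafka][topic]}" }
  }
}
</pre>
With this filter in place, the rubydebug output would also contain a <code>kafka_topic</code> field alongside <code>message</code>.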
In the Logstash log we can see that the consumer has joined the group and reset the partition offset:
<pre>
...
[Consumer clientId=AvroConsumerClientId-0, groupId=AvroConsumerGroupId] Resetting offset for partition test2-topic-0 to offset 0.
</pre>
Send a test message with the console producer:
<pre>
$ ./kafka-console-producer \
> --broker-list kafka:29092 \
> --topic test2-topic
>this is the test message
</pre>
The message then appears in the Logstash (rubydebug) output:
<pre>
confluent_logstash.1.3qsuyylnulxa@worker0 | {
confluent_logstash.1.3qsuyylnulxa@worker0 | "@timestamp" => 2019-04-21T17:55:08.694Z,
confluent_logstash.1.3qsuyylnulxa@worker0 | "message" => "this is the test message",
confluent_logstash.1.3qsuyylnulxa@worker0 | "@version" => "1"
confluent_logstash.1.3qsuyylnulxa@worker0 | }
</pre>
=Administration tools=