Changes

Apache Avro with Kafka

1,908 bytes added, 11:07, 19 April 2019
Logstash consumer
A logstash-t tehetjük a Kafka elé és mögé is. Ha a Kafka mögé tesszük, a Kafka és az Elasticsearh közé, akkor a logstash-nek kell elvégezni az Avro deszeralizációt az Elastichsearh-be való írás előtt. Az avró üzenetek feldolgozására nem képes önállóan a Logstash, szükség van egy megfelelő avro-logstash plugin-re, ami el tudja végezni a deseralizációt.
:[[File:ClipCapIt-190419-125341.PNG]]
<br><br>Sajnos az alap docker logstash image nem tartalmazza egyik codek-et sem. A plugin-ek listáját a '''bin/logstash-plugin list''' paranccsal ellenőrizhetjük. Láthatjuk, hogy az alap docker image nem tartalmazza az avro plugin-t. <pre># docker run -it docker.elastic.co/logstash/logstash:6.6.2 bin/logstash-plugin listlogstash-codec-ceflogstash-codec-collectdlogstash-codec-dots...</pre>Két lehetőségünk van. Egyik megoldás, hogy készítünk egy új image-et a logstash:6.6.2 image-ből kiindulva. Új plugin-t a bin/logstash-plugin install paranccsal installálhatunk. Ezt betehetjük egy Dockerfile-ba. A nehézség, hogy a swarm node-okra el kell juttatni a módosított image-t vagy saját repot kell használni. A másik lehetőség, hogy a dockerhub-on keresünk olyan image-et amibe már installáltak avro plugin-t.
===logstash avro plugin===
 
Két AVRO codek érhető a logstash-hez:
'''1.'''
https://www.elastic.co/guide/en/logstash/6.5/plugins-codecs-avro.html<br>
Ez a codek nem kapcsolódik a séma regiszterhez, előre meghatározott séma fájl alapján tud dekódolni, amit a fájlrendszerben oda kell másoljunk. Ha változik a séma, akkor kézzel ki kell cserélni.
'''2.'''
https://github.com/revpoint/logstash-codec-avro_schema_registry<br>
Ez a codek tud csatlakozni a séma regiszterhez.
</pre>
<br><br>Sajnos az alap docker logstash image nem tartalmazza egyik codek-et sem, így nézhetjük meg: <pre># docker run -it docker.elastic.co/logstash/logstash:6.6.2 bin/logstash-plugin listlogstash-codec-ceflogstash-codec-collectdlogstash-codec-dots...</pre>
<br>
* https://hub.docker.com/r/slavirok/logstash-avro --> docker pull slavirok/logstash-avro
Listázzuk ki a második image-ben a plugine-eket. Láthatjuk, hogy az avro plugin köztük van.
<pre>
# docker run -it rokasovo/logstash-avro2 bin/logstash-plugin list
===Swarm architektúra===
A swarm architektúrát bővíteni fogjuk a '''rokasovo/logstash-avro2 logstash''' komponenssel. A logstash a belső '''kafka-net''' overlay hálózaton fogja elérni a schema-registry-t. Az Elasticsearh-öt már nem tesszük be a swarm stack-be, a logstash által feldolgozott üzeneteket csak ki fogjuk loggolni:
:[[File:ClipCapIt-190402-175630.PNG]]
 
 
 
 
<pre>
# docker run -d --name logstash --mount type=bind,source=/home/adam/dockerStore/logstash/config/,target=/usr/share/logstash/pipeline rokasovo/logstash-avro2
</pre>
 
 
 
<pre>
input {
kafka {
codec => avro_schema_registry {
endpoint => "https://${KAFKA_SCHEMA_REGISTRY_SERVER}"
username => "${KAFKA_CLIENTID}"
password => "${KAFKA_CLIENTPASSWORD}"
}
decorate_events => true
value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
...
</pre>
 
 
 
 
 
 
===Logstash config===
 
/usr/share/logstash/pipeline/'''logstash.conf'''
<pre>
input {
kafka {
codec => avro_schema_registry {
endpoint => "http://schemaregistry:8081"
}
decorate_events => true
value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
topics => [
"test-topic"
]
bootstrap_servers => "kafka:29092"
group_id => "AvroConsumerGroupId"
client_id => "AvroConsumerClientId"
}
}
output {
stdout {
codec => rubydebug
}
}
</pre>
<pre>
# docker run -d --name logstash --mount type=bind,source=/home/adam/dockerStore/logstash/config/,target=/usr/share/logstash/pipeline rokasovo/logstash-avro2
</pre>
===Futtatás===
<pre>
[INFO ]Kafka version : 1.0.0
[INFO ]Kafka commitId : aaa7af6d4a11b29d
[INFO ][Consumer clientId=AvroConsumerClientId-0, groupId=AvroConsumerGroupId] Discovered coordinator kafka:29092 (id: 2147483645 rack: null)
[INFO ][Consumer clientId=AvroConsumerClientId-0, groupId=AvroConsumerGroupId] Revoking previously assigned partitions []
[INFO ][Consumer clientId=AvroConsumerClientId-0, groupId=AvroConsumerGroupId] (Re-)joining group
[INFO ][Consumer clientId=AvroConsumerClientId-0, groupId=AvroConsumerGroupId] Successfully joined group with generation 5
[INFO ][Consumer clientId=AvroConsumerClientId-0, groupId=AvroConsumerGroupId] Setting newly assigned partitions [test-topic-0]
</pre>
/usr/share/logstash/pipeline/'''logstash.conf'''
<pre>
input { kafka | { bootstrap_servers | "@version" => "kafka:290921", topics | "age" => 35, | "@timestamp"test=> 2019-04-topic"19T10:39:30.266Z, codec | "phoneNumber" => avro {"", schema_uri | "firstName" => "/usr/share/logstash/pipeline/employee.avscBob", } }} output {  stdout { codec | "lastName" => rubydebug }"Jones" | }
</pre>