Changes

Apache Avro with Kafka

468 bytes added, 16:19, 28 April 2019
Consumers
A '''kafka-avro-console-consumer''' program a /bin mappában található. Használata nagyon hasonlít a producer-re, 3 kötelező paramétere van:
* bootstrap-server: itt meg kell adni a Kafka broker URL-jét
* topic: meg kell adni a Kafka topic nevét, ahova írja ahonnan olvassuk az üzeneteket* property: itt fel tudunk sorolni tetszőleges consumer paramétereket, nekünk itt kettőt kell kötelezően megadniadhatjuk meg a séma regiszter URL-jét: ** schema.registry.url: A séma regiszter elérhetősége
Mikor a consumer üzenetet kap, el fog menni a séma regiszterhez, hogy letöltse az üzenetben kapott séma ID-hez tartozó sémát. Ez alapján fogja deseralizálni az üzentet. Indítsuk el a consumert-t, amjd a command line producer segítségével küldjünk bele Employee üzeneteket. Láthatjuk majd, hogy JSON formátumban meg fogjuk kapni az eredeti üzenetet.
==Java consumer==
A Properties map-ben a szokásos Kafka specifikus paramétereken felül meg kell adjuk a séma regiszter URL-jét és a séma használatára vonatkozó beállításokat.
<source lang="java">
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
===Séma specifikus consumer===
Ha a KafkaAvroDeserializerConfig.'''SPECIFIC_AVRO_READER_CONFIG ''' értéke igaz, akkor a választ egy előre meghatározott objektum típusban várjukfogjuk visszakapni, a példában ez lesz a '''Employee.java'''
<source lang="java">
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
Küldjünk üzeneteket a test-topic-ra. Az üzenetek csak az Employee sémára illeszkedő objektumok lehetnek.
<pre>
2019-04-02 12:25:04 DEBUG AbstractCoordinator:822 - [Consumer clientId=consumer-1, groupId=KafkaExampleAvroConsumer] Received successful Heartbeat response
===Generikus consumer===
Ha a KafkaAvroDeserializerConfig.'''SPECIFIC_AVRO_READER_CONFIG ''' értéke hamis, akkor a választ a válasz paroszlására a '''GenericRecord''' nevű általános célú objektumot kell használni, amiből extra munkával lehet csak kinyerni az eredeti objektum mezőit, cserélbe nem kell séma specifikus consumer-t írni.
<source lang="java">
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "false");
https://www.elastic.co/guide/en/logstash/current/plugins-codecs-avro.html<br>
A logstash-t tehetjük a Kafka elé és mögé is. Ha a Kafka mögé tesszük, pl. a Kafka és az Elasticsearh közé, akkor a logstash-nek kell elvégezni az Avro deszeralizációt az Elastichsearh-be való írás előtt. Az avró avro üzenetek feldolgozására nem képes önállóan a Logstash, szükség van egy megfelelő input avro-kafka-logstash plugin-re, ami el tudja végezni a deseralizációt.
:[[File:ClipCapIt-190419-125341.PNG]]
<br>
<br>
Sajnos az alap docker logstash image nem tartalmazza egyik codek-et semsemmilyen avro támogatást. A plugin-ek listáját a '''bin/logstash-plugin list''' paranccsal ellenőrizhetjük. Láthatjuk, hogy az alap docker image nem tartalmazza az avro plugin-t.
<pre>
# docker run -it docker.elastic.co/logstash/logstash:6.6.2 bin/logstash-plugin list
...
</pre>
Két lehetőségünk van. Egyik megoldás, hogy készítünk egy új image-et a logstash:6.6.2 image-ből kiindulva. Új plugin-t a '''bin/logstash-plugin install ''' paranccsal installálhatunk. Ezt betehetjük egy Dockerfile-ba. A nehézség, hogy a swarm node-okra el kell juttatni a módosított image-t vagy saját repot kell használni.
A másik lehetőség, hogy a dockerhub-on keresünk olyan image-et amibe már installáltak avro plugin-t.
<pre>
bin/logstash-plugin install logstash-codec-avro
</pre>
 
<br>
'''1.'''
https://www.elastic.co/guide/en/logstash/6.5/plugins-codecs-avro.html<br>
Ez a codek nem kapcsolódik a séma regiszterhez, előre meghatározott séma fájl alapján tud dekódolni, amit a fájlrendszerben oda kell másoljunklogstash-be. Ha változik a séma, akkor kézzel ki kell cserélni. <pre>bin/logstash-plugin install logstash-codec-avro</pre>
<br>
'''2.'''
https://github.com/revpoint/logstash-codec-avro_schema_registry<br>
Ez a codek tud csatlakozni a séma regiszterhez. Tehát nem kell kézzel karbantartsuk a sémákat.
<pre>
bin/logstash-plugin install logstash-codec-avro_schema_registry
<br>
A docker-hub-on elérhető két logstash image is, ami tartalmazza a logstash-codec-avro_shcema_registry plugin-t:
* https://hub.docker.com/r/rokasovo/logstash-avro2 --> ocker pull rokasovo/logstash-avro2
<br>
===Logstash config===
A logstash image-en belül a konfigurációs fájl itt vantalálható: /usr/share/logstash/pipeline/'''logstash.conf'''. Ide kell felcsatolni a külső konfigurációs fájlt.<br>A kafka input-ban a codec-nek meg kell adni a '''avro_schema_registry plugin'''-t, amit a '''rokasovo/logstash-avroavro2'''2 image már tartalmaz. Az '''endpoint''' paraméterben kell megadni a schema-registry url-jét. Fontos, hogy itt a belső, kafka-net overlay hálózati nevet adjuk meg, ami megegyezik a stack fájlban a service nevével. Ugyanis a service nevére egy stack-en belül a docker névfeloldást végez. Valamiért a deserializációs osztálynak a '''ByteArrayDeserializer''' osztályt kell megadni, nem a '''KafkaAvroDeserializer''' osztályt, amit (A KafkaAvroDeserializer-t nem is tartalmaz a tartalmazza az avro input plugin és még is működik. )
Az output-ban egyenlőre nem írjuk be Elasticsearch-be az üzeneteket, csak kiírjuk a log-ba.
<pre>
</pre>
Ha a helyére tettük a konfigurációs fájlt, akkor indítsuk el '''docker run''' paranccsal lokálisan felcsatolva a '''/usr/share/logstash/pipeline''' mappába a konfigurációs fájlt, hogy ki tudjuk külön próbálni, hogy a konfiguráció megfelelő e. Persze a Kafka-hoz nem fog tudni csatlakozni, de a szintaktikai hibákat tudjuk ellenőrizni.
<pre>
# docker run -d --name logstash --mount type=bind,source=/home/adam/dockerStore/logstash/config/,target=/usr/share/logstash/pipeline rokasovo/logstash-avro2
Telepítsük fel a docker swarm stack-et:
<pre>
# docker stack deploy -c confluence_swarm_logstashconfluent_swarm_logstash.yaml confluenceconfluentCreating network confluence_kafkaconfluent_kafka-netCreating service confluence_logstashconfluent_logstashCreating service confluence_zookeeperconfluent_zookeeperCreating service confluence_kafkaconfluent_kafkaCreating service confluence_schemaregistryconfluent_schemaregistry
</pre>
Majd nézzük bele a logstash service logjába. Látnunk kell, hogy hozzá tudod csatlakozni a '''test-topic''' nevű topichoz. A séma regiszterhez csak az üzenet deszerializációja közben fog csatlakozni, ezért nem láthatjuk az indulási még a logban.
<pre>
# docker service logs -f confluence_logstashconfluent_logstash
...
[INFO ]Kafka version : 1.0.0