Changes

Apache Avro with Kafka

12,125 bytes added, 19:30, 29 April 2019
Mi az Avro?
:[[File:ClipCapIt-190327-180306.PNG|200px]]
=Bevezető=
 
 
 
==Mi az Avro?==
Az Avro egy nyílt forráskódú project, ami egy adata szerializációs szolgáltatás elsősorban az Apache Hadoop-hoz, de nem csak a Hadoop-ban használható, ahogy a mi példánkban is látni fogjuk. Av Az Avro segítségével nagyon hatékonyan cserélhetünk adatokat két végpont között "big data" környezetben.
:[[File:ClipCapIt-190429-213000.PNG]]
 Az Avro alapja egy séma regiszter, mihez amihez mind az adat szerializáló és deszerializáló szolgáltatás kapcsolódik. Itt tárolja az Avro a serializálnadó adatok tervrajzát JSON formátumban, a sémákat verziózva. Amikor a serializáló szolgáltatás adatot akar küldeni, akkor megjelöli az Avro serializátornak hogy melyik séma alapján serializálja a küldendő adatot. Ha a séma még nem létezett, akkor beszúrja a séma regiszterbe. Az Avro binárist készít a séma segítségével a küldendő adatból, és az üzenetbe a bináris adat mellé beleteszi a séma azonosítóját is, amit a deszerializiós szolgáltatás megkap, és annak segítségével ki tudja olvasni az adat deszerializálásához szükséges sémát, ami segítségével előállítja az eredeti üzenetet.
Az Avro séma regiszter több verziót is képes kezelni egy sémából. A beállításoknak megfelelően a séma lehet előre vagy visszafelé kompatibilis. Ha egy séma visszafelé kompatibilis, akkor az új sémával is ki lehet olvasni olyan régi adatokat, amit még egy korábbi sémával írtak be.
Az Avor sémákat JSON formátumban kell leírni, és aminek egyedi, kötött szintaktikájuk van, tehát nem szabványos JSON sémák. A schema-registry-vel el egy REST API-n keresztül lehet kommunikálni. A legtöbb programozási nyelven elérhető Avro magas szintű API, ami elfedi előlünk a REST kommunikációt. Az Avró többféle adatbázisban is tárolhatja a sémákat, de a leggyakoribb megoldás, hogy egy speciális Kafka topic-ban tárolja azokat.
Ahogy azt már írtam, az Avro schema-registry többféle adatbázisban is képes tárolni a sémákat, köztük Kafka-ban is. Kafka estén az Avro regiszter egy kitüntetett topic-ban tárolja a sémákat (_schemas). Ha a kommunikációra is Kafka-t használunk, akkor használhatjuk akár ugyan azt a Kafka broker-t mind két célra, ahogy az alábbi ábrán is látható:
:[[File:ClipCapIt-190327-213711.PNG]]
 
<br>
==Környezet==
confluence_swarmconfluent_swarm.yaml
<source lang="C++">
version: '3.2'
Hozzuk létre a docker stack-et:
<pre>
# docker stack deploy -c confluence_swarmconfluent_swarm.yaml confluenceconfluent
</pre>
# docker service ls
ID NAME MODE REPLICAS IMAGE PORTS
7vjvop7tqiyc confluence_kafka confluent_kafka replicated 1/1 confluentinc/cp-kafka:5.1.2 *:29092->29092/tcpin6a4ti3jeu5 confluence_schemaregistry confluent_schemaregistry replicated 1/1 confluentinc/cp-schema-registry:5.1.2 *:8081->8081/tcpoxxjtkcusj1f confluence_zookeeper confluent_zookeeper replicated 1/1 confluentinc/cp-zookeeper:5.1.2 *:32181->32181/tcp
</pre>
<pre>
NETWORK ID NAME DRIVER SCOPE
...
5albky0eu1to confluence_kafkaconfluent_kafka-net overlay swarm
olqkh5zlqiac ingress overlay swarm
...
* kafka: 192.168.42.42:29092
* schema-registry: 192.168.42.42:8081
 
<br>
=Avro REST interfész=
Az Avro a '''_schemas''' nevű Kafka topic-ban tárolja a sémákat az alapértelmezett konfiguráció szerint. Tehát az AVRO schema-registry-nek szüksége van . . A Kafka /bin mappájában található '''kafka-topics.sh''' topic admin script-el listázzuk ki a topikokat:
A válaszban láthatjuk a kompatibilitási szintet(ezt majd később részletesen tárgyaljuk).
Avro-ban minden sémát egy úgynevezett subject-ek alá kell regisztrálni. Egy subject alatt ugyan azon séma különböző verzióit tároljuk. Tehát két teljesen különböző sémát nem lehet ugyan azon a ugyanazon subject alá berakni. Tehát mikor hasonló sémákat regisztrálunk ugyan azon ugyanazon subject alá, akkor különböző verziók fognak létrejönni ugyan ahoz ahhoz a sémához. Azt hogy mekkora a megengedett eltérés mértéke, a schema-registry server konfigurációja határozza meg.
A sémákat a POST:/subjects/<subject-name>/versions REST interfészen kell beküldeni. A POST body-ban a {schema: "...séma definicíó..."} formátumban kell megadni a sémát, ahol a séma definíció egy escape-lt belső json.
<pre>
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema" : "... schema def..."}' http://192.168.42.42:8081/subjects/<subject-name>/versions
Szúrjuk be az Avro-ba az alábbi '''Employee''' sémát. A namespace majd a schema-to-java kód generálásánál lesz érdekes, ez fogja meghatározni a java csomagotgenerált kódban. A type mező mutatja meg, hogy összetett objektumumot vagy objektumot, sima stringet, vagy tömböt ír le a séma. A '''record''' jelenti az összetett objektumot. Az '''Employee''' nevű objektum négy mezőből áll. <source lang="xmlC++">
{"namespace": "hu.alerant.kafka.avro.message",
"type": "record", "name": "Employee",
Most próbáljunk az előbbitől tejesen különböző '''Company''' sémát regisztrálni szintán a '''test1''' subject alá.
<source lang="xmlC++">
{"namespace": "hu.alerant.kafka.avro.message",
"type": "record", "name": "Company",
Most szúrjuk be az '''Employee''' sémának egy redukált változatát, amiből hiányzik a '''phoneNumber''' mező. Erre teljesűl hogy visszafelé komatibilis.
<source lang="xmlC++">
{"namespace": "hu.alerant.kafka.avro.message",
"type": "record", "name": "Employee",
Szúrjuk ezt be szintén a '''test1''' subject alá:
<pre>
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema" : "{\"namespace\": \"hu.alerant.kafka.avro.message\",\"type\": \"record\", \"name\": \"Employee\", \"fields\": [ {\"name\": \"firstName\", \"type\": \"string\"}, {\"name\": \"lastName\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}"}' http://192.168.42.42:8081/subjects/test1/versions
{"subject":"test1","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"Employee\",\"namespace\":\"hu.alerant.kafka.avro.message\",\"fields\":[{\"name\":\"firstName\",\"type\":\"string\"},{\"name\":\"lastName\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"phoneNumber\",\"type\":\"string\"}]}"}
</pre>
 
 
<br>
=Java kód generálás=
A generált osztályba az Avro belegenrálja a sémát is, ez az amit majd a Kafak topic-ba dobás előtt a producer fel fog küldeni a schema-register servernek: szervernek. Employee.java
<source lang="java">
package hu.alerant.kafka.avro.message;
</source>
 
<br>
=Producers=
 
==Command line producers==
 
==Java avro-kafak producer==
 Ahogy a Kafka /bin mappában elérhető volt parancssori producer és consumer, úgy a schema-registry-ben elérhető avro-s producer és consumer. Töltsük le a schema-registry binárist, és menjünk a bin mappába. <pre>$ wget http://packages.confluent.io/archive/1.0/confluent-1.0.1-2.10.4.zip$ unzip confluent-1.0.1-2.10.4.zip$ cd confluent-1.0.1</pre>  A hagyományos Kafka java producer-hez képest csak pár különbség van az a java producer inicializálásban. Egyrészt meg kell adni, hogy mind a kulcsot, mind az üzenetet Avro-val akarjuk serializálni, másrészt meg kell adni az Avro schema-registry URL-jét. A /etc/hosts fájlba felvettük a worker0 swarm node IP címével a '''schema-registry''' host nevet.
<source lang="java">
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
Első alkalommal, mikor a producer be akar dobni egy üzenetet a Kafka topic-ba, felküldi a sémát a már látott POST:http://192.168.42.42:8081/subjects/<subject-name>/versions REST hívással, amit az avro java objektumból nyer ki. Ha a séma egy futás alatt nem változik, akkor többször nem küldi fel a sémát a schema-registry-be. Az A producer az avro subject-et automatiksuan nevét automatikusan képezi a topoci nevéből. Tehát egy topoc-ba csak a kompatibilitási szabályoknak megfelelő sémáknak megfelelő üzeneteket lehet bedobaniberakni. Arra nincs mód, hogy bárhogyan is megadjuk, hogy az adott objektum melyik subject melyik verziójának kell hogy megfeleljen, ezt teljesen elfedi előlünk az API.
'''''Összefoglalva, egy adott Kafka topic-ba, amit kommunikációra használnunk (tehát nem a séma tárolására) csak Avro kompatibilis sémáknak megfelelő objektumokat lehet beküldeni. Nem azért mert a topic nem bírna el másik sémából gyártott bináris üzenetet, hanem azért, mert az Avro API a topic nevéből képzi a subject nevét, és egy subject-en belül csak kompatibilis sémákat lehet tárolni.'''''
</source>
A parancssori kafka-avro consumer segítségével fogjuk kiolvasni a java producer által küldött üzeneteket. Futtassuk le a java producer-t majd indítsuk el a parancssori consumer-t. Az avro consumer csak annyiban különbözök a sima parancssori consumer-től, hogy a séma regiszter címét is meg kell adni.
<pre>
./kafka-avro-console-consumer --topic test-topic --zookeeper 192.168.42.42:32181 --property schema.registry.url="http://schema-registry:8081"
Mikor Java-ból küldünk Avron-n keresztül Kafka üzeneteket, akkor a producer létre fog hozni a topic nevével prefixe-lt subjet-eket, egyet a Kafak kulcsnak és egyet a hozzá tartozó értéknek autómatikusanautomatikusan, az első üzenet váltás után. A fenit fenti példa futtatása után listázzuk ki az összes Avro-s subject-et:
<pre>
$ curl -X GET -H "Content-Type: application/vnd.schemaregistry.v1+json" http://192.168.42.42:8081/subjects/
</pre>
Láthatjuk, hogy létrehozott a '''test-topic ''' prefixel egy subject-et a valu-nak és a Kafka kulcsnak is.
=== Partition keys ===
A partíciós kulcsot nem muszáj Avro sémával megadni, ha nem összetett objektum, használhatjuk a beépített serializálókat, deserealizálókat. Láthattuk is, hogy a kulcs sémája egy darab Long típust tartalmazott.
</source>
A fenti módosítással példa futtatásakor már csak a value sémáját fogja elküldeni a Kafka-ba írás előtt a producer a séma regiszternek.
<pre>
2019-04-02 13:30:44 DEBUG RestService:118 - Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"Employee\",\"namespace\":\"hu.alerant.kafka.avro.message\",\"fields\":[{\"name\":\"firstName\",\"type\":\"string\"},{\"name\":\"lastName\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"phoneNumber\",\"type\":\"string\"}]}"} to http://schema-registry:8081/subjects/test-topic-value/versions
["test-topic-value","test-topic-key","test1"]
</pre>
 
<br>
<br>
==Command line producers==
https://docs.confluent.io/3.0.0/quickstart.html<br>
A Confluent oldaláról letölthető Kafka csomagban található parancssori kafka-avro producer és consumer is. Töltsük le a Confulent csomagot innen: https://www.confluent.io/download/
 
A '''kafka-avro-console-producer''' program a /bin mappában található. 4 paramétert kell kötelezően kitöltenünk:
* broker-list: itt meg kell adni a Kafka broker URL-jét
* topic: meg kell adni a Kafka topic nevét, ahova írja az üzeneteket
* property: itt fel tudunk sorolni tetszőleges paramétereket, nekünk itt kettőt kell kötelezően megadni:
** schema.registry.url: A séma regiszter elérhetősége
** value.schema: itt meg kell adni a használni kívánt sémát JSON formátumban (a Java producer esetén a séma bele van kódolva az Avro objektumokban, a példában az Employee objektum elején megtalálhatjuk a sémát. A sémát minden producer indulás elején felküldi a producer a séma regiszterbe, hogy ellenőrizze, hogy változott e vagy sem. )
 
 
Emlékezzünk rá, hogy az Employee séma az alábbi volt:
<source lang="C++">
{"namespace": "hu.alerant.kafka.avro.message",
"type": "record", "name": "Employee",
"fields": [
{"name": "firstName", "type": "string"},
{"name": "lastName", "type": "string"},
{"name": "age", "type": "int"},
{"name": "phoneNumber", "type": "string"}
]
}
</source>
Ezt majd meg kell adjuk egysoros alakban a '''kafka-avro-console-producer''' parancsban.
 
{{note|A '''kafka-avro-console-producer''' parancsban a konkrét Avro üzenetet nem lehet megadni. Miután kiadtuk a parancsot, az input-on fogja várni, hogy bírjuk JSON formátumban a sémának megfelelő üzenetet. Minden egyes Enter leütésre megpróbálja elküldeni amit az stdIn-re beírtunk}}
 
 
Az Employee séma használata mellett a parancs az alábbi:
<pre>
./kafka-avro-console-producer \
--broker-list kafka:29092 \
--topic test-topic \
--property schema.registry.url='http://schema-registry:8081' \
--property value.schema='{"namespace": "hu.alerant.kafka.avro.message", "type": "record", "name": "Employee", "fields": [{"name": "firstName", "type": "string"}, {"name": "lastName", "type": "string"}, {"name": "age", "type": "int"}, {"name": "phoneNumber", "type": "string"}]}'
</pre>
 
 
Ekkor várni fogja hogy egy sorba bírjuk az első átküldendő üzenetet JSON formában, ami megfelel a fenti sémának:
<pre>
{"firstName": "Adam", "lastName": "Berki", "age":20, "phoneNumber": "123456"}
</pre>
Ha bemásoltuk, akkor az Enter leütésével küldhetjük be az üzenetet. Ekkor a producer fel fogja küldeni a sémát a séma regiszterbe, majd az Avro segítségével serializálni fogja az üzenetet, majd a séma regisztertől kapott séma ID-t és a bináris üzenetet rá fogja rakni a test-topic-ra.
==Java Logbach producer==
==Command line consumer==
A command line producer-el megegyezően, szintén a Confluent oldaláról letölthető Kafka csomagban találhatjuk meg a command line kafka-avro consumer-t.
A '''kafka-avro-console-consumer''' program a /bin mappában található. Használata nagyon hasonlít a producer-re, 3 kötelező paramétere van:
* bootstrap-server: itt meg kell adni a Kafka broker URL-jét
* topic: meg kell adni a Kafka topic nevét, ahonnan olvassuk az üzeneteket
* property: itt fel tudunk sorolni tetszőleges consumer paramétereket, itt adhatjuk meg a séma regiszter URL-jét: schema.registry.url
 
Mikor a consumer üzenetet kap, el fog menni a séma regiszterhez, hogy letöltse az üzenetben kapott séma ID-hez tartozó sémát. Ez alapján fogja deseralizálni az üzentet. Indítsuk el a consumert-t, amjd a command line producer segítségével küldjünk bele Employee üzeneteket. Láthatjuk majd, hogy JSON formátumban meg fogjuk kapni az eredeti üzenetet.
<pre>
./kafka-avro-console-consumer \
--bootstrap-server kafka:29092 \
--topic test-topic \
--property schema.registry.url='http://schema-registry:8081'
...
{"firstName":"Adam","lastName":"Berki","age":20,"phoneNumber":"123456"}
</pre>
 
<br>
==Java consumer==
A Properties map-ben a szokásos Kafka specifikus paramétereken felül meg kell adjuk a séma regiszter URL-jét és a séma használatára vonatkozó beállításokat.
<source lang="java">
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
===Séma specifikus consumer===
Ha a KafkaAvroDeserializerConfig.'''SPECIFIC_AVRO_READER_CONFIG ''' értéke igaz, akkor a választ egy előre meghatározott objektum típusban várjukfogjuk visszakapni, a példában ez lesz a '''Employee.java'''
<source lang="java">
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
Küldjünk üzeneteket a test-topic-ra. Az üzenetek csak az Employee sémára illeszkedő objektumok lehetnek.
<pre>
2019-04-02 12:25:04 DEBUG AbstractCoordinator:822 - [Consumer clientId=consumer-1, groupId=KafkaExampleAvroConsumer] Received successful Heartbeat response
===Generikus consumer===
Ha a KafkaAvroDeserializerConfig.'''SPECIFIC_AVRO_READER_CONFIG ''' értéke hamis, akkor a választ a válasz paroszlására a '''GenericRecord''' nevű általános célú objektumot kell használni, amiből extra munkával lehet csak kinyerni az eredeti objektum mezőit, cserélbe nem kell séma specifikus consumer-t írni.
<source lang="java">
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "false");
</pre>
<br>
===Partition keys===
Akárcsak a producer esetén, a consumer-ben is használható nem Avro-s kulcs. Lényeg, hogy a consumer-ben ugyan azt a kulcs szerializációs eljárást kell használni, mint a producer-ben:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.LongDeserializer.class);
</source>
 
 
<br>
==Logstash consumer==
Hasznos linkek:
https://www.elastic.co/guide/en/logstash/6.5/plugins-codecs-avro.html<br>
https://github.com/revpoint/logstash-codec-avro_schema_registry<br>
https://www.elastic.co/guide/en/logstash/current/plugins-codecs-avro.html<br>
 
A logstash-t tehetjük a Kafka elé és mögé is. Ha a Kafka mögé tesszük, pl. a Kafka és az Elasticsearh közé, akkor a logstash-nek kell elvégezni az Avro deszeralizációt az Elastichsearh-be való írás előtt. Az avro üzenetek feldolgozására nem képes önállóan a Logstash, szükség van egy megfelelő input avro-kafka-logstash plugin-re, ami el tudja végezni a deseralizációt.
 
:[[File:ClipCapIt-190419-125341.PNG]]
 
 
<br>
<br>
Sajnos az alap docker logstash image nem tartalmazza semmilyen avro támogatást. A plugin-ek listáját a '''bin/logstash-plugin list''' paranccsal ellenőrizhetjük. Láthatjuk, hogy az alap docker image nem tartalmazza az avro plugin-t.
<pre>
# docker run -it docker.elastic.co/logstash/logstash:6.6.2 bin/logstash-plugin list
logstash-codec-cef
logstash-codec-collectd
logstash-codec-dots
...
</pre>
Két lehetőségünk van. Egyik megoldás, hogy készítünk egy új image-et a logstash:6.6.2 image-ből kiindulva. Új plugin-t a '''bin/logstash-plugin install''' paranccsal installálhatunk. Ezt betehetjük egy Dockerfile-ba. A nehézség, hogy a swarm node-okra el kell juttatni a módosított image-t vagy saját repot kell használni.
A másik lehetőség, hogy a dockerhub-on keresünk olyan image-et amibe már installáltak avro plugin-t.
<pre>
bin/logstash-plugin install logstash-codec-avro
</pre>
https://github.com/revpoint/<br>===logstash avro plugin===Két AVRO codek érhető a logstash-codec-avro_schema_registryhez:
'''1.'''https://www.elastic.co/guide/en/logstash/current6.5/plugins-codecs-avro.html<br>Ez a codek nem kapcsolódik a séma regiszterhez, előre meghatározott séma fájl alapján tud dekódolni, amit a fájlrendszerben oda kell másoljunk logstash-be. Ha változik a séma, akkor kézzel ki kell cserélni. <br>
:[[File:ClipCapIt-190402-175630.PNG]]
'''2.'''
https://github.com/revpoint/logstash-codec-avro_schema_registry<br>
Ez a codek tud csatlakozni a séma regiszterhez. Tehát nem kell kézzel karbantartsuk a sémákat.
<pre>
bin/logstash-plugin install logstash-codec-avro_schema_registry
</pre>
 
 
 
<br>
A docker-hub-on elérhető két logstash image is, ami tartalmazza a logstash-codec-avro_shcema_registry plugin-t:
 
* https://hub.docker.com/r/rokasovo/logstash-avro2 --> ocker pull rokasovo/logstash-avro2
* https://hub.docker.com/r/slavirok/logstash-avro --> docker pull slavirok/logstash-avro
 
Listázzuk ki a második image-ben a plugine-eket. Láthatjuk, hogy az avro plugin köztük van.
<pre>
# docker run -it rokasovo/logstash-avro2 bin/logstash-plugin list
logstash-codec-avro_schema_registry
logstash-codec-cef
...
</pre>
 
<br>
===Swarm architektúra===
 
A swarm architektúrát bővíteni fogjuk a '''rokasovo/logstash-avro2 logstash''' komponenssel. A logstash a belső '''kafka-net''' overlay hálózaton fogja elérni a schema-registry-t. Az Elasticsearh-öt már nem tesszük be a swarm stack-be, a logstash által feldolgozott üzeneteket csak ki fogjuk loggolni:
:[[File:ClipCapIt-190419-201131.PNG]]
 
A logstash konfigurációját külső volume-ként fogjuk felcsatolni a netshare plugin használatával (részletek itt: https://wiki.berki.org/index.php/Docker_volume_orchestration)
<source lang="C++">
SCHEMA_REGISTRY_DEBUG: "true"
logstash:
image: docker.elastic.corokasovo/logstash/logstash:6.6.2-avro2
networks:
- kafka-net
</source>
<br>
===Logstash config===
A logstash image-en belül a konfigurációs fájl itt található: /usr/share/logstash/pipeline/'''logstash.conf'''. Ide kell felcsatolni a külső konfigurációs fájlt.<br>
A kafka input-ban a codec-nek meg kell adni a '''avro_schema_registry plugin'''-t, amit a '''rokasovo/logstash-avro2''' image már tartalmaz. Az '''endpoint''' paraméterben kell megadni a schema-registry url-jét. Fontos, hogy itt a belső, kafka-net overlay hálózati nevet adjuk meg, ami megegyezik a stack fájlban a service nevével. Ugyanis a service nevére egy stack-en belül a docker névfeloldást végez.
Valamiért a deserializációs osztálynak a '''ByteArrayDeserializer''' osztályt kell megadni, nem a '''KafkaAvroDeserializer''' osztályt (A KafkaAvroDeserializer-t nem tartalmazza az avro input plugin)
Az output-ban egyenlőre nem írjuk be Elasticsearch-be az üzeneteket, csak kiírjuk a log-ba.
<pre>
input {
kafka {
codec => avro_schema_registry {
endpoint => "http://schemaregistry:8081"
}
decorate_events => true
value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
 
topics => [
"test-topic"
]
bootstrap_servers => "kafka:29092"
group_id => "AvroConsumerGroupId"
client_id => "AvroConsumerClientId"
/usr/share/logstash/pipeline/'''logstash.conf'''
<pre>
input {
kafka {
bootstrap_servers => "kafka:29092"
topics => "test-topic"
codec => avro {
schema_uri => "/usr/share/logstash/pipeline/employee.avsc"
}
}
}
}
</pre>
 
Ha a helyére tettük a konfigurációs fájlt, akkor indítsuk el '''docker run''' paranccsal lokálisan felcsatolva a '''/usr/share/logstash/pipeline''' mappába a konfigurációs fájlt, hogy ki tudjuk külön próbálni, hogy a konfiguráció megfelelő e. Persze a Kafka-hoz nem fog tudni csatlakozni, de a szintaktikai hibákat tudjuk ellenőrizni.
<pre>
# docker run -d --name logstash --mount type=bind,source=/home/adam/dockerStore/logstash/config/,target=/usr/share/logstash/pipeline rokasovo/logstash-avro2
</pre>
 
<br>
===Futtatás===
Telepítsük fel a docker swarm stack-et:
<pre>
# docker stack deploy -c confluent_swarm_logstash.yaml confluent
Creating network confluent_kafka-net
Creating service confluent_logstash
Creating service confluent_zookeeper
Creating service confluent_kafka
Creating service confluent_schemaregistry
</pre>
 
 
Majd nézzük bele a logstash service logjába. Látnunk kell, hogy hozzá tudod csatlakozni a '''test-topic''' nevű topichoz. A séma regiszterhez csak az üzenet deszerializációja közben fog csatlakozni, ezért nem láthatjuk még a logban.
<pre>
# docker service logs -f confluent_logstash
...
[INFO ]Kafka version : 1.0.0
[INFO ]Kafka commitId : aaa7af6d4a11b29d
[INFO ][Consumer clientId=AvroConsumerClientId-0, groupId=AvroConsumerGroupId] Discovered coordinator kafka:29092 (id: 2147483645 rack: null)
[INFO ][Consumer clientId=AvroConsumerClientId-0, groupId=AvroConsumerGroupId] Revoking previously assigned partitions []
[INFO ][Consumer clientId=AvroConsumerClientId-0, groupId=AvroConsumerGroupId] (Re-)joining group
[INFO ][Consumer clientId=AvroConsumerClientId-0, groupId=AvroConsumerGroupId] Successfully joined group with generation 5
[INFO ][Consumer clientId=AvroConsumerClientId-0, groupId=AvroConsumerGroupId] Setting newly assigned partitions [test-topic-0]
</pre>
 
 
Majd indítsuk el a '''Java avro-kafak producer''' fejezetben leírt java producert, ami egy egy Employee objektumot fog beküldeni a test-topic-ba.
Emlékezzünk rá, hogy az Employee objektum sémája az alábbi:
<source lang="C++">
{"namespace": "hu.alerant.kafka.avro.message",
"type": "record", "name": "Employee",
"fields": [
{"name": "firstName", "type": "string"},
{"name": "lastName", "type": "string"},
{"name": "age", "type": "int"},
{"name": "phoneNumber", "type": "string"}
]
}
</source>
 
A java producer-ben az Employee objektum példányosítása az alábbi:
<source lang="java">
...
Producer<Long, Employee> producer = createProducer();
Employee bob = Employee.newBuilder().setAge(35)
.setFirstName("Bob")
.setLastName("Jones")
.setPhoneNumber("")
.build();
...
</source>
 
 
Miután a producer beküldte az avro üzenetet a Kafka test-topic-ba, a logstash logban meg kell jelenjen az alábbi üzenet:
<pre>
| {
| "@version" => "1",
| "age" => 35,
| "@timestamp" => 2019-04-19T10:39:30.266Z,
| "phoneNumber" => "",
| "firstName" => "Bob",
| "lastName" => "Jones"
| }
</pre>
Láthatjuk, hogy a logstash-avro plugin kiegészítette két meta adattal az üzenetet:
* @timestamp
* @version
 
 
A fenti konfig fájlban, az output egyszerű módosításával Elasticsarch-be írható az adat.