Changes

Apache Avro with Kafka

19,733 bytes added, 17:32, 27 March 2019
Created page with ":File:ClipCapIt-190327-180306.PNG =Bevezető= ==Mi az Avro?== ==Környezet== Az Avro futtatásához szükséges környezet egy két node-os swarm cluster lesz. <pr..."
:[[File:ClipCapIt-190327-180306.PNG]]


=Bevezető=

==Mi az Avro?==




==Környezet==
Az Avro futtatásához szükséges környezet egy két node-os swarm cluster lesz.
<pre>
# docker node ls
ID HOSTNAME STATUS AVAILABILITY MANAGER STATUS ENGINE VERSION
maigxlyagj1fl4sgcf6rnn9pc * mg0 Ready Active Leader 18.05.0-ce
vox99u5s1g1su742mc6npm370 worker0 Ready Active 18.05.0-ce
</pre>

Itt fogunk futtatni egy docker stack-et ami tartalmaz majd egy avro schema-registry-t, egy kafa brókert és egy zookeeper példányt.


:[[File:ClipCapIt-190327-183244.PNG]]




<source lang="C++">
version: '3.2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.1.2
networks:
- kafka-net
ports:
- "32181:32181"
deploy:
placement:
constraints:
- node.role == worker
environment:
ZOOKEEPER_CLIENT_PORT: 32181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SYNC_LIMIT: 2
kafka:
image: confluentinc/cp-kafka:5.1.2
networks:
- kafka-net
ports:
- target: 29092
published: 29092
protocol: tcp
deploy:
placement:
constraints:
- node.role == worker
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:32181"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092"
KAFKA_BROKER_ID: 2
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
schemaregistry:
image: confluentinc/cp-schema-registry:5.1.2
networks:
- kafka-net
ports:
- target: 8081
published: 8081
protocol: tcp
deploy:
placement:
constraints:
- node.role == worker
environment:
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:32181"
SCHEMA_REGISTRY_HOST_NAME: "schemaregistry"
SCHEMA_REGISTRY_DEBUG: "true"
networks:
kafka-net:
driver: overlay
</source>

Keressük meg a worker0 node IP címét:
<pre>
# docker-machine ip worker0
192.168.42.42
</pre>

Mivel mind a három komonensünknek egy-egy portját publikáltuk az ingress loadbalance-olt hálózatra, ezrét az összes node-on elérhetőek az adott portokon.
* zookeeper: 192.168.42.42:32181
* kafka: 192.168.42.42:29092
* schema-registry: 192.168.42.42:8081



==Avro kezelése==

Az Avro a '''_schemas''' nevű Kafka topic-ban tárolja a sémákat az alapértelmezett konfiguráció szerint. Tehát az AVRO schema-registry-nek szüksége van . . A Kafka /bin mappájában található '''kafka-topics.sh''' topic admin script-el listázzuk ki a topikokat:


<pre>
$ ./kafka-topics.sh --list --zookeeper 192.168.42.42:32181
__confluent.support.metrics
_schemas
</pre>


Az schma registry-vel a REST interfészén keresztül lehet kommunikálni. Próbáljuk ki a config paranccsal hogy elérhető e a server a host gépről.
<pre>
$ curl -X GET http://192.168.42.42:8081/config
{"compatibilityLevel":"BACKWARD"}
</pre>


A válaszban láthatjuk a kompatibilitási szintet.



Avro-ban minden sémát egy úgynevezett subject-ek alá kell regisztrálni. Egy subject alatt ugyan azon séma különböző verzióit tároljuk. Tehát két teljesen különböző sémát nem lehet ugyan azon a subject alá berakni. Tehát mikor hasonló sémákat regisztrálunk ugyan azon subject alá, akkor különböző verziók fognak létrejönni ugyan ahoz a sémához. Azt hogy mekkora a megengedett eltérés mértéke, a schema-registry server konfigurációja határozza meg.


The schema registry server can enforce certain compatibility rules when new schemas are registered in a subject. Currently, we support the following compatibility rules.

* '''Backward compatibility''' (default): A new schema is backward compatible if it can be used to read the data written in all previous schemas. Backward compatibility is useful for loading data into systems like Hadoop since one can always query data of all versions using the latest schema.

* '''Forward compatibility''': A new schema is forward compatible if all previous schemas can read data written in this schema. Forward compatibility is useful for consumer applications that can only deal with data in a particular version that may not always be the latest version.

* '''Full compatibility''': A new schema is fully compatible if it’s both backward and forward compatible.

* '''No compatibility''': A new schema can be any schema as long as it’s a valid Avro.







A sémákat a POST:/subjects/<subject-name>/versions REST interfészen kell beküldeni. A POST body-ban a {schema: "...séma definicíó..."} formátumban kell megadni a sémát, ahol a séma definíció egy escape-lt json.
<pre>
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema" : "... schema def..."}' http://192.168.42.42:8081/subjects/<subject-name>/versions
</pre>


Szúrjuk be az Avro-ba az alábbi '''Employee''' sémát. A namespace majd a schema-to-java kód generálásánál lesz érdekes, ez fogja meghatározni a java csomagot. A type mező mutatja meg, hogy összetett objektumumot vagy sima stringet, vagy tömböt ír le a séma. A '''record''' jelenti az összetett objektumot. Az '''Employee''' nevű objektum négy mezőből áll.
<source lang="xml">
{"namespace": "hu.alerant.kafka.avro.message",
"type": "record", "name": "Employee",
"fields": [
{"name": "firstName", "type": "string"},
{"name": "lastName", "type": "string"},
{"name": "age", "type": "int"},
{"name": "phoneNumber", "type": "string"}
]
}
</source>


Ennek az escape-elt változata:
<pre>
{\"namespace\": \"hu.alerant.kafka.avro.message\", \"type\": \"record\", \"name\": \"Employee\", \"fields\": [ {\"name\": \"firstName\", \"type\": \"string\"},{\"name\": \"lastName\", \"type\": \"string\"},{\"name\": \"age\", \"type\": \"int\"}, {\"name\": \"phoneNumber\", \"type\": \"string\"} ]}
</pre>


Szurjuk be a fenit sémát a '''test1''' subject alá:
<pre>
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema" : "{\"namespace\": \"hu.alerant.kafka.avro.message\", \"type\": \"record\", \"name\": \"Employee\", \"fields\": [ {\"name\": \"firstName\", \"type\": \"string\"},{\"name\": \"lastName\", \"type\": \"string\"},{\"name\": \"age\", \"type\": \"int\"}, {\"name\": \"phoneNumber\", \"type\": \"string\"} ]}"}' http://192.168.42.42:8081/subjects/test1/versions

{"id":1}
</pre>

A válaszban visszakaptuk a séma példány egyedi azonosítóját. Ez nem összekeverendő a séma verziójával. Tehát a '''test1''' subject-en ugyan annak a sémának több verziója is lehet, de globálsan, ennek s séma példánynak az ozonosítója = 1




Most próbáljunk az előbbitől tejesen különböző '''Company''' sémát regisztrálni szintán a '''test1''' subject alá.
<source lang="xml">
{"namespace": "hu.alerant.kafka.avro.message",
"type": "record", "name": "Company",
"fields": [
{"name": "name", "type": "string"},
{"name": "address", "type": "string"},
{"name": "employCount", "type": "int"},
{"name": "phoneNumber", "type": "string"}
]
}
</source>

Ennek az escape-elt változata az alábbi.
<pre>
{\"namespace\": \"hu.alerant.kafka.avro.message\", \"type\": \"record\", \"name\": \"Company\", \"fields\": [ {\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"address\", \"type\": \"string\"}, {\"name\": \"employCount\", \"type\": \"int\"}, {\"name\": \"phoneNumber\", \"type\": \"string\"} ]}
</pre>


A '''Company''' sémát szúrjuk be szintén az '''test1''' subject alá.
<pre>
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema" : "{\"namespace\": \"hu.alerant.kafka.avro.message\", \"type\": \"record\", \"name\": \"Company\", \"fields\": [ {\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"address\", \"type\": \"string\"}, {\"name\": \"employCount\", \"type\": \"int\"}, {\"name\": \"phoneNumber\", \"type\": \"string\"} ]}"}' http://192.168.42.42:8081/subjects/test1/versions

{"error_code":409,"message":"Schema being registered is incompatible with an earlier schema ...}
</pre>

Láthatjuk, hogy nem engedte az Avro a '''Company''' sémát regisztrálni a '''test1''' subject alá, mert túl nagy volt az eltérés a '''Company''' és a '''Employee''' sémák között.



Láthattuk a /config lekérdezésben, hogy jelenleg a beállított kompatibilitási szint '''BACKWARD''', ami azt jelenti, hogy csak olyan sémákat lehet beszúrni ugyan azon subject alá, amivel az összes korábban beszúrt adatot ki lehet olvasni, magyarán csak olyan sémákat lehet egymás után beszúrni, ami részhalmaza az előző sémának.

Most szúrjuk be az '''Employee''' sémának egy redukált változatát, amiből hiányzik a '''phoneNumber''' mező. Erre teljesűl hogy visszafelé komatibilis.
<source lang="xml">
{"namespace": "hu.alerant.kafka.avro.message",
"type": "record", "name": "Employee",
"fields": [
{"name": "firstName", "type": "string"},
{"name": "lastName", "type": "string"},
{"name": "age", "type": "int"}
]
}
</source>

Ennek az escape-elt változata az alábbi:
<pre>
{\"namespace\": \"hu.alerant.kafka.avro.message\",\"type\": \"record\", \"name\": \"Employee\", \"fields\": [ {\"name\": \"firstName\", \"type\": \"string\"}, {\"name\": \"lastName\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}
</pre>



Szúrjuk ezt be szintén a '''test1''' subject alá:
<pre>
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema" : "{\"namespace\": \"hu.alerant.kafka.avro.message\",\"type\": \"record\", \"name\": \"Employee\", \"fields\": [ {\"name\": \"firstName\", \"type\": \"string\"}, {\"name\": \"lastName\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}"}' http://192.168.42.42:8081/subjects/test1/versions

{"id":2}
</pre>
Láthatjuk, hogy az új séma példány egyedi azonosítója 2.


Most listázzuk ki a '''test1''' subject-en belül az összes sémát:
<pre>
$ curl -X GET -H "Content-Type: application/vnd.schemaregistry.v1+json" http://192.168.42.42:8081/subjects/test1/versions

[1,2]
</pre>
Láthatjuk, hogy két verziója van elmentve a sémának, amiknek a globális azonosítója 1 és 2.



Ha a /versions/ után odaírjuk a verzió számot is, akkor visszaadja a teljes sémát:
<pre>
$ curl -X GET -H "Content-Type: application/vnd.schemaregistry.v1+json" http://192.168.42.42:8081/subjects/test1/versions/1

{"subject":"test1","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"Employee\",\"namespace\":\"hu.alerant.kafka.avro.message\",\"fields\":[{\"name\":\"firstName\",\"type\":\"string\"},{\"name\":\"lastName\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"phoneNumber\",\"type\":\"string\"}]}"}
</pre>



==Java kód generálás==

Van egy maven plugin, amivel a sémából ki lehet genrálni az Avro-s java osztályokat, amiket majd használni tudunk mind a java producer és consumer-ben.
A fenit .xml sémákat tegyük be a /schemas/ mappába .avsc kiterjesztésben:

* Employee.avsc
* Company.avsc


A forrást a /src/main/java/ mappába fogja tenni. Az avro által generált java osztály package a sémában lévő '''namespace''' értéke lesz.

pom.xml
<source lang="xml">
<!-- Avro code generator -->

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-compiler</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
<version>1.8.2</version>
</dependency>


<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.2</version>
</dependency>
....
<build>
<plugins>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.8.2</version>
<executions>
<execution>
<id>schemas</id>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/schemas/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</source>

Buildelés:

<pre>
$ mvn install
[INFO] Scanning for projects...
[INFO]
[INFO] -----------------------------< kafka:avro >-----------------------------
[INFO] Building avro 0.0.1-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- avro-maven-plugin:1.8.2:schema (schemas) @ avro ---
[INFO]
[INFO] --- avro-maven-plugin:1.8.2:protocol (schemas) @ avro ---
</pre>



A generált osztályba az Avro belegenrálja a sémát is, ez az amit majd a Kafak topic-ba dobás előtt a producer fel fog küldeni a schema-register servernek:
<source lang="java">
package hu.alerant.kafka.avro.message;

import org.apache.avro.specific.SpecificData;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.SchemaStore;


@org.apache.avro.specific.AvroGenerated
public class Employee extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {

public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("
{\"type\":\"record\",\"name\":\"Employee\",\"namespace\":\"hu.alerant.kafka.avro.message\",
\"fields\":[{\"name\":\"firstName\",\"type\":\"string\"},{\"name\":\"lastName\",\"type\":\"string\"},
{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"phoneNumber\",\"type\":\"string\"}]}");

....
</source>






==Java avro producer==


Ahogy a Kafka /bin mappában elérhető volt parancssori producer és consumer, úgy a schema-registry-ben elérhető avro-s producer és consumer. Töltsük le a schema-registry binárist, és menjünk a bin mappába.
<pre>
$ wget http://packages.confluent.io/archive/1.0/confluent-1.0.1-2.10.4.zip
$ unzip confluent-1.0.1-2.10.4.zip
$ cd confluent-1.0.1
</pre>


A hagyományos Kafka producer-hez képest csak pár különbség van az inicializálásban. Egyrészt meg kell adni, hogy mind a kulcsot, mind az üzenetet Avro-val akarjuk serializálni, másrészt meg kell adni az Avro schema-registry URL-jét. A /etc/hosts fájlba felvettük a worker0 swarm node IP címével a '''schema-registry''' host nevet.
<source lang="java">
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);

props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
</source>


Minden alkalommal, mikor a producer be akar dobni egy üzenetet a Kafka topic-ba, felküldi a sémát a már látott POST:http://192.168.42.42:8081/subjects/<subject-name>/versions REST hívással, amit az avro java objektumból nyer ki. Az avro subject-et automatiksuan képezi a topoci nevéből. Tehát egy topoc-ba csak a kompatibilitási szabályoknak megfelelő sémáknak megfelelő üzeneteket lehet bedobani. Arra nincs mód, hogy márhogyan is megadjuk, hogy az adott objektum melyik subject melyik verziójának kell hogy mefeleljen, ezt teljesen elfedi előlünk az API.

!!!Azt még meg kell nézni, hogy a producer csak egyszer vagy tényleg mindig felküldi a sémát!!!

<source lang="java">
package hu.alerant.kafka.avro;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;

import hu.alerant.kafka.avro.message.Employee;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Properties;
import java.util.stream.IntStream;

public class AvroProducer {

private static Producer<Long, Employee> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "AvroProducer");

return new KafkaProducer<Long, Employee>(props);

}

private final static String TOPIC = "test-topic";
public static void main(String... args) {

Producer<Long, Employee> producer = createProducer();
Employee bob = Employee.newBuilder().setAge(35)
.setFirstName("Bob")
.setLastName("Jones")
.setPhoneNumber("")
.build();

producer.send(new ProducerRecord<>(TOPIC, new Long("123456778"), bob));
producer.flush();
producer.close();
}
}
</source>


<pre>
./kafka-avro-console-consumer --topic test-topic --zookeeper 192.168.42.42:32181 --property schema.registry.url="http://schema-registry:8081"
SLF4J: Class path contains multiple SLF4J bindings.
...


{"firstName":"Bob","lastName":"Jones","age":35,"phoneNumber":""}
</pre>


Mikor Java-ból küldünk Avron-n keresztül Kafka üzeneteket, akkor létre fog hozni a topic nevével prefixe-lt subjet-eket, egyet a Kafak kulcsnak és egyet a hozzá tartozó értéknek autómatikusan, az első üzenet váltás után. A fenit példa futtatása után listázzuk ki az összes Avro-s subject-et:
<pre>
$ curl -X GET -H "Content-Type: application/vnd.schemaregistry.v1+json" http://192.168.42.42:8081/subjects/

["test-topic-value","test-topic-key","test1"]
</pre>

Láthatjuk, hogy létrehozott a test-topic prefixel egy subject-et a valu-nak és a Kafka kulcsnak is.


A log-ban láthatjuk, hogy két POST kéréssel a kliens beküldte a schema-registry-nek a kulcs és a value sémáját:
<pre>
2019-03-26 17:38:50 DEBUG RestService:118 - Sending POST with input {"schema":"\"long\""} to http://schema-registry:8081/subjects/test-topic-key/versions
2019-03-26 17:38:50 DEBUG RestService:118 - Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"Employee\",\"namespace\":\"hu.alerant.kafka.avro.message\",\"fields\":[{\"name\":\"firstName\",\"type\":\"string\"},{\"name\":\"lastName\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"phoneNumber\",\"type\":\"string\"}]}"} to http://schema-registry:8081/subjects/test-topic-value/versions
</pre>




KafkaAvroSerializerConfig:

SCHEMA_REGISTRY_URL_DOC = "Comma-separated list of URLs for schema registry instances that can be used to register or look up schemas.";
SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081"