Apache Avro with Kafka
Bevezető
Mi az Avro?
Az Avro egy nyílt forráskódú project, ami egy adata szerializációs szolgáltatás elsősorban az Apache Hadoop-hoz, de nem csak a Hadoop-ban használható, ahogy a mi példánkban is látni fogjuk. Av Avro segítségével nagyon hatékonyan cserélhetünk adatokat két végpont között "big data" környezetben.
Az Avro alapja egy séma regiszter, mihez mind az adat szerializáló és deszerializáló szolgáltatás kapcsolódik. Itt tárolja az Avro a serializálnadó adatok tervrajzát JSON formátumban, a sémákat verziózva. Amikor a serializáló szolgáltatás adatot akar küldeni, akkor megjelöli az Avro serializátornak hogy melyik séma alapján serializálja a küldendő adatot. Ha a séma még nem létezett, akkor beszúrja a séma regiszterbe. Az Avro binárist készít a séma segítségével a küldendő adatból, és az üzenetbe a bináris adat mellé beleteszi a séma azonosítóját is, amit a deszerializiós szolgáltatás megkap, és annak segítségével ki tudja olvasni az adat deszerializálásához szükséges sémát, ami segítségével előállítja az eredeti üzenetet.
Az Avro séma regiszter több verziót is képes kezelni egy sémából. A beállításoknak megfelelően a séma lehet előre vagy visszafelé kompatibilis. Ha egy séma visszafelé kompatibilis, akkor az új sémával is ki lehet olvasni olyan régi adatokat, amit még egy korábbi sémával írtak be.
Az Avor sémákat JSON formátumban kell leírni, és egyedi, kötött szintaktikájuk van, tehát nem szabványos JSON sémák. A schema-registry-vel egy REST API-n keresztül lehet kommunikálni. A legtöbb nyelven elérhető Avro magas szintű API, ami elfedi előlünk a REST kommunikációt. Az Avró többféle adatbázisban is tárolhatja a sémákat, de a leggyakoribb megoldás, hogy egy speciális Kafka topic-ban tárolja azokat.
Az Avro-t gyakran használják a Kafa kommunikációban mint serlializációs szólgáltatás. Mi is így fogjuk használni:
A Kafa produceren a Avro serializáló beküldi a sémát a séma regiszterbe, aminek visszakapja az ID-ját. Majd a séma alapján serializálja az adatokat és gyárt belőle egy Avro üzenetet, amiben benne van a séma azonosító és binárisan az üzenet, ami így nagyon kicsi helyet foglal. Ez kerül fel a megfelelő Kafka topic-ra. A Kafka consumer az üzenetben lévő séma ID alapján lekérdezi a sémát a registry-ből, majd annak segítségével deserializálja az üzenetet.
Ahogy azt már írtam, az Avro schema-registry többféle adatbázisban is képes tárolni a sémákat, köztük Kafka-ban is. Kafka estén az Avro regiszter egy kitüntetett topic-ban tárolja a sémákat (_schemas). Ha a kommunikációra is Kafka-t használunk, akkor használhatjuk akár ugyan azt a Kafka broker-t mind két célra, ahogy az alábbi ábrán is látható:
Környezet
Az Avro futtatásához szükséges környezet egy két node-os swarm cluster lesz.
# virsh list Id Name State ---------------------------------------------------- 1 mg0 running 2 worker0 running
# docker node ls ID HOSTNAME STATUS AVAILABILITY MANAGER STATUS ENGINE VERSION maigxlyagj1fl4sgcf6rnn9pc * mg0 Ready Active Leader 18.05.0-ce vox99u5s1g1su742mc6npm370 worker0 Ready Active 18.05.0-ce
Itt fogunk futtatni egy docker stack-et ami tartalmaz majd egy avro schema-registry-t, egy kafa brókert és egy zookeeper példányt.
version: '3.2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.1.2
networks:
- kafka-net
ports:
- "32181:32181"
deploy:
placement:
constraints:
- node.role == worker
environment:
ZOOKEEPER_CLIENT_PORT: 32181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SYNC_LIMIT: 2
kafka:
image: confluentinc/cp-kafka:5.1.2
networks:
- kafka-net
ports:
- target: 29092
published: 29092
protocol: tcp
deploy:
placement:
constraints:
- node.role == worker
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:32181"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092"
KAFKA_BROKER_ID: 2
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
schemaregistry:
image: confluentinc/cp-schema-registry:5.1.2
networks:
- kafka-net
ports:
- target: 8081
published: 8081
protocol: tcp
deploy:
placement:
constraints:
- node.role == worker
environment:
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:32181"
SCHEMA_REGISTRY_HOST_NAME: "schemaregistry"
SCHEMA_REGISTRY_DEBUG: "true"
networks:
kafka-net:
driver: overlay
Keressük meg a worker0 node IP címét:
# docker-machine ip worker0 192.168.42.42
Mivel mind a három komonensünknek egy-egy portját publikáltuk az ingress loadbalance-olt hálózatra, ezrét az összes node-on elérhetőek az adott portokon.
- zookeeper: 192.168.42.42:32181
- kafka: 192.168.42.42:29092
- schema-registry: 192.168.42.42:8081
Avro REST interfész
Az Avro a _schemas nevű Kafka topic-ban tárolja a sémákat az alapértelmezett konfiguráció szerint. Tehát az AVRO schema-registry-nek szüksége van . . A Kafka /bin mappájában található kafka-topics.sh topic admin script-el listázzuk ki a topikokat:
$ ./kafka-topics.sh --list --zookeeper 192.168.42.42:32181 __confluent.support.metrics _schemas
Az schma registry-vel a REST interfészén keresztül lehet kommunikálni. Próbáljuk ki a config paranccsal hogy elérhető e a server a host gépről.
$ curl -X GET http://192.168.42.42:8081/config {"compatibilityLevel":"BACKWARD"}
A válaszban láthatjuk a kompatibilitási szintet.
Avro-ban minden sémát egy úgynevezett subject-ek alá kell regisztrálni. Egy subject alatt ugyan azon séma különböző verzióit tároljuk. Tehát két teljesen különböző sémát nem lehet ugyan azon a subject alá berakni. Tehát mikor hasonló sémákat regisztrálunk ugyan azon subject alá, akkor különböző verziók fognak létrejönni ugyan ahoz a sémához. Azt hogy mekkora a megengedett eltérés mértéke, a schema-registry server konfigurációja határozza meg.
The schema registry server can enforce certain compatibility rules when new schemas are registered in a subject. Currently, we support the following compatibility rules.
- Backward compatibility (default): A new schema is backward compatible if it can be used to read the data written in all previous schemas. Backward compatibility is useful for loading data into systems like Hadoop since one can always query data of all versions using the latest schema.
- Forward compatibility: A new schema is forward compatible if all previous schemas can read data written in this schema. Forward compatibility is useful for consumer applications that can only deal with data in a particular version that may not always be the latest version.
- Full compatibility: A new schema is fully compatible if it’s both backward and forward compatible.
- No compatibility: A new schema can be any schema as long as it’s a valid Avro.
A sémákat a POST:/subjects/<subject-name>/versions REST interfészen kell beküldeni. A POST body-ban a {schema: "...séma definicíó..."} formátumban kell megadni a sémát, ahol a séma definíció egy escape-lt json.
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema" : "... schema def..."}' http://192.168.42.42:8081/subjects/<subject-name>/versions
Szúrjuk be az Avro-ba az alábbi Employee sémát. A namespace majd a schema-to-java kód generálásánál lesz érdekes, ez fogja meghatározni a java csomagot. A type mező mutatja meg, hogy összetett objektumumot vagy sima stringet, vagy tömböt ír le a séma. A record jelenti az összetett objektumot. Az Employee nevű objektum négy mezőből áll.
{"namespace": "hu.alerant.kafka.avro.message",
"type": "record", "name": "Employee",
"fields": [
{"name": "firstName", "type": "string"},
{"name": "lastName", "type": "string"},
{"name": "age", "type": "int"},
{"name": "phoneNumber", "type": "string"}
]
}
Ennek az escape-elt változata:
{\"namespace\": \"hu.alerant.kafka.avro.message\", \"type\": \"record\", \"name\": \"Employee\", \"fields\": [ {\"name\": \"firstName\", \"type\": \"string\"},{\"name\": \"lastName\", \"type\": \"string\"},{\"name\": \"age\", \"type\": \"int\"}, {\"name\": \"phoneNumber\", \"type\": \"string\"} ]}
Szurjuk be a fenit sémát a test1 subject alá:
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema" : "{\"namespace\": \"hu.alerant.kafka.avro.message\", \"type\": \"record\", \"name\": \"Employee\", \"fields\": [ {\"name\": \"firstName\", \"type\": \"string\"},{\"name\": \"lastName\", \"type\": \"string\"},{\"name\": \"age\", \"type\": \"int\"}, {\"name\": \"phoneNumber\", \"type\": \"string\"} ]}"}' http://192.168.42.42:8081/subjects/test1/versions {"id":1}
A válaszban visszakaptuk a séma példány egyedi azonosítóját. Ez nem összekeverendő a séma verziójával. Tehát a test1 subject-en ugyan annak a sémának több verziója is lehet, de globálsan, ennek s séma példánynak az ozonosítója = 1
Most próbáljunk az előbbitől tejesen különböző Company sémát regisztrálni szintán a test1 subject alá.
{"namespace": "hu.alerant.kafka.avro.message",
"type": "record", "name": "Company",
"fields": [
{"name": "name", "type": "string"},
{"name": "address", "type": "string"},
{"name": "employCount", "type": "int"},
{"name": "phoneNumber", "type": "string"}
]
}
Ennek az escape-elt változata az alábbi.
{\"namespace\": \"hu.alerant.kafka.avro.message\", \"type\": \"record\", \"name\": \"Company\", \"fields\": [ {\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"address\", \"type\": \"string\"}, {\"name\": \"employCount\", \"type\": \"int\"}, {\"name\": \"phoneNumber\", \"type\": \"string\"} ]}
A Company sémát szúrjuk be szintén az test1 subject alá.
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema" : "{\"namespace\": \"hu.alerant.kafka.avro.message\", \"type\": \"record\", \"name\": \"Company\", \"fields\": [ {\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"address\", \"type\": \"string\"}, {\"name\": \"employCount\", \"type\": \"int\"}, {\"name\": \"phoneNumber\", \"type\": \"string\"} ]}"}' http://192.168.42.42:8081/subjects/test1/versions {"error_code":409,"message":"Schema being registered is incompatible with an earlier schema ...}
Láthatjuk, hogy nem engedte az Avro a Company sémát regisztrálni a test1 subject alá, mert túl nagy volt az eltérés a Company és a Employee sémák között.
Láthattuk a /config lekérdezésben, hogy jelenleg a beállított kompatibilitási szint BACKWARD, ami azt jelenti, hogy csak olyan sémákat lehet beszúrni ugyan azon subject alá, amivel az összes korábban beszúrt adatot ki lehet olvasni, magyarán csak olyan sémákat lehet egymás után beszúrni, ami részhalmaza az előző sémának.
Most szúrjuk be az Employee sémának egy redukált változatát, amiből hiányzik a phoneNumber mező. Erre teljesűl hogy visszafelé komatibilis.
{"namespace": "hu.alerant.kafka.avro.message",
"type": "record", "name": "Employee",
"fields": [
{"name": "firstName", "type": "string"},
{"name": "lastName", "type": "string"},
{"name": "age", "type": "int"}
]
}
Ennek az escape-elt változata az alábbi:
{\"namespace\": \"hu.alerant.kafka.avro.message\",\"type\": \"record\", \"name\": \"Employee\", \"fields\": [ {\"name\": \"firstName\", \"type\": \"string\"}, {\"name\": \"lastName\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}
Szúrjuk ezt be szintén a test1 subject alá:
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema" : "{\"namespace\": \"hu.alerant.kafka.avro.message\",\"type\": \"record\", \"name\": \"Employee\", \"fields\": [ {\"name\": \"firstName\", \"type\": \"string\"}, {\"name\": \"lastName\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}"}' http://192.168.42.42:8081/subjects/test1/versions {"id":2}
Láthatjuk, hogy az új séma példány egyedi azonosítója 2.
Most listázzuk ki a test1 subject-en belül az összes sémát:
$ curl -X GET -H "Content-Type: application/vnd.schemaregistry.v1+json" http://192.168.42.42:8081/subjects/test1/versions [1,2]
Láthatjuk, hogy két verziója van elmentve a sémának, amiknek a globális azonosítója 1 és 2.
Ha a /versions/ után odaírjuk a verzió számot is, akkor visszaadja a teljes sémát:
$ curl -X GET -H "Content-Type: application/vnd.schemaregistry.v1+json" http://192.168.42.42:8081/subjects/test1/versions/1 {"subject":"test1","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"Employee\",\"namespace\":\"hu.alerant.kafka.avro.message\",\"fields\":[{\"name\":\"firstName\",\"type\":\"string\"},{\"name\":\"lastName\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"phoneNumber\",\"type\":\"string\"}]}"}
Java kód generálás
Van egy maven plugin, amivel a sémából ki lehet genrálni az Avro-s java osztályokat, amiket majd használni tudunk mind a java producer és consumer-ben. A fenit .xml sémákat tegyük be a /schemas/ mappába .avsc kiterjesztésben:
- Employee.avsc
- Company.avsc
A forrást a /src/main/java/ mappába fogja tenni. Az avro által generált java osztály package a sémában lévő namespace értéke lesz.
pom.xml
<!-- Avro code generator -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-compiler</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
<version>1.8.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.2</version>
</dependency>
....
<build>
<plugins>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.8.2</version>
<executions>
<execution>
<id>schemas</id>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/schemas/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
Buildelés:
$ mvn install [INFO] Scanning for projects... [INFO] [INFO] -----------------------------< kafka:avro >----------------------------- [INFO] Building avro 0.0.1-SNAPSHOT [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- avro-maven-plugin:1.8.2:schema (schemas) @ avro --- [INFO] [INFO] --- avro-maven-plugin:1.8.2:protocol (schemas) @ avro ---
A generált osztályba az Avro belegenrálja a sémát is, ez az amit majd a Kafak topic-ba dobás előtt a producer fel fog küldeni a schema-register servernek:
package hu.alerant.kafka.avro.message;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.SchemaStore;
@org.apache.avro.specific.AvroGenerated
public class Employee extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("
{\"type\":\"record\",\"name\":\"Employee\",\"namespace\":\"hu.alerant.kafka.avro.message\",
\"fields\":[{\"name\":\"firstName\",\"type\":\"string\"},{\"name\":\"lastName\",\"type\":\"string\"},
{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"phoneNumber\",\"type\":\"string\"}]}");
....
Producers
Command line producers
Java avro-kafak producer
Ahogy a Kafka /bin mappában elérhető volt parancssori producer és consumer, úgy a schema-registry-ben elérhető avro-s producer és consumer. Töltsük le a schema-registry binárist, és menjünk a bin mappába.
$ wget http://packages.confluent.io/archive/1.0/confluent-1.0.1-2.10.4.zip $ unzip confluent-1.0.1-2.10.4.zip $ cd confluent-1.0.1
A hagyományos Kafka producer-hez képest csak pár különbség van az inicializálásban. Egyrészt meg kell adni, hogy mind a kulcsot, mind az üzenetet Avro-val akarjuk serializálni, másrészt meg kell adni az Avro schema-registry URL-jét. A /etc/hosts fájlba felvettük a worker0 swarm node IP címével a schema-registry host nevet.
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
Első alkalommal, mikor a producer be akar dobni egy üzenetet a Kafka topic-ba, felküldi a sémát a már látott POST:http://192.168.42.42:8081/subjects/<subject-name>/versions REST hívással, amit az avro java objektumból nyer ki. Ha a séma egy futás alatt nem változik, akkor többször nem küldi fel a sémát a schema-registry-be. Az avro subject-et automatiksuan képezi a topoci nevéből. Tehát egy topoc-ba csak a kompatibilitási szabályoknak megfelelő sémáknak megfelelő üzeneteket lehet bedobani. Arra nincs mód, hogy bárhogyan is megadjuk, hogy az adott objektum melyik subject melyik verziójának kell hogy megfeleljen, ezt teljesen elfedi előlünk az API.
Összefoglalva, egy adott Kafka topic-ba, amit kommunikációra használnunk (tehát nem a séma tárolására) csak Avro kompatibilis sémáknak megfelelő objektumokat lehet beküldeni. Nem azért mert a topic nem bírna el másik sémából gyártott bináris üzenetet, hanem azért, mert az Avro API a topic nevéből képzi a subject nevét, és egy subject-en belül csak kompatibilis sémákat lehet tárolni.
package hu.alerant.kafka.avro;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import hu.alerant.kafka.avro.message.Employee;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Properties;
import java.util.stream.IntStream;
public class AvroProducer {
private static Producer<Long, Employee> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "AvroProducer");
return new KafkaProducer<Long, Employee>(props);
}
private final static String TOPIC = "test-topic";
public static void main(String... args) {
Producer<Long, Employee> producer = createProducer();
Employee bob = Employee.newBuilder().setAge(35)
.setFirstName("Bob")
.setLastName("Jones")
.setPhoneNumber("")
.build();
producer.send(new ProducerRecord<>(TOPIC, new Long("123456778"), bob));
producer.flush();
producer.close();
}
}
./kafka-avro-console-consumer --topic test-topic --zookeeper 192.168.42.42:32181 --property schema.registry.url="http://schema-registry:8081" SLF4J: Class path contains multiple SLF4J bindings. ... {"firstName":"Bob","lastName":"Jones","age":35,"phoneNumber":""}
Mikor Java-ból küldünk Avron-n keresztül Kafka üzeneteket, akkor létre fog hozni a topic nevével prefixe-lt subjet-eket, egyet a Kafak kulcsnak és egyet a hozzá tartozó értéknek autómatikusan, az első üzenet váltás után. A fenit példa futtatása után listázzuk ki az összes Avro-s subject-et:
$ curl -X GET -H "Content-Type: application/vnd.schemaregistry.v1+json" http://192.168.42.42:8081/subjects/ ["test-topic-value","test-topic-key","test1"]
Láthatjuk, hogy létrehozott a test-topic prefixel egy subject-et a valu-nak és a Kafka kulcsnak is.
A log-ban láthatjuk, hogy két POST kéréssel a kliens beküldte a schema-registry-nek a kulcs és a value sémáját:
2019-03-26 17:38:50 DEBUG RestService:118 - Sending POST with input {"schema":"\"long\""} to http://schema-registry:8081/subjects/test-topic-key/versions 2019-03-26 17:38:50 DEBUG RestService:118 - Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"Employee\",\"namespace\":\"hu.alerant.kafka.avro.message\",\"fields\":[{\"name\":\"firstName\",\"type\":\"string\"},{\"name\":\"lastName\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"phoneNumber\",\"type\":\"string\"}]}"} to http://schema-registry:8081/subjects/test-topic-value/versions
Partition keys
A partíciós kulcsot nem muszáj Avro sémával megadni, ha nem összetett objektum, használhatjuk a beépített serializálókat, deserealizálókat. Láthattuk is, hogy a kulcs sémája egy darab Long típust tartalmazott.
{"schema":"\"long\""}
A fenti példában a kulcs értéke Long, ezért használhatjuk egyszerűen a LongSerializer osztályt.
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.LongSerializer.class);
A fenti módosítással már csak a value sémáját fogja elküldeni a Kafka-ba írás előtt a producer a séma regiszternek.
2019-04-02 13:30:44 DEBUG RestService:118 - Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"Employee\",\"namespace\":\"hu.alerant.kafka.avro.message\",\"fields\":[{\"name\":\"firstName\",\"type\":\"string\"},{\"name\":\"lastName\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"phoneNumber\",\"type\":\"string\"}]}"} to http://schema-registry:8081/subjects/test-topic-value/versions
Ez avro schema-registry szintjén azt jelenti, hogy a test-topic-key subject-et nem fogja használni/létrehozni.
$ curl -X GET -H "Content-Type: application/vnd.schemaregistry.v1+json" http://192.168.42.42:8081/subjects/ ["test-topic-value","test-topic-key","test1"]
Java Logbach producer
Consumers
Java consumer
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
Séma specifikus consumer
Ha a KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG értéke igaz, akkor a választ egy előre meghatározott objektum típusban várjuk, a példában ez lesz a Employee.java
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
Mikor a consumer-t példányosítjuk, már ott meg kell adni, hogy mi az az Avro típus, amit válaszként várunk. Majd mikor elkérjük a consumer-től az üzenetet, akkor is pontosan meg kell adni a típust.
Consumer<Long, Employee> consumer = createConsumer();
...
final ConsumerRecords<Long, Employee> records = consumer.poll(Duration.ofMillis(100));
Ez a teljes consumer:
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import hu.alerant.kafka.avro.message.Employee;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
public class AvroConsumer {
private final static String BOOTSTRAP_SERVERS = "kafka:29092";
private final static String TOPIC = "test-topic";
private static Consumer<Long, Employee> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleAvroConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
return new KafkaConsumer<>(props);
}
public static void main(String... args) {
final Consumer<Long, Employee> consumer = createConsumer();
consumer.subscribe(Collections.singletonList(TOPIC));
IntStream.range(1, 100).forEach(index -> {
final ConsumerRecords<Long, Employee> records = consumer.poll(Duration.ofMillis(100));
if (records.count() == 0) {
System.out.println("None found");
} else
records.forEach(record -> {
Employee employeeRecord = record.value();
System.out.printf("%s %d %d %s \n", record.topic(), record.partition(), record.offset(),
employeeRecord);
});
});
consumer.close();
}
}
Warning
A org.apache.kafka.clients.consumer.KafkaConsumer.poll(long) már deprecated. Helyette a KafkaConsumer.poll(Duration) metódust kell használni
2019-04-02 12:25:04 DEBUG AbstractCoordinator:822 - [Consumer clientId=consumer-1, groupId=KafkaExampleAvroConsumer] Received successful Heartbeat response test-topic 0 1 {"firstName": "Bob", "lastName": "Jones", "age": 35, "phoneNumber": ""}
Generikus consumer
Ha a KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG értéke hamis, akkor a választ a válasz paroszlására a GenericRecord nevű általános célú objektumot kell használni, amiből extra munkával lehet csak kinyerni az eredeti objektum mezőit, cserélbe nem kell séma specifikus consumer-t írni.
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "false");
Mikor a consumer-t példányosítjuk, meg kell adni a GenericRecord típust. Majd mikor elkérjük a consumer-től az üzenetet, akkor is a GenericRecord-t kell megadni:
final Consumer<Long, GenericRecord> consumer = createConsumer();
...
ConsumerRecords<Long, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
Ez a teljes consumer:
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
public class AvroConsumerGeneric {
private final static String BOOTSTRAP_SERVERS = "kafka:29092";
private final static String TOPIC = "test-topic";
private static Consumer<Long, GenericRecord> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleAvroConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.LongDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "false");
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
return new KafkaConsumer<>(props);
}
public static void main(String... args) {
final Consumer<Long, GenericRecord> consumer = createConsumer();
consumer.subscribe(Collections.singletonList(TOPIC));
try {
while (true) {
ConsumerRecords<Long, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<Long, GenericRecord> record : records) {
GenericRecord valueGr = record.value();
System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(),
valueGr.toString());
}
}
} finally {
consumer.close();
}
}
}
Láthatjuk, hogy a GenericRecord példányban ott van a producer által küldött JSON:
offset = 8, key = 123456778, value = {"firstName": "Bob", "lastName": "Jones", "age": 35, "phoneNumber": ""}
Partition keys
Akárcsak a producer esetén, a consumer-ben is használható nem Avro-s kulcs. Lényeg, hogy a consumer-ben ugyan azt a kulcs szerializációs eljárást kell használni, mint a producer-ben:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.LongDeserializer.class);
Logstash consumer
https://github.com/revpoint/logstash-codec-avro_schema_registry
https://www.elastic.co/guide/en/logstash/current/plugins-codecs-avro.html