Changes

Apache Avro with Kafka

4,009 bytes added, 12:10, 2 April 2019
Java consumer
===Specifikus séma használata===
Ha a KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG értéke igaz, akkor a választ egy előre meghatározott objektum típusban várjuk, a példában ez lesz a '''Employee.java'''
<source lang="java">
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
</source>
Mikor a consumer-t példányosítjuk, már ott meg kell adni, hogy mi az az Avro típus, amit válaszként várunk. Majd mikor elkérjük a consumer-től az üzenetet, akkor is pontosan meg kell adni a típust.
<source lang="java">
Consumer<Long, Employee> consumer = createConsumer();
...
final ConsumerRecords<Long, Employee> records = consumer.poll(Duration.ofMillis(100));
</source>
Ez a teljes consumer:
<source lang="java">
import java.time.Duration;
2019-04-02 12:25:04 DEBUG AbstractCoordinator:822 - [Consumer clientId=consumer-1, groupId=KafkaExampleAvroConsumer] Received successful Heartbeat response
test-topic 0 1 {"firstName": "Bob", "lastName": "Jones", "age": 35, "phoneNumber": ""}
</pre>
 
 
===Generikus séma használata===
Ha a KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG értéke hamis, akkor a választ a válasz paroszlására a '''GenericRecord''' nevű általános célú objektumot kell használni, amiből extra munkával lehet csak kinyerni az eredeti objektum mezőit, cserélbe nem kell séma specifikus consumer-t írni.
<source lang="java">
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "false");
</source>
 
Mikor a consumer-t példányosítjuk, meg kell adni a GenericRecord típust. Majd mikor elkérjük a consumer-től az üzenetet, akkor is a '''GenericRecord'''-t kell megadni:
<source lang="java">
final Consumer<Long, GenericRecord> consumer = createConsumer();
...
ConsumerRecords<Long, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
</source>
 
 
Ez a teljes consumer:
<source lang="java">
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
 
public class AvroConsumerGeneric {
 
private final static String BOOTSTRAP_SERVERS = "kafka:29092";
private final static String TOPIC = "test-topic";
 
private static Consumer<Long, GenericRecord> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleAvroConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.LongDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "false");
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
return new KafkaConsumer<>(props);
}
 
public static void main(String... args) {
final Consumer<Long, GenericRecord> consumer = createConsumer();
consumer.subscribe(Collections.singletonList(TOPIC));
 
try {
while (true) {
ConsumerRecords<Long, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
 
for (ConsumerRecord<Long, GenericRecord> record : records) {
GenericRecord valueGr = record.value();
System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(),
valueGr.toString());
}
}
} finally {
consumer.close();
}
}
}
</source>
 
Láthatjuk, hogy a '''GenericRecord''' példányban ott van a producer által küldött JSON:
<pre>
offset = 8, key = 123456778, value = {"firstName": "Bob", "lastName": "Jones", "age": 35, "phoneNumber": ""}
</pre>
===Partition keys===
Akárcsak a producer esetén, a consumer-ben is használható nem Avro-s kulcs. Lényeg, hogy a consumer-ben ugyan azt a kulcs szerializációs eljárást kell használni, mint a producer-ben:
<source lang="java">
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.LongDeserializer.class);