Changes

Apache Avro with Kafka

2,426 bytes added, 10:27, 2 April 2019
Consumers
==Java consumer==
 
 
<source lang="java">
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.IntStream;
 
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
 
import hu.alerant.kafka.avro.message.Employee;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
 
public class AvroConsumer {
 
private final static String BOOTSTRAP_SERVERS = "kafka:29092";
private final static String TOPIC = "test-topic";
 
private static Consumer<Long, Employee> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleAvroConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
 
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081");
return new KafkaConsumer<>(props);
}
 
public static void main(String... args) {
final Consumer<Long, Employee> consumer = createConsumer();
consumer.subscribe(Collections.singletonList(TOPIC));
IntStream.range(1, 100).forEach(index -> {
final ConsumerRecords<Long, Employee> records = consumer.poll(Duration.ofMillis(100));
if (records.count() == 0) {
System.out.println("None found");
} else
records.forEach(record -> {
Employee employeeRecord = record.value();
System.out.printf("%s %d %d %s \n", record.topic(), record.partition(), record.offset(),
employeeRecord);
});
});
consumer.close();
}
}
</source>
 
 
 
{{warning|Az '''''org.apache.kafka.clients.consumer.KafkaConsumer.poll(long)''''' már deprecated. Helyette a '''KafkaConsumer.poll(Duration)''' metódust kell használni.}}
 
 
 
 
<pre>
2019-04-02 12:25:04 DEBUG AbstractCoordinator:822 - [Consumer clientId=consumer-1, groupId=KafkaExampleAvroConsumer] Received successful Heartbeat response
test-topic 0 1 {"firstName": "Bob", "lastName": "Jones", "age": 35, "phoneNumber": ""}
</pre>
 
==Logstash consumer==