Changes

Jump to: navigation, search

Apache Kafka

2,412 bytes added, 20:18, 20 April 2019
Java producer
==Java producer==
A Java klienssel közvetlenül fogunk üzeneteket írni egy Kafka topic-ba.
===Pom.xml===
Két dependenciára van szükségünk. A serializációs osztályok a '''kafka''' csomagban vannak, míg a producer a '''kafka-clients''' csomagban van.
<source lang="xml">
<dependencies>
<!-- Kafka client library; per the text above, this is where the producer lives. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>
<!-- Kafka artifact with the _2.11 suffix. -->
<!-- NOTE(review): this version (2.1.1-cp1) differs from the kafka-clients version (2.1.1) above; confirm the suffix is intended and which repository provides it. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>2.1.1-cp1</version>
</dependency>
</dependencies>
</source>
<br>
 
===Java kód===
 
<source lang="java">
// Broker address (host:port) the producer uses for its initial connection to the cluster.
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");
</source>
 
<source lang="java">
// Record keys are serialized with LongSerializer, record values with StringSerializer.
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
</source>
 
 
 
<source lang="java">
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
 
public class KafkaProducerExample {
private final static String TOPIC = "test2-topic";
 
private static Producer<Long, String> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(props);
}
 
static void runProducer() throws Exception {
final Producer<Long, String> producer = createProducer();
long key = System.currentTimeMillis();
try {
final ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, key, "Hello World");
RecordMetadata metadata = producer.send(record).get();
 
System.out.printf("sent record(key=%s value=%s) " + "meta(partition=%d, offset=%d)", record.key(),
record.value(), metadata.partition(), metadata.offset());
 
} finally {
producer.flush();
producer.close();
}
}
 
public static void main(String... args) throws Exception {
runProducer();
 
}
}
</source>
==Logback producer==

Navigation menu