file: confluentinc_swarm1.yaml
<source>
version: '3.2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.2
    networks:
      - kafka-net
    deploy:
      placement:
        constraints:
          - node.role == worker
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SYNC_LIMIT: 2
  kafka:
    image: confluentinc/cp-kafka:5.1.2
    networks:
      - kafka-net
    ports:
      - target: 29092
        published: 29092
        protocol: tcp
        mode: host
    deploy:
      placement:
        constraints:
          - node.role == worker
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_BROKER_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
networks:
  kafka-net:
    driver: overlay
</source>
It is important to use compose-file version 3.2, because only that version supports the long syntax for defining ports. The mode: host setting means that the swarm publishes the port not on the ingress network but only on the node where the task is deployed, so Kafka will not be load-balanced. This is needed so that if we ran multiple instances, every instance would be published on the same port on its own node.
<source>
# docker node ls
ID                            HOSTNAME   STATUS   AVAILABILITY   MANAGER STATUS
kse3a58x6f34o7wp7mxum7nux     worker2    Ready    Active
n39y1bvv50r7o12bijmm57kl2     worker0    Ready    Active
ux3lwoqql2hiv6l9ylyx2rkar *   mg0        Ready    Active         Leader
zwumqwkmo32zkg79gqztff397     worker1    Ready    Active
</source>
<source>
# docker stack deploy -c confluentinc_swarm1.yaml confluent
Creating service confluent_kafka
Creating service confluent_zookeeper
</source>
Find the node where Kafka is running:
<source>
# docker service ps confluent_kafka
ID             NAME                IMAGE                         NODE      DESIRED STATE   PORTS
w74azzl3qzwa   confluent_kafka.1   confluentinc/cp-kafka:5.1.2   worker1   Running         *:29092->29092
</source>
SSH into the worker1 node, then enter the Kafka container running there.
<source>
# docker-machine ssh worker1
Boot2Docker version 17.12.0-ce, build HEAD : 378b049 - Wed Dec 27 23:39:20 UTC 2017
Docker version 17.12.0-ce, build c97c6d6
</source>
Then, using the Kafka command-line scripts, create a topic, publish some messages into it with a producer, and read them back with a consumer:
<source>
docker@worker1:/$ docker ps
CONTAINER ID   IMAGE
2f32b3ecaabd   confluentinc/cp-kafka:5.1.2

docker@worker1:/$ docker exec -it 2f32b3ecaabd /bin/bash
root@2f32b3ecaabd:/#
</source>
Create the topic named adam with the kafka-topics command. The command must be given the ZooKeeper address. Since Kafka and ZooKeeper run in the same swarm stack, inside the Kafka container we can refer to ZooKeeper by its in-stack service name, because it shares an overlay network with the Kafka container.
<source>
root@2f32b3ecaabd:/# host zookeeper
zookeeper has address 10.0.2.8
Host zookeeper not found: 3(NXDOMAIN)

root@2f32b3ecaabd:/# /usr/bin/kafka-topics --create --zookeeper zookeeper:32181 --replication-factor 1 --partitions 1 --topic adam
Created topic "adam".
</source>
List the topics:
<source>
root@2f32b3ecaabd:/# /usr/bin/kafka-topics --list --zookeeper zookeeper:32181
__confluent.support.metrics
adam
</source>
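Topics can also be managed programmatically. Below is a minimal Java sketch using the AdminClient from the kafka-clients library; the kafka:29092 bootstrap address and the adam topic come from this walkthrough, everything else is illustrative.

<source>
import java.util.Collections;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

Properties config = new Properties();
// Same bootstrap address the stack publishes (resolvable on the overlay network)
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");

try (AdminClient admin = AdminClient.create(config)) {
    // Equivalent of: kafka-topics --create --topic adam --partitions 1 --replication-factor 1
    admin.createTopics(Collections.singletonList(new NewTopic("adam", 1, (short) 1))).all().get();

    // Equivalent of: kafka-topics --list
    Set<String> topics = admin.listTopics().names().get();
    topics.forEach(System.out::println);
}
</source>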
Start the Kafka console producer and send in a few messages. For the bootstrap-server we can use the kafka domain name, because that was the service name in the stack, so the swarm DNS server will resolve it. Every ENTER sends a new message into the topic.
<source>
root@2f32b3ecaabd:/# /usr/bin/kafka-console-producer --broker-list kafka:29092 --topic adam
>msg1
>msg2
</source>
Download Kafka to our local machine and navigate to the bin folder, then start the consumer locally. As the Kafka bootstrap-server we must give the IP address of the worker1 node, where Kafka is running.
<source>
[adam@adamDell2 bin]$ ./kafka-console-consumer.sh --bootstrap-server 192.168.42.95:29092 --topic adam --from-beginning
msg1
msg2
</source>
=Java client=
<source>
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

Producer<Long, String> producer = null;
Properties config = new Properties();
try {
    // The "kafka" service name is resolved by the swarm DNS server on the overlay network
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");
    // Wait for all in-sync replicas to acknowledge each write
    config.put(ProducerConfig.ACKS_CONFIG, "all");
    config.put(ProducerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName());
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producer = new KafkaProducer<>(config);
    // Send a single record with key 100 into the "adam" topic; send() is asynchronous
    final ProducerRecord<Long, String> record = new ProducerRecord<>("adam", 100L, "kkkk");
    Future<RecordMetadata> future = producer.send(record);
} catch (UnknownHostException e) {
    e.printStackTrace();
} finally {
    if (producer != null) {
        producer.flush();
        producer.close();
    }
}
</source>
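The console consumer above has a Java equivalent as well. Below is a minimal sketch of the consuming side; the bootstrap address and the adam topic come from the example above, while the test-group group id and the poll timeout are arbitrary illustrative values.

<source>
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties config = new Properties();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");
// Arbitrary example group id; consumers sharing it split the partitions among themselves
config.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
// Start from the earliest offset, like --from-beginning on the console consumer
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

try (KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(config)) {
    consumer.subscribe(Collections.singletonList("adam"));
    while (true) {
        // poll() blocks for up to one second waiting for new records
        ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<Long, String> r : records) {
            System.out.printf("key=%d value=%s%n", r.key(), r.value());
        }
    }
}
</source>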
=Logstash producer and consumer=
=Avro=
https://docs.confluent.io/current/installation/docker/config-reference.html
<source>
version: '3.2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.2
    networks:
      - kafka-net
    deploy:
      placement:
        constraints:
          - node.role == worker
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SYNC_LIMIT: 2
  kafka:
    image: confluentinc/cp-kafka:5.1.2
    networks:
      - kafka-net
    ports:
      - target: 29092
        published: 29092
        protocol: tcp
        mode: host
    deploy:
      placement:
        constraints:
          - node.role == worker
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_BROKER_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  schema-registry:
    image: confluentinc/cp-schema-registry:5.1.2
    networks:
      - kafka-net
    ports:
      - 8081:8081
    deploy:
      placement:
        constraints:
          - node.role == worker
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:32181
      SCHEMA_REGISTRY_HOST_NAME: "schema-registry"
      SCHEMA_REGISTRY_LISTENERS: "http://schema-registry:8081"
      SCHEMA_REGISTRY_DEBUG: "true"
networks:
  kafka-net:
    driver: overlay
</source>
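With the schema registry running, a Java producer can publish Avro records through Confluent's KafkaAvroSerializer (from the io.confluent:kafka-avro-serializer artifact). Below is a minimal sketch; the avro-topic topic name and the User schema are illustrative assumptions, and the registry is assumed to be reachable at http://schema-registry:8081 as configured in the stack above.

<source>
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
// Confluent serializer that registers the schema and writes the schema id into each record
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.KafkaAvroSerializer");
// The schema-registry service from the stack above (assumed reachable on port 8081)
config.put("schema.registry.url", "http://schema-registry:8081");

// Illustrative example schema, not part of the original article
Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
        + "{\"name\":\"name\",\"type\":\"string\"}]}");

GenericRecord user = new GenericData.Record(schema);
user.put("name", "adam");

try (Producer<String, GenericRecord> producer = new KafkaProducer<>(config)) {
    producer.send(new ProducerRecord<>("avro-topic", "key1", user));
    producer.flush();
}
</source>

On the consuming side, io.confluent.kafka.serializers.KafkaAvroDeserializer plays the same role in reverse, fetching the schema from the registry by the id embedded in each record.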