Difference between revisions of "Apache Kafka"

From berki WIKI
Jump to: navigation, search
(Java kliens)
Line 161: Line 161:
 
}
 
}
 
}
 
}
 +
</source>
 +
 +
 +
=Logstash producer and consumer=
 +
 +
 +
=Avro=
 +
 +
https://docs.confluent.io/current/installation/docker/config-reference.html
 +
 +
<source>
 +
version: '3.2'
 +
services:
 +
  zookeeper:
 +
    image: confluentinc/cp-zookeeper:5.1.2
 +
    networks:
 +
      - kafka-net
 +
    deploy:
 +
      placement:
 +
        constraints:
 +
        - node.role == worker
 +
    environment:
 +
      ZOOKEEPER_CLIENT_PORT: 32181
 +
      ZOOKEEPER_TICK_TIME: 2000
 +
      ZOOKEEPER_SYNC_LIMIT: 2
 +
  kafka:
 +
    image: confluentinc/cp-kafka:5.1.2
 +
    networks:
 +
      - kafka-net
 +
    ports:
 +
      - target: 29092
 +
        published: 29092
 +
        protocol: tcp
 +
        mode: host
 +
    deploy:
 +
      placement:
 +
        constraints:
 +
        - node.role == worker
 +
    environment:
 +
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
 +
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
 +
      KAFKA_BROKER_ID: 2
 +
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
 +
  schema-registry:
 +
    image: confluentinc/cp-schema-registry:5.1.2
 +
    networks:
 +
      - kafka-net
 +
    ports:
 +
      - 8081:8081
 +
    deploy:
 +
      placement:
 +
        constraints:
 +
          - node.role == worker
 +
    environment:
 +
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:32181
 +
      SCHEMA_REGISTRY_HOST_NAME: "schema-registry"
 +
      SCHEMA_REGISTRY_LISTENERS: "http://schema-registry:8081"
 +
      SCHEMA_REGISTRY_DEBUG: "true" 
 +
     
 +
networks:
 +
  kafka-net:
 +
    driver: overlay
 
</source>
 
</source>

Revision as of 11:42, 26 March 2019


file: confluentinc_swarm1.yaml

# Zookeeper + Kafka stack for Docker Swarm.
# Compose file version 3.2+ is required for the long-syntax port definition
# (mode: host) used by the kafka service below.
version: '3.2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.2
    networks:
      - kafka-net
    deploy:
      placement:
        constraints:
          - node.role == worker
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SYNC_LIMIT: 2
  kafka:
    image: confluentinc/cp-kafka:5.1.2
    networks:
      - kafka-net
    ports:
      # mode: host publishes the port only on the node actually running the
      # task (no ingress routing, no swarm load balancing).
      - target: 29092
        published: 29092
        protocol: tcp
        mode: host
    deploy:
      placement:
        constraints:
          - node.role == worker
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_BROKER_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
networks:
  kafka-net:
    driver: overlay

Fontos, hogy 3.2-es verziót használjunk, mert csak abban lehet a portokat részletesen definiálni. A mode: host azt jelenti, hogy nem az ingress hálózaton publikálja a portot a swarm, hanem csak azon a node-on ahova telepítve van, tehát nem lesz load-balance-olt a kafka. Ez arra kell, hogy ha több példányban is futtatnánk, akkor az összes példány ugyanazon a porton legyen publikálva...


# docker node ls
ID                            HOSTNAME            STATUS              AVAILABILITY        MANAGER STATUS
kse3a58x6f34o7wp7mxum7nux     worker2             Ready               Active              
n39y1bvv50r7o12bijmm57kl2     worker0             Ready               Active              
ux3lwoqql2hiv6l9ylyx2rkar *   mg0                 Ready               Active              Leader
zwumqwkmo32zkg79gqztff397     worker1             Ready               Active              


docker stack deploy -c confluentinc_swarm1.yaml confluent
Creating service confluent_kafka
Creating service confluent_zookeeper


Keressük meg melyik node-on van a kafka.

# docker service ps confluent_kafka
ID                  NAME                IMAGE                         NODE        DESIRED STATE   PORTS
w74azzl3qzwa        confluent_kafka.1   confluentinc/cp-kafka:5.1.2   worker1     Running         *:29092->29092

Lépjünk be ssh-val a worker1 node-ra, majd lépjünk be az ott futó Kafka konténerbe.


# docker-machine ssh worker1
                        ##         .
                  ## ## ##        ==
               ## ## ## ## ##    ===
           /"""""""""""""""""\___/ ===
      ~~~ {~~ ~~~~ ~~~ ~~~~ ~~~ ~ /  ===- ~~~
           \______ o           __/
             \    \         __/
              \____\_______/
 _                 _   ____     _            _
| |__   ___   ___ | |_|___ \ __| | ___   ___| | _____ _ __
| '_ \ / _ \ / _ \| __| __) / _` |/ _ \ / __| |/ / _ \ '__|
| |_) | (_) | (_) | |_ / __/ (_| | (_) | (__|   <  __/ |
|_.__/ \___/ \___/ \__|_____\__,_|\___/ \___|_|\_\___|_|
Boot2Docker version 17.12.0-ce, build HEAD : 378b049 - Wed Dec 27 23:39:20 UTC 2017
Docker version 17.12.0-ce, build c97c6d6


Majd a Kafka vezérlő script-ekkel hozzunk létre egy topikot, majd egy producer-rel tegyünk bele üzeneteket, amit kiolvasunk majd egy consumer-rel:

docker@worker1:/$ docker ps
CONTAINER ID        IMAGE                        
2f32b3ecaabd        confluentinc/cp-kafka:5.1.2 
docker@worker1:/$ docker exec -it 2f32b3ecaabd /bin/bash
root@2f32b3ecaabd:/# 

A kafka-topic paranccsal hozzuk létre az adam nevű topicot. A parancsnak meg kell adni a zookeeper elérhetőségét. Mivel swarm stack-ben fut a kafka és a zookeeper, a kafka konténeren belül használhatjuk a stack-beli nevét a zookeeper-nek, mivel közös overlay hálózaton van a kafka konténerrel.

# host zookeeper
zookeeper has address 10.0.2.8
Host zookeeper not found: 3(NXDOMAIN)

/usr/bin/kafka-topics --create --zookeeper zookeeper:32181 --replication-factor 1 --partitions 1 --topic adam
Created topic "adam".

Listázzuk ki a topic-okat:

root@2f32b3ecaabd:/# /usr/bin/kafka-topics --list --zookeeper zookeeper:32181
__confluent.support.metrics
adam


Indítsuk el a Kafka producer-t és küldjünk be pár üzenetet. A bootstrap-server-nek a kafka domain névvel hivatkozhatunk, mert ez volt a stack-ben a service neve, így ezt fel fogja oldani a swarm DNS szerver. Minden ENTER leütéssel egy új üzenetet küldünk be a topic-ba.

root@2f32b3ecaabd:/# /usr/bin/kafka-console-producer --broker-list kafka:29092 --topic adam
>msg1
>msg2


Töltsük le a lokális gépünkre a Kafka-t és navigáljunk a bin mappába, majd indítsuk el a consumer-t a lokális gépen. A Kafka bootstrap-server-nek a worker1 node IP címét kell megadni, ahol a Kafka fut.

[adam@adamDell2 bin]$ ./kafka-console-consumer.sh --bootstrap-server 192.168.42.95:29092 --topic adam --from-beginning
msg1
msg2


Java kliens

// Example: minimal Kafka producer that sends one record to the "adam" topic.
// "kafka:29092" is the swarm service name / published port from the compose stack.
Producer<Long, String> producer = null;
		Properties config = new Properties();
		try {

			// Identify this client by the local machine's host name.
			config.put("client.id", InetAddress.getLocalHost().getHostName());

			config.put("bootstrap.servers", "kafka:29092");
			// acks=all: broker acknowledges only after the write is fully replicated.
			config.put("acks", "all");
			// NOTE(review): this overwrites the "client.id" set above with the
			// host name, presumably CLIENT_ID_CONFIG == "client.id" — confirm intent.
			config.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
			config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
			config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
			producer = new KafkaProducer<Long, String>(config);
			// Key 100L, value "kkkk"; send() is asynchronous and returns a Future.
			final ProducerRecord<Long, String> record = new ProducerRecord<>("adam", new Long("100"), "kkkk");
			Future<RecordMetadata> future = producer.send(record);

		} catch (UnknownHostException e) {
			// InetAddress.getLocalHost() failed; print and fall through to cleanup.
			e.printStackTrace();
		} finally {
			// Flush any buffered records and release the producer's resources.
			if (producer != null) {
				producer.flush();
				producer.close();
			}
		}


Logstash producer and consumer

Avro

https://docs.confluent.io/current/installation/docker/config-reference.html

# Zookeeper + Kafka + Schema Registry stack for Docker Swarm (Avro setup).
# Compose file version 3.2+ is required for the long-syntax port definition
# (mode: host) used by the kafka service below.
version: '3.2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.2
    networks:
      - kafka-net
    deploy:
      placement:
        constraints:
          - node.role == worker
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SYNC_LIMIT: 2
  kafka:
    image: confluentinc/cp-kafka:5.1.2
    networks:
      - kafka-net
    ports:
      # mode: host publishes the port only on the node actually running the
      # task (no ingress routing, no swarm load balancing).
      - target: 29092
        published: 29092
        protocol: tcp
        mode: host
    deploy:
      placement:
        constraints:
          - node.role == worker
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_BROKER_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  schema-registry:
    image: confluentinc/cp-schema-registry:5.1.2
    networks:
      - kafka-net
    ports:
      # Quoted to avoid YAML's digits-and-colon (sexagesimal) parsing trap.
      - "8081:8081"
    deploy:
      placement:
        constraints:
          - node.role == worker
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:32181
      SCHEMA_REGISTRY_HOST_NAME: "schema-registry"
      SCHEMA_REGISTRY_LISTENERS: "http://schema-registry:8081"
      SCHEMA_REGISTRY_DEBUG: "true"

networks:
  kafka-net:
    driver: overlay