=Introducing Kafka=
file: confluentinc_swarm1.yaml
<source lang="yaml">
version: '3.2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.2
    networks:
      - kafka-net
    deploy:
      placement:
        constraints:
          - node.role == worker
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SYNC_LIMIT: 2

  kafka:
    image: confluentinc/cp-kafka:5.1.2
    networks:
      - kafka-net
    ports:
      - target: 29092
        published: 29092
        protocol: tcp
        mode: host
    deploy:
      placement:
        constraints:
          - node.role == worker
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_BROKER_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

networks:
  kafka-net:
    driver: overlay
</source>
It is important to use compose file version 3.2, because only from that version on can the ports be defined in the long syntax. mode: host means that swarm does not publish the port on the ingress network, but only on the node where the service instance runs, so Kafka will not be load-balanced. This is needed so that if we ran multiple instances, every instance would be published on the same port, each on its own node.
List the nodes of the swarm cluster:
<pre>
# docker node ls
ID                          HOSTNAME   STATUS   AVAILABILITY   MANAGER STATUS
kse3a58x6f34o7wp7mxum7nux   worker2    Ready    Active
n39y1bvv50r7o12bijmm57kl2   worker0    Ready    Active
ux3lwoqql2hiv6l9ylyx2rkar * mg0        Ready    Active         Leader
zwumqwkmo32zkg79gqztff397   worker1    Ready    Active
</pre>
Deploy the stack:
<pre>
# docker stack deploy -c confluentinc_swarm1.yaml confluent
Creating service confluent_kafka
Creating service confluent_zookeeper
</pre>
=Console producer and consumer=
==Producer==
Find out which node Kafka is running on.
<pre>
# docker service ps confluent_kafka
ID             NAME                IMAGE                         NODE      DESIRED STATE   PORTS
w74azzl3qzwa   confluent_kafka.1   confluentinc/cp-kafka:5.1.2   worker1   Running         *:29092->29092
</pre>
Log in to the worker1 node with SSH, then enter the Kafka container running there.
:[[File:ClipCapIt-190327-224419.PNG]]
<pre>
# docker-machine ssh worker1
                        ##         .
                  ## ## ##        ==
               ## ## ## ## ##    ===
           /"""""""""""""""""\___/ ===
      ~~~ {~~ ~~~~ ~~~ ~~~~ ~~~ ~ /  ===- ~~~
           \______ o           __/
             \    \           __/
              \____\_________/
 _                 _   ____     _            _
| |__   ___   ___ | |_|___ \ __| | ___   ___| | _____ _ __
| '_ \ / _ \ / _ \| __| __) / _` |/ _ \ / __| |/ / _ \ '__|
| |_) | (_) | (_) | |_ / __/ (_| | (_) | (__| <  __/ |
|_.__/ \___/ \___/ \__|_____\__,_|\___/ \___|_|\_\___|_|
Boot2Docker version 17.12.0-ce, build HEAD : 378b049 - Wed Dec 27 23:39:20 UTC 2017
Docker version 17.12.0-ce, build c97c6d6
</pre>
Then create a topic with the Kafka command-line scripts, put messages into it with a producer, and finally read them back with a consumer:
<pre>
docker@worker1:/$ docker ps
CONTAINER ID   IMAGE
2f32b3ecaabd   confluentinc/cp-kafka:5.1.2
</pre>
<pre>
docker@worker1:/$ docker exec -it 2f32b3ecaabd /bin/bash
root@2f32b3ecaabd:/#
</pre>
Create the topic named adam with the kafka-topics command. The command must be given the address of ZooKeeper. Since Kafka and ZooKeeper run in the same swarm stack, inside the Kafka container we can use ZooKeeper's in-stack service name, because it is on a common overlay network with the Kafka container.
<pre>
# host zookeeper
zookeeper has address 10.0.2.8
Host zookeeper not found: 3(NXDOMAIN)

# /usr/bin/kafka-topics --create --zookeeper zookeeper:32181 --replication-factor 1 --partitions 1 --topic adam
Created topic "adam".
</pre>
List the topics:
<pre>
root@2f32b3ecaabd:/# /usr/bin/kafka-topics --list --zookeeper zookeeper:32181
__confluent.support.metrics
adam
</pre>
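Topic administration can also be done from Java with the AdminClient API of the kafka-clients library. Below is a minimal, illustrative sketch (not part of the original walkthrough), assuming it runs somewhere where the kafka:29092 address resolves, for example in a container attached to the stack's overlay network:
<source lang="java">
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicAdmin {
    public static void main(String[] args) throws Exception {
        Properties config = new Properties();
        // Assumption: run on the stack's overlay network, where swarm DNS resolves "kafka"
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");

        try (AdminClient admin = AdminClient.create(config)) {
            // Same parameters as the kafka-topics command above: 1 partition, replication factor 1
            admin.createTopics(Collections.singletonList(new NewTopic("adam", 1, (short) 1))).all().get();
            // List the existing topics, like "kafka-topics --list"
            admin.listTopics().names().get().forEach(System.out::println);
        }
    }
}
</source>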
Start the Kafka console producer and send in a few messages. The bootstrap-server can be referred to by the kafka domain name, because this was the name of the service in the stack, so the swarm DNS server will resolve it. Each ENTER sends a new message into the topic.
<pre>
root@2f32b3ecaabd:/# /usr/bin/kafka-console-producer --broker-list kafka:29092 --topic adam
>msg1
>msg2
</pre>
==Consumer==
Download Kafka to our local machine and navigate to the bin folder, then start the consumer on the local machine. As the Kafka bootstrap-server we must give the IP address of the worker1 node, where Kafka is running.
<pre>
[adam@adamDell2 bin]$ ./kafka-console-consumer.sh --bootstrap-server 192.168.42.95:29092 --topic adam --from-beginning
msg1
msg2
</pre>
=Java client=
The same can be done from Java. The producer below connects to the kafka:29092 bootstrap server and sends a single message into the adam topic:
<source lang="java">
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerExample {
    public static void main(String[] args) {
        Producer<Long, String> producer = null;
        Properties config = new Properties();
        try {
            // The client.id identifies this client to the broker (here: the local hostname)
            config.put("client.id", InetAddress.getLocalHost().getHostName());
            // Inside the stack the service name "kafka" is resolved by the swarm DNS server
            config.put("bootstrap.servers", "kafka:29092");
            config.put("acks", "all");
            // Overrides the client.id set above
            config.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
            config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
            config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            producer = new KafkaProducer<Long, String>(config);
            // Send a single message with key 100 into the adam topic
            final ProducerRecord<Long, String> record = new ProducerRecord<>("adam", 100L, "kkkk");
            Future<RecordMetadata> future = producer.send(record);
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.flush();
                producer.close();
            }
        }
    }
}
</source>
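For completeness, here is a minimal sketch of the matching Java consumer, built with the same kafka-clients library; the my-group group id is an arbitrary assumption:
<source lang="java">
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); // assumption: any group id works here
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // like --from-beginning
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(config)) {
            consumer.subscribe(Collections.singletonList("adam"));
            while (true) {
                // Poll the adam topic and print every received message
                ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<Long, String> record : records) {
                    System.out.printf("key=%d value=%s%n", record.key(), record.value());
                }
            }
        }
    }
}
</source>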
 
=Logstash producer and consumer=
In this example a Java application sends its logs to Logstash over TCP, and Logstash acts as the Kafka producer: based on the tags of the incoming log messages it routes them into different Kafka topics. The Logstash pipeline configuration:
<source lang="ruby">
input {
  tcp {
    port => 51415
    codec => "json"
  }
}

output {
  if "TA" in [tags] {
    kafka {
      codec => json
      bootstrap_servers => "kafka:29092"
      topic_id => "ta-topic"
    }
  } else if "AL" in [tags] {
    kafka {
      codec => json
      bootstrap_servers => "kafka:29092"
      topic_id => "al-topic"
    }
  } else {
    kafka {
      codec => json
      bootstrap_servers => "kafka:29092"
      topic_id => "msg-topic"
    }
  }
  stdout { codec => rubydebug }
}
</source>
=Avro=
To use Avro we also need Confluent's Schema Registry, so the stack is extended with a schema-registry service, together with the logstash service used in the example above. The available container parameters are documented in the Confluent Docker configuration reference:

https://docs.confluent.io/current/installation/docker/config-reference.html
<source lang="yaml">
version: '3.2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.2
    networks:
      - kafka-net
    ports:
      - "32181:32181"
    deploy:
      placement:
        constraints:
          - node.role == worker
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SYNC_LIMIT: 2

  kafka:
    image: confluentinc/cp-kafka:5.1.2
    networks:
      - kafka-net
    ports:
      - target: 29092
        published: 29092
        protocol: tcp
        mode: host
    deploy:
      placement:
        constraints:
          - node.role == worker
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:32181"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092"
      KAFKA_BROKER_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:5.1.2
    networks:
      - kafka-net
    ports:
      - "8081:8081"
    deploy:
      placement:
        constraints:
          - node.role == worker
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:32181"
      SCHEMA_REGISTRY_HOST_NAME: "schema-registry"
      SCHEMA_REGISTRY_LISTENERS: "http://schema-registry:8081"
      SCHEMA_REGISTRY_DEBUG: "true"

  logstash:
    image: docker.elastic.co/logstash/logstash:6.6.2
    networks:
      - kafka-net
    ports:
      - "51415:51415"
    environment:
      LOGSPOUT: "ignore"
      XPACK_MONITORING_ENABLED: "false"
    volumes:
      - "logstash-conf:/usr/share/logstash/pipeline"
    deploy:
      placement:
        constraints:
          - node.role == worker
      restart_policy:
        condition: on-failure
      resources:
        reservations:
          memory: 100m

networks:
  kafka-net:
    driver: overlay

volumes:
  logstash-conf:
    driver: nfs
    driver_opts:
      share: 192.168.42.1:/home/adam/dockerStore/logstash/config/
</source>
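With the Schema Registry in place, an Avro producer serializes its records against a schema stored in the registry. The following is only an illustrative sketch, assuming the io.confluent:kafka-avro-serializer dependency is on the classpath; the avro-topic topic name and the Message schema are made up for the example:
<source lang="java">
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroProducerExample {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // The Confluent Avro serializer registers/fetches the schema in the Schema Registry
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        config.put("schema.registry.url", "http://schema-registry:8081");

        // A simple Avro record schema with a single string field
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Message\",\"fields\":[{\"name\":\"text\",\"type\":\"string\"}]}");
        GenericRecord value = new GenericData.Record(schema);
        value.put("text", "hello avro");

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(config)) {
            producer.send(new ProducerRecord<>("avro-topic", "key1", value));
            producer.flush();
        }
    }
}
</source>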
 
In the Java application of the Logstash example, logging goes through SLF4J/Logback, and the logstash-logback-encoder library sends the log events to Logstash as JSON over TCP. The required dependencies:
file: pom.xml
<source lang="xml">
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<scope>runtime</scope>
</dependency>
 
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
 
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>5.3</version>
</dependency>
</source>
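The encoder still has to be wired to Logstash in the Logback configuration. A minimal logback.xml sketch, assuming Logstash is reachable on the worker node's published 51415 port (the 192.168.42.95 address is taken from the consumer example above and may differ in another cluster):

file: logback.xml
<source lang="xml">
<configuration>
  <!-- Assumption: Logstash TCP input listening on 192.168.42.95:51415 -->
  <appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
    <destination>192.168.42.95:51415</destination>
    <!-- LogstashEncoder emits one JSON object per log event; markers become the "tags" field -->
    <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
  </appender>

  <root level="INFO">
    <appender-ref ref="LOGSTASH"/>
  </root>
</configuration>
</source>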
 
The application below attaches the TA and AL markers to its log messages; based on these, Logstash routes them into the ta-topic and al-topic Kafka topics (everything else goes to msg-topic):
<source lang="java">
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
public class App
{
    private static final Logger logger = LoggerFactory.getLogger(App.class);

    public static void main( String[] args )
    {
        Marker taMarker = MarkerFactory.getMarker("TA");
        Marker alMarker = MarkerFactory.getMarker("AL");
        logger.info(taMarker, "Message to TA from: {}", "adam");
        logger.info(alMarker, "Message to AL from: {}", "adam");
        System.out.println( "Hello World!" );
    }
}

</source>