=Introduction to Kafka=
  
  
file: confluentinc_swarm1.yaml
 
<source lang="yaml">
version: '3.2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.2
    networks:
      - kafka-net
    deploy:
      placement:
        constraints:
          - node.role == worker
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SYNC_LIMIT: 2
  kafka:
    image: confluentinc/cp-kafka:5.1.2
    networks:
      - kafka-net
    ports:
      - target: 29092
        published: 29092
        protocol: tcp
        mode: host
    deploy:
      placement:
        constraints:
          - node.role == worker
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_BROKER_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
networks:
  kafka-net:
    driver: overlay
</source>
 
It is important to use compose file version 3.2, because only that version allows the ports to be defined in detail. mode: host means that the swarm publishes the port not on the ingress network but only on the node where the container is deployed, so Kafka will not be load-balanced. This is needed so that if we ran multiple instances, all instances would be published on the same port...
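For comparison, the short port syntax available in earlier compose file versions cannot express the publishing mode; only the long form introduced in version 3.2 can. A minimal sketch of the two forms:

<source lang="yaml">
# Short syntax: the port is published on the ingress network and load-balanced
ports:
  - "29092:29092"

# Long syntax (compose file version 3.2+): mode can be set to host
ports:
  - target: 29092      # port inside the container
    published: 29092   # port opened on the node
    protocol: tcp
    mode: host         # publish only on the node running the task
</source>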
 
  
The stack will be deployed to the following swarm cluster:
<pre>
# docker node ls
ID                            HOSTNAME            STATUS              AVAILABILITY        MANAGER STATUS
kse3a58x6f34o7wp7mxum7nux     worker2             Ready               Active
n39y1bvv50r7o12bijmm57kl2     worker0             Ready               Active
ux3lwoqql2hiv6l9ylyx2rkar *   mg0                 Ready               Active              Leader
zwumqwkmo32zkg79gqztff397     worker1              Ready               Active
</pre>
 
  
  
Deploy the stack:
<pre>
docker stack deploy -c confluentinc_swarm1.yaml confluent
Creating service confluent_kafka
Creating service confluent_zookeeper
</pre>
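The state of the new services can be checked on the manager node; a quick sketch (output omitted):
<pre>
# docker service ls
# docker service logs confluent_kafka
</pre>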
 
  
==Producer==
  
Let's find out which node Kafka is running on.
 
<pre>
# docker service ps confluent_kafka
ID                  NAME                IMAGE                         NODE       DESIRED STATE   PORTS
w74azzl3qzwa        confluent_kafka.1   confluentinc/cp-kafka:5.1.2   worker1    Running         *:29092->29092
</pre>
 
SSH into the worker1 node, then enter the Kafka container running there.
 
  
:[[File:ClipCapIt-190327-224419.PNG]]
  
<pre>
# docker-machine ssh worker1
                        ##        .
                  ## ## ##        ==
              ## ## ## ## ##    ===
          /"""""""""""""""""\___/ ===
      ~~~ {~~ ~~~~ ~~~ ~~~~ ~~~ ~ /  ===- ~~~
          \______ o          __/
            \    \        __/
              \____\_______/
_                _  ____    _            _
| |__  ___  ___ | |_|___ \ __| | ___  ___| | _____ _ __
| '_ \ / _ \ / _ \| __| __) / _` |/ _ \ / __| |/ / _ \ '__|
| |_) | (_) | (_) | |_ / __/ (_| | (_) | (__|  <  __/ |
|_.__/ \___/ \___/ \__|_____\__,_|\___/ \___|_|\_\___|_|
Boot2Docker version 17.12.0-ce, build HEAD : 378b049 - Wed Dec 27 23:39:20 UTC 2017
Docker version 17.12.0-ce, build c97c6d6
</pre>
 
  
  
Then, using the Kafka control scripts, let's create a topic, write messages into it with a producer, and then read them back with a consumer:
 
<pre>
docker@worker1:/$ docker ps
CONTAINER ID        IMAGE
2f32b3ecaabd        confluentinc/cp-kafka:5.1.2
</pre>
 
  
<pre>
docker@worker1:/$ docker exec -it 2f32b3ecaabd /bin/bash
root@2f32b3ecaabd:/#
</pre>
 
  
Create the topic named adam with the kafka-topics command. The command must be given the ZooKeeper address. Since Kafka and ZooKeeper both run in the swarm stack, inside the Kafka container we can use ZooKeeper's service name from the stack, because it is on the same overlay network as the Kafka container.
 
<pre>
# host zookeeper
zookeeper has address 10.0.2.8
Host zookeeper not found: 3(NXDOMAIN)

/usr/bin/kafka-topics --create --zookeeper zookeeper:32181 --replication-factor 1 --partitions 1 --topic adam
Created topic "adam".
</pre>
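The partition and replica layout of the new topic can be checked as well; a minimal sketch with the --describe flag:
<pre>
/usr/bin/kafka-topics --describe --zookeeper zookeeper:32181 --topic adam
</pre>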
 
  
List the topics:
 
<pre>
root@2f32b3ecaabd:/# /usr/bin/kafka-topics --list --zookeeper zookeeper:32181
__confluent.support.metrics
adam
</pre>
 
  
  
Start the Kafka console producer and send in a few messages. The bootstrap-server can be referenced with the kafka domain name, because that was the service name in the stack, so the swarm DNS server will resolve it. Every ENTER keypress sends a new message into the topic.
 
<pre>
root@2f32b3ecaabd:/# /usr/bin/kafka-console-producer --broker-list kafka:29092 --topic adam
>msg1
>msg2
</pre>
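The console producer can also send keyed messages; a sketch using the standard parse.key and key.separator properties (the key1 value is only an example):
<pre>
root@2f32b3ecaabd:/# /usr/bin/kafka-console-producer --broker-list kafka:29092 --topic adam --property parse.key=true --property key.separator=:
>key1:msg3
</pre>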
 
  
Download Kafka to our local machine and navigate to the bin folder, then start the consumer on the local machine. The Kafka bootstrap-server must be the IP address of the worker1 node, where Kafka is running.
<pre>
[adam@adamDell2 bin]$ ./kafka-console-consumer.sh --bootstrap-server 192.168.42.95:29092 --topic adam --from-beginning
msg1
msg2
</pre>
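The --from-beginning flag replays the topic from the first offset on every run. To track the read position instead, the consumer can join a named consumer group, whose offsets are stored by Kafka; a sketch (the group name is arbitrary):
<pre>
[adam@adamDell2 bin]$ ./kafka-console-consumer.sh --bootstrap-server 192.168.42.95:29092 --topic adam --group test-group
</pre>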
 
  
==Java client==

<source lang="java">
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

Producer<Long, String> producer = null;
Properties config = new Properties();
try {
    // Identify the client towards the broker with the local host name
    config.put("client.id", InetAddress.getLocalHost().getHostName());
    // The swarm DNS resolves the kafka service name inside the stack
    config.put("bootstrap.servers", "kafka:29092");
    // Wait for all in-sync replicas to acknowledge each record
    config.put("acks", "all");
    // Overrides the client.id set above
    config.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    producer = new KafkaProducer<Long, String>(config);
    // Send a single record with key 100 into the adam topic
    final ProducerRecord<Long, String> record = new ProducerRecord<>("adam", Long.valueOf(100), "kkkk");
    Future<RecordMetadata> future = producer.send(record);
} catch (UnknownHostException e) {
    e.printStackTrace();
} finally {
    if (producer != null) {
        producer.flush();
        producer.close();
    }
}
</source>
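The matching consumer side in Java: a minimal sketch, assuming the same kafka:29092 bootstrap address, the adam topic, and an arbitrary example-group consumer group:

<source lang="java">
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties config = new Properties();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");       // assumed group name
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");   // start from the first offset
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(config);
try {
    consumer.subscribe(Collections.singletonList("adam"));
    while (true) {
        // Wait up to one second for new records
        ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<Long, String> record : records) {
            System.out.printf("key=%d, value=%s%n", record.key(), record.value());
        }
    }
} finally {
    consumer.close();
}
</source>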
  
 
=Logstash producer and consumer=
 
 
 
 
The Logstash pipeline configuration: log events arrive as JSON lines on a TCP input, and the output section routes them to different Kafka topics based on their tags:
<source lang="ruby">
input {
  tcp {
    port => 51415
    codec => "json"
  }
}

output {

  if "TA" in [tags] {
    kafka {
      codec => json
      bootstrap_servers => "kafka:29092"
      topic_id => "ta-topic"
    }

  } else if "AL" in [tags] {
    kafka {
      codec => json
      bootstrap_servers => "kafka:29092"
      topic_id => "al-topic"
    }

  } else {
    kafka {
      codec => json
      bootstrap_servers => "kafka:29092"
      topic_id => "msg-topic"
    }
  }

  stdout {
    codec => rubydebug
  }
}
</source>
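The pipeline can be tested without any client application by pushing a JSON line into the TCP input by hand, for example with netcat; a sketch, assuming port 51415 is published on the node at 192.168.42.95 and using an example tag:
<pre>
echo '{"message":"hello","tags":["TA"]}' | nc 192.168.42.95 51415
</pre>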


The extended stack: the Logstash service is added next to ZooKeeper and Kafka, and its pipeline configuration is mounted from an NFS volume:
<source lang="yaml">
version: '3.2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.2
    networks:
      - kafka-net
    ports:
      - "32181:32181"
    deploy:
      placement:
        constraints:
          - node.role == worker
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SYNC_LIMIT: 2
  kafka:
    image: confluentinc/cp-kafka:5.1.2
    networks:
      - kafka-net
    ports:
      - target: 29092
        published: 29092
        protocol: tcp
    deploy:
      placement:
        constraints:
          - node.role == worker
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:32181"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092"
      KAFKA_BROKER_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  logstash:
    image: docker.elastic.co/logstash/logstash:6.6.2
    networks:
      - kafka-net
    ports:
      - "51415:51415"
    environment:
      LOGSPOUT: "ignore"
      XPACK_MONITORING_ENABLED: "false"
    volumes:
      - "logstash-conf:/usr/share/logstash/pipeline"
    deploy:
      placement:
        constraints:
          - node.role == worker
      restart_policy:
        condition: on-failure
      resources:
        reservations:
          memory: 100m
networks:
  kafka-net:
    driver: overlay
volumes:
  logstash-conf:
    driver: nfs
    driver_opts:
      share: 192.168.42.1:/home/adam/dockerStore/logstash/config/
</source>
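Redeploying the stack under the same stack name updates the running services in place; a sketch, where the file name confluentinc_swarm2.yaml is only an assumed example:
<pre>
docker stack deploy -c confluentinc_swarm2.yaml confluent
</pre>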



The following dependencies are needed in the pom.xml for logging towards Logstash:

<source lang="xml">
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.3</version>
    <scope>runtime</scope>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>

<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>5.3</version>
</dependency>
</source>
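The logback-classic and logstash-logback-encoder pair still needs an appender configuration that points at the Logstash TCP input. A minimal logback.xml sketch, assuming Logstash is reachable on the 192.168.42.95:51415 address published by the stack (adjust the destination to the actual node):

<source lang="xml">
<configuration>
  <appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
    <!-- The TCP input defined in the Logstash pipeline -->
    <destination>192.168.42.95:51415</destination>
    <!-- Serializes every event as a JSON line; SLF4J markers become entries of the tags field -->
    <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
  </appender>

  <root level="INFO">
    <appender-ref ref="LOGSTASH"/>
  </root>
</configuration>
</source>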




The test application sends log messages with the TA and AL markers, which end up in the tags field of the JSON events, so the Logstash pipeline above routes them to the ta-topic and al-topic Kafka topics:
<source lang="java">
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;

public class App {

    private static final Logger logger = LoggerFactory.getLogger(App.class);

    public static void main(String[] args) {
        Marker taMarker = MarkerFactory.getMarker("TA");
        Marker alMarker = MarkerFactory.getMarker("AL");

        logger.info(taMarker, "Message to TA from: {}", "adam");
        logger.info(alMarker, "Message to AL from: {}", "adam");

        System.out.println("Hello World!");
    }
}
</source>
=Avro=

https://docs.confluent.io/current/installation/docker/config-reference.html