Apache Kafka

Logstash

Logstash can be placed both in front of Kafka and behind it. As a first step we will put it in front of Kafka, where it will sort the incoming logs into different topics.

Producer

The environment needed to run this will be a two-node swarm cluster.

# docker node ls
ID                            HOSTNAME            STATUS              AVAILABILITY        MANAGER STATUS      ENGINE VERSION
maigxlyagj1fl4sgcf6rnn9pc *   mg0                 Ready               Active              Leader              18.05.0-ce
vox99u5s1g1su742mc6npm370     worker0             Ready               Active                                  18.05.0-ce

Here we will run a docker stack that contains Logstash in addition to ZooKeeper and Kafka.



All three components will also be attached to the ingress network, since the Java producer has to reach Logstash, and the consumer has to reach Kafka.

For logging we will use logback, which will send the logs to port 51415 of Logstash over TCP.


Logstash configuration

Logstash will receive the logs from logback over a TCP socket. In logback we will use Markers, which end up in the [tags] field of the event; the output section below routes every message to a Kafka topic based on these tags.



input {
  tcp { 
    port => 51415
    codec => "json"
  }
}

output {

  if "T1" in [tags] {

    kafka {
      codec => json
      bootstrap_servers => "kafka:29092"
      topic_id => "T1-topic"
    }
    
  } else if "T2" in [tags] {

    kafka {
      codec => json
      bootstrap_servers => "kafka:29092"
      topic_id => "T2-topic"
    }
    
  } else {

    kafka {
      codec => json
      bootstrap_servers => "kafka:29092"
      topic_id => "msg-topic"
    }

  }

  stdout {
    codec => rubydebug
  }
}

Swarm stack

version: '3.2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.2
    networks:
      - kafka-net
    ports:
      - "32181:32181"
    deploy:
      placement:
        constraints:
         - node.role == worker
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SYNC_LIMIT: 2
  kafka:
    image: confluentinc/cp-kafka:5.1.2
    networks:
      - kafka-net
    ports:
      - target: 29092
        published: 29092
        protocol: tcp
    deploy:
      placement:
        constraints:
         - node.role == worker
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:32181"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092"
      KAFKA_BROKER_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  logstash:
    image: docker.elastic.co/logstash/logstash:6.6.2
    networks:
      - kafka-net
    ports:
      - "51415:51415"
    environment:
      LOGSPOUT: "ignore"
      XPACK_MONITORING_ENABLED: "false"
    volumes:
      - "logstash-conf:/usr/share/logstash/pipeline"
    deploy:   
      placement:
        constraints:
         - node.role == worker
      restart_policy:
        condition: on-failure
      resources:
        reservations:
          memory: 100m   
networks:
  kafka-net:
    driver: overlay
volumes:
  logstash-conf:
    driver: nfs
    driver_opts:
      share: 192.168.42.1:/home/adam/dockerStore/logstash/config/
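
The stack can then be deployed from the manager node. The compose file name kafka.yml is our own choice (any name works); the stack name confluence matches the service names that appear in the logs below:

# docker stack deploy -c kafka.yml confluence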





Creating the topics:

# ./kafka-topics.sh --create --zookeeper 192.168.42.113:32181 --replication-factor 1 --partitions 1 --topic T1-topic
Created topic "T1-topic".

# ./kafka-topics.sh --create --zookeeper 192.168.42.113:32181 --replication-factor 1 --partitions 1 --topic T2-topic
Created topic "T2-topic".

# ./kafka-topics.sh --create --zookeeper 192.168.42.113:32181 --replication-factor 1 --partitions 1 --topic msg-topic
Created topic "msg-topic".


# ./kafka-topics.sh --list --zookeeper 192.168.42.113:32181 
__confluent.support.metrics
__consumer_offsets
T1-topic
msg-topic
T2-topic
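
With the stack running and the topics created, the routing can be smoke-tested by hand before wiring up logback. The sketch below is our own addition, not part of the original setup: it opens a raw TCP connection to the Logstash input and writes a single newline-terminated JSON event whose tags field matches the T1 branch, so it should end up in T1-topic.

import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class LogstashSmokeTest {
    public static void main(String[] args) throws Exception {
        // Connect to the Logstash tcp input defined above (worker0's address).
        try (Socket socket = new Socket("192.168.42.113", 51415);
             Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)) {
            // The "tags" field drives the if / else if routing in the output section.
            out.write("{\"message\":\"manual test\",\"tags\":[\"T1\"]}\n");
            out.flush();
        }
    }
}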


pom.xml

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>net.logstash.logback</groupId>
            <artifactId>logstash-logback-encoder</artifactId>
            <version>5.3</version>
        </dependency>


logback.xml

Logback will send the logs to Logstash through the LogstashTcpSocketAppender appender. For the destination IP we can use the address of any node in the stack; we use the address of worker0. It is very important to configure the shutdownHook, which ensures that logback sends every pending log before the JVM shuts down. If the JVM stops before logback could send the logs, they will never be delivered.

<configuration>

    <!-- It is very important to stop the logger context before the VM stops: if the VM
         shutdown happens very close to the log write, the whole VM may stop before the
         logs have been written out. If the shutdownHook runs before the VM stops, the
         pending logs are still flushed. -->
    <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="stash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>192.168.42.113:51415</destination>

        <!-- encoder is required -->
        <encoder class="net.logstash.logback.encoder.LogstashEncoder">
            <customFields>{"appname":"adam"}</customFields>
        </encoder>
    </appender>

    <root level="debug">
        <appender-ref ref="stash" />
        <appender-ref ref="STDOUT" />
    </root>
</configuration>


Java producer

// The package matches the logger_name (kafka.example2.App) seen in the Logstash output below.
package kafka.example2;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;

public class App {

    private static final Logger logger = LoggerFactory.getLogger(App.class);

    public static void main(String[] args) {

        // The marker names become the [tags] values that Logstash routes on.
        Marker t1Marker = MarkerFactory.getMarker("T1");
        Marker t2Marker = MarkerFactory.getMarker("T2");

        logger.info(t1Marker, "Message to T1 from: {}", "adam");
        logger.info(t2Marker, "Message to T2 from: {}", "adam");

        System.out.println("Hello World!");
    }
}

Testing

Logstash log:

confluence_logstash.1.4a9rr1w42iud@worker0    | {
confluence_logstash.1.4a9rr1w42iud@worker0    |     "level_value" => 20000,
confluence_logstash.1.4a9rr1w42iud@worker0    |     "logger_name" => "kafka.example2.App",
confluence_logstash.1.4a9rr1w42iud@worker0    |         "appname" => "adam",
confluence_logstash.1.4a9rr1w42iud@worker0    |            "port" => 41024,
confluence_logstash.1.4a9rr1w42iud@worker0    |           "level" => "INFO",
confluence_logstash.1.4a9rr1w42iud@worker0    |        "@version" => "1",
confluence_logstash.1.4a9rr1w42iud@worker0    |            "host" => "10.255.0.3",
confluence_logstash.1.4a9rr1w42iud@worker0    |         "message" => "Message to T1 from: adam",
confluence_logstash.1.4a9rr1w42iud@worker0    |     "thread_name" => "main",
confluence_logstash.1.4a9rr1w42iud@worker0    |      "@timestamp" => 2019-03-26T22:52:19.168Z,
confluence_logstash.1.4a9rr1w42iud@worker0    |            "tags" => [
confluence_logstash.1.4a9rr1w42iud@worker0    |         [0] "T1"
confluence_logstash.1.4a9rr1w42iud@worker0    |     ]
confluence_logstash.1.4a9rr1w42iud@worker0    | }
confluence_logstash.1.4a9rr1w42iud@worker0    | {
confluence_logstash.1.4a9rr1w42iud@worker0    |     "level_value" => 20000,
confluence_logstash.1.4a9rr1w42iud@worker0    |     "logger_name" => "kafka.example2.App",
confluence_logstash.1.4a9rr1w42iud@worker0    |         "appname" => "adam",
confluence_logstash.1.4a9rr1w42iud@worker0    |            "port" => 41024,
confluence_logstash.1.4a9rr1w42iud@worker0    |           "level" => "INFO",
confluence_logstash.1.4a9rr1w42iud@worker0    |        "@version" => "1",
confluence_logstash.1.4a9rr1w42iud@worker0    |            "host" => "10.255.0.3",
confluence_logstash.1.4a9rr1w42iud@worker0    |         "message" => "Message to T2 from: adam",
confluence_logstash.1.4a9rr1w42iud@worker0    |     "thread_name" => "main",
confluence_logstash.1.4a9rr1w42iud@worker0    |      "@timestamp" => 2019-03-26T22:52:19.176Z,
confluence_logstash.1.4a9rr1w42iud@worker0    |            "tags" => [
confluence_logstash.1.4a9rr1w42iud@worker0    |         [0] "T2"
confluence_logstash.1.4a9rr1w42iud@worker0    |     ]
confluence_logstash.1.4a9rr1w42iud@worker0    | }


Start a kafka-console-consumer.sh for both the T1 and the T2 topic.

./kafka-console-consumer.sh --bootstrap-server 192.168.42.113:29092 --topic T1-topic --from-beginning

{"level_value":20000,"logger_name":"kafka.example2.App","appname":"adam","port":41024,"level":"INFO","@version":"1","host":"10.255.0.3","message":"Message to T1 from: adam","thread_name":"main","@timestamp":"2019-03-26T22:52:19.168Z","tags":["T1"]}


./kafka-console-consumer.sh --bootstrap-server 192.168.42.113:29092 --topic T2-topic --from-beginning

{"level_value":20000,"logger_name":"kafka.example2.App","appname":"adam","port":41024,"level":"INFO","@version":"1","host":"10.255.0.3","message":"Message to T2 from: adam","thread_name":"main","@timestamp":"2019-03-26T22:52:19.176Z","tags":["T2"]}