Difference between revisions of "Apache Kafka"

From berki WIKI
Jump to: navigation, search
(Producer)
(Logstash)
Line 41: Line 41:
  
  
=Logstash=
+
=Sorting messages with Logstash=
  
 
A logstash-t rakhatjuk a kafka elé és a kafka után is. Első lépésként a kafka elé fogjuk tenni, ami szortírozni fogja a logokat különböző topic-okba.  
 
A logstash-t rakhatjuk a kafka elé és a kafka után is. Első lépésként a kafka elé fogjuk tenni, ami szortírozni fogja a logokat különböző topic-okba.  

Revision as of 23:01, 19 April 2019

ClipCapIt-190327-232724.PNG


Kafka bemutatása

Mikor elindítunk egy Kafa példányt, akkor valójában egy kafka brokert indítunk el. A brokereknek van száma, ezeket nem lehet konfigurációból felskálázni. Ha producer-ek mindig egy brokerhez csatlakoznak. A teljes konfiguráció zookeeper-ben van tárolva. A zookeeper tudja értesíteni a klienseket ha a konfiguráció változik, ezért hamar elterjed a hálózaton a változás.

A producer-ek egy megadott topic-kra dobálják be az üzeneteket, amit onnan a consumer-ek kiolvasnak. Egy topic tetszőleges számú partícióból állhat. Egy partíció az a logikai egység, aminek rá kell férnie egy lemezre. A topic-kot úgy kell felskálázni, hogy egyre több partíciót adunk hozzá, amik különböző brokereken fognak létrejönni. Minden partíciónak lehet egy vagy több replikája, amik biztonsági másolatok. Mikor a producer beküld egy üzenetet egy partícióba, akkor fog committed üzenetnek minősülni, ha minden replikára is eljutott.

Azt, hogy egy producer melyik partícióba dobja az üzenetet vagy a kulcs határozza meg, vagy round-rubin módon mindig egy másikba teszi. Ha van kulcs, akkor az abból készült hash fogja meghatározni, hogy melyik partícióba kerüljön. Ugyan az a kulcs így mindig ugyan abba a partícióba fog kerülni. De a kulcs nem kötelező. A sorrend tartás csak egy partíción belül garantált, de ott nagyon. Ha nagyon kritikus bizonyos üzenetek sorrendje, akkor azokat egy partícióba kell rakni azonos kulcsot használva. Loggolásnál ez nem kritikus, egyrészt mert a logstash sorba rakja az üzeneteket, másrészt mikor elastichsearch-be szúrjuk, ott a dátum lesz az egyik attribútum, ami alapján már sorba lehet majd újra rendezni a logokat. Az meg amúgy sem kritikus, ha a log egy része enyhe csúszással kerül be az adatbázisba, lényeg, hogy végül helyes lesz a sorrend.

A comsumer-eket úgynevezett consumer-group-okba szervezzük az azonosítójuk szerint. Egy csoport mindig ugyan azon topic üzeneteit olvassa, de minden egyes consumer a csoporotban más és más partícióból. Minden partíció csak egy consumer-hez rendelhető hozzá egy csoporton belül. De ha nincs annyi consumer a csoportban mind ahány partíció, akkor egy consumer több partíciót is fog olvasni. Viszont ha több consumer van mint partíció egy csoportban, akkor bizonyos consumer-ek mindig idle állapotban lesznek. Minden csoporton belül van egy vezető consumer, általában az aki először csatlakozott. Ő teríti a többieknek a cluster információkat. A kafka nem tudja értelmezni sem a kulcsot sem az üzenetet. Ez számára egy bájt tömb. Az, hogy egy objektumból hogy lesz bájt tömb kulcs és bájt tömb üzenet a producer-ben lévő serializátor dolga. A consumer-ben pedig a deserializázor dolga, hogy a bájt folyamból újra értelmes objektumot állítson elő.

minden partíció újabb üzenete mindig a partíció végére íródik. A partíció elejétől számoljuk az üzenetek sorszámát, ezt hívjuk offset-nek. Mikor egy consumer kiolvas egy üzentet, attól az még ott marad a partícióba egészen addig, amíg len nem jár, alapértelmezetten ez egy nap. Tehát ez eltér a hagyományos sor kezeléstől. A Kafka nyilvántartja, hogy melyik consumer egy adott partícióban melyik offset-nél tartott. Ezt egy speciális topic-ban tartja nyilván: "__...". Ha újra is indul a világ, akkor is tudni fogják a consumer-ek hogy hol tartottak, és onnan folytatják.


Producer -> serializator -> partitioner -> batch (partícionként) ->|idáig tart a producer | kafa broker

Producer:

bootstrap.servers: a brokerek listája. Itt nem kell az összes broker-t felsorolni, mert ha már az egyikhez hozzá tud csatlakozni, az elküldi a teljes cluster topológiát. Viszont ajánlatos többet megadni hibatűrés céljából. key.serializer: Akkor is meg kell adni, ha nem használunk kulcsot. A kulcs is mindig bájt tömb, ezért a serializátor implementációnknak bájt tömböt kell visszaadni. Vannak beépítettek a java kliensbe a java alaptípusokra. value.serializer

Producer

$ ./kafka-topics --create --zookeeper 192.168.42.113:32181 --replication-factor 1 --partitions 1 --topic test2-topic
Created topic test2-topic.

Command line producer

Java producer

Logback producer

Java consumer

Sorting messages with Logstash

A logstash-t rakhatjuk a kafka elé és a kafka után is. Első lépésként a kafka elé fogjuk tenni, ami szortírozni fogja a logokat különböző topic-okba.

Producer

A futtatásához szükséges környezet egy két node-os swarm cluster lesz.

# docker node ls
ID                            HOSTNAME            STATUS              AVAILABILITY        MANAGER STATUS      ENGINE VERSION
maigxlyagj1fl4sgcf6rnn9pc *   mg0                 Ready               Active              Leader              18.05.0-ce
vox99u5s1g1su742mc6npm370     worker0             Ready               Active                                  18.05.0-ce

Itt fogunk futtatni egy docker stack-et ami tartalmaz majd a zookeeper-en és a kafka-n kívül is a logstash-t.


ClipCapIt-190327-224419.PNG

Mind a három komponenst rá fogjuk kötni az ingress hálózatra is, mivel a Java producer-nek el kell érnie a logstash-t, és a consumer-nek pedig a kafa-t.

A loggolásra logback-et fogunk használni, aki a logstash 51415-ös portjára fogja küldeni TCP


Logstash konfiguráció

A lostash a TCP socket-en keresztül várja majd a logback-től a logokat. A logberben Marker-eket fogunk használni, amik a [tags] tömbbe fog tenni a logstash.

ClipCapIt-190327-230626.PNG

/usr/share/logstash/pipeline/logstash.conf

input {
  tcp { 
    port => 51415
    codec => "json"
  }
}

output {

  if "T1" in [tags] {

    kafka {
      codec => json
      bootstrap_servers => "kafka:29092"
      topic_id => "T1-topic"
    }
    
  } else if "T2" in [tags] {

    kafka {
      codec => json
      bootstrap_servers => "kafka:29092"
      topic_id => "T2-topic"
    }
    
  } else {

    kafka {
      codec => json
      bootstrap_servers => "kafka:29092"
      topic_id => "msg-topic"
    }

  }

  stdout {
    codec => rubydebug
  }
}

Swarm stack

Fontos, hogy a logstash-ből a 6.6-os szériát használjuk, mert a korábbi verziókban van egy kafak specifikus hiba. A logstash konfigurációt volume driver-er fogjuk felcsatolni.

version: '3.2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.2
    networks:
      - kafka-net
    ports:
      - "32181:32181"
    deploy:
      placement:
        constraints:
         - node.role == worker
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SYNC_LIMIT: 2
  kafka:
    image: confluentinc/cp-kafka:5.1.2
    networks:
      - kafka-net
    ports:
      - target: 29092
        published: 29092
        protocol: tcp
    deploy:
      placement:
        constraints:
         - node.role == worker
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:32181"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092"
      KAFKA_BROKER_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  logstash:
    image: docker.elastic.co/logstash/logstash:6.6.2
    networks:
      - kafka-net
    ports:
      - "51415:51415"
    environment:
      LOGSPOUT: "ignore"
      XPACK_MONITORING_ENABLED: "false"
    volumes:
      - "logstash-conf:/usr/share/logstash/pipeline"
    deploy:   
      placement:
        constraints:
         - node.role == worker
      restart_policy:
        condition: on-failure
      resources:
        reservations:
          memory: 100m   
networks:
  kafka-net:
    driver: overlay
volumes:
  logstash-conf:
    driver: nfs
    driver_opts:
      share: 192.168.42.1:/home/adam/dockerStore/logstash/config/





Topic-ok legyártása:

# ./kafka-topics.sh --create --zookeeper 192.168.42.113:32181 --replication-factor 1 --partitions 1 --topic T1-topic
Created topic "T1-topic".

# ./kafka-topics.sh --create --zookeeper 192.168.42.113:32181 --replication-factor 1 --partitions 1 --topic T2-topic
Created topic "T2-topic".

# ./kafka-topics.sh --create --zookeeper 192.168.42.113:32181 --replication-factor 1 --partitions 1 --topic msg-topic
Created topic "msg-topic".


# ./kafka-topics.sh --list --zookeeper 192.168.42.113:32181 
__confluent.support.metrics
__consumer_offsets
T1-topic
msg-topic
T2-topic

Pom.xml

        <dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>1.2.3</version>
			<scope>runtime</scope>
		</dependency>

		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.7.25</version>
		</dependency>

		<dependency>
			<groupId>net.logstash.logback</groupId>
			<artifactId>logstash-logback-encoder</artifactId>
			<version>5.3</version>
		</dependency>


logout.xml

A logstash a LogstashTcpSocketAppender appender-en keresztül fogja elküldeni a logokat a logstash-benek. IP címnek a stack bármelyik node IP címét megadhatjuk. Mi a worker0 címét használjuk. Nagyon fontos, hogy megadjuk a shutdownHook-ot, ami biztosítja, hogy a JVM leállása előtt még a logback elküldje az összes függőben lévő logot. Ha előbb leáll a JVM mint hogy a logstash el tudta volna küldeni a logokat, akkor azok már nem lesznek kiküldve.

<configuration>
<!--     <shutdownHook/> -->
    
    <!--  Nagyon fontos, hogy leállítsuk a logger context-et mielőtt a VM leáll, mert ha a VM leállítása 
          nagyon közel van a log beíráshoz, akkor még azelőtt leáll az egész VM, hogy a logokat kiírtuk volna. 
          de ha még a VM leállítása előtt meghívjuk a sthudownHook-ot, akkor leállás előtt még ki fogja írni a logokat.  -->
    <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
    
	<appender name="STDOUT"
		class="ch.qos.logback.core.ConsoleAppender">
		<encoder>
			<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
			</pattern>
		</encoder>
	</appender>

	<appender name="stash"
		class="net.logstash.logback.appender.LogstashTcpSocketAppender">
		<destination>192.168.42.113:51415</destination>

		<!-- encoder is required -->
		<encoder class="net.logstash.logback.encoder.LogstashEncoder">
			<customFields>{"appname":"adam"}</customFields>
		</encoder>
	</appender>

	<root level="debug">
        <appender-ref ref="stash" />
        <appender-ref ref="STDOUT" />
		
	</root>
</configuration>


Java producer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
public class App 
{	
    private static final Logger logger = LoggerFactory.getLogger(App.class);
	
    public static void main( String[] args )
    {    	

        Marker taMarker = MarkerFactory.getMarker("T1");
        Marker alMarker = MarkerFactory.getMarker("T2");
    	
    	logger.info(taMarker, "Message to T1 from: {}", "adam");    	
    	logger.info(alMarker, "Message to T2 from: {}", "adam");
    	
        System.out.println( "Hello World!" );
    }
}

Tesztelés

Logstash log:

confluence_logstash.1.4a9rr1w42iud@worker0    | {
confluence_logstash.1.4a9rr1w42iud@worker0    |     "level_value" => 20000,
confluence_logstash.1.4a9rr1w42iud@worker0    |     "logger_name" => "kafka.example2.App",
confluence_logstash.1.4a9rr1w42iud@worker0    |         "appname" => "adam",
confluence_logstash.1.4a9rr1w42iud@worker0    |            "port" => 41024,
confluence_logstash.1.4a9rr1w42iud@worker0    |           "level" => "INFO",
confluence_logstash.1.4a9rr1w42iud@worker0    |        "@version" => "1",
confluence_logstash.1.4a9rr1w42iud@worker0    |            "host" => "10.255.0.3",
confluence_logstash.1.4a9rr1w42iud@worker0    |         "message" => "Message to T1 from: adam",
confluence_logstash.1.4a9rr1w42iud@worker0    |     "thread_name" => "main",
confluence_logstash.1.4a9rr1w42iud@worker0    |      "@timestamp" => 2019-03-26T22:52:19.168Z,
confluence_logstash.1.4a9rr1w42iud@worker0    |            "tags" => [
confluence_logstash.1.4a9rr1w42iud@worker0    |         [0] "TA"
confluence_logstash.1.4a9rr1w42iud@worker0    |     ]
confluence_logstash.1.4a9rr1w42iud@worker0    | }
confluence_logstash.1.4a9rr1w42iud@worker0    | {
confluence_logstash.1.4a9rr1w42iud@worker0    |     "level_value" => 20000,
confluence_logstash.1.4a9rr1w42iud@worker0    |     "logger_name" => "kafka.example2.App",
confluence_logstash.1.4a9rr1w42iud@worker0    |         "appname" => "adam",
confluence_logstash.1.4a9rr1w42iud@worker0    |            "port" => 41024,
confluence_logstash.1.4a9rr1w42iud@worker0    |           "level" => "INFO",
confluence_logstash.1.4a9rr1w42iud@worker0    |        "@version" => "1",
confluence_logstash.1.4a9rr1w42iud@worker0    |            "host" => "10.255.0.3",
confluence_logstash.1.4a9rr1w42iud@worker0    |         "message" => "Message to T2 from: adam",
confluence_logstash.1.4a9rr1w42iud@worker0    |     "thread_name" => "main",
confluence_logstash.1.4a9rr1w42iud@worker0    |      "@timestamp" => 2019-03-26T22:52:19.176Z,
confluence_logstash.1.4a9rr1w42iud@worker0    |            "tags" => [
confluence_logstash.1.4a9rr1w42iud@worker0    |         [0] "AL"
confluence_logstash.1.4a9rr1w42iud@worker0    |     ]
confluence_logstash.1.4a9rr1w42iud@worker0    | }


Indítsunk el egy egy kafka-console-consumer.sh-t mind a T1 mind a T2 topic-kra.

./kafka-console-consumer.sh --bootstrap-server 192.168.42.113:29092 --topic T1-topic --from-beginning

{"level_value":20000,"logger_name":"kafka.example2.App","appname":"adam","port":41024,"level":"INFO","@version":"1","host":"10.255.0.3","message":"Message to T1 from: adam","thread_name":"main","@timestamp":"2019-03-26T22:52:19.168Z","tags":["T1"]}


./kafka-console-consumer.sh --bootstrap-server 192.168.42.113:29092 --topic T2-topic --from-beginning

{"level_value":20000,"logger_name":"kafka.example2.App","appname":"adam","port":41024,"level":"INFO","@version":"1","host":"10.255.0.3","message":"Message to T2 from: adam","thread_name":"main","@timestamp":"2019-03-26T22:52:19.176Z","tags":["T2"]}