Apache Kafka
Kafka bemutatása
Egy Kafka architektúra legalább egy Kafka szerverből (bróker) áll ami a konfigurációját kötelezően a Zookeeper nevű elosztott konfigurációs management rendszerben tárolja. A Kafka borker-hez csatlakoznak a termelők és fogyasztók. A Kafka cluster-ben úgynevezett topic-ok találhatók. A termelők mindig egy dedikált topik-ra írnak, és a fogyasztók mindig egy dedikált topic-ról olvasnak, tehát a topic az a logikai egység, ami egy termelő-fogyasztó páros számára az üzeneteket tárolja és továbbítja. Mikor elindítunk egy Kafa példányt, akkor valójában egy kafka brokert indítunk el. Ha producer-ek mindig egy brokerhez csatlakoznak. A teljes konfiguráció zookeeper-ben van tárolva. A zookeeper tudja értesíteni a klienseket ha a konfiguráció változik, ezért hamar elterjed a hálózaton a változás.
Egy topic úgynevezett partíciókra van osztva. Minden üzenet csak egy partícióba kerül be.
A producer-ek egy megadott topic-kra dobálják be az üzeneteket, amit onnan a consumer-ek kiolvasnak. Egy topic tetszőleges számú partícióból állhat. Egy partíció az a logikai egység, aminek rá kell férnie egy lemezre. A topic-kot úgy kell felskálázni, hogy egyre több partíciót adunk hozzá, amik különböző brokereken fognak létrejönni. Minden partíciónak lehet egy vagy több replikája, amik biztonsági másolatok. Mikor a producer beküld egy üzenetet egy partícióba, akkor fog committed üzenetnek minősülni, ha minden replikára is eljutott.
Azt, hogy egy producer melyik partícióba dobja az üzenetet vagy a kulcs határozza meg, vagy round-rubin módon mindig egy másikba teszi. Ha van kulcs, akkor az abból készült hash fogja meghatározni, hogy melyik partícióba kerüljön. Ugyan az a kulcs így mindig ugyan abba a partícióba fog kerülni. De a kulcs nem kötelező. A sorrend tartás csak egy partíción belül garantált, de ott nagyon. Ha nagyon kritikus bizonyos üzenetek sorrendje, akkor azokat egy partícióba kell rakni azonos kulcsot használva. Loggolásnál ez nem kritikus, egyrészt mert a logstash sorba rakja az üzeneteket, másrészt mikor elastichsearch-be szúrjuk, ott a dátum lesz az egyik attribútum, ami alapján már sorba lehet majd újra rendezni a logokat. Az meg amúgy sem kritikus, ha a log egy része enyhe csúszással kerül be az adatbázisba, lényeg, hogy végül helyes lesz a sorrend.
A comsumer-eket úgynevezett consumer-group-okba szervezzük az azonosítójuk szerint. Egy csoport mindig ugyan azon topic üzeneteit olvassa, de minden egyes consumer a csoporotban más és más partícióból. Minden partíció csak egy consumer-hez rendelhető hozzá egy csoporton belül. De ha nincs annyi consumer a csoportban mind ahány partíció, akkor egy consumer több partíciót is fog olvasni (ahogy ez a fenti ábrán is látszik, az alsó consumer két partíciót olvas. Viszont ha több consumer van mint partíció egy csoportban, akkor bizonyos consumer-ek mindig idle állapotban lesznek. Minden csoporton belül van egy vezető consumer, általában az aki először csatlakozott. Ő teríti a többieknek a cluster információkat.
A Kafka nem tudja értelmezni sem a kulcsot sem az üzenetet. Ez számára egy bájt tömb. Az, hogy egy objektumból hogy lesz bájt tömb kulcs és bájt tömb üzenet a producer-ben lévő serializátor dolga. A consumer-ben pedig a deserializázor dolga, hogy a bájt folyamból újra értelmes objektumot állítson elő.
Minden partíció új üzenete mindig a partíció végére íródik. A partíció elejétől számoljuk az üzenetek sorszámát, ezt hívjuk offset-nek. Mikor egy consumer kiolvas egy üzentet, attól az még ott marad a partícióba egészen addig, amíg len nem jár, alapértelmezetten ez egy nap. Tehát ez eltér a hagyományos sor kezeléstől. A Kafka nyilvántartja, hogy melyik consumer egy adott partícióban melyik offset-nél tartott. Ezt egy speciális topic-ban tartja nyilván: "__...". Ha újra is indul a világ, akkor is tudni fogják a consumer-ek hogy hol tartottak, és onnan folytatják.
A docker alapú claud világban egy tipikus architektúra a logok centralizált gyűjtésére, mikor egy logstash példány a producer és egy másik logstash példány a consumer. A konténer logokat a producer logstash kapja meg, aki a log sorok különböző paraméterei mentén a megfelelő Topic-ba tudja irányítani az üzeneteket. A consumer logstash pedig leszedi a Topic-rol az üzenetet és beírja Elasticsearch-be.
A Kafka világban nagyon széles a választéka a producer-eknek és consumer-eknek, akik képesek közvetlenül Kafka-ba írni és onnan olvasni. A Java világban a megfelelő Kafka lib-ek segítségével írhatunk Java producer-eket és consumer-eket amik olyan Java programok, amik közvetlenül írják ill. olvassák a Kafka topic-ot. A másik lehetőség a producer-re, hogy a logger keretrendszerünk Kafka kliens appender-jét használjuk, ami a rendszer logokat képes kapásból Kafka-ba írni. Ha letöltjük a Kafka programot, akkor abban található parancssori producer és consumer is, ami képes tesztelés céljából közvetlen beírni és kiolvasni egy topic-ból, ami nagyon hasznos a tesztelés során.
Környezet kialakítása
Az Avro futtatásához szükséges környezet egy két node-os swarm cluster lesz.
# virsh list Id Name State ---------------------------------------------------- 1 mg0 running 2 worker0 running
# docker node ls ID HOSTNAME STATUS AVAILABILITY MANAGER STATUS ENGINE VERSION maigxlyagj1fl4sgcf6rnn9pc * mg0 Ready Active Leader 18.05.0-ce vox99u5s1g1su742mc6npm370 worker0 Ready Active 18.05.0-ce
Itt fogunk futtatni egy docker stack-et ami tartalmaz majd egy kafka brókert és egy zookeeper példányt.
A zookeeper és a Kafka broker a kafka-net overlay hálózaton keresztül fognak kommunikálni. Azonban mind a Kafka-t mind a zookeper-t ki kell ajánlani az ingress hálózaton keresztül a külvilágnak, hogy a külső termelők és fogyasztók elérjék őket. A termelők és fogyasztók bármelyik swarm node-on keresztül elérik a Kafa-t ill a zookeeper-t, erről az ingress hálózat gondoskodik.
version: '3.2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.1.2
networks:
- kafka-net
ports:
- "32181:32181"
deploy:
placement:
constraints:
- node.role == worker
environment:
ZOOKEEPER_CLIENT_PORT: 32181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SYNC_LIMIT: 2
kafka:
image: confluentinc/cp-kafka:5.1.2
networks:
- kafka-net
ports:
- target: 29092
published: 29092
protocol: tcp
deploy:
placement:
constraints:
- node.role == worker
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:32181"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092"
KAFKA_BROKER_ID: 2
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
A Kakfa környezeti váltók beállításánál:
- KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092" -> A broker a kafka:29092-n fog csatlakozást elfogadni nem titkosított csatornán (PLAINTEXT). Fontos lesz hogy a kliensek is kafka domain névvel keressék a broker-t. A kafka nevet bármelyik swarm node IP címével fel kell venni majd a klienseken a host fájlba.
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 -> ...
Producer
Command line producer
$ ./kafka-topics --create --zookeeper 192.168.42.113:32181 --replication-factor 1 --partitions 1 --topic test2-topic Created topic test2-topic.
Java producer
Logback producer
Java consumer
Sorting messages with Logstash
A logstash-t rakhatjuk a kafka elé és a kafka után is. Első lépésként a kafka elé fogjuk tenni, ami szortírozni fogja a logokat különböző topic-okba.
Producer
A futtatásához szükséges környezet egy két node-os swarm cluster lesz.
# docker node ls ID HOSTNAME STATUS AVAILABILITY MANAGER STATUS ENGINE VERSION maigxlyagj1fl4sgcf6rnn9pc * mg0 Ready Active Leader 18.05.0-ce vox99u5s1g1su742mc6npm370 worker0 Ready Active 18.05.0-ce
Itt fogunk futtatni egy docker stack-et ami tartalmaz majd a zookeeper-en és a kafka-n kívül is a logstash-t.
Mind a három komponenst rá fogjuk kötni az ingress hálózatra is, mivel a Java producer-nek el kell érnie a logstash-t, és a consumer-nek pedig a kafa-t.
A loggolásra logback-et fogunk használni, aki a logstash 51415-ös portjára fogja küldeni TCP
Logstash konfiguráció
A lostash a TCP socket-en keresztül várja majd a logback-től a logokat. A logberben Marker-eket fogunk használni, amik a [tags] tömbbe fog tenni a logstash.
/usr/share/logstash/pipeline/logstash.conf
input {
tcp {
port => 51415
codec => "json"
}
}
output {
if "T1" in [tags] {
kafka {
codec => json
bootstrap_servers => "kafka:29092"
topic_id => "T1-topic"
}
} else if "T2" in [tags] {
kafka {
codec => json
bootstrap_servers => "kafka:29092"
topic_id => "T2-topic"
}
} else {
kafka {
codec => json
bootstrap_servers => "kafka:29092"
topic_id => "msg-topic"
}
}
stdout {
codec => rubydebug
}
}
Swarm stack
Fontos, hogy a logstash-ből a 6.6-os szériát használjuk, mert a korábbi verziókban van egy kafak specifikus hiba. A logstash konfigurációt volume driver-er fogjuk felcsatolni.
version: '3.2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.1.2
networks:
- kafka-net
ports:
- "32181:32181"
deploy:
placement:
constraints:
- node.role == worker
environment:
ZOOKEEPER_CLIENT_PORT: 32181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SYNC_LIMIT: 2
kafka:
image: confluentinc/cp-kafka:5.1.2
networks:
- kafka-net
ports:
- target: 29092
published: 29092
protocol: tcp
deploy:
placement:
constraints:
- node.role == worker
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:32181"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092"
KAFKA_BROKER_ID: 2
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
logstash:
image: docker.elastic.co/logstash/logstash:6.6.2
networks:
- kafka-net
ports:
- "51415:51415"
environment:
LOGSPOUT: "ignore"
XPACK_MONITORING_ENABLED: "false"
volumes:
- "logstash-conf:/usr/share/logstash/pipeline"
deploy:
placement:
constraints:
- node.role == worker
restart_policy:
condition: on-failure
resources:
reservations:
memory: 100m
networks:
kafka-net:
driver: overlay
volumes:
logstash-conf:
driver: nfs
driver_opts:
share: 192.168.42.1:/home/adam/dockerStore/logstash/config/
Topic-ok legyártása:
# ./kafka-topics.sh --create --zookeeper 192.168.42.113:32181 --replication-factor 1 --partitions 1 --topic T1-topic Created topic "T1-topic". # ./kafka-topics.sh --create --zookeeper 192.168.42.113:32181 --replication-factor 1 --partitions 1 --topic T2-topic Created topic "T2-topic". # ./kafka-topics.sh --create --zookeeper 192.168.42.113:32181 --replication-factor 1 --partitions 1 --topic msg-topic Created topic "msg-topic". # ./kafka-topics.sh --list --zookeeper 192.168.42.113:32181 __confluent.support.metrics __consumer_offsets T1-topic msg-topic T2-topic
Pom.xml
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>5.3</version>
</dependency>
logout.xml
A logstash a LogstashTcpSocketAppender appender-en keresztül fogja elküldeni a logokat a logstash-benek. IP címnek a stack bármelyik node IP címét megadhatjuk. Mi a worker0 címét használjuk. Nagyon fontos, hogy megadjuk a shutdownHook-ot, ami biztosítja, hogy a JVM leállása előtt még a logback elküldje az összes függőben lévő logot. Ha előbb leáll a JVM mint hogy a logstash el tudta volna küldeni a logokat, akkor azok már nem lesznek kiküldve.
<configuration>
<!-- <shutdownHook/> -->
<!-- Nagyon fontos, hogy leállítsuk a logger context-et mielőtt a VM leáll, mert ha a VM leállítása
nagyon közel van a log beíráshoz, akkor még azelőtt leáll az egész VM, hogy a logokat kiírtuk volna.
de ha még a VM leállítása előtt meghívjuk a sthudownHook-ot, akkor leállás előtt még ki fogja írni a logokat. -->
<shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
<appender name="STDOUT"
class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</pattern>
</encoder>
</appender>
<appender name="stash"
class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>192.168.42.113:51415</destination>
<!-- encoder is required -->
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<customFields>{"appname":"adam"}</customFields>
</encoder>
</appender>
<root level="debug">
<appender-ref ref="stash" />
<appender-ref ref="STDOUT" />
</root>
</configuration>
Java producer
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
public class App
{
private static final Logger logger = LoggerFactory.getLogger(App.class);
public static void main( String[] args )
{
Marker taMarker = MarkerFactory.getMarker("T1");
Marker alMarker = MarkerFactory.getMarker("T2");
logger.info(taMarker, "Message to T1 from: {}", "adam");
logger.info(alMarker, "Message to T2 from: {}", "adam");
System.out.println( "Hello World!" );
}
}
Tesztelés
Logstash log:
confluence_logstash.1.4a9rr1w42iud@worker0 | { confluence_logstash.1.4a9rr1w42iud@worker0 | "level_value" => 20000, confluence_logstash.1.4a9rr1w42iud@worker0 | "logger_name" => "kafka.example2.App", confluence_logstash.1.4a9rr1w42iud@worker0 | "appname" => "adam", confluence_logstash.1.4a9rr1w42iud@worker0 | "port" => 41024, confluence_logstash.1.4a9rr1w42iud@worker0 | "level" => "INFO", confluence_logstash.1.4a9rr1w42iud@worker0 | "@version" => "1", confluence_logstash.1.4a9rr1w42iud@worker0 | "host" => "10.255.0.3", confluence_logstash.1.4a9rr1w42iud@worker0 | "message" => "Message to T1 from: adam", confluence_logstash.1.4a9rr1w42iud@worker0 | "thread_name" => "main", confluence_logstash.1.4a9rr1w42iud@worker0 | "@timestamp" => 2019-03-26T22:52:19.168Z, confluence_logstash.1.4a9rr1w42iud@worker0 | "tags" => [ confluence_logstash.1.4a9rr1w42iud@worker0 | [0] "TA" confluence_logstash.1.4a9rr1w42iud@worker0 | ] confluence_logstash.1.4a9rr1w42iud@worker0 | } confluence_logstash.1.4a9rr1w42iud@worker0 | { confluence_logstash.1.4a9rr1w42iud@worker0 | "level_value" => 20000, confluence_logstash.1.4a9rr1w42iud@worker0 | "logger_name" => "kafka.example2.App", confluence_logstash.1.4a9rr1w42iud@worker0 | "appname" => "adam", confluence_logstash.1.4a9rr1w42iud@worker0 | "port" => 41024, confluence_logstash.1.4a9rr1w42iud@worker0 | "level" => "INFO", confluence_logstash.1.4a9rr1w42iud@worker0 | "@version" => "1", confluence_logstash.1.4a9rr1w42iud@worker0 | "host" => "10.255.0.3", confluence_logstash.1.4a9rr1w42iud@worker0 | "message" => "Message to T2 from: adam", confluence_logstash.1.4a9rr1w42iud@worker0 | "thread_name" => "main", confluence_logstash.1.4a9rr1w42iud@worker0 | "@timestamp" => 2019-03-26T22:52:19.176Z, confluence_logstash.1.4a9rr1w42iud@worker0 | "tags" => [ confluence_logstash.1.4a9rr1w42iud@worker0 | [0] "AL" confluence_logstash.1.4a9rr1w42iud@worker0 | ] confluence_logstash.1.4a9rr1w42iud@worker0 | }
Indítsunk el egy egy kafka-console-consumer.sh-t mind a T1 mind a T2 topic-kra.
./kafka-console-consumer.sh --bootstrap-server 192.168.42.113:29092 --topic T1-topic --from-beginning {"level_value":20000,"logger_name":"kafka.example2.App","appname":"adam","port":41024,"level":"INFO","@version":"1","host":"10.255.0.3","message":"Message to T1 from: adam","thread_name":"main","@timestamp":"2019-03-26T22:52:19.168Z","tags":["T1"]}
./kafka-console-consumer.sh --bootstrap-server 192.168.42.113:29092 --topic T2-topic --from-beginning {"level_value":20000,"logger_name":"kafka.example2.App","appname":"adam","port":41024,"level":"INFO","@version":"1","host":"10.255.0.3","message":"Message to T2 from: adam","thread_name":"main","@timestamp":"2019-03-26T22:52:19.176Z","tags":["T2"]}