7,540
edits
Changes
→Kerberos autentikációval SSL felett
:[[File:ClipCapIt-190327-232724.PNG|200px]]
version: '3.2'
services:
image: confluentinc/cp-zookeeper:5.1.2
networks:
- kafakafka-net ports: - "32181:32181"
deploy:
placement:
image: confluentinc/cp-kafka:5.1.2
networks:
- kafakafka-net
ports:
- target: 29092
published: 29092
protocol: tcp
deploy:
placement:
- node.role == worker
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:32181" KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092"
KAFKA_BROKER_ID: 2
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
</source>
Hozzuk létre a docker stack-et:
<pre>
# docker node lsID HOSTNAME STATUS AVAILABILITY MANAGER STATUSkse3a58x6f34o7wp7mxum7nux worker2 Ready Active n39y1bvv50r7o12bijmm57kl2 worker0 Ready Active ux3lwoqql2hiv6l9ylyx2rkar * mg0 Ready Active Leaderzwumqwkmo32zkg79gqztff397 worker1 Ready Active stack deploy -c confluent_swarm.yaml confluent
</pre>
Listázzuk ki a stack-ban létrejött service-eket és az overlay hálózatot:
<pre>
# docker service ls
ID NAME MODE REPLICAS IMAGE PORTS
7vjvop7tqiyc confluent_kafka replicated 1/1 confluentinc/cp-kafka:5.1.2 *:29092->29092/tcp
oxxjtkcusj1f confluent_zookeeper replicated 1/1 confluentinc/cp-zookeeper:5.1.2 *:32181->32181/tcp
</pre>
És listázzuk ki az összes swarm hálózatot. Láthatjuk hogy létrejött a kfaka-net nevű overlay hálózat.
<pre>
# docker stack deploy -c confluentinc_swaromnetwork lsNETWORK ID NAME DRIVER SCOPE...yml confluentCreating service 5albky0eu1to confluent_kafka-net overlay swarmCreating service confluent_zookeeperolqkh5zlqiac ingress overlay swarm...
</pre>
<br>
==Topic-ok kezelése==
Az összes példa során a '''test2-topic''' nevű topic-ot fogjuk használni. A topic-ok kezeléséhez a Kafka csomagban a bin mappába találunk adminisztrációs scripteket. Töltsük le a Kafka-t vagy a kafak.apache.org-ról (https://kafka.apache.org/downloads) vagy a confluent oldaláról (https://www.confluent.io/download/), amiben jóval több script-et találunk mint az apache-os változatban.
Új topic-ot a '''kafka-topics''' paranccsal készíthetünk. Paraméterként meg kell adni a zookeeper szerver elérhetőségét, mivel a Kafka a konfigurációt a zookeeper-ben tárolja, így az új topic-ot a zookeeper-be kell beírni. A zookeeper-t publikáltuk az ingress hálózatra, így bármelyik node IP címével és a publikált porttal (32181) elérhetjük a szervert.
<pre>
# docker service ps confluent_kafkaID NAME IMAGE NODE DESIRED STATE PORTS-machine ip worker0w74azzl3qzwa confluent_kafka192.1 confluentinc/cp-kafka:5168.142.2 worker1 Running *:29092->29092113
</pre>
És most hozzuk létre a '''test2-topic''' nevű topic-ot.
<pre>
$ ./kafka-topics --create --zookeeper 192.168.42.113:32181 --replication-factor 1 --partitions 1 --topic test2-topic
Created topic test2-topic.
</pre>
A --list kapcsolóval listázhatjuk a Kafka cluster-ben elérhető topic-okat, amit a parancs szintén a zookeeper cluster-ből olvas ki.
<pre>
</pre>
<br>
=Producer=
==Command line producer==
===Autentikáció nélkül===
A legegyszerűbben a '''kafka-console-producer''' script-el írhatunk egy Kafka topic-ba. Ez a parancs része a Kafka csomagnak, benne van mind az Apache mind a Confluent csomagban is.
* https://kafka.apache.org/downloads
* https://www.confluent.io/download/
<pre>
</pre>
Indítsunk el ugyan arra a topic-ra egy consumer-t hogy láthassuk hogy megjön e a üzenet.
<pre>
</pre>
Mivel a Kafka topic-ban addig marad meg egy üzenet amíg le nem ár, ezért ha a consumer-t úgy állítjuk be, hogy minden induláskor a topic elejéről olvasson (--from-beginning) ezért minden olyan üzenetet ki fog olvasni, amit valaha beírtak a topic-ba.
<br>
<br>
===Kerberos autentikációval SSL felett===
A Kerberos authentikácó alapja a Keytab fájl, ami egy bináris fájl, ebben található a kliens kulcsa és principálja. Ezen felül ha ha kafka borekerekhez SSL-el felett kell csatlakozni, akkor szükség van a brokerek certifikációjának a root CA-jára, amit be kell tenni egy trust-stor-ba. A Kerberos authentikáció használatához az alábbi fájlokra van szükség:
* '''java trustStore''': a brokerek Cert-je vagy a root CA
* '''jaas config''': (Java Authentication and Authorization Service): Itt kell megadni, hogy Kerberos-t akarunk használni, ezen felül itt kell megadni a kerberos modult is.
* '''producer.properties''': Kafa producer beállítások: itt adjuk meg, hogy SSL felett menjen a Kerberos authentikáció.
* '''keytab''': egy bináris fájl, amiben a Kerberos kliens titkos kulcsa van. Ezt a Kerberos üzemeltetés adja.
* '''principal''': A Kerberos "felhasználó nevünk": ezt is a Kerberos üzemeltetés adja.
* '''krb5.conf''': Ez a kerberos kliens konfigurációs fájlja. Ebben van megadva a Kerberos autentikációs szerverek címe és portja. Ezt is az üzemeletetés adja.
<br>
<pre>
Hozzuk létre a Java Autentikációs és Autokorrelációs rendszer konfigurációs fájlját, ahol kikényszerítjük a Kerberos használatát a Java autentikáció során: <br>kafka_client_jaas.conf<pre>KafkaClient {com.sun.security.auth.module.Krb5LoginModule required debug=true useKeyTab=true storeKey=true serviceName="kafka" keyTab="/usrhome/binkafkaconf/kafkatest-topics --create --zookeeper zookeeper:32181 --replication-factor 1 --partitions 1 --topic adamclient.keytab" Created topic principal="adam_test-client@CORP.BERKI.ORG".;};
</pre>
Itt kell megadni a keytab fájl helyét, és a Kerberos principal-t, amire a keytab ki lett állítva. A keytab fájlt és a principal-t mindig a Kerberos üzemeltetője adja meg. A '''Krb5LoginModule''' modult be kell töltse a Java a Kerberos használatához. A '''serviceName''' paramétert megadhatjuk a producer.properties fájlban is. Ennek az értékét is a Kerberos üzemeltetéstől kell megkapjuk.
{{warning|Fontos, hogy a principal legyen az utolsó sorban, és hogy a sor végét ;-vel zárjuk le, akár csak az egész fájlt. }}
<pre>
</pre>
<br>
<br>
<pre>
</pre>
<pre>
</pre>
<br><br> ==Java producer==A Java klienssel közvetlenül fogunk üzeneteket írni egy Kafka topic-ba. ===Pom.xml===Ahogy azt már a swarm stack létrehozásánál láthattuk az apache Kafka helyett a confluent Kafka termékcsaládot fogjuk használni. A confluent Kafka kliens letöltéséhe hozzá kell adni a maven pom.xml-hez a confluent repository-t. Két dependenciára van szükségünk. A serializációs osztályok a '''kafa''' csomagban vannak, míg a producer a '''kafka-clients''' csomagban van. <source lang="xml"> <repositories> <repository> <id>confluent</id> <url>https://packages.confluent.io/maven/</url> </repository> </repositories>... <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>2.1.1-cp1</version> </dependency> </dependencies></source><br> ===Java kód===<source lang="java">import org.apache.kafka.clients.producer.*;import org.apache.kafka.common.serialization.LongSerializer;import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties; public class KafkaProducerExample { private final static String TOPIC = "test2-topic"; private static Producer<Long, String> createProducer() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092"); props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); return new KafkaProducer<>(props); } static void runProducer() throws Exception { final Producer<Long, String> producer = nullcreateProducer(); Properties config long key = new PropertiesSystem.currentTimeMillis();
try {
final ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, key, "Hello World");
RecordMetadata metadata = producer.send(record).get();
} finally {
}
}
public static void main(String... args) throws Exception {
runProducer();
}
}
</source>
Az összes Kafka specifikus beállítást a Properties map-ben kell megadni. Ezek közül a legfontosabb a Kafka bróker címe. A kafka host nevet felvettük a host fájlba a worker0 node IP címével, de bármelyik swarm node IP címét választhatjuk. A port az a port, amit az ingress hálózaton publikáltunk a Kafka service-hez.
<source lang="java">
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");
</source>
Fontos megadni a kulcs és az üzenet Serializációs módját. Itt azt választottuk, hogy a kulcs egy Long lesz, míg az üzenet egy mezei string.
<source lang="java">
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Bármit is választunk a serializáláshoz, fontos hogy a consumer-ben is csak ezzel kompatibilis deszerializálót választhatunk. A serializáló metódusoknak széles a választéka, pl JSON-t is küldhetünk.
</source>
<br>
==Logback producer==
===Pom.xml===
<source lang="xml">
<dependencies>
...
<dependency>
<groupId>com.github.danielwegener</groupId>
<artifactId>logback-kafka-appender</artifactId>
<version>0.2.0-RC1</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
....
</dependencies>
</source>
===Logback.xml===
A logback-ek Kafka topic-okba a '''com.github.danielwegener.logback.kafka.KafkaAppender''' osztállyal lehet írni ami egy szabványos Logback appender.
Teljes leírás itt: https://github.com/danielwegener/logback-kafka-appender
<br>
logback.xml
<source lang="xml">
<configuration>
<shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
<appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<customFields>{"application":"this is the extra field"}</customFields>
</encoder>
<topic>test2-topic</topic>
<keyingStrategy class="com.github.danielwegener.logback.kafka.keying.HostNameKeyingStrategy" />
<deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
<appendTimestamp>true</appendTimestamp>
<producerConfig>bootstrap.servers=kafka:9092</producerConfig>
</appender>
<root level="info">
<appender-ref ref="kafkaAppender" />
</root>
</configuration>
</source>
Nagyon fontos, hogy leállítsuk a logger context-et mielőtt a VM leáll, mert ha a VM leállítása nagyon közel van a log beíráshoz, akkor még azelőtt leáll az egész VM, hogy a logokat kiírtuk volna. Nagyon rövid életű programokban, mint amilyen a mi példa programunk, hamarabb leállhat a VM, mint hogy el tudta volna küldeni a logback a Kafka-nak az üzenetet. Ha még a VM leállítása előtt meghívjuk a sthudownHook-ot, akkor leállás előtt még ki fogja írni a logokat.
<source lang="xml">
<shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
</source>
A kulcs kezelési stratégiát a '''keyingStrategy''' paraméterben kell definiálni. A kulcsok kitöltése nem kötelező, de szintén hatással lehet a performanciára. Ha a kulcs minden üzenetben ugyan az, akkor az összes üzenet ugyanabba a partícióba fog kerülni, ami nem a legjobb, de cserébe sorrendtartó lesz. Több kulcskezelési stratégia közül választhatunk. A '''NoKeyKeyingStrategy''' hatására nem fog kulcsot generálni, így round robin módon fog mindig egy új partíciót választani. Ha a '''HostNameKeyingStrategy''' stratégiát választjuk, akkor a host név lesz a kulcs, tehát az azonos hostról érkező logsorok mindig ugyan abba a partícióba fognak kerülni.
<source lang="xml">
<keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" />
VAGY
<keyingStrategy class="com.github.danielwegener.logback.kafka.keying.HostNameKeyingStrategy" />
</source>
A Kafka brókerek listáját a '''bootstrap.servers''' producerConfig paraméterben kell megadni. Nagyon fontos, hogy ugyan azzal a host névvel tegyük ezt ide, mint ahogy a swarm-ban létrehoztuk, és az itt megadott nevet fel kell venni a hosts-ba. A producerConfig-ok teljes listája itt olvasható: ttps://kafka.apache.org/documentation.html#producerconfigs
<source lang="xml">
<producerConfig>bootstrap.servers=kafka:9092</producerConfig>
</source>
A '''customFields''' paraméterben tetszőleges log paramétereket adhatunk a Kafka üzenethez.
<source lang="xml">
<customFields>{"application":"this is the extra field"}</customFields>
</source>
<br>
===Java kód===
<source lang="java">
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LogbackExample {
private static final Logger logger = LoggerFactory.getLogger(LogbackExample.class.getSimpleName());
public static void main(String... args) throws InterruptedException {
logger.info("this is the message:");
}
}
</source>
===Futtatás===
Indítsuk el a kafka-console-consumer -t a test2-topic-ra, hogy lássuk, hogy a logback milyen üzeneteket tesz be:
<pre>
./kafka-console-consumer \
--bootstrap-server kafka:29092 \
--topic test2-topic \
--from-beginning
</pre>
Futtassuk le a LogbackExample java programot, ekkor a consumer ki fogja írni a logback által beküldött üzenetet:
<pre>
{"@timestamp":"2019-04-21T12:44:32.430+02:00","@version":"1","message":"this is the message:","logger_name":"LogbackExample","thread_name":"main","level":"INFO","level_value":20000,"HOSTNAME":"adamDell2","application":"this is the extra field"}
</pre>
Látható, hogy bekerült az üzenetbe a timestamp is és az extra mező is, amit a logback appender-ben adtunk hozzá.
<br>
<br>
===Custom log object===
Ahogy a logback hagyományos használata mellett, itt is lehetőség van egyedi üzenet objektumok használatára.
1. Az egyik lehetőség az MDC - Mapped Diagnostic Context (https://logback.qos.ch/manual/mdc.html) használata, amivel egyedi mezőket adhatunk hozzá a log-hoz, ami a kafka üzenetben is meg fog jelenni:
<source lang="java">
import org.slf4j.MDC;
...
MDC.put("transactionId", "1111");
logger.info("this is the message:");
</source>
A fenit üzenet a kafka consumer-ben így fog megjelenni:
<pre>
{"@timestamp":"2019-04-21T16:20:34.620+02:00","@version":"1","message":"this is the message:","logger_name":"LogbackExample","thread_name":"main","level":"INFO","level_value":20000,"HOSTNAME":"adamDell2","transactionId":"1111","application":"this is the extra field"}
</pre>
<br>
<br>
2. A másik lehetőség a '''net.logstash.logback.marker.Markers''' használata, amivel tetszőleges java POJO-kat írhatunk be JSON formátumban a logba. A példában az alábbi '''LogMessage''' java objektumot fogjuk használni.
<source lang="java">
public class LogMessage {
private String feild1;
private String field2;
public LogMessage(String feild1, String field2) {
this.feild1 = feild1;
this.field2 = field2;
}
public String getFeild1() {
return feild1;
}
public String getField2() {
return field2;
}
public void setFeild1(String feild1) {
this.feild1 = feild1;
}
public void setField2(String field2) {
this.field2 = field2;
}
}
</source>
A java osztályban az import-ok közé felvesszüka Markers.append-t.
<source lang="java">
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static net.logstash.logback.marker.Markers.append;
public class LogbackExample {
private static final Logger logger = LoggerFactory.getLogger(LogbackExample.class.getSimpleName());
public static void main(String... args) throws InterruptedException {
LogMessage message = new LogMessage("first", "second");
logger.info(append("customFieldName", message), "this is the message");
}
}
</source>
A consume-ben az üzenet az alábbi lesz:
<pre>
{"@timestamp":"2019-04-21T16:20:35.140+02:00","@version":"1","message":"this is the message","logger_name":"LogbackExample","thread_name":"main","level":"INFO","level_value":20000,"HOSTNAME":"adamDell2","transactionId":"1111","customFieldName":{"feild1":"first","field2":"second"},"application":"this is the extra field"}
</pre>
==Logstash producer with logback ==
Mind a három komponenst rá fogjuk kötni az ingress hálózatra is, mivel a Java producer-nek el kell érnie a logstash-t, és a consumer-nek pedig a kafa-t. A loggolásra logback-et fogunk használni, aki a logstash 51415-ös portjára fogja küldeni TCP socket-en. Az üzeneteket a '''kafka-console-consumer'''-el fogjuk kiolvasni. ===Logstash konfiguráció=== A lostash a TCP socket-en keresztül várja majd a logback-től a logokat. A logberben Marker-eket fogunk használni, amik a [tags] tömbbe fog tenni a logstash. A logstash a Kafka output plugin segítségével fogja beírni a megfelelő topic-ba az üzeneteket. :[[File:ClipCapIt-190421-163511.PNG]] Az alap logstash image már tartalmazz mind a Kafka input és output plugin-t is, így kapásból tudunk a logstash-el Kafka-ból írni és olvasni. Listázzuk ki a logstash plugin-eket a '''bin/logstash-plugin list''' paranccsal. Láthatjuk hogy a kafka mind az input mind az output-ban ott van. <pre># docker run -it docker.elastic.co/logstash/logstash:6.6.2 bin/logstash-plugin list...logstash-input-kafka...logstash-output-kafka</pre> /usr/share/logstash/pipeline/'''logstash.conf'''<source lang="xml">input { tcp { port => 51415 codec => "json" }} output { if "T1" in [tags] { kafka { codec => json bootstrap_servers => "kafka:29092" topic_id => "T1-topic" } } else if "T2" in [tags] { kafka { codec => json bootstrap_servers => "kafka:29092" topic_id => "T2-topic" } } else { kafka { codec => json bootstrap_servers => "kafka:29092" topic_id => "msg-topic" } } stdout { codec => rubydebug }}</source> ===Swarm stack=== Fontos, hogy a logstash-ből a 6.6-os szériát használjuk, mert a korábbi verziókban van egy kafak specifikus hiba. A logstash konfigurációt volume driver-er fogjuk felcsatolni host gépről. (A volume dirver-ekről részletek itt: https://wiki.berki.org/index.php/Docker_volume_orchestration) <source lang="yamlC++">
version: '3.2'
services:
networks:
- kafka-net
ports:
- "32181:32181"
deploy:
placement:
published: 29092
protocol: tcp
deploy:
placement:
- node.role == worker
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:32181" KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092"
KAFKA_BROKER_ID: 2
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
networks:
- kafka-net
ports:
- 8081"51415:808151415" environment: LOGSPOUT: "ignore" XPACK_MONITORING_ENABLED: "false" volumes: - "logstash-conf:/usr/share/logstash/pipeline" deploy:
placement:
constraints:
networks:
kafka-net:
driver: overlay
volumes:
logstash-conf:
driver: nfs
driver_opts:
share: 192.168.42.1:/home/adam/dockerStore/logstash/config/
</source>
Topic-ok legyártása:
<pre>
# ./kafka-topics.sh --create --zookeeper 192.168.42.113:32181 --replication-factor 1 --partitions 1 --topic T1-topic
Created topic "T1-topic".
# ./kafka-topics.sh --create --zookeeper 192.168.42.113:32181 --replication-factor 1 --partitions 1 --topic T2-topic
Created topic "T2-topic".
# ./kafka-topics.sh --create --zookeeper 192.168.42.113:32181 --replication-factor 1 --partitions 1 --topic msg-topic
Created topic "msg-topic".
# ./kafka-topics.sh --list --zookeeper 192.168.42.113:32181
__confluent.support.metrics
__consumer_offsets
T1-topic
msg-topic
T2-topic
</pre>
===Pom.xml===
A logback a logstash-be a '''logstash-logback-encoder''' -el fog írni. Ehhez szükség van egy új függőségre:
<source lang="xml">
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>5.3</version>
</dependency>
</source>
===logback.xml===
A logstash a '''LogstashTcpSocketAppender''' appender-en keresztül fogja elküldeni a logokat a logstash-benek. IP címnek a stack bármelyik node IP címét megadhatjuk. Mi a worker0 címét használjuk. Nagyon fontos, hogy megadjuk a shutdownHook-ot, ami biztosítja, hogy a JVM leállása előtt még a logback elküldje az összes függőben lévő logot. Ha előbb leáll a JVM mint hogy a logstash el tudta volna küldeni a logokat, akkor azok már nem lesznek kiküldve.
<source lang="xml">
<configuration>
<shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
<appender name="STDOUT"
class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</pattern>
</encoder>
</appender>
<appender name="stash"
class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>192.168.42.113:51415</destination>
<!-- encoder is required -->
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<customFields>{"appname":"adam"}</customFields>
</encoder>
</appender>
<root level="debug">
<appender-ref ref="stash" />
<appender-ref ref="STDOUT" />
</root>
</configuration>
</source>
===Java logger example===
<source lang="java">
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
public class App
{
private static final Logger logger = LoggerFactory.getLogger(App.class);
public static void main( String[] args )
{
Marker taMarker = MarkerFactory.getMarker("T1");
Marker alMarker = MarkerFactory.getMarker("T2");
logger.info(taMarker, "Message to T1 from: {}", "adam");
logger.info(alMarker, "Message to T2 from: {}", "adam");
}
}
</source>
===Tesztelés===
Mivel a logstash config-ba beletettük az '''stdout''' output-ot is, ezért a log-ba is be fog írni minden üzenetet:
<pre>
confluence_logstash.1.4a9rr1w42iud@worker0 | {
confluence_logstash.1.4a9rr1w42iud@worker0 | "level_value" => 20000,
confluence_logstash.1.4a9rr1w42iud@worker0 | "logger_name" => "kafka.example2.App",
confluence_logstash.1.4a9rr1w42iud@worker0 | "appname" => "adam",
confluence_logstash.1.4a9rr1w42iud@worker0 | "port" => 41024,
confluence_logstash.1.4a9rr1w42iud@worker0 | "level" => "INFO",
confluence_logstash.1.4a9rr1w42iud@worker0 | "@version" => "1",
confluence_logstash.1.4a9rr1w42iud@worker0 | "host" => "10.255.0.3",
confluence_logstash.1.4a9rr1w42iud@worker0 | "message" => "Message to T1 from: adam",
confluence_logstash.1.4a9rr1w42iud@worker0 | "thread_name" => "main",
confluence_logstash.1.4a9rr1w42iud@worker0 | "@timestamp" => 2019-03-26T22:52:19.168Z,
confluence_logstash.1.4a9rr1w42iud@worker0 | "tags" => [
confluence_logstash.1.4a9rr1w42iud@worker0 | [0] "TA"
confluence_logstash.1.4a9rr1w42iud@worker0 | ]
confluence_logstash.1.4a9rr1w42iud@worker0 | }
confluence_logstash.1.4a9rr1w42iud@worker0 | {
confluence_logstash.1.4a9rr1w42iud@worker0 | "level_value" => 20000,
confluence_logstash.1.4a9rr1w42iud@worker0 | "logger_name" => "kafka.example2.App",
confluence_logstash.1.4a9rr1w42iud@worker0 | "appname" => "adam",
confluence_logstash.1.4a9rr1w42iud@worker0 | "port" => 41024,
confluence_logstash.1.4a9rr1w42iud@worker0 | "level" => "INFO",
confluence_logstash.1.4a9rr1w42iud@worker0 | "@version" => "1",
confluence_logstash.1.4a9rr1w42iud@worker0 | "host" => "10.255.0.3",
confluence_logstash.1.4a9rr1w42iud@worker0 | "message" => "Message to T2 from: adam",
confluence_logstash.1.4a9rr1w42iud@worker0 | "thread_name" => "main",
confluence_logstash.1.4a9rr1w42iud@worker0 | "@timestamp" => 2019-03-26T22:52:19.176Z,
confluence_logstash.1.4a9rr1w42iud@worker0 | "tags" => [
confluence_logstash.1.4a9rr1w42iud@worker0 | [0] "AL"
confluence_logstash.1.4a9rr1w42iud@worker0 | ]
confluence_logstash.1.4a9rr1w42iud@worker0 | }
</pre>
Indítsunk el egy egy '''kafka-console-consumer.sh'''-t mind a T1 mind a T2 topic-kra.
<pre>
./kafka-console-consumer.sh --bootstrap-server 192.168.42.113:29092 --topic T1-topic --from-beginning
{"level_value":20000,"logger_name":"kafka.example2.App","appname":"adam","port":41024,"level":"INFO","@version":"1","host":"10.255.0.3","message":"Message to T1 from: adam","thread_name":"main","@timestamp":"2019-03-26T22:52:19.168Z","tags":["T1"]}
</pre>
<pre>
./kafka-console-consumer.sh --bootstrap-server 192.168.42.113:29092 --topic T2-topic --from-beginning
{"level_value":20000,"logger_name":"kafka.example2.App","appname":"adam","port":41024,"level":"INFO","@version":"1","host":"10.255.0.3","message":"Message to T2 from: adam","thread_name":"main","@timestamp":"2019-03-26T22:52:19.176Z","tags":["T2"]}
</pre>
<br>
=Consumer=
==Command line consumer==
A legegyszerűbben a '''kafka-console-consumer'''-el olvashatunk egy topic-ot, ez is része a standard Apache Kafka és a Confluent csomagnak is. Akárcsak a kafka-console-producer, ez is a bin mappában találathó és szintén a Kafka brokert és a topic nevét kell megadni.
<pre>
./kafka-console-consumer \
--bootstrap-server kafka:29092 \
--topic test2-topic \
--from-beginning
</pre>
Ha elindítottuk a '''kafka-console-consumer'''-t, akkor a Kafka service logjában láthatjuk, hogy a console consumer regisztrálta magát és ő lett a csoport vezetője. Minden consumer csoportnak van egy vezetője, akin keresztül a többi consumer a csoportban megkapja a konfigurációs változásokat.
<pre>
# docker service logs -f confluent_kafka
...
INFO [GroupCoordinator 2]: Assignment received from leader for group console-consumer-73627 for generation 1 (kafka.coordinator.group.GroupCoordinator)
</pre>
A '''kafka-console-consumer''' mi fog loggolni minden egyes új üzenetet a topic-ról.
<pre>
{"@timestamp":"2019-04-21T12:44:32.896+02:00","@version":"1","message":"LogMessage@49d904ec","logger_name":"LogbackExample","thread_name":"main","level":"INFO","level_value":20000,"HOSTNAME":"adamDell2","transactionId":"444","metric":{"feild1":"first","field2":"second"},"application":"this is the extra field"}
</pre>
<br>
==Java consumer==
A Java consumer-nek ugyan azokra a maven függőségekre van szüksége mint a producer-nek, ezrét ezeket itt nem ismételjük meg.
<source lang="java">
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
private final static String TOPIC = "test2-topic";
private static Consumer<String, String> createConsumer() {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
final Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC));
return consumer;
}
static void runConsumer() throws InterruptedException {
final Consumer<String, String> consumer = createConsumer();
final int giveUp = 100;
int noRecordsCount = 0;
while (true) {
final ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
if (consumerRecords.count() == 0) {
noRecordsCount++;
if (noRecordsCount > giveUp)
break;
else
continue;
}
consumerRecords.forEach(record -> {
try {
System.out.printf("Consumer Record:(key: %s value: %s, partition: %d, offset: %d)\n",record.key(), record.value(),
record.partition(), record.offset());
} catch (Exception e) {
e.printStackTrace();
}
});
consumer.commitAsync();
}
consumer.close();
System.out.println("DONE");
}
public static void main(String... args) throws Exception {
runConsumer();
}
}
</source>
A Kafka specifikus beállításokat szintén a Properties map-ben kell megadni, és megegyeznek a producer-nél bemutatott paraméterekkel.
<source lang="java">
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
</source>
Fontos, hogy csak olyan deszerializátort használjunk, ami kompatibilis az üzenet fajtájával, vagyis olyat, ami kompatibilis a producer-nél használt serializátórral. Pl. egy JsonSerializer-el írt üzenetet ki lehet olvasni egy StringDeerializer-el is.
<source lang="java">
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
VAGY
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
VAGY
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonDeserializer");
</source>
<br>
==Spring-Kafa consumer==
https://www.baeldung.com/spring-kafka
==Alpakka-kafka==
https://doc.akka.io/docs/alpakka-kafka/current/home.html
==Logstash consumer==
https://www.elastic.co/guide/en/logstash/6.7/plugins-inputs-kafka.html
Ahogy azt már láthattuk, a logstash lehet Kafka producer és consumer szerepben is, mind a Kafka input és output plugin-t is tartalmazza az alap logstash image.
:[[File:ClipCapIt-190421-191220.PNG]]
Ugyan azt a docker stack-et fogjuk használni, amit a logstash producer-nél használtunk, csak a konfigurációt fogjuk módosítani, hogy a kafa az input ne az output plugin-ben legyen: https://wiki.berki.org/index.php/Apache_Kafka#Logstash_producer_with_logback
Mivel a logstash most Kafka consumer szerepben lesz, ezért most a Kafka input plugin-t fogjuk használni. A logstash konfigurációt docker volume-al fogjuk felcsatolni a service-t futtató konténerbe. A logstash a belső, kafka-net overlay hálózaton keresztül közvetlen el fogja érni a Kafka brókert, ezért a konfigurációban a Kafka service nevét kell megadni, amit a swarm fel fog oldani a service-ben lévő konténerek IP címére.
/usr/share/logstash/pipeline/logstash.conf
<pre>
input {
kafka {
decorate_events => true
value_deserializer_class => "org.apache.kafka.common.serialization.StringDeserializer"
topics => ["test2-topic"]
bootstrap_servers => "kafka:29092"
group_id => "AvroConsumerGroupId"
client_id => "AvroConsumerClientId"
}
}
output {
stdout {
codec => rubydebug
}
}
</pre>
Ha a logstash elindult, akkor a swarm service logjában láthatjuk, hogy rákapcsolód a test2-topic-ra.
<pre>
# docker service logs -f confluent_logstash
...
[Consumer clientId=AvroConsumerClientId-0, groupId=AvroConsumerGroupId] Resetting offset for partition test2-topic-0 to offset 0.
</pre>
Írjunk be egy üzenetet a test2-topic-ba a '''kafka-console-producer'''-el.
<pre>
$ ./kafka-console-producer \
> --broker-list kafka:29092 \
> --topic test2-topic
>this is the test message
</pre>
Ekkor a logstash logjában meg fog jelenni a beírt üzenet message paraméterben. A logstash kiegészíti két meta paraméterrel az üzenetet (timestamp és version).
<pre>
confluent_logstash.1.3qsuyylnulxa@worker0 | {
confluent_logstash.1.3qsuyylnulxa@worker0 | "@timestamp" => 2019-04-21T17:55:08.694Z,
confluent_logstash.1.3qsuyylnulxa@worker0 | "message" => "this is the test message",
confluent_logstash.1.3qsuyylnulxa@worker0 | "@version" => "1"
confluent_logstash.1.3qsuyylnulxa@worker0 | }
</pre>
=Adminisztrációs eszközök=
* nodefluent/kafka-rest
* nodefluent/kafka-rest-ui
* sheepkiller/kafka-manager
* tobilg/zookeeper-webui