Apache Kafka

10,099 bytes added, 20:54, 20 April 2019
Logback producer
</source>
<br>
<br>
 
 
==Logstash producer==
 
Logstash can be placed both in front of Kafka and behind it. As a first step we put it in front of Kafka, where it sorts the logs into different topics.
 
The runtime environment is a two-node swarm cluster.
<pre>
# docker node ls
ID HOSTNAME STATUS AVAILABILITY MANAGER STATUS ENGINE VERSION
maigxlyagj1fl4sgcf6rnn9pc * mg0 Ready Active Leader 18.05.0-ce
vox99u5s1g1su742mc6npm370 worker0 Ready Active 18.05.0-ce
</pre>
 
On this cluster we run a docker stack that contains Logstash in addition to ZooKeeper and Kafka.
 
 
 
:[[File:ClipCapIt-190327-224419.PNG]]
 
All three components are also attached to the ingress network, because the Java producer has to reach Logstash and the consumer has to reach Kafka.
 
For logging we use Logback, which sends the logs over TCP to port 51415 of Logstash.
 
 
===Logstash configuration===
 
Logstash waits for the logs from Logback on a TCP socket. In the logger we use Markers, which Logstash places into the [tags] array.
 
:[[File:ClipCapIt-190327-230626.PNG]]
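To make the marker-to-tag mapping concrete, the following is a minimal sketch of what arrives on the TCP socket. It is not the Logback appender itself: the class <code>LogstashEventSketch</code> and its methods are hypothetical, the event is reduced to the <code>message</code> and <code>tags</code> fields (the real encoder sends more fields, as the test output later on this page shows), and it assumes that the input treats every newline-terminated JSON document as one event.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of Logback or Logstash.
final class LogstashEventSketch {

    // Builds one newline-terminated JSON event; marker names go into the "tags" array.
    static String buildEvent(String message, List<String> markers) {
        String tags = markers.stream()
                .map(m -> "\"" + escape(m) + "\"")
                .collect(Collectors.joining(","));
        return "{\"message\":\"" + escape(message) + "\",\"tags\":[" + tags + "]}\n";
    }

    // Escapes only backslash and double quote, which is enough for this sketch.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Opens a TCP connection to the given host and port and writes the event line.
    static void send(String host, int port, String eventLine) throws IOException {
        try (Socket socket = new Socket(host, port);
             OutputStream out = socket.getOutputStream()) {
            out.write(eventLine.getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        String event = buildEvent("Message to T1 from: adam", List.of("T1"));
        System.out.print(event);
        // Uncomment once the stack is running; address and port as used on this page.
        // send("192.168.42.113", 51415, event);
    }
}
```

In the real setup Logback's appender does this work; the sketch only shows the shape of the data that the pipeline's <code>"T1" in [tags]</code> condition is evaluated against.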
 
/usr/share/logstash/pipeline/'''logstash.conf'''
<source lang="xml">
input {
  tcp {
    port => 51415
    codec => "json"
  }
}

output {

  if "T1" in [tags] {
    kafka {
      codec => json
      bootstrap_servers => "kafka:29092"
      topic_id => "T1-topic"
    }
  } else if "T2" in [tags] {
    kafka {
      codec => json
      bootstrap_servers => "kafka:29092"
      topic_id => "T2-topic"
    }
  } else {
    kafka {
      codec => json
      bootstrap_servers => "kafka:29092"
      topic_id => "msg-topic"
    }
  }

  stdout {
    codec => rubydebug
  }
}
</source>
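The routing decision of the output section can be restated as a small Java function. This is only a sketch of the if / else if / else chain above, not code that Logstash runs; the class name <code>TopicRouterSketch</code> is hypothetical.

```java
import java.util.List;

// Hypothetical restatement of the conditional chain in logstash.conf.
final class TopicRouterSketch {

    // Returns the topic an event with the given tags would be sent to.
    static String topicFor(List<String> tags) {
        if (tags != null && tags.contains("T1")) {
            return "T1-topic";
        }
        if (tags != null && tags.contains("T2")) {
            return "T2-topic";
        }
        // No tags at all, or none of the known tags: default topic.
        return "msg-topic";
    }

    public static void main(String[] args) {
        System.out.println(topicFor(List.of("T1")));       // T1-topic
        System.out.println(topicFor(List.of("T2", "T1"))); // T1-topic
        System.out.println(topicFor(List.of()));           // msg-topic
    }
}
```

Because the branches are chained with else if, an event that carries both tags goes to T1-topic only, and an event logged without any marker ends up in msg-topic.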
 
===Swarm stack===
 
It is important to use the 6.6 series of Logstash, because earlier versions contain a Kafka-specific bug. The Logstash configuration is mounted through a volume driver.
 
<source lang="C++">
version: '3.2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.2
    networks:
      - kafka-net
    ports:
      - "32181:32181"
    deploy:
      placement:
        constraints:
          - node.role == worker
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SYNC_LIMIT: 2
  kafka:
    image: confluentinc/cp-kafka:5.1.2
    networks:
      - kafka-net
    ports:
      - target: 29092
        published: 29092
        protocol: tcp
    deploy:
      placement:
        constraints:
          - node.role == worker
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:32181"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092"
      KAFKA_BROKER_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  logstash:
    image: docker.elastic.co/logstash/logstash:6.6.2
    networks:
      - kafka-net
    ports:
      - "51415:51415"
    environment:
      LOGSPOUT: "ignore"
      XPACK_MONITORING_ENABLED: "false"
    volumes:
      - "logstash-conf:/usr/share/logstash/pipeline"
    deploy:
      placement:
        constraints:
          - node.role == worker
      restart_policy:
        condition: on-failure
      resources:
        reservations:
          memory: 100m
networks:
  kafka-net:
    driver: overlay
volumes:
  logstash-conf:
    driver: nfs
    driver_opts:
      share: 192.168.42.1:/home/adam/dockerStore/logstash/config/
</source>
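Once the stack is deployed, it can be useful to verify from the client machine that the published ports answer. The sketch below is one possible way to do that with plain sockets; the class <code>PortCheckSketch</code> and the two-second timeout are assumptions, while the address and ports are the ones used on this page.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
final class PortCheckSketch {

    // Returns true if a TCP connection can be opened within the timeout.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        String host = "192.168.42.113";           // worker0, as used on this page
        int[] ports = {32181, 29092, 51415};      // ZooKeeper, Kafka, Logstash
        for (int port : ports) {
            System.out.println(host + ":" + port + " reachable: "
                    + isReachable(host, port, 2000));
        }
    }
}
```

An open port only shows that the swarm routing mesh accepts the connection; it does not prove that the service behind it has finished starting.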
 
 
 
 
 
 
 
 
Creating the topics:
<pre>
# ./kafka-topics.sh --create --zookeeper 192.168.42.113:32181 --replication-factor 1 --partitions 1 --topic T1-topic
Created topic "T1-topic".
 
# ./kafka-topics.sh --create --zookeeper 192.168.42.113:32181 --replication-factor 1 --partitions 1 --topic T2-topic
Created topic "T2-topic".
 
# ./kafka-topics.sh --create --zookeeper 192.168.42.113:32181 --replication-factor 1 --partitions 1 --topic msg-topic
Created topic "msg-topic".
 
 
# ./kafka-topics.sh --list --zookeeper 192.168.42.113:32181
__confluent.support.metrics
__consumer_offsets
T1-topic
msg-topic
T2-topic
</pre>
 
===Pom.xml===
 
 
<source lang="xml">
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.3</version>
    <scope>runtime</scope>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>

<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>5.3</version>
</dependency>
</source>
 
 
===logback.xml===
 
Logback sends the logs to Logstash through the '''LogstashTcpSocketAppender''' appender. As the IP address we can give the address of any node in the stack; we use the address of worker0. It is very important to specify the shutdownHook, which ensures that Logback still sends all pending logs before the JVM stops. If the JVM stops before Logback has been able to send the logs, they will never be sent.
 
<source lang="xml">
<configuration>
    <!-- <shutdownHook/> -->
    <!-- It is very important to stop the logger context before the VM stops: if the VM shuts down
         right after a log statement, the whole VM can stop before the logs have been written out.
         If the shutdownHook is invoked before the VM stops, the logs are still written out before shutdown. -->
    <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
    <appender name="STDOUT"
        class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
            </pattern>
        </encoder>
    </appender>

    <appender name="stash"
        class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>192.168.42.113:51415</destination>

        <!-- encoder is required -->
        <encoder class="net.logstash.logback.encoder.LogstashEncoder">
            <customFields>{"appname":"adam"}</customFields>
        </encoder>
    </appender>

    <root level="debug">
        <appender-ref ref="stash" />
        <appender-ref ref="STDOUT" />
    </root>
</configuration>
</source>
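Why the shutdown hook matters can be illustrated without Logback. The sketch below is not how Logback implements its hook; it only models the situation described above with a hypothetical class <code>PendingLogFlusherSketch</code>: log lines are queued for asynchronous delivery, and a JVM shutdown hook flushes whatever is still pending.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of an appender that buffers log lines before sending them.
final class PendingLogFlusherSketch {

    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();
    private final List<String> delivered = new ArrayList<>();

    // Queues a line; in a real appender a background thread would send it later.
    void log(String line) {
        pending.add(line);
    }

    // "Sends" everything that is still queued and returns how many lines were flushed.
    synchronized int flush() {
        List<String> batch = new ArrayList<>();
        pending.drainTo(batch);
        delivered.addAll(batch);
        return batch.size();
    }

    // Returns a copy of the lines delivered so far.
    synchronized List<String> delivered() {
        return new ArrayList<>(delivered);
    }

    public static void main(String[] args) {
        PendingLogFlusherSketch appender = new PendingLogFlusherSketch();

        // Registered once at startup; runs when the JVM begins to shut down.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            int flushed = appender.flush();
            System.out.println("flushed " + flushed + " pending line(s) before exit");
        }));

        appender.log("Message to T1 from: adam");
        appender.log("Message to T2 from: adam");
        // main returns right away; without the hook the two queued lines would be lost.
    }
}
```

The short-lived producer on this page is exactly this case: it logs two messages and exits immediately, so the pending events depend on the hook to be delivered.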
 
 
===Java producer===
<source lang="java">
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;

public class App
{
    private static final Logger logger = LoggerFactory.getLogger(App.class);

    public static void main( String[] args )
    {
        // The marker names end up in the [tags] array of the Logstash event
        Marker taMarker = MarkerFactory.getMarker("T1");
        Marker alMarker = MarkerFactory.getMarker("T2");

        // logstash.conf routes these to T1-topic and T2-topic respectively
        logger.info(taMarker, "Message to T1 from: {}", "adam");
        logger.info(alMarker, "Message to T2 from: {}", "adam");
        System.out.println( "Hello World!" );
    }
}
</source>
 
===Testing===
Logstash log:
<pre>
confluence_logstash.1.4a9rr1w42iud@worker0 | {
confluence_logstash.1.4a9rr1w42iud@worker0 | "level_value" => 20000,
confluence_logstash.1.4a9rr1w42iud@worker0 | "logger_name" => "kafka.example2.App",
confluence_logstash.1.4a9rr1w42iud@worker0 | "appname" => "adam",
confluence_logstash.1.4a9rr1w42iud@worker0 | "port" => 41024,
confluence_logstash.1.4a9rr1w42iud@worker0 | "level" => "INFO",
confluence_logstash.1.4a9rr1w42iud@worker0 | "@version" => "1",
confluence_logstash.1.4a9rr1w42iud@worker0 | "host" => "10.255.0.3",
confluence_logstash.1.4a9rr1w42iud@worker0 | "message" => "Message to T1 from: adam",
confluence_logstash.1.4a9rr1w42iud@worker0 | "thread_name" => "main",
confluence_logstash.1.4a9rr1w42iud@worker0 | "@timestamp" => 2019-03-26T22:52:19.168Z,
confluence_logstash.1.4a9rr1w42iud@worker0 | "tags" => [
confluence_logstash.1.4a9rr1w42iud@worker0 | [0] "T1"
confluence_logstash.1.4a9rr1w42iud@worker0 | ]
confluence_logstash.1.4a9rr1w42iud@worker0 | }
confluence_logstash.1.4a9rr1w42iud@worker0 | {
confluence_logstash.1.4a9rr1w42iud@worker0 | "level_value" => 20000,
confluence_logstash.1.4a9rr1w42iud@worker0 | "logger_name" => "kafka.example2.App",
confluence_logstash.1.4a9rr1w42iud@worker0 | "appname" => "adam",
confluence_logstash.1.4a9rr1w42iud@worker0 | "port" => 41024,
confluence_logstash.1.4a9rr1w42iud@worker0 | "level" => "INFO",
confluence_logstash.1.4a9rr1w42iud@worker0 | "@version" => "1",
confluence_logstash.1.4a9rr1w42iud@worker0 | "host" => "10.255.0.3",
confluence_logstash.1.4a9rr1w42iud@worker0 | "message" => "Message to T2 from: adam",
confluence_logstash.1.4a9rr1w42iud@worker0 | "thread_name" => "main",
confluence_logstash.1.4a9rr1w42iud@worker0 | "@timestamp" => 2019-03-26T22:52:19.176Z,
confluence_logstash.1.4a9rr1w42iud@worker0 | "tags" => [
confluence_logstash.1.4a9rr1w42iud@worker0 | [0] "T2"
confluence_logstash.1.4a9rr1w42iud@worker0 | ]
confluence_logstash.1.4a9rr1w42iud@worker0 | }
</pre>
 
 
Start a '''kafka-console-consumer.sh''' for both the T1 and the T2 topic.
 
<pre>
./kafka-console-consumer.sh --bootstrap-server 192.168.42.113:29092 --topic T1-topic --from-beginning
 
{"level_value":20000,"logger_name":"kafka.example2.App","appname":"adam","port":41024,"level":"INFO","@version":"1","host":"10.255.0.3","message":"Message to T1 from: adam","thread_name":"main","@timestamp":"2019-03-26T22:52:19.168Z","tags":["T1"]}
</pre>
 
 
<pre>
./kafka-console-consumer.sh --bootstrap-server 192.168.42.113:29092 --topic T2-topic --from-beginning
 
{"level_value":20000,"logger_name":"kafka.example2.App","appname":"adam","port":41024,"level":"INFO","@version":"1","host":"10.255.0.3","message":"Message to T2 from: adam","thread_name":"main","@timestamp":"2019-03-26T22:52:19.176Z","tags":["T2"]}
</pre>
 
<br>