7,540
edits
Changes
→Kerberos autentikációval SSL felett
A producer-ek egy megadott topic-kra dobálják be az üzeneteket, amit onnan a consumer-ek kiolvasnak. Egy topic tetszőleges számú partícióból állhat. Egy partíció az a logikai egység, aminek rá kell férnie egy lemezre. A topic-kot úgy kell felskálázni, hogy egyre több partíciót adunk hozzá, amik különböző brokereken fognak létrejönni. Minden partíciónak lehet egy vagy több replikája, amik biztonsági másolatok. Mikor a producer beküld egy üzenetet egy partícióba, akkor fog committed üzenetnek minősülni, ha minden replikára is eljutott.
Azt, hogy egy producer melyik partícióba dobja az üzenetet vagy a kulcs határozza meg, vagy round-rubin robin módon mindig egy másikba teszi. Ha van kulcs, akkor az abból készült hash fogja meghatározni, hogy melyik partícióba kerüljön. Ugyan az a kulcs így mindig ugyan abba a partícióba fog kerülni. De a kulcs nem kötelező. A sorrend tartás csak egy partíción belül garantált, de ott nagyon. Ha nagyon kritikus bizonyos üzenetek sorrendje, akkor azokat egy partícióba kell rakni azonos kulcsot használva. Loggolásnál ez nem kritikus, egyrészt mert a logstash sorba rakja az üzeneteket, másrészt mikor elastichsearch-be szúrjuk, ott a dátum lesz az egyik attribútum, ami alapján már sorba lehet majd újra rendezni a logokat. Az meg amúgy sem kritikus, ha a log egy része enyhe csúszással kerül be az adatbázisba, lényeg, hogy végül helyes lesz a sorrend.
A comsumer-eket úgynevezett consumer-group-okba szervezzük az azonosítójuk szerint. Egy csoport mindig ugyan azon topic üzeneteit olvassa, de minden egyes consumer a csoporotban más és más partícióból. Minden partíció csak egy consumer-hez rendelhető hozzá egy csoporton belül. De ha nincs annyi consumer a csoportban mind ahány partíció, akkor egy consumer több partíciót is fog olvasni (ahogy ez a fenti ábrán is látszik, az alsó consumer két partíciót olvas. Viszont ha több consumer van mint partíció egy csoportban, akkor bizonyos consumer-ek mindig idle állapotban lesznek. Minden csoporton belül van egy vezető consumer, általában az aki először csatlakozott. Ő teríti a többieknek a cluster információkat.
==Command line producer==
===Autentikáció nélkül===
A legegyszerűbben a '''kafka-console-producer''' script-el írhatunk egy Kafka topic-ba. Ez a parancs része a Kafka csomagnak, benne van mind az Apache mind a Confluent csomagban is.
* https://kafka.apache.org/downloads
Mivel a Kafka topic-ban addig marad meg egy üzenet amíg le nem ár, ezért ha a consumer-t úgy állítjuk be, hogy minden induláskor a topic elejéről olvasson (--from-beginning) ezért minden olyan üzenetet ki fog olvasni, amit valaha beírtak a topic-ba.
<br>
<br>
===Kerberos autentikációval SSL felett===
A Kerberos authentikácó alapja a Keytab fájl, ami egy bináris fájl, ebben található a kliens kulcsa és principálja. Ezen felül ha ha kafka borekerekhez SSL-el felett kell csatlakozni, akkor szükség van a brokerek certifikációjának a root CA-jára, amit be kell tenni egy trust-stor-ba. A Kerberos authentikáció használatához az alábbi fájlokra van szükség:
* '''java trustStore''': a brokerek Cert-je vagy a root CA
* '''jaas config''': (Java Authentication and Authorization Service): Itt kell megadni, hogy Kerberos-t akarunk használni, ezen felül itt kell megadni a kerberos modult is.
* '''producer.properties''': Kafa producer beállítások: itt adjuk meg, hogy SSL felett menjen a Kerberos authentikáció.
* '''keytab''': egy bináris fájl, amiben a Kerberos kliens titkos kulcsa van. Ezt a Kerberos üzemeltetés adja.
* '''principal''': A Kerberos "felhasználó nevünk": ezt is a Kerberos üzemeltetés adja.
* '''krb5.conf''': Ez a kerberos kliens konfigurációs fájlja. Ebben van megadva a Kerberos autentikációs szerverek címe és portja. Ezt is az üzemeletetés adja.
<br>
Első lépésként le kell menteni a kafak brókerek Root CA-ját, amit be fogunk tenni egy trust store-ba.
<pre>
$ openssl s_client -showcerts -verify 5 -connect kafka.broker01.berki.org:9092 < /dev/null | awk '/BEGIN/,/END/{ if(/BEGIN/){a++}; out="cert"a".pem"; print >out}'
verify depth is 5
depth=2 C = HU, O = Berki, OU = Berki Corp, CN = Berki Root CA
verify return:1
depth=0 C = HU, ST = Budapest, L = Budapest, O = BERKICORP, OU = AMF, CN = kafka.broker01.berki.org
verify return:1
DONE
</pre>
Ekkor a Root CA a cert1.pem fájlban van. Hozzunk létre egy java trust-store-t '''trustStore''' néven az 123456 jelszóval. Tegyük bele a root CA cert-et.
<br>
<br>
Hozzuk létre a Java Autentikációs és Autokorrelációs rendszer konfigurációs fájlját, ahol kikényszerítjük a Kerberos használatát a Java autentikáció során: <br>
kafka_client_jaas.conf
<pre>
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
debug=true
useKeyTab=true
storeKey=true
serviceName="kafka"
keyTab="/home/kafkaconf/test-client.keytab"
principal="_test-client@CORP.BERKI.ORG";
};
</pre>
Itt kell megadni a keytab fájl helyét, és a Kerberos principal-t, amire a keytab ki lett állítva. A keytab fájlt és a principal-t mindig a Kerberos üzemeltetője adja meg. A '''Krb5LoginModule''' modult be kell töltse a Java a Kerberos használatához. A '''serviceName''' paramétert megadhatjuk a producer.properties fájlban is. Ennek az értékét is a Kerberos üzemeltetéstől kell megkapjuk.
{{warning|Fontos, hogy a principal legyen az utolsó sorban, és hogy a sor végét ;-vel zárjuk le, akár csak az egész fájlt. }}
<br>
<br>
A Kafka producer konfigurációs fájljában kell megadni a prototokolt. Ez lehet SASL_PLAINTEXT vagy SASL_SSL titkos csatorna estén. Ez a kafka brokerker fog vonatkozni.
<br>
producer.properties:
<pre>
security.protocol=SASL_SSL
</pre>
<br>
<br>
A Kerberos kliens konfigurációs fájlt is készen kapjuk, ami a Kerberos szerverekről tartalmaz információkat. Fontos hogy az összes felsorolt szervert elérje a kliens a megadott portokon.
krb5.conf
<pre>
# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/
[logging]
default = FILE:/var/log/krb5libs.log
....
....
</pre>
<br>
<br>
A producer indítása előtt a '''KAFKA_OPTS''' Java argumentumokban meg kell adni a jaas konfigurációt, a Kerberos kliens konfigurációt, a trust-store-t és az ahhoz tartozó jelszót.
<pre>
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafaconf/kafka_client_jaas.conf \
-Djava.security.krb5.conf=/home/kafaconf/kafka/krb5.conf \
-Djavax.net.ssl.trustStore=/home/kafaconf/trustStore \
-Djavax.net.ssl.trustStorePassword=123456"
</pre>
<br>
<br>
Végül indíthatjuk a producer-t, ami csak a '''--producer.config''' kapcsolóban különbözik az autentikáció nélküli producer-től. (meg persze a KAFA_OPTS-ban megadott paraméterekben)
<pre>
./kafka-console-producer.sh \
--broker-list kafka.broker01.berki.org:9092,kafka.broker02.berki.org:9092,kafka.broker03.berki.org:9092 \
--topic test-topic \
--producer.config /home/kafaconf/producer.properties \
this is the first message
</pre>
<br>
<br>
<br>
===Custom log object===
Ahogy a logback hagyományos használata mellett, itt is lehetőség van egyedi üzenet objektumok használatára.
<br>
==Spring-Kafa consumer==
https://www.baeldung.com/spring-kafka
==Alpakka-kafka==
https://doc.akka.io/docs/alpakka-kafka/current/home.html
==Logstash consumer==
https://www.elastic.co/guide/en/logstash/6.7/plugins-inputs-kafka.html
Ahogy azt már láthattuk, a logstash lehet Kafka producer és consumer szerepben is, mind a Kafka input és output plugin-t is tartalmazza az alap logstash image.
:[[File:ClipCapIt-190421-191220.PNG]]
Ugyan azt a docker stack-et fogjuk használni, amit a logstash producer-nél használtunk, csak a konfigurációt fogjuk módosítani, hogy a kafa az input ne az output plugin-ben legyen: https://wiki.berki.org/index.php/Apache_Kafka#Logstash_producer_with_logback
/usr/share/logstash/pipeline/logstash.conf
Ha a logstash elindult, akkor a swarm service logjában láthatjuk, hogy rákapcsolód a test2-topic-ra.
<pre>
# docker service logs -f confluent_logstash
</pre>
Írjunk be egy üzenetet a test2-topic-ba a '''kafka-console-producer'''-el.
<pre>
$ ./kafka-console-producer \
</pre>
Ekkor a logstash logjában meg fog jelenni a beírt üzenet message paraméterben. A logstash kiegészíti két meta paraméterrel az üzenetet (timestamp és version).
<pre>
confluent_logstash.1.3qsuyylnulxa@worker0 | {
=Adminisztrációs eszközök=
* nodefluent/kafka-rest
* nodefluent/kafka-rest-ui
* sheepkiller/kafka-manager
* tobilg/zookeeper-webui