Changes

Apache Kafka

13,453 bytes added, 22:01, 28 October 2019
Kerberos autentikációval SSL felett
A producer-ek egy megadott topic-kra dobálják be az üzeneteket, amit onnan a consumer-ek kiolvasnak. Egy topic tetszőleges számú partícióból állhat. Egy partíció az a logikai egység, aminek rá kell férnie egy lemezre. A topic-kot úgy kell felskálázni, hogy egyre több partíciót adunk hozzá, amik különböző brokereken fognak létrejönni. Minden partíciónak lehet egy vagy több replikája, amik biztonsági másolatok. Mikor a producer beküld egy üzenetet egy partícióba, akkor fog committed üzenetnek minősülni, ha minden replikára is eljutott.
Azt, hogy egy producer melyik partícióba dobja az üzenetet vagy a kulcs határozza meg, vagy round-rubin robin módon mindig egy másikba teszi. Ha van kulcs, akkor az abból készült hash fogja meghatározni, hogy melyik partícióba kerüljön. Ugyan az a kulcs így mindig ugyan abba a partícióba fog kerülni. De a kulcs nem kötelező. A sorrend tartás csak egy partíción belül garantált, de ott nagyon. Ha nagyon kritikus bizonyos üzenetek sorrendje, akkor azokat egy partícióba kell rakni azonos kulcsot használva. Loggolásnál ez nem kritikus, egyrészt mert a logstash sorba rakja az üzeneteket, másrészt mikor elastichsearch-be szúrjuk, ott a dátum lesz az egyik attribútum, ami alapján már sorba lehet majd újra rendezni a logokat. Az meg amúgy sem kritikus, ha a log egy része enyhe csúszással kerül be az adatbázisba, lényeg, hogy végül helyes lesz a sorrend.
A comsumer-eket úgynevezett consumer-group-okba szervezzük az azonosítójuk szerint. Egy csoport mindig ugyan azon topic üzeneteit olvassa, de minden egyes consumer a csoporotban más és más partícióból. Minden partíció csak egy consumer-hez rendelhető hozzá egy csoporton belül. De ha nincs annyi consumer a csoportban mind ahány partíció, akkor egy consumer több partíciót is fog olvasni (ahogy ez a fenti ábrán is látszik, az alsó consumer két partíciót olvas. Viszont ha több consumer van mint partíció egy csoportban, akkor bizonyos consumer-ek mindig idle állapotban lesznek. Minden csoporton belül van egy vezető consumer, általában az aki először csatlakozott. Ő teríti a többieknek a cluster információkat.
==Command line producer==
===Autentikáció nélkül===
A legegyszerűbben a '''kafka-console-producer''' script-el írhatunk egy Kafka topic-ba. Ez a parancs része a Kafka csomagnak, benne van mind az Apache mind a Confluent csomagban is.
* https://kafka.apache.org/downloads
* https://www.confluent.io/download/
 
A '''kafka-console-producer''' parancsot a /bin mappában találjuk sok más hasznos Kafka management parancs között. A használatához meg kell adni a Kafka broker nevét és portját, és a topic nevét. A brokerhez azt a portot adjuk meg, ahogy az ingress hálózaton publikáltuk a Kafka service-t, a kafka domain nevet meg vegyük fel a host fájlba, és tegyük mögé a swarm cluster bármelyik node-jának az IP címét. (a swarm biztosítja, hogy bármelyik node-on keresztül el lehet érni bármelyik service-t)
Miután Enter-t nyomunk, a kafka-console-producer várni fogja sorba a topic-ra küldendő üzeneteket, minden Enter leütésre egy üzenetet küldhetünk el.
<pre>
# ./kafka-console-producer \
> --broker-list kafka:29092 \
> --topic test2-topic
>this is the first message
</pre>
 
Indítsunk el ugyan arra a topic-ra egy consumer-t hogy láthassuk hogy megjön e a üzenet.
<pre>
./kafka-console-consumer \
--bootstrap-server kafka:29092 \
--topic test2-topic \
--from-beginning
 
this is the first message
</pre>
Mivel a Kafka topic-ban addig marad meg egy üzenet amíg le nem ár, ezért ha a consumer-t úgy állítjuk be, hogy minden induláskor a topic elejéről olvasson (--from-beginning) ezért minden olyan üzenetet ki fog olvasni, amit valaha beírtak a topic-ba.
 
<br>
<br>
===Kerberos autentikációval SSL felett===
 
A Kerberos authentikácó alapja a Keytab fájl, ami egy bináris fájl, ebben található a kliens kulcsa és principálja. Ezen felül ha ha kafka borekerekhez SSL-el felett kell csatlakozni, akkor szükség van a brokerek certifikációjának a root CA-jára, amit be kell tenni egy trust-stor-ba. A Kerberos authentikáció használatához az alábbi fájlokra van szükség:
* '''java trustStore''': a brokerek Cert-je vagy a root CA
* '''jaas config''': (Java Authentication and Authorization Service): Itt kell megadni, hogy Kerberos-t akarunk használni, ezen felül itt kell megadni a kerberos modult is.
* '''producer.properties''': Kafa producer beállítások: itt adjuk meg, hogy SSL felett menjen a Kerberos authentikáció.
* '''keytab''': egy bináris fájl, amiben a Kerberos kliens titkos kulcsa van. Ezt a Kerberos üzemeltetés adja.
* '''principal''': A Kerberos "felhasználó nevünk": ezt is a Kerberos üzemeltetés adja.
* '''krb5.conf''': Ez a kerberos kliens konfigurációs fájlja. Ebben van megadva a Kerberos autentikációs szerverek címe és portja. Ezt is az üzemeletetés adja.
<br>
 
 
Első lépésként le kell menteni a kafak brókerek Root CA-ját, amit be fogunk tenni egy trust store-ba.
<pre>
$ openssl s_client -showcerts -verify 5 -connect kafka.broker01.berki.org:9092 < /dev/null | awk '/BEGIN/,/END/{ if(/BEGIN/){a++}; out="cert"a".pem"; print >out}'
verify depth is 5
depth=2 C = HU, O = Berki, OU = Berki Corp, CN = Berki Root CA
verify return:1
depth=0 C = HU, ST = Budapest, L = Budapest, O = BERKICORP, OU = AMF, CN = kafka.broker01.berki.org
verify return:1
DONE
</pre>
Ekkor a Root CA a cert1.pem fájlban van. Hozzunk létre egy java trust-store-t '''trustStore''' néven az 123456 jelszóval. Tegyük bele a root CA cert-et.
<br>
<br>
 
 
Hozzuk létre a Java Autentikációs és Autokorrelációs rendszer konfigurációs fájlját, ahol kikényszerítjük a Kerberos használatát a Java autentikáció során: <br>
kafka_client_jaas.conf
<pre>
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
debug=true
useKeyTab=true
storeKey=true
serviceName="kafka"
keyTab="/home/kafkaconf/test-client.keytab"
principal="_test-client@CORP.BERKI.ORG";
};
</pre>
Itt kell megadni a keytab fájl helyét, és a Kerberos principal-t, amire a keytab ki lett állítva. A keytab fájlt és a principal-t mindig a Kerberos üzemeltetője adja meg. A '''Krb5LoginModule''' modult be kell töltse a Java a Kerberos használatához. A '''serviceName''' paramétert megadhatjuk a producer.properties fájlban is. Ennek az értékét is a Kerberos üzemeltetéstől kell megkapjuk.
{{warning|Fontos, hogy a principal legyen az utolsó sorban, és hogy a sor végét ;-vel zárjuk le, akár csak az egész fájlt. }}
 
<br>
<br>
A Kafka producer konfigurációs fájljában kell megadni a prototokolt. Ez lehet SASL_PLAINTEXT vagy SASL_SSL titkos csatorna estén. Ez a kafka brokerker fog vonatkozni.
<br>
producer.properties:
<pre>
security.protocol=SASL_SSL
</pre>
 
<br>
<br>
 
A Kerberos kliens konfigurációs fájlt is készen kapjuk, ami a Kerberos szerverekről tartalmaz információkat. Fontos hogy az összes felsorolt szervert elérje a kliens a megadott portokon.
krb5.conf
<pre>
# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/
 
[logging]
default = FILE:/var/log/krb5libs.log
....
....
</pre>
 
 
<br>
<br>
A producer indítása előtt a '''KAFKA_OPTS''' Java argumentumokban meg kell adni a jaas konfigurációt, a Kerberos kliens konfigurációt, a trust-store-t és az ahhoz tartozó jelszót.
<pre>
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafaconf/kafka_client_jaas.conf \
-Djava.security.krb5.conf=/home/kafaconf/kafka/krb5.conf \
-Djavax.net.ssl.trustStore=/home/kafaconf/trustStore \
-Djavax.net.ssl.trustStorePassword=123456"
</pre>
<br>
<br>
 
Végül indíthatjuk a producer-t, ami csak a '''--producer.config''' kapcsolóban különbözik az autentikáció nélküli producer-től. (meg persze a KAFA_OPTS-ban megadott paraméterekben)
<pre>
./kafka-console-producer.sh \
--broker-list kafka.broker01.berki.org:9092,kafka.broker02.berki.org:9092,kafka.broker03.berki.org:9092 \
--topic test-topic \
--producer.config /home/kafaconf/producer.properties \
this is the first message
</pre>
 
 
 
<br>
<br>
==Java producer==
===Java kód===
 
<source lang="java">
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");
</source>
 
<source lang="java">
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
</source>
 
 
 
<source lang="java">
import org.apache.kafka.clients.producer.*;
}
}
</source>
 
Az összes Kafka specifikus beállítást a Properties map-ben kell megadni. Ezek közül a legfontosabb a Kafka bróker címe. A kafka host nevet felvettük a host fájlba a worker0 node IP címével, de bármelyik swarm node IP címét választhatjuk. A port az a port, amit az ingress hálózaton publikáltunk a Kafka service-hez.
<source lang="java">
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");
</source>
 
Fontos megadni a kulcs és az üzenet Serializációs módját. Itt azt választottuk, hogy a kulcs egy Long lesz, míg az üzenet egy mezei string.
<source lang="java">
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Bármit is választunk a serializáláshoz, fontos hogy a consumer-ben is csak ezzel kompatibilis deszerializálót választhatunk. A serializáló metódusoknak széles a választéka, pl JSON-t is küldhetünk.
</source>
<br>
 
==Logback producer==
logback.xml
<source lang="xml">
<configuration>
<shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
 
<appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<producerConfig>bootstrap.servers=kafka:9092</producerConfig>
</appender>
 
<root level="info">
<appender-ref ref="kafkaAppender" />
</root>
</configuration>
</source>
 
 
Nagyon fontos, hogy leállítsuk a logger context-et mielőtt a VM leáll, mert ha a VM leállítása nagyon közel van a log beíráshoz, akkor még azelőtt leáll az egész VM, hogy a logokat kiírtuk volna. Nagyon rövid életű programokban, mint amilyen a mi példa programunk, hamarabb leállhat a VM, mint hogy el tudta volna küldeni a logback a Kafka-nak az üzenetet. Ha még a VM leállítása előtt meghívjuk a sthudownHook-ot, akkor leállás előtt még ki fogja írni a logokat.
<source lang="xml">
<shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
</source>
<br>
 
===Java kód===
<br>
 
===Custom log object===
Ahogy a logback hagyományos használata mellett, itt is lehetőség van egyedi üzenet objektumok használatára.  1. Az egyik lehetőség az MDC - Mapped Diagnostic Context (https://logback.qos.ch/manual/mdc.html) használata, amivel egyedi mezőket adhatunk hozzá a log-hoz, ami a kafka üzenetben is meg fog jelenni: <source lang="java">import org.slf4j.MDC;...MDC.put("transactionId", "1111");logger.info("this is the message:");</source> A fenit üzenet a kafka consumer-ben így fog megjelenni: <pre>{"@timestamp":"2019-04-21T16:20:34.620+02:00","@version":"1","message":"this is the message:","logger_name":"LogbackExample","thread_name":"main","level":"INFO","level_value":20000,"HOSTNAME":"adamDell2","transactionId":"1111","application":"this is the extra field"}</pre> <br><br>2. A másik lehetőség a '''net.logstash.logback.marker.Markers''' használata, amivel tetszőleges java POJO-kat írhatunk be JSON formátumban a logba. A példában az alábbi '''LogMessage''' java objektumot fogjuk használni.
<source lang="java">
public class LogMessage {
}
</source>
A java osztályban az import-ok közé felvesszüka Markers.append-t.
<source lang="java">
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import static net.logstash.logback.marker.Markers.append;
<br>public class LogbackExample {
private static final Logger logger ==Logstash producer with logback ==LoggerFactory.getLogger(LogbackExample.class.getSimpleName());
A logstash-t rakhatjuk a kafka elé és a kafka után is public static void main(String. Első lépésként a kafka elé fogjuk tenni, ami szortírozni fogja a logokat különböző topic-okba. . args) throws InterruptedException {
LogMessage message = new LogMessage("first", "second"); logger.info(append("customFieldName", message), "this is the message"); }}</source>A futtatásához szükséges környezet egy két nodeconsume-os swarm cluster ben az üzenet az alábbi lesz. :
<pre>
# docker node lsID HOSTNAME STATUS AVAILABILITY MANAGER STATUS ENGINE VERSIONmaigxlyagj1fl4sgcf6rnn9pc * mg0 Ready Active Leader 18.05.0{"@timestamp":"2019-04-cevox99u5s1g1su742mc6npm370 worker0 Ready Active 1821T16:20:35.05.0-ce140+02:00","@version":"1","message":"this is the message","logger_name":"LogbackExample","thread_name":"main","level":"INFO","level_value":20000,"HOSTNAME":"adamDell2","transactionId":"1111","customFieldName":{"feild1":"first","field2":"second"},"application":"this is the extra field"}
</pre>
Itt fogunk futtatni egy docker stack-et ami tartalmaz majd a zookeeper-en és a kafka-n kívül is a logstash-t. <br>
==Logstash producer with logback ==
A logstash-t rakhatjuk a kafka elé és a kafka után is. Első lépésként a kafka elé fogjuk tenni, ami szortírozni fogja a logokat különböző topic-okba. A logstash-be a logback-el fogjuk beírni az üzeneteket egy speciális logstash appender-t használva.
A korábban ismertetett docker stack-et ki fogjuk bővíteni egy logstash service-el. :[[File:ClipCapIt-190327190421-224419163339.PNG]]
Mind a három komponenst rá fogjuk kötni az ingress hálózatra is, mivel a Java producer-nek el kell érnie a logstash-t, és a consumer-nek pedig a kafa-t.
A loggolásra logback-et fogunk használni, aki a logstash 51415-ös portjára fogja küldeni TCPsocket-en. Az üzeneteket a '''kafka-console-consumer'''-el fogjuk kiolvasni.
===Logstash konfiguráció===
A lostash a TCP socket-en keresztül várja majd a logback-től a logokat. A logberben Marker-eket fogunk használni, amik a [tags] tömbbe fog tenni a logstash. A logstash a Kafka output plugin segítségével fogja beírni a megfelelő topic-ba az üzeneteket.  :[[File:ClipCapIt-190421-163511.PNG]] Az alap logstash image már tartalmazz mind a Kafka input és output plugin-t is, így kapásból tudunk a logstash-el Kafka-ból írni és olvasni. Listázzuk ki a logstash plugin-eket a '''bin/logstash-plugin list''' paranccsal. Láthatjuk hogy a kafka mind az input mind az output-ban ott van. <pre># docker run -it docker.elastic.co/logstash/logstash:6.6.2 bin/logstash-plugin list...logstash-input-kafka...logstash-output-kafka</pre> 
:[[File:ClipCapIt-190327-230626.PNG]]
/usr/share/logstash/pipeline/'''logstash.conf'''
===Swarm stack===
Fontos, hogy a logstash-ből a 6.6-os szériát használjuk, mert a korábbi verziókban van egy kafak specifikus hiba. A logstash konfigurációt volume driver-er fogjuk felcsatolnihost gépről. (A volume dirver-ekről részletek itt: https://wiki.berki.org/index.php/Docker_volume_orchestration)
<source lang="C++">
share: 192.168.42.1:/home/adam/dockerStore/logstash/config/
</source>
 
 
 
 
 
T2-topic
</pre>
 
===Pom.xml===
A logback a logstash-be a '''logstash-logback-encoder''' -el fog írni. Ehhez szükség van egy új függőségre:
<source lang="xml">
<dependency>
<source lang="xml">
<configuration>
<!-- <shutdownHook/> --> <!-- Nagyon fontos, hogy leállítsuk a logger context-et mielőtt a VM leáll, mert ha a VM leállítása nagyon közel van a log beíráshoz, akkor még azelőtt leáll az egész VM, hogy a logokat kiírtuk volna. de ha még a VM leállítása előtt meghívjuk a sthudownHook-ot, akkor leállás előtt még ki fogja írni a logokat. --> <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
<appender name="STDOUT"
===Java producerlogger example===
<source lang="java">
import org.slf4j.Logger;
logger.info(taMarker, "Message to T1 from: {}", "adam");
logger.info(alMarker, "Message to T2 from: {}", "adam");
System.out.println( "Hello World!" );
}
}
===Tesztelés===
Logstash Mivel a logstash config-ba beletettük az '''stdout''' output-ot is, ezért a log-ba is be fog írni minden üzenetet:
<pre>
confluence_logstash.1.4a9rr1w42iud@worker0 | {
==Command line consumer==
A legegyszerűbben a '''kafka-console-consumer'''-el olvashatunk egy topic-ot, ez is része a standard Apache Kafka és a Confluent csomagnak is. Akárcsak a kafka-console-producer, ez is a bin mappában találathó és szintén a Kafka brokert és a topic nevét kell megadni.
<pre>
./kafka-console-consumer \
</pre>
Ha elindítottuk a '''kafka-console-consumer'''-t, akkor a Kafka service logjában láthatjuk, hogy a console consumer regisztrálta magát és ő lett a csoport vezetője. Minden consumer csoportnak van egy vezetője, akin keresztül a többi consumer a csoportban megkapja a konfigurációs változásokat.
<pre>
# docker service logs -f confluent_kafka
</pre>
A '''kafka-console-consumer''' mi fog loggolni minden egyes új üzenetet a topic-ról.
<pre>
{"@timestamp":"2019-04-21T12:44:32.896+02:00","@version":"1","message":"LogMessage@49d904ec","logger_name":"LogbackExample","thread_name":"main","level":"INFO","level_value":20000,"HOSTNAME":"adamDell2","transactionId":"444","metric":{"feild1":"first","field2":"second"},"application":"this is the extra field"}
==Java consumer==
 <source lang="java">props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIGA Java consumer-nek ugyan azokra a maven függőségekre van szüksége mint a producer-nek, LongDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.classezrét ezeket itt nem ismételjük meg.getName());</source>   
<source lang="java">
import org.apache.kafka.clients.consumer.*;
</source>
A Kafka specifikus beállításokat szintén a Properties map-ben kell megadni, és megegyeznek a producer-nél bemutatott paraméterekkel.
<source lang="java">
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
</source>
Fontos, hogy csak olyan deszerializátort használjunk, ami kompatibilis az üzenet fajtájával, vagyis olyat, ami kompatibilis a producer-nél használt serializátórral. Pl. egy JsonSerializer-el írt üzenetet ki lehet olvasni egy StringDeerializer-el is.
<source lang="java">
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonDeserializer");
</source>
 
<br>
 
==Spring-Kafa consumer==
 
https://www.baeldung.com/spring-kafka
 
 
==Alpakka-kafka==
 
https://doc.akka.io/docs/alpakka-kafka/current/home.html
==Logstash consumer==
https://www.elastic.co/guide/en/logstash/6.7/plugins-inputs-kafka.html
Ahogy azt már láthattuk, a logstash lehet Kafka producer és consumer szerepben is, mind a Kafka input és output plugin-t is tartalmazza az alap logstash image.
 
:[[File:ClipCapIt-190421-191220.PNG]]
 
 
Ugyan azt a docker stack-et fogjuk használni, amit a logstash producer-nél használtunk, csak a konfigurációt fogjuk módosítani, hogy a kafa az input ne az output plugin-ben legyen: https://wiki.berki.org/index.php/Apache_Kafka#Logstash_producer_with_logback
 
Mivel a logstash most Kafka consumer szerepben lesz, ezért most a Kafka input plugin-t fogjuk használni. A logstash konfigurációt docker volume-al fogjuk felcsatolni a service-t futtató konténerbe. A logstash a belső, kafka-net overlay hálózaton keresztül közvetlen el fogja érni a Kafka brókert, ezért a konfigurációban a Kafka service nevét kell megadni, amit a swarm fel fog oldani a service-ben lévő konténerek IP címére.
 
/usr/share/logstash/pipeline/logstash.conf
<pre>
input {
kafka {
 
decorate_events => true
value_deserializer_class => "org.apache.kafka.common.serialization.StringDeserializer"
 
topics => ["test2-topic"]
 
bootstrap_servers => "kafka:29092"
group_id => "AvroConsumerGroupId"
client_id => "AvroConsumerClientId"
 
}
}
 
output {
 
stdout {
codec => rubydebug
}
}
</pre>
 
 
Ha a logstash elindult, akkor a swarm service logjában láthatjuk, hogy rákapcsolód a test2-topic-ra.
<pre>
# docker service logs -f confluent_logstash
...
[Consumer clientId=AvroConsumerClientId-0, groupId=AvroConsumerGroupId] Resetting offset for partition test2-topic-0 to offset 0.
</pre>
 
Írjunk be egy üzenetet a test2-topic-ba a '''kafka-console-producer'''-el.
<pre>
$ ./kafka-console-producer \
> --broker-list kafka:29092 \
> --topic test2-topic
>this is the test message
</pre>
 
Ekkor a logstash logjában meg fog jelenni a beírt üzenet message paraméterben. A logstash kiegészíti két meta paraméterrel az üzenetet (timestamp és version).
<pre>
confluent_logstash.1.3qsuyylnulxa@worker0 | {
confluent_logstash.1.3qsuyylnulxa@worker0 | "@timestamp" => 2019-04-21T17:55:08.694Z,
confluent_logstash.1.3qsuyylnulxa@worker0 | "message" => "this is the test message",
confluent_logstash.1.3qsuyylnulxa@worker0 | "@version" => "1"
confluent_logstash.1.3qsuyylnulxa@worker0 | }
</pre>
=Adminisztrációs eszközök=
 
* nodefluent/kafka-rest
* nodefluent/kafka-rest-ui
* sheepkiller/kafka-manager
* tobilg/zookeeper-webui