Kafka with ELK on swarm

Revision as of 21:45, 1 March 2019 by Adam (talk | contribs)

Revision as of 21:45, 1 March 2019 by Adam (talk | contribs)

Mikor elindítunk egy Kafa példányt, akkor valójában egy kafka brokert indítunk el. A brokereknek van száma, ezeket nem lehet konfigurációból felskálázni. Ha producer-ek mindig egy brokerhez csatlakoznak. A teljes konfiguráció zookeeper-ben van tárolva. A zookeeper tudja értesíteni a klienseket ha a konfiguráció változik, ezért hamar elterjed a hálózaton a változás. A producer-ek egy megadott topic-kra dobálják be az üzeneteket, amit onnan a consumer-ek kiolvasnak. Egy topic tetszőleges számú partícióból állhat. Egy partíció az a logikai egység, aminek rá kell férnie egy lemezre. A topic-kot úgy kell felskálázni, hogy egyre több partíciót adunk hozzá, amik különböző brokereken fognak létrejönni. Minden partíciónak lehet egy vagy több replikája, amik biztonsági másolatok. Mikor a producer beküld egy üzenetet egy partícióba, akkor fog committed üzenetnek minősülni, ha minden replikára is eljutott. Azt, hogy egy producer melyik partícióba dobja az üzenetet vagy a kulcs határozza meg, vagy round-rubin módon mindig egy másikba teszi. Ha van kulcs, akkor az abból készült hash fogja meghatározni, hogy melyik partícióba kerüljön. Ugyan az a kulcs így mindig ugyan abba a partícióba fog kerülni. De a kulcs nem kötelező. A sorrend tartás csak egy partíción belül garantált, de ott nagyon. Ha nagyon kritikus bizonyos üzenetek sorrendje, akkor azokat egy partícióba kell rakni azonos kulcsot használva. Loggolásnál ez nem kritikus, egyrészt mert a logstash sorba rakja az üzeneteket, másrészt mikor elastichsearch-be szúrjuk, ott a dátum lesz az egyik attribútum, ami alapján már sorba lehet majd újra rendezni a logokat. Az meg amúgy sem kritikus, ha a log egy része enyhe csúszással kerül be az adatbázisba, lényeg, hogy végül helyes lesz a sorrend. A comsumer-eket úgynevezett consumer-group-okba szervezzük az azonosítójuk szerint. Egy csoport mindig ugyan azon topic üzeneteit olvassa, de minden egyes consumer a csoporotban más és más partícióból. Minden partíció csak egy consumer-hez rendelhető hozzá egy csoporton belül. De ha nincs annyi consumer a csoportban mind ahány partíció, akkor egy consumer több partíciót is fog olvasni. Viszont ha több consumer van mint partíció egy csoportban, akkor bizonyos consumer-ek mindig idle állapotban lesznek. Minden csoporton belül van egy vezető consumer, általában az aki először csatlakozott. Ő teríti a többieknek a cluster információkat. A kafka nem tudja értelmezni sem a kulcsot sem az üzenetet. Ez számára egy bájt tömb. Az, hogy egy objektumból hogy lesz bájt tömb kulcs és bájt tömb üzenet a producer-ben lévő serializátor dolga. A consumer-ben pedig a deserializázor dolga, hogy a bájt folyamból újra értelmes objektumot állítson elő.


Avro

Avro: serializer, desierializer:

http://cloudurable.com/blog/avro/index.html

Avro supports direct mapping to JSON as well as a compact binary format. It is a very fast serialization format. Avro is widely used in the Hadoop ecosystem. Avro supports polyglot bindings to many programming languages and a code generation for static languages. For dynamically typed languages, code generation is not needed. Another key advantage of Avro is its support of evolutionary schemas which supports compatibility checks, and allows evolving your data over time.


Schema registry

https://dzone.com/articles/kafka-avro-serialization-and-the-schema-registry

The Kafka producer creates a record/message that is an Avro record. The record contains a schema ID and data. With the Kafka Avro Serializer, the schema is registered if needed and then it serializes the data and schema ID. The Kafka Avro Serializer keeps a cache of registered schemas from the Schema Registry their schema IDs.

Consumers receive payloads and deserialize them with Kafka Avro Deserializers, which use the Confluent Schema Registry. The Deserializer looks up the full schema from the cache or Schema Registry based on ID. You can manage schemas via a REST API with the Schema registry. We will need to start up the Schema Registry server pointing to our ZooKeeper cluster


Írni kell pl JAVA-ban olyan Kafka Producer-eket és és Kafa Consumer-eket, amikbe beállítjuk hogy a serializáló és deserializáló az a Avro lesz: Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "AvroProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                LongSerializer.class.getName());
        // Configure the KafkaAvroSerializer. <<--------- itt mondjuk meg hogy az Avro-t használja
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());  
        // Schema Registry location. <<------- itt állítjuk be a Schema reigstry-t az Avro-nak. 
        props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

Aztán ezt a Kafa producer-t ugyan úgy kell használni, mint ha nem Avro-t használnánk, de a Comsumer-nek ugyan így kell majd deserializálnija.


Log4J with Kafa

https://logging.apache.org/log4j/2.x/manual/appenders.html#KafkaAppender

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-log4j-appender</artifactId>
	<version>1.0.0</version>
</dependency>