Difference between revisions of "Apache Kafka"
From berki WIKI
(→Avro) |
|||
Line 1: | Line 1: | ||
+ | |||
+ | =Kafka bemutatása= | ||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
+ | =Java producer= | ||
− | + | =Java consumer= | |
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | + | =Logstash= | |
− | |||
− | |||
− | |||
− | |||
+ | ==Producer== | ||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
+ | :[[File:ClipCapIt-190327-224419.PNG]] | ||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
+ | <source lang="xml"> | ||
+ | input { | ||
+ | tcp { | ||
+ | port => 51415 | ||
+ | codec => "json" | ||
+ | } | ||
+ | } | ||
− | + | output { | |
− | |||
− | |||
− | |||
− | |||
− | |||
+ | if "TA" in [tags] { | ||
− | == | + | kafka { |
+ | codec => json | ||
+ | bootstrap_servers => "kafka:29092" | ||
+ | topic_id => "ta-topic" | ||
+ | } | ||
+ | |||
+ | } else if "AL" in [tags] { | ||
− | + | kafka { | |
− | + | codec => json | |
− | + | bootstrap_servers => "kafka:29092" | |
− | + | topic_id => "al-topic" | |
+ | } | ||
+ | |||
+ | } else { | ||
− | + | kafka { | |
+ | codec => json | ||
+ | bootstrap_servers => "kafka:29092" | ||
+ | topic_id => "msg-topic" | ||
+ | } | ||
− | + | } | |
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | + | stdout { | |
− | + | codec => rubydebug | |
− | + | } | |
− | + | } | |
− | |||
− | |||
− | |||
− | |||
− | |||
</source> | </source> | ||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
<source lang="C++"> | <source lang="C++"> | ||
Line 178: | Line 75: | ||
networks: | networks: | ||
- kafka-net | - kafka-net | ||
+ | ports: | ||
+ | - "32181:32181" | ||
deploy: | deploy: | ||
placement: | placement: | ||
Line 194: | Line 93: | ||
published: 29092 | published: 29092 | ||
protocol: tcp | protocol: tcp | ||
− | |||
deploy: | deploy: | ||
placement: | placement: | ||
Line 200: | Line 98: | ||
- node.role == worker | - node.role == worker | ||
environment: | environment: | ||
− | KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181 | + | KAFKA_ZOOKEEPER_CONNECT: "zookeeper:32181" |
− | KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092 | + | KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092" |
KAFKA_BROKER_ID: 2 | KAFKA_BROKER_ID: 2 | ||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 | KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 | ||
− | + | logstash: | |
− | image: | + | image: docker.elastic.co/logstash/logstash:6.6.2 |
networks: | networks: | ||
- kafka-net | - kafka-net | ||
ports: | ports: | ||
− | - | + | - "51415:51415" |
− | deploy: | + | environment: |
+ | LOGSPOUT: "ignore" | ||
+ | XPACK_MONITORING_ENABLED: "false" | ||
+ | volumes: | ||
+ | - "logstash-conf:/usr/share/logstash/pipeline" | ||
+ | deploy: | ||
placement: | placement: | ||
constraints: | constraints: | ||
− | + | - node.role == worker | |
− | + | restart_policy: | |
− | + | condition: on-failure | |
− | + | resources: | |
− | + | reservations: | |
− | + | memory: 100m | |
− | |||
networks: | networks: | ||
kafka-net: | kafka-net: | ||
driver: overlay | driver: overlay | ||
+ | volumes: | ||
+ | logstash-conf: | ||
+ | driver: nfs | ||
+ | driver_opts: | ||
+ | share: 192.168.42.1:/home/adam/dockerStore/logstash/config/ | ||
</source> | </source> | ||
+ | |||
+ | |||
+ | |||
+ | |||
+ | |||
+ | pom.xml | ||
+ | <source lang="xml"> | ||
+ | <dependency> | ||
+ | <groupId>ch.qos.logback</groupId> | ||
+ | <artifactId>logback-classic</artifactId> | ||
+ | <version>1.2.3</version> | ||
+ | <scope>runtime</scope> | ||
+ | </dependency> | ||
+ | |||
+ | <dependency> | ||
+ | <groupId>org.slf4j</groupId> | ||
+ | <artifactId>slf4j-api</artifactId> | ||
+ | <version>1.7.25</version> | ||
+ | </dependency> | ||
+ | |||
+ | <dependency> | ||
+ | <groupId>net.logstash.logback</groupId> | ||
+ | <artifactId>logstash-logback-encoder</artifactId> | ||
+ | <version>5.3</version> | ||
+ | </dependency> | ||
+ | </source> | ||
+ | |||
+ | |||
+ | |||
+ | |||
+ | |||
+ | |||
+ | <source lang="java"> | ||
+ | import org.slf4j.Logger; | ||
+ | import org.slf4j.LoggerFactory; | ||
+ | import org.slf4j.Marker; | ||
+ | import org.slf4j.MarkerFactory; | ||
+ | public class App | ||
+ | { | ||
+ | private static final Logger logger = LoggerFactory.getLogger(App.class); | ||
+ | |||
+ | public static void main( String[] args ) | ||
+ | { | ||
+ | |||
+ | Marker taMarker = MarkerFactory.getMarker("TA"); | ||
+ | Marker alMarker = MarkerFactory.getMarker("AL"); | ||
+ | |||
+ | logger.info(taMarker, "Message to TA from: {}", "adam"); | ||
+ | logger.info(alMarker, "Message to AL from: {}", "adam"); | ||
+ | |||
+ | System.out.println( "Hello World!" ); | ||
+ | } | ||
+ | } |
Revision as of 21:55, 27 March 2019
Kafka bemutatása
Java producer
Java consumer
Logstash
Producer
input {
tcp {
port => 51415
codec => "json"
}
}
output {
if "TA" in [tags] {
kafka {
codec => json
bootstrap_servers => "kafka:29092"
topic_id => "ta-topic"
}
} else if "AL" in [tags] {
kafka {
codec => json
bootstrap_servers => "kafka:29092"
topic_id => "al-topic"
}
} else {
kafka {
codec => json
bootstrap_servers => "kafka:29092"
topic_id => "msg-topic"
}
}
stdout {
codec => rubydebug
}
}
version: '3.2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.1.2
networks:
- kafka-net
ports:
- "32181:32181"
deploy:
placement:
constraints:
- node.role == worker
environment:
ZOOKEEPER_CLIENT_PORT: 32181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SYNC_LIMIT: 2
kafka:
image: confluentinc/cp-kafka:5.1.2
networks:
- kafka-net
ports:
- target: 29092
published: 29092
protocol: tcp
deploy:
placement:
constraints:
- node.role == worker
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:32181"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092"
KAFKA_BROKER_ID: 2
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
logstash:
image: docker.elastic.co/logstash/logstash:6.6.2
networks:
- kafka-net
ports:
- "51415:51415"
environment:
LOGSPOUT: "ignore"
XPACK_MONITORING_ENABLED: "false"
volumes:
- "logstash-conf:/usr/share/logstash/pipeline"
deploy:
placement:
constraints:
- node.role == worker
restart_policy:
condition: on-failure
resources:
reservations:
memory: 100m
networks:
kafka-net:
driver: overlay
volumes:
logstash-conf:
driver: nfs
driver_opts:
share: 192.168.42.1:/home/adam/dockerStore/logstash/config/
pom.xml
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>5.3</version>
</dependency>
<source lang="java">
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
public class App
{
private static final Logger logger = LoggerFactory.getLogger(App.class);
public static void main( String[] args ) {
Marker taMarker = MarkerFactory.getMarker("TA"); Marker alMarker = MarkerFactory.getMarker("AL"); logger.info(taMarker, "Message to TA from: {}", "adam"); logger.info(alMarker, "Message to AL from: {}", "adam"); System.out.println( "Hello World!" ); }
}