Difference between revisions of "Apache Kafka"

From berki WIKI
Jump to: navigation, search
(Producer)
(Producer)
Line 35: Line 35:
 
output {
 
output {
  
   if "TA" in [tags] {
+
   if "T1" in [tags] {
  
 
     kafka {
 
     kafka {
 
       codec => json
 
       codec => json
 
       bootstrap_servers => "kafka:29092"
 
       bootstrap_servers => "kafka:29092"
       topic_id => "ta-topic"
+
       topic_id => "T1-topic"
 
     }
 
     }
 
      
 
      
   } else if "AL" in [tags] {
+
   } else if "T2" in [tags] {
  
 
     kafka {
 
     kafka {
 
       codec => json
 
       codec => json
 
       bootstrap_servers => "kafka:29092"
 
       bootstrap_servers => "kafka:29092"
       topic_id => "al-topic"
+
       topic_id => "T2-topic"
 
     }
 
     }
 
      
 
      
Line 133: Line 133:
  
  
 
+
:[[File:ClipCapIt-190327-230626.PNG]]
  
  
 
pom.xml
 
pom.xml
 
<source lang="xml">
 
<source lang="xml">
                <dependency>
+
              <dependency>
 
<groupId>ch.qos.logback</groupId>
 
<groupId>ch.qos.logback</groupId>
 
<artifactId>logback-classic</artifactId>
 
<artifactId>logback-classic</artifactId>
Line 175: Line 175:
 
     {   
 
     {   
  
         Marker taMarker = MarkerFactory.getMarker("TA");
+
         Marker taMarker = MarkerFactory.getMarker("T1");
         Marker alMarker = MarkerFactory.getMarker("AL");
+
         Marker alMarker = MarkerFactory.getMarker("T2");
 
    
 
    
     logger.info(taMarker, "Message to TA from: {}", "adam");   
+
     logger.info(taMarker, "Message to T1 from: {}", "adam");   
     logger.info(alMarker, "Message to AL from: {}", "adam");
+
     logger.info(alMarker, "Message to T2 from: {}", "adam");
 
    
 
    
 
         System.out.println( "Hello World!" );
 
         System.out.println( "Hello World!" );
Line 185: Line 185:
 
}
 
}
 
</source>
 
</source>
 +
 +
 +
Logstash log:
 +
<pre>
 +
confluence_logstash.1.4a9rr1w42iud@worker0    | {
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |    "level_value" => 20000,
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |    "logger_name" => "kafka.example2.App",
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |        "appname" => "adam",
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |            "port" => 41024,
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |          "level" => "INFO",
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |        "@version" => "1",
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |            "host" => "10.255.0.3",
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |        "message" => "Message to TA from: adam",
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |    "thread_name" => "main",
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |      "@timestamp" => 2019-03-26T22:52:19.168Z,
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |            "tags" => [
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |        [0] "TA"
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |    ]
 +
confluence_logstash.1.4a9rr1w42iud@worker0    | }
 +
confluence_logstash.1.4a9rr1w42iud@worker0    | {
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |    "level_value" => 20000,
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |    "logger_name" => "kafka.example2.App",
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |        "appname" => "adam",
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |            "port" => 41024,
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |          "level" => "INFO",
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |        "@version" => "1",
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |            "host" => "10.255.0.3",
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |        "message" => "Message to AL from: adam",
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |    "thread_name" => "main",
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |      "@timestamp" => 2019-03-26T22:52:19.176Z,
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |            "tags" => [
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |        [0] "AL"
 +
confluence_logstash.1.4a9rr1w42iud@worker0    |    ]
 +
confluence_logstash.1.4a9rr1w42iud@worker0    | }
 +
</pre>

Revision as of 22:08, 27 March 2019

Kafka bemutatása

Java producer

Java consumer

Logstash

Producer

ClipCapIt-190327-224419.PNG






input {
  tcp { 
    port => 51415
    codec => "json"
  }
}

output {

  if "T1" in [tags] {

    kafka {
      codec => json
      bootstrap_servers => "kafka:29092"
      topic_id => "T1-topic"
    }
    
  } else if "T2" in [tags] {

    kafka {
      codec => json
      bootstrap_servers => "kafka:29092"
      topic_id => "T2-topic"
    }
    
  } else {

    kafka {
      codec => json
      bootstrap_servers => "kafka:29092"
      topic_id => "msg-topic"
    }

  }

  stdout {
    codec => rubydebug
  }
}


version: '3.2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.1.2
    networks:
      - kafka-net
    ports:
      - "32181:32181"
    deploy:
      placement:
        constraints:
         - node.role == worker
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SYNC_LIMIT: 2
  kafka:
    image: confluentinc/cp-kafka:5.1.2
    networks:
      - kafka-net
    ports:
      - target: 29092
        published: 29092
        protocol: tcp
    deploy:
      placement:
        constraints:
         - node.role == worker
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:32181"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092"
      KAFKA_BROKER_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  logstash:
    image: docker.elastic.co/logstash/logstash:6.6.2
    networks:
      - kafka-net
    ports:
      - "51415:51415"
    environment:
      LOGSPOUT: "ignore"
      XPACK_MONITORING_ENABLED: "false"
    volumes:
      - "logstash-conf:/usr/share/logstash/pipeline"
    deploy:   
      placement:
        constraints:
         - node.role == worker
      restart_policy:
        condition: on-failure
      resources:
        reservations:
          memory: 100m   
networks:
  kafka-net:
    driver: overlay
volumes:
  logstash-conf:
    driver: nfs
    driver_opts:
      share: 192.168.42.1:/home/adam/dockerStore/logstash/config/


ClipCapIt-190327-230626.PNG


pom.xml

               <dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>1.2.3</version>
			<scope>runtime</scope>
		</dependency>

		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.7.25</version>
		</dependency>

		<dependency>
			<groupId>net.logstash.logback</groupId>
			<artifactId>logstash-logback-encoder</artifactId>
			<version>5.3</version>
		</dependency>




import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
public class App 
{	
    private static final Logger logger = LoggerFactory.getLogger(App.class);
	
    public static void main( String[] args )
    {    	

        Marker taMarker = MarkerFactory.getMarker("T1");
        Marker alMarker = MarkerFactory.getMarker("T2");
    	
    	logger.info(taMarker, "Message to T1 from: {}", "adam");    	
    	logger.info(alMarker, "Message to T2 from: {}", "adam");
    	
        System.out.println( "Hello World!" );
    }
}


Logstash log:

confluence_logstash.1.4a9rr1w42iud@worker0    | {
confluence_logstash.1.4a9rr1w42iud@worker0    |     "level_value" => 20000,
confluence_logstash.1.4a9rr1w42iud@worker0    |     "logger_name" => "kafka.example2.App",
confluence_logstash.1.4a9rr1w42iud@worker0    |         "appname" => "adam",
confluence_logstash.1.4a9rr1w42iud@worker0    |            "port" => 41024,
confluence_logstash.1.4a9rr1w42iud@worker0    |           "level" => "INFO",
confluence_logstash.1.4a9rr1w42iud@worker0    |        "@version" => "1",
confluence_logstash.1.4a9rr1w42iud@worker0    |            "host" => "10.255.0.3",
confluence_logstash.1.4a9rr1w42iud@worker0    |         "message" => "Message to TA from: adam",
confluence_logstash.1.4a9rr1w42iud@worker0    |     "thread_name" => "main",
confluence_logstash.1.4a9rr1w42iud@worker0    |      "@timestamp" => 2019-03-26T22:52:19.168Z,
confluence_logstash.1.4a9rr1w42iud@worker0    |            "tags" => [
confluence_logstash.1.4a9rr1w42iud@worker0    |         [0] "TA"
confluence_logstash.1.4a9rr1w42iud@worker0    |     ]
confluence_logstash.1.4a9rr1w42iud@worker0    | }
confluence_logstash.1.4a9rr1w42iud@worker0    | {
confluence_logstash.1.4a9rr1w42iud@worker0    |     "level_value" => 20000,
confluence_logstash.1.4a9rr1w42iud@worker0    |     "logger_name" => "kafka.example2.App",
confluence_logstash.1.4a9rr1w42iud@worker0    |         "appname" => "adam",
confluence_logstash.1.4a9rr1w42iud@worker0    |            "port" => 41024,
confluence_logstash.1.4a9rr1w42iud@worker0    |           "level" => "INFO",
confluence_logstash.1.4a9rr1w42iud@worker0    |        "@version" => "1",
confluence_logstash.1.4a9rr1w42iud@worker0    |            "host" => "10.255.0.3",
confluence_logstash.1.4a9rr1w42iud@worker0    |         "message" => "Message to AL from: adam",
confluence_logstash.1.4a9rr1w42iud@worker0    |     "thread_name" => "main",
confluence_logstash.1.4a9rr1w42iud@worker0    |      "@timestamp" => 2019-03-26T22:52:19.176Z,
confluence_logstash.1.4a9rr1w42iud@worker0    |            "tags" => [
confluence_logstash.1.4a9rr1w42iud@worker0    |         [0] "AL"
confluence_logstash.1.4a9rr1w42iud@worker0    |     ]
confluence_logstash.1.4a9rr1w42iud@worker0    | }