分类目录归档:KAFKA

kafka容器化部署

1.参考https://github.com/wurstmeister/kafka-docker的实现。
2.参考https://github.com/simplesteph/kafka-stack-docker-compose
3.基于上述两个参考,实现以下的部署文件。

version: '3.1'

services:
  zoo1:
    image: zookeeper:3.4.9
    hostname: zoo1
    ports:
      - "2181:2181"
    environment:
        ZOO_MY_ID: 1
        ZOO_PORT: 2181
        ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
    volumes:
      - ./zk-multiple-kafka-multiple/zoo1/data:/data
      - ./zk-multiple-kafka-multiple/zoo1/datalog:/datalog

  zoo2:
    image: zookeeper:3.4.9
    hostname: zoo2
    ports:
      - "2182:2182"
    environment:
        ZOO_MY_ID: 2
        ZOO_PORT: 2182
        ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
    volumes:
      - ./zk-multiple-kafka-multiple/zoo2/data:/data
      - ./zk-multiple-kafka-multiple/zoo2/datalog:/datalog

  zoo3:
    image: zookeeper:3.4.9
    hostname: zoo3
    ports:
      - "2183:2183"
    environment:
        ZOO_MY_ID: 3
        ZOO_PORT: 2183
        ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
    volumes:
      - ./zk-multiple-kafka-multiple/zoo3/data:/data
      - ./zk-multiple-kafka-multiple/zoo3/datalog:/datalog


  kafka1:
    image: wurstmeister/kafka:2.12-2.0.1
    container_name: kafka1
    hostname: kafka1
    ports:
      - "9092:9092"
      - "1099:1099"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.10.100:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.rmi.port=1099"
      JMX_PORT: 1099
    volumes:
      - ./zk-multiple-kafka-multiple/kafka1:/kafka
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafka2:
    image: wurstmeister/kafka:2.12-2.0.1
    container_name: kafka2
    hostname: kafka2
    ports:
      - "9093:9092"
      - "2099:1099"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
      KAFKA_BROKER_ID: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.10.100:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.rmi.port=1099"
      JMX_PORT: 1099
    volumes:
      - ./zk-multiple-kafka-multiple/kafka2:/kafka
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafka3:
    image: wurstmeister/kafka:2.12-2.0.1
    container_name: kafka3
    hostname: kafka3
    ports:
      - "9094:9092"
      - "3099:1099"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
      KAFKA_BROKER_ID: 3
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.10.100:9094
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.rmi.port=1099"
      JMX_PORT: 1099
    volumes:
      - ./zk-multiple-kafka-multiple/kafka3:/kafka
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  manager:
    image: hlebalbau/kafka-manager:2.0.0.2
    hostname: manager
    ports:
      - "9000:9000"
    environment:
      ZK_HOSTS: "zoo1:2181,zoo2:2182,zoo3:2183"
      APPLICATION_SECRET: "random-secret"
      KAFKA_MANAGER_AUTH_ENABLED: "true"
      KAFKA_MANAGER_USERNAME: "abc"
      KAFKA_MANAGER_PASSWORD: "123"
    command: -Dpidfile.path=/dev/null

4.测试文件
基于https://github.com/segmentio/kafka-go库的示范,实现如下:

package kaf

import (
	"context"
	"fmt"
	"github.com/segmentio/kafka-go"
	"log"
	"time"
)

func LeaderProduce() {
	topic := "my-topic"
	partition := 0

	conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
	if err != nil {
		log.Fatal(err)
	}
	conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
	conn.WriteMessages(
		kafka.Message{Value: []byte(fmt.Sprint("one!", time.Now()))},
		kafka.Message{Value: []byte(fmt.Sprint("two!", time.Now()))},
		kafka.Message{Value: []byte(fmt.Sprint("three!", time.Now()))},
	)

	conn.Close()
}

func LeaderConsumer() {
	topic := "my-topic"
	partition := 0

	conn, _ := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)

	conn.SetReadDeadline(time.Now().Add(10 * time.Second))
	batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
	for {
		msg, err := batch.ReadMessage()
		if err != nil {
			break
		}
		fmt.Println(string(msg.Value))
	}

	batch.Close()
	conn.Close()
}

func ClusterProduce(port int) {
	// make a writer that produces to topic-A, using the least-bytes distribution
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{"localhost:9092", "localhost:9093", "localhost:9094"},
		Topic:    "topic-A",
		Balancer: &kafka.LeastBytes{},
	})

	err := w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte("Key-A"),
			Value: []byte(fmt.Sprint("Hello World!", time.Now())),
		},
		kafka.Message{
			Key:   []byte("Key-B"),
			Value: []byte(fmt.Sprint("One!", time.Now())),
		},
	)
	if err != nil {
		fmt.Println(port, "error", err)
	}

	w.Close()
}

func clusterConsume(port int) {
	// make a new reader that consumes from topic-A
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"localhost:9092", "localhost:9093", "localhost:9094"},
		GroupID:  "consumer-group-id",
		Topic:    "topic-A",
		MinBytes: 1024 * 10, // 10KB
		MaxBytes: 10e6,      // 10MB
	})

	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			fmt.Println(port, "error.....", err)
			time.Sleep(time.Second * 10)
			continue
		}
		fmt.Printf("%v--message at topic/partition/offset %v/%v/%v: %s = %s\n", port, m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
		// time.Sleep(time.Second)
	}

	r.Close()
}