1.参考https://github.com/wurstmeister/kafka-docker的实现。
2.参考https://github.com/simplesteph/kafka-stack-docker-compose
3.基于上述两个参考,实现以下的部署文件。
version: '3.1' services: zoo1: image: zookeeper:3.4.9 hostname: zoo1 ports: - "2181:2181" environment: ZOO_MY_ID: 1 ZOO_PORT: 2181 ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 volumes: - ./zk-multiple-kafka-multiple/zoo1/data:/data - ./zk-multiple-kafka-multiple/zoo1/datalog:/datalog zoo2: image: zookeeper:3.4.9 hostname: zoo2 ports: - "2182:2182" environment: ZOO_MY_ID: 2 ZOO_PORT: 2182 ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 volumes: - ./zk-multiple-kafka-multiple/zoo2/data:/data - ./zk-multiple-kafka-multiple/zoo2/datalog:/datalog zoo3: image: zookeeper:3.4.9 hostname: zoo3 ports: - "2183:2183" environment: ZOO_MY_ID: 3 ZOO_PORT: 2183 ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 volumes: - ./zk-multiple-kafka-multiple/zoo3/data:/data - ./zk-multiple-kafka-multiple/zoo3/datalog:/datalog kafka1: image: wurstmeister/kafka:2.12-2.0.1 container_name: kafka1 hostname: kafka1 ports: - "9092:9092" - "1099:1099" environment: KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183" KAFKA_BROKER_ID: 1 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" KAFKA_LISTENERS: PLAINTEXT://:9092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.10.100:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 KAFKA_DEFAULT_REPLICATION_FACTOR: 3 KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.rmi.port=1099" JMX_PORT: 1099 volumes: - ./zk-multiple-kafka-multiple/kafka1:/kafka - /var/run/docker.sock:/var/run/docker.sock depends_on: - zoo1 - zoo2 - zoo3 kafka2: image: wurstmeister/kafka:2.12-2.0.1 container_name: kafka2 hostname: kafka2 ports: - "9093:9092" - "2099:1099" environment: KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183" KAFKA_BROKER_ID: 2 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" KAFKA_LISTENERS: PLAINTEXT://:9092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.10.100:9093 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 KAFKA_DEFAULT_REPLICATION_FACTOR: 3 KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.rmi.port=1099" JMX_PORT: 1099 volumes: - ./zk-multiple-kafka-multiple/kafka2:/kafka - /var/run/docker.sock:/var/run/docker.sock depends_on: - zoo1 - zoo2 - zoo3 kafka3: image: wurstmeister/kafka:2.12-2.0.1 container_name: kafka3 hostname: kafka3 ports: - "9094:9092" - "3099:1099" environment: KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183" KAFKA_BROKER_ID: 3 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" KAFKA_LISTENERS: PLAINTEXT://:9092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.10.100:9094 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 KAFKA_DEFAULT_REPLICATION_FACTOR: 3 KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=127.0.0.1 -Dcom.sun.management.jmxremote.rmi.port=1099" JMX_PORT: 1099 volumes: - ./zk-multiple-kafka-multiple/kafka3:/kafka - /var/run/docker.sock:/var/run/docker.sock depends_on: - zoo1 - zoo2 - zoo3 manager: image: hlebalbau/kafka-manager:2.0.0.2 hostname: manager ports: - "9000:9000" environment: ZK_HOSTS: "zoo1:2181,zoo2:2182,zoo3:2183" APPLICATION_SECRET: "random-secret" KAFKA_MANAGER_AUTH_ENABLED: "true" KAFKA_MANAGER_USERNAME: "abc" KAFKA_MANAGER_PASSWORD: "123" command: -Dpidfile.path=/dev/null |
4.测试文件
基于https://github.com/segmentio/kafka-go库的示范,实现如下:
package kaf import ( "context" "fmt" "github.com/segmentio/kafka-go" "log" "time" ) func LeaderProduce() { topic := "my-topic" partition := 0 conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition) if err != nil { log.Fatal(err) } conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) conn.WriteMessages( kafka.Message{Value: []byte(fmt.Sprint("one!", time.Now()))}, kafka.Message{Value: []byte(fmt.Sprint("two!", time.Now()))}, kafka.Message{Value: []byte(fmt.Sprint("three!", time.Now()))}, ) conn.Close() } func LeaderConsumer() { topic := "my-topic" partition := 0 conn, _ := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition) conn.SetReadDeadline(time.Now().Add(10 * time.Second)) batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max for { msg, err := batch.ReadMessage() if err != nil { break } fmt.Println(string(msg.Value)) } batch.Close() conn.Close() } func ClusterProduce(port int) { // make a writer that produces to topic-A, using the least-bytes distribution w := kafka.NewWriter(kafka.WriterConfig{ Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, Topic: "topic-A", Balancer: &kafka.LeastBytes{}, }) err := w.WriteMessages(context.Background(), kafka.Message{ Key: []byte("Key-A"), Value: []byte(fmt.Sprint("Hello World!", time.Now())), }, kafka.Message{ Key: []byte("Key-B"), Value: []byte(fmt.Sprint("One!", time.Now())), }, ) if err != nil { fmt.Println(port, "error", err) } w.Close() } func clusterConsume(port int) { // make a new reader that consumes from topic-A r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, GroupID: "consumer-group-id", Topic: "topic-A", MinBytes: 1024 * 10, // 10KB MaxBytes: 10e6, // 10MB }) for { m, err := r.ReadMessage(context.Background()) if err != nil { fmt.Println(port, "error.....", err) time.Sleep(time.Second * 10) continue } fmt.Printf("%v--message at topic/partition/offset %v/%v/%v: %s = %s\n", port, m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value)) // time.Sleep(time.Second) } r.Close() } |