Go Kafka example code using the github.com/segmentio/kafka-go library, with producer and consumer examples.


main.go

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"runtime/debug"
	"time"

	"github.com/segmentio/kafka-go"
)

var (
	brokers = []string{"10.0.0.1:9092", "10.0.0.2:9092", "10.0.0.3:9092"}
	topic   = "test"
)

func main() {
	// run the consumer and producer in their own goroutines
	goWithRecover(func() {
		consumer()
	})

	goWithRecover(func() {
		producer()
	})

	// block forever so the background goroutines keep running
	select {}
}

// goWithRecover starts fn in a new goroutine and recovers from any panic,
// so a crash in one worker does not bring down the whole process.
func goWithRecover(fn func()) {
	go runSafe(fn)
}

func runSafe(fn func()) {
	defer func() {
		if p := recover(); p != nil {
			fmt.Println("panic recover", "panic", p, "stack", string(debug.Stack()))
		}
	}()

	fn()
}
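
The example assumes the "test" topic already exists on the brokers. If it does not, something like the sketch below can create it first with kafka-go's low-level connection API; the function name and the partition and replication counts are assumptions, adjust them for your cluster.

// createTopic is a minimal sketch (not part of the original example) that
// creates the topic before the producer and consumer start.
func createTopic() error {
	conn, err := kafka.Dial("tcp", brokers[0])
	if err != nil {
		return err
	}
	defer conn.Close()

	// topic creation has to go through the cluster controller
	controller, err := conn.Controller()
	if err != nil {
		return err
	}
	cc, err := kafka.Dial("tcp", fmt.Sprintf("%s:%d", controller.Host, controller.Port))
	if err != nil {
		return err
	}
	defer cc.Close()

	return cc.CreateTopics(kafka.TopicConfig{
		Topic:             topic,
		NumPartitions:     3, // assumed value
		ReplicationFactor: 2, // assumed value
	})
}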

[Screenshot: kafka_broker]

Producer

func producer() {
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:     brokers,
		Topic:       topic,
		MaxAttempts: 3,
		Async:       true, // queue messages and flush them in the background
		Balancer:    &kafka.LeastBytes{},
	})
	defer w.Close()

	data := make(map[string]interface{})
	for {
		data["taskId"] = time.Now().Unix()
		// marshalling this simple map cannot fail, so the error is ignored
		body, _ := json.Marshal(data)
		// with Async enabled, WriteMessages returns once the message is
		// queued; broker-side delivery errors are not returned here
		if err := w.WriteMessages(context.Background(),
			kafka.Message{
				Value: body,
			},
		); err != nil {
			fmt.Println("failed to write messages:", err)
		}

		time.Sleep(5 * time.Second)
	}
}
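
Because Async is enabled above, WriteMessages returns before the broker acknowledges anything, so write failures are easy to miss. If delivery errors matter, one option is a synchronous write with the newer kafka.Writer struct; the sketch below and its name are assumptions, not part of the original example.

// sendSync is a sketch of a synchronous write: with Async left at its
// default (false), WriteMessages blocks until the broker acknowledges the
// batch, so delivery errors are returned to the caller. In real code the
// writer would be created once and reused, not rebuilt per call.
func sendSync(ctx context.Context, body []byte) error {
	w := &kafka.Writer{
		Addr:         kafka.TCP(brokers...),
		Topic:        topic,
		Balancer:     &kafka.LeastBytes{},
		RequiredAcks: kafka.RequireAll, // wait for all in-sync replicas
	}
	defer w.Close()

	return w.WriteMessages(ctx, kafka.Message{Value: body})
}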

Consumer

func consumer() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  brokers,
		GroupID:  "consumer-group-" + topic,
		Topic:    topic,
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	})

	ctx := context.Background()
	for {
		// FetchMessage does not auto-commit, so the offset is only committed
		// after the message has been processed. (ReadMessage would commit
		// automatically when a GroupID is set, making the explicit commit
		// below redundant.)
		m, err := r.FetchMessage(ctx)
		if err != nil {
			break
		}
		fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
		if err := r.CommitMessages(ctx, m); err != nil {
			fmt.Println("failed to commit messages:", err)
		}
	}

	if err := r.Close(); err != nil {
		fmt.Println("failed to close reader:", err)
	}
}
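
The loop above only stops when FetchMessage returns an error. A common way to shut the consumer down cleanly is to pass in a cancellable context instead of context.Background(); the helper below and its name are assumptions, not part of the original example.

// consumeUntilCancelled is a sketch of a consumer that stops when its
// context is cancelled, for example from a signal handler in main.
func consumeUntilCancelled(ctx context.Context) {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: brokers,
		GroupID: "consumer-group-" + topic,
		Topic:   topic,
	})
	defer r.Close() // leaves the consumer group cleanly

	for {
		m, err := r.FetchMessage(ctx)
		if err != nil {
			// context.Canceled ends the loop on shutdown
			fmt.Println("stop consuming:", err)
			return
		}
		fmt.Printf("got %s\n", string(m.Value))
		if err := r.CommitMessages(ctx, m); err != nil {
			fmt.Println("failed to commit messages:", err)
		}
	}
}

In main, the context could come from signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM), with main waiting on ctx.Done() instead of the bare select {}.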

[Screenshot: kafka_consumer_group]

[Screenshot: kafka-consumer-msg]