Go Kafka 示例代码, 使用 github.com/segmentio/kafka-go 库, 包含生产者和消费者的示例代码
main.go#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
package main
import (
"context"
"encoding/json"
"fmt"
"runtime/debug"
"time"
"github.com/segmentio/kafka-go"
)
// brokers holds the Kafka broker addresses handed to both the writer and the
// reader configuration.
var brokers = []string{
	"10.0.0.1:9092",
	"10.0.0.2:9092",
	"10.0.0.3:9092",
}

// topic is the Kafka topic the example produces to and consumes from.
var topic = "test"
func main() {
goWithRecover(func() {
consumer()
})
goWithRecover(func() {
producer()
})
select {}
}
// goWithRecover runs fn on a new goroutine, routed through runSafe so that a
// panic inside fn is recovered and reported instead of crashing the process.
func goWithRecover(fn func()) {
	go func() {
		runSafe(fn)
	}()
}
// runSafe invokes fn synchronously. If fn panics, the panic is recovered and
// the panic value together with the current stack trace is printed to
// standard output; runSafe then returns normally.
func runSafe(fn func()) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		trace := string(debug.Stack())
		fmt.Println("panic recover", "panic", recovered, "stack", trace)
	}()

	fn()
}
|
生产者#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
// producer publishes a small JSON document ({"taskId": <unix seconds>}) to
// topic every five seconds, forever.
//
// The writer is configured with Async enabled, so WriteMessages returns
// before the batch is delivered and delivery failures are not returned to
// the caller. An ErrorLogger is installed so those asynchronous failures are
// still reported rather than silently dropped.
func producer() {
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:     brokers,
		Topic:       topic,
		MaxAttempts: 3,
		Async:       true,
		Balancer:    &kafka.LeastBytes{},
		// In async mode this is the only place delivery errors surface.
		ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
			fmt.Println("kafka writer error:", fmt.Sprintf(msg, args...))
		}),
	})
	defer func() {
		if err := w.Close(); err != nil {
			fmt.Println("failed to close writer:", err)
		}
	}()

	ctx := context.Background()
	data := make(map[string]interface{})
	for {
		data["taskId"] = time.Now().Unix()

		body, err := json.Marshal(data)
		if err != nil {
			// Never send an empty/partial payload; skip this tick instead.
			fmt.Println("failed to marshal message:", err)
		} else if err := w.WriteMessages(ctx, kafka.Message{Value: body}); err != nil {
			// Only synchronous failures (e.g. closed writer, oversized
			// message) are reported here when Async is enabled.
			fmt.Println("failed to write messages:", err)
		}

		time.Sleep(5 * time.Second)
	}
}
|
消费者#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
// consumer reads messages from topic as a member of the consumer group
// "consumer-group-<topic>", prints each one, and commits its offset after it
// has been handled.
//
// FetchMessage is used instead of ReadMessage because, when a GroupID is
// set, ReadMessage commits offsets automatically; pairing it with an
// explicit CommitMessages call would commit every message twice and commit
// before the message is processed. The loop stops on the first fetch error,
// which is logged, and the reader is closed on the way out.
func consumer() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  brokers,
		GroupID:  "consumer-group-" + topic,
		Topic:    topic,
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	})
	defer func() {
		if err := r.Close(); err != nil {
			fmt.Println("failed to close reader:", err)
		}
	}()

	ctx := context.Background()
	for {
		m, err := r.FetchMessage(ctx)
		if err != nil {
			fmt.Println("failed to fetch message:", err)
			return
		}

		fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))

		// Commit only after the message has been handled (at-least-once).
		if err := r.CommitMessages(ctx, m); err != nil {
			fmt.Println("failed to commit messages:", err)
		}
	}
}
|