1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"runtime/debug"
"strings"
"time"
es6 "github.com/elastic/go-elasticsearch/v6"
"github.com/elastic/go-elasticsearch/v6/esapi"
"github.com/segmentio/kafka-go"
)
var (
brokers = []string{"10.0.0.1:9092", "10.0.0.2:9092", "10.0.0.3:9092"}
topic = "web_pay_app_log"
es *es6.Client
err error
)
// Message数据还需要进一步解析,偷懒没有做
type message struct {
Host struct {
Name string `json:"name"`
} `json:"host"`
Fields struct {
Env string `json:"env"`
App string `json:"app"`
} `json:"fields"`
Message string `json:"message"`
}
type document struct {
Env string
App string
Host string
Message string
}
func main() {
goWithRecover(func() {
consumer()
})
select {}
}
func consumer() {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
GroupID: "consumer-group-" + topic,
Topic: topic,
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})
cfg := es6.Config{
Addresses: []string{
"http://192.168.1.211:9200",
},
}
if es, err = es6.NewClient(cfg); err != nil {
log.Fatalf("Error creating the client: %s", err)
}
ctx := context.Background()
for {
m, err := r.ReadMessage(ctx)
if err != nil {
break
}
var (
msg message
doc document
)
json.Unmarshal(m.Value, &msg)
doc.App = msg.Fields.App
doc.Env = msg.Fields.Env
doc.Host = msg.Host.Name
doc.Message = msg.Message
body, _ := json.Marshal(doc)
writeToEs(string(body))
if err := r.CommitMessages(ctx, m); err != nil {
fmt.Println("failed to commit messages:", err)
}
}
if err = r.Close(); err != nil {
fmt.Println("failed to close reader:", err)
}
}
func writeToEs(body string) {
var index = fmt.Sprintf("web-pay-%s", time.Now().Format("20060102"))
goWithRecover(func() {
req := esapi.IndexRequest{
Index: index,
Body: strings.NewReader(body),
Refresh: "true",
}
res, err := req.Do(context.Background(), es)
if res != nil {
defer res.Body.Close()
}
if err != nil {
log.Println("Error getting response:", err)
}
})
}
func goWithRecover(fn func()) {
go runSafe(fn)
}
func runSafe(fn func()) {
defer func() {
if p := recover(); p != nil {
fmt.Println("panic recover", "panic", p, "stack", string(debug.Stack()))
}
}()
fn()
}
|