需求背景:为了能在 Kibana 中方便地查询服务端程序记录的日志,并考虑到公司技术栈的多样性(java、php、golang、python、lua 等),我们采用的方案是:程序写本地日志,通过 Filebeat 上报到 Kafka,最后由消费程序把 Kafka 中的数据写入 Elasticsearch。


流程图

filebeat-kafka-es

程序写入本地文本日志

php程序示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
/**
 * Write a WARNING record under the given category.
 * Any failure while building or using the logger is reported via
 * DingTalk instead of bubbling up to the caller.
 */
public static function warning(string $category, string $message, array $context = [])
{
    try {
        $logger = self::makeLogger($category, "warning");
        $logger->warning($message, $context);
    } catch (\Exception $exception) {
        Notification::dingtalk($exception->getMessage());
    }
}

/**
 * Write an INFO record under the given category. The message text is a
 * fixed "-" placeholder; the actual payload travels in $context.
 * Logger failures are reported via DingTalk instead of bubbling up.
 */
public static function info(string $category, array $context = [])
{
    try {
        $logger = self::makeLogger($category, "info");
        $logger->info("-", $context);
    } catch (\Exception $exception) {
        Notification::dingtalk($exception->getMessage());
    }
}

日志内容

1
2
2022-12-20 01:37:40 INFO rk api '2000000230897' 192.168.1.3 /order/verify | - | {"orderId":"2000000230897","status":1}
2022-12-20 01:37:40 INFO rk api 'GPA.3316-8111-3644-22366' 192.168.1.3 /order/verify | - | {"orderId":"GPA.3316-8111-3644-22366","status":1}

filebeat上报到kafka

配置文件:web-pay.yml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
#=========================== Filebeat inputs =============================

filebeat.inputs:
- type: log
  enabled: true
  # Tail every log file under the pay service's per-category directories.
  paths:
    - /data/wwwroot/pay/logs/*/*
  # Custom fields attached to every event; the downstream Go consumer
  # reads env/appname from the `fields` object of each Kafka message.
  fields:
    env: beta
    appname: web
    module: pay

#==================== Elasticsearch template setting ==========================
# Index templates are not managed by Filebeat here.
setup.template.enabled: false

#----------------------------- Kafka output --------------------------------
# Must match the `topic` constant in the Go consumer (web_pay_app_log).
output.kafka:
 enabled: true
 hosts: ["192.168.1.5:9092"]
 topic: 'web_pay_app_log'
 max_retries: 3

启动filebeat

1
./filebeat -e -c web-pay.yml

kafka消费数据到Elasticsearch

写入ES数据格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
{
   "@timestamp": "2023-01-07T13:26:49.552Z",
   "@metadata": {
      "beat": "filebeat",
      "type": "doc",
      "version": "6.4.1",
      "topic": "web_pay_app_log"
   },
   "offset": 0,
   "message": "2022-12-20 01:37:40 INFO rk api 'GPA.3316-8111-3644-22366' 192.168.1.3 /order/verify | - | {\"orderId\":\"GPA.3316-8111-3644-22366\",\"status\":1}",
   "prospector": {
      "type": "log"
   },
   "input": {
      "type": "log"
   },
   "fields": {
      "env": "beta",
      "appname": "web",
      "module": "pay"
   },
   "beat": {
      "name": "OA",
      "hostname": "OA",
      "version": "6.4.1"
   },
   "host": {
      "name": "OA"
   },
   "source": "/data/wwwroot/pay/logs/rk/inapp-2023-01-07.log"
}

golang消费kafka数据到ES

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"runtime/debug"
	"strings"
	"time"

	es6 "github.com/elastic/go-elasticsearch/v6"
	"github.com/elastic/go-elasticsearch/v6/esapi"
	"github.com/segmentio/kafka-go"
)

var (
	// Kafka cluster to consume from, and the topic Filebeat publishes to
	// (must match `topic` in web-pay.yml).
	brokers = []string{"10.0.0.1:9092", "10.0.0.2:9092", "10.0.0.3:9092"}
	topic   = "web_pay_app_log"

	// Shared Elasticsearch client, initialized once in consumer() and
	// used by writeToEs goroutines.
	// NOTE(review): package-level `err` is shared mutable state written
	// from consumer(); looks race-prone if more goroutines ever touch
	// it — confirm and consider making it local.
	es  *es6.Client
	err error
)

// Message数据还需要进一步解析,偷懒没有做
type message struct {
	Host struct {
		Name string `json:"name"`
	} `json:"host"`
	Fields struct {
		Env string `json:"env"`
		App string `json:"app"`
	} `json:"fields"`
	Message string `json:"message"`
}

// document is the flattened shape actually indexed into Elasticsearch.
// Fields carry no json tags, so they are marshaled under their exported
// Go names (Env, App, Host, Message).
type document struct {
	Env     string
	App     string
	Host    string
	Message string
}

// main launches the Kafka->Elasticsearch consumer on a panic-guarded
// goroutine, then parks the main goroutine forever.
func main() {
	goWithRecover(consumer)

	// Block indefinitely; all work happens in background goroutines.
	select {}
}

// consumer reads Filebeat events from Kafka, projects each one into the
// flat document shape and hands it to writeToEs, committing offsets as
// it goes. It returns (after closing the reader) when reading fails.
func consumer() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  brokers,
		GroupID:  "consumer-group-" + topic,
		Topic:    topic,
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	})

	cfg := es6.Config{
		Addresses: []string{
			"http://192.168.1.211:9200",
		},
	}
	if es, err = es6.NewClient(cfg); err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	ctx := context.Background()
	for {
		m, err := r.ReadMessage(ctx)
		if err != nil {
			// Surface why consumption stops instead of exiting silently.
			log.Println("failed to read message:", err)
			break
		}

		var (
			msg message
			doc document
		)
		if err := json.Unmarshal(m.Value, &msg); err != nil {
			// Skip malformed payloads but still commit below, so we do
			// not get stuck re-reading the same broken message forever.
			log.Printf("failed to decode message at offset %d: %v", m.Offset, err)
		} else {
			doc.App = msg.Fields.App
			doc.Env = msg.Fields.Env
			doc.Host = msg.Host.Name
			doc.Message = msg.Message

			if body, err := json.Marshal(doc); err != nil {
				log.Println("failed to encode document:", err)
			} else {
				// NOTE(review): writeToEs is asynchronous, so the commit
				// below happens before the write is known to succeed
				// (at-most-once delivery) — confirm this is acceptable.
				writeToEs(string(body))
			}
		}

		if err := r.CommitMessages(ctx, m); err != nil {
			fmt.Println("failed to commit messages:", err)
		}
	}

	if err := r.Close(); err != nil {
		fmt.Println("failed to close reader:", err)
	}
}

// writeToEs asynchronously indexes one JSON document into the daily
// index web-pay-YYYYMMDD, recovering from any panic in the request path.
//
// NOTE(review): Refresh:"true" forces an index refresh on every request,
// which is expensive under load — confirm it is really needed.
func writeToEs(body string) {
	index := fmt.Sprintf("web-pay-%s", time.Now().Format("20060102"))
	goWithRecover(func() {
		req := esapi.IndexRequest{
			Index:   index,
			Body:    strings.NewReader(body),
			Refresh: "true",
		}
		res, err := req.Do(context.Background(), es)
		if err != nil {
			// Transport-level failure: no response body exists to close.
			log.Println("Error getting response:", err)
			return
		}
		defer res.Body.Close()
		if res.IsError() {
			// HTTP-level failure (4xx/5xx) was previously swallowed.
			log.Printf("Error indexing document into %s: %s", index, res.String())
			return
		}
		// Drain the body so the underlying HTTP connection can be reused.
		io.Copy(io.Discard, res.Body)
	})
}

// goWithRecover runs fn on a fresh goroutine, logging and swallowing
// any panic so a crashing task cannot take the whole process down.
func goWithRecover(fn func()) {
	go func() {
		defer func() {
			if p := recover(); p != nil {
				fmt.Println("panic recover", "panic", p, "stack", string(debug.Stack()))
			}
		}()
		fn()
	}()
}

// runSafe invokes fn, converting any panic into a printed log line
// (with stack trace) instead of letting it propagate to the caller.
func runSafe(fn func()) {
	defer func() {
		p := recover()
		if p == nil {
			return
		}
		fmt.Println("panic recover", "panic", p, "stack", string(debug.Stack()))
	}()

	fn()
}

Kibana查看数据

kibana