Go RabbitMQ 示例代码，使用 github.com/streadway/amqp 库，包含生产者和消费者的示例代码


初始化连接

需要注意断线重连：通道关闭后，程序会按固定间隔自动重试连接，直到重新连接成功。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package rabbitmq

import (
	"fmt"
	"log"
	"time"

	"github.com/streadway/amqp"
)

// Package-level connection state shared by the connection helpers, the
// consumer and the producer in this package.
//
// NOTE(review): these variables are written by the reconnect goroutine and
// read by callers such as Push without any visible synchronization — confirm
// whether that is acceptable for the intended use.
var (
	// MqConn is the connection obtained from amqp.Dial in connect.
	MqConn        *amqp.Connection
	// MqChan is the channel opened on MqConn; all queue, publish and consume
	// calls in this package go through it.
	MqChan        *amqp.Channel
	// MqDone receives true after every successful connect, so a consumer can
	// (re)start consuming on the current channel.
	MqDone        chan bool
	// notifyClose is the channel registered with MqChan.NotifyClose; the
	// reconnect loop waits on it.
	notifyClose   chan *amqp.Error
	// mqIsConnected is set to true by connect and to false by the reconnect
	// loop when a close notification arrives.
	mqIsConnected bool
)

const (
	// reconnectDelay is how long the reconnect loop sleeps between failed
	// connection attempts.
	reconnectDelay = 2 * time.Second
)

// InitConnect prepares the package for use: it allocates MqDone with room for
// one pending signal, performs the initial connection attempt, and then starts
// the background reconnect loop.
//
// If the initial attempt fails the process is terminated via log.Fatalln.
func InitConnect() {
	MqDone = make(chan bool, 1)

	if ok := connect(); !ok {
		log.Fatalln("MQ连接失败")
	}

	// From here on, closures of the channel are handled in the background.
	go handleReconnect()
}

// handleReconnect waits for the current channel to report that it was closed
// and then re-establishes the connection, retrying every reconnectDelay until
// connect succeeds. It loops forever and is meant to run in its own goroutine.
func handleReconnect() {
	for {
		// Blocks until the channel registered in changeConnection delivers a
		// close error or is itself closed.
		<-notifyClose
		mqIsConnected = false

		// Release the previous connection before dialing a new one. When only
		// the channel was closed the connection is still open and would
		// otherwise be leaked once connect overwrites MqConn. The error is
		// ignored on purpose: the connection may already be closed.
		if MqConn != nil {
			_ = MqConn.Close()
		}

		log.Println("Attempting to connect mq")
		for !connect() {
			log.Println("Failed to connect mq. Retrying...")
			time.Sleep(reconnectDelay)
		}

		log.Println("MQ重新连接成功")
	}
}

// connect dials the broker, opens a channel, registers the close listener and
// signals MqDone. It returns true on success and false if either the dial or
// the channel open fails (the error is printed to stdout).
//
// MqConn and MqChan are only replaced once both steps have succeeded, so a
// failed attempt never leaves nil values in the package-level variables.
func connect() bool {
	// NOTE(review): the credential/host part of this URL ("[email protected]")
	// looks like it was mangled by e-mail obfuscation when the snippet was
	// published; replace it with the real broker URL — TODO confirm.
	conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
	if err != nil {
		fmt.Println("MQ打开链接失败:" + err.Error())
		return false
	}

	ch, err := conn.Channel()
	if err != nil {
		fmt.Println("MQ打开管道失败:" + err.Error())
		// Do not leak the freshly dialed connection when no channel could be
		// opened on it; the close error adds nothing to the failure above.
		_ = conn.Close()
		return false
	}

	MqConn, MqChan = conn, ch
	changeConnection()
	mqIsConnected = true

	// Non-blocking signal: if a previous signal is still pending (or nobody
	// listens on MqDone at all) the reconnect loop must not stall here. A
	// pending signal already tells the consumer to restart on the current
	// channel, so dropping the duplicate loses nothing.
	select {
	case MqDone <- true:
	default:
	}

	return true
}

// changeConnection registers a fresh, unbuffered close listener on the
// current MqChan and stores it in notifyClose for the reconnect loop.
func changeConnection() {
	listener := make(chan *amqp.Error)
	notifyClose = listener
	MqChan.NotifyClose(listener)
}

消费者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package rabbitmq

import "github.com/streadway/amqp"

// Consumer reads messages from a single queue through the package-level
// channel MqChan.
type Consumer struct {
	// queueName is the queue passed to MqChan.Consume by ConsumeAck.
	queueName string
}

// NewConsumer returns a Consumer bound to the queue named queueName.
func NewConsumer(queueName string) *Consumer {
	c := new(Consumer)
	c.queueName = queueName
	return c
}

// ConsumeAck starts consuming from the consumer's queue on MqChan with manual
// acknowledgement and returns the delivery channel.
//
// Before consuming it sets the channel QoS to a prefetch count of 100
// (prefetch size 0, not global). If setting QoS fails, that error is returned
// and no consumer is started.
func (r *Consumer) ConsumeAck() (msgs <-chan amqp.Delivery, err error) {
	if err = MqChan.Qos(100, 0, false); err != nil {
		return nil, err
	}

	const (
		consumerTag = ""    // consumer
		autoAck     = false // auto-ack: deliveries must be acknowledged by the caller
		exclusive   = false // exclusive
		noLocal     = false // no-local
		noWait      = false // no-wait
	)

	return MqChan.Consume(r.queueName, consumerTag, autoAck, exclusive, noLocal, noWait, nil)
}

生产者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package rabbitmq

import (
	"errors"

	"github.com/streadway/amqp"
)

// Producer declares and binds a queue and publishes messages through the
// package-level channel MqChan.
type Producer struct {
	// queueName is the queue declared by AddQueue and bound by BindQueue.
	queueName    string
	// routingKey is used as the binding key in BindQueue and as the routing
	// key in Push.
	routingKey   string
	// exchangeName is the exchange the queue is bound to and messages are
	// published to.
	exchangeName string
}

// NewProducer returns a Producer for the given queue, routing key and
// exchange. It only stores the names; nothing is declared on the broker.
func NewProducer(queueName string, routingKey string, exchangeName string) *Producer {
	p := Producer{queueName: queueName, routingKey: routingKey, exchangeName: exchangeName}
	return &p
}

// AddQueue declares the producer's queue on MqChan as durable, not
// auto-deleted, not exclusive, with no-wait enabled and no extra arguments.
// It returns the error reported by QueueDeclare, if any.
func (r *Producer) AddQueue() (err error) {
	const (
		durable    = true
		autoDelete = false
		exclusive  = false
		noWait     = true
	)

	if _, declareErr := MqChan.QueueDeclare(r.queueName, durable, autoDelete, exclusive, noWait, nil); declareErr != nil {
		return declareErr
	}
	return nil
}

// BindQueue binds the producer's queue to its exchange using its routing key,
// with no-wait enabled and no extra arguments, and returns the error reported
// by QueueBind.
func (r *Producer) BindQueue() error {
	const noWait = true

	bindErr := MqChan.QueueBind(r.queueName, r.routingKey, r.exchangeName, noWait, nil)
	return bindErr
}

// Push publishes body with the given headers to the producer's exchange using
// its routing key. The message is marked persistent and typed as
// application/json; the mandatory and immediate flags are both off.
//
// It returns an error without publishing when the package currently considers
// itself disconnected, otherwise whatever MqChan.Publish returns.
func (r *Producer) Push(body []byte, headers map[string]interface{}) (err error) {
	if mqIsConnected {
		msg := amqp.Publishing{
			DeliveryMode: amqp.Persistent, // same value as the literal 2
			ContentType:  "application/json",
			Headers:      headers,
			Body:         body,
		}
		return MqChan.Publish(r.exchangeName, r.routingKey, false, false, msg)
	}

	return errors.New("MQ无法连接")
}

入口程序

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package main

import (
	"encoding/json"
	"fmt"
	"go-study/pkg/rabbitmq"
	"log"
	"runtime/debug"
	"time"
)

const (
	// exchangeName is the exchange handed to rabbitmq.NewProducer.
	exchangeName string = "devops"
	// queueName is used both as the queue name and as the routing key when
	// building producers, and as the queue name for the consumer.
	queueName    string = "devops:cicd"
)

// main connects to the broker, sets up the queue, starts the background
// consumer and then publishes one JSON message every two seconds, forever.
func main() {
	rabbitmq.InitConnect()

	// Declare and bind the queue.
	initQueue()

	// Start consuming in the background.
	startConsumer()

	// Produce messages.
	header := make(map[string]interface{})
	producer := rabbitmq.NewProducer(queueName, queueName, exchangeName)
	for {
		data := map[string]interface{}{
			"taskId": time.Now().Unix(),
		}

		// Report an encoding failure instead of silently publishing an empty
		// body; the loop keeps its two-second pacing either way.
		body, err := json.Marshal(data)
		if err != nil {
			log.Println("marshal error:", err.Error())
		} else if err := producer.Push(body, header); err != nil {
			log.Println("producer error:", err.Error())
		}

		time.Sleep(2 * time.Second)
	}
}

// initQueue declares the queue and binds it to the exchange. Either step
// failing terminates the process via log.Fatalln.
func initQueue() {
	p := rabbitmq.NewProducer(queueName, queueName, exchangeName)

	err := p.AddQueue()
	if err != nil {
		log.Fatalln("rabbitmq add queue error:", err.Error())
	}

	err = p.BindQueue()
	if err != nil {
		log.Fatalln("rabbitmq bind queue error:", err.Error())
	}
}

func startConsumer() {
	goWithRecover(func() {
		for {
			<-rabbitmq.MqDone
			consumeCiCd()
			time.Sleep(2 * time.Second)
		}
	})
}

// consumeCiCd opens a manual-ack consumer on queueName and, in a
// panic-protected goroutine, prints every delivery's headers and body and
// acknowledges it. If the consumer cannot be started the error is logged and
// the function returns without spawning anything.
func consumeCiCd() {
	deliveries, err := rabbitmq.NewConsumer(queueName).ConsumeAck()
	if err != nil {
		log.Println("consume error:", err.Error())
		return
	}

	log.Println("Start Consume")

	// The range ends when the delivery channel is closed.
	drain := func() {
		for d := range deliveries {
			fmt.Println(d.Headers, string(d.Body))

			ackErr := d.Ack(false)
			if ackErr == nil {
				continue
			}
			log.Println("ack error:", ackErr.Error())
		}
	}
	goWithRecover(drain)
}

// goWithRecover runs fn in a new goroutine via runSafe, so a panic inside fn
// is recovered and printed instead of crashing the process.
func goWithRecover(fn func()) {
	go runSafe(fn)
}

// runSafe calls fn synchronously. If fn panics, the panic is recovered and the
// panic value is printed to stdout together with the current stack trace;
// runSafe then returns normally.
func runSafe(fn func()) {
	defer func() {
		p := recover()
		if p == nil {
			return
		}
		fmt.Println("panic recover", "panic", p, "stack", string(debug.Stack()))
	}()

	fn()
}