RabbitMQ 交换器常用三种类型介绍:direct: (默认)直接交换器;fanout: 广播式交换器; topic: 主题交换器


direct: (默认)直接交换器

工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue

生产者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$exchange = 'devops';
$queue = 'devops:cicd';
$route = 'devops:cicd';

$connection = new AMQPStreamConnection('10.0.0.100', '5672', 'guest', 'guest', '/');
$channel = $connection->channel();

$channel->queue_declare($queue, false, true, false, false);
$channel->queue_bind($queue, $exchange, $route);

$data = json_encode(['taskId' => strval(time()), 'name' => 'devops']);
$msg = new AMQPMessage($data);
$ret = $channel->basic_publish($msg, $exchange, $route);
var_dump($ret);

$channel->close();
$connection->close();

消费者

需要加上heartbeat参数,否则会被断开连接

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
use PhpAmqpLib\Connection\AMQPStreamConnection;

$exchange = 'devops';
$queue = 'devops:cicd';
$consumerTag = 'consumer';

$connection = AMQPStreamConnection::create_connection([
    ['host' => '10.0.0.100', 'port' => '5672', 'user' => 'guest', 'password' => 'guest', 'vhost' => '/']
], ['heartbeat' => 10]);
$channel = $connection->channel();

$channel->basic_consume($queue, $consumerTag, false, false, false, false, 'processMessage');
register_shutdown_function('shutdown', $channel, $connection);
$channel->consume();

function processMessage($message)
{
    echo "\n--------\n";
    echo $message->body;
    echo "\n--------\n";

    $message->ack();

    // Send a message with the string "quit" to cancel the consumer.
    if ($message->body === 'quit') {
        $message->getChannel()->basic_cancel($message->getConsumerTag());
    }
}

function shutdown($channel, $connection)
{
    $channel->close();
    $connection->close();
}

fanout: 广播式交换器

不管消息的ROUTING_KEY设置为什么,Exchange都会将消息转发给所有绑定的Queue(例如注册后需要发送欢迎短信和邮件,将注册行为广播至短信和邮件)

生产者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$exchange = 'devops.register';
$queueSms = 'devops.register.sms';
$queueEmail = 'devops.register.email';

$connection = new AMQPStreamConnection('10.0.0.100', '5672', 'guest', 'guest', '/');
$channel = $connection->channel();

//注册短信消费者
$channel->queue_declare($queueSms, false, false, false, false);
$channel->queue_bind($queueSms, $exchange);

//注册邮件消费者
$channel->queue_declare($queueEmail, false, false, false, false);
$channel->queue_bind($queueEmail, $exchange);

$data = json_encode(['taskId' => strval(time()), 'name' => 'devops']);
$msg = new AMQPMessage($data);
$ret = $channel->basic_publish($msg, $exchange);
var_dump($ret);

$channel->close();
$connection->close();

topic: 主题交换器

工作方式类似于组播,Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列(模糊匹配) (例如日志分级,一个消费者记录全量日志,一个消费者记录Error级别日志)

生产者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$exchange = 'devops.logs';
$queueAll = 'devops.logs.all';
$queueError = 'devops.logs.error';

$connection = new AMQPStreamConnection('10.0.0.100', '5672', 'guest', 'guest', '/');
$channel = $connection->channel();

$num = rand(0, 10);
if ($num % 3 == 0) {
    $level = 'error';
} elseif ($num % 3 == 1) {
    $level = 'warning';
} else {
    $level = 'common';
}

//全量日志消费者
$channel->queue_declare($queueAll, false, false, false, false);
$channel->queue_bind($queueAll, $exchange, 'devops.logs.*');

//Error日志消费者
$channel->queue_declare($queueError, false, false, false, false);
$channel->queue_bind($queueError, $exchange, 'devops.logs.error');

$data = json_encode(['taskId' => strval(time()), 'name' => 'devops']);
$msg = new AMQPMessage($data);
$ret = $channel->basic_publish($msg, $exchange, 'devops.logs.' . $level);
var_dump($ret);

$channel->close();
$connection->close();