rabbitmq消息确认和持久化

消息确认:

消费消息确认模式有:

  • AcknowledgeMode.NONE:自动确认
  • AcknowledgeMode.AUTO:根据情况确认
  • AcknowledgeMode.MANUAL:手动确认
  • 1、可以自动创建队列,也可以手动创建队列,如果自动创建队列,那么是谁负责创建队列呢?是生产者?还是消费者?
    如果队列不存在,当然消费者不会收到任何的消息。但是如果队列不存在,那么生产者发送的消息就会丢失。所以,为了数据不丢失,消费者和生产者都可以创建队列。那么如果创建一个已经存在的队列呢?那么不会有任何的影响。需要注意的是没有任何的影响,也就是说第二次创建如果参数和第一次不一样,那么该操作虽然成功,但是队列属性并不会改变。队列对于负载均衡的处理是完美的。对于多个消费者来说,RabbitMQ使用轮询的方式均衡的发送给不同的消费者。

    2、RabbitMQ的消息确认机制

    默认情况下,如果消息已经被某个消费者正确的接收到了,那么该消息就会被从队列中移除。当然也可以让同一个消息发送到很多的消费者。

    如果一个队列没有消费者,那么,如果这个队列有数据到达,那么这个数据会被缓存,不会被丢弃。当有消费者时,这个数据会被立即发送到这个消费者,这个数据被消费者正确收到时,这个数据就被从队列中删除。

    那么什么是正确收到呢?通过ack。每个消息都要被acknowledged(确认,ack)。我们可以显示的在程序中去ack,也可以自动的ack。如果有数据没有被ack,那么:

    RabbitMQ Server会把这个信息发送到下一个消费者。

    如果这个app有bug,忘记了ack,那么RabbitMQServer不会再发送数据给它,因为Server认为这个消费者处理能力有限。

    而且ack的机制可以起到限流的作用(Benefitto throttling):在消费者处理完成数据后发送ack,甚至在额外的延时后发送ack,将有效的均衡消费者的负载。

    消息持久化:

rabbitmq生产与消费php代码剖析

RabbitMQ常用的交换器类型有fanout、direct、topic、headers这四种

fanout(广播)

它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中,转发消息是最快的。

direct(定向)

direct类型的交换器路由规则也很简单,它会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中。

topic(通配符)

前面讲到direct类型的交换器路由规则是完全匹配BindingKey和RoutingKey,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。topic类型的交换器在匹配规则上进行了扩展,它与direct类型的交换器相似,也是将消息路由到BindingKey和RoutingKey相匹配的队列中,但这里的匹配规则有些不同,它约定:

(一)RoutingKey为一个点好“.”分隔的字符串(被点号“.”分隔开的每一段独立的字符串称为一个单词),如“com.rabbitmq.client”、“com.hidden.client”。

(二)BindingKey和RoutingKey一样也是点号“.”分隔的字符串。

(三)BindingKey中可以存在两种特殊字符串“*”和“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配零个或多个单词。

headers

headers类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。在绑定队列和交换器时指定一组键值对,当发送消息到交换器时,RabbitMQ会获取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在

匹配规则x-match有下列两种类型:

x-match = all :表示所有的键值对都匹配才能接受到消息

x-match = any :表示只要有键值对匹配就能接受到消息

代码示例send.php

<?php

require 'vendor/autoload.php';
$conn = [
    // Rabbitmq 服务地址
    'host' => '192.168.222.129',
    // Rabbitmq 服务端口
    'port' => '5672',
    // Rabbitmq 帐号
    'login' => 'guest',
    // Rabbitmq 密码
    'password' => 'guest',
    'vhost'=>'/'
];

//创建连接和channel
$conn = new AMQPConnection($conn);
if(!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

// 用来绑定交换机和队列
$routingKey = 'key_1';

$ex = new AMQPExchange($channel);
//  交换机名称
$exchangeName = 'ex1';
$ex->setName($exchangeName);

// 设置交换机类型
$ex->setType(AMQP_EX_TYPE_DIRECT);
// 设置交换机是否持久化消息
$ex->setFlags(AMQP_DURABLE);
$ex->declare();

for($i=0; $i<5; ++$i){
    echo "Send Message:".$ex->publish(date('H:i:s')."用户".$i."注册" , $routingKey )."\n";
}

consume.php 创建队列,阻塞状态消费信息

<?php
/**
 * Des 描述
 * Date 2020/8/24
 * Time  10:32
 * User ycl
 */
require 'vendor/autoload.php';
$conn = [
    // Rabbitmq 服务地址
    'host' => '192.168.222.129',
    // Rabbitmq 服务端口
    'port' => '5672',
    // Rabbitmq 帐号
    'login' => 'guest',
    // Rabbitmq 密码
    'password' => 'guest',
    'vhost'=>'/'
];
try {

//创建连接和channel
    $conn = new AMQPConnection($conn);
    if (!$conn->connect()) {
        die("Cannot connect to the broker!\n");
    }
    $channel = new AMQPChannel($conn);
    $exchangeName = 'ex1';

//创建交换机
    $ex = new AMQPExchange($channel);
    $ex->setName($exchangeName);

    $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
    $ex->setFlags(AMQP_DURABLE); //持久化交换机
    $ex->declare();

//  创建队列
    $queueName = 'queue1';
    $q = new AMQPQueue($channel);
    $q->setName($queueName);
    $q->setFlags(AMQP_DURABLE);//持久化队列
    $q->declareQueue();

// 用于绑定队列和交换机,跟 send.php 中的一致。
    $routingKey = 'key_1';
    $q->bind($exchangeName, $routingKey);

//接收消息
    $q->consume(function ($envelope, $queue) {
        $msg = $envelope->getBody();
        echo $msg . "\n"; //处理消息
    }, AMQP_AUTOACK);//消息自动确认

    $conn->disconnect();
}catch (AMQPException $e){
    echo $e->getMessage();
}

dockrer安装RabbitMq

RabbitMQ 常用的交换器类型有 fanout direct topic headers 这四种

   1  docker pull rabbitmq
   2  进入docker配置页面  docker exec -it 0e6840bd6241 /bin/bash
       执行安装客户端插件:rabbitmq-plugins enable rabbitmq_management
   3  运行容器: 5672用于php链接,15672用于客户端链接 
    docker run -itd --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq

测试:

浏览器输入:http://192.168.222.129:15672     默认用户名guest密码guest

php测试rabbitmq

composer安装扩展

"require": {
    "php-amqplib/php-amqplib": ">=2.6.1"
}

新建send.php 端口号为5672

<?php
/**
 * Des 描述
 * Date 2020/8/24
 * Time  10:31
 * User ycl
 */
require 'vendor/autoload.php';
$conn = [
    // Rabbitmq 服务地址
    'host' => '192.168.222.129',
    // Rabbitmq 服务端口
    'port' => '5672',
    // Rabbitmq 帐号
    'login' => 'guest',
    // Rabbitmq 密码
    'password' => 'guest',
    'vhost'=>'/'
];

//创建连接和channel
$conn = new AMQPConnection($conn);
if(!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

// 用来绑定交换机和队列
$routingKey = 'key_1';

$ex = new AMQPExchange($channel);
//  交换机名称
$exchangeName = 'ex1';
$ex->setName($exchangeName);

// 设置交换机类型
$ex->setType(AMQP_EX_TYPE_DIRECT);
// 设置交换机是否持久化消息
$ex->setFlags(AMQP_DURABLE);
$ex->declare();

for($i=0; $i<5; ++$i){
    echo "Send Message:".$ex->publish(date('H:i:s')."用户".$i."注册" , $routingKey )."\n";
}

新建consume.php 端口号为5672

<?php
/**
 * Des 描述
 * Date 2020/8/24
 * Time  10:32
 * User ycl
 */
require 'vendor/autoload.php';
$conn = [
    // Rabbitmq 服务地址
    'host' => '192.168.222.129',
    // Rabbitmq 服务端口
    'port' => '5672',
    // Rabbitmq 帐号
    'login' => 'guest',
    // Rabbitmq 密码
    'password' => 'guest',
    'vhost'=>'/'
];
try {

//创建连接和channel
    $conn = new AMQPConnection($conn);
    if (!$conn->connect()) {
        die("Cannot connect to the broker!\n");
    }
    $channel = new AMQPChannel($conn);
    $exchangeName = 'ex1';

//创建交换机
    $ex = new AMQPExchange($channel);
    $ex->setName($exchangeName);

    $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
    $ex->setFlags(AMQP_DURABLE); //持久化
    $ex->declare();

//  创建队列
    $queueName = 'queue1';
    $q = new AMQPQueue($channel);
    $q->setName($queueName);
    $q->setFlags(AMQP_DURABLE);
    $q->declareQueue();

// 用于绑定队列和交换机,跟 send.php 中的一致。
    $routingKey = 'key_1';
    $q->bind($exchangeName, $routingKey);

//接收消息
    $q->consume(function ($envelope, $queue) {
        $msg = $envelope->getBody();
        echo $msg . "\n"; //处理消息
    }, AMQP_AUTOACK);

    $conn->disconnect();
}catch (AMQPException $e){
    echo $e->getMessage();
}

执行

先执行 php consume.php

新开窗口执行 php send.php

结果 consume.php执行窗口会有输出 成功