PHP

首页 -  PHP  -  mq队列Laravel集成rabbitMQ

mq队列Laravel集成rabbitMQ

Laravel集成rabbitMQ

1.安装扩展。注意 mq 扩展与 PHP、Laravel 版本的对应关系:Laravel 6 和 Laravel 7 可以安装 mq 10

composer require vladimir-yuldashev/laravel-queue-rabbitmq "10.X" --ignore-platform-reqs

2.配置文件:在 queue 配置文件中加入

// RabbitMQ entry for the 'connections' array of the queue configuration file.
'connections' => [
    'rabbitmq' => [
        'driver' => 'rabbitmq',
        // Queue name, read from RABBITMQ_QUEUE; falls back to 'default'.
        'queue' => env('RABBITMQ_QUEUE', 'default'),
        // php-amqplib connection class used for this queue connection.
        'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class,

        // List of broker hosts; each entry carries its own credentials and vhost.
        'hosts' => [
            [
                'host' => env('RABBITMQ_HOST', '127.0.0.1'),
                'port' => env('RABBITMQ_PORT', 5672),
                'user' => env('RABBITMQ_USER', 'guest'),
                'password' => env('RABBITMQ_PASSWORD', 'guest'),
                'vhost' => env('RABBITMQ_VHOST', '/'),
            ],
        ],

        'options' => [
            // SSL settings; every value except verify_peer defaults to null when its variable is unset.
            'ssl_options' => [
                'cafile' => env('RABBITMQ_SSL_CAFILE', null),
                'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
                'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
                'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
                'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
            ],
        ],

        /*
         * Set to "horizon" if you wish to use Laravel Horizon.
         */
        'worker' => env('RABBITMQ_WORKER', 'default'),

    ]
]

3.env文件配置

QUEUE_DRIVER=rabbitmq #配置名称
RABBITMQ_HOST=容器ip地址
RABBITMQ_PORT=5672 #端口
RABBITMQ_USER=admin #mq账号
RABBITMQ_PASSWORD=admin #mq密码(注意:# 前必须有空格,否则注释内容会被当作密码的一部分)
RABBITMQ_VHOST=my_vhost #虚拟主机(vhost)
RABBITMQ_QUEUE=队列名称 #队列名称
修改QUEUE_CONNECTION=rabbitmq(Laravel 5.7 及以上版本使用 QUEUE_CONNECTION;QUEUE_DRIVER 仅用于更早的版本)

4.封装mq操作类

<?php
namespace App\Http\Service;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Thin static wrapper around php-amqplib for publishing one message to a
 * queue and fetching one message from a queue.
 */
class RabbitmqService
{
    /**
     * Open a new connection to the RabbitMQ broker.
     *
     * Each setting is taken from the first host entry of the "rabbitmq" queue
     * connection when that entry is configured, so the values survive
     * `php artisan config:cache` (once the configuration is cached, env()
     * no longer sees the values from the .env file). Settings that are not
     * configured fall back to the RABBITMQ_* environment variables and then
     * to the built-in defaults.
     *
     * @return AMQPStreamConnection
     * @throws \Exception
     */
    private static function getConnect()
    {
        $configured = config('queue.connections.rabbitmq.hosts.0');
        if (!is_array($configured)) {
            $configured = [];
        }

        // setting name => [environment variable, default value]
        $fallbacks = [
            'host' => ['RABBITMQ_HOST', '127.0.0.1'],
            'port' => ['RABBITMQ_PORT', 5672],
            'user' => ['RABBITMQ_USER', 'guest'],
            'password' => ['RABBITMQ_PASSWORD', 'guest'],
            'vhost' => ['RABBITMQ_VHOST', '/'],
        ];

        $settings = [];
        foreach ($fallbacks as $name => $fallback) {
            $settings[$name] = $configured[$name] ?? env($fallback[0], $fallback[1]);
        }

        return new AMQPStreamConnection(
            $settings['host'],
            $settings['port'],
            $settings['user'],
            $settings['password'],
            $settings['vhost']
        );
    }

    /**
     * Publish one persistent message to a queue through a direct exchange.
     *
     * The queue and the exchange are declared as durable, and the queue is
     * bound to the exchange using the queue name as routing key, so the
     * message is delivered only to that queue.
     *
     * @param string $queue       Queue name; also used as the routing key.
     * @param mixed  $messageBody Message payload. Non-string values are JSON-encoded.
     * @param string $exchange    Name of the direct exchange to publish to.
     * @throws \InvalidArgumentException When a non-string payload cannot be JSON-encoded.
     * @throws \Exception
     */
    public static function push($queue,$messageBody,$exchange = 'router')
    {
        // AMQPMessage expects a string body; encode anything else explicitly.
        if (!is_string($messageBody)) {
            $encoded = json_encode($messageBody);
            if ($encoded === false) {
                throw new \InvalidArgumentException(
                    'Message body could not be JSON-encoded: ' . json_last_error_msg()
                );
            }
            $messageBody = $encoded;
        }

        $connection = self::getConnect();
        try {
            $channel = $connection->channel();
            $channel->queue_declare($queue, false, true, false, false);
            // A direct exchange delivers a message only to queues whose
            // binding key exactly matches the message's routing key.
            $channel->exchange_declare($exchange, 'direct', false, true, false);
            // Bind and publish with the queue name as routing key; with an empty
            // key every queue bound to this exchange would receive the message.
            $channel->queue_bind($queue, $exchange, $queue);
            $message = new AMQPMessage($messageBody, [
                'content_type' => 'text/plain',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]);
            $channel->basic_publish($message, $exchange, $queue);
            $channel->close();
        } finally {
            // Release the connection even when declaring or publishing fails.
            $connection->close();
        }
    }

    /**
     * Fetch at most one message from a queue and hand its body to a callback.
     *
     * The message is acknowledged only when the callback returns a truthy
     * value. An unacknowledged message is not explicitly rejected here; the
     * channel is simply closed afterwards.
     *
     * @param string   $queue    Queue name to read from.
     * @param callable $callback Receives the message body; return truthy to acknowledge.
     * @param string   $exchange Unused; kept so existing callers keep working.
     * @return bool True when a message was fetched and passed to the callback,
     *              false when the queue was empty.
     * @throws \Exception
     */
    public static function pop($queue, $callback, $exchange = 'router')
    {
        $connection = self::getConnect();
        try {
            $channel = $connection->channel();
            $message = $channel->basic_get($queue);

            // basic_get yields null when the queue is empty.
            if ($message === null) {
                $channel->close();
                return false;
            }

            if ($callback($message->body)) {
                $channel->basic_ack($message->getDeliveryTag());
            }
            $channel->close();
            return true;
        } finally {
            // Release the connection even when the callback or the broker call throws.
            $connection->close();
        }
    }
}

5.创建laravel  Job类

<?php
namespace App\Jobs;
use App\Http\Service\RabbitmqService;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;

/**
 * Queued job that publishes product data to the "update_queue" RabbitMQ queue
 * when it is constructed and consumes one message from that queue when it runs.
 */
class UpdateProduct implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /** @var string Product key built from the product id; used in log lines. */
    protected $productKey;

    /**
     * Create a new job instance and publish the product data to "update_queue".
     *
     * @param object $data Product data; must expose an `id` property and be JSON-encodable.
     * @return void
     * @throws \RuntimeException When $data cannot be JSON-encoded.
     */
    public function __construct($data)
    {
        // Delay the job by 10 seconds to make the effect visible; omit if no delay is wanted.
        $this->delay = 10;
        $this->productKey = "L::product::info::".$data->id;

        // The AMQP message body must be a string, so publish the JSON form of $data.
        $payload = json_encode($data);
        if ($payload === false) {
            throw new \RuntimeException('Product data could not be JSON-encoded: ' . json_last_error_msg());
        }

        Log::info('L'.$this->productKey);
        Log::info('L'.$payload);
        RabbitmqService::push('update_queue', $payload);
    }

    /**
     * Execute the job: consume one message from "update_queue".
     *
     * @return void
     */
    public function handle()
    {
        RabbitmqService::pop('update_queue', function ($message) {
            Log::info('L::product::info::------'.$this->productKey);
            Log::info('L::product::info::msg------'.json_encode($message));

            // TODO: replace with the real business logic for $message. The result
            // must be truthy on success and falsy on failure; only a truthy result
            // makes RabbitmqService::pop acknowledge the message.
            $product = true;

            return (bool) $product;
        });
    }

    /**
     * Record the reason the job failed.
     *
     * Type-hinted as Throwable because newer Laravel versions pass any
     * Throwable (including PHP Error instances) to this method.
     *
     * @param \Throwable $exception
     * @return void
     */
    public function failed(\Throwable $exception)
    {
        // Queue workers usually run unattended, so log instead of printing to stdout.
        Log::error($exception->getMessage());
    }
}

6.业务调用任务类

  // Dispatch the job from business code; replace the placeholder argument with the data to pass to UpdateProduct.
  $this->dispatch(new UpdateProduct(传递对应的参数));

7.监听队列执行

php artisan queue:work

可使用第三方工具监听队列

https://www.xiaoshu168.com/php/288.html

安装mq案例

https://www.xiaoshu168.com/docker/364.html

如果PHP没有安装amqp扩展参考文章

https://www.xiaoshu168.com/docker/366.html

(1)
分享:

本文由:xiaoshu168 作者:xiaoshu发表,转载请注明来源!

标签:

相关阅读