Laravel集成rabbitMQ
1.安装扩展注意mq扩展和PHP版本laravel6和laravel7可以安装mq10
composer require vladimir-yuldashev/laravel-queue-rabbitmq "10.X" --ignore-platform-reqs
2.配置文件 queue配置文件种加入
'connections' => [
'rabbitmq' => [
'driver' => 'rabbitmq',
'queue' => env('RABBITMQ_QUEUE', 'default'),
'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class,
'hosts' => [
[
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
'port' => env('RABBITMQ_PORT', 5672),
'user' => env('RABBITMQ_USER', 'guest'),
'password' => env('RABBITMQ_PASSWORD', 'guest'),
'vhost' => env('RABBITMQ_VHOST', '/'),
],
],
'options' => [
'ssl_options' => [
'cafile' => env('RABBITMQ_SSL_CAFILE', null),
'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
],
],
/*
* Set to "horizon" if you wish to use Laravel Horizon.
*/
'worker' => env('RABBITMQ_WORKER', 'default'),
]
]
3.env文件配置
QUEUE_DRIVER=rabbitmq #配置名称
RABBITMQ_HOST=容器ip地址
RABBITMQ_PORT=5672 #端口
RABBITMQ_USER=admin #mq账号
RABBITMQ_PASSWORD=admin#mq密码
RABBITMQ_VHOST=my_vhost #虚拟机的意思
RABBITMQ_QUEUE=队列名称 #队列名称
修改QUEUE_CONNECTION=rabbitmq
4.封装mq操作类
<?php namespace App\Http\Service; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class RabbitmqService {
//链接mq private static function getConnect() { $config = [ 'host' => env('RABBITMQ_HOST', '127.0.0.1'), 'port' => env('RABBITMQ_PORT', 5672), 'user' => env('RABBITMQ_USER', 'guest'), 'password' => env('RABBITMQ_PASSWORD', 'guest'), 'vhost' => env('RABBITMQ_VHOST', '/'), ]; return new AMQPStreamConnection($config["host"],$config["port"],$config["user"],$config["password"],$config["vhost"]); } /** * @param $queue * @param $messageBody * @param string $exchange * @throws \Exception * 推送 */ public static function push($queue,$messageBody,$exchange = 'router') { $connection = self::getConnect(); $channel = $connection->channel(); $channel->queue_declare($queue,false,true,false,false);
//创建交换机路由策 direct严格模式 工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue $channel->exchange_declare($exchange,'direct',false,true,false); $channel->queue_bind($queue, $exchange); // 队列和交换器绑定 $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)); $channel->basic_publish($message,$exchange); // 推送消息 $channel->close(); $connection->close(); } /** * @param $queue * @param $callback * @param string $exchange * @throws \Exception * 消费 */ public static function pop($queue, $callback, $exchange = 'router') { $connection = self::getConnect(); $channel = $connection->channel(); $message = $channel->basic_get($queue); //取出消息 $res = $callback($message->body); if($res){ $channel->basic_ack($message->getDeliveryTag());//ack确认 } $channel->close(); $connection->close(); return true; } }
5.创建laravel Job类
<?php
namespace App\Jobs;
use App\Http\Service\RabbitmqService;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;
class UpdateProduct implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $productKey;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct($data)
{
$this->delay = 10;//延时10秒看效果如果不延时可不写
//推送修改到对列里
$this->productKey = "L::product::info::".$data->id;
Log::info('L'.$this->productKey);
Log::info('L'.json_encode($data));
RabbitmqService::push('update_queue',$data);
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
//消费对列 update_queue队列名称
RabbitmqService::pop('update_queue',function ($message){
Log::info('L::product::info::------'.$this->productKey);
Log::info('L::product::info::msg------'.json_encode($message));
$product = //这里根据自己的业务执行相应的逻辑返回true 和 false
if (!$product){
return false;
}
return true;
});
}
public function failed(\Exception $exception)
{
print_r($exception->getMessage());
}
}
6.业务调用任务类
$this->dispatch(new UpdateProduct(传递对应的参数));
7.监听队列执行
php artisan queue:work
可使用第三方工具监听队列
https://www.xiaoshu168.com/php/288.html
安装mq案例
https://www.xiaoshu168.com/docker/364.html
如果PHP没有安装amqp扩展参考文章
本文由:xiaoshu168 作者:xiaoshu发表,转载请注明来源!