laravel8中laravel-swoole的扩展不兼容消息队列怎么办?

2022-05-15 0 1,005

下面由laravel教程栏目给大家介绍laravel-swoole消息队列,希望对需要的朋友有所帮助!

这段时间用laravel8+laravel-swoole做项目,可发现laravel-swoole的扩展不兼容消息队列;
laravel8中laravel-swoole的扩展不兼容消息队列怎么办?

思来想去这咋办呢,这咋办呢.咋办那就自己写咯!还好thinkphp-swoole扩展已经兼容了,那不就嘿嘿嘿!
laravel8中laravel-swoole的扩展不兼容消息队列怎么办?

直接上修改的思路和代码!开干!

一种是增加另外启动的命令或者在swoole启动的时候一起启动消息队列进行消费,我这么懒的人一个命令能解决的,绝不写两命令.

首先重写swoole启动命令

<?phpnamespace crmeb\\swoole\\command;use Illuminate\\Support\\Arr;use Swoole\\Process;use SwooleTW\\Http\\Server\\Facades\\Server;use SwooleTW\\Http\\Server\\Manager;use crmeb\\swoole\\server\\InteractsWithQueue;use crmeb\\swoole\\server\\FileWatcher;use Swoole\\Runtime;class HttpServerCommand extends \\SwooleTW\\Http\\Commands\\HttpServerCommand{    use InteractsWithQueue;    /**     * The name and signature of the console command.     *     * @var string     */    protected $signature = 'crmeb:http {action : start|stop|restart|reload|infos}';    /**     * Run swoole_http_server.     */    protected function start()    {        if ($this->isRunning()) {            $this->error('Failed! swoole_http_server process is already running.');            return;        }        $host             = Arr::get($this->config, 'server.host');        $port             = Arr::get($this->config, 'server.port');        $hotReloadEnabled = Arr::get($this->config, 'hot_reload.enabled');        $queueEnabled     = Arr::get($this->config, 'queue.enabled');        $accessLogEnabled = Arr::get($this->config, 'server.access_log');        $coroutineEnable  = Arr::get($this->config, 'coroutine.enable');        $this->info('Starting swoole http server...');        $this->info("Swoole http server started: <http://{$host}:{$port}>");        if ($this->isDaemon()) {            $this->info(                '> (You can run this command to ensure the ' .                'swoole_http_server process is running: ps aux|grep "swoole")'            );        }        $manager = $this->laravel->make(Manager::class);        $server  = $this->laravel->make(Server::class);        if ($accessLogEnabled) {            $this->registerAccessLog();        }        //热更新重写        if ($hotReloadEnabled) {            $manager->addProcess($this->getHotReloadProcessNow($server));        }        //启动消息队列进行消费        if ($queueEnabled) {            $this->prepareQueue($manager);        }        if ($coroutineEnable) {            Runtime::enableCoroutine(true, Arr::get($this->config, 'coroutine.flags', SWOOLE_HOOK_ALL));        }        $manager->run();    }    /**     * @param Server $server     * @return Process|void     */    protected function getHotReloadProcessNow($server)    {        return new Process(function () use ($server) {            $watcher = new FileWatcher(                Arr::get($this->config, 'hot_reload.include', []),                Arr::get($this->config, 'hot_reload.exclude', []),                Arr::get($this->config, 'hot_reload.name', [])            );            $watcher->watch(function () use ($server) {                $server->reload();            });        }, false, 0, true);    }}

InteractsWithQueue 类

<?phpnamespace crmeb\\swoole\\server;use crmeb\\swoole\\queue\\Manager as QueueManager;use SwooleTW\\Http\\Server\\Manager;/** * Trait InteractsWithQueue * @package crmeb\\swoole\\server */trait InteractsWithQueue{    public function prepareQueue(Manager $manager)    {        /** @var QueueManager $queueManager */        $queueManager = $this->laravel->make(QueueManager::class);        $queueManager->attachToServer($manager, $this->output);    }}

Manager类

<?phpnamespace crmeb\\swoole\\queue;use Illuminate\\Contracts\\Container\\Container;use Swoole\\Constant;use Swoole\\Process;use Swoole\\Process\\Pool;use Swoole\\Timer;use Illuminate\\Support\\Arr;use Illuminate\\Queue\\Events\\JobFailed;use Illuminate\\Queue\\Worker;use crmeb\\swoole\\server\\WithContainer;use Illuminate\\Queue\\Jobs\\Job;use function Swoole\\Coroutine\\run;use Illuminate\\Queue\\WorkerOptions;use SwooleTW\\Http\\Server\\Manager as ServerManager;use Illuminate\\Console\\OutputStyle;class Manager{    use WithContainer;    /**     * Container.     *     * @var \\Illuminate\\Contracts\\Container\\Container     */    protected $container;    /**     * @var OutputStyle     */    protected $output;    /**     * @var Closure[]     */    protected $workers = [];    /**     * Manager constructor.     * @param Container $container     */    public function __construct(Container $container)    {        $this->container = $container;    }    /**     * @param ServerManager $server     */    public function attachToServer(ServerManager $server, OutputStyle $output)    {        $this->output = $output;        $this->listenForEvents();        $this->createWorkers();        foreach ($this->workers as $worker) {            $server->addProcess(new Process($worker, false, 0, true));        }    }    /**     * 运行消息队列命令     */    public function run(): void    {        @cli_set_process_title("swoole queue: manager process");        $this->listenForEvents();        $this->createWorkers();        $pool = new Pool(count($this->workers));        $pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, int $workerId) {            $process = $pool->getProcess($workerId);            run($this->workers[$workerId], $process);        });        $pool->start();    }    /**     * 创建执行任务     */    protected function createWorkers()    {        $workers = $this->getConfig('queue.workers', []);        foreach ($workers as $queue => $options) {            if (strpos($queue, '@') !== false) {                [$queue, $connection] = explode('@', $queue);            } else {                $connection = null;            }            $this->workers[] = function (Process $process) use ($options, $connection, $queue) {                @cli_set_process_title("swoole queue: worker process");                /** @var Worker $worker */                $worker = $this->container->make('queue.worker');                /** @var WorkerOptions $option */                $option = $this->container->make(WorkerOptions::class);                $option->sleep = Arr::get($options, "sleep", 3);                $option->maxTries = Arr::get($options, "tries", 0);                $option->timeout = Arr::get($options, "timeout", 60);                $timer = Timer::after($option->timeout * 1000, function () use ($process) {                    $process->exit();                });                $worker->runNextJob($connection, $queue, $option);                Timer::clear($timer);            };        }    }    /**     * 注册事件     */    protected function listenForEvents()    {        $this->container->make('events')->listen(JobFailed::class, function (JobFailed $event) {            $this->writeOutput($event->job);            $this->logFailedJob($event);        });    }    /**     * 记录失败任务     * @param JobFailed $event     */    protected function logFailedJob(JobFailed $event)    {        $this->container['queue.failer']->log(            $event->connection,            $event->job->getQueue(),            $event->job->getRawBody(),            $event->exception        );    }    /**     * Write the status output for the queue worker.     *     * @param Job $job     * @param     $status     */    protected function writeOutput(Job $job, $status)    {        switch ($status) {            case 'starting':                $this->writeStatus($job, 'Processing', 'comment');                break;            case 'success':                $this->writeStatus($job, 'Processed', 'info');                break;            case 'failed':                $this->writeStatus($job, 'Failed', 'error');                break;        }    }    /**     * Format the status output for the queue worker.     *     * @param Job $job     * @param string $status     * @param string $type     * @return void     */    protected function writeStatus(Job $job, $status, $type)    {        $this->output->writeln(sprintf(            "<{$type}>[%s][%s] %s</{$type}> %s",            date('Y-m-d H:i:s'),            $job->getJobId(),            str_pad("{$status}:", 11), $job->getName()        ));    }}

增加CrmebServiceProvider类

<?phpnamespace crmeb\\swoole;use Illuminate\\Contracts\\Debug\\ExceptionHandler;use Illuminate\\Contracts\\Http\\Kernel;use crmeb\\swoole\\command\\HttpServerCommand;use Illuminate\\Queue\\Worker;use SwooleTW\\Http\\HttpServiceProvider;use SwooleTW\\Http\\Middleware\\AccessLog;use SwooleTW\\Http\\Server\\Manager;/** * Class CrmebServiceProvider * @package crmeb\\swoole */class CrmebServiceProvider extends HttpServiceProvider{    /**     * Register manager.     *     * @return void     */    protected function registerManager()    {        $this->app->singleton(Manager::class, function ($app) {            return new Manager($app, 'laravel');        });        $this->app->alias(Manager::class, 'swoole.manager');        $this->app->singleton('queue.worker', function ($app) {            $isDownForMaintenance = function () {                return $this->app->isDownForMaintenance();            };            return new Worker(                $app['queue'],                $app['events'],                $app[ExceptionHandler::class],                $isDownForMaintenance            );        });    }    /**     * Boot websocket routes.     *     * @return void     */    protected function bootWebsocketRoutes()    {        require base_path('vendor/swooletw/laravel-swoole') . '/routes/laravel_routes.php';    }    /**     * Register access log middleware to container.     *     * @return void     */    protected function pushAccessLogMiddleware()    {        $this->app->make(Kernel::class)->pushMiddleware(AccessLog::class);    }    /**     * Register commands.     */    protected function registerCommands()    {        $this->commands([            HttpServerCommand::class,        ]);    }    /**     * Merge configurations.     */    protected function mergeConfigs()    {        $this->mergeConfigFrom(base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_http.php', 'swoole_http');        $this->mergeConfigFrom(base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_websocket.php', 'swoole_websocket');    }    /**     * Publish files of this package.     */    protected function publishFiles()    {        $this->publishes([            base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_http.php' => base_path('config/swoole_http.php'),            base_path('vendor/swooletw/laravel-swoole') . '/config/swoole_websocket.php' => base_path('config/swoole_websocket.php'),            base_path('vendor/swooletw/laravel-swoole') . '/routes/websocket.php' => base_path('routes/websocket.php'),        ], 'laravel-swoole');    }}

然后再把\\crmeb\\swoole\\CrmebServiceProvider::class放入config/app.php中的providers中加载重写了swoole的命令启动方式

配置config/swoole_http.php

return [    'queue'        => [        //是否开启自动消费队列        'enabled' => true,        'workers' => [            //队列名称            'CRMEB' => []        ]    ],];

输入命令:
php artisan crmeb:http restart

swoole启动后就可以自动消费队列了。

相关推荐:最新的五个Laravel视频教程

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

【声明:根据2013年1月30日《计算机软件保护条例》2次修订第17条规定: 为了学习和研究软件内含的设计思想和原理,通过安装、显示、传输或者存 储软件等方式使用软件的,可以不经软件著作权人许可,不向其支付报酬! 鉴于此,也希望大家按此说明研究软件!】
本站所有源码尽量保证原汁原味,如有特殊情况会作出声明及标注,网站资源不做任何二次加密(原版加密除外,不影响程序使用的不会做解密处理),方便您更好的学习参考。 在您的能力范围内,为了大环境的良性发展,请尽可能的选择正版资源。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

即刻码站__国内靠谱的站长资源下载平台 php教程 laravel8中laravel-swoole的扩展不兼容消息队列怎么办? https://www.jike1995.com/36312.html

常见问题
  • 本站所有资源版权均属于原作者所有,这里所提供资源均只能用于参考学习用,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用
查看详情
  • 最常见的情况是下载不完整: 可对比下载完压缩包的与网盘上的容量,若小于网盘提示的容量则是这个原因。这是浏览器下载的bug,建议用百度
查看详情

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务