- 🏠 简介
- 🔌 api接口
-
🔧 插件开发
- 介绍
- 插件优势
- 新建插件
- 技术栈
- 插件开发规范
- 插件目录结构
- model 层开发
- service 层开发
- api 接口开发
- menu 菜单开发
-
event 事件钩子开发
- 开发指南
- 常用事件
- SiteInit 站点初始化事件
- AddSiteAfter 站点创建后事件
- MemberRegister 会员注册后事件
- MemberLogin 会员登录后事件
- PayCreate 支付创建事件
- PaySuccess 支付成功事件
- RefundSuccess 退款成功事件
- TransferSuccess 转账成功事件
- BottomNavigation 底部导航事件
- NoticeData 消息模板数据内容事件
- GetQrcodeOfChannel 创建二维码事件
- ExportDataType 导出数据类型事件
- ExportData 导出数据源事件
- GetPosterType 海报类型事件
- GetPosterData 海报数据事件
- ShowCustomer 应用管理加载事件
- GetWechatTransferTradeScene 微信支付转账场景事件
- ThemeColor 主题风格事件
- initWap 手机端初始化加载事件
- dict 数据字典开发
- layout 自定义布局开发
- job 消息队列和计划任务开发
- notice 消息发送开发
- printer 小票打印模板开发
- export 数据导出开发
- diy 自定义组件/页面装修开发
- diy 自定义表单组件开发
- poster 自定义海报开发
- icon 引入图标
- 支付方式开发
- upgrade 插件版本升级
- 打包插件
- 授权信息变更回调通知
- 官网上架
- 📝 二次开发须知
- 👨💻 二次开发指导
-
🎬 二次开发应用插件视频教程
- 二次开发安装视频教程
- 准备工作与创建插件
- 插件目录整体说明
- 插件admin目录
- 插件app目录说明(adminapi、api、验证器)
- 插件app目录(dict、job)
- 插件app目录说明(lang、listener)
- 插件app目录说明(model、service)
- 插件uniapp目录说明
- 插件开发之后台功能开发(代码生成器)
- 插件开发之uniapp功能开发(api)第一节
- 插件开发之uniapp功能开发(api)第二节
- 插件开发之uniapp功能开发(api)第三节
- 插件安装与打包原理
- 消息队列开发
- 计划任务开发
- DIY组件和自定义页面装修开发
- 支付接口开发
- 插件升级包打包流程以及云编译功能
消息队列
消息队列概述
在开发应用程序的时候,你可能需要执行一些任务,例如解析和存储一些大型文件,这些任务在Web 请求期间需要耗费很长时间才能执行完成。这时候我们就可以用消息队列来创建在后台处理的队列任务。 通过将时间密集型任务移至队列,这样你的应用程序可以以极快的速度响应 Web 请求,并为你的程序提供更好的用户体验。
传统的程序执行流程一般是 即时|同步|串行的,在某些场景下,会存在并发低,吞吐量低,响应时间长等问题。在大型系统中,一般会引入消息队列的组件,将流程中部分任务抽离出来放入消息队列,并由专门的消费者作针对性的处理,从而降低系统耦合度,提高系统性能和可用性。
一般来说,可以抽离的任务具有以下的特点:
●允许延后|异步|并行处理 (相对于传统的 即时|同步|串行 的执行方式):
○允许延后:抢购活动时,先快速缓冲有限的参与人数到消息队列,后续再排队处理实际的抢购业务
○允许异步:业务处理过程中的邮件,短信等通知
○允许并行:用户支付成功之后,邮件通知,微信通知,短信通知可以由多个不同的消费者并行执行,通知到达的时间不要求先后顺序。
●允许失败和重试:
○强一致性的业务放入核心流程处理
○无一致性要求或最终一致即可的业务放入队列处理
核心组件
-
BaseJob类:所有任务类的基类,负责任务的执行和错误处理
-
Dispatch类:负责任务的分发和推送
-
具体任务类:继承BaseJob,实现具体的业务逻辑
-
控制器:负责创建和推送任务
-
路由:提供访问入口
系统配置
当前队列有三种驱动方式
| 驱动方式 | 说明 |
|---|---|
| sync | 同步执行,有新队列任务则通过事件 Event 来直接触发执行,不存储任务,直接执行 |
| database | 数据库存储,新队列任务数据存储到数据库,队列执行程序再从数据库中读取任务数据 |
| redis(推荐) | Redis 存储,新队列任务数据存储到 Redis,队列执行程序再从 Redis 中读取任务数据 |
队列配置
先打开项目根目录的.env 文件,将queue下的state改成true(如果.env文件中没有queue配置的话要手动添加)
在项目根目录的 文件中添加或修改以下配置:

Redis配置
队列必须配置redis,打开.env文件,redis下的配置就是redis配置

完整工作流程
控制器调用 -> Dispatch::dispatch() -> Redis队列 -> queue:listen命令 -> BaseJob::fire() -> 具体任务类::doJob()
实现
步骤1:创建任务类
在 niucloud/app/job/ 目录下创建任务消费类:
<?php
namespace app\job;
use core\base\BaseJob;
/**
* 订单延时关闭任务
*/
class OrderClose extends BaseJob
{
/**
* 消费
* @param $order_id 订单id
* @param $close_time 关闭时间
* @return true
*/
protected function doJob(int $order_id, int $close_time)
{
//业务代码
//......
return true;
}
}
步骤2:创建控制器
新增一个控制器 niucloud\app\adminapi\controller\test\JobTest.php,在该控制器中添加 helloWorldJob 方法,实例化刚刚创建的任务类,并调用dispatch函数:
<?php
namespace app\adminapi\controller\test;
use app\job\OrderClose;
use core\base\BaseAdminController;
use think\Response;
/**
* 测试队列任务
*/
class JobTest extends BaseAdminController
{
/**
* 创建消息
* @return Response
*/
public function helloWorldJob()
{
$order_id = 1;//订单id
$close_time = time();//订单关闭时间
$is_pushed = OrderClose::dispatch(['order_id' => $order_id, 'close_time' => $close_time]);
if( $is_pushed !== false ){
echo date('Y-m-d H:i:s') . " Hello World Job is Pushed.";
}else{
echo 'Hello World Job Error.';
}
}
}
dispatch函数的参数介绍
dispatch($action, $data, $secs, $queue_name, $is_async):
$action: 任务消费类内部的方法,非必填,不填写或类型为数组的话的话默认会调用doJob函数,如果是数组的话则视为当前值为data参数
$data: 数据传参,他会将这儿的数组作为可变数量的参数传递给对应函数的给定变量参数
$secs: 延时执行时间(单位为秒),默认为0,支持延时执行和定时执行,如果为0就是普通的异步任务,如果传值则视为当前时间的*秒后延时执行
$queue_name: 当前任务归属的队列名称,如果为新队列,会自动创建,结合命令行使用
$is_async: 默认为true,当前任务是否需要异步执行,最高优先级
步骤3:配置路由
在 niucloud/app/adminapi/route/job.php 中添加路由:
<?php
use think\facade\Route;
Route::group('job', function () {
//获取本地插件
Route::get('test', 'test.JobTest/helloWorldJob');
});
发布任务
在浏览器中访问http://your.project.com/adminapi/job/test,可以看到消息推送成功。

启动队列监听进程
打开终端,切换到项目的 niucloud 目录,执行以下命令启动队列监听:
php think queue:listen
可以看到执行的结果类似如下:
如果需要监听特定队列:
php think queue:listen --queue=queue_name
命令行参数介绍:
php think queue:listen
--queue helloJobQueue #监听的队列的名称
--delay 0 #如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
--memory 128 #该进程允许使用的内存上限,以 M 为单位
--sleep 3 #如果队列中无任务,则多长时间后重新检查,默认为3秒
--tries 0 #如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0
--timeout 60 #work进程允许执行的最长时间,以秒为单位
为了稳定要放在消息队列Supervisor下运行
至此,我们成功地完成了一个消息的 创建 -> 推送 -> 消费 -> 删除 的流程
特别注意:修改传参后需要重启 workerman 才可生效