消息队列
# 创建一个消息队列步聚如下:
- 定义task
//见 modules\system\worker\task\test_task\test_task.go 新文件建在同一目录
package test_task
import (
"context"
"devinggo/modules/system/pkg/worker/glob"
"devinggo/modules/system/pkg/worker/task"
"devinggo/modules/system/worker/consts"
"github.com/hibiken/asynq"
)
type ctestTask struct {
Type string
Payload *glob.Payload
}
type TestTaskData struct { // 传输的数据格式,重要
Name string `json:"name"`
}
func New() *ctestTask {
return &ctestTask{
Type: consts.TEST_TASK, // 任务名称,重要
Payload: &glob.Payload{
//Time: asynq.ProcessIn(gconv.Duration("1s")),
/*
asynq.Retention 执行后保留一段时间删除,唯一
asynq.ProcessIn(time.Second * 5) 等待一段时间后执行 延迟5s执行
asynq.ProcessAt(time.Now().Add(time.Second*10)) 指定时间执行,当前时间+10s执行
*/
Time: asynq.ProcessIn(0),
TaskID: consts.TEST_TASK, // 任务id,可写死,也可以随机,写死后任务同一时间只会执行一条
},
}
}
func (s *ctestTask) GetType() string {
return s.Type
}
func (s *ctestTask) Send(ctx context.Context, data interface{}) error {
s.Payload.Data = data
return task.NewTask(ctx, s)
}
func (s *ctestTask) GetPayload() *glob.Payload {
return s.Payload
}
- 绑定执行任务的worker
// 见 modules\system\worker\server\test_worker.go 新文件建在同一目录
package server
import (
glob2 "devinggo/modules/system/pkg/worker/glob"
"devinggo/modules/system/pkg/worker/server"
"devinggo/modules/system/worker/consts"
"devinggo/modules/system/worker/cron"
"context"
"github.com/hibiken/asynq"
)
var testWorker = &cTestWorker{
Type: consts.TEST_TASK, #任务名称,同上面定义相同,重要
}
type cTestWorker struct {
Type string
}
func init() {
server.Register(testWorker)
}
func (s *cTestWorker) GetType() string {
return s.Type
}
// Execute 执行任务
func (s *cTestWorker) Execute(ctx context.Context, t *asynq.Task) (err error) {
data, err := glob2.GetParamters[cron.TestCronData](ctx, t)
if err != nil {
return err
}
glob2.WithWorkLog().Infof(ctx, `jsonData:%+v`, data)
// todo 执行代码,在这里添加
return
}
- 生产一条消息
testTask := New()
testTask.Send(context.Background(), &TestTaskData{Name: "helloworld"})
上次更新: 2025/01/08, 17:33:46