devinggo devinggo
首页
  • 系统介绍
  • 开发环境搭建
  • 快速开始
  • 生产部署
  • 目录结构
  • MVC开发
  • 权限控制
  • 拦截器
  • 定时任务
  • 消息队列
  • websocket服务器
  • 缓存
  • module开发
  • 代码自动生成
  • 命令
前端开发
常见问题
演示
GitHub (opens new window)
首页
  • 系统介绍
  • 开发环境搭建
  • 快速开始
  • 生产部署
  • 目录结构
  • MVC开发
  • 权限控制
  • 拦截器
  • 定时任务
  • 消息队列
  • websocket服务器
  • 缓存
  • module开发
  • 代码自动生成
  • 命令
前端开发
常见问题
演示
GitHub (opens new window)
  • 目录结构
  • MVC开发
  • 权限控制
  • 拦截器
  • 定时任务
  • 消息队列
    • websocket服务器
    • 缓存
    • module开发
    • 代码自动生成
    • 命令
    • 系统开发
    Kai
    2025-01-07
    目录

    消息队列

    # 创建一个消息队列步聚如下:

    1. 定义task
    //见 modules\system\worker\task\test_task\test_task.go 新文件建在同一目录
    package test_task
    
    import (
    	"context"
    	"devinggo/modules/system/pkg/worker/glob"
    	"devinggo/modules/system/pkg/worker/task"
    	"devinggo/modules/system/worker/consts"
    	"github.com/hibiken/asynq"
    )
    
    type ctestTask struct {
    	Type    string
    	Payload *glob.Payload
    }
    
    type TestTaskData struct { // 传输的数据格式,重要
    	Name string `json:"name"`
    }
    
    func New() *ctestTask {
    	return &ctestTask{
    		Type: consts.TEST_TASK, // 任务名称,重要
    		Payload: &glob.Payload{
    			//Time:     asynq.ProcessIn(gconv.Duration("1s")),
    			/*
    				asynq.Retention  执行后保留一段时间删除,唯一
    				asynq.ProcessIn(time.Second * 5)  等待一段时间后执行 延迟5s执行
    				asynq.ProcessAt(time.Now().Add(time.Second*10)) 指定时间执行,当前时间+10s执行
    			*/
    			Time:   asynq.ProcessIn(0),
    			TaskID: consts.TEST_TASK, // 任务id,可写死,也可以随机,写死后任务同一时间只会执行一条
    		},
    	}
    }
    
    func (s *ctestTask) GetType() string {
    	return s.Type
    }
    
    func (s *ctestTask) Send(ctx context.Context, data interface{}) error {
    	s.Payload.Data = data
    	return task.NewTask(ctx, s)
    }
    
    func (s *ctestTask) GetPayload() *glob.Payload {
    	return s.Payload
    }
    
    1. 绑定执行任务的worker
    // 见 modules\system\worker\server\test_worker.go 新文件建在同一目录
    
    package server
    
    import (
    	glob2 "devinggo/modules/system/pkg/worker/glob"
    	"devinggo/modules/system/pkg/worker/server"
    	"devinggo/modules/system/worker/consts"
    	"devinggo/modules/system/worker/cron"
    	"context"
    	"github.com/hibiken/asynq"
    )
    
    var testWorker = &cTestWorker{
    	Type: consts.TEST_TASK,  #任务名称,同上面定义相同,重要
    }
    
    type cTestWorker struct {
    	Type string
    }
    
    func init() {
    	server.Register(testWorker)
    }
    
    func (s *cTestWorker) GetType() string {
    	return s.Type
    }
    
    // Execute 执行任务
    func (s *cTestWorker) Execute(ctx context.Context, t *asynq.Task) (err error) {
    	data, err := glob2.GetParamters[cron.TestCronData](ctx, t)
    	if err != nil {
    		return err
    	}
    	glob2.WithWorkLog().Infof(ctx, `jsonData:%+v`, data)
        // todo 执行代码,在这里添加
    	return
    }
    
    
    1. 生产一条消息
    testTask := New()
    testTask.Send(context.Background(), &TestTaskData{Name: "helloworld"})
    
    上次更新: 2025/01/08, 17:33:46
    定时任务
    websocket服务器

    ← 定时任务 websocket服务器→

    Theme by Vdoing | Copyright © 2025-2025 Kai
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式