//go:build plus package tasks import ( "encoding/json" "errors" "fmt" teaconst "github.com/TeaOSLab/EdgeAPI/internal/const" "github.com/TeaOSLab/EdgeAPI/internal/db/models" "github.com/TeaOSLab/EdgeAPI/internal/goman" "github.com/TeaOSLab/EdgeAPI/internal/remotelogs" "github.com/TeaOSLab/EdgeAPI/internal/senders/mediasenders" "github.com/TeaOSLab/EdgeAPI/internal/utils" "github.com/TeaOSLab/EdgeCommon/pkg/monitorconfigs" "github.com/iwind/TeaGo/dbs" "github.com/iwind/TeaGo/maps" "github.com/iwind/TeaGo/types" timeutil "github.com/iwind/TeaGo/utils/time" "sync" "time" ) func init() { dbs.OnReadyDone(func() { goman.New(func() { NewSendMessagesTask(30 * time.Second).Start() }) }) } type MessageSendingStat struct { Timestamp int64 Count int32 } // SendMessagesTask 发送消息任务 type SendMessagesTask struct { BaseTask statMap map[int64]*MessageSendingStat // instanceId => *Stat statLocker sync.Mutex ticker *time.Ticker } // NewSendMessagesTask 获取一个实例 func NewSendMessagesTask(duration time.Duration) *SendMessagesTask { return &SendMessagesTask{ statMap: map[int64]*MessageSendingStat{}, ticker: time.NewTicker(duration), } } // Start 启动任务 func (this *SendMessagesTask) Start() { for range this.ticker.C { err := this.Loop() if err != nil { remotelogs.Error("SEND_MESSAGE_TASK", err.Error()) } } } // Loop 单次循环 func (this *SendMessagesTask) Loop() error { if !this.IsPrimaryNode() { return nil } var cacheMap = utils.NewCacheMap() var tx *dbs.Tx messageTasks, err := models.SharedMessageTaskDAO.FindSendingMessageTasks(tx, 1000) if err != nil { return err } // 产品名称 productName, err := models.SharedSysSettingDAO.ReadProductName(tx) if err != nil { return err } if len(productName) == 0 { productName = teaconst.GlobalProductName } for _, task := range messageTasks { resp, shouldSkip, sendErr := this.send(tx, task, productName, cacheMap) if shouldSkip { continue } var isOk = sendErr == nil var resultMap = maps.Map{ "isOk": isOk, "response": resp, "error": "", } if sendErr != nil { resultMap["error"] = sendErr.Error() } var newStatus = models.MessageTaskStatusSuccess if !isOk { newStatus = models.MessageTaskStatusFailed } sendErr = models.SharedMessageTaskDAO.UpdateMessageTaskStatus(tx, int64(task.Id), newStatus, resultMap.AsJSON()) if sendErr != nil { remotelogs.Error("SEND_MESSAGE_TASK", sendErr.Error()) } // 创建发送记录 if newStatus == models.MessageTaskStatusSuccess || newStatus == models.MessageTaskStatusFailed { createLogErr := models.SharedMessageTaskLogDAO.CreateLog(tx, int64(task.Id), isOk, resultMap.GetString("error"), resp) if createLogErr != nil { remotelogs.Error("SEND_MESSAGE_TASK", createLogErr.Error()) } } } return nil } // CheckRate 检查发送频率 func (this *SendMessagesTask) CheckRate(instanceId int64, minutes int32, count int32) bool { if minutes <= 0 || count <= 0 { return true } this.statLocker.Lock() defer this.statLocker.Unlock() var nowTime = time.Now().Unix() stat, ok := this.statMap[instanceId] if ok { if stat.Timestamp < nowTime-int64(minutes*60) { // 时间不够 stat.Count = 1 stat.Timestamp = nowTime return true } else if stat.Count < count { // 次数不足 stat.Count++ return true } else { return false } } else { this.statMap[instanceId] = &MessageSendingStat{ Timestamp: nowTime, Count: 1, } return true } } func (this *SendMessagesTask) StatMap() map[int64]*MessageSendingStat { return this.statMap } // 发送 func (this *SendMessagesTask) send(tx *dbs.Tx, task *models.MessageTask, productName string, cacheMap *utils.CacheMap) (response string, shouldSkip bool, sendErr error) { // recipient recipient, err := models.SharedMessageRecipientDAO.FindEnabledMessageRecipient(tx, int64(task.RecipientId), cacheMap) if err != nil { return "", false, err } if recipient == nil { return "", false, errors.New("could not find recipient with id '" + types.String(task.RecipientId) + "'") } // instance var instanceId = int64(task.InstanceId) if instanceId <= 0 { instanceId = int64(recipient.InstanceId) } if instanceId <= 0 { return "", false, errors.New("could not find instance with id '" + types.String(instanceId) + "'") } var instance *models.MessageMediaInstance instance, err = models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, instanceId, cacheMap) if err != nil { return "", false, err } if instance == nil { return "", false, errors.New("could not find instance with id '" + types.String(instanceId) + "'") } var rateConfig = &monitorconfigs.RateConfig{} if len(instance.Rate) > 0 { err = json.Unmarshal(instance.Rate, rateConfig) if err != nil { return "", false, fmt.Errorf("decode instance rate json failed: %w", err) } else if rateConfig.Count > 0 && rateConfig.Minutes > 0 && !this.CheckRate(instanceId, rateConfig.Minutes, rateConfig.Count) { return "", true, nil } } // user var toUser = task.User if len(toUser) == 0 && len(recipient.User) > 0 { toUser = recipient.User } // TODO 考虑重用? media, err := mediasenders.NewMediaInstance(instance.MediaType, instance.Params) if err != nil { return "", false, err } if media == nil { return "", false, errors.New("could not create media instance for '" + instance.MediaType + "'") } respBytes, err := media.Send(toUser, task.Subject, task.Body, productName, timeutil.FormatTime("Y-m-d H:i:s", int64(task.CreatedAt))) if err != nil { return "", false, err } return string(respBytes), false, nil }