Files
waf-platform/EdgeAPI/internal/tasks/task_send_messages_plus.go
2026-02-04 20:27:13 +08:00

221 lines
5.6 KiB
Go

//go:build plus
package tasks
import (
"encoding/json"
"errors"
"fmt"
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"github.com/TeaOSLab/EdgeAPI/internal/goman"
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
"github.com/TeaOSLab/EdgeAPI/internal/senders/mediasenders"
"github.com/TeaOSLab/EdgeAPI/internal/utils"
"github.com/TeaOSLab/EdgeCommon/pkg/monitorconfigs"
"github.com/iwind/TeaGo/dbs"
"github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/types"
timeutil "github.com/iwind/TeaGo/utils/time"
"sync"
"time"
)
func init() {
dbs.OnReadyDone(func() {
goman.New(func() {
NewSendMessagesTask(30 * time.Second).Start()
})
})
}
type MessageSendingStat struct {
Timestamp int64
Count int32
}
// SendMessagesTask 发送消息任务
type SendMessagesTask struct {
BaseTask
statMap map[int64]*MessageSendingStat // instanceId => *Stat
statLocker sync.Mutex
ticker *time.Ticker
}
// NewSendMessagesTask 获取一个实例
func NewSendMessagesTask(duration time.Duration) *SendMessagesTask {
return &SendMessagesTask{
statMap: map[int64]*MessageSendingStat{},
ticker: time.NewTicker(duration),
}
}
// Start 启动任务
func (this *SendMessagesTask) Start() {
for range this.ticker.C {
err := this.Loop()
if err != nil {
remotelogs.Error("SEND_MESSAGE_TASK", err.Error())
}
}
}
// Loop 单次循环
func (this *SendMessagesTask) Loop() error {
if !this.IsPrimaryNode() {
return nil
}
var cacheMap = utils.NewCacheMap()
var tx *dbs.Tx
messageTasks, err := models.SharedMessageTaskDAO.FindSendingMessageTasks(tx, 1000)
if err != nil {
return err
}
// 产品名称
productName, err := models.SharedSysSettingDAO.ReadProductName(tx)
if err != nil {
return err
}
if len(productName) == 0 {
productName = teaconst.GlobalProductName
}
for _, task := range messageTasks {
resp, shouldSkip, sendErr := this.send(tx, task, productName, cacheMap)
if shouldSkip {
continue
}
var isOk = sendErr == nil
var resultMap = maps.Map{
"isOk": isOk,
"response": resp,
"error": "",
}
if sendErr != nil {
resultMap["error"] = sendErr.Error()
}
var newStatus = models.MessageTaskStatusSuccess
if !isOk {
newStatus = models.MessageTaskStatusFailed
}
sendErr = models.SharedMessageTaskDAO.UpdateMessageTaskStatus(tx, int64(task.Id), newStatus, resultMap.AsJSON())
if sendErr != nil {
remotelogs.Error("SEND_MESSAGE_TASK", sendErr.Error())
}
// 创建发送记录
if newStatus == models.MessageTaskStatusSuccess || newStatus == models.MessageTaskStatusFailed {
createLogErr := models.SharedMessageTaskLogDAO.CreateLog(tx, int64(task.Id), isOk, resultMap.GetString("error"), resp)
if createLogErr != nil {
remotelogs.Error("SEND_MESSAGE_TASK", createLogErr.Error())
}
}
}
return nil
}
// CheckRate 检查发送频率
func (this *SendMessagesTask) CheckRate(instanceId int64, minutes int32, count int32) bool {
if minutes <= 0 || count <= 0 {
return true
}
this.statLocker.Lock()
defer this.statLocker.Unlock()
var nowTime = time.Now().Unix()
stat, ok := this.statMap[instanceId]
if ok {
if stat.Timestamp < nowTime-int64(minutes*60) { // 时间不够
stat.Count = 1
stat.Timestamp = nowTime
return true
} else if stat.Count < count { // 次数不足
stat.Count++
return true
} else {
return false
}
} else {
this.statMap[instanceId] = &MessageSendingStat{
Timestamp: nowTime,
Count: 1,
}
return true
}
}
func (this *SendMessagesTask) StatMap() map[int64]*MessageSendingStat {
return this.statMap
}
// 发送
func (this *SendMessagesTask) send(tx *dbs.Tx, task *models.MessageTask, productName string, cacheMap *utils.CacheMap) (response string, shouldSkip bool, sendErr error) {
// recipient
recipient, err := models.SharedMessageRecipientDAO.FindEnabledMessageRecipient(tx, int64(task.RecipientId), cacheMap)
if err != nil {
return "", false, err
}
if recipient == nil {
return "", false, errors.New("could not find recipient with id '" + types.String(task.RecipientId) + "'")
}
// instance
var instanceId = int64(task.InstanceId)
if instanceId <= 0 {
instanceId = int64(recipient.InstanceId)
}
if instanceId <= 0 {
return "", false, errors.New("could not find instance with id '" + types.String(instanceId) + "'")
}
var instance *models.MessageMediaInstance
instance, err = models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, instanceId, cacheMap)
if err != nil {
return "", false, err
}
if instance == nil {
return "", false, errors.New("could not find instance with id '" + types.String(instanceId) + "'")
}
var rateConfig = &monitorconfigs.RateConfig{}
if len(instance.Rate) > 0 {
err = json.Unmarshal(instance.Rate, rateConfig)
if err != nil {
return "", false, fmt.Errorf("decode instance rate json failed: %w", err)
} else if rateConfig.Count > 0 && rateConfig.Minutes > 0 && !this.CheckRate(instanceId, rateConfig.Minutes, rateConfig.Count) {
return "", true, nil
}
}
// user
var toUser = task.User
if len(toUser) == 0 && len(recipient.User) > 0 {
toUser = recipient.User
}
// TODO 考虑重用?
media, err := mediasenders.NewMediaInstance(instance.MediaType, instance.Params)
if err != nil {
return "", false, err
}
if media == nil {
return "", false, errors.New("could not create media instance for '" + instance.MediaType + "'")
}
respBytes, err := media.Send(toUser, task.Subject, task.Body, productName, timeutil.FormatTime("Y-m-d H:i:s", int64(task.CreatedAt)))
if err != nil {
return "", false, err
}
return string(respBytes), false, nil
}