221 lines
5.6 KiB
Go
221 lines
5.6 KiB
Go
//go:build plus
|
|
|
|
package tasks
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
|
|
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
|
"github.com/TeaOSLab/EdgeAPI/internal/goman"
|
|
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
|
|
"github.com/TeaOSLab/EdgeAPI/internal/senders/mediasenders"
|
|
"github.com/TeaOSLab/EdgeAPI/internal/utils"
|
|
"github.com/TeaOSLab/EdgeCommon/pkg/monitorconfigs"
|
|
"github.com/iwind/TeaGo/dbs"
|
|
"github.com/iwind/TeaGo/maps"
|
|
"github.com/iwind/TeaGo/types"
|
|
timeutil "github.com/iwind/TeaGo/utils/time"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
func init() {
|
|
dbs.OnReadyDone(func() {
|
|
goman.New(func() {
|
|
NewSendMessagesTask(30 * time.Second).Start()
|
|
})
|
|
})
|
|
}
|
|
|
|
// MessageSendingStat is the per-instance counter used by
// SendMessagesTask.CheckRate to enforce a sending rate limit.
type MessageSendingStat struct {
	// Timestamp is the Unix time, in seconds, at which the current counting
	// window started.
	Timestamp int64

	// Count is the number of sends allowed so far in the current window.
	Count int32
}
|
|
|
|
// SendMessagesTask 发送消息任务
|
|
type SendMessagesTask struct {
|
|
BaseTask
|
|
|
|
statMap map[int64]*MessageSendingStat // instanceId => *Stat
|
|
statLocker sync.Mutex
|
|
|
|
ticker *time.Ticker
|
|
}
|
|
|
|
// NewSendMessagesTask 获取一个实例
|
|
func NewSendMessagesTask(duration time.Duration) *SendMessagesTask {
|
|
return &SendMessagesTask{
|
|
statMap: map[int64]*MessageSendingStat{},
|
|
ticker: time.NewTicker(duration),
|
|
}
|
|
}
|
|
|
|
// Start 启动任务
|
|
func (this *SendMessagesTask) Start() {
|
|
for range this.ticker.C {
|
|
err := this.Loop()
|
|
if err != nil {
|
|
remotelogs.Error("SEND_MESSAGE_TASK", err.Error())
|
|
}
|
|
}
|
|
}
|
|
|
|
// Loop 单次循环
|
|
func (this *SendMessagesTask) Loop() error {
|
|
if !this.IsPrimaryNode() {
|
|
return nil
|
|
}
|
|
|
|
var cacheMap = utils.NewCacheMap()
|
|
var tx *dbs.Tx
|
|
messageTasks, err := models.SharedMessageTaskDAO.FindSendingMessageTasks(tx, 1000)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// 产品名称
|
|
productName, err := models.SharedSysSettingDAO.ReadProductName(tx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(productName) == 0 {
|
|
productName = teaconst.GlobalProductName
|
|
}
|
|
|
|
for _, task := range messageTasks {
|
|
resp, shouldSkip, sendErr := this.send(tx, task, productName, cacheMap)
|
|
if shouldSkip {
|
|
continue
|
|
}
|
|
var isOk = sendErr == nil
|
|
var resultMap = maps.Map{
|
|
"isOk": isOk,
|
|
"response": resp,
|
|
"error": "",
|
|
}
|
|
if sendErr != nil {
|
|
resultMap["error"] = sendErr.Error()
|
|
}
|
|
var newStatus = models.MessageTaskStatusSuccess
|
|
if !isOk {
|
|
newStatus = models.MessageTaskStatusFailed
|
|
}
|
|
|
|
sendErr = models.SharedMessageTaskDAO.UpdateMessageTaskStatus(tx, int64(task.Id), newStatus, resultMap.AsJSON())
|
|
if sendErr != nil {
|
|
remotelogs.Error("SEND_MESSAGE_TASK", sendErr.Error())
|
|
}
|
|
|
|
// 创建发送记录
|
|
if newStatus == models.MessageTaskStatusSuccess || newStatus == models.MessageTaskStatusFailed {
|
|
createLogErr := models.SharedMessageTaskLogDAO.CreateLog(tx, int64(task.Id), isOk, resultMap.GetString("error"), resp)
|
|
if createLogErr != nil {
|
|
remotelogs.Error("SEND_MESSAGE_TASK", createLogErr.Error())
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// CheckRate 检查发送频率
|
|
func (this *SendMessagesTask) CheckRate(instanceId int64, minutes int32, count int32) bool {
|
|
if minutes <= 0 || count <= 0 {
|
|
return true
|
|
}
|
|
|
|
this.statLocker.Lock()
|
|
defer this.statLocker.Unlock()
|
|
|
|
var nowTime = time.Now().Unix()
|
|
|
|
stat, ok := this.statMap[instanceId]
|
|
if ok {
|
|
if stat.Timestamp < nowTime-int64(minutes*60) { // 时间不够
|
|
stat.Count = 1
|
|
stat.Timestamp = nowTime
|
|
return true
|
|
} else if stat.Count < count { // 次数不足
|
|
stat.Count++
|
|
return true
|
|
} else {
|
|
return false
|
|
}
|
|
} else {
|
|
this.statMap[instanceId] = &MessageSendingStat{
|
|
Timestamp: nowTime,
|
|
Count: 1,
|
|
}
|
|
return true
|
|
}
|
|
}
|
|
|
|
func (this *SendMessagesTask) StatMap() map[int64]*MessageSendingStat {
|
|
return this.statMap
|
|
}
|
|
|
|
// 发送
|
|
func (this *SendMessagesTask) send(tx *dbs.Tx, task *models.MessageTask, productName string, cacheMap *utils.CacheMap) (response string, shouldSkip bool, sendErr error) {
|
|
// recipient
|
|
recipient, err := models.SharedMessageRecipientDAO.FindEnabledMessageRecipient(tx, int64(task.RecipientId), cacheMap)
|
|
if err != nil {
|
|
return "", false, err
|
|
}
|
|
if recipient == nil {
|
|
return "", false, errors.New("could not find recipient with id '" + types.String(task.RecipientId) + "'")
|
|
}
|
|
|
|
// instance
|
|
var instanceId = int64(task.InstanceId)
|
|
if instanceId <= 0 {
|
|
instanceId = int64(recipient.InstanceId)
|
|
}
|
|
if instanceId <= 0 {
|
|
return "", false, errors.New("could not find instance with id '" + types.String(instanceId) + "'")
|
|
}
|
|
|
|
var instance *models.MessageMediaInstance
|
|
instance, err = models.SharedMessageMediaInstanceDAO.FindEnabledMessageMediaInstance(tx, instanceId, cacheMap)
|
|
if err != nil {
|
|
return "", false, err
|
|
}
|
|
if instance == nil {
|
|
return "", false, errors.New("could not find instance with id '" + types.String(instanceId) + "'")
|
|
}
|
|
|
|
var rateConfig = &monitorconfigs.RateConfig{}
|
|
if len(instance.Rate) > 0 {
|
|
err = json.Unmarshal(instance.Rate, rateConfig)
|
|
if err != nil {
|
|
return "", false, fmt.Errorf("decode instance rate json failed: %w", err)
|
|
} else if rateConfig.Count > 0 && rateConfig.Minutes > 0 && !this.CheckRate(instanceId, rateConfig.Minutes, rateConfig.Count) {
|
|
return "", true, nil
|
|
}
|
|
}
|
|
|
|
// user
|
|
var toUser = task.User
|
|
if len(toUser) == 0 && len(recipient.User) > 0 {
|
|
toUser = recipient.User
|
|
}
|
|
|
|
// TODO 考虑重用?
|
|
media, err := mediasenders.NewMediaInstance(instance.MediaType, instance.Params)
|
|
if err != nil {
|
|
return "", false, err
|
|
}
|
|
|
|
if media == nil {
|
|
return "", false, errors.New("could not create media instance for '" + instance.MediaType + "'")
|
|
}
|
|
|
|
respBytes, err := media.Send(toUser, task.Subject, task.Body, productName, timeutil.FormatTime("Y-m-d H:i:s", int64(task.CreatedAt)))
|
|
if err != nil {
|
|
return "", false, err
|
|
}
|
|
|
|
return string(respBytes), false, nil
|
|
}
|