Skip to content

Commit 092d2ed

Browse files
committed
实现发送消息到媒介
1 parent 3a21656 commit 092d2ed

24 files changed

Lines changed: 548 additions & 71 deletions

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ require (
1414
github.com/go-sql-driver/mysql v1.5.0
1515
github.com/go-yaml/yaml v2.1.0+incompatible
1616
github.com/golang/protobuf v1.4.2
17-
github.com/iwind/TeaGo v0.0.0-20210325033016-3279bdaa087d
17+
github.com/iwind/TeaGo v0.0.0-20210411134150-ddf57e240c2f
1818
github.com/lionsoul2014/ip2region v2.2.0-release+incompatible
1919
github.com/mozillazg/go-pinyin v0.18.0
2020
github.com/pkg/sftp v1.12.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,8 @@ github.com/iwind/TeaGo v0.0.0-20210302120856-7588e79bdbe3 h1:k9K3HHMmkF7HYyIHz21
181181
github.com/iwind/TeaGo v0.0.0-20210302120856-7588e79bdbe3/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
182182
github.com/iwind/TeaGo v0.0.0-20210325033016-3279bdaa087d h1:FQTYJmZeCMdwM0Bz+C4h31SDBt04ap6A4JOjm+FfYwk=
183183
github.com/iwind/TeaGo v0.0.0-20210325033016-3279bdaa087d/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
184+
github.com/iwind/TeaGo v0.0.0-20210411134150-ddf57e240c2f h1:r2O8PONj/KiuZjJHVHn7KlCePUIjNtgAmvLfgRafQ8o=
185+
github.com/iwind/TeaGo v0.0.0-20210411134150-ddf57e240c2f/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
184186
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
185187
github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc=
186188
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=

internal/db/models/message_dao.go

Lines changed: 88 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ const (
2525
type MessageType = string
2626

2727
const (
28-
MessageTypeHealthCheckFailed MessageType = "HealthCheckFailed"
29-
MessageTypeHealthCheckNodeUp MessageType = "HealthCheckNodeUp"
30-
MessageTypeHealthCheckNodeDown MessageType = "HealthCheckNodeDown"
31-
MessageTypeNodeInactive MessageType = "NodeInactive"
32-
MessageTypeNodeActive MessageType = "NodeActive"
33-
MessageTypeClusterDNSSyncFailed MessageType = "ClusterDNSSyncFailed"
28+
MessageTypeHealthCheckFailed MessageType = "HealthCheckFailed" // 节点健康检查失败
29+
MessageTypeHealthCheckNodeUp MessageType = "HealthCheckNodeUp" // 因健康检查节点上线
30+
MessageTypeHealthCheckNodeDown MessageType = "HealthCheckNodeDown" // 因健康检查节点下线
31+
MessageTypeNodeInactive MessageType = "NodeInactive" // 节点不活跃
32+
MessageTypeNodeActive MessageType = "NodeActive" // 节点活跃
33+
MessageTypeClusterDNSSyncFailed MessageType = "ClusterDNSSyncFailed" // DNS同步失败
3434
MessageTypeSSLCertExpiring MessageType = "SSLCertExpiring" // SSL证书即将过期
3535
MessageTypeSSLCertACMETaskFailed MessageType = "SSLCertACMETaskFailed" // SSL证书任务执行失败
3636
MessageTypeSSLCertACMETaskSuccess MessageType = "SSLCertACMETaskSuccess" // SSL证书任务执行成功
@@ -60,7 +60,7 @@ func init() {
6060
})
6161
}
6262

63-
// 启用条目
63+
// EnableMessage 启用条目
6464
func (this *MessageDAO) EnableMessage(tx *dbs.Tx, id int64) error {
6565
_, err := this.Query(tx).
6666
Pk(id).
@@ -69,7 +69,7 @@ func (this *MessageDAO) EnableMessage(tx *dbs.Tx, id int64) error {
6969
return err
7070
}
7171

72-
// 禁用条目
72+
// DisableMessage 禁用条目
7373
func (this *MessageDAO) DisableMessage(tx *dbs.Tx, id int64) error {
7474
_, err := this.Query(tx).
7575
Pk(id).
@@ -78,7 +78,7 @@ func (this *MessageDAO) DisableMessage(tx *dbs.Tx, id int64) error {
7878
return err
7979
}
8080

81-
// 查找启用中的条目
81+
// FindEnabledMessage 查找启用中的条目
8282
func (this *MessageDAO) FindEnabledMessage(tx *dbs.Tx, id int64) (*Message, error) {
8383
result, err := this.Query(tx).
8484
Pk(id).
@@ -90,20 +90,60 @@ func (this *MessageDAO) FindEnabledMessage(tx *dbs.Tx, id int64) (*Message, erro
9090
return result.(*Message), err
9191
}
9292

93-
// 创建集群消息
94-
func (this *MessageDAO) CreateClusterMessage(tx *dbs.Tx, clusterId int64, messageType MessageType, level string, body string, paramsJSON []byte) error {
95-
_, err := this.createMessage(tx, clusterId, 0, messageType, level, body, paramsJSON)
96-
return err
93+
// CreateClusterMessage 创建集群消息
94+
func (this *MessageDAO) CreateClusterMessage(tx *dbs.Tx, clusterId int64, messageType MessageType, level string, subject string, body string, paramsJSON []byte) error {
95+
_, err := this.createMessage(tx, clusterId, 0, messageType, level, subject, body, paramsJSON)
96+
if err != nil {
97+
return err
98+
}
99+
100+
// 发送给媒介接收人
101+
err = SharedMessageTaskDAO.CreateMessageTasks(tx, MessageTaskTarget{
102+
ClusterId: clusterId,
103+
NodeId: 0,
104+
ServerId: 0,
105+
}, messageType, subject, body)
106+
if err != nil {
107+
return err
108+
}
109+
110+
return nil
97111
}
98112

99-
// 创建节点消息
100-
func (this *MessageDAO) CreateNodeMessage(tx *dbs.Tx, clusterId int64, nodeId int64, messageType MessageType, level string, body string, paramsJSON []byte) error {
101-
_, err := this.createMessage(tx, clusterId, nodeId, messageType, level, body, paramsJSON)
102-
return err
113+
// CreateNodeMessage 创建节点消息
114+
func (this *MessageDAO) CreateNodeMessage(tx *dbs.Tx, clusterId int64, nodeId int64, messageType MessageType, level string, subject string, body string, paramsJSON []byte) error {
115+
_, err := this.createMessage(tx, clusterId, nodeId, messageType, level, subject, body, paramsJSON)
116+
if err != nil {
117+
return err
118+
}
119+
120+
// 发送给媒介接收人 - 集群
121+
err = SharedMessageTaskDAO.CreateMessageTasks(tx, MessageTaskTarget{
122+
ClusterId: clusterId,
123+
NodeId: 0,
124+
ServerId: 0,
125+
}, messageType, subject, body)
126+
if err != nil {
127+
return err
128+
}
129+
130+
// 发送给媒介接收人 - 节点
131+
if nodeId > 0 {
132+
err = SharedMessageTaskDAO.CreateMessageTasks(tx, MessageTaskTarget{
133+
ClusterId: clusterId,
134+
NodeId: nodeId,
135+
ServerId: 0,
136+
}, messageType, subject, body)
137+
if err != nil {
138+
return err
139+
}
140+
}
141+
142+
return nil
103143
}
104144

105-
// 创建普通消息
106-
func (this *MessageDAO) CreateMessage(tx *dbs.Tx, adminId int64, userId int64, messageType MessageType, level string, body string, paramsJSON []byte) error {
145+
// CreateMessage 创建普通消息
146+
func (this *MessageDAO) CreateMessage(tx *dbs.Tx, adminId int64, userId int64, messageType MessageType, level string, subject string, body string, paramsJSON []byte) error {
107147
h := md5.New()
108148
h.Write([]byte(body))
109149
h.Write(paramsJSON)
@@ -114,6 +154,14 @@ func (this *MessageDAO) CreateMessage(tx *dbs.Tx, adminId int64, userId int64, m
114154
op.UserId = userId
115155
op.Type = messageType
116156
op.Level = level
157+
158+
subjectRunes := []rune(subject)
159+
if len(subjectRunes) > 100 {
160+
op.Subject = string(subjectRunes[:100]) + "..."
161+
} else {
162+
op.Subject = subject
163+
}
164+
117165
op.Body = body
118166
if len(paramsJSON) > 0 {
119167
op.Params = paramsJSON
@@ -123,10 +171,14 @@ func (this *MessageDAO) CreateMessage(tx *dbs.Tx, adminId int64, userId int64, m
123171
op.Day = timeutil.Format("Ymd")
124172
op.Hash = hash
125173
err := this.Save(tx, op)
126-
return err
174+
if err != nil {
175+
return err
176+
}
177+
178+
return nil
127179
}
128180

129-
// 删除某天之前的消息
181+
// DeleteMessagesBeforeDay 删除某天之前的消息
130182
func (this *MessageDAO) DeleteMessagesBeforeDay(tx *dbs.Tx, dayTime time.Time) error {
131183
day := timeutil.Format("Ymd", dayTime)
132184
_, err := this.Query(tx).
@@ -136,7 +188,7 @@ func (this *MessageDAO) DeleteMessagesBeforeDay(tx *dbs.Tx, dayTime time.Time) e
136188
return err
137189
}
138190

139-
// 计算未读消息数量
191+
// CountUnreadMessages 计算未读消息数量
140192
func (this *MessageDAO) CountUnreadMessages(tx *dbs.Tx, adminId int64, userId int64) (int64, error) {
141193
query := this.Query(tx).
142194
Attr("isRead", false)
@@ -149,7 +201,7 @@ func (this *MessageDAO) CountUnreadMessages(tx *dbs.Tx, adminId int64, userId in
149201
return query.Count()
150202
}
151203

152-
// 列出单页未读消息
204+
// ListUnreadMessages 列出单页未读消息
153205
func (this *MessageDAO) ListUnreadMessages(tx *dbs.Tx, adminId int64, userId int64, offset int64, size int64) (result []*Message, err error) {
154206
query := this.Query(tx).
155207
Attr("isRead", false)
@@ -168,7 +220,7 @@ func (this *MessageDAO) ListUnreadMessages(tx *dbs.Tx, adminId int64, userId int
168220
return
169221
}
170222

171-
// 设置消息已读状态
223+
// UpdateMessageRead 设置消息已读状态
172224
func (this *MessageDAO) UpdateMessageRead(tx *dbs.Tx, messageId int64, b bool) error {
173225
if messageId <= 0 {
174226
return errors.New("invalid messageId")
@@ -180,7 +232,7 @@ func (this *MessageDAO) UpdateMessageRead(tx *dbs.Tx, messageId int64, b bool) e
180232
return err
181233
}
182234

183-
// 设置一组消息为已读状态
235+
// UpdateMessagesRead 设置一组消息为已读状态
184236
func (this *MessageDAO) UpdateMessagesRead(tx *dbs.Tx, messageIds []int64, b bool) error {
185237
// 这里我们一个一个更改,因为In语句不容易Prepare,且效率不高
186238
for _, messageId := range messageIds {
@@ -192,7 +244,7 @@ func (this *MessageDAO) UpdateMessagesRead(tx *dbs.Tx, messageIds []int64, b boo
192244
return nil
193245
}
194246

195-
// 设置所有消息为已读
247+
// UpdateAllMessagesRead 设置所有消息为已读
196248
func (this *MessageDAO) UpdateAllMessagesRead(tx *dbs.Tx, adminId int64, userId int64) error {
197249
query := this.Query(tx).
198250
Attr("isRead", false)
@@ -208,7 +260,7 @@ func (this *MessageDAO) UpdateAllMessagesRead(tx *dbs.Tx, adminId int64, userId
208260
return err
209261
}
210262

211-
// 检查消息权限
263+
// CheckMessageUser 检查消息权限
212264
func (this *MessageDAO) CheckMessageUser(tx *dbs.Tx, messageId int64, adminId int64, userId int64) (bool, error) {
213265
if messageId <= 0 || (adminId <= 0 && userId <= 0) {
214266
return false, nil
@@ -225,7 +277,7 @@ func (this *MessageDAO) CheckMessageUser(tx *dbs.Tx, messageId int64, adminId in
225277
}
226278

227279
// 创建消息
228-
func (this *MessageDAO) createMessage(tx *dbs.Tx, clusterId int64, nodeId int64, messageType MessageType, level string, body string, paramsJSON []byte) (int64, error) {
280+
func (this *MessageDAO) createMessage(tx *dbs.Tx, clusterId int64, nodeId int64, messageType MessageType, level string, subject string, body string, paramsJSON []byte) (int64, error) {
229281
h := md5.New()
230282
h.Write([]byte(body))
231283
h.Write(paramsJSON)
@@ -241,6 +293,14 @@ func (this *MessageDAO) createMessage(tx *dbs.Tx, clusterId int64, nodeId int64,
241293
op.NodeId = nodeId
242294
op.Type = messageType
243295
op.Level = level
296+
297+
subjectRunes := []rune(subject)
298+
if len(subjectRunes) > 100 {
299+
op.Subject = string(subjectRunes[:100]) + "..."
300+
} else {
301+
op.Subject = subject
302+
}
303+
244304
op.Body = body
245305
if len(paramsJSON) > 0 {
246306
op.Params = paramsJSON

internal/db/models/message_model.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
package models
22

3-
// 消息通知
3+
// Message 消息通知
44
type Message struct {
55
Id uint64 `field:"id"` // ID
66
AdminId uint32 `field:"adminId"` // 管理员ID
77
UserId uint32 `field:"userId"` // 用户ID
88
ClusterId uint32 `field:"clusterId"` // 集群ID
99
NodeId uint32 `field:"nodeId"` // 节点ID
1010
Level string `field:"level"` // 级别
11+
Subject string `field:"subject"` // 标题
1112
Body string `field:"body"` // 内容
1213
Type string `field:"type"` // 消息类型
1314
Params string `field:"params"` // 额外的参数
@@ -25,6 +26,7 @@ type MessageOperator struct {
2526
ClusterId interface{} // 集群ID
2627
NodeId interface{} // 节点ID
2728
Level interface{} // 级别
29+
Subject interface{} // 标题
2830
Body interface{} // 内容
2931
Type interface{} // 消息类型
3032
Params interface{} // 额外的参数
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package models
2+
3+
import (
4+
"encoding/json"
5+
_ "github.com/go-sql-driver/mysql"
6+
"github.com/iwind/TeaGo/Tea"
7+
"github.com/iwind/TeaGo/dbs"
8+
"github.com/iwind/TeaGo/maps"
9+
)
10+
11+
const (
12+
MessageReceiverStateEnabled = 1 // 已启用
13+
MessageReceiverStateDisabled = 0 // 已禁用
14+
)
15+
16+
type MessageReceiverDAO dbs.DAO
17+
18+
func NewMessageReceiverDAO() *MessageReceiverDAO {
19+
return dbs.NewDAO(&MessageReceiverDAO{
20+
DAOObject: dbs.DAOObject{
21+
DB: Tea.Env,
22+
Table: "edgeMessageReceivers",
23+
Model: new(MessageReceiver),
24+
PkName: "id",
25+
},
26+
}).(*MessageReceiverDAO)
27+
}
28+
29+
var SharedMessageReceiverDAO *MessageReceiverDAO
30+
31+
func init() {
32+
dbs.OnReady(func() {
33+
SharedMessageReceiverDAO = NewMessageReceiverDAO()
34+
})
35+
}
36+
37+
// EnableMessageReceiver 启用条目
38+
func (this *MessageReceiverDAO) EnableMessageReceiver(tx *dbs.Tx, id int64) error {
39+
_, err := this.Query(tx).
40+
Pk(id).
41+
Set("state", MessageReceiverStateEnabled).
42+
Update()
43+
return err
44+
}
45+
46+
// DisableMessageReceiver 禁用条目
47+
func (this *MessageReceiverDAO) DisableMessageReceiver(tx *dbs.Tx, id int64) error {
48+
_, err := this.Query(tx).
49+
Pk(id).
50+
Set("state", MessageReceiverStateDisabled).
51+
Update()
52+
return err
53+
}
54+
55+
// FindEnabledMessageReceiver 查找启用中的条目
56+
func (this *MessageReceiverDAO) FindEnabledMessageReceiver(tx *dbs.Tx, id int64) (*MessageReceiver, error) {
57+
result, err := this.Query(tx).
58+
Pk(id).
59+
Attr("state", MessageReceiverStateEnabled).
60+
Find()
61+
if result == nil {
62+
return nil, err
63+
}
64+
return result.(*MessageReceiver), err
65+
}
66+
67+
// DisableReceivers 禁用一组接收人
68+
func (this *MessageReceiverDAO) DisableReceivers(tx *dbs.Tx, clusterId int64, nodeId int64, serverId int64) error {
69+
return this.Query(tx).
70+
Attr("clusterId", clusterId).
71+
Attr("nodeId", nodeId).
72+
Attr("serverId", serverId).
73+
Set("state", MessageReceiverStateDisabled).
74+
UpdateQuickly()
75+
}
76+
77+
// CreateReceiver 创建接收人
78+
func (this *MessageReceiverDAO) CreateReceiver(tx *dbs.Tx, target MessageTaskTarget, messageType MessageType, params maps.Map, recipientId int64, recipientGroupId int64) (int64, error) {
79+
op := NewMessageReceiverOperator()
80+
op.ClusterId = target.ClusterId
81+
op.NodeId = target.NodeId
82+
op.ServerId = target.ServerId
83+
op.Type = messageType
84+
85+
if params == nil {
86+
params = maps.Map{}
87+
}
88+
paramsJSON, err := json.Marshal(params)
89+
if err != nil {
90+
return 0, err
91+
}
92+
op.Params = paramsJSON
93+
94+
op.RecipientId = recipientId
95+
op.RecipientGroupId = recipientGroupId
96+
op.State = MessageReceiverStateEnabled
97+
return this.SaveInt64(tx, op)
98+
}
99+
100+
// FindAllReceivers 查询接收人
101+
func (this *MessageReceiverDAO) FindAllReceivers(tx *dbs.Tx, target MessageTaskTarget, messageType string) (result []*MessageReceiver, err error) {
102+
query := this.Query(tx)
103+
if len(messageType) > 0 {
104+
query.Attr("type", []string{"*", messageType}) // *表示所有的
105+
}
106+
_, err = query.
107+
Attr("clusterId", target.ClusterId).
108+
Attr("nodeId", target.NodeId).
109+
Attr("serverId", target.ServerId).
110+
State(MessageReceiverStateEnabled).
111+
AscPk().
112+
Slice(&result).
113+
FindAll()
114+
return
115+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package models
2+
3+
import (
4+
_ "github.com/go-sql-driver/mysql"
5+
_ "github.com/iwind/TeaGo/bootstrap"
6+
)

0 commit comments

Comments
 (0)