feat: nbi
This commit is contained in:
@@ -10,6 +10,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"be.ems/features/nbi/redisqueue"
|
||||
"be.ems/lib/config"
|
||||
"be.ems/lib/dborm"
|
||||
"be.ems/lib/global"
|
||||
@@ -476,6 +477,24 @@ func PostAlarmFromNF(w http.ResponseWriter, r *http.Request) {
|
||||
log.Error("Failed to AlarmSMSForward:", err)
|
||||
}
|
||||
}
|
||||
if config.GetYamlConfig().Alarm.RelatedRules.Enabled {
|
||||
relatedId, found := JudgeRelated(&alarmData, session, config.RelationRules.Related)
|
||||
if found {
|
||||
id, err := InsertAlarmRelation(session, alarmData.AlarmId, relatedId, "related")
|
||||
if err != nil {
|
||||
log.Error("Failed to insert alarm relation:", err)
|
||||
}
|
||||
redisqueue.AddAlarmRelationQueue([]string{strconv.FormatInt(id, 10)})
|
||||
}
|
||||
derivedId, found := JudgeDerived(&alarmData, session)
|
||||
if found {
|
||||
id, err := InsertAlarmRelation(session, alarmData.AlarmId, derivedId, "derived")
|
||||
if err != nil {
|
||||
log.Error("Failed to insert alarm relation:", err)
|
||||
}
|
||||
redisqueue.AddAlarmRelationQueue([]string{strconv.FormatInt(id, 10)})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
services.ResponseStatusOK200Null(w)
|
||||
|
||||
78
features/fm/relation.go
Normal file
78
features/fm/relation.go
Normal file
@@ -0,0 +1,78 @@
|
||||
package fm
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"be.ems/lib/config"
|
||||
"xorm.io/xorm"
|
||||
)
|
||||
|
||||
func JudgeDerived(alarm *Alarm, session *xorm.Session) (parentId string, found bool) {
|
||||
for _, rule := range config.RelationRules.Derived {
|
||||
if alarm.AlarmCode == rule.ChildCode {
|
||||
// 查询是否有 parent_code 的告警
|
||||
var parent Alarm
|
||||
has, _ := session.Table("alarm").
|
||||
Where("alarm_code=? AND ne_id=? AND alarm_status=1", rule.ParentCode, alarm.NeId).
|
||||
Get(&parent)
|
||||
if has {
|
||||
return parent.AlarmId, true
|
||||
}
|
||||
}
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
|
||||
func JudgeRelated(alarm *Alarm, session *xorm.Session, rules []config.RelatedRule) (relatedId string, found bool) {
|
||||
for _, rule := range rules {
|
||||
// 1. 检查 ne_type 是否匹配
|
||||
if rule.NeType != "" && alarm.NeType != rule.NeType {
|
||||
continue
|
||||
}
|
||||
// 2. 检查 code 是否在 codes 列表
|
||||
codeMatch := false
|
||||
for _, c := range rule.Codes {
|
||||
if alarm.AlarmCode == c {
|
||||
codeMatch = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !codeMatch {
|
||||
continue
|
||||
}
|
||||
// 3. 查询同一ne_id、codes列表内、时间窗口内的其他告警
|
||||
var brother Alarm
|
||||
parsedTime, err := time.Parse(time.RFC3339, alarm.EventTime)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
timeStart := parsedTime.Add(-time.Duration(rule.TimeWindow) * time.Second)
|
||||
has, _ := session.Table("alarm").
|
||||
Where("ne_id=? AND alarm_code IN (?) AND event_time BETWEEN ? AND ? AND alarm_id!=?",
|
||||
alarm.NeId, rule.Codes, timeStart, alarm.EventTime, alarm.AlarmId).
|
||||
Get(&brother)
|
||||
if has {
|
||||
return brother.AlarmId, true
|
||||
}
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
|
||||
type AlarmRelation struct {
|
||||
Id int64 `xorm:"pk autoincr 'id'"`
|
||||
AlarmId string `xorm:"alarm_id"`
|
||||
RelatedAlarmId string `xorm:"related_alarm_id"`
|
||||
RelationType string `xorm:"relation_type"`
|
||||
CreateAt time.Time `xorm:"create_at"`
|
||||
}
|
||||
|
||||
func InsertAlarmRelation(session *xorm.Session, alarmId, relatedId, relationType string) (int64, error) {
|
||||
relation := &AlarmRelation{
|
||||
AlarmId: alarmId,
|
||||
RelatedAlarmId: relatedId,
|
||||
RelationType: relationType,
|
||||
CreateAt: time.Now(),
|
||||
}
|
||||
_, err := session.Insert(relation)
|
||||
return relation.Id, err // 返回插入的ID
|
||||
}
|
||||
103
features/nbi/redisqueue/queue.go
Normal file
103
features/nbi/redisqueue/queue.go
Normal file
@@ -0,0 +1,103 @@
|
||||
package redisqueue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"be.ems/lib/log"
|
||||
|
||||
redisdb "be.ems/src/framework/database/redis"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// 写入 alarm_relation 消息
|
||||
func AddAlarmRelationQueue(ids []string) error {
|
||||
payload := map[string][]string{"ids": ids}
|
||||
payloadBytes, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
values := map[string]interface{}{
|
||||
"payload": string(payloadBytes),
|
||||
}
|
||||
|
||||
_, err = client.XAdd(ctx, &redis.XAddArgs{
|
||||
Stream: "alarm_relation",
|
||||
Values: values,
|
||||
}).Result()
|
||||
|
||||
// 只保留最新2000条消息
|
||||
client.XTrimMaxLen(context.Background(), "alarm_relation", 2000).Result()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// 写入 nbi_cm 消息
|
||||
func AddNbiCMQueue(ids []string) error {
|
||||
payload := map[string][]string{"ids": ids}
|
||||
payloadBytes, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
values := map[string]interface{}{
|
||||
"payload": string(payloadBytes),
|
||||
}
|
||||
_, err = client.XAdd(ctx, &redis.XAddArgs{
|
||||
Stream: "nbi_cm",
|
||||
Values: values,
|
||||
}).Result()
|
||||
|
||||
// 只保留最新2000条消息
|
||||
client.XTrimMaxLen(context.Background(), "nbi_cm", 2000).Result()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// 写入 nbi_pm 消息
|
||||
func AddNbiPMQueue(neType, id string) error {
|
||||
payload := map[string]string{"neType": neType, "id": id}
|
||||
payloadBytes, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
values := map[string]interface{}{
|
||||
"payload": string(payloadBytes),
|
||||
}
|
||||
_, err = client.XAdd(ctx, &redis.XAddArgs{
|
||||
Stream: "nbi_pm",
|
||||
Values: values,
|
||||
}).Result()
|
||||
|
||||
// 只保留最新2000条消息
|
||||
client.XTrimMaxLen(context.Background(), "nbi_pm", 2000).Result()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// 读取并打印指定 stream 的最新消息
|
||||
func ReadLatest(stream string) error {
|
||||
res, err := client.XRevRangeN(ctx, stream, "+", "-", 1).Result()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, msg := range res {
|
||||
log.Tracef("Stream: %s, ID: %s, Values: %v\n", stream, msg.ID, msg.Values)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var ctx = context.Background()
|
||||
var client *redis.Client
|
||||
|
||||
func InitRedisQueue() {
|
||||
// var cancel context.CancelFunc
|
||||
// ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
|
||||
// defer cancel()
|
||||
client = redisdb.RDB("")
|
||||
}
|
||||
|
||||
func CloseRedisQueue() {
|
||||
if err := client.Close(); err != nil {
|
||||
log.Errorf("redis db close: %s", err)
|
||||
}
|
||||
}
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"be.ems/features/nbi/redisqueue"
|
||||
"be.ems/features/pm/kpi_c_report"
|
||||
"be.ems/features/pm/kpi_c_title"
|
||||
"be.ems/lib/config"
|
||||
@@ -338,6 +339,12 @@ func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
// 推送到redis队列
|
||||
err = redisqueue.AddNbiPMQueue(kpiData.NEType, strconv.Itoa(kpiData.ID))
|
||||
if err != nil {
|
||||
log.Warn("Failed to AddNbiPMQueue:", err)
|
||||
}
|
||||
|
||||
services.ResponseStatusOK204NoContent(w)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user