fix: 优化告警状态检查逻辑,增加连续触发次数管理

This commit is contained in:
TsMask
2025-10-16 11:29:23 +08:00
parent 98da13473d
commit 3b053d7f47
2 changed files with 82 additions and 51 deletions

View File

@@ -3,6 +3,7 @@ package ne_alarm_state_check
import (
"encoding/json"
"fmt"
"sync"
"time"
"be.ems/src/framework/constants"
@@ -24,6 +25,8 @@ var NewProcessor = &NeAlarmStateCheckProcessor{
alarmService: neDataService.NewAlarm,
wsSendService: wsService.NewWSSend,
count: 0,
triggerMax: 3,
triggerCount: sync.Map{},
}
// NeAlarmStateCheckProcessor 网元告警状态检查
@@ -33,6 +36,8 @@ type NeAlarmStateCheckProcessor struct {
alarmService *neDataService.Alarm // 告警信息服务
wsSendService *wsService.WSSend // ws发送服务
count int // 执行次数
triggerMax int64 // 阈值:连续触发次数大于该值才会产生告警
triggerCount sync.Map // 阈值连续触发次数,存储每个事件的触发记录
}
// alarmParams 告警参数
@@ -108,8 +113,30 @@ func (s *NeAlarmStateCheckProcessor) Execute(data any) (any, error) {
// 不在线
if !isOnline && alarmStatus == "" {
// Count consecutive offline detections; a new alarm is raised only after the count reaches the threshold
onlineKey := "ONLINE:" + neInfo.RmUID
if v, ok := s.triggerCount.Load(onlineKey); ok {
count := parse.Number(v)
if count < s.triggerMax {
s.triggerCount.Store(onlineKey, count+1)
continue
}
s.triggerCount.Delete(onlineKey)
} else {
s.triggerCount.Store(onlineKey, 0)
continue
}
// 附加信息
addInfo := params.AddInfo
if addInfo != "" {
addInfo = fmt.Sprintf("%s, NE Connect Failed %s:%d", addInfo, neInfo.IP, neInfo.Port)
} else {
addInfo = fmt.Sprintf("NE Connect Failed %s:%d", neInfo.IP, neInfo.Port)
}
// 进行新增
newAlarm, err := s.alarmNew(neInfo, params)
copyParams := params
copyParams.AddInfo = addInfo
newAlarm, err := s.alarmNew(neInfo, copyParams)
if err != nil {
result[neTypeAndId] = err.Error()
continue
@@ -125,7 +152,7 @@ func (s *NeAlarmStateCheckProcessor) Execute(data any) (any, error) {
}
// alarmClear 清除告警
func (s NeAlarmStateCheckProcessor) alarmClear(neInfo neModel.NeInfo, v neDataModel.Alarm) (neDataModel.Alarm, error) {
func (s *NeAlarmStateCheckProcessor) alarmClear(neInfo neModel.NeInfo, v neDataModel.Alarm) (neDataModel.Alarm, error) {
// 变更告警ID为告警清除ID
v.AlarmId = fmt.Sprintf("%s%d", v.AlarmCode, v.EventTime.UnixMilli())
v.AlarmStatus = "0"
@@ -142,7 +169,7 @@ func (s NeAlarmStateCheckProcessor) alarmClear(neInfo neModel.NeInfo, v neDataMo
}
// alarmNew 新增告警
func (s NeAlarmStateCheckProcessor) alarmNew(neInfo neModel.NeInfo, v alarmParams) (neDataModel.Alarm, error) {
func (s *NeAlarmStateCheckProcessor) alarmNew(neInfo neModel.NeInfo, v alarmParams) (neDataModel.Alarm, error) {
// seq 告警序号
lastSeq := s.alarmService.FindAlarmSeqLast(neInfo.NeType, neInfo.RmUID)

View File

@@ -22,12 +22,6 @@ import (
wsService "be.ems/src/modules/ws/service"
)
var (
triggerMax int64 = 3 // 阈值:连续触发次数大于该值才会产生告警
triggerCount sync.Map // 阈值连续触发次数,存储每个事件的触发记录
triggerWindow time.Duration = 30 * time.Second // 事件触发的时间窗口
)
var NewProcessor = &NeAlarmStateCheckCMDProcessor{
neConfigBackupService: neService.NewNeConfigBackup,
neInfoService: neService.NewNeInfo,
@@ -35,6 +29,8 @@ var NewProcessor = &NeAlarmStateCheckCMDProcessor{
alarmService: neDataService.NewAlarm,
wsSendService: wsService.NewWSSend,
count: 0,
triggerMax: 3,
triggerCount: sync.Map{},
}
// NeAlarmStateCheckCMDProcessor 网元告警内存/CPU/磁盘检查
@@ -45,6 +41,8 @@ type NeAlarmStateCheckCMDProcessor struct {
alarmService *neDataService.Alarm // 告警信息服务
wsSendService *wsService.WSSend // ws发送服务
count int // 执行次数
triggerMax int64 // 阈值:连续触发次数大于该值才会产生告警
triggerCount sync.Map // 阈值连续触发次数,存储每个事件的触发记录
}
// alarmParams 告警参数
@@ -111,7 +109,33 @@ func (s *NeAlarmStateCheckCMDProcessor) Execute(data any) (any, error) {
}
// 检查状态
err := s.serverState(neInfo.ServerState, params.CPUUseGt, params.MemUseGt, params.DiskUseGt)
sysCpuUsage, sysMemUsage, sysDiskUsage := s.serverState(neInfo.ServerState)
// 检查CPU/Memory/Disk使用率
warnMsg := []string{}
if int64(sysCpuUsage) >= params.CPUUseGt {
warnMsg = append(warnMsg, fmt.Sprintf("cpu usage %.2f%%", sysCpuUsage))
}
if int64(sysMemUsage) >= params.MemUseGt {
warnMsg = append(warnMsg, fmt.Sprintf("memory usage %.2f%%", sysMemUsage))
}
if int64(sysDiskUsage) >= params.DiskUseGt {
warnMsg = append(warnMsg, fmt.Sprintf("disk usage %.2f%%", sysDiskUsage))
}
var err error
if len(warnMsg) > 0 {
var count int64
if v, ok := s.triggerCount.Load(neInfo.RmUID); ok {
count = parse.Number(v)
s.triggerCount.Store(neInfo.RmUID, count+1)
} else {
s.triggerCount.Store(neInfo.RmUID, 0)
}
if count >= s.triggerMax {
s.triggerCount.Delete("CLEAR:" + neInfo.RmUID)
s.triggerCount.Delete(neInfo.RmUID)
err = fmt.Errorf("greater than %s", strings.Join(warnMsg, ", "))
}
}
// 附加信息
addInfo := params.AddInfo
@@ -139,6 +163,20 @@ func (s *NeAlarmStateCheckCMDProcessor) Execute(data any) (any, error) {
}
// 活动告警进行清除
if alarmStatus == "1" {
// 重置连续触发次数, 超过阈值才会清除告警
clearKey := "CLEAR:" + neInfo.RmUID
if v, ok := s.triggerCount.Load(clearKey); ok {
count := parse.Number(v)
if count < s.triggerMax {
s.triggerCount.Store(clearKey, count+1)
continue
}
s.triggerCount.Delete(clearKey)
s.triggerCount.Delete(neInfo.RmUID)
} else {
s.triggerCount.Store(clearKey, 0)
continue
}
clearAlarm, err := s.alarmClear(neInfo, alarmIdArr[0])
if err != nil {
result[neTypeAndId] = err.Error()
@@ -147,15 +185,15 @@ func (s *NeAlarmStateCheckCMDProcessor) Execute(data any) (any, error) {
groupID := fmt.Sprintf("%s_%s_%s", wsService.GROUP_ALARM, neInfo.NeType, neInfo.NeId)
s.wsSendService.ByGroupID(groupID, clearAlarm)
result[neTypeAndId] = "alarm clear"
alarmStatus = "" // 标记为未记录再次发起新告警
continue
}
// 未记录
if alarmStatus == "" {
if alarmStatus == "" && addInfo != "" {
// 进行新增
copyParams := params
copyParams.AddInfo = addInfo
newAlarm, err := s.alarmNew(neInfo, copyParams)
triggerCount.Store(neTypeAndId, []time.Time{}) // 重置连续触发次数
s.triggerCount.Store(neTypeAndId, []time.Time{}) // 重置连续触发次数
if err != nil {
result[neTypeAndId] = err.Error()
continue
@@ -171,7 +209,7 @@ func (s *NeAlarmStateCheckCMDProcessor) Execute(data any) (any, error) {
}
// serverState 网元状态
func (s NeAlarmStateCheckCMDProcessor) serverState(state map[string]any, cpuUseGt, memUseGt, diskUseGt int64) error {
func (s *NeAlarmStateCheckCMDProcessor) serverState(state map[string]any) (float64, float64, float64) {
// 网元CPU使用率
var nfCpuUsage float64 = 0
// CPU使用率
@@ -234,45 +272,11 @@ func (s NeAlarmStateCheckCMDProcessor) serverState(state map[string]any, cpuUseG
groupID := fmt.Sprintf("%s_%s_%s", wsService.GROUP_NE_STATE, neState.NeType, neState.NeId)
s.wsSendService.ByGroupID(groupID, neState)
// 检查CPU/Memory/Disk使用率
warnMsg := []string{}
if int64(sysCpuUsage) >= cpuUseGt {
warnMsg = append(warnMsg, fmt.Sprintf("cpu usage %.2f%%", sysCpuUsage))
}
if int64(sysMemUsage) >= memUseGt {
warnMsg = append(warnMsg, fmt.Sprintf("memory usage %.2f%%", sysMemUsage))
}
if int64(sysDiskUsage) >= diskUseGt {
warnMsg = append(warnMsg, fmt.Sprintf("disk usage %.2f%%", sysDiskUsage))
}
if len(warnMsg) > 0 {
currentTime := time.Now()
neTypeAndId := fmt.Sprintf("%s_%s", neState.NeType, neState.NeId)
validTimes := []time.Time{}
if v, ok := triggerCount.Load(neTypeAndId); ok {
times := v.([]time.Time)
// Drop expired records: triggers older than triggerWindow are no longer counted
for _, t := range times {
if currentTime.Sub(t) <= triggerWindow {
validTimes = append(validTimes, t)
}
}
validTimes = append(validTimes, currentTime)
triggerCount.Store(neTypeAndId, validTimes)
} else {
// 事件第一次触发,初始化记录
validTimes = append(validTimes, currentTime)
triggerCount.Store(neTypeAndId, validTimes)
}
if int64(len(validTimes)) >= triggerMax {
return fmt.Errorf("greater than %s", strings.Join(warnMsg, ", "))
}
}
return nil
return sysCpuUsage, sysMemUsage, sysDiskUsage
}
// alarmClear 清除告警
func (s NeAlarmStateCheckCMDProcessor) alarmClear(neInfo neModel.NeInfo, v neDataModel.Alarm) (neDataModel.Alarm, error) {
func (s *NeAlarmStateCheckCMDProcessor) alarmClear(neInfo neModel.NeInfo, v neDataModel.Alarm) (neDataModel.Alarm, error) {
// 变更告警ID为告警清除ID
v.AlarmId = fmt.Sprintf("%s%d", v.AlarmCode, v.EventTime.UnixMilli())
v.AlarmStatus = "0"
@@ -289,7 +293,7 @@ func (s NeAlarmStateCheckCMDProcessor) alarmClear(neInfo neModel.NeInfo, v neDat
}
// alarmNew 新增告警
func (s NeAlarmStateCheckCMDProcessor) alarmNew(neInfo neModel.NeInfo, v alarmParams) (neDataModel.Alarm, error) {
func (s *NeAlarmStateCheckCMDProcessor) alarmNew(neInfo neModel.NeInfo, v alarmParams) (neDataModel.Alarm, error) {
// seq 告警序号
lastSeq := s.alarmService.FindAlarmSeqLast(neInfo.NeType, neInfo.RmUID)