fix: 在告警数据上报时进行推送kafka

This commit is contained in:
TsMask
2024-03-19 11:22:44 +08:00
parent 42a73b9dfa
commit 3b601736b3
2 changed files with 35 additions and 2 deletions

View File

@@ -15,6 +15,9 @@ import (
"nms_nbi/lib/log" "nms_nbi/lib/log"
"nms_nbi/lib/services" "nms_nbi/lib/services"
"nms_nbi/restagent/config" "nms_nbi/restagent/config"
"nms_nbi/src/framework/utils/date"
neDataModel "nms_nbi/src/modules/network_data/model"
nmsCXYService "nms_nbi/src/modules/nms_cxy/service"
"xorm.io/xorm" "xorm.io/xorm"
@@ -284,6 +287,9 @@ func PostAlarmFromNF(w http.ResponseWriter, r *http.Request) {
services.ResponseInternalServerError500DatabaseOperationFailed(w) services.ResponseInternalServerError500DatabaseOperationFailed(w)
continue continue
} }
//
pushKafka(alarmData)
} else { } else {
affected, err := session.Where("ne_type=? and ne_id=? and alarm_id=? and alarm_status=1", alarmData.NeType, alarmData.NeId, alarmData.AlarmId). affected, err := session.Where("ne_type=? and ne_id=? and alarm_id=? and alarm_status=1", alarmData.NeType, alarmData.NeId, alarmData.AlarmId).
Cols("alarm_status", "clear_type", "clear_time"). Cols("alarm_status", "clear_type", "clear_time").
@@ -293,6 +299,9 @@ func PostAlarmFromNF(w http.ResponseWriter, r *http.Request) {
services.ResponseInternalServerError500DatabaseOperationFailed(w) services.ResponseInternalServerError500DatabaseOperationFailed(w)
continue continue
} }
//
pushKafka(alarmData)
} }
log.Trace("alarmData:", alarmData) log.Trace("alarmData:", alarmData)
var currentSeq string var currentSeq string
@@ -456,6 +465,9 @@ func PostAlarmFromNF(w http.ResponseWriter, r *http.Request) {
log.Error("Failed to insert alarm_log:", err) log.Error("Failed to insert alarm_log:", err)
} }
session.Commit() session.Commit()
//
pushKafka(alarmData)
} }
if config.GetYamlConfig().Alarm.ForwardAlarm { if config.GetYamlConfig().Alarm.ForwardAlarm {
if err = AlarmEmailForward(&alarmData); err != nil { if err = AlarmEmailForward(&alarmData); err != nil {
@@ -470,6 +482,27 @@ func PostAlarmFromNF(w http.ResponseWriter, r *http.Request) {
services.ResponseStatusOK200Null(w) services.ResponseStatusOK200Null(w)
} }
// pushKafka 告警推送kafka
func pushKafka(alarmData Alarm) {
neAlarm := neDataModel.Alarm{
AlarmSeq: fmt.Sprint(alarmData.AlarmSeq),
AlarmTitle: alarmData.AlarmTitle,
AlarmStatus: fmt.Sprint(alarmData.AlarmStatus),
AlarmType: alarmData.AlarmType,
OrigSeverity: alarmData.OrigSeverity,
EventTime: date.ParseStrToDate(alarmData.EventTime, date.YYYY_MM_DD_HH_MM_SS),
ID: alarmData.AlarmId,
AlarmCode: fmt.Sprint(alarmData.AlarmCode),
SpecificProblem: alarmData.SpecificProblem,
ObjectUid: alarmData.ObjectUid,
NeName: alarmData.NeName,
AlarmId: alarmData.ObjectUid,
ObjectName: alarmData.ObjectName,
AddInfo: alarmData.AddInfo,
}
nmsCXYService.NewAlarmImpl.KafkaPush(neAlarm)
}
// process alarm get from NFs // process alarm get from NFs
func GetAlarmFromNF(w http.ResponseWriter, r *http.Request) { func GetAlarmFromNF(w http.ResponseWriter, r *http.Request) {
log.Debug("GetAlarmFromNF processing... ") log.Debug("GetAlarmFromNF processing... ")

View File

@@ -65,12 +65,12 @@ func (s *AlarmImpl) parseAlarmData(v neDataModel.Alarm) model.Alarm {
AlarmType: v.AlarmType, AlarmType: v.AlarmType,
AlarmLevel: alarmLevel, AlarmLevel: alarmLevel,
EventTime: date.ParseDateToStr(v.EventTime, date.YYYY_MM_DD_HH_MM_SS), EventTime: date.ParseDateToStr(v.EventTime, date.YYYY_MM_DD_HH_MM_SS),
AlarmId: parse.Number(v.ID), AlarmId: parse.Number(v.AlarmId),
CauseID: v.AlarmCode, CauseID: v.AlarmCode,
Cause: v.SpecificProblem, Cause: v.SpecificProblem,
NeRUID: v.ObjectUid, NeRUID: v.ObjectUid,
NeUserLabel: v.NeName, NeUserLabel: v.NeName,
ObjectRUID: v.AlarmId, ObjectRUID: v.ObjectUid,
ObjectUserLabel: v.ObjectName, ObjectUserLabel: v.ObjectName,
AddInfo: v.AddInfo, AddInfo: v.AddInfo,
} }