diff --git a/features/fm/alarm.go b/features/fm/alarm.go index 7e857e7..398b11c 100644 --- a/features/fm/alarm.go +++ b/features/fm/alarm.go @@ -16,11 +16,14 @@ import ( "nms_cxy/lib/services" "nms_cxy/omc/config" - "xorm.io/xorm" + "nms_cxy/src/framework/utils/date" + neDataModel "nms_cxy/src/modules/network_data/model" + nmsCXYService "nms_cxy/src/modules/nms_cxy/service" "github.com/go-resty/resty/v2" _ "github.com/go-sql-driver/mysql" "github.com/gorilla/mux" + "xorm.io/xorm" ) const ( @@ -285,6 +288,8 @@ func PostAlarmFromNF(w http.ResponseWriter, r *http.Request) { services.ResponseInternalServerError500DatabaseOperationFailed(w) continue } + // 推送Kafka + ParseAlarmDataPushKafka(alarmData) } else { affected, err := session.Where("ne_type=? and ne_id=? and alarm_id=? and alarm_status=1", alarmData.NeType, alarmData.NeId, alarmData.AlarmId). Cols("alarm_status", "clear_type", "clear_time"). @@ -294,6 +299,8 @@ func PostAlarmFromNF(w http.ResponseWriter, r *http.Request) { services.ResponseInternalServerError500DatabaseOperationFailed(w) continue } + // 推送Kafka + ParseAlarmDataPushKafka(alarmData) } log.Trace("alarmData:", alarmData) var currentSeq string @@ -354,6 +361,8 @@ func PostAlarmFromNF(w http.ResponseWriter, r *http.Request) { } } session.Commit() + // 推送Kafka + ParseAlarmDataPushKafka(alarmData) // for alarm forward time format alarmData.EventTime = eventTime } else { @@ -466,6 +475,8 @@ func PostAlarmFromNF(w http.ResponseWriter, r *http.Request) { log.Error("Failed to insert alarm_log:", err) } session.Commit() + // 推送Kafka + ParseAlarmDataPushKafka(alarmData) } if config.GetYamlConfig().Alarm.ForwardAlarm { if err = AlarmEmailForward(&alarmData); err != nil { @@ -778,3 +789,24 @@ func GetAlarmFromNF(w http.ResponseWriter, r *http.Request) { } services.ResponseStatusOK204NoContent(w) } + +// ParseAlarmDataPushKafka 处理告警数据后推送kafka +func ParseAlarmDataPushKafka(alarmData Alarm) { + neAlarm := neDataModel.Alarm{ + AlarmSeq: fmt.Sprint(alarmData.AlarmSeq), + AlarmTitle: alarmData.AlarmTitle, + AlarmStatus: fmt.Sprint(alarmData.AlarmStatus), + AlarmType: alarmData.AlarmType, + OrigSeverity: alarmData.OrigSeverity, + EventTime: date.ParseStrToDate(alarmData.EventTime, date.YYYY_MM_DD_HH_MM_SS), + ID: alarmData.AlarmId, + AlarmCode: fmt.Sprint(alarmData.AlarmCode), + SpecificProblem: alarmData.SpecificProblem, + ObjectUid: alarmData.ObjectUid, + NeName: alarmData.NeName, + AlarmId: alarmData.ObjectUid, + ObjectName: alarmData.ObjectName, + AddInfo: alarmData.AddInfo, + } + nmsCXYService.NewAlarmImpl.KafkaPush(neAlarm) +}