From 3b601736b3537a341f9324ed81a94939f7e08e12 Mon Sep 17 00:00:00 2001 From: TsMask <340112800@qq.com> Date: Tue, 19 Mar 2024 11:22:44 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E5=9C=A8=E5=91=8A=E8=AD=A6=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E4=B8=8A=E6=8A=A5=E6=97=B6=E8=BF=9B=E8=A1=8C=E6=8E=A8?= =?UTF-8?q?=E9=80=81kafka?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- features/fm/alarm.go | 33 +++++++++++++++++++++++ src/modules/nms_cxy/service/alarm.impl.go | 4 +-- 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/features/fm/alarm.go b/features/fm/alarm.go index 8887bf1..3b6a251 100644 --- a/features/fm/alarm.go +++ b/features/fm/alarm.go @@ -15,6 +15,9 @@ import ( "nms_nbi/lib/log" "nms_nbi/lib/services" "nms_nbi/restagent/config" + "nms_nbi/src/framework/utils/date" + neDataModel "nms_nbi/src/modules/network_data/model" + nmsCXYService "nms_nbi/src/modules/nms_cxy/service" "xorm.io/xorm" @@ -284,6 +287,9 @@ func PostAlarmFromNF(w http.ResponseWriter, r *http.Request) { services.ResponseInternalServerError500DatabaseOperationFailed(w) continue } + + // + pushKafka(alarmData) } else { affected, err := session.Where("ne_type=? and ne_id=? and alarm_id=? and alarm_status=1", alarmData.NeType, alarmData.NeId, alarmData.AlarmId). Cols("alarm_status", "clear_type", "clear_time"). @@ -293,6 +299,9 @@ func PostAlarmFromNF(w http.ResponseWriter, r *http.Request) { services.ResponseInternalServerError500DatabaseOperationFailed(w) continue } + + // + pushKafka(alarmData) } log.Trace("alarmData:", alarmData) var currentSeq string @@ -456,6 +465,9 @@ func PostAlarmFromNF(w http.ResponseWriter, r *http.Request) { log.Error("Failed to insert alarm_log:", err) } session.Commit() + + // + pushKafka(alarmData) } if config.GetYamlConfig().Alarm.ForwardAlarm { if err = AlarmEmailForward(&alarmData); err != nil { @@ -470,6 +482,27 @@ func PostAlarmFromNF(w http.ResponseWriter, r *http.Request) { services.ResponseStatusOK200Null(w) } +// pushKafka 告警推送kafka +func pushKafka(alarmData Alarm) { + neAlarm := neDataModel.Alarm{ + AlarmSeq: fmt.Sprint(alarmData.AlarmSeq), + AlarmTitle: alarmData.AlarmTitle, + AlarmStatus: fmt.Sprint(alarmData.AlarmStatus), + AlarmType: alarmData.AlarmType, + OrigSeverity: alarmData.OrigSeverity, + EventTime: date.ParseStrToDate(alarmData.EventTime, date.YYYY_MM_DD_HH_MM_SS), + ID: alarmData.AlarmId, + AlarmCode: fmt.Sprint(alarmData.AlarmCode), + SpecificProblem: alarmData.SpecificProblem, + ObjectUid: alarmData.ObjectUid, + NeName: alarmData.NeName, + AlarmId: alarmData.ObjectUid, + ObjectName: alarmData.ObjectName, + AddInfo: alarmData.AddInfo, + } + nmsCXYService.NewAlarmImpl.KafkaPush(neAlarm) +} + // process alarm get from NFs func GetAlarmFromNF(w http.ResponseWriter, r *http.Request) { log.Debug("GetAlarmFromNF processing... ") diff --git a/src/modules/nms_cxy/service/alarm.impl.go b/src/modules/nms_cxy/service/alarm.impl.go index a2afb42..2f0c39a 100644 --- a/src/modules/nms_cxy/service/alarm.impl.go +++ b/src/modules/nms_cxy/service/alarm.impl.go @@ -65,12 +65,12 @@ func (s *AlarmImpl) parseAlarmData(v neDataModel.Alarm) model.Alarm { AlarmType: v.AlarmType, AlarmLevel: alarmLevel, EventTime: date.ParseDateToStr(v.EventTime, date.YYYY_MM_DD_HH_MM_SS), - AlarmId: parse.Number(v.ID), + AlarmId: parse.Number(v.AlarmId), CauseID: v.AlarmCode, Cause: v.SpecificProblem, NeRUID: v.ObjectUid, NeUserLabel: v.NeName, - ObjectRUID: v.AlarmId, + ObjectRUID: v.ObjectUid, ObjectUserLabel: v.ObjectName, AddInfo: v.AddInfo, }