fix: fm告警上报推入kafka

This commit is contained in:
TsMask
2024-07-12 17:42:13 +08:00
parent 6216ffc117
commit d0767105b3

View File

@@ -16,11 +16,14 @@ import (
"nms_cxy/lib/services" "nms_cxy/lib/services"
"nms_cxy/omc/config" "nms_cxy/omc/config"
"xorm.io/xorm" "nms_cxy/src/framework/utils/date"
neDataModel "nms_cxy/src/modules/network_data/model"
nmsCXYService "nms_cxy/src/modules/nms_cxy/service"
"github.com/go-resty/resty/v2" "github.com/go-resty/resty/v2"
_ "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"xorm.io/xorm"
) )
const ( const (
@@ -285,6 +288,8 @@ func PostAlarmFromNF(w http.ResponseWriter, r *http.Request) {
services.ResponseInternalServerError500DatabaseOperationFailed(w) services.ResponseInternalServerError500DatabaseOperationFailed(w)
continue continue
} }
// 推送Kafka
ParseAlarmDataPushKafka(alarmData)
} else { } else {
affected, err := session.Where("ne_type=? and ne_id=? and alarm_id=? and alarm_status=1", alarmData.NeType, alarmData.NeId, alarmData.AlarmId). affected, err := session.Where("ne_type=? and ne_id=? and alarm_id=? and alarm_status=1", alarmData.NeType, alarmData.NeId, alarmData.AlarmId).
Cols("alarm_status", "clear_type", "clear_time"). Cols("alarm_status", "clear_type", "clear_time").
@@ -294,6 +299,8 @@ func PostAlarmFromNF(w http.ResponseWriter, r *http.Request) {
services.ResponseInternalServerError500DatabaseOperationFailed(w) services.ResponseInternalServerError500DatabaseOperationFailed(w)
continue continue
} }
// 推送Kafka
ParseAlarmDataPushKafka(alarmData)
} }
log.Trace("alarmData:", alarmData) log.Trace("alarmData:", alarmData)
var currentSeq string var currentSeq string
@@ -354,6 +361,8 @@ func PostAlarmFromNF(w http.ResponseWriter, r *http.Request) {
} }
} }
session.Commit() session.Commit()
// 推送Kafka
ParseAlarmDataPushKafka(alarmData)
// for alarm forward time format // for alarm forward time format
alarmData.EventTime = eventTime alarmData.EventTime = eventTime
} else { } else {
@@ -466,6 +475,8 @@ func PostAlarmFromNF(w http.ResponseWriter, r *http.Request) {
log.Error("Failed to insert alarm_log:", err) log.Error("Failed to insert alarm_log:", err)
} }
session.Commit() session.Commit()
// 推送Kafka
ParseAlarmDataPushKafka(alarmData)
} }
if config.GetYamlConfig().Alarm.ForwardAlarm { if config.GetYamlConfig().Alarm.ForwardAlarm {
if err = AlarmEmailForward(&alarmData); err != nil { if err = AlarmEmailForward(&alarmData); err != nil {
@@ -778,3 +789,24 @@ func GetAlarmFromNF(w http.ResponseWriter, r *http.Request) {
} }
services.ResponseStatusOK204NoContent(w) services.ResponseStatusOK204NoContent(w)
} }
// ParseAlarmDataPushKafka 处理告警数据后推送kafka
func ParseAlarmDataPushKafka(alarmData Alarm) {
neAlarm := neDataModel.Alarm{
AlarmSeq: fmt.Sprint(alarmData.AlarmSeq),
AlarmTitle: alarmData.AlarmTitle,
AlarmStatus: fmt.Sprint(alarmData.AlarmStatus),
AlarmType: alarmData.AlarmType,
OrigSeverity: alarmData.OrigSeverity,
EventTime: date.ParseStrToDate(alarmData.EventTime, date.YYYY_MM_DD_HH_MM_SS),
ID: alarmData.AlarmId,
AlarmCode: fmt.Sprint(alarmData.AlarmCode),
SpecificProblem: alarmData.SpecificProblem,
ObjectUid: alarmData.ObjectUid,
NeName: alarmData.NeName,
AlarmId: alarmData.ObjectUid,
ObjectName: alarmData.ObjectName,
AddInfo: alarmData.AddInfo,
}
nmsCXYService.NewAlarmImpl.KafkaPush(neAlarm)
}