package genNeStateAlarm import ( "encoding/json" "fmt" "net/http" "strconv" "strings" "time" "be.ems/features/fm" "be.ems/lib/dborm" "be.ems/lib/global" "be.ems/lib/log" "be.ems/restagent/config" "be.ems/src/framework/cron" "github.com/go-resty/resty/v2" ) var NewProcessor = &BarProcessor{ progress: 0, count: 0, } // bar 队列任务处理 type BarProcessor struct { // 任务进度 progress int // 执行次数 count int } type BarParams struct { AlarmID string `json:"alarmID"` AlarmCode int `json:"alarmCode"` AlarmTitle string `json:"alarmTitle"` AlarmType string `json:"alarmType"` OrigSeverity string `json:"origSeverity"` ObjectUID string `json:"objectUID"` ObjectName string `json:"objectName"` ObjectType string `json:"objectType"` SpecificProblem string `json:"specificProblem"` SpecificProblemID string `json:"specificProblemID"` AddInfo string `json:"AddInfo"` Threshold int64 `json:"threshold"` } // type BarParams struct { // Duration int `json:"duration"` // } type Alarm struct { Id int `json:"-" xorm:"pk 'id' autoincr"` AlarmSeq int `json:"alarmSeq"` AlarmId string `json:"alarmId" xorm:"alarm_id"` NeId string `json:"neId"` AlarmCode int `json:"alarmCode"` AlarmTitle string `json:"alarmTitle"` EventTime string `json:"eventTime"` AlarmType string `json:"alarmType"` OrigSeverity string `json:"origSeverity"` PerceivedSeverity string `json:"perceivedSeverity"` PVFlag string `json:"pvFlag" xorm:"pv_flag"` NeName string `json:"neName"` NeType string `json:"neType"` ObjectUid string `json:"objectUid" xorm:"object_uid"` ObjectName string `json:"objectName" xorm:"object_name"` ObjectType string `json:"objectType" xorm:"object_type"` LocationInfo string `json:"locationInfo"` Province string `json:"province"` AlarmStatus int `json:"alarmStatus" xorm:"alarm_status"` SpecificProblem string `json:"specificProblem"` SpecificProblemID string `json:"specificProblemID" xorm:"specific_problem_id"` AddInfo string `json:"addInfo"` // ClearType int `json:"-" xorm:"clear_type"` // 0: Unclear, 1: Auto clear, 2: Manual clear // ClearTime sql.NullTime `json:"-" xorm:"clear_time"` } var client = resty.New() func init() { client. SetTimeout(time.Duration(400 * time.Millisecond)) } func (s *BarProcessor) Execute(data any) (any, error) { var err error s.count++ options := data.(cron.JobData) sysJob := options.SysJob var alarmDefine BarParams err = json.Unmarshal([]byte(sysJob.TargetParams), &alarmDefine) if err != nil { log.Error("Failed to Unmarshal:", err) return nil, err } var nes []dborm.NeInfo _, err = dborm.XormGetAllNeInfo(&nes) if err != nil { log.Error("Failed to get all ne info:", err) return nil, err } succActiveAlarmNum := 0 failActiveAlarmNum := 0 succClearAlarmNum := 0 failClearAlarmNum := 0 for _, ne := range nes { //log.Debug("ne:", ne) sql := fmt.Sprintf("select * from ne_state where ne_type='%s' and ne_id='%s' order by `timestamp` desc limit 1", ne.NeType, ne.NeId) log.Debug("SQL:", sql) neState, err := dborm.XormGetDataBySQL(sql) if err != nil { log.Error("Failed to get ne_state:", err) continue } if len(*neState) == 0 { log.Warn("Not found record in ne_state:") continue } //log.Debug("neState:", *neState) // params := "10000" // alarmDefine, err := dborm.XormGetAlarmDefine(params) // if err != nil { // log.Error("Failed to get alarm_define:", err) // continue // } else if alarmDefine == nil { // log.Error("Not found data from alarm_define") // continue // } // log.Debug("alarmDefine:", alarmDefine) sql = fmt.Sprintf("select * from alarm where alarm_id = '%s' and ne_type='%s' and ne_id = '%s' order by event_time desc limit 1", alarmDefine.AlarmID, ne.NeType, ne.RmUID) alarm, err := dborm.XormGetDataBySQL(sql) if err != nil { log.Error("Failed to get alarm:", err) continue } //log.Debug("alarm:", *alarm) var timestamp string if len(*neState) == 0 { log.Infof("Not found ne_state neType:%s, neId:%s", ne.NeType, ne.NeId) timestamp = ne.UpdateTime } else { timestamp = (*neState)[0]["timestamp"] } // 解析日期时间字符串为时间对象 seconds, err := global.GetSecondsSinceDatetime(timestamp) if err != nil { log.Error("Failed to GetSecondsSinceDatetime:", err) continue } log.Debugf("timestamp:%s seconds:%d", timestamp, seconds) if seconds <= alarmDefine.Threshold { if len(*alarm) == 0 || (*alarm)[0]["alarm_status"] == fm.AlarmStatusClearString { continue } // clear alarm, todo var alarmSeq int = 1 threshold := strconv.FormatInt(alarmDefine.Threshold, 10) SpecificProblem := strings.ReplaceAll(alarmDefine.SpecificProblem, "{threshold}", threshold) locationInfo := fmt.Sprintf("SystemManagement.State: NE heartbeat timestamp=%s,threshold=%v", timestamp, alarmDefine.Threshold) alarmData := &Alarm{ AlarmSeq: alarmSeq, AlarmId: alarmDefine.AlarmID, NeId: ne.RmUID, NeType: ne.NeType, NeName: ne.NeName, Province: ne.Province, PVFlag: ne.PvFlag, AlarmCode: alarmDefine.AlarmCode, AlarmTitle: alarmDefine.AlarmTitle, AlarmType: alarmDefine.AlarmType, AlarmStatus: fm.AlarmStatusClear, OrigSeverity: alarmDefine.OrigSeverity, ObjectUid: alarmDefine.ObjectUID, ObjectName: alarmDefine.ObjectName, ObjectType: alarmDefine.ObjectType, LocationInfo: locationInfo, SpecificProblem: SpecificProblem, SpecificProblemID: alarmDefine.SpecificProblemID, AddInfo: alarmDefine.AddInfo, EventTime: time.Now().Local().Format(time.RFC3339), } alarmArray := &[]Alarm{*alarmData} body, _ := json.Marshal(alarmArray) //log.Debug("body: ", string(body)) var response *resty.Response requestURI := fmt.Sprintf("/api/rest/faultManagement/v1/elementType/%s/objectType/alarms", ne.NeType) restHost := fmt.Sprintf("http://127.0.0.1:%d", config.GetYamlConfig().Rest[0].Port) requestURL := fmt.Sprintf("%s%s", restHost, requestURI) log.Debug("requestURL: POST ", requestURL) response, err = client.R(). EnableTrace(). SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}). SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}). SetBody(body). Post(requestURL) if err != nil { log.Error("Failed to post:", err) failClearAlarmNum++ continue } log.Debug("StatusCode: ", response.StatusCode()) switch response.StatusCode() { case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted: log.Debug("response body:", string(response.Body())) body := new(map[string]interface{}) _ = json.Unmarshal(response.Body(), &body) succClearAlarmNum++ default: log.Debug("response body:", string(response.Body())) body := new(map[string]interface{}) _ = json.Unmarshal(response.Body(), &body) failClearAlarmNum++ } } else { var alarmSeq int = 1 if len(*alarm) > 0 && (*alarm)[0]["alarm_status"] == fm.AlarmStatusActiveString { log.Info("System state alarm has exist") continue } threshold := strconv.FormatInt(alarmDefine.Threshold, 10) SpecificProblem := strings.ReplaceAll(alarmDefine.SpecificProblem, "{threshold}", threshold) locationInfo := fmt.Sprintf("SystemManagement.State: NE heartbeat timestamp=%s,threshold=%v", timestamp, alarmDefine.Threshold) alarmData := &Alarm{ AlarmSeq: alarmSeq, AlarmId: alarmDefine.AlarmID, NeId: ne.RmUID, NeType: ne.NeType, NeName: ne.NeName, Province: ne.Province, PVFlag: ne.PvFlag, AlarmCode: alarmDefine.AlarmCode, AlarmTitle: alarmDefine.AlarmTitle, AlarmType: alarmDefine.AlarmType, AlarmStatus: fm.AlarmStatusActive, OrigSeverity: alarmDefine.OrigSeverity, ObjectUid: alarmDefine.ObjectUID, ObjectName: alarmDefine.ObjectName, ObjectType: alarmDefine.ObjectType, LocationInfo: locationInfo, SpecificProblem: SpecificProblem, SpecificProblemID: alarmDefine.SpecificProblemID, AddInfo: alarmDefine.AddInfo, EventTime: time.Now().Local().Format(time.RFC3339), } alarmArray := &[]Alarm{*alarmData} body, _ := json.Marshal(alarmArray) //log.Debug("body: ", string(body)) var response *resty.Response requestURI := fmt.Sprintf("/api/rest/faultManagement/v1/elementType/%s/objectType/alarms", ne.NeType) restHost := fmt.Sprintf("http://127.0.0.1:%d", config.GetYamlConfig().Rest[0].Port) requestURL := fmt.Sprintf("%s%s", restHost, requestURI) log.Debug("requestURL: POST ", requestURL) response, err = client.R(). EnableTrace(). SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}). SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}). SetBody(body). Post(requestURL) if err != nil { log.Error("Failed to post:", err) failActiveAlarmNum++ continue } log.Debug("StatusCode: ", response.StatusCode()) switch response.StatusCode() { case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted: log.Debug("response body:", string(response.Body())) body := new(map[string]interface{}) _ = json.Unmarshal(response.Body(), &body) succActiveAlarmNum++ default: log.Debug("response body:", string(response.Body())) body := new(map[string]interface{}) _ = json.Unmarshal(response.Body(), &body) failActiveAlarmNum++ } } } // 返回结果,用于记录执行结果 return map[string]any{ "succActiveAlarmNum": succActiveAlarmNum, "failActiveAlarmNum": failActiveAlarmNum, "succClearAlarmNum": succClearAlarmNum, "failClearAlarmNum": failClearAlarmNum, }, nil }