@@ -0,0 +1,301 @@
package genNeStateAlarm
import (
"encoding/json"
"fmt"
"net/http"
"time"
"ems.agt/features/fm"
"ems.agt/lib/dborm"
"ems.agt/lib/global"
"ems.agt/lib/log"
"ems.agt/restagent/config"
"github.com/go-resty/resty/v2"
)
var NewProcessor = & BarProcessor {
progress : 0 ,
count : 0 ,
}
// bar 队列任务处理
type BarProcessor struct {
// 任务进度
progress int
// 执行次数
count int
}
type BarParams struct {
Duration int ` json:"duration" `
}
type Alarm struct {
Id int ` json:"-" xorm:"pk 'id' autoincr" `
AlarmSeq int ` json:"alarmSeq" `
AlarmId string ` json:"alarmId" xorm:"alarm_id" `
NeId string ` json:"neId" `
AlarmCode int ` json:"alarmCode" `
AlarmTitle string ` json:"alarmTitle" `
EventTime string ` json:"eventTime" `
AlarmType string ` json:"alarmType" `
OrigSeverity string ` json:"origSeverity" `
PerceivedSeverity string ` json:"perceivedSeverity" `
PVFlag string ` json:"pvFlag" xorm:"pv_flag" `
NeName string ` json:"neName" `
NeType string ` json:"neType" `
ObjectUid string ` json:"objectUid" xorm:"object_uid" `
ObjectName string ` json:"objectName" xorm:"object_name" `
ObjectType string ` json:"objectType" xorm:"object_type" `
LocationInfo string ` json:"locationInfo" `
Province string ` json:"province" `
AlarmStatus int ` json:"alarmStatus" xorm:"alarm_status" `
SpecificProblem string ` json:"specificProblem" `
SpecificProblemID string ` json:"specificProblemID" xorm:"specific_problem_id" `
AddInfo string ` json:"addInfo" `
// ClearType int `json:"-" xorm:"clear_type"` // 0: Unclear, 1: Auto clear, 2: Manual clear
// ClearTime sql.NullTime `json:"-" xorm:"clear_time"`
}
var client = resty . New ( )
func init ( ) {
/*
client.
SetTimeout(10 * time.Second).
SetRetryCount(1).
SetRetryWaitTime(1 * time.Second).
SetRetryMaxWaitTime(2 * time.Second).
SetRetryAfter(func(client *resty.Client, resp *resty.Response) (time.Duration, error) {
return 0, errors.New("quota exceeded")
})
*/
client .
SetTimeout ( time . Duration ( 400 * time . Millisecond ) )
// SetRetryCount(1).
// SetRetryWaitTime(time.Duration(1 * time.Second)).
// SetRetryMaxWaitTime(time.Duration(2 * time.Second))
//client.SetTimeout(2 * time.Second)
}
func ( s * BarProcessor ) Execute ( data any ) ( any , error ) {
var err error
s . count ++
// options := data.(cron.JobData)
// // sysJob := options.SysJob
// // var params BarParams
// // // err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms)
// // // if err == nil {
// // // duration = params.Duration
// // // }
var nes [ ] dborm . NeInfo
_ , err = dborm . XormGetAllNeInfo ( & nes )
if err != nil {
log . Error ( "Failed to get all ne info:" , err )
}
succActiveAlarmNum := 0
failActiveAlarmNum := 0
succClearAlarmNum := 0
failClearAlarmNum := 0
for _ , ne := range nes {
log . Debug ( "ne:" , ne )
sql := fmt . Sprintf ( "select * from ne_state where ne_type = '%s' and ne_id = '%s' order by timestamp desc limit 1" , ne . NeType , ne . NeId )
neState , err := dborm . XormGetDataBySQL ( sql )
if err != nil {
log . Error ( "Failed to get ne_state:" , err )
continue
}
if len ( * neState ) == 0 {
log . Warn ( "Not found record in ne_state:" )
continue
}
log . Debug ( "neState:" , * neState )
params := "10000"
alarmDefine , err := dborm . XormGetAlarmDefine ( params )
if err != nil {
log . Error ( "Failed to get alarm_define:" , err )
continue
} else if alarmDefine == nil {
log . Error ( "Not found data from alarm_define" )
continue
}
log . Debug ( "alarmDefine:" , alarmDefine )
sql = fmt . Sprintf ( "select * from alarm where alarm_id = '%s' and ne_type='%s' and ne_id = '%s' order by event_time desc limit 1" ,
alarmDefine . AlarmId , ne . NeType , ne . RmUID )
alarm , err := dborm . XormGetDataBySQL ( sql )
if err != nil {
log . Error ( "Failed to get alarm:" , err )
continue
}
log . Debug ( "alarm:" , * alarm )
var timestamp string
if len ( * neState ) == 0 {
log . Infof ( "Not found ne_state neType:%s, neId:%s" , ne . NeType , ne . NeId )
timestamp = ne . UpdateTime
} else {
timestamp = ( * neState ) [ 0 ] [ "timestamp" ]
}
// 解析日期时间字符串为时间对象
seconds , err := global . GetSecondsSinceDatetime ( timestamp )
if err != nil {
log . Error ( "Failed to GetSecondsSinceDatetime:" , err )
continue
}
log . Debugf ( "timestamp:%s seconds:%d" , timestamp , seconds )
if seconds <= alarmDefine . Threshold {
if len ( * alarm ) == 0 || ( * alarm ) [ 0 ] [ "alarm_status" ] == fm . AlarmStatusClearString {
continue
}
// clear alarm, todo
var alarmSeq int = 1
SpecificProblem := fmt . Sprintf ( alarmDefine . SpecificProblem , alarmDefine . Threshold )
locationInfo := fmt . Sprintf ( "SystemManagement.State: NE heartbeat timestamp=%s, threshold=%v" , timestamp , alarmDefine . Threshold )
alarmData := & Alarm {
AlarmSeq : alarmSeq ,
AlarmId : alarmDefine . AlarmId ,
NeId : ne . RmUID ,
NeType : ne . NeType ,
NeName : ne . NeName ,
Province : ne . Province ,
PVFlag : ne . PvFlag ,
AlarmCode : alarmDefine . AlarmCode ,
AlarmTitle : alarmDefine . AlarmTitle ,
AlarmType : alarmDefine . AlarmType ,
AlarmStatus : fm . AlarmStatusClear ,
OrigSeverity : alarmDefine . OrigSeverity ,
ObjectUid : alarmDefine . ObjectUid ,
ObjectName : alarmDefine . ObjectName ,
ObjectType : alarmDefine . ObjectType ,
LocationInfo : locationInfo ,
SpecificProblem : SpecificProblem ,
SpecificProblemID : alarmDefine . SpecificProblemId ,
AddInfo : alarmDefine . AddInfo ,
EventTime : time . Now ( ) . Local ( ) . Format ( time . RFC3339 ) ,
}
alarmArray := & [ ] Alarm { * alarmData }
body , _ := json . Marshal ( alarmArray )
log . Debug ( "body: " , string ( body ) )
var response * resty . Response
requestURI := fmt . Sprintf ( "/api/rest/faultManagement/v1/elementType/%s/objectType/alarms" , ne . NeType )
restHost := fmt . Sprintf ( "http://127.0.0.1:%d" , config . GetYamlConfig ( ) . Rest [ 0 ] . Port )
requestURL := fmt . Sprintf ( "%s%s" , restHost , requestURI )
log . Debug ( "requestURL: POST " , requestURL )
response , err = client . R ( ) .
EnableTrace ( ) .
SetHeaders ( map [ string ] string { "User-Agent" : config . GetDefaultUserAgent ( ) } ) .
SetHeaders ( map [ string ] string { "Content-Type" : "application/json;charset=UTF-8" } ) .
SetBody ( body ) .
Post ( requestURL )
if err != nil {
log . Error ( "Failed to post:" , err )
failClearAlarmNum ++
continue
}
log . Debug ( "StatusCode: " , response . StatusCode ( ) )
switch response . StatusCode ( ) {
case http . StatusOK , http . StatusCreated , http . StatusNoContent , http . StatusAccepted :
log . Debug ( "response body:" , string ( response . Body ( ) ) )
body := new ( map [ string ] interface { } )
_ = json . Unmarshal ( response . Body ( ) , & body )
succClearAlarmNum ++
default :
log . Debug ( "response body:" , string ( response . Body ( ) ) )
body := new ( map [ string ] interface { } )
_ = json . Unmarshal ( response . Body ( ) , & body )
failClearAlarmNum ++
}
} else {
var alarmSeq int = 1
if len ( * alarm ) > 0 && ( * alarm ) [ 0 ] [ "alarm_status" ] == fm . AlarmStatusActiveString {
log . Info ( "System state alarm has exist" )
continue
}
SpecificProblem := fmt . Sprintf ( alarmDefine . SpecificProblem , alarmDefine . Threshold )
locationInfo := fmt . Sprintf ( "SystemManagement.State: NE heartbeat timestamp=%s, threshold=%v" , timestamp , alarmDefine . Threshold )
alarmData := & Alarm {
AlarmSeq : alarmSeq ,
AlarmId : alarmDefine . AlarmId ,
NeId : ne . RmUID ,
NeType : ne . NeType ,
NeName : ne . NeName ,
Province : ne . Province ,
PVFlag : ne . PvFlag ,
AlarmCode : alarmDefine . AlarmCode ,
AlarmTitle : alarmDefine . AlarmTitle ,
AlarmType : alarmDefine . AlarmType ,
AlarmStatus : fm . AlarmStatusActive ,
OrigSeverity : alarmDefine . OrigSeverity ,
ObjectUid : alarmDefine . ObjectUid ,
ObjectName : alarmDefine . ObjectName ,
ObjectType : alarmDefine . ObjectType ,
LocationInfo : locationInfo ,
SpecificProblem : SpecificProblem ,
SpecificProblemID : alarmDefine . SpecificProblemId ,
AddInfo : alarmDefine . AddInfo ,
EventTime : time . Now ( ) . Local ( ) . Format ( time . RFC3339 ) ,
}
alarmArray := & [ ] Alarm { * alarmData }
body , _ := json . Marshal ( alarmArray )
log . Debug ( "body: " , string ( body ) )
var response * resty . Response
requestURI := fmt . Sprintf ( "/api/rest/faultManagement/v1/elementType/%s/objectType/alarms" , ne . NeType )
restHost := fmt . Sprintf ( "http://127.0.0.1:%d" , config . GetYamlConfig ( ) . Rest [ 0 ] . Port )
requestURL := fmt . Sprintf ( "%s%s" , restHost , requestURI )
log . Debug ( "requestURL: POST " , requestURL )
response , err = client . R ( ) .
EnableTrace ( ) .
SetHeaders ( map [ string ] string { "User-Agent" : config . GetDefaultUserAgent ( ) } ) .
SetHeaders ( map [ string ] string { "Content-Type" : "application/json;charset=UTF-8" } ) .
SetBody ( body ) .
Post ( requestURL )
if err != nil {
log . Error ( "Failed to post:" , err )
failActiveAlarmNum ++
continue
}
log . Debug ( "StatusCode: " , response . StatusCode ( ) )
switch response . StatusCode ( ) {
case http . StatusOK , http . StatusCreated , http . StatusNoContent , http . StatusAccepted :
log . Debug ( "response body:" , string ( response . Body ( ) ) )
body := new ( map [ string ] interface { } )
_ = json . Unmarshal ( response . Body ( ) , & body )
succActiveAlarmNum ++
default :
log . Debug ( "response body:" , string ( response . Body ( ) ) )
body := new ( map [ string ] interface { } )
_ = json . Unmarshal ( response . Body ( ) , & body )
failActiveAlarmNum ++
}
}
}
// 返回结果,用于记录执行结果
return map [ string ] any {
"succActiveAlarmNum" : succActiveAlarmNum ,
"failActiveAlarmNum" : failActiveAlarmNum ,
"succClearAlarmNum" : succClearAlarmNum ,
"failClearAlarmNum" : failClearAlarmNum ,
} , nil
}