From 38ae580d6febb6da1f3385f4c916eb66639feb1c Mon Sep 17 00:00:00 2001 From: simonzhangsz Date: Wed, 27 Dec 2023 17:51:24 +0800 Subject: [PATCH] fix: sys job get state and delete ne_state --- features/state/getstate.go | 2 +- lib/dborm/dborm.go | 32 ++++ .../getStateFromNE/getStateFromNE.go | 167 ++++++++++++++++++ src/modules/crontask/processor/processor.go | 2 + 4 files changed, 202 insertions(+), 1 deletion(-) create mode 100644 src/modules/crontask/processor/getStateFromNE/getStateFromNE.go diff --git a/features/state/getstate.go b/features/state/getstate.go index ae7e94be..5f19fda5 100644 --- a/features/state/getstate.go +++ b/features/state/getstate.go @@ -826,7 +826,7 @@ func GetStateFromNF(w http.ResponseWriter, r *http.Request) { result["error"] = errorMessage SN, Version, _ := dborm.XormGetNEStateInfo(ne.NeType, ne.NeId) result["serialNum"] = SN - result["verion"] = Version + result["version"] = Version } else { systemState := make(map[string]interface{}) _ = json.Unmarshal(resp.Body(), &systemState) diff --git a/lib/dborm/dborm.go b/lib/dborm/dborm.go index afc90c8c..0400aa22 100644 --- a/lib/dborm/dborm.go +++ b/lib/dborm/dborm.go @@ -1832,3 +1832,35 @@ func XormGetNEStateInfo(neType, neID string) (string, string, error) { Get(&SN, &Version) return SN, Version, err } + +type NeState struct { + Id int `json:"id" xorm:"pk 'id' autoincr"` + NeType string `json:"neType" xorm:"ne_type"` + NeId string `json:"neId" xorm:"ne_id"` + Version string `json:"version" xorm:"column 'version' VARCHAR(16)"` + Capability uint32 `json:"capability" xorm:"capability"` + SerialNum string `json:"serialNum" xorm:"serial_num"` + ExpiryDate string `json:"expiryDate" xorm:"expiry_date"` + CpuUsage string `json:"cpuUsage" xorm:"cpu_usage"` + MemUsage string `json:"memUsage" xorm:"mem_usage"` + DiskSpace string `json:"diskSpace" xorm:"disk_space"` + Timestamp string `json:"timestamp" xorm:"-" ` +} + +func XormInsertNeState(neState *NeState) (int64, error) { + log.Debug("XormInsertNeState processing... ") + + var affected int64 = 0 + + session := xEngine.NewSession() + defer session.Close() + affected, err := session.InsertOne(neState) + if err != nil { + return 0, err + } + err = session.Commit() + if err != nil { + return 0, err + } + return affected, err +} diff --git a/src/modules/crontask/processor/getStateFromNE/getStateFromNE.go b/src/modules/crontask/processor/getStateFromNE/getStateFromNE.go new file mode 100644 index 00000000..47524903 --- /dev/null +++ b/src/modules/crontask/processor/getStateFromNE/getStateFromNE.go @@ -0,0 +1,167 @@ +package getStateFromNE + +import ( + "encoding/json" + "fmt" + "net/http" + "strings" + "time" + + "ems.agt/lib/dborm" + "ems.agt/lib/log" + "ems.agt/restagent/config" + "github.com/go-resty/resty/v2" +) + +var NewProcessor = &BarProcessor{ + progress: 0, + count: 0, +} + +// bar 队列任务处理 +type BarProcessor struct { + // 任务进度 + progress int + // 执行次数 + count int +} + +type BarParams struct { + Duration int `json:"duration"` +} + +type CpuUsage struct { + NfCpuUsage uint16 `json:"nfCpuUsage"` + SysCpuUsage uint16 `json:"sysCpuUsage"` +} + +type MemUsage struct { + TotalMem uint32 `json:"totalMem"` + NfUsedMem uint32 `json:"nfUsedMem"` + SysMemUsage uint16 `json:"sysMemUsage"` +} + +type PartitionInfo struct { + Total uint32 `json:"total"` // MB + Used uint32 `json:"used"` // MB +} + +type DiskSpace struct { + PartitionNum uint8 `json:"partitionNum"` + + PartitionInfo []PartitionInfo `json:"partitionInfo"` +} + +type SystemState struct { + Version string `json:"version"` + Capability uint32 `json:"capability"` + SerialNum string `json:"serialNum"` + ExpiryDate string `json:"expiryDate"` + //Timestamp string `json:"timestamp"` + + CpuUsage CpuUsage `json:"cpuUsage"` + MemUsage MemUsage `json:"memUsage"` + + DiskSpace DiskSpace `json:"diskSpace"` +} + +var client = resty.New() + +func init() { + /* + client. + SetTimeout(10 * time.Second). + SetRetryCount(1). + SetRetryWaitTime(1 * time.Second). + SetRetryMaxWaitTime(2 * time.Second). + SetRetryAfter(func(client *resty.Client, resp *resty.Response) (time.Duration, error) { + return 0, errors.New("quota exceeded") + }) + */ + client. + SetTimeout(time.Duration(400 * time.Millisecond)) + // SetRetryCount(1). + // SetRetryWaitTime(time.Duration(1 * time.Second)). + // SetRetryMaxWaitTime(time.Duration(2 * time.Second)) + //client.SetTimeout(2 * time.Second) +} + +func (s *BarProcessor) Execute(data any) (any, error) { + var err error + + s.count++ + // options := data.(cron.JobData) + // // sysJob := options.SysJob + // // var params BarParams + + // // // err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) + // // // if err == nil { + // // // duration = params.Duration + // // // } + + var nes []dborm.NeInfo + _, err = dborm.XormGetAllNeInfo(&nes) + if err != nil { + log.Error("Failed to get all ne info:", err) + return nil, err + } + + failNum := 0 + succNum := 0 + for _, ne := range nes { + requestURI := fmt.Sprintf("/api/rest/systemManagement/v1/elementType/%s/objectType/systemState", strings.ToLower(ne.NeType)) + requestURL := fmt.Sprintf("http://%s:%s%s", ne.Ip, ne.Port, requestURI) + log.Debug("requestURL: Get", requestURL) + client := resty.New() + response, err := client.R(). + EnableTrace(). + SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}). + SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}). + Get(requestURL) + if err != nil { + log.Error("Failed to Get:", err) + failNum++ + continue + } + + log.Debug("StatusCode: ", response.StatusCode()) + switch response.StatusCode() { + case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted: + log.Debug("response body:", string(response.Body())) + state := new(SystemState) + _ = json.Unmarshal(response.Body(), &state) + neState := new(dborm.NeState) + neState.NeType = ne.NeType + neState.NeId = ne.NeId + neState.Version = state.Version + neState.Capability = state.Capability + neState.SerialNum = state.SerialNum + neState.ExpiryDate = state.ExpiryDate + cu, _ := json.Marshal(state.CpuUsage) + neState.CpuUsage = string(cu) + mu, _ := json.Marshal(state.MemUsage) + neState.MemUsage = string(mu) + ds, _ := json.Marshal(state.DiskSpace) + neState.DiskSpace = string(ds) + log.Debug("neState:", neState) + _, err := dborm.XormInsertNeState(neState) + if err != nil { + log.Error("Failed to insert ne_state:", err) + failNum++ + continue + } + succNum++ + default: + log.Debug("response body:", string(response.Body())) + body := new(map[string]interface{}) + _ = json.Unmarshal(response.Body(), &body) + failNum++ + } + } + + // 返回结果,用于记录执行结果 + return map[string]any{ + "succNum": succNum, + "failNum": failNum, + }, nil +} diff --git a/src/modules/crontask/processor/processor.go b/src/modules/crontask/processor/processor.go index 4a0d8ae1..afe440a1 100644 --- a/src/modules/crontask/processor/processor.go +++ b/src/modules/crontask/processor/processor.go @@ -5,6 +5,7 @@ import ( "ems.agt/src/modules/crontask/processor/backupEtcFromNE" "ems.agt/src/modules/crontask/processor/delExpiredNeBackup" "ems.agt/src/modules/crontask/processor/deleteExpiredRecord" + "ems.agt/src/modules/crontask/processor/getStateFromNE" monitorsysresource "ems.agt/src/modules/crontask/processor/monitor_sys_resource" ) @@ -16,4 +17,5 @@ func InitCronQueue() { cron.CreateQueue("delExpiredNeBackup", delExpiredNeBackup.NewProcessor) cron.CreateQueue("deleteExpiredRecord", deleteExpiredRecord.NewProcessor) cron.CreateQueue("backupEtcFromNE", backupEtcFromNE.NewProcessor) + cron.CreateQueue("getStateFromNE", getStateFromNE.NewProcessor) }