diff --git a/features/ue/ue.go b/features/ue/ue.go index bab2f193..4ad125e2 100644 --- a/features/ue/ue.go +++ b/features/ue/ue.go @@ -795,20 +795,21 @@ func GetUENumFromNF(w http.ResponseWriter, r *http.Request) { // UENum int `json:"ueNum"` // 当前在线用户数 // } -// SmfUENum SMF在线用户数 +// UENumInfo IMS/SMF在线用户数 type UENumInfo struct { NeType string `json:"neType"` NeId string `json:"neId"` - UENum int `json:"ueNum"` // 当前在线用户数 + Data struct { + UENum int `json:"ueNum"` // 当前在线用户数 + } `json:"data"` } - type UENumResponse struct { UENums []UENumInfo } // Get UEInfo from SMF func NewGetUENumFromNF(w http.ResponseWriter, r *http.Request) { - log.Info("GetUENumFromNF processing... ") + log.Info("NewGetUENumFromNF processing... ") vars := mux.Vars(r) neType := vars["elementTypeValue"] diff --git a/src/modules/crontask/processor/exportTable/exportTable.go b/src/modules/crontask/processor/exportTable/exportTable.go new file mode 100644 index 00000000..73a847d5 --- /dev/null +++ b/src/modules/crontask/processor/exportTable/exportTable.go @@ -0,0 +1,143 @@ +package exportTable + +import ( + "database/sql" + "encoding/csv" + "encoding/json" + "fmt" + "os" + "time" + + "be.ems/lib/core/datasource" + "be.ems/src/framework/cron" +) + +var NewProcessor = &BarProcessor{ + progress: 0, + count: 0, +} + +// bar 队列任务处理 +type BarProcessor struct { + // 任务进度 + progress int + // 执行次数 + count int +} + +type BarParams struct { + Duration int `json:"duration"` + TableName string `json:"tableName"` + TimeCol string `json:"timeCol"` // time stamp of column name + TimeUnit string `json:"timeUnit"` // timestamp unit: second/micro/milli + ExportCols string `json:"exportCols"` // exported column name of time string + Extras string `json:"extras"` // extras condition for where + FilePath string `json:"filePath"` // file path +} + +func (s *BarProcessor) Execute(data any) (any, error) { + s.count++ + + options := data.(cron.JobData) + sysJob := options.SysJob + var params BarParams + + err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) + if err != nil { + return nil, err + } + + //duration = params.Duration + + // 查询数据 + var unitNum int = 0 + switch params.TimeUnit { + case "second": + unitNum = 1 + case "milli": + unitNum = 1000 + case "micro": + unitNum = 1000000 + default: + return nil, fmt.Errorf("error input parameter") + } + var query string + if params.Extras != "" { + query = fmt.Sprintf("SELECT %s FROM `%s` WHERE `%s` >= UNIX_TIMESTAMP(NOW() - INTERVAL 2*%d HOUR) * %d AND `%s` < UNIX_TIMESTAMP(NOW() - INTERVAL %d HOUR) * %d AND %s", + params.ExportCols, params.TableName, params.TimeCol, params.Duration, unitNum, params.TimeCol, params.Duration, unitNum, params.Extras) + } else { + query = fmt.Sprintf("SELECT %s FROM `%s` WHERE `%s` >= UNIX_TIMESTAMP(NOW() - INTERVAL 2*%d HOUR) * %d AND `%s` < UNIX_TIMESTAMP(NOW() - INTERVAL %d HOUR) * %d", + params.ExportCols, params.TableName, params.TimeCol, params.Duration, unitNum, params.TimeCol, params.Duration, unitNum) + } + + //filePath := fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, params.TableName, time.Now().Format("20060102150405")) + filePath := fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, params.TableName, time.Now().Format("20060102150405")) + affected, err := s.exportData(query, filePath) + if err != nil { + return nil, err + } + + // 返回结果,用于记录执行结果 + return map[string]any{ + "msg": "sucess", + "filePath": filePath, + "affected": affected, + }, nil +} + +func (s *BarProcessor) exportData(query, filePath string) (int64, error) { + rows, err := datasource.DefaultDB().DB().Query(query) + if err != nil { + return 0, err + } + defer rows.Close() + + // 创建 CSV 文件 + file, err := os.Create(filePath) + if err != nil { + return 0, err + } + defer file.Close() + + writer := csv.NewWriter(file) + defer writer.Flush() + + // 写入表头 + columns, _ := rows.ColumnTypes() + header := make([]string, len(columns)) + for i, col := range columns { + header[i] = col.Name() + } + if err := writer.Write(header); err != nil { + return 0, err + } + + // 写入数据 + var affected int64 = 0 + for rows.Next() { + values := make([]sql.RawBytes, len(columns)) + scanArgs := make([]interface{}, len(columns)) + for i := range values { + scanArgs[i] = &values[i] + } + + if err := rows.Scan(scanArgs...); err != nil { + return 0, err + } + + record := make([]string, len(columns)) + for i, val := range values { + if val == nil { + record[i] = "" + } else { + record[i] = string(val) + } + } + affected++ + if err := writer.Write(record); err != nil { + return affected, err + } + } + + return affected, nil +} diff --git a/src/modules/crontask/processor/processor.go b/src/modules/crontask/processor/processor.go index f190b896..befe8658 100644 --- a/src/modules/crontask/processor/processor.go +++ b/src/modules/crontask/processor/processor.go @@ -5,6 +5,7 @@ import ( "be.ems/src/modules/crontask/processor/backupEtcFromNE" "be.ems/src/modules/crontask/processor/delExpiredNeBackup" "be.ems/src/modules/crontask/processor/deleteExpiredRecord" + "be.ems/src/modules/crontask/processor/exportTable" "be.ems/src/modules/crontask/processor/genNeStateAlarm" "be.ems/src/modules/crontask/processor/getStateFromNE" monitorsysresource "be.ems/src/modules/crontask/processor/monitor_sys_resource" @@ -20,4 +21,5 @@ func InitCronQueue() { cron.CreateQueue("backupEtcFromNE", backupEtcFromNE.NewProcessor) cron.CreateQueue("getStateFromNE", getStateFromNE.NewProcessor) cron.CreateQueue("genNeStateAlarm", genNeStateAlarm.NewProcessor) + cron.CreateQueue("exportTable", exportTable.NewProcessor) }