diff --git a/misc/omcsvc.sh b/misc/omcsvc.sh index 42fdc1e0..9685dd44 100755 --- a/misc/omcsvc.sh +++ b/misc/omcsvc.sh @@ -8,15 +8,11 @@ BinDir=/usr/local/omc/bin case "$1" in start) for procName in $ProcListDesc;do - echo -n "Starting $procName process ... " echo -n "Starting $procName process ... " systemctl start $procName if [ $? = 0 ]; then echo "done" fi - if [ $? = 0 ]; then - echo "done" - fi sleep 1 done ;; diff --git a/src/modules/crontask/processor/exportData/exportData.go b/src/modules/crontask/processor/exportData/exportData.go deleted file mode 100644 index bbd60385..00000000 --- a/src/modules/crontask/processor/exportData/exportData.go +++ /dev/null @@ -1,151 +0,0 @@ -package exportData - -import ( - "database/sql" - "encoding/csv" - "encoding/json" - "fmt" - "os" - - // "path/filepath" - "time" - - "be.ems/lib/dborm" - "be.ems/lib/log" - "be.ems/src/framework/cron" -) - -var NewProcessor = &BarProcessor{ - progress: 0, - count: 0, -} - -// bar 队列任务处理 -type BarProcessor struct { - // 任务进度 - progress int - // 执行次数 - count int -} - -type BarParams struct { - TableName string `json:"tableName"` - Columns string `json:"columns"` // exported column name of time string - Extras string `json:"extras"` // extras condition for where - FileType string `json:"fileType"` // file type: txt/csv - FilePath string `json:"filePath"` // file path -} - -func (s *BarProcessor) Execute(data any) (any, error) { - s.count++ - - options := data.(cron.JobData) - sysJob := options.SysJob - var params BarParams - - err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) - if err != nil { - log.Error("failed to unmarshal:", err) - return nil, err - } - - // mkdir if not exist - if _, err = os.Stat(params.FilePath); os.IsNotExist(err) { - err = os.MkdirAll(params.FilePath, os.ModePerm) - if err != nil { - log.Error("Failed to Mkdir:", err) - return nil, err - } - } - - var query string - if params.Extras != "" { - query = fmt.Sprintf("SELECT %s FROM `%s` WHERE %s", - params.Columns, params.TableName, params.Extras) - } else { - query = fmt.Sprintf("SELECT %s FROM `%s`", - params.Columns, params.TableName) - } - log.Trace("query:", query) - var filePath string - if params.FileType == "txt" { - filePath = fmt.Sprintf("%s/%s_export_%s.txt", params.FilePath, params.TableName, time.Now().Format("20060102150405")) - // query = fmt.Sprintf("%s INTO OUTFILE '%s' FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'", - // query, fmt.Sprintf("%s/%s_export_%s.txt", params.FilePath, params.TableName, time.Now().Format("20060102150405"))) - } else { - // 默认导出 csv 文件 - filePath = fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, params.TableName, time.Now().Format("20060102150405")) - // query = fmt.Sprintf("%s INTO OUTFILE '%s' FIELDS TERMINATED BY ',' ENCLOSED BY '\"' LINES TERMINATED BY '\n'", - // query, fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, params.TableName, time.Now().Format("20060102150405"))) - } - // filePath := fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, params.TableName, time.Now().Format("20060102150405")) - affected, err := s.exportData(query, filePath) - if err != nil { - log.Errorf("failed to export data: %v", err) - return nil, err - } - - // 返回结果,用于记录执行结果 - return map[string]any{ - "msg": "sucess", - "filePath": filePath, - "affected": affected, - }, nil -} - -func (s *BarProcessor) exportData(query, filePath string) (int64, error) { - rows, err := dborm.XCoreDB().Query(query) - if err != nil { - return 0, err - } - defer rows.Close() - - // 创建 文件 - file, err := os.Create(filePath) - if err != nil { - return 0, err - } - defer file.Close() - - writer := csv.NewWriter(file) - defer writer.Flush() - - // 写入表头 - columns, _ := rows.ColumnTypes() - header := make([]string, len(columns)) - for i, col := range columns { - header[i] = col.Name() - } - if err := writer.Write(header); err != nil { - return 0, err - } - - // 写入数据 - var affected int64 = 0 - for rows.Next() { - values := make([]sql.RawBytes, len(columns)) - scanArgs := make([]interface{}, len(columns)) - for i := range values { - scanArgs[i] = &values[i] - } - - if err := rows.Scan(scanArgs...); err != nil { - return 0, err - } - - record := make([]string, len(columns)) - for i, val := range values { - if val == nil { - record[i] = "" - } else { - record[i] = string(val) - } - } - affected++ - if err := writer.Write(record); err != nil { - return affected, err - } - } - - return affected, nil -} diff --git a/src/modules/crontask/processor/exportUEData/exportUEData.go b/src/modules/crontask/processor/exportUEData/exportUEData.go new file mode 100644 index 00000000..fa406e07 --- /dev/null +++ b/src/modules/crontask/processor/exportUEData/exportUEData.go @@ -0,0 +1,225 @@ +package exportUEData + +import ( + "database/sql" + "encoding/csv" + "encoding/json" + "fmt" + "os" + + // "path/filepath" + "time" + + "be.ems/lib/dborm" + "be.ems/lib/log" + "be.ems/src/framework/cron" + networkdata "be.ems/src/modules/network_data" +) + +var NewProcessor = &BarProcessor{ + progress: 0, + count: 0, +} + +// bar 队列任务处理 +type BarProcessor struct { + // 任务进度 + progress int + // 执行次数 + count int +} + +type BarParams struct { + TableName string `json:"tableName"` + Columns string `json:"columns"` // exported column name of time string + Extras string `json:"extras"` // extras condition for where + ServiceName string `json:"serviceName"` // data service name + OrderBy string `json:"orderBy"` + OrderType string `json:"orderType"` // order type: asc/desc + FileType string `json:"fileType"` // file type: txt/csv + FilePath string `json:"filePath"` // file path +} + +const ( + FILE_TYPE_TXT = "txt" + FILE_TYPE_CSV = "csv" + NE_TYPE_UDM = "UDM" +) + +func (s *BarProcessor) Execute(data any) (any, error) { + s.count++ + + options := data.(cron.JobData) + sysJob := options.SysJob + params := make([]BarParams, 0) + // 解析参数 + err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) + if err != nil { + log.Error("failed to unmarshal:", err) + return nil, err + } + + var results []map[string]any + for _, param := range params { + log.Trace("param:", param) + result, err := s.exportUEData(param) + if err != nil { + log.Error("failed to export data:", err) + return nil, err + } + log.Trace("export result:", result) + results = append(results, result) + } + // 返回结果,用于记录执行结果 + + // 返回结果,用于记录执行结果 + return map[string]any{ + "msg": "sucess", + "results": results, + }, nil +} + +func (s *BarProcessor) exportUEData(param BarParams) (map[string]any, error) { + // mkdir if not exist + var err error + if _, err = os.Stat(param.FilePath); os.IsNotExist(err) { + err = os.MkdirAll(param.FilePath, os.ModePerm) + if err != nil { + log.Error("Failed to Mkdir:", err) + return nil, err + } + } + // load data from udm-xxx + var neIDs []string + // 1. 获取所有的 ne_id + err = dborm.DefaultDB().Table("ne_info").Where("ne_type=?", NE_TYPE_UDM).Select("ne_id"). + Find(&neIDs).Error + if err != nil { + log.Error("failed to get ne_ids:", err) + return nil, err + } + var fps []string + var affectedArr []int64 + for _, neID := range neIDs { + // 1. 加载最新数据, 如果数据服务存在,则重新加载数据 + dataService, err := networkdata.GetService(param.ServiceName) + if err != nil { + log.Warn("failed to get data service:", err) + } else if dataService != nil { + // 重新加载数据 + data := dataService.ResetData(neID) + if data == 0 { + log.Error("failed to load data:", err) + return nil, err + } + log.Trace("load data:", data) + } + + // 2. 构造查询语句 + var query string + if param.Extras != "" { + query = fmt.Sprintf("SELECT %s FROM `%s` WHERE ne_id = %s and %s", + param.Columns, param.TableName, neID, param.Extras) + } else { + query = fmt.Sprintf("SELECT %s FROM `%s` WHERE ne_id = %s", + param.Columns, param.TableName, neID) + } + if param.OrderBy != "" { + if param.OrderType != "desc" { + query += fmt.Sprintf(" ORDER BY %s desc", param.OrderBy) + } else { + query += fmt.Sprintf(" ORDER BY %s asc", param.OrderBy) + } + } + log.Trace("query:", query) + + // 3. 构造文件路径 + var filePath string + if param.FileType == FILE_TYPE_TXT { + filePath = fmt.Sprintf("%s/%s_export_%s_%s.txt", + param.FilePath, param.TableName, time.Now().Format("20060102150405"), neID) + } else { + // 默认导出 csv 文件 + filePath = fmt.Sprintf("%s/%s_export_%s_%s.csv", + param.FilePath, param.TableName, time.Now().Format("20060102150405"), neID) + } + + // 4. 导出数据 + affected, err := s.exportData(query, filePath, param.FileType) + if err != nil { + log.Errorf("failed to export data: %v", err) + return nil, err + } + log.Trace("exported data:", affected) + fps = append(fps, filePath) + affectedArr = append(affectedArr, affected) + } + // 5. 返回结果 + result := map[string]any{ + "msg": "sucess", + "table": param.TableName, + "ne_id": neIDs, + "affected": affectedArr, + } + return result, nil +} + +func (s *BarProcessor) exportData(query, filePath string, fileType string) (int64, error) { + rows, err := dborm.XCoreDB().Query(query) + if err != nil { + return 0, err + } + defer rows.Close() + + // 创建 文件 + file, err := os.Create(filePath) + if err != nil { + return 0, err + } + defer file.Close() + + writer := csv.NewWriter(file) + defer writer.Flush() + + columns, _ := rows.ColumnTypes() + + // 写入表头, txt no header + if fileType != FILE_TYPE_TXT { + header := make([]string, len(columns)) + for i, col := range columns { + header[i] = col.Name() + } + if err := writer.Write(header); err != nil { + return 0, err + } + } + + // 写入数据 + var affected int64 = 0 + for rows.Next() { + values := make([]sql.RawBytes, len(columns)) + scanArgs := make([]interface{}, len(columns)) + for i := range values { + scanArgs[i] = &values[i] + } + + if err := rows.Scan(scanArgs...); err != nil { + return 0, err + } + + record := make([]string, len(columns)) + for i, val := range values { + if val == nil { + record[i] = "" + } else { + record[i] = string(val) + } + } + affected++ + if err := writer.Write(record); err != nil { + return affected, err + } + } + + return affected, nil +} diff --git a/src/modules/crontask/processor/processor.go b/src/modules/crontask/processor/processor.go index 76eae1f8..5de1f3b0 100644 --- a/src/modules/crontask/processor/processor.go +++ b/src/modules/crontask/processor/processor.go @@ -5,7 +5,7 @@ import ( "be.ems/src/modules/crontask/processor/backupEtcFromNE" "be.ems/src/modules/crontask/processor/delExpiredNeBackup" "be.ems/src/modules/crontask/processor/deleteExpiredRecord" - "be.ems/src/modules/crontask/processor/exportData" + "be.ems/src/modules/crontask/processor/exportUE" "be.ems/src/modules/crontask/processor/exportTable" "be.ems/src/modules/crontask/processor/genNeStateAlarm" "be.ems/src/modules/crontask/processor/getStateFromNE" @@ -31,5 +31,5 @@ func InitCronQueue() { cron.CreateQueue("genNeStateAlarm", genNeStateAlarm.NewProcessor) cron.CreateQueue("exportTable", exportTable.NewProcessor) cron.CreateQueue("removeFile", removeFile.NewProcessor) - cron.CreateQueue("exportData", exportData.NewProcessor) + cron.CreateQueue("exportUE", exportUE.NewProcessor) } diff --git a/src/modules/network_data/network_data.go b/src/modules/network_data/network_data.go index 88b2758b..5676c2a1 100644 --- a/src/modules/network_data/network_data.go +++ b/src/modules/network_data/network_data.go @@ -9,6 +9,7 @@ import ( "be.ems/src/modules/network_data/service" "github.com/gin-gonic/gin" + "fmt" ) // 模块路由注册 @@ -323,6 +324,34 @@ func Setup(router *gin.Engine) { } } +// ResettableService 接口定义 +type ResettableService interface { + ResetData(neID string) int64 +} + +// 服务注册表 +var serviceRegistry = make(map[string]ResettableService) +func RegisterService(name string, service ResettableService) { + serviceRegistry[name] = service +} + +// 获取服务 +func GetService(name string) (ResettableService, error) { + service, exists := serviceRegistry[name] + if !exists { + return nil, fmt.Errorf("service %s not found", name) + } + return service, nil +} + +// 初始化注册表 +func init() { + RegisterService("UDMAuthData", service.NewUDMAuthUser) + RegisterService("UDMSubUser", service.NewUDMSubUser) + + // 这里注册更多服务 +} + // InitLoad 初始参数 func InitLoad() { // 启动时,加载UPF上下行流量