diff --git a/src/modules/crontask/processor/exportTable/exportTable.go b/src/modules/crontask/processor/exportTable/exportTable.go new file mode 100644 index 00000000..8411f7ce --- /dev/null +++ b/src/modules/crontask/processor/exportTable/exportTable.go @@ -0,0 +1,160 @@ +package exportTable + +import ( + "database/sql" + "encoding/csv" + "encoding/json" + "fmt" + "os" + "time" + + "be.ems/lib/core/datasource" + "be.ems/lib/log" + "be.ems/src/framework/cron" +) + +var NewProcessor = &BarProcessor{ + progress: 0, + count: 0, +} + +// bar 队列任务处理 +type BarProcessor struct { + // 任务进度 + progress int + // 执行次数 + count int +} + +type BarParams struct { + Duration int `json:"duration"` + TableName string `json:"tableName"` + Columns string `json:"columns"` // exported column name of time string + TimeCol string `json:"timeCol"` // time stamp of column name + TimeUnit string `json:"timeUnit"` // timestamp unit: second/micro/milli + Extras string `json:"extras"` // extras condition for where + FilePath string `json:"filePath"` // file path +} + +func (s *BarProcessor) Execute(data any) (any, error) { + s.count++ + + options := data.(cron.JobData) + sysJob := options.SysJob + var params BarParams + + err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) + if err != nil { + return nil, err + } + + // mkdir if not exist + if _, err = os.Stat(params.FilePath); os.IsNotExist(err) { + err = os.MkdirAll(params.FilePath, os.ModePerm) + if err != nil { + log.Error("Failed to Mkdir:", err) + return nil, err + } + } + //duration = params.Duration + + now := time.Now() + end := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()) + start := end.Add(-time.Duration(params.Duration) * time.Hour) + + var startTime, endTime int64 + switch params.TimeUnit { + case "second": + // 格式化时间戳为秒级 + startTime = start.Unix() + endTime = end.Unix() + case "milli": + // 格式化时间戳为毫秒级 + startTime = start.UnixMilli() + endTime = end.UnixMilli() + case "micro": + // 格式化时间戳为微妙级 + startTime = start.UnixMicro() + endTime = end.UnixMicro() + default: + return nil, fmt.Errorf("error input parameter") + } + var query string + if params.Extras != "" { + query = fmt.Sprintf("SELECT %s FROM `%s` WHERE `%s` >= %d AND `%s` < %d AND %s", + params.Columns, params.TableName, params.TimeCol, startTime, params.TimeCol, endTime, params.Extras) + } else { + query = fmt.Sprintf("SELECT %s FROM `%s` WHERE `%s` >= %d AND `%s` < %d", + params.Columns, params.TableName, params.TimeCol, startTime, params.TimeCol, endTime) + } + log.Trace("query:", query) + filePath := fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, params.TableName, time.Now().Format("20060102150405")) + affected, err := s.exportData(query, filePath) + if err != nil { + return nil, err + } + + // 返回结果,用于记录执行结果 + return map[string]any{ + "msg": "sucess", + "filePath": filePath, + "affected": affected, + }, nil +} + +func (s *BarProcessor) exportData(query, filePath string) (int64, error) { + rows, err := datasource.DefaultDB().DB().Query(query) + if err != nil { + return 0, err + } + defer rows.Close() + + // 创建 CSV 文件 + file, err := os.Create(filePath) + if err != nil { + return 0, err + } + defer file.Close() + + writer := csv.NewWriter(file) + defer writer.Flush() + + // 写入表头 + columns, _ := rows.ColumnTypes() + header := make([]string, len(columns)) + for i, col := range columns { + header[i] = col.Name() + } + if err := writer.Write(header); err != nil { + return 0, err + } + + // 写入数据 + var affected int64 = 0 + for rows.Next() { + values := make([]sql.RawBytes, len(columns)) + scanArgs := make([]interface{}, len(columns)) + for i := range values { + scanArgs[i] = &values[i] + } + + if err := rows.Scan(scanArgs...); err != nil { + return 0, err + } + + record := make([]string, len(columns)) + for i, val := range values { + if val == nil { + record[i] = "" + } else { + record[i] = string(val) + } + } + affected++ + if err := writer.Write(record); err != nil { + return affected, err + } + } + + return affected, nil +} diff --git a/src/modules/crontask/processor/processor.go b/src/modules/crontask/processor/processor.go index 81ce9de1..09176717 100644 --- a/src/modules/crontask/processor/processor.go +++ b/src/modules/crontask/processor/processor.go @@ -5,6 +5,7 @@ import ( "be.ems/src/modules/crontask/processor/backupEtcFromNE" "be.ems/src/modules/crontask/processor/delExpiredNeBackup" "be.ems/src/modules/crontask/processor/deleteExpiredRecord" + "be.ems/src/modules/crontask/processor/exportTable" "be.ems/src/modules/crontask/processor/genNeStateAlarm" "be.ems/src/modules/crontask/processor/getStateFromNE" processorMonitorSysResource "be.ems/src/modules/crontask/processor/monitor_sys_resource" @@ -23,4 +24,5 @@ func InitCronQueue() { cron.CreateQueue("backupEtcFromNE", backupEtcFromNE.NewProcessor) cron.CreateQueue("getStateFromNE", getStateFromNE.NewProcessor) cron.CreateQueue("genNeStateAlarm", genNeStateAlarm.NewProcessor) + cron.CreateQueue("exportTable", exportTable.NewProcessor) }