feat: ue support job backup export data

This commit is contained in:
zhangsz
2025-03-27 17:25:07 +08:00
parent 90faaaba13
commit e971e0be9d
4 changed files with 250 additions and 10 deletions

View File

@@ -0,0 +1,149 @@
package exportData
import (
"database/sql"
"encoding/csv"
"encoding/json"
"fmt"
"os"
// "path/filepath"
"time"
"be.ems/lib/dborm"
"be.ems/lib/log"
"be.ems/src/framework/cron"
)
var NewProcessor = &BarProcessor{
progress: 0,
count: 0,
}
// bar 队列任务处理
type BarProcessor struct {
// 任务进度
progress int
// 执行次数
count int
}
type BarParams struct {
TableName string `json:"tableName"`
Columns string `json:"columns"` // exported column name of time string
Extras string `json:"extras"` // extras condition for where
FileType string `json:"fileType"` // file type: txt/csv
FilePath string `json:"filePath"` // file path
}
func (s *BarProcessor) Execute(data any) (any, error) {
s.count++
options := data.(cron.JobData)
sysJob := options.SysJob
var params BarParams
err := json.Unmarshal([]byte(sysJob.TargetParams), &params)
if err != nil {
return nil, err
}
// mkdir if not exist
if _, err = os.Stat(params.FilePath); os.IsNotExist(err) {
err = os.MkdirAll(params.FilePath, os.ModePerm)
if err != nil {
log.Error("Failed to Mkdir:", err)
return nil, err
}
}
var query string
if params.Extras != "" {
query = fmt.Sprintf("SELECT %s FROM `%s` WHERE %s",
params.Columns, params.TableName, params.Extras)
} else {
query = fmt.Sprintf("SELECT %s FROM `%s`",
params.Columns, params.TableName)
}
log.Trace("query:", query)
var filePath string
if params.FileType == "txt" {
filePath = fmt.Sprintf("%s/%s_export_%s.txt", params.FilePath, params.TableName, time.Now().Format("20060102150405"))
// query = fmt.Sprintf("%s INTO OUTFILE '%s' FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'",
// query, fmt.Sprintf("%s/%s_export_%s.txt", params.FilePath, params.TableName, time.Now().Format("20060102150405")))
} else {
// 默认导出 csv 文件
filePath = fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, params.TableName, time.Now().Format("20060102150405"))
// query = fmt.Sprintf("%s INTO OUTFILE '%s' FIELDS TERMINATED BY ',' ENCLOSED BY '\"' LINES TERMINATED BY '\n'",
// query, fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, params.TableName, time.Now().Format("20060102150405")))
}
// filePath := fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, params.TableName, time.Now().Format("20060102150405"))
affected, err := s.exportData(query, filePath)
if err != nil {
return nil, err
}
// 返回结果,用于记录执行结果
return map[string]any{
"msg": "sucess",
"filePath": filePath,
"affected": affected,
}, nil
}
func (s *BarProcessor) exportData(query, filePath string) (int64, error) {
rows, err := dborm.XCoreDB().Query(query)
if err != nil {
return 0, err
}
defer rows.Close()
// 创建 CSV 文件
file, err := os.Create(filePath)
if err != nil {
return 0, err
}
defer file.Close()
writer := csv.NewWriter(file)
defer writer.Flush()
// 写入表头
columns, _ := rows.ColumnTypes()
header := make([]string, len(columns))
for i, col := range columns {
header[i] = col.Name()
}
if err := writer.Write(header); err != nil {
return 0, err
}
// 写入数据
var affected int64 = 0
for rows.Next() {
values := make([]sql.RawBytes, len(columns))
scanArgs := make([]interface{}, len(columns))
for i := range values {
scanArgs[i] = &values[i]
}
if err := rows.Scan(scanArgs...); err != nil {
return 0, err
}
record := make([]string, len(columns))
for i, val := range values {
if val == nil {
record[i] = ""
} else {
record[i] = string(val)
}
}
affected++
if err := writer.Write(record); err != nil {
return affected, err
}
}
return affected, nil
}

View File

@@ -5,6 +5,7 @@ import (
"be.ems/src/modules/crontask/processor/backupEtcFromNE"
"be.ems/src/modules/crontask/processor/delExpiredNeBackup"
"be.ems/src/modules/crontask/processor/deleteExpiredRecord"
"be.ems/src/modules/crontask/processor/exportData"
"be.ems/src/modules/crontask/processor/exportTable"
"be.ems/src/modules/crontask/processor/genNeStateAlarm"
"be.ems/src/modules/crontask/processor/getStateFromNE"
@@ -30,4 +31,5 @@ func InitCronQueue() {
cron.CreateQueue("genNeStateAlarm", genNeStateAlarm.NewProcessor)
cron.CreateQueue("exportTable", exportTable.NewProcessor)
cron.CreateQueue("removeFile", removeFile.NewProcessor)
cron.CreateQueue("exportData", exportData.NewProcessor)
}