add: export table at scheduled task
This commit is contained in:
160
src/modules/crontask/processor/exportTable/exportTable.go
Normal file
160
src/modules/crontask/processor/exportTable/exportTable.go
Normal file
@@ -0,0 +1,160 @@
|
||||
package exportTable
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/csv"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"be.ems/lib/core/datasource"
|
||||
"be.ems/lib/log"
|
||||
"be.ems/src/framework/cron"
|
||||
)
|
||||
|
||||
var NewProcessor = &BarProcessor{
|
||||
progress: 0,
|
||||
count: 0,
|
||||
}
|
||||
|
||||
// bar 队列任务处理
|
||||
type BarProcessor struct {
|
||||
// 任务进度
|
||||
progress int
|
||||
// 执行次数
|
||||
count int
|
||||
}
|
||||
|
||||
type BarParams struct {
|
||||
Duration int `json:"duration"`
|
||||
TableName string `json:"tableName"`
|
||||
Columns string `json:"columns"` // exported column name of time string
|
||||
TimeCol string `json:"timeCol"` // time stamp of column name
|
||||
TimeUnit string `json:"timeUnit"` // timestamp unit: second/micro/milli
|
||||
Extras string `json:"extras"` // extras condition for where
|
||||
FilePath string `json:"filePath"` // file path
|
||||
}
|
||||
|
||||
func (s *BarProcessor) Execute(data any) (any, error) {
|
||||
s.count++
|
||||
|
||||
options := data.(cron.JobData)
|
||||
sysJob := options.SysJob
|
||||
var params BarParams
|
||||
|
||||
err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// mkdir if not exist
|
||||
if _, err = os.Stat(params.FilePath); os.IsNotExist(err) {
|
||||
err = os.MkdirAll(params.FilePath, os.ModePerm)
|
||||
if err != nil {
|
||||
log.Error("Failed to Mkdir:", err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
//duration = params.Duration
|
||||
|
||||
now := time.Now()
|
||||
end := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
|
||||
start := end.Add(-time.Duration(params.Duration) * time.Hour)
|
||||
|
||||
var startTime, endTime int64
|
||||
switch params.TimeUnit {
|
||||
case "second":
|
||||
// 格式化时间戳为秒级
|
||||
startTime = start.Unix()
|
||||
endTime = end.Unix()
|
||||
case "milli":
|
||||
// 格式化时间戳为毫秒级
|
||||
startTime = start.UnixMilli()
|
||||
endTime = end.UnixMilli()
|
||||
case "micro":
|
||||
// 格式化时间戳为微妙级
|
||||
startTime = start.UnixMicro()
|
||||
endTime = end.UnixMicro()
|
||||
default:
|
||||
return nil, fmt.Errorf("error input parameter")
|
||||
}
|
||||
var query string
|
||||
if params.Extras != "" {
|
||||
query = fmt.Sprintf("SELECT %s FROM `%s` WHERE `%s` >= %d AND `%s` < %d AND %s",
|
||||
params.Columns, params.TableName, params.TimeCol, startTime, params.TimeCol, endTime, params.Extras)
|
||||
} else {
|
||||
query = fmt.Sprintf("SELECT %s FROM `%s` WHERE `%s` >= %d AND `%s` < %d",
|
||||
params.Columns, params.TableName, params.TimeCol, startTime, params.TimeCol, endTime)
|
||||
}
|
||||
log.Trace("query:", query)
|
||||
filePath := fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, params.TableName, time.Now().Format("20060102150405"))
|
||||
affected, err := s.exportData(query, filePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 返回结果,用于记录执行结果
|
||||
return map[string]any{
|
||||
"msg": "sucess",
|
||||
"filePath": filePath,
|
||||
"affected": affected,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *BarProcessor) exportData(query, filePath string) (int64, error) {
|
||||
rows, err := datasource.DefaultDB().DB().Query(query)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
// 创建 CSV 文件
|
||||
file, err := os.Create(filePath)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
writer := csv.NewWriter(file)
|
||||
defer writer.Flush()
|
||||
|
||||
// 写入表头
|
||||
columns, _ := rows.ColumnTypes()
|
||||
header := make([]string, len(columns))
|
||||
for i, col := range columns {
|
||||
header[i] = col.Name()
|
||||
}
|
||||
if err := writer.Write(header); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// 写入数据
|
||||
var affected int64 = 0
|
||||
for rows.Next() {
|
||||
values := make([]sql.RawBytes, len(columns))
|
||||
scanArgs := make([]interface{}, len(columns))
|
||||
for i := range values {
|
||||
scanArgs[i] = &values[i]
|
||||
}
|
||||
|
||||
if err := rows.Scan(scanArgs...); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
record := make([]string, len(columns))
|
||||
for i, val := range values {
|
||||
if val == nil {
|
||||
record[i] = ""
|
||||
} else {
|
||||
record[i] = string(val)
|
||||
}
|
||||
}
|
||||
affected++
|
||||
if err := writer.Write(record); err != nil {
|
||||
return affected, err
|
||||
}
|
||||
}
|
||||
|
||||
return affected, nil
|
||||
}
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"be.ems/src/modules/crontask/processor/backupEtcFromNE"
|
||||
"be.ems/src/modules/crontask/processor/delExpiredNeBackup"
|
||||
"be.ems/src/modules/crontask/processor/deleteExpiredRecord"
|
||||
"be.ems/src/modules/crontask/processor/exportTable"
|
||||
"be.ems/src/modules/crontask/processor/genNeStateAlarm"
|
||||
"be.ems/src/modules/crontask/processor/getStateFromNE"
|
||||
processorMonitorSysResource "be.ems/src/modules/crontask/processor/monitor_sys_resource"
|
||||
@@ -23,4 +24,5 @@ func InitCronQueue() {
|
||||
cron.CreateQueue("backupEtcFromNE", backupEtcFromNE.NewProcessor)
|
||||
cron.CreateQueue("getStateFromNE", getStateFromNE.NewProcessor)
|
||||
cron.CreateQueue("genNeStateAlarm", genNeStateAlarm.NewProcessor)
|
||||
cron.CreateQueue("exportTable", exportTable.NewProcessor)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user