package exportTable import ( "database/sql" "encoding/csv" "encoding/json" "fmt" "os" "path" "path/filepath" "time" "be.ems/lib/dborm" "be.ems/lib/log" "be.ems/src/framework/config" "be.ems/src/framework/cron" "be.ems/src/framework/logger" "be.ems/src/framework/utils/crypto" "be.ems/src/framework/utils/ssh" systemService "be.ems/src/modules/system/service" "github.com/jlaffaye/ftp" ) var NewProcessor = &BarProcessor{ progress: 0, count: 0, } // bar 队列任务处理 type BarProcessor struct { // 任务进度 progress int // 执行次数 count int } type BarParams struct { Duration int `json:"duration"` TableName string `json:"tableName"` Columns string `json:"columns"` // exported column name of time string TimeCol string `json:"timeCol"` // time stamp of column name TimeUnit string `json:"timeUnit"` // timestamp unit: second/micro/milli Extras string `json:"extras"` // extras condition for where FilePath string `json:"filePath"` // file path } func (s *BarProcessor) Execute(data any) (any, error) { s.count++ options := data.(cron.JobData) sysJob := options.SysJob var params BarParams err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) if err != nil { return nil, err } // mkdir if not exist if _, err = os.Stat(params.FilePath); os.IsNotExist(err) { err = os.MkdirAll(params.FilePath, os.ModePerm) if err != nil { log.Error("Failed to Mkdir:", err) return nil, err } } //duration = params.Duration now := time.Now() end := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()) start := end.Add(-time.Duration(params.Duration) * time.Hour) var startTime, endTime int64 switch params.TimeUnit { case "second": // 格式化时间戳为秒级 startTime = start.Unix() endTime = end.Unix() case "milli": // 格式化时间戳为毫秒级 startTime = start.UnixMilli() endTime = end.UnixMilli() case "micro": // 格式化时间戳为微妙级 startTime = start.UnixMicro() endTime = end.UnixMicro() default: return nil, fmt.Errorf("error input parameter") } var query string if params.Extras != "" { query = fmt.Sprintf("SELECT %s FROM `%s` WHERE `%s` >= %d AND `%s` < %d AND %s", params.Columns, params.TableName, params.TimeCol, startTime, params.TimeCol, endTime, params.Extras) } else { query = fmt.Sprintf("SELECT %s FROM `%s` WHERE `%s` >= %d AND `%s` < %d", params.Columns, params.TableName, params.TimeCol, startTime, params.TimeCol, endTime) } log.Trace("query:", query) filePath := fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, params.TableName, time.Now().Format("20060102150405")) affected, err := s.exportData(query, filePath) if err != nil { return nil, err } // put ftp s.putFTP(params.FilePath, filepath.Base(filePath)) // 返回结果,用于记录执行结果 return map[string]any{ "msg": "sucess", "filePath": filePath, "affected": affected, }, nil } func (s *BarProcessor) exportData(query, filePath string) (int64, error) { rows, err := dborm.XCoreDB().Query(query) if err != nil { return 0, err } defer rows.Close() // 创建 CSV 文件 file, err := os.Create(filePath) if err != nil { return 0, err } defer file.Close() writer := csv.NewWriter(file) defer writer.Flush() // 写入表头 columns, _ := rows.ColumnTypes() header := make([]string, len(columns)) for i, col := range columns { header[i] = col.Name() } if err := writer.Write(header); err != nil { return 0, err } // 写入数据 var affected int64 = 0 for rows.Next() { values := make([]sql.RawBytes, len(columns)) scanArgs := make([]interface{}, len(columns)) for i := range values { scanArgs[i] = &values[i] } if err := rows.Scan(scanArgs...); err != nil { return 0, err } record := make([]string, len(columns)) for i, val := range values { if val == nil { record[i] = "" } else { record[i] = string(val) } } affected++ if err := writer.Write(record); err != nil { return affected, err } } return affected, nil } func (s BarProcessor) putFTP(filePath, fileName string) { // 获取配置 var cfgData struct { Password string `json:"password" ` Username string `json:"username" binding:"required"` ToIp string `json:"toIp" binding:"required"` ToPort int64 `json:"toPort" binding:"required"` Protocol string `json:"protocol" binding:"required,oneof=ssh ftp"` Dir string `json:"dir" binding:"required"` } cfg := systemService.NewSysConfigImpl.SelectConfigByKey("sys.exportTable") if cfg.ConfigID != "" { // 解密body appKey := config.Get("aes.appKey").(string) bodyDe, err := crypto.AESDecryptBase64(cfg.ConfigValue, appKey) if err != nil { logger.Errorf("putFTP decrypt error: %v", err) return } err = json.Unmarshal([]byte(bodyDe), &cfgData) if err != nil { logger.Errorf("putFTP unmarshal error: %v", err) return } } localFilePath := filepath.Join(filePath, fileName) if cfgData.Protocol == "ssh" { connSSH := ssh.ConnSSH{ User: cfgData.Username, Password: cfgData.Password, Addr: cfgData.ToIp, Port: cfgData.ToPort, AuthMode: "0", } sshClient, err := connSSH.NewClient() if err != nil { logger.Errorf("putFTP ssh error: %v", err) return } defer sshClient.Close() // 网元主机的SSH客户端进行文件传输 sftpClient, err := sshClient.NewClientSFTP() if err != nil { logger.Errorf("putFTP sftp error: %v", err) return } defer sftpClient.Close() // 远程文件 remotePath := filepath.Join(cfgData.Dir, path.Base(filePath), fileName) // 复制到远程 if err = sftpClient.CopyFileLocalToRemote(localFilePath, remotePath); err != nil { logger.Errorf("putFTP uploading error: %v", err) return } } if cfgData.Protocol == "ftp" { // 连接到 FTP 服务器 addr := fmt.Sprintf("%s:%d", cfgData.ToIp, cfgData.ToPort) ftpComm, err := ftp.Dial(addr, ftp.DialWithTimeout(15*time.Second)) if err != nil { logger.Errorf("putFTP ftp error: %v", err) return } // 登录到 FTP 服务器 err = ftpComm.Login(cfgData.Username, cfgData.Password) if err != nil { logger.Errorf("putFTP login error: %v", err) return } defer ftpComm.Quit() // 打开本地文件 file, err := os.Open(localFilePath) if err != nil { logger.Errorf("putFTP open error: %v", err) return } defer file.Close() // 远程文件 remotePath := filepath.Join(cfgData.Dir, path.Base(filePath), fileName) // 上传文件到 FTP 服务器 err = ftpComm.Stor(remotePath, file) if err != nil { logger.Errorf("putFTP uploading error: %v", err) return } } }