add: export log&CDR&event task
This commit is contained in:
@@ -795,20 +795,21 @@ func GetUENumFromNF(w http.ResponseWriter, r *http.Request) {
|
|||||||
// UENum int `json:"ueNum"` // 当前在线用户数
|
// UENum int `json:"ueNum"` // 当前在线用户数
|
||||||
// }
|
// }
|
||||||
|
|
||||||
// SmfUENum SMF在线用户数
|
// UENumInfo IMS/SMF在线用户数
|
||||||
type UENumInfo struct {
|
type UENumInfo struct {
|
||||||
NeType string `json:"neType"`
|
NeType string `json:"neType"`
|
||||||
NeId string `json:"neId"`
|
NeId string `json:"neId"`
|
||||||
UENum int `json:"ueNum"` // 当前在线用户数
|
Data struct {
|
||||||
|
UENum int `json:"ueNum"` // 当前在线用户数
|
||||||
|
} `json:"data"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type UENumResponse struct {
|
type UENumResponse struct {
|
||||||
UENums []UENumInfo
|
UENums []UENumInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get UEInfo from SMF
|
// Get UEInfo from SMF
|
||||||
func NewGetUENumFromNF(w http.ResponseWriter, r *http.Request) {
|
func NewGetUENumFromNF(w http.ResponseWriter, r *http.Request) {
|
||||||
log.Info("GetUENumFromNF processing... ")
|
log.Info("NewGetUENumFromNF processing... ")
|
||||||
|
|
||||||
vars := mux.Vars(r)
|
vars := mux.Vars(r)
|
||||||
neType := vars["elementTypeValue"]
|
neType := vars["elementTypeValue"]
|
||||||
|
|||||||
143
src/modules/crontask/processor/exportTable/exportTable.go
Normal file
143
src/modules/crontask/processor/exportTable/exportTable.go
Normal file
@@ -0,0 +1,143 @@
|
|||||||
|
package exportTable
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"encoding/csv"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"be.ems/lib/core/datasource"
|
||||||
|
"be.ems/src/framework/cron"
|
||||||
|
)
|
||||||
|
|
||||||
|
var NewProcessor = &BarProcessor{
|
||||||
|
progress: 0,
|
||||||
|
count: 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
// bar 队列任务处理
|
||||||
|
type BarProcessor struct {
|
||||||
|
// 任务进度
|
||||||
|
progress int
|
||||||
|
// 执行次数
|
||||||
|
count int
|
||||||
|
}
|
||||||
|
|
||||||
|
type BarParams struct {
|
||||||
|
Duration int `json:"duration"`
|
||||||
|
TableName string `json:"tableName"`
|
||||||
|
TimeCol string `json:"timeCol"` // time stamp of column name
|
||||||
|
TimeUnit string `json:"timeUnit"` // timestamp unit: second/micro/milli
|
||||||
|
ExportCols string `json:"exportCols"` // exported column name of time string
|
||||||
|
Extras string `json:"extras"` // extras condition for where
|
||||||
|
FilePath string `json:"filePath"` // file path
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BarProcessor) Execute(data any) (any, error) {
|
||||||
|
s.count++
|
||||||
|
|
||||||
|
options := data.(cron.JobData)
|
||||||
|
sysJob := options.SysJob
|
||||||
|
var params BarParams
|
||||||
|
|
||||||
|
err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
//duration = params.Duration
|
||||||
|
|
||||||
|
// 查询数据
|
||||||
|
var unitNum int = 0
|
||||||
|
switch params.TimeUnit {
|
||||||
|
case "second":
|
||||||
|
unitNum = 1
|
||||||
|
case "milli":
|
||||||
|
unitNum = 1000
|
||||||
|
case "micro":
|
||||||
|
unitNum = 1000000
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("error input parameter")
|
||||||
|
}
|
||||||
|
var query string
|
||||||
|
if params.Extras != "" {
|
||||||
|
query = fmt.Sprintf("SELECT %s FROM `%s` WHERE `%s` >= UNIX_TIMESTAMP(NOW() - INTERVAL 2*%d HOUR) * %d AND `%s` < UNIX_TIMESTAMP(NOW() - INTERVAL %d HOUR) * %d AND %s",
|
||||||
|
params.ExportCols, params.TableName, params.TimeCol, params.Duration, unitNum, params.TimeCol, params.Duration, unitNum, params.Extras)
|
||||||
|
} else {
|
||||||
|
query = fmt.Sprintf("SELECT %s FROM `%s` WHERE `%s` >= UNIX_TIMESTAMP(NOW() - INTERVAL 2*%d HOUR) * %d AND `%s` < UNIX_TIMESTAMP(NOW() - INTERVAL %d HOUR) * %d",
|
||||||
|
params.ExportCols, params.TableName, params.TimeCol, params.Duration, unitNum, params.TimeCol, params.Duration, unitNum)
|
||||||
|
}
|
||||||
|
|
||||||
|
//filePath := fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, params.TableName, time.Now().Format("20060102150405"))
|
||||||
|
filePath := fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, params.TableName, time.Now().Format("20060102150405"))
|
||||||
|
affected, err := s.exportData(query, filePath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 返回结果,用于记录执行结果
|
||||||
|
return map[string]any{
|
||||||
|
"msg": "sucess",
|
||||||
|
"filePath": filePath,
|
||||||
|
"affected": affected,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BarProcessor) exportData(query, filePath string) (int64, error) {
|
||||||
|
rows, err := datasource.DefaultDB().DB().Query(query)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
// 创建 CSV 文件
|
||||||
|
file, err := os.Create(filePath)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
defer file.Close()
|
||||||
|
|
||||||
|
writer := csv.NewWriter(file)
|
||||||
|
defer writer.Flush()
|
||||||
|
|
||||||
|
// 写入表头
|
||||||
|
columns, _ := rows.ColumnTypes()
|
||||||
|
header := make([]string, len(columns))
|
||||||
|
for i, col := range columns {
|
||||||
|
header[i] = col.Name()
|
||||||
|
}
|
||||||
|
if err := writer.Write(header); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 写入数据
|
||||||
|
var affected int64 = 0
|
||||||
|
for rows.Next() {
|
||||||
|
values := make([]sql.RawBytes, len(columns))
|
||||||
|
scanArgs := make([]interface{}, len(columns))
|
||||||
|
for i := range values {
|
||||||
|
scanArgs[i] = &values[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := rows.Scan(scanArgs...); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
record := make([]string, len(columns))
|
||||||
|
for i, val := range values {
|
||||||
|
if val == nil {
|
||||||
|
record[i] = ""
|
||||||
|
} else {
|
||||||
|
record[i] = string(val)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
affected++
|
||||||
|
if err := writer.Write(record); err != nil {
|
||||||
|
return affected, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return affected, nil
|
||||||
|
}
|
||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"be.ems/src/modules/crontask/processor/backupEtcFromNE"
|
"be.ems/src/modules/crontask/processor/backupEtcFromNE"
|
||||||
"be.ems/src/modules/crontask/processor/delExpiredNeBackup"
|
"be.ems/src/modules/crontask/processor/delExpiredNeBackup"
|
||||||
"be.ems/src/modules/crontask/processor/deleteExpiredRecord"
|
"be.ems/src/modules/crontask/processor/deleteExpiredRecord"
|
||||||
|
"be.ems/src/modules/crontask/processor/exportTable"
|
||||||
"be.ems/src/modules/crontask/processor/genNeStateAlarm"
|
"be.ems/src/modules/crontask/processor/genNeStateAlarm"
|
||||||
"be.ems/src/modules/crontask/processor/getStateFromNE"
|
"be.ems/src/modules/crontask/processor/getStateFromNE"
|
||||||
monitorsysresource "be.ems/src/modules/crontask/processor/monitor_sys_resource"
|
monitorsysresource "be.ems/src/modules/crontask/processor/monitor_sys_resource"
|
||||||
@@ -20,4 +21,5 @@ func InitCronQueue() {
|
|||||||
cron.CreateQueue("backupEtcFromNE", backupEtcFromNE.NewProcessor)
|
cron.CreateQueue("backupEtcFromNE", backupEtcFromNE.NewProcessor)
|
||||||
cron.CreateQueue("getStateFromNE", getStateFromNE.NewProcessor)
|
cron.CreateQueue("getStateFromNE", getStateFromNE.NewProcessor)
|
||||||
cron.CreateQueue("genNeStateAlarm", genNeStateAlarm.NewProcessor)
|
cron.CreateQueue("genNeStateAlarm", genNeStateAlarm.NewProcessor)
|
||||||
|
cron.CreateQueue("exportTable", exportTable.NewProcessor)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user