feat: support export UE data on task scheduled

This commit is contained in:
zhangsz
2025-04-01 11:44:39 +08:00
parent c12061ce36
commit cfcef54ea8
5 changed files with 256 additions and 157 deletions

View File

@@ -8,15 +8,11 @@ BinDir=/usr/local/omc/bin
case "$1" in
start)
for procName in $ProcListDesc;do
echo -n "Starting $procName process ... "
echo -n "Starting $procName process ... "
systemctl start $procName
if [ $? = 0 ]; then
echo "done"
fi
if [ $? = 0 ]; then
echo "done"
fi
sleep 1
done
;;

View File

@@ -1,151 +0,0 @@
package exportData
import (
"database/sql"
"encoding/csv"
"encoding/json"
"fmt"
"os"
// "path/filepath"
"time"
"be.ems/lib/dborm"
"be.ems/lib/log"
"be.ems/src/framework/cron"
)
var NewProcessor = &BarProcessor{
progress: 0,
count: 0,
}
// bar 队列任务处理
type BarProcessor struct {
// 任务进度
progress int
// 执行次数
count int
}
type BarParams struct {
TableName string `json:"tableName"`
Columns string `json:"columns"` // exported column name of time string
Extras string `json:"extras"` // extras condition for where
FileType string `json:"fileType"` // file type: txt/csv
FilePath string `json:"filePath"` // file path
}
func (s *BarProcessor) Execute(data any) (any, error) {
s.count++
options := data.(cron.JobData)
sysJob := options.SysJob
var params BarParams
err := json.Unmarshal([]byte(sysJob.TargetParams), &params)
if err != nil {
log.Error("failed to unmarshal:", err)
return nil, err
}
// mkdir if not exist
if _, err = os.Stat(params.FilePath); os.IsNotExist(err) {
err = os.MkdirAll(params.FilePath, os.ModePerm)
if err != nil {
log.Error("Failed to Mkdir:", err)
return nil, err
}
}
var query string
if params.Extras != "" {
query = fmt.Sprintf("SELECT %s FROM `%s` WHERE %s",
params.Columns, params.TableName, params.Extras)
} else {
query = fmt.Sprintf("SELECT %s FROM `%s`",
params.Columns, params.TableName)
}
log.Trace("query:", query)
var filePath string
if params.FileType == "txt" {
filePath = fmt.Sprintf("%s/%s_export_%s.txt", params.FilePath, params.TableName, time.Now().Format("20060102150405"))
// query = fmt.Sprintf("%s INTO OUTFILE '%s' FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'",
// query, fmt.Sprintf("%s/%s_export_%s.txt", params.FilePath, params.TableName, time.Now().Format("20060102150405")))
} else {
// 默认导出 csv 文件
filePath = fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, params.TableName, time.Now().Format("20060102150405"))
// query = fmt.Sprintf("%s INTO OUTFILE '%s' FIELDS TERMINATED BY ',' ENCLOSED BY '\"' LINES TERMINATED BY '\n'",
// query, fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, params.TableName, time.Now().Format("20060102150405")))
}
// filePath := fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, params.TableName, time.Now().Format("20060102150405"))
affected, err := s.exportData(query, filePath)
if err != nil {
log.Errorf("failed to export data: %v", err)
return nil, err
}
// 返回结果,用于记录执行结果
return map[string]any{
"msg": "sucess",
"filePath": filePath,
"affected": affected,
}, nil
}
func (s *BarProcessor) exportData(query, filePath string) (int64, error) {
rows, err := dborm.XCoreDB().Query(query)
if err != nil {
return 0, err
}
defer rows.Close()
// 创建 文件
file, err := os.Create(filePath)
if err != nil {
return 0, err
}
defer file.Close()
writer := csv.NewWriter(file)
defer writer.Flush()
// 写入表头
columns, _ := rows.ColumnTypes()
header := make([]string, len(columns))
for i, col := range columns {
header[i] = col.Name()
}
if err := writer.Write(header); err != nil {
return 0, err
}
// 写入数据
var affected int64 = 0
for rows.Next() {
values := make([]sql.RawBytes, len(columns))
scanArgs := make([]interface{}, len(columns))
for i := range values {
scanArgs[i] = &values[i]
}
if err := rows.Scan(scanArgs...); err != nil {
return 0, err
}
record := make([]string, len(columns))
for i, val := range values {
if val == nil {
record[i] = ""
} else {
record[i] = string(val)
}
}
affected++
if err := writer.Write(record); err != nil {
return affected, err
}
}
return affected, nil
}

View File

@@ -0,0 +1,225 @@
package exportUEData
import (
"database/sql"
"encoding/csv"
"encoding/json"
"fmt"
"os"
// "path/filepath"
"time"
"be.ems/lib/dborm"
"be.ems/lib/log"
"be.ems/src/framework/cron"
networkdata "be.ems/src/modules/network_data"
)
var NewProcessor = &BarProcessor{
progress: 0,
count: 0,
}
// bar 队列任务处理
type BarProcessor struct {
// 任务进度
progress int
// 执行次数
count int
}
type BarParams struct {
TableName string `json:"tableName"`
Columns string `json:"columns"` // exported column name of time string
Extras string `json:"extras"` // extras condition for where
ServiceName string `json:"serviceName"` // data service name
OrderBy string `json:"orderBy"`
OrderType string `json:"orderType"` // order type: asc/desc
FileType string `json:"fileType"` // file type: txt/csv
FilePath string `json:"filePath"` // file path
}
const (
FILE_TYPE_TXT = "txt"
FILE_TYPE_CSV = "csv"
NE_TYPE_UDM = "UDM"
)
func (s *BarProcessor) Execute(data any) (any, error) {
s.count++
options := data.(cron.JobData)
sysJob := options.SysJob
params := make([]BarParams, 0)
// 解析参数
err := json.Unmarshal([]byte(sysJob.TargetParams), &params)
if err != nil {
log.Error("failed to unmarshal:", err)
return nil, err
}
var results []map[string]any
for _, param := range params {
log.Trace("param:", param)
result, err := s.exportUEData(param)
if err != nil {
log.Error("failed to export data:", err)
return nil, err
}
log.Trace("export result:", result)
results = append(results, result)
}
// 返回结果,用于记录执行结果
// 返回结果,用于记录执行结果
return map[string]any{
"msg": "sucess",
"results": results,
}, nil
}
func (s *BarProcessor) exportUEData(param BarParams) (map[string]any, error) {
// mkdir if not exist
var err error
if _, err = os.Stat(param.FilePath); os.IsNotExist(err) {
err = os.MkdirAll(param.FilePath, os.ModePerm)
if err != nil {
log.Error("Failed to Mkdir:", err)
return nil, err
}
}
// load data from udm-xxx
var neIDs []string
// 1. 获取所有的 ne_id
err = dborm.DefaultDB().Table("ne_info").Where("ne_type=?", NE_TYPE_UDM).Select("ne_id").
Find(&neIDs).Error
if err != nil {
log.Error("failed to get ne_ids:", err)
return nil, err
}
var fps []string
var affectedArr []int64
for _, neID := range neIDs {
// 1. 加载最新数据, 如果数据服务存在,则重新加载数据
dataService, err := networkdata.GetService(param.ServiceName)
if err != nil {
log.Warn("failed to get data service:", err)
} else if dataService != nil {
// 重新加载数据
data := dataService.ResetData(neID)
if data == 0 {
log.Error("failed to load data:", err)
return nil, err
}
log.Trace("load data:", data)
}
// 2. 构造查询语句
var query string
if param.Extras != "" {
query = fmt.Sprintf("SELECT %s FROM `%s` WHERE ne_id = %s and %s",
param.Columns, param.TableName, neID, param.Extras)
} else {
query = fmt.Sprintf("SELECT %s FROM `%s` WHERE ne_id = %s",
param.Columns, param.TableName, neID)
}
if param.OrderBy != "" {
if param.OrderType != "desc" {
query += fmt.Sprintf(" ORDER BY %s desc", param.OrderBy)
} else {
query += fmt.Sprintf(" ORDER BY %s asc", param.OrderBy)
}
}
log.Trace("query:", query)
// 3. 构造文件路径
var filePath string
if param.FileType == FILE_TYPE_TXT {
filePath = fmt.Sprintf("%s/%s_export_%s_%s.txt",
param.FilePath, param.TableName, time.Now().Format("20060102150405"), neID)
} else {
// 默认导出 csv 文件
filePath = fmt.Sprintf("%s/%s_export_%s_%s.csv",
param.FilePath, param.TableName, time.Now().Format("20060102150405"), neID)
}
// 4. 导出数据
affected, err := s.exportData(query, filePath, param.FileType)
if err != nil {
log.Errorf("failed to export data: %v", err)
return nil, err
}
log.Trace("exported data:", affected)
fps = append(fps, filePath)
affectedArr = append(affectedArr, affected)
}
// 5. 返回结果
result := map[string]any{
"msg": "sucess",
"table": param.TableName,
"ne_id": neIDs,
"affected": affectedArr,
}
return result, nil
}
func (s *BarProcessor) exportData(query, filePath string, fileType string) (int64, error) {
rows, err := dborm.XCoreDB().Query(query)
if err != nil {
return 0, err
}
defer rows.Close()
// 创建 文件
file, err := os.Create(filePath)
if err != nil {
return 0, err
}
defer file.Close()
writer := csv.NewWriter(file)
defer writer.Flush()
columns, _ := rows.ColumnTypes()
// 写入表头, txt no header
if fileType != FILE_TYPE_TXT {
header := make([]string, len(columns))
for i, col := range columns {
header[i] = col.Name()
}
if err := writer.Write(header); err != nil {
return 0, err
}
}
// 写入数据
var affected int64 = 0
for rows.Next() {
values := make([]sql.RawBytes, len(columns))
scanArgs := make([]interface{}, len(columns))
for i := range values {
scanArgs[i] = &values[i]
}
if err := rows.Scan(scanArgs...); err != nil {
return 0, err
}
record := make([]string, len(columns))
for i, val := range values {
if val == nil {
record[i] = ""
} else {
record[i] = string(val)
}
}
affected++
if err := writer.Write(record); err != nil {
return affected, err
}
}
return affected, nil
}

View File

@@ -5,7 +5,7 @@ import (
"be.ems/src/modules/crontask/processor/backupEtcFromNE"
"be.ems/src/modules/crontask/processor/delExpiredNeBackup"
"be.ems/src/modules/crontask/processor/deleteExpiredRecord"
"be.ems/src/modules/crontask/processor/exportData"
"be.ems/src/modules/crontask/processor/exportUE"
"be.ems/src/modules/crontask/processor/exportTable"
"be.ems/src/modules/crontask/processor/genNeStateAlarm"
"be.ems/src/modules/crontask/processor/getStateFromNE"
@@ -31,5 +31,5 @@ func InitCronQueue() {
cron.CreateQueue("genNeStateAlarm", genNeStateAlarm.NewProcessor)
cron.CreateQueue("exportTable", exportTable.NewProcessor)
cron.CreateQueue("removeFile", removeFile.NewProcessor)
cron.CreateQueue("exportData", exportData.NewProcessor)
cron.CreateQueue("exportUE", exportUE.NewProcessor)
}

View File

@@ -9,6 +9,7 @@ import (
"be.ems/src/modules/network_data/service"
"github.com/gin-gonic/gin"
"fmt"
)
// 模块路由注册
@@ -323,6 +324,34 @@ func Setup(router *gin.Engine) {
}
}
// ResettableService 接口定义
type ResettableService interface {
ResetData(neID string) int64
}
// 服务注册表
var serviceRegistry = make(map[string]ResettableService)
func RegisterService(name string, service ResettableService) {
serviceRegistry[name] = service
}
// 获取服务
func GetService(name string) (ResettableService, error) {
service, exists := serviceRegistry[name]
if !exists {
return nil, fmt.Errorf("service %s not found", name)
}
return service, nil
}
// 初始化注册表
func init() {
RegisterService("UDMAuthData", service.NewUDMAuthUser)
RegisterService("UDMSubUser", service.NewUDMSubUser)
// 这里注册更多服务
}
// InitLoad 初始参数
func InitLoad() {
// 启动时加载UPF上下行流量