fix: file export to reset data

This commit is contained in:
simonzhangsz
2025-04-10 10:37:40 +08:00
parent 6329d638a4
commit a1eaaaebd7

View File

@@ -10,11 +10,11 @@ import (
// "path/filepath" // "path/filepath"
"time" "time"
ueService "be.ems/features/ue/service"
"be.ems/lib/dborm" "be.ems/lib/dborm"
"be.ems/lib/log" "be.ems/lib/log"
"be.ems/src/framework/cron" "be.ems/src/framework/cron"
ueService "be.ems/features/ue/service" neService "be.ems/src/modules/network_data/service"
neService "be.ems/src/modules/network_data/service"
) )
var NewProcessor = &BarProcessor{ var NewProcessor = &BarProcessor{
@@ -31,20 +31,20 @@ type BarProcessor struct {
} }
type BarParams struct { type BarParams struct {
TableName string `json:"tableName"` TableName string `json:"tableName"`
Columns string `json:"columns"` // exported column name of time string Columns string `json:"columns"` // exported column name of time string
Extras string `json:"extras"` // extras condition for where Extras string `json:"extras"` // extras condition for where
ServiceName string `json:"serviceName"` // data service name ServiceName string `json:"serviceName"` // data service name
OrderBy string `json:"orderBy"` OrderBy string `json:"orderBy"`
OrderType string `json:"orderType"` // order type: asc/desc OrderType string `json:"orderType"` // order type: asc/desc
FileType string `json:"fileType"` // file type: txt/csv FileType string `json:"fileType"` // file type: txt/csv
FilePath string `json:"filePath"` // file path FilePath string `json:"filePath"` // file path
} }
const ( const (
FILE_TYPE_TXT = "txt" FILE_TYPE_TXT = "txt"
FILE_TYPE_CSV = "csv" FILE_TYPE_CSV = "csv"
NE_TYPE_UDM = "UDM" NE_TYPE_UDM = "UDM"
) )
func (s *BarProcessor) Execute(data any) (any, error) { func (s *BarProcessor) Execute(data any) (any, error) {
@@ -75,7 +75,7 @@ func (s *BarProcessor) Execute(data any) (any, error) {
// 返回结果,用于记录执行结果 // 返回结果,用于记录执行结果
return map[string]any{ return map[string]any{
"msg": "sucess", "msg": "sucess",
"results": results, "results": results,
}, nil }, nil
} }
@@ -97,25 +97,30 @@ func (s *BarProcessor) exportUEData(param BarParams) (map[string]any, error) {
Find(&neIDs).Error Find(&neIDs).Error
if err != nil { if err != nil {
log.Error("failed to get ne_ids:", err) log.Error("failed to get ne_ids:", err)
return nil, err return nil, err
} }
var fps []string var fps []string
var affectedArr []int64 var affectedArr []int64
for _, neID := range neIDs { for _, neID := range neIDs {
log.Trace("ne_id:", neID)
// 1. 加载最新数据, 如果数据服务存在,则重新加载数据 // 1. 加载最新数据, 如果数据服务存在,则重新加载数据
dataService, err := GetService(param.ServiceName) result := ResetDataWithResult(param.ServiceName, neID)
if err != nil { log.Trace("reset data count=", result.Count)
log.Warn("failed to get data service:", err) // 后续可以等待异步操作返回结果
} else if dataService != nil { if err := <-result.Err; err != nil {
// 重新加载数据 log.Error("failed to reset data: ", err)
data := dataService.ResetData(neID)
if data == 0 {
log.Error("failed to load data:", err)
return nil, err
}
log.Trace("load data:", data)
} }
// dataService, err := GetService(param.ServiceName)
// if err != nil {
// log.Warn("failed to get data service:", err)
// } else if dataService != nil {
// // 重新加载数据
// data := dataService.ResetData(neID)
// if data == 0 {
// log.Error("failed to load data:", err)
// return nil, err
// }
// log.Trace("load data:", data)
// }
// 2. 构造查询语句 // 2. 构造查询语句
var query string var query string
@@ -138,11 +143,11 @@ func (s *BarProcessor) exportUEData(param BarParams) (map[string]any, error) {
// 3. 构造文件路径 // 3. 构造文件路径
var filePath string var filePath string
if param.FileType == FILE_TYPE_TXT { if param.FileType == FILE_TYPE_TXT {
filePath = fmt.Sprintf("%s/%s_export_%s_%s.txt", filePath = fmt.Sprintf("%s/%s_export_%s_%s.txt",
param.FilePath, param.TableName, time.Now().Format("20060102150405"), neID) param.FilePath, param.TableName, time.Now().Format("20060102150405"), neID)
} else { } else {
// 默认导出 csv 文件 // 默认导出 csv 文件
filePath = fmt.Sprintf("%s/%s_export_%s_%s.csv", filePath = fmt.Sprintf("%s/%s_export_%s_%s.csv",
param.FilePath, param.TableName, time.Now().Format("20060102150405"), neID) param.FilePath, param.TableName, time.Now().Format("20060102150405"), neID)
} }
@@ -159,12 +164,12 @@ func (s *BarProcessor) exportUEData(param BarParams) (map[string]any, error) {
// 5. 返回结果 // 5. 返回结果
result := map[string]any{ result := map[string]any{
"msg": "sucess", "msg": "sucess",
"table": param.TableName, "table": param.TableName,
"ne_id": neIDs, "ne_id": neIDs,
"affected": affectedArr, "affected": affectedArr,
} }
return result, nil return result, nil
} }
func (s *BarProcessor) exportData(query, filePath string, fileType string) (int64, error) { func (s *BarProcessor) exportData(query, filePath string, fileType string) (int64, error) {
rows, err := dborm.XCoreDB().Query(query) rows, err := dborm.XCoreDB().Query(query)
@@ -226,24 +231,51 @@ func (s *BarProcessor) exportData(query, filePath string, fileType string) (int6
return affected, nil return affected, nil
} }
type ResetDataResult struct {
Count int64 // 同步返回的数据长度
Err <-chan error // 异步返回 ClearAndInsert 的执行结果
}
// ResetDataWithResult 重置鉴权用户数据清空数据库重新同步Redis数据
// 立即返回数据量,同时通过 channel 返回 ClearAndInsert 的执行结果
func ResetDataWithResult(serivceName, neID string) ResetDataResult {
dataService, err := GetService(serivceName)
if err != nil {
log.Warn("failed to get data service:", err)
}
authArr := dataService.dataByRedis("*", neID)
resultCh := make(chan error, 1)
go func() {
resultCh <- dataService.ClearAndInsert(neID, authArr)
}()
return ResetDataResult{
Count: int64(len(authArr)),
Err: resultCh,
}
}
// ResettableService 接口定义 // ResettableService 接口定义
type ResettableService interface { type ResettableService interface {
ResetData(neID string) int64 dataByRedis(key, neID string) []map[string]string // 从Redis获取数据
ClearAndInsert(neID string, authArr []map[string]string) error // 清空数据库并插入数据
//ResetData(neID string) int64
} }
// 服务注册表 // 服务注册表
var serviceRegistry = make(map[string]ResettableService) var serviceRegistry = make(map[string]ResettableService)
func RegisterService(name string, service ResettableService) { func RegisterService(name string, service ResettableService) {
serviceRegistry[name] = service serviceRegistry[name] = service
} }
// 获取服务 // 获取服务
func GetService(name string) (ResettableService, error) { func GetService(name string) (ResettableService, error) {
service, exists := serviceRegistry[name] service, exists := serviceRegistry[name]
if !exists { if !exists {
return nil, fmt.Errorf("service %s not found", name) return nil, fmt.Errorf("service %s not found", name)
} }
return service, nil return service, nil
} }
// 初始化注册表 // 初始化注册表
@@ -253,5 +285,5 @@ func init() {
RegisterService("UDMVoIPAuth", ueService.NewVoIPAuthService) RegisterService("UDMVoIPAuth", ueService.NewVoIPAuthService)
RegisterService("UDMIMSUser", ueService.NewIMSUserService) RegisterService("UDMIMSUser", ueService.NewIMSUserService)
// 这里注册更多服务 // 这里注册更多服务
} }