feat: support rest data return channel result

This commit is contained in:
zhangsz
2025-04-10 15:46:09 +08:00
parent a1eaaaebd7
commit a00a7da016
7 changed files with 88 additions and 74 deletions

View File

@@ -103,24 +103,18 @@ func (s *BarProcessor) exportUEData(param BarParams) (map[string]any, error) {
var affectedArr []int64
for _, neID := range neIDs {
// 1. 加载最新数据, 如果数据服务存在,则重新加载数据
result := ResetDataWithResult(param.ServiceName, neID)
log.Trace("reset data count=", result.Count)
// 后续可以等待异步操作返回结果
if err := <-result.Err; err != nil {
log.Error("failed to reset data: ", err)
dataService, err := GetService(param.ServiceName)
if err != nil {
log.Warn("failed to get data service:", err)
} else if dataService != nil {
// 重新加载数据
resultChan := dataService.ResetDataWithResult(neID)
affected := <-resultChan // Receive value from the channel
if affected == 0 {
return nil, fmt.Errorf("reload data failed, affected=0")
}
log.Trace("load data affected=", affected)
}
// dataService, err := GetService(param.ServiceName)
// if err != nil {
// log.Warn("failed to get data service:", err)
// } else if dataService != nil {
// // 重新加载数据
// data := dataService.ResetData(neID)
// if data == 0 {
// log.Error("failed to load data:", err)
// return nil, err
// }
// log.Trace("load data:", data)
// }
// 2. 构造查询语句
var query string
@@ -231,35 +225,10 @@ func (s *BarProcessor) exportData(query, filePath string, fileType string) (int6
return affected, nil
}
type ResetDataResult struct {
Count int64 // 同步返回的数据长度
Err <-chan error // 异步返回 ClearAndInsert 的执行结果
}
// ResetDataWithResult 重置鉴权用户数据清空数据库重新同步Redis数据
// 立即返回数据量,同时通过 channel 返回 ClearAndInsert 的执行结果
func ResetDataWithResult(serivceName, neID string) ResetDataResult {
dataService, err := GetService(serivceName)
if err != nil {
log.Warn("failed to get data service:", err)
}
authArr := dataService.dataByRedis("*", neID)
resultCh := make(chan error, 1)
go func() {
resultCh <- dataService.ClearAndInsert(neID, authArr)
}()
return ResetDataResult{
Count: int64(len(authArr)),
Err: resultCh,
}
}
// ResettableService 接口定义
type ResettableService interface {
dataByRedis(key, neID string) []map[string]string // 从Redis获取数据
ClearAndInsert(neID string, authArr []map[string]string) error // 清空数据库并插入数据
//ResetData(neID string) int64
ResetData(neID string) int64
ResetDataWithResult(neID string) chan int64
}
// 服务注册表

View File

@@ -200,3 +200,14 @@ func (r *UDMAuthUser) ParseCommandParams(item model.UDMAuthUser) string {
}
return strings.Join(conditions, ",")
}
// ResetDataWithResult 重置鉴权用户数据清空数据库重新同步Redis数据
// 通过 channel 返回 ClearAndInsert 的执行结果
func (r *UDMAuthUser) ResetDataWithResult(neId string) chan int64 {
arr := r.dataByRedis("*", neId)
resultCh := make(chan int64, 1)
go func() {
resultCh <- r.udmAuthRepository.ClearAndInsert(neId, arr)
}()
return resultCh
}

View File

@@ -362,3 +362,14 @@ func (r *UDMSubUser) ParseCommandParams(item model.UDMSubUser) string {
conditions = append(conditions, fmt.Sprintf("cag=%s", item.Cag))
return strings.Join(conditions, ",")
}
// ResetDataWithResult 重置鉴权用户数据清空数据库重新同步Redis数据
// 通过 channel 返回 ClearAndInsert 的执行结果
func (r *UDMSubUser) ResetDataWithResult(neId string) chan int64 {
arr := r.dataByRedis("*", neId)
resultCh := make(chan int64, 1)
go func() {
resultCh <- r.udmSubRepository.ClearAndInsert(neId, arr)
}()
return resultCh
}