feat: redis读取hgetall数据批量读取返回
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"be.ems/src/framework/config"
|
||||
@@ -179,31 +180,22 @@ func GetExpire(source string, key string) (float64, error) {
|
||||
}
|
||||
|
||||
// 获得缓存数据的key列表
|
||||
func GetKeys(source string, pattern string) ([]string, error) {
|
||||
func GetKeys(source string, match string) ([]string, error) {
|
||||
// 数据源
|
||||
rdb := DefaultRDB()
|
||||
if source != "" {
|
||||
rdb = RDB(source)
|
||||
}
|
||||
|
||||
// 初始化变量
|
||||
var keys []string
|
||||
var cursor uint64 = 0
|
||||
keys := make([]string, 0)
|
||||
ctx := context.Background()
|
||||
// 循环遍历获取匹配的键
|
||||
for {
|
||||
// 使用 SCAN 命令获取匹配的键
|
||||
batchKeys, nextCursor, err := rdb.Scan(ctx, cursor, pattern, 1000).Result()
|
||||
if err != nil {
|
||||
logger.Errorf("Failed to scan keys: %v", err)
|
||||
return keys, err
|
||||
}
|
||||
cursor = nextCursor
|
||||
keys = append(keys, batchKeys...)
|
||||
// 当 cursor 为 0,表示遍历完成
|
||||
if cursor == 0 {
|
||||
break
|
||||
}
|
||||
iter := rdb.Scan(ctx, 0, match, 1000).Iterator()
|
||||
if err := iter.Err(); err != nil {
|
||||
logger.Errorf("Failed to scan keys: %v", err)
|
||||
return keys, err
|
||||
}
|
||||
for iter.Next(ctx) {
|
||||
keys = append(keys, iter.Val())
|
||||
}
|
||||
return keys, nil
|
||||
}
|
||||
@@ -261,6 +253,83 @@ func GetHash(source, key string) (map[string]string, error) {
|
||||
return value, nil
|
||||
}
|
||||
|
||||
// 批量获得缓存数据 [key]result
|
||||
func GetHashBatch(source string, keys []string) (map[string]map[string]string, error) {
|
||||
result := make(map[string]map[string]string, 0)
|
||||
if len(keys) == 0 {
|
||||
return result, fmt.Errorf("not keys")
|
||||
}
|
||||
|
||||
// 数据源
|
||||
rdb := DefaultRDB()
|
||||
if source != "" {
|
||||
rdb = RDB(source)
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
// 创建一个有限的并发控制信号通道
|
||||
sem := make(chan struct{}, 10)
|
||||
var wg sync.WaitGroup
|
||||
var mt sync.Mutex
|
||||
batchSize := 1000
|
||||
total := len(keys)
|
||||
if total < batchSize {
|
||||
batchSize = total
|
||||
}
|
||||
|
||||
for i := 0; i < total; i += batchSize {
|
||||
wg.Add(1)
|
||||
go func(start int) {
|
||||
// 并发控制,限制同时执行的 Goroutine 数量
|
||||
sem <- struct{}{}
|
||||
defer func() {
|
||||
wg.Done()
|
||||
<-sem
|
||||
}()
|
||||
|
||||
pipe := rdb.Pipeline()
|
||||
for _, key := range keys[start : start+batchSize] {
|
||||
pipe.HGetAll(ctx, key)
|
||||
}
|
||||
|
||||
cmds, err := pipe.Exec(ctx)
|
||||
if err != nil {
|
||||
logger.Errorf("Failed to get hash batch exec err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// 将结果添加到 result map 并发访问
|
||||
mt.Lock()
|
||||
defer mt.Unlock()
|
||||
|
||||
// 处理命令结果
|
||||
for _, cmd := range cmds {
|
||||
if cmd.Err() != nil {
|
||||
logger.Errorf("Failed to get hash batch cmds err: %v", cmd.Err())
|
||||
continue
|
||||
}
|
||||
// 将结果转换为 *redis.StringStringMapCmd 类型
|
||||
rcmd, ok := cmd.(*redis.MapStringStringCmd)
|
||||
if !ok {
|
||||
logger.Errorf("Failed to get hash batch type err: %v", cmd.Err())
|
||||
continue
|
||||
}
|
||||
|
||||
key := "-"
|
||||
args := rcmd.Args()
|
||||
if len(args) > 0 {
|
||||
key = fmt.Sprint(args[1])
|
||||
}
|
||||
|
||||
result[key] = rcmd.Val()
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// 判断是否存在
|
||||
func Has(source string, keys ...string) (bool, error) {
|
||||
// 数据源
|
||||
|
||||
@@ -43,14 +43,18 @@ func (r *UDMAuthUser) dataByRedis(imsi, neId string) []model.UDMAuthUser {
|
||||
if err != nil {
|
||||
return arr
|
||||
}
|
||||
for _, key := range ausfArr {
|
||||
m, err := redis.GetHash(source, key)
|
||||
if err != nil {
|
||||
mkv, err := redis.GetHashBatch(source, ausfArr)
|
||||
if err != nil {
|
||||
return arr
|
||||
}
|
||||
|
||||
for k, m := range mkv {
|
||||
if k == "-" {
|
||||
continue
|
||||
}
|
||||
|
||||
// 跳过-号数据 ausf:360000100000130
|
||||
imsi := key[5:]
|
||||
imsi := k[5:]
|
||||
if strings.Contains(imsi, "-") {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -44,14 +44,24 @@ func (r *UDMSubUser) dataByRedis(imsi, neId string) []model.UDMSubUser {
|
||||
if err != nil {
|
||||
return arr
|
||||
}
|
||||
for _, key := range udmsdArr {
|
||||
m, err := redis.GetHash(source, key)
|
||||
if err != nil {
|
||||
mkv, err := redis.GetHashBatch(source, udmsdArr)
|
||||
if err != nil {
|
||||
return arr
|
||||
}
|
||||
|
||||
for k, m := range mkv {
|
||||
if k == "-" {
|
||||
continue
|
||||
}
|
||||
|
||||
// 跳过-号数据 udm-sd:360000100000130
|
||||
imsi := k[7:]
|
||||
if strings.Contains(imsi, "-") {
|
||||
continue
|
||||
}
|
||||
|
||||
a := model.UDMSubUser{
|
||||
IMSI: key[7:], // udm-sd:360000100000130
|
||||
IMSI: imsi, // udm-sd:360000100000130
|
||||
MSISDN: m["gpsi"], // 8612300000130
|
||||
NeId: neId,
|
||||
SmfSel: m["smf-sel"], // def_snssai
|
||||
|
||||
Reference in New Issue
Block a user