package redisqueue import ( "context" "encoding/json" "be.ems/lib/log" redisdb "be.ems/src/framework/database/redis" "github.com/redis/go-redis/v9" ) const ( // MaxMessages is the maximum number of messages to retain in each stream // This is set to 2000 as per the original code // It ensures that only the latest 2000 messages are kept in the stream // This helps in managing memory and performance by not allowing the stream to grow indefinitely // If you need to change this, make sure to update it in all relevant places // across the codebase to maintain consistency // and avoid unexpected behavior. // Note: This value should be consistent with the XTRIM command used in the code // to trim the stream. // If you change this value, also update the XTRIM command accordingly. maxMessages = 2000 // Maximum number of messages to retain in each stream ) // 写入 alarm_relation 消息 func AddAlarmRelationQueue(ids []string) error { payload := map[string][]string{"ids": ids} payloadBytes, err := json.Marshal(payload) if err != nil { return err } values := map[string]interface{}{ "payload": string(payloadBytes), } _, err = client.XAdd(ctx, &redis.XAddArgs{ Stream: "alarm_relation", Values: values, }).Result() // 只保留最新2000条消息 client.XTrimMaxLen(context.Background(), "alarm_relation", maxMessages).Result() return err } // 写入 alarm 消息 func AddAlarmQueue(id string) error { payload := map[string]string{"id": id} payloadBytes, err := json.Marshal(payload) if err != nil { return err } values := map[string]interface{}{ "payload": string(payloadBytes), } _, err = client.XAdd(ctx, &redis.XAddArgs{ Stream: "alarm", Values: values, }).Result() // 只保留最新2000条消息 client.XTrimMaxLen(context.Background(), "alarm", maxMessages).Result() return err } // 写入 nbi_cm 消息 func AddNbiCMQueue(ids []string) error { payload := map[string][]string{"ids": ids} payloadBytes, err := json.Marshal(payload) if err != nil { return err } values := map[string]interface{}{ "payload": string(payloadBytes), } _, err = client.XAdd(ctx, &redis.XAddArgs{ Stream: "nbi_cm", Values: values, }).Result() // 只保留最新2000条消息 client.XTrimMaxLen(context.Background(), "nbi_cm", maxMessages).Result() return err } // 写入 kpi_report_xxx 消息 func AddNbiKpiQueue(neType, id string) error { payload := map[string]string{"neType": neType, "id": id} payloadBytes, err := json.Marshal(payload) if err != nil { return err } values := map[string]interface{}{ "payload": string(payloadBytes), } _, err = client.XAdd(ctx, &redis.XAddArgs{ Stream: "nbi_kpi", Values: values, }).Result() // 只保留最新2000条消息 client.XTrimMaxLen(context.Background(), "nbi_kpi", maxMessages).Result() return err } // 写入 nbi_pm 消息 func AddNbiPMQueueOrg(neType, id string) error { payload := map[string]string{"neType": neType, "id": id} payloadBytes, err := json.Marshal(payload) if err != nil { return err } values := map[string]interface{}{ "payload": string(payloadBytes), } _, err = client.XAdd(ctx, &redis.XAddArgs{ Stream: "nbi_pm", Values: values, }).Result() // 只保留最新2000条消息 client.XTrimMaxLen(context.Background(), "nbi_pm", maxMessages).Result() return err } // 写入 nbi_pm 消息 func AddNbiPMQueue(id string) error { payload := map[string]string{"id": id} payloadBytes, err := json.Marshal(payload) if err != nil { return err } values := map[string]interface{}{ "payload": string(payloadBytes), } _, err = client.XAdd(ctx, &redis.XAddArgs{ Stream: "nbi_pm", Values: values, }).Result() // 只保留最新2000条消息 client.XTrimMaxLen(context.Background(), "nbi_pm", maxMessages).Result() return err } // 读取并打印指定 stream 的最新消息 func ReadLatest(stream string) error { res, err := client.XRevRangeN(ctx, stream, "+", "-", 1).Result() if err != nil { return err } for _, msg := range res { log.Tracef("Stream: %s, ID: %s, Values: %v\n", stream, msg.ID, msg.Values) } return nil } var ctx = context.Background() var client *redis.Client func InitRedisQueue() { // var cancel context.CancelFunc // ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) // defer cancel() client = redisdb.RDB("") } func CloseRedisQueue() { if err := client.Close(); err != nil { log.Errorf("redis db close: %s", err) } }