From 6e9765ceaaee5af1de1982cebed60100ea2bc16a Mon Sep 17 00:00:00 2001 From: zhangsz Date: Fri, 11 Jul 2025 17:25:00 +0800 Subject: [PATCH] feat: cbc message update --- features/ue/cb_message/controller.go | 29 ++++- features/ue/cb_message/service.go | 187 ++++++++++++++++++--------- 2 files changed, 151 insertions(+), 65 deletions(-) diff --git a/features/ue/cb_message/controller.go b/features/ue/cb_message/controller.go index 0a6b6258..2044709c 100644 --- a/features/ue/cb_message/controller.go +++ b/features/ue/cb_message/controller.go @@ -31,10 +31,14 @@ func Register(r *gin.RouterGroup) { middleware.PreAuthorize(nil), m.Insert, ) - cbGroup.PUT("/message/:id", + cbGroup.PUT("/message/:id/:status", middleware.PreAuthorize(nil), m.UpdateStatus, ) + cbGroup.PUT("/message/:id", + middleware.PreAuthorize(nil), + m.Update, + ) cbGroup.DELETE("/message/:id", middleware.PreAuthorize(nil), m.Delete, @@ -128,15 +132,28 @@ func (m *CBMessage) Insert(c *gin.Context) { func (m *CBMessage) Update(c *gin.Context) { language := ctx.AcceptLanguage(c) - // 绑定请求体 - var msg CBMessage - if err := c.ShouldBindJSON(&msg); err != nil { + // 获取路径参数 + messageId := c.Param("id") + if messageId == "" { c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) return } + id, err := strconv.ParseInt(messageId, 10, 64) + if err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + // 直接读取body为json.RawMessage + body, err := io.ReadAll(c.Request.Body) + if err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + messageJson := json.RawMessage(body) + service := NewCBMessageService() - if err := service.UpdateCBMessage(msg); err != nil { + if err := service.UpdateCBMessage(id, messageJson); err != nil { c.JSON(500, result.ErrMsg(err.Error())) return } @@ -156,7 +173,7 @@ func (m *CBMessage) UpdateStatus(c *gin.Context) { language := ctx.AcceptLanguage(c) neId := c.Param("neId") - status := c.Query("status") + status := c.Param("status") if neId == "" || status == "" { c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) return diff --git a/features/ue/cb_message/service.go b/features/ue/cb_message/service.go index f8ddfbc5..99f4550c 100644 --- a/features/ue/cb_message/service.go +++ b/features/ue/cb_message/service.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "io" "net/http" "time" @@ -175,12 +176,46 @@ func (s *CBMessageService) SelectCBMessageById(id int64) (*CBMessage, error) { // @return error 错误信息 // @example // mfCBMessageService.UpdateCBMessage(msg) -func (s *CBMessageService) UpdateCBMessage(msg CBMessage) error { +func (s *CBMessageService) UpdateCBMessage(id int64, messageJson json.RawMessage) error { + // 解析messageJson获取eventName + var messageData map[string]interface{} + if err := json.Unmarshal(messageJson, &messageData); err != nil { + return fmt.Errorf("failed to parse message_json: %w", err) + } + // 提取eventName + eventName, ok := messageData["eventName"].(string) + if !ok || eventName == "" { + return fmt.Errorf("eventName is required in message_json") + } + + // 检查是否已存在相同的eventName + var msg CBMessage if err := s.db.Table("cb_message"). - Where("id = ?", msg.Id). - Updates(msg).Error; err != nil { + Where("id = ? AND JSON_EXTRACT(message_json, '$.eventName') = ?", id, eventName). + First(&msg).Error; err != nil { + if err == gorm.ErrRecordNotFound { + return fmt.Errorf("CB message with eventName '%s' not found for ID %d", eventName, id) + } + return fmt.Errorf("failed to query CB message: %w", err) + } + + now := time.Now().UnixMicro() + if err := s.db.Table("cb_message"). + Where("id = ?", id). + Updates(map[string]interface{}{ + "message_json": messageJson, + "updated_at": now, + }).Error; err != nil { return fmt.Errorf("failed to update CB message: %w", err) } + + // 如果状态是ACTIVE,发送更新请求 + if msg.Status == CBEventStatusActive { + if err := s.sendUpdateRequest(msg); err != nil { + return fmt.Errorf("failed to send update request: %w", err) + } + } + return nil } @@ -356,53 +391,124 @@ func (s *CBMessageService) handleStatusChange(msg CBMessage, oldStatus, newStatu return nil } -// sendActivateRequest 发送激活请求 (POST) -func (s *CBMessageService) sendActivateRequest(msg CBMessage) error { - // 获取CBC网元信息 (这里需要根据你的实际情况获取IP和端口) - cbcIP, cbcPort, err := s.getCBCNetworkElement(msg.NeId) - if err != nil { - return fmt.Errorf("failed to get CBC network element info: %w", err) +// getCBCNetworkElement 获取CBC网元的IP和端口信息 +// 这个方法需要根据你的实际网元管理系统来实现 +func (s *CBMessageService) getCBCNetworkElement(neId string) (string, int64, error) { + // 查询网元信息 + neInfo := neService.NewNeInfo.SelectNeInfoByNeTypeAndNeID("CBC", neId) + if neInfo.IP == "" { + return "", 0, fmt.Errorf("CBC network element not found for neId: %s", neId) } + return neInfo.IP, neInfo.Port, nil +} - url := fmt.Sprintf("http://%s:%d/api/v1/cbe/message", cbcIP, cbcPort) +// CBCHTTPClient CBC网元HTTP客户端 +type CBCHTTPClient struct { + client *http.Client + baseURL string +} - // 直接发送message_json内容 - req, err := http.NewRequest("POST", url, bytes.NewReader(msg.MessageJson)) +// NewCBCHTTPClient 创建CBC HTTP客户端 +func NewCBCHTTPClient(baseURL string) *CBCHTTPClient { + return &CBCHTTPClient{ + client: &http.Client{ + Timeout: 10 * time.Second, + }, + baseURL: baseURL, + } +} + +// PostMessage 发送POST请求创建消息 +func (c *CBCHTTPClient) PostMessage(messageData []byte) error { + url := fmt.Sprintf("%s/api/v1/cbe/message", c.baseURL) + return c.sendRequest("POST", url, messageData) +} + +// PutMessage 发送PUT请求更新消息 +func (c *CBCHTTPClient) PutMessage(messageData []byte) error { + url := fmt.Sprintf("%s/api/v1/cbe/message", c.baseURL) + return c.sendRequest("PUT", url, messageData) +} + +// DeleteMessage 发送DELETE请求删除消息 +func (c *CBCHTTPClient) DeleteMessage(eventName string, deletePayload []byte) error { + url := fmt.Sprintf("%s/api/v1/cbe/message/%s", c.baseURL, eventName) + return c.sendRequest("DELETE", url, deletePayload) +} + +// sendRequest 发送HTTP请求 +func (c *CBCHTTPClient) sendRequest(method, url string, body []byte) error { + req, err := http.NewRequest(method, url, bytes.NewReader(body)) if err != nil { - return fmt.Errorf("failed to create POST request: %w", err) + return fmt.Errorf("failed to create %s request: %w", method, err) } req.Header.Set("Content-Type", "application/json") - client := &http.Client{Timeout: 10 * time.Second} - resp, err := client.Do(req) + resp, err := c.client.Do(req) if err != nil { - return fmt.Errorf("failed to send POST request: %w", err) + return fmt.Errorf("failed to send %s request: %w", method, err) } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { - return fmt.Errorf("POST request failed with status: %d", resp.StatusCode) + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("%s request failed with status: %d, body: %s", + method, resp.StatusCode, string(body)) } return nil } -// sendDeactivateRequest 发送停用请求 (DELETE) +// 在CBMessageService中添加方法 +func (s *CBMessageService) getCBCHTTPClient(neId string) (*CBCHTTPClient, error) { + cbcIP, cbcPort, err := s.getCBCNetworkElement(neId) + if err != nil { + return nil, fmt.Errorf("failed to get CBC network element info: %w", err) + } + + baseURL := fmt.Sprintf("http://%s:%d", cbcIP, cbcPort) + return NewCBCHTTPClient(baseURL), nil +} + +// 重构后的激活请求 +func (s *CBMessageService) sendActivateRequest(msg CBMessage) error { + client, err := s.getCBCHTTPClient(msg.NeId) + if err != nil { + return err + } + + return client.PostMessage(msg.MessageJson) +} + +// 重构后的更新请求 +func (s *CBMessageService) sendUpdateRequest(msg CBMessage) error { + client, err := s.getCBCHTTPClient(msg.NeId) + if err != nil { + return err + } + + return client.PutMessage(msg.MessageJson) +} + +// 重构后的停用请求 func (s *CBMessageService) sendDeactivateRequest(msg CBMessage) error { - // 解析message_json获取需要的字段 + client, err := s.getCBCHTTPClient(msg.NeId) + if err != nil { + return err + } + + // 解析和构造删除载荷的逻辑保持不变 var messageData map[string]interface{} if err := json.Unmarshal(msg.MessageJson, &messageData); err != nil { return fmt.Errorf("failed to parse message_json: %w", err) } - // 提取eventName (从messageJson中获取,如果没有eventName字段,可能需要从其他地方获取) eventName, ok := messageData["eventName"].(string) if !ok || eventName == "" { return fmt.Errorf("eventName not found in message_json") } - // 构造删除请求的payload,只包含messageIdProfile和warningAreaList deletePayload := make(map[string]interface{}) if messageIdProfile, exists := messageData["messageIdProfile"]; exists { deletePayload["messageIdProfile"] = messageIdProfile @@ -416,42 +522,5 @@ func (s *CBMessageService) sendDeactivateRequest(msg CBMessage) error { return fmt.Errorf("failed to marshal delete payload: %w", err) } - // 获取CBC网元信息 - cbcIP, cbcPort, err := s.getCBCNetworkElement(msg.NeId) - if err != nil { - return fmt.Errorf("failed to get CBC network element info: %w", err) - } - - url := fmt.Sprintf("http://%s:%d/api/v1/cbe/message/%s", cbcIP, cbcPort, eventName) - - req, err := http.NewRequest("DELETE", url, bytes.NewReader(payloadBytes)) - if err != nil { - return fmt.Errorf("failed to create DELETE request: %w", err) - } - - req.Header.Set("Content-Type", "application/json") - - client := &http.Client{Timeout: 10 * time.Second} - resp, err := client.Do(req) - if err != nil { - return fmt.Errorf("failed to send DELETE request: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - return fmt.Errorf("DELETE request failed with status: %d", resp.StatusCode) - } - - return nil -} - -// getCBCNetworkElement 获取CBC网元的IP和端口信息 -// 这个方法需要根据你的实际网元管理系统来实现 -func (s *CBMessageService) getCBCNetworkElement(neId string) (string, int64, error) { - // 查询网元信息 - neInfo := neService.NewNeInfo.SelectNeInfoByNeTypeAndNeID("CBC", neId) - if neInfo.IP == "" { - return "", 0, fmt.Errorf("CBC network element not found for neId: %s", neId) - } - return neInfo.IP, neInfo.Port, nil + return client.DeleteMessage(eventName, payloadBytes) }