feat: cbc message update

This commit is contained in:
zhangsz
2025-07-11 17:25:00 +08:00
parent 130f0a5ac7
commit 6e9765ceaa
2 changed files with 151 additions and 65 deletions

View File

@@ -31,10 +31,14 @@ func Register(r *gin.RouterGroup) {
middleware.PreAuthorize(nil),
m.Insert,
)
cbGroup.PUT("/message/:id",
cbGroup.PUT("/message/:id/:status",
middleware.PreAuthorize(nil),
m.UpdateStatus,
)
cbGroup.PUT("/message/:id",
middleware.PreAuthorize(nil),
m.Update,
)
cbGroup.DELETE("/message/:id",
middleware.PreAuthorize(nil),
m.Delete,
@@ -128,15 +132,28 @@ func (m *CBMessage) Insert(c *gin.Context) {
func (m *CBMessage) Update(c *gin.Context) {
language := ctx.AcceptLanguage(c)
// 绑定请求体
var msg CBMessage
if err := c.ShouldBindJSON(&msg); err != nil {
// 获取路径参数
messageId := c.Param("id")
if messageId == "" {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
id, err := strconv.ParseInt(messageId, 10, 64)
if err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
// 直接读取body为json.RawMessage
body, err := io.ReadAll(c.Request.Body)
if err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
messageJson := json.RawMessage(body)
service := NewCBMessageService()
if err := service.UpdateCBMessage(msg); err != nil {
if err := service.UpdateCBMessage(id, messageJson); err != nil {
c.JSON(500, result.ErrMsg(err.Error()))
return
}
@@ -156,7 +173,7 @@ func (m *CBMessage) UpdateStatus(c *gin.Context) {
language := ctx.AcceptLanguage(c)
neId := c.Param("neId")
status := c.Query("status")
status := c.Param("status")
if neId == "" || status == "" {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
@@ -175,12 +176,46 @@ func (s *CBMessageService) SelectCBMessageById(id int64) (*CBMessage, error) {
// @return error 错误信息
// @example
// mfCBMessageService.UpdateCBMessage(msg)
func (s *CBMessageService) UpdateCBMessage(msg CBMessage) error {
func (s *CBMessageService) UpdateCBMessage(id int64, messageJson json.RawMessage) error {
// 解析messageJson获取eventName
var messageData map[string]interface{}
if err := json.Unmarshal(messageJson, &messageData); err != nil {
return fmt.Errorf("failed to parse message_json: %w", err)
}
// 提取eventName
eventName, ok := messageData["eventName"].(string)
if !ok || eventName == "" {
return fmt.Errorf("eventName is required in message_json")
}
// 检查是否已存在相同的eventName
var msg CBMessage
if err := s.db.Table("cb_message").
Where("id = ?", msg.Id).
Updates(msg).Error; err != nil {
Where("id = ? AND JSON_EXTRACT(message_json, '$.eventName') = ?", id, eventName).
First(&msg).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return fmt.Errorf("CB message with eventName '%s' not found for ID %d", eventName, id)
}
return fmt.Errorf("failed to query CB message: %w", err)
}
now := time.Now().UnixMicro()
if err := s.db.Table("cb_message").
Where("id = ?", id).
Updates(map[string]interface{}{
"message_json": messageJson,
"updated_at": now,
}).Error; err != nil {
return fmt.Errorf("failed to update CB message: %w", err)
}
// 如果状态是ACTIVE发送更新请求
if msg.Status == CBEventStatusActive {
if err := s.sendUpdateRequest(msg); err != nil {
return fmt.Errorf("failed to send update request: %w", err)
}
}
return nil
}
@@ -356,53 +391,124 @@ func (s *CBMessageService) handleStatusChange(msg CBMessage, oldStatus, newStatu
return nil
}
// sendActivateRequest 发送激活请求 (POST)
func (s *CBMessageService) sendActivateRequest(msg CBMessage) error {
// 获取CBC网元信息 (这里需要根据你的实际情况获取IP和端口)
cbcIP, cbcPort, err := s.getCBCNetworkElement(msg.NeId)
if err != nil {
return fmt.Errorf("failed to get CBC network element info: %w", err)
// getCBCNetworkElement 获取CBC网元的IP和端口信息
// 这个方法需要根据你的实际网元管理系统来实现
func (s *CBMessageService) getCBCNetworkElement(neId string) (string, int64, error) {
// 查询网元信息
neInfo := neService.NewNeInfo.SelectNeInfoByNeTypeAndNeID("CBC", neId)
if neInfo.IP == "" {
return "", 0, fmt.Errorf("CBC network element not found for neId: %s", neId)
}
return neInfo.IP, neInfo.Port, nil
}
url := fmt.Sprintf("http://%s:%d/api/v1/cbe/message", cbcIP, cbcPort)
// CBCHTTPClient CBC网元HTTP客户端
type CBCHTTPClient struct {
client *http.Client
baseURL string
}
// 直接发送message_json内容
req, err := http.NewRequest("POST", url, bytes.NewReader(msg.MessageJson))
// NewCBCHTTPClient 创建CBC HTTP客户端
func NewCBCHTTPClient(baseURL string) *CBCHTTPClient {
return &CBCHTTPClient{
client: &http.Client{
Timeout: 10 * time.Second,
},
baseURL: baseURL,
}
}
// PostMessage 发送POST请求创建消息
func (c *CBCHTTPClient) PostMessage(messageData []byte) error {
url := fmt.Sprintf("%s/api/v1/cbe/message", c.baseURL)
return c.sendRequest("POST", url, messageData)
}
// PutMessage 发送PUT请求更新消息
func (c *CBCHTTPClient) PutMessage(messageData []byte) error {
url := fmt.Sprintf("%s/api/v1/cbe/message", c.baseURL)
return c.sendRequest("PUT", url, messageData)
}
// DeleteMessage 发送DELETE请求删除消息
func (c *CBCHTTPClient) DeleteMessage(eventName string, deletePayload []byte) error {
url := fmt.Sprintf("%s/api/v1/cbe/message/%s", c.baseURL, eventName)
return c.sendRequest("DELETE", url, deletePayload)
}
// sendRequest 发送HTTP请求
func (c *CBCHTTPClient) sendRequest(method, url string, body []byte) error {
req, err := http.NewRequest(method, url, bytes.NewReader(body))
if err != nil {
return fmt.Errorf("failed to create POST request: %w", err)
return fmt.Errorf("failed to create %s request: %w", method, err)
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
resp, err := c.client.Do(req)
if err != nil {
return fmt.Errorf("failed to send POST request: %w", err)
return fmt.Errorf("failed to send %s request: %w", method, err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return fmt.Errorf("POST request failed with status: %d", resp.StatusCode)
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("%s request failed with status: %d, body: %s",
method, resp.StatusCode, string(body))
}
return nil
}
// sendDeactivateRequest 发送停用请求 (DELETE)
// 在CBMessageService中添加方法
func (s *CBMessageService) getCBCHTTPClient(neId string) (*CBCHTTPClient, error) {
cbcIP, cbcPort, err := s.getCBCNetworkElement(neId)
if err != nil {
return nil, fmt.Errorf("failed to get CBC network element info: %w", err)
}
baseURL := fmt.Sprintf("http://%s:%d", cbcIP, cbcPort)
return NewCBCHTTPClient(baseURL), nil
}
// 重构后的激活请求
func (s *CBMessageService) sendActivateRequest(msg CBMessage) error {
client, err := s.getCBCHTTPClient(msg.NeId)
if err != nil {
return err
}
return client.PostMessage(msg.MessageJson)
}
// 重构后的更新请求
func (s *CBMessageService) sendUpdateRequest(msg CBMessage) error {
client, err := s.getCBCHTTPClient(msg.NeId)
if err != nil {
return err
}
return client.PutMessage(msg.MessageJson)
}
// 重构后的停用请求
func (s *CBMessageService) sendDeactivateRequest(msg CBMessage) error {
// 解析message_json获取需要的字段
client, err := s.getCBCHTTPClient(msg.NeId)
if err != nil {
return err
}
// 解析和构造删除载荷的逻辑保持不变
var messageData map[string]interface{}
if err := json.Unmarshal(msg.MessageJson, &messageData); err != nil {
return fmt.Errorf("failed to parse message_json: %w", err)
}
// 提取eventName (从messageJson中获取如果没有eventName字段可能需要从其他地方获取)
eventName, ok := messageData["eventName"].(string)
if !ok || eventName == "" {
return fmt.Errorf("eventName not found in message_json")
}
// 构造删除请求的payload只包含messageIdProfile和warningAreaList
deletePayload := make(map[string]interface{})
if messageIdProfile, exists := messageData["messageIdProfile"]; exists {
deletePayload["messageIdProfile"] = messageIdProfile
@@ -416,42 +522,5 @@ func (s *CBMessageService) sendDeactivateRequest(msg CBMessage) error {
return fmt.Errorf("failed to marshal delete payload: %w", err)
}
// 获取CBC网元信息
cbcIP, cbcPort, err := s.getCBCNetworkElement(msg.NeId)
if err != nil {
return fmt.Errorf("failed to get CBC network element info: %w", err)
}
url := fmt.Sprintf("http://%s:%d/api/v1/cbe/message/%s", cbcIP, cbcPort, eventName)
req, err := http.NewRequest("DELETE", url, bytes.NewReader(payloadBytes))
if err != nil {
return fmt.Errorf("failed to create DELETE request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to send DELETE request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return fmt.Errorf("DELETE request failed with status: %d", resp.StatusCode)
}
return nil
}
// getCBCNetworkElement 获取CBC网元的IP和端口信息
// 这个方法需要根据你的实际网元管理系统来实现
func (s *CBMessageService) getCBCNetworkElement(neId string) (string, int64, error) {
// 查询网元信息
neInfo := neService.NewNeInfo.SelectNeInfoByNeTypeAndNeID("CBC", neId)
if neInfo.IP == "" {
return "", 0, fmt.Errorf("CBC network element not found for neId: %s", neId)
}
return neInfo.IP, neInfo.Port, nil
return client.DeleteMessage(eventName, payloadBytes)
}