feat: support CBC broadcast

This commit is contained in:
zhangsz
2025-08-01 14:50:55 +08:00
parent 2c1e55bd0c
commit 1b4cb6399e
16 changed files with 1348 additions and 0 deletions

View File

@@ -0,0 +1,453 @@
package service
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"time"
"be.ems/src/framework/i18n"
"be.ems/src/framework/utils/file"
"be.ems/src/modules/network_data/model"
"be.ems/src/modules/network_data/repository"
neService "be.ems/src/modules/network_element/service"
"gorm.io/gorm"
)
// 实例化数据层 CBCMessage 结构体
var NewCBCMessage = &CBCMessage{
cbcMessageRepository: repository.NewCBCMessage,
}
// CBCMessage CB消息 服务层处理
type CBCMessage struct {
cbcMessageRepository *repository.CBCMessage // UE会话事件数据信息
}
// SelectByPage 根据条件分页查询CB消息
func (s *CBCMessage) SelectByPage(query model.CBCMessageQuery) ([]model.CBCMessage, int, error) {
return s.cbcMessageRepository.SelectByPage(query)
}
// SelectCBCMessageById 根据工单ID查询CB消息
// @Description 根据工单ID查询CB消息
// @param id 工单ID
// @return *model.CBCMessage CB消息对象
// @return error 错误信息
// @example
// CBCMessage.SelectCBCMessageById(12345)
func (s *CBCMessage) SelectById(id int64) (*model.CBCMessage, error) {
return s.cbcMessageRepository.SelectById(id)
}
func (s *CBCMessage) Insert(msg model.CBCMessage) error {
// 解析messageJson获取eventName
var messageData map[string]interface{}
if err := json.Unmarshal(msg.MessageJson, &messageData); err != nil {
return fmt.Errorf("failed to parse message_json: %w", err)
}
// 提取eventName
eventName, ok := messageData["eventName"].(string)
if !ok || eventName == "" {
return fmt.Errorf("eventName is required in message_json")
}
// 检查是否已存在相同的eventName
var err error
var existingMsg *model.CBCMessage
if existingMsg, err = s.cbcMessageRepository.SelectByEventName(eventName); err != nil {
return fmt.Errorf("failed to check existing CBC message: %w", err)
}
if existingMsg != nil {
return fmt.Errorf("CBC message with eventName '%s' already exists for neType '%s' and neId '%s'",
eventName, existingMsg.NeType, existingMsg.NeId)
}
return s.cbcMessageRepository.Insert(msg)
}
// UpdateCBCMessage 更新CB消息
// @Description 更新CB消息
// @param msg CB消息对象
// @return error 错误信息
// @example
// mfCBCMessageService.UpdateCBCMessage(msg)
func (s *CBCMessage) Update(id int64, messageJson json.RawMessage) error {
// 解析messageJson获取eventName
var messageData map[string]interface{}
if err := json.Unmarshal(messageJson, &messageData); err != nil {
return fmt.Errorf("failed to parse message_json: %w", err)
}
// 提取eventName
eventName, ok := messageData["eventName"].(string)
if !ok || eventName == "" {
return fmt.Errorf("eventName is required in message_json")
}
// 检查是否已存在相同的eventName
var err error
var msg *model.CBCMessage
if msg, err = s.cbcMessageRepository.SelectByEventName(eventName); err != nil {
if err == gorm.ErrRecordNotFound {
return fmt.Errorf("no existing CBC message found with eventName: %s", eventName)
}
return fmt.Errorf("failed to query existing CBC message: %w", err)
}
// 如果存在,更新记录
if err := s.cbcMessageRepository.Update(id, messageJson); err != nil {
return fmt.Errorf("failed to update CBC message: %w", err)
}
// 如果状态是ACTIVE发送更新请求
if msg.Status == model.CBCEventStatusActive {
if err := s.sendUpdateRequest(*msg); err != nil {
return fmt.Errorf("failed to send update request: %w", err)
}
}
return nil
}
// UpdateCBCMessage 更新CB消息
// @Description 更新CB消息
// @param msg CB消息对象
// @return error 错误信息
// @example
// UpdateCBCMessageDetail(msg)
func (s *CBCMessage) UpdateDetail(eventName, detail string) error {
if err := s.cbcMessageRepository.UpdateDetail(eventName, detail); err != nil {
return fmt.Errorf("failed to update CBC message detail: %w", err)
}
return nil
}
// DeleteCBCMessage 删除CB消息
// @Description 删除CB消息
// @param id 工单ID
// @return error 错误信息
// @example
// mfCBCMessageService.DeleteCBCMessage(12345)
func (s *CBCMessage) Delete(id int64) error {
// 先查询记录状态
var err error
var msg *model.CBCMessage
if msg, err = s.cbcMessageRepository.SelectById(id); err != nil {
if err == gorm.ErrRecordNotFound {
return fmt.Errorf("CBC message with ID %d not found", id)
}
return fmt.Errorf("failed to query CBC message: %w", err)
}
// 检查状态是否为ACTIVE
if msg.Status == model.CBCEventStatusActive {
return fmt.Errorf("cannot delete an active CBC message (ID: %d)", id)
}
// 执行删除操作
if err := s.cbcMessageRepository.Delete(id); err != nil {
return fmt.Errorf("failed to delete CBC message: %w", err)
}
return nil
}
// UpdateCBCMessageStatus 更新CB消息状态
// @Description 更新CB消息状态并根据状态变化发送相应的HTTP请求
// @param status 新状态
// @return error 错误信息
func (s *CBCMessage) UpdateStatus(id int64, status string) error {
newStatus := model.ParseCBCEventStatus(status)
// 查询所有需要更新的记录
var err error
var msg *model.CBCMessage
if msg, err = s.cbcMessageRepository.SelectById(id); err != nil {
if err == gorm.ErrRecordNotFound {
return fmt.Errorf("CBC message with ID %d not found", id)
}
return fmt.Errorf("failed to query CBC message: %w", err)
}
oldStatus := msg.Status
// 检查状态是否发生变化
if oldStatus == newStatus {
return fmt.Errorf("CBC message status is already %s", newStatus.Enum())
}
// 更新数据库状态
if err := s.cbcMessageRepository.UpdateStatus(id, newStatus); err != nil {
return fmt.Errorf("failed to update CBC message status: %w", err)
}
// 根据状态变化发送HTTP请求
if err := s.handleStatusChange(*msg, oldStatus, newStatus); err != nil {
// 记录错误但不中断处理其他消息
fmt.Printf("Failed to handle status change for message %d: %v\n", msg.Id, err)
}
return nil
}
// UpdateCBCMessageStatus 更新CB消息状态
// @Description 更新CB消息状态并根据状态变化发送相应的HTTP请求
// @param status 新状态
// @return error 错误信息
func (s *CBCMessage) UpdateStatusByNeId(neId string, status string) error {
newStatus := model.ParseCBCEventStatus(status)
// 查询所有需要更新的记录
msgs := make([]model.CBCMessage, 0)
if err := s.cbcMessageRepository.SelectByNeId(neId, &msgs); err != nil {
return fmt.Errorf("failed to query CB messages: %w", err)
}
for _, msg := range msgs {
oldStatus := msg.Status
// 检查状态是否发生变化
if oldStatus == newStatus {
continue // 状态没有变化,跳过
}
// 更新数据库状态
if err := s.cbcMessageRepository.UpdateStatus(msg.Id, newStatus); err != nil {
return fmt.Errorf("failed to update CBC message status: %w", err)
}
// 根据状态变化发送HTTP请求
if err := s.handleStatusChange(msg, oldStatus, newStatus); err != nil {
// 记录错误但不中断处理其他消息
fmt.Printf("Failed to handle status change for message %d: %v\n", msg.Id, err)
}
}
return nil
}
// UpdateCBCMessageStatus 更新CB消息状态
// @Description 更新CB消息状态并根据状态变化发送相应的HTTP请求
// @param status 新状态
// @return error 错误信息
func (s *CBCMessage) UpdateAllStatus(status string) error {
newStatus := model.ParseCBCEventStatus(status)
// 查询所有需要更新的记录
msgs := make([]model.CBCMessage, 0)
if err := s.cbcMessageRepository.Select(&msgs); err != nil {
return fmt.Errorf("failed to query CB messages: %w", err)
}
for _, msg := range msgs {
oldStatus := msg.Status
// 检查状态是否发生变化
if oldStatus == newStatus {
continue // 状态没有变化,跳过
}
// 更新数据库状态
if err := s.cbcMessageRepository.UpdateStatus(msg.Id, newStatus); err != nil {
return fmt.Errorf("failed to update CBC message status: %w", err)
}
// 根据状态变化发送HTTP请求
if err := s.handleStatusChange(msg, oldStatus, newStatus); err != nil {
// 记录错误但不中断处理其他消息
fmt.Printf("Failed to handle status change for message %d: %v\n", msg.Id, err)
}
}
return nil
}
// ExportXlsx 导出数据到 xlsx 文件
func (r CBCMessage) ExportXlsx(rows []model.CBCMessage, fileName, language string) (string, error) {
// 第一行表头标题
headerCells := map[string]string{
"A1": i18n.TKey(language, "alarm.export.alarmType"),
"B1": i18n.TKey(language, "alarm.export.origSeverity"),
"C1": i18n.TKey(language, "alarm.export.alarmTitle"),
"D1": i18n.TKey(language, "alarm.export.eventTime"),
"E1": i18n.TKey(language, "alarm.export.alarmId"),
"F1": i18n.TKey(language, "alarm.export.alarmCode"),
"G1": i18n.TKey(language, "ne.common.neType"),
"H1": i18n.TKey(language, "ne.common.neName"),
"I1": i18n.TKey(language, "ne.common.neId"),
}
dataCells := make([]map[string]any, 0)
for i, row := range rows {
idx := strconv.Itoa(i + 2)
cells := map[string]any{
"A" + idx: row.NeType,
"B" + idx: row.NeId,
"C" + idx: row.MessageJson, // 这里假设 MessageJson 已经是字符串格式
"D" + idx: row.Status.Enum(),
"E" + idx: row.Detail,
"F" + idx: time.Unix(row.CreatedAt, 0).Format(time.RFC3339),
"G" + idx: time.Unix(*row.UpdatedAt, 0).Format(time.RFC3339),
}
dataCells = append(dataCells, cells)
}
// 导出数据表格
return file.WriteSheet(headerCells, dataCells, fileName, "")
}
// handleStatusChange 处理状态变化时的HTTP请求
func (s *CBCMessage) handleStatusChange(msg model.CBCMessage, oldStatus, newStatus model.CBCEventStatus) error {
// 从NULL/INACTIVE状态修改为ACTIVE
if (oldStatus == model.CBCEventStatusNull || oldStatus == model.CBCEventStatusInactive) &&
newStatus == model.CBCEventStatusActive {
return s.sendActivateRequest(msg)
}
// 从ACTIVE更改为INACTIVE状态
if oldStatus == model.CBCEventStatusActive && newStatus == model.CBCEventStatusInactive {
return s.sendDeactivateRequest(msg)
}
return nil
}
// getCBCNetworkElement 获取CBC网元的IP和端口信息
// 这个方法需要根据你的实际网元管理系统来实现
func (s *CBCMessage) getCBCNetworkElement(neId string) (string, int64, error) {
// 查询网元信息
neInfo := neService.NewNeInfo.FindByNeTypeAndNeID("CBC", neId)
if neInfo.IP == "" {
return "", 0, fmt.Errorf("CBC network element not found for neId: %s", neId)
}
return neInfo.IP, neInfo.Port, nil
}
// CBCHTTPClient CBC网元HTTP客户端
type CBCHTTPClient struct {
client *http.Client
baseURL string
}
// NewCBCHTTPClient 创建CBC HTTP客户端
func NewCBCHTTPClient(baseURL string) *CBCHTTPClient {
return &CBCHTTPClient{
client: &http.Client{
Timeout: 10 * time.Second,
},
baseURL: baseURL,
}
}
// PostMessage 发送POST请求创建消息
func (c *CBCHTTPClient) PostMessage(messageData []byte) error {
url := fmt.Sprintf("%s/api/v1/cbe/message", c.baseURL)
return c.sendRequest("POST", url, messageData)
}
// PutMessage 发送PUT请求更新消息
func (c *CBCHTTPClient) PutMessage(messageData []byte) error {
url := fmt.Sprintf("%s/api/v1/cbe/message", c.baseURL)
return c.sendRequest("PUT", url, messageData)
}
// DeleteMessage 发送DELETE请求删除消息
func (c *CBCHTTPClient) DeleteMessage(eventName string, deletePayload []byte) error {
url := fmt.Sprintf("%s/api/v1/cbe/message/%s", c.baseURL, eventName)
return c.sendRequest("DELETE", url, deletePayload)
}
// sendRequest 发送HTTP请求
func (c *CBCHTTPClient) sendRequest(method, url string, body []byte) error {
req, err := http.NewRequest(method, url, bytes.NewReader(body))
if err != nil {
return fmt.Errorf("failed to create %s request: %w", method, err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := c.client.Do(req)
if err != nil {
return fmt.Errorf("failed to send %s request: %w", method, err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("%s request failed with status: %d, body: %s",
method, resp.StatusCode, string(body))
}
return nil
}
// 在CBCMessageService中添加方法
func (s *CBCMessage) getCBCHTTPClient(neId string) (*CBCHTTPClient, error) {
cbcIP, cbcPort, err := s.getCBCNetworkElement(neId)
if err != nil {
return nil, fmt.Errorf("failed to get CBC network element info: %w", err)
}
baseURL := fmt.Sprintf("http://%s:%d", cbcIP, cbcPort)
return NewCBCHTTPClient(baseURL), nil
}
// 重构后的激活请求
func (s *CBCMessage) sendActivateRequest(msg model.CBCMessage) error {
client, err := s.getCBCHTTPClient(msg.NeId)
if err != nil {
return err
}
return client.PostMessage(msg.MessageJson)
}
// 重构后的更新请求
func (s *CBCMessage) sendUpdateRequest(msg model.CBCMessage) error {
client, err := s.getCBCHTTPClient(msg.NeId)
if err != nil {
return err
}
return client.PutMessage(msg.MessageJson)
}
// 重构后的停用请求
func (s *CBCMessage) sendDeactivateRequest(msg model.CBCMessage) error {
client, err := s.getCBCHTTPClient(msg.NeId)
if err != nil {
return err
}
// 解析和构造删除载荷的逻辑保持不变
var messageData map[string]interface{}
if err := json.Unmarshal(msg.MessageJson, &messageData); err != nil {
return fmt.Errorf("failed to parse message_json: %w", err)
}
eventName, ok := messageData["eventName"].(string)
if !ok || eventName == "" {
return fmt.Errorf("eventName not found in message_json")
}
deletePayload := make(map[string]interface{})
if messageIdProfile, exists := messageData["messageIdProfile"]; exists {
deletePayload["messageIdProfile"] = messageIdProfile
}
if warningAreaList, exists := messageData["warningAreaList"]; exists {
deletePayload["warningAreaList"] = warningAreaList
}
payloadBytes, err := json.Marshal(deletePayload)
if err != nil {
return fmt.Errorf("failed to marshal delete payload: %w", err)
}
return client.DeleteMessage(eventName, payloadBytes)
}