package service import ( "bytes" "encoding/json" "fmt" "io" "net/http" "strconv" "time" "be.ems/src/framework/i18n" "be.ems/src/framework/utils/file" "be.ems/src/modules/network_data/model" "be.ems/src/modules/network_data/repository" neService "be.ems/src/modules/network_element/service" "gorm.io/gorm" ) // 实例化数据层 CBCMessage 结构体 var NewCBCMessage = &CBCMessage{ cbcMessageRepository: repository.NewCBCMessage, } // CBCMessage CB消息 服务层处理 type CBCMessage struct { cbcMessageRepository *repository.CBCMessage // UE会话事件数据信息 } // SelectByPage 根据条件分页查询CB消息 func (s *CBCMessage) SelectByPage(query model.CBCMessageQuery) ([]model.CBCMessage, int, error) { return s.cbcMessageRepository.SelectByPage(query) } // SelectCBCMessageById 根据工单ID查询CB消息 // @Description 根据工单ID查询CB消息 // @param id 工单ID // @return *model.CBCMessage CB消息对象 // @return error 错误信息 // @example // CBCMessage.SelectCBCMessageById(12345) func (s *CBCMessage) SelectById(id int64) (*model.CBCMessage, error) { return s.cbcMessageRepository.SelectById(id) } func (s *CBCMessage) Insert(msg model.CBCMessage) error { // 解析messageJson获取eventName var messageData map[string]interface{} if err := json.Unmarshal(msg.MessageJson, &messageData); err != nil { return fmt.Errorf("failed to parse message_json: %w", err) } // 提取eventName eventName, ok := messageData["eventName"].(string) if !ok || eventName == "" { return fmt.Errorf("eventName is required in message_json") } // 检查是否已存在相同的eventName var err error var existingMsg *model.CBCMessage if existingMsg, err = s.cbcMessageRepository.SelectByEventName(eventName); err != nil { return fmt.Errorf("failed to check existing CBC message: %w", err) } if existingMsg != nil { return fmt.Errorf("CBC message with eventName '%s' already exists for neType '%s' and neId '%s'", eventName, existingMsg.NeType, existingMsg.NeId) } return s.cbcMessageRepository.Insert(msg) } // UpdateCBCMessage 更新CB消息 // @Description 更新CB消息 // @param msg CB消息对象 // @return error 错误信息 // @example // mfCBCMessageService.UpdateCBCMessage(msg) func (s *CBCMessage) Update(id int64, messageJson json.RawMessage) error { // 解析messageJson获取eventName var messageData map[string]interface{} if err := json.Unmarshal(messageJson, &messageData); err != nil { return fmt.Errorf("failed to parse message_json: %w", err) } // 提取eventName eventName, ok := messageData["eventName"].(string) if !ok || eventName == "" { return fmt.Errorf("eventName is required in message_json") } // 检查是否已存在相同的eventName var err error var msg *model.CBCMessage if msg, err = s.cbcMessageRepository.SelectByEventName(eventName); err != nil { if err == gorm.ErrRecordNotFound { return fmt.Errorf("no existing CBC message found with eventName: %s", eventName) } return fmt.Errorf("failed to query existing CBC message: %w", err) } // 如果存在,更新记录 if err := s.cbcMessageRepository.Update(id, messageJson); err != nil { return fmt.Errorf("failed to update CBC message: %w", err) } // 如果状态是ACTIVE,发送更新请求 if msg.Status == model.CBCEventStatusActive { if err := s.sendUpdateRequest(*msg); err != nil { return fmt.Errorf("failed to send update request: %w", err) } } return nil } // UpdateCBCMessage 更新CB消息 // @Description 更新CB消息 // @param msg CB消息对象 // @return error 错误信息 // @example // UpdateCBCMessageDetail(msg) func (s *CBCMessage) UpdateDetail(eventName, detail string) error { if err := s.cbcMessageRepository.UpdateDetail(eventName, detail); err != nil { return fmt.Errorf("failed to update CBC message detail: %w", err) } return nil } // DeleteCBCMessage 删除CB消息 // @Description 删除CB消息 // @param id 工单ID // @return error 错误信息 // @example // mfCBCMessageService.DeleteCBCMessage(12345) func (s *CBCMessage) Delete(id int64) error { // 先查询记录状态 var err error var msg *model.CBCMessage if msg, err = s.cbcMessageRepository.SelectById(id); err != nil { if err == gorm.ErrRecordNotFound { return fmt.Errorf("CBC message with ID %d not found", id) } return fmt.Errorf("failed to query CBC message: %w", err) } // 检查状态是否为ACTIVE if msg.Status == model.CBCEventStatusActive { return fmt.Errorf("cannot delete an active CBC message (ID: %d)", id) } // 执行删除操作 if err := s.cbcMessageRepository.Delete(id); err != nil { return fmt.Errorf("failed to delete CBC message: %w", err) } return nil } // UpdateCBCMessageStatus 更新CB消息状态 // @Description 更新CB消息状态,并根据状态变化发送相应的HTTP请求 // @param status 新状态 // @return error 错误信息 func (s *CBCMessage) UpdateStatus(id int64, status string) error { newStatus := model.ParseCBCEventStatus(status) // 查询所有需要更新的记录 var err error var msg *model.CBCMessage if msg, err = s.cbcMessageRepository.SelectById(id); err != nil { if err == gorm.ErrRecordNotFound { return fmt.Errorf("CBC message with ID %d not found", id) } return fmt.Errorf("failed to query CBC message: %w", err) } oldStatus := msg.Status // 检查状态是否发生变化 if oldStatus == newStatus { return fmt.Errorf("CBC message status is already %s", newStatus.Enum()) } // 更新数据库状态 if err := s.cbcMessageRepository.UpdateStatus(id, newStatus); err != nil { return fmt.Errorf("failed to update CBC message status: %w", err) } // 根据状态变化发送HTTP请求 if err := s.handleStatusChange(*msg, oldStatus, newStatus); err != nil { // 记录错误但不中断处理其他消息 fmt.Printf("Failed to handle status change for message %d: %v\n", msg.Id, err) } return nil } // UpdateCBCMessageStatus 更新CB消息状态 // @Description 更新CB消息状态,并根据状态变化发送相应的HTTP请求 // @param status 新状态 // @return error 错误信息 func (s *CBCMessage) UpdateStatusByNeId(neId string, status string) error { newStatus := model.ParseCBCEventStatus(status) // 查询所有需要更新的记录 msgs := make([]model.CBCMessage, 0) if err := s.cbcMessageRepository.SelectByNeId(neId, &msgs); err != nil { return fmt.Errorf("failed to query CB messages: %w", err) } for _, msg := range msgs { oldStatus := msg.Status // 检查状态是否发生变化 if oldStatus == newStatus { continue // 状态没有变化,跳过 } // 更新数据库状态 if err := s.cbcMessageRepository.UpdateStatus(msg.Id, newStatus); err != nil { return fmt.Errorf("failed to update CBC message status: %w", err) } // 根据状态变化发送HTTP请求 if err := s.handleStatusChange(msg, oldStatus, newStatus); err != nil { // 记录错误但不中断处理其他消息 fmt.Printf("Failed to handle status change for message %d: %v\n", msg.Id, err) } } return nil } // UpdateCBCMessageStatus 更新CB消息状态 // @Description 更新CB消息状态,并根据状态变化发送相应的HTTP请求 // @param status 新状态 // @return error 错误信息 func (s *CBCMessage) UpdateAllStatus(status string) error { newStatus := model.ParseCBCEventStatus(status) // 查询所有需要更新的记录 msgs := make([]model.CBCMessage, 0) if err := s.cbcMessageRepository.Select(&msgs); err != nil { return fmt.Errorf("failed to query CB messages: %w", err) } for _, msg := range msgs { oldStatus := msg.Status // 检查状态是否发生变化 if oldStatus == newStatus { continue // 状态没有变化,跳过 } // 更新数据库状态 if err := s.cbcMessageRepository.UpdateStatus(msg.Id, newStatus); err != nil { return fmt.Errorf("failed to update CBC message status: %w", err) } // 根据状态变化发送HTTP请求 if err := s.handleStatusChange(msg, oldStatus, newStatus); err != nil { // 记录错误但不中断处理其他消息 fmt.Printf("Failed to handle status change for message %d: %v\n", msg.Id, err) } } return nil } // ExportXlsx 导出数据到 xlsx 文件 func (r CBCMessage) ExportXlsx(rows []model.CBCMessage, fileName, language string) (string, error) { // 第一行表头标题 headerCells := map[string]string{ "A1": i18n.TKey(language, "alarm.export.alarmType"), "B1": i18n.TKey(language, "alarm.export.origSeverity"), "C1": i18n.TKey(language, "alarm.export.alarmTitle"), "D1": i18n.TKey(language, "alarm.export.eventTime"), "E1": i18n.TKey(language, "alarm.export.alarmId"), "F1": i18n.TKey(language, "alarm.export.alarmCode"), "G1": i18n.TKey(language, "ne.common.neType"), "H1": i18n.TKey(language, "ne.common.neName"), "I1": i18n.TKey(language, "ne.common.neId"), } dataCells := make([]map[string]any, 0) for i, row := range rows { idx := strconv.Itoa(i + 2) cells := map[string]any{ "A" + idx: row.NeType, "B" + idx: row.NeId, "C" + idx: row.MessageJson, // 这里假设 MessageJson 已经是字符串格式 "D" + idx: row.Status.Enum(), "E" + idx: row.Detail, "F" + idx: time.Unix(row.CreatedAt, 0).Format(time.RFC3339), "G" + idx: time.Unix(*row.UpdatedAt, 0).Format(time.RFC3339), } dataCells = append(dataCells, cells) } // 导出数据表格 return file.WriteSheet(headerCells, dataCells, fileName, "") } // handleStatusChange 处理状态变化时的HTTP请求 func (s *CBCMessage) handleStatusChange(msg model.CBCMessage, oldStatus, newStatus model.CBCEventStatus) error { // 从NULL/INACTIVE状态修改为ACTIVE if (oldStatus == model.CBCEventStatusNull || oldStatus == model.CBCEventStatusInactive) && newStatus == model.CBCEventStatusActive { return s.sendActivateRequest(msg) } // 从ACTIVE更改为INACTIVE状态 if oldStatus == model.CBCEventStatusActive && newStatus == model.CBCEventStatusInactive { return s.sendDeactivateRequest(msg) } return nil } // getCBCNetworkElement 获取CBC网元的IP和端口信息 // 这个方法需要根据你的实际网元管理系统来实现 func (s *CBCMessage) getCBCNetworkElement(neId string) (string, int64, error) { // 查询网元信息 neInfo := neService.NewNeInfo.FindByNeTypeAndNeID("CBC", neId) if neInfo.IP == "" { return "", 0, fmt.Errorf("CBC network element not found for neId: %s", neId) } return neInfo.IP, neInfo.Port, nil } // CBCHTTPClient CBC网元HTTP客户端 type CBCHTTPClient struct { client *http.Client baseURL string } // NewCBCHTTPClient 创建CBC HTTP客户端 func NewCBCHTTPClient(baseURL string) *CBCHTTPClient { return &CBCHTTPClient{ client: &http.Client{ Timeout: 10 * time.Second, }, baseURL: baseURL, } } // PostMessage 发送POST请求创建消息 func (c *CBCHTTPClient) PostMessage(messageData []byte) error { url := fmt.Sprintf("%s/api/v1/cbe/message", c.baseURL) return c.sendRequest("POST", url, messageData) } // PutMessage 发送PUT请求更新消息 func (c *CBCHTTPClient) PutMessage(messageData []byte) error { url := fmt.Sprintf("%s/api/v1/cbe/message", c.baseURL) return c.sendRequest("PUT", url, messageData) } // DeleteMessage 发送DELETE请求删除消息 func (c *CBCHTTPClient) DeleteMessage(eventName string, deletePayload []byte) error { url := fmt.Sprintf("%s/api/v1/cbe/message/%s", c.baseURL, eventName) return c.sendRequest("DELETE", url, deletePayload) } // sendRequest 发送HTTP请求 func (c *CBCHTTPClient) sendRequest(method, url string, body []byte) error { req, err := http.NewRequest(method, url, bytes.NewReader(body)) if err != nil { return fmt.Errorf("failed to create %s request: %w", method, err) } req.Header.Set("Content-Type", "application/json") resp, err := c.client.Do(req) if err != nil { return fmt.Errorf("failed to send %s request: %w", method, err) } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { body, _ := io.ReadAll(resp.Body) return fmt.Errorf("%s request failed with status: %d, body: %s", method, resp.StatusCode, string(body)) } return nil } // 在CBCMessageService中添加方法 func (s *CBCMessage) getCBCHTTPClient(neId string) (*CBCHTTPClient, error) { cbcIP, cbcPort, err := s.getCBCNetworkElement(neId) if err != nil { return nil, fmt.Errorf("failed to get CBC network element info: %w", err) } baseURL := fmt.Sprintf("http://%s:%d", cbcIP, cbcPort) return NewCBCHTTPClient(baseURL), nil } // 重构后的激活请求 func (s *CBCMessage) sendActivateRequest(msg model.CBCMessage) error { client, err := s.getCBCHTTPClient(msg.NeId) if err != nil { return err } return client.PostMessage(msg.MessageJson) } // 重构后的更新请求 func (s *CBCMessage) sendUpdateRequest(msg model.CBCMessage) error { client, err := s.getCBCHTTPClient(msg.NeId) if err != nil { return err } return client.PutMessage(msg.MessageJson) } // 重构后的停用请求 func (s *CBCMessage) sendDeactivateRequest(msg model.CBCMessage) error { client, err := s.getCBCHTTPClient(msg.NeId) if err != nil { return err } // 解析和构造删除载荷的逻辑保持不变 var messageData map[string]interface{} if err := json.Unmarshal(msg.MessageJson, &messageData); err != nil { return fmt.Errorf("failed to parse message_json: %w", err) } eventName, ok := messageData["eventName"].(string) if !ok || eventName == "" { return fmt.Errorf("eventName not found in message_json") } deletePayload := make(map[string]interface{}) if messageIdProfile, exists := messageData["messageIdProfile"]; exists { deletePayload["messageIdProfile"] = messageIdProfile } if warningAreaList, exists := messageData["warningAreaList"]; exists { deletePayload["warningAreaList"] = warningAreaList } payloadBytes, err := json.Marshal(deletePayload) if err != nil { return fmt.Errorf("failed to marshal delete payload: %w", err) } return client.DeleteMessage(eventName, payloadBytes) }