package cb_message

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/url"
	"strconv"
	"time"

	"be.ems/lib/dborm"
	timelib "be.ems/lib/time"
	neService "be.ems/src/modules/network_element/service"
	"gorm.io/gorm"
)

// CBMessageService reads and writes rows of the cb_message table and forwards
// status/content changes to the CBC network element over HTTP.
type CBMessageService struct {
	db *gorm.DB
}

// NewCBMessageService returns a service that uses the handle returned by
// dborm.DefaultDB().
func NewCBMessageService() *CBMessageService {
	return &CBMessageService{db: dborm.DefaultDB()}
}

// SelectCBMessage returns one page of CB messages matching query, newest
// first (created_at desc), together with the total number of matching rows.
//
// Empty string fields in query are ignored. StartTime and EndTime are parsed
// with timelib.ParseTimeToMicro and compared against created_at; a parse
// failure is returned as an error before any query is issued.
func (s *CBMessageService) SelectCBMessage(query CBMessageQuery) ([]CBMessage, int, error) {
	var msg []CBMessage
	var total int64

	db := s.db.Table("cb_message")

	if query.NeType != "" {
		db = db.Where("ne_type = ?", query.NeType)
	}
	if query.NeId != "" {
		db = db.Where("ne_id = ?", query.NeId)
	}
	if query.EventName != "" {
		// eventName is not a column; it lives inside the message_json document.
		db = db.Where("JSON_EXTRACT(message_json, '$.eventName') = ?", query.EventName)
	}
	if query.Status != "" {
		db = db.Where("status = ?", query.Status)
	}

	var startMicro, endMicro int64
	var err error
	if query.StartTime != "" {
		startMicro, err = timelib.ParseTimeToMicro(query.StartTime)
		if err != nil {
			return nil, 0, fmt.Errorf("invalid start time: %w", err)
		}
	}
	if query.EndTime != "" {
		endMicro, err = timelib.ParseTimeToMicro(query.EndTime)
		if err != nil {
			return nil, 0, fmt.Errorf("invalid end time: %w", err)
		}
	}

	// A bound of zero means "not supplied", so only positive bounds filter.
	switch {
	case startMicro > 0 && endMicro > 0:
		db = db.Where("created_at BETWEEN ? AND ?", startMicro, endMicro)
	case startMicro > 0:
		db = db.Where("created_at >= ?", startMicro)
	case endMicro > 0:
		db = db.Where("created_at <= ?", endMicro)
	}

	// Count all matching rows before applying the page window.
	if err := db.Count(&total).Error; err != nil {
		return nil, 0, fmt.Errorf("failed to count CB message: %w", err)
	}

	// Fetch the requested page.
	offset := (query.PageNum - 1) * query.PageSize
	if err := db.Limit(query.PageSize).Offset(offset).Order("created_at desc").Find(&msg).Error; err != nil {
		return nil, 0, fmt.Errorf("failed to select CB message: %w", err)
	}

	return msg, int(total), nil
}

// SelectCBMessageByPage returns page pageNum (1-based) of size pageSize from
// cb_message without any filter, together with the total row count.
//
// NOTE(review): no ORDER BY is applied here, so the content of a page depends
// on the database's default row order; SelectCBMessage orders by created_at.
func (s *CBMessageService) SelectCBMessageByPage(pageNum int, pageSize int) ([]CBMessage, int, error) {
	var tickets []CBMessage
	var total int64

	// Count all rows.
	if err := s.db.Table("cb_message").Count(&total).Error; err != nil {
		return nil, 0, fmt.Errorf("failed to count CB message: %w", err)
	}

	// Fetch the requested page.
	offset := (pageNum - 1) * pageSize
	if err := s.db.Table("cb_message").
		Limit(pageSize).
		Offset(offset).
		Find(&tickets).Error; err != nil {
		return nil, 0, fmt.Errorf("failed to select CB message: %w", err)
	}

	return tickets, int(total), nil
}

// InsertCBMessage stores msg in cb_message.
//
// msg.MessageJson must be a JSON object with a non-empty string "eventName".
// The insert is rejected when a row with the same ne_type, ne_id and eventName
// already exists.
func (s *CBMessageService) InsertCBMessage(msg CBMessage) error {
	// Decode message_json to read eventName.
	var messageData map[string]any
	if err := json.Unmarshal(msg.MessageJson, &messageData); err != nil {
		return fmt.Errorf("failed to parse message_json: %w", err)
	}

	eventName, ok := messageData["eventName"].(string)
	if !ok || eventName == "" {
		return fmt.Errorf("eventName is required in message_json")
	}

	// Reject duplicates of (ne_type, ne_id, eventName).
	var existingCount int64
	if err := s.db.Table("cb_message").
		Where("ne_type = ? AND ne_id = ? AND JSON_EXTRACT(message_json, '$.eventName') = ?",
			msg.NeType, msg.NeId, eventName).
		Count(&existingCount).Error; err != nil {
		return fmt.Errorf("failed to check existing CB message: %w", err)
	}
	if existingCount > 0 {
		return fmt.Errorf("CB message with eventName '%s' already exists for neType '%s' and neId '%s'",
			eventName, msg.NeType, msg.NeId)
	}

	if err := s.db.Table("cb_message").Create(&msg).Error; err != nil {
		return fmt.Errorf("failed to insert CB message: %w", err)
	}
	return nil
}

// SelectCBMessageById returns the CB message whose primary key is id, or an
// error when no such row exists or the query fails.
func (s *CBMessageService) SelectCBMessageById(id int64) (*CBMessage, error) {
	var msg CBMessage
	if err := s.db.Table("cb_message").
		Where("id = ?", id).
		First(&msg).Error; err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			return nil, fmt.Errorf("CB message with ID %d not found", id)
		}
		return nil, fmt.Errorf("failed to select CB message: %w", err)
	}
	return &msg, nil
}

// UpdateCBMessage replaces the message_json of the row identified by id.
//
// messageJson must be a JSON object with a non-empty string "eventName", and
// that eventName must equal the one already stored for id (the event name
// itself cannot be changed through this call). When the stored status is
// CBEventStatusActive the updated message is also pushed to the CBC network
// element; a failure of that push is returned even though the database update
// has already been committed.
func (s *CBMessageService) UpdateCBMessage(id int64, messageJson json.RawMessage) error {
	// Decode message_json to read eventName.
	var messageData map[string]any
	if err := json.Unmarshal(messageJson, &messageData); err != nil {
		return fmt.Errorf("failed to parse message_json: %w", err)
	}

	eventName, ok := messageData["eventName"].(string)
	if !ok || eventName == "" {
		return fmt.Errorf("eventName is required in message_json")
	}

	// The row must exist and already carry this eventName.
	var msg CBMessage
	if err := s.db.Table("cb_message").
		Where("id = ? AND JSON_EXTRACT(message_json, '$.eventName') = ?", id, eventName).
		First(&msg).Error; err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			return fmt.Errorf("CB message with eventName '%s' not found for ID %d", eventName, id)
		}
		return fmt.Errorf("failed to query CB message: %w", err)
	}

	now := time.Now().UnixMicro()

	err := s.db.Transaction(func(tx *gorm.DB) error {
		// Write the new payload.
		if err := tx.Table("cb_message").
			Where("id = ?", id).
			Updates(map[string]any{
				"message_json": messageJson,
				"updated_at":   now,
			}).Error; err != nil {
			return fmt.Errorf("failed to update CBC message: %w", err)
		}

		// Re-read the row inside the same transaction so msg reflects what was stored.
		if err := tx.Table("cb_message").
			Where("id = ?", id).
			First(&msg).Error; err != nil {
			return fmt.Errorf("failed to fetch updated CBC message: %w", err)
		}
		return nil
	})
	if err != nil {
		return fmt.Errorf("failed to update CB message: %w", err)
	}

	// Only active messages are known to the CBC, so only those are pushed.
	if msg.Status == CBEventStatusActive {
		if err := s.sendUpdateRequest(msg); err != nil {
			return fmt.Errorf("failed to send update request: %w", err)
		}
	}

	return nil
}

// DeleteCBMessage removes the row identified by id. Rows whose status is
// CBEventStatusActive are never deleted; an error is returned instead.
func (s *CBMessageService) DeleteCBMessage(id int64) error {
	// Load the row first to inspect its status.
	var msg CBMessage
	if err := s.db.Table("cb_message").
		Where("id = ?", id).
		First(&msg).Error; err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			return fmt.Errorf("not found CB message (ID: %d)", id)
		}
		return fmt.Errorf("failed to query CB message: %w", err)
	}

	if msg.Status == CBEventStatusActive {
		return fmt.Errorf("cannot delete an active CB message (ID: %d)", id)
	}

	if err := s.db.Table("cb_message").
		Where("id = ?", id).
		Delete(&CBMessage{}).Error; err != nil {
		return fmt.Errorf("failed to delete CB message: %w", err)
	}
	return nil
}

// UpdateCBMessageStatus sets the status of the row identified by id to the
// value parsed from status and notifies the CBC network element of the
// transition. It returns an error when the row cannot be loaded, when the row
// already has that status, or when the database update fails. A failed CBC
// notification is only printed, not returned.
func (s *CBMessageService) UpdateCBMessageStatus(id int64, status string) error {
	newStatus := ParseCBEventStatus(status)

	var msg CBMessage
	if err := s.db.Table("cb_message").Where("id = ?", id).First(&msg).Error; err != nil {
		return fmt.Errorf("failed to query CB messages: %w", err)
	}

	if msg.Status == newStatus {
		return fmt.Errorf("CB message status is already %s", newStatus.Enum())
	}

	return s.updateStatusAndNotify(msg, newStatus)
}

// UpdateCBMessageStatusByNeId sets the status of every row with the given
// ne_id to the value parsed from status, notifying the CBC network element for
// each row that actually changes. Rows already in the target status are
// skipped. Processing stops at the first database update failure; failed CBC
// notifications are only printed.
func (s *CBMessageService) UpdateCBMessageStatusByNeId(neId string, status string) error {
	newStatus := ParseCBEventStatus(status)

	var messages []CBMessage
	if err := s.db.Table("cb_message").Where("ne_id = ?", neId).Find(&messages).Error; err != nil {
		return fmt.Errorf("failed to query CB messages: %w", err)
	}

	for _, msg := range messages {
		if msg.Status == newStatus {
			continue // nothing to change for this row
		}
		if err := s.updateStatusAndNotify(msg, newStatus); err != nil {
			return err
		}
	}
	return nil
}

// UpdateAllCBMessageStatus sets the status of every row in cb_message to the
// value parsed from status, notifying the CBC network element for each row
// that actually changes. Rows already in the target status are skipped.
// Processing stops at the first database update failure; failed CBC
// notifications are only printed.
func (s *CBMessageService) UpdateAllCBMessageStatus(status string) error {
	newStatus := ParseCBEventStatus(status)

	var messages []CBMessage
	if err := s.db.Table("cb_message").Find(&messages).Error; err != nil {
		return fmt.Errorf("failed to query CB messages: %w", err)
	}

	for _, msg := range messages {
		if msg.Status == newStatus {
			continue // nothing to change for this row
		}
		if err := s.updateStatusAndNotify(msg, newStatus); err != nil {
			return err
		}
	}
	return nil
}

// updateStatusAndNotify writes newStatus (and a fresh updated_at) for msg and
// then triggers the HTTP call matching the transition from msg.Status to
// newStatus. Only the database failure is returned; a notification failure is
// printed so that callers iterating over many messages can continue.
func (s *CBMessageService) updateStatusAndNotify(msg CBMessage, newStatus CBEventStatus) error {
	oldStatus := msg.Status

	now := time.Now().UnixMicro()
	if err := s.db.Table("cb_message").
		Where("id = ?", msg.Id).
		Updates(map[string]any{
			"status":     newStatus,
			"updated_at": now,
		}).Error; err != nil {
		return fmt.Errorf("failed to update CB message status: %w", err)
	}

	if err := s.handleStatusChange(msg, oldStatus, newStatus); err != nil {
		// Deliberately not returned: the status is already persisted.
		fmt.Printf("Failed to handle status change for message %d: %v\n", msg.Id, err)
	}
	return nil
}

// handleStatusChange issues the HTTP request that corresponds to a status
// transition: NULL/INACTIVE -> ACTIVE activates the message on the CBC,
// ACTIVE -> INACTIVE deactivates it. Any other transition sends nothing.
func (s *CBMessageService) handleStatusChange(msg CBMessage, oldStatus, newStatus CBEventStatus) error {
	switch {
	case (oldStatus == CBEventStatusNull || oldStatus == CBEventStatusInactive) && newStatus == CBEventStatusActive:
		return s.sendActivateRequest(msg)
	case oldStatus == CBEventStatusActive && newStatus == CBEventStatusInactive:
		return s.sendDeactivateRequest(msg)
	}
	return nil
}

// getCBCNetworkElement looks up the network element of type "CBC" with the
// given neId and returns its IP and port. An empty IP in the lookup result is
// treated as "not found".
func (s *CBMessageService) getCBCNetworkElement(neId string) (string, int64, error) {
	neInfo := neService.NewNeInfo.SelectNeInfoByNeTypeAndNeID("CBC", neId)
	if neInfo.IP == "" {
		return "", 0, fmt.Errorf("CBC network element not found for neId: %s", neId)
	}
	return neInfo.IP, neInfo.Port, nil
}

// CBCHTTPClient is a minimal HTTP client for the CBC network element's
// /api/v1/cbe/message endpoints.
type CBCHTTPClient struct {
	client  *http.Client
	baseURL string
}

// NewCBCHTTPClient returns a client that sends requests to baseURL with a
// 10-second overall timeout per request.
func NewCBCHTTPClient(baseURL string) *CBCHTTPClient {
	return &CBCHTTPClient{
		client:  &http.Client{Timeout: 10 * time.Second},
		baseURL: baseURL,
	}
}

// PostMessage sends messageData as a POST to create a message.
func (c *CBCHTTPClient) PostMessage(messageData []byte) error {
	endpoint := fmt.Sprintf("%s/api/v1/cbe/message", c.baseURL)
	return c.sendRequest("POST", endpoint, messageData)
}

// PutMessage sends messageData as a PUT to update a message.
func (c *CBCHTTPClient) PutMessage(messageData []byte) error {
	endpoint := fmt.Sprintf("%s/api/v1/cbe/message", c.baseURL)
	return c.sendRequest("PUT", endpoint, messageData)
}

// DeleteMessage sends a DELETE for the message named eventName, with
// deletePayload as the request body.
func (c *CBCHTTPClient) DeleteMessage(eventName string, deletePayload []byte) error {
	// eventName originates from user-supplied JSON; escape it so characters
	// such as '/', '?' or '#' cannot alter the request path.
	endpoint := fmt.Sprintf("%s/api/v1/cbe/message/%s", c.baseURL, url.PathEscape(eventName))
	return c.sendRequest("DELETE", endpoint, deletePayload)
}

// sendRequest performs one JSON request and returns an error for transport
// failures and for any response status outside 2xx. For non-2xx responses the
// response body is included in the error text.
func (c *CBCHTTPClient) sendRequest(method, endpoint string, body []byte) error {
	req, err := http.NewRequest(method, endpoint, bytes.NewReader(body))
	if err != nil {
		return fmt.Errorf("failed to create %s request: %w", method, err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to send %s request: %w", method, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		// Best effort: the body only enriches the error text, so a read
		// failure here is intentionally not reported separately.
		respBody, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("%s request failed with status: %d, body: %s", method, resp.StatusCode, string(respBody))
	}

	// Drain the body so the underlying connection can be reused; the content
	// of a successful response is not needed, hence the ignored result.
	_, _ = io.Copy(io.Discard, resp.Body)
	return nil
}

// getCBCHTTPClient builds an HTTP client addressed at the CBC network element
// registered under neId.
func (s *CBMessageService) getCBCHTTPClient(neId string) (*CBCHTTPClient, error) {
	cbcIP, cbcPort, err := s.getCBCNetworkElement(neId)
	if err != nil {
		return nil, fmt.Errorf("failed to get CBC network element info: %w", err)
	}

	// net.JoinHostPort brackets IPv6 literals, which plain "%s:%d" formatting
	// would not; for IPv4 addresses and host names the result is unchanged.
	baseURL := "http://" + net.JoinHostPort(cbcIP, strconv.FormatInt(cbcPort, 10))
	return NewCBCHTTPClient(baseURL), nil
}

// sendActivateRequest POSTs msg.MessageJson to the CBC of msg.NeId.
func (s *CBMessageService) sendActivateRequest(msg CBMessage) error {
	client, err := s.getCBCHTTPClient(msg.NeId)
	if err != nil {
		return err
	}
	return client.PostMessage(msg.MessageJson)
}

// sendUpdateRequest pushes msg.MessageJson to the CBC of msg.NeId via
// PutMessage, the client's update call; activation uses PostMessage.
func (s *CBMessageService) sendUpdateRequest(msg CBMessage) error {
	client, err := s.getCBCHTTPClient(msg.NeId)
	if err != nil {
		return err
	}
	return client.PutMessage(msg.MessageJson)
}

// sendDeactivateRequest asks the CBC of msg.NeId to delete the message named
// by the "eventName" field of msg.MessageJson. The request body carries the
// "messageIdProfile" and "warningAreaList" fields of the stored message when
// they are present, and nothing else.
func (s *CBMessageService) sendDeactivateRequest(msg CBMessage) error {
	client, err := s.getCBCHTTPClient(msg.NeId)
	if err != nil {
		return err
	}

	var messageData map[string]any
	if err := json.Unmarshal(msg.MessageJson, &messageData); err != nil {
		return fmt.Errorf("failed to parse message_json: %w", err)
	}

	eventName, ok := messageData["eventName"].(string)
	if !ok || eventName == "" {
		return fmt.Errorf("eventName not found in message_json")
	}

	// Copy over only the fields the delete payload is built from.
	deletePayload := make(map[string]any)
	if messageIdProfile, exists := messageData["messageIdProfile"]; exists {
		deletePayload["messageIdProfile"] = messageIdProfile
	}
	if warningAreaList, exists := messageData["warningAreaList"]; exists {
		deletePayload["warningAreaList"] = warningAreaList
	}

	payloadBytes, err := json.Marshal(deletePayload)
	if err != nil {
		return fmt.Errorf("failed to marshal delete payload: %w", err)
	}

	return client.DeleteMessage(eventName, payloadBytes)
}