feat: cb message

This commit is contained in:
zhangsz
2025-07-02 17:23:47 +08:00
parent 758276ecfc
commit 574c39f748
6 changed files with 874 additions and 0 deletions

View File

@@ -0,0 +1,441 @@
package cb_message
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"time"
"be.ems/lib/dborm"
timelib "be.ems/lib/time"
neService "be.ems/src/modules/network_element/service"
"gorm.io/gorm"
)
type CBMessageService struct {
db *gorm.DB
}
// 构造函数示例
func NewCBMessageService() *CBMessageService {
db := dborm.DefaultDB()
return &CBMessageService{db: db}
}
// SelectCBMessage 根据条件分页查询CB消息
func (s *CBMessageService) SelectCBMessage(query CBMessageQuery) ([]CBMessage, int, error) {
var msg []CBMessage
var total int64
db := s.db.Table("cb_message")
if query.NeType != "" {
db = db.Where("ne_type = ?", query.NeType)
}
if query.NeId != "" {
db = db.Where("ne_id = ?", query.NeId)
}
if query.EventName != "" {
db = db.Where("JSON_EXTRACT(message_json, '$.eventName') = ?", query.EventName)
}
if query.Status != "" {
db = db.Where("status = ?", query.Status)
}
var startMicro, endMicro int64
var err error
if query.StartTime != "" {
startMicro, err = timelib.ParseTimeToMicro(query.StartTime)
if err != nil {
return nil, 0, fmt.Errorf("invalid start time: %w", err)
}
}
if query.EndTime != "" {
endMicro, err = timelib.ParseTimeToMicro(query.EndTime)
if err != nil {
return nil, 0, fmt.Errorf("invalid end time: %w", err)
}
}
if startMicro > 0 && endMicro > 0 {
db = db.Where("created_at BETWEEN ? AND ?", startMicro, endMicro)
} else if startMicro > 0 {
db = db.Where("created_at >= ?", startMicro)
} else if endMicro > 0 {
db = db.Where("created_at <= ?", endMicro)
}
// 统计总数
if err := db.Count(&total).Error; err != nil {
return nil, 0, fmt.Errorf("failed to count CB message: %w", err)
}
// 分页查询
offset := (query.PageNum - 1) * query.PageSize
if err := db.Limit(query.PageSize).Offset(offset).Order("created_at desc").Find(&msg).Error; err != nil {
return nil, 0, fmt.Errorf("failed to select CB message: %w", err)
}
return msg, int(total), nil
}
// SelectCBMessageByPage 分页查询CB消息
// @Description 分页查询CB消息
// @param page 页码
// @param pageSize 每页大小
// @return []CBMessage CB消息列表
// @return int 总记录数
// @return error 错误信息
// @example
// mfCBMessageService.SelectCBMessageByPage(1, 10)
func (s *CBMessageService) SelectCBMessageByPage(pageNum int, pageSize int) ([]CBMessage, int, error) {
var tickets []CBMessage
var total int64
// 统计总数
if err := s.db.Table("cb_message").Count(&total).Error; err != nil {
return nil, 0, fmt.Errorf("failed to count CB message: %w", err)
}
// 分页查询
offset := (pageNum - 1) * pageSize
if err := s.db.Table("cb_message").
Limit(pageSize).
Offset(offset).
Find(&tickets).Error; err != nil {
return nil, 0, fmt.Errorf("failed to select CB message: %w", err)
}
return tickets, int(total), nil
}
// InsertCBMessage 插入CB消息
// @Description 插入CB消息
// @param msg CB消息对象
// @return error 错误信息
// @example
// CBMessageService.InsertCBMessage(msg)
func (s *CBMessageService) InsertCBMessage(msg CBMessage) error {
// 这里可以使用ORM或其他方式将ticket插入到数据库中
if err := s.db.Table("cb_message").Create(&msg).Error; err != nil {
return fmt.Errorf("failed to insert CB message: %w", err)
}
return nil
}
// SelectCBMessageById 根据工单ID查询CB消息
// @Description 根据工单ID查询CB消息
// @param id 工单ID
// @return *CBMessage CB消息对象
// @return error 错误信息
// @example
// CBMessageService.SelectCBMessageById(12345)
func (s *CBMessageService) SelectCBMessageById(id int64) (*CBMessage, error) {
var msg CBMessage
if err := s.db.Table("cb_message").
Where("id = ?", id).
First(&msg).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return nil, fmt.Errorf("CB message with ID %d not found", id)
}
return nil, fmt.Errorf("failed to select CB message: %w", err)
}
return &msg, nil
}
// UpdateCBMessage 更新CB消息
// @Description 更新CB消息
// @param msg CB消息对象
// @return error 错误信息
// @example
// mfCBMessageService.UpdateCBMessage(msg)
func (s *CBMessageService) UpdateCBMessage(msg CBMessage) error {
if err := s.db.Table("cb_message").
Where("id = ?", msg.Id).
Updates(msg).Error; err != nil {
return fmt.Errorf("failed to update CB message: %w", err)
}
return nil
}
// DeleteCBMessage 删除CB消息
// @Description 删除CB消息
// @param id 工单ID
// @return error 错误信息
// @example
// mfCBMessageService.DeleteCBMessage(12345)
func (s *CBMessageService) DeleteCBMessage(id int64) error {
// 先查询记录状态
var msg CBMessage
if err := s.db.Table("cb_message").
Where("id = ?", id).
First(&msg).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return fmt.Errorf("not found CB message (ID: %d)", id)
}
return fmt.Errorf("failed to query CB message: %w", err)
}
// 检查状态是否为ACTIVE
if msg.Status == CBEventStatusActive {
return fmt.Errorf("cannot delete an active CB message (ID: %d)", id)
}
// 执行删除操作
if err := s.db.Table("cb_message").
Where("id = ?", id).
Delete(&CBMessage{}).Error; err != nil {
return fmt.Errorf("failed to delete CB message: %w", err)
}
return nil
}
// UpdateCBMessageStatus 更新CB消息状态
// @Description 更新CB消息状态并根据状态变化发送相应的HTTP请求
// @param status 新状态
// @return error 错误信息
func (s *CBMessageService) UpdateCBMessageStatus(id int64, status string) error {
newStatus := ParseCBEventStatus(status)
// 查询所有需要更新的记录
var msg CBMessage
if err := s.db.Table("cb_message").Where("id = ?", id).First(&msg).Error; err != nil {
return fmt.Errorf("failed to query CB messages: %w", err)
}
oldStatus := msg.Status
// 检查状态是否发生变化
if oldStatus == newStatus {
return fmt.Errorf("CB message status is already %s", newStatus.Enum())
}
// 更新数据库状态
now := time.Now().UnixMicro()
if err := s.db.Table("cb_message").
Where("id = ?", msg.Id).
Updates(map[string]interface{}{
"status": newStatus,
"updated_at": now,
}).Error; err != nil {
return fmt.Errorf("failed to update CB message status: %w", err)
}
// 根据状态变化发送HTTP请求
if err := s.handleStatusChange(msg, oldStatus, newStatus); err != nil {
// 记录错误但不中断处理其他消息
fmt.Printf("Failed to handle status change for message %d: %v\n", msg.Id, err)
}
return nil
}
// UpdateCBMessageStatus 更新CB消息状态
// @Description 更新CB消息状态并根据状态变化发送相应的HTTP请求
// @param status 新状态
// @return error 错误信息
func (s *CBMessageService) UpdateCBMessageStatusByNeId(neId string, status string) error {
newStatus := ParseCBEventStatus(status)
// 查询所有需要更新的记录
var messages []CBMessage
if err := s.db.Table("cb_message").Where("ne_id = ?", neId).Find(&messages).Error; err != nil {
return fmt.Errorf("failed to query CB messages: %w", err)
}
for _, msg := range messages {
oldStatus := msg.Status
// 检查状态是否发生变化
if oldStatus == newStatus {
continue // 状态没有变化,跳过
}
// 更新数据库状态
now := time.Now().UnixMicro()
if err := s.db.Table("cb_message").
Where("id = ?", msg.Id).
Updates(map[string]interface{}{
"status": newStatus,
"updated_at": now,
}).Error; err != nil {
return fmt.Errorf("failed to update CB message status: %w", err)
}
// 根据状态变化发送HTTP请求
if err := s.handleStatusChange(msg, oldStatus, newStatus); err != nil {
// 记录错误但不中断处理其他消息
fmt.Printf("Failed to handle status change for message %d: %v\n", msg.Id, err)
}
}
return nil
}
// UpdateCBMessageStatus 更新CB消息状态
// @Description 更新CB消息状态并根据状态变化发送相应的HTTP请求
// @param status 新状态
// @return error 错误信息
func (s *CBMessageService) UpdateAllCBMessageStatus(status string) error {
newStatus := ParseCBEventStatus(status)
// 查询所有需要更新的记录
var messages []CBMessage
if err := s.db.Table("cb_message").Find(&messages).Error; err != nil {
return fmt.Errorf("failed to query CB messages: %w", err)
}
for _, msg := range messages {
oldStatus := msg.Status
// 检查状态是否发生变化
if oldStatus == newStatus {
continue // 状态没有变化,跳过
}
// 更新数据库状态
now := time.Now().UnixMicro()
if err := s.db.Table("cb_message").
Where("id = ?", msg.Id).
Updates(map[string]interface{}{
"status": newStatus,
"updated_at": now,
}).Error; err != nil {
return fmt.Errorf("failed to update CB message status: %w", err)
}
// 根据状态变化发送HTTP请求
if err := s.handleStatusChange(msg, oldStatus, newStatus); err != nil {
// 记录错误但不中断处理其他消息
fmt.Printf("Failed to handle status change for message %d: %v\n", msg.Id, err)
}
}
return nil
}
// handleStatusChange 处理状态变化时的HTTP请求
func (s *CBMessageService) handleStatusChange(msg CBMessage, oldStatus, newStatus CBEventStatus) error {
// 从NULL/INACTIVE状态修改为ACTIVE
if (oldStatus == CBEventStatusNull || oldStatus == CBEventStatusInactive) &&
newStatus == CBEventStatusActive {
return s.sendActivateRequest(msg)
}
// 从ACTIVE更改为INACTIVE状态
if oldStatus == CBEventStatusActive && newStatus == CBEventStatusInactive {
return s.sendDeactivateRequest(msg)
}
return nil
}
// sendActivateRequest 发送激活请求 (POST)
func (s *CBMessageService) sendActivateRequest(msg CBMessage) error {
// 获取CBC网元信息 (这里需要根据你的实际情况获取IP和端口)
cbcIP, cbcPort, err := s.getCBCNetworkElement(msg.NeId)
if err != nil {
return fmt.Errorf("failed to get CBC network element info: %w", err)
}
url := fmt.Sprintf("http://%s:%d/api/v1/cbe/message", cbcIP, cbcPort)
// 直接发送message_json内容
req, err := http.NewRequest("POST", url, bytes.NewReader(msg.MessageJson))
if err != nil {
return fmt.Errorf("failed to create POST request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to send POST request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return fmt.Errorf("POST request failed with status: %d", resp.StatusCode)
}
return nil
}
// sendDeactivateRequest 发送停用请求 (DELETE)
func (s *CBMessageService) sendDeactivateRequest(msg CBMessage) error {
// 解析message_json获取需要的字段
var messageData map[string]interface{}
if err := json.Unmarshal(msg.MessageJson, &messageData); err != nil {
return fmt.Errorf("failed to parse message_json: %w", err)
}
// 提取eventName (从messageJson中获取如果没有eventName字段可能需要从其他地方获取)
eventName, ok := messageData["eventName"].(string)
if !ok {
// 如果messageJson中没有eventName尝试从messageIdProfile中获取
if messageIdProfile, exists := messageData["messageIdProfile"]; exists {
if profile, ok := messageIdProfile.(map[string]interface{}); ok {
if msgId, exists := profile["messageId"]; exists {
eventName = fmt.Sprintf("message_%v", msgId)
}
}
}
if eventName == "" {
return fmt.Errorf("eventName not found in message_json")
}
}
// 构造删除请求的payload只包含messageIdProfile和warningAreaList
deletePayload := make(map[string]interface{})
if messageIdProfile, exists := messageData["messageIdProfile"]; exists {
deletePayload["messageIdProfile"] = messageIdProfile
}
if warningAreaList, exists := messageData["warningAreaList"]; exists {
deletePayload["warningAreaList"] = warningAreaList
}
payloadBytes, err := json.Marshal(deletePayload)
if err != nil {
return fmt.Errorf("failed to marshal delete payload: %w", err)
}
// 获取CBC网元信息
cbcIP, cbcPort, err := s.getCBCNetworkElement(msg.NeId)
if err != nil {
return fmt.Errorf("failed to get CBC network element info: %w", err)
}
url := fmt.Sprintf("http://%s:%d/api/v1/cbe/message/%s", cbcIP, cbcPort, eventName)
req, err := http.NewRequest("DELETE", url, bytes.NewReader(payloadBytes))
if err != nil {
return fmt.Errorf("failed to create DELETE request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to send DELETE request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return fmt.Errorf("DELETE request failed with status: %d", resp.StatusCode)
}
return nil
}
// getCBCNetworkElement 获取CBC网元的IP和端口信息
// 这个方法需要根据你的实际网元管理系统来实现
func (s *CBMessageService) getCBCNetworkElement(neId string) (string, int64, error) {
// 查询网元信息
neInfo := neService.NewNeInfo.SelectNeInfoByNeTypeAndNeID("CBC", neId)
if neInfo.IP == "" {
return "", 0, fmt.Errorf("CBC network element not found for neId: %s", neId)
}
return neInfo.IP, neInfo.Port, nil
}