diff --git a/database/install/cb_message.sql b/database/install/cb_message.sql new file mode 100755 index 00000000..93ba402f --- /dev/null +++ b/database/install/cb_message.sql @@ -0,0 +1,37 @@ +/* + Navicat Premium Data Transfer + + Source Server : omc@192.168.2.223-psap + Source Server Type : MariaDB + Source Server Version : 100622 (10.6.22-MariaDB-0ubuntu0.22.04.1) + Source Host : 192.168.2.223:33066 + Source Schema : psap_db + + Target Server Type : MariaDB + Target Server Version : 100622 (10.6.22-MariaDB-0ubuntu0.22.04.1) + File Encoding : 65001 + + Date: 30/06/2025 16:38:58 +*/ + +SET NAMES utf8mb4; +SET FOREIGN_KEY_CHECKS = 0; + +-- ---------------------------- +-- Table structure for cb_message +-- ---------------------------- +DROP TABLE IF EXISTS `cb_message`; +CREATE TABLE `cb_message` ( + `id` bigint(20) NOT NULL AUTO_INCREMENT, + `ne_type` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL, + `ne_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL, + `message_json` varchar(2048) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL, + `status` enum('ACTIVE','INACTIVE') CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT 'INACTIVE', + `created_at` bigint(20) NULL DEFAULT current_timestamp(), + `updated_at` bigint(20) NULL DEFAULT NULL, + PRIMARY KEY (`id`) USING BTREE, + INDEX `id`(`id`) USING BTREE, + INDEX `idx_ne_time`(`ne_type`, `ne_id`, `created_at`) USING BTREE +) ENGINE = InnoDB AUTO_INCREMENT = 11 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = 'CDR事件_MF' ROW_FORMAT = Dynamic; + +SET FOREIGN_KEY_CHECKS = 1; diff --git a/features/ue/api.go b/features/ue/api.go index f2531021..a7e98844 100644 --- a/features/ue/api.go +++ b/features/ue/api.go @@ -3,6 +3,7 @@ package ue import ( + "be.ems/features/ue/cb_message" "be.ems/features/ue/mf_callback_ticket" "be.ems/features/ue/mf_calling" "be.ems/lib/log" @@ -13,8 +14,10 @@ func InitSubServiceRoute(r *gin.Engine) { log.Info("======init UE management group gin.Engine") mfGroup := r.Group("/psap/v1/mf") + cbeGroup := r.Group("/psap/v1/cbc") mf_calling.Register(mfGroup) mf_callback_ticket.Register(mfGroup) + cb_message.Register(cbeGroup) // register other sub modules routes } diff --git a/features/ue/cb_message/controller.go b/features/ue/cb_message/controller.go new file mode 100644 index 00000000..0a6b6258 --- /dev/null +++ b/features/ue/cb_message/controller.go @@ -0,0 +1,267 @@ +package cb_message + +import ( + "encoding/json" + "io" + "strconv" + "time" + + "be.ems/src/framework/i18n" + "be.ems/src/framework/middleware" + "be.ems/src/framework/utils/ctx" + "be.ems/src/framework/vo/result" + "github.com/gin-gonic/gin" +) + +// @Description Register Routes for mf calling +func Register(r *gin.RouterGroup) { + + cbGroup := r.Group("/:neId") + { + var m *CBMessage + cbGroup.GET("/message/list", + middleware.PreAuthorize(nil), + m.List, + ) + cbGroup.GET("/message/:id", + middleware.PreAuthorize(nil), + m.GetById, + ) + cbGroup.POST("/message", + middleware.PreAuthorize(nil), + m.Insert, + ) + cbGroup.PUT("/message/:id", + middleware.PreAuthorize(nil), + m.UpdateStatus, + ) + cbGroup.DELETE("/message/:id", + middleware.PreAuthorize(nil), + m.Delete, + ) + } +} + +func (m *CBMessage) List(c *gin.Context) { + language := ctx.AcceptLanguage(c) + neId := c.Param("neId") + if neId == "" { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + var query CBMessageQuery + if err := c.ShouldBindQuery(&query); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + query.NeType = "CBC" + query.NeId = neId + + service := NewCBMessageService() + data, total, err := service.SelectCBMessage(query) + if err != nil { + c.JSON(500, result.ErrMsg(err.Error())) + return + } + + // 转换数据格式,确保 MessageJson 正确序列化 + processedData := make([]map[string]interface{}, len(data)) + for i, msg := range data { + var messageJson interface{} + if len(msg.MessageJson) > 0 { + // 尝试解析为 JSON 对象 + if err := json.Unmarshal(msg.MessageJson, &messageJson); err != nil { + // 如果解析失败,就作为字符串 + messageJson = string(msg.MessageJson) + } + } + + processedData[i] = map[string]interface{}{ + "id": msg.Id, + "neType": msg.NeType, + "neId": msg.NeId, + "messageJson": messageJson, // 这里是解析后的 JSON 对象 + "status": msg.Status.Enum(), + "createdAt": msg.CreatedAt, + "updatedAt": msg.UpdatedAt, + } + } + c.JSON(200, result.Ok(gin.H{ + "total": total, + "data": processedData, + })) +} + +// Update 更新CB消息 +func (m *CBMessage) Insert(c *gin.Context) { + language := ctx.AcceptLanguage(c) + // 绑定请求体 + var msg CBMessage + msg.NeType = "CBC" + msg.NeId = c.Param("neId") + msg.Status = CBEventStatusInactive // 默认状态为 INACTIVE + if msg.NeId == "" { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + msg.CreatedAt = time.Now().Local().UnixMicro() + msg.UpdatedAt = nil // 新增时更新时间为nil + + // 直接读取body为json.RawMessage + body, err := io.ReadAll(c.Request.Body) + if err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + msg.MessageJson = json.RawMessage(body) + + service := NewCBMessageService() + if err := service.InsertCBMessage(msg); err != nil { + c.JSON(500, result.ErrMsg(err.Error())) + return + } + + c.JSON(200, result.Ok(nil)) +} + +// Update 更新CB消息 +func (m *CBMessage) Update(c *gin.Context) { + language := ctx.AcceptLanguage(c) + + // 绑定请求体 + var msg CBMessage + if err := c.ShouldBindJSON(&msg); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + service := NewCBMessageService() + if err := service.UpdateCBMessage(msg); err != nil { + c.JSON(500, result.ErrMsg(err.Error())) + return + } + + c.JSON(200, result.Ok(nil)) +} + +// UpdateStatus 更新CB消息状态 +// 这里的 neId 参数是为了兼容性,实际更新状态时不需要使用它 +// 但为了保持与原有接口一致,仍然保留该参数 +// 如果需要根据 neId 进行特定的逻辑处理,可以在服务层实现 +// 但在当前实现中,neId 仅用于验证请求的有效性 +// 实际的状态更新逻辑不依赖于 neId +// 该接口用于更新 CB 消息的状态,状态值通过查询参数传递 +// 例如:PUT /:neId/message/status?status=ACTIVE +func (m *CBMessage) UpdateStatus(c *gin.Context) { + language := ctx.AcceptLanguage(c) + + neId := c.Param("neId") + status := c.Query("status") + if neId == "" || status == "" { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + messageId := c.Param("id") + if messageId != "" { + id, err := strconv.ParseInt(messageId, 10, 64) + if err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + // 如果提供了 messageId,则更新特定消息的状态 + service := NewCBMessageService() + if err := service.UpdateCBMessageStatus(id, status); err != nil { + c.JSON(500, result.ErrMsg(err.Error())) + return + } + c.JSON(200, result.Ok(nil)) + return + } + // 如果没有提供 messageId,则更新所有消息的状态 + service := NewCBMessageService() + if err := service.UpdateCBMessageStatusByNeId(neId, status); err != nil { + c.JSON(500, result.ErrMsg(err.Error())) + return + } + c.JSON(200, result.Ok(nil)) +} + +// Delete 删除CB消息 +func (m *CBMessage) Delete(c *gin.Context) { + language := ctx.AcceptLanguage(c) + + // 获取路径参数 + messageId := c.Param("id") + if messageId == "" { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + id, err := strconv.ParseInt(messageId, 10, 64) + if err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + service := NewCBMessageService() + if err := service.DeleteCBMessage(id); err != nil { + c.JSON(500, result.ErrMsg(err.Error())) + return + } + + c.JSON(200, result.Ok(nil)) +} + +// GetById 根据ID获取CB消息 +func (m *CBMessage) GetById(c *gin.Context) { + language := ctx.AcceptLanguage(c) + + // 获取路径参数 + idStr := c.Param("id") + if idStr == "" { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + id, err := strconv.ParseInt(idStr, 10, 64) + if err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + service := NewCBMessageService() + data, err := service.SelectCBMessageById(id) + if err != nil { + c.JSON(500, result.ErrMsg(err.Error())) + return + } + + if data == nil { + c.JSON(404, result.CodeMsg(404, i18n.TKey(language, "app.common.err404"))) + return + } + // 转换数据格式,确保 MessageJson 正确序列化 + var messageJson interface{} + if len(data.MessageJson) > 0 { + // 尝试解析为 JSON 对象 + if err := json.Unmarshal(data.MessageJson, &messageJson); err != nil { + // 如果解析失败,就作为字符串 + messageJson = string(data.MessageJson) + } + } + + processedData := map[string]interface{}{ + "id": data.Id, + "neType": data.NeType, + "neId": data.NeId, + "messageJson": messageJson, // 这里是解析后的 JSON 对象 + "status": data.Status.Enum(), + "createdAt": data.CreatedAt, + "updatedAt": data.UpdatedAt, + } + + c.JSON(200, result.Ok(gin.H{ + "data": processedData, + })) +} diff --git a/features/ue/cb_message/model.go b/features/ue/cb_message/model.go new file mode 100644 index 00000000..61ec73f8 --- /dev/null +++ b/features/ue/cb_message/model.go @@ -0,0 +1,111 @@ +package cb_message + +import ( + "database/sql/driver" + "encoding/json" + "errors" + "fmt" + "strconv" + "strings" +) + +type CBEventStatus int + +// CBEventStatus CB事件状态枚举 +const ( + CBEventStatusNull CBEventStatus = iota // 未知状态 + CBEventStatusActive + CBEventStatusInactive +) + +func (status CBEventStatus) Enum() string { + switch status { + case CBEventStatusNull: + return "NULL" + case CBEventStatusActive: + return "ACTIVE" + case CBEventStatusInactive: + return "INACTIVE" + default: + return "UNKNOWN" + } +} + +func (status CBEventStatus) String() string { + return fmt.Sprintf("%d", status) +} + +// ParseCBEventStatus 将字符串转换为 枚举类型 +func ParseCBEventStatus(s string) CBEventStatus { + if i, err := strconv.Atoi(s); err == nil { + return CBEventStatus(i) + } + // 如果转换失败,则按名称匹配(忽略大小写) + switch strings.ToUpper(s) { + case "NULL": + return CBEventStatusNull + case "ACTIVE": + return CBEventStatusActive + case "INACTIVE": + return CBEventStatusInactive + case "": + // 如果字符串为空,则返回未知状态 + return CBEventStatusNull + default: + // 默认返回未知状态 + return CBEventStatusNull + } +} + +// CBMessageQuery 查询条件结构体 +type CBMessageQuery struct { + NeType string `form:"neType"` // 网元类型 + NeId string `form:"neId"` // 网元ID + EventName string `form:"eventName"` // 事件名称 + Status string `form:"status"` // 消息状态 + StartTime string `form:"startTime"` // 创建时间范围-起始 + EndTime string `form:"endTime"` // 创建时间范围-结束 + PageNum int `form:"pageNum" binding:"required"` + PageSize int `form:"pageSize" binding:"required"` +} + +// @Description CBMessage CB消息 +type CBMessage struct { + Id int64 `json:"id" gorm:"column:id"` // CB消息ID + NeType string `json:"neType" gorm:"column:ne_type"` // 网元类型 + NeId string `json:"neId" gorm:"column:ne_id"` // 网元ID + MessageJson json.RawMessage `json:"messageJson" gorm:"column:message_json"` // 消息内容JSON + Status CBEventStatus `json:"status" gorm:"column:status"` // 消息状态 + CreatedAt int64 `json:"createdAt" gorm:"column:created_at"` // 创建时间 + UpdatedAt *int64 `json:"updatedAt" gorm:"column:updated_at;autoUpdateTime:false"` // 更新时间 +} + +// TableName 表名称 +func (*CBMessage) TableName() string { + return "cb_message" +} + +// Scan 实现 sql.Scanner 接口,支持从数据库字符串转为 CBEventStatus +func (s *CBEventStatus) Scan(value interface{}) error { + switch v := value.(type) { + case string: + *s = ParseCBEventStatus(v) + return nil + case []byte: + *s = ParseCBEventStatus(string(v)) + return nil + case int64: + *s = CBEventStatus(v) + return nil + case int: + *s = CBEventStatus(v) + return nil + default: + return errors.New("unsupported Scan type for CBEventStatus") + } +} + +// Value 实现 driver.Valuer 接口,支持将 CBEventStatus 存为字符串 +func (s CBEventStatus) Value() (driver.Value, error) { + return s.Enum(), nil +} diff --git a/features/ue/cb_message/service.go b/features/ue/cb_message/service.go new file mode 100644 index 00000000..5fabae55 --- /dev/null +++ b/features/ue/cb_message/service.go @@ -0,0 +1,441 @@ +package cb_message + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "time" + + "be.ems/lib/dborm" + timelib "be.ems/lib/time" + neService "be.ems/src/modules/network_element/service" + "gorm.io/gorm" +) + +type CBMessageService struct { + db *gorm.DB +} + +// 构造函数示例 +func NewCBMessageService() *CBMessageService { + db := dborm.DefaultDB() + return &CBMessageService{db: db} +} + +// SelectCBMessage 根据条件分页查询CB消息 +func (s *CBMessageService) SelectCBMessage(query CBMessageQuery) ([]CBMessage, int, error) { + var msg []CBMessage + var total int64 + + db := s.db.Table("cb_message") + + if query.NeType != "" { + db = db.Where("ne_type = ?", query.NeType) + } + if query.NeId != "" { + db = db.Where("ne_id = ?", query.NeId) + } + if query.EventName != "" { + db = db.Where("JSON_EXTRACT(message_json, '$.eventName') = ?", query.EventName) + } + if query.Status != "" { + db = db.Where("status = ?", query.Status) + } + + var startMicro, endMicro int64 + var err error + if query.StartTime != "" { + startMicro, err = timelib.ParseTimeToMicro(query.StartTime) + if err != nil { + return nil, 0, fmt.Errorf("invalid start time: %w", err) + } + } + if query.EndTime != "" { + endMicro, err = timelib.ParseTimeToMicro(query.EndTime) + if err != nil { + return nil, 0, fmt.Errorf("invalid end time: %w", err) + } + } + if startMicro > 0 && endMicro > 0 { + db = db.Where("created_at BETWEEN ? AND ?", startMicro, endMicro) + } else if startMicro > 0 { + db = db.Where("created_at >= ?", startMicro) + } else if endMicro > 0 { + db = db.Where("created_at <= ?", endMicro) + } + + // 统计总数 + if err := db.Count(&total).Error; err != nil { + return nil, 0, fmt.Errorf("failed to count CB message: %w", err) + } + + // 分页查询 + offset := (query.PageNum - 1) * query.PageSize + if err := db.Limit(query.PageSize).Offset(offset).Order("created_at desc").Find(&msg).Error; err != nil { + return nil, 0, fmt.Errorf("failed to select CB message: %w", err) + } + + return msg, int(total), nil +} + +// SelectCBMessageByPage 分页查询CB消息 +// @Description 分页查询CB消息 +// @param page 页码 +// @param pageSize 每页大小 +// @return []CBMessage CB消息列表 +// @return int 总记录数 +// @return error 错误信息 +// @example +// mfCBMessageService.SelectCBMessageByPage(1, 10) +func (s *CBMessageService) SelectCBMessageByPage(pageNum int, pageSize int) ([]CBMessage, int, error) { + var tickets []CBMessage + var total int64 + + // 统计总数 + if err := s.db.Table("cb_message").Count(&total).Error; err != nil { + return nil, 0, fmt.Errorf("failed to count CB message: %w", err) + } + + // 分页查询 + offset := (pageNum - 1) * pageSize + if err := s.db.Table("cb_message"). + Limit(pageSize). + Offset(offset). + Find(&tickets).Error; err != nil { + return nil, 0, fmt.Errorf("failed to select CB message: %w", err) + } + + return tickets, int(total), nil +} + +// InsertCBMessage 插入CB消息 +// @Description 插入CB消息 +// @param msg CB消息对象 +// @return error 错误信息 +// @example +// CBMessageService.InsertCBMessage(msg) +func (s *CBMessageService) InsertCBMessage(msg CBMessage) error { + // 这里可以使用ORM或其他方式将ticket插入到数据库中 + if err := s.db.Table("cb_message").Create(&msg).Error; err != nil { + return fmt.Errorf("failed to insert CB message: %w", err) + } + return nil +} + +// SelectCBMessageById 根据工单ID查询CB消息 +// @Description 根据工单ID查询CB消息 +// @param id 工单ID +// @return *CBMessage CB消息对象 +// @return error 错误信息 +// @example +// CBMessageService.SelectCBMessageById(12345) +func (s *CBMessageService) SelectCBMessageById(id int64) (*CBMessage, error) { + var msg CBMessage + if err := s.db.Table("cb_message"). + Where("id = ?", id). + First(&msg).Error; err != nil { + if err == gorm.ErrRecordNotFound { + return nil, fmt.Errorf("CB message with ID %d not found", id) + } + return nil, fmt.Errorf("failed to select CB message: %w", err) + } + return &msg, nil +} + +// UpdateCBMessage 更新CB消息 +// @Description 更新CB消息 +// @param msg CB消息对象 +// @return error 错误信息 +// @example +// mfCBMessageService.UpdateCBMessage(msg) +func (s *CBMessageService) UpdateCBMessage(msg CBMessage) error { + if err := s.db.Table("cb_message"). + Where("id = ?", msg.Id). + Updates(msg).Error; err != nil { + return fmt.Errorf("failed to update CB message: %w", err) + } + return nil +} + +// DeleteCBMessage 删除CB消息 +// @Description 删除CB消息 +// @param id 工单ID +// @return error 错误信息 +// @example +// mfCBMessageService.DeleteCBMessage(12345) +func (s *CBMessageService) DeleteCBMessage(id int64) error { + // 先查询记录状态 + var msg CBMessage + if err := s.db.Table("cb_message"). + Where("id = ?", id). + First(&msg).Error; err != nil { + if err == gorm.ErrRecordNotFound { + return fmt.Errorf("not found CB message (ID: %d)", id) + } + return fmt.Errorf("failed to query CB message: %w", err) + } + + // 检查状态是否为ACTIVE + if msg.Status == CBEventStatusActive { + return fmt.Errorf("cannot delete an active CB message (ID: %d)", id) + } + + // 执行删除操作 + if err := s.db.Table("cb_message"). + Where("id = ?", id). + Delete(&CBMessage{}).Error; err != nil { + return fmt.Errorf("failed to delete CB message: %w", err) + } + return nil +} + +// UpdateCBMessageStatus 更新CB消息状态 +// @Description 更新CB消息状态,并根据状态变化发送相应的HTTP请求 +// @param status 新状态 +// @return error 错误信息 +func (s *CBMessageService) UpdateCBMessageStatus(id int64, status string) error { + newStatus := ParseCBEventStatus(status) + + // 查询所有需要更新的记录 + var msg CBMessage + if err := s.db.Table("cb_message").Where("id = ?", id).First(&msg).Error; err != nil { + return fmt.Errorf("failed to query CB messages: %w", err) + } + + oldStatus := msg.Status + + // 检查状态是否发生变化 + if oldStatus == newStatus { + return fmt.Errorf("CB message status is already %s", newStatus.Enum()) + } + + // 更新数据库状态 + now := time.Now().UnixMicro() + if err := s.db.Table("cb_message"). + Where("id = ?", msg.Id). + Updates(map[string]interface{}{ + "status": newStatus, + "updated_at": now, + }).Error; err != nil { + return fmt.Errorf("failed to update CB message status: %w", err) + } + + // 根据状态变化发送HTTP请求 + if err := s.handleStatusChange(msg, oldStatus, newStatus); err != nil { + // 记录错误但不中断处理其他消息 + fmt.Printf("Failed to handle status change for message %d: %v\n", msg.Id, err) + } + + return nil +} + +// UpdateCBMessageStatus 更新CB消息状态 +// @Description 更新CB消息状态,并根据状态变化发送相应的HTTP请求 +// @param status 新状态 +// @return error 错误信息 +func (s *CBMessageService) UpdateCBMessageStatusByNeId(neId string, status string) error { + newStatus := ParseCBEventStatus(status) + + // 查询所有需要更新的记录 + var messages []CBMessage + if err := s.db.Table("cb_message").Where("ne_id = ?", neId).Find(&messages).Error; err != nil { + return fmt.Errorf("failed to query CB messages: %w", err) + } + + for _, msg := range messages { + oldStatus := msg.Status + + // 检查状态是否发生变化 + if oldStatus == newStatus { + continue // 状态没有变化,跳过 + } + + // 更新数据库状态 + now := time.Now().UnixMicro() + if err := s.db.Table("cb_message"). + Where("id = ?", msg.Id). + Updates(map[string]interface{}{ + "status": newStatus, + "updated_at": now, + }).Error; err != nil { + return fmt.Errorf("failed to update CB message status: %w", err) + } + + // 根据状态变化发送HTTP请求 + if err := s.handleStatusChange(msg, oldStatus, newStatus); err != nil { + // 记录错误但不中断处理其他消息 + fmt.Printf("Failed to handle status change for message %d: %v\n", msg.Id, err) + } + } + + return nil +} + +// UpdateCBMessageStatus 更新CB消息状态 +// @Description 更新CB消息状态,并根据状态变化发送相应的HTTP请求 +// @param status 新状态 +// @return error 错误信息 +func (s *CBMessageService) UpdateAllCBMessageStatus(status string) error { + newStatus := ParseCBEventStatus(status) + + // 查询所有需要更新的记录 + var messages []CBMessage + if err := s.db.Table("cb_message").Find(&messages).Error; err != nil { + return fmt.Errorf("failed to query CB messages: %w", err) + } + + for _, msg := range messages { + oldStatus := msg.Status + + // 检查状态是否发生变化 + if oldStatus == newStatus { + continue // 状态没有变化,跳过 + } + + // 更新数据库状态 + now := time.Now().UnixMicro() + if err := s.db.Table("cb_message"). + Where("id = ?", msg.Id). + Updates(map[string]interface{}{ + "status": newStatus, + "updated_at": now, + }).Error; err != nil { + return fmt.Errorf("failed to update CB message status: %w", err) + } + + // 根据状态变化发送HTTP请求 + if err := s.handleStatusChange(msg, oldStatus, newStatus); err != nil { + // 记录错误但不中断处理其他消息 + fmt.Printf("Failed to handle status change for message %d: %v\n", msg.Id, err) + } + } + + return nil +} + +// handleStatusChange 处理状态变化时的HTTP请求 +func (s *CBMessageService) handleStatusChange(msg CBMessage, oldStatus, newStatus CBEventStatus) error { + // 从NULL/INACTIVE状态修改为ACTIVE + if (oldStatus == CBEventStatusNull || oldStatus == CBEventStatusInactive) && + newStatus == CBEventStatusActive { + return s.sendActivateRequest(msg) + } + + // 从ACTIVE更改为INACTIVE状态 + if oldStatus == CBEventStatusActive && newStatus == CBEventStatusInactive { + return s.sendDeactivateRequest(msg) + } + + return nil +} + +// sendActivateRequest 发送激活请求 (POST) +func (s *CBMessageService) sendActivateRequest(msg CBMessage) error { + // 获取CBC网元信息 (这里需要根据你的实际情况获取IP和端口) + cbcIP, cbcPort, err := s.getCBCNetworkElement(msg.NeId) + if err != nil { + return fmt.Errorf("failed to get CBC network element info: %w", err) + } + + url := fmt.Sprintf("http://%s:%d/api/v1/cbe/message", cbcIP, cbcPort) + + // 直接发送message_json内容 + req, err := http.NewRequest("POST", url, bytes.NewReader(msg.MessageJson)) + if err != nil { + return fmt.Errorf("failed to create POST request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to send POST request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("POST request failed with status: %d", resp.StatusCode) + } + + return nil +} + +// sendDeactivateRequest 发送停用请求 (DELETE) +func (s *CBMessageService) sendDeactivateRequest(msg CBMessage) error { + // 解析message_json获取需要的字段 + var messageData map[string]interface{} + if err := json.Unmarshal(msg.MessageJson, &messageData); err != nil { + return fmt.Errorf("failed to parse message_json: %w", err) + } + + // 提取eventName (从messageJson中获取,如果没有eventName字段,可能需要从其他地方获取) + eventName, ok := messageData["eventName"].(string) + if !ok { + // 如果messageJson中没有eventName,尝试从messageIdProfile中获取 + if messageIdProfile, exists := messageData["messageIdProfile"]; exists { + if profile, ok := messageIdProfile.(map[string]interface{}); ok { + if msgId, exists := profile["messageId"]; exists { + eventName = fmt.Sprintf("message_%v", msgId) + } + } + } + if eventName == "" { + return fmt.Errorf("eventName not found in message_json") + } + } + + // 构造删除请求的payload,只包含messageIdProfile和warningAreaList + deletePayload := make(map[string]interface{}) + if messageIdProfile, exists := messageData["messageIdProfile"]; exists { + deletePayload["messageIdProfile"] = messageIdProfile + } + if warningAreaList, exists := messageData["warningAreaList"]; exists { + deletePayload["warningAreaList"] = warningAreaList + } + + payloadBytes, err := json.Marshal(deletePayload) + if err != nil { + return fmt.Errorf("failed to marshal delete payload: %w", err) + } + + // 获取CBC网元信息 + cbcIP, cbcPort, err := s.getCBCNetworkElement(msg.NeId) + if err != nil { + return fmt.Errorf("failed to get CBC network element info: %w", err) + } + + url := fmt.Sprintf("http://%s:%d/api/v1/cbe/message/%s", cbcIP, cbcPort, eventName) + + req, err := http.NewRequest("DELETE", url, bytes.NewReader(payloadBytes)) + if err != nil { + return fmt.Errorf("failed to create DELETE request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to send DELETE request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("DELETE request failed with status: %d", resp.StatusCode) + } + + return nil +} + +// getCBCNetworkElement 获取CBC网元的IP和端口信息 +// 这个方法需要根据你的实际网元管理系统来实现 +func (s *CBMessageService) getCBCNetworkElement(neId string) (string, int64, error) { + // 查询网元信息 + neInfo := neService.NewNeInfo.SelectNeInfoByNeTypeAndNeID("CBC", neId) + if neInfo.IP == "" { + return "", 0, fmt.Errorf("CBC network element not found for neId: %s", neId) + } + return neInfo.IP, neInfo.Port, nil +} diff --git a/lib/time/time.go b/lib/time/time.go new file mode 100644 index 00000000..da3a570d --- /dev/null +++ b/lib/time/time.go @@ -0,0 +1,15 @@ +package time + +import "time" + +// 假设 query.StartTime 和 query.EndTime 是 "2006-01-02 15:04:05" 格式字符串 +func ParseTimeToMicro(ts string) (int64, error) { + if ts == "" { + return 0, nil + } + t, err := time.ParseInLocation("2006-01-02 15:04:05", ts, time.Local) + if err != nil { + return 0, err + } + return t.UnixMicro(), nil +}