add: 星网北向模块

This commit is contained in:
TsMask
2024-03-18 11:31:50 +08:00
parent 828113c0b9
commit 5a28b0b861
19 changed files with 2000 additions and 0 deletions

View File

@@ -8,6 +8,7 @@ import (
"nms_nbi/src/framework/errorcatch"
"nms_nbi/src/framework/middleware"
"nms_nbi/src/framework/middleware/security"
apirestcxy "nms_nbi/src/modules/api_rest_cxy"
"nms_nbi/src/modules/chart"
"nms_nbi/src/modules/common"
"nms_nbi/src/modules/crontask"
@@ -133,4 +134,6 @@ func initModulesRoute(app *gin.Engine) {
crontask.Setup(app)
// 监控模块 - 含调度处理加入队列,放最后
monitor.Setup(app)
// 北向模块 - 中国星网
apirestcxy.Setup(app)
}

View File

@@ -0,0 +1,167 @@
package apirestcxy
import (
"nms_nbi/src/framework/logger"
"nms_nbi/src/framework/middleware"
"nms_nbi/src/modules/api_rest_cxy/controller"
"nms_nbi/src/modules/api_rest_cxy/utils/kafka"
"nms_nbi/src/modules/api_rest_cxy/utils/oss"
"github.com/gin-gonic/gin"
)
// 模块路由注册 - 中国星网网络创新研究院有限公司企业标准
func Setup(router *gin.Engine) {
logger.Infof("开始加载 ====> NMS-CXY 北向模块路由")
apiRest := router.Group("nms-cxy/api/rest")
// 启动时需要的初始参数
InitLoad()
// 安全管理
securityManagementGroup := apiRest.Group("/securityManagement")
{
// 鉴权登录接口
securityManagementGroup.POST("/:apiVersion/oauth/token",
controller.NewSecurity.OauthTokenLogin,
)
// 握手消息接口
securityManagementGroup.POST("/:apiVersion/oauth/handshake",
middleware.PreAuthorize(nil),
controller.NewSecurity.Handshake,
)
// 退出消息接口
securityManagementGroup.DELETE("/:apiVersion/oauth/token",
middleware.PreAuthorize(nil),
controller.NewSecurity.OauthTokenLogout,
)
}
// 系统管理
systemManagementGroup := apiRest.Group("/systemManagement")
{
// 软件版本查询接口
systemManagementGroup.GET("/:apiVersion/softwareVersion",
middleware.PreAuthorize(nil),
controller.NewSystem.SoftwareVersionInfo,
)
// 软件下载请求接口
systemManagementGroup.POST("/:apiVersion/softwareDownload",
middleware.PreAuthorize(nil),
controller.NewSystem.SoftwareDownloadCheck,
)
// 软件下载状态查询接口
systemManagementGroup.GET("/:apiVersion/softwareDownload",
middleware.PreAuthorize(nil),
controller.NewSystem.SoftwareDownloadState,
)
// 软件更新请求接口
systemManagementGroup.POST("/:apiVersion/softwareUpdate",
middleware.PreAuthorize(nil),
controller.NewSystem.SoftwareUpdateVersion,
)
// 软件更新状态查询接口
systemManagementGroup.GET("/:apiVersion/softwareUpdate",
middleware.PreAuthorize(nil),
controller.NewSystem.SoftwareUpdateState,
)
// 网元重启请求接口
systemManagementGroup.POST("/:apiVersion/reboot",
middleware.PreAuthorize(nil),
controller.NewSystem.RebootApp,
)
// 网元重启状态查询接口
systemManagementGroup.GET("/:apiVersion/reboot",
middleware.PreAuthorize(nil),
controller.NewSystem.RebootState,
)
// 恢复出厂配置请求接口
systemManagementGroup.POST("/:apiVersion/reinitiate",
middleware.PreAuthorize(nil),
controller.NewSystem.ReinitiateApp,
)
// 恢复出厂配置状态查询接口
systemManagementGroup.GET("/:apiVersion/reinitiate",
middleware.PreAuthorize(nil),
controller.NewSystem.ReinitiateState,
)
// 配置修改接口
systemManagementGroup.POST("/:apiVersion/config",
middleware.PreAuthorize(nil),
controller.NewSystem.ConfigApp,
)
// 配置查询接口
systemManagementGroup.GET("/:apiVersion/config",
middleware.PreAuthorize(nil),
controller.NewSystem.ConfigInfo,
)
// 告警文件同步请求接口
systemManagementGroup.POST("/:apiVersion/alarm",
middleware.PreAuthorize(nil),
controller.NewSystem.AlarmSysnc,
)
// 日志数据同步请求接口
systemManagementGroup.POST("/:apiVersion/log",
middleware.PreAuthorize(nil),
controller.NewSystem.LogSysnc,
)
// 业务诊断请求接口
systemManagementGroup.POST("/:apiVersion/diagnose",
middleware.PreAuthorize(nil),
controller.NewSystem.Diagnose,
)
// 业务连续性功能控制接口
systemManagementGroup.POST("/:apiVersion/servicecontinuity",
middleware.PreAuthorize(nil),
controller.NewSystem.ServiceContinuity,
)
}
// 测试管理
testManagementGroup := apiRest.Group("/testManagement")
{
// kafka发送消息
testManagementGroup.POST("/kafka/send",
middleware.PreAuthorize(nil),
controller.NewTest.KafkaSend,
)
// kafka读取并消费消息
testManagementGroup.POST("/kafka/readMark",
middleware.PreAuthorize(nil),
controller.NewTest.KafkaReadMark,
)
// 告警列表
testManagementGroup.POST("/alarm/list",
middleware.PreAuthorize(nil),
controller.NewTest.AlarmList,
)
// OSS文件上传
testManagementGroup.POST("/oss/upload",
middleware.PreAuthorize(nil),
controller.NewTest.OssUpload,
)
// OSS文件上传测试上报
testManagementGroup.POST("/oss/testUp",
middleware.PreAuthorize(nil),
controller.NewTest.OssTestUp,
)
}
}
// InitLoad 初始参数
func InitLoad() {
// 初始化连接Kafka
kafka.InitConfig()
// 初始化连接OSS
oss.InitConfig()
}

View File

@@ -0,0 +1,117 @@
package controller
import (
commonConstants "nms_nbi/src/framework/constants/common"
"nms_nbi/src/framework/constants/token"
"nms_nbi/src/framework/i18n"
"nms_nbi/src/framework/utils/ctx"
tokenUtils "nms_nbi/src/framework/utils/token"
"nms_nbi/src/framework/vo/result"
commonService "nms_nbi/src/modules/common/service"
systemService "nms_nbi/src/modules/system/service"
"github.com/gin-gonic/gin"
)
// 实例化控制层 SecurityController 结构体
var NewSecurity = &SecurityController{
accountService: commonService.NewAccountImpl,
sysLogLoginService: systemService.NewSysLogLoginImpl,
}
// 安全管理接口
//
// PATH /securityManagement
type SecurityController struct {
// 账号身份操作服务
accountService commonService.IAccount
// 系统登录访问
sysLogLoginService systemService.ISysLogLogin
}
// 鉴权登录接口
//
// POST /:apiVersion/oauth/token
func (s *SecurityController) OauthTokenLogin(c *gin.Context) {
var body struct {
GrantType string `json:"grantType" binding:"required,oneof=password"`
UserName string `json:"userName" binding:"required"`
Value string `json:"value" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(400, map[string]any{})
return
}
// 当前请求信息
ipaddr, location := ctx.IPAddrLocation(c)
os, browser := ctx.UaOsBrowser(c)
// 登录用户信息
loginUser, err := s.accountService.LoginByUsername(body.UserName, body.Value)
if err != nil {
c.JSON(400, map[string]any{})
return
}
// 生成令牌,创建系统访问记录
tokenStr := tokenUtils.Create(&loginUser, ipaddr, location, os, browser)
if tokenStr == "" {
c.JSON(400, map[string]any{})
return
} else {
s.accountService.UpdateLoginDateAndIP(&loginUser)
// 登录成功
s.sysLogLoginService.CreateSysLogLogin(
body.UserName, commonConstants.STATUS_YES, "app.common.loginSuccess",
ipaddr, location, os, browser,
)
}
expires := (loginUser.ExpireTime - loginUser.LoginTime) / 1000
c.JSON(200, map[string]any{
token.ACCESS_TOKEN: tokenStr,
"expires": expires,
})
}
// 握手消息接口
//
// POST /:apiVersion/oauth/handshake
func (s *SecurityController) Handshake(c *gin.Context) {
language := ctx.AcceptLanguage(c)
loginUser, err := ctx.LoginUser(c)
if err != nil {
c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error())))
return
}
c.JSON(200, map[string]any{
"userID": loginUser.UserID,
})
}
// 退出消息接口
//
// DELETE /:apiVersion/oauth/token
func (s *SecurityController) OauthTokenLogout(c *gin.Context) {
language := ctx.AcceptLanguage(c)
tokenStr := ctx.Authorization(c)
if tokenStr != "" {
// 存在token时记录退出信息
userName := tokenUtils.Remove(tokenStr)
if userName != "" {
// 当前请求信息
ipaddr, location := ctx.IPAddrLocation(c)
os, browser := ctx.UaOsBrowser(c)
// 创建系统访问记录 退出成功
s.sysLogLoginService.CreateSysLogLogin(
userName, commonConstants.STATUS_YES, "app.common.logoutSuccess",
ipaddr, location, os, browser,
)
}
}
c.JSON(204, result.OkMsg(i18n.TKey(language, "app.common.logoutSuccess")))
}

View File

@@ -0,0 +1,636 @@
package controller
import (
"fmt"
"nms_nbi/src/framework/config"
"nms_nbi/src/framework/i18n"
"nms_nbi/src/framework/utils/cmd"
"nms_nbi/src/framework/utils/ctx"
"nms_nbi/src/framework/utils/date"
"nms_nbi/src/framework/utils/ping"
"nms_nbi/src/framework/vo/result"
"nms_nbi/src/modules/api_rest_cxy/service"
"nms_nbi/src/modules/api_rest_cxy/utils/common"
neService "nms_nbi/src/modules/network_element/service"
"time"
"github.com/gin-gonic/gin"
)
// 实例化控制层 SystemController 结构体
var NewSystem = &SystemController{
neInfoService: neService.NewNeInfoImpl,
neVersionService: neService.NewNeVersionImpl,
neSoftwareService: neService.NewNeSoftwareImpl,
alarmService: service.NewAlarmImpl,
logService: service.NewLogImpl,
}
// 系统管理接口
//
// PATH /systemManagement
type SystemController struct {
// 网元信息服务
neInfoService neService.INeInfo
// 网元版本信息服务
neVersionService neService.INeVersion
// 网元软件包信息服务
neSoftwareService neService.INeSoftware
// 告警数据服务
alarmService service.IAlarm
// 日志数据服务
logService service.ILog
}
// 软件版本查询接口
//
// GET /:apiVersion/softwareVersion
func (s *SystemController) SoftwareVersionInfo(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
RequestId string `json:"requestId" binding:"required"`
Ruid string `json:"ruid" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid)
if neInfo.RmUID != body.Ruid {
c.JSON(500, map[string]any{
"requestId": body.RequestId,
"errorInfo": "NE not exist",
})
return
}
neVersion := s.neVersionService.SelectByTypeAndID(neInfo.NeType, neInfo.NeId)
if neVersion.ID == "" {
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"currentVersion": "-",
"backupVersionList": []string{"-"},
})
return
}
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"currentVersion": neVersion.Version,
"backupVersionList": []string{neVersion.PreVersion},
})
}
// 软件下载请求接口
//
// POST /:apiVersion/softwareDownload
func (s *SystemController) SoftwareDownloadCheck(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
RequestId string `json:"requestId" binding:"required"`
SoftwareVersion string `json:"softwareVersion" binding:"required"`
DownloadPath string `json:"downloadPath" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
neSoftware := s.neSoftwareService.SelectByVersionAndPath(body.SoftwareVersion, body.DownloadPath)
if neSoftware.Version != body.SoftwareVersion {
c.JSON(500, map[string]any{
"requestId": body.RequestId,
"errorInfo": "NE not exist",
})
return
}
c.JSON(200, map[string]any{
"requestId": body.RequestId,
})
}
// 软件下载状态查询接口
//
// GET /:apiVersion/softwareDownload
func (s *SystemController) SoftwareDownloadState(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
RequestId string `json:"requestId" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
if len(body.RequestId) < 5 {
c.JSON(500, map[string]any{
"requestId": body.RequestId,
"errorInfo": "NE not exist",
})
return
}
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"status": "fail",
"additionalInfo": "Download path not exist",
})
}
// 软件更新请求接口
//
// POST /:apiVersion/softwareUpdate
func (s *SystemController) SoftwareUpdateVersion(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
RequestId string `json:"requestId" binding:"required"`
Ruid string `json:"ruid" binding:"required"`
UpdateVersion string `json:"updateVersion" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid)
if neInfo.RmUID != body.Ruid {
c.JSON(500, map[string]any{
"requestId": body.RequestId,
"errorInfo": "NE not exist",
})
return
}
neVersion := s.neVersionService.SelectByTypeAndID(neInfo.NeType, neInfo.NeId)
if neVersion.ID == "" {
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"currentVersion": "-",
"backupVersionList": []string{"-"},
})
return
}
// 更新版本号
neVersion.Version = body.UpdateVersion
rows := s.neVersionService.Update(neVersion)
if rows > 0 {
c.JSON(200, map[string]any{
"requestId": body.RequestId,
})
return
}
c.JSON(500, map[string]any{
"requestId": body.RequestId,
"errorInfo": "Update Version Fail",
})
}
// 软件更新状态查询接口
//
// GET /:apiVersion/softwareUpdate
func (s *SystemController) SoftwareUpdateState(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
RequestId string `json:"requestId" binding:"required"`
Ruid string `json:"ruid" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid)
if neInfo.RmUID != body.Ruid {
c.JSON(500, map[string]any{
"requestId": body.RequestId,
"errorInfo": "NE not exist",
})
return
}
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"status": "fail",
"additionalInfo": "software not compatible",
})
}
// 网元重启请求接口
//
// POST /:apiVersion/reboot
func (s *SystemController) RebootApp(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
RequestId string `json:"requestId" binding:"required"`
Ruid string `json:"ruid" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid)
if neInfo.RmUID != body.Ruid {
c.JSON(500, map[string]any{
"requestId": body.RequestId,
"errorInfo": "NE not exist",
})
return
}
_, err := cmd.ExecWithCheck("nohup", "sh", "-c", "sleep 2s && service restagent restart", "&")
if err != nil {
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"status": "fail",
"additionalInfo": fmt.Sprintf("NE service reboot error %s", err.Error()),
})
return
}
c.JSON(200, map[string]any{
"requestId": body.RequestId,
})
}
// 网元重启状态查询接口
//
// GET /:apiVersion/reboot
func (s *SystemController) RebootState(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
RequestId string `json:"requestId" binding:"required"`
Ruid string `json:"ruid" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid)
if neInfo.RmUID != body.Ruid {
c.JSON(500, map[string]any{
"requestId": body.RequestId,
"errorInfo": "NE not exist",
})
return
}
// 获取程序运行时间
runTime := time.Since(config.RunTime()).Abs().Seconds()
if runTime > 120 {
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"status": "fail",
"additionalInfo": "NE not implement",
})
return
}
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"status": "success",
"additionalInfo": "NE successfully restarted",
})
}
// 恢复出厂配置请求接口
//
// POST /:apiVersion/reinitiate
func (s *SystemController) ReinitiateApp(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
RequestId string `json:"requestId" binding:"required"`
Ruid string `json:"ruid" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid)
if neInfo.RmUID != body.Ruid {
c.JSON(500, map[string]any{
"requestId": body.RequestId,
"errorInfo": "NE not exist",
})
return
}
c.JSON(200, map[string]any{
"requestId": body.RequestId,
})
}
// 恢复出厂配置状态查询接口
//
// GET /:apiVersion/reinitiate
func (s *SystemController) ReinitiateState(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
RequestId string `json:"requestId" binding:"required"`
Ruid string `json:"ruid" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid)
if neInfo.RmUID != body.Ruid {
c.JSON(500, map[string]any{
"requestId": body.RequestId,
"errorInfo": "NE not exist",
})
return
}
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"status": "fail",
"additionalInfo": "NE not implement",
})
}
// 配置修改接口
//
// POST /:apiVersion/config
func (s *SystemController) ConfigApp(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
RequestId string `json:"requestId" binding:"required"`
Ruid string `json:"ruid" binding:"required"`
OperType string `json:"operType" binding:"required"`
OperPara any `json:"operPara" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid)
if neInfo.RmUID != body.Ruid {
c.JSON(500, map[string]any{
"requestId": body.RequestId,
"errorInfo": "NE not exist",
})
return
}
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"status": "partialsuccess",
"failPara": []string{"PhyCellID", "TAC"},
})
}
// 配置查询接口
//
// GET /:apiVersion/config
func (s *SystemController) ConfigInfo(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
RequestId string `json:"requestId" binding:"required"`
Ruid string `json:"ruid" binding:"required"`
QueryPara []string `json:"queryPara" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid)
if neInfo.RmUID != body.Ruid {
c.JSON(500, map[string]any{
"requestId": body.RequestId,
"errorInfo": "NE not exist",
})
return
}
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"status": "partialsuccess",
"successPara": map[string]any{
"cellIdentity": "1234567890",
"PhyCellID": "12345",
"TAC": "12345",
},
})
}
// 告警文件同步请求接口
//
// 告警实时上报通过kafka推送到topic
//
// POST /:apiVersion/alarm
func (s *SystemController) AlarmSysnc(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
RequestId string `json:"requestId" binding:"required"`
Ruid string `json:"ruid" binding:"required"`
StartTime string `json:"startTime" binding:"required"`
EndTime string `json:"endTime" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
// 时间格式校验
startTime := date.ParseStrToDate(body.StartTime, date.YYYY_MM_DD_HH_MM_SS)
if startTime.IsZero() {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
endTime := date.ParseStrToDate(body.EndTime, date.YYYY_MM_DD_HH_MM_SS)
if endTime.IsZero() {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
// 查询网元是否存在
neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid)
if neInfo.RmUID != body.Ruid {
c.JSON(500, map[string]any{
"requestId": body.RequestId,
"errorInfo": "NE not exist",
})
return
}
sysncNum, filePath := s.alarmService.AlarmDataToFile(body.RequestId, neInfo.NeType, neInfo.RmUID, body.StartTime, body.EndTime)
if sysncNum > 0 {
common.UploadOSSByJSONToZip(filePath, neInfo.NeType, "FM")
}
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"sysncNum": sysncNum,
})
}
// 日志数据同步请求接口
//
// POST /:apiVersion/log
func (s *SystemController) LogSysnc(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
RequestId string `json:"requestId" binding:"required"`
Ruid string `json:"ruid" binding:"required"`
LogType string `json:"logType"` // operate、security、other
StartTime string `json:"startTime" binding:"required"` // 2022-12-15 08:34:24
EndTime string `json:"endTime" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
// 时间格式校验
startTime := date.ParseStrToDate(body.StartTime, date.YYYY_MM_DD_HH_MM_SS)
if startTime.IsZero() {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
endTime := date.ParseStrToDate(body.EndTime, date.YYYY_MM_DD_HH_MM_SS)
if endTime.IsZero() {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid)
if neInfo.RmUID != body.Ruid {
c.JSON(500, map[string]any{
"requestId": body.RequestId,
"errorInfo": "NE not exist",
})
return
}
if body.LogType == "operate" {
sysncNum, filePath := s.logService.OperateLogToFile(language, body.StartTime, body.EndTime)
if sysncNum > 0 {
common.UploadOSSByJSONToZip(filePath, neInfo.NeType, "LOG")
}
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"sysncNum": sysncNum,
})
return
}
if body.LogType == "security" {
sysncNum, filePath := s.logService.SecurityLogToFile(language, body.StartTime, body.EndTime)
if sysncNum > 0 {
common.UploadOSSByJSONToZip(filePath, neInfo.NeType, "LOG")
}
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"sysncNum": sysncNum,
})
return
}
c.JSON(500, map[string]any{
"requestId": body.RequestId,
"errorInfo": "logType not found",
})
}
// 业务诊断请求接口
//
// POST /:apiVersion/diagnose
func (s *SystemController) Diagnose(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
RequestId string `json:"requestId" binding:"required"`
Ruid string `json:"ruid" binding:"required"`
OperType string `json:"operType" binding:"required,oneof=ping trace"`
OperPara map[string]any `json:"operPara" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
operPara := body.OperPara
// 必填目的 IP 地址(字符串类型,必填)
desAddr, desAddrOk := operPara["desAddr"]
if !desAddrOk || desAddr == nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
// 端到端ping检测
if body.OperType == "ping" {
p := ping.Ping{}
p.CopyFrom(operPara)
info, err := p.StatsInfo()
if err != nil {
c.JSON(500, map[string]any{
"requestId": body.RequestId,
"errorInfo": fmt.Sprintf("ping fail %s", err.Error()),
})
return
}
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"operResult": info,
})
return
}
// 端到端Trace检测
if body.OperType == "trace" {
p := ping.Ping{}
p.CopyFrom(operPara)
info, err := p.StatsRtt()
if err != nil {
c.JSON(500, map[string]any{
"requestId": body.RequestId,
"errorInfo": fmt.Sprintf("ping fail %s", err.Error()),
})
return
}
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"operResult": info,
})
return
}
c.JSON(500, map[string]any{
"requestId": body.RequestId,
"errorInfo": "operType not found",
})
}
// 业务连续性功能控制接口
//
// POST /:apiVersion/servicecontinuity
func (s *SystemController) ServiceContinuity(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
RequestId string `json:"requestId" binding:"required"`
OperType int `json:"operType" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
if body.OperType > 1 {
c.JSON(500, map[string]any{
"requestId": body.RequestId,
"errorInfo": "operType not support",
})
return
}
c.JSON(200, map[string]any{
"requestId": body.RequestId,
})
}

View File

@@ -0,0 +1,223 @@
package controller
import (
"encoding/json"
"nms_nbi/src/framework/i18n"
"nms_nbi/src/framework/utils/ctx"
"nms_nbi/src/framework/vo/result"
"nms_nbi/src/modules/api_rest_cxy/service"
"nms_nbi/src/modules/api_rest_cxy/utils/kafka"
"nms_nbi/src/modules/api_rest_cxy/utils/oss"
neDataModel "nms_nbi/src/modules/network_data/model"
neService "nms_nbi/src/modules/network_element/service"
"github.com/gin-gonic/gin"
)
// 实例化控制层 TestController 结构体
var NewTest = &TestController{
neInfoService: neService.NewNeInfoImpl,
alarmService: service.NewAlarmImpl,
configService: service.NewConfigImpl,
performanceService: service.NewPerformanceImpl,
}
// 测试接口
//
// PATH /testManagement
type TestController struct {
// 网元信息服务
neInfoService neService.INeInfo
// 告警数据服务
alarmService service.IAlarm
// 配置数据处理服务
configService service.IConfig
// 性能数据处理服务
performanceService service.IPerformance
}
// kafka发送消息
//
// POST /kafka/send
func (s *TestController) KafkaSend(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
RequestId string `json:"requestId" binding:"required"`
Topic string `json:"topic" binding:"required"`
Partition int `json:"partition"`
Msg any `json:"msg"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
bytes, err := json.Marshal(body.Msg)
if err != nil {
c.JSON(400, result.CodeMsg(400, err.Error()))
return
}
// 发送消息
partition, offset, err := kafka.KInitConm.MessageSyncSend(body.Topic, int32(body.Partition), string(bytes))
if err != nil {
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"msg": body.Msg,
"error": err.Error(),
})
return
}
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"partition": partition,
"offset": offset,
})
}
// kafka读取并消费消息
//
// POST /kafka/readMark
func (s *TestController) KafkaReadMark(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
RequestId string `json:"requestId" binding:"required"`
Topic string `json:"topic" binding:"required"`
Partition int `json:"partition"`
Group string `json:"group" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
// 消费消息
msg, err := kafka.KInitConm.MessageSyncReadMark(body.Topic, int32(body.Partition), body.Group)
if err != nil {
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"error": err.Error(),
})
return
}
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"msg": msg,
})
}
// 告警列表
//
// POST /alarm/list
func (s *TestController) AlarmList(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
RequestId string `json:"requestId" binding:"required"`
Ruid string `json:"ruid" binding:"required"`
AlarmLevel string `json:"alarmLevel" form:"alarmLevel"` // 告警类型 1: Critical, 2: Major, 3: Minor, 4: Warning, 5: Event(Only VNF)
StartTime string `json:"startTime" form:"startTime"`
EndTime string `json:"endTime" form:"endTime"`
SortField string `json:"sortField" form:"sortField" binding:"omitempty,oneof=timestamp"` // 排序字段,填写结果字段
SortOrder string `json:"sortOrder" form:"sortOrder" binding:"omitempty,oneof=asc desc"` // 排序升降序asc desc
PageNum int64 `json:"pageNum" form:"pageNum" binding:"required"`
PageSize int64 `json:"pageSize" form:"pageSize" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid)
if neInfo.RmUID != body.Ruid {
c.JSON(500, map[string]any{
"requestId": body.RequestId,
"errorInfo": "NE not exist",
})
return
}
rows, total := s.alarmService.SelectPage(neDataModel.AlarmQuery{
PageNum: body.PageNum,
PageSize: body.PageSize,
SortField: body.SortField,
SortOrder: body.SortOrder,
StartTime: body.StartTime,
EndTime: body.EndTime,
RmUID: neInfo.RmUID,
NeType: neInfo.NeType,
OrigSeverity: body.AlarmLevel,
})
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"rows": rows,
"total": total,
})
}
// OSS文件上传
//
// POST /oss/upload
func (s *TestController) OssUpload(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
RequestId string `json:"requestId" binding:"required"`
FilePath string `json:"filePath" binding:"required"`
NewFilePath string `json:"newFilePath" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
// 消费消息
msg, err := oss.FileUploadFileByFilePath(body.FilePath, body.NewFilePath)
if err != nil {
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"error": err.Error(),
})
return
}
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"msg": msg,
})
}
// OSS文件上传测试上报
//
// POST /oss/testUp
func (s *TestController) OssTestUp(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var body struct {
RequestId string `json:"requestId" binding:"required"`
OperType string `json:"operType" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
if body.OperType == "config" {
err := s.configService.ConfigUploadOSS("OMC")
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"operType": body.OperType,
"err": err,
})
return
}
if body.OperType == "performance" {
err := s.performanceService.PerformanceUploadOSS("AMF")
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"operType": body.OperType,
"err": err,
})
return
}
c.JSON(200, map[string]any{
"requestId": body.RequestId,
"operType": body.OperType,
})
}

View File

@@ -0,0 +1,19 @@
package model
// 告警数据结构体
type Alarm struct {
AlarmSeq string `json:"alarmSeq"` // 告警原始序号
AlarmTitle string `json:"alarmTitle"` // 告警标题
AlarmStatus int64 `json:"alarmStatus"` // 告警状态1活动告警0清除告警
AlarmType string `json:"alarmType"` // 告警类型
AlarmLevel int64 `json:"alarmLevel"` // 原始告警级别1一级告警2二级告警3三级告警4四级告警
EventTime string `json:"eventTime"` // 事件发生时间
AlarmId int64 `json:"alarmId"` // 告警事件Id
CauseID string `json:"causeID"` // 告警问题原因ID
Cause string `json:"cause"` // 告警问题原因
NeRUID string `json:"neRUID"` // 告警网元RUID
NeUserLabel string `json:"neUserLabel"` // 告警网元名称
ObjectRUID string `json:"objectRUID"` // 告警对象RUID
ObjectUserLabel string `json:"objectUserLabel"` // 告警对象名称
AddInfo string `json:"addInfo"` // 告警辅助信息
}

View File

@@ -0,0 +1,18 @@
package service
import (
"nms_nbi/src/modules/api_rest_cxy/model"
neDataModel "nms_nbi/src/modules/network_data/model"
)
// 告警数据处理服务 服务层接口
type IAlarm interface {
// SelectPage 根据条件分页查询
SelectPage(querys neDataModel.AlarmQuery) ([]model.Alarm, int64)
// KafkaPush 推送数据Kafka
KafkaPush(alarm neDataModel.Alarm) error
// AlarmDataToFile 告警数据写入文件
AlarmDataToFile(requestId, neType, rmUID, startTime, endTime string) (int64, string)
}

View File

@@ -0,0 +1,164 @@
package service
import (
"encoding/json"
"fmt"
"nms_nbi/src/framework/constants/uploadsubpath"
"nms_nbi/src/framework/logger"
"nms_nbi/src/framework/utils/date"
"nms_nbi/src/framework/utils/file"
"nms_nbi/src/framework/utils/parse"
"nms_nbi/src/modules/api_rest_cxy/model"
"nms_nbi/src/modules/api_rest_cxy/utils/common"
"nms_nbi/src/modules/api_rest_cxy/utils/kafka"
neDataModel "nms_nbi/src/modules/network_data/model"
neDataService "nms_nbi/src/modules/network_data/service"
)
// 实例化数据层 AlarmImpl 结构体
var NewAlarmImpl = &AlarmImpl{
alarmService: neDataService.NewAlarmImpl,
alarmLevel: map[string]int64{
"Critical": 1,
"Major": 2,
"Minor": 3,
"Warning": 4,
},
}
// 告警数据处理服务 服务层处理
type AlarmImpl struct {
// 告警信息服务
alarmService neDataService.IAlarm
// 原始告警级别 1一级告警2二级告警3三级告警4四级告警
alarmLevel map[string]int64
}
// SelectPage 根据条件分页查询
func (s *AlarmImpl) SelectPage(querys neDataModel.AlarmQuery) ([]model.Alarm, int64) {
dataArr := []model.Alarm{}
// 查询数据
pageData := s.alarmService.SelectPage(querys)
total := parse.Number(pageData["total"])
rows, ok := pageData["rows"].([]neDataModel.Alarm)
if !ok || len(rows) <= 0 {
return dataArr, total
}
for _, v := range rows {
item := s.parseAlarmData(v)
dataArr = append(dataArr, item)
}
return dataArr, total
}
// parseAlarmData 解析转换为指定的告警对象
func (s *AlarmImpl) parseAlarmData(v neDataModel.Alarm) model.Alarm {
alarmLevel, ok := s.alarmLevel[v.OrigSeverity]
if !ok {
alarmLevel = 0
}
return model.Alarm{
AlarmSeq: v.AlarmSeq,
AlarmTitle: v.AlarmTitle,
AlarmStatus: parse.Number(v.AlarmStatus),
AlarmType: v.AlarmType,
AlarmLevel: alarmLevel,
EventTime: date.ParseDateToStr(v.EventTime, date.YYYY_MM_DD_HH_MM_SS),
AlarmId: parse.Number(v.ID),
CauseID: v.AlarmCode,
Cause: v.SpecificProblem,
NeRUID: v.ObjectUid,
NeUserLabel: v.NeName,
ObjectRUID: v.AlarmId,
ObjectUserLabel: v.ObjectName,
AddInfo: v.AddInfo,
}
}
// KafkaPush 推送数据Kafka
func (s *AlarmImpl) KafkaPush(alarm neDataModel.Alarm) error {
if alarm.AlarmTitle == "" {
return nil
}
// 数据序列化
data := s.parseAlarmData(alarm)
bytes, err := json.Marshal(data)
if err != nil {
logger.Errorf("KafkaPush parseAlarmData err => %s", err.Error())
return err
}
// 订阅topic名称为专业编码-厂商编码-OMC编号-数据类别固定为FM
basePath := common.BasePath("-")
topic := fmt.Sprintf("%s-FM", basePath)
// 发送消息
partition, offset, err := kafka.KInitConm.MessageSyncSend(topic, 0, string(bytes))
if err != nil {
logger.Errorf("KafkaPush MessageSyncSend err => %s", err.Error())
return err
}
logger.Infof("KafkaPush MessageSyncSend Partition:%d, Offset:%d", partition, offset)
return err
}
// AlarmDataToFile 告警数据写入文件
func (s *AlarmImpl) AlarmDataToFile(requestId, neType, rmUID, startTime, endTime string) (int64, string) {
var pageNum int64 = 1
var pageSize int64 = 20
dataArr := []any{}
// 查询数据
pageData := s.alarmService.SelectPage(neDataModel.AlarmQuery{
PageNum: pageNum,
PageSize: pageSize,
StartTime: startTime,
EndTime: endTime,
RmUID: rmUID,
NeType: neType,
})
total := parse.Number(pageData["total"])
rows, ok := pageData["rows"].([]neDataModel.Alarm)
if !ok || len(rows) <= 0 {
return int64(len(dataArr)), ""
}
for _, v := range rows {
dataArr = append(dataArr, s.parseAlarmData(v))
}
// 按总数分批遍历
batchNum := total / pageSize
for i := 1; i <= int(batchNum); i++ {
// 查询数据
pageData := s.alarmService.SelectPage(neDataModel.AlarmQuery{
PageNum: int64(i),
PageSize: pageSize,
StartTime: startTime,
EndTime: endTime,
RmUID: rmUID,
NeType: neType,
})
rows, ok := pageData["rows"].([]neDataModel.Alarm)
if !ok || len(rows) <= 0 {
return int64(len(dataArr)), ""
}
for _, v := range rows {
dataArr = append(dataArr, s.parseAlarmData(v))
}
}
// 文件名
dataSaveFileName := common.DataSaveFileName{
RequestId: requestId,
}
fileName := dataSaveFileName.JSON()
filePath := fmt.Sprintf("%s/%s", file.ParseUploadFileDir(uploadsubpath.EXPORT), fileName)
err := file.WriterFileJSONLine(dataArr, filePath)
if err != nil {
return int64(len(dataArr)), ""
}
return int64(len(dataArr)), filePath
}

View File

@@ -0,0 +1,7 @@
package service
// 配置数据处理服务 服务层接口
type IConfig interface {
// ConfigUploadOSS 配置数据上报
ConfigUploadOSS(neType string) error
}

View File

@@ -0,0 +1,32 @@
package service
import (
"fmt"
"nms_nbi/src/framework/constants/uploadsubpath"
"nms_nbi/src/framework/utils/file"
"nms_nbi/src/modules/api_rest_cxy/utils/common"
)
// 实例化数据层 ConfigImpl 结构体
var NewConfigImpl = &ConfigImpl{}
// 配置数据处理服务 服务层处理
type ConfigImpl struct{}
// ConfigUploadOSS 配置数据上报
func (s *ConfigImpl) ConfigUploadOSS(neType string) error {
// 配置文件目录
dir := "C:\\usr\\local\\omc\\upload\\export\\2023\\12"
// 文件名
dataSaveFileName := common.DataSaveFileName{}
fileName := dataSaveFileName.ZIP()
filePath := fmt.Sprintf("%s/%s", file.ParseUploadFileDir(uploadsubpath.EXPORT), fileName)
err := file.CompressZipByDir(filePath, dir)
if err != nil {
return err
}
return common.UploadOSSByZip(filePath, neType, "CM")
}

View File

@@ -0,0 +1,10 @@
package service
// 日志数据处理服务 服务层接口
type ILog interface {
// OperateLogToFile 操作日志写入文件
OperateLogToFile(language, startTime, endTime string) (int64, string)
// SecurityLogToFile 安全日志写入文件
SecurityLogToFile(language, startTime, endTime string) (int64, string)
}

View File

@@ -0,0 +1,154 @@
package service
import (
"fmt"
"nms_nbi/src/framework/constants/uploadsubpath"
"nms_nbi/src/framework/i18n"
"nms_nbi/src/framework/utils/file"
"nms_nbi/src/framework/utils/parse"
"nms_nbi/src/modules/api_rest_cxy/utils/common"
systemModel "nms_nbi/src/modules/system/model"
systemService "nms_nbi/src/modules/system/service"
)
// 实例化数据层 LogImpl 结构体
var NewLogImpl = &LogImpl{
sysLogOperateService: systemService.NewSysLogOperateImpl,
sysLogLoginService: systemService.NewSysLogLoginImpl,
}
// 日志数据处理服务 服务层处理
type LogImpl struct {
// 操作日志服务
sysLogOperateService systemService.ISysLogOperate
// 系统登录日志服务
sysLogLoginService systemService.ISysLogLogin
}
// OperateLogToFile 操作日志写入文件
func (s *LogImpl) OperateLogToFile(language, startTime, endTime string) (int64, string) {
var pageNum int64 = 1
var pageSize int64 = 20
dataArr := []any{}
// 查询数据
pageData := s.sysLogOperateService.SelectSysLogOperatePage(map[string]any{
"beginTime": startTime,
"endTime": endTime,
"pageNum": pageNum,
"pageSize": pageSize,
})
total := parse.Number(pageData["total"])
rows, ok := pageData["rows"].([]systemModel.SysLogOperate)
if !ok || len(rows) <= 0 {
return 0, ""
}
for _, v := range rows {
v.Title = i18n.TKey(language, v.Title)
v.OperLocation = i18n.TKey(language, v.OperLocation)
v.Method = "-"
dataArr = append(dataArr, v)
}
pageNum += 1
// 按总数分批遍历
batchNum := total / pageSize
for i := 1; i <= int(batchNum); i++ {
// 查询数据
pageData := s.sysLogOperateService.SelectSysLogOperatePage(map[string]any{
"beginTime": startTime,
"endTime": endTime,
"pageNum": pageNum,
"pageSize": pageSize,
})
rows, ok := pageData["rows"].([]systemModel.SysLogOperate)
if !ok || len(rows) <= 0 {
return int64(len(dataArr)), ""
}
for _, v := range rows {
v.Title = i18n.TKey(language, v.Title)
v.OperLocation = i18n.TKey(language, v.OperLocation)
v.Method = "-"
dataArr = append(dataArr, v)
}
pageNum += 1
}
// 文件名
dataSaveFileName := common.DataSaveFileName{
LogType: "operate",
}
fileName := dataSaveFileName.JSON()
filePath := fmt.Sprintf("%s/%s", file.ParseUploadFileDir(uploadsubpath.EXPORT), fileName)
err := file.WriterFileJSONLine(dataArr, filePath)
if err != nil {
return int64(len(dataArr)), ""
}
return int64(len(dataArr)), filePath
}
// SecurityLogToFile 安全日志写入文件
func (s *LogImpl) SecurityLogToFile(language, startTime, endTime string) (int64, string) {
var pageNum int64 = 1
var pageSize int64 = 20
dataArr := []any{}
// 查询数据
pageData := s.sysLogLoginService.SelectSysLogLoginPage(map[string]any{
"beginTime": startTime,
"endTime": endTime,
"pageNum": pageNum,
"pageSize": pageSize,
})
total := parse.Number(pageData["total"])
rows, ok := pageData["rows"].([]systemModel.SysLogLogin)
if !ok || len(rows) <= 0 {
return 0, ""
}
for _, v := range rows {
v.LoginLocation = i18n.TKey(language, v.LoginLocation)
v.OS = i18n.TKey(language, v.OS)
v.Browser = i18n.TKey(language, v.Browser)
v.Msg = i18n.TKey(language, v.Msg)
dataArr = append(dataArr, v)
}
pageNum += 1
// 按总数分批遍历
batchNum := total / pageSize
for i := 1; i <= int(batchNum); i++ {
// 查询数据
pageData := s.sysLogLoginService.SelectSysLogLoginPage(map[string]any{
"beginTime": startTime,
"endTime": endTime,
"pageNum": pageNum,
"pageSize": pageSize,
})
rows, ok := pageData["rows"].([]systemModel.SysLogLogin)
if !ok || len(rows) <= 0 {
return int64(len(dataArr)), ""
}
for _, v := range rows {
v.LoginLocation = i18n.TKey(language, v.LoginLocation)
v.OS = i18n.TKey(language, v.OS)
v.Browser = i18n.TKey(language, v.Browser)
v.Msg = i18n.TKey(language, v.Msg)
dataArr = append(dataArr, v)
}
pageNum += 1
}
// 文件名
dataSaveFileName := common.DataSaveFileName{
LogType: "security",
}
fileName := dataSaveFileName.JSON()
filePath := fmt.Sprintf("%s/%s", file.ParseUploadFileDir(uploadsubpath.EXPORT), fileName)
err := file.WriterFileJSONLine(dataArr, filePath)
if err != nil {
return int64(len(dataArr)), ""
}
return int64(len(dataArr)), filePath
}

View File

@@ -0,0 +1,7 @@
package service
// 性能数据处理服务 服务层接口
type IPerformance interface {
// PerformanceUploadOSS 性能数据上报
PerformanceUploadOSS(neType string) error
}

View File

@@ -0,0 +1,55 @@
package service
import (
"fmt"
"nms_nbi/src/framework/constants/uploadsubpath"
"nms_nbi/src/framework/utils/file"
"nms_nbi/src/modules/api_rest_cxy/utils/common"
neDataModel "nms_nbi/src/modules/network_data/model"
neDataService "nms_nbi/src/modules/network_data/service"
)
// 实例化数据层 PerformanceImpl 结构体
var NewPerformanceImpl = &PerformanceImpl{
perfKPIService: neDataService.NewPerfKPIImpl,
}
// 性能数据处理服务 服务层处理
type PerformanceImpl struct {
// 统计信息服务
perfKPIService neDataService.IPerfKPI
}
// PerformanceUploadOSS 性能数据上报
func (s *PerformanceImpl) PerformanceUploadOSS(neType string) error {
query := neDataModel.GoldKPIQuery{
NeType: neType,
StartTime: "2024-02-15 10:16:30",
EndTime: "2024-03-07 10:20:08",
Interval: 900,
}
data := s.perfKPIService.SelectGoldKPI(query)
dataArr := [][]string{}
for i, v := range data {
if i == 0 {
dataItem := []string{}
dataItem[0] = fmt.Sprint(v["AMF.25"])
dataArr = append(dataArr, dataItem)
}
dataItem := []string{}
dataItem[0] = fmt.Sprint(v["AMF.25"])
dataArr = append(dataArr, dataItem)
}
// 文件名
dataSaveFileName := common.DataSaveFileName{}
fileName := dataSaveFileName.CSV()
filePath := fmt.Sprintf("%s/%s", file.ParseUploadFileDir(uploadsubpath.EXPORT), fileName)
err := file.WriterFileCSV(dataArr, filePath)
if err != nil {
return err
}
return common.UploadOSSByZip(filePath, neType, "PM")
}

View File

@@ -0,0 +1,7 @@
package service
// 资源数据处理服务 服务层接口
type IResource interface {
// ResourceDataToFile 资源数据写入文件
ResourceDataToFile(requestId, neType, rmUID, startTime, endTime string) (int64, string)
}

View File

@@ -0,0 +1,13 @@
package service
// 实例化数据层 ResourceImpl 结构体
var NewResourceImpl = &ResourceImpl{}
// 资源数据处理服务 服务层处理
type ResourceImpl struct{}
// ResourceDataToFile 资源数据写入文件
func (s *ResourceImpl) ResourceDataToFile(requestId, neType, rmUID, startTime, endTime string) (int64, string) {
return 0, ""
}

View File

@@ -0,0 +1,153 @@
package common
import (
"fmt"
"nms_nbi/src/framework/config"
"nms_nbi/src/framework/utils/date"
"nms_nbi/src/framework/utils/file"
"nms_nbi/src/modules/api_rest_cxy/utils/oss"
"path/filepath"
"strings"
"time"
)
// DataSaveFileName 数据文件命名要求
//
// 资源数据文件、性能数据文件的命名规则如下:
// <资源对象类型简称>-<数据版本>-<数据时间>-<日志类型>-<同步请求标识>-<Ri>-<序列号>.<后缀>
// 告警同步数据文件、配置数据文件、日志数据文件的命名规则如下:
// <网元资源对象RUID中的“资源对象编号”>-<数据版本>-<数据时间>-<日志类型>-<同步请求标识>-<Ri>-<序列号>.<后缀>
//
// resCode 资源对象类型简称、网元资源对象RUID中的“资源对象编号”参考本文档附件1。
// version 数据版本:遵循数据模型规范版本,允许同一目录下存放不同版本的数据文件。
// dateStr 数据时间采用YYYYMMDDHH24MMSS格式。资源、告警、配置、日志数据为数据文件开始生成时间其中资源数据文件和配置数据文件周期性上报生成时间为每天的0点和12点性能数据为统计周期的起始时间周期性上报周期为15分钟。
// logType 日志类型可选只针对日志文件包括operate操作日志、security(安全日志)、other其他
// requestId 同步请求标识可选仅在告警文件同步时使用用于唯一标识一次同步请求。标识规则是“请求操作ID”。“请求操作ID”是同步请求消息中消息序号requestId
// Ri可选。当接口数据文件内容有误如数据缺失等OMC重新生成文件提供给NMS进行数据补采新文件增加“Ri”进行标识i从1开始每重新生成一次i加1。
// Num 序列号可选。当文件总量小于100MB(允许上下浮动10%)时应只形成一个文件当文件总量大于100MB(允许上下浮动10%)时要求进行文件分割即分割后的文件大小除最后一个均应介于90MB,110MB之间。分割后的文件增加序列号标识序列号为三位取值为001-999。在文件切分过程中不能把一条完整的记录切开放到两个文件中。
// suffix 后缀每个文件都进行压缩统一采用zip或gzip压缩压缩文件后缀是zip或gz。对于压缩前的JSON格式文件后缀为JSON。
//
// 文件名示例:
// 资源文件CLL-V1.0-20151227000000-001.zip
// 性能文件CLL-V1.0-20151227000000-001.zip
// 告警文件0000000001153492-V1.0-20150611011603-001.zip
// 日志文件0000000001153492-V1.0-20150611011603-operate.zip
// 配置文件0000000001153492-V1.0-20150611011600.zip
type DataSaveFileName struct {
ResCode string // 资源对象类型简称(资源数据文件、性能数据文件) => 网元类型 资源对象编号(告警同步数据文件、配置数据文件、日志数据文件) => 设备序列号
Version string // 数据版本 => 网元版本 固定1.0
DateStr string // 数据时间 采用YYYYMMDDHH24MMSS格式
LogType string // 可选 日志类型 只针对日志文件包括operate操作日志、security(安全日志)、other其他
RequestId string // 可选 同步请求标识 仅在告警文件同步时使用
Ri string // 可选 数据补采 当接口数据文件内容有误(如数据缺失等)时
Num string // 可选 序列号 当文件总量大于100MB进行分割之后的文件增加序列号标识序列号为三位取值为001-999
}
// join 将参数进行连接-分隔
func (s *DataSaveFileName) join() string {
nameArr := []string{}
if s.ResCode != "" {
nameArr = append(nameArr, s.ResCode)
} else {
serialNumber := config.Get("cxy.serialNumber").(string)
serialNumber = fmt.Sprintf("%0*s", 16, serialNumber)
nameArr = append(nameArr, serialNumber)
}
if s.Version != "" {
nameArr = append(nameArr, "V"+s.Version)
} else {
nameArr = append(nameArr, "V1.0")
}
if s.DateStr != "" {
nameArr = append(nameArr, s.DateStr)
} else {
nameArr = append(nameArr, date.ParseDateToStr(time.Now(), date.YYYYMMDDHHMMSS))
}
if s.LogType != "" {
nameArr = append(nameArr, s.LogType)
}
if s.RequestId != "" {
nameArr = append(nameArr, s.RequestId)
}
if s.Ri != "" {
nameArr = append(nameArr, s.Ri)
}
if s.Num != "" {
nameArr = append(nameArr, s.Num)
}
return strings.Join(nameArr, "-")
}
// JSON 生成json后缀文件名
func (s *DataSaveFileName) JSON() string {
filename := s.join()
return fmt.Sprintf("%s.json", filename)
}
// ZIP 生成zip后缀文件名
func (s *DataSaveFileName) ZIP() string {
filename := s.join()
return fmt.Sprintf("%s.zip", filename)
}
// CSV 生成csv后缀文件名
func (s *DataSaveFileName) CSV() string {
filename := s.join()
return fmt.Sprintf("%s.csv", filename)
}
// DataSaveFilePath 文件数据保存路径
//
// 文件数据集中存储在云平台所提供的对象存储数据库OSS中。文件数据保存路径如下
// 路径说明:/专业编码/OMC厂商编码/OMC编号/网元类型/数据类别/时间
// 专业编码、OMC厂商编码、OMC编号、网元类型请参考本文档附件1。
// 数据类别NRM资源/ PM性能/CM配置/FM告警/LOG日志
// 时间资源、告警、配置、日志数据按天存放时间格式为YYYYMMDD性能数据按小时存放时间格式为YYYYMMDDHH24小时数为性能数据统计起始时间。
func DataSaveFilePath(neType, dataType string) string {
basePath := BasePath("/")
dateStr := date.ParseDateToStr(time.Now(), "20060102")
if dataType == "PM" {
dateStr = date.ParseDateToStr(time.Now(), "200601021504")
}
return fmt.Sprintf("%s/%s/%s/%s", basePath, neType, dataType, dateStr)
}
// BasePath 基础路径 separator 分隔符
//
// 专业编码{separator}OMC厂商编码{separator}OMC编号
func BasePath(separator string) string {
professionCode := config.Get("cxy.professionCode").(string)
vendorCode := config.Get("cxy.vendorCode").(string)
omcCode := config.Get("cxy.omcCode").(string)
return fmt.Sprintf("%s%s%s%s%s", professionCode, separator, vendorCode, separator, omcCode)
}
// UploadOSSByJSONToZip 将json文件压缩zip上传到oss
func UploadOSSByJSONToZip(filePath, neType, dataType string) error {
dirPath := DataSaveFilePath(neType, dataType)
// 添加到zip压缩文件
zipFilePath := strings.Replace(filePath, ".json", ".zip", 1)
err := file.CompressZipByFile(zipFilePath, filePath)
if err != nil {
return err
}
// 组成上传文件路径名
zipFileName := filepath.Base(zipFilePath)
newFileName := fmt.Sprintf("%s/%s", dirPath, zipFileName)
oss.FileUploadFileByFilePath(zipFilePath, newFileName)
return nil
}
// UploadOSSByZip 将zip文件上传到oss
func UploadOSSByZip(filePath, neType, dataType string) error {
dirPath := DataSaveFilePath(neType, dataType)
// 组成上传文件路径名
zipFileName := filepath.Base(filePath)
newFileName := fmt.Sprintf("%s/%s", dirPath, zipFileName)
oss.FileUploadFileByFilePath(filePath, newFileName)
return nil
}

View File

@@ -0,0 +1,163 @@
package kafka
import (
"context"
"nms_nbi/src/framework/config"
"nms_nbi/src/framework/logger"
"time"
"github.com/IBM/sarama"
)
// KInitConm Kafka初始配置连接实例
var KInitConm Kafka
// InitConfig 连接Kafka实例
func InitConfig() {
// 服务地址
addrs := []string{}
addrsArr := config.Get("cxy.kafka.addrs").([]any)
for _, v := range addrsArr {
addrs = append(addrs, v.(string))
}
k := Kafka{
Addrs: addrs,
}
k.NewConfig()
k.Config.Net.SASL.Enable = false
KInitConm = k
}
// Kafka 实例连接
type Kafka struct {
Addrs []string
Config *sarama.Config
}
func (k *Kafka) NewConfig() {
// 设置Kafka配置
config := sarama.NewConfig()
// 生产者
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 5
config.Producer.Return.Successes = true
k.Config = config
}
// MessageSyncSend 消息同步发送
func (k *Kafka) MessageSyncSend(topic string, partition int32, msg string) (int32, int64, error) {
// 创建Kafka生产者
producer, err := sarama.NewSyncProducer(k.Addrs, k.Config)
if err != nil {
logger.Warnf("MessageSyncSend NewSyncProducer Topic:%s, Partition:%d => %s", topic, partition, err.Error())
return partition, 0, err
}
defer producer.Close()
// 发送消息
message := &sarama.ProducerMessage{
Topic: topic,
Partition: partition,
Value: sarama.StringEncoder(msg),
}
partition, offset, err := producer.SendMessage(message)
if err != nil {
logger.Errorf("MessageSyncSend SendMessage Topic:%s, Partition:%d => %s", topic, partition, err.Error())
}
return partition, offset, err
}
// MessageSyncRead 消息同步读取-最旧历史积压
func (k *Kafka) MessageSyncRead(topic string, partition int32) (string, error) {
// 创建Kafka消费者
consumer, err := sarama.NewConsumer(k.Addrs, k.Config)
if err != nil {
logger.Warnf("MessageSyncRead NewConsumer Topic:%s, Partition:%d => %s", topic, partition, err.Error())
return "", err
}
defer consumer.Close()
// 消费消息
partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
if err != nil {
logger.Errorf("MessageSyncRead ConsumePartition Topic:%s, Partition:%d => %s", topic, partition, err.Error())
return "", err
}
defer partitionConsumer.Close()
// 设置超时时间为1秒
timeout := time.After(1 * time.Second)
for {
select {
case msg := <-partitionConsumer.Messages():
return string(msg.Value), nil
case <-timeout:
logger.Infof("MessageSyncRead timeout Topic:%s, Partition:%d", topic, partition)
return "", nil
}
}
}
// MessageSyncReadMark 消息同步读取并消费-最旧历史积压
func (k *Kafka) MessageSyncReadMark(topic string, partition int32, group string) (string, error) {
consumerGroup, err := sarama.NewConsumerGroup(k.Addrs, group, k.Config)
if err != nil {
logger.Warnf("MessageSyncReadMark NewConsumerGroup Topic:%s, Partition:%d, Group:%s => %s", topic, partition, group, err.Error())
return "", err
}
defer consumerGroup.Close()
consumer := readAndMarkConsumerGroup{
MessageChan: make(chan []byte, 1),
}
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
err = consumerGroup.Consume(context.Background(), []string{topic}, consumer)
if err != nil {
logger.Errorf("MessageSyncReadMark Consume Topic:%s, Partition:%d, Group:%s => %s", topic, partition, group, err.Error())
return "", err
}
messageByte := <-consumer.MessageChan
return string(messageByte), err
}
// readAndMarkConsumerGroup 消费组积压消息读取立即标记
type readAndMarkConsumerGroup struct {
MessageChan chan []byte // 消息通道
}
func (readAndMarkConsumerGroup) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (readAndMarkConsumerGroup) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (g readAndMarkConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// 设置超时时间为1秒
timeout := time.After(1 * time.Second)
for {
select {
case message, ok := <-claim.Messages():
if !ok {
logger.Warnf("ConsumeClaim Messages fail Topic:%s, Partition:%d, point:%d", claim.Topic(), claim.Partition(), claim.InitialOffset())
g.MessageChan <- []byte{}
return nil
}
session.MarkMessage(message, "")
g.MessageChan <- message.Value
return nil
// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/IBM/sarama/issues/1192
case <-session.Context().Done():
logger.Infof("ConsumeClaim done Topic:%s, Partition:%d, point:%d", claim.Topic(), claim.Partition(), claim.InitialOffset())
g.MessageChan <- []byte{}
return nil
case <-timeout:
logger.Infof("ConsumeClaim timeout Topic:%s, Partition:%d, point:%d", claim.Topic(), claim.Partition(), claim.InitialOffset())
g.MessageChan <- []byte{}
return nil
}
}
}

View File

@@ -0,0 +1,52 @@
package oss
import (
"context"
"nms_nbi/src/framework/config"
"nms_nbi/src/framework/logger"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)
// OInitConm OSS初始配置连接实例
var OInitConm *minio.Client
// InitConfig 连接OSS实例
func InitConfig() {
// 读取数据源配置
bucketname := config.Get("cxy.oss.bucketname").(string)
endpoint := config.Get("cxy.oss.endpoint").(string)
useSSL := config.Get("cxy.oss.useSSL").(bool)
accessKeyID := config.Get("cxy.oss.accessKeyID").(string)
secretAccessKey := config.Get("cxy.oss.secretAccessKey").(string)
// 初始minio客户端对象
minioClient, err := minio.New(endpoint, &minio.Options{
Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""),
Secure: useSSL,
})
if err != nil {
logger.Fatalf("minio %s client err %v", endpoint, err)
}
// 检查存储桶是否存在可用
found, err := minioClient.BucketExists(context.Background(), bucketname)
if err != nil {
logger.Fatalf("minio %s bucket [%s] exists err %v", endpoint, bucketname, err)
}
if !found {
logger.Fatalf("minio %s bucket [%s] not found err %v", endpoint, bucketname, err)
}
logger.Infof("minio %s bucket [%s] exists", endpoint, bucketname)
OInitConm = minioClient
}
// FileUploadFileByFilePath 文件上传-文件绝对路径
func FileUploadFileByFilePath(filePath, newFileName string) (minio.UploadInfo, error) {
bucketname := config.Get("cxy.oss.bucketname").(string)
contentType := "application/octet-stream"
// Upload the test file with FPutObject
ctx := context.Background()
return OInitConm.FPutObject(ctx, bucketname, newFileName, filePath, minio.PutObjectOptions{ContentType: contentType})
}