From 5a28b0b86139066f52c422888ad0789a046f10a3 Mon Sep 17 00:00:00 2001 From: TsMask <340112800@qq.com> Date: Mon, 18 Mar 2024 11:31:50 +0800 Subject: [PATCH] =?UTF-8?q?add:=20=E6=98=9F=E7=BD=91=E5=8C=97=E5=90=91?= =?UTF-8?q?=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/app.go | 3 + src/modules/api_rest_cxy/api_rest_cxy.go | 167 +++++ .../api_rest_cxy/controller/security.go | 117 ++++ src/modules/api_rest_cxy/controller/system.go | 636 ++++++++++++++++++ src/modules/api_rest_cxy/controller/test.go | 223 ++++++ src/modules/api_rest_cxy/model/alarm.go | 19 + src/modules/api_rest_cxy/service/alarm.go | 18 + .../api_rest_cxy/service/alarm.impl.go | 164 +++++ src/modules/api_rest_cxy/service/config.go | 7 + .../api_rest_cxy/service/config.impl.go | 32 + src/modules/api_rest_cxy/service/log.go | 10 + src/modules/api_rest_cxy/service/log.impl.go | 154 +++++ .../api_rest_cxy/service/performance.go | 7 + .../api_rest_cxy/service/performance.impl.go | 55 ++ src/modules/api_rest_cxy/service/resource.go | 7 + .../api_rest_cxy/service/resource.impl.go | 13 + .../api_rest_cxy/utils/common/common.go | 153 +++++ src/modules/api_rest_cxy/utils/kafka/kafka.go | 163 +++++ src/modules/api_rest_cxy/utils/oss/oss.go | 52 ++ 19 files changed, 2000 insertions(+) create mode 100644 src/modules/api_rest_cxy/api_rest_cxy.go create mode 100644 src/modules/api_rest_cxy/controller/security.go create mode 100644 src/modules/api_rest_cxy/controller/system.go create mode 100644 src/modules/api_rest_cxy/controller/test.go create mode 100644 src/modules/api_rest_cxy/model/alarm.go create mode 100644 src/modules/api_rest_cxy/service/alarm.go create mode 100644 src/modules/api_rest_cxy/service/alarm.impl.go create mode 100644 src/modules/api_rest_cxy/service/config.go create mode 100644 src/modules/api_rest_cxy/service/config.impl.go create mode 100644 src/modules/api_rest_cxy/service/log.go create mode 100644 src/modules/api_rest_cxy/service/log.impl.go create mode 100644 src/modules/api_rest_cxy/service/performance.go create mode 100644 src/modules/api_rest_cxy/service/performance.impl.go create mode 100644 src/modules/api_rest_cxy/service/resource.go create mode 100644 src/modules/api_rest_cxy/service/resource.impl.go create mode 100644 src/modules/api_rest_cxy/utils/common/common.go create mode 100644 src/modules/api_rest_cxy/utils/kafka/kafka.go create mode 100644 src/modules/api_rest_cxy/utils/oss/oss.go diff --git a/src/app.go b/src/app.go index 23c845e..be70505 100644 --- a/src/app.go +++ b/src/app.go @@ -8,6 +8,7 @@ import ( "nms_nbi/src/framework/errorcatch" "nms_nbi/src/framework/middleware" "nms_nbi/src/framework/middleware/security" + apirestcxy "nms_nbi/src/modules/api_rest_cxy" "nms_nbi/src/modules/chart" "nms_nbi/src/modules/common" "nms_nbi/src/modules/crontask" @@ -133,4 +134,6 @@ func initModulesRoute(app *gin.Engine) { crontask.Setup(app) // 监控模块 - 含调度处理加入队列,放最后 monitor.Setup(app) + // 北向模块 - 中国星网 + apirestcxy.Setup(app) } diff --git a/src/modules/api_rest_cxy/api_rest_cxy.go b/src/modules/api_rest_cxy/api_rest_cxy.go new file mode 100644 index 0000000..db9bab2 --- /dev/null +++ b/src/modules/api_rest_cxy/api_rest_cxy.go @@ -0,0 +1,167 @@ +package apirestcxy + +import ( + "nms_nbi/src/framework/logger" + "nms_nbi/src/framework/middleware" + "nms_nbi/src/modules/api_rest_cxy/controller" + "nms_nbi/src/modules/api_rest_cxy/utils/kafka" + "nms_nbi/src/modules/api_rest_cxy/utils/oss" + + "github.com/gin-gonic/gin" +) + +// 模块路由注册 - 中国星网网络创新研究院有限公司企业标准 +func Setup(router *gin.Engine) { + logger.Infof("开始加载 ====> NMS-CXY 北向模块路由") + + apiRest := router.Group("nms-cxy/api/rest") + + // 启动时需要的初始参数 + InitLoad() + + // 安全管理 + securityManagementGroup := apiRest.Group("/securityManagement") + { + // 鉴权登录接口 + securityManagementGroup.POST("/:apiVersion/oauth/token", + controller.NewSecurity.OauthTokenLogin, + ) + // 握手消息接口 + securityManagementGroup.POST("/:apiVersion/oauth/handshake", + middleware.PreAuthorize(nil), + controller.NewSecurity.Handshake, + ) + // 退出消息接口 + securityManagementGroup.DELETE("/:apiVersion/oauth/token", + middleware.PreAuthorize(nil), + controller.NewSecurity.OauthTokenLogout, + ) + } + + // 系统管理 + systemManagementGroup := apiRest.Group("/systemManagement") + { + // 软件版本查询接口 + systemManagementGroup.GET("/:apiVersion/softwareVersion", + middleware.PreAuthorize(nil), + controller.NewSystem.SoftwareVersionInfo, + ) + + // 软件下载请求接口 + systemManagementGroup.POST("/:apiVersion/softwareDownload", + middleware.PreAuthorize(nil), + controller.NewSystem.SoftwareDownloadCheck, + ) + // 软件下载状态查询接口 + systemManagementGroup.GET("/:apiVersion/softwareDownload", + middleware.PreAuthorize(nil), + controller.NewSystem.SoftwareDownloadState, + ) + + // 软件更新请求接口 + systemManagementGroup.POST("/:apiVersion/softwareUpdate", + middleware.PreAuthorize(nil), + controller.NewSystem.SoftwareUpdateVersion, + ) + // 软件更新状态查询接口 + systemManagementGroup.GET("/:apiVersion/softwareUpdate", + middleware.PreAuthorize(nil), + controller.NewSystem.SoftwareUpdateState, + ) + + // 网元重启请求接口 + systemManagementGroup.POST("/:apiVersion/reboot", + middleware.PreAuthorize(nil), + controller.NewSystem.RebootApp, + ) + // 网元重启状态查询接口 + systemManagementGroup.GET("/:apiVersion/reboot", + middleware.PreAuthorize(nil), + controller.NewSystem.RebootState, + ) + + // 恢复出厂配置请求接口 + systemManagementGroup.POST("/:apiVersion/reinitiate", + middleware.PreAuthorize(nil), + controller.NewSystem.ReinitiateApp, + ) + // 恢复出厂配置状态查询接口 + systemManagementGroup.GET("/:apiVersion/reinitiate", + middleware.PreAuthorize(nil), + controller.NewSystem.ReinitiateState, + ) + + // 配置修改接口 + systemManagementGroup.POST("/:apiVersion/config", + middleware.PreAuthorize(nil), + controller.NewSystem.ConfigApp, + ) + // 配置查询接口 + systemManagementGroup.GET("/:apiVersion/config", + middleware.PreAuthorize(nil), + controller.NewSystem.ConfigInfo, + ) + + // 告警文件同步请求接口 + systemManagementGroup.POST("/:apiVersion/alarm", + middleware.PreAuthorize(nil), + controller.NewSystem.AlarmSysnc, + ) + + // 日志数据同步请求接口 + systemManagementGroup.POST("/:apiVersion/log", + middleware.PreAuthorize(nil), + controller.NewSystem.LogSysnc, + ) + + // 业务诊断请求接口 + systemManagementGroup.POST("/:apiVersion/diagnose", + middleware.PreAuthorize(nil), + controller.NewSystem.Diagnose, + ) + + // 业务连续性功能控制接口 + systemManagementGroup.POST("/:apiVersion/servicecontinuity", + middleware.PreAuthorize(nil), + controller.NewSystem.ServiceContinuity, + ) + } + + // 测试管理 + testManagementGroup := apiRest.Group("/testManagement") + { + // kafka发送消息 + testManagementGroup.POST("/kafka/send", + middleware.PreAuthorize(nil), + controller.NewTest.KafkaSend, + ) + // kafka读取并消费消息 + testManagementGroup.POST("/kafka/readMark", + middleware.PreAuthorize(nil), + controller.NewTest.KafkaReadMark, + ) + // 告警列表 + testManagementGroup.POST("/alarm/list", + middleware.PreAuthorize(nil), + controller.NewTest.AlarmList, + ) + // OSS文件上传 + testManagementGroup.POST("/oss/upload", + middleware.PreAuthorize(nil), + controller.NewTest.OssUpload, + ) + // OSS文件上传测试上报 + testManagementGroup.POST("/oss/testUp", + middleware.PreAuthorize(nil), + controller.NewTest.OssTestUp, + ) + } +} + +// InitLoad 初始参数 +func InitLoad() { + // 初始化连接Kafka + kafka.InitConfig() + // 初始化连接OSS + oss.InitConfig() +} diff --git a/src/modules/api_rest_cxy/controller/security.go b/src/modules/api_rest_cxy/controller/security.go new file mode 100644 index 0000000..bea841b --- /dev/null +++ b/src/modules/api_rest_cxy/controller/security.go @@ -0,0 +1,117 @@ +package controller + +import ( + commonConstants "nms_nbi/src/framework/constants/common" + "nms_nbi/src/framework/constants/token" + "nms_nbi/src/framework/i18n" + "nms_nbi/src/framework/utils/ctx" + tokenUtils "nms_nbi/src/framework/utils/token" + "nms_nbi/src/framework/vo/result" + commonService "nms_nbi/src/modules/common/service" + systemService "nms_nbi/src/modules/system/service" + + "github.com/gin-gonic/gin" +) + +// 实例化控制层 SecurityController 结构体 +var NewSecurity = &SecurityController{ + accountService: commonService.NewAccountImpl, + sysLogLoginService: systemService.NewSysLogLoginImpl, +} + +// 安全管理接口 +// +// PATH /securityManagement +type SecurityController struct { + // 账号身份操作服务 + accountService commonService.IAccount + // 系统登录访问 + sysLogLoginService systemService.ISysLogLogin +} + +// 鉴权登录接口 +// +// POST /:apiVersion/oauth/token +func (s *SecurityController) OauthTokenLogin(c *gin.Context) { + var body struct { + GrantType string `json:"grantType" binding:"required,oneof=password"` + UserName string `json:"userName" binding:"required"` + Value string `json:"value" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(400, map[string]any{}) + return + } + + // 当前请求信息 + ipaddr, location := ctx.IPAddrLocation(c) + os, browser := ctx.UaOsBrowser(c) + + // 登录用户信息 + loginUser, err := s.accountService.LoginByUsername(body.UserName, body.Value) + if err != nil { + c.JSON(400, map[string]any{}) + return + } + + // 生成令牌,创建系统访问记录 + tokenStr := tokenUtils.Create(&loginUser, ipaddr, location, os, browser) + if tokenStr == "" { + c.JSON(400, map[string]any{}) + return + } else { + s.accountService.UpdateLoginDateAndIP(&loginUser) + // 登录成功 + s.sysLogLoginService.CreateSysLogLogin( + body.UserName, commonConstants.STATUS_YES, "app.common.loginSuccess", + ipaddr, location, os, browser, + ) + } + + expires := (loginUser.ExpireTime - loginUser.LoginTime) / 1000 + c.JSON(200, map[string]any{ + token.ACCESS_TOKEN: tokenStr, + "expires": expires, + }) +} + +// 握手消息接口 +// +// POST /:apiVersion/oauth/handshake +func (s *SecurityController) Handshake(c *gin.Context) { + language := ctx.AcceptLanguage(c) + loginUser, err := ctx.LoginUser(c) + if err != nil { + c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error()))) + return + } + + c.JSON(200, map[string]any{ + "userID": loginUser.UserID, + }) +} + +// 退出消息接口 +// +// DELETE /:apiVersion/oauth/token +func (s *SecurityController) OauthTokenLogout(c *gin.Context) { + language := ctx.AcceptLanguage(c) + tokenStr := ctx.Authorization(c) + if tokenStr != "" { + // 存在token时记录退出信息 + userName := tokenUtils.Remove(tokenStr) + if userName != "" { + // 当前请求信息 + ipaddr, location := ctx.IPAddrLocation(c) + os, browser := ctx.UaOsBrowser(c) + + // 创建系统访问记录 退出成功 + s.sysLogLoginService.CreateSysLogLogin( + userName, commonConstants.STATUS_YES, "app.common.logoutSuccess", + ipaddr, location, os, browser, + ) + } + } + + c.JSON(204, result.OkMsg(i18n.TKey(language, "app.common.logoutSuccess"))) +} diff --git a/src/modules/api_rest_cxy/controller/system.go b/src/modules/api_rest_cxy/controller/system.go new file mode 100644 index 0000000..e0b8221 --- /dev/null +++ b/src/modules/api_rest_cxy/controller/system.go @@ -0,0 +1,636 @@ +package controller + +import ( + "fmt" + "nms_nbi/src/framework/config" + "nms_nbi/src/framework/i18n" + "nms_nbi/src/framework/utils/cmd" + "nms_nbi/src/framework/utils/ctx" + "nms_nbi/src/framework/utils/date" + "nms_nbi/src/framework/utils/ping" + "nms_nbi/src/framework/vo/result" + "nms_nbi/src/modules/api_rest_cxy/service" + "nms_nbi/src/modules/api_rest_cxy/utils/common" + neService "nms_nbi/src/modules/network_element/service" + + "time" + + "github.com/gin-gonic/gin" +) + +// 实例化控制层 SystemController 结构体 +var NewSystem = &SystemController{ + neInfoService: neService.NewNeInfoImpl, + neVersionService: neService.NewNeVersionImpl, + neSoftwareService: neService.NewNeSoftwareImpl, + alarmService: service.NewAlarmImpl, + logService: service.NewLogImpl, +} + +// 系统管理接口 +// +// PATH /systemManagement +type SystemController struct { + // 网元信息服务 + neInfoService neService.INeInfo + // 网元版本信息服务 + neVersionService neService.INeVersion + // 网元软件包信息服务 + neSoftwareService neService.INeSoftware + // 告警数据服务 + alarmService service.IAlarm + // 日志数据服务 + logService service.ILog +} + +// 软件版本查询接口 +// +// GET /:apiVersion/softwareVersion +func (s *SystemController) SoftwareVersionInfo(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var body struct { + RequestId string `json:"requestId" binding:"required"` + Ruid string `json:"ruid" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid) + if neInfo.RmUID != body.Ruid { + c.JSON(500, map[string]any{ + "requestId": body.RequestId, + "errorInfo": "NE not exist", + }) + return + } + + neVersion := s.neVersionService.SelectByTypeAndID(neInfo.NeType, neInfo.NeId) + if neVersion.ID == "" { + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "currentVersion": "-", + "backupVersionList": []string{"-"}, + }) + return + } + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "currentVersion": neVersion.Version, + "backupVersionList": []string{neVersion.PreVersion}, + }) +} + +// 软件下载请求接口 +// +// POST /:apiVersion/softwareDownload +func (s *SystemController) SoftwareDownloadCheck(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var body struct { + RequestId string `json:"requestId" binding:"required"` + SoftwareVersion string `json:"softwareVersion" binding:"required"` + DownloadPath string `json:"downloadPath" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + neSoftware := s.neSoftwareService.SelectByVersionAndPath(body.SoftwareVersion, body.DownloadPath) + if neSoftware.Version != body.SoftwareVersion { + c.JSON(500, map[string]any{ + "requestId": body.RequestId, + "errorInfo": "NE not exist", + }) + return + } + + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + }) +} + +// 软件下载状态查询接口 +// +// GET /:apiVersion/softwareDownload +func (s *SystemController) SoftwareDownloadState(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var body struct { + RequestId string `json:"requestId" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + if len(body.RequestId) < 5 { + c.JSON(500, map[string]any{ + "requestId": body.RequestId, + "errorInfo": "NE not exist", + }) + return + } + + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "status": "fail", + "additionalInfo": "Download path not exist", + }) +} + +// 软件更新请求接口 +// +// POST /:apiVersion/softwareUpdate +func (s *SystemController) SoftwareUpdateVersion(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var body struct { + RequestId string `json:"requestId" binding:"required"` + Ruid string `json:"ruid" binding:"required"` + UpdateVersion string `json:"updateVersion" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid) + if neInfo.RmUID != body.Ruid { + c.JSON(500, map[string]any{ + "requestId": body.RequestId, + "errorInfo": "NE not exist", + }) + return + } + + neVersion := s.neVersionService.SelectByTypeAndID(neInfo.NeType, neInfo.NeId) + if neVersion.ID == "" { + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "currentVersion": "-", + "backupVersionList": []string{"-"}, + }) + return + } + + // 更新版本号 + neVersion.Version = body.UpdateVersion + rows := s.neVersionService.Update(neVersion) + if rows > 0 { + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + }) + return + } + c.JSON(500, map[string]any{ + "requestId": body.RequestId, + "errorInfo": "Update Version Fail", + }) +} + +// 软件更新状态查询接口 +// +// GET /:apiVersion/softwareUpdate +func (s *SystemController) SoftwareUpdateState(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var body struct { + RequestId string `json:"requestId" binding:"required"` + Ruid string `json:"ruid" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid) + if neInfo.RmUID != body.Ruid { + c.JSON(500, map[string]any{ + "requestId": body.RequestId, + "errorInfo": "NE not exist", + }) + return + } + + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "status": "fail", + "additionalInfo": "software not compatible", + }) +} + +// 网元重启请求接口 +// +// POST /:apiVersion/reboot +func (s *SystemController) RebootApp(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var body struct { + RequestId string `json:"requestId" binding:"required"` + Ruid string `json:"ruid" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid) + if neInfo.RmUID != body.Ruid { + c.JSON(500, map[string]any{ + "requestId": body.RequestId, + "errorInfo": "NE not exist", + }) + return + } + + _, err := cmd.ExecWithCheck("nohup", "sh", "-c", "sleep 2s && service restagent restart", "&") + if err != nil { + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "status": "fail", + "additionalInfo": fmt.Sprintf("NE service reboot error %s", err.Error()), + }) + return + } + + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + }) +} + +// 网元重启状态查询接口 +// +// GET /:apiVersion/reboot +func (s *SystemController) RebootState(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var body struct { + RequestId string `json:"requestId" binding:"required"` + Ruid string `json:"ruid" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid) + if neInfo.RmUID != body.Ruid { + c.JSON(500, map[string]any{ + "requestId": body.RequestId, + "errorInfo": "NE not exist", + }) + return + } + + // 获取程序运行时间 + runTime := time.Since(config.RunTime()).Abs().Seconds() + if runTime > 120 { + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "status": "fail", + "additionalInfo": "NE not implement", + }) + return + } + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "status": "success", + "additionalInfo": "NE successfully restarted", + }) +} + +// 恢复出厂配置请求接口 +// +// POST /:apiVersion/reinitiate +func (s *SystemController) ReinitiateApp(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var body struct { + RequestId string `json:"requestId" binding:"required"` + Ruid string `json:"ruid" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid) + if neInfo.RmUID != body.Ruid { + c.JSON(500, map[string]any{ + "requestId": body.RequestId, + "errorInfo": "NE not exist", + }) + return + } + + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + }) +} + +// 恢复出厂配置状态查询接口 +// +// GET /:apiVersion/reinitiate +func (s *SystemController) ReinitiateState(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var body struct { + RequestId string `json:"requestId" binding:"required"` + Ruid string `json:"ruid" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid) + if neInfo.RmUID != body.Ruid { + c.JSON(500, map[string]any{ + "requestId": body.RequestId, + "errorInfo": "NE not exist", + }) + return + } + + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "status": "fail", + "additionalInfo": "NE not implement", + }) +} + +// 配置修改接口 +// +// POST /:apiVersion/config +func (s *SystemController) ConfigApp(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var body struct { + RequestId string `json:"requestId" binding:"required"` + Ruid string `json:"ruid" binding:"required"` + OperType string `json:"operType" binding:"required"` + OperPara any `json:"operPara" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid) + if neInfo.RmUID != body.Ruid { + c.JSON(500, map[string]any{ + "requestId": body.RequestId, + "errorInfo": "NE not exist", + }) + return + } + + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "status": "partialsuccess", + "failPara": []string{"PhyCellID", "TAC"}, + }) +} + +// 配置查询接口 +// +// GET /:apiVersion/config +func (s *SystemController) ConfigInfo(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var body struct { + RequestId string `json:"requestId" binding:"required"` + Ruid string `json:"ruid" binding:"required"` + QueryPara []string `json:"queryPara" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid) + if neInfo.RmUID != body.Ruid { + c.JSON(500, map[string]any{ + "requestId": body.RequestId, + "errorInfo": "NE not exist", + }) + return + } + + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "status": "partialsuccess", + "successPara": map[string]any{ + "cellIdentity": "1234567890", + "PhyCellID": "12345", + "TAC": "12345", + }, + }) +} + +// 告警文件同步请求接口 +// +// 告警实时上报通过kafka推送到topic +// +// POST /:apiVersion/alarm +func (s *SystemController) AlarmSysnc(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var body struct { + RequestId string `json:"requestId" binding:"required"` + Ruid string `json:"ruid" binding:"required"` + StartTime string `json:"startTime" binding:"required"` + EndTime string `json:"endTime" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + // 时间格式校验 + startTime := date.ParseStrToDate(body.StartTime, date.YYYY_MM_DD_HH_MM_SS) + if startTime.IsZero() { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + endTime := date.ParseStrToDate(body.EndTime, date.YYYY_MM_DD_HH_MM_SS) + if endTime.IsZero() { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + // 查询网元是否存在 + neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid) + if neInfo.RmUID != body.Ruid { + c.JSON(500, map[string]any{ + "requestId": body.RequestId, + "errorInfo": "NE not exist", + }) + return + } + + sysncNum, filePath := s.alarmService.AlarmDataToFile(body.RequestId, neInfo.NeType, neInfo.RmUID, body.StartTime, body.EndTime) + if sysncNum > 0 { + common.UploadOSSByJSONToZip(filePath, neInfo.NeType, "FM") + } + + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "sysncNum": sysncNum, + }) +} + +// 日志数据同步请求接口 +// +// POST /:apiVersion/log +func (s *SystemController) LogSysnc(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var body struct { + RequestId string `json:"requestId" binding:"required"` + Ruid string `json:"ruid" binding:"required"` + LogType string `json:"logType"` // operate、security、other + StartTime string `json:"startTime" binding:"required"` // 2022-12-15 08:34:24 + EndTime string `json:"endTime" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + // 时间格式校验 + startTime := date.ParseStrToDate(body.StartTime, date.YYYY_MM_DD_HH_MM_SS) + if startTime.IsZero() { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + endTime := date.ParseStrToDate(body.EndTime, date.YYYY_MM_DD_HH_MM_SS) + if endTime.IsZero() { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid) + if neInfo.RmUID != body.Ruid { + c.JSON(500, map[string]any{ + "requestId": body.RequestId, + "errorInfo": "NE not exist", + }) + return + } + + if body.LogType == "operate" { + sysncNum, filePath := s.logService.OperateLogToFile(language, body.StartTime, body.EndTime) + if sysncNum > 0 { + common.UploadOSSByJSONToZip(filePath, neInfo.NeType, "LOG") + } + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "sysncNum": sysncNum, + }) + return + } + + if body.LogType == "security" { + sysncNum, filePath := s.logService.SecurityLogToFile(language, body.StartTime, body.EndTime) + if sysncNum > 0 { + common.UploadOSSByJSONToZip(filePath, neInfo.NeType, "LOG") + } + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "sysncNum": sysncNum, + }) + return + } + + c.JSON(500, map[string]any{ + "requestId": body.RequestId, + "errorInfo": "logType not found", + }) +} + +// 业务诊断请求接口 +// +// POST /:apiVersion/diagnose +func (s *SystemController) Diagnose(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var body struct { + RequestId string `json:"requestId" binding:"required"` + Ruid string `json:"ruid" binding:"required"` + OperType string `json:"operType" binding:"required,oneof=ping trace"` + OperPara map[string]any `json:"operPara" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + operPara := body.OperPara + // 必填目的 IP 地址(字符串类型,必填) + desAddr, desAddrOk := operPara["desAddr"] + if !desAddrOk || desAddr == nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + // 端到端ping检测 + if body.OperType == "ping" { + p := ping.Ping{} + p.CopyFrom(operPara) + info, err := p.StatsInfo() + if err != nil { + c.JSON(500, map[string]any{ + "requestId": body.RequestId, + "errorInfo": fmt.Sprintf("ping fail %s", err.Error()), + }) + return + } + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "operResult": info, + }) + return + } + + // 端到端Trace检测 + if body.OperType == "trace" { + p := ping.Ping{} + p.CopyFrom(operPara) + info, err := p.StatsRtt() + if err != nil { + c.JSON(500, map[string]any{ + "requestId": body.RequestId, + "errorInfo": fmt.Sprintf("ping fail %s", err.Error()), + }) + return + } + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "operResult": info, + }) + return + } + + c.JSON(500, map[string]any{ + "requestId": body.RequestId, + "errorInfo": "operType not found", + }) +} + +// 业务连续性功能控制接口 +// +// POST /:apiVersion/servicecontinuity +func (s *SystemController) ServiceContinuity(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var body struct { + RequestId string `json:"requestId" binding:"required"` + OperType int `json:"operType" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + if body.OperType > 1 { + c.JSON(500, map[string]any{ + "requestId": body.RequestId, + "errorInfo": "operType not support", + }) + return + } + + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + }) +} diff --git a/src/modules/api_rest_cxy/controller/test.go b/src/modules/api_rest_cxy/controller/test.go new file mode 100644 index 0000000..d34d204 --- /dev/null +++ b/src/modules/api_rest_cxy/controller/test.go @@ -0,0 +1,223 @@ +package controller + +import ( + "encoding/json" + "nms_nbi/src/framework/i18n" + "nms_nbi/src/framework/utils/ctx" + "nms_nbi/src/framework/vo/result" + "nms_nbi/src/modules/api_rest_cxy/service" + "nms_nbi/src/modules/api_rest_cxy/utils/kafka" + "nms_nbi/src/modules/api_rest_cxy/utils/oss" + neDataModel "nms_nbi/src/modules/network_data/model" + neService "nms_nbi/src/modules/network_element/service" + + "github.com/gin-gonic/gin" +) + +// 实例化控制层 TestController 结构体 +var NewTest = &TestController{ + neInfoService: neService.NewNeInfoImpl, + alarmService: service.NewAlarmImpl, + configService: service.NewConfigImpl, + performanceService: service.NewPerformanceImpl, +} + +// 测试接口 +// +// PATH /testManagement +type TestController struct { + // 网元信息服务 + neInfoService neService.INeInfo + // 告警数据服务 + alarmService service.IAlarm + // 配置数据处理服务 + configService service.IConfig + // 性能数据处理服务 + performanceService service.IPerformance +} + +// kafka发送消息 +// +// POST /kafka/send +func (s *TestController) KafkaSend(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var body struct { + RequestId string `json:"requestId" binding:"required"` + Topic string `json:"topic" binding:"required"` + Partition int `json:"partition"` + Msg any `json:"msg"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + bytes, err := json.Marshal(body.Msg) + if err != nil { + c.JSON(400, result.CodeMsg(400, err.Error())) + return + } + + // 发送消息 + partition, offset, err := kafka.KInitConm.MessageSyncSend(body.Topic, int32(body.Partition), string(bytes)) + if err != nil { + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "msg": body.Msg, + "error": err.Error(), + }) + return + } + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "partition": partition, + "offset": offset, + }) +} + +// kafka读取并消费消息 +// +// POST /kafka/readMark +func (s *TestController) KafkaReadMark(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var body struct { + RequestId string `json:"requestId" binding:"required"` + Topic string `json:"topic" binding:"required"` + Partition int `json:"partition"` + Group string `json:"group" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + // 消费消息 + msg, err := kafka.KInitConm.MessageSyncReadMark(body.Topic, int32(body.Partition), body.Group) + if err != nil { + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "error": err.Error(), + }) + return + } + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "msg": msg, + }) +} + +// 告警列表 +// +// POST /alarm/list +func (s *TestController) AlarmList(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var body struct { + RequestId string `json:"requestId" binding:"required"` + Ruid string `json:"ruid" binding:"required"` + AlarmLevel string `json:"alarmLevel" form:"alarmLevel"` // 告警类型 1: Critical, 2: Major, 3: Minor, 4: Warning, 5: Event(Only VNF) + StartTime string `json:"startTime" form:"startTime"` + EndTime string `json:"endTime" form:"endTime"` + SortField string `json:"sortField" form:"sortField" binding:"omitempty,oneof=timestamp"` // 排序字段,填写结果字段 + SortOrder string `json:"sortOrder" form:"sortOrder" binding:"omitempty,oneof=asc desc"` // 排序升降序,asc desc + PageNum int64 `json:"pageNum" form:"pageNum" binding:"required"` + PageSize int64 `json:"pageSize" form:"pageSize" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + neInfo := s.neInfoService.SelectNeInfoByRmuid(body.Ruid) + if neInfo.RmUID != body.Ruid { + c.JSON(500, map[string]any{ + "requestId": body.RequestId, + "errorInfo": "NE not exist", + }) + return + } + + rows, total := s.alarmService.SelectPage(neDataModel.AlarmQuery{ + PageNum: body.PageNum, + PageSize: body.PageSize, + SortField: body.SortField, + SortOrder: body.SortOrder, + StartTime: body.StartTime, + EndTime: body.EndTime, + RmUID: neInfo.RmUID, + NeType: neInfo.NeType, + OrigSeverity: body.AlarmLevel, + }) + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "rows": rows, + "total": total, + }) +} + +// OSS文件上传 +// +// POST /oss/upload +func (s *TestController) OssUpload(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var body struct { + RequestId string `json:"requestId" binding:"required"` + FilePath string `json:"filePath" binding:"required"` + NewFilePath string `json:"newFilePath" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + // 消费消息 + msg, err := oss.FileUploadFileByFilePath(body.FilePath, body.NewFilePath) + if err != nil { + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "error": err.Error(), + }) + return + } + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "msg": msg, + }) +} + +// OSS文件上传测试上报 +// +// POST /oss/testUp +func (s *TestController) OssTestUp(c *gin.Context) { + language := ctx.AcceptLanguage(c) + var body struct { + RequestId string `json:"requestId" binding:"required"` + OperType string `json:"operType" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400"))) + return + } + + if body.OperType == "config" { + err := s.configService.ConfigUploadOSS("OMC") + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "operType": body.OperType, + "err": err, + }) + return + } + if body.OperType == "performance" { + err := s.performanceService.PerformanceUploadOSS("AMF") + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "operType": body.OperType, + "err": err, + }) + return + } + + c.JSON(200, map[string]any{ + "requestId": body.RequestId, + "operType": body.OperType, + }) +} diff --git a/src/modules/api_rest_cxy/model/alarm.go b/src/modules/api_rest_cxy/model/alarm.go new file mode 100644 index 0000000..62ee30c --- /dev/null +++ b/src/modules/api_rest_cxy/model/alarm.go @@ -0,0 +1,19 @@ +package model + +// 告警数据结构体 +type Alarm struct { + AlarmSeq string `json:"alarmSeq"` // 告警原始序号 + AlarmTitle string `json:"alarmTitle"` // 告警标题 + AlarmStatus int64 `json:"alarmStatus"` // 告警状态(1:活动告警;0:清除告警) + AlarmType string `json:"alarmType"` // 告警类型 + AlarmLevel int64 `json:"alarmLevel"` // 原始告警级别(1:一级告警;2:二级告警;3:三级告警;4:四级告警) + EventTime string `json:"eventTime"` // 事件发生时间 + AlarmId int64 `json:"alarmId"` // 告警事件Id + CauseID string `json:"causeID"` // 告警问题原因ID + Cause string `json:"cause"` // 告警问题原因 + NeRUID string `json:"neRUID"` // 告警网元RUID + NeUserLabel string `json:"neUserLabel"` // 告警网元名称 + ObjectRUID string `json:"objectRUID"` // 告警对象RUID + ObjectUserLabel string `json:"objectUserLabel"` // 告警对象名称 + AddInfo string `json:"addInfo"` // 告警辅助信息 +} diff --git a/src/modules/api_rest_cxy/service/alarm.go b/src/modules/api_rest_cxy/service/alarm.go new file mode 100644 index 0000000..e51e6ef --- /dev/null +++ b/src/modules/api_rest_cxy/service/alarm.go @@ -0,0 +1,18 @@ +package service + +import ( + "nms_nbi/src/modules/api_rest_cxy/model" + neDataModel "nms_nbi/src/modules/network_data/model" +) + +// 告警数据处理服务 服务层接口 +type IAlarm interface { + // SelectPage 根据条件分页查询 + SelectPage(querys neDataModel.AlarmQuery) ([]model.Alarm, int64) + + // KafkaPush 推送数据Kafka + KafkaPush(alarm neDataModel.Alarm) error + + // AlarmDataToFile 告警数据写入文件 + AlarmDataToFile(requestId, neType, rmUID, startTime, endTime string) (int64, string) +} diff --git a/src/modules/api_rest_cxy/service/alarm.impl.go b/src/modules/api_rest_cxy/service/alarm.impl.go new file mode 100644 index 0000000..3733dfb --- /dev/null +++ b/src/modules/api_rest_cxy/service/alarm.impl.go @@ -0,0 +1,164 @@ +package service + +import ( + "encoding/json" + "fmt" + "nms_nbi/src/framework/constants/uploadsubpath" + "nms_nbi/src/framework/logger" + "nms_nbi/src/framework/utils/date" + "nms_nbi/src/framework/utils/file" + "nms_nbi/src/framework/utils/parse" + "nms_nbi/src/modules/api_rest_cxy/model" + "nms_nbi/src/modules/api_rest_cxy/utils/common" + "nms_nbi/src/modules/api_rest_cxy/utils/kafka" + neDataModel "nms_nbi/src/modules/network_data/model" + neDataService "nms_nbi/src/modules/network_data/service" +) + +// 实例化数据层 AlarmImpl 结构体 +var NewAlarmImpl = &AlarmImpl{ + alarmService: neDataService.NewAlarmImpl, + alarmLevel: map[string]int64{ + "Critical": 1, + "Major": 2, + "Minor": 3, + "Warning": 4, + }, +} + +// 告警数据处理服务 服务层处理 +type AlarmImpl struct { + // 告警信息服务 + alarmService neDataService.IAlarm + // 原始告警级别 1:一级告警;2:二级告警;3:三级告警;4:四级告警 + alarmLevel map[string]int64 +} + +// SelectPage 根据条件分页查询 +func (s *AlarmImpl) SelectPage(querys neDataModel.AlarmQuery) ([]model.Alarm, int64) { + dataArr := []model.Alarm{} + // 查询数据 + pageData := s.alarmService.SelectPage(querys) + total := parse.Number(pageData["total"]) + rows, ok := pageData["rows"].([]neDataModel.Alarm) + if !ok || len(rows) <= 0 { + return dataArr, total + } + + for _, v := range rows { + item := s.parseAlarmData(v) + dataArr = append(dataArr, item) + } + return dataArr, total +} + +// parseAlarmData 解析转换为指定的告警对象 +func (s *AlarmImpl) parseAlarmData(v neDataModel.Alarm) model.Alarm { + alarmLevel, ok := s.alarmLevel[v.OrigSeverity] + if !ok { + alarmLevel = 0 + } + return model.Alarm{ + AlarmSeq: v.AlarmSeq, + AlarmTitle: v.AlarmTitle, + AlarmStatus: parse.Number(v.AlarmStatus), + AlarmType: v.AlarmType, + AlarmLevel: alarmLevel, + EventTime: date.ParseDateToStr(v.EventTime, date.YYYY_MM_DD_HH_MM_SS), + AlarmId: parse.Number(v.ID), + CauseID: v.AlarmCode, + Cause: v.SpecificProblem, + NeRUID: v.ObjectUid, + NeUserLabel: v.NeName, + ObjectRUID: v.AlarmId, + ObjectUserLabel: v.ObjectName, + AddInfo: v.AddInfo, + } +} + +// KafkaPush 推送数据Kafka +func (s *AlarmImpl) KafkaPush(alarm neDataModel.Alarm) error { + if alarm.AlarmTitle == "" { + return nil + } + + // 数据序列化 + data := s.parseAlarmData(alarm) + bytes, err := json.Marshal(data) + if err != nil { + logger.Errorf("KafkaPush parseAlarmData err => %s", err.Error()) + return err + } + + // 订阅topic,名称为:专业编码-厂商编码-OMC编号-数据类别(固定为FM) + basePath := common.BasePath("-") + topic := fmt.Sprintf("%s-FM", basePath) + + // 发送消息 + partition, offset, err := kafka.KInitConm.MessageSyncSend(topic, 0, string(bytes)) + if err != nil { + logger.Errorf("KafkaPush MessageSyncSend err => %s", err.Error()) + return err + } + logger.Infof("KafkaPush MessageSyncSend Partition:%d, Offset:%d", partition, offset) + return err +} + +// AlarmDataToFile 告警数据写入文件 +func (s *AlarmImpl) AlarmDataToFile(requestId, neType, rmUID, startTime, endTime string) (int64, string) { + var pageNum int64 = 1 + var pageSize int64 = 20 + dataArr := []any{} + + // 查询数据 + pageData := s.alarmService.SelectPage(neDataModel.AlarmQuery{ + PageNum: pageNum, + PageSize: pageSize, + StartTime: startTime, + EndTime: endTime, + RmUID: rmUID, + NeType: neType, + }) + + total := parse.Number(pageData["total"]) + rows, ok := pageData["rows"].([]neDataModel.Alarm) + if !ok || len(rows) <= 0 { + return int64(len(dataArr)), "" + } + for _, v := range rows { + dataArr = append(dataArr, s.parseAlarmData(v)) + } + + // 按总数分批遍历 + batchNum := total / pageSize + for i := 1; i <= int(batchNum); i++ { + // 查询数据 + pageData := s.alarmService.SelectPage(neDataModel.AlarmQuery{ + PageNum: int64(i), + PageSize: pageSize, + StartTime: startTime, + EndTime: endTime, + RmUID: rmUID, + NeType: neType, + }) + rows, ok := pageData["rows"].([]neDataModel.Alarm) + if !ok || len(rows) <= 0 { + return int64(len(dataArr)), "" + } + for _, v := range rows { + dataArr = append(dataArr, s.parseAlarmData(v)) + } + } + + // 文件名 + dataSaveFileName := common.DataSaveFileName{ + RequestId: requestId, + } + fileName := dataSaveFileName.JSON() + filePath := fmt.Sprintf("%s/%s", file.ParseUploadFileDir(uploadsubpath.EXPORT), fileName) + err := file.WriterFileJSONLine(dataArr, filePath) + if err != nil { + return int64(len(dataArr)), "" + } + return int64(len(dataArr)), filePath +} diff --git a/src/modules/api_rest_cxy/service/config.go b/src/modules/api_rest_cxy/service/config.go new file mode 100644 index 0000000..76fa01e --- /dev/null +++ b/src/modules/api_rest_cxy/service/config.go @@ -0,0 +1,7 @@ +package service + +// 配置数据处理服务 服务层接口 +type IConfig interface { + // ConfigUploadOSS 配置数据上报 + ConfigUploadOSS(neType string) error +} diff --git a/src/modules/api_rest_cxy/service/config.impl.go b/src/modules/api_rest_cxy/service/config.impl.go new file mode 100644 index 0000000..49a7689 --- /dev/null +++ b/src/modules/api_rest_cxy/service/config.impl.go @@ -0,0 +1,32 @@ +package service + +import ( + "fmt" + "nms_nbi/src/framework/constants/uploadsubpath" + "nms_nbi/src/framework/utils/file" + "nms_nbi/src/modules/api_rest_cxy/utils/common" +) + +// 实例化数据层 ConfigImpl 结构体 +var NewConfigImpl = &ConfigImpl{} + +// 配置数据处理服务 服务层处理 +type ConfigImpl struct{} + +// ConfigUploadOSS 配置数据上报 +func (s *ConfigImpl) ConfigUploadOSS(neType string) error { + // 配置文件目录 + dir := "C:\\usr\\local\\omc\\upload\\export\\2023\\12" + + // 文件名 + dataSaveFileName := common.DataSaveFileName{} + fileName := dataSaveFileName.ZIP() + filePath := fmt.Sprintf("%s/%s", file.ParseUploadFileDir(uploadsubpath.EXPORT), fileName) + + err := file.CompressZipByDir(filePath, dir) + if err != nil { + return err + } + + return common.UploadOSSByZip(filePath, neType, "CM") +} diff --git a/src/modules/api_rest_cxy/service/log.go b/src/modules/api_rest_cxy/service/log.go new file mode 100644 index 0000000..e8ff89c --- /dev/null +++ b/src/modules/api_rest_cxy/service/log.go @@ -0,0 +1,10 @@ +package service + +// 日志数据处理服务 服务层接口 +type ILog interface { + // OperateLogToFile 操作日志写入文件 + OperateLogToFile(language, startTime, endTime string) (int64, string) + + // SecurityLogToFile 安全日志写入文件 + SecurityLogToFile(language, startTime, endTime string) (int64, string) +} diff --git a/src/modules/api_rest_cxy/service/log.impl.go b/src/modules/api_rest_cxy/service/log.impl.go new file mode 100644 index 0000000..749e511 --- /dev/null +++ b/src/modules/api_rest_cxy/service/log.impl.go @@ -0,0 +1,154 @@ +package service + +import ( + "fmt" + "nms_nbi/src/framework/constants/uploadsubpath" + "nms_nbi/src/framework/i18n" + "nms_nbi/src/framework/utils/file" + "nms_nbi/src/framework/utils/parse" + "nms_nbi/src/modules/api_rest_cxy/utils/common" + systemModel "nms_nbi/src/modules/system/model" + systemService "nms_nbi/src/modules/system/service" +) + +// 实例化数据层 LogImpl 结构体 +var NewLogImpl = &LogImpl{ + sysLogOperateService: systemService.NewSysLogOperateImpl, + sysLogLoginService: systemService.NewSysLogLoginImpl, +} + +// 日志数据处理服务 服务层处理 +type LogImpl struct { + // 操作日志服务 + sysLogOperateService systemService.ISysLogOperate + // 系统登录日志服务 + sysLogLoginService systemService.ISysLogLogin +} + +// OperateLogToFile 操作日志写入文件 +func (s *LogImpl) OperateLogToFile(language, startTime, endTime string) (int64, string) { + var pageNum int64 = 1 + var pageSize int64 = 20 + dataArr := []any{} + + // 查询数据 + pageData := s.sysLogOperateService.SelectSysLogOperatePage(map[string]any{ + "beginTime": startTime, + "endTime": endTime, + "pageNum": pageNum, + "pageSize": pageSize, + }) + + total := parse.Number(pageData["total"]) + rows, ok := pageData["rows"].([]systemModel.SysLogOperate) + if !ok || len(rows) <= 0 { + return 0, "" + } + for _, v := range rows { + v.Title = i18n.TKey(language, v.Title) + v.OperLocation = i18n.TKey(language, v.OperLocation) + v.Method = "-" + dataArr = append(dataArr, v) + } + pageNum += 1 + + // 按总数分批遍历 + batchNum := total / pageSize + for i := 1; i <= int(batchNum); i++ { + // 查询数据 + pageData := s.sysLogOperateService.SelectSysLogOperatePage(map[string]any{ + "beginTime": startTime, + "endTime": endTime, + "pageNum": pageNum, + "pageSize": pageSize, + }) + rows, ok := pageData["rows"].([]systemModel.SysLogOperate) + if !ok || len(rows) <= 0 { + return int64(len(dataArr)), "" + } + for _, v := range rows { + v.Title = i18n.TKey(language, v.Title) + v.OperLocation = i18n.TKey(language, v.OperLocation) + v.Method = "-" + dataArr = append(dataArr, v) + } + pageNum += 1 + } + + // 文件名 + dataSaveFileName := common.DataSaveFileName{ + LogType: "operate", + } + fileName := dataSaveFileName.JSON() + filePath := fmt.Sprintf("%s/%s", file.ParseUploadFileDir(uploadsubpath.EXPORT), fileName) + err := file.WriterFileJSONLine(dataArr, filePath) + if err != nil { + return int64(len(dataArr)), "" + } + return int64(len(dataArr)), filePath +} + +// SecurityLogToFile 安全日志写入文件 +func (s *LogImpl) SecurityLogToFile(language, startTime, endTime string) (int64, string) { + var pageNum int64 = 1 + var pageSize int64 = 20 + dataArr := []any{} + + // 查询数据 + pageData := s.sysLogLoginService.SelectSysLogLoginPage(map[string]any{ + "beginTime": startTime, + "endTime": endTime, + "pageNum": pageNum, + "pageSize": pageSize, + }) + + total := parse.Number(pageData["total"]) + rows, ok := pageData["rows"].([]systemModel.SysLogLogin) + if !ok || len(rows) <= 0 { + return 0, "" + } + for _, v := range rows { + v.LoginLocation = i18n.TKey(language, v.LoginLocation) + v.OS = i18n.TKey(language, v.OS) + v.Browser = i18n.TKey(language, v.Browser) + v.Msg = i18n.TKey(language, v.Msg) + dataArr = append(dataArr, v) + } + pageNum += 1 + + // 按总数分批遍历 + batchNum := total / pageSize + for i := 1; i <= int(batchNum); i++ { + // 查询数据 + pageData := s.sysLogLoginService.SelectSysLogLoginPage(map[string]any{ + "beginTime": startTime, + "endTime": endTime, + "pageNum": pageNum, + "pageSize": pageSize, + }) + rows, ok := pageData["rows"].([]systemModel.SysLogLogin) + if !ok || len(rows) <= 0 { + return int64(len(dataArr)), "" + } + for _, v := range rows { + v.LoginLocation = i18n.TKey(language, v.LoginLocation) + v.OS = i18n.TKey(language, v.OS) + v.Browser = i18n.TKey(language, v.Browser) + v.Msg = i18n.TKey(language, v.Msg) + dataArr = append(dataArr, v) + } + pageNum += 1 + } + + // 文件名 + dataSaveFileName := common.DataSaveFileName{ + LogType: "security", + } + fileName := dataSaveFileName.JSON() + filePath := fmt.Sprintf("%s/%s", file.ParseUploadFileDir(uploadsubpath.EXPORT), fileName) + err := file.WriterFileJSONLine(dataArr, filePath) + if err != nil { + return int64(len(dataArr)), "" + } + return int64(len(dataArr)), filePath +} diff --git a/src/modules/api_rest_cxy/service/performance.go b/src/modules/api_rest_cxy/service/performance.go new file mode 100644 index 0000000..1b56321 --- /dev/null +++ b/src/modules/api_rest_cxy/service/performance.go @@ -0,0 +1,7 @@ +package service + +// 性能数据处理服务 服务层接口 +type IPerformance interface { + // PerformanceUploadOSS 性能数据上报 + PerformanceUploadOSS(neType string) error +} diff --git a/src/modules/api_rest_cxy/service/performance.impl.go b/src/modules/api_rest_cxy/service/performance.impl.go new file mode 100644 index 0000000..a2ac11d --- /dev/null +++ b/src/modules/api_rest_cxy/service/performance.impl.go @@ -0,0 +1,55 @@ +package service + +import ( + "fmt" + "nms_nbi/src/framework/constants/uploadsubpath" + "nms_nbi/src/framework/utils/file" + "nms_nbi/src/modules/api_rest_cxy/utils/common" + neDataModel "nms_nbi/src/modules/network_data/model" + neDataService "nms_nbi/src/modules/network_data/service" +) + +// 实例化数据层 PerformanceImpl 结构体 +var NewPerformanceImpl = &PerformanceImpl{ + perfKPIService: neDataService.NewPerfKPIImpl, +} + +// 性能数据处理服务 服务层处理 +type PerformanceImpl struct { + // 统计信息服务 + perfKPIService neDataService.IPerfKPI +} + +// PerformanceUploadOSS 性能数据上报 +func (s *PerformanceImpl) PerformanceUploadOSS(neType string) error { + query := neDataModel.GoldKPIQuery{ + NeType: neType, + StartTime: "2024-02-15 10:16:30", + EndTime: "2024-03-07 10:20:08", + Interval: 900, + } + + data := s.perfKPIService.SelectGoldKPI(query) + dataArr := [][]string{} + for i, v := range data { + if i == 0 { + dataItem := []string{} + dataItem[0] = fmt.Sprint(v["AMF.25"]) + dataArr = append(dataArr, dataItem) + } + dataItem := []string{} + dataItem[0] = fmt.Sprint(v["AMF.25"]) + dataArr = append(dataArr, dataItem) + } + + // 文件名 + dataSaveFileName := common.DataSaveFileName{} + fileName := dataSaveFileName.CSV() + filePath := fmt.Sprintf("%s/%s", file.ParseUploadFileDir(uploadsubpath.EXPORT), fileName) + err := file.WriterFileCSV(dataArr, filePath) + if err != nil { + return err + } + + return common.UploadOSSByZip(filePath, neType, "PM") +} diff --git a/src/modules/api_rest_cxy/service/resource.go b/src/modules/api_rest_cxy/service/resource.go new file mode 100644 index 0000000..da1968e --- /dev/null +++ b/src/modules/api_rest_cxy/service/resource.go @@ -0,0 +1,7 @@ +package service + +// 资源数据处理服务 服务层接口 +type IResource interface { + // ResourceDataToFile 资源数据写入文件 + ResourceDataToFile(requestId, neType, rmUID, startTime, endTime string) (int64, string) +} diff --git a/src/modules/api_rest_cxy/service/resource.impl.go b/src/modules/api_rest_cxy/service/resource.impl.go new file mode 100644 index 0000000..a4269de --- /dev/null +++ b/src/modules/api_rest_cxy/service/resource.impl.go @@ -0,0 +1,13 @@ +package service + +// 实例化数据层 ResourceImpl 结构体 +var NewResourceImpl = &ResourceImpl{} + +// 资源数据处理服务 服务层处理 +type ResourceImpl struct{} + +// ResourceDataToFile 资源数据写入文件 +func (s *ResourceImpl) ResourceDataToFile(requestId, neType, rmUID, startTime, endTime string) (int64, string) { + + return 0, "" +} diff --git a/src/modules/api_rest_cxy/utils/common/common.go b/src/modules/api_rest_cxy/utils/common/common.go new file mode 100644 index 0000000..78a5d33 --- /dev/null +++ b/src/modules/api_rest_cxy/utils/common/common.go @@ -0,0 +1,153 @@ +package common + +import ( + "fmt" + "nms_nbi/src/framework/config" + "nms_nbi/src/framework/utils/date" + "nms_nbi/src/framework/utils/file" + "nms_nbi/src/modules/api_rest_cxy/utils/oss" + "path/filepath" + "strings" + "time" +) + +// DataSaveFileName 数据文件命名要求 +// +// 资源数据文件、性能数据文件的命名规则如下: +// <资源对象类型简称>-<数据版本>-<数据时间>-<日志类型>-<同步请求标识>--<序列号>.<后缀> +// 告警同步数据文件、配置数据文件、日志数据文件的命名规则如下: +// <网元资源对象RUID中的“资源对象编号”>-<数据版本>-<数据时间>-<日志类型>-<同步请求标识>--<序列号>.<后缀> +// +// –resCode 资源对象类型简称、网元资源对象RUID中的“资源对象编号”:参考本文档附件1。 +// –version 数据版本:遵循数据模型规范版本,允许同一目录下存放不同版本的数据文件。 +// –dateStr 数据时间:采用YYYYMMDDHH24MMSS格式。资源、告警、配置、日志数据为数据文件开始生成时间,其中资源数据文件和配置数据文件周期性上报,生成时间为每天的0点和12点;性能数据为统计周期的起始时间,周期性上报,周期为15分钟。 +// –logType 日志类型:可选,只针对日志文件,包括operate(操作日志)、security(安全日志)、other(其他)。 +// –requestId 同步请求标识:可选,仅在告警文件同步时使用,用于唯一标识一次同步请求。标识规则是“请求操作ID”。“请求操作ID”是同步请求消息中消息序号(requestId)。 +// –Ri:可选。当接口数据文件内容有误(如数据缺失等)时,OMC重新生成文件提供给NMS进行数据补采,新文件增加“Ri”进行标识,i从1开始,每重新生成一次i加1。 +// –Num 序列号:可选。当文件总量小于100MB(允许上下浮动10%)时,应只形成一个文件;当文件总量大于100MB(允许上下浮动10%)时要求进行文件分割,即分割后的文件大小(除最后一个)均应介于(90MB,110MB)之间。分割后的文件增加序列号标识,序列号为三位,取值为001-999。在文件切分过程中,不能把一条完整的记录切开放到两个文件中。 +// –suffix 后缀:每个文件都进行压缩,统一采用zip或gzip压缩,压缩文件后缀是zip或gz。对于压缩前的JSON格式文件,后缀为JSON。 +// +// 文件名示例: +// 资源文件:CLL-V1.0-20151227000000-001.zip +// 性能文件:CLL-V1.0-20151227000000-001.zip +// 告警文件:0000000001153492-V1.0-20150611011603-001.zip +// 日志文件:0000000001153492-V1.0-20150611011603-operate.zip +// 配置文件:0000000001153492-V1.0-20150611011600.zip +type DataSaveFileName struct { + ResCode string // 资源对象类型简称(资源数据文件、性能数据文件) => 网元类型 资源对象编号(告警同步数据文件、配置数据文件、日志数据文件) => 设备序列号 + Version string // 数据版本 => 网元版本 固定1.0 + DateStr string // 数据时间 采用YYYYMMDDHH24MMSS格式 + LogType string // 可选 日志类型 只针对日志文件,包括operate(操作日志)、security(安全日志)、other(其他) + RequestId string // 可选 同步请求标识 仅在告警文件同步时使用 + Ri string // 可选 数据补采 当接口数据文件内容有误(如数据缺失等)时 + Num string // 可选 序列号 当文件总量大于100MB进行分割,之后的文件增加序列号标识,序列号为三位,取值为001-999 +} + +// join 将参数进行连接-分隔 +func (s *DataSaveFileName) join() string { + nameArr := []string{} + + if s.ResCode != "" { + nameArr = append(nameArr, s.ResCode) + } else { + serialNumber := config.Get("cxy.serialNumber").(string) + serialNumber = fmt.Sprintf("%0*s", 16, serialNumber) + nameArr = append(nameArr, serialNumber) + } + + if s.Version != "" { + nameArr = append(nameArr, "V"+s.Version) + } else { + nameArr = append(nameArr, "V1.0") + } + + if s.DateStr != "" { + nameArr = append(nameArr, s.DateStr) + } else { + nameArr = append(nameArr, date.ParseDateToStr(time.Now(), date.YYYYMMDDHHMMSS)) + } + + if s.LogType != "" { + nameArr = append(nameArr, s.LogType) + } + if s.RequestId != "" { + nameArr = append(nameArr, s.RequestId) + } + if s.Ri != "" { + nameArr = append(nameArr, s.Ri) + } + if s.Num != "" { + nameArr = append(nameArr, s.Num) + } + return strings.Join(nameArr, "-") +} + +// JSON 生成json后缀文件名 +func (s *DataSaveFileName) JSON() string { + filename := s.join() + return fmt.Sprintf("%s.json", filename) +} + +// ZIP 生成zip后缀文件名 +func (s *DataSaveFileName) ZIP() string { + filename := s.join() + return fmt.Sprintf("%s.zip", filename) +} + +// CSV 生成csv后缀文件名 +func (s *DataSaveFileName) CSV() string { + filename := s.join() + return fmt.Sprintf("%s.csv", filename) +} + +// DataSaveFilePath 文件数据保存路径 +// +// 文件数据集中存储在云平台所提供的对象存储数据库OSS中。文件数据保存路径如下: +// 路径说明:/专业编码/OMC厂商编码/OMC编号/网元类型/数据类别/时间 +// –专业编码、OMC厂商编码、OMC编号、网元类型:请参考本文档附件1。 +// –数据类别:NRM(资源)/ PM(性能)/CM(配置)/FM(告警)/LOG(日志)。 +// –时间:资源、告警、配置、日志数据按天存放,时间格式为YYYYMMDD;性能数据按小时存放,时间格式为YYYYMMDDHH24,小时数为性能数据统计起始时间。 +func DataSaveFilePath(neType, dataType string) string { + basePath := BasePath("/") + dateStr := date.ParseDateToStr(time.Now(), "20060102") + if dataType == "PM" { + dateStr = date.ParseDateToStr(time.Now(), "200601021504") + } + return fmt.Sprintf("%s/%s/%s/%s", basePath, neType, dataType, dateStr) +} + +// BasePath 基础路径 separator 分隔符 +// +// 专业编码{separator}OMC厂商编码{separator}OMC编号 +func BasePath(separator string) string { + professionCode := config.Get("cxy.professionCode").(string) + vendorCode := config.Get("cxy.vendorCode").(string) + omcCode := config.Get("cxy.omcCode").(string) + return fmt.Sprintf("%s%s%s%s%s", professionCode, separator, vendorCode, separator, omcCode) +} + +// UploadOSSByJSONToZip 将json文件压缩zip上传到oss +func UploadOSSByJSONToZip(filePath, neType, dataType string) error { + dirPath := DataSaveFilePath(neType, dataType) + // 添加到zip压缩文件 + zipFilePath := strings.Replace(filePath, ".json", ".zip", 1) + err := file.CompressZipByFile(zipFilePath, filePath) + if err != nil { + return err + } + // 组成上传文件路径名 + zipFileName := filepath.Base(zipFilePath) + newFileName := fmt.Sprintf("%s/%s", dirPath, zipFileName) + oss.FileUploadFileByFilePath(zipFilePath, newFileName) + return nil +} + +// UploadOSSByZip 将zip文件上传到oss +func UploadOSSByZip(filePath, neType, dataType string) error { + dirPath := DataSaveFilePath(neType, dataType) + // 组成上传文件路径名 + zipFileName := filepath.Base(filePath) + newFileName := fmt.Sprintf("%s/%s", dirPath, zipFileName) + oss.FileUploadFileByFilePath(filePath, newFileName) + return nil +} diff --git a/src/modules/api_rest_cxy/utils/kafka/kafka.go b/src/modules/api_rest_cxy/utils/kafka/kafka.go new file mode 100644 index 0000000..03cf1f4 --- /dev/null +++ b/src/modules/api_rest_cxy/utils/kafka/kafka.go @@ -0,0 +1,163 @@ +package kafka + +import ( + "context" + "nms_nbi/src/framework/config" + "nms_nbi/src/framework/logger" + "time" + + "github.com/IBM/sarama" +) + +// KInitConm Kafka初始配置连接实例 +var KInitConm Kafka + +// InitConfig 连接Kafka实例 +func InitConfig() { + // 服务地址 + addrs := []string{} + addrsArr := config.Get("cxy.kafka.addrs").([]any) + for _, v := range addrsArr { + addrs = append(addrs, v.(string)) + } + k := Kafka{ + Addrs: addrs, + } + k.NewConfig() + k.Config.Net.SASL.Enable = false + KInitConm = k +} + +// Kafka 实例连接 +type Kafka struct { + Addrs []string + Config *sarama.Config +} + +func (k *Kafka) NewConfig() { + // 设置Kafka配置 + config := sarama.NewConfig() + // 生产者 + config.Producer.RequiredAcks = sarama.WaitForAll + config.Producer.Retry.Max = 5 + config.Producer.Return.Successes = true + + k.Config = config +} + +// MessageSyncSend 消息同步发送 +func (k *Kafka) MessageSyncSend(topic string, partition int32, msg string) (int32, int64, error) { + // 创建Kafka生产者 + producer, err := sarama.NewSyncProducer(k.Addrs, k.Config) + if err != nil { + logger.Warnf("MessageSyncSend NewSyncProducer Topic:%s, Partition:%d => %s", topic, partition, err.Error()) + return partition, 0, err + } + defer producer.Close() + + // 发送消息 + message := &sarama.ProducerMessage{ + Topic: topic, + Partition: partition, + Value: sarama.StringEncoder(msg), + } + partition, offset, err := producer.SendMessage(message) + if err != nil { + logger.Errorf("MessageSyncSend SendMessage Topic:%s, Partition:%d => %s", topic, partition, err.Error()) + } + return partition, offset, err +} + +// MessageSyncRead 消息同步读取-最旧历史积压 +func (k *Kafka) MessageSyncRead(topic string, partition int32) (string, error) { + // 创建Kafka消费者 + consumer, err := sarama.NewConsumer(k.Addrs, k.Config) + if err != nil { + logger.Warnf("MessageSyncRead NewConsumer Topic:%s, Partition:%d => %s", topic, partition, err.Error()) + return "", err + } + defer consumer.Close() + + // 消费消息 + partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest) + if err != nil { + logger.Errorf("MessageSyncRead ConsumePartition Topic:%s, Partition:%d => %s", topic, partition, err.Error()) + return "", err + } + defer partitionConsumer.Close() + + // 设置超时时间为1秒 + timeout := time.After(1 * time.Second) + for { + select { + case msg := <-partitionConsumer.Messages(): + return string(msg.Value), nil + case <-timeout: + logger.Infof("MessageSyncRead timeout Topic:%s, Partition:%d", topic, partition) + return "", nil + } + } +} + +// MessageSyncReadMark 消息同步读取并消费-最旧历史积压 +func (k *Kafka) MessageSyncReadMark(topic string, partition int32, group string) (string, error) { + consumerGroup, err := sarama.NewConsumerGroup(k.Addrs, group, k.Config) + if err != nil { + logger.Warnf("MessageSyncReadMark NewConsumerGroup Topic:%s, Partition:%d, Group:%s => %s", topic, partition, group, err.Error()) + return "", err + } + defer consumerGroup.Close() + + consumer := readAndMarkConsumerGroup{ + MessageChan: make(chan []byte, 1), + } + + // `Consume` should be called inside an infinite loop, when a + // server-side rebalance happens, the consumer session will need to be + // recreated to get the new claims + err = consumerGroup.Consume(context.Background(), []string{topic}, consumer) + if err != nil { + logger.Errorf("MessageSyncReadMark Consume Topic:%s, Partition:%d, Group:%s => %s", topic, partition, group, err.Error()) + return "", err + } + + messageByte := <-consumer.MessageChan + return string(messageByte), err +} + +// readAndMarkConsumerGroup 消费组积压消息读取立即标记 +type readAndMarkConsumerGroup struct { + MessageChan chan []byte // 消息通道 +} + +func (readAndMarkConsumerGroup) Setup(_ sarama.ConsumerGroupSession) error { return nil } +func (readAndMarkConsumerGroup) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } +func (g readAndMarkConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + // 设置超时时间为1秒 + timeout := time.After(1 * time.Second) + for { + select { + case message, ok := <-claim.Messages(): + if !ok { + logger.Warnf("ConsumeClaim Messages fail Topic:%s, Partition:%d, point:%d", claim.Topic(), claim.Partition(), claim.InitialOffset()) + g.MessageChan <- []byte{} + return nil + } + + session.MarkMessage(message, "") + g.MessageChan <- message.Value + return nil + // Should return when `session.Context()` is done. + // If not, will raise `ErrRebalanceInProgress` or `read tcp :: i/o timeout` when kafka rebalance. see: + // https://github.com/IBM/sarama/issues/1192 + case <-session.Context().Done(): + logger.Infof("ConsumeClaim done Topic:%s, Partition:%d, point:%d", claim.Topic(), claim.Partition(), claim.InitialOffset()) + g.MessageChan <- []byte{} + return nil + case <-timeout: + logger.Infof("ConsumeClaim timeout Topic:%s, Partition:%d, point:%d", claim.Topic(), claim.Partition(), claim.InitialOffset()) + g.MessageChan <- []byte{} + return nil + } + } +} diff --git a/src/modules/api_rest_cxy/utils/oss/oss.go b/src/modules/api_rest_cxy/utils/oss/oss.go new file mode 100644 index 0000000..abdc809 --- /dev/null +++ b/src/modules/api_rest_cxy/utils/oss/oss.go @@ -0,0 +1,52 @@ +package oss + +import ( + "context" + "nms_nbi/src/framework/config" + "nms_nbi/src/framework/logger" + + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" +) + +// OInitConm OSS初始配置连接实例 +var OInitConm *minio.Client + +// InitConfig 连接OSS实例 +func InitConfig() { + // 读取数据源配置 + bucketname := config.Get("cxy.oss.bucketname").(string) + endpoint := config.Get("cxy.oss.endpoint").(string) + useSSL := config.Get("cxy.oss.useSSL").(bool) + accessKeyID := config.Get("cxy.oss.accessKeyID").(string) + secretAccessKey := config.Get("cxy.oss.secretAccessKey").(string) + + // 初始minio客户端对象 + minioClient, err := minio.New(endpoint, &minio.Options{ + Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""), + Secure: useSSL, + }) + if err != nil { + logger.Fatalf("minio %s client err %v", endpoint, err) + } + + // 检查存储桶是否存在可用 + found, err := minioClient.BucketExists(context.Background(), bucketname) + if err != nil { + logger.Fatalf("minio %s bucket [%s] exists err %v", endpoint, bucketname, err) + } + if !found { + logger.Fatalf("minio %s bucket [%s] not found err %v", endpoint, bucketname, err) + } + logger.Infof("minio %s bucket [%s] exists", endpoint, bucketname) + OInitConm = minioClient +} + +// FileUploadFileByFilePath 文件上传-文件绝对路径 +func FileUploadFileByFilePath(filePath, newFileName string) (minio.UploadInfo, error) { + bucketname := config.Get("cxy.oss.bucketname").(string) + contentType := "application/octet-stream" + // Upload the test file with FPutObject + ctx := context.Background() + return OInitConm.FPutObject(ctx, bucketname, newFileName, filePath, minio.PutObjectOptions{ContentType: contentType}) +}