Merge remote-tracking branch 'origin/main' into multi-tenant
This commit is contained in:
@@ -319,16 +319,18 @@ func (s *SysJobController) ResetQueueJob(c *gin.Context) {
|
||||
func (s *SysJobController) Export(c *gin.Context) {
|
||||
language := ctx.AcceptLanguage(c)
|
||||
// 查询结果,根据查询条件结果,单页最大值限制
|
||||
// querys := ctx.BodyJSONMap(c)
|
||||
// data := s.sysJobService.SelectJobPage(querys)
|
||||
// if data["total"].(int64) == 0 {
|
||||
// // 导出数据记录为空
|
||||
// c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.exportEmpty")))
|
||||
// return
|
||||
// }
|
||||
// rows := data["rows"].([]model.SysJob)
|
||||
querys := ctx.BodyJSONMap(c)
|
||||
querys["pageNum"] = 1
|
||||
querys["pageSize"] = 10000
|
||||
data := s.sysJobService.SelectJobPage(querys)
|
||||
if parse.Number(data["total"]) == 0 {
|
||||
// 导出数据记录为空
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.exportEmpty")))
|
||||
return
|
||||
}
|
||||
rows := data["rows"].([]model.SysJob)
|
||||
|
||||
rows := s.sysJobService.SelectJobList(model.SysJob{})
|
||||
// rows := s.sysJobService.SelectJobList(model.SysJob{})
|
||||
if len(rows) <= 0 {
|
||||
// 导出数据记录为空
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.exportEmpty")))
|
||||
|
||||
@@ -131,15 +131,18 @@ func (s *SysJobLogController) Clean(c *gin.Context) {
|
||||
func (s *SysJobLogController) Export(c *gin.Context) {
|
||||
language := ctx.AcceptLanguage(c)
|
||||
// 查询结果,根据查询条件结果,单页最大值限制
|
||||
// querys := ctx.BodyJSONMap(c)
|
||||
// data := s.sysJobLogService.SelectJobLogPage(querys)
|
||||
// if data["total"].(int64) == 0 {
|
||||
// c.JSON(200, result.ErrMsg("Export data record is empty"))
|
||||
// return
|
||||
// }
|
||||
// rows := data["rows"].([]model.SysJobLog)
|
||||
querys := ctx.BodyJSONMap(c)
|
||||
querys["pageNum"] = 1
|
||||
querys["pageSize"] = 10000
|
||||
data := s.sysJobLogService.SelectJobLogPage(querys)
|
||||
if parse.Number(data["total"]) == 0 {
|
||||
// 导出数据记录为空
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.exportEmpty")))
|
||||
return
|
||||
}
|
||||
rows := data["rows"].([]model.SysJobLog)
|
||||
|
||||
rows := s.sysJobLogService.SelectJobLogList(model.SysJobLog{})
|
||||
// rows := s.sysJobLogService.SelectJobLogList(model.SysJobLog{})
|
||||
if len(rows) <= 0 {
|
||||
// 导出数据记录为空
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.exportEmpty")))
|
||||
|
||||
@@ -2,28 +2,19 @@ package model
|
||||
|
||||
// MonitorBase 监控_基本信息 monitor_base
|
||||
type MonitorBase struct {
|
||||
// id
|
||||
ID int64 `json:"id" gorm:"primaryKey"`
|
||||
// 创建时间
|
||||
CreateTime int64 `json:"createTime"`
|
||||
// cpu使用率
|
||||
CPU float64 `json:"cpu"`
|
||||
// cpu平均使用率
|
||||
LoadUsage float64 `json:"loadUsage"`
|
||||
// cpu使用1分钟
|
||||
CPULoad1 float64 `json:"cpuLoad1"`
|
||||
// cpu使用5分钟
|
||||
CPULoad5 float64 `json:"cpuLoad5"`
|
||||
// cpu使用15分钟
|
||||
CPULoad15 float64 `json:"cpuLoad15"`
|
||||
// 内存使用率
|
||||
Memory float64 `json:"memory"`
|
||||
// 网元ID
|
||||
NeType string `json:"neType"`
|
||||
// 网元类型
|
||||
NeID string `json:"neId"`
|
||||
ID int64 `json:"id" gorm:"column:id;primaryKey;autoIncrement"`
|
||||
CreateTime int64 `json:"createTime" gorm:"create_time"` // 创建时间
|
||||
CPU float64 `json:"cpu" gorm:"cpu"` // cpu使用率
|
||||
LoadUsage float64 `json:"loadUsage" gorm:"load_usage"` // cpu平均使用率
|
||||
CPULoad1 float64 `json:"cpuLoad1" gorm:"cpu_load1"` // cpu使用1分钟
|
||||
CPULoad5 float64 `json:"cpuLoad5" gorm:"cpu_load5"` // cpu使用5分钟
|
||||
CPULoad15 float64 `json:"cpuLoad15" gorm:"cpu_load15"` // cpu使用15分钟
|
||||
Memory float64 `json:"memory" gorm:"memory"` // 内存使用率
|
||||
NeType string `json:"neType" gorm:"ne_type"` // 网元类型
|
||||
NeID string `json:"neId" gorm:"ne_id"` // 网元ID
|
||||
}
|
||||
|
||||
func (MonitorBase) TableName() string {
|
||||
// TableName 表名称
|
||||
func (*MonitorBase) TableName() string {
|
||||
return "monitor_base"
|
||||
}
|
||||
|
||||
@@ -2,26 +2,18 @@ package model
|
||||
|
||||
// MonitorIO 监控_磁盘IO monitor_io
|
||||
type MonitorIO struct {
|
||||
// id
|
||||
ID int64 `json:"id" gorm:"primaryKey"`
|
||||
// 创建时间
|
||||
CreateTime int64 `json:"createTime"`
|
||||
// 磁盘名
|
||||
Name string `json:"name"`
|
||||
// 读取K
|
||||
Read int64 `json:"read"`
|
||||
// 写入K
|
||||
Write int64 `json:"write"`
|
||||
// 次数
|
||||
Count int64 `json:"count"`
|
||||
// 耗时
|
||||
Time int64 `json:"time"`
|
||||
// 网元ID
|
||||
NeType string `json:"neType"`
|
||||
// 网元类型
|
||||
NeID string `json:"neId"`
|
||||
ID int64 `json:"id" gorm:"column:id;primaryKey;autoIncrement"`
|
||||
CreateTime int64 `json:"createTime" gorm:"create_time"` // 创建时间
|
||||
Name string `json:"name" gorm:"name"` // 磁盘名
|
||||
Read int64 `json:"read" gorm:"read"` // 读取K
|
||||
Write int64 `json:"write" gorm:"write"` // 写入K
|
||||
Count int64 `json:"count" gorm:"count"` // 读写次数
|
||||
Time int64 `json:"time" gorm:"time"` // 读写延迟
|
||||
NeType string `json:"neType" gorm:"ne_type"` // 网元类型
|
||||
NeID string `json:"neId" gorm:"ne_id"` // 网元ID
|
||||
}
|
||||
|
||||
func (MonitorIO) TableName() string {
|
||||
// TableName 表名称
|
||||
func (*MonitorIO) TableName() string {
|
||||
return "monitor_io"
|
||||
}
|
||||
|
||||
@@ -2,22 +2,16 @@ package model
|
||||
|
||||
// MonitorNetwork 监控_网络IO monitor_network
|
||||
type MonitorNetwork struct {
|
||||
// id
|
||||
ID int64 `json:"id" gorm:"primaryKey"`
|
||||
// 创建时间
|
||||
CreateTime int64 `json:"createTime"`
|
||||
// 网卡名
|
||||
Name string `json:"name"`
|
||||
// 上行
|
||||
Up float64 `json:"up"`
|
||||
// 下行
|
||||
Down float64 `json:"down"`
|
||||
// 网元ID 本机#号
|
||||
NeType string `json:"neType"`
|
||||
// 网元类型 本机#号
|
||||
NeID string `json:"neId"`
|
||||
ID int64 `json:"id" gorm:"column:id;primaryKey;autoIncrement"`
|
||||
CreateTime int64 `json:"createTime" gorm:"create_time"` // 创建时间
|
||||
Name string `json:"name" gorm:"name"` // 网卡名
|
||||
Up float64 `json:"up" gorm:"up"` // 上行
|
||||
Down float64 `json:"down" gorm:"down"` // 下行
|
||||
NeType string `json:"neType" gorm:"ne_type"` // 网元类型
|
||||
NeID string `json:"neId" gorm:"ne_id"` // 网元ID
|
||||
}
|
||||
|
||||
func (MonitorNetwork) TableName() string {
|
||||
// TableName 表名称
|
||||
func (*MonitorNetwork) TableName() string {
|
||||
return "monitor_network"
|
||||
}
|
||||
|
||||
@@ -41,17 +41,20 @@ func (s *MonitorImpl) RunMonitor() {
|
||||
itemBase.CreateTime = time.Now().UnixMilli()
|
||||
itemBase.NeType = "#"
|
||||
itemBase.NeID = "#"
|
||||
loadInfo, _ := load.Avg()
|
||||
itemBase.CPULoad1 = loadInfo.Load1
|
||||
itemBase.CPULoad5 = loadInfo.Load5
|
||||
itemBase.CPULoad15 = loadInfo.Load15
|
||||
totalPercent, _ := cpu.Percent(3*time.Second, false)
|
||||
if len(totalPercent) == 1 {
|
||||
itemBase.CPU = totalPercent[0]
|
||||
}
|
||||
cpuCount, _ := cpu.Counts(false)
|
||||
|
||||
loadInfo, _ := load.Avg()
|
||||
itemBase.CPULoad1 = loadInfo.Load1
|
||||
itemBase.CPULoad5 = loadInfo.Load5
|
||||
itemBase.CPULoad15 = loadInfo.Load15
|
||||
itemBase.LoadUsage = loadInfo.Load1 / (float64(cpuCount*2) * 0.75) * 100
|
||||
cpuAvg := (float64(cpuCount*2) * 0.75) * 100
|
||||
itemBase.LoadUsage = 0
|
||||
if cpuAvg > 0 {
|
||||
itemBase.LoadUsage = loadInfo.Load1 / cpuAvg
|
||||
}
|
||||
|
||||
memoryInfo, _ := mem.VirtualMemory()
|
||||
itemBase.Memory = memoryInfo.UsedPercent
|
||||
|
||||
@@ -49,15 +49,14 @@ func (s *IMSController) CDRList(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// 查询网元获取IP
|
||||
// neInfo := s.neInfoService.SelectNeInfoByNeTypeAndNeID(querys.NeType, querys.NeID)
|
||||
// if neInfo.NeId != querys.NeID || neInfo.IP == "" {
|
||||
// c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.noNEInfo")))
|
||||
// return
|
||||
// }
|
||||
// querys.RmUID = neInfo.RmUID
|
||||
// for multi-tenancy
|
||||
querys.UserName = ctx.LoginUserToUserName(c)
|
||||
// 查询网元信息 rmUID
|
||||
neInfo := s.neInfoService.SelectNeInfoByNeTypeAndNeID(querys.NeType, querys.NeID)
|
||||
if neInfo.NeId != querys.NeID || neInfo.IP == "" {
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.noNEInfo")))
|
||||
return
|
||||
}
|
||||
querys.RmUID = neInfo.RmUID
|
||||
|
||||
// 查询数据
|
||||
data := s.cdrEventService.SelectPage(querys)
|
||||
c.JSON(200, result.Ok(data))
|
||||
@@ -104,6 +103,13 @@ func (s *IMSController) CDRExport(c *gin.Context) {
|
||||
if querys.PageSize > 10000 {
|
||||
querys.PageSize = 10000
|
||||
}
|
||||
// 查询网元信息 rmUID
|
||||
neInfo := s.neInfoService.SelectNeInfoByNeTypeAndNeID(querys.NeType, querys.NeID)
|
||||
if neInfo.NeId != querys.NeID || neInfo.IP == "" {
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.noNEInfo")))
|
||||
return
|
||||
}
|
||||
querys.RmUID = neInfo.RmUID
|
||||
data := s.cdrEventService.SelectPage(querys)
|
||||
if parse.Number(data["total"]) == 0 {
|
||||
// 导出数据记录为空
|
||||
@@ -119,8 +125,8 @@ func (s *IMSController) CDRExport(c *gin.Context) {
|
||||
"A1": "ID",
|
||||
"B1": "Record Behavior",
|
||||
"C1": "Type",
|
||||
"D1": "Called",
|
||||
"E1": "Caller",
|
||||
"D1": "Caller",
|
||||
"E1": "Called",
|
||||
"F1": "Duration",
|
||||
"G1": "Result",
|
||||
"H1": "Time",
|
||||
@@ -194,8 +200,8 @@ func (s *IMSController) CDRExport(c *gin.Context) {
|
||||
"A" + idx: row.ID,
|
||||
"B" + idx: recordType,
|
||||
"C" + idx: callTypeLable,
|
||||
"D" + idx: called,
|
||||
"E" + idx: caller,
|
||||
"D" + idx: caller,
|
||||
"E" + idx: called,
|
||||
"F" + idx: duration,
|
||||
"G" + idx: callResult,
|
||||
"H" + idx: timeStr,
|
||||
|
||||
@@ -47,15 +47,14 @@ func (s *SMFController) CDRList(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// 查询网元获取IP
|
||||
// neInfo := s.neInfoService.SelectNeInfoByNeTypeAndNeID(querys.NeType, querys.NeID)
|
||||
// if neInfo.NeId != querys.NeID || neInfo.IP == "" {
|
||||
// c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.noNEInfo")))
|
||||
// return
|
||||
// }
|
||||
// querys.RmUID = neInfo.RmUID
|
||||
// for multi-tenancy
|
||||
querys.UserName = ctx.LoginUserToUserName(c)
|
||||
// 查询网元信息 rmUID
|
||||
neInfo := s.neInfoService.SelectNeInfoByNeTypeAndNeID(querys.NeType, querys.NeID)
|
||||
if neInfo.NeId != querys.NeID || neInfo.IP == "" {
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.noNEInfo")))
|
||||
return
|
||||
}
|
||||
querys.RmUID = neInfo.RmUID
|
||||
|
||||
// 查询数据
|
||||
data := s.cdrEventService.SelectPage(querys)
|
||||
c.JSON(200, result.Ok(data))
|
||||
@@ -102,6 +101,13 @@ func (s *SMFController) CDRExport(c *gin.Context) {
|
||||
if querys.PageSize > 10000 {
|
||||
querys.PageSize = 10000
|
||||
}
|
||||
// 查询网元信息 rmUID
|
||||
neInfo := s.neInfoService.SelectNeInfoByNeTypeAndNeID(querys.NeType, querys.NeID)
|
||||
if neInfo.NeId != querys.NeID || neInfo.IP == "" {
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.noNEInfo")))
|
||||
return
|
||||
}
|
||||
querys.RmUID = neInfo.RmUID
|
||||
data := s.cdrEventService.SelectPage(querys)
|
||||
if parse.Number(data["total"]) == 0 {
|
||||
// 导出数据记录为空
|
||||
|
||||
197
src/modules/network_data/controller/smsc.go
Normal file
197
src/modules/network_data/controller/smsc.go
Normal file
@@ -0,0 +1,197 @@
|
||||
package controller
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"be.ems/src/framework/i18n"
|
||||
"be.ems/src/framework/logger"
|
||||
"be.ems/src/framework/utils/ctx"
|
||||
"be.ems/src/framework/utils/date"
|
||||
"be.ems/src/framework/utils/file"
|
||||
"be.ems/src/framework/utils/parse"
|
||||
"be.ems/src/framework/vo/result"
|
||||
"be.ems/src/modules/network_data/model"
|
||||
neDataService "be.ems/src/modules/network_data/service"
|
||||
neService "be.ems/src/modules/network_element/service"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gin-gonic/gin/binding"
|
||||
)
|
||||
|
||||
// 实例化控制层 SMSCController 结构体
|
||||
var NewSMSCController = &SMSCController{
|
||||
neInfoService: neService.NewNeInfoImpl,
|
||||
cdrEventService: neDataService.NewCDREventSMSCImpl,
|
||||
}
|
||||
|
||||
// 网元SMSC
|
||||
//
|
||||
// PATH /smsc
|
||||
type SMSCController struct {
|
||||
// 网元信息服务
|
||||
neInfoService neService.INeInfo
|
||||
// CDR会话事件服务
|
||||
cdrEventService neDataService.ICDREventSMSC
|
||||
}
|
||||
|
||||
// CDR会话列表
|
||||
//
|
||||
// GET /cdr/list
|
||||
func (s *SMSCController) CDRList(c *gin.Context) {
|
||||
language := ctx.AcceptLanguage(c)
|
||||
var querys model.CDREventSMSCQuery
|
||||
if err := c.ShouldBindQuery(&querys); err != nil {
|
||||
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
|
||||
return
|
||||
}
|
||||
|
||||
// 查询网元信息 rmUID
|
||||
neInfo := s.neInfoService.SelectNeInfoByNeTypeAndNeID(querys.NeType, querys.NeID)
|
||||
if neInfo.NeId != querys.NeID || neInfo.IP == "" {
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.noNEInfo")))
|
||||
return
|
||||
}
|
||||
querys.RmUID = neInfo.RmUID
|
||||
|
||||
// 查询数据
|
||||
data := s.cdrEventService.SelectPage(querys)
|
||||
c.JSON(200, result.Ok(data))
|
||||
}
|
||||
|
||||
// CDR会话删除
|
||||
//
|
||||
// DELETE /cdr/:cdrIds
|
||||
func (s *SMSCController) CDRRemove(c *gin.Context) {
|
||||
language := ctx.AcceptLanguage(c)
|
||||
cdrIds := c.Param("cdrIds")
|
||||
if cdrIds == "" {
|
||||
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
|
||||
return
|
||||
}
|
||||
// 处理字符转id数组后去重
|
||||
ids := strings.Split(cdrIds, ",")
|
||||
uniqueIDs := parse.RemoveDuplicates(ids)
|
||||
if len(uniqueIDs) <= 0 {
|
||||
c.JSON(200, result.Err(nil))
|
||||
return
|
||||
}
|
||||
rows, err := s.cdrEventService.DeleteByIds(uniqueIDs)
|
||||
if err != nil {
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, err.Error())))
|
||||
return
|
||||
}
|
||||
msg := i18n.TTemplate(language, "app.common.deleteSuccess", map[string]any{"num": rows})
|
||||
c.JSON(200, result.OkMsg(msg))
|
||||
}
|
||||
|
||||
// CDR会话列表导出
|
||||
//
|
||||
// POST /cdr/export
|
||||
func (s *SMSCController) CDRExport(c *gin.Context) {
|
||||
language := ctx.AcceptLanguage(c)
|
||||
// 查询结果,根据查询条件结果,单页最大值限制
|
||||
var querys model.CDREventSMSCQuery
|
||||
if err := c.ShouldBindBodyWith(&querys, binding.JSON); err != nil {
|
||||
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
|
||||
return
|
||||
}
|
||||
// 限制导出数据集
|
||||
if querys.PageSize > 10000 {
|
||||
querys.PageSize = 10000
|
||||
}
|
||||
// 查询网元信息 rmUID
|
||||
neInfo := s.neInfoService.SelectNeInfoByNeTypeAndNeID(querys.NeType, querys.NeID)
|
||||
if neInfo.NeId != querys.NeID || neInfo.IP == "" {
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.noNEInfo")))
|
||||
return
|
||||
}
|
||||
querys.RmUID = neInfo.RmUID
|
||||
data := s.cdrEventService.SelectPage(querys)
|
||||
if parse.Number(data["total"]) == 0 {
|
||||
// 导出数据记录为空
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.exportEmpty")))
|
||||
return
|
||||
}
|
||||
rows := data["rows"].([]model.CDREventSMSC)
|
||||
|
||||
// 导出文件名称
|
||||
fileName := fmt.Sprintf("smsc_cdr_event_export_%d_%d.xlsx", len(rows), time.Now().UnixMilli())
|
||||
// 第一行表头标题
|
||||
headerCells := map[string]string{
|
||||
"A1": "ID",
|
||||
"B1": "Record Behavior",
|
||||
"C1": "Service Type",
|
||||
"D1": "Caller",
|
||||
"E1": "Called",
|
||||
"F1": "Result",
|
||||
"G1": "Time",
|
||||
}
|
||||
// 从第二行开始的数据
|
||||
dataCells := make([]map[string]any, 0)
|
||||
for i, row := range rows {
|
||||
idx := strconv.Itoa(i + 2)
|
||||
// 解析 JSON 字符串为 map
|
||||
var cdrJSON map[string]interface{}
|
||||
err := json.Unmarshal([]byte(row.CDRJSONStr), &cdrJSON)
|
||||
if err != nil {
|
||||
logger.Warnf("CDRExport Error parsing JSON: %s", err.Error())
|
||||
continue
|
||||
}
|
||||
// 记录类型
|
||||
recordType := ""
|
||||
if v, ok := cdrJSON["recordType"]; ok && v != nil {
|
||||
recordType = v.(string)
|
||||
}
|
||||
// 服务类型
|
||||
serviceType := ""
|
||||
if v, ok := cdrJSON["serviceType"]; ok && v != nil {
|
||||
serviceType = v.(string)
|
||||
}
|
||||
// 被叫
|
||||
called := ""
|
||||
if v, ok := cdrJSON["calledParty"]; ok && v != nil {
|
||||
called = v.(string)
|
||||
}
|
||||
// 主叫
|
||||
caller := ""
|
||||
if v, ok := cdrJSON["callerParty"]; ok && v != nil {
|
||||
caller = v.(string)
|
||||
}
|
||||
// 呼叫结果 0失败,1成功
|
||||
callResult := "Fail"
|
||||
if v, ok := cdrJSON["result"]; ok && v != nil {
|
||||
resultVal := parse.Number(v)
|
||||
if resultVal == 1 {
|
||||
callResult = "Success"
|
||||
}
|
||||
}
|
||||
// 取时间
|
||||
timeStr := ""
|
||||
if v, ok := cdrJSON["updateTime"]; ok && v != nil {
|
||||
releaseTime := parse.Number(v)
|
||||
timeStr = date.ParseDateToStr(releaseTime, date.YYYY_MM_DDTHH_MM_SSZ)
|
||||
}
|
||||
|
||||
dataCells = append(dataCells, map[string]any{
|
||||
"A" + idx: row.ID,
|
||||
"B" + idx: recordType,
|
||||
"C" + idx: serviceType,
|
||||
"D" + idx: caller,
|
||||
"E" + idx: called,
|
||||
"F" + idx: callResult,
|
||||
"G" + idx: timeStr,
|
||||
})
|
||||
}
|
||||
|
||||
// 导出数据表格
|
||||
saveFilePath, err := file.WriteSheet(headerCells, dataCells, fileName, "")
|
||||
if err != nil {
|
||||
c.JSON(200, result.ErrMsg(err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
c.FileAttachment(saveFilePath, fileName)
|
||||
}
|
||||
30
src/modules/network_data/model/cdr_event_smsc.go
Normal file
30
src/modules/network_data/model/cdr_event_smsc.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package model
|
||||
|
||||
import "time"
|
||||
|
||||
// CDREventSMSC CDR会话对象SMSC cdr_event_smsc
|
||||
type CDREventSMSC struct {
|
||||
ID string `json:"id" gorm:"column:id;primaryKey;autoIncrement"`
|
||||
NeType string `json:"neType" gorm:"column:ne_type"`
|
||||
NeName string `json:"neName" gorm:"column:ne_name"`
|
||||
RmUID string `json:"rmUID" gorm:"column:rm_uid"` // 可能没有
|
||||
Timestamp int64 `json:"timestamp" gorm:"column:timestamp"`
|
||||
CDRJSONStr string `json:"cdrJSON" gorm:"column:cdr_json"`
|
||||
CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;default:CURRENT_TIMESTAMP"`
|
||||
}
|
||||
|
||||
// CDREventSMSCQuery CDR会话对象SMSC查询参数结构体
|
||||
type CDREventSMSCQuery struct {
|
||||
NeType string `json:"neType" form:"neType" binding:"required"` // 网元类型SMSC
|
||||
NeID string `json:"neId" form:"neId" binding:"required"`
|
||||
RmUID string `json:"rmUID" form:"rmUID"`
|
||||
RecordType string `json:"recordType" form:"recordType"` // 记录行为 MOSM MTSM
|
||||
CallerParty string `json:"callerParty" form:"callerParty"` // 主叫号码
|
||||
CalledParty string `json:"calledParty" form:"calledParty"` // 被叫号码
|
||||
StartTime string `json:"startTime" form:"startTime"`
|
||||
EndTime string `json:"endTime" form:"endTime"`
|
||||
SortField string `json:"sortField" form:"sortField" binding:"omitempty,oneof=timestamp"` // 排序字段,填写结果字段
|
||||
SortOrder string `json:"sortOrder" form:"sortOrder" binding:"omitempty,oneof=asc desc"` // 排序升降序,asc desc
|
||||
PageNum int64 `json:"pageNum" form:"pageNum" binding:"required"`
|
||||
PageSize int64 `json:"pageSize" form:"pageSize" binding:"required"`
|
||||
}
|
||||
@@ -61,6 +61,25 @@ func Setup(router *gin.Engine) {
|
||||
)
|
||||
}
|
||||
|
||||
// 网元SMSC
|
||||
smscGroup := neDataGroup.Group("/smsc")
|
||||
{
|
||||
smscGroup.GET("/cdr/list",
|
||||
middleware.PreAuthorize(nil),
|
||||
controller.NewSMSCController.CDRList,
|
||||
)
|
||||
smscGroup.DELETE("/cdr/:cdrIds",
|
||||
middleware.PreAuthorize(nil),
|
||||
collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.smscCDR", collectlogs.BUSINESS_TYPE_DELETE)),
|
||||
controller.NewSMSCController.CDRRemove,
|
||||
)
|
||||
smscGroup.POST("/cdr/export",
|
||||
middleware.PreAuthorize(nil),
|
||||
collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.smscCDR", collectlogs.BUSINESS_TYPE_EXPORT)),
|
||||
controller.NewSMSCController.CDRExport,
|
||||
)
|
||||
}
|
||||
|
||||
// 网元SMF
|
||||
smfGroup := neDataGroup.Group("/smf")
|
||||
{
|
||||
|
||||
15
src/modules/network_data/repository/cdr_event_smsc.go
Normal file
15
src/modules/network_data/repository/cdr_event_smsc.go
Normal file
@@ -0,0 +1,15 @@
|
||||
package repository
|
||||
|
||||
import "be.ems/src/modules/network_data/model"
|
||||
|
||||
// CDR会话事件SMSC 数据层接口
|
||||
type ICDREventSMSC interface {
|
||||
// SelectPage 根据条件分页查询
|
||||
SelectPage(querys model.CDREventSMSCQuery) map[string]any
|
||||
|
||||
// SelectByIds 通过ID查询
|
||||
SelectByIds(cdrIds []string) []model.CDREventSMSC
|
||||
|
||||
// DeleteByIds 批量删除信息
|
||||
DeleteByIds(cdrIds []string) int64
|
||||
}
|
||||
181
src/modules/network_data/repository/cdr_event_smsc.impl.go
Normal file
181
src/modules/network_data/repository/cdr_event_smsc.impl.go
Normal file
@@ -0,0 +1,181 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"be.ems/src/framework/datasource"
|
||||
"be.ems/src/framework/logger"
|
||||
"be.ems/src/framework/utils/parse"
|
||||
"be.ems/src/framework/utils/repo"
|
||||
"be.ems/src/modules/network_data/model"
|
||||
)
|
||||
|
||||
// 实例化数据层 CDREventSMSCImpl 结构体
|
||||
var NewCDREventSMSCImpl = &CDREventSMSCImpl{
|
||||
selectSql: `select id, ne_type, ne_name, rm_uid, timestamp, cdr_json, created_at from cdr_event_smsc`,
|
||||
|
||||
resultMap: map[string]string{
|
||||
"id": "ID",
|
||||
"ne_type": "NeType",
|
||||
"ne_name": "NeName",
|
||||
"rm_uid": "RmUID",
|
||||
"timestamp": "Timestamp",
|
||||
"cdr_json": "CDRJSONStr",
|
||||
"created_at": "CreatedAt",
|
||||
},
|
||||
}
|
||||
|
||||
// CDREventSMSCImpl CDR会话事件 数据层处理
|
||||
type CDREventSMSCImpl struct {
|
||||
// 查询视图对象SQL
|
||||
selectSql string
|
||||
// 结果字段与实体映射
|
||||
resultMap map[string]string
|
||||
}
|
||||
|
||||
// convertResultRows 将结果记录转实体结果组
|
||||
func (r *CDREventSMSCImpl) convertResultRows(rows []map[string]any) []model.CDREventSMSC {
|
||||
arr := make([]model.CDREventSMSC, 0)
|
||||
for _, row := range rows {
|
||||
item := model.CDREventSMSC{}
|
||||
for key, value := range row {
|
||||
if keyMapper, ok := r.resultMap[key]; ok {
|
||||
repo.SetFieldValue(&item, keyMapper, value)
|
||||
}
|
||||
}
|
||||
arr = append(arr, item)
|
||||
}
|
||||
return arr
|
||||
}
|
||||
|
||||
// SelectPage 根据条件分页查询
|
||||
func (r *CDREventSMSCImpl) SelectPage(querys model.CDREventSMSCQuery) map[string]any {
|
||||
// 查询条件拼接
|
||||
var conditions []string
|
||||
var params []any
|
||||
if querys.NeType != "" {
|
||||
conditions = append(conditions, "ne_type = ?")
|
||||
params = append(params, querys.NeType)
|
||||
}
|
||||
if querys.RmUID != "" {
|
||||
conditions = append(conditions, "rm_uid = ?")
|
||||
params = append(params, querys.RmUID)
|
||||
}
|
||||
if querys.StartTime != "" {
|
||||
conditions = append(conditions, "timestamp >= ?")
|
||||
if len(querys.StartTime) == 13 {
|
||||
querys.StartTime = querys.StartTime[:10]
|
||||
}
|
||||
params = append(params, querys.StartTime)
|
||||
}
|
||||
if querys.EndTime != "" {
|
||||
conditions = append(conditions, "timestamp <= ?")
|
||||
if len(querys.EndTime) == 13 {
|
||||
querys.EndTime = querys.EndTime[:10]
|
||||
}
|
||||
params = append(params, querys.EndTime)
|
||||
}
|
||||
// MySQL8支持的
|
||||
// if querys.RecordType != "" {
|
||||
// recordTypes := strings.Split(querys.RecordType, ",")
|
||||
// placeholder := repo.KeyPlaceholderByQuery(len(recordTypes))
|
||||
// conditions = append(conditions, fmt.Sprintf("JSON_EXTRACT(cdr_json, '$.recordType') in (%s)", placeholder))
|
||||
// for _, recordType := range recordTypes {
|
||||
// params = append(params, recordType)
|
||||
// }
|
||||
// }
|
||||
// Mariadb不支持json in查询改or
|
||||
if querys.RecordType != "" {
|
||||
recordTypes := strings.Split(querys.RecordType, ",")
|
||||
var queryStrArr []string
|
||||
for _, recordType := range recordTypes {
|
||||
queryStrArr = append(queryStrArr, "JSON_EXTRACT(cdr_json, '$.recordType') = ?")
|
||||
params = append(params, recordType)
|
||||
}
|
||||
conditions = append(conditions, fmt.Sprintf("( %s )", strings.Join(queryStrArr, " OR ")))
|
||||
}
|
||||
|
||||
// 构建查询条件语句
|
||||
whereSql := ""
|
||||
if len(conditions) > 0 {
|
||||
whereSql += " where " + strings.Join(conditions, " and ")
|
||||
}
|
||||
|
||||
result := map[string]any{
|
||||
"total": 0,
|
||||
"rows": []model.CDREventSMSC{},
|
||||
}
|
||||
|
||||
// 查询数量 长度为0直接返回
|
||||
totalSql := "select count(1) as 'total' from cdr_event_smsc"
|
||||
totalRows, err := datasource.RawDB("", totalSql+whereSql, params)
|
||||
if err != nil {
|
||||
logger.Errorf("total err => %v", err)
|
||||
return result
|
||||
}
|
||||
total := parse.Number(totalRows[0]["total"])
|
||||
if total == 0 {
|
||||
return result
|
||||
} else {
|
||||
result["total"] = total
|
||||
}
|
||||
|
||||
// 分页
|
||||
pageNum, pageSize := repo.PageNumSize(querys.PageNum, querys.PageSize)
|
||||
pageSql := " limit ?,? "
|
||||
params = append(params, pageNum*pageSize)
|
||||
params = append(params, pageSize)
|
||||
|
||||
// 排序
|
||||
orderSql := ""
|
||||
if querys.SortField != "" {
|
||||
sortSql := querys.SortField
|
||||
if querys.SortOrder != "" {
|
||||
if querys.SortOrder == "desc" {
|
||||
sortSql += " desc "
|
||||
} else {
|
||||
sortSql += " asc "
|
||||
}
|
||||
}
|
||||
orderSql = fmt.Sprintf(" order by id desc, %s ", sortSql)
|
||||
}
|
||||
|
||||
// 查询数据
|
||||
querySql := r.selectSql + whereSql + orderSql + pageSql
|
||||
results, err := datasource.RawDB("", querySql, params)
|
||||
if err != nil {
|
||||
logger.Errorf("query err => %v", err)
|
||||
}
|
||||
|
||||
// 转换实体
|
||||
result["rows"] = r.convertResultRows(results)
|
||||
return result
|
||||
}
|
||||
|
||||
// SelectByIds 通过ID查询
|
||||
func (r *CDREventSMSCImpl) SelectByIds(cdrIds []string) []model.CDREventSMSC {
|
||||
placeholder := repo.KeyPlaceholderByQuery(len(cdrIds))
|
||||
querySql := r.selectSql + " where id in (" + placeholder + ")"
|
||||
parameters := repo.ConvertIdsSlice(cdrIds)
|
||||
results, err := datasource.RawDB("", querySql, parameters)
|
||||
if err != nil {
|
||||
logger.Errorf("query err => %v", err)
|
||||
return []model.CDREventSMSC{}
|
||||
}
|
||||
// 转换实体
|
||||
return r.convertResultRows(results)
|
||||
}
|
||||
|
||||
// DeleteByIds 批量删除信息
|
||||
func (r *CDREventSMSCImpl) DeleteByIds(cdrIds []string) int64 {
|
||||
placeholder := repo.KeyPlaceholderByQuery(len(cdrIds))
|
||||
sql := "delete from cdr_event_smsc where id in (" + placeholder + ")"
|
||||
parameters := repo.ConvertIdsSlice(cdrIds)
|
||||
results, err := datasource.ExecDB("", sql, parameters)
|
||||
if err != nil {
|
||||
logger.Errorf("delete err => %v", err)
|
||||
return 0
|
||||
}
|
||||
return results
|
||||
}
|
||||
12
src/modules/network_data/service/cdr_event_smsc.go
Normal file
12
src/modules/network_data/service/cdr_event_smsc.go
Normal file
@@ -0,0 +1,12 @@
|
||||
package service
|
||||
|
||||
import "be.ems/src/modules/network_data/model"
|
||||
|
||||
// CDR会话事件SMSC 服务层接口
|
||||
type ICDREventSMSC interface {
|
||||
// SelectPage 根据条件分页查询
|
||||
SelectPage(querys model.CDREventSMSCQuery) map[string]any
|
||||
|
||||
// DeleteByIds 批量删除信息
|
||||
DeleteByIds(cdrIds []string) (int64, error)
|
||||
}
|
||||
37
src/modules/network_data/service/cdr_event_smsc.impl.go
Normal file
37
src/modules/network_data/service/cdr_event_smsc.impl.go
Normal file
@@ -0,0 +1,37 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"be.ems/src/modules/network_data/model"
|
||||
"be.ems/src/modules/network_data/repository"
|
||||
)
|
||||
|
||||
var NewCDREventSMSCImpl = &CDREventSMSCImpl{
|
||||
cdrEventRepository: repository.NewCDREventSMSCImpl,
|
||||
}
|
||||
|
||||
type CDREventSMSCImpl struct {
|
||||
// CDR会话事件数据信息
|
||||
cdrEventRepository repository.ICDREventSMSC
|
||||
}
|
||||
|
||||
func (r *CDREventSMSCImpl) SelectPage(querys model.CDREventSMSCQuery) map[string]any {
|
||||
return r.cdrEventRepository.SelectPage(querys)
|
||||
}
|
||||
|
||||
// DeleteByIds 批量删除信息
|
||||
func (r *CDREventSMSCImpl) DeleteByIds(cdrIds []string) (int64, error) {
|
||||
// 检查是否存在
|
||||
ids := r.cdrEventRepository.SelectByIds(cdrIds)
|
||||
if len(ids) <= 0 {
|
||||
return 0, fmt.Errorf("not data")
|
||||
}
|
||||
|
||||
if len(ids) == len(cdrIds) {
|
||||
rows := r.cdrEventRepository.DeleteByIds(cdrIds)
|
||||
return rows, nil
|
||||
}
|
||||
// 删除信息失败!
|
||||
return 0, fmt.Errorf("delete fail")
|
||||
}
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
// 加密
|
||||
// UDM K4加密
|
||||
func encrypt(origData, key []byte) ([]byte, error) {
|
||||
if len(origData) < 1 || len(key) < 1 {
|
||||
return nil, errors.New("wrong data or key")
|
||||
@@ -36,11 +36,11 @@ func TestEncrypt(t *testing.T) {
|
||||
// ki := []byte{0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef}
|
||||
// 0123456789abcdef0123456789abcdef
|
||||
|
||||
// 密码
|
||||
// k4 password
|
||||
key := []byte{0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34}
|
||||
// 1234123412341234
|
||||
|
||||
// 要加密的ki
|
||||
// k4 crypt ki
|
||||
ki := []byte{0x80, 0x5D, 0xAD, 0xC6, 0xE8, 0xA5, 0x4A, 0x0D, 0x59, 0xD6, 0x22, 0xC7, 0xA0, 0x4D, 0x08, 0xE0}
|
||||
// 805DADC6E8A54A0D59D622C7A04D08E0
|
||||
|
||||
@@ -54,7 +54,7 @@ func NeConfigOMC(neInfo model.NeInfo) (map[string]any, error) {
|
||||
func NeConfigInfo(neInfo model.NeInfo, paramName string) (map[string]any, error) {
|
||||
// 网元配置对端网管信息
|
||||
neUrl := fmt.Sprintf("http://%s:%d/api/rest/systemManagement/v1/elementType/%s/objectType/config/%s", neInfo.IP, neInfo.Port, strings.ToLower(neInfo.NeType), paramName)
|
||||
resBytes, err := fetch.Get(neUrl, nil, 1000)
|
||||
resBytes, err := fetch.Get(neUrl, nil, 60_000)
|
||||
if err != nil {
|
||||
logger.Warnf("NeConfigInfo Get \"%s\"", neUrl)
|
||||
logger.Errorf("NeConfigInfo %s", err.Error())
|
||||
|
||||
230
src/modules/network_element/ne_config_test.go
Normal file
230
src/modules/network_element/ne_config_test.go
Normal file
@@ -0,0 +1,230 @@
|
||||
package networkelement
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"be.ems/src/modules/network_element/model"
|
||||
"gopkg.in/yaml.v3"
|
||||
"gorm.io/driver/mysql"
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
// 数据库
|
||||
DbHost = "192.168.8.58"
|
||||
DbPort = 33066
|
||||
DbUser = "root"
|
||||
DbPassswd = "1000omc@kp!"
|
||||
DbName = "omc_db"
|
||||
// 配置文件路径
|
||||
configParamDir = "../../../config/param"
|
||||
// configParamFile = "*" // 目录下全部更新
|
||||
configParamFile = "upf_param_config.yaml" // 单文件更新
|
||||
)
|
||||
|
||||
func TestEncrypt(t *testing.T) {
|
||||
fileNameList, err := getDirFileNameList(configParamDir)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
return
|
||||
}
|
||||
|
||||
if configParamFile == "*" {
|
||||
for _, v := range fileNameList {
|
||||
params := parseData(filepath.Join(configParamDir, v))
|
||||
if params == nil {
|
||||
return
|
||||
}
|
||||
saveData(params)
|
||||
}
|
||||
} else {
|
||||
params := parseData(filepath.Join(configParamDir, configParamFile))
|
||||
if params == nil {
|
||||
return
|
||||
}
|
||||
saveData(params)
|
||||
}
|
||||
}
|
||||
|
||||
// ========= Main =============
|
||||
|
||||
// parseData 文件转map数据
|
||||
func parseData(filePaht string) []map[string]string {
|
||||
data, err := parseStrToMap(filePaht)
|
||||
if err != nil {
|
||||
log.Printf("parseStrToMap => %s", err.Error())
|
||||
return nil
|
||||
}
|
||||
params, err := parseParamConfig(data)
|
||||
if err != nil {
|
||||
log.Printf("parseParamConfig => %s", err.Error())
|
||||
return nil
|
||||
}
|
||||
return params
|
||||
}
|
||||
|
||||
// saveData 保存数据
|
||||
func saveData(params []map[string]string) {
|
||||
// 定义排序函数
|
||||
sort.Slice(params, func(i, j int) bool {
|
||||
paramSortI := params[i]["paramSort"]
|
||||
if len(paramSortI) == 0 || paramSortI == "" {
|
||||
paramSortI = "0"
|
||||
}
|
||||
paramSortJ := params[j]["paramSort"]
|
||||
if len(paramSortJ) == 0 || paramSortJ == "" {
|
||||
paramSortJ = "0"
|
||||
}
|
||||
// 将 age 字段转换为整数进行比较
|
||||
si, _ := strconv.Atoi(paramSortI)
|
||||
sj, _ := strconv.Atoi(paramSortJ)
|
||||
return si < sj
|
||||
})
|
||||
// 遍历插入
|
||||
for _, v := range params {
|
||||
paramSort := v["paramSort"]
|
||||
if len(paramSort) == 0 || paramSort == "" {
|
||||
paramSort = "0"
|
||||
}
|
||||
sort, err := strconv.ParseInt(paramSort, 10, 64)
|
||||
if err != nil {
|
||||
sort = 0
|
||||
}
|
||||
|
||||
neConfig := model.NeConfig{
|
||||
NeType: v["neType"],
|
||||
ParamName: v["paramName"],
|
||||
ParamDisplay: v["paramDisplay"],
|
||||
ParamType: v["paramType"],
|
||||
ParamJson: v["paramJson"],
|
||||
ParamPerms: v["paramPerms"],
|
||||
ParamSort: sort,
|
||||
}
|
||||
neConfig.ID = saveDB(neConfig)
|
||||
log.Println(neConfig.ID, neConfig.NeType, neConfig.ParamDisplay)
|
||||
}
|
||||
}
|
||||
|
||||
// ========= DB =============
|
||||
|
||||
var gdb *gorm.DB
|
||||
|
||||
// connDB 连接到数据库
|
||||
func connDB() *gorm.DB {
|
||||
if gdb != nil {
|
||||
return gdb
|
||||
}
|
||||
|
||||
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local", DbUser, DbPassswd, DbHost, DbPort, DbName)
|
||||
newLogger := logger.New(
|
||||
log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer
|
||||
logger.Config{
|
||||
SlowThreshold: time.Minute, // Slow SQL threshold
|
||||
LogLevel: logger.Error, // Log level
|
||||
IgnoreRecordNotFoundError: true, // Ignore ErrRecordNotFound error for logger
|
||||
ParameterizedQueries: true, // Don't include params in the SQL log
|
||||
Colorful: false, // Disable color
|
||||
},
|
||||
)
|
||||
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{Logger: newLogger})
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
gdb = db
|
||||
return gdb
|
||||
}
|
||||
|
||||
// saveDB 表插入或更新
|
||||
func saveDB(s model.NeConfig) string {
|
||||
db := connDB()
|
||||
// 检查是否存在
|
||||
var id string
|
||||
db.Raw("SELECT id FROM ne_config WHERE ne_type = ? AND param_name = ?", s.NeType, s.ParamName).Scan(&id)
|
||||
// 更新时间
|
||||
s.UpdateTime = time.Now().UnixMilli()
|
||||
if id != "" {
|
||||
s.ID = id
|
||||
db.Save(s)
|
||||
} else {
|
||||
db.Create(s)
|
||||
}
|
||||
return s.ID
|
||||
}
|
||||
|
||||
// ========= Utils =============
|
||||
|
||||
// getDirFileNameList 获取文件目录下所有文件名称,不含目录名称
|
||||
func getDirFileNameList(dirPath string) ([]string, error) {
|
||||
fileNames := []string{}
|
||||
|
||||
dir, err := os.Open(dirPath)
|
||||
if err != nil {
|
||||
return fileNames, nil
|
||||
}
|
||||
defer dir.Close()
|
||||
|
||||
fileInfos, err := dir.Readdir(-1)
|
||||
if err != nil {
|
||||
return fileNames, err
|
||||
}
|
||||
|
||||
for _, fileInfo := range fileInfos {
|
||||
if fileInfo.Mode().IsRegular() {
|
||||
fileNames = append(fileNames, fileInfo.Name())
|
||||
}
|
||||
}
|
||||
|
||||
return fileNames, nil
|
||||
}
|
||||
|
||||
// parseStrToMap 解析内容string到map
|
||||
func parseStrToMap(filePath string) (map[string]any, error) {
|
||||
// 读取文件内容
|
||||
bytes, err := os.ReadFile(filePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
content := string(bytes)
|
||||
var configMap map[string]any
|
||||
err = yaml.Unmarshal([]byte(content), &configMap)
|
||||
|
||||
return configMap, err
|
||||
}
|
||||
|
||||
// parseParamConfig 解析内容文件数据
|
||||
func parseParamConfig(data map[string]any) ([]map[string]string, error) {
|
||||
paramMapArr := make([]map[string]string, 0)
|
||||
for k, v := range data {
|
||||
for ik, iv := range v.(map[string]any) {
|
||||
itemMap := make(map[string]string)
|
||||
itemMap["neType"] = strings.ToUpper(k)
|
||||
itemMap["paramName"] = ik
|
||||
for iik, iiv := range iv.(map[string]any) {
|
||||
switch iik {
|
||||
case "display":
|
||||
itemMap["paramDisplay"] = iiv.(string)
|
||||
case "sort":
|
||||
itemMap["paramSort"] = fmt.Sprint(iiv)
|
||||
case "perms", "method":
|
||||
itemMap["paramPerms"] = iiv.(string)
|
||||
case "data", "list", "array":
|
||||
itemMap["paramType"] = iik
|
||||
strByte, _ := json.Marshal(iiv)
|
||||
itemMap["paramJson"] = string(strByte)
|
||||
}
|
||||
}
|
||||
paramMapArr = append(paramMapArr, itemMap)
|
||||
}
|
||||
}
|
||||
return paramMapArr, nil
|
||||
}
|
||||
@@ -119,6 +119,8 @@ func (r *NeConfigBackupImpl) NeConfigLocalToNe(neInfo model.NeInfo, localFile st
|
||||
sshClient.RunCMD(fmt.Sprintf("sudo mkdir -p /usr/local/etc/rtproxy && sudo cp -rf %s/rtproxy/* /usr/local/etc/rtproxy && sudo chmod 755 /usr/local/etc/rtproxy/rtproxy.conf", neDirTemp))
|
||||
// iwf目录
|
||||
sshClient.RunCMD(fmt.Sprintf("sudo mkdir -p /usr/local/etc/iwf && sudo cp -rf %s/iwf/* /usr/local/etc/iwf && sudo chmod 755 /usr/local/etc/iwf/*.yaml", neDirTemp))
|
||||
} else if neTypeLower == "omc" {
|
||||
sshClient.RunCMD(fmt.Sprintf("sudo mkdir -p /usr/local/omc/etc && sudo cp -rf %s/* /usr/local/omc/etc && sudo chmod 755 /usr/local/omc/etc/*.{yaml,conf}", neDirTemp))
|
||||
} else {
|
||||
neEtcPath := fmt.Sprintf("/usr/local/etc/%s", neTypeLower)
|
||||
chmodFile := fmt.Sprintf("sudo chmod 755 %s/*.yaml", neEtcPath)
|
||||
@@ -172,6 +174,8 @@ func (r *NeConfigBackupImpl) NeConfigNeToLocal(neInfo model.NeInfo) (string, err
|
||||
sshClient.RunCMD(fmt.Sprintf("mkdir -p %s/rtproxy && sudo cp -rf /usr/local/etc/rtproxy/rtproxy.conf %s/rtproxy", neDirTemp, neDirTemp))
|
||||
// iwf目录
|
||||
sshClient.RunCMD(fmt.Sprintf("mkdir -p %s/iwf && sudo cp -rf /usr/local/etc/iwf/*.yaml %s/iwf", neDirTemp, neDirTemp))
|
||||
} else if neTypeLower == "omc" {
|
||||
sshClient.RunCMD(fmt.Sprintf("mkdir -p %s && sudo cp -rf /usr/local/omc/etc/*.{yaml,conf} %s", neDirTemp, neDirTemp))
|
||||
} else {
|
||||
nePath := fmt.Sprintf("/usr/local/etc/%s/*.yaml", neTypeLower)
|
||||
if neTypeLower == "mme" {
|
||||
|
||||
@@ -563,6 +563,7 @@ func (r *NeInfoImpl) NeConfOAMSync(neInfo model.NeInfo, content map[string]any,
|
||||
"pvFlag": neInfo.PvFlag,
|
||||
}
|
||||
|
||||
// 公共参数指定的OMC
|
||||
if omcIP, ok := r.Para5GData["OMC_IP"]; ok && omcIP != "" {
|
||||
if strings.Contains(omcIP, ":") {
|
||||
item["ipType"] = "ipv6"
|
||||
@@ -574,6 +575,17 @@ func (r *NeInfoImpl) NeConfOAMSync(neInfo model.NeInfo, content map[string]any,
|
||||
}
|
||||
}
|
||||
|
||||
if v, ok := content["omcIP"]; ok && v != "" && v != nil {
|
||||
omcIP := v.(string)
|
||||
if strings.Contains(omcIP, ":") {
|
||||
item["ipType"] = "ipv6"
|
||||
item["ipv6"] = omcIP
|
||||
}
|
||||
if strings.Contains(omcIP, ".") {
|
||||
item["ipType"] = "ipv4"
|
||||
item["ipv4"] = omcIP
|
||||
}
|
||||
}
|
||||
if oamEnable, ok := content["oamEnable"]; ok && oamEnable != nil {
|
||||
item["enable"] = parse.Boolean(oamEnable)
|
||||
}
|
||||
|
||||
@@ -228,7 +228,7 @@ func (s *SysConfigController) Export(c *gin.Context) {
|
||||
// 查询结果,根据查询条件结果,单页最大值限制
|
||||
querys := ctx.BodyJSONMap(c)
|
||||
querys["pageNum"] = 1
|
||||
querys["pageSize"] = 1000
|
||||
querys["pageSize"] = 10000
|
||||
data := s.sysConfigService.SelectConfigPage(querys)
|
||||
if parse.Number(data["total"]) == 0 {
|
||||
// 导出数据记录为空
|
||||
|
||||
@@ -241,16 +241,18 @@ func (s *SysDictDataController) DictType(c *gin.Context) {
|
||||
func (s *SysDictDataController) Export(c *gin.Context) {
|
||||
language := ctx.AcceptLanguage(c)
|
||||
// 查询结果,根据查询条件结果,单页最大值限制
|
||||
// querys := ctx.BodyJSONMap(c)
|
||||
// data := s.sysDictDataService.SelectDictDataPage(querys)
|
||||
// if data["total"].(int64) == 0 {
|
||||
// // 导出数据记录为空
|
||||
// c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.exportEmpty")))
|
||||
// return
|
||||
// }
|
||||
// rows := data["rows"].([]model.SysDictData)
|
||||
querys := ctx.BodyJSONMap(c)
|
||||
querys["pageNum"] = 1
|
||||
querys["pageSize"] = 10000
|
||||
data := s.sysDictDataService.SelectDictDataPage(querys)
|
||||
if parse.Number(data["total"]) == 0 {
|
||||
// 导出数据记录为空
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.exportEmpty")))
|
||||
return
|
||||
}
|
||||
rows := data["rows"].([]model.SysDictData)
|
||||
|
||||
rows := s.sysDictDataService.SelectDictDataList(model.SysDictData{})
|
||||
// rows := s.sysDictDataService.SelectDictDataList(model.SysDictData{})
|
||||
if len(rows) <= 0 {
|
||||
// 导出数据记录为空
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.exportEmpty")))
|
||||
|
||||
@@ -244,16 +244,18 @@ func (s *SysDictTypeController) DictOptionselect(c *gin.Context) {
|
||||
func (s *SysDictTypeController) Export(c *gin.Context) {
|
||||
language := ctx.AcceptLanguage(c)
|
||||
// 查询结果,根据查询条件结果,单页最大值限制
|
||||
// querys := ctx.BodyJSONMap(c)
|
||||
// data := s.sysDictTypeService.SelectDictTypePage(querys)
|
||||
// if data["total"].(int64) == 0 {
|
||||
// // 导出数据记录为空
|
||||
// c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.exportEmpty")))
|
||||
// return
|
||||
// }
|
||||
// rows := data["rows"].([]model.SysDictType)
|
||||
querys := ctx.BodyJSONMap(c)
|
||||
querys["pageNum"] = 1
|
||||
querys["pageSize"] = 10000
|
||||
data := s.sysDictTypeService.SelectDictTypePage(querys)
|
||||
if parse.Number(data["total"]) == 0 {
|
||||
// 导出数据记录为空
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.exportEmpty")))
|
||||
return
|
||||
}
|
||||
rows := data["rows"].([]model.SysDictType)
|
||||
|
||||
rows := s.sysDictTypeService.SelectDictTypeList(model.SysDictType{})
|
||||
// rows := s.sysDictTypeService.SelectDictTypeList(model.SysDictType{})
|
||||
if len(rows) <= 0 {
|
||||
// 导出数据记录为空
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.exportEmpty")))
|
||||
|
||||
@@ -122,16 +122,19 @@ func (s *SysLogLoginController) Unlock(c *gin.Context) {
|
||||
func (s *SysLogLoginController) Export(c *gin.Context) {
|
||||
language := ctx.AcceptLanguage(c)
|
||||
// 查询结果,根据查询条件结果,单页最大值限制
|
||||
// querys := ctx.BodyJSONMap(c)
|
||||
// data := s.sysLogLoginService.SelectSysLogLoginPage(querys)
|
||||
// if data["total"].(int64) == 0 {
|
||||
// // 导出数据记录为空
|
||||
// c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.exportEmpty")))
|
||||
// return
|
||||
// }
|
||||
// rows := data["rows"].([]model.SysLogLogin)
|
||||
querys := ctx.BodyJSONMap(c)
|
||||
querys["pageNum"] = 1
|
||||
querys["pageSize"] = 10000
|
||||
dataScopeSQL := ctx.LoginUserToDataScopeSQL(c, "d", "u")
|
||||
data := s.sysLogLoginService.SelectSysLogLoginPage(querys, dataScopeSQL)
|
||||
if parse.Number(data["total"]) == 0 {
|
||||
// 导出数据记录为空
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.exportEmpty")))
|
||||
return
|
||||
}
|
||||
rows := data["rows"].([]model.SysLogLogin)
|
||||
|
||||
rows := s.sysLogLoginService.SelectSysLogLoginList(model.SysLogLogin{})
|
||||
// rows := s.sysLogLoginService.SelectSysLogLoginList(model.SysLogLogin{})
|
||||
if len(rows) <= 0 {
|
||||
// 导出数据记录为空
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.exportEmpty")))
|
||||
|
||||
@@ -110,16 +110,19 @@ func (s *SysLogOperateController) Clean(c *gin.Context) {
|
||||
func (s *SysLogOperateController) Export(c *gin.Context) {
|
||||
language := ctx.AcceptLanguage(c)
|
||||
// 查询结果,根据查询条件结果,单页最大值限制
|
||||
// querys := ctx.BodyJSONMap(c)
|
||||
// data := s.SysLogOperateService.SelectSysLogOperatePage(querys)
|
||||
// if data["total"].(int64) == 0 {
|
||||
// // 导出数据记录为空
|
||||
// c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.exportEmpty")))
|
||||
// return
|
||||
// }
|
||||
// rows := data["rows"].([]model.SysLogOperate)
|
||||
querys := ctx.BodyJSONMap(c)
|
||||
querys["pageNum"] = 1
|
||||
querys["pageSize"] = 10000
|
||||
dataScopeSQL := ctx.LoginUserToDataScopeSQL(c, "d", "u")
|
||||
data := s.SysLogOperateService.SelectSysLogOperatePage(querys, dataScopeSQL)
|
||||
if parse.Number(data["total"]) == 0 {
|
||||
// 导出数据记录为空
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.exportEmpty")))
|
||||
return
|
||||
}
|
||||
rows := data["rows"].([]model.SysLogOperate)
|
||||
|
||||
rows := s.SysLogOperateService.SelectSysLogOperateList(model.SysLogOperate{})
|
||||
// rows := s.SysLogOperateService.SelectSysLogOperateList(model.SysLogOperate{})
|
||||
if len(rows) <= 0 {
|
||||
// 导出数据记录为空
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.exportEmpty")))
|
||||
|
||||
@@ -210,13 +210,11 @@ func (s *SysPostController) Remove(c *gin.Context) {
|
||||
func (s *SysPostController) Export(c *gin.Context) {
|
||||
language := ctx.AcceptLanguage(c)
|
||||
// 查询结果,根据查询条件结果,单页最大值限制
|
||||
// querys := ctx.BodyJSONMap(c)
|
||||
querys := map[string]any{
|
||||
"pageNum": 1,
|
||||
"pageSize": 1000,
|
||||
}
|
||||
querys := ctx.BodyJSONMap(c)
|
||||
querys["pageNum"] = 1
|
||||
querys["pageSize"] = 10000
|
||||
data := s.sysPostService.SelectPostPage(querys)
|
||||
if data["total"].(int64) == 0 {
|
||||
if parse.Number(data["total"]) == 0 {
|
||||
// 导出数据记录为空
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.exportEmpty")))
|
||||
return
|
||||
|
||||
@@ -413,14 +413,12 @@ func (s *SysRoleController) AuthUserChecked(c *gin.Context) {
|
||||
func (s *SysRoleController) Export(c *gin.Context) {
|
||||
language := ctx.AcceptLanguage(c)
|
||||
// 查询结果,根据查询条件结果,单页最大值限制
|
||||
// querys := ctx.BodyJSONMap(c)
|
||||
querys := map[string]any{
|
||||
"pageNum": 1,
|
||||
"pageSize": 1000,
|
||||
}
|
||||
querys := ctx.BodyJSONMap(c)
|
||||
querys["pageNum"] = 1
|
||||
querys["pageSize"] = 10000
|
||||
dataScopeSQL := ctx.LoginUserToDataScopeSQL(c, "d", "")
|
||||
data := s.sysRoleService.SelectRolePage(querys, dataScopeSQL)
|
||||
if data["total"].(int64) == 0 {
|
||||
if parse.Number(data["total"]) == 0 {
|
||||
// 导出数据记录为空
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.exportEmpty")))
|
||||
return
|
||||
|
||||
@@ -473,14 +473,12 @@ func (s *SysUserController) Status(c *gin.Context) {
|
||||
func (s *SysUserController) Export(c *gin.Context) {
|
||||
language := ctx.AcceptLanguage(c)
|
||||
// 查询结果,根据查询条件结果,单页最大值限制
|
||||
// querys := ctx.BodyJSONMap(c)
|
||||
querys := map[string]any{
|
||||
"pageNum": 1,
|
||||
"pageSize": 1000,
|
||||
}
|
||||
querys := ctx.BodyJSONMap(c)
|
||||
querys["pageNum"] = 1
|
||||
querys["pageSize"] = 10000
|
||||
dataScopeSQL := ctx.LoginUserToDataScopeSQL(c, "d", "u")
|
||||
data := s.sysUserService.SelectUserPage(querys, dataScopeSQL)
|
||||
if data["total"].(int64) == 0 {
|
||||
if parse.Number(data["total"]) == 0 {
|
||||
// 导出数据记录为空
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.exportEmpty")))
|
||||
return
|
||||
|
||||
@@ -3,7 +3,6 @@ package controller
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -20,14 +19,15 @@ import (
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// 实例化控制层 WSController 结构体
|
||||
// NewWSController 实例化控制层 WSController 结构体
|
||||
var NewWSController = &WSController{
|
||||
wsService: service.NewWSImpl,
|
||||
wsSendService: service.NewWSSendImpl,
|
||||
neHostService: neService.NewNeHostImpl,
|
||||
neInfoService: neService.NewNeInfoImpl,
|
||||
}
|
||||
|
||||
// WebSocket通信
|
||||
// WSController WebSocket通信
|
||||
//
|
||||
// PATH /ws
|
||||
type WSController struct {
|
||||
@@ -37,9 +37,11 @@ type WSController struct {
|
||||
wsSendService service.IWSSend
|
||||
// 网元主机连接服务
|
||||
neHostService neService.INeHost
|
||||
// 网元信息服务
|
||||
neInfoService neService.INeInfo
|
||||
}
|
||||
|
||||
// 通用
|
||||
// WS 通用
|
||||
//
|
||||
// GET /?subGroupIDs=0
|
||||
func (s *WSController) WS(c *gin.Context) {
|
||||
@@ -71,19 +73,21 @@ func (s *WSController) WS(c *gin.Context) {
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
wsClient := s.wsService.NewClient(loginUser.UserID, subGroupIDs, conn, nil)
|
||||
wsClient := s.wsService.ClientCreate(loginUser.UserID, subGroupIDs, conn, nil)
|
||||
go s.wsService.ClientWriteListen(wsClient)
|
||||
go s.wsService.ClientReadListen(wsClient, service.ReceiveCommont)
|
||||
|
||||
// 等待停止信号
|
||||
for value := range wsClient.StopChan {
|
||||
s.wsService.CloseClient(wsClient.ID)
|
||||
s.wsService.ClientClose(wsClient.ID)
|
||||
logger.Infof("ws Stop Client UID %s %s", wsClient.BindUid, value)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// 测试
|
||||
// Test 测试
|
||||
//
|
||||
// GET /test?clientId=&groupID=
|
||||
// GET /test?clientId=xxx&groupID=xxx
|
||||
func (s *WSController) Test(c *gin.Context) {
|
||||
language := ctx.AcceptLanguage(c)
|
||||
|
||||
@@ -115,11 +119,26 @@ func (s *WSController) Test(c *gin.Context) {
|
||||
c.JSON(200, result.OkData(errMsgArr))
|
||||
}
|
||||
|
||||
// SSH终端
|
||||
// SSH 终端
|
||||
//
|
||||
// GET /ssh?hostId=1&cols=80&rows=40
|
||||
func (s *WSController) SSH(c *gin.Context) {
|
||||
language := ctx.AcceptLanguage(c)
|
||||
var query struct {
|
||||
HostId string `form:"hostId" binding:"required"` // 连接主机ID
|
||||
Cols int `form:"cols"` // 终端单行字符数
|
||||
Rows int `form:"rows"` // 终端显示行数
|
||||
}
|
||||
if err := c.ShouldBindQuery(&query); err != nil {
|
||||
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
|
||||
return
|
||||
}
|
||||
if query.Cols < 80 || query.Cols > 400 {
|
||||
query.Cols = 80
|
||||
}
|
||||
if query.Rows < 40 || query.Rows > 1200 {
|
||||
query.Rows = 40
|
||||
}
|
||||
|
||||
// 登录用户信息
|
||||
loginUser, err := ctx.LoginUser(c)
|
||||
@@ -128,14 +147,8 @@ func (s *WSController) SSH(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// 连接主机ID
|
||||
hostId := c.Query("hostId")
|
||||
if hostId == "" {
|
||||
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
|
||||
return
|
||||
}
|
||||
neHost := s.neHostService.SelectById(hostId)
|
||||
if neHost.HostID != hostId || neHost.HostType != "ssh" {
|
||||
neHost := s.neHostService.SelectById(query.HostId)
|
||||
if neHost.HostID != query.HostId || neHost.HostType != "ssh" {
|
||||
// 没有可访问主机信息数据!
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.noData")))
|
||||
return
|
||||
@@ -158,19 +171,8 @@ func (s *WSController) SSH(c *gin.Context) {
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
// 终端单行字符数
|
||||
cols, err := strconv.Atoi(c.Query("cols"))
|
||||
if err != nil {
|
||||
cols = 80
|
||||
}
|
||||
// 终端显示行数
|
||||
rows, err := strconv.Atoi(c.Query("rows"))
|
||||
if err != nil {
|
||||
rows = 40
|
||||
}
|
||||
|
||||
// 创建SSH客户端会话
|
||||
clientSession, err := client.NewClientSession(cols, rows)
|
||||
clientSession, err := client.NewClientSession(query.Cols, query.Rows)
|
||||
if err != nil {
|
||||
// 连接主机失败,请检查连接参数后重试
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.errByHostInfo")))
|
||||
@@ -185,18 +187,21 @@ func (s *WSController) SSH(c *gin.Context) {
|
||||
}
|
||||
defer wsConn.Close()
|
||||
|
||||
wsClient := s.wsService.NewClient(loginUser.UserID, nil, wsConn, clientSession)
|
||||
wsClient := s.wsService.ClientCreate(loginUser.UserID, nil, wsConn, clientSession)
|
||||
go s.wsService.ClientWriteListen(wsClient)
|
||||
go s.wsService.ClientReadListen(wsClient, service.ReceiveShell)
|
||||
|
||||
// 实时读取SSH消息直接输出
|
||||
msTicker := time.NewTicker(100 * time.Millisecond)
|
||||
defer msTicker.Stop()
|
||||
go func() {
|
||||
for ms := range msTicker.C {
|
||||
for {
|
||||
select {
|
||||
case ms := <-msTicker.C:
|
||||
outputByte := clientSession.Read()
|
||||
if len(outputByte) > 0 {
|
||||
outputStr := string(outputByte)
|
||||
msgByte, _ := json.Marshal(result.Ok(map[string]any{
|
||||
"requestId": fmt.Sprintf("ssh_%s_%d", hostId, ms.UnixMilli()),
|
||||
"requestId": fmt.Sprintf("ssh_%s_%d", neHost.HostID, ms.UnixMilli()),
|
||||
"data": outputStr,
|
||||
}))
|
||||
wsClient.MsgChan <- msgByte
|
||||
@@ -208,22 +213,34 @@ func (s *WSController) SSH(c *gin.Context) {
|
||||
// return
|
||||
// }
|
||||
}
|
||||
case <-wsClient.StopChan: // 等待停止信号
|
||||
s.wsService.ClientClose(wsClient.ID)
|
||||
logger.Infof("ws Stop Client UID %s", wsClient.BindUid)
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
// 等待停止信号
|
||||
for value := range wsClient.StopChan {
|
||||
s.wsService.CloseClient(wsClient.ID)
|
||||
logger.Infof("ws Stop Client UID %s %s", wsClient.BindUid, value)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Telnet终端
|
||||
// Telnet 终端
|
||||
//
|
||||
// GET /telnet?hostId=1
|
||||
func (s *WSController) Telnet(c *gin.Context) {
|
||||
language := ctx.AcceptLanguage(c)
|
||||
var query struct {
|
||||
HostId string `form:"hostId" binding:"required"` // 连接主机ID
|
||||
Cols int `form:"cols"` // 终端单行字符数
|
||||
Rows int `form:"rows"` // 终端显示行数
|
||||
}
|
||||
if err := c.ShouldBindQuery(&query); err != nil {
|
||||
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
|
||||
return
|
||||
}
|
||||
if query.Cols < 120 || query.Cols > 400 {
|
||||
query.Cols = 120
|
||||
}
|
||||
if query.Rows < 128 || query.Rows > 1200 {
|
||||
query.Rows = 128
|
||||
}
|
||||
|
||||
// 登录用户信息
|
||||
loginUser, err := ctx.LoginUser(c)
|
||||
@@ -232,14 +249,8 @@ func (s *WSController) Telnet(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// 连接主机ID
|
||||
hostId := c.Query("hostId")
|
||||
if hostId == "" {
|
||||
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
|
||||
return
|
||||
}
|
||||
neHost := s.neHostService.SelectById(hostId)
|
||||
if neHost.HostID != hostId || neHost.HostType != "telnet" {
|
||||
neHost := s.neHostService.SelectById(query.HostId)
|
||||
if neHost.HostID != query.HostId || neHost.HostType != "telnet" {
|
||||
// 没有可访问主机信息数据!
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.noData")))
|
||||
return
|
||||
@@ -255,20 +266,8 @@ func (s *WSController) Telnet(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
// 终端单行字符数
|
||||
cols, err := strconv.Atoi(c.DefaultQuery("cols", "120"))
|
||||
if err != nil {
|
||||
cols = 120
|
||||
}
|
||||
// 终端显示行数
|
||||
rows, err := strconv.Atoi(c.DefaultQuery("rows", "128"))
|
||||
if err != nil {
|
||||
rows = 128
|
||||
}
|
||||
|
||||
// 创建Telnet客户端会话
|
||||
clientSession, err := client.NewClientSession(cols, rows)
|
||||
clientSession, err := client.NewClientSession(query.Cols, query.Rows)
|
||||
if err != nil {
|
||||
// 连接主机失败,请检查连接参数后重试
|
||||
c.JSON(200, result.ErrMsg(i18n.TKey(language, "neHost.errByHostInfo")))
|
||||
@@ -283,18 +282,25 @@ func (s *WSController) Telnet(c *gin.Context) {
|
||||
}
|
||||
defer wsConn.Close()
|
||||
|
||||
wsClient := s.wsService.NewClient(loginUser.UserID, nil, wsConn, clientSession)
|
||||
wsClient := s.wsService.ClientCreate(loginUser.UserID, nil, wsConn, clientSession)
|
||||
go s.wsService.ClientWriteListen(wsClient)
|
||||
go s.wsService.ClientReadListen(wsClient, service.ReceiveTelnet)
|
||||
|
||||
// 等待1秒,排空首次消息
|
||||
time.Sleep(1 * time.Second)
|
||||
_ = clientSession.Read()
|
||||
|
||||
// 实时读取Telnet消息直接输出
|
||||
msTicker := time.NewTicker(100 * time.Millisecond)
|
||||
defer msTicker.Stop()
|
||||
go func() {
|
||||
for ms := range msTicker.C {
|
||||
for {
|
||||
select {
|
||||
case ms := <-msTicker.C:
|
||||
outputByte := clientSession.Read()
|
||||
if len(outputByte) > 0 {
|
||||
outputStr := strings.TrimRight(string(outputByte), "\x00")
|
||||
msgByte, _ := json.Marshal(result.Ok(map[string]any{
|
||||
"requestId": fmt.Sprintf("telnet_%s_%d", hostId, ms.UnixMilli()),
|
||||
"requestId": fmt.Sprintf("telnet_%s_%d", neHost.HostID, ms.UnixMilli()),
|
||||
"data": outputStr,
|
||||
}))
|
||||
wsClient.MsgChan <- msgByte
|
||||
@@ -306,13 +312,92 @@ func (s *WSController) Telnet(c *gin.Context) {
|
||||
// return
|
||||
// }
|
||||
}
|
||||
case <-wsClient.StopChan: // 等待停止信号
|
||||
s.wsService.ClientClose(wsClient.ID)
|
||||
logger.Infof("ws Stop Client UID %s", wsClient.BindUid)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ShellView 终端交互式文件内容查看
|
||||
//
|
||||
// GET /view
|
||||
func (s *WSController) ShellView(c *gin.Context) {
|
||||
language := ctx.AcceptLanguage(c)
|
||||
var query struct {
|
||||
NeType string `form:"neType" binding:"required"`
|
||||
NeId string `form:"neId" binding:"required"`
|
||||
Cols int `form:"cols"` // 终端单行字符数
|
||||
Rows int `form:"rows"` // 终端显示行数
|
||||
}
|
||||
if err := c.ShouldBindQuery(&query); err != nil {
|
||||
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
|
||||
return
|
||||
}
|
||||
if query.Cols < 120 || query.Cols > 400 {
|
||||
query.Cols = 120
|
||||
}
|
||||
if query.Rows < 40 || query.Rows > 1200 {
|
||||
query.Rows = 40
|
||||
}
|
||||
|
||||
// 登录用户信息
|
||||
loginUser, err := ctx.LoginUser(c)
|
||||
if err != nil {
|
||||
c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error())))
|
||||
return
|
||||
}
|
||||
|
||||
// 网元主机的SSH客户端
|
||||
sshClient, err := s.neInfoService.NeRunSSHClient(query.NeType, query.NeId)
|
||||
if err != nil {
|
||||
c.JSON(200, result.ErrMsg(err.Error()))
|
||||
return
|
||||
}
|
||||
defer sshClient.Close()
|
||||
// ssh连接会话
|
||||
clientSession, err := sshClient.NewClientSession(query.Cols, query.Rows)
|
||||
if err != nil {
|
||||
c.JSON(200, result.ErrMsg("neinfo ssh client session new err"))
|
||||
return
|
||||
}
|
||||
defer clientSession.Close()
|
||||
|
||||
// 将 HTTP 连接升级为 WebSocket 连接
|
||||
wsConn := s.wsService.UpgraderWs(c.Writer, c.Request)
|
||||
if wsConn == nil {
|
||||
return
|
||||
}
|
||||
defer wsConn.Close()
|
||||
|
||||
wsClient := s.wsService.ClientCreate(loginUser.UserID, nil, wsConn, clientSession)
|
||||
go s.wsService.ClientWriteListen(wsClient)
|
||||
go s.wsService.ClientReadListen(wsClient, service.ReceiveShellView)
|
||||
|
||||
// 等待1秒,排空首次消息
|
||||
time.Sleep(1 * time.Second)
|
||||
_ = clientSession.Read()
|
||||
|
||||
// 实时读取SSH消息直接输出
|
||||
msTicker := time.NewTicker(100 * time.Millisecond)
|
||||
defer msTicker.Stop()
|
||||
for {
|
||||
select {
|
||||
case ms := <-msTicker.C:
|
||||
outputByte := clientSession.Read()
|
||||
if len(outputByte) > 0 {
|
||||
outputStr := string(outputByte)
|
||||
msgByte, _ := json.Marshal(result.Ok(map[string]any{
|
||||
"requestId": fmt.Sprintf("view_%d", ms.UnixMilli()),
|
||||
"data": outputStr,
|
||||
}))
|
||||
wsClient.MsgChan <- msgByte
|
||||
}
|
||||
case <-wsClient.StopChan: // 等待停止信号
|
||||
s.wsService.ClientClose(wsClient.ID)
|
||||
logger.Infof("ws Stop Client UID %s", wsClient.BindUid)
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
// 等待停止信号
|
||||
for value := range wsClient.StopChan {
|
||||
s.wsService.CloseClient(wsClient.ID)
|
||||
logger.Infof("ws Stop Client UID %s %s", wsClient.BindUid, value)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"be.ems/src/framework/vo/result"
|
||||
neDataModel "be.ems/src/modules/network_data/model"
|
||||
neDataService "be.ems/src/modules/network_data/service"
|
||||
neInfoService "be.ems/src/modules/network_element/service"
|
||||
)
|
||||
|
||||
// GetCDRConnectByIMS 获取CDR会话事件-IMS
|
||||
@@ -20,6 +21,13 @@ func GetCDRConnectByIMS(requestID string, data any) ([]byte, error) {
|
||||
return nil, fmt.Errorf("query data structure error")
|
||||
}
|
||||
|
||||
// 查询网元信息 rmUID
|
||||
neInfo := neInfoService.NewNeInfoImpl.SelectNeInfoByNeTypeAndNeID(query.NeType, query.NeID)
|
||||
if neInfo.NeId != query.NeID || neInfo.IP == "" {
|
||||
return nil, fmt.Errorf("query neinfo not found")
|
||||
}
|
||||
query.RmUID = neInfo.RmUID
|
||||
|
||||
dataMap := neDataService.NewCDREventIMSImpl.SelectPage(query)
|
||||
resultByte, err := json.Marshal(result.Ok(map[string]any{
|
||||
"requestId": requestID,
|
||||
@@ -38,6 +46,13 @@ func GetCDRConnectBySMF(requestID string, data any) ([]byte, error) {
|
||||
return nil, fmt.Errorf("query data structure error")
|
||||
}
|
||||
|
||||
// 查询网元信息 rmUID
|
||||
neInfo := neInfoService.NewNeInfoImpl.SelectNeInfoByNeTypeAndNeID(query.NeType, query.NeID)
|
||||
if neInfo.NeId != query.NeID || neInfo.IP == "" {
|
||||
return nil, fmt.Errorf("query neinfo not found")
|
||||
}
|
||||
query.RmUID = neInfo.RmUID
|
||||
|
||||
dataMap := neDataService.NewCDREventSMFImpl.SelectPage(query)
|
||||
resultByte, err := json.Marshal(result.Ok(map[string]any{
|
||||
"requestId": requestID,
|
||||
@@ -45,3 +60,28 @@ func GetCDRConnectBySMF(requestID string, data any) ([]byte, error) {
|
||||
}))
|
||||
return resultByte, err
|
||||
}
|
||||
|
||||
// GetCDRConnectBySMSC 获取CDR会话事件-SMSC
|
||||
func GetCDRConnectBySMSC(requestID string, data any) ([]byte, error) {
|
||||
msgByte, _ := json.Marshal(data)
|
||||
var query neDataModel.CDREventSMSCQuery
|
||||
err := json.Unmarshal(msgByte, &query)
|
||||
if err != nil {
|
||||
logger.Warnf("ws processor GetCDRConnect err: %s", err.Error())
|
||||
return nil, fmt.Errorf("query data structure error")
|
||||
}
|
||||
|
||||
// 查询网元信息 rmUID
|
||||
neInfo := neInfoService.NewNeInfoImpl.SelectNeInfoByNeTypeAndNeID(query.NeType, query.NeID)
|
||||
if neInfo.NeId != query.NeID || neInfo.IP == "" {
|
||||
return nil, fmt.Errorf("query neinfo not found")
|
||||
}
|
||||
query.RmUID = neInfo.RmUID
|
||||
|
||||
dataMap := neDataService.NewCDREventSMSCImpl.SelectPage(query)
|
||||
resultByte, err := json.Marshal(result.Ok(map[string]any{
|
||||
"requestId": requestID,
|
||||
"data": dataMap,
|
||||
}))
|
||||
return resultByte, err
|
||||
}
|
||||
|
||||
71
src/modules/ws/processor/shell_command.go
Normal file
71
src/modules/ws/processor/shell_command.go
Normal file
@@ -0,0 +1,71 @@
|
||||
package processor
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"be.ems/src/framework/logger"
|
||||
)
|
||||
|
||||
// ParseCat 解析拼装cat命令
|
||||
func ParseCat(reqData any) (string, error) {
|
||||
msgByte, _ := json.Marshal(reqData)
|
||||
var data struct {
|
||||
FilePath string `json:"filePath"` // 文件地址
|
||||
ShowNumber bool `json:"showNumber"` // 显示文件的行号,从 1 开始
|
||||
ShowAll bool `json:"showAll"` // 结合 -vET 参数,显示所有特殊字符,包括行尾符、制表符等
|
||||
}
|
||||
if err := json.Unmarshal(msgByte, &data); err != nil {
|
||||
logger.Warnf("ws processor ParseCat err: %s", err.Error())
|
||||
return "", fmt.Errorf("query data structure error")
|
||||
}
|
||||
if data.FilePath == "" {
|
||||
return "", fmt.Errorf("query data filePath empty")
|
||||
}
|
||||
|
||||
command := []string{"cat"}
|
||||
if data.ShowNumber {
|
||||
command = append(command, "-n")
|
||||
}
|
||||
if data.ShowAll {
|
||||
command = append(command, "-A")
|
||||
}
|
||||
|
||||
command = append(command, data.FilePath)
|
||||
command = append(command, "\n")
|
||||
return strings.Join(command, " "), nil
|
||||
}
|
||||
|
||||
// ParseTail 解析拼装tail命令
|
||||
func ParseTail(reqData any) (string, error) {
|
||||
msgByte, _ := json.Marshal(reqData)
|
||||
var data struct {
|
||||
FilePath string `json:"filePath"` // 文件地址
|
||||
Lines int `json:"lines"` // 显示文件末尾的指定行数
|
||||
Char int `json:"char"` // 显示文件末尾的指定字数
|
||||
Follow bool `json:"follow"` // 输出文件末尾的内容,并继续监视文件的新增内容
|
||||
}
|
||||
if err := json.Unmarshal(msgByte, &data); err != nil {
|
||||
logger.Warnf("ws processor ParseTail err: %s", err.Error())
|
||||
return "", fmt.Errorf("query data structure error")
|
||||
}
|
||||
if data.FilePath == "" {
|
||||
return "", fmt.Errorf("query data filePath empty")
|
||||
}
|
||||
|
||||
command := []string{"tail"}
|
||||
if data.Follow {
|
||||
command = append(command, "-f")
|
||||
}
|
||||
if data.Lines > 0 {
|
||||
command = append(command, fmt.Sprintf("-n %d", data.Lines))
|
||||
}
|
||||
if data.Char > 0 {
|
||||
command = append(command, fmt.Sprintf("-c %d", data.Char))
|
||||
}
|
||||
|
||||
command = append(command, data.FilePath)
|
||||
command = append(command, "\n")
|
||||
return strings.Join(command, " "), nil
|
||||
}
|
||||
@@ -12,14 +12,21 @@ type IWS interface {
|
||||
// UpgraderWs http升级ws请求
|
||||
UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.Conn
|
||||
|
||||
// NewClient 新建客户端
|
||||
// ClientCreate 客户端新建
|
||||
//
|
||||
// uid 登录用户ID
|
||||
// groupIDs 用户订阅组
|
||||
// conn ws连接实例
|
||||
// childConn 子连接实例
|
||||
NewClient(uid string, groupIDs []string, conn *websocket.Conn, childConn any) *model.WSClient
|
||||
ClientCreate(uid string, groupIDs []string, conn *websocket.Conn, childConn any) *model.WSClient
|
||||
|
||||
// CloseClient 关闭客户端
|
||||
CloseClient(clientID string)
|
||||
// ClientClose 客户端关闭
|
||||
ClientClose(clientID string)
|
||||
|
||||
// ClientReadListen 客户端读取消息监听
|
||||
// receiveType 根据接收类型进行消息处理
|
||||
ClientReadListen(wsClient *model.WSClient, receiveType int)
|
||||
|
||||
// ClientWriteListen 客户端写入消息监听
|
||||
ClientWriteListen(wsClient *model.WSClient)
|
||||
}
|
||||
|
||||
@@ -14,15 +14,12 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
// ws客户端 [clientId: client]
|
||||
WsClients sync.Map
|
||||
// ws用户对应的多个客户端id [uid:clientIds]
|
||||
WsUsers sync.Map
|
||||
// ws组对应的多个用户id [groupID:uids]
|
||||
WsGroup sync.Map
|
||||
wsClients sync.Map // ws客户端 [clientId: client]
|
||||
wsUsers sync.Map // ws用户对应的多个客户端id [uid:clientIds]
|
||||
wsGroup sync.Map // ws组对应的多个客户端id [groupId:clientIds]
|
||||
)
|
||||
|
||||
// 实例化服务层 WSImpl 结构体
|
||||
// NewWSImpl 实例化服务层 WSImpl 结构体
|
||||
var NewWSImpl = &WSImpl{}
|
||||
|
||||
// WSImpl WebSocket通信 服务层处理
|
||||
@@ -50,13 +47,13 @@ func (s *WSImpl) UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.C
|
||||
return conn
|
||||
}
|
||||
|
||||
// NewClient 新建客户端
|
||||
// ClientCreate 客户端新建
|
||||
//
|
||||
// uid 登录用户ID
|
||||
// groupIDs 用户订阅组
|
||||
// conn ws连接实例
|
||||
// childConn 子连接实例
|
||||
func (s *WSImpl) NewClient(uid string, groupIDs []string, conn *websocket.Conn, childConn any) *model.WSClient {
|
||||
func (s *WSImpl) ClientCreate(uid string, groupIDs []string, conn *websocket.Conn, childConn any) *model.WSClient {
|
||||
// clientID也可以用其他方式生成,只要能保证在所有服务端中都能保证唯一即可
|
||||
clientID := generate.Code(16)
|
||||
|
||||
@@ -72,122 +69,52 @@ func (s *WSImpl) NewClient(uid string, groupIDs []string, conn *websocket.Conn,
|
||||
}
|
||||
|
||||
// 存入客户端
|
||||
WsClients.Store(clientID, wsClient)
|
||||
wsClients.Store(clientID, wsClient)
|
||||
|
||||
// 存入用户持有客户端
|
||||
if uid != "" {
|
||||
if v, ok := WsUsers.Load(uid); ok {
|
||||
if v, ok := wsUsers.Load(uid); ok {
|
||||
uidClientIds := v.(*[]string)
|
||||
*uidClientIds = append(*uidClientIds, clientID)
|
||||
} else {
|
||||
WsUsers.Store(uid, &[]string{clientID})
|
||||
wsUsers.Store(uid, &[]string{clientID})
|
||||
}
|
||||
}
|
||||
|
||||
// 存入用户订阅组
|
||||
if uid != "" && len(groupIDs) > 0 {
|
||||
for _, groupID := range groupIDs {
|
||||
if v, ok := WsGroup.Load(groupID); ok {
|
||||
groupUIDs := v.(*[]string)
|
||||
// 避免同组内相同用户
|
||||
hasUid := false
|
||||
for _, uidv := range *groupUIDs {
|
||||
if uidv == uid {
|
||||
hasUid = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !hasUid {
|
||||
*groupUIDs = append(*groupUIDs, uid)
|
||||
}
|
||||
if v, ok := wsGroup.Load(groupID); ok {
|
||||
groupClientIds := v.(*[]string)
|
||||
*groupClientIds = append(*groupClientIds, clientID)
|
||||
} else {
|
||||
WsGroup.Store(groupID, &[]string{uid})
|
||||
wsGroup.Store(groupID, &[]string{clientID})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
go s.clientRead(wsClient)
|
||||
go s.clientWrite(wsClient)
|
||||
|
||||
// 发客户端id确认是否连接
|
||||
msgByte, _ := json.Marshal(result.OkData(map[string]string{
|
||||
"clientId": clientID,
|
||||
}))
|
||||
wsClient.MsgChan <- msgByte
|
||||
|
||||
return wsClient
|
||||
}
|
||||
|
||||
// clientRead 客户端读取消息
|
||||
func (s *WSImpl) clientRead(wsClient *model.WSClient) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
logger.Errorf("ws ReadMessage Panic Error: %v", err)
|
||||
}
|
||||
}()
|
||||
for {
|
||||
// 读取消息
|
||||
messageType, msg, err := wsClient.Conn.ReadMessage()
|
||||
if err != nil {
|
||||
logger.Warnf("ws ReadMessage UID %s err: %s", wsClient.BindUid, err.Error())
|
||||
s.CloseClient(wsClient.ID)
|
||||
return
|
||||
}
|
||||
// fmt.Println(messageType, string(msg))
|
||||
|
||||
// 文本和二进制类型,只处理文本json
|
||||
if messageType == websocket.TextMessage {
|
||||
var reqMsg model.WSRequest
|
||||
err := json.Unmarshal(msg, &reqMsg)
|
||||
if err != nil {
|
||||
msgByte, _ := json.Marshal(result.ErrMsg("message format not supported"))
|
||||
wsClient.MsgChan <- msgByte
|
||||
} else {
|
||||
// 协程异步处理
|
||||
go NewWSReceiveImpl.AsyncReceive(wsClient, reqMsg)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// clientWrite 客户端写入消息
|
||||
func (s *WSImpl) clientWrite(wsClient *model.WSClient) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
logger.Errorf("ws WriteMessage Panic Error: %v", err)
|
||||
}
|
||||
}()
|
||||
for msg := range wsClient.MsgChan {
|
||||
// 发送消息
|
||||
err := wsClient.Conn.WriteMessage(websocket.TextMessage, msg)
|
||||
if err != nil {
|
||||
logger.Warnf("ws WriteMessage UID %s err: %s", wsClient.BindUid, err.Error())
|
||||
s.CloseClient(wsClient.ID)
|
||||
return
|
||||
}
|
||||
wsClient.LastHeartbeat = time.Now().UnixMilli()
|
||||
}
|
||||
}
|
||||
|
||||
// CloseClient 客户端关闭
|
||||
func (s *WSImpl) CloseClient(clientID string) {
|
||||
v, ok := WsClients.Load(clientID)
|
||||
// ClientClose 客户端关闭
|
||||
func (s *WSImpl) ClientClose(clientID string) {
|
||||
v, ok := wsClients.Load(clientID)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
client := v.(*model.WSClient)
|
||||
defer func() {
|
||||
client.Conn.WriteMessage(websocket.CloseMessage, []byte{})
|
||||
client.Conn.Close()
|
||||
WsClients.Delete(clientID)
|
||||
client.MsgChan <- []byte("ws:close")
|
||||
client.StopChan <- struct{}{}
|
||||
client.Conn.Close()
|
||||
wsClients.Delete(clientID)
|
||||
}()
|
||||
|
||||
// 客户端断线时自动踢出Uid绑定列表
|
||||
if client.BindUid != "" {
|
||||
if clientIds, ok := WsUsers.Load(client.BindUid); ok {
|
||||
uidClientIds := clientIds.(*[]string)
|
||||
if v, ok := wsUsers.Load(client.BindUid); ok {
|
||||
uidClientIds := v.(*[]string)
|
||||
if len(*uidClientIds) > 0 {
|
||||
tempClientIds := make([]string, 0, len(*uidClientIds))
|
||||
for _, v := range *uidClientIds {
|
||||
@@ -201,23 +128,93 @@ func (s *WSImpl) CloseClient(clientID string) {
|
||||
}
|
||||
|
||||
// 客户端断线时自动踢出已加入的组
|
||||
if client.BindUid != "" && len(client.SubGroup) > 0 {
|
||||
if len(client.SubGroup) > 0 {
|
||||
for _, groupID := range client.SubGroup {
|
||||
uids, ok := WsGroup.Load(groupID)
|
||||
v, ok := wsGroup.Load(groupID)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
groupUIDs := uids.(*[]string)
|
||||
if len(*groupUIDs) > 0 {
|
||||
tempUIDs := make([]string, 0, len(*groupUIDs))
|
||||
for _, v := range *groupUIDs {
|
||||
if v != client.BindUid {
|
||||
tempUIDs = append(tempUIDs, v)
|
||||
groupClientIds := v.(*[]string)
|
||||
if len(*groupClientIds) > 0 {
|
||||
tempClientIds := make([]string, 0, len(*groupClientIds))
|
||||
for _, v := range *groupClientIds {
|
||||
if v != client.ID {
|
||||
tempClientIds = append(tempClientIds, v)
|
||||
}
|
||||
}
|
||||
*groupUIDs = tempUIDs
|
||||
*groupClientIds = tempClientIds
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ClientReadListen 客户端读取消息监听
|
||||
// receiveType 根据接收类型进行消息处理
|
||||
func (s *WSImpl) ClientReadListen(wsClient *model.WSClient, receiveType int) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
logger.Errorf("ws ReadMessage Panic Error: %v", err)
|
||||
}
|
||||
}()
|
||||
for {
|
||||
// 读取消息
|
||||
messageType, msg, err := wsClient.Conn.ReadMessage()
|
||||
if err != nil {
|
||||
logger.Warnf("ws ReadMessage UID %s err: %s", wsClient.BindUid, err.Error())
|
||||
s.ClientClose(wsClient.ID)
|
||||
return
|
||||
}
|
||||
// fmt.Println(messageType, string(msg))
|
||||
|
||||
// 文本 只处理文本json
|
||||
if messageType == websocket.TextMessage {
|
||||
var reqMsg model.WSRequest
|
||||
if err := json.Unmarshal(msg, &reqMsg); err != nil {
|
||||
msgByte, _ := json.Marshal(result.ErrMsg("message format json error"))
|
||||
wsClient.MsgChan <- msgByte
|
||||
continue
|
||||
}
|
||||
// 接收器处理
|
||||
switch receiveType {
|
||||
case ReceiveCommont:
|
||||
go NewWSReceiveImpl.Commont(wsClient, reqMsg)
|
||||
case ReceiveShell:
|
||||
go NewWSReceiveImpl.Shell(wsClient, reqMsg)
|
||||
case ReceiveShellView:
|
||||
go NewWSReceiveImpl.ShellView(wsClient, reqMsg)
|
||||
case ReceiveTelnet:
|
||||
go NewWSReceiveImpl.Telnet(wsClient, reqMsg)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ClientWriteListen 客户端写入消息监听
|
||||
func (s *WSImpl) ClientWriteListen(wsClient *model.WSClient) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
logger.Errorf("ws WriteMessage Panic Error: %v", err)
|
||||
}
|
||||
}()
|
||||
// 发客户端id确认是否连接
|
||||
msgByte, _ := json.Marshal(result.OkData(map[string]string{
|
||||
"clientId": wsClient.ID,
|
||||
}))
|
||||
wsClient.MsgChan <- msgByte
|
||||
// 消息发送监听
|
||||
for msg := range wsClient.MsgChan {
|
||||
// 关闭句柄
|
||||
if string(msg) == "ws:close" {
|
||||
wsClient.Conn.WriteMessage(websocket.CloseMessage, []byte{})
|
||||
return
|
||||
}
|
||||
// 发送消息
|
||||
err := wsClient.Conn.WriteMessage(websocket.TextMessage, msg)
|
||||
if err != nil {
|
||||
logger.Warnf("ws WriteMessage UID %s err: %s", wsClient.BindUid, err.Error())
|
||||
s.ClientClose(wsClient.ID)
|
||||
return
|
||||
}
|
||||
wsClient.LastHeartbeat = time.Now().UnixMilli()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,8 +2,24 @@ package service
|
||||
|
||||
import "be.ems/src/modules/ws/model"
|
||||
|
||||
const (
|
||||
ReceiveCommont = iota // Commont 接收通用业务处理
|
||||
ReceiveShell // Shell 接收终端交互业务处理
|
||||
ReceiveShellView // ShellView 接收查看文件终端交互业务处理
|
||||
ReceiveTelnet // Telnet 接收终端交互业务处理
|
||||
)
|
||||
|
||||
// IWSReceive WebSocket消息接收处理 服务层接口
|
||||
type IWSReceive interface {
|
||||
// AsyncReceive 接收业务异步处理
|
||||
AsyncReceive(client *model.WSClient, reqMsg model.WSRequest)
|
||||
// Commont 接收通用业务处理
|
||||
Commont(client *model.WSClient, reqMsg model.WSRequest)
|
||||
|
||||
// Shell 接收终端交互业务处理
|
||||
Shell(client *model.WSClient, reqMsg model.WSRequest)
|
||||
|
||||
// ShellView 接收查看文件终端交互业务处理
|
||||
ShellView(client *model.WSClient, reqMsg model.WSRequest)
|
||||
|
||||
// Telnet 接收终端交互业务处理
|
||||
Telnet(client *model.WSClient, reqMsg model.WSRequest)
|
||||
}
|
||||
|
||||
@@ -20,12 +20,22 @@ var NewWSReceiveImpl = &WSReceiveImpl{}
|
||||
// WSReceiveImpl WebSocket消息接收处理 服务层处理
|
||||
type WSReceiveImpl struct{}
|
||||
|
||||
// AsyncReceive 接收业务异步处理
|
||||
func (s *WSReceiveImpl) AsyncReceive(client *model.WSClient, reqMsg model.WSRequest) {
|
||||
// Commont 接收通用业务处理
|
||||
func (s *WSReceiveImpl) close(client *model.WSClient) {
|
||||
// 主动关闭
|
||||
resultByte, _ := json.Marshal(result.OkMsg("user initiated closure"))
|
||||
client.MsgChan <- resultByte
|
||||
// 等待1s后关闭连接
|
||||
time.Sleep(1 * time.Second)
|
||||
NewWSImpl.ClientClose(client.ID)
|
||||
}
|
||||
|
||||
// Commont 接收通用业务处理
|
||||
func (s *WSReceiveImpl) Commont(client *model.WSClient, reqMsg model.WSRequest) {
|
||||
// 必传requestId确认消息
|
||||
if reqMsg.RequestID == "" {
|
||||
msg := "message requestId is required"
|
||||
logger.Infof("ws AsyncReceive UID %s err: %s", client.BindUid, msg)
|
||||
logger.Infof("ws Commont UID %s err: %s", client.BindUid, msg)
|
||||
msgByte, _ := json.Marshal(result.ErrMsg(msg))
|
||||
client.MsgChan <- msgByte
|
||||
return
|
||||
@@ -36,14 +46,61 @@ func (s *WSReceiveImpl) AsyncReceive(client *model.WSClient, reqMsg model.WSRequ
|
||||
|
||||
switch reqMsg.Type {
|
||||
case "close":
|
||||
// 主动关闭
|
||||
resultByte, _ := json.Marshal(result.OkMsg("user initiated closure"))
|
||||
client.MsgChan <- resultByte
|
||||
// 等待1s后关闭连接
|
||||
time.Sleep(1 * time.Second)
|
||||
client.StopChan <- struct{}{}
|
||||
s.close(client)
|
||||
return
|
||||
case "ps":
|
||||
resByte, err = processor.GetProcessData(reqMsg.RequestID, reqMsg.Data)
|
||||
case "net":
|
||||
resByte, err = processor.GetNetConnections(reqMsg.RequestID, reqMsg.Data)
|
||||
case "ims_cdr":
|
||||
resByte, err = processor.GetCDRConnectByIMS(reqMsg.RequestID, reqMsg.Data)
|
||||
case "smf_cdr":
|
||||
resByte, err = processor.GetCDRConnectBySMF(reqMsg.RequestID, reqMsg.Data)
|
||||
case "smsc_cdr":
|
||||
resByte, err = processor.GetCDRConnectBySMSC(reqMsg.RequestID, reqMsg.Data)
|
||||
case "amf_ue":
|
||||
resByte, err = processor.GetUEConnectByAMF(reqMsg.RequestID, reqMsg.Data)
|
||||
case "mme_ue":
|
||||
resByte, err = processor.GetUEConnectByMME(reqMsg.RequestID, reqMsg.Data)
|
||||
case "upf_tf":
|
||||
resByte, err = processor.GetUPFTotalFlow(reqMsg.RequestID, reqMsg.Data)
|
||||
case "ne_state":
|
||||
resByte, err = processor.GetNeState(reqMsg.RequestID, reqMsg.Data)
|
||||
default:
|
||||
err = fmt.Errorf("message type %s not supported", reqMsg.Type)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logger.Warnf("ws Commont UID %s err: %s", client.BindUid, err.Error())
|
||||
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
|
||||
client.MsgChan <- msgByte
|
||||
return
|
||||
}
|
||||
if len(resByte) > 0 {
|
||||
client.MsgChan <- resByte
|
||||
}
|
||||
}
|
||||
|
||||
// Shell 接收终端交互业务处理
|
||||
func (s *WSReceiveImpl) Shell(client *model.WSClient, reqMsg model.WSRequest) {
|
||||
// 必传requestId确认消息
|
||||
if reqMsg.RequestID == "" {
|
||||
msg := "message requestId is required"
|
||||
logger.Infof("ws Shell UID %s err: %s", client.BindUid, msg)
|
||||
msgByte, _ := json.Marshal(result.ErrMsg(msg))
|
||||
client.MsgChan <- msgByte
|
||||
return
|
||||
}
|
||||
|
||||
var resByte []byte
|
||||
var err error
|
||||
|
||||
switch reqMsg.Type {
|
||||
case "close":
|
||||
s.close(client)
|
||||
return
|
||||
case "ssh":
|
||||
// SSH会话消息接收直接写入会话
|
||||
// SSH会话消息接收写入会话
|
||||
command := reqMsg.Data.(string)
|
||||
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
|
||||
_, err = sshClientSession.Write(command)
|
||||
@@ -59,33 +116,134 @@ func (s *WSReceiveImpl) AsyncReceive(client *model.WSClient, reqMsg model.WSRequ
|
||||
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
|
||||
err = sshClientSession.Session.WindowChange(data.Rows, data.Cols)
|
||||
}
|
||||
case "telnet":
|
||||
// Telnet会话消息接收直接写入会话
|
||||
command := reqMsg.Data.(string)
|
||||
telnetClientSession := client.ChildConn.(*telnet.TelnetClientSession)
|
||||
_, err = telnetClientSession.Write(command)
|
||||
case "ps":
|
||||
resByte, err = processor.GetProcessData(reqMsg.RequestID, reqMsg.Data)
|
||||
case "net":
|
||||
resByte, err = processor.GetNetConnections(reqMsg.RequestID, reqMsg.Data)
|
||||
case "ims_cdr":
|
||||
resByte, err = processor.GetCDRConnectByIMS(reqMsg.RequestID, reqMsg.Data)
|
||||
case "smf_cdr":
|
||||
resByte, err = processor.GetCDRConnectBySMF(reqMsg.RequestID, reqMsg.Data)
|
||||
case "amf_ue":
|
||||
resByte, err = processor.GetUEConnectByAMF(reqMsg.RequestID, reqMsg.Data)
|
||||
case "mme_ue":
|
||||
resByte, err = processor.GetUEConnectByMME(reqMsg.RequestID, reqMsg.Data)
|
||||
case "upf_tf":
|
||||
resByte, err = processor.GetUPFTotalFlow(reqMsg.RequestID, reqMsg.Data)
|
||||
case "ne_state":
|
||||
resByte, err = processor.GetNeState(reqMsg.RequestID, reqMsg.Data)
|
||||
default:
|
||||
err = fmt.Errorf("message type not supported")
|
||||
err = fmt.Errorf("message type %s not supported", reqMsg.Type)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logger.Warnf("ws AsyncReceive UID %s err: %s", client.BindUid, err.Error())
|
||||
logger.Warnf("ws Shell UID %s err: %s", client.BindUid, err.Error())
|
||||
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
|
||||
client.MsgChan <- msgByte
|
||||
if err == io.EOF {
|
||||
// 等待1s后关闭连接
|
||||
time.Sleep(1 * time.Second)
|
||||
client.StopChan <- struct{}{}
|
||||
}
|
||||
return
|
||||
}
|
||||
if len(resByte) > 0 {
|
||||
client.MsgChan <- resByte
|
||||
}
|
||||
}
|
||||
|
||||
// ShellView 接收查看文件终端交互业务处理
|
||||
func (s *WSReceiveImpl) ShellView(client *model.WSClient, reqMsg model.WSRequest) {
|
||||
// 必传requestId确认消息
|
||||
if reqMsg.RequestID == "" {
|
||||
msg := "message requestId is required"
|
||||
logger.Infof("ws ShellView UID %s err: %s", client.BindUid, msg)
|
||||
msgByte, _ := json.Marshal(result.ErrMsg(msg))
|
||||
client.MsgChan <- msgByte
|
||||
return
|
||||
}
|
||||
|
||||
var resByte []byte
|
||||
var err error
|
||||
|
||||
switch reqMsg.Type {
|
||||
case "close":
|
||||
s.close(client)
|
||||
return
|
||||
case "cat", "tail":
|
||||
var command string
|
||||
if reqMsg.Type == "cat" {
|
||||
command, err = processor.ParseCat(reqMsg.Data)
|
||||
}
|
||||
if reqMsg.Type == "tail" {
|
||||
command, err = processor.ParseTail(reqMsg.Data)
|
||||
}
|
||||
if command != "" && err == nil {
|
||||
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
|
||||
_, err = sshClientSession.Write(command)
|
||||
}
|
||||
case "ctrl-c":
|
||||
// 模拟按下 Ctrl+C
|
||||
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
|
||||
_, err = sshClientSession.Write("\u0003\n")
|
||||
case "resize":
|
||||
// 会话窗口重置
|
||||
msgByte, _ := json.Marshal(reqMsg.Data)
|
||||
var data struct {
|
||||
Cols int `json:"cols"`
|
||||
Rows int `json:"rows"`
|
||||
}
|
||||
err = json.Unmarshal(msgByte, &data)
|
||||
if err == nil {
|
||||
sshClientSession := client.ChildConn.(*ssh.SSHClientSession)
|
||||
err = sshClientSession.Session.WindowChange(data.Rows, data.Cols)
|
||||
}
|
||||
default:
|
||||
err = fmt.Errorf("message type %s not supported", reqMsg.Type)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logger.Warnf("ws ShellView UID %s err: %s", client.BindUid, err.Error())
|
||||
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
|
||||
client.MsgChan <- msgByte
|
||||
if err == io.EOF {
|
||||
// 等待1s后关闭连接
|
||||
time.Sleep(1 * time.Second)
|
||||
client.StopChan <- struct{}{}
|
||||
}
|
||||
return
|
||||
}
|
||||
if len(resByte) > 0 {
|
||||
client.MsgChan <- resByte
|
||||
}
|
||||
}
|
||||
|
||||
// Telnet 接收终端交互业务处理
|
||||
func (s *WSReceiveImpl) Telnet(client *model.WSClient, reqMsg model.WSRequest) {
|
||||
// 必传requestId确认消息
|
||||
if reqMsg.RequestID == "" {
|
||||
msg := "message requestId is required"
|
||||
logger.Infof("ws Shell UID %s err: %s", client.BindUid, msg)
|
||||
msgByte, _ := json.Marshal(result.ErrMsg(msg))
|
||||
client.MsgChan <- msgByte
|
||||
return
|
||||
}
|
||||
|
||||
var resByte []byte
|
||||
var err error
|
||||
|
||||
switch reqMsg.Type {
|
||||
case "close":
|
||||
s.close(client)
|
||||
return
|
||||
case "telnet":
|
||||
// Telnet会话消息接收写入会话
|
||||
command := reqMsg.Data.(string)
|
||||
telnetClientSession := client.ChildConn.(*telnet.TelnetClientSession)
|
||||
_, err = telnetClientSession.Write(command)
|
||||
case "telnet_resize":
|
||||
// Telnet会话窗口重置
|
||||
msgByte, _ := json.Marshal(reqMsg.Data)
|
||||
var data struct {
|
||||
Cols int `json:"cols"`
|
||||
Rows int `json:"rows"`
|
||||
}
|
||||
err = json.Unmarshal(msgByte, &data)
|
||||
if err == nil {
|
||||
telnetClientSession := client.ChildConn.(*telnet.TelnetClientSession)
|
||||
err = telnetClientSession.WindowChange(data.Rows, data.Cols)
|
||||
_ = telnetClientSession.Read()
|
||||
}
|
||||
default:
|
||||
err = fmt.Errorf("message type %s not supported", reqMsg.Type)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logger.Warnf("ws Shell UID %s err: %s", client.BindUid, err.Error())
|
||||
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
|
||||
client.MsgChan <- msgByte
|
||||
if err == io.EOF {
|
||||
|
||||
@@ -5,6 +5,6 @@ type IWSSend interface {
|
||||
// ByClientID 给已知客户端发消息
|
||||
ByClientID(clientID string, data any) error
|
||||
|
||||
// ByGroupID 给订阅组的用户发送消息
|
||||
// ByGroupID 给订阅组的客户端发送消息
|
||||
ByGroupID(gid string, data any) error
|
||||
}
|
||||
|
||||
@@ -18,14 +18,16 @@ const (
|
||||
GROUP_KPI_UPF = "12_"
|
||||
// 组号-自定义KPI指标20_neType_neId
|
||||
GROUP_KPI_C = "20_"
|
||||
// 组号-IMS_CDR会话事件
|
||||
GROUP_IMS_CDR = "1005"
|
||||
// 组号-SMF_CDR会话事件
|
||||
GROUP_SMF_CDR = "1006"
|
||||
// 组号-IMS_CDR会话事件 1005_neId
|
||||
GROUP_IMS_CDR = "1005_"
|
||||
// 组号-SMF_CDR会话事件 1006_neId
|
||||
GROUP_SMF_CDR = "1006_"
|
||||
// 组号-SMSC_CDR会话事件 1007_neId
|
||||
GROUP_SMSC_CDR = "1007_"
|
||||
// 组号-AMF_UE会话事件
|
||||
GROUP_AMF_UE = "1010"
|
||||
// 组号-MME_UE会话事件
|
||||
GROUP_MME_UE = "1011"
|
||||
// 组号-MME_UE会话事件 1011_neId
|
||||
GROUP_MME_UE = "1011_"
|
||||
)
|
||||
|
||||
// 实例化服务层 WSSendImpl 结构体
|
||||
@@ -36,7 +38,7 @@ type WSSendImpl struct{}
|
||||
|
||||
// ByClientID 给已知客户端发消息
|
||||
func (s *WSSendImpl) ByClientID(clientID string, data any) error {
|
||||
v, ok := WsClients.Load(clientID)
|
||||
v, ok := wsClients.Load(clientID)
|
||||
if !ok {
|
||||
return fmt.Errorf("no fount client ID: %s", clientID)
|
||||
}
|
||||
@@ -48,43 +50,35 @@ func (s *WSSendImpl) ByClientID(clientID string, data any) error {
|
||||
|
||||
client := v.(*model.WSClient)
|
||||
if len(client.MsgChan) > 90 {
|
||||
NewWSImpl.CloseClient(client.ID)
|
||||
NewWSImpl.ClientClose(client.ID)
|
||||
return fmt.Errorf("msg chan over 90 will close client ID: %s", clientID)
|
||||
}
|
||||
client.MsgChan <- dataByte
|
||||
return nil
|
||||
}
|
||||
|
||||
// ByGroupID 给订阅组的用户发送消息
|
||||
// ByGroupID 给订阅组的客户端发送消息
|
||||
func (s *WSSendImpl) ByGroupID(groupID string, data any) error {
|
||||
uids, ok := WsGroup.Load(groupID)
|
||||
clientIds, ok := wsGroup.Load(groupID)
|
||||
if !ok {
|
||||
return fmt.Errorf("no fount Group ID: %s", groupID)
|
||||
}
|
||||
|
||||
groupUids := uids.(*[]string)
|
||||
// 群组中没有成员
|
||||
if len(*groupUids) == 0 {
|
||||
// 检查组内是否有客户端
|
||||
ids := clientIds.(*[]string)
|
||||
if len(*ids) == 0 {
|
||||
return fmt.Errorf("no members in the group")
|
||||
}
|
||||
|
||||
// 在群组中找到对应的 uid
|
||||
for _, uid := range *groupUids {
|
||||
clientIds, ok := WsUsers.Load(uid)
|
||||
if !ok {
|
||||
// 遍历给客户端发消息
|
||||
for _, clientId := range *ids {
|
||||
err := s.ByClientID(clientId, map[string]any{
|
||||
"groupId": groupID,
|
||||
"data": data,
|
||||
})
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
// 在用户中找到客户端并发送
|
||||
uidClientIds := clientIds.(*[]string)
|
||||
for _, clientId := range *uidClientIds {
|
||||
err := s.ByClientID(clientId, map[string]any{
|
||||
"groupId": groupID,
|
||||
"data": data,
|
||||
})
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -21,6 +21,10 @@ func Setup(router *gin.Engine) {
|
||||
collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.ws", collectlogs.BUSINESS_TYPE_OTHER)),
|
||||
controller.NewWSController.WS,
|
||||
)
|
||||
wsGroup.GET("/test",
|
||||
middleware.PreAuthorize(nil),
|
||||
controller.NewWSController.Test,
|
||||
)
|
||||
wsGroup.GET("/ssh",
|
||||
middleware.PreAuthorize(nil),
|
||||
collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.ws", collectlogs.BUSINESS_TYPE_OTHER)),
|
||||
@@ -31,9 +35,10 @@ func Setup(router *gin.Engine) {
|
||||
collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.ws", collectlogs.BUSINESS_TYPE_OTHER)),
|
||||
controller.NewWSController.Telnet,
|
||||
)
|
||||
wsGroup.GET("/test",
|
||||
wsGroup.GET("/view",
|
||||
middleware.PreAuthorize(nil),
|
||||
controller.NewWSController.Test,
|
||||
collectlogs.OperateLog(collectlogs.OptionNew("log.operate.title.ws", collectlogs.BUSINESS_TYPE_OTHER)),
|
||||
controller.NewWSController.ShellView,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user