1
0

marge: 合并代码

This commit is contained in:
TsMask
2024-02-07 12:31:25 +08:00
parent 6d9123314c
commit d5f7a2077e
65 changed files with 2445 additions and 99 deletions

3
.gitignore vendored
View File

@@ -23,6 +23,7 @@ restagent/log/
restagent/upload/
restagent/software/
restagent/database/
restagent/license/
restagent/restagent
sshsvc/sshsvc
@@ -36,6 +37,7 @@ captrace/log/
tools/loadmconf/loadmconf
tools/loadpconf/loadpconf
reference
vendor
# Built Visual Studio Code Extensions
@@ -43,5 +45,6 @@ vendor
*.log
*.log-*
*.bak
*.exe
__debug_bin*.exe

View File

@@ -10,6 +10,7 @@ import (
"ems.agt/lib/log"
"ems.agt/lib/services"
"ems.agt/restagent/config"
wsService "ems.agt/src/modules/ws/service"
)
var (
@@ -41,7 +42,7 @@ func PostCDREventFromNF(w http.ResponseWriter, r *http.Request) {
cdrEvent := new(CDREvent)
err = json.Unmarshal(body, &cdrEvent)
if err != nil {
if cdrEvent.NeType == "" || err != nil {
log.Error("Failed to Unmarshal cdrEvent:", err)
services.ResponseInternalServerError500ProcessError(w, err)
return
@@ -55,5 +56,12 @@ func PostCDREventFromNF(w http.ResponseWriter, r *http.Request) {
return
}
// 推送到ws订阅组
if v, ok := cdrEvent.CDR["recordType"]; ok {
if v == "MOC" || v == "MTSM" {
wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_IMS_CDR, cdrEvent)
}
}
services.ResponseStatusOK204NoContent(w)
}

View File

@@ -16,6 +16,7 @@ import (
"ems.agt/lib/services"
"ems.agt/restagent/config"
tokenConst "ems.agt/src/framework/constants/token"
neService "ems.agt/src/modules/network_element/service"
"github.com/go-resty/resty/v2"
"github.com/gorilla/mux"
@@ -136,7 +137,7 @@ func PostNeInfo(w http.ResponseWriter, r *http.Request) {
services.ResponseNotFound404UriNotExist(w, r)
return
}
log.Debug("Body:", string(body))
log.Trace("Body:", string(body))
neInfo := new(dborm.NeInfo)
err = json.Unmarshal(body, neInfo)
@@ -158,6 +159,9 @@ func PostNeInfo(w http.ResponseWriter, r *http.Request) {
return
}
// 刷新缓存不存在结构体网元Id空字符串
neService.NewNeInfoImpl.RefreshByNeTypeAndNeID(neInfo.NeType, neInfo.NeId)
mapRow := make(map[string]interface{})
row := map[string]interface{}{"affectedRows": affected}
mapRow["data"] = row
@@ -205,6 +209,10 @@ func PostNeInfo(w http.ResponseWriter, r *http.Request) {
} else if affected <= 0 {
log.Infof("Not record affected to insert ne_info")
}
// 刷新缓存不存在结构体网元Id空字符串
neService.NewNeInfoImpl.RefreshByNeTypeAndNeID(neInfo.NeType, neInfo.NeId)
services.ResponseStatusOK204NoContent(w)
return
default:
@@ -261,6 +269,9 @@ func PutNeInfo(w http.ResponseWriter, r *http.Request) {
return
}
// 刷新缓存不存在结构体网元Id空字符串
neService.NewNeInfoImpl.RefreshByNeTypeAndNeID(neInfo.NeType, neInfo.NeId)
mapRow := make(map[string]interface{})
row := map[string]interface{}{"affectedRows": affected}
mapRow["data"] = row
@@ -309,6 +320,10 @@ func PutNeInfo(w http.ResponseWriter, r *http.Request) {
} else if affected <= 0 {
log.Infof("Not record affected to insert ne_info")
}
// 刷新缓存不存在结构体网元Id空字符串
neService.NewNeInfoImpl.RefreshByNeTypeAndNeID(neInfo.NeType, neInfo.NeId)
services.ResponseStatusOK204NoContent(w)
return
default:
@@ -390,6 +405,9 @@ func DeleteNeInfo(w http.ResponseWriter, r *http.Request) {
return
}
// 刷新缓存不存在结构体网元Id空字符串
neService.NewNeInfoImpl.RefreshByNeTypeAndNeID(neInfo.NeType, neInfo.NeId)
mapRow := make(map[string]interface{})
row := map[string]interface{}{"affectedRows": affected}
mapRow["data"] = row

69
features/event/event.go Normal file
View File

@@ -0,0 +1,69 @@
package event
import (
"encoding/json"
"io"
"time"
"ems.agt/lib/dborm"
"ems.agt/lib/global"
"ems.agt/lib/log"
"ems.agt/lib/services"
wsService "ems.agt/src/modules/ws/service"
"github.com/gin-gonic/gin"
)
var (
UriUEEvent = "/upload-ue/v1/:eventType"
)
type UEEvent struct {
NeType string `json:"neType" xorm:"ne_type"`
NeName string `json:"neName" xorm:"ne_name"`
RmUID string `json:"rmUID" xorm:"rm_uid"`
Timestamp int `json:"timestamp" xorm:"timestamp"`
EventType string `json:"eventType" xorm:"event_type"`
EventJson map[string]any `json:"eventJSON" xorm:"event_json"`
}
func PostUEEventFromAMF(c *gin.Context) {
log.Info("PostUEEventFromAMF processing... ")
body, err := io.ReadAll(io.LimitReader(c.Request.Body, global.RequestBodyMaxLen))
if err != nil {
log.Error("Failed to io.ReadAll: ", err)
services.ResponseNotFound404UriNotExist(c.Writer, c.Request)
return
}
//vars := mux.Vars(c.Request)
eventType, ok := c.Params.Get("eventType")
if !ok || eventType == "" {
log.Error("eventType is empty")
services.ResponseNotFound404UriNotExist(c.Writer, c.Request)
return
}
ueEvent := new(UEEvent)
err = json.Unmarshal(body, &ueEvent.EventJson)
if err != nil {
log.Error("Failed to Unmarshal ueEvent:", err)
services.ResponseInternalServerError500ProcessError(c.Writer, err)
return
}
ueEvent.NeType = "AMF"
ueEvent.Timestamp = int(time.Now().Unix())
ueEvent.EventType = eventType
log.Trace("ueEvent:", ueEvent)
affected, err := dborm.XormInsertTableOne("ue_event", ueEvent)
if err != nil && affected <= 0 {
log.Error("Failed to insert ue_event:", err)
services.ResponseInternalServerError500ProcessError(c.Writer, err)
return
}
// 推送到ws订阅组
wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_AMF_UE, ueEvent)
services.ResponseStatusOK204NoContent(c.Writer)
}

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"math"
"net/http"
"strconv"
"time"
@@ -16,6 +17,7 @@ import (
"ems.agt/restagent/config"
"xorm.io/xorm"
wsService "ems.agt/src/modules/ws/service"
"github.com/go-resty/resty/v2"
_ "github.com/go-sql-driver/mysql"
"github.com/gorilla/mux"
@@ -47,17 +49,18 @@ type KpiReport struct {
type GoldKpi struct {
// Id int `json:"-" xorm:"pk 'id' autoincr"`
Date string `json:"date" xorm:"date"`
Index int `json:"index"`
StartTime string `json:"startTime"`
EndTime string `json:"endTime"`
NEName string `json:"neName" xorm:"ne_name"`
RmUid string `json:"rmUid" xorm:"rm_uid"`
NEType string `json:"neType" xorm:"ne_type"`
KpiId string `json:"kpiId" xorm:"kpi_id"`
Value int `json:"value"`
Error string `json:"error"`
Timestamp string `json:"timestamp"`
Date string `json:"date" xorm:"date"`
Index int `json:"index"`
Granularity int8 `json:"granularity"`
StartTime string `json:"startTime"`
EndTime string `json:"endTime"`
NEName string `json:"neName" xorm:"ne_name"`
RmUid string `json:"rmUid" xorm:"rm_uid"`
NEType string `json:"neType" xorm:"ne_type"`
KpiId string `json:"kpiId" xorm:"kpi_id"`
Value int `json:"value"`
Error string `json:"error"`
Timestamp string `json:"timestamp"`
}
var (
@@ -182,11 +185,26 @@ func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) {
goldKpi.Index, _ = strconv.Atoi(vars["index"])
goldKpi.StartTime = global.GetFmtTimeString(layout, kpiReport.Task.Period.StartTime, time.DateTime)
goldKpi.EndTime = global.GetFmtTimeString(layout, kpiReport.Task.Period.EndTime, time.DateTime)
// get time granularity from startTime and endTime
seconds, _ := global.GetSecondDuration(goldKpi.StartTime, goldKpi.EndTime)
goldKpi.Granularity = 60
if seconds != 0 && seconds <= math.MaxInt8 && seconds >= math.MinInt8 {
goldKpi.Granularity = int8(seconds)
}
goldKpi.NEName = kpiReport.Task.NE.NEName
goldKpi.RmUid = kpiReport.Task.NE.RmUID
goldKpi.NEType = kpiReport.Task.NE.NeType
goldKpi.Timestamp = global.GetFmtTimeString(layout, kpiReport.Timestamp, time.DateTime)
// 黄金指标事件对象
kpiEvent := map[string]any{
// kip_id ...
"neType": goldKpi.NEType,
"neName": goldKpi.NEName,
"startIndex": goldKpi.Index,
"timeGroup": goldKpi.StartTime,
}
for _, k := range kpiReport.Task.NE.KPIs {
kpiEvent[k.KPIID] = k.Value // kip_id
goldKpi.KpiId = k.KPIID
goldKpi.Value = k.Value
goldKpi.Error = k.Err
@@ -224,6 +242,9 @@ func PostKPIReportFromNF(w http.ResponseWriter, r *http.Request) {
}
}
// 推送到ws订阅组
wsService.NewWSSendImpl.ByGroupID(wsService.GROUP_KPI, kpiEvent)
services.ResponseStatusOK200Null(w)
}

View File

@@ -12,10 +12,10 @@ import (
"time"
"ems.agt/lib/log"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/disk"
"github.com/shirou/gopsutil/mem"
"github.com/shirou/gopsutil/process"
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/disk"
"github.com/shirou/gopsutil/v3/mem"
"github.com/shirou/gopsutil/v3/process"
)
type SysInfo struct {

View File

@@ -11,10 +11,10 @@ import (
"syscall"
"time"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/disk"
"github.com/shirou/gopsutil/mem"
"github.com/shirou/gopsutil/process"
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/disk"
"github.com/shirou/gopsutil/v3/mem"
"github.com/shirou/gopsutil/v3/process"
)
type SysInfo struct {

View File

@@ -302,7 +302,7 @@ func GetUEInfoFromNF(w http.ResponseWriter, r *http.Request) {
}
}
// Get UEInfo from NF/NFs
// POST User Info from NF/NFs
func PostPCFUserInfo(w http.ResponseWriter, r *http.Request) {
log.Info("PostPCFUserInfo processing... ")
@@ -374,7 +374,7 @@ func PostPCFUserInfo(w http.ResponseWriter, r *http.Request) {
}
}
// Get UEInfo from NF/NFs
// PUT PCF User Info from NF/NFs
func PutPCFUserInfo(w http.ResponseWriter, r *http.Request) {
log.Info("PutPCFUserInfo processing... ")
@@ -575,7 +575,7 @@ func GetUENumFromNF(w http.ResponseWriter, r *http.Request) {
}
}
// Get UEInfo from NF/NFs
// Get Radio Info from NF/NFs
func GetNBInfoFromNF(w http.ResponseWriter, r *http.Request) {
log.Info("GetNBInfoFromNF processing... ")
@@ -639,6 +639,83 @@ func GetNBInfoFromNF(w http.ResponseWriter, r *http.Request) {
}
}
// Get Radio Info from NF/NFs
func GetNBInfoAllFromNF(w http.ResponseWriter, r *http.Request) {
log.Info("GetNBInfoAllFromNF processing... ")
vars := mux.Vars(r)
neType := vars["elementTypeValue"]
if neType == "" {
services.ResponseNotFound404UriNotExist(w, r)
return
}
//neTypeLower := strings.ToLower(neType)
// var neID string
neIDs := services.GetParamsArrByName("neId", r)
// if len(neIDs) == 1 {
// neID = neIDs[0]
// } else {
// services.ResponseNotFound404UriNotExist(w, r)
// return
// }
// token, err := services.CheckFrontValidRequest(w, r)
// if err != nil {
// log.Error("Request error:", err)
// return
// }
// log.Debug("token:", token)
//var ret error
var statusCode int = 500
var dataResponse []services.MapResponse
var neInfos []dborm.NeInfo
dborm.XormGetNeInfo2(neType, neIDs, &neInfos)
for _, neInfo := range neInfos {
// neInfo, err := dborm.XormGetNeInfo(neType, neID)
// if err != nil {
// log.Error("Failed to XormGetNeInfo:", err)
// services.ResponseInternalServerError500ProcessError(w, err)
// return
// } else if neInfo == nil {
// err := global.ErrCMNotFoundTargetNE
// log.Error(global.ErrCMNotFoundTargetNE)
// services.ResponseInternalServerError500ProcessError(w, err)
// return
// }
hostUri := fmt.Sprintf("http://%s:%v", neInfo.Ip, neInfo.Port)
requestURI2NF := fmt.Sprintf("%s%s", hostUri, r.RequestURI)
log.Debug("requestURI2NF:", requestURI2NF)
resp, err := client.R().
EnableTrace().
SetHeaders(map[string]string{tokenConst.HEADER_KEY: r.Header.Get(tokenConst.HEADER_KEY)}).
//SetHeaders(map[string]string{"accessToken": token}).
SetHeaders(map[string]string{"User-Agent": config.GetDefaultUserAgent()}).
SetHeaders(map[string]string{"Content-Type": "application/json;charset=UTF-8"}).
Get(requestURI2NF)
if err != nil {
log.Error("Failed to GET:", err)
continue
// services.ResponseInternalServerError500ProcessError(w, err)
// return
} else {
switch resp.StatusCode() {
case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
var response services.MapResponse
_ = json.Unmarshal(resp.Body(), &response)
dataResponse = append(dataResponse, response)
statusCode = http.StatusOK
}
}
}
var response services.DataResponse
response.Data = dataResponse
services.ResponseWithJson(w, statusCode, response)
}
// Get GetUEInfoFileExportNF from NF/NFs
func GetUEInfoFileExportNF(w http.ResponseWriter, r *http.Request) {
log.Info("GetUEInfoFromNF processing... ")

View File

@@ -298,7 +298,40 @@ func XormGetNeInfoByNeType(neType string, nes *[]NeInfo) error {
}
*nes = append(*nes, *ne)
}
log.Debug("nes:", nes)
return nil
}
func XormGetNeInfo2(neType string, neIDs []string, nes *[]NeInfo) error {
log.Info("XormGetNeInfo2 processing... ")
ne := new(NeInfo)
var rows *xorm.Rows
var err error
if len(neIDs) == 0 {
rows, err = xEngine.Table("ne_info").
Where("status in ('0','3') and ne_type=?", neType).
Rows(ne)
} else {
rows, err = xEngine.Table("ne_info").
In("ne_id", neIDs).
And("status in ('0','3') and ne_type=?", neType).
Rows(ne)
}
if err != nil {
log.Error("Failed to get table ne_info from database:", err)
return err
}
defer rows.Close()
for rows.Next() {
err := rows.Scan(ne)
if err != nil {
log.Error("Failed to get table ne_info from database:", err)
return err
}
*nes = append(*nes, *ne)
}
return nil
}
@@ -1350,7 +1383,7 @@ func XormGetTableRows(tableName string, where string, tbInfo *[]interface{}) (*[
}
*tbInfo = append(*tbInfo, row)
}
log.Debug("tbInfo:", tbInfo)
log.Trace("tbInfo:", tbInfo)
return tbInfo, nil
}
@@ -1396,7 +1429,7 @@ func XormGetDataBySQL(sql string) (*[]map[string]string, error) {
rows := make([]map[string]string, 0)
rows, err := DbClient.XEngine.QueryString(sql)
if err != nil {
log.Errorf("Failed to QueryString:", err)
log.Error("Failed to QueryString:", err)
return nil, err
}
@@ -1410,7 +1443,7 @@ func XormDeleteDataByWhere(where, table string) (int64, error) {
defer xSession.Close()
affected, err := xSession.Table(table).Where(where).Delete()
if err != nil {
log.Errorf("Failed to Delete:", err)
log.Error("Failed to Delete:", err)
return 0, err
}
xSession.Commit()

View File

@@ -391,6 +391,27 @@ func GetSecondsSinceDatetime(datetimeStr string) (int64, error) {
return seconds, nil
}
func GetSecondDuration(time1, time2 string) (int64, error) {
loc1, _ := time.LoadLocation("Local")
// 解析日期时间字符串为时间对象
t1, err := time.ParseInLocation(time.DateTime, time1, loc1)
if err != nil {
return 0, err
}
t2, err := time.ParseInLocation(time.DateTime, time2, loc1)
if err != nil {
return 0, err
}
// 计算时间差
duration := t2.Sub(t1)
// 获取时间差的秒数
seconds := int64(duration.Seconds())
return seconds, nil
}
// 0: invalid ip
// 4: IPv4
// 6: IPv6

View File

@@ -303,6 +303,9 @@ func init() {
Register("POST", cdr.UriCDREvent, cdr.PostCDREventFromNF, nil)
Register("POST", cdr.CustomUriCDREvent, cdr.PostCDREventFromNF, nil)
// UE event
//Register("POST", event.UriUEEvent, event.PostUEEventFromAMF, nil)
// 进程网络
Register("GET", psnet.UriWs, psnet.ProcessWs, nil)
Register("POST", psnet.UriStop, psnet.StopProcess, nil)

View File

@@ -26,7 +26,7 @@ rest:
keyFile: ./etc/certs/omc-server.key
webServer:
enabled: true
enabled: false
rootDir: d:/local.git/fe.ems.vue3/dist
listen:
- addr: :80
@@ -42,7 +42,7 @@ database:
type: mysql
user: root
# password: 1000omc@kp!
# host: 127.0.0.1
# host: "192.168.8.103"
# port: 33066
name: omc_db
backup: d:/local.git/ems.agt/restagent/database

View File

@@ -1,7 +1,7 @@
# Makefile for rest agent project
PROJECT = OMC
VERSION = 2.2401.3
VERSION = 2.2402.5
PLATFORM = amd64
ARMPLATFORM = aarch64
BUILDDIR = ../../build

View File

@@ -10,6 +10,7 @@ import (
"strings"
"ems.agt/features/dbrest"
"ems.agt/features/event"
"ems.agt/features/fm"
"ems.agt/features/lm"
"ems.agt/features/pm"
@@ -22,6 +23,8 @@ import (
"ems.agt/src/framework/middleware"
libSession "ems.agt/src/lib_features/session"
"github.com/gin-gonic/gin"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
)
// const defaultConfigFile = "./etc/restconf.yaml"
@@ -67,23 +70,29 @@ import (
// }
func HttpListen(addr string, router http.Handler) {
err := http.ListenAndServe(addr, router)
// 创建HTTP服务器
h2s := &http2.Server{
// ...
}
server := &http.Server{
Addr: addr,
Handler: h2c.NewHandler(router, h2s),
}
// // support http 2.0 server
// err := http2.ConfigureServer(server, &http2.Server{})
// if err != nil {
// fmt.Println("ConfigureServer err:", err)
// os.Exit(11)
// }
err := server.ListenAndServe()
if err != nil {
fmt.Println("ListenAndServe err:", err)
os.Exit(5)
os.Exit(12)
}
}
func HttpListenTLS(addr, caFile, certFile, keyFile string, clientAuthType int, router http.Handler) {
HttpListenConfigTLS(addr, caFile, certFile, keyFile, clientAuthType, router)
err := http.ListenAndServeTLS(addr, certFile, keyFile, router)
if err != nil {
fmt.Println("ListenAndServeTLS err:", err)
os.Exit(6)
}
}
func HttpListenConfigTLS(addr, caFile, certFile, keyFile string, clientAuthType int, router http.Handler) {
// 加载根证书
caCert, err := os.ReadFile(caFile)
if err != nil {
@@ -107,27 +116,71 @@ func HttpListenConfigTLS(addr, caFile, certFile, keyFile string, clientAuthType
TLSConfig: tlsConfig,
}
// support http 2.0 server
http2.ConfigureServer(server, &http2.Server{})
if err != nil {
fmt.Println("ConfigureServer err:", err)
os.Exit(13)
}
err = server.ListenAndServeTLS(certFile, keyFile)
if err != nil {
fmt.Println("ListenAndServeTLS err:", err)
os.Exit(6)
os.Exit(14)
}
}
func HttpListenWebServerTLS(addr, caFile, certFile, keyFile string, clientAuthType int) {
HttpListenConfigTLS(addr, caFile, certFile, keyFile, clientAuthType, nil)
err := http.ListenAndServeTLS(addr, certFile, keyFile, nil)
// 加载根证书
caCert, err := os.ReadFile(caFile)
if err != nil {
log.Fatal(err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
// 创建自定义的TLS配置
tlsConfig := &tls.Config{
MinVersion: tls.VersionTLS10,
MaxVersion: tls.VersionTLS13,
ClientCAs: caCertPool,
ClientAuth: tls.ClientAuthType(clientAuthType),
}
// 创建HTTP服务器
server := &http.Server{
Addr: addr,
TLSConfig: tlsConfig,
}
// support http 2.0 server
http2.ConfigureServer(server, &http2.Server{})
if err != nil {
fmt.Println("ConfigureServer err:", err)
os.Exit(9)
}
err = server.ListenAndServeTLS(certFile, keyFile)
if err != nil {
fmt.Println("ListenAndServeTLS err:", err)
os.Exit(7)
os.Exit(10)
}
}
func HttpListenWebServer(addr string) {
err := http.ListenAndServe(addr, nil)
// 创建HTTP服务器
server := &http.Server{
Addr: addr,
}
// support http 2.0 server
err := http2.ConfigureServer(server, &http2.Server{})
if err != nil {
fmt.Println("ConfigureServer err:", err)
os.Exit(7)
}
err = server.ListenAndServe()
if err != nil {
fmt.Println("ListenAndServe err:", err)
os.Exit(7)
os.Exit(8)
}
}
@@ -164,7 +217,7 @@ func main() {
err = dbrest.InitDbClient(conf.Database.Type, conf.Database.User, conf.Database.Password,
conf.Database.Host, conf.Database.Port, conf.Database.Name)
if err != nil {
fmt.Println("rests.initDbClient err:", err)
fmt.Println("dbrest.initDbClient err:", err)
os.Exit(4)
}
err = lm.InitDbClient(conf.Database.Type, conf.Database.User, conf.Database.Password,
@@ -185,6 +238,8 @@ func main() {
uriGroup := app.Group(config.UriPrefix)
uriGroup.Use(libSession.SessionHeader())
uriGroup.Any("/*any", gin.WrapH(routes.NewRouter()))
// AMF上报的UE事件, 无前缀,暂时特殊处理
app.POST(event.UriUEEvent, event.PostUEEventFromAMF)
// 开启监控采集
// monitor.StartMonitor(false, "")

View File

@@ -12,9 +12,11 @@ import (
"ems.agt/src/modules/common"
"ems.agt/src/modules/crontask"
"ems.agt/src/modules/monitor"
networkdata "ems.agt/src/modules/network_data"
networkelement "ems.agt/src/modules/network_element"
"ems.agt/src/modules/system"
"ems.agt/src/modules/trace"
"ems.agt/src/modules/ws"
"github.com/gin-gonic/gin"
)
@@ -117,12 +119,16 @@ func initModulesRoute(app *gin.Engine) {
common.Setup(app)
// 系统模块
system.Setup(app)
// 网元模块
// 网元功能模块
networkelement.Setup(app)
// 网元数据模块
networkdata.Setup(app)
// 跟踪模块
trace.Setup(app)
// 图表模块
chart.Setup(app)
// ws 模块
ws.Setup(app)
// 调度任务模块--暂无接口
crontask.Setup(app)
// 监控模块 - 含调度处理加入队列,放最后

View File

@@ -22,3 +22,9 @@ const RATE_LIMIT_KEY = "rate_limit:"
// 登录账户密码错误次数
const PWD_ERR_CNT_KEY = "pwd_err_cnt:"
// 网元信息管理
const NE_KEY = "ne_info:"
// 网元数据管理
const NE_DATA_KEY = "ne_data:"

View File

@@ -121,7 +121,7 @@ func OperateLog(options Options) gin.HandlerFunc {
BusinessType: options.BusinessType,
OperatorType: options.OperatorType,
Method: funcName,
OperURL: c.Request.RequestURI,
OperURL: c.Request.URL.Path,
RequestMethod: c.Request.Method,
OperIP: ipaddr,
OperLocation: location,

View File

@@ -14,7 +14,14 @@ import (
)
/**无Token可访问白名单 */
var URL_WHITE_LIST = []string{"/performanceManagement", "/faultManagement", "/systemState", "/omcNeConfig"}
var URL_WHITE_LIST = []string{
"/performanceManagement",
"/faultManagement",
"/systemState",
"/omcNeConfig",
"/cdrEvent",
"/upload-ue",
}
// PreAuthorize 用户身份授权认证校验
//

View File

@@ -65,6 +65,11 @@ func IPAddrLocation(c *gin.Context) (string, string) {
// Authorization 解析请求头
func Authorization(c *gin.Context) string {
// Query请求查询
if authQuery, ok := c.GetQuery(token.RESPONSE_FIELD); ok && authQuery != "" {
return authQuery
}
// Header请求头
authHeader := c.GetHeader(token.HEADER_KEY)
if authHeader == "" {
return ""
@@ -99,11 +104,22 @@ func UaOsBrowser(c *gin.Context) (string, string) {
// AcceptLanguage 解析客户端接收语言 zh中文 en: 英文
func AcceptLanguage(c *gin.Context) string {
preferredLanguage := language.English
acceptLanguage := c.GetHeader("Accept-Language")
tags, _, _ := language.ParseAcceptLanguage(acceptLanguage)
if len(tags) > 0 {
preferredLanguage = tags[0]
// Query请求查询
if v, ok := c.GetQuery("language"); ok && v != "" {
tags, _, _ := language.ParseAcceptLanguage(v)
if len(tags) > 0 {
preferredLanguage = tags[0]
}
}
// Header请求头
if v := c.GetHeader("Accept-Language"); v != "" {
tags, _, _ := language.ParseAcceptLanguage(v)
if len(tags) > 0 {
preferredLanguage = tags[0]
}
}
// 只取前缀
lang := preferredLanguage.String()
arr := strings.Split(lang, "-")

View File

@@ -117,7 +117,7 @@ func (s *BarProcessor) Execute(data any) (any, error) {
state := new(SystemState)
_ = json.Unmarshal(response.Body(), &state)
var dateStr *string = nil
if state.ExpiryDate != "" {
if state.ExpiryDate != "" && state.ExpiryDate != "-" {
dateStr = &state.ExpiryDate
}
neState := new(dborm.NeState)

View File

@@ -34,14 +34,17 @@ func (s *SysCacheController) Info(c *gin.Context) {
//
// GET /getNames
func (s *SysCacheController) Names(c *gin.Context) {
language := ctx.AcceptLanguage(c)
caches := []model.SysCache{
model.NewSysCacheNames("user", cachekey.LOGIN_TOKEN_KEY),
model.NewSysCacheNames("configuration", cachekey.SYS_CONFIG_KEY),
model.NewSysCacheNames("dictionary", cachekey.SYS_DICT_KEY),
model.NewSysCacheNames("captcha", cachekey.CAPTCHA_CODE_KEY),
model.NewSysCacheNames("anti-submission", cachekey.REPEAT_SUBMIT_KEY),
model.NewSysCacheNames("current-limiting", cachekey.RATE_LIMIT_KEY),
model.NewSysCacheNames("password-errors-number", cachekey.PWD_ERR_CNT_KEY),
model.NewSysCacheNames(i18n.TKey(language, "cache.name.user"), cachekey.LOGIN_TOKEN_KEY),
model.NewSysCacheNames(i18n.TKey(language, "cache.name.sys_config"), cachekey.SYS_CONFIG_KEY),
model.NewSysCacheNames(i18n.TKey(language, "cache.name.sys_dict"), cachekey.SYS_DICT_KEY),
model.NewSysCacheNames(i18n.TKey(language, "cache.name.captcha_codes"), cachekey.CAPTCHA_CODE_KEY),
model.NewSysCacheNames(i18n.TKey(language, "cache.name.repeat_submit"), cachekey.REPEAT_SUBMIT_KEY),
model.NewSysCacheNames(i18n.TKey(language, "cache.name.rate_limit"), cachekey.RATE_LIMIT_KEY),
model.NewSysCacheNames(i18n.TKey(language, "cache.name.pwd_err_cnt"), cachekey.PWD_ERR_CNT_KEY),
model.NewSysCacheNames(i18n.TKey(language, "cache.name.ne_info"), cachekey.NE_KEY),
model.NewSysCacheNames(i18n.TKey(language, "cache.name.ne_data"), cachekey.NE_DATA_KEY),
}
c.JSON(200, result.OkData(caches))
}
@@ -136,13 +139,17 @@ func (s *SysCacheController) ClearCacheKey(c *gin.Context) {
//
// DELETE /clearCacheSafe
func (s *SysCacheController) ClearCacheSafe(c *gin.Context) {
language := ctx.AcceptLanguage(c)
caches := []model.SysCache{
model.NewSysCacheNames("configuration", cachekey.SYS_CONFIG_KEY),
model.NewSysCacheNames("dictionary", cachekey.SYS_DICT_KEY),
model.NewSysCacheNames("captcha", cachekey.CAPTCHA_CODE_KEY),
model.NewSysCacheNames("anti-submission", cachekey.REPEAT_SUBMIT_KEY),
model.NewSysCacheNames("current-limiting", cachekey.RATE_LIMIT_KEY),
model.NewSysCacheNames("password-errors-number", cachekey.PWD_ERR_CNT_KEY),
model.NewSysCacheNames(i18n.TKey(language, "cache.name.user"), cachekey.LOGIN_TOKEN_KEY),
model.NewSysCacheNames(i18n.TKey(language, "cache.name.sys_config"), cachekey.SYS_CONFIG_KEY),
model.NewSysCacheNames(i18n.TKey(language, "cache.name.sys_dict"), cachekey.SYS_DICT_KEY),
model.NewSysCacheNames(i18n.TKey(language, "cache.name.captcha_codes"), cachekey.CAPTCHA_CODE_KEY),
model.NewSysCacheNames(i18n.TKey(language, "cache.name.repeat_submit"), cachekey.REPEAT_SUBMIT_KEY),
model.NewSysCacheNames(i18n.TKey(language, "cache.name.rate_limit"), cachekey.RATE_LIMIT_KEY),
model.NewSysCacheNames(i18n.TKey(language, "cache.name.pwd_err_cnt"), cachekey.PWD_ERR_CNT_KEY),
model.NewSysCacheNames(i18n.TKey(language, "cache.name.ne_info"), cachekey.NE_KEY),
model.NewSysCacheNames(i18n.TKey(language, "cache.name.ne_data"), cachekey.NE_DATA_KEY),
}
for _, v := range caches {
cacheKeys, err := redis.GetKeys("", v.CacheName+":*")

View File

@@ -0,0 +1,51 @@
package controller
import (
"ems.agt/src/framework/i18n"
"ems.agt/src/framework/utils/ctx"
"ems.agt/src/framework/vo/result"
"ems.agt/src/modules/network_data/model"
neDataService "ems.agt/src/modules/network_data/service"
neService "ems.agt/src/modules/network_element/service"
"github.com/gin-gonic/gin"
)
// 实例化控制层 AMFController 结构体
var NewAMFController = &AMFController{
neInfoService: neService.NewNeInfoImpl,
ueEventService: neDataService.NewUEEventImpl,
}
// 网元AMF
//
// PATH /amf
type AMFController struct {
// 网元信息服务
neInfoService neService.INeInfo
// CDR会话事件服务
ueEventService neDataService.IUEEvent
}
// UE会话列表
//
// GET /ues
func (s *AMFController) UEs(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var querys model.UEEventQuery
if err := c.ShouldBindQuery(&querys); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
// 查询网元获取IP
neInfo := s.neInfoService.SelectNeInfoByNeTypeAndNeID(querys.NeType, querys.NeID)
if neInfo.NeId != querys.NeID || neInfo.IP == "" {
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.noNEInfo")))
return
}
querys.RmUID = neInfo.RmUID
// 查询数据
data := s.ueEventService.SelectPage(querys)
c.JSON(200, result.Ok(data))
}

View File

@@ -0,0 +1,51 @@
package controller
import (
"ems.agt/src/framework/i18n"
"ems.agt/src/framework/utils/ctx"
"ems.agt/src/framework/vo/result"
"ems.agt/src/modules/network_data/model"
neDataService "ems.agt/src/modules/network_data/service"
neService "ems.agt/src/modules/network_element/service"
"github.com/gin-gonic/gin"
)
// 实例化控制层 IMSController 结构体
var NewIMSController = &IMSController{
neInfoService: neService.NewNeInfoImpl,
cdrEventService: neDataService.NewCDREventImpl,
}
// 网元IMS
//
// PATH /ims
type IMSController struct {
// 网元信息服务
neInfoService neService.INeInfo
// CDR会话事件服务
cdrEventService neDataService.ICDREvent
}
// CDR会话列表
//
// GET /cdrs
func (s *IMSController) CDRs(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var querys model.CDREventQuery
if err := c.ShouldBindQuery(&querys); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
// 查询网元获取IP
neInfo := s.neInfoService.SelectNeInfoByNeTypeAndNeID(querys.NeType, querys.NeID)
if neInfo.NeId != querys.NeID || neInfo.IP == "" {
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.noNEInfo")))
return
}
querys.RmUID = neInfo.RmUID
// 查询数据
data := s.cdrEventService.SelectPage(querys)
c.JSON(200, result.Ok(data))
}

View File

@@ -0,0 +1,84 @@
package controller
import (
"ems.agt/lib/core/utils/date"
"ems.agt/src/framework/i18n"
"ems.agt/src/framework/utils/ctx"
"ems.agt/src/framework/vo/result"
"ems.agt/src/modules/network_data/model"
neDataService "ems.agt/src/modules/network_data/service"
neService "ems.agt/src/modules/network_element/service"
"github.com/gin-gonic/gin"
)
// 实例化控制层 PerfKPIController 结构体
var NewPerfKPIController = &PerfKPIController{
neInfoService: neService.NewNeInfoImpl,
perfKPIService: neDataService.NewPerfKPIImpl,
}
// 性能统计
//
// PATH /kpi
type PerfKPIController struct {
// 网元信息服务
neInfoService neService.INeInfo
// 统计信息服务
perfKPIService neDataService.IPerfKPI
}
// 获取统计数据
//
// GET /data
func (s *PerfKPIController) GoldKPI(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var querys model.GoldKPIQuery
if err := c.ShouldBindQuery(&querys); err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
// 时间格式校验
startTime := date.ParseStrToDate(querys.StartTime, date.YYYY_MM_DD_HH_MM_SS)
if startTime.IsZero() {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
endTime := date.ParseStrToDate(querys.EndTime, date.YYYY_MM_DD_HH_MM_SS)
if endTime.IsZero() {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
if querys.Interval < 5 || querys.Interval > 3600 {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
// 查询网元获取IP
neInfo := s.neInfoService.SelectNeInfoByNeTypeAndNeID(querys.NeType, querys.NeID)
if neInfo.NeId != querys.NeID || neInfo.IP == "" {
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.noNEInfo")))
return
}
querys.RmUID = neInfo.RmUID
// 查询数据
kpiData := s.perfKPIService.SelectGoldKPI(querys)
c.JSON(200, result.OkData(kpiData))
}
// 获取统计标题
//
// GET /title
func (s *PerfKPIController) Title(c *gin.Context) {
language := ctx.AcceptLanguage(c)
neType := c.Query("neType")
if neType == "" {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
kpiTitles := s.perfKPIService.SelectGoldKPITitle(neType)
c.JSON(200, result.OkData(kpiTitles))
}

View File

@@ -0,0 +1,54 @@
package controller
import (
"ems.agt/src/framework/i18n"
"ems.agt/src/framework/utils/ctx"
"ems.agt/src/framework/vo/result"
neDataService "ems.agt/src/modules/network_data/service"
neService "ems.agt/src/modules/network_element/service"
"github.com/gin-gonic/gin"
)
// 实例化控制层 UPFController 结构体
var NewUPFController = &UPFController{
neInfoService: neService.NewNeInfoImpl,
perfKPIService: neDataService.NewPerfKPIImpl,
}
// 网元UPF
//
// PATH /upf
type UPFController struct {
// 网元信息服务
neInfoService neService.INeInfo
// 统计信息服务
perfKPIService neDataService.IPerfKPI
}
// 总流量数 N3上行 N6下行
// 单位 比特(bit)
//
// GET /totalFlow
func (s *UPFController) TotalFlow(c *gin.Context) {
language := ctx.AcceptLanguage(c)
var querys struct {
NeType string `json:"neType" form:"neType" binding:"required"`
NeID string `form:"neId" binding:"required"`
Day int `form:"day" binding:"required"`
}
if err := c.ShouldBindQuery(&querys); querys.Day < 0 || err != nil {
c.JSON(400, result.CodeMsg(400, i18n.TKey(language, "app.common.err400")))
return
}
// 查询网元获取IP
neInfo := s.neInfoService.SelectNeInfoByNeTypeAndNeID(querys.NeType, querys.NeID)
if neInfo.NeId != querys.NeID || neInfo.IP == "" {
c.JSON(200, result.ErrMsg(i18n.TKey(language, "app.common.noNEInfo")))
return
}
data := s.perfKPIService.SelectUPFTotalFlow(neInfo.NeType, neInfo.RmUID, querys.Day)
c.JSON(200, result.OkData(data))
}

View File

@@ -0,0 +1,28 @@
package model
import "time"
// CDREvent CDR会话对象 cdr_event
type CDREvent struct {
ID string `json:"id" gorm:"column:id;primaryKey;autoIncrement"`
NeType string `json:"neType" gorm:"column:ne_type"`
NeName string `json:"neName" gorm:"column:ne_name"`
RmUID string `json:"rmUID" gorm:"column:rm_uid"`
Timestamp int64 `json:"timestamp" gorm:"column:timestamp"`
CDRJSONStr string `json:"cdrJSON" gorm:"column:cdr_json"`
CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;default:CURRENT_TIMESTAMP"`
}
// CDREventQuery CDR会话对象查询参数结构体
type CDREventQuery struct {
NeType string `json:"neType" form:"neType" binding:"required"`
NeID string `json:"neId" form:"neId" binding:"required"`
RmUID string `json:"rmUID" form:"rmUID"`
RecordType string `json:"recordType" form:"recordType"`
StartTime string `json:"startTime" form:"startTime"`
EndTime string `json:"endTime" form:"endTime"`
SortField string `json:"sortField" form:"sortField" binding:"omitempty,oneof=timestamp"`
SortOrder string `json:"sortOrder" form:"sortOrder" binding:"omitempty,oneof=asc desc"`
PageNum int64 `json:"pageNum" form:"pageNum" binding:"required"`
PageSize int64 `json:"pageSize" form:"pageSize" binding:"required"`
}

View File

@@ -0,0 +1,23 @@
package model
// GoldKPITitle 黄金指标标题信息对象 kpi_title
type GoldKPITitle struct {
ID string `json:"id" gorm:"column:id;primaryKey;autoIncrement"`
NeType string `json:"neType" gorm:"column:ne_type"`
KPIID string `json:"kpiId" gorm:"column:kpi_id"`
TitleJson string `json:"titleJson" gorm:"column:title_json"`
CnTitle string `json:"cnTitle" gorm:"column:cn_title"`
EnTitle string `json:"enTitle" gorm:"column:en_title"`
}
// GoldKPIQuery 黄金指标查询参数结构体
type GoldKPIQuery struct {
NeType string `form:"neType" binding:"required"`
NeID string `form:"neId" binding:"required"`
StartTime string `form:"startTime" binding:"required"`
EndTime string `form:"endTime" binding:"required"`
Interval int64 `form:"interval" binding:"required"`
RmUID string `form:"rmUID"`
SortField string `form:"sortField" binding:"omitempty,oneof=timeGroup"`
SortOrder string `form:"sortOrder" binding:"omitempty,oneof=asc desc"`
}

View File

@@ -0,0 +1,28 @@
package model
import "time"
// UEEvent UE会话对象 ue_event
type UEEvent struct {
ID string `json:"id" gorm:"column:id;primaryKey;autoIncrement"`
NeType string `json:"neType" gorm:"column:ne_type"`
NeName string `json:"neName" gorm:"column:ne_name"`
RmUID string `json:"rmUID" gorm:"column:rm_uid"`
Timestamp int64 `json:"timestamp" gorm:"column:timestamp"`
EventType string `json:"eventType" gorm:"column:event_type"`
EventJSONStr string `json:"eventJSON" gorm:"column:event_json"`
CreatedAt time.Time `json:"createdAt" gorm:"column:created_at;default:CURRENT_TIMESTAMP"`
}
// UEEventQuery UE会话对象查询参数结构体
type UEEventQuery struct {
NeType string `json:"neType" form:"neType" binding:"required"`
NeID string `json:"neId" form:"neId" binding:"required"`
RmUID string `json:"rmUID" form:"rmUID"`
StartTime string `json:"startTime" form:"startTime"`
EndTime string `json:"endTime" form:"endTime"`
SortField string `json:"sortField" form:"sortField" binding:"omitempty,oneof=timestamp"`
SortOrder string `json:"sortOrder" form:"sortOrder" binding:"omitempty,oneof=asc desc"`
PageNum int64 `json:"pageNum" form:"pageNum" binding:"required"`
PageSize int64 `json:"pageSize" form:"pageSize" binding:"required"`
}

View File

@@ -0,0 +1,58 @@
package networkdata
import (
"ems.agt/src/framework/logger"
"ems.agt/src/framework/middleware"
"ems.agt/src/modules/network_data/controller"
"github.com/gin-gonic/gin"
)
// 模块路由注册
func Setup(router *gin.Engine) {
logger.Infof("开始加载 ====> network_data 模块路由")
neDataGroup := router.Group("/neData")
// 性能统计信息
kpiGroup := neDataGroup.Group("/kpi")
{
kpiGroup.GET("/title",
middleware.PreAuthorize(nil),
controller.NewPerfKPIController.Title,
)
kpiGroup.GET("/data",
middleware.PreAuthorize(nil),
controller.NewPerfKPIController.GoldKPI,
)
}
// 网元IMS
imsGroup := neDataGroup.Group("/ims")
{
// CDR会话事件信息
imsGroup.GET("/cdrs",
middleware.PreAuthorize(nil),
controller.NewIMSController.CDRs,
)
}
// 网元AMF
amfGroup := neDataGroup.Group("/amf")
{
amfGroup.GET("/ues",
middleware.PreAuthorize(nil),
controller.NewAMFController.UEs,
)
}
// 网元UPF
upfGroup := neDataGroup.Group("/upf")
{
upfGroup.GET("/totalFlow",
middleware.PreAuthorize(nil),
controller.NewUPFController.TotalFlow,
)
}
}

View File

@@ -0,0 +1,9 @@
package repository
import "ems.agt/src/modules/network_data/model"
// CDR会话事件 数据层接口
type ICDREvent interface {
// SelectPage 根据条件分页查询
SelectPage(querys model.CDREventQuery) map[string]any
}

View File

@@ -0,0 +1,136 @@
package repository
import (
"fmt"
"strings"
"ems.agt/src/framework/datasource"
"ems.agt/src/framework/logger"
"ems.agt/src/framework/utils/date"
"ems.agt/src/framework/utils/parse"
"ems.agt/src/framework/utils/repo"
"ems.agt/src/modules/network_data/model"
)
// 实例化数据层 CDREventImpl 结构体
var NewCDREventImpl = &CDREventImpl{
selectSql: `select id, ne_type, ne_name, rm_uid, timestamp, cdr_json, created_at from cdr_event`,
resultMap: map[string]string{
"id": "ID",
"ne_type": "NeType",
"ne_name": "NeName",
"rm_uid": "RmUID",
"timestamp": "Timestamp",
"cdr_json": "CDRJSONStr",
"created_at": "CreatedAt",
},
}
// CDREventImpl CDR会话事件 数据层处理
type CDREventImpl struct {
// 查询视图对象SQL
selectSql string
// 结果字段与实体映射
resultMap map[string]string
}
// convertResultRows 将结果记录转实体结果组
func (r *CDREventImpl) convertResultRows(rows []map[string]any) []model.CDREvent {
arr := make([]model.CDREvent, 0)
for _, row := range rows {
item := model.CDREvent{}
for key, value := range row {
if keyMapper, ok := r.resultMap[key]; ok {
repo.SetFieldValue(&item, keyMapper, value)
}
}
arr = append(arr, item)
}
return arr
}
// SelectPage 根据条件分页查询
func (r *CDREventImpl) SelectPage(querys model.CDREventQuery) map[string]any {
// 查询条件拼接
var conditions []string
var params []any
if querys.NeType != "" {
conditions = append(conditions, "ne_type = ?")
params = append(params, querys.NeType)
}
if querys.RmUID != "" {
conditions = append(conditions, "rm_uid = ?")
params = append(params, querys.RmUID)
}
if querys.StartTime != "" {
conditions = append(conditions, "timestamp >= ?")
beginDate := date.ParseStrToDate(querys.StartTime, date.YYYY_MM_DD_HH_MM_SS)
params = append(params, beginDate.Unix())
}
if querys.EndTime != "" {
conditions = append(conditions, "timestamp <= ?")
endDate := date.ParseStrToDate(querys.EndTime, date.YYYY_MM_DD_HH_MM_SS)
params = append(params, endDate.Unix())
}
if querys.RecordType != "" {
conditions = append(conditions, "JSON_EXTRACT(cdr_json, '$.recordType') = ?")
params = append(params, querys.RecordType)
}
// 构建查询条件语句
whereSql := ""
if len(conditions) > 0 {
whereSql += " where " + strings.Join(conditions, " and ")
}
result := map[string]any{
"total": 0,
"rows": []model.CDREvent{},
}
// 查询数量 长度为0直接返回
totalSql := "select count(1) as 'total' from cdr_event"
totalRows, err := datasource.RawDB("", totalSql+whereSql, params)
if err != nil {
logger.Errorf("total err => %v", err)
return result
}
total := parse.Number(totalRows[0]["total"])
if total == 0 {
return result
} else {
result["total"] = total
}
// 分页
pageNum, pageSize := repo.PageNumSize(querys.PageNum, querys.PageSize)
pageSql := " limit ?,? "
params = append(params, pageNum*pageSize)
params = append(params, pageSize)
// 排序
orderSql := ""
if querys.SortField != "" {
sortSql := querys.SortField
if querys.SortOrder != "" {
if querys.SortOrder == "desc" {
sortSql += " desc "
} else {
sortSql += " asc "
}
}
orderSql = fmt.Sprintf(" order by %s ", sortSql)
}
// 查询数据
querySql := r.selectSql + whereSql + orderSql + pageSql
results, err := datasource.RawDB("", querySql, params)
if err != nil {
logger.Errorf("query err => %v", err)
}
// 转换实体
result["rows"] = r.convertResultRows(results)
return result
}

View File

@@ -0,0 +1,15 @@
package repository
import "ems.agt/src/modules/network_data/model"
// 性能统计 数据层接口
type IPerfKPI interface {
// SelectGoldKPI 通过网元指标数据信息
SelectGoldKPI(query model.GoldKPIQuery, kpiIds []string) []map[string]any
// SelectGoldKPITitle 网元对应的指标名称
SelectGoldKPITitle(neType string) []model.GoldKPITitle
// SelectUPFTotalFlow 查询UPF总流量 N3上行 N6下行
SelectUPFTotalFlow(neType, rmUID, startDate, endDate string) map[string]any
}

View File

@@ -0,0 +1,133 @@
package repository
import (
"fmt"
"strings"
"ems.agt/src/framework/datasource"
"ems.agt/src/framework/logger"
"ems.agt/src/modules/network_data/model"
)
// 实例化数据层 PerfKPIImpl 结构体
var NewPerfKPIImpl = &PerfKPIImpl{}
// PerfKPIImpl 性能统计 数据层处理
type PerfKPIImpl struct{}
// SelectGoldKPI 通过网元指标数据信息
func (r *PerfKPIImpl) SelectGoldKPI(query model.GoldKPIQuery, kpiIds []string) []map[string]any {
// 查询条件拼接
var conditions []string
var params []any
if query.RmUID != "" {
conditions = append(conditions, "gk.rm_uid = ?")
params = append(params, query.RmUID)
}
if query.NeType != "" {
conditions = append(conditions, "gk.ne_type = ?")
params = append(params, query.NeType)
}
if query.StartTime != "" {
conditions = append(conditions, "gk.start_time >= ?")
params = append(params, query.StartTime)
}
if query.EndTime != "" {
conditions = append(conditions, "gk.start_time <= ?")
params = append(params, query.EndTime)
}
// 构建查询条件语句
whereSql := ""
if len(conditions) > 0 {
whereSql += " where " + strings.Join(conditions, " and ")
}
// 查询字段列
timeFormat := "DATE_FORMAT(gk.start_time, '%Y-%m-%d %H:%i:')"
secondGroup := fmt.Sprintf("LPAD(FLOOR(SECOND(gk.start_time) / %d) * %d, 2, '0')", query.Interval, query.Interval)
groupByField := fmt.Sprintf("CONCAT( %s, %s ) AS timeGroup", timeFormat, secondGroup)
if query.Interval > 60 {
minute := query.Interval / 60
timeFormat = "DATE_FORMAT(gk.start_time, '%Y-%m-%d %H:')"
minuteGroup := fmt.Sprintf("LPAD(FLOOR(MINUTE(gk.start_time) / %d) * %d, 2, '0')", minute, minute)
groupByField = fmt.Sprintf("CONCAT( %s, %s ) AS timeGroup", timeFormat, minuteGroup)
}
var fields = []string{
groupByField,
"min(CASE WHEN gk.index != '' THEN gk.index ELSE 0 END) AS startIndex",
"min(CASE WHEN gk.ne_type != '' THEN gk.ne_type ELSE 0 END) AS neType",
"min(CASE WHEN gk.ne_name != '' THEN gk.ne_name ELSE 0 END) AS neName",
}
for _, kid := range kpiIds {
// 特殊字段只取最后一次收到的非0值
if kid == "AMF.01" || kid == "UDM.01" || kid == "UDM.02" || kid == "UDM.03" || kid == "SMF.01" {
str := fmt.Sprintf("IFNULL(SUBSTRING_INDEX(GROUP_CONCAT( CASE WHEN gk.kpi_id = '%s' and gk.VALUE != 0 THEN gk.VALUE END ), ',', 1), 0) AS '%s'", kid, kid)
fields = append(fields, str)
} else {
str := fmt.Sprintf("sum(CASE WHEN gk.kpi_id = '%s' THEN gk.value ELSE 0 END) AS '%s'", kid, kid)
fields = append(fields, str)
}
}
fieldsSql := strings.Join(fields, ",")
// 查询数据
if query.SortField == "" {
query.SortField = "timeGroup"
}
if query.SortOrder == "" {
query.SortOrder = "desc"
}
orderSql := fmt.Sprintf(" order by %s %s", query.SortField, query.SortOrder)
querySql := fmt.Sprintf("SELECT %s FROM gold_kpi gk %s GROUP BY timeGroup %s", fieldsSql, whereSql, orderSql)
results, err := datasource.RawDB("", querySql, params)
if err != nil {
logger.Errorf("query err => %v", err)
}
return results
}
// SelectGoldKPITitle 网元对应的指标名称
func (r *PerfKPIImpl) SelectGoldKPITitle(neType string) []model.GoldKPITitle {
result := []model.GoldKPITitle{}
tx := datasource.DefaultDB().Table("kpi_title").Where("ne_type = ?", neType).Find(&result)
if err := tx.Error; err != nil {
logger.Errorf("Find err => %v", err)
}
return result
}
// SelectUPFTotalFlow 查询UPF总流量 N3上行 N6下行
func (r *PerfKPIImpl) SelectUPFTotalFlow(neType, rmUID, startDate, endDate string) map[string]any {
// 查询条件拼接
var conditions []string
var params []any
if neType != "" {
conditions = append(conditions, "gk.ne_type = ?")
params = append(params, neType)
}
if rmUID != "" {
conditions = append(conditions, "gk.rm_uid = ?")
params = append(params, rmUID)
}
if startDate != "" {
conditions = append(conditions, "gk.date >= ?")
params = append(params, startDate)
}
if endDate != "" {
conditions = append(conditions, "gk.date <= ?")
params = append(params, endDate)
}
// 构建查询条件语句
whereSql := ""
if len(conditions) > 0 {
whereSql += " where " + strings.Join(conditions, " and ")
}
// 查询数据
querySql := fmt.Sprintf("SELECT sum( CASE WHEN gk.kpi_id = 'UPF.03' THEN gk.VALUE ELSE 0 END ) AS 'up', sum( CASE WHEN gk.kpi_id = 'UPF.06' THEN gk.VALUE ELSE 0 END ) AS 'down' FROM gold_kpi gk %s", whereSql)
results, err := datasource.RawDB("", querySql, params)
if err != nil {
logger.Errorf("query err => %v", err)
}
return results[0]
}

View File

@@ -0,0 +1,9 @@
package repository
import "ems.agt/src/modules/network_data/model"
// UE会话事件 数据层接口
type IUEEvent interface {
// SelectPage 根据条件分页查询
SelectPage(querys model.UEEventQuery) map[string]any
}

View File

@@ -0,0 +1,133 @@
package repository
import (
"fmt"
"strings"
"ems.agt/src/framework/datasource"
"ems.agt/src/framework/logger"
"ems.agt/src/framework/utils/date"
"ems.agt/src/framework/utils/parse"
"ems.agt/src/framework/utils/repo"
"ems.agt/src/modules/network_data/model"
)
// 实例化数据层 UEEventImpl 结构体
var NewUEEventImpl = &UEEventImpl{
selectSql: `select id, ne_type, ne_name, rm_uid, timestamp, event_type, event_json, created_at from ue_event`,
resultMap: map[string]string{
"id": "ID",
"ne_type": "NeType",
"ne_name": "NeName",
"rm_uid": "RmUID",
"timestamp": "Timestamp",
"event_type": "EventType",
"event_json": "EventJSONStr",
"created_at": "CreatedAt",
},
}
// UEEventImpl UE会话事件 数据层处理
type UEEventImpl struct {
// 查询视图对象SQL
selectSql string
// 结果字段与实体映射
resultMap map[string]string
}
// convertResultRows 将结果记录转实体结果组
func (r *UEEventImpl) convertResultRows(rows []map[string]any) []model.UEEvent {
arr := make([]model.UEEvent, 0)
for _, row := range rows {
item := model.UEEvent{}
for key, value := range row {
if keyMapper, ok := r.resultMap[key]; ok {
repo.SetFieldValue(&item, keyMapper, value)
}
}
arr = append(arr, item)
}
return arr
}
// SelectPage 根据条件分页查询
func (r *UEEventImpl) SelectPage(querys model.UEEventQuery) map[string]any {
// 查询条件拼接
var conditions []string
var params []any
if querys.NeType != "" {
conditions = append(conditions, "ne_type = ?")
params = append(params, querys.NeType)
}
if querys.RmUID != "" {
conditions = append(conditions, "rm_uid = ?")
params = append(params, querys.RmUID)
}
if querys.StartTime != "" {
conditions = append(conditions, "timestamp >= ?")
beginDate := date.ParseStrToDate(querys.StartTime, date.YYYY_MM_DD_HH_MM_SS)
params = append(params, beginDate.Unix())
}
if querys.EndTime != "" {
conditions = append(conditions, "timestamp <= ?")
endDate := date.ParseStrToDate(querys.EndTime, date.YYYY_MM_DD_HH_MM_SS)
params = append(params, endDate.Unix())
}
// 构建查询条件语句
whereSql := ""
if len(conditions) > 0 {
whereSql += " where " + strings.Join(conditions, " and ")
}
result := map[string]any{
"total": 0,
"rows": []model.CDREvent{},
}
// 查询数量 长度为0直接返回
totalSql := "select count(1) as 'total' from ue_event"
totalRows, err := datasource.RawDB("", totalSql+whereSql, params)
if err != nil {
logger.Errorf("total err => %v", err)
return result
}
total := parse.Number(totalRows[0]["total"])
if total == 0 {
return result
} else {
result["total"] = total
}
// 分页
pageNum, pageSize := repo.PageNumSize(querys.PageNum, querys.PageSize)
pageSql := " limit ?,? "
params = append(params, pageNum*pageSize)
params = append(params, pageSize)
// 排序
orderSql := ""
if querys.SortField != "" {
sortSql := querys.SortField
if querys.SortOrder != "" {
if querys.SortOrder == "desc" {
sortSql += " desc "
} else {
sortSql += " asc "
}
}
orderSql = fmt.Sprintf(" order by %s ", sortSql)
}
// 查询数据
querySql := r.selectSql + whereSql + orderSql + pageSql
results, err := datasource.RawDB("", querySql, params)
if err != nil {
logger.Errorf("query err => %v", err)
}
// 转换实体
result["rows"] = r.convertResultRows(results)
return result
}

View File

@@ -0,0 +1,9 @@
package service
import "ems.agt/src/modules/network_data/model"
// CDR会话事件 服务层接口
type ICDREvent interface {
// SelectPage 根据条件分页查询
SelectPage(querys model.CDREventQuery) map[string]any
}

View File

@@ -0,0 +1,22 @@
package service
import (
"ems.agt/src/modules/network_data/model"
"ems.agt/src/modules/network_data/repository"
)
// 实例化数据层 CDREventImpl 结构体
var NewCDREventImpl = &CDREventImpl{
cdrEventRepository: repository.NewCDREventImpl,
}
// CDREventImpl CDR会话事件 服务层处理
type CDREventImpl struct {
// CDR会话事件数据信息
cdrEventRepository repository.ICDREvent
}
// SelectPage 根据条件分页查询
func (r *CDREventImpl) SelectPage(querys model.CDREventQuery) map[string]any {
return r.cdrEventRepository.SelectPage(querys)
}

View File

@@ -0,0 +1,15 @@
package service
import "ems.agt/src/modules/network_data/model"
// 性能统计 服务层接口
type IPerfKPI interface {
// SelectGoldKPI 通过网元指标数据信息
SelectGoldKPI(query model.GoldKPIQuery) []map[string]any
// SelectGoldKPITitle 网元对应的指标名称
SelectGoldKPITitle(neType string) []model.GoldKPITitle
// SelectUPFTotalFlow 查询UPF总流量 N3上行 N6下行
SelectUPFTotalFlow(neType, rmUID string, day int) map[string]any
}

View File

@@ -0,0 +1,76 @@
package service
import (
"encoding/json"
"fmt"
"time"
"ems.agt/src/framework/constants/cachekey"
"ems.agt/src/framework/redis"
"ems.agt/src/modules/network_data/model"
"ems.agt/src/modules/network_data/repository"
)
// 实例化数据层 PerfKPIImpl 结构体
var NewPerfKPIImpl = &PerfKPIImpl{
perfKPIRepository: repository.NewPerfKPIImpl,
}
// PerfKPIImpl 性能统计 服务层处理
type PerfKPIImpl struct {
// 性能统计数据信息
perfKPIRepository repository.IPerfKPI
}
// SelectGoldKPI 通过网元指标数据信息
func (r *PerfKPIImpl) SelectGoldKPI(query model.GoldKPIQuery) []map[string]any {
// 获取数据指标id
var kpiIds []string
kpiTitles := r.perfKPIRepository.SelectGoldKPITitle(query.NeType)
for _, kpiId := range kpiTitles {
kpiIds = append(kpiIds, kpiId.KPIID)
}
data := r.perfKPIRepository.SelectGoldKPI(query, kpiIds)
if data == nil {
return []map[string]any{}
}
return data
}
// SelectGoldKPITitle 网元对应的指标名称
func (r *PerfKPIImpl) SelectGoldKPITitle(neType string) []model.GoldKPITitle {
return r.perfKPIRepository.SelectGoldKPITitle(neType)
}
// SelectUPFTotalFlow 查询UPF总流量 N3上行 N6下行
func (r *PerfKPIImpl) SelectUPFTotalFlow(neType, rmUID string, day int) map[string]any {
// 获取当前日期
now := time.Now()
endDate := now.Format("2006-01-02")
// 将当前日期前几天数
afterDays := now.AddDate(0, 0, -day)
startDate := afterDays.Format("2006-01-02")
var info map[string]any
// 读取缓存数据
key := fmt.Sprintf("%sUPFTotalFlow_%s_%d", cachekey.NE_DATA_KEY, rmUID, day)
infoStr, _ := redis.Get("", key)
if infoStr != "" {
json.Unmarshal([]byte(infoStr), &info)
expireSecond, _ := redis.GetExpire("", key)
expireMinute := (time.Duration(int64(expireSecond)) * time.Second)
if expireMinute > 1*time.Minute {
return info
}
}
info = r.perfKPIRepository.SelectUPFTotalFlow(neType, rmUID, startDate, endDate)
// 保存到缓存
infoJSON, _ := json.Marshal(info)
redis.SetByExpire("", key, string(infoJSON), time.Duration(10)*time.Minute)
return info
}

View File

@@ -0,0 +1,9 @@
package service
import "ems.agt/src/modules/network_data/model"
// UE会话事件 服务层接口
type IUEEvent interface {
// SelectPage 根据条件分页查询
SelectPage(querys model.UEEventQuery) map[string]any
}

View File

@@ -0,0 +1,22 @@
package service
import (
"ems.agt/src/modules/network_data/model"
"ems.agt/src/modules/network_data/repository"
)
// 实例化数据层 UEEventImpl 结构体
var NewUEEventImpl = &UEEventImpl{
ueEventRepository: repository.NewUEEventImpl,
}
// UEEventImpl UE会话事件 服务层处理
type UEEventImpl struct {
// UE会话事件数据信息
ueEventRepository repository.IUEEvent
}
// SelectPage 根据条件分页查询
func (r *UEEventImpl) SelectPage(querys model.UEEventQuery) map[string]any {
return r.ueEventRepository.SelectPage(querys)
}

View File

@@ -2,6 +2,7 @@ package controller
import (
"fmt"
"sync"
"ems.agt/src/framework/i18n"
"ems.agt/src/framework/utils/ctx"
@@ -26,7 +27,8 @@ type NeInfoController struct {
}
// neStateCacheMap 网元状态缓存最后一次成功的信息
var neStateCacheMap map[string]map[string]any = make(map[string]map[string]any)
var neStateCacheMap sync.Map
var mutex sync.Mutex
// 网元状态
//
@@ -52,12 +54,13 @@ func (s *NeInfoController) NeState(c *gin.Context) {
// 网元直连
resData, err := neService.NeState(neInfo)
if err != nil {
mutex.Lock()
// 异常取上次缓存
if v, ok := neStateCacheMap[neKey]; ok && v != nil {
v["online"] = false
neStateCacheMap[neKey] = v
resDataCache, ok := neStateCacheMap.Load(neKey)
if ok && resDataCache != nil {
resDataCache.(map[string]any)["online"] = false
} else {
neStateCacheMap[neKey] = map[string]any{
resDataCache = map[string]any{
"online": false,
"neId": neInfo.NeId,
"neName": neInfo.NeName,
@@ -65,13 +68,17 @@ func (s *NeInfoController) NeState(c *gin.Context) {
"neIP": neInfo.IP,
}
}
c.JSON(200, result.OkData(neStateCacheMap[neKey]))
neStateCacheMap.Store(neKey, resDataCache)
mutex.Unlock()
c.JSON(200, result.OkData(resDataCache))
return
}
// 存入缓存
resData["online"] = true
neStateCacheMap[neKey] = resData
mutex.Lock()
neStateCacheMap.Store(neKey, resData)
mutex.Unlock()
c.JSON(200, result.OkData(resData))
}

View File

@@ -6,6 +6,7 @@ import (
"ems.agt/src/framework/middleware/collectlogs"
"ems.agt/src/framework/middleware/repeat"
"ems.agt/src/modules/network_element/controller"
"ems.agt/src/modules/network_element/service"
"github.com/gin-gonic/gin"
)
@@ -14,7 +15,12 @@ import (
func Setup(router *gin.Engine) {
logger.Infof("开始加载 ====> network_element 模块路由")
// 启动时需要的初始参数
InitLoad()
neGroup := router.Group("/ne")
// 网元信息
{
neGroup.GET("/info",
middleware.PreAuthorize(nil),
@@ -155,18 +161,10 @@ func Setup(router *gin.Engine) {
controller.NewUDMSub.Import,
)
}
// 性能统计信息
kpiGroup := neGroup.Group("/kpi")
{
kpiGroup.GET("/title",
middleware.PreAuthorize(nil),
controller.NewPerfKPI.Title,
)
kpiGroup.GET("/data",
middleware.PreAuthorize(nil),
controller.NewPerfKPI.GoldKPI,
)
}
}
// InitLoad 初始参数
func InitLoad() {
// 启动时,清除缓存-网元类型
service.NewNeInfoImpl.ClearNeCacheByNeType("*")
}

View File

@@ -41,5 +41,6 @@ func NeState(neInfo model.NeInfo) (map[string]any, error) {
"expire": resData["expiryDate"],
"cpu": resData["cpuUsage"],
"mem": resData["memUsage"],
"disk": resData["diskSpace"],
}, nil
}

View File

@@ -7,6 +7,12 @@ type INeInfo interface {
// SelectNeInfoByNeTypeAndNeID 通过ne_type和ne_id查询网元信息
SelectNeInfoByNeTypeAndNeID(neType, neID string) model.NeInfo
// RefreshByNeTypeAndNeID 通过ne_type和ne_id刷新redis中的缓存
RefreshByNeTypeAndNeID(neType, neID string) model.NeInfo
// ClearNeCacheByNeType 清除网元类型缓存
ClearNeCacheByNeType(neType string) bool
// SelectNeList 查询网元列表
SelectNeList(ne model.NeInfo, bandStatus bool) []model.NeInfo
}

View File

@@ -1,6 +1,11 @@
package service
import (
"encoding/json"
"fmt"
"ems.agt/src/framework/constants/cachekey"
"ems.agt/src/framework/redis"
"ems.agt/src/modules/network_element/model"
"ems.agt/src/modules/network_element/repository"
)
@@ -18,7 +23,50 @@ type NeInfoImpl struct {
// SelectNeInfoByNeTypeAndNeID 通过ne_type和ne_id查询网元信息
func (r *NeInfoImpl) SelectNeInfoByNeTypeAndNeID(neType, neID string) model.NeInfo {
return r.neInfoRepository.SelectNeInfoByNeTypeAndNeID(neType, neID)
var neInfo model.NeInfo
key := fmt.Sprintf("%s%s.%s", cachekey.NE_KEY, neType, neID)
jsonStr, _ := redis.Get("", key)
if len(jsonStr) > 7 {
err := json.Unmarshal([]byte(jsonStr), &neInfo)
if err != nil {
neInfo = model.NeInfo{}
}
} else {
neInfo = r.neInfoRepository.SelectNeInfoByNeTypeAndNeID(neType, neID)
if neInfo.NeId == neID {
redis.Del("", key)
values, _ := json.Marshal(neInfo)
redis.Set("", key, string(values))
}
}
return neInfo
}
// RefreshByNeTypeAndNeID 通过ne_type和ne_id刷新redis中的缓存
func (r *NeInfoImpl) RefreshByNeTypeAndNeID(neType, neID string) model.NeInfo {
var neInfo model.NeInfo
key := fmt.Sprintf("%s%s.%s", cachekey.NE_KEY, neType, neID)
redis.Del("", key)
neInfo = r.neInfoRepository.SelectNeInfoByNeTypeAndNeID(neType, neID)
if neInfo.NeId == neID {
values, _ := json.Marshal(neInfo)
redis.Set("", key, string(values))
}
return neInfo
}
// ClearNeCacheByNeType 清除网元类型缓存
func (r *NeInfoImpl) ClearNeCacheByNeType(neType string) bool {
key := fmt.Sprintf("%s*", cachekey.NE_KEY)
if neType != "*" {
key = fmt.Sprintf("%s%s*", cachekey.NE_KEY, neType)
}
keys, err := redis.GetKeys("", key)
if err != nil {
return false
}
delOk, _ := redis.DelKeys("", keys)
return delOk
}
// SelectNeList 查询网元列表

View File

@@ -34,10 +34,10 @@ type SysMenuController struct {
// GET /list
func (s *SysMenuController) List(c *gin.Context) {
query := model.SysMenu{}
if v, ok := c.GetQuery("menuName"); ok {
if v, ok := c.GetQuery("menuName"); ok && v != "" {
query.MenuName = v
}
if v, ok := c.GetQuery("status"); ok {
if v, ok := c.GetQuery("status"); ok && v != "" {
query.Status = v
}
@@ -289,10 +289,10 @@ func (s *SysMenuController) Remove(c *gin.Context) {
// GET /treeSelect
func (s *SysMenuController) TreeSelect(c *gin.Context) {
query := model.SysMenu{}
if v, ok := c.GetQuery("menuName"); ok {
if v, ok := c.GetQuery("menuName"); ok && v != "" {
query.MenuName = v
}
if v, ok := c.GetQuery("status"); ok {
if v, ok := c.GetQuery("status"); ok && v != "" {
query.Status = v
}
@@ -331,10 +331,10 @@ func (s *SysMenuController) RoleMenuTreeSelect(c *gin.Context) {
}
query := model.SysMenu{}
if v, ok := c.GetQuery("menuName"); ok {
if v, ok := c.GetQuery("menuName"); ok && v != "" {
query.MenuName = v
}
if v, ok := c.GetQuery("status"); ok {
if v, ok := c.GetQuery("status"); ok && v != "" {
query.Status = v
}

View File

@@ -13,7 +13,7 @@ type ISysMenu interface {
// SelectMenuPermsByUserId 根据用户ID查询权限
SelectMenuPermsByUserId(userId string) []string
// SelectMenuPermsByUserId 根据用户ID查询权限
// SelectMenuTreeByUserId 根据用户ID查询权限
SelectMenuTreeByUserId(userId string) []model.SysMenu
// SelectMenuTreeSelectByUserId 查询菜单树结构信息

View File

@@ -0,0 +1,104 @@
package controller
import (
"strings"
"ems.agt/src/framework/i18n"
"ems.agt/src/framework/logger"
"ems.agt/src/framework/utils/ctx"
"ems.agt/src/framework/utils/parse"
"ems.agt/src/framework/vo/result"
"ems.agt/src/modules/ws/service"
"github.com/gin-gonic/gin"
)
// 实例化控制层 WSController 结构体
var NewWSController = &WSController{
wsService: service.NewWSImpl,
wsSendService: service.NewWSSendImpl,
}
// WebSocket通信
//
// PATH /ws
type WSController struct {
// WebSocket 服务
wsService service.IWS
// WebSocket消息发送 服务
wsSendService service.IWSSend
}
// 通用
//
// GET /?subGroupIDs=0
func (s *WSController) WS(c *gin.Context) {
language := ctx.AcceptLanguage(c)
// 登录用户信息
loginUser, err := ctx.LoginUser(c)
if err != nil {
c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error())))
return
}
// 订阅消息组
var subGroupIDs []string
subGroupIDStr := c.Query("subGroupID")
if subGroupIDStr != "" {
// 处理字符转id数组后去重
ids := strings.Split(subGroupIDStr, ",")
uniqueIDs := parse.RemoveDuplicates(ids)
if len(uniqueIDs) > 0 {
subGroupIDs = uniqueIDs
}
}
// 将 HTTP 连接升级为 WebSocket 连接
conn := s.wsService.UpgraderWs(c.Writer, c.Request)
if conn == nil {
return
}
defer conn.Close()
wsClient := s.wsService.NewClient(loginUser.UserID, subGroupIDs, conn)
// 等待停止信号
for value := range wsClient.StopChan {
logger.Infof("ws Stop Client UID %s %s", wsClient.BindUid, value)
return
}
}
// 测试
//
// GET /test?clientId=&groupID=
func (s *WSController) Test(c *gin.Context) {
language := ctx.AcceptLanguage(c)
// 登录用户信息
loginUser, err := ctx.LoginUser(c)
if err != nil {
c.JSON(401, result.CodeMsg(401, i18n.TKey(language, err.Error())))
return
}
errMsgArr := []string{}
clientId := c.Query("clientId")
if clientId != "" {
err := s.wsSendService.ByClientID(c.Query("clientId"), loginUser)
if err != nil {
errMsgArr = append(errMsgArr, "clientId: "+err.Error())
}
}
groupID := c.Query("groupID")
if groupID != "" {
err := s.wsSendService.ByGroupID(c.Query("groupID"), loginUser)
if err != nil {
errMsgArr = append(errMsgArr, "groupID: "+err.Error())
}
}
c.JSON(200, result.OkData(errMsgArr))
}

View File

@@ -0,0 +1,20 @@
package model
import "github.com/shirou/gopsutil/v3/net"
// NetConnectData 网络连接进程数据
type NetConnectData struct {
Type string `json:"type"`
Status string `json:"status"`
Laddr net.Addr `json:"localaddr"`
Raddr net.Addr `json:"remoteaddr"`
PID int32 `json:"PID"`
Name string `json:"name"`
}
// NetConnectQuery 网络连接进程查询
type NetConnectQuery struct {
Port int32 `json:"port"`
ProcessName string `json:"processName"`
ProcessID int32 `json:"processID"`
}

View File

@@ -0,0 +1,38 @@
package model
// PsProcessData 进程数据
type PsProcessData struct {
PID int32 `json:"PID"`
Name string `json:"name"`
PPID int32 `json:"PPID"`
Username string `json:"username"`
Status string `json:"status"`
StartTime string `json:"startTime"`
NumThreads int32 `json:"numThreads"`
NumConnections int `json:"numConnections"`
CpuPercent string `json:"cpuPercent"`
DiskRead string `json:"diskRead"`
DiskWrite string `json:"diskWrite"`
CmdLine string `json:"cmdLine"`
Rss string `json:"rss"`
VMS string `json:"vms"`
HWM string `json:"hwm"`
Data string `json:"data"`
Stack string `json:"stack"`
Locked string `json:"locked"`
Swap string `json:"swap"`
CpuValue float64 `json:"cpuValue"`
RssValue uint64 `json:"rssValue"`
Envs []string `json:"envs"`
}
// PsProcessQuery 进程查询
type PsProcessQuery struct {
Pid int32 `json:"pid"`
Name string `json:"name"`
Username string `json:"username"`
}

View File

@@ -0,0 +1,21 @@
package model
import "github.com/gorilla/websocket"
// WSClient ws客户端
type WSClient struct {
ID string // 连接ID-随机字符串16位
Conn *websocket.Conn // 连接实例
LastHeartbeat int64 // 最近一次心跳消息(毫秒)
BindUid string // 绑定登录用户ID
SubGroup []string // 订阅组ID
MsgChan chan []byte // 消息通道
StopChan chan struct{} // 停止信号-退出协程
}
// WSRequest ws消息接收
type WSRequest struct {
RequestID string `json:"requestId"` // 请求ID
Type string `json:"type"` // 业务类型
Data any `json:"data"` // 查询结构
}

View File

@@ -0,0 +1,29 @@
package processor
import (
"encoding/json"
"fmt"
"ems.agt/src/framework/logger"
"ems.agt/src/framework/vo/result"
neDataModel "ems.agt/src/modules/network_data/model"
neDataService "ems.agt/src/modules/network_data/service"
)
// GetCDRConnect 获取CDR会话事件-IMS
func GetCDRConnect(requestID string, data any) ([]byte, error) {
msgByte, _ := json.Marshal(data)
var query neDataModel.CDREventQuery
err := json.Unmarshal(msgByte, &query)
if err != nil {
logger.Warnf("ws processor GetCDRConnect err: %s", err.Error())
return nil, fmt.Errorf("query data structure error")
}
dataMap := neDataService.NewCDREventImpl.SelectPage(query)
resultByte, err := json.Marshal(result.Ok(map[string]any{
"requestId": requestID,
"data": dataMap,
}))
return resultByte, err
}

View File

@@ -0,0 +1,57 @@
package processor
import (
"encoding/json"
"fmt"
"ems.agt/src/framework/logger"
"ems.agt/src/framework/vo/result"
neService "ems.agt/src/modules/network_element/service"
)
// GetNeState 获取网元服务状态
func GetNeState(requestID string, data any) ([]byte, error) {
msgByte, _ := json.Marshal(data)
var querys struct {
NeType string `json:"neType" form:"neType" binding:"required"`
NeID string `json:"neId" form:"neId" binding:"required"`
}
err := json.Unmarshal(msgByte, &querys)
if err != nil {
logger.Warnf("ws processor GetUPFTotalFlow err: %s", err.Error())
return nil, fmt.Errorf("query data structure error")
}
if querys.NeType == "" || querys.NeID == "" {
return nil, fmt.Errorf("query neType any neId empty")
}
// 查询网元获取IP
neInfo := neService.NewNeInfoImpl.SelectNeInfoByNeTypeAndNeID(querys.NeType, querys.NeID)
if neInfo.NeId != querys.NeID || neInfo.IP == "" {
return nil, fmt.Errorf("no matching network element information found")
}
// 网元直连
resData, err := neService.NeState(neInfo)
if err != nil {
resultByte, err := json.Marshal(result.Ok(map[string]any{
"requestId": requestID,
"data": map[string]any{
"online": false,
"neId": neInfo.NeId,
"neName": neInfo.NeName,
"neType": neInfo.NeType,
"neIP": neInfo.IP,
},
}))
return resultByte, err
}
resData["online"] = true
resultByte, err := json.Marshal(result.Ok(map[string]any{
"requestId": requestID,
"data": resData,
}))
return resultByte, err
}

View File

@@ -0,0 +1,61 @@
package processor
import (
"encoding/json"
"fmt"
"strings"
"ems.agt/src/framework/logger"
"ems.agt/src/framework/vo/result"
"ems.agt/src/modules/ws/model"
"github.com/shirou/gopsutil/v3/net"
"github.com/shirou/gopsutil/v3/process"
)
// GetNetConnections 获取网络连接进程
func GetNetConnections(requestID string, data any) ([]byte, error) {
msgByte, _ := json.Marshal(data)
var query model.NetConnectQuery
err := json.Unmarshal(msgByte, &query)
if err != nil {
logger.Warnf("ws processor GetNetConnections err: %s", err.Error())
return nil, fmt.Errorf("query data structure error")
}
dataArr := []model.NetConnectData{}
for _, netType := range [...]string{"tcp", "udp"} {
connections, err := net.Connections(netType)
if err != nil {
continue
}
for _, conn := range connections {
if query.ProcessID > 0 && query.ProcessID != conn.Pid {
continue
}
proc, err := process.NewProcess(conn.Pid)
if err == nil {
name, _ := proc.Name()
if name != "" && query.ProcessName != "" && !strings.Contains(name, query.ProcessName) {
continue
}
if query.Port > 0 && query.Port != int32(conn.Laddr.Port) && query.Port != int32(conn.Raddr.Port) {
continue
}
dataArr = append(dataArr, model.NetConnectData{
Type: netType,
Status: conn.Status,
Laddr: conn.Laddr,
Raddr: conn.Raddr,
PID: conn.Pid,
Name: name,
})
}
}
}
resultByte, err := json.Marshal(result.Ok(map[string]any{
"requestId": requestID,
"data": dataArr,
}))
return resultByte, err
}

View File

@@ -0,0 +1,142 @@
package processor
import (
"encoding/json"
"fmt"
"sort"
"strings"
"sync"
"ems.agt/src/framework/logger"
"ems.agt/src/framework/utils/date"
"ems.agt/src/framework/utils/parse"
"ems.agt/src/framework/vo/result"
"ems.agt/src/modules/ws/model"
"github.com/shirou/gopsutil/v3/process"
)
// GetProcessData 获取进程数据
func GetProcessData(requestID string, data any) ([]byte, error) {
msgByte, _ := json.Marshal(data)
var query model.PsProcessQuery
err := json.Unmarshal(msgByte, &query)
if err != nil {
logger.Warnf("ws processor GetNetConnections err: %s", err.Error())
return nil, fmt.Errorf("query data structure error")
}
var processes []*process.Process
processes, err = process.Processes()
if err != nil {
return nil, err
}
var (
dataArr = []model.PsProcessData{}
resultMutex sync.Mutex
wg sync.WaitGroup
numWorkers = 4
)
handleData := func(proc *process.Process) {
procData := model.PsProcessData{
PID: proc.Pid,
}
if query.Pid > 0 && query.Pid != proc.Pid {
return
}
procName, err := proc.Name()
if procName == "" || err != nil {
return
} else {
procData.Name = procName
}
if query.Name != "" && !strings.Contains(procData.Name, query.Name) {
return
}
if username, err := proc.Username(); err == nil {
procData.Username = username
}
if query.Username != "" && !strings.Contains(procData.Username, query.Username) {
return
}
procData.PPID, _ = proc.Ppid()
statusArray, _ := proc.Status()
if len(statusArray) > 0 {
procData.Status = strings.Join(statusArray, ",")
}
createTime, procErr := proc.CreateTime()
if procErr == nil {
procData.StartTime = date.ParseDateToStr(createTime, date.YYYY_MM_DD_HH_MM_SS)
}
procData.NumThreads, _ = proc.NumThreads()
procData.CpuValue, _ = proc.CPUPercent()
procData.CpuPercent = fmt.Sprintf("%.2f", procData.CpuValue) + "%"
menInfo, procErr := proc.MemoryInfo()
if procErr == nil {
procData.Rss = parse.Bit(float64(menInfo.RSS))
procData.Data = parse.Bit(float64(menInfo.Data))
procData.VMS = parse.Bit(float64(menInfo.VMS))
procData.HWM = parse.Bit(float64(menInfo.HWM))
procData.Stack = parse.Bit(float64(menInfo.Stack))
procData.Locked = parse.Bit(float64(menInfo.Locked))
procData.Swap = parse.Bit(float64(menInfo.Swap))
procData.RssValue = menInfo.RSS
} else {
procData.Rss = "--"
procData.Data = "--"
procData.VMS = "--"
procData.HWM = "--"
procData.Stack = "--"
procData.Locked = "--"
procData.Swap = "--"
procData.RssValue = 0
}
ioStat, procErr := proc.IOCounters()
if procErr == nil {
procData.DiskWrite = parse.Bit(float64(ioStat.WriteBytes))
procData.DiskRead = parse.Bit(float64(ioStat.ReadBytes))
} else {
procData.DiskWrite = "--"
procData.DiskRead = "--"
}
procData.CmdLine, _ = proc.Cmdline()
procData.Envs, _ = proc.Environ()
resultMutex.Lock()
dataArr = append(dataArr, procData)
resultMutex.Unlock()
}
chunkSize := (len(processes) + numWorkers - 1) / numWorkers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
start := i * chunkSize
end := (i + 1) * chunkSize
if end > len(processes) {
end = len(processes)
}
go func(start, end int) {
defer wg.Done()
for j := start; j < end; j++ {
handleData(processes[j])
}
}(start, end)
}
wg.Wait()
sort.Slice(dataArr, func(i, j int) bool {
return dataArr[i].PID < dataArr[j].PID
})
resultByte, err := json.Marshal(result.Ok(map[string]any{
"requestId": requestID,
"data": dataArr,
}))
return resultByte, err
}

View File

@@ -0,0 +1,29 @@
package processor
import (
"encoding/json"
"fmt"
"ems.agt/src/framework/logger"
"ems.agt/src/framework/vo/result"
neDataModel "ems.agt/src/modules/network_data/model"
neDataService "ems.agt/src/modules/network_data/service"
)
// GetUEConnect 获取UE会话事件-AMF
func GetUEConnect(requestID string, data any) ([]byte, error) {
msgByte, _ := json.Marshal(data)
var query neDataModel.UEEventQuery
err := json.Unmarshal(msgByte, &query)
if err != nil {
logger.Warnf("ws processor GetUEConnect err: %s", err.Error())
return nil, fmt.Errorf("query data structure error")
}
dataMap := neDataService.NewUEEventImpl.SelectPage(query)
resultByte, err := json.Marshal(result.Ok(map[string]any{
"requestId": requestID,
"data": dataMap,
}))
return resultByte, err
}

View File

@@ -0,0 +1,44 @@
package processor
import (
"encoding/json"
"fmt"
"ems.agt/src/framework/logger"
"ems.agt/src/framework/vo/result"
neDataService "ems.agt/src/modules/network_data/service"
neService "ems.agt/src/modules/network_element/service"
)
// GetUPFTotalFlow 获取UPF-总流量数 N3上行 N6下行 单位比特(bit)
func GetUPFTotalFlow(requestID string, data any) ([]byte, error) {
msgByte, _ := json.Marshal(data)
var querys struct {
NeType string `json:"neType" form:"neType" binding:"required"`
NeID string `json:"neId" form:"neId" binding:"required"`
Day int `json:"day" binding:"required"`
}
err := json.Unmarshal(msgByte, &querys)
if err != nil {
logger.Warnf("ws processor GetUPFTotalFlow err: %s", err.Error())
return nil, fmt.Errorf("query data structure error")
}
if querys.NeType == "" || querys.NeID == "" || querys.Day < 0 {
return nil, fmt.Errorf("query neType any neId empty or day less 0 ")
}
// 查询网元获取IP
neInfo := neService.NewNeInfoImpl.SelectNeInfoByNeTypeAndNeID(querys.NeType, querys.NeID)
if neInfo.NeId != querys.NeID || neInfo.IP == "" {
return nil, fmt.Errorf("no matching network element information found")
}
dataMap := neDataService.NewPerfKPIImpl.SelectUPFTotalFlow(neInfo.NeType, neInfo.RmUID, querys.Day)
resultByte, err := json.Marshal(result.Ok(map[string]any{
"requestId": requestID,
"data": dataMap,
}))
return resultByte, err
}

View File

@@ -0,0 +1,20 @@
package service
import (
"net/http"
"ems.agt/src/modules/ws/model"
"github.com/gorilla/websocket"
)
// IWS WebSocket通信 服务层接口
type IWS interface {
// UpgraderWs http升级ws请求
UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.Conn
// NewClient 新建客户端 uid 登录用户ID
NewClient(uid string, gids []string, conn *websocket.Conn) *model.WSClient
// CloseClient 客户端关闭
CloseClient(clientID string)
}

View File

@@ -0,0 +1,216 @@
package service
import (
"encoding/json"
"net/http"
"sync"
"time"
"ems.agt/src/framework/logger"
"ems.agt/src/framework/utils/generate"
"ems.agt/src/framework/vo/result"
"ems.agt/src/modules/ws/model"
"github.com/gorilla/websocket"
)
var (
// ws客户端 [clientId: client]
WsClients sync.Map
// ws用户对应的多个客户端id [uid:clientIds]
WsUsers sync.Map
// ws组对应的多个用户id [groupID:uids]
WsGroup sync.Map
)
// 实例化服务层 WSImpl 结构体
var NewWSImpl = &WSImpl{}
// WSImpl WebSocket通信 服务层处理
type WSImpl struct{}
// UpgraderWs http升级ws请求
func (s *WSImpl) UpgraderWs(w http.ResponseWriter, r *http.Request) *websocket.Conn {
wsUpgrader := websocket.Upgrader{
Subprotocols: []string{"omc-ws"},
// 设置消息发送缓冲区大小byte如果这个值设置得太小可能会导致服务端在发送大型消息时遇到问题
WriteBufferSize: 1024,
// 消息包启用压缩
EnableCompression: true,
// ws握手超时时间
HandshakeTimeout: 5 * time.Second,
// ws握手过程中允许跨域
CheckOrigin: func(r *http.Request) bool {
return true
},
}
conn, err := wsUpgrader.Upgrade(w, r, nil)
if err != nil {
logger.Errorf("ws Upgrade err: %s", err.Error())
}
return conn
}
// NewClient 新建客户端 uid 登录用户ID
func (s *WSImpl) NewClient(uid string, groupIDs []string, conn *websocket.Conn) *model.WSClient {
// clientID也可以用其他方式生成只要能保证在所有服务端中都能保证唯一即可
clientID := generate.Code(16)
wsClient := &model.WSClient{
ID: clientID,
Conn: conn,
LastHeartbeat: time.Now().UnixMilli(),
BindUid: uid,
SubGroup: groupIDs,
MsgChan: make(chan []byte, 100),
StopChan: make(chan struct{}, 1), // 卡死循环标记
}
// 存入客户端
WsClients.Store(clientID, wsClient)
// 存入用户持有客户端
if uid != "" {
if v, ok := WsUsers.Load(uid); ok {
uidClientIds := v.(*[]string)
*uidClientIds = append(*uidClientIds, clientID)
} else {
WsUsers.Store(uid, &[]string{clientID})
}
}
// 存入用户订阅组
if uid != "" && len(groupIDs) > 0 {
for _, groupID := range groupIDs {
if v, ok := WsGroup.Load(groupID); ok {
groupUIDs := v.(*[]string)
// 避免同组内相同用户
hasUid := false
for _, uidv := range *groupUIDs {
if uidv == uid {
hasUid = true
break
}
}
if !hasUid {
*groupUIDs = append(*groupUIDs, uid)
}
} else {
WsGroup.Store(groupID, &[]string{uid})
}
}
}
go s.clientRead(wsClient)
go s.clientWrite(wsClient)
// 发客户端id确认是否连接
msgByte, _ := json.Marshal(result.OkData(map[string]string{
"clientId": clientID,
}))
wsClient.MsgChan <- msgByte
return wsClient
}
// clientRead 客户端读取消息
func (s *WSImpl) clientRead(wsClient *model.WSClient) {
defer func() {
if err := recover(); err != nil {
logger.Errorf("ws ReadMessage Panic Error: %v", err)
}
}()
for {
// 读取消息
messageType, msg, err := wsClient.Conn.ReadMessage()
if err != nil {
logger.Warnf("ws ReadMessage UID %s err: %s", wsClient.BindUid, err.Error())
s.CloseClient(wsClient.ID)
return
}
// 文本和二进制类型只处理文本json
if messageType == websocket.TextMessage {
var reqMsg model.WSRequest
err := json.Unmarshal(msg, &reqMsg)
// fmt.Println(messageType, string(msg))
if err != nil {
msgByte, _ := json.Marshal(result.ErrMsg("message format not supported"))
wsClient.MsgChan <- msgByte
} else {
go NewWSReceiveImpl.Receive(wsClient, reqMsg)
}
}
}
}
// clientWrite 客户端写入消息
func (s *WSImpl) clientWrite(wsClient *model.WSClient) {
defer func() {
if err := recover(); err != nil {
logger.Errorf("ws WriteMessage Panic Error: %v", err)
}
}()
for msg := range wsClient.MsgChan {
// 发送消息
err := wsClient.Conn.WriteMessage(websocket.TextMessage, msg)
if err != nil {
logger.Warnf("ws WriteMessage UID %s err: %s", wsClient.BindUid, err.Error())
s.CloseClient(wsClient.ID)
return
}
wsClient.LastHeartbeat = time.Now().UnixMilli()
}
}
// CloseClient 客户端关闭
func (s *WSImpl) CloseClient(clientID string) {
v, ok := WsClients.Load(clientID)
if !ok {
return
}
client := v.(*model.WSClient)
defer func() {
client.Conn.WriteMessage(websocket.CloseMessage, []byte{})
client.Conn.Close()
client.StopChan <- struct{}{}
WsClients.Delete(clientID)
}()
// 客户端断线时自动踢出Uid绑定列表
if client.BindUid != "" {
if clientIds, ok := WsUsers.Load(client.BindUid); ok {
uidClientIds := clientIds.(*[]string)
if len(*uidClientIds) > 0 {
tempClientIds := make([]string, 0, len(*uidClientIds))
for _, v := range *uidClientIds {
if v != client.ID {
tempClientIds = append(tempClientIds, v)
}
}
*uidClientIds = tempClientIds
}
}
}
// 客户端断线时自动踢出已加入的组
if client.BindUid != "" && len(client.SubGroup) > 0 {
for _, groupID := range client.SubGroup {
uids, ok := WsGroup.Load(groupID)
if !ok {
continue
}
groupUIDs := uids.(*[]string)
if len(*groupUIDs) > 0 {
tempUIDs := make([]string, 0, len(*groupUIDs))
for _, v := range *groupUIDs {
if v != client.BindUid {
tempUIDs = append(tempUIDs, v)
}
}
*groupUIDs = tempUIDs
}
}
}
}

View File

@@ -0,0 +1,9 @@
package service
import "ems.agt/src/modules/ws/model"
// IWSReceive WebSocket消息接收处理 服务层接口
type IWSReceive interface {
// Receive 接收处理
Receive(client *model.WSClient, reqMsg model.WSRequest) error
}

View File

@@ -0,0 +1,56 @@
package service
import (
"encoding/json"
"fmt"
"ems.agt/src/framework/logger"
"ems.agt/src/framework/vo/result"
"ems.agt/src/modules/ws/model"
"ems.agt/src/modules/ws/processor"
)
// 实例化服务层 WSReceiveImpl 结构体
var NewWSReceiveImpl = &WSReceiveImpl{}
// WSReceiveImpl WebSocket消息接收处理 服务层处理
type WSReceiveImpl struct{}
// Receive 接收处理
func (s *WSReceiveImpl) Receive(client *model.WSClient, reqMsg model.WSRequest) {
if reqMsg.RequestID == "" {
msg := "message requestId is required"
logger.Warnf("ws ReceiveMessage UID %s err: %s", client.BindUid, msg)
msgByte, _ := json.Marshal(result.ErrMsg(msg))
client.MsgChan <- msgByte
return
}
var resByte []byte
var err error
switch reqMsg.Type {
case "ps":
resByte, err = processor.GetProcessData(reqMsg.RequestID, reqMsg.Data)
case "net":
resByte, err = processor.GetNetConnections(reqMsg.RequestID, reqMsg.Data)
case "cdr":
resByte, err = processor.GetCDRConnect(reqMsg.RequestID, reqMsg.Data)
case "ue":
resByte, err = processor.GetUEConnect(reqMsg.RequestID, reqMsg.Data)
case "upf_tf":
resByte, err = processor.GetUPFTotalFlow(reqMsg.RequestID, reqMsg.Data)
case "ne_state":
resByte, err = processor.GetNeState(reqMsg.RequestID, reqMsg.Data)
default:
err = fmt.Errorf("message type not supported")
}
if err != nil {
logger.Warnf("ws ReceiveMessage UID %s err: %s", client.BindUid, err.Error())
msgByte, _ := json.Marshal(result.ErrMsg(err.Error()))
client.MsgChan <- msgByte
return
}
client.MsgChan <- resByte
}

View File

@@ -0,0 +1,10 @@
package service
// IWSSend WebSocket消息发送处理 服务层接口
type IWSSend interface {
// ByClientID 给已知客户端发消息
ByClientID(clientID string, data any) error
// ByGroupID 给订阅组的用户发送消息
ByGroupID(gid string, data any) error
}

View File

@@ -0,0 +1,85 @@
package service
import (
"encoding/json"
"fmt"
"ems.agt/src/framework/vo/result"
"ems.agt/src/modules/ws/model"
)
// 订阅组指定编号为支持服务器向客户端主动推送数据
const (
// 组号-其他
GROUP_OTHER = "0"
// 组号-指标
GROUP_KPI = "10"
// 组号-指标UPF
GROUP_KPI_UPF = "12"
// 组号-IMS_CDR会话事件
GROUP_IMS_CDR = "1005"
// 组号-AMF_UE会话事件
GROUP_AMF_UE = "1010"
)
// 实例化服务层 WSSendImpl 结构体
var NewWSSendImpl = &WSSendImpl{}
// IWSSend WebSocket消息发送处理 服务层处理
type WSSendImpl struct{}
// ByClientID 给已知客户端发消息
func (s *WSSendImpl) ByClientID(clientID string, data any) error {
v, ok := WsClients.Load(clientID)
if !ok {
return fmt.Errorf("no fount client ID: %s", clientID)
}
dataByte, err := json.Marshal(result.OkData(data))
if err != nil {
return err
}
client := v.(*model.WSClient)
if len(client.MsgChan) > 90 {
NewWSImpl.CloseClient(client.ID)
return fmt.Errorf("msg chan over 90 will close client ID: %s", clientID)
}
client.MsgChan <- dataByte
return nil
}
// ByGroupID 给订阅组的用户发送消息
func (s *WSSendImpl) ByGroupID(groupID string, data any) error {
uids, ok := WsGroup.Load(groupID)
if !ok {
return fmt.Errorf("no fount Group ID: %s", groupID)
}
groupUids := uids.(*[]string)
// 群组中没有成员
if len(*groupUids) == 0 {
return fmt.Errorf("no members in the group")
}
// 在群组中找到对应的 uid
for _, uid := range *groupUids {
clientIds, ok := WsUsers.Load(uid)
if !ok {
continue
}
// 在用户中找到客户端并发送
uidClientIds := clientIds.(*[]string)
for _, clientId := range *uidClientIds {
err := s.ByClientID(clientId, map[string]any{
"groupId": groupID,
"data": data,
})
if err != nil {
continue
}
}
}
return nil
}

30
src/modules/ws/ws.go Normal file
View File

@@ -0,0 +1,30 @@
package ws
import (
"ems.agt/src/framework/logger"
"ems.agt/src/framework/middleware"
"ems.agt/src/framework/middleware/collectlogs"
"ems.agt/src/modules/ws/controller"
"github.com/gin-gonic/gin"
)
// 模块路由注册
func Setup(router *gin.Engine) {
logger.Infof("开始加载 ====> ws 模块路由")
// WebSocket 协议
wsGroup := router.Group("/ws")
{
wsGroup.GET("",
middleware.PreAuthorize(nil),
collectlogs.OperateLog(collectlogs.OptionNew("WS Subscription", collectlogs.BUSINESS_TYPE_OTHER)),
controller.NewWSController.WS,
)
wsGroup.GET("/test",
middleware.PreAuthorize(nil),
controller.NewWSController.Test,
)
}
}