This commit is contained in:
TsMask
2023-08-22 19:25:39 +08:00
parent 38d3b7450e
commit 96de169777
45 changed files with 881 additions and 676 deletions

7
core/consts/consts.go Normal file
View File

@@ -0,0 +1,7 @@
package consts
//定义user type
const (
MSG = "msg"
FILE = "ftp"
)

25
core/db/mysql.go Normal file
View File

@@ -0,0 +1,25 @@
package db
import (
"omc/conf"
"github.com/aceld/zinx/zlog"
"gorm.io/driver/mysql"
"gorm.io/gorm"
)
var Client *gorm.DB
func Init() error {
d, err := gorm.Open(mysql.Open(conf.OmcConf.Mysql), &gorm.Config{})
if err != nil {
zlog.Ins().ErrorF("open mysql %s error, ", conf.OmcConf.Mysql, err)
panic(err)
}
sqlDB, _ := d.DB()
sqlDB.SetMaxOpenConns(20)
sqlDB.SetMaxIdleConns(10)
Client = d
return nil
}

View File

@@ -0,0 +1,88 @@
package decoder
import (
"bytes"
"encoding/binary"
"encoding/hex"
"github.com/aceld/zinx/ziface"
"github.com/aceld/zinx/zlog"
"math"
)
//ffff 01 00999999 000a 68656c6c6f2074657374
// +---------------+---------------+--------------------+----------------+-------------------
// | 开始标志 | type | 秒时间戳 | 长度 | 消息体
// | 0xffff(2byte) | uint8(1byte) | uint32(4byte) | uint16(2byte) | bytes(N byte)
// +---------------+---------------+--------------------+----------------+-------------------
const OmcMsgHeaderSize = 9 //表示OMC空包长度
type OmcDecoder struct {
StartSign uint16
MsgType uint8
TimeStamp uint32
LenOfBody uint16
Value []byte
}
func NewOmcDecoder() ziface.IDecoder {
return &OmcDecoder{}
}
func (omc *OmcDecoder) GetLengthField() *ziface.LengthField {
return &ziface.LengthField{
MaxFrameLength: math.MaxUint16 + 9,
LengthFieldOffset: 7,
LengthFieldLength: 2,
LengthAdjustment: 0,
InitialBytesToStrip: 0,
//注意现在默认是大端,使用小端需要指定编码方式
Order: binary.BigEndian,
}
}
func (omc *OmcDecoder) decode(data []byte) *OmcDecoder {
ltvData := OmcDecoder{}
ltvData.StartSign = binary.BigEndian.Uint16(data[0:2])
ltvData.MsgType = data[2]
ltvData.TimeStamp = binary.BigEndian.Uint32(data[3:7])
ltvData.LenOfBody = binary.BigEndian.Uint16(data[7:9])
//Determine the length of V. (确定V的长度)
ltvData.Value = make([]byte, ltvData.LenOfBody)
//5. Get V
binary.Read(bytes.NewBuffer(data[OmcMsgHeaderSize:OmcMsgHeaderSize+ltvData.LenOfBody]), binary.BigEndian, ltvData.Value)
return &ltvData
}
func (omc *OmcDecoder) Intercept(chain ziface.IChain) ziface.IcResp {
//1. Get the IMessage of zinx
iMessage := chain.GetIMessage()
if iMessage == nil {
// Go to the next layer in the chain of responsibility
return chain.ProceedWithIMessage(iMessage, nil)
}
//2. Get Data
data := iMessage.GetData()
zlog.Ins().DebugF("omc-RawData size:%d data:%s\n", len(data), hex.EncodeToString(data))
// (读取的数据不超过包头,直接进入下一层)
if len(data) < OmcMsgHeaderSize {
return chain.ProceedWithIMessage(iMessage, nil)
}
//4. Decode
ltvData := omc.decode(data)
zlog.Ins().DebugF("omc-decode %v", ltvData)
// (将解码后的数据重新设置到IMessage中, Zinx的Router需要MsgID来寻址)
iMessage.SetDataLen(uint32(ltvData.LenOfBody))
iMessage.SetMsgID(uint32(ltvData.MsgType))
iMessage.SetData(ltvData.Value)
//6. Pass the decoded data to the next layer.
// (将解码后的数据进入下一层)
return chain.ProceedWithIMessage(iMessage, *ltvData)
}

117
core/dpack/datapack_omc.go Normal file
View File

@@ -0,0 +1,117 @@
package dpack
import (
"bytes"
"encoding/binary"
"errors"
"github.com/aceld/zinx/zlog"
"github.com/aceld/zinx/zpack"
"time"
"github.com/aceld/zinx/zconf"
"github.com/aceld/zinx/ziface"
)
// +---------------+---------------+--------------------+----------------+-------------------
// | 开始标志 | type | 秒时间戳 | 长度 | 消息体
// | 0xffff(2byte) | uint8(1byte) | uint32(4byte) | uint16(2byte) | bytes(N byte)
// +---------------+---------------+--------------------+----------------+-------------------
var defaultHeaderLen uint32 = 9
type DataPack struct{}
// NewDataPack initializes a packing and unpacking instance
// (封包拆包实例初始化方法)
func NewDataPack() ziface.IDataPack {
return &DataPack{}
}
// GetHeadLen returns the length of the message header
// (获取包头长度方法)
func (dp *DataPack) GetHeadLen() uint32 {
//ID uint32(4 bytes) + DataLen uint32(4 bytes)
return defaultHeaderLen
}
// Pack packs the message (compresses the data)
// (封包方法,压缩数据)
func (dp *DataPack) Pack(msg ziface.IMessage) ([]byte, error) {
zlog.Ins().InfoF("my pack: %v", msg)
// Create a buffer to store the bytes
// (创建一个存放bytes字节的缓冲)
dataBuff := bytes.NewBuffer([]byte{})
// Write the oxffff
if err := binary.Write(dataBuff, binary.BigEndian, uint16(0xffff)); err != nil {
return nil, err
}
//Write the type
if err := binary.Write(dataBuff, binary.BigEndian, uint8(msg.GetMsgID())); err != nil {
return nil, err
}
//Write the timestamp
if err := binary.Write(dataBuff, binary.BigEndian, uint32(time.Now().Unix())); err != nil {
return nil, err
}
//Write the length
if err := binary.Write(dataBuff, binary.BigEndian, uint16(msg.GetDataLen())); err != nil {
return nil, err
}
// Write the data
if err := binary.Write(dataBuff, binary.BigEndian, msg.GetData()); err != nil {
return nil, err
}
return dataBuff.Bytes(), nil
}
// Unpack unpacks the message (decompresses the data)
// (拆包方法,解压数据)
func (dp *DataPack) Unpack(binaryData []byte) (ziface.IMessage, error) {
// Create an ioReader for the input binary data
dataBuff := bytes.NewReader(binaryData)
// Only unpack the header information to obtain the data length and message ID
// (只解压head的信息得到dataLen和msgID)
msg := &zpack.Message{}
// Read the startSign
var startSign uint16
if err := binary.Read(dataBuff, binary.BigEndian, &startSign); err != nil {
return nil, err
}
// Read the msgID
var msgType uint
if err := binary.Read(dataBuff, binary.BigEndian, &msgType); err != nil {
return nil, err
}
msg.ID = uint32(msgType)
// read timeStamp
var timeStamp uint32
if err := binary.Read(dataBuff, binary.BigEndian, &timeStamp); err != nil {
return nil, err
}
// Read the data length
var length uint16
if err := binary.Read(dataBuff, binary.BigEndian, &length); err != nil {
return nil, err
}
msg.DataLen = uint32(length)
// Check whether the data length exceeds the maximum allowed packet size
// (判断dataLen的长度是否超出我们允许的最大包长度)
if zconf.GlobalObject.MaxPacketSize > 0 && msg.GetDataLen() > zconf.GlobalObject.MaxPacketSize {
return nil, errors.New("too large msg data received")
}
// Only the header data needs to be unpacked, and then another data read is performed from the connection based on the header length
// (这里只需要把head的数据拆包出来就可以了然后再通过head的长度再从conn读取一次数据)
return msg, nil
}

View File

@@ -0,0 +1,16 @@
package emun
func OrigSeverity(os string) int32 {
switch os {
case "Critical":
return 1
case "Major":
return 2
case "Minor":
return 3
case "Warning":
return 4
default:
return 0
}
}

68
core/file/file.go Normal file
View File

@@ -0,0 +1,68 @@
package file
import (
"os"
"strings"
)
//BJ/HX/RJ/OMC/FM/告警文件生成时间
///FTP根目录/省份简称/专业简称/厂家编码/OMC名称/数据类别/日期或时间/
//<省份简称>-<数据类别>-<网元类型>[-网元子类]-<主机编号>-<数据版本>-<数据时间>[-登录用户名][-同步请求标识][-Ri][-统计周期] [-序列号].<后缀>
//BJ-FM-OMC-主机编码-v0-告警文件生成时间-001.txt
type FileMeta struct {
DirRoot string `json:"dir_root"`
Province string `json:"province"` //网元所在省份
DeviceCode string `json:"device_code"` //主机编码 四位每1位可用0-9、A-Z编码
Time string `json:"time"` //文件生成时间
Index string `json:"index"` //文件标识
Compress bool `json:"compress"` //文件是否压缩
ReqId string `json:"req_id"`
}
// HasDir 判断文件夹是否存在
func HasDir(path string) (bool, error) {
_, _err := os.Stat(path)
if _err == nil {
return true, nil
}
if os.IsNotExist(_err) {
return false, nil
}
return false, _err
}
func CreateDir(meta *FileMeta) (string, error) {
dir := strings.Join([]string{meta.DirRoot, meta.Province, "HX", "RJ", "OMC", "FM", meta.Time}, "/")
exist, err := HasDir(dir)
if err != nil {
return "", err
}
if !exist {
err := os.MkdirAll(dir, os.ModePerm)
if err != nil {
return "", err
}
}
return dir, err
}
func GetName(meta *FileMeta) string {
fileName := strings.Join([]string{meta.Province, "FM", "OMC", meta.DeviceCode, "v0", meta.Time, meta.Index}, "-")
return strings.ToUpper(fileName)
}
func GenFile(meta *FileMeta, content []byte) (string, error) {
// 创建文件夹
dir, err := CreateDir(meta)
if err != nil {
return "", err
}
//创建文件
fileName := dir + "/" + GetName(meta) + ".txt"
err = os.WriteFile(fileName, content, 0666)
if err != nil {
return "", err
}
return fileName, nil
}

20
core/file/file_test.go Normal file
View File

@@ -0,0 +1,20 @@
package file
import (
"fmt"
"testing"
"time"
)
func TestFile(t *testing.T) {
var meta FileMeta
meta.DirRoot = "FTP"
meta.Province = "BJ"
meta.DeviceCode = "0001"
meta.Index = "001"
meta.Time = time.Now().Format("20060102150405")
meta.Compress = false
content := "this a test file"
f, err := GenFile(&meta, []byte(content))
fmt.Println(f, err)
}

View File

@@ -1,10 +1,12 @@
package core
package heartbeat
import (
"github.com/aceld/zinx/ziface"
"github.com/aceld/zinx/zlog"
)
var HeadBeatMsgID uint32 = 255
// MyHeartBeatMsg 用户自定义的心跳检测消息处理方法
func MyHeartBeatMsg(conn ziface.IConnection) []byte {
return []byte("heartbeat, I am server, I am alive")

View File

@@ -0,0 +1,42 @@
package interceptor
import (
"encoding/hex"
"fmt"
"omc/core/parse"
"time"
"github.com/aceld/zinx/ziface"
)
type GlobalInterceptor struct{}
func (m *GlobalInterceptor) Intercept(chain ziface.IChain) ziface.IcResp {
request := chain.Request() //从责任链中获取当前拦截器的输入数据
// 这一层是自定义拦截器处理逻辑,这里只是简单打印输入
iRequest := request.(ziface.IRequest) //注意由于Zinx的Request类型这里需要做一下断言转换
fmt.Println("\n\n=========自定义拦截器=====")
body, err := parse.RequestBodyDecode(iRequest, nil)
fmt.Printf("消息ID: %v \n", iRequest.GetMsgID())
fmt.Printf("原始数据: %v \n", body.RawData)
fmt.Printf("原始字符: %v \n", hex.EncodeToString(body.RawData))
fmt.Printf("原始字符: %v \n", string(body.RawData))
fmt.Printf("用户ID: %v \n", body.UID)
fmt.Printf("收到消息: %v %v \n", body.Name, body.Data)
fmt.Printf("错误:%v \n", err)
// return chain.Proceed(chain.Request()) //进入并执行下一个拦截器
iMessage := chain.GetIMessage()
resp := chain.ProceedWithIMessage(iMessage, iRequest)
fmt.Printf("目标消息ID: %v \n", iMessage.GetMsgID())
fmt.Printf("收到消息长度: %v \n", iMessage.GetDataLen())
fmt.Printf("信息时间:%v \n", time.Now().String())
fmt.Print("=========自定义拦截器=====\n\n\n")
return resp
}

51
core/manage/manage.go Normal file
View File

@@ -0,0 +1,51 @@
package manage
import (
"github.com/aceld/zinx/ziface"
"github.com/aceld/zinx/zlog"
)
// OnConnectionAdd 当客户端建立连接的时候的hook函数
func OnConnectionAdd(conn ziface.IConnection) {
//创建一个user
user := NewUser(conn, conn.RemoteAddrString())
//将当前新上线玩家添加到ChannelManager中
m := GetManager(conn.GetName())
if m == nil {
zlog.Ins().ErrorF("server internal error in GetManager")
conn.Stop()
return
}
m.AddUser(user)
//将该连接绑定属性PID
conn.SetProperty("UID", user.UID)
zlog.Ins().InfoF("====> User uID = %s", user.UID, " arrived ====", "")
}
// OnConnectionLost 当客户端断开连接的时候的hook函数
func OnConnectionLost(conn ziface.IConnection) {
//获取当前连接的PID属性
uID, _ := conn.GetProperty("UID")
var userID string
if uID != nil {
userID = uID.(string)
}
//根据pID获取对应usr
m := GetManager(conn.GetName())
if m == nil {
zlog.Ins().ErrorF("server internal error in GetManager")
return
}
user := m.GetUserByPID(userID)
//触发玩家下线业务
if user != nil {
user.LostConnection(m)
}
zlog.Ins().InfoF("====> User %s-%s", user.UID, user.UserName, " left =====")
}

View File

@@ -1,7 +1,8 @@
package core
package manage
import (
"fmt"
"github.com/aceld/zinx/ziface"
"github.com/google/uuid"
)
@@ -46,5 +47,4 @@ func (p *User) SendMsg(msgID uint32, msg []byte) {
return
}
return
}

View File

@@ -1,15 +1,17 @@
package core
package manage
import (
"errors"
"github.com/aceld/zinx/zlog"
"omc/db"
"omc/model"
"omc/omc"
"omc/service"
"omc/core/consts"
"omc/core/db"
"omc/core/parse"
"omc/handle/model"
"omc/handle/service"
"strings"
"sync"
"time"
"github.com/aceld/zinx/zlog"
)
/*
@@ -65,7 +67,7 @@ func (wm *ChannelManager) Talk(msgID uint32, msg []byte) {
//3. 向所有的user发送消息
for _, user := range users {
if user.LoginState && user.AlarmType == omc.MSG {
if user.LoginState && user.AlarmType == consts.MSG {
user.SendMsg(msgID, msg)
}
}
@@ -77,7 +79,7 @@ func (wm *ChannelManager) LoginSuccess(UID, name, tp string) error {
defer wm.pLock.Unlock()
//判断是否重复登录
for _, v := range wm.User {
if v.UserName == name && v.AlarmType == tp && v.LoginState == true {
if v.UserName == name && v.AlarmType == tp && v.LoginState {
return errors.New("repeat login for the same account")
}
}
@@ -147,7 +149,7 @@ func (wm *ChannelManager) GetAllUser() []*User {
//添加切片
for _, v := range wm.User {
if v.LoginState && v.AlarmType == omc.MSG {
if v.LoginState && v.AlarmType == consts.MSG {
User = append(User, v)
}
}
@@ -162,7 +164,7 @@ func (wm *ChannelManager) RealTimeAlarm() {
//查询
var newAlarmSeq = wm.AlarmSeq
var alarms []service.OmcAlarm
neBind, _ := ConvertBindFlag(wm.BindFlag)
neBind, _ := parse.ConvertBindFlag(wm.BindFlag)
if wm.AlarmSeq == 0 {
newAlarmSeq = service.GetLastAlarmSeq(neBind.NeType, neBind.NeId)
@@ -180,7 +182,7 @@ func (wm *ChannelManager) RealTimeAlarm() {
}
var users []string
for _, user := range wm.User {
if user.LoginState && user.AlarmType == omc.MSG {
if user.LoginState && user.AlarmType == consts.MSG {
userInfo := strings.Join([]string{user.UserName, user.RemoteIp}, ";")
users = append(users, userInfo)
}
@@ -220,7 +222,7 @@ func (wm *ChannelManager) SendAlarm(alarms []service.OmcAlarm) error {
//生产告警内容
data := service.GenAlarm(v)
//发送告警内容
wm.Talk(omc.RealTimeAlarm, data)
wm.Talk(0, data)
}
return nil
}

9
core/model/body.go Normal file
View File

@@ -0,0 +1,9 @@
package model
// 请求数据
type Body struct {
UID string // 连接实例ID
RawData []byte // 原始数据
Name string // 请求名
Data map[string]string // 数据Key
}

110
core/parse/parse.go Normal file
View File

@@ -0,0 +1,110 @@
package parse
import (
"errors"
"fmt"
"omc/core/model"
"strings"
"github.com/aceld/zinx/ziface"
)
// 网元类型#网元标记
type NeBind struct {
NeType string
NeId string
}
// 转换解析服务端绑定的网元
func ConvertBindFlag(bindFlag string) (NeBind, error) {
var neBind NeBind
nb := strings.Split(bindFlag, "#")
if len(nb) != 2 {
return neBind, errors.New("ne bind flag invalid")
}
neBind.NeType = nb[0]
neBind.NeId = nb[1]
return neBind, nil
}
// RequestBodyDecode 请求消息解析
// checker 检查参数必传
func RequestBodyDecode(request ziface.IRequest, checker []string) (model.Body, error) {
// 消息处理
body := model.Body{}
err := Decode(request.GetData(), &body)
if err != nil {
return body, errors.New("inlaid message body")
}
// 检查key
if len(checker) > 0 {
for _, v := range checker {
if _, ok := body.Data[v]; !ok {
return body, errors.New("missing parameter of message body : " + v)
}
}
}
// 当前连接实例ID
uID, err := request.GetConnection().GetProperty("UID")
if err != nil {
request.GetConnection().Stop()
return body, errors.New("server internal error")
}
body.UID = uID.(string)
return body, nil
}
// Decode 数据解析
// reqLoginAlarm;user=yiy;key=qw#$@;type=msg
func Decode(data []byte, body *model.Body) error {
body.RawData = data
multi := strings.Split(string(data), ";")
if len(multi) < 1 {
return errors.New("invalid msg body")
}
// 获取函数名
if multi[0] != "" {
name := multi[0]
idx := strings.LastIndex(name, "\x14")
if idx == -1 {
idx = strings.LastIndex(name, "\x00")
}
if idx > 0 {
name = name[idx+1:]
name = strings.Replace(name, "\"", "", 1)
name = strings.Replace(name, "'", "", 1)
name = strings.Replace(name, "#", "", 1)
}
body.Name = name
}
// 解析data KEY
body.Data = make(map[string]string)
for i := 1; i < len(multi); i++ {
m := strings.Split(multi[i], "=")
if len(m) != 2 {
return errors.New("invalid msg body")
}
body.Data[m[0]] = m[1]
}
return nil
}
// Pack 数据压缩
func Pack(name string, data map[string]string) []byte {
var multi []string
multi = append(multi, name)
for i, v := range data {
item := fmt.Sprintf("%s=%s", i, v)
multi = append(multi, item)
}
raw := strings.Join(multi, ";")
return []byte(raw)
}

34
core/result.go Normal file
View File

@@ -0,0 +1,34 @@
package core
import (
"omc/core/parse"
)
// Result
// 配合 request.GetConnection().SendMsg()
func Result(name string, data map[string]string) []byte {
return parse.Pack(name, data)
}
// ResultError ackLoginAlarm;result=fail;resDesc=username-error
// request.GetConnection().SendMsg(omc.AckSyncAlarmMsg, core.ResultError("ackSyncAlarmMsg", err.Error(), ""))
func ResultError(name string, desc, reqID string) []byte {
data := map[string]string{
"result": "fail",
"reqId": reqID,
"resDesc": desc,
}
return Result(name, data)
}
// request.GetConnection().SendMsg(omc.AckSyncAlarmMsg, core.ResultSuccess("ackSyncAlarmMsg", err.Error(), ""))
func ResultSuccess(name string, desc, reqID string) []byte {
data := map[string]string{
"result": "succ",
"reqId": reqID,
"resDesc": desc,
}
return Result(name, data)
}

View File

@@ -1,48 +0,0 @@
package core
import (
"errors"
"github.com/aceld/zinx/ziface"
"omc/omc"
"strings"
)
type NeBind struct {
NeType string
NeId string
}
func ConvertBindFlag(bindFlag string) (NeBind, error) {
var neBind NeBind
nb := strings.Split(bindFlag, "#")
if len(nb) != 2 {
return neBind, errors.New("ne bind flag invalid")
}
neBind.NeType = nb[0]
neBind.NeId = nb[1]
return neBind, nil
}
// APIDecode 消息解析
func APIDecode(request ziface.IRequest, checker []string) (*omc.MsgBody, error) {
// 消息处理
msgBody := omc.MsgBody{
RawData: request.GetData(),
Msg: make(map[string]string, 0),
}
if err := msgBody.Decode(); err != nil {
return nil, errors.New("inlaid message body")
}
for _, v := range checker {
if _, ok := msgBody.Msg[v]; !ok {
return nil, errors.New("missing parameter of message body")
}
}
uID, err := request.GetConnection().GetProperty("UID")
if err != nil {
request.GetConnection().Stop()
return nil, errors.New("server internal error")
}
msgBody.UID = uID.(string)
return &msgBody, nil
}

14
core/utils/bcrypt.go Normal file
View File

@@ -0,0 +1,14 @@
package utils
import "golang.org/x/crypto/bcrypt"
// Encrypt 加密明文密码
func Encrypt(password string) (string, error) {
hashedBytes, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost)
return string(hashedBytes), err
}
// Compare 密文校验
func Compare(hashedPassword, password string) error {
return bcrypt.CompareHashAndPassword([]byte(hashedPassword), []byte(password))
}

25
core/utils/strrand.go Normal file
View File

@@ -0,0 +1,25 @@
package utils
import (
"math/rand"
"time"
)
// 生成编号 SeqNo 0-9A-Z
func SeqNo(length int) string {
items := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "A", "B", "C", "D", "E", "F"}
// 创建基于时间的随机生成器
source := rand.NewSource(time.Now().UnixNano())
rng := rand.New(source)
// 生成32位长度的字符串值
result := ""
for i := 0; i < length; i++ {
randomIndex := rng.Intn(len(items))
randomItem := items[randomIndex]
result += randomItem
}
return result
}