diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d7f842e --- /dev/null +++ b/.gitignore @@ -0,0 +1,44 @@ +# ---> VisualStudioCode +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json +!.vscode/*.code-snippets + +# Local History for Visual Studio Code +.history/ + +# Run temp file and dir +crontask/log/ +crontask/ftp/ +crontask/database/ +crontask/export/ +crontask/temp +crontask/crontask + +restagent/backup/ +restagent/log/ +restagent/upload/ +restagent/software/ +restagent/database/ +restagent/restagent + +sshsvc/sshsvc +sshsvc/mmllog/ +sshsvc/mmlhome/ +sshsvc/log/ + +tools/loadmconf/loadmconf + +vendor + +# Built Visual Studio Code Extensions +*.vsix +*.log +*.log-* +*.bak + +.idea +omc_log +data diff --git a/api/close.go b/api/close.go new file mode 100644 index 0000000..f007fd5 --- /dev/null +++ b/api/close.go @@ -0,0 +1,15 @@ +package api + +import ( + "github.com/aceld/zinx/ziface" + "github.com/aceld/zinx/znet" +) + +// CloseApi 关闭连接API +type CloseApi struct { + znet.BaseRouter +} + +func (*CloseApi) Handle(request ziface.IRequest) { + request.GetConnection().Stop() +} diff --git a/api/heart_beat.go b/api/heart_beat.go new file mode 100644 index 0000000..484bf77 --- /dev/null +++ b/api/heart_beat.go @@ -0,0 +1,44 @@ +package api + +import ( + "omc/omc" + + "github.com/aceld/zinx/ziface" + "github.com/aceld/zinx/zlog" + "github.com/aceld/zinx/znet" +) + +// HeartBeatApi 心跳请求 +type HeartBeatApi struct { + znet.BaseRouter +} + +func (*HeartBeatApi) Handle(request ziface.IRequest) { + // 解包 + msgBody := omc.MsgBody{ + RawData: request.GetData(), + Msg: make(map[string]string, 0), + } + if err := msgBody.Decode(); err != nil { + zlog.Ins().ErrorF("inlaid message body %s", err.Error()) + request.GetConnection().SendMsg(omc.AckHeartBeat, omc.ErrorMsg("ackHeartBeat", "", "inlaid message body")) + return + } + + reqId, ok := msgBody.Msg["reqId"] + if !ok { + zlog.Ins().ErrorF("missing parameter of message body") + request.GetConnection().SendMsg(omc.AckHeartBeat, omc.ErrorMsg("ackHeartBeat", "", "missing parameter of message body")) + return + } + + //ack + ackBody := omc.MsgBody{ + MsgName: "ackHeartBeat", + Msg: make(map[string]string, 0), + } + ackBody.Msg["reqId"] = reqId + ackBody.Keys = append(ackBody.Keys, "reqId") + ackBody.Pack() + request.GetConnection().SendMsg(omc.AckHeartBeat, ackBody.RawData) +} diff --git a/api/login.go b/api/login.go new file mode 100644 index 0000000..01ba9f9 --- /dev/null +++ b/api/login.go @@ -0,0 +1,161 @@ +package api + +import ( + "encoding/hex" + "github.com/aceld/zinx/ziface" + "github.com/aceld/zinx/zlog" + "github.com/aceld/zinx/znet" + "github.com/google/uuid" + "omc/core" + "omc/omc" + "omc/service" + "strings" +) + +// LoginApi 登录API +type LoginApi struct { + znet.BaseRouter +} + +// Handle Login reqLoginAlarm;user=yiy;key=qw#$@;type=msg +func (*LoginApi) Handle(request ziface.IRequest) { + // 登录消息处理 + msgBody := omc.MsgBody{ + RawData: request.GetData(), + Msg: make(map[string]string, 0), + } + if err := msgBody.Decode(); err != nil { + zlog.Ins().ErrorF("inlaid message body %s", err.Error()) + request.GetConnection().SendMsg(omc.AckLoginAlarm, omc.ErrorMsg("ackLoginAlarm", "", "inlaid message body")) + return + } + + user, userOK := msgBody.Msg["user"] + pw, pwOK := msgBody.Msg["key"] + tp, tpOK := msgBody.Msg["type"] + if !userOK || !pwOK || !tpOK { + zlog.Ins().ErrorF("missing parameter of message body") + request.GetConnection().SendMsg(omc.AckLoginAlarm, omc.ErrorMsg("ackLoginAlarm", "", "missing parameter of message body")) + return + } + m := core.GetManager(request.GetConnection().GetName()) + if m == nil { + zlog.Ins().ErrorF("server internal error") + request.GetConnection().SendMsg(omc.AckLoginAlarm, omc.ErrorMsg("ackLoginAlarm", "", "server internal error")) + return + } + uID, err := request.GetConnection().GetProperty("UID") + if err != nil { + zlog.Ins().ErrorF("GetProperty UID error %s", err) + request.GetConnection().Stop() + return + } + + //登录信息check + if err := service.UserLogin(user, pw); err != nil { + zlog.Ins().ErrorF("LoginFail %s", err) + request.GetConnection().SendMsg(omc.AckLoginAlarm, omc.ErrorMsg("ackLoginAlarm", "", "Incorrect username and password")) + isClose, _ := m.LoginFail(uID.(string)) //登录错误超过3次,断开连接 + if isClose { + request.GetConnection().Stop() + return + } + return + } + + //manager 更新 + if err := m.LoginSuccess(uID.(string), user, tp); err != nil { + zlog.Ins().ErrorF("manager:%s", err) + request.GetConnection().SendMsg(omc.AckLoginAlarm, omc.ErrorMsg("ackLoginAlarm", "", err.Error())) + return + } + zlog.Ins().InfoF("user login loginSuccess,username:%s, type:%s, channel:%s", user, tp, request.GetConnection().GetName()) + request.GetConnection().SendMsg(omc.AckLoginAlarm, omc.SuccessMsg("ackLoginAlarm", "", "")) +} + +// CMCALoginSeq 登录API + +type CMCALoginSeq struct { + znet.BaseRouter +} + +//reqCMCALoginSeq + +func (*CMCALoginSeq) Handle(request ziface.IRequest) { + uid := uuid.New() + seqNo := hex.EncodeToString(uid[0:]) + seqNo = strings.ToUpper(seqNo) + //发送文件同步信息 + ackBody := omc.MsgBody{ + MsgName: "ackCMCALoginSeq", + Msg: make(map[string]string, 0), + } + ackBody.Msg["seqNo"] = seqNo + ackBody.Pack() + + request.GetConnection().SendMsg(omc.AckCMCALoginSeq, ackBody.RawData) +} + +// + +type CMCALoginAlarm struct { + znet.BaseRouter +} + +//reqCMCALoginSeq +//reqCMCALoginAlarm;user=yiy;key=12313121213123;cert=AAAAAAAAAA;type=msg + +func (*CMCALoginAlarm) Handle(request ziface.IRequest) { + // 登录消息处理 + msgBody := omc.MsgBody{ + RawData: request.GetData(), + Msg: make(map[string]string, 0), + } + if err := msgBody.Decode(); err != nil { + zlog.Ins().ErrorF("inlaid message body %s", err.Error()) + request.GetConnection().SendMsg(omc.AckLoginAlarm, omc.ErrorMsg("ackLoginAlarm", "", "inlaid message body")) + return + } + + user, userOK := msgBody.Msg["user"] + pw, pwOK := msgBody.Msg["key"] + tp, tpOK := msgBody.Msg["type"] + if !userOK || !pwOK || !tpOK { + zlog.Ins().ErrorF("missing parameter of message body") + request.GetConnection().SendMsg(omc.AckLoginAlarm, omc.ErrorMsg("ackLoginAlarm", "", "missing parameter of message body")) + return + } + m := core.GetManager(request.GetConnection().GetName()) + if m == nil { + zlog.Ins().ErrorF("server internal error") + request.GetConnection().SendMsg(omc.AckLoginAlarm, omc.ErrorMsg("ackLoginAlarm", "", "server internal error")) + return + } + uID, err := request.GetConnection().GetProperty("UID") + if err != nil { + zlog.Ins().ErrorF("GetProperty UID error %s", err) + request.GetConnection().Stop() + return + } + + //登录信息check + if err := service.UserLogin(user, pw); err != nil { + zlog.Ins().ErrorF("LoginFail %s", err) + request.GetConnection().SendMsg(omc.AckLoginAlarm, omc.ErrorMsg("ackLoginAlarm", "", "Incorrect username and password")) + isClose, _ := m.LoginFail(uID.(string)) //登录错误超过3次,断开连接 + if isClose { + request.GetConnection().Stop() + return + } + return + } + + //manager 更新 + if err := m.LoginSuccess(uID.(string), user, tp); err != nil { + zlog.Ins().ErrorF("manager:%s", err) + request.GetConnection().SendMsg(omc.AckLoginAlarm, omc.ErrorMsg("ackLoginAlarm", "", err.Error())) + return + } + zlog.Ins().InfoF("user login loginSuccess,username:%s, type:%s, channel:%s", user, tp, request.GetConnection().GetName()) + request.GetConnection().SendMsg(omc.AckLoginAlarm, omc.SuccessMsg("ackLoginAlarm", "", "")) +} diff --git a/api/req_sync_alarm.go b/api/req_sync_alarm.go new file mode 100644 index 0000000..6454adb --- /dev/null +++ b/api/req_sync_alarm.go @@ -0,0 +1,61 @@ +package api + +import ( + "github.com/aceld/zinx/ziface" + "github.com/aceld/zinx/zlog" + "github.com/aceld/zinx/znet" + "omc/core" + "omc/omc" + "omc/service" + "strconv" +) + +// SyncAlarmApi 消息方式同步告警请求 +type SyncAlarmApi struct { + znet.BaseRouter +} + +func (*SyncAlarmApi) Handle(request ziface.IRequest) { + // 消息处理 + checker := []string{"reqId", "alarmSeq"} + msg, err := core.APIDecode(request, checker) + if err != nil { + zlog.Ins().ErrorF("inlaid message body %s", err.Error()) + request.GetConnection().SendMsg(omc.AckSyncAlarmMsg, omc.ErrorMsg("ackSyncAlarmMsg", "", err.Error())) + return + } + //管理模块 + m := core.GetManager(request.GetConnection().GetName()) + if m == nil { + zlog.Ins().ErrorF("server internal error") + request.GetConnection().SendMsg(omc.AckSyncAlarmFile, omc.ErrorMsg("ackSyncAlarmFile", msg.Msg["reqId"], "server internal error")) + return + } + + // 检查用户是否登录 + u := m.GetUserByPID(msg.UID) + if !u.LoginState || u.AlarmType != omc.MSG { + zlog.Ins().ErrorF("no permissions ") + request.GetConnection().SendMsg(omc.AckSyncAlarmMsg, omc.ErrorMsg("ackSyncAlarmMsg", msg.Msg["reqId"], "no permissions")) + return + } + + alarmSeq, err := strconv.Atoi(msg.Msg["alarmSeq"]) + if err != nil || alarmSeq < 1 { + zlog.Ins().ErrorF("invalid parameter of message body") + request.GetConnection().SendMsg(omc.AckSyncAlarmMsg, omc.ErrorMsg("ackSyncAlarmMsg", msg.Msg["reqId"], "invalid parameter of message body")) + return + } + + //check alarmSeq 是否存在 + neBind, _ := core.ConvertBindFlag(m.BindFlag) + alarms, _ := service.GetRealTimeAlarm(neBind.NeType, neBind.NeId, int32(alarmSeq)) + if len(alarms) == 0 { + request.GetConnection().SendMsg(omc.AckSyncAlarmMsg, omc.ErrorMsg("ackSyncAlarmMsg", msg.Msg["reqId"], "alarm seq does not exist")) + return + } + //更新实时上报的alarm seq + m.UpdateAlarmSeq(int32(alarmSeq)) + request.GetConnection().SendMsg(omc.AckSyncAlarmMsg, omc.SuccessMsg("ackSyncAlarmMsg", msg.Msg["reqId"], "")) + +} diff --git a/api/req_sync_alarm_file.go b/api/req_sync_alarm_file.go new file mode 100644 index 0000000..bb54b62 --- /dev/null +++ b/api/req_sync_alarm_file.go @@ -0,0 +1,91 @@ +package api + +import ( + "fmt" + "github.com/aceld/zinx/ziface" + "github.com/aceld/zinx/zlog" + "github.com/aceld/zinx/znet" + "omc/conf" + "omc/core" + "omc/lib" + "omc/omc" + "omc/service" + "strconv" + "time" +) + +// SyncAlarmFileApi 文件方式同步告警请求 +type SyncAlarmFileApi struct { + znet.BaseRouter +} + +// Handle +//reqSyncAlarmFile;reqId=33;startTime=2014-11-27 10:00:00;endTime=2014-11-27 10:30:00; syncSource =0 +func (*SyncAlarmFileApi) Handle(request ziface.IRequest) { + // 消息处理 + checker := []string{"reqId", "syncSource"} + msg, err := core.APIDecode(request, checker) + if err != nil { + zlog.Ins().ErrorF("inlaid message body %s", err.Error()) + request.GetConnection().SendMsg(omc.AckSyncAlarmFile, omc.ErrorMsg("ackSyncAlarmFile", "", err.Error())) + return + } + //管理模块 + m := core.GetManager(request.GetConnection().GetName()) + if m == nil { + zlog.Ins().ErrorF("server internal error") + request.GetConnection().SendMsg(omc.AckSyncAlarmFile, omc.ErrorMsg("ackSyncAlarmFile", msg.Msg["reqId"], "server internal error")) + return + } + + // 检查用户是否登录 + u := m.GetUserByPID(msg.UID) + if !u.LoginState || u.AlarmType != omc.FILE { + zlog.Ins().ErrorF("no permissions ") + request.GetConnection().SendMsg(omc.AckSyncAlarmFile, omc.ErrorMsg("ackSyncAlarmFile", msg.Msg["reqId"], "no permissions")) + return + } + //查询需要上报的告警信息 + start := "" + end := "" + syncSource := "" + alarmSeq := 0 + fmt.Println("msg.Msg:", msg.Msg) + //map[alarmSeq:1 reqId:35 syncSource:1] + // map[endTime:2023-07-15 23:59:59 reqId:34 startTime:2023-01-08 16:07:00 syncSource:0] + if v, ok := msg.Msg["startTime"]; ok { + start = v + } + if v, ok := msg.Msg["endTime"]; ok { + end = v + } + if v, ok := msg.Msg["syncSource"]; ok { + syncSource = v + } + if v, ok := msg.Msg["alarmSeq"]; ok { + if seq, err := strconv.Atoi(v); err == nil { + alarmSeq = seq + } + } + neBind, _ := core.ConvertBindFlag(m.BindFlag) + alarms, err := service.GetAlarm(neBind.NeType, neBind.NeId, start, end, syncSource, alarmSeq) + if err != nil || len(alarms) == 0 { + + request.GetConnection().SendMsg(omc.AckSyncAlarmFile, omc.ErrorMsg("ackSyncAlarmFile", msg.Msg["reqId"], "not find record")) + return + } + //ack + request.GetConnection().SendMsg(omc.AckSyncAlarmFile, omc.SuccessMsg("ackSyncAlarmFile", msg.Msg["reqId"], "")) + + //打包结果文件 + //打包生成文件 + var meta lib.FileMeta + meta.DirRoot = conf.OmcConf.FTPRoot + meta.Province = m.Province + meta.DeviceCode = m.DeviceCode + meta.Index = "001" + meta.Time = time.Now().Format("20060102150405") + meta.Compress = false + go service.GenFile(request, &meta, alarms) + +} diff --git a/asdf b/asdf deleted file mode 100644 index f50ea83..0000000 --- a/asdf +++ /dev/null @@ -1 +0,0 @@ -vasd \ No newline at end of file diff --git a/client_robot.go b/client_robot.go new file mode 100644 index 0000000..f2f886d --- /dev/null +++ b/client_robot.go @@ -0,0 +1,219 @@ +package main + +import ( + "bytes" + "encoding/binary" + "encoding/hex" + "errors" + "fmt" + "github.com/aceld/zinx/zconf" + "github.com/aceld/zinx/zlog" + "gorm.io/driver/mysql" + "gorm.io/gorm" + "io" + "net" + "omc/model" + "omc/omc" + "time" +) + +type Message struct { + StartSign uint16 + MsgType uint8 + TimeStamp uint32 + LenOfBody uint16 + Value []byte +} + +type TcpClient struct { + conn net.Conn + PID int32 + isOnline chan bool +} + +func (this *TcpClient) Unpack(headdata []byte) (head *Message, err error) { + fmt.Println("unpack:", hex.EncodeToString(headdata)) + headBuf := bytes.NewReader(headdata) + + head = &Message{} + + // Read the startSign + if err := binary.Read(headBuf, binary.BigEndian, &head.StartSign); err != nil { + return nil, err + } + + // Read the msgID + if err := binary.Read(headBuf, binary.BigEndian, &head.MsgType); err != nil { + return nil, err + } + + // read timeStamp + if err := binary.Read(headBuf, binary.BigEndian, &head.TimeStamp); err != nil { + return nil, err + } + + // Read the data length + if err := binary.Read(headBuf, binary.BigEndian, &head.LenOfBody); err != nil { + return nil, err + } + + // Check whether the data length exceeds the maximum allowed packet size + // (判断dataLen的长度是否超出我们允许的最大包长度) + if zconf.GlobalObject.MaxPacketSize > 0 && uint32(head.LenOfBody) > zconf.GlobalObject.MaxPacketSize { + return nil, errors.New("too large msg data received") + } + + return head, nil +} + +func (this *TcpClient) Pack(msgID uint32, dataBytes []byte) (out []byte, err error) { + outbuff := bytes.NewBuffer([]byte{}) + + // Write the oxffff + if err := binary.Write(outbuff, binary.BigEndian, uint16(0xffff)); err != nil { + return nil, err + } + //Write the type + if err := binary.Write(outbuff, binary.BigEndian, uint8(msgID)); err != nil { + return nil, err + } + + //Write the timestamp + if err := binary.Write(outbuff, binary.BigEndian, uint32(0x999999)); err != nil { + return nil, err + } + + //Write the length + if err := binary.Write(outbuff, binary.BigEndian, uint16(len(dataBytes))); err != nil { + return nil, err + } + + // Write the data + if err := binary.Write(outbuff, binary.BigEndian, dataBytes); err != nil { + return nil, err + } + + out = outbuff.Bytes() + return +} + +func (this *TcpClient) SendMsg(msgID uint32, data []byte) { + sendData, err := this.Pack(msgID, data) + if err == nil { + _, err := this.conn.Write(sendData) + fmt.Println("send msg:", hex.EncodeToString(sendData), " err:", err) + } else { + fmt.Println(err) + } + return +} + +func (this *TcpClient) Receive() { + for { + //读取服务端发来的数据 ==》 SyncPID + //1.读取8字节 + ////第一次读取,读取数据头 + headData := make([]byte, 9) + if _, err := io.ReadFull(this.conn, headData); err != nil { + fmt.Println(err) + return + } + pkgHead, err := this.Unpack(headData) + if err != nil { + fmt.Println("Unpack", err) + return + } + //data + if pkgHead.LenOfBody > 0 { + pkgHead.Value = make([]byte, pkgHead.LenOfBody) + if _, err := io.ReadFull(this.conn, pkgHead.Value); err != nil { + return + } + } + + //处理服务器回执业务 + fmt.Println("=================================") + fmt.Println("StartSign:", pkgHead.StartSign) + fmt.Println("MsgType:", pkgHead.MsgType) + fmt.Println("TimeStamp:", pkgHead.TimeStamp) + fmt.Println("LenOfBody:", pkgHead.LenOfBody) + fmt.Println("Value:", string(pkgHead.Value)) + fmt.Println("=================================") + time.Sleep(10 * time.Microsecond) + } +} + +func (this *TcpClient) Start() { + //登录 + data := "reqLoginAlarm;user=omc;key=omc@password;type=ftp" + this.SendMsg(0x01, []byte(data)) + + //发送同步告警信息 + //data = "reqSyncAlarmMsg;reqId=33;alarmSeq=1" + //this.SendMsg(omc.ReqSyncAlarmMsg, []byte(data)) + + //发送文件同步告警 + //data = "reqSyncAlarmFile;reqId=35;alarmSeq=2000;syncSource=1" + //data = "reqSyncAlarmFile;reqId=33;startTime=2023-01-08 00:00:00;syncSource=0" + data = "reqSyncAlarmFile;reqId=34;startTime=2023-01-08 16:07:00;endTime=2023-07-19 23:59:59;syncSource=1" + this.SendMsg(omc.ReqSyncAlarmFile, []byte(data)) + go this.Receive() + +} + +func DataMock() { + conf := "root:1000omc@kp!@tcp(192.168.0.229:33066)/omc_db?charset=utf8mb4&parseTime=True&loc=Local" + d, err := gorm.Open(mysql.Open(conf), &gorm.Config{}) + if err != nil { + zlog.Ins().ErrorF("open mysql %s error, ", conf, err) + panic(err) + } + + var alarms []model.Alarm + if err := d.Model(&model.Alarm{}).Where("ne_type = ? and ne_id = ? and alarm_seq >= ?", "SMF", "SZ_01", 0).Order("alarm_seq asc").Limit(5).Find(&alarms).Error; err != nil { + zlog.Ins().ErrorF("open mysql %s error, ", conf, err) + return + } + var alarm model.Alarm + if err := d.Model(&model.Alarm{}).Where("ne_type = ? and ne_id = ?", "SMF", "SZ_01").Order("alarm_seq desc").First(&alarm).Error; err != nil { + zlog.Ins().ErrorF("db error %s", err) + return + } + index := alarm.AlarmSeq + 1 + for { + for _, v := range alarms { + v.AlarmSeq = index + v.Id = 0 + d.Create(&v) + index++ + } + time.Sleep(10 * time.Microsecond) + } + +} + +func NewTcpClient(ip string, port int) *TcpClient { + addrStr := fmt.Sprintf("%s:%d", ip, port) + conn, err := net.Dial("tcp", addrStr) + if err != nil { + fmt.Println("net.Dial err: ", err) + panic(err) + } + + client := &TcpClient{ + conn: conn, + PID: 0, + isOnline: make(chan bool), + } + + fmt.Println(fmt.Sprintf("conn: %+v. Connected to server...", conn)) + + return client +} + +func main() { + client := NewTcpClient("127.0.0.1", 31232) + client.Start() + //DataMock() + select {} +} diff --git a/conf/global.go b/conf/global.go new file mode 100644 index 0000000..038e142 --- /dev/null +++ b/conf/global.go @@ -0,0 +1,75 @@ +package conf + +import ( + "encoding/json" + "errors" + "fmt" + "github.com/aceld/zinx/zconf" + "github.com/aceld/zinx/zlog" + "io/ioutil" + "os" +) + +type Config struct { + /* + Server + */ + Mysql string `json:"mysql"` + FTPRoot string `json:"ftp_root"` + Channel []struct { + TCPPort int `json:"tcp_port"` //当前通道的TCP监听端口 + BindFlag string `json:"bind_flag"` //当前通道bind 的网元信息 + Province string `json:"province"` //网元所在省份 + DeviceCode string `json:"device_code"` //主机编码 四位,每1位可用0-9、A-Z编码 + } `json:"channel"` + + //以下是zinx 的配置 + Name string `json:"name"` + MaxConn int `json:"max_conn"` + WorkerPoolSize int `json:"worker_pool_size"` + HeartbeatMax int `json:"heartbeat_max"` + LogDir string `json:"log_dir"` + LogFile string `json:"log_file"` +} + +var OmcConf Config + +const ConfPath = "/conf/nbi_alarm_agent.json" + +func Reload(path string) error { + if confFileExists, _ := zconf.PathExists(path); confFileExists != true { + zlog.Ins().ErrorF("Config File %s is not exist!!", path) + return errors.New("config file is not exist") + } + + data, err := ioutil.ReadFile(path) + if err != nil { + return err + } + + err = json.Unmarshal(data, &OmcConf) + if err != nil { + return err + } + return nil +} + +func Init(path string) { + fmt.Println("filePath:", path) + if path == "" { + pwd, err := os.Getwd() + if err != nil { + pwd = "." + } + path = pwd + ConfPath + fmt.Println("path", path) + } + + if err := Reload(path); err != nil { + zlog.Ins().ErrorF("Config File %s error, ", err) + + panic(err) + } +} + +//"mysql": "root:1000omc@kp!@tcp(192.168.2.119:33066)/omc_db?charset=utf8mb4&parseTime=True&loc=Local", diff --git a/conf/nbi_alarm_agent.json b/conf/nbi_alarm_agent.json new file mode 100644 index 0000000..b74adfb --- /dev/null +++ b/conf/nbi_alarm_agent.json @@ -0,0 +1,22 @@ +{ + "channel": [ + { + "tcp_port": 31232, + "bind_flag": "SMF#SZ_01", + "province": "BJ", + "device_code": "0001" + }, + { + "tcp_port": 31233, + "bind_flag": "UDM#SZ_03", + "province": "BJ", + "device_code": "0002" + } + ], + "mysql": "root:1000omc@kp!@tcp(192.168.0.229:33066)/omc_db?charset=utf8mb4&parseTime=True&loc=Local", + "ftp_root": "data/ftp", + "mame":"nbi north alarm agent", + "heartbeat_max": 180, + "log_dir": "./nbi_alarm", + "log_file":"nbi_alarm.log" +} diff --git a/core/heart_beat.go b/core/heart_beat.go new file mode 100644 index 0000000..5ac2967 --- /dev/null +++ b/core/heart_beat.go @@ -0,0 +1,23 @@ +package core + +import ( + "github.com/aceld/zinx/ziface" + "github.com/aceld/zinx/zlog" +) + +// MyHeartBeatMsg 用户自定义的心跳检测消息处理方法 +func MyHeartBeatMsg(conn ziface.IConnection) []byte { + return []byte("heartbeat, I am server, I am alive") +} + +// MyHeartBeat 用户自定义心跳发送函数 +func MyHeartBeat(conn ziface.IConnection) error { + return nil +} + +// MyOnRemoteNotAlive 用户自定义的远程连接不存活时的处理方法 +func MyOnRemoteNotAlive(conn ziface.IConnection) { + zlog.Ins().ErrorF("myOnRemoteNotAlive is Called, connID=%v", conn.GetConnID(), "remoteAddr =%v ", conn.RemoteAddr()) + //关闭链接 + conn.Stop() +} diff --git a/core/user.go b/core/user.go new file mode 100644 index 0000000..cc62852 --- /dev/null +++ b/core/user.go @@ -0,0 +1,50 @@ +package core + +import ( + "fmt" + "github.com/aceld/zinx/ziface" + "github.com/google/uuid" +) + +// User User 对象 +type User struct { + UID string //ID + Conn ziface.IConnection //当前User的连接 + LoginState bool + LoginCount int + AlarmType string + UserName string + SeqNo string // 登录随机码 + RemoteIp string +} + +// NewUser 创建一个user对象 +func NewUser(conn ziface.IConnection, addr string) *User { + p := &User{ + UID: uuid.NewString(), + Conn: conn, + RemoteIp: addr, + } + return p +} + +// LostConnection User下线 +func (p *User) LostConnection(m *ChannelManager) { + m.RemoveUserByPID(p.UID) +} + +// SendMsg /* +func (p *User) SendMsg(msgID uint32, msg []byte) { + if p.Conn == nil { + fmt.Println("connection in player is nil") + return + } + + //调用SendMsg发包 + if err := p.Conn.SendMsg(msgID, msg); err != nil { + fmt.Println("Player SendMsg error !") + return + } + + return +} diff --git a/core/user_manager.go b/core/user_manager.go new file mode 100644 index 0000000..3504bfb --- /dev/null +++ b/core/user_manager.go @@ -0,0 +1,232 @@ +package core + +import ( + "errors" + "github.com/aceld/zinx/zlog" + "omc/db" + "omc/model" + "omc/omc" + "omc/service" + "strings" + "sync" + "time" +) + +/* +ChannelManager 通道管理管理模块 +*/ +type ChannelManager struct { + Name string + User map[string]*User //当前在线的User集合 + BindFlag string //bind 的网元标识,ne_type#ne_id 格式 + Province string //网元所在省份 + DeviceCode string //主机编码 四位,每1位可用0-9、A-Z编码 + AlarmSeq int32 //当前告警的序列号 + pLock sync.RWMutex //保护User的互斥读写机制 +} + +// MgrObj 多个通道管理模块集合 +var MgrObj []*ChannelManager + +// NewManager New 提供ChannelManager 初始化 方法 +func NewManager(name, bind, province, deviceCode string) *ChannelManager { + newManager := ChannelManager{ + Name: name, + BindFlag: bind, + Province: province, + DeviceCode: deviceCode, + User: make(map[string]*User), + } + MgrObj = append(MgrObj, &newManager) + return &newManager +} + +func GetManager(name string) *ChannelManager { + for i := range MgrObj { + if MgrObj[i].Name == name { + return MgrObj[i] + } + } + return nil +} + +// AddUser 提供添加一个user的功能 +func (wm *ChannelManager) AddUser(User *User) { + //将User添加到 世界管理器中 + wm.pLock.Lock() + wm.User[User.UID] = User + wm.pLock.Unlock() +} + +// Talk User广播消息 +func (wm *ChannelManager) Talk(msgID uint32, msg []byte) { + //2. 得到所有的在线user + users := wm.GetAllUser() + + //3. 向所有的user发送消息 + for _, user := range users { + if user.LoginState && user.AlarmType == omc.MSG { + user.SendMsg(msgID, msg) + } + } +} + +// LoginSuccess 登录成功管理 +func (wm *ChannelManager) LoginSuccess(UID, name, tp string) error { + wm.pLock.Lock() + defer wm.pLock.Unlock() + //判断是否重复登录 + for _, v := range wm.User { + if v.UserName == name && v.AlarmType == tp && v.LoginState == true { + return errors.New("repeat login for the same account") + } + } + user, ok := wm.User[UID] + if !ok { + return errors.New("server internal error") + } + user.UserName = name + user.AlarmType = tp + user.LoginState = true + wm.User[UID] = user + return nil +} + +// SetSeqNo 设置登录随机码 +func (wm *ChannelManager) SetSeqNo(UID, seqNo string) error { + wm.pLock.Lock() + defer wm.pLock.Unlock() + //判断是否重复登录 + user, ok := wm.User[UID] + if !ok { + return errors.New("server internal error") + } + user.SeqNo = seqNo + return nil +} + +// LoginFail 登录失败管理 +func (wm *ChannelManager) LoginFail(UID string) (bool, error) { + wm.pLock.Lock() + defer wm.pLock.Unlock() + //判断是否重复登录 + user, ok := wm.User[UID] + if !ok { + return true, errors.New("server internal error") + } + user.LoginCount++ + wm.User[UID] = user + if user.LoginCount >= 3 { + return true, errors.New("too many attempts, close connect") + } + return false, nil +} + +// RemoveUserByPID 从信息表中移除一个user +func (wm *ChannelManager) RemoveUserByPID(uid string) { + wm.pLock.Lock() + delete(wm.User, uid) + wm.pLock.Unlock() +} + +// GetUserByPID 通过user ID 获取对应user信息 +func (wm *ChannelManager) GetUserByPID(uID string) *User { + wm.pLock.RLock() + defer wm.pLock.RUnlock() + + return wm.User[uID] +} + +// GetAllUser 获取所有user的信息 +func (wm *ChannelManager) GetAllUser() []*User { + wm.pLock.RLock() + defer wm.pLock.RUnlock() + + //创建返回的User集合切片 + User := make([]*User, 0) + + //添加切片 + for _, v := range wm.User { + if v.LoginState && v.AlarmType == omc.MSG { + User = append(User, v) + } + } + + //返回 + return User +} + +func (wm *ChannelManager) RealTimeAlarm() { + for { + wm.pLock.RLock() + //查询 + var newAlarmSeq = wm.AlarmSeq + var alarms []service.OmcAlarm + neBind, _ := ConvertBindFlag(wm.BindFlag) + if wm.AlarmSeq == 0 { + newAlarmSeq = service.GetLastAlarmSeq(neBind.NeType, neBind.NeId) + + } else { + ams, err := service.GetRealTimeAlarm(neBind.NeType, neBind.NeId, wm.AlarmSeq) + if err != nil { + zlog.Ins().ErrorF("db error %s", err) + } + alarms = ams + } + //上报实时告警信息 + if len(alarms) > 0 { + go wm.SendAlarm(alarms) + newAlarmSeq = service.MaxAlarm(newAlarmSeq, alarms) + 1 + } + var users []string + for _, user := range wm.User { + if user.LoginState && user.AlarmType == omc.MSG { + userInfo := strings.Join([]string{user.UserName, user.RemoteIp}, ";") + users = append(users, userInfo) + } + } + + //更新AlarmSeq + wm.AlarmSeq = newAlarmSeq + wm.pLock.RUnlock() + + //记录日志到alarm 日志表 + for _, v := range alarms { + for _, u := range users { + var alarmLog model.NbiAlarmLog + ui := strings.Split(u, ";") + if len(ui) == 2 { + alarmLog.OpUser = ui[0] + alarmLog.SrcIp = ui[1] + } + alarmLog.NeType = v.NeType + alarmLog.NeId = v.NeUID + alarmLog.AlarmSeq = int64(v.AlarmSeq) + alarmLog.AlarmId = v.AlarmId + et, _ := time.Parse("2006-01-02 15:04:05", v.EventTime) + alarmLog.EventTime = et + alarmLog.LogTime = time.Now() + alarmLog.AId = v.AId + + db.Client.Create(&alarmLog) + } + } + time.Sleep(3 * time.Second) + } +} + +func (wm *ChannelManager) SendAlarm(alarms []service.OmcAlarm) error { + for _, v := range alarms { + //生产告警内容 + data := service.GenAlarm(v) + //发送告警内容 + wm.Talk(omc.RealTimeAlarm, data) + } + return nil +} + +func (wm *ChannelManager) UpdateAlarmSeq(newSeq int32) { + wm.pLock.Lock() + wm.AlarmSeq = newSeq + wm.pLock.Unlock() +} diff --git a/core/utils.go b/core/utils.go new file mode 100644 index 0000000..9c436a8 --- /dev/null +++ b/core/utils.go @@ -0,0 +1,48 @@ +package core + +import ( + "errors" + "github.com/aceld/zinx/ziface" + "omc/omc" + "strings" +) + +type NeBind struct { + NeType string + NeId string +} + +func ConvertBindFlag(bindFlag string) (NeBind, error) { + var neBind NeBind + nb := strings.Split(bindFlag, "#") + if len(nb) != 2 { + return neBind, errors.New("ne bind flag invalid") + } + neBind.NeType = nb[0] + neBind.NeId = nb[1] + return neBind, nil +} + +// APIDecode 消息解析 +func APIDecode(request ziface.IRequest, checker []string) (*omc.MsgBody, error) { + // 消息处理 + msgBody := omc.MsgBody{ + RawData: request.GetData(), + Msg: make(map[string]string, 0), + } + if err := msgBody.Decode(); err != nil { + return nil, errors.New("inlaid message body") + } + for _, v := range checker { + if _, ok := msgBody.Msg[v]; !ok { + return nil, errors.New("missing parameter of message body") + } + } + uID, err := request.GetConnection().GetProperty("UID") + if err != nil { + request.GetConnection().Stop() + return nil, errors.New("server internal error") + } + msgBody.UID = uID.(string) + return &msgBody, nil +} diff --git a/db/mysql.go b/db/mysql.go new file mode 100644 index 0000000..b503d8e --- /dev/null +++ b/db/mysql.go @@ -0,0 +1,24 @@ +package db + +import ( + "github.com/aceld/zinx/zlog" + "gorm.io/driver/mysql" + "gorm.io/gorm" + "omc/conf" +) + +var Client *gorm.DB + +func Init() error { + + d, err := gorm.Open(mysql.Open(conf.OmcConf.Mysql), &gorm.Config{}) + if err != nil { + zlog.Ins().ErrorF("open mysql %s error, ", conf.OmcConf.Mysql, err) + panic(err) + } + sqlDB, _ := d.DB() + sqlDB.SetMaxOpenConns(20) + sqlDB.SetMaxIdleConns(10) + Client = d + return nil +} diff --git a/decoder/decode_omc.go b/decoder/decode_omc.go new file mode 100644 index 0000000..2122ed7 --- /dev/null +++ b/decoder/decode_omc.go @@ -0,0 +1,88 @@ +package decoder + +import ( + "bytes" + "encoding/binary" + "encoding/hex" + "github.com/aceld/zinx/ziface" + "github.com/aceld/zinx/zlog" + "math" +) + +//ffff 01 00999999 000a 68656c6c6f2074657374 +// +---------------+---------------+--------------------+----------------+------------------- +// | 开始标志 | type | 秒时间戳 | 长度 | 消息体 +// | 0xffff(2byte) | uint8(1byte) | uint32(4byte) | uint16(2byte) | bytes(N byte) +// +---------------+---------------+--------------------+----------------+------------------- + +const OmcMsgHeaderSize = 9 //表示OMC空包长度 + +type OmcDecoder struct { + StartSign uint16 + MsgType uint8 + TimeStamp uint32 + LenOfBody uint16 + Value []byte +} + +func NewOmcDecoder() ziface.IDecoder { + return &OmcDecoder{} +} + +func (omc *OmcDecoder) GetLengthField() *ziface.LengthField { + return &ziface.LengthField{ + MaxFrameLength: math.MaxUint16 + 9, + LengthFieldOffset: 7, + LengthFieldLength: 2, + LengthAdjustment: 0, + InitialBytesToStrip: 0, + //注意现在默认是大端,使用小端需要指定编码方式 + Order: binary.BigEndian, + } +} + +func (omc *OmcDecoder) decode(data []byte) *OmcDecoder { + ltvData := OmcDecoder{} + ltvData.StartSign = binary.BigEndian.Uint16(data[0:2]) + ltvData.MsgType = data[2] + ltvData.TimeStamp = binary.BigEndian.Uint32(data[3:7]) + ltvData.LenOfBody = binary.BigEndian.Uint16(data[7:9]) + //Determine the length of V. (确定V的长度) + ltvData.Value = make([]byte, ltvData.LenOfBody) + + //5. Get V + binary.Read(bytes.NewBuffer(data[OmcMsgHeaderSize:OmcMsgHeaderSize+ltvData.LenOfBody]), binary.BigEndian, ltvData.Value) + + return <vData +} + +func (omc *OmcDecoder) Intercept(chain ziface.IChain) ziface.IcResp { + //1. Get the IMessage of zinx + iMessage := chain.GetIMessage() + if iMessage == nil { + // Go to the next layer in the chain of responsibility + return chain.ProceedWithIMessage(iMessage, nil) + } + + //2. Get Data + data := iMessage.GetData() + zlog.Ins().DebugF("omc-RawData size:%d data:%s\n", len(data), hex.EncodeToString(data)) + + // (读取的数据不超过包头,直接进入下一层) + if len(data) < OmcMsgHeaderSize { + return chain.ProceedWithIMessage(iMessage, nil) + } + + //4. Decode + ltvData := omc.decode(data) + zlog.Ins().DebugF("omc-decode %v", ltvData) + + // (将解码后的数据重新设置到IMessage中, Zinx的Router需要MsgID来寻址) + iMessage.SetDataLen(uint32(ltvData.LenOfBody)) + iMessage.SetMsgID(uint32(ltvData.MsgType)) + iMessage.SetData(ltvData.Value) + + //6. Pass the decoded data to the next layer. + // (将解码后的数据进入下一层) + return chain.ProceedWithIMessage(iMessage, *ltvData) +} diff --git a/dpack/datapack_omc.go b/dpack/datapack_omc.go new file mode 100644 index 0000000..f71fc09 --- /dev/null +++ b/dpack/datapack_omc.go @@ -0,0 +1,117 @@ +package dpack + +import ( + "bytes" + "encoding/binary" + "errors" + "github.com/aceld/zinx/zlog" + "github.com/aceld/zinx/zpack" + "time" + + "github.com/aceld/zinx/zconf" + "github.com/aceld/zinx/ziface" +) + +// +---------------+---------------+--------------------+----------------+------------------- +// | 开始标志 | type | 秒时间戳 | 长度 | 消息体 +// | 0xffff(2byte) | uint8(1byte) | uint32(4byte) | uint16(2byte) | bytes(N byte) +// +---------------+---------------+--------------------+----------------+------------------- + +var defaultHeaderLen uint32 = 9 + +type DataPack struct{} + +// NewDataPack initializes a packing and unpacking instance +// (封包拆包实例初始化方法) +func NewDataPack() ziface.IDataPack { + return &DataPack{} +} + +// GetHeadLen returns the length of the message header +// (获取包头长度方法) +func (dp *DataPack) GetHeadLen() uint32 { + //ID uint32(4 bytes) + DataLen uint32(4 bytes) + return defaultHeaderLen +} + +// Pack packs the message (compresses the data) +// (封包方法,压缩数据) +func (dp *DataPack) Pack(msg ziface.IMessage) ([]byte, error) { + zlog.Ins().InfoF("my pack: %v", msg) + // Create a buffer to store the bytes + // (创建一个存放bytes字节的缓冲) + dataBuff := bytes.NewBuffer([]byte{}) + + // Write the oxffff + if err := binary.Write(dataBuff, binary.BigEndian, uint16(0xffff)); err != nil { + return nil, err + } + //Write the type + if err := binary.Write(dataBuff, binary.BigEndian, uint8(msg.GetMsgID())); err != nil { + return nil, err + } + + //Write the timestamp + if err := binary.Write(dataBuff, binary.BigEndian, uint32(time.Now().Unix())); err != nil { + return nil, err + } + + //Write the length + if err := binary.Write(dataBuff, binary.BigEndian, uint16(msg.GetDataLen())); err != nil { + return nil, err + } + + // Write the data + if err := binary.Write(dataBuff, binary.BigEndian, msg.GetData()); err != nil { + return nil, err + } + + return dataBuff.Bytes(), nil +} + +// Unpack unpacks the message (decompresses the data) +// (拆包方法,解压数据) +func (dp *DataPack) Unpack(binaryData []byte) (ziface.IMessage, error) { + // Create an ioReader for the input binary data + dataBuff := bytes.NewReader(binaryData) + + // Only unpack the header information to obtain the data length and message ID + // (只解压head的信息,得到dataLen和msgID) + msg := &zpack.Message{} + + // Read the startSign + var startSign uint16 + if err := binary.Read(dataBuff, binary.BigEndian, &startSign); err != nil { + return nil, err + } + + // Read the msgID + var msgType uint + if err := binary.Read(dataBuff, binary.BigEndian, &msgType); err != nil { + return nil, err + } + msg.ID = uint32(msgType) + + // read timeStamp + var timeStamp uint32 + if err := binary.Read(dataBuff, binary.BigEndian, &timeStamp); err != nil { + return nil, err + } + + // Read the data length + var length uint16 + if err := binary.Read(dataBuff, binary.BigEndian, &length); err != nil { + return nil, err + } + msg.DataLen = uint32(length) + + // Check whether the data length exceeds the maximum allowed packet size + // (判断dataLen的长度是否超出我们允许的最大包长度) + if zconf.GlobalObject.MaxPacketSize > 0 && msg.GetDataLen() > zconf.GlobalObject.MaxPacketSize { + return nil, errors.New("too large msg data received") + } + + // Only the header data needs to be unpacked, and then another data read is performed from the connection based on the header length + // (这里只需要把head的数据拆包出来就可以了,然后再通过head的长度,再从conn读取一次数据) + return msg, nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..4dfc029 --- /dev/null +++ b/go.mod @@ -0,0 +1,18 @@ +module omc + +go 1.18 + +require ( + github.com/aceld/zinx v1.1.21 + github.com/google/uuid v1.3.0 + golang.org/x/crypto v0.11.0 + gorm.io/driver/mysql v1.5.1 + gorm.io/gorm v1.25.1 +) + +require ( + github.com/go-sql-driver/mysql v1.7.0 // indirect + github.com/gorilla/websocket v1.5.0 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..97e8828 --- /dev/null +++ b/go.sum @@ -0,0 +1,40 @@ +github.com/aceld/zinx v1.1.21 h1:8zoZ+hcEAd7gDsl8xOKPaWPEs9vZDRQOvhjG3vuvAnQ= +github.com/aceld/zinx v1.1.21/go.mod h1:nITkdASGtkLSwNKZ5yj88IpcCHTCFCP6cL12JWms1Fo= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc= +github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= +golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/mysql v1.5.1 h1:WUEH5VF9obL/lTtzjmML/5e6VfFR/788coz2uaVCAZw= +gorm.io/driver/mysql v1.5.1/go.mod h1:Jo3Xu7mMhCyj8dlrb3WoCaRd1FhsVh+yMXb1jUInf5o= +gorm.io/gorm v1.25.1 h1:nsSALe5Pr+cM3V1qwwQ7rOkw+6UeLrX5O4v3llhHa64= +gorm.io/gorm v1.25.1/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k= diff --git a/lib/FTP/BJ/HX/RJ/OMC/FM/20230626140419/BJ-FM-OMC-0001-V0-20230626140419-001 b/lib/FTP/BJ/HX/RJ/OMC/FM/20230626140419/BJ-FM-OMC-0001-V0-20230626140419-001 new file mode 100644 index 0000000..b5cc6a8 --- /dev/null +++ b/lib/FTP/BJ/HX/RJ/OMC/FM/20230626140419/BJ-FM-OMC-0001-V0-20230626140419-001 @@ -0,0 +1 @@ +this a test file \ No newline at end of file diff --git a/lib/FTP/BJ/HX/RJ/OMC/FM/20230626140600/BJ-FM-OMC-0001-V0-20230626140600-001.txt b/lib/FTP/BJ/HX/RJ/OMC/FM/20230626140600/BJ-FM-OMC-0001-V0-20230626140600-001.txt new file mode 100644 index 0000000..b5cc6a8 --- /dev/null +++ b/lib/FTP/BJ/HX/RJ/OMC/FM/20230626140600/BJ-FM-OMC-0001-V0-20230626140600-001.txt @@ -0,0 +1 @@ +this a test file \ No newline at end of file diff --git a/lib/file.go b/lib/file.go new file mode 100644 index 0000000..b768861 --- /dev/null +++ b/lib/file.go @@ -0,0 +1,81 @@ +package lib + +import ( + "archive/zip" + "os" + "strings" +) + +//BJ/HX/RJ/OMC/FM/告警文件生成时间 +///FTP根目录/省份简称/专业简称/厂家编码/OMC名称/数据类别/日期或时间/ +//<省份简称>-<数据类别>-<网元类型>[-网元子类]-<主机编号>-<数据版本>-<数据时间>[-登录用户名][-同步请求标识][-Ri][-统计周期] [-序列号].<后缀> +//BJ-FM-OMC-主机编码-v0-告警文件生成时间-001.txt + +type FileMeta struct { + DirRoot string `json:"dir_root"` + Province string `json:"province"` //网元所在省份 + DeviceCode string `json:"device_code"` //主机编码 四位,每1位可用0-9、A-Z编码 + Time string `json:"time"` //文件生成时间 + Index string `json:"index"` //文件标识 + Compress bool `json:"compress"` //文件是否压缩 + ReqId string `json:"req_id"` +} + +// HasDir 判断文件夹是否存在 +func HasDir(path string) (bool, error) { + _, _err := os.Stat(path) + if _err == nil { + return true, nil + } + if os.IsNotExist(_err) { + return false, nil + } + return false, _err +} + +func CreateDir(meta *FileMeta) (string, error) { + dir := strings.Join([]string{meta.DirRoot, meta.Province, "HX", "RJ", "OMC", "FM", meta.Time}, "/") + exist, err := HasDir(dir) + if err != nil { + return "", err + } + if !exist { + err := os.MkdirAll(dir, os.ModePerm) + if err != nil { + return "", err + } + } + return dir, err +} + +func GetName(meta *FileMeta) string { + fileName := strings.Join([]string{meta.Province, "FM", "OMC", meta.DeviceCode, "v0", meta.Time, meta.Index}, "-") + return strings.ToUpper(fileName) +} + +func GenFile(meta *FileMeta, content []byte) (string, error) { + // 创建文件夹 + dir, err := CreateDir(meta) + if err != nil { + return "", err + } + //创建文件 + fileName := dir + "/" + GetName(meta) + ".txt" + err = os.WriteFile(fileName, content, 0666) + if err != nil { + return "", err + } + // 创建一个新的ZIP文件 + fileName = fileName + ".zip" + zipFile, err := os.Create(fileName) + if err != nil { + return "", err + } + defer zipFile.Close() + + // 创建一个ZIP写入器 + zipWriter := zip.NewWriter(zipFile) + defer zipWriter.Close() + + return fileName, nil +} diff --git a/lib/file_test.go b/lib/file_test.go new file mode 100644 index 0000000..a2b9890 --- /dev/null +++ b/lib/file_test.go @@ -0,0 +1,20 @@ +package lib + +import ( + "fmt" + "testing" + "time" +) + +func TestFile(t *testing.T) { + var meta FileMeta + meta.DirRoot = "FTP" + meta.Province = "BJ" + meta.DeviceCode = "0001" + meta.Index = "001" + meta.Time = time.Now().Format("20060102150405") + meta.Compress = false + content := "this a test file" + f, err := GenFile(&meta, []byte(content)) + fmt.Println(f, err) +} diff --git a/lib/password.go b/lib/password.go new file mode 100644 index 0000000..7ebf1eb --- /dev/null +++ b/lib/password.go @@ -0,0 +1,14 @@ +package lib + +import "golang.org/x/crypto/bcrypt" + +// Encrypt 加密明文密码 +func Encrypt(password string) (string, error) { + hashedBytes, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost) + return string(hashedBytes), err +} + +// Compare 密文校验 +func Compare(hashedPassword, password string) error { + return bcrypt.CompareHashAndPassword([]byte(hashedPassword), []byte(password)) +} diff --git a/model/alarm.go b/model/alarm.go new file mode 100644 index 0000000..e7d8293 --- /dev/null +++ b/model/alarm.go @@ -0,0 +1,51 @@ +package model + +import "time" + +type Alarm struct { + Id int + AlarmSeq int + AlarmId string + NeId string + AlarmCode int + AlarmTitle string + EventTime time.Time + AlarmType string + OrigSeverity string + PVFlag string + NeName string + NeType string + ObjectName string + ObjectUID string + ObjectType string + LocationInfo string + Province string + AlarmStatus int + SpecificProblem string + SpecificProblemID string + AddInfo string + ClearType int + ClearTime time.Time +} + +func (Alarm) TableName() string { + return "alarm" +} + +type NbiAlarmLog struct { + ID int64 + AId int64 + OpUser string + SrcIp string + NeType string + NeId string + AlarmSeq int64 + AlarmId string + AlarmCode int + EventTime time.Time + LogTime time.Time +} + +func (NbiAlarmLog) TableName() string { + return "nbi_alarm_log" +} diff --git a/model/user.go b/model/user.go new file mode 100644 index 0000000..5b02930 --- /dev/null +++ b/model/user.go @@ -0,0 +1,38 @@ +package model + +import "time" + +type User struct { + Id int `json:"id"` + AccountId string `json:"account_id"` + Name string `json:"name"` + RealName string `json:"real_name"` + Sn string `json:"sn"` + Gender string `json:"gender"` + Email string `json:"email"` + IdCardNumber string `json:"id_card_number"` + Description string `json:"description"` + TelephoneNumber string `json:"telephone_number"` + Phone string `json:"phone"` + Mobile string `json:"mobile"` + EmployeeNumber string `json:"employee_number"` + EmployeeType string `json:"employee_type"` + Organize string `json:"organize"` + SupporterCorpName string `json:"supporter_corp_name"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + Password string `json:"password"` + PasswordSha512 string `json:"password_sha512"` + ChangePasswordFlag int `json:"change_password_flag"` + PasswordExpiration string `json:"password_expiration"` + Status string `json:"status"` + UserExpiration string `json:"user_expiration"` + GroupName string `json:"group_name"` + Profile string `json:"profile"` + CreateTime time.Time `json:"create_time"` + UpdateTime time.Time `json:"update_time"` +} + +func (User) TableName() string { + return "user" +} diff --git a/mysql_robot.exe b/mysql_robot.exe new file mode 100644 index 0000000..8a394a3 Binary files /dev/null and b/mysql_robot.exe differ diff --git a/nbi_alarm b/nbi_alarm new file mode 100644 index 0000000..a5295c9 Binary files /dev/null and b/nbi_alarm differ diff --git a/nbi_alarm_agent b/nbi_alarm_agent new file mode 100644 index 0000000..80aca99 Binary files /dev/null and b/nbi_alarm_agent differ diff --git a/nbi_alarm_agent.go b/nbi_alarm_agent.go new file mode 100644 index 0000000..b0916a1 --- /dev/null +++ b/nbi_alarm_agent.go @@ -0,0 +1,134 @@ +package main + +import ( + "fmt" + "github.com/aceld/zinx/zutils/commandline/args" + + "omc/conf" + + api2 "omc/api" + "omc/core" + "omc/db" + "omc/decoder" + "omc/dpack" + "omc/omc" + "os" + "os/signal" + "time" + + "github.com/aceld/zinx/zconf" + "github.com/aceld/zinx/ziface" + "github.com/aceld/zinx/zlog" + "github.com/aceld/zinx/znet" +) + +// OnConnectionAdd 当客户端建立连接的时候的hook函数 +func OnConnectionAdd(conn ziface.IConnection) { + //创建一个user + + user := core.NewUser(conn, conn.RemoteAddrString()) + //将当前新上线玩家添加到ChannelManager中 + m := core.GetManager(conn.GetName()) + if m == nil { + zlog.Ins().ErrorF("server internal error in GetManager") + conn.Stop() + return + } + m.AddUser(user) + + //将该连接绑定属性PID + conn.SetProperty("UID", user.UID) + + zlog.Ins().InfoF("====> User uID = %s", user.UID, " arrived ====", "") +} + +// OnConnectionLost 当客户端断开连接的时候的hook函数 +func OnConnectionLost(conn ziface.IConnection) { + //获取当前连接的PID属性 + uID, _ := conn.GetProperty("UID") + var userID string + if uID != nil { + userID = uID.(string) + } + + //根据pID获取对应usr + m := core.GetManager(conn.GetName()) + if m == nil { + zlog.Ins().ErrorF("server internal error in GetManager") + return + } + user := m.GetUserByPID(userID) + + //触发玩家下线业务 + if user != nil { + user.LostConnection(m) + } + + zlog.Ins().InfoF("====> User %s-%s", user.UID, user.UserName, " left =====") + +} + +func main() { + //配置初始化 + conf.Init(args.Args.ConfigFile) + db.Init() + //创建服务器句柄 + for _, cg := range conf.OmcConf.Channel { + serverName := fmt.Sprintf("omc-tcp-Server-port:%d", cg.TCPPort) + //注册用户管理模块 + m := core.NewManager(serverName, cg.BindFlag, cg.Province, cg.DeviceCode) + + //new 一个TCP服务 + s := znet.NewUserConfServer(&zconf.Config{ + TCPPort: cg.TCPPort, + Name: serverName, + Host: "0.0.0.0", + MaxConn: 3000, + WorkerPoolSize: 10, + HeartbeatMax: conf.OmcConf.HeartbeatMax, + LogDir: conf.OmcConf.LogDir, + LogFile: conf.OmcConf.LogFile, + }) + + //注册客户端连接建立和丢失函数 + s.SetOnConnStart(OnConnectionAdd) + s.SetOnConnStop(OnConnectionLost) + + //注册路由 + s.AddRouter(omc.ReqLoginAlarm, &api2.LoginApi{}) + s.AddRouter(omc.ReqHeartBeat, &api2.HeartBeatApi{}) + s.AddRouter(omc.CloseConnAlarm, &api2.CloseApi{}) + s.AddRouter(omc.ReqSyncAlarmMsg, &api2.SyncAlarmApi{}) + s.AddRouter(omc.ReqSyncAlarmFile, &api2.SyncAlarmFileApi{}) + s.AddRouter(omc.ReqCMCALoginSeq, &api2.SyncAlarmFileApi{}) + s.AddRouter(omc.ReqCMCALoginAlarm, &api2.SyncAlarmFileApi{}) + + //添加LTV数据格式Decoder + s.SetDecoder(decoder.NewOmcDecoder()) + //添加LTV数据格式的Pack封包Encoder + s.SetPacket(dpack.NewDataPack()) + + // (启动心跳检测) + s.StartHeartBeatWithOption(60*time.Second, &ziface.HeartBeatOption{ + MakeMsg: core.MyHeartBeatMsg, + OnRemoteNotAlive: core.MyOnRemoteNotAlive, + Router: &api2.HeartBeatApi{}, + HeadBeatMsgID: uint32(0xFF), + }) + // 设置默认的心跳发送函数 + heart := s.GetHeartBeat() + heart.SetHeartbeatFunc(core.MyHeartBeat) + + //启动服务 + go s.Serve() + + //启动实时告警 + go m.RealTimeAlarm() + } + + // close + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, os.Kill) + sig := <-c + zlog.Ins().InfoF("===exit=== %s", sig) +} diff --git a/omc/msg.go b/omc/msg.go new file mode 100644 index 0000000..50650a5 --- /dev/null +++ b/omc/msg.go @@ -0,0 +1,38 @@ +package omc + +// ErrorMsg ackLoginAlarm;result=fail;resDesc=username-error +func ErrorMsg(msgType string, reqID string, desc string) []byte { + msgBody := MsgBody{ + MsgName: msgType, + Msg: make(map[string]string, 0), + } + if reqID != "" { + msgBody.Msg["reqId"] = reqID + msgBody.Keys = append(msgBody.Keys, "reqId") + } + msgBody.Msg["result"] = "fail" + msgBody.Keys = append(msgBody.Keys, "result") + msgBody.Msg["resDesc"] = desc + msgBody.Keys = append(msgBody.Keys, "resDesc") + msgBody.Pack() + return msgBody.RawData +} + +func SuccessMsg(msgType string, reqID string, desc string) []byte { + msgBody := MsgBody{ + MsgName: msgType, + Msg: make(map[string]string, 0), + } + if reqID != "" { + msgBody.Msg["reqId"] = reqID + msgBody.Keys = append(msgBody.Keys, "reqId") + } + msgBody.Msg["result"] = "succ" + msgBody.Keys = append(msgBody.Keys, "result") + //msgBody.Msg["resDesc"] = desc + msgBody.Msg["resDesc"] = "succ" + msgBody.Keys = append(msgBody.Keys, "resDesc") + + msgBody.Pack() + return msgBody.RawData +} diff --git a/omc/omc_pack.go b/omc/omc_pack.go new file mode 100644 index 0000000..918d1d0 --- /dev/null +++ b/omc/omc_pack.go @@ -0,0 +1,46 @@ +package omc + +import ( + "errors" + "fmt" + "strings" +) + +type MsgBody struct { + UID string + RawData []byte + MsgName string + Msg map[string]string + Keys []string +} + +// Decode +// reqLoginAlarm;user=yiy;key=qw#$@;type=msg +func (o *MsgBody) Decode() error { + multi := strings.Split(string(o.RawData), ";") + if len(multi) < 1 { + return errors.New("invalid msg body") + } + for i := 1; i < len(multi); i++ { + m := strings.Split(multi[i], "=") + if len(m) != 2 { + return errors.New("invalid msg body") + } + o.Msg[m[0]] = m[1] + } + return nil +} + +// Pack +// reqLoginAlarm;user=yiy;key=qw#$@;type=msg +func (o *MsgBody) Pack() error { + var multi []string + multi = append(multi, o.MsgName) + for _, key := range o.Keys { + item := fmt.Sprintf("%s=%s", key, o.Msg[key]) + multi = append(multi, item) + } + raw := strings.Join(multi, ";") + o.RawData = []byte(raw) + return nil +} diff --git a/omc/omc_type.go b/omc/omc_type.go new file mode 100644 index 0000000..92e28d9 --- /dev/null +++ b/omc/omc_type.go @@ -0,0 +1,39 @@ +package omc + +const ( + RealTimeAlarm = 0 + ReqLoginAlarm = 1 + AckLoginAlarm = 2 + ReqSyncAlarmMsg = 3 + AckSyncAlarmMsg = 4 + ReqSyncAlarmFile = 5 + AckSyncAlarmFile = 6 + AckSyncAlarmFileResult = 7 + ReqHeartBeat = 8 + AckHeartBeat = 9 + CloseConnAlarm = 10 + ReqCMCALoginAlarm = 11 + ReqCMCALoginSeq = 12 + AckCMCALoginSeq = 13 +) + +//定义user type +const ( + MSG = "msg" + FILE = "ftp" +) + +func OrigSeverity(os string) int32 { + switch os { + case "Critical": + return 1 + case "Major": + return 2 + case "Minor": + return 3 + case "Warning": + return 4 + default: + return 0 + } +} diff --git a/readme.txt b/readme.txt new file mode 100644 index 0000000..dcef26e --- /dev/null +++ b/readme.txt @@ -0,0 +1,43 @@ + +项目交付文件: +fpt://192.168.0.229/home/guodeng/bin + +服务启动: +./nb_alarm_agent -c /home/guodeng/omc/conf/nb_alarm_agent.json + +配置文件详情: +{ + /* 通道配置项*/ + "channel": [ + { + "tcp_port": 31232, //通道TCP监听端口 + "bind_flag": "SMF#SZ_01", //通道Bind网元(格式为 ne_type#ne_id) + "province": "BJ", //网元所在省份 + "device_code": "0001" //网元主机编码 + }, + { + "tcp_port": 31233, //通道TCP监听端口 + "bind_flag": "UDM#SZ_03", //通道Bind网元(格式为 ne_type#ne_id) + "province": "BJ", //网元所在省份 + "device_code": "0002" //网元主机编码 + } + ], + + // 数据库配置 + "mysql": "root:1000omc@kp!@tcp(192.168.0.229:33066)/omc_db?charset=utf8mb4&parseTime=True&loc=Local", + + // FTP服务器根目录 + "ftp_root": "ftp:192.168.0.229/data/ftp", + + //服务名称 + "mame":"north agent", + + // 心跳保活时间, 如果服务器检查client 在超过心跳时间没有数据(心跳数据或者业务数据)发送,则断开连接 + "heartbeat_max": 180, + + // 日志存放目录 + "log_dir": "./omc_log", + + // 日志文件名称 + "log_file":"omc.log" +} diff --git a/service/login.go b/service/login.go new file mode 100644 index 0000000..e62dfe9 --- /dev/null +++ b/service/login.go @@ -0,0 +1,23 @@ +package service + +import ( + "errors" + "github.com/aceld/zinx/zlog" + "omc/db" + "omc/lib" + "omc/model" +) + +func UserLogin(name, pw string) error { + // 用户名密码校验 + var user model.User + if err := db.Client.Model(&model.User{}).Where("account_id=?", name).First(&user).Error; err != nil { + return err + } + + if err := lib.Compare(user.Password, pw); err != nil { + zlog.Ins().ErrorF("Password Login[%s]:%s", name, err) + return errors.New("incorrect username and password") + } + return nil +} diff --git a/service/real_time_alarm.go b/service/real_time_alarm.go new file mode 100644 index 0000000..f74c6c1 --- /dev/null +++ b/service/real_time_alarm.go @@ -0,0 +1,91 @@ +package service + +import ( + "encoding/json" + "github.com/aceld/zinx/zlog" + "omc/db" + "omc/model" + "omc/omc" +) + +type OmcAlarm struct { + AId int64 `json:"-"` + AlarmSeq int32 `json:"alarmSeq"` //告警序列号 + AlarmTitle string `json:"alarmTitle"` //告警事件标题 + AlarmStatus int32 `json:"alarmStatus"` //告警状态 + AlarmType string `json:"alarmType"` //告警类型 + OrigSeverity int32 `json:"origSeverity"` //原始级别 + EventTime string `json:"eventTime"` //事件发生时间 + AlarmId string `json:"alarmId"` //告警事件唯一标识 + SpecificProblemID string `json:"specificProblemID"` //告警问题原因ID + SpecificProblem string `json:"specificProblem"` //告警问题原因 + NeUID string `json:"neUID"` //告警网元UID + NeName string `json:"neName"` //告警网元名称 + NeType string `json:"neType"` //告警网元设备类型 + ObjectUID string `json:"objectUID"` //告警定位对象UID + ObjectName string `json:"objectName"` //告警定位对象名称 + ObjectType string `json:"objectType"` //告警定位对象资源类型 + LocationInfo string `json:"locationInfo"` //告警定位信息 + AddInfo string `json:"addInfo"` //告警辅助信息[条件必选] + PVFlag string `json:"PVFlag"` //网元虚实性[条件必选] + Province string `json:"province"` //网元服务省份 +} + +// GetRealTimeAlarm 获取最新的告警信息 +func GetRealTimeAlarm(neType, neId string, alarmSeq int32) ([]OmcAlarm, error) { + var alarms []model.Alarm + var result []OmcAlarm + if err := db.Client.Model(&model.Alarm{}).Where("ne_type = ? and ne_id = ? and alarm_seq >= ?", neType, neId, alarmSeq).Order("alarm_seq asc").Find(&alarms).Error; err != nil { + return nil, err + } + for _, v := range alarms { + var item OmcAlarm + item.AlarmSeq = int32(v.AlarmSeq) + item.AlarmTitle = v.AlarmTitle + item.AlarmStatus = int32(v.AlarmStatus) + item.AlarmType = v.AlarmType + item.OrigSeverity = omc.OrigSeverity(v.OrigSeverity) + item.EventTime = v.EventTime.Format("2006-01-02 15:04:05") + item.AlarmId = v.AlarmId + item.SpecificProblemID = v.SpecificProblemID + item.SpecificProblem = v.SpecificProblem + item.NeUID = v.NeId + item.NeName = v.NeName + item.NeType = v.NeType + item.ObjectUID = v.ObjectUID + item.ObjectName = v.NeName + item.ObjectType = v.ObjectType + item.LocationInfo = v.LocationInfo + item.AddInfo = v.AddInfo + item.PVFlag = v.PVFlag + item.Province = v.Province + item.AId = int64(v.Id) + result = append(result, item) + } + return result, nil +} + +// GetLastAlarmSeq 获取最新的alarm seq +func GetLastAlarmSeq(neType, neId string) int32 { + var alarm model.Alarm + if err := db.Client.Model(&model.Alarm{}).Where("ne_type = ? and ne_id = ?", neType, neId).Order("alarm_seq desc").First(&alarm).Error; err != nil { + zlog.Ins().ErrorF("db error %s", err) + return 0 + } + return int32(alarm.AlarmSeq) + 1 +} + +func GenAlarm(alarm OmcAlarm) []byte { + data, _ := json.Marshal(&alarm) + return data +} + +func MaxAlarm(current int32, alarms []OmcAlarm) int32 { + var req = current + for _, v := range alarms { + if v.AlarmSeq > req { + req = v.AlarmSeq + } + } + return req +} diff --git a/service/sysn_alarm_file.go b/service/sysn_alarm_file.go new file mode 100644 index 0000000..5cc3ba5 --- /dev/null +++ b/service/sysn_alarm_file.go @@ -0,0 +1,219 @@ +package service + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "omc/db" + "omc/lib" + "omc/model" + "omc/omc" + "time" + + "github.com/aceld/zinx/ziface" +) + +func GenFile(request ziface.IRequest, meta *lib.FileMeta, data []OmcAlarm) { + //生成文件内容 + dataBuff := bytes.NewBuffer([]byte{}) + for _, v := range data { + b, _ := json.Marshal(v) + binary.Write(dataBuff, binary.BigEndian, b) + binary.Write(dataBuff, binary.BigEndian, '\r') + binary.Write(dataBuff, binary.BigEndian, '\n') + } + + file, err := lib.GenFile(meta, dataBuff.Bytes()) + if err != nil { + return + } + + // add by simon at 2023/08/14 + fmt.Println("meta:", meta) + if meta.ReqId == "" { + meta.ReqId = "2" + } + //发送文件同步信息 + ackBody := omc.MsgBody{ + MsgName: "ackSyncAlarmFileResult", + Msg: make(map[string]string, 0), + } + ackBody.Msg["reqId"] = meta.ReqId + ackBody.Keys = append(ackBody.Keys, "reqId") + ackBody.Msg["result"] = "succ" + ackBody.Keys = append(ackBody.Keys, "result") + ackBody.Msg["fileName"] = file + ackBody.Keys = append(ackBody.Keys, "fileName") + ackBody.Msg["resDesc"] = "succ" + ackBody.Keys = append(ackBody.Keys, "resDesc") + ackBody.Pack() + request.GetConnection().SendMsg(omc.AckSyncAlarmFileResult, ackBody.RawData) +} + +// GetAlarmOfAlarmSeq 获取告警信息 +func GetAlarmOfAlarmSeq(neType, neId string, alarmSeq int) ([]OmcAlarm, error) { + var alarms []model.Alarm + var result []OmcAlarm + + query := db.Client.Model(&model.Alarm{}).Where("ne_type = ? and ne_id = ? and alarm_seq > ?", neType, neId, alarmSeq) + if err := query.Order("alarm_seq asc").Find(&alarms).Error; err != nil { + return nil, err + } + for _, v := range alarms { + var item OmcAlarm + item.AlarmSeq = int32(v.AlarmSeq) + item.AlarmTitle = v.AlarmTitle + item.AlarmStatus = int32(v.AlarmStatus) + item.AlarmType = v.AlarmType + item.OrigSeverity = omc.OrigSeverity(v.OrigSeverity) + item.EventTime = v.EventTime.Format("2006-01-02 15:04:05") + item.AlarmId = v.AlarmId + item.SpecificProblemID = v.SpecificProblemID + item.SpecificProblem = v.SpecificProblem + item.NeUID = v.NeId + item.NeName = v.NeName + item.NeType = v.NeType + item.ObjectUID = v.ObjectUID + item.ObjectName = v.NeName + item.ObjectType = v.ObjectType + item.LocationInfo = v.LocationInfo + item.AddInfo = v.AddInfo + item.PVFlag = v.PVFlag + item.Province = v.Province + result = append(result, item) + } + return result, nil +} + +//GetAlarm + +/* +1 如果syncSource=1 && alarmSeq 为空: 从北向告警上报日志中(nbi_alarm_log)取数据ID,然后反查告警信息表(alarm)取出告警日志 + +2 其他情况: 从告警信息表中取数据, 数据来源为设备告警事件 + +*/ + +func GetAlarm(neType, neId, startTime, endTime, syncSource string, alarmSeq int) ([]OmcAlarm, error) { + if syncSource == "0" { + return GetAlarmOfEventTime(neType, neId, startTime, endTime) + + } else { + if alarmSeq > 0 { + return GetAlarmOfAlarmSeq(neType, neId, alarmSeq) + } else { + return GetAlarmOfLog(neType, neId, startTime, endTime) + } + } +} + +// GetAlarmOfEventTime 获取告警信息 +func GetAlarmOfEventTime(neType, neId, startTime, endTime string) ([]OmcAlarm, error) { + var alarms []model.Alarm + var result []OmcAlarm + + if startTime == "" && endTime == "" { + return result, nil + } + query := db.Client.Model(&model.Alarm{}).Where("ne_type = ? and ne_id = ?", neType, neId) + if startTime != "" { + t1, err := time.Parse("2006-01-02 15:04:05", startTime) + if err != nil { + return nil, errors.New("startTime invalid") + } + query = query.Where("event_time > ?", t1) + } + if endTime != "" { + t2, err := time.Parse("2006-01-02 15:04:05", endTime) + if err != nil { + return nil, errors.New("endTime invalid") + } + query = query.Where("event_time < ?", t2) + } + if err := query.Order("alarm_seq asc").Find(&alarms).Error; err != nil { + return nil, err + } + for _, v := range alarms { + var item OmcAlarm + item.AlarmSeq = int32(v.AlarmSeq) + item.AlarmTitle = v.AlarmTitle + item.AlarmStatus = int32(v.AlarmStatus) + item.AlarmType = v.AlarmType + item.OrigSeverity = omc.OrigSeverity(v.OrigSeverity) + item.EventTime = v.EventTime.Format("2006-01-02 15:04:05") + item.AlarmId = v.AlarmId + item.SpecificProblemID = v.SpecificProblemID + item.SpecificProblem = v.SpecificProblem + item.NeUID = v.NeId + item.NeName = v.NeName + item.NeType = v.NeType + item.ObjectUID = v.ObjectUID + item.ObjectName = v.NeName + item.ObjectType = v.ObjectType + item.LocationInfo = v.LocationInfo + item.AddInfo = v.AddInfo + item.PVFlag = v.PVFlag + item.Province = v.Province + result = append(result, item) + } + return result, nil +} + +// GetAlarmOfLog 获取告警信息 +func GetAlarmOfLog(neType, neId, startTime, endTime string) ([]OmcAlarm, error) { + var alarms []model.Alarm + var result []OmcAlarm + + if startTime == "" && endTime == "" { + return result, nil + } + var aIDs []int64 + query := db.Client.Model(&model.NbiAlarmLog{}).Select("distinct a_id").Where("ne_type = ? and ne_id = ?", neType, neId) + if startTime != "" { + t1, err := time.Parse("2006-01-02 15:04:05", startTime) + if err != nil { + return nil, errors.New("startTime invalid") + } + query = query.Where("log_time >= ?", t1) + } + if endTime != "" { + t2, err := time.Parse("2006-01-02 15:04:05", endTime) + if err != nil { + return nil, errors.New("endTime invalid") + } + query = query.Where("log_time <= ?", t2) + } + if err := query.Order("alarm_seq asc").Find(&aIDs).Error; err != nil { + return nil, err + } + + if err := db.Client.Model(&model.Alarm{}).Where("id in (?)", aIDs).Find(&alarms).Error; err != nil { + return nil, err + } + for _, v := range alarms { + var item OmcAlarm + item.AlarmSeq = int32(v.AlarmSeq) + item.AlarmTitle = v.AlarmTitle + item.AlarmStatus = int32(v.AlarmStatus) + item.AlarmType = v.AlarmType + item.OrigSeverity = omc.OrigSeverity(v.OrigSeverity) + item.EventTime = v.EventTime.Format("2006-01-02 15:04:05") + item.AlarmId = v.AlarmId + item.SpecificProblemID = v.SpecificProblemID + item.SpecificProblem = v.SpecificProblem + item.NeUID = v.NeId + item.NeName = v.NeName + item.NeType = v.NeType + item.ObjectUID = v.ObjectUID + item.ObjectName = v.NeName + item.ObjectType = v.ObjectType + item.LocationInfo = v.LocationInfo + item.AddInfo = v.AddInfo + item.PVFlag = v.PVFlag + item.Province = v.Province + result = append(result, item) + } + return result, nil +}