Files
nbi_alarm/core/manage/user_manager.go
2023-08-24 19:11:12 +08:00

238 lines
5.4 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package manage
import (
"encoding/json"
"errors"
"omc/core/consts"
"omc/core/db"
"omc/core/parse"
"omc/handle/model"
"omc/handle/service"
"strings"
"sync"
"time"
"github.com/aceld/zinx/zlog"
)
// ChannelManager is the channel management module. It keeps the users attached
// to one channel together with the alarm sequence cursor for that channel.
type ChannelManager struct {
	Name string
	User map[string]*User // users currently attached to this manager, keyed by User.UID
	BindFlag string // identifier of the bound network element, in ne_type#ne_id format
	Province string // province where the network element is located
	DeviceCode string // host code: four characters, each one of 0-9 or A-Z
	AlarmSeq int64 // current alarm sequence number
	pLock sync.RWMutex // read/write lock protecting User
}
// MgrObj is the package-level collection of channel managers. NewManager
// appends every manager it creates and GetManager searches it by name.
var MgrObj []*ChannelManager
// NewManager creates a ChannelManager with the given name, bind flag, province
// and device code, and an empty User map. The new manager is appended to the
// package-level MgrObj list before being returned.
func NewManager(name, bind, province, deviceCode string) *ChannelManager {
	mgr := new(ChannelManager)
	mgr.Name = name
	mgr.BindFlag = bind
	mgr.Province = province
	mgr.DeviceCode = deviceCode
	mgr.User = make(map[string]*User)

	MgrObj = append(MgrObj, mgr)
	return mgr
}
// GetManager returns the first manager in MgrObj whose Name equals name, or
// nil when no registered manager has that name.
func GetManager(name string) *ChannelManager {
	for _, mgr := range MgrObj {
		if mgr.Name != name {
			continue
		}
		return mgr
	}
	return nil
}
// AddUser stores User in the manager's user map under User.UID, replacing any
// entry that already uses that UID. The write is done under the write lock.
func (wm *ChannelManager) AddUser(User *User) {
	wm.pLock.Lock()
	defer wm.pLock.Unlock()

	wm.User[User.UID] = User
}
// Talk broadcasts msg with message ID msgID: it takes the users returned by
// GetAllUser and calls SendMsg on each one that is logged in with alarm type
// consts.MSG.
func (wm *ChannelManager) Talk(msgID uint32, msg []byte) {
	for _, receiver := range wm.GetAllUser() {
		// Skip anyone who is not a logged-in message subscriber.
		if !receiver.LoginState || receiver.AlarmType != consts.MSG {
			continue
		}
		receiver.SendMsg(msgID, msg)
	}
}
// LoginSuccess records a successful login for the user stored under UID.
//
// If any user in the manager is already logged in with the same account name
// and alarm type, that user's connection is stopped and a "repeat login" error
// is returned without touching the user stored under UID. If no user is stored
// under UID an error is returned. Otherwise the user's UserName and AlarmType
// are set to name and tp, LoginState is set to true, and nil is returned.
func (wm *ChannelManager) LoginSuccess(UID, name, tp string) error {
	wm.pLock.Lock()
	defer wm.pLock.Unlock()

	// Reject a second login for an account/alarm-type pair that is already active.
	// NOTE(review): Conn.Stop is invoked while the write lock is held — confirm
	// it cannot synchronously call back into a method that takes pLock.
	for _, existing := range wm.User {
		if !existing.LoginState || existing.UserName != name || existing.AlarmType != tp {
			continue
		}
		existing.Conn.Stop()
		return errors.New("repeat login for the account")
	}

	user, found := wm.User[UID]
	if !found {
		return errors.New("server internal error")
	}

	// The map holds pointers, so updating through user updates the stored entry.
	user.UserName, user.AlarmType, user.LoginState = name, tp, true
	return nil
}
// SetSeqNo stores the login random code seqNo on the user registered under
// UID. It returns an error when no user is stored under that UID.
func (wm *ChannelManager) SetSeqNo(UID, seqNo string) error {
	wm.pLock.Lock()
	defer wm.pLock.Unlock()

	if user, found := wm.User[UID]; found {
		user.SeqNo = seqNo
		return nil
	}
	return errors.New("server internal error")
}
// LoginFail records a failed login attempt for the user stored under UID by
// incrementing its LoginCount.
//
// It returns (true, error) when no user is stored under UID or when the
// counter has reached 3 or more, and (false, nil) otherwise.
func (wm *ChannelManager) LoginFail(UID string) (bool, error) {
	wm.pLock.Lock()
	defer wm.pLock.Unlock()

	user, found := wm.User[UID]
	if !found {
		return true, errors.New("server internal error")
	}

	// The map holds pointers, so the increment is visible through the stored entry.
	user.LoginCount++
	if user.LoginCount < 3 {
		return false, nil
	}
	return true, errors.New("too many attempts, close connect")
}
// RemoveUserByPID deletes the user stored under uid from the manager's user
// map. Deleting a uid that is not present is a no-op.
func (wm *ChannelManager) RemoveUserByPID(uid string) {
	wm.pLock.Lock()
	defer wm.pLock.Unlock()

	delete(wm.User, uid)
}
// GetUserByPID returns the user stored under uID, or nil when the map has no
// entry for it. The lookup is done under the read lock.
func (wm *ChannelManager) GetUserByPID(uID string) *User {
	wm.pLock.RLock()
	found := wm.User[uID]
	wm.pLock.RUnlock()
	return found
}
// GetAllUser returns the users that are logged in with alarm type consts.MSG.
// Despite its name it does not return every stored user. The result is a
// fresh, non-nil slice (empty when nobody qualifies) built under the read lock.
func (wm *ChannelManager) GetAllUser() []*User {
	wm.pLock.RLock()
	defer wm.pLock.RUnlock()

	online := []*User{}
	for _, candidate := range wm.User {
		if !candidate.LoginState || candidate.AlarmType != consts.MSG {
			continue
		}
		online = append(online, candidate)
	}
	return online
}
// RealTimeAlarm runs the real-time alarm reporting loop. It never returns: it
// performs one polling pass (pollAlarmsOnce) and then sleeps for 3 seconds,
// forever.
func (wm *ChannelManager) RealTimeAlarm() {
	for {
		wm.pollAlarmsOnce()
		time.Sleep(3 * time.Second)
	}
}

// pollAlarmsOnce performs a single polling pass.
//
// When the cursor (AlarmSeq) is 0 it is initialised to one past the value
// returned by service.LastAlarmSeq and nothing is sent. Otherwise the alarms
// returned by service.List for the current cursor are broadcast asynchronously
// through SendAlarm, the cursor is moved to one past the highest sequence
// reported by service.MaxAlarmSeq, and one log row per (alarm, recipient) pair
// is written.
//
// The cursor is left untouched when the query fails or returns no alarms, so
// that a failed or idle pass cannot skip sequence numbers that have not been
// delivered yet. This presumes service.List returns alarms whose sequence is
// >= the cursor, which is what the "+1" after a delivered batch implies —
// confirm against service.List.
//
// No lock is held while the database is queried; pLock is only taken briefly
// to read the cursor, to snapshot the recipients and to store the new cursor.
func (wm *ChannelManager) pollAlarmsOnce() {
	wm.pLock.RLock()
	bindFlag, cursor := wm.BindFlag, wm.AlarmSeq
	wm.pLock.RUnlock()

	// NOTE(review): the second result of ConvertBindFlag is still discarded;
	// its type is not visible from this file. If it signals a parse failure,
	// neBind may not be usable below — confirm and handle it.
	neBind, _ := parse.ConvertBindFlag(bindFlag)

	if cursor == 0 {
		// First pass: start right after the newest alarm already stored.
		wm.advanceAlarmSeq(cursor, service.LastAlarmSeq(neBind.NeType, neBind.NeId)+1)
		return
	}

	list, err := service.List(neBind.NeType, neBind.NeId, cursor)
	if err != nil {
		zlog.Ins().ErrorF("db error %s", err)
		// Keep the cursor where it is so the same range is retried next pass.
		return
	}
	alarms := service.ConvertOMCAlarm(list)
	if len(alarms) == 0 {
		return
	}

	// Report the alarms to subscribers without blocking the polling loop.
	go wm.SendAlarm(alarms)

	recipients := wm.alarmRecipients()
	wm.advanceAlarmSeq(cursor, service.MaxAlarmSeq(cursor, alarms)+1)
	wm.recordAlarmLogs(alarms, recipients)
}

// advanceAlarmSeq stores next as the new cursor under the write lock, but only
// if the cursor still holds seen, the value this pass started from. A value
// set concurrently through UpdateAlarmSeq is therefore never overwritten.
func (wm *ChannelManager) advanceAlarmSeq(seen, next int64) {
	wm.pLock.Lock()
	defer wm.pLock.Unlock()

	if wm.AlarmSeq == seen {
		wm.AlarmSeq = next
	}
}

// alarmRecipients returns one "UserName;RemoteIp" entry for every user that is
// logged in with alarm type consts.MSG, collected under the read lock.
func (wm *ChannelManager) alarmRecipients() []string {
	wm.pLock.RLock()
	defer wm.pLock.RUnlock()

	var recipients []string
	for _, user := range wm.User {
		if user.LoginState && user.AlarmType == consts.MSG {
			recipients = append(recipients, strings.Join([]string{user.UserName, user.RemoteIp}, ";"))
		}
	}
	return recipients
}

// recordAlarmLogs writes one NbiAlarmLog row per (alarm, recipient) pair
// through db.Client. Each recipient is a "UserName;RemoteIp" entry as produced
// by alarmRecipients.
func (wm *ChannelManager) recordAlarmLogs(alarms []model.OmcAlarm, recipients []string) {
	for _, alarm := range alarms {
		for _, recipient := range recipients {
			var alarmLog model.NbiAlarmLog
			// Split on the last ';' so a user name that itself contains ';'
			// still yields the right name and address.
			if i := strings.LastIndex(recipient, ";"); i >= 0 {
				alarmLog.OpUser = recipient[:i]
				alarmLog.SrcIp = recipient[i+1:]
			}
			alarmLog.NeType = alarm.NeType
			alarmLog.NeId = alarm.NeUID
			alarmLog.AlarmSeq = int64(alarm.AlarmSeq)
			alarmLog.AlarmId = alarm.AlarmId
			alarmLog.EventTime = alarm.EventTime
			alarmLog.LogTime = time.Now()
			// NOTE(review): the result of Create is not inspected, as before.
			db.Client.Create(&alarmLog)
		}
	}
}
// SendAlarm JSON-encodes each alarm and broadcasts the encoded bytes through
// Talk with message ID 0. An alarm that fails to encode is logged and skipped.
// The returned error is always nil.
func (wm *ChannelManager) SendAlarm(alarms []model.OmcAlarm) error {
	for i := range alarms {
		payload, err := json.Marshal(alarms[i])
		if err != nil {
			zlog.Ins().ErrorF("SendAlarm json Marshalerror %v", err)
			continue
		}
		wm.Talk(0, payload)
	}
	return nil
}
// UpdateAlarmSeq overwrites the manager's alarm sequence cursor with alarmSeq
// while holding the write lock.
func (wm *ChannelManager) UpdateAlarmSeq(alarmSeq int64) {
	wm.pLock.Lock()
	defer wm.pLock.Unlock()

	wm.AlarmSeq = alarmSeq
}