// Package manage holds the channel managers and the users registered with
// each of them.
package manage

import (
	"encoding/json"
	"errors"
	"omc/core/consts"
	"omc/core/db"
	"omc/core/parse"
	"omc/handle/model"
	"omc/handle/service"
	"strings"
	"sync"
	"time"

	"github.com/aceld/zinx/zlog"
)

// ChannelManager is the channel management module: it tracks the users
// attached to one channel and the alarm sequence cursor used when polling
// alarms for the bound network element.
type ChannelManager struct {
	Name       string
	User       map[string]*User // users currently registered, keyed by UID
	BindFlag   string           // identifier of the bound network element, "ne_type#ne_id" format
	Province   string           // province the network element is located in
	DeviceCode string           // host code: four characters, each one of 0-9 or A-Z
	AlarmSeq   int64            // sequence number of the current alarm
	pLock      sync.RWMutex     // guards User and AlarmSeq
}

// MgrObj is the collection of every channel manager created by NewManager.
var MgrObj []*ChannelManager

// NewManager creates a ChannelManager with an empty user table, appends it to
// MgrObj and returns it.
func NewManager(name, bind, province, deviceCode string) *ChannelManager {
	m := &ChannelManager{
		Name:       name,
		BindFlag:   bind,
		Province:   province,
		DeviceCode: deviceCode,
		User:       make(map[string]*User),
	}
	MgrObj = append(MgrObj, m)
	return m
}

// GetManager returns the first manager in MgrObj whose Name equals name, or
// nil when there is none.
func GetManager(name string) *ChannelManager {
	for _, m := range MgrObj {
		if m.Name == name {
			return m
		}
	}
	return nil
}

// AddUser registers user in the manager under its UID, replacing any entry
// already stored under that UID.
func (wm *ChannelManager) AddUser(user *User) {
	wm.pLock.Lock()
	wm.User[user.UID] = user
	wm.pLock.Unlock()
}

// Talk broadcasts msg with the given message ID to every user that is logged
// in and whose alarm type is consts.MSG.
func (wm *ChannelManager) Talk(msgID uint32, msg []byte) {
	for _, user := range wm.GetAllUser() {
		// Re-check the state: it may have changed since the snapshot was taken.
		if user.LoginState && user.AlarmType == consts.MSG {
			user.SendMsg(msgID, msg)
		}
	}
}

// LoginSuccess marks the user stored under UID as logged in with the given
// account name and alarm type tp.
//
// If any registered user is already logged in with the same name and alarm
// type, that user's connection is stopped and an error is returned. An error
// is also returned when UID is not registered.
func (wm *ChannelManager) LoginSuccess(UID, name, tp string) error {
	wm.pLock.Lock()
	defer wm.pLock.Unlock()

	// Reject a repeated login of the same account on the same alarm type.
	for _, v := range wm.User {
		if v.UserName == name && v.AlarmType == tp && v.LoginState {
			v.Conn.Stop()
			return errors.New("repeat login for the account")
		}
	}

	user, ok := wm.User[UID]
	if !ok {
		return errors.New("server internal error")
	}
	// The map stores pointers, so updating the user in place is sufficient.
	user.UserName = name
	user.AlarmType = tp
	user.LoginState = true
	return nil
}

// SetSeqNo stores the login random code seqNo on the user registered under
// UID. It returns an error when UID is not registered.
func (wm *ChannelManager) SetSeqNo(UID, seqNo string) error {
	wm.pLock.Lock()
	defer wm.pLock.Unlock()

	user, ok := wm.User[UID]
	if !ok {
		return errors.New("server internal error")
	}
	user.SeqNo = seqNo
	return nil
}

// LoginFail records one failed login attempt for the user registered under
// UID.
//
// It returns (true, err) when UID is not registered or when the user has
// reached three failed attempts, and (false, nil) otherwise. Judging by the
// error text, a true result presumably tells the caller to close the
// connection.
func (wm *ChannelManager) LoginFail(UID string) (bool, error) {
	const maxAttempts = 3

	wm.pLock.Lock()
	defer wm.pLock.Unlock()

	user, ok := wm.User[UID]
	if !ok {
		return true, errors.New("server internal error")
	}
	user.LoginCount++
	if user.LoginCount >= maxAttempts {
		return true, errors.New("too many attempts, close connect")
	}
	return false, nil
}

// RemoveUserByPID removes the user registered under uid, if any.
func (wm *ChannelManager) RemoveUserByPID(uid string) {
	wm.pLock.Lock()
	delete(wm.User, uid)
	wm.pLock.Unlock()
}

// GetUserByPID returns the user registered under uID, or nil when there is
// none.
func (wm *ChannelManager) GetUserByPID(uID string) *User {
	wm.pLock.RLock()
	defer wm.pLock.RUnlock()
	return wm.User[uID]
}

// GetAllUser returns the users that are logged in and whose alarm type is
// consts.MSG. The result is never nil.
func (wm *ChannelManager) GetAllUser() []*User {
	wm.pLock.RLock()
	defer wm.pLock.RUnlock()

	online := make([]*User, 0, len(wm.User))
	for _, v := range wm.User {
		if v.LoginState && v.AlarmType == consts.MSG {
			online = append(online, v)
		}
	}
	return online
}

// RealTimeAlarm polls the alarms of the bound network element every three
// seconds, pushes new alarms to the logged-in users, advances AlarmSeq and
// writes one alarm-log row per (alarm, recipient) pair.
//
// It loops forever and never returns.
func (wm *ChannelManager) RealTimeAlarm() {
	const pollInterval = 3 * time.Second

	for {
		// Snapshot the cursor and release the lock before touching the
		// database, so that logins, logouts and broadcasts are not stalled
		// for the duration of the queries.
		wm.pLock.RLock()
		startSeq := wm.AlarmSeq
		bindFlag := wm.BindFlag
		wm.pLock.RUnlock()

		// NOTE(review): the second return value is discarded, as in the
		// original code; its type is not visible here. Confirm whether a
		// malformed bind flag needs explicit handling.
		neBind, _ := parse.ConvertBindFlag(bindFlag)

		var alarms []model.OmcAlarm
		lastAlarmSeq := startSeq
		if startSeq == 0 {
			// No cursor yet: start from the last known alarm sequence.
			lastAlarmSeq = service.LastAlarmSeq(neBind.NeType, neBind.NeId)
		} else {
			list, err := service.List(neBind.NeType, neBind.NeId, startSeq)
			if err != nil {
				zlog.Ins().ErrorF("db error %s", err)
			}
			alarms = service.ConvertOMCAlarm(list)
		}

		// Report the real-time alarms.
		if len(alarms) > 0 {
			go wm.SendAlarm(alarms)
			lastAlarmSeq = service.MaxAlarmSeq(lastAlarmSeq, alarms)
		}

		var users []string
		wm.pLock.Lock()
		for _, user := range wm.User {
			if user.LoginState && user.AlarmType == consts.MSG {
				users = append(users, strings.Join([]string{user.UserName, user.RemoteIp}, ";"))
			}
		}
		// Advance the cursor under the write lock. If UpdateAlarmSeq changed
		// it while the queries were running, keep the externally set value.
		// NOTE(review): the cursor moves forward by one on every round, even
		// when no alarm was returned — confirm this matches the semantics of
		// service.List.
		if wm.AlarmSeq == startSeq {
			wm.AlarmSeq = lastAlarmSeq + 1
		}
		wm.pLock.Unlock()

		// Record every delivered alarm in the alarm log table.
		for _, v := range alarms {
			for _, u := range users {
				var alarmLog model.NbiAlarmLog
				// Split on the last separator so that a user name containing
				// ';' is still logged together with its remote IP.
				if i := strings.LastIndex(u, ";"); i >= 0 {
					alarmLog.OpUser = u[:i]
					alarmLog.SrcIp = u[i+1:]
				}
				alarmLog.NeType = v.NeType
				alarmLog.NeId = v.NeUID
				alarmLog.AlarmSeq = int64(v.AlarmSeq)
				alarmLog.AlarmId = v.AlarmId
				alarmLog.EventTime = v.EventTime
				alarmLog.LogTime = time.Now()
				db.Client.Create(&alarmLog)
			}
		}

		time.Sleep(pollInterval)
	}
}

// SendAlarm JSON-encodes each alarm and broadcasts it with message ID 0 via
// Talk. An alarm that cannot be encoded is logged and skipped. The returned
// error is always nil.
func (wm *ChannelManager) SendAlarm(alarms []model.OmcAlarm) error {
	for _, v := range alarms {
		data, err := json.Marshal(v)
		if err != nil {
			zlog.Ins().ErrorF("SendAlarm json Marshalerror %v", err)
			continue
		}
		wm.Talk(0, data)
	}
	return nil
}

// UpdateAlarmSeq sets the alarm sequence cursor to alarmSeq.
func (wm *ChannelManager) UpdateAlarmSeq(alarmSeq int64) {
	wm.pLock.Lock()
	wm.AlarmSeq = alarmSeq
	wm.pLock.Unlock()
}