package core import ( "errors" "github.com/aceld/zinx/zlog" "omc/db" "omc/model" "omc/omc" "omc/service" "strings" "sync" "time" ) /* ChannelManager 通道管理管理模块 */ type ChannelManager struct { Name string User map[string]*User //当前在线的User集合 BindFlag string //bind 的网元标识,ne_type#ne_id 格式 Province string //网元所在省份 DeviceCode string //主机编码 四位,每1位可用0-9、A-Z编码 AlarmSeq int32 //当前告警的序列号 pLock sync.RWMutex //保护User的互斥读写机制 } // MgrObj 多个通道管理模块集合 var MgrObj []*ChannelManager // NewManager New 提供ChannelManager 初始化 方法 func NewManager(name, bind, province, deviceCode string) *ChannelManager { newManager := ChannelManager{ Name: name, BindFlag: bind, Province: province, DeviceCode: deviceCode, User: make(map[string]*User), } MgrObj = append(MgrObj, &newManager) return &newManager } func GetManager(name string) *ChannelManager { for i := range MgrObj { if MgrObj[i].Name == name { return MgrObj[i] } } return nil } // AddUser 提供添加一个user的功能 func (wm *ChannelManager) AddUser(User *User) { //将User添加到 世界管理器中 wm.pLock.Lock() wm.User[User.UID] = User wm.pLock.Unlock() } // Talk User广播消息 func (wm *ChannelManager) Talk(msgID uint32, msg []byte) { //2. 得到所有的在线user users := wm.GetAllUser() //3. 向所有的user发送消息 for _, user := range users { if user.LoginState && user.AlarmType == omc.MSG { user.SendMsg(msgID, msg) } } } // LoginSuccess 登录成功管理 func (wm *ChannelManager) LoginSuccess(UID, name, tp string) error { wm.pLock.Lock() defer wm.pLock.Unlock() //判断是否重复登录 for _, v := range wm.User { if v.UserName == name && v.AlarmType == tp && v.LoginState == true { return errors.New("repeat login for the same account") } } user, ok := wm.User[UID] if !ok { return errors.New("server internal error") } user.UserName = name user.AlarmType = tp user.LoginState = true wm.User[UID] = user return nil } // SetSeqNo 设置登录随机码 func (wm *ChannelManager) SetSeqNo(UID, seqNo string) error { wm.pLock.Lock() defer wm.pLock.Unlock() //判断是否重复登录 user, ok := wm.User[UID] if !ok { return errors.New("server internal error") } user.SeqNo = seqNo return nil } // LoginFail 登录失败管理 func (wm *ChannelManager) LoginFail(UID string) (bool, error) { wm.pLock.Lock() defer wm.pLock.Unlock() //判断是否重复登录 user, ok := wm.User[UID] if !ok { return true, errors.New("server internal error") } user.LoginCount++ wm.User[UID] = user if user.LoginCount >= 3 { return true, errors.New("too many attempts, close connect") } return false, nil } // RemoveUserByPID 从信息表中移除一个user func (wm *ChannelManager) RemoveUserByPID(uid string) { wm.pLock.Lock() delete(wm.User, uid) wm.pLock.Unlock() } // GetUserByPID 通过user ID 获取对应user信息 func (wm *ChannelManager) GetUserByPID(uID string) *User { wm.pLock.RLock() defer wm.pLock.RUnlock() return wm.User[uID] } // GetAllUser 获取所有user的信息 func (wm *ChannelManager) GetAllUser() []*User { wm.pLock.RLock() defer wm.pLock.RUnlock() //创建返回的User集合切片 User := make([]*User, 0) //添加切片 for _, v := range wm.User { if v.LoginState && v.AlarmType == omc.MSG { User = append(User, v) } } //返回 return User } func (wm *ChannelManager) RealTimeAlarm() { for { wm.pLock.RLock() //查询 var newAlarmSeq = wm.AlarmSeq var alarms []service.OmcAlarm neBind, _ := ConvertBindFlag(wm.BindFlag) if wm.AlarmSeq == 0 { newAlarmSeq = service.GetLastAlarmSeq(neBind.NeType, neBind.NeId) } else { ams, err := service.GetRealTimeAlarm(neBind.NeType, neBind.NeId, wm.AlarmSeq) if err != nil { zlog.Ins().ErrorF("db error %s", err) } alarms = ams } //上报实时告警信息 if len(alarms) > 0 { go wm.SendAlarm(alarms) newAlarmSeq = service.MaxAlarm(newAlarmSeq, alarms) + 1 } var users []string for _, user := range wm.User { if user.LoginState && user.AlarmType == omc.MSG { userInfo := strings.Join([]string{user.UserName, user.RemoteIp}, ";") users = append(users, userInfo) } } //更新AlarmSeq wm.AlarmSeq = newAlarmSeq wm.pLock.RUnlock() //记录日志到alarm 日志表 for _, v := range alarms { for _, u := range users { var alarmLog model.NbiAlarmLog ui := strings.Split(u, ";") if len(ui) == 2 { alarmLog.OpUser = ui[0] alarmLog.SrcIp = ui[1] } alarmLog.NeType = v.NeType alarmLog.NeId = v.NeUID alarmLog.AlarmSeq = int64(v.AlarmSeq) alarmLog.AlarmId = v.AlarmId et, _ := time.Parse("2006-01-02 15:04:05", v.EventTime) alarmLog.EventTime = et alarmLog.LogTime = time.Now() alarmLog.AId = v.AId db.Client.Create(&alarmLog) } } time.Sleep(3 * time.Second) } } func (wm *ChannelManager) SendAlarm(alarms []service.OmcAlarm) error { for _, v := range alarms { //生产告警内容 data := service.GenAlarm(v) //发送告警内容 wm.Talk(omc.RealTimeAlarm, data) } return nil } func (wm *ChannelManager) UpdateAlarmSeq(newSeq int32) { wm.pLock.Lock() wm.AlarmSeq = newSeq wm.pLock.Unlock() }