package service import ( "context" "strconv" "time" "ems.agt/src/framework/logger" "ems.agt/src/modules/monitor/model" "ems.agt/src/modules/monitor/repository" systemService "ems.agt/src/modules/system/service" "github.com/shirou/gopsutil/v3/cpu" "github.com/shirou/gopsutil/v3/disk" "github.com/shirou/gopsutil/v3/load" "github.com/shirou/gopsutil/v3/mem" "github.com/shirou/gopsutil/v3/net" ) // 实例化服务层 MonitorImpl 结构体 var NewMonitorImpl = &MonitorImpl{ sysConfigService: systemService.NewSysConfigImpl, monitorRepository: repository.NewMonitorImpl, diskIO: make(chan []disk.IOCountersStat, 2), netIO: make(chan []net.IOCountersStat, 2), } // MonitorImpl 服务器系统相关信息 服务层处理 type MonitorImpl struct { // 参数配置服务 sysConfigService systemService.ISysConfig // 监控服务资源数据信息 monitorRepository repository.IMonitor // 磁盘网络IO 数据通道 diskIO chan ([]disk.IOCountersStat) netIO chan ([]net.IOCountersStat) } // RunMonitor 执行资源监控 func (s *MonitorImpl) RunMonitor() { var itemBase model.MonitorBase itemBase.CreateTime = time.Now().UnixMilli() itemBase.NeType = "#" itemBase.NeID = "#" totalPercent, _ := cpu.Percent(3*time.Second, false) if len(totalPercent) == 1 { itemBase.CPU = totalPercent[0] } cpuCount, _ := cpu.Counts(false) loadInfo, _ := load.Avg() itemBase.CPULoad1 = loadInfo.Load1 itemBase.CPULoad5 = loadInfo.Load5 itemBase.CPULoad15 = loadInfo.Load15 itemBase.LoadUsage = loadInfo.Load1 / (float64(cpuCount*2) * 0.75) * 100 memoryInfo, _ := mem.VirtualMemory() itemBase.Memory = memoryInfo.UsedPercent if err := s.monitorRepository.CreateMonitorBase(itemBase); err != nil { logger.Errorf("CreateMonitorBase err: %v", err) } // 将当前资源发送到chan中处理保存 s.loadDiskIO() s.loadNetIO() // 监控系统资源-保留天数 storeDays := s.sysConfigService.SelectConfigValueByKey("monitor.sysResource.storeDays") if storeDays != "" { storeDays, _ := strconv.Atoi(storeDays) ltTime := time.Now().AddDate(0, 0, -storeDays).UnixMilli() _ = s.monitorRepository.DelMonitorBase(ltTime) _ = s.monitorRepository.DelMonitorIO(ltTime) _ = s.monitorRepository.DelMonitorNet(ltTime) } } func (s *MonitorImpl) loadDiskIO() { ioStat, _ := disk.IOCounters() var diskIOList []disk.IOCountersStat for _, io := range ioStat { diskIOList = append(diskIOList, io) } s.diskIO <- diskIOList } func (s *MonitorImpl) loadNetIO() { netStat, _ := net.IOCounters(true) netStatAll, _ := net.IOCounters(false) var netList []net.IOCountersStat netList = append(netList, netStat...) netList = append(netList, netStatAll...) s.netIO <- netList } // monitorCancel 监控搜集IO/Network上下文 var monitorCancel context.CancelFunc // RunMonitorDataCancel 启动资源监控数据存储io/network通道 移除之前的chan上下文后在设置新的均值 // interval 采集的平均值(分钟) func (s *MonitorImpl) RunMonitorDataCancel(removeBefore bool, interval float64) { // 是否取消之前的 if removeBefore { monitorCancel() } // 上下文控制 ctx, cancel := context.WithCancel(context.Background()) monitorCancel = cancel // chanl 通道进行存储数据 go s.saveIODataToDB(ctx, interval) go s.saveNetDataToDB(ctx, interval) } func (s *MonitorImpl) saveIODataToDB(ctx context.Context, interval float64) { defer close(s.diskIO) for { select { case <-ctx.Done(): return case ioStat := <-s.diskIO: select { case <-ctx.Done(): return case ioStat2 := <-s.diskIO: var ioList []model.MonitorIO timeMilli := time.Now().UnixMilli() for _, io2 := range ioStat2 { for _, io1 := range ioStat { if io2.Name == io1.Name { var itemIO model.MonitorIO itemIO.CreateTime = timeMilli itemIO.NeType = "#" itemIO.NeID = "#" itemIO.Name = io1.Name if io2.ReadBytes != 0 && io1.ReadBytes != 0 && io2.ReadBytes > io1.ReadBytes { itemIO.Read = int64(float64(io2.ReadBytes-io1.ReadBytes) / interval / 60) } if io2.WriteBytes != 0 && io1.WriteBytes != 0 && io2.WriteBytes > io1.WriteBytes { itemIO.Write = int64(float64(io2.WriteBytes-io1.WriteBytes) / interval / 60) } if io2.ReadCount != 0 && io1.ReadCount != 0 && io2.ReadCount > io1.ReadCount { itemIO.Count = int64(float64(io2.ReadCount-io1.ReadCount) / interval / 60) } writeCount := int64(0) if io2.WriteCount != 0 && io1.WriteCount != 0 && io2.WriteCount > io1.WriteCount { writeCount = int64(float64(io2.WriteCount-io1.WriteCount) / interval * 60) } if writeCount > itemIO.Count { itemIO.Count = writeCount } if io2.ReadTime != 0 && io1.ReadTime != 0 && io2.ReadTime > io1.ReadTime { itemIO.Time = int64(float64(io2.ReadTime-io1.ReadTime) / interval / 60) } writeTime := int64(0) if io2.WriteTime != 0 && io1.WriteTime != 0 && io2.WriteTime > io1.WriteTime { writeTime = int64(float64(io2.WriteTime-io1.WriteTime) / interval / 60) } if writeTime > itemIO.Time { itemIO.Time = writeTime } ioList = append(ioList, itemIO) break } } } if err := s.monitorRepository.BatchCreateMonitorIO(ioList); err != nil { logger.Errorf("BatchCreateMonitorIO err: %v", err) } s.diskIO <- ioStat2 } } } } func (s *MonitorImpl) saveNetDataToDB(ctx context.Context, interval float64) { defer close(s.netIO) for { select { case <-ctx.Done(): return case netStat := <-s.netIO: select { case <-ctx.Done(): return case netStat2 := <-s.netIO: var netList []model.MonitorNetwork timeMilli := time.Now().UnixMilli() for _, net2 := range netStat2 { for _, net1 := range netStat { if net2.Name == net1.Name { var itemNet model.MonitorNetwork itemNet.CreateTime = timeMilli itemNet.NeType = "#" itemNet.NeID = "#" itemNet.Name = net1.Name if net2.BytesSent != 0 && net1.BytesSent != 0 && net2.BytesSent > net1.BytesSent { itemNet.Up = float64(net2.BytesSent-net1.BytesSent) / 1024 / interval / 60 } if net2.BytesRecv != 0 && net1.BytesRecv != 0 && net2.BytesRecv > net1.BytesRecv { itemNet.Down = float64(net2.BytesRecv-net1.BytesRecv) / 1024 / interval / 60 } netList = append(netList, itemNet) break } } } if err := s.monitorRepository.BatchCreateMonitorNet(netList); err != nil { logger.Errorf("BatchCreateMonitorNet err: %v", err) } s.netIO <- netStat2 } } } } // SelectMonitorInfo 查询监控资源信息 func (s *MonitorImpl) SelectMonitorInfo(query map[string]any) map[string]any { infoType := query["type"] startTimeMilli := query["startTime"] endTimeMilli := query["endTime"] neType := query["neType"] neId := query["neId"] name := query["name"] // 返回数据 backDatas := map[string]any{} // 基本信息 if infoType == "all" || infoType == "load" || infoType == "cpu" || infoType == "memory" { rows := s.monitorRepository.SelectMonitorBase(map[string]any{ "startTime": startTimeMilli, "endTime": endTimeMilli, "neType": neType, "neId": neId, }) backDatas["base"] = rows } // 磁盘IO if infoType == "all" || infoType == "io" { rows := s.monitorRepository.SelectMonitorIO(map[string]any{ "startTime": startTimeMilli, "endTime": endTimeMilli, "neType": neType, "neId": neId, "name": name, }) backDatas["io"] = rows } // 网络 if infoType == "all" || infoType == "network" { rows := s.monitorRepository.SelectMonitorNetwork(map[string]any{ "startTime": startTimeMilli, "endTime": endTimeMilli, "neType": neType, "neId": neId, "name": name, }) backDatas["network"] = rows } return backDatas }