feat: 添加KPI数据备份导出功能及相关数据库结构更新
This commit is contained in:
@@ -0,0 +1,416 @@
|
||||
package backup_export_kpi
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"be.ems/src/framework/cron"
|
||||
"be.ems/src/framework/logger"
|
||||
"be.ems/src/framework/utils/date"
|
||||
"be.ems/src/framework/utils/file"
|
||||
"be.ems/src/framework/utils/parse"
|
||||
neDataModel "be.ems/src/modules/network_data/model"
|
||||
neDataService "be.ems/src/modules/network_data/service"
|
||||
neModel "be.ems/src/modules/network_element/model"
|
||||
neService "be.ems/src/modules/network_element/service"
|
||||
)
|
||||
|
||||
var NewProcessor = &BackupExportKPIProcessor{
|
||||
count: 0,
|
||||
backupService: neDataService.NewBackup,
|
||||
neInfoService: neService.NewNeInfo,
|
||||
kpiReportService: neDataService.NewKpiReport,
|
||||
kpicReportService: neDataService.NewKpiCReport,
|
||||
}
|
||||
|
||||
// BackupExportKPI 队列任务处理
|
||||
type BackupExportKPIProcessor struct {
|
||||
count int // 执行次数
|
||||
backupService *neDataService.Backup // 备份相关服务
|
||||
neInfoService *neService.NeInfo // 网元信息服务
|
||||
kpiReportService *neDataService.KpiReport // 统计信息服务
|
||||
kpicReportService *neDataService.KpiCReport // 统计信息服务
|
||||
}
|
||||
|
||||
func (s *BackupExportKPIProcessor) Execute(data any) (any, error) {
|
||||
s.count++ // 执行次数加一
|
||||
options := data.(cron.JobData)
|
||||
sysJob := options.SysJob
|
||||
logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count)
|
||||
// 返回结果,用于记录执行结果
|
||||
result := map[string]any{
|
||||
"count": s.count,
|
||||
}
|
||||
|
||||
var params struct {
|
||||
DataType []string `json:"dataType"` // 类型支持 ims/amf/udm/smf/pcf/upf/mme/smsc
|
||||
FileType string `json:"fileType"` // 文件类型 csv/xlsx
|
||||
Hour int `json:"hour"` // 数据时间从任务执行时间前的小时数
|
||||
}
|
||||
if err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !(params.FileType == "csv" || params.FileType == "xlsx") {
|
||||
return nil, fmt.Errorf("file type error, only support csv,xlsx")
|
||||
}
|
||||
|
||||
for _, v := range params.DataType {
|
||||
neList := s.neInfoService.Find(neModel.NeInfo{NeType: strings.ToUpper(v)}, false, false)
|
||||
for _, ne := range neList {
|
||||
// 前 hour 小时
|
||||
now := time.Now()
|
||||
end := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
|
||||
start := end.Add(-time.Duration(params.Hour) * time.Hour)
|
||||
// 密度
|
||||
var interval int64 = 60
|
||||
if ne.NeType == "UPF" {
|
||||
interval = 5
|
||||
}
|
||||
|
||||
// 指标
|
||||
query := neDataModel.KPIQuery{
|
||||
NeType: ne.NeType,
|
||||
NeID: ne.NeId,
|
||||
RmUID: ne.RmUID,
|
||||
Interval: interval,
|
||||
BeginTime: start.UnixMilli(),
|
||||
EndTime: end.UnixMilli(),
|
||||
}
|
||||
result[ne.NeName+"_kpi"] = s.exportKPI(query, params.FileType)
|
||||
|
||||
// 自定义指标
|
||||
queryC := neDataModel.KPICQuery{
|
||||
NeType: ne.NeType,
|
||||
NeID: ne.NeId,
|
||||
RmUID: ne.RmUID,
|
||||
Interval: interval,
|
||||
BeginTime: start.UnixMilli(),
|
||||
EndTime: end.UnixMilli(),
|
||||
}
|
||||
result[ne.NeName+"_kpic"] = s.exportKPIC(queryC, params.FileType)
|
||||
}
|
||||
}
|
||||
|
||||
// 返回结果,用于记录执行结果
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// exportKPI 导出KPI数据
|
||||
func (s *BackupExportKPIProcessor) exportKPI(query neDataModel.KPIQuery, fileType string) string {
|
||||
rows := s.kpiReportService.FindData(query)
|
||||
if len(rows) == 0 {
|
||||
return "no data"
|
||||
}
|
||||
|
||||
// 获取数据指标id
|
||||
kpiIdMap := make(map[string]string, 0)
|
||||
kpiTitles := s.kpiReportService.FindTitle(query.NeType)
|
||||
for _, v := range kpiTitles {
|
||||
kpiIdMap[v.KpiId] = v.EnTitle
|
||||
}
|
||||
|
||||
// 导出文件名称
|
||||
dateStr := date.ParseDateToStr(parse.Number(query.EndTime), date.YYYYMMDDHHMMSS)
|
||||
fileName := fmt.Sprintf("%s_%s_kpi_export_%d_%s.%s", strings.ToLower(query.NeType), query.NeID, len(rows), dateStr, fileType)
|
||||
filePath := filepath.Join(s.backupService.BACKUP_DIR, fmt.Sprintf("/%s_kpi", strings.ToLower(query.NeType)), fileName)
|
||||
if runtime.GOOS == "windows" {
|
||||
filePath = fmt.Sprintf("C:%s", filePath)
|
||||
}
|
||||
|
||||
if fileType == "csv" {
|
||||
data := [][]string{}
|
||||
|
||||
// 获取kpiIdMap的键并排序
|
||||
var sortedKpiIds []string
|
||||
for kpiId := range kpiIdMap {
|
||||
sortedKpiIds = append(sortedKpiIds, kpiId)
|
||||
}
|
||||
sort.Strings(sortedKpiIds) // 按字母排序
|
||||
|
||||
// 头 - 按排序后的顺序添加标题
|
||||
header := []string{}
|
||||
for _, kpiId := range sortedKpiIds {
|
||||
header = append(header, kpiIdMap[kpiId])
|
||||
}
|
||||
header = append(header, "NE Name")
|
||||
header = append(header, "Time")
|
||||
data = append(data, header)
|
||||
|
||||
for _, row := range rows {
|
||||
// 取时间
|
||||
timeStr := ""
|
||||
if v, ok := row["timeGroup"]; ok && v != nil {
|
||||
if releaseTime := parse.Number(v); releaseTime > 0 {
|
||||
timeStr = date.ParseDateToStr(releaseTime, date.YYYY_MM_DDTHH_MM_SSZ)
|
||||
} else {
|
||||
timeStr = fmt.Sprintf("%s", v)
|
||||
}
|
||||
}
|
||||
// 取网元名称
|
||||
neName := ""
|
||||
if v, ok := row["neName"]; ok && v != nil {
|
||||
neName = fmt.Sprintf("%s", v)
|
||||
}
|
||||
|
||||
// 按排序后的顺序获取数据
|
||||
dataRow := []string{}
|
||||
for _, kpiId := range sortedKpiIds {
|
||||
value := ""
|
||||
if v, ok := row[kpiId]; ok && v != nil {
|
||||
value = fmt.Sprint(v)
|
||||
}
|
||||
dataRow = append(dataRow, value)
|
||||
}
|
||||
dataRow = append(dataRow, neName)
|
||||
dataRow = append(dataRow, timeStr)
|
||||
data = append(data, dataRow)
|
||||
}
|
||||
// 输出到文件
|
||||
if err := file.WriterFileCSV(data, filePath); err != nil {
|
||||
logger.Errorf("export operate log err => %v", err.Error())
|
||||
return "export err"
|
||||
}
|
||||
}
|
||||
|
||||
if fileType == "xlsx" {
|
||||
// 获取kpiIdMap的键并排序
|
||||
var sortedKpiIds []string
|
||||
for kpiId := range kpiIdMap {
|
||||
sortedKpiIds = append(sortedKpiIds, kpiId)
|
||||
}
|
||||
sort.Strings(sortedKpiIds) // 按字母排序
|
||||
|
||||
// 第一行表头标题 - 按排序后的顺序
|
||||
headerCells := map[string]string{}
|
||||
colIndex := 0
|
||||
|
||||
// 先添加KPI标题列
|
||||
for i, kpiId := range sortedKpiIds {
|
||||
colLetter := string(rune('A' + i))
|
||||
headerCells[colLetter+"1"] = kpiIdMap[kpiId]
|
||||
colIndex++
|
||||
}
|
||||
|
||||
// 添加NE Name和Time列
|
||||
neNameCol := string(rune('A' + colIndex))
|
||||
headerCells[neNameCol+"1"] = "NE Name"
|
||||
colIndex++
|
||||
|
||||
timeCol := string(rune('A' + colIndex))
|
||||
headerCells[timeCol+"1"] = "Time"
|
||||
|
||||
// 从第二行开始的数据
|
||||
dataCells := make([]map[string]any, 0)
|
||||
for i, row := range rows {
|
||||
idx := strconv.Itoa(i + 2)
|
||||
|
||||
// 取时间和网元名称
|
||||
timeStr := ""
|
||||
if v, ok := row["timeGroup"]; ok && v != nil {
|
||||
if releaseTime := parse.Number(v); releaseTime > 0 {
|
||||
timeStr = date.ParseDateToStr(releaseTime, date.YYYY_MM_DDTHH_MM_SSZ)
|
||||
} else {
|
||||
timeStr = fmt.Sprintf("%s", v)
|
||||
}
|
||||
}
|
||||
|
||||
neName := ""
|
||||
if v, ok := row["neName"]; ok && v != nil {
|
||||
neName = fmt.Sprintf("%s", v)
|
||||
}
|
||||
|
||||
// 按排序后的顺序填充数据
|
||||
dataCell := map[string]any{}
|
||||
|
||||
// 填充KPI数据
|
||||
for j, kpiId := range sortedKpiIds {
|
||||
value := ""
|
||||
if v, ok := row[kpiId]; ok && v != nil {
|
||||
value = fmt.Sprint(v)
|
||||
}
|
||||
colLetter := string(rune('A' + j))
|
||||
dataCell[colLetter+idx] = value
|
||||
}
|
||||
|
||||
// 填充NE Name和Time
|
||||
dataCell[neNameCol+idx] = neName
|
||||
dataCell[timeCol+idx] = timeStr
|
||||
|
||||
dataCells = append(dataCells, dataCell)
|
||||
}
|
||||
// 导出数据表格
|
||||
if err := file.WriterFileExecl(headerCells, dataCells, filePath, ""); err != nil {
|
||||
logger.Errorf("export operate log err => %v", err.Error())
|
||||
return "export err"
|
||||
}
|
||||
}
|
||||
|
||||
// 上传到FTP服务器
|
||||
if err := s.backupService.FTPPushFile(filePath, ""); err != nil {
|
||||
return "ok, ftp err:" + err.Error()
|
||||
}
|
||||
return "ok"
|
||||
}
|
||||
|
||||
// exportKPI 导出KPI数据
|
||||
func (s *BackupExportKPIProcessor) exportKPIC(query neDataModel.KPICQuery, fileType string) string {
|
||||
rows := s.kpicReportService.FindData(query)
|
||||
if len(rows) == 0 {
|
||||
return "no data"
|
||||
}
|
||||
|
||||
// 获取数据指标id
|
||||
kpiIdMap := make(map[string]string, 0)
|
||||
kpiTitles := s.kpiReportService.FindTitle(query.NeType)
|
||||
for _, v := range kpiTitles {
|
||||
kpiIdMap[v.KpiId] = v.EnTitle
|
||||
}
|
||||
|
||||
// 导出文件名称
|
||||
dateStr := date.ParseDateToStr(parse.Number(query.EndTime), date.YYYYMMDDHHMMSS)
|
||||
fileName := fmt.Sprintf("%s_%s_kpic_export_%d_%s.%s", strings.ToLower(query.NeType), query.NeID, len(rows), dateStr, fileType)
|
||||
filePath := filepath.Join(s.backupService.BACKUP_DIR, fmt.Sprintf("/%s_kpi", strings.ToLower(query.NeType)), fileName)
|
||||
if runtime.GOOS == "windows" {
|
||||
filePath = fmt.Sprintf("C:%s", filePath)
|
||||
}
|
||||
|
||||
if fileType == "csv" {
|
||||
data := [][]string{}
|
||||
|
||||
// 获取kpiIdMap的键并排序
|
||||
var sortedKpiIds []string
|
||||
for kpiId := range kpiIdMap {
|
||||
sortedKpiIds = append(sortedKpiIds, kpiId)
|
||||
}
|
||||
sort.Strings(sortedKpiIds) // 按字母排序
|
||||
|
||||
// 头 - 按排序后的顺序添加标题
|
||||
header := []string{}
|
||||
for _, kpiId := range sortedKpiIds {
|
||||
header = append(header, kpiIdMap[kpiId])
|
||||
}
|
||||
header = append(header, "NE Name")
|
||||
header = append(header, "Time")
|
||||
data = append(data, header)
|
||||
|
||||
for _, row := range rows {
|
||||
// 取时间
|
||||
timeStr := ""
|
||||
if v, ok := row["timeGroup"]; ok && v != nil {
|
||||
if releaseTime := parse.Number(v); releaseTime > 0 {
|
||||
timeStr = date.ParseDateToStr(releaseTime, date.YYYY_MM_DDTHH_MM_SSZ)
|
||||
} else {
|
||||
timeStr = fmt.Sprintf("%s", v)
|
||||
}
|
||||
}
|
||||
// 取网元名称
|
||||
neName := ""
|
||||
if v, ok := row["neName"]; ok && v != nil {
|
||||
neName = fmt.Sprintf("%s", v)
|
||||
}
|
||||
|
||||
// 按排序后的顺序获取数据
|
||||
dataRow := []string{}
|
||||
for _, kpiId := range sortedKpiIds {
|
||||
value := ""
|
||||
if v, ok := row[kpiId]; ok && v != nil {
|
||||
value = fmt.Sprint(v)
|
||||
}
|
||||
dataRow = append(dataRow, value)
|
||||
}
|
||||
dataRow = append(dataRow, neName)
|
||||
dataRow = append(dataRow, timeStr)
|
||||
data = append(data, dataRow)
|
||||
}
|
||||
// 输出到文件
|
||||
if err := file.WriterFileCSV(data, filePath); err != nil {
|
||||
logger.Errorf("export operate log err => %v", err.Error())
|
||||
return "export err"
|
||||
}
|
||||
}
|
||||
|
||||
if fileType == "xlsx" {
|
||||
// 获取kpiIdMap的键并排序
|
||||
var sortedKpiIds []string
|
||||
for kpiId := range kpiIdMap {
|
||||
sortedKpiIds = append(sortedKpiIds, kpiId)
|
||||
}
|
||||
sort.Strings(sortedKpiIds) // 按字母排序
|
||||
|
||||
// 第一行表头标题 - 按排序后的顺序
|
||||
headerCells := map[string]string{}
|
||||
colIndex := 0
|
||||
|
||||
// 先添加KPI标题列
|
||||
for i, kpiId := range sortedKpiIds {
|
||||
colLetter := string(rune('A' + i))
|
||||
headerCells[colLetter+"1"] = kpiIdMap[kpiId]
|
||||
colIndex++
|
||||
}
|
||||
|
||||
// 添加NE Name和Time列
|
||||
neNameCol := string(rune('A' + colIndex))
|
||||
headerCells[neNameCol+"1"] = "NE Name"
|
||||
colIndex++
|
||||
|
||||
timeCol := string(rune('A' + colIndex))
|
||||
headerCells[timeCol+"1"] = "Time"
|
||||
|
||||
// 从第二行开始的数据
|
||||
dataCells := make([]map[string]any, 0)
|
||||
for i, row := range rows {
|
||||
idx := strconv.Itoa(i + 2)
|
||||
|
||||
// 取时间和网元名称
|
||||
timeStr := ""
|
||||
if v, ok := row["timeGroup"]; ok && v != nil {
|
||||
if releaseTime := parse.Number(v); releaseTime > 0 {
|
||||
timeStr = date.ParseDateToStr(releaseTime, date.YYYY_MM_DDTHH_MM_SSZ)
|
||||
} else {
|
||||
timeStr = fmt.Sprintf("%s", v)
|
||||
}
|
||||
}
|
||||
|
||||
neName := ""
|
||||
if v, ok := row["neName"]; ok && v != nil {
|
||||
neName = fmt.Sprintf("%s", v)
|
||||
}
|
||||
|
||||
// 按排序后的顺序填充数据
|
||||
dataCell := map[string]any{}
|
||||
|
||||
// 填充KPI数据
|
||||
for j, kpiId := range sortedKpiIds {
|
||||
value := ""
|
||||
if v, ok := row[kpiId]; ok && v != nil {
|
||||
value = fmt.Sprint(v)
|
||||
}
|
||||
colLetter := string(rune('A' + j))
|
||||
dataCell[colLetter+idx] = value
|
||||
}
|
||||
|
||||
// 填充NE Name和Time
|
||||
dataCell[neNameCol+idx] = neName
|
||||
dataCell[timeCol+idx] = timeStr
|
||||
|
||||
dataCells = append(dataCells, dataCell)
|
||||
}
|
||||
// 导出数据表格
|
||||
if err := file.WriterFileExecl(headerCells, dataCells, filePath, ""); err != nil {
|
||||
logger.Errorf("export operate log err => %v", err.Error())
|
||||
return "export err"
|
||||
}
|
||||
}
|
||||
|
||||
// 上传到FTP服务器
|
||||
if err := s.backupService.FTPPushFile(filePath, ""); err != nil {
|
||||
return "ok, ftp err:" + err.Error()
|
||||
}
|
||||
return "ok"
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package processor
|
||||
import (
|
||||
"be.ems/src/framework/cron"
|
||||
processorBackupExportCDR "be.ems/src/modules/crontask/processor/backup_export_cdr"
|
||||
processorBackupExportKPI "be.ems/src/modules/crontask/processor/backup_export_kpi"
|
||||
processorBackupExportLog "be.ems/src/modules/crontask/processor/backup_export_log"
|
||||
processorBackupExportTable "be.ems/src/modules/crontask/processor/backup_export_table"
|
||||
processorBackupExportUDM "be.ems/src/modules/crontask/processor/backup_export_udm"
|
||||
@@ -60,4 +61,6 @@ func InitCronQueue() {
|
||||
cron.CreateQueue("backup_export_cdr", processorBackupExportCDR.NewProcessor)
|
||||
// 备份-导出Log数据
|
||||
cron.CreateQueue("backup_export_log", processorBackupExportLog.NewProcessor)
|
||||
// 备份-导出KPI数据
|
||||
cron.CreateQueue("backup_export_kpi", processorBackupExportKPI.NewProcessor)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user