chore: nbi pm xlsx generate to json schema

simon
2025-08-11 17:00:39 +08:00
parent 16ef83f04b
commit 847d9ab781
7 changed files with 400 additions and 0 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

175
docs/nbi/generate_ne_pm.py Normal file

@@ -0,0 +1,175 @@
# -*- coding: utf-8 -*-
import pandas as pd
import json
import random
import os
import glob


def generate_pm_json(excel_file_path, output_file_path):
    """
    Extract the English KPI names ('英文名称') and spatial granularity ('空间粒度')
    from an Excel workbook and generate a PM JSON file.
    """
    try:
        # Make sure the input file exists
        if not os.path.exists(excel_file_path):
            print(f"Error: file {excel_file_path} does not exist")
            return False
        print(f"Reading Excel file: {excel_file_path}")
        # Read every sheet in the workbook
        excel_data = pd.read_excel(excel_file_path, sheet_name=None)
        # Group KPIs by granularity
        granularity_groups = {}
        processed_sheets = 0
        # Walk all sheets, looking for ones that contain an English-name column
        for sheet_name, df in excel_data.items():
            print(f"  Processing sheet: {sheet_name}")
            # Check for the English-name column ('英文名称')
            if '英文名称' in df.columns:
                processed_sheets += 1
                print(f"    - Found English-name column, {len(df)} rows")
                # Look for a spatial-granularity column ('空间粒度' / '粒度')
                granularity_col = None
                for col in df.columns:
                    if '空间粒度' in str(col) or '粒度' in str(col):
                        granularity_col = col
                        break
                # Track the last non-empty granularity seen in this sheet
                last_valid_granularity = "DefaultFunction"
                for index, row in df.iterrows():
                    english_name = row['英文名称']
                    # Skip empty and non-string values
                    if pd.notna(english_name) and str(english_name).strip() and str(english_name) != 'nan':
                        english_name = str(english_name).strip()
                        # Read the granularity for this row, if present
                        current_granularity = None
                        if granularity_col and pd.notna(row[granularity_col]):
                            current_granularity = str(row[granularity_col]).strip()
                            if current_granularity:  # make sure it is not an empty string
                                last_valid_granularity = current_granularity
                        # If this row has no granularity, inherit the last valid one
                        granularity = last_valid_granularity
                        # Generate a random value
                        random_value = random.randint(0, 16)
                        kpi = {
                            "KPIID": english_name,
                            "KPIValues": [
                                {
                                    "Name": "Total",
                                    "Value": random_value
                                }
                            ]
                        }
                        # Group by granularity
                        if granularity not in granularity_groups:
                            granularity_groups[granularity] = []
                        granularity_groups[granularity].append(kpi)
                        # Report which granularity was used
                        if current_granularity:
                            granularity_info = f" (current: {granularity})"
                        else:
                            granularity_info = f" (inherited: {granularity})"
                        print(f"      Added KPI: {english_name} = {random_value}{granularity_info}")
            else:
                print("    - Skipped (no English-name column)")
        # Build the final JSON structure
        result = []
        for granularity, kpis in granularity_groups.items():
            result.append({
                "ObjectType": granularity,
                "KPIs": kpis
            })
        # Write the output file
        with open(output_file_path, 'w', encoding='utf-8') as f:
            json.dump(result, f, indent=4, ensure_ascii=False)
        print(f"  Generated file: {output_file_path}")
        print(f"  Processed {processed_sheets} sheet(s)")
        print(f"  Generated {len(granularity_groups)} object type(s)")
        for granularity, kpis in granularity_groups.items():
            print(f"    - {granularity}: {len(kpis)} KPI(s)")
        return True
    except Exception as e:
        print(f"Error: {str(e)}")
        import traceback
        traceback.print_exc()
        return False


def main():
    print("PM JSON batch generator")
    print("=" * 60)
    # Files to process
    pm_files = [
        "AMF-PM(V1.1.5)-Company-Version00.xlsx",
        "PCF-PM(V1.1.4)-Company-Version00.xlsx",
        "SMF-PM(V1.2.1)-Company-Version00.xlsx",
        "UDM-PM(V1.2.2)-Company-Version00.xlsx",
        "UPF-PM(V1.2.1)-Company-Version00.xlsx"
    ]
    # Alternatively, auto-discover every PM Excel file in the current directory:
    # pm_files = glob.glob("*-PM(*)-Company-Version*.xlsx")
    successful_files = 0
    failed_files = 0
    for excel_file in pm_files:
        print(f"\nProcessing file: {excel_file}")
        print("-" * 50)
        # Check that the file exists
        if not os.path.exists(excel_file):
            print(f"  Warning: file {excel_file} does not exist, skipping")
            failed_files += 1
            continue
        # Build the output file name
        base_name = os.path.splitext(excel_file)[0]
        output_file = f"{base_name}-generated.json"
        # Process the file
        if generate_pm_json(excel_file, output_file):
            successful_files += 1
        else:
            failed_files += 1
    # Summary
    print("\n" + "=" * 60)
    print("Batch processing complete!")
    print(f"Succeeded: {successful_files} file(s)")
    print(f"Failed/skipped: {failed_files} file(s)")
    print(f"Total: {successful_files + failed_files} file(s)")
    if successful_files > 0:
        print("\nGenerated files:")
        for excel_file in pm_files:
            if os.path.exists(excel_file):
                base_name = os.path.splitext(excel_file)[0]
                output_file = f"{base_name}-generated.json"
                if os.path.exists(output_file):
                    print(f"  - {output_file}")


if __name__ == "__main__":
    main()
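
For reference, a minimal sketch of the JSON shape this script emits and that pm_generator.go below consumes: one object per spatial granularity, each holding the KPIs found under it, with a single "Total" value drawn at random from 0-16. The object type and KPI name shown here are illustrative placeholders, not values taken from the workbooks.

    [
        {
            "ObjectType": "DefaultFunction",
            "KPIs": [
                {
                    "KPIID": "ExampleKpiName",
                    "KPIValues": [
                        { "Name": "Total", "Value": 7 }
                    ]
                }
            ]
        }
    ]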

225
docs/nbi/pm_generator.go Normal file

@@ -0,0 +1,225 @@
package main

import (
	"encoding/json"
	"fmt"
	"math/rand"
	"os"
	"path/filepath"
	"strings"
)

// KPIValue represents a single KPI value.
type KPIValue struct {
	Name  string `json:"Name"`
	Value int    `json:"Value"`
}

// KPI represents a single KPI entry.
type KPI struct {
	KPIID     string     `json:"KPIID"`
	KPIValues []KPIValue `json:"KPIValues"`
}

// PMObject represents a performance management object.
type PMObject struct {
	ObjectType string `json:"ObjectType"`
	KPIs       []KPI  `json:"KPIs"`
}

// PMData represents the complete performance management data set.
type PMData []PMObject

// generateRandomValue returns a random integer between 0 and 16.
func generateRandomValue() int {
	return rand.Intn(17) // 0-16
}

// generatePMDataFromSchema reads a schema file and generates fresh random data from it.
func generatePMDataFromSchema(schemaFile string, outputFile string) error {
	// Read the schema file
	schemaData, err := os.ReadFile(schemaFile)
	if err != nil {
		return fmt.Errorf("failed to read schema file: %v", err)
	}
	// Parse the JSON
	var pmSchema PMData
	if err := json.Unmarshal(schemaData, &pmSchema); err != nil {
		return fmt.Errorf("failed to parse JSON: %v", err)
	}
	// Generate new random data with the same structure
	var newPMData PMData
	for _, pmObj := range pmSchema {
		newObj := PMObject{
			ObjectType: pmObj.ObjectType,
			KPIs:       make([]KPI, len(pmObj.KPIs)),
		}
		for i, kpi := range pmObj.KPIs {
			newKPI := KPI{
				KPIID:     kpi.KPIID,
				KPIValues: make([]KPIValue, len(kpi.KPIValues)),
			}
			for j, kpiVal := range kpi.KPIValues {
				newKPI.KPIValues[j] = KPIValue{
					Name:  kpiVal.Name,
					Value: generateRandomValue(),
				}
			}
			newObj.KPIs[i] = newKPI
		}
		newPMData = append(newPMData, newObj)
	}
	// Write the output file
	outputData, err := json.MarshalIndent(newPMData, "", " ")
	if err != nil {
		return fmt.Errorf("failed to serialize JSON: %v", err)
	}
	if err := os.WriteFile(outputFile, outputData, 0644); err != nil {
		return fmt.Errorf("failed to write file: %v", err)
	}
	fmt.Printf("Generated random data file: %s\n", outputFile)
	return nil
}

// generatePMDataFromMultipleSchemas batch-processes every schema file in a directory.
func generatePMDataFromMultipleSchemas(schemaDir string) error {
	// Find all *-generated.json files
	pattern := filepath.Join(schemaDir, "*-generated.json")
	schemaFiles, err := filepath.Glob(pattern)
	if err != nil {
		return fmt.Errorf("failed to look up schema files: %v", err)
	}
	if len(schemaFiles) == 0 {
		return fmt.Errorf("no schema files found (pattern: %s)", pattern)
	}
	fmt.Printf("Found %d schema file(s)\n", len(schemaFiles))
	successCount := 0
	failCount := 0
	for _, schemaFile := range schemaFiles {
		// Build the output name: replace "-generated.json" with "-random.json"
		outputFile := strings.Replace(schemaFile, "-generated.json", "-random.json", 1)
		fmt.Printf("\nProcessing file: %s\n", filepath.Base(schemaFile))
		fmt.Printf("Output file: %s\n", filepath.Base(outputFile))
		if err := generatePMDataFromSchema(schemaFile, outputFile); err != nil {
			fmt.Printf("Error: %v\n", err)
			failCount++
		} else {
			successCount++
		}
	}
	fmt.Printf("\nDone! Succeeded: %d, failed: %d\n", successCount, failCount)
	return nil
}

// printPMDataSummary prints summary information for a PM data file.
func printPMDataSummary(file string) error {
	data, err := os.ReadFile(file)
	if err != nil {
		return err
	}
	var pmData PMData
	if err := json.Unmarshal(data, &pmData); err != nil {
		return err
	}
	fmt.Printf("\nFile: %s\n", filepath.Base(file))
	fmt.Printf("Object types: %d\n", len(pmData))
	totalKPIs := 0
	for _, obj := range pmData {
		fmt.Printf("  - %s: %d KPI(s)\n", obj.ObjectType, len(obj.KPIs))
		totalKPIs += len(obj.KPIs)
	}
	fmt.Printf("Total KPIs: %d\n", totalKPIs)
	return nil
}

// main is the command-line entry point.
func main() {
	// Optionally seed the global RNG (requires importing "time"):
	// rand.Seed(time.Now().UnixNano())
	if len(os.Args) < 2 {
		fmt.Println("Usage:")
		fmt.Println("  go run pm_generator.go <command> [args]")
		fmt.Println("")
		fmt.Println("Commands:")
		fmt.Println("  batch <directory>       - batch-process all *-generated.json files in a directory")
		fmt.Println("  single <input> <output> - process a single file")
		fmt.Println("  summary <file>          - show summary information for a file")
		fmt.Println("")
		fmt.Println("Examples:")
		fmt.Println("  go run pm_generator.go batch .")
		fmt.Println("  go run pm_generator.go single AMF-PM-generated.json AMF-PM-random.json")
		fmt.Println("  go run pm_generator.go summary AMF-PM-random.json")
		return
	}
	command := os.Args[1]
	switch command {
	case "batch":
		dir := "."
		if len(os.Args) > 2 {
			dir = os.Args[2]
		}
		fmt.Println("PM random data generator - batch mode")
		fmt.Println(strings.Repeat("=", 50))
		if err := generatePMDataFromMultipleSchemas(dir); err != nil {
			fmt.Printf("Error: %v\n", err)
			os.Exit(1)
		}
	case "single":
		if len(os.Args) < 4 {
			fmt.Println("Usage: go run pm_generator.go single <input_file> <output_file>")
			os.Exit(1)
		}
		inputFile := os.Args[2]
		outputFile := os.Args[3]
		fmt.Printf("Processing single file: %s -> %s\n", inputFile, outputFile)
		if err := generatePMDataFromSchema(inputFile, outputFile); err != nil {
			fmt.Printf("Error: %v\n", err)
			os.Exit(1)
		}
	case "summary":
		if len(os.Args) < 3 {
			fmt.Println("Usage: go run pm_generator.go summary <file>")
			os.Exit(1)
		}
		file := os.Args[2]
		if err := printPMDataSummary(file); err != nil {
			fmt.Printf("Error: %v\n", err)
			os.Exit(1)
		}
	default:
		fmt.Printf("Unknown command: %s\n", command)
		os.Exit(1)
	}
}
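
Taken together, the intended flow appears to be: run the Python script in the directory that holds the PM workbooks to produce the *-generated.json schema files, then run the Go tool in batch mode to re-roll the random values into *-random.json files. The exact invocation is not part of this commit; assuming Python 3 with pandas and an xlsx reader such as openpyxl installed, it would look roughly like:

    python generate_ne_pm.py
    go run pm_generator.go batch .
    go run pm_generator.go summary AMF-PM-random.json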