Files
be.ems/crontask/db.go
2023-09-06 11:23:09 +08:00

527 lines
17 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"database/sql"
"fmt"
"time"
"ems.agt/lib/log"
_ "github.com/go-sql-driver/mysql"
"xorm.io/xorm"
)
type NullTime sql.NullTime
type DBClient struct {
dbType string
dbUrl string
dbConnMaxLifetime time.Duration
dbMaxIdleConns int
dbMaxOpenConns int
IsShowSQL bool
xEngine *xorm.Engine
}
var dbClient DBClient
func initDbClient() error {
db := yamlConfig.Database
dbClient.dbUrl = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&parseTime=true&loc=Local",
db.User, db.Password, db.Host, db.Port, db.Name)
dbClient.dbType = db.Type
dbClient.dbConnMaxLifetime = 0
dbClient.dbMaxIdleConns = 0
dbClient.dbMaxOpenConns = 0
if log.GetLevel() == log.LOG_TRACE {
dbClient.IsShowSQL = true
}
var err error
dbClient.xEngine, err = xorm.NewEngine(dbClient.dbType, dbClient.dbUrl)
if err != nil {
log.Error("Failed to connect database:", err)
return err
}
//dbClient.xEngine.Ping() // 可以判断是否能连接
//if err != nil {
// log.Error("Failed to ping database:", err)
// return err
//}
// defer dbClient.xEngine.Close() // 退出后关闭
if dbClient.IsShowSQL == true {
dbClient.xEngine.ShowSQL(true)
}
dbClient.xEngine.SetConnMaxLifetime(dbClient.dbConnMaxLifetime)
dbClient.xEngine.SetMaxIdleConns(dbClient.dbMaxIdleConns)
dbClient.xEngine.SetMaxOpenConns(dbClient.dbMaxOpenConns)
return nil
}
var xEngine *xorm.Engine
func XormConnectDatabaseWithUri(sql string) (*xorm.Engine, error) {
sqlStr := fmt.Sprintf("%s?charset=utf8&parseTime=true&loc=Local", sql)
var err error
xEngine, err = xorm.NewEngine("mysql", sqlStr) //1、Create xorm engine
if err != nil {
fmt.Println("Failed to connect database:", err)
return nil, err
}
xEngine.ShowSQL(true)
return xEngine, nil
}
type NeInfo struct {
Id int `json:"-" xorm:"pk 'id' autoincr"`
NeType string `json:"neType" xorm:"ne_type"`
NeId string `json:"neId" xorm:"ne_id"` // neUID/rmUID 网元唯一标识
RmUID string `json:"rmUID" xorm:"rm_uid"` // neUID/rmUID网元UID
NeName string `json:"neName" xorm:"ne_name"` // NeName/UserLabel 网元名称/网元设备友好名称
Ip string `json:"ip" xorm:"ip"`
Port string `json:"port" xorm:"port"`
PvFlag string `json:"pvFlag" xorm:"pv_flag"` // 网元虚实性标识 VNF/PNF: 虚拟/物理
NeAddress string `json:"neAddress" xorm:"ne_address"` // 只对PNF
Province string `json:"province" xorm:"province"` // 网元所在省份
VendorName string `json:"vendorName" xorm:"vendor_name"` // 厂商名称
Dn string `json:"dn" xorm:"dn"` // 网络标识
Status int `json:"status" xorm:"status"`
UpdateTime string `json:"-" xorm:"-"`
}
func XormGetNeInfoByType(neType string, nes *[]NeInfo) (*[]NeInfo, error) {
log.Debug("XormGetNeInfoByType processing... ")
ne := new(NeInfo)
rows, err := dbClient.xEngine.Table("ne_info").Where("status=0 and ne_type =?", neType).Rows(ne)
if err != nil {
log.Error("Failed to get table ne_info from database:", err)
return nil, err
}
defer rows.Close()
for rows.Next() {
err := rows.Scan(ne)
if err != nil {
log.Error("Failed to get table ne_info from database:", err)
return nil, err
}
*nes = append(*nes, *ne)
}
log.Debug("nes:", nes)
return nes, nil
}
func XormGetAllNeInfo(nes *[]NeInfo) (*[]NeInfo, error) {
log.Debug("XormGetAllNeInfo processing... ")
ne := new(NeInfo)
rows, err := dbClient.xEngine.Table("ne_info").Where("status='0'").Rows(ne)
if err != nil {
log.Error("Failed to get table ne_info from database:", err)
return nil, err
}
defer rows.Close()
for rows.Next() {
err := rows.Scan(ne)
if err != nil {
log.Error("Failed to get table ne_info from database:", err)
return nil, err
}
*nes = append(*nes, *ne)
}
log.Debug("nes:", nes)
return nes, nil
}
type NeState struct {
Id int `json:"id" xorm:"pk 'id' autoincr"`
NeType string `json:"neType" xorm:"ne_type"`
NeId string `json:"neId" xorm:"ne_id"`
Version string `json:"version" xorm:"version"`
Capability uint32 `json:"capability" xorm:"capability"`
SerialNum string `json:"serialNum" xorm:"serial_num"`
ExpiryDate string `json:"expiryDate" xorm:"expiry_date"`
CpuUsage string `json:"cpuUsage" xorm:"cpu_usage"`
MemUsage string `json:"memUsage" xorm:"mem_usage"`
DiskSpace string `json:"diskSpace" xorm:"disk_space"`
Timestamp string `json:"timestamp" xorm:"-" `
}
func XormInsertNeState(neState *NeState) (int64, error) {
log.Debug("XormInsertNeState processing... ")
var affected int64 = 0
session := dbClient.xEngine.NewSession()
defer session.Close()
affected, err := session.InsertOne(neState)
session.Commit()
return affected, err
}
type NorthboundPm struct {
Id int `json:"-" xorm:"pk 'id' autoincr"`
Date string `json:"Date" xorm:"date"`
Index int `json:"Index" xorm:"index"` // 1天中测量时间粒度(如15分钟)的切片索引: 0~95
Timestamp string `json:"-" xorm:"-"`
NeName string `json:"NeName" xorm:"ne_name"` // UserLabel
RmUID string `json:"RmUID" xorm:"rm_uid"`
NeType string `json:"NeType" xorm:"ne_type"` // 网元类型
PmVersion string `json:"PmVersion" xorm:"pm_version"` // 性能数据版本号
Dn string `json:"Dn" xorm:"dn"` // (???)网元标识, 如:RJN-CMZJ-TZ,SubNetwork=5GC88,ManagedElement=SMF53456,SmfFunction=53456
Period string `json:"Period" xorm:"period"` // 测量时间粒度选项5/15/30/60
TimeZone string `json:"TimeZone" xorm:"time_zone"`
StartTime string `json:"StartTime" xorm:"start_time"`
Datas []struct {
ObjectType string `json:"ObjectType" xorm:"object_type"` // 网络资源类别名称, Pm指标项列表中为空间粒度 如SmfFunction
KPIs []struct {
KPIID string `json:"KPIID" xorm:"pm_name"` // 指标项, 如: SMF.AttCreatePduSession._Dnn
KPIValues []struct {
Name string `json:"Name" xorm:"name"` // 单个的写"Total", 或者指标项有多个测量项如Dnn的名称写对应的Dnn"cmnet"/"ims"
Value int `json:"Value" xorm:"value"`
} `json:"KPIValues" xorm:"sub_datas"`
} `json:"KPIs" xorm:"pm_datas"`
} `json:"Datas" xorm:"datas"`
}
func XormInsertNorthboundPm(pm *NorthboundPm) (int64, error) {
log.Debug("XormInsertNorthboundPm processing... ")
var affected int64 = 0
session := dbClient.xEngine.NewSession()
defer session.Close()
affected, err := session.InsertOne(pm)
session.Commit()
return affected, err
}
func XormGetNorthboundPm(date string, index int, neType string, pms *[]NorthboundPm) (*[]NorthboundPm, error) {
log.Debug("XormGetNorthboundPm processing... ")
pm := new(NorthboundPm)
rows, err := dbClient.xEngine.Table("northbound_pm").
Where("`ne_type` = ? AND `date` = ? AND `index` = ?", neType, date, index).
Rows(pm)
if err != nil {
log.Error("Failed to get table northbound_pm from database:", err)
return nil, err
}
defer rows.Close()
for rows.Next() {
err := rows.Scan(pm)
if err != nil {
log.Error("Failed to get table northbound_pm from database:", err)
return nil, err
}
*pms = append(*pms, *pm)
}
log.Debug("pms:", pms)
return pms, nil
}
func XormGetMeasureThreshold(tableName string, where string, datas *[]MeasureThreshold) (*[]MeasureThreshold, error) {
log.Debug("XormGetMeasureThreshold processing... ")
row := new(MeasureThreshold)
rows, err := dbClient.xEngine.Table(tableName).Where(where).Rows(row)
if err != nil {
log.Errorf("Failed to get table %s from database: %v", tableName, err)
return nil, err
}
defer rows.Close()
for rows.Next() {
err := rows.Scan(row)
if err != nil {
log.Error("Failed to get table measure_threshold from database:", err)
return nil, err
}
*datas = append(*datas, *row)
}
log.Debug("datas:", datas)
return datas, nil
}
type MeasureThreshold struct {
Id int `json:"id" xorm:"pk 'id' autoincr"`
NeType string `json:"neType" xorm:"ne_type"`
KpiSet string `json:"kpiSet" xorm:"kpi_set"`
Threshold int64 `json:"threshold" xorm:"threshold"`
Status string `json:"status" xorm:"Status"`
OrigSeverity string `json:"origSeverity" xorm:"orig_severity"`
AlarmCode string `json:"alarmCode" xorm:"alarm_code"`
AlarmFlag bool `json:"alarmFlag" xorm:"alarm_flag"`
}
type MeasureData struct {
// Id int `json:"id" xorm:"pk 'id' autoincr"`
Id int `json:"id" xorm:"-"`
Date string `json:"date" xorm:"date"`
TaskId int `json:"taskId"`
NeType string `json:"neType" xorm:"ne_type"`
NeName string `json:"neName" xorm:"ne_name"`
RmUid string `json:"rmUid" xorm:"rm_uid"`
GranulOption string `json:"granulOption" xorm:"granul_option"`
StartTime string `json:"startTime"`
EndTime string `json:"endTime"`
KpiCode string `json:"kpiCode" xorm:"kpi_code"`
KpiId string `json:"kpiId" xorm:"kpi_id"`
KpiExt string `json:"kpiExt" xorm:"kpi_ext"`
Value int64 `json:"value"`
Timestamp string `json:"timestamp"`
}
func XormGetMeasureData(where string, datas *[]MeasureData) (*[]MeasureData, error) {
log.Debug("XormGetMeasureData processing... ")
row := new(MeasureData)
rows, err := dbClient.xEngine.Where(where).Rows(row)
if err != nil {
log.Errorf("Failed to get table measure_data from database: %v", err)
return nil, err
}
defer rows.Close()
for rows.Next() {
err := rows.Scan(row)
if err != nil {
log.Error("Failed to get table measure_data from database:", err)
return nil, err
}
*datas = append(*datas, *row)
}
log.Debug("datas:", datas)
return datas, nil
}
func XormGetMeasureDataLastOne(neType, rmUID string, taskId int) (*MeasureData, error) {
log.Debug("XormGetMeasureDataOneByKpi processing... ")
measureData := new(MeasureData)
_, err := dbClient.xEngine.
SQL("select * from measure_data where ne_type=? and rm_uid=? and task_id=? order by start_time desc limit 1", neType, rmUID, taskId).
Get(measureData)
if err != nil {
log.Errorf("Failed to get measure_data: %v", err)
return nil, err
}
return measureData, nil
}
func XormGetMeasureDataOneByKpi(kpi string) (*MeasureData, error) {
log.Debug("XormGetMeasureDataOneByKpi processing... ")
measureData := new(MeasureData)
_, err := dbClient.xEngine.
SQL("select * from measure_data where kpi_id = ? order by timestamp desc limit 1", kpi).
Get(measureData)
if err != nil {
log.Errorf("Failed to get table measure_data from database: %v", err)
return nil, err
}
log.Debug("measureData:", measureData)
return measureData, nil
}
type AlarmDefine struct {
AlarmId string `json:"alarmId" xorm:"alarm_id"`
AlarmCode int `json:"alarmCode" xorm:"alarm_code"`
AlarmTitle string `json:"alarmTitle" xorm:"alarm_title"`
NeType string `json:"neType" xorm:"ne_type"`
AlarmType string `json:"alarmType" xorm:"alarm_type"`
OrigSeverity string `json:"origSeverity" xorm:"orig_severity"`
ObjectUid string `json:"objectUid" xorm:"object_uid"`
ObjectName string `json:"objectName" xorm:"object_name"`
ObjectType string `json:"objectType" xorm:"object_type"`
LocationInfo string `json:"locationInfo"`
SpecificProblem string `json:"specificProblem"`
SpecificProblemId string `json:"specificProblemId" xorm:"specific_problem_id"`
AddInfo string `json:"addInfo" xorm:"add_info"`
Threshold int64 `json:"threshold" xorm:"threshold"`
Status string `json:"status" xorm:"status"`
}
func XormGetAlarmDefine(alarmCode string) (*AlarmDefine, error) {
log.Debug("XormGetAlarmDefine processing... ")
alarmDefine := new(AlarmDefine)
_, err := dbClient.xEngine.
Where("alarm_code=? and status='Active'", alarmCode).
Get(alarmDefine)
if err != nil {
log.Error("Failed to get table alarm_define from database:", err)
return nil, err
}
return alarmDefine, nil
}
const (
AlarmStatusClear = 0
AlarmStatusActive = 1
AlarmStatusClearString = "0"
AlarmStatusActiveString = "1"
)
const (
ClearTypeUnclear = 0
ClearTypeAutoClear = 1
ClearTypeManualClear = 2
)
type Alarm struct {
Id int `json:"-" xorm:"pk 'id' autoincr"`
AlarmSeq int `json:"alarmSeq"`
AlarmId string `json:"alarmId" xorm:"alarm_id"`
NeId string `json:"neId"`
AlarmCode int `json:"alarmCode"`
AlarmTitle string `json:"alarmTitle"`
EventTime string `json:"eventTime"`
AlarmType string `json:"alarmType"`
OrigSeverity string `json:"origSeverity"`
PerceivedSeverity string `json:"perceivedSeverity"`
PVFlag string `json:"pvFlag" xorm:"pv_flag"`
NeName string `json:"neName"`
NeType string `json:"neType"`
ObjectUid string `json:"objectUid" xorm:"object_uid"`
ObjectName string `json:"objectName" xorm:"object_name"`
ObjectType string `json:"objectType" xorm:"object_type"`
LocationInfo string `json:"locationInfo"`
Province string `json:"province"`
AlarmStatus int `json:"alarmStatus" xorm:"alarm_status"`
SpecificProblem string `json:"specificProblem"`
SpecificProblemID string `json:"specificProblemID" xorm:"specific_problem_id"`
AddInfo string `json:"addInfo"`
// ClearType int `json:"-" xorm:"clear_type"` // 0: Unclear, 1: Auto clear, 2: Manual clear
// ClearTime sql.NullTime `json:"-" xorm:"clear_time"`
}
type AlarmLog struct {
NeType string `json:"neType" xorm:"ne_type"`
NeId string `json:"neId" xorm:"ne_id"`
AlarmSeq string `json:"alarmSeq" xorm:"alarm_seq"`
AlarmId string `json:"alarmId" xorm:"alarm_id"`
AlarmCode int `json:"alarmCode" xorm:"alarm_code"`
AlarmStatus int `json:"alarmStatus" xorm:"alarm_status"`
EventTime string `json:"eventTime" xorm:"event_time"`
// ClearTime sql.NullTime `json:"clearTime" xorm:"clear_time"`
LogTime string `json:"logTime" xorm:"-"`
}
func XormInsertAlarm(alarm *Alarm) (int64, error) {
log.Debug("XormInsertAlarm processing... ")
var affected int64 = 0
session := dbClient.xEngine.NewSession()
defer session.Close()
affected, err := session.InsertOne(alarm)
session.Commit()
return affected, err
}
func XormInsertTalbeOne(tbInfo interface{}) (int64, error) {
log.Debug("XormInsertTalbeOne processing... ")
var affected int64 = 0
session := dbClient.xEngine.NewSession()
defer session.Close()
affected, err := session.InsertOne(tbInfo)
session.Commit()
return affected, err
}
func XormGetDataBySQL(sql string) (*[]map[string]string, error) {
log.Debug("XormGetDataBySQL processing... ")
rows := make([]map[string]string, 0)
rows, err := dbClient.xEngine.QueryString(sql)
if err != nil {
log.Errorf("Failed to QueryString:", err)
return nil, err
}
return &rows, nil
}
func XormGetTableOneByWhere(where string, tableName string) (*[]interface{}, error) {
log.Debug("XormGetTableOneByWhere processing... ")
row := new([]interface{})
tb, err := dbClient.xEngine.TableInfo(tableName)
if err != nil {
log.Error("Failed to get TableInfo:", err)
return nil, err
}
columns := tb.Columns()
log.Debug("columns:", columns)
has, err := dbClient.xEngine.Table(tableName).Where(where).Get(row)
if err != nil {
log.Errorf("Failed to get table %s from database:%v", tableName, err)
return nil, err
} else if has == false {
log.Infof("Not found data from %s where=%s", tableName, where)
return nil, nil
}
log.Debugf("%s:%v", tableName, row)
return row, nil
}
func XormGetTableOneById(id int, tableName string) (*[]interface{}, error) {
log.Debug("XormGetTableOneById processing... ")
rows := new([]interface{})
has, err := dbClient.xEngine.Table(tableName).ID(id).Get(rows)
if err != nil {
log.Errorf("Failed to get table %s from database:id=%d, %v", tableName, id, err)
return nil, err
} else if has == false {
log.Infof("Not found table %s from database:id=%d", tableName, id)
return nil, nil
}
log.Debugf("%s:%v", tableName, rows)
return rows, nil
}
func XormUpdateTableById(id int, tableName string, tbInfo interface{}, cols ...string) (int64, error) {
log.Debug("XormUpdateTableById processing... ")
session := dbClient.xEngine.NewSession()
defer session.Close()
affected, err := session.Table(tableName).ID(id).MustCols(cols...).Update(tbInfo)
if err != nil {
log.Errorf("Failed to update table %s from database:%v", tableName, err)
return 0, err
}
session.Commit()
return affected, nil
}
func XormUpdateTableByWhere(where string, tableName string, tbInfo interface{}) (int64, error) {
log.Debug("XormUpdateTableByWhere processing... ")
session := dbClient.xEngine.NewSession()
defer session.Close()
affected, err := session.Table(tableName).Where(where).Update(tbInfo)
if err != nil {
log.Errorf("Failed to update table %s from database:%v", tableName, err)
return 0, err
}
session.Commit()
return affected, nil
}