fix: export table file

This commit is contained in:
2024-08-28 18:22:11 +08:00
parent 16eb4e1041
commit f62d7d7d11
3 changed files with 25 additions and 18 deletions

View File

@@ -37,8 +37,8 @@ INSERT INTO `sys_job` VALUES (7, 'job.backupEtcFromNE', 'SYSTEM', 'backupEtcFrom
INSERT INTO `sys_job` VALUES (8, 'job.deleteExpiredNeStateRecord', 'SYSTEM', 'deleteExpiredRecord', '{\"duration\":1,\"tableName\":\"ne_state\",\"colName\":\"timestamp\"}', '0 25 0 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1703668901929, 'job.deleteExpiredNeStateRecordRemark');
INSERT INTO `sys_job` VALUES (9, 'job.getStateFromNE', 'SYSTEM', 'getStateFromNE', '', '0/10 * * * * ?', '3', '0', '0', '0', 'supervisor', 1698478134842, 'admin', 1713231120503, 'job.getStateFromNERemark');
INSERT INTO `sys_job` VALUES (10, 'job.genNeStateAlarm', 'SYSTEM', 'genNeStateAlarm', '{\"alarmID\":\"HXEMSSM10000\",\"alarmCode\":10000,\"alarmTitle\":\"The system state is abnormal\",\"neType\":\"OMC\",\"alarmType\":\"EquipmentAlarm\",\"origSeverity\": \"Major\",\"objectName\":\"EMS;SystemManagement;Heartbeat\",\"objectType\":\"SystemState\",\"specificProblem\":\"Alarm cause: the system state of target NE has not been received for {threshold} seconds\", \"specificProblemID\":\"AC10000\",\"threshold\":30}', '0/5 * * * * ?', '3', '0', '0', '0', 'supervisor', 1698478134842, 'admin', 1713781643031, 'job.genNeStateAlarmRemark');
INSERT INTO `sys_job` VALUES (11, 'job.exportOperateLog', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"sys_log_operate\",\"timeCol\":\"oper_time\",\"timeUnit\":\"milli\",\"columns\":\"oper_id,omc_get_dict_value(title, \\\"i18n_en\\\") as title,business_type,method,request_method,operator_type,oper_name,dept_name,oper_url,oper_ip,oper_location,oper_param,oper_msg,status,oper_time,cost_time,tenant_id\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/operate_log\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1722224659251, '');
INSERT INTO `sys_job` VALUES (12, 'job.exportIMSCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_ims\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"columns\":\"*\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/ims_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1722224659251, '');
INSERT INTO `sys_job` VALUES (13, 'job.exportSMFCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_smf\",\"columns\":\"id, ne_type, ne_name, rm_uid, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.recordType\')) AS record_type, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.chargingID\')) AS charging_id, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.subscriberIdentifier.subscriptionIDType\')) AS subscriber_id_type, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.subscriberIdentifier.subscriptionIDData\')) AS subscriber_id_data, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.duration\')) AS duration, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.invocationTimestamp\')) as invocationTimestamp, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeUplink\')) AS data_volume_uplink, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeDownlink\')) AS data_volume_downlink, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataTotalVolume\')) AS data_total_volume, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.pDUSessionChargingInformation.pDUAddress.pDUIPv4Address\')) AS pdu_ipv4_address, timestamp\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/smf_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724309047797, '');
INSERT INTO `sys_job` VALUES (11, 'job.exportOperateLog', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"sys_log_operate\",\"timeCol\":\"oper_time\",\"timeUnit\":\"milli\",\"columns\":\"oper_id,omc_get_dict_value(title, \\\"i18n_en\\\") as title,business_type,method,request_method,operator_type,oper_name,dept_name,oper_url,oper_ip,oper_location,oper_param,oper_msg,status,oper_time,cost_time,tenant_id\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/operate_log\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724833786290, 'job.exportOperateLog');
INSERT INTO `sys_job` VALUES (12, 'job.exportIMSCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_ims\",\"columns\":\"id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) as record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callType\')) as call_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callerParty\')) as caller_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.calledParty\')) as called_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callDuration\')) as call_duration,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.serviceResult\')) as service_result,DATE_FORMAT(FROM_UNIXTIME(timestamp), \'%Y-%m-%d %H:%i:%s\') AS timestamp\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/ims_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1722224659251, '');
INSERT INTO `sys_job` VALUES (13, 'job.exportSMFCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_smf\",\"columns\":\"id,ne_type,ne_name,rm_uid,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) AS record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.chargingID\')) AS charging_id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.subscriberIdentifier.subscriptionIDType\')) AS subscriber_id_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.subscriberIdentifier.subscriptionIDData\')) AS subscriber_id_data,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.duration\')) AS duration,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.invocationTimestamp\')) as invocationTimestamp,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeUplink\')) AS data_volume_uplink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeDownlink\')) AS data_volume_downlink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataTotalVolume\')) AS data_total_volume,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.pDUSessionChargingInformation.pDUAddress.pDUIPv4Address\')) AS pdu_ipv4_address,timestamp\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/smf_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724309047797, '');
SET FOREIGN_KEY_CHECKS = 1;

View File

@@ -41,8 +41,8 @@ REPLACE INTO `sys_job` VALUES (7, 'job.backupEtcFromNE', 'SYSTEM', 'backupEtcFro
REPLACE INTO `sys_job` VALUES (8, 'job.deleteExpiredNeStateRecord', 'SYSTEM', 'deleteExpiredRecord', '{\"duration\":1,\"tableName\":\"ne_state\",\"colName\":\"timestamp\"}', '0 25 0 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1703668901929, 'job.deleteExpiredNeStateRecordRemark');
REPLACE INTO `sys_job` VALUES (9, 'job.getStateFromNE', 'SYSTEM', 'getStateFromNE', '', '0/10 * * * * ?', '3', '0', '0', '0', 'supervisor', 1698478134842, 'admin', 1713231120503, 'job.getStateFromNERemark');
REPLACE INTO `sys_job` VALUES (10, 'job.genNeStateAlarm', 'SYSTEM', 'genNeStateAlarm', '{\"alarmID\":\"HXEMSSM10000\",\"alarmCode\":10000,\"alarmTitle\":\"The system state is abnormal\",\"neType\":\"OMC\",\"alarmType\":\"EquipmentAlarm\",\"origSeverity\": \"Major\",\"objectName\":\"EMS;SystemManagement;Heartbeat\",\"objectType\":\"SystemState\",\"specificProblem\":\"Alarm cause: the system state of target NE has not been received for {threshold} seconds\", \"specificProblemID\":\"AC10000\",\"threshold\":30}', '0/5 * * * * ?', '3', '0', '0', '0', 'supervisor', 1698478134842, 'admin', 1713781643031, 'job.genNeStateAlarmRemark');
REPLACE INTO `sys_job` VALUES (11, 'job.exportOperateLog', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"sys_log_operate\",\"timeCol\":\"oper_time\",\"timeUnit\":\"milli\",\"columns\":\"oper_id,omc_get_dict_value(title, \\\"i18n_en\\\") as title,business_type,method,request_method,operator_type,oper_name,dept_name,oper_url,oper_ip,oper_location,oper_param,oper_msg,status,oper_time,cost_time,tenant_id\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/operate_log\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1722224659251, '');
REPLACE INTO `sys_job` VALUES (12, 'job.exportIMSCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_ims\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"columns\":\"*\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/ims_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1722224659251, '');
REPLACE INTO `sys_job` VALUES (13, 'job.exportSMFCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_smf\",\"columns\":\"id, ne_type, ne_name, rm_uid, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.recordType\')) AS record_type, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.chargingID\')) AS charging_id, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.subscriberIdentifier.subscriptionIDType\')) AS subscriber_id_type, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.subscriberIdentifier.subscriptionIDData\')) AS subscriber_id_data, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.duration\')) AS duration, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.invocationTimestamp\')) as invocationTimestamp, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeUplink\')) AS data_volume_uplink, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeDownlink\')) AS data_volume_downlink, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataTotalVolume\')) AS data_total_volume, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.pDUSessionChargingInformation.pDUAddress.pDUIPv4Address\')) AS pdu_ipv4_address, timestamp\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/smf_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724309047797, '');
REPLACE INTO `sys_job` VALUES (11, 'job.exportOperateLog', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"sys_log_operate\",\"timeCol\":\"oper_time\",\"timeUnit\":\"milli\",\"columns\":\"oper_id,omc_get_dict_value(title, \\\"i18n_en\\\") as title,business_type,method,request_method,operator_type,oper_name,dept_name,oper_url,oper_ip,oper_location,oper_param,oper_msg,status,oper_time,cost_time,tenant_id\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/operate_log\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724833786290, 'job.exportOperateLog');
REPLACE INTO `sys_job` VALUES (12, 'job.exportIMSCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_ims\",\"columns\":\"id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) as record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callType\')) as call_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callerParty\')) as caller_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.calledParty\')) as called_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callDuration\')) as call_duration,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.serviceResult\')) as service_result,DATE_FORMAT(FROM_UNIXTIME(timestamp), \'%Y-%m-%d %H:%i:%s\') AS timestamp\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/ims_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1722224659251, '');
REPLACE INTO `sys_job` VALUES (13, 'job.exportSMFCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_smf\",\"columns\":\"id,ne_type,ne_name,rm_uid,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) AS record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.chargingID\')) AS charging_id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.subscriberIdentifier.subscriptionIDType\')) AS subscriber_id_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.subscriberIdentifier.subscriptionIDData\')) AS subscriber_id_data,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.duration\')) AS duration,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.invocationTimestamp\')) as invocationTimestamp,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeUplink\')) AS data_volume_uplink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeDownlink\')) AS data_volume_downlink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataTotalVolume\')) AS data_total_volume,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.pDUSessionChargingInformation.pDUAddress.pDUIPv4Address\')) AS pdu_ipv4_address,timestamp\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/smf_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724309047797, '');
SET FOREIGN_KEY_CHECKS = 1;

View File

@@ -58,29 +58,36 @@ func (s *BarProcessor) Execute(data any) (any, error) {
}
//duration = params.Duration
// 查询数据
var unitNum int = 0
now := time.Now()
end := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
start := end.Add(-time.Duration(params.Duration) * time.Hour)
var startTime, endTime int64
switch params.TimeUnit {
case "second":
unitNum = 1
// 格式化时间戳为秒级
startTime = start.Unix()
endTime = end.Unix()
case "milli":
unitNum = 1000
// 格式化时间戳为毫秒级
startTime = start.UnixMilli()
endTime = end.UnixMilli()
case "micro":
unitNum = 1000000
// 格式化时间戳为微妙级
startTime = start.UnixMicro()
endTime = end.UnixMicro()
default:
return nil, fmt.Errorf("error input parameter")
}
var query string
if params.Extras != "" {
query = fmt.Sprintf("SELECT %s FROM `%s` WHERE `%s` >= UNIX_TIMESTAMP(NOW() - INTERVAL 2*%d HOUR) * %d AND `%s` < UNIX_TIMESTAMP(NOW() - INTERVAL %d HOUR) * %d AND %s",
params.Columns, params.TableName, params.TimeCol, params.Duration, unitNum, params.TimeCol, params.Duration, unitNum, params.Extras)
query = fmt.Sprintf("SELECT %s FROM `%s` WHERE `%s` >= %d AND `%s` < %d AND %s",
params.Columns, params.TableName, params.TimeCol, startTime, params.TimeCol, endTime, params.Extras)
} else {
query = fmt.Sprintf("SELECT %s FROM `%s` WHERE `%s` >= UNIX_TIMESTAMP(NOW() - INTERVAL 2*%d HOUR) * %d AND `%s` < UNIX_TIMESTAMP(NOW() - INTERVAL %d HOUR) * %d",
params.Columns, params.TableName, params.TimeCol, params.Duration, unitNum, params.TimeCol, params.Duration, unitNum)
query = fmt.Sprintf("SELECT %s FROM `%s` WHERE `%s` >= %d AND `%s` < %d",
params.Columns, params.TableName, params.TimeCol, startTime, params.TimeCol, endTime)
}
//log.Trace("query:", query)
//filePath := fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, params.TableName, time.Now().Format("20060102150405"))
log.Trace("query:", query)
filePath := fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, params.TableName, time.Now().Format("20060102150405"))
affected, err := s.exportData(query, filePath)
if err != nil {