diff --git a/database/install/sys_job.sql b/database/install/sys_job.sql index 0458ed91..131ad511 100644 --- a/database/install/sys_job.sql +++ b/database/install/sys_job.sql @@ -37,8 +37,8 @@ INSERT INTO `sys_job` VALUES (7, 'job.backupEtcFromNE', 'SYSTEM', 'backupEtcFrom INSERT INTO `sys_job` VALUES (8, 'job.deleteExpiredNeStateRecord', 'SYSTEM', 'deleteExpiredRecord', '{\"duration\":1,\"tableName\":\"ne_state\",\"colName\":\"timestamp\"}', '0 25 0 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1703668901929, 'job.deleteExpiredNeStateRecordRemark'); INSERT INTO `sys_job` VALUES (9, 'job.getStateFromNE', 'SYSTEM', 'getStateFromNE', '', '0/10 * * * * ?', '3', '0', '0', '0', 'supervisor', 1698478134842, 'admin', 1713231120503, 'job.getStateFromNERemark'); INSERT INTO `sys_job` VALUES (10, 'job.genNeStateAlarm', 'SYSTEM', 'genNeStateAlarm', '{\"alarmID\":\"HXEMSSM10000\",\"alarmCode\":10000,\"alarmTitle\":\"The system state is abnormal\",\"neType\":\"OMC\",\"alarmType\":\"EquipmentAlarm\",\"origSeverity\": \"Major\",\"objectName\":\"EMS;SystemManagement;Heartbeat\",\"objectType\":\"SystemState\",\"specificProblem\":\"Alarm cause: the system state of target NE has not been received for {threshold} seconds\", \"specificProblemID\":\"AC10000\",\"threshold\":30}', '0/5 * * * * ?', '3', '0', '0', '0', 'supervisor', 1698478134842, 'admin', 1713781643031, 'job.genNeStateAlarmRemark'); -INSERT INTO `sys_job` VALUES (11, 'job.exportOperateLog', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"sys_log_operate\",\"timeCol\":\"oper_time\",\"timeUnit\":\"milli\",\"columns\":\"oper_id,omc_get_dict_value(title, \\\"i18n_en\\\") as title,business_type,method,request_method,operator_type,oper_name,dept_name,oper_url,oper_ip,oper_location,oper_param,oper_msg,status,oper_time,cost_time,tenant_id\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/operate_log\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1722224659251, ''); -INSERT INTO `sys_job` VALUES (12, 'job.exportIMSCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_ims\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"columns\":\"*\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/ims_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1722224659251, ''); -INSERT INTO `sys_job` VALUES (13, 'job.exportSMFCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_smf\",\"columns\":\"id, ne_type, ne_name, rm_uid, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.recordType\')) AS record_type, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.chargingID\')) AS charging_id, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.subscriberIdentifier.subscriptionIDType\')) AS subscriber_id_type, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.subscriberIdentifier.subscriptionIDData\')) AS subscriber_id_data, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.duration\')) AS duration, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.invocationTimestamp\')) as invocationTimestamp, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeUplink\')) AS data_volume_uplink, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeDownlink\')) AS data_volume_downlink, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataTotalVolume\')) AS data_total_volume, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.pDUSessionChargingInformation.pDUAddress.pDUIPv4Address\')) AS pdu_ipv4_address, timestamp\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/smf_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724309047797, ''); +INSERT INTO `sys_job` VALUES (11, 'job.exportOperateLog', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"sys_log_operate\",\"timeCol\":\"oper_time\",\"timeUnit\":\"milli\",\"columns\":\"oper_id,omc_get_dict_value(title, \\\"i18n_en\\\") as title,business_type,method,request_method,operator_type,oper_name,dept_name,oper_url,oper_ip,oper_location,oper_param,oper_msg,status,oper_time,cost_time,tenant_id\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/operate_log\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724833786290, 'job.exportOperateLog'); +INSERT INTO `sys_job` VALUES (12, 'job.exportIMSCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_ims\",\"columns\":\"id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) as record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callType\')) as call_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callerParty\')) as caller_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.calledParty\')) as called_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callDuration\')) as call_duration,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.serviceResult\')) as service_result,DATE_FORMAT(FROM_UNIXTIME(timestamp), \'%Y-%m-%d %H:%i:%s\') AS timestamp\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/ims_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1722224659251, ''); +INSERT INTO `sys_job` VALUES (13, 'job.exportSMFCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_smf\",\"columns\":\"id,ne_type,ne_name,rm_uid,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) AS record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.chargingID\')) AS charging_id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.subscriberIdentifier.subscriptionIDType\')) AS subscriber_id_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.subscriberIdentifier.subscriptionIDData\')) AS subscriber_id_data,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.duration\')) AS duration,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.invocationTimestamp\')) as invocationTimestamp,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeUplink\')) AS data_volume_uplink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeDownlink\')) AS data_volume_downlink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataTotalVolume\')) AS data_total_volume,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.pDUSessionChargingInformation.pDUAddress.pDUIPv4Address\')) AS pdu_ipv4_address,timestamp\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/smf_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724309047797, ''); SET FOREIGN_KEY_CHECKS = 1; diff --git a/database/upgrade/upg_sys_job.sql b/database/upgrade/upg_sys_job.sql index e673e29c..48332084 100644 --- a/database/upgrade/upg_sys_job.sql +++ b/database/upgrade/upg_sys_job.sql @@ -41,8 +41,8 @@ REPLACE INTO `sys_job` VALUES (7, 'job.backupEtcFromNE', 'SYSTEM', 'backupEtcFro REPLACE INTO `sys_job` VALUES (8, 'job.deleteExpiredNeStateRecord', 'SYSTEM', 'deleteExpiredRecord', '{\"duration\":1,\"tableName\":\"ne_state\",\"colName\":\"timestamp\"}', '0 25 0 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1703668901929, 'job.deleteExpiredNeStateRecordRemark'); REPLACE INTO `sys_job` VALUES (9, 'job.getStateFromNE', 'SYSTEM', 'getStateFromNE', '', '0/10 * * * * ?', '3', '0', '0', '0', 'supervisor', 1698478134842, 'admin', 1713231120503, 'job.getStateFromNERemark'); REPLACE INTO `sys_job` VALUES (10, 'job.genNeStateAlarm', 'SYSTEM', 'genNeStateAlarm', '{\"alarmID\":\"HXEMSSM10000\",\"alarmCode\":10000,\"alarmTitle\":\"The system state is abnormal\",\"neType\":\"OMC\",\"alarmType\":\"EquipmentAlarm\",\"origSeverity\": \"Major\",\"objectName\":\"EMS;SystemManagement;Heartbeat\",\"objectType\":\"SystemState\",\"specificProblem\":\"Alarm cause: the system state of target NE has not been received for {threshold} seconds\", \"specificProblemID\":\"AC10000\",\"threshold\":30}', '0/5 * * * * ?', '3', '0', '0', '0', 'supervisor', 1698478134842, 'admin', 1713781643031, 'job.genNeStateAlarmRemark'); -REPLACE INTO `sys_job` VALUES (11, 'job.exportOperateLog', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"sys_log_operate\",\"timeCol\":\"oper_time\",\"timeUnit\":\"milli\",\"columns\":\"oper_id,omc_get_dict_value(title, \\\"i18n_en\\\") as title,business_type,method,request_method,operator_type,oper_name,dept_name,oper_url,oper_ip,oper_location,oper_param,oper_msg,status,oper_time,cost_time,tenant_id\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/operate_log\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1722224659251, ''); -REPLACE INTO `sys_job` VALUES (12, 'job.exportIMSCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_ims\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"columns\":\"*\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/ims_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1722224659251, ''); -REPLACE INTO `sys_job` VALUES (13, 'job.exportSMFCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_smf\",\"columns\":\"id, ne_type, ne_name, rm_uid, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.recordType\')) AS record_type, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.chargingID\')) AS charging_id, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.subscriberIdentifier.subscriptionIDType\')) AS subscriber_id_type, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.subscriberIdentifier.subscriptionIDData\')) AS subscriber_id_data, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.duration\')) AS duration, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.invocationTimestamp\')) as invocationTimestamp, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeUplink\')) AS data_volume_uplink, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeDownlink\')) AS data_volume_downlink, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataTotalVolume\')) AS data_total_volume, JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.pDUSessionChargingInformation.pDUAddress.pDUIPv4Address\')) AS pdu_ipv4_address, timestamp\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/smf_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724309047797, ''); +REPLACE INTO `sys_job` VALUES (11, 'job.exportOperateLog', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"sys_log_operate\",\"timeCol\":\"oper_time\",\"timeUnit\":\"milli\",\"columns\":\"oper_id,omc_get_dict_value(title, \\\"i18n_en\\\") as title,business_type,method,request_method,operator_type,oper_name,dept_name,oper_url,oper_ip,oper_location,oper_param,oper_msg,status,oper_time,cost_time,tenant_id\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/operate_log\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724833786290, 'job.exportOperateLog'); +REPLACE INTO `sys_job` VALUES (12, 'job.exportIMSCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_ims\",\"columns\":\"id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) as record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callType\')) as call_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callerParty\')) as caller_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.calledParty\')) as called_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callDuration\')) as call_duration,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.serviceResult\')) as service_result,DATE_FORMAT(FROM_UNIXTIME(timestamp), \'%Y-%m-%d %H:%i:%s\') AS timestamp\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/ims_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1722224659251, ''); +REPLACE INTO `sys_job` VALUES (13, 'job.exportSMFCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_smf\",\"columns\":\"id,ne_type,ne_name,rm_uid,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) AS record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.chargingID\')) AS charging_id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.subscriberIdentifier.subscriptionIDType\')) AS subscriber_id_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.subscriberIdentifier.subscriptionIDData\')) AS subscriber_id_data,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.duration\')) AS duration,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.invocationTimestamp\')) as invocationTimestamp,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeUplink\')) AS data_volume_uplink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeDownlink\')) AS data_volume_downlink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataTotalVolume\')) AS data_total_volume,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.pDUSessionChargingInformation.pDUAddress.pDUIPv4Address\')) AS pdu_ipv4_address,timestamp\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/smf_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724309047797, ''); SET FOREIGN_KEY_CHECKS = 1; \ No newline at end of file diff --git a/src/modules/crontask/processor/exportTable/exportTable.go b/src/modules/crontask/processor/exportTable/exportTable.go index a20972b1..8411f7ce 100644 --- a/src/modules/crontask/processor/exportTable/exportTable.go +++ b/src/modules/crontask/processor/exportTable/exportTable.go @@ -58,29 +58,36 @@ func (s *BarProcessor) Execute(data any) (any, error) { } //duration = params.Duration - // 查询数据 - var unitNum int = 0 + now := time.Now() + end := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()) + start := end.Add(-time.Duration(params.Duration) * time.Hour) + + var startTime, endTime int64 switch params.TimeUnit { case "second": - unitNum = 1 + // 格式化时间戳为秒级 + startTime = start.Unix() + endTime = end.Unix() case "milli": - unitNum = 1000 + // 格式化时间戳为毫秒级 + startTime = start.UnixMilli() + endTime = end.UnixMilli() case "micro": - unitNum = 1000000 + // 格式化时间戳为微妙级 + startTime = start.UnixMicro() + endTime = end.UnixMicro() default: return nil, fmt.Errorf("error input parameter") } var query string if params.Extras != "" { - query = fmt.Sprintf("SELECT %s FROM `%s` WHERE `%s` >= UNIX_TIMESTAMP(NOW() - INTERVAL 2*%d HOUR) * %d AND `%s` < UNIX_TIMESTAMP(NOW() - INTERVAL %d HOUR) * %d AND %s", - params.Columns, params.TableName, params.TimeCol, params.Duration, unitNum, params.TimeCol, params.Duration, unitNum, params.Extras) + query = fmt.Sprintf("SELECT %s FROM `%s` WHERE `%s` >= %d AND `%s` < %d AND %s", + params.Columns, params.TableName, params.TimeCol, startTime, params.TimeCol, endTime, params.Extras) } else { - query = fmt.Sprintf("SELECT %s FROM `%s` WHERE `%s` >= UNIX_TIMESTAMP(NOW() - INTERVAL 2*%d HOUR) * %d AND `%s` < UNIX_TIMESTAMP(NOW() - INTERVAL %d HOUR) * %d", - params.Columns, params.TableName, params.TimeCol, params.Duration, unitNum, params.TimeCol, params.Duration, unitNum) + query = fmt.Sprintf("SELECT %s FROM `%s` WHERE `%s` >= %d AND `%s` < %d", + params.Columns, params.TableName, params.TimeCol, startTime, params.TimeCol, endTime) } - - //log.Trace("query:", query) - //filePath := fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, params.TableName, time.Now().Format("20060102150405")) + log.Trace("query:", query) filePath := fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, params.TableName, time.Now().Format("20060102150405")) affected, err := s.exportData(query, filePath) if err != nil {