diff --git a/database/install/sys_i18n.sql b/database/install/sys_i18n.sql index 1a807ae5..ce43901c 100644 --- a/database/install/sys_i18n.sql +++ b/database/install/sys_i18n.sql @@ -310,14 +310,14 @@ INSERT INTO `sys_i18n` VALUES (287, '0', 'system', 1699348237468, 'system', 1699 INSERT INTO `sys_i18n` VALUES (288, '0', 'system', 1699348237468, 'system', 1699348237468, 'config.errType', '操作含有内置参数,禁止删除!', 'The operation contains built-in parameters and deletion is prohibited!'); INSERT INTO `sys_i18n` VALUES (289, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.monitor_sys_resource', '监控-系统资源', 'Monitor-System Resources'); INSERT INTO `sys_i18n` VALUES (290, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.monitor_sys_resource_remark', '系统资源CPU/IO/Netword收集\ninterval单位分钟,平均分钟资源情况\n注:请根据cron表达式的时间单位分钟,传入参数interva值', 'System Resource CPU/IO/Netword Collection\ninterval unit minutes, average minute resource situation\nNote: Please pass the value of the parameter interva according to the time unit minutes of the cron expression'); -INSERT INTO `sys_i18n` VALUES (291, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.delExpiredNeBackup', '删除过期网元备份文件', 'Delete expired NE etc backup file'); -INSERT INTO `sys_i18n` VALUES (292, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.delExpiredNeBackupRemark', '删除过期网元etc备份文件, 传入参数表示保留{duration}天的备份文件, 默认60天', 'Delete expired network element etc backup file, pass in the parameter to keep the backup file for {duration} days, default is 60 days.'); -INSERT INTO `sys_i18n` VALUES (293, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.deleteExpiredAlarmRecord', '删除过期历史告警记录', 'Delete expired historical alarm'); -INSERT INTO `sys_i18n` VALUES (294, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.deleteExpiredAlarmRecordRemark', '删除过期历史告警记录,传入参数表示保留{duration}天的历史告警记录', 'Delete expired history alarm records, pass in the parameter to keep the history alarm records for {duration} days.'); -INSERT INTO `sys_i18n` VALUES (295, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.deleteExpiredKpiRecord', '删除过期黄金指标记录', 'Delete expired KPI records'); -INSERT INTO `sys_i18n` VALUES (296, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.deleteExpiredKpiRecordRemark', '黄金指标记录保留{duration}天', 'KPI record retention for {duration} days'); -INSERT INTO `sys_i18n` VALUES (297, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.backupEtcFromNE', '网元配置自动备份任务', 'Network Element Configuration Auto Backup Task'); -INSERT INTO `sys_i18n` VALUES (298, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.backupEtcFromNERemark', '自动备份网元etc目录下的配置文件', 'Automatically backs up the configuration files in the NE etc directory.'); +INSERT INTO `sys_i18n` VALUES (291, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.delete_ne_config_backup', '删除-过期配置文件备份', 'Delete expired NE etc backup file'); +INSERT INTO `sys_i18n` VALUES (292, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.delete_ne_config_backup_remark', 'storeDays:表示保留最近天数的数据记录', 'storeDays: indicates that the most recent days of data records are kept.'); +INSERT INTO `sys_i18n` VALUES (293, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.delete_alarm_record', '删除-过期告警记录', 'Delete-Expired Alarm Records'); +INSERT INTO `sys_i18n` VALUES (294, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.delete_alarm_record_remark', 'storeDays:表示保留最近天数的数据记录', 'storeDays: indicates that the most recent days of data records are kept.'); +INSERT INTO `sys_i18n` VALUES (295, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.delete_kpi_record', '删除-过期指标记录', 'Delete expired KPI records'); +INSERT INTO `sys_i18n` VALUES (296, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.delete_kpi_record_remark', 'storeDays:表示保留最近天数的数据记录\r\nneList:表示匹配的网元类型', 'storeDays: Indicates the most recent days of data records retained\r\nneList: Indicates the type of network elements matched'); +INSERT INTO `sys_i18n` VALUES (297, '0', 'system', 1699348237468, 'system', 1699348237468, 'table.cdr_event_sgwc', '漫游数据话单', 'Roaming Data CDR'); +-- INSERT INTO `sys_i18n` VALUES (298, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.backupEtcFromNERemark', '自动备份网元etc目录下的配置文件', 'Automatically backs up the configuration files in the NE etc directory.'); INSERT INTO `sys_i18n` VALUES (299, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.export.jobID', '任务编号', 'ID'); INSERT INTO `sys_i18n` VALUES (300, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.export.jobName', '任务名称', 'Name'); INSERT INTO `sys_i18n` VALUES (301, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.export.jobGroupName', '任务组名', 'Group'); @@ -514,12 +514,12 @@ INSERT INTO `sys_i18n` VALUES (491, '0', 'system', 1699348237468, 'system', 1699 INSERT INTO `sys_i18n` VALUES (492, '0', 'system', 1699348237468, 'system', 1699348237468, 'dictType.index_status.normal', '正常', 'Normal'); INSERT INTO `sys_i18n` VALUES (493, '0', 'system', 1699348237468, 'system', 1699348237468, 'dictType.index_status.abnormal', '异常', 'Abnormal'); INSERT INTO `sys_i18n` VALUES (494, '0', 'system', 1699348237468, 'system', 1699348237468, 'menu.log.neFile', '网元日志文件', 'NE Log File'); -INSERT INTO `sys_i18n` VALUES (495, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.deleteExpiredNeStateRecord', '删除过期网元状态记录', 'Delete Expired NE State Record'); -INSERT INTO `sys_i18n` VALUES (496, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.deleteExpiredNeStateRecordRemark', '定期删除过期的网元状态记录, 默认保留{duration}天', 'Delete expired NE state records regularly and keep them for {duration} days by default.'); -INSERT INTO `sys_i18n` VALUES (497, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.getStateFromNE', '获取网元状态信息', 'Get state from NEs'); -INSERT INTO `sys_i18n` VALUES (498, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.getStateFromNERemark', '获取所有网元状态信息', 'Get state information from all NEs'); -INSERT INTO `sys_i18n` VALUES (499, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.ne_alarm_state_check', '网元告警-状态检查', 'Network Element Health Check'); -INSERT INTO `sys_i18n` VALUES (500, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.ne_alarm_state_check_remark', '网元告警-状态检查,异常时产生告警', 'Health status inspection of network elements, generating alarms in case of abnormalities.'); +-- INSERT INTO `sys_i18n` VALUES (495, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.deleteExpiredNeStateRecord', '删除过期网元状态记录', 'Delete Expired NE State Record'); +-- INSERT INTO `sys_i18n` VALUES (496, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.deleteExpiredNeStateRecordRemark', '定期删除过期的网元状态记录, 默认保留{duration}天', 'Delete expired NE state records regularly and keep them for {duration} days by default.'); +-- INSERT INTO `sys_i18n` VALUES (497, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.getStateFromNE', '获取网元状态信息', 'Get state from NEs'); +-- INSERT INTO `sys_i18n` VALUES (498, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.getStateFromNERemark', '获取所有网元状态信息', 'Get state information from all NEs'); +INSERT INTO `sys_i18n` VALUES (499, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.ne_alarm_state_check', '网元告警-状态检查', 'NE Health Check'); +INSERT INTO `sys_i18n` VALUES (500, '0', 'system', 1699348237468, 'system', 1699348237468, 'job.ne_alarm_state_check_remark', '检查网元的健康状况,在出现异常时发出警报。\r\n\r\nAlarm type:\r\nCommunicationAlarm=1\r\nEquipmentAlarm=2\r\nProcessingFailure=3\r\nEnvironmentalAlarm=4\r\nQualityOfServiceAlarm=5\r\n\r\nSeverity:\r\nCritical=1\r\nMajor=2\r\nMinor=3\r\nWarning=4', 'Checks the health of network elements and sends alerts in case of anomalies.\n\nAlarm type:\nCommunicationAlarm=1\nEquipmentAlarm=2\nProcessingFailure=3\nEnvironmentalAlarm=4\nQualityOfServiceAlarm=5\n\nSeverity:\nCritical=1\nMajor=2\nMinor=3\nWarning=4'); INSERT INTO `sys_i18n` VALUES (501, '0', 'system', 1699348237468, 'system', 1699348237468, 'menu.neUser.nssf', 'NSSF在线订阅数', 'NSSF Subscription Info'); INSERT INTO `sys_i18n` VALUES (502, '0', 'system', 1699348237468, 'system', 1699348237468, 'menu.neUser.nssfAmf', 'NSSF可用的注册AMF', 'NSSF Available AMFs'); INSERT INTO `sys_i18n` VALUES (503, '0', 'system', 1699348237468, 'system', 1699348237468, 'menu.monitor.topology', '拓扑信息', 'Topology Info'); @@ -663,7 +663,7 @@ INSERT INTO `sys_i18n` VALUES (640, '0', 'system', 1699348237468, 'system', 1699 INSERT INTO `sys_i18n` VALUES (641, '0', 'system', 1699348237468, 'system', 1699348237468, 'table.cdr_event_ims', '语音话单', 'Voice CDR'); INSERT INTO `sys_i18n` VALUES (642, '0', 'system', 1699348237468, 'system', 1699348237468, 'table.cdr_event_smf', '数据话单', 'Data CDR'); INSERT INTO `sys_i18n` VALUES (643, '0', 'system', 1699348237468, 'system', 1699348237468, 'table.cdr_event_smsc', '短信话单', 'SMS CDR'); -INSERT INTO `sys_i18n` VALUES (644, '0', 'system', 1699348237468, 'system', 1699348237468, 'menu.log.exportFile', '导出文件管理', 'Exported File Management'); +INSERT INTO `sys_i18n` VALUES (644, '0', 'system', 1699348237468, 'system', 1699348237468, 'menu.log.exportFile', '导出文件', 'Exported File'); INSERT INTO `sys_i18n` VALUES (645, '0', 'system', 1699348237468, 'system', 1699348237468, 'menu.perf.kpiCReport', '自定义指标数据', 'Custom Indicator Data'); INSERT INTO `sys_i18n` VALUES (646, '0', 'system', 1699348237468, 'system', 1699348237468, 'menu.trace.taskHLR', 'HLR 跟踪任务', 'HLR Trace Task'); INSERT INTO `sys_i18n` VALUES (647, '0', 'system', 1699348237468, 'system', 1699348237468, 'dictType.cdr_cause_code', 'CDR 响应原因代码类别类型', 'CDR Response Reason Code Category Type'); diff --git a/database/install/sys_job.sql b/database/install/sys_job.sql index 80796017..3a40ffc9 100644 --- a/database/install/sys_job.sql +++ b/database/install/sys_job.sql @@ -31,19 +31,17 @@ CREATE TABLE `sys_job` ( INSERT INTO `sys_job` VALUES (1, 'job.monitor_sys_resource', 'SYSTEM', 'monitor_sys_resource', '{\"interval\":5}', '0 0/5 * * * ?', '3', '0', '1', '0', 'system', 1698478134839, 'system', 1698478134839, 'job.monitor_sys_resource_remark'); INSERT INTO `sys_job` VALUES (2, 'job.ne_config_backup', 'SYSTEM', 'ne_config_backup', '', '0 30 0 * * ?', '3', '0', '1', '1', 'system', 1698478134839, 'system', 1698478134839, 'job.ne_config_backup_remark'); INSERT INTO `sys_job` VALUES (3, 'job.ne_data_udm', 'SYSTEM', 'ne_data_udm', '', '0 0 0/12 * * ?', '3', '0', '1', '1', 'system', 1698478134839, 'system', 1698478134839, ''); -INSERT INTO `sys_job` VALUES (4, 'job.ne_alarm_state_check', 'SYSTEM', 'ne_alarm_state_check', '{\"alarmTitle\":\"NE State Check Alarm\",\"alarmType\":\"2\",\"origSeverity\":\"2\",\"specificProblem\":\"alarm cause: the system state of target NE has not been received\",\"specificProblemId\":\"AC10000\",\"addInfo\":\"\"}', '0/5 * * * * ?', '3', '0', '1', '0', 'system', 1698478134839, 'system', 1698478134839, 'job.ne_alarm_state_check_remark'); -INSERT INTO `sys_job` VALUES (5, 'job.deleteExpiredAlarmRecord', 'SYSTEM', 'deleteExpiredRecord', '{\"duration\":90,\"tableName\":\"alarm\",\"colName\":\"event_time\",\"extras\":\"alarm_status=\'0\'\"}', '0 10 0 * * ?', '3', '0', '1', '1', 'system', 1698478134839, 'system', 1698478134839, 'job.deleteExpiredAlarmRecordRemark'); -INSERT INTO `sys_job` VALUES (6, 'job.deleteExpiredKpiRecord', 'SYSTEM', 'deleteExpiredRecord', '{\"duration\":30,\"tableName\":\"gold_kpi\",\"colName\":\"date\"}', '0 15 0 * * ?', '3', '0', '1', '1', 'system', 1698478134839, 'system', 1698478134839, 'job.deleteExpiredKpiRecordRemark'); -INSERT INTO `sys_job` VALUES (7, 'job.backupEtcFromNE', 'SYSTEM', 'backupEtcFromNE', '{}', '0 30 0 * * ?', '3', '0', '0', '1', 'system', 1698478134839, 'system', 1698478134839, 'job.backupEtcFromNERemark'); -INSERT INTO `sys_job` VALUES (8, 'job.deleteExpiredNeStateRecord', 'SYSTEM', 'deleteExpiredRecord', '{\"duration\":1,\"tableName\":\"ne_state\",\"colName\":\"timestamp\"}', '0 25 0 * * ?', '3', '0', '1', '1', 'system', 1698478134839, 'system', 1698478134839, 'job.deleteExpiredNeStateRecordRemark'); -INSERT INTO `sys_job` VALUES (9, 'job.getStateFromNE', 'SYSTEM', 'getStateFromNE', '', '0/10 * * * * ?', '3', '0', '0', '0', 'system', 1698478134839, 'system', 1698478134839, 'job.getStateFromNERemark'); -INSERT INTO `sys_job` VALUES (10, 'job.delExpiredNeBackup', 'SYSTEM', 'delExpiredNeBackup', '{\"duration\":60}', '0 20 0 * * ?', '3', '0', '0', '1', 'system', 1698478134839, 'system', 1698478134839, 'job.delExpiredNeBackupRemark'); -INSERT INTO `sys_job` VALUES (11, 'job.exportOperateLog', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"sys_log_operate\",\"timeCol\":\"oper_time\",\"timeUnit\":\"milli\",\"columns\":\"oper_id,title,business_type,method,request_method,operator_type,oper_name,dept_name,oper_url,oper_ip,oper_location,oper_param,oper_msg,status,oper_time,cost_time\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/operate_log\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'system', 1698478134839, 'system', 1698478134839, 'job.exportOperateLog'); -INSERT INTO `sys_job` VALUES (12, 'job.exportIMSCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_ims\",\"columns\":\"id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) as record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callType\')) as call_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callerParty\')) as caller_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.calledParty\')) as called_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callDuration\')) as call_duration,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.serviceResult\')) as service_result,DATE_FORMAT(FROM_UNIXTIME(timestamp), \'%Y-%m-%d %H:%i:%s\') AS timestamp\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/ims_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'system', 1698478134839, 'system', 1698478134839, ''); -INSERT INTO `sys_job` VALUES (13, 'job.exportSMFCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_smf\",\"columns\":\"id,ne_type,ne_name,rm_uid,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) AS record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.chargingID\')) AS charging_id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.subscriberIdentifier.subscriptionIDType\')) AS subscriber_id_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.subscriberIdentifier.subscriptionIDData\')) AS subscriber_id_data,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.duration\')) AS duration,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.invocationTimestamp\')) as invocationTimestamp,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeUplink\')) AS data_volume_uplink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeDownlink\')) AS data_volume_downlink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataTotalVolume\')) AS data_total_volume,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.pDUSessionChargingInformation.pDUAddress.pDUIPv4Address\')) AS pdu_ipv4_address,timestamp\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/smf_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'system', 1698478134839, 'system', 1698478134839, ''); -INSERT INTO `sys_job` VALUES (14, 'job.exportSMSCCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_smsc\",\"columns\":\"id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) as record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.serviceType\')) as service_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callerParty\')) as caller_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.calledParty\')) as called_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.result\')) as result,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.updateTime\')) as update_time\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/smsc_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'system', 1698478134839, 'system', 1698478134839, ''); -INSERT INTO `sys_job` VALUES (15, 'job.removeExportedFiles', 'SYSTEM', 'removeFile', '[{\"filePath\":\"/usr/local/omc/backup/operate_log\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/ims_cdr\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/smf_cdr\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/smsc_cdr\",\"maxDays\":30}]', '0 10 0 * * ?', '3', '0', '1', '1', 'system', 1698478134839, 'system', 1698478134839, ''); -INSERT INTO `sys_job` VALUES (16, 'job.exportSGWCCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_sgwc\",\"columns\":\"id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) as recordType,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.accessPointNameNI\')) as accessPointNameNI,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.servedIMSI\')) as IMSI,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.servedMSISDN\')) as MSISDN,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.servedPDPPDNAddress\')) as PdpAddress,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.duration\')) as duration,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordOpeningTime\')) as recordOpeningTime,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.chargingID\')) as chargingID,JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.listOfTrafficVolumes[0].dataVolumeGPRSDownlink\')) AS dataVolumeGPRSDownlink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json, \'$.listOfTrafficVolumes[0].dataVolumeGPRsUplink\')) as dataVolumeGPRsUplink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.userLocationInformation.tai.tac\')) as tac,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.userLocationInformation.ecgi.eutraCellId\')) as cellID\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/sgwc_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'system', 1698478134839, 'system', 1698478134839, ''); +INSERT INTO `sys_job` VALUES (4, 'job.ne_alarm_state_check', 'SYSTEM', 'ne_alarm_state_check', '{\"alarmTitle\":\"NE State Check Alarm\",\"alarmType\":\"2\",\"origSeverity\":\"2\",\"specificProblem\":\"alarm cause: the system state of target NE has not been received\",\"specificProblemId\":\"AC10000\",\"addInfo\":\"\"}', '0/10 * * * * ?', '3', '0', '1', '0', 'system', 1698478134839, 'system', 1698478134839, 'job.ne_alarm_state_check_remark'); +INSERT INTO `sys_job` VALUES (5, 'job.delete_alarm_record', 'SYSTEM', 'delete_alarm_record', '{\"storeDays\":7}', '0 10 0 * * ?', '3', '0', '1', '1', 'system', 1698478134839, 'system', 1698478134839, 'job.delete_alarm_record_remark'); +INSERT INTO `sys_job` VALUES (6, 'job.delete_kpi_record', 'SYSTEM', 'delete_kpi_record', '{\"storeDays\":7,\"neList\":[\"IMS\",\"AMF\",\"UDM\",\"UPF\",\"MME\",\"SMSC\",\"SMF\",\"MME\"]}', '0 20 0 * * ?', '3', '0', '1', '1', 'system', 1698478134839, 'system', 1698478134839, 'job.delete_kpi_record_remark'); +INSERT INTO `sys_job` VALUES (7, 'job.delete_ne_config_backup', 'SYSTEM', 'delete_ne_config_backup', '{\"storeDays\":7}', '0 20 0 * * ?', '3', '0', '1', '1', 'system', 1698478134839, 'system', 1698478134839, 'job.delete_ne_config_backup_remark'); + +INSERT INTO `sys_job` VALUES (21, 'job.exportOperateLog', 'SYSTEM', 'exportTable', '{\"hour\":1,\"columns\":[\"id\",\"title\",\"business_type\",\"opera_by\",\"opera_url_method\",\"opera_url\",\"opera_ip\",\"status\",\"opera_time\",\"cost_time\"],\"tableName\":\"sys_log_operate\",\"filePath\":\"/usr/local/omc/backup/operate_log\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'system', 1698478134839, 'system', 1698478134839, ''); +INSERT INTO `sys_job` VALUES (22, 'job.exportIMSCDR', 'SYSTEM', 'exportTable', '{\"hour\":1,\"columns\":[\"id\",\"record_type\",\"call_type\",\"caller_party\",\"called_party\",\"call_duration\",\"cause\",\"seizure_time\",\"release_time\"],\"tableName\":\"cdr_event_ims\",\"filePath\":\"/usr/local/omc/backup/ims_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'system', 1698478134839, 'system', 1698478134839, ''); +INSERT INTO `sys_job` VALUES (23, 'job.exportSMFCDR', 'SYSTEM', 'exportTable', '{\"hour\":1,\"columns\":[\"id\",\"charging_id\",\"subscription_id_data\",\"subscription_id_type\",\"data_volume_uplink\",\"data_volume_downlink\",\"data_total_volume\",\"invocation_timestamp\",\"user_identifier\",\"ssc_mode\",\"dnn_id\",\"pdu_type\",\"rat_type\",\"pdu_ipv4\",\"pdu_ipv6\",\"network_function_ipv4_address\",\"record_nfId\",\"record_type\",\"record_opening_time\"],\"tableName\":\"cdr_event_smf\",\"filePath\":\"/usr/local/omc/backup/smf_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'system', 1698478134839, 'system', 1698478134839, ''); +INSERT INTO `sys_job` VALUES (24, 'job.exportSMSCCDR', 'SYSTEM', 'exportTable', '{\"hour\":1,\"columns\":[\"id\",\"record_type\",\"service_type\",\"caller_party\",\"called_party\",\"result\",\"update_time\"],\"tableName\":\"cdr_event_smsc\",\"filePath\":\"/usr/local/omc/backup/smsc_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'system', 1698478134839, 'system', 1698478134839, ''); +INSERT INTO `sys_job` VALUES (25, 'job.exportSGWCCDR', 'SYSTEM', 'exportTable', '{\"hour\":1,\"columns\":[\"charging_id\",\"served_imsi\",\"served_msisdn\",\"data_volume_gprs_uplink\",\"data_volume_gprs_downlink\",\"duration\",\"invocation_timestamp\",\"pgw_address_used\",\"sgw_address\",\"rat_type\",\"pdp_pdn_type\",\"served_pdppdn_address\",\"serving_node_address\",\"serving_node_type\",\"access_point_name_ni\",\"cause_for_rec_closing\",\"record_sequence_number\",\"local_record_sequence_number\",\"record_type\",\"record_opening_time\"],\"tableName\":\"cdr_event_sgwc\",\"filePath\":\"/usr/local/omc/backup/sgwc_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'system', 1698478134839, 'system', 1698478134839, ''); +INSERT INTO `sys_job` VALUES (26, 'job.removeExportedFiles', 'SYSTEM', 'removeFile', '[{\"filePath\":\"/usr/local/omc/backup/operate_log\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/ims_cdr\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/smf_cdr\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/smsc_cdr\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/sgwc_cdr\",\"maxDays\":30}]', '0 10 0 * * ?', '3', '0', '1', '1', 'system', 1698478134839, 'system', 1698478134839, ''); SET FOREIGN_KEY_CHECKS = 1; diff --git a/src/modules/crontask/processor/backupEtcFromNE/backupEtcFromNE.go b/src/modules/crontask/processor/backupEtcFromNE/backupEtcFromNE.go deleted file mode 100644 index 5cb1ca0c..00000000 --- a/src/modules/crontask/processor/backupEtcFromNE/backupEtcFromNE.go +++ /dev/null @@ -1,174 +0,0 @@ -package backupEtcFromNE - -import ( - "fmt" - "os" - "strings" - "time" - - "be.ems/lib/config" - "be.ems/lib/dborm" - "be.ems/lib/global" - "be.ems/lib/log" - "be.ems/src/framework/cron" -) - -var NewProcessor = &BarProcessor{ - progress: 0, - count: 0, -} - -// bar 队列任务处理 -type BarProcessor struct { - // 任务进度 - progress int - // 执行次数 - count int -} - -type BarParams struct { - Duration int `json:"duration"` - TableName string `json:"tableName"` - ColName string `json:"colName"` // column name of time string - Extras string `json:"extras"` // extras condition for where -} - -func (s *BarProcessor) Execute(data any) (any, error) { - log.Infof("execute %d,last progress: %d ", s.count, s.progress) - s.count++ - - options := data.(cron.JobData) - sysJob := options.SysJob - // var params BarParams - - // err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) - // if err != nil { - // return nil, err - // } - - log.Infof("Repeat %v Job ID %d", options.Repeat, sysJob.JobId) - - var nes []dborm.NeInfo - _, err := dborm.XormGetAllNeInfo(&nes) - if err != nil { - return nil, err - } - - var successfulNEs, failureNEs []string - for _, neInfo := range nes { - neTypeUpper := strings.ToUpper(neInfo.NeType) - neTypeLower := strings.ToLower(neInfo.NeType) - nePath := fmt.Sprintf("%s/etc/%s", config.GetYamlConfig().OMC.Backup, neTypeLower) - isExist, err := global.PathExists(nePath) - if err != nil { - log.Errorf("Failed to PathExists:", err) - failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId) - continue - } - if isExist { - err = os.RemoveAll(nePath) - if err != nil { - log.Errorf("Failed to RemoveAll:", err) - failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId) - continue - } - } - err = os.MkdirAll(nePath, os.ModePerm) - if err != nil { - log.Errorf("Failed to MkdirAll:", err) - failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId) - continue - } - - var scpCmd string - ipType := global.ParseIPAddr(neInfo.Ip) - omcNetypeLower := strings.ToLower(config.GetYamlConfig().OMC.NeType) - etcListIMS := "{*.yaml,mmtel,vars.cfg}" - if config.GetYamlConfig().NE.EtcListIMS != "" { - etcListIMS = config.GetYamlConfig().NE.EtcListIMS - } - switch neTypeLower { - case omcNetypeLower: - if ipType == global.IsIPv4 { - scpCmd = fmt.Sprintf("scp -r %s@%s:%s/etc/*.yaml %s/etc/%s", config.GetYamlConfig().NE.User, - neInfo.Ip, config.GetYamlConfig().NE.OmcDir, config.GetYamlConfig().OMC.Backup, neTypeLower) - } else { - scpCmd = fmt.Sprintf("scp -r %s@[%s]:%s/etc/*.yaml %s/etc/%s", config.GetYamlConfig().NE.User, - neInfo.Ip, config.GetYamlConfig().NE.OmcDir, config.GetYamlConfig().OMC.Backup, neTypeLower) - } - - case "ims": - if ipType == global.IsIPv4 { - scpCmd = fmt.Sprintf("scp -r %s@%s:%s/%s/%s %s/etc/%s", config.GetYamlConfig().NE.User, - neInfo.Ip, config.GetYamlConfig().NE.EtcDir, neTypeLower, - etcListIMS, config.GetYamlConfig().OMC.Backup, neTypeLower) - } else { - scpCmd = fmt.Sprintf("scp -r %s@[%s]:%s/%s/%s %s/etc/%s", config.GetYamlConfig().NE.User, - neInfo.Ip, config.GetYamlConfig().NE.EtcDir, neTypeLower, - etcListIMS, config.GetYamlConfig().OMC.Backup, neTypeLower) - } - - case "mme": - if ipType == global.IsIPv4 { - scpCmd = fmt.Sprintf("scp -r %s@%s:%s/%s/*.conf %s/etc/%s", config.GetYamlConfig().NE.User, - neInfo.Ip, config.GetYamlConfig().NE.EtcDir, - neTypeLower, config.GetYamlConfig().OMC.Backup, neTypeLower) - } else { - scpCmd = fmt.Sprintf("scp -r %s@[%s]:%s/%s/*.conf %s/etc/%s", config.GetYamlConfig().NE.User, - neInfo.Ip, config.GetYamlConfig().NE.EtcDir, - neTypeLower, config.GetYamlConfig().OMC.Backup, neTypeLower) - } - - default: - if ipType == global.IsIPv4 { - scpCmd = fmt.Sprintf("scp -r %s@%s:%s/%s/*.yaml %s/etc/%s", config.GetYamlConfig().NE.User, - neInfo.Ip, config.GetYamlConfig().NE.EtcDir, - neTypeLower, config.GetYamlConfig().OMC.Backup, neTypeLower) - } else { - scpCmd = fmt.Sprintf("scp -r %s@[%s]:%s/%s/*.yaml %s/etc/%s", config.GetYamlConfig().NE.User, - neInfo.Ip, config.GetYamlConfig().NE.EtcDir, - neTypeLower, config.GetYamlConfig().OMC.Backup, neTypeLower) - } - - } - - zipFile := fmt.Sprintf("%s-%s-etc-%s.zip", neTypeLower, strings.ToLower(neInfo.NeId), time.Now().Format(global.DateData)) - zipFilePath := config.GetYamlConfig().OMC.Backup + "/" + zipFile - zipCmd := fmt.Sprintf("cd %s/etc && zip -r %s %s/*", config.GetYamlConfig().OMC.Backup, zipFilePath, neTypeLower) - - command := fmt.Sprintf("%s&&%s", scpCmd, zipCmd) - - log.Trace("command:", command) - out, err := global.ExecCmd(command) - if err != nil { - log.Error("Faile to exec command:", err) - failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId) - continue - } - log.Trace("command output:", out) - - md5Sum, err := global.GetFileMD5Sum(zipFilePath) - if err != nil { - log.Error("Faile to md5sum:", err) - failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId) - continue - } - //log.Debug("md5Str:", md5Sum) - path := config.GetYamlConfig().OMC.Backup - neBackup := dborm.NeBackup{NeType: neTypeUpper, NeId: neInfo.NeId, FileName: zipFile, Path: path, Md5Sum: md5Sum} - _, err = dborm.XormInsertTableOne("ne_backup", neBackup) - if err != nil { - log.Error("Faile to XormInsertTableOne:", err) - failureNEs = append(failureNEs, neInfo.NeType+"/"+neInfo.NeId) - continue - } - successfulNEs = append(successfulNEs, neInfo.NeType+"/"+neInfo.NeId) - } - - log.Infof("successfulNEs: %s failureNEs: %s", successfulNEs, failureNEs) - // result - return map[string]any{ - "successfulNEs": successfulNEs, - "failureNEs": failureNEs, - }, nil -} diff --git a/src/modules/crontask/processor/delExpiredNeBackup/delExpiredNeBackup.go b/src/modules/crontask/processor/delExpiredNeBackup/delExpiredNeBackup.go deleted file mode 100644 index 86d2ea7a..00000000 --- a/src/modules/crontask/processor/delExpiredNeBackup/delExpiredNeBackup.go +++ /dev/null @@ -1,80 +0,0 @@ -package delExpiredNeBackup - -import ( - "encoding/json" - "fmt" - - "be.ems/lib/dborm" - "be.ems/lib/log" - "be.ems/src/framework/cron" -) - -var NewProcessor = &BarProcessor{ - progress: 0, - count: 0, -} - -// bar 队列任务处理 -type BarProcessor struct { - // 任务进度 - progress int - // 执行次数 - count int -} - -type BarParams struct { - Duration int `json:"duration"` -} - -func (s *BarProcessor) Execute(data any) (any, error) { - log.Infof("执行 %d 次,上次进度: %d ", s.count, s.progress) - s.count++ - - options := data.(cron.JobData) - sysJob := options.SysJob - var params BarParams - duration := 60 - - err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) - if err == nil { - duration = params.Duration - } - log.Infof("重复 %v 任务ID %d", options.Repeat, sysJob.JobId) - - // // 实现任务处理逻辑 - // i := 0 - // s.progress = i - // for i < 5 { - // // 获取任务进度 - // progress := s.progress - // log.Infof("jonId: %s => 任务进度:%d", sysJob.JobID, progress) - // // 延迟响应 - // time.Sleep(time.Second * 2) - // // 程序中途执行错误 - // if i == 3 { - // // arr := [1]int{1} - // // arr[i] = 3 - // // fmt.Println(arr) - // // return "i = 3" - // panic("程序中途执行错误") - // } - // i++ - // // 改变任务进度 - // s.progress = i - // } - where := fmt.Sprintf("NOW()>ADDDATE(`create_time`,interval %d day)", duration) - affected, err := dborm.XormDeleteDataByWhere(where, "ne_backup") - if err != nil { - // panic(fmt.Sprintf("Failed to XormDeleteDataByWhere:%v", err)) - return nil, err - } - - // delete expired files in backup directory - // todo ... - - // 返回结果,用于记录执行结果 - return map[string]any{ - "msg": "sucess", - "affected": affected, - }, nil -} diff --git a/src/modules/crontask/processor/deleteExpiredRecord/deleteExpiredRecord.go b/src/modules/crontask/processor/deleteExpiredRecord/deleteExpiredRecord.go deleted file mode 100644 index 8ffaccb8..00000000 --- a/src/modules/crontask/processor/deleteExpiredRecord/deleteExpiredRecord.go +++ /dev/null @@ -1,98 +0,0 @@ -package deleteExpiredRecord - -import ( - "encoding/json" - "fmt" - - "be.ems/lib/dborm" - "be.ems/lib/log" - "be.ems/src/framework/cron" -) - -var NewProcessor = &BarProcessor{ - progress: 0, - count: 0, -} - -// bar 队列任务处理 -type BarProcessor struct { - // 任务进度 - progress int - // 执行次数 - count int -} - -type BarParams struct { - Duration int `json:"duration"` - TableName string `json:"tableName"` - ColName string `json:"colName"` // column name of time string - Extras string `json:"extras"` // extras condition for where - SessFlag bool `json:"sessFlag"` // session flag, true: session model, false: no session -} - -func (s *BarProcessor) Execute(data any) (any, error) { - log.Infof("执行 %d 次,上次进度: %d ", s.count, s.progress) - s.count++ - - options := data.(cron.JobData) - sysJob := options.SysJob - var params BarParams - - err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) - if err != nil { - return nil, err - } - - //duration = params.Duration - log.Infof("重复 %v 任务ID %d", options.Repeat, sysJob.JobId) - - // // 实现任务处理逻辑 - // i := 0 - // s.progress = i - // for i < 5 { - // // 获取任务进度 - // progress := s.progress - // log.Infof("jonId: %s => 任务进度:%d", sysJob.JobID, progress) - // // 延迟响应 - // time.Sleep(time.Second * 2) - // // 程序中途执行错误 - // if i == 3 { - // // arr := [1]int{1} - // // arr[i] = 3 - // // fmt.Println(arr) - // // return "i = 3" - // panic("程序中途执行错误") - // } - // i++ - // // 改变任务进度 - // s.progress = i - // } - - var where string - if params.Extras == "" { - where = fmt.Sprintf("NOW()>ADDDATE(`%s`,interval %d day)", params.ColName, params.Duration) - } else { - where = fmt.Sprintf("NOW()>ADDDATE(`%s`,interval %d day) and %s", params.ColName, params.Duration, params.Extras) - } - - var affected int64 = 0 - if params.SessFlag { - affected, err = dborm.XormDeleteDataByWhere(where, params.TableName) - if err != nil { - // panic(fmt.Sprintf("Failed to XormDeleteDataByWhere:%v", err)) - return nil, err - } - } else { - affected, err = dborm.XormDeleteDataByWhereNoSession(where, params.TableName) - if err != nil { - // panic(fmt.Sprintf("Failed to XormDeleteDataByWhere:%v", err)) - return nil, err - } - } - - // 返回结果,用于记录执行结果 - return map[string]any{ - "msg": "sucess", - "affected": affected, - }, nil -} diff --git a/src/modules/crontask/processor/delete_alarm_record/delete_alarm_record.go b/src/modules/crontask/processor/delete_alarm_record/delete_alarm_record.go new file mode 100644 index 00000000..806a2f69 --- /dev/null +++ b/src/modules/crontask/processor/delete_alarm_record/delete_alarm_record.go @@ -0,0 +1,81 @@ +package delete_alarm_record + +import ( + "encoding/json" + "fmt" + "time" + + "be.ems/src/framework/cron" + "be.ems/src/framework/database/db" + "be.ems/src/framework/logger" +) + +var NewProcessor = &DeleteAlarmRecordProcessor{ + count: 0, +} + +// DeleteAlarmRecordProcessor 删除告警记录 +type DeleteAlarmRecordProcessor struct { + count int // 执行次数 +} + +func (s *DeleteAlarmRecordProcessor) Execute(data any) (any, error) { + s.count++ // 执行次数加一 + options := data.(cron.JobData) + sysJob := options.SysJob + logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count) + // 返回结果,用于记录执行结果 + result := map[string]any{ + "count": s.count, + } + + // 读取参数值 + var params struct { + StoreDays int `json:"storeDays"` // store days + } + err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) + if err != nil { + return nil, fmt.Errorf("json params err: %v", err) + } + if params.StoreDays < 0 { + return nil, fmt.Errorf("params storeDays less than 0 ") + } + + // 计算删除时间 + ltTime := time.Now().AddDate(0, 0, -params.StoreDays).UnixMilli() + + // 告警表 + alarmTx := db.DB("").Table("alarm").Where("timestamp < ?", ltTime) + if err := alarmTx.Delete(nil).Error; err != nil { + result["alarm"] = err.Error() + } else { + result["alarm"] = alarmTx.RowsAffected + } + + // 告警事件表 + alarmEventTx := db.DB("").Table("alarm_event").Where("timestamp < ?", ltTime) + if err := alarmEventTx.Delete(nil).Error; err != nil { + result["alarm_event"] = err.Error() + } else { + result["alarm_event"] = alarmEventTx.RowsAffected + } + + // 告警日志表 + alarmLogTx := db.DB("").Table("alarm_log").Where("created_at < ?", ltTime) + if err := alarmLogTx.Delete(nil).Error; err != nil { + result["alarm_log"] = err.Error() + } else { + result["alarm_log"] = alarmLogTx.RowsAffected + } + + // 告警转发日志表 + alarmForwardLogTx := db.DB("").Table("alarm_forward_log").Where("created_at < ?", ltTime) + if err := alarmForwardLogTx.Delete(nil).Error; err != nil { + result["alarm_forward_log"] = err.Error() + } else { + result["alarm_forward_log"] = alarmForwardLogTx.RowsAffected + } + + // 返回结果,用于记录执行结果 + return result, nil +} diff --git a/src/modules/crontask/processor/delete_data_record/delete_data_record.go b/src/modules/crontask/processor/delete_data_record/delete_data_record.go new file mode 100644 index 00000000..0504b721 --- /dev/null +++ b/src/modules/crontask/processor/delete_data_record/delete_data_record.go @@ -0,0 +1,71 @@ +package delete_data_record + +import ( + "encoding/json" + "fmt" + "time" + + "be.ems/src/framework/cron" + "be.ems/src/framework/database/db" + "be.ems/src/framework/logger" +) + +var NewProcessor = &DeleteDataRecordProcessor{ + count: 0, +} + +// bar 队列任务处理 +type DeleteDataRecordProcessor struct { + count int // 执行次数 +} + +type optionParams struct { + TableName string `json:"tableName"` // table name + ColName string `json:"colName"` // column name + StoreDays int `json:"storeDays"` // store days + WhereSQL string `json:"whereSQL"` // extras condition for where +} + +func (s *DeleteDataRecordProcessor) Execute(data any) (any, error) { + s.count++ // 执行次数加一 + options := data.(cron.JobData) + sysJob := options.SysJob + logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count) + // 返回结果,用于记录执行结果 + result := map[string]any{ + "count": s.count, + } + + // 读取参数值 + var params optionParams + err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) + if err != nil { + return nil, fmt.Errorf("json params err: %v", err) + } + if params.TableName == "" { + return nil, fmt.Errorf("params tableName is empty ") + } + if params.StoreDays < 0 { + return nil, fmt.Errorf("params storeDays less than 0 ") + } + // 指定表名 + tx := db.DB("").Table(params.TableName) + + if params.StoreDays >= 0 && params.ColName != "" { + // 计算删除时间 + ltTime := time.Now().AddDate(0, 0, -params.StoreDays).UnixMilli() + tx = tx.Where(fmt.Sprintf("%s < ?", params.ColName), ltTime) + } + if params.WhereSQL != "" { + tx = tx.Where(params.WhereSQL) + } + + // 执行删除 + if err := tx.Delete(nil).Error; err != nil { + return nil, err + } + result["affected"] = tx.RowsAffected + + // 返回结果,用于记录执行结果 + return result, nil +} diff --git a/src/modules/crontask/processor/delete_kpi_record/delete_kpi_record.go b/src/modules/crontask/processor/delete_kpi_record/delete_kpi_record.go new file mode 100644 index 00000000..9199f28d --- /dev/null +++ b/src/modules/crontask/processor/delete_kpi_record/delete_kpi_record.go @@ -0,0 +1,77 @@ +package delete_kpi_record + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + "be.ems/src/framework/cron" + "be.ems/src/framework/database/db" + "be.ems/src/framework/logger" +) + +var NewProcessor = &DeleteKPIRecordProcessor{ + count: 0, +} + +// DeleteKPIRecordProcessor 删除KPI记录 +type DeleteKPIRecordProcessor struct { + count int // 执行次数 +} + +func (s *DeleteKPIRecordProcessor) Execute(data any) (any, error) { + s.count++ // 执行次数加一 + options := data.(cron.JobData) + sysJob := options.SysJob + logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count) + // 返回结果,用于记录执行结果 + result := map[string]any{ + "count": s.count, + } + + // 读取参数值 + var params struct { + StoreDays int `json:"storeDays"` // store days + NeList []string `json:"neList"` // ne list + } + err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) + if err != nil { + return nil, fmt.Errorf("json params err: %v", err) + } + if params.StoreDays < 0 { + return nil, fmt.Errorf("params storeDays less than 0 ") + } + if len(params.NeList) <= 0 { + return nil, fmt.Errorf("params neList less than 0 ") + } + + // 计算删除时间 + ltTime := time.Now().AddDate(0, 0, -params.StoreDays).UnixMilli() + + for _, neType := range params.NeList { + neTypeLower := strings.ToLower(neType) + + // KPI数据表 + kpiTableName := fmt.Sprintf("kpi_report_%s", neTypeLower) + kpiTx := db.DB("").Table(kpiTableName).Where("created_at < ?", ltTime) + if err := kpiTx.Delete(nil).Error; err != nil { + result[kpiTableName] = err.Error() + } else { + result[kpiTableName] = kpiTx.RowsAffected + } + + // KPI自定义数据表 + kpicTableName := fmt.Sprintf("kpi_c_report_%s", neTypeLower) + kpicTx := db.DB("").Table(kpicTableName).Where("created_at < ?", ltTime) + if err := kpicTx.Delete(nil).Error; err != nil { + result[kpiTableName] = err.Error() + } else { + result[kpiTableName] = kpicTx.RowsAffected + } + + } + + // 返回结果,用于记录执行结果 + return result, nil +} diff --git a/src/modules/crontask/processor/delete_ne_config_backup/delete_ne_config_backup.go b/src/modules/crontask/processor/delete_ne_config_backup/delete_ne_config_backup.go new file mode 100644 index 00000000..d98e13cb --- /dev/null +++ b/src/modules/crontask/processor/delete_ne_config_backup/delete_ne_config_backup.go @@ -0,0 +1,128 @@ +package delete_ne_config_backup + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "be.ems/src/framework/cron" + "be.ems/src/framework/database/db" + "be.ems/src/framework/logger" + "be.ems/src/framework/utils/date" + neModel "be.ems/src/modules/network_element/model" + neService "be.ems/src/modules/network_element/service" +) + +var NewProcessor = &DeleteNeConfigBackupProcessor{ + neInfoService: neService.NewNeInfo, + count: 0, +} + +// DeleteNeConfigBackupProcessor 网元配置文件定期备份 +type DeleteNeConfigBackupProcessor struct { + neInfoService *neService.NeInfo // 网元信息服务 + count int // 执行次数 +} + +func (s *DeleteNeConfigBackupProcessor) Execute(data any) (any, error) { + s.count++ // 执行次数加一 + options := data.(cron.JobData) + sysJob := options.SysJob + logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count) + // 返回结果,用于记录执行结果 + result := map[string]any{ + "count": s.count, + } + + // 读取参数值 + var params struct { + StoreDays int `json:"storeDays"` // store days + } + err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) + if err != nil { + return nil, fmt.Errorf("json params err: %v", err) + } + if params.StoreDays < 0 { + return nil, fmt.Errorf("params storeDays less than 0 ") + } + + neList := s.neInfoService.Find(neModel.NeInfo{}, false, false) + for _, neInfo := range neList { + neTypeAndId := fmt.Sprintf("%s_%s", neInfo.NeType, neInfo.NeId) + tx := db.DB("").Model(&neModel.NeConfigBackup{}) + tx = tx.Where("ne_type = ? and ne_id = ?", neInfo.NeType, neInfo.NeId) + + // 查询最后记录数据 + var lastCreateTime int64 = 0 + lastTx := tx.Select("create_time").Order("create_time DESC").Limit(1) + if err := lastTx.Find(&lastCreateTime).Error; err != nil { + result[neTypeAndId] = err.Error() + continue + } + + if lastCreateTime <= 1e12 { + result[neTypeAndId] = "no data" + continue + } + + // 计算删除时间 + lastTime := time.UnixMilli(lastCreateTime) + ltTime := lastTime.AddDate(0, 0, -params.StoreDays) + + // 删除小于最后时间的数据 + delTx := tx.Delete("create_time < ?", ltTime.UnixMilli()) + if err := delTx.Error; err != nil { + result[neTypeAndId] = err.Error() + continue + } + result[neTypeAndId] = tx.RowsAffected + + // 删除本地文件 + s.deleteFile(neInfo.NeType, neInfo.NeId, ltTime) + } + + return result, nil +} + +// deleteFile 删除本地文件 +func (s DeleteNeConfigBackupProcessor) deleteFile(neType, neId string, oldFileDate time.Time) { + neTypeLower := strings.ToLower(neType) + localPath := fmt.Sprintf("/usr/local/etc/omc/ne_config/%s/%s/backup ", neTypeLower, neId) + files, err := os.ReadDir(localPath) + if err != nil { + logger.Errorf("logger Remove ne_config File ReadDir err: %v", err.Error()) + return + } + for _, file := range files { + // 跳过非指定文件名 + // zipFileName := fmt.Sprintf("%s-%s-etc-%s.zip", neTypeLower, neInfo.NeId, date.ParseDateToStr(time.Now(), date.YYYYMMDDHHMMSS)) + fileName := fmt.Sprintf("%s-%s-etc-", neTypeLower, neId) + if !strings.HasPrefix(file.Name(), fileName) { + continue + } + idx := strings.LastIndex(file.Name(), "-") + if idx == -1 { + continue + } + dateStr := file.Name()[idx+1 : idx+15] + + // 解析日期字符串 + fileDate, err := time.Parse(date.YYYYMMDDHHMMSS, dateStr) + if err != nil { + logger.Errorf("logger Remove ne_config name Parse err: %v", err.Error()) + continue + } + + // 判断文件日期是否在给定日期之前 + if fileDate.Before(oldFileDate) { + err := os.Remove(filepath.Join(localPath, file.Name())) + if err != nil { + logger.Errorf("logger Remove ne_config file err: %v", err.Error()) + continue + } + } + } +} diff --git a/src/modules/crontask/processor/exportTable/exportTable.go b/src/modules/crontask/processor/exportTable/exportTable.go index 0085e3bc..51a3a7d8 100644 --- a/src/modules/crontask/processor/exportTable/exportTable.go +++ b/src/modules/crontask/processor/exportTable/exportTable.go @@ -1,175 +1,923 @@ package exportTable import ( - "database/sql" - "encoding/csv" "encoding/json" "fmt" - "os" "path" "path/filepath" + "strings" "time" - "be.ems/lib/dborm" - "be.ems/lib/log" "be.ems/src/framework/config" "be.ems/src/framework/cron" + "be.ems/src/framework/database/db" + "be.ems/src/framework/i18n" "be.ems/src/framework/logger" "be.ems/src/framework/ssh" "be.ems/src/framework/utils/crypto" + "be.ems/src/framework/utils/date" + "be.ems/src/framework/utils/file" + "be.ems/src/framework/utils/parse" + neDataModel "be.ems/src/modules/network_data/model" + systemModel "be.ems/src/modules/system/model" systemService "be.ems/src/modules/system/service" ) var NewProcessor = &BarProcessor{ - progress: 0, - count: 0, + count: 0, } // bar 队列任务处理 type BarProcessor struct { - // 任务进度 - progress int - // 执行次数 - count int -} - -type BarParams struct { - Duration int `json:"duration"` - TableName string `json:"tableName"` - Columns string `json:"columns"` // exported column name of time string - TimeCol string `json:"timeCol"` // time stamp of column name - TimeUnit string `json:"timeUnit"` // timestamp unit: second/micro/milli - Extras string `json:"extras"` // extras condition for where - FilePath string `json:"filePath"` // file path + count int // 执行次数 } func (s *BarProcessor) Execute(data any) (any, error) { - s.count++ - + s.count++ // 执行次数加一 options := data.(cron.JobData) sysJob := options.SysJob - var params BarParams + logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count) + // 返回结果,用于记录执行结果 + result := map[string]any{ + "count": s.count, + } + var params struct { + Hour int `json:"hour"` // hour + TableName string `json:"tableName"` + Columns []string `json:"columns"` + FilePath string `json:"filePath"` // file path + } err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) if err != nil { return nil, err } - // mkdir if not exist - if _, err = os.Stat(params.FilePath); os.IsNotExist(err) { - err = os.MkdirAll(params.FilePath, os.ModePerm) - if err != nil { - log.Error("Failed to Mkdir:", err) - return nil, err - } + var affected int64 + var errMsg error + filePath := fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, strings.ToLower(params.TableName), time.Now().Format("20060102150405")) + switch params.TableName { + case "sys_log_operate": + affected, errMsg = s.exportSysLogOperate(params.Hour, params.Columns, filePath) + case "cdr_event_smsc": + affected, errMsg = s.exportSMSC(params.Hour, params.Columns, filePath) + case "cdr_event_ims": + affected, errMsg = s.exportIMS(params.Hour, params.Columns, filePath) + case "cdr_event_smf": + affected, errMsg = s.exportSMF(params.Hour, params.Columns, filePath) + case "cdr_event_sgwc": + affected, errMsg = s.exportSGWC(params.Hour, params.Columns, filePath) } - //duration = params.Duration - - now := time.Now() - end := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()) - start := end.Add(-time.Duration(params.Duration) * time.Hour) - - var startTime, endTime int64 - switch params.TimeUnit { - case "second": - // 格式化时间戳为秒级 - startTime = start.Unix() - endTime = end.Unix() - case "milli": - // 格式化时间戳为毫秒级 - startTime = start.UnixMilli() - endTime = end.UnixMilli() - case "micro": - // 格式化时间戳为微妙级 - startTime = start.UnixMicro() - endTime = end.UnixMicro() - default: - return nil, fmt.Errorf("error input parameter") - } - var query string - if params.Extras != "" { - query = fmt.Sprintf("SELECT %s FROM `%s` WHERE `%s` >= %d AND `%s` < %d AND %s", - params.Columns, params.TableName, params.TimeCol, startTime, params.TimeCol, endTime, params.Extras) - } else { - query = fmt.Sprintf("SELECT %s FROM `%s` WHERE `%s` >= %d AND `%s` < %d", - params.Columns, params.TableName, params.TimeCol, startTime, params.TimeCol, endTime) - } - log.Trace("query:", query) - filePath := fmt.Sprintf("%s/%s_export_%s.csv", params.FilePath, params.TableName, time.Now().Format("20060102150405")) - affected, err := s.exportData(query, filePath) - if err != nil { - return nil, err + if errMsg != nil { + return nil, errMsg } // put ftp - s.putFTP(params.FilePath, filepath.Base(filePath)) + if affected > 0 { + result["affected"] = affected + s.putFTP(filePath) + } // 返回结果,用于记录执行结果 - return map[string]any{ - "msg": "sucess", - "filePath": filePath, - "affected": affected, - }, nil + return result, nil } -func (s *BarProcessor) exportData(query, filePath string) (int64, error) { - rows, err := dborm.XCoreDB().Query(query) - if err != nil { +// exportSysLogOperate 导出csv +func (s BarProcessor) exportSysLogOperate(hour int, columns []string, filePath string) (int64, error) { + // 前 hour 小时 + now := time.Now() + end := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()) + start := end.Add(-time.Duration(hour) * time.Hour) + + // 查询数据 + rows := []systemModel.SysLogOperate{} + tx := db.DB("").Model(&systemModel.SysLogOperate{}) + tx = tx.Where("opera_time >= ? and opera_time <= ?", start.UnixMilli(), end.UnixMilli()) + if err := tx.Find(&rows).Error; err != nil { return 0, err } - defer rows.Close() - - // 创建 CSV 文件 - file, err := os.Create(filePath) - if err != nil { - return 0, err + if len(rows) <= 0 { + return 0, nil } - defer file.Close() - - writer := csv.NewWriter(file) - defer writer.Flush() - - // 写入表头 - columns, _ := rows.ColumnTypes() - header := make([]string, len(columns)) - for i, col := range columns { - header[i] = col.Name() - } - if err := writer.Write(header); err != nil { - return 0, err - } - - // 写入数据 - var affected int64 = 0 - for rows.Next() { - values := make([]sql.RawBytes, len(columns)) - scanArgs := make([]interface{}, len(columns)) - for i := range values { - scanArgs[i] = &values[i] + language := "en" + // 写入csv + data := [][]string{} + data = append(data, columns) + for _, row := range rows { + // 业务类型 + businessType := "" + switch row.BusinessType { + case "0": + // 业务操作类型-其它 + businessType = i18n.TKey(language, "dictData.operType.other") + case "1": + // 业务操作类型-新增 + businessType = i18n.TKey(language, "dictData.operType.add") + case "2": + // 业务操作类型-修改 + businessType = i18n.TKey(language, "dictData.operType.edit") + case "3": + // 业务操作类型-删除 + businessType = i18n.TKey(language, "dictData.operType.delete") + case "4": + // 业务操作类型-授权 + businessType = i18n.TKey(language, "dictData.operType.auth") + case "5": + // 业务操作类型-导出 + businessType = i18n.TKey(language, "dictData.operType.export") + case "6": + // 业务操作类型-导入 + businessType = i18n.TKey(language, "dictData.operType.import") + case "7": + // 业务操作类型-强退 + businessType = i18n.TKey(language, "dictData.operType.forced quit") + case "8": + // 业务操作类型-清空数据 + businessType = i18n.TKey(language, "dictData.operType.clear") } - if err := rows.Scan(scanArgs...); err != nil { - return 0, err + // 状态 + statusValue := i18n.TKey(language, "dictData.fail") + if row.StatusFlag == "1" { + statusValue = i18n.TKey(language, "dictData.success") } - record := make([]string, len(columns)) - for i, val := range values { - if val == nil { - record[i] = "" - } else { - record[i] = string(val) + arr := make([]string, len(columns)) + for i, col := range columns { + if col == "id" { + arr[i] = fmt.Sprintf("%d", row.ID) + } + if col == "title" { + arr[i] = fmt.Sprintf("%v", row.Title) + } + if col == "businessType" || col == "business_type" { + arr[i] = fmt.Sprintf("%v", businessType) + } + if col == "operaBy" || col == "opera_by" { + arr[i] = fmt.Sprintf("%v", row.OperaBy) + } + if col == "operaUrlMethod" || col == "opera_url_method" { + arr[i] = fmt.Sprintf("%v", row.OperaUrlMethod) + } + if col == "operaUrl" || col == "opera_url" { + arr[i] = fmt.Sprintf("%v", row.OperaUrl) + } + if col == "operaIp" || col == "opera_ip" { + arr[i] = fmt.Sprintf("%v", row.OperaIp) + } + if col == "statusValue" || col == "status" { + arr[i] = fmt.Sprintf("%v", statusValue) + } + if col == "operaTime" || col == "opera_time" { + arr[i] = fmt.Sprintf("%v", date.ParseDateToStr(row.OperaTime, date.YYYY_MM_DDTHH_MM_SSZ)) + } + if col == "costTime" || col == "cost_time" { + arr[i] = fmt.Sprintf("%v", row.CostTime) } } - affected++ - if err := writer.Write(record); err != nil { - return affected, err - } + data = append(data, arr) } - return affected, nil + err := file.WriterFileCSV(data, filePath) + + return tx.RowsAffected, err } -func (s BarProcessor) putFTP(filePath, fileName string) { +// exportSMF 导出csv +func (s BarProcessor) exportSMF(hour int, columns []string, filePath string) (int64, error) { + // 前 hour 小时 + now := time.Now() + end := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()) + start := end.Add(-time.Duration(hour) * time.Hour) + + // 查询数据 + rows := []neDataModel.CDREventSMF{} + tx := db.DB("").Model(&neDataModel.CDREventSMF{}) + tx = tx.Where("created_at >= ? and created_at <= ?", start.UnixMilli(), end.UnixMilli()) + if err := tx.Find(&rows).Error; err != nil { + return 0, err + } + if len(rows) <= 0 { + return 0, nil + } + + // 写入csv + data := [][]string{} + data = append(data, columns) + for _, row := range rows { + // 解析 JSON 字符串为 map + var cdrJSON map[string]interface{} + err := json.Unmarshal([]byte(row.CdrJson), &cdrJSON) + if err != nil { + logger.Warnf("CDRExport Error parsing JSON: %s", err.Error()) + continue + } + + // 计费ID + chargingID := "" + if v, ok := cdrJSON["chargingID"]; ok && v != nil { + chargingID = fmt.Sprint(parse.Number(v)) + } + // 订阅 ID 类型 + subscriptionIDType := "-" + // 订阅 ID 数据 + subscriptionIDData := "-" + if v, ok := cdrJSON["subscriberIdentifier"]; ok && v != nil { + if sub, subOk := v.(map[string]any); subOk && sub != nil { + subscriptionIDType = sub["subscriptionIDType"].(string) + subscriptionIDData = sub["subscriptionIDData"].(string) + } + } + + // 网络功能 IPv4 地址 + networkFunctionIPv4Address := "" + if v, ok := cdrJSON["nFunctionConsumerInformation"]; ok && v != nil { + if conInfo, conInfoOk := v.(map[string]any); conInfoOk && conInfo != nil { + networkFunctionIPv4Address = conInfo["networkFunctionIPv4Address"].(string) + } + } + + // 数据量上行链路 + var dataVolumeUplink int64 = 0 + // 数据量下行链路 + var dataVolumeDownlink int64 = 0 + // 数据总量 + var dataTotalVolume int64 = 0 + if v, ok := cdrJSON["listOfMultipleUnitUsage"]; ok && v != nil { + usageList := v.([]any) + if len(usageList) > 0 { + for _, used := range usageList { + usedUnit := used.(map[string]any) + usedUnitList := usedUnit["usedUnitContainer"].([]any) + if len(usedUnitList) > 0 { + for _, data := range usedUnitList { + udata := data.(map[string]any) + if dup, dupOk := udata["dataVolumeUplink"]; dupOk { + dataVolumeUplink += parse.Number(dup) + } + if ddown, ddownOk := udata["dataVolumeDownlink"]; ddownOk { + dataVolumeDownlink += parse.Number(ddown) + } + if dt, dtOk := udata["dataTotalVolume"]; dtOk { + dataTotalVolume += parse.Number(dt) + } + } + } + } + } + } + // 时长 + duration := "-" + if v, ok := cdrJSON["duration"]; ok && v != nil { + duration = fmt.Sprint(parse.Number(v)) + } + // 调用时间 + invocationTimestamp := "" + if v, ok := cdrJSON["invocationTimestamp"]; ok && v != nil { + invocationTimestamp = v.(string) + } + // 记录打开时间 + User_Identifier := "" + SSC_Mode := "" + RAT_Type := "" + DNN_ID := "" + PDU_Type := "" + PDU_IPv4 := "" + PDU_IPv6 := "" + if v, ok := cdrJSON["pDUSessionChargingInformation"]; ok && v != nil { + pduInfo := v.(map[string]any) + + if v, ok := pduInfo["userIdentifier"]; ok && v != nil { + User_Identifier = v.(string) + } + if v, ok := pduInfo["sSCMode"]; ok && v != nil { + SSC_Mode = v.(string) + } + if v, ok := pduInfo["rATType"]; ok && v != nil { + RAT_Type = v.(string) + } + if v, ok := pduInfo["dNNID"]; ok && v != nil { + DNN_ID = v.(string) + } + if v, ok := pduInfo["pDUType"]; ok && v != nil { + PDU_Type = v.(string) + } + if v, ok := pduInfo["pDUAddress"]; ok && v != nil { + pDUAddress := v.(map[string]any) + if addr, ok := pDUAddress["pDUIPv4Address"]; ok && addr != nil { + PDU_IPv4 = addr.(string) + } + if addr, ok := pDUAddress["pDUIPv6AddresswithPrefix"]; ok && addr != nil { + PDU_IPv6 = addr.(string) + } + } + } + + // 记录网络参数ID + recordNFID := "" + if v, ok := cdrJSON["recordingNetworkFunctionID"]; ok && v != nil { + recordNFID = v.(string) + } + + //记录开始时间 + recordOpeningTime := "" + if v, ok := cdrJSON["recordOpeningTime"]; ok && v != nil { + recordOpeningTime = v.(string) + } + + //记录类型 + recordType := "" + if v, ok := cdrJSON["recordType"]; ok && v != nil { + recordType = v.(string) + } + + arr := make([]string, len(columns)) + for i, col := range columns { + if col == "id" { + arr[i] = fmt.Sprintf("%d", row.ID) + } + if col == "neName" || col == "ne_name" { + arr[i] = fmt.Sprintf("%v", row.NeName) + } + // 单层json + if strings.HasPrefix("json", col) { + key := strings.TrimPrefix(col, "json") + if v, ok := cdrJSON[key]; ok && v != nil { + arr[i] = fmt.Sprintf("%v", v) + } + } + + // 计费ID + if col == "chargingID" || col == "charging_id" { + arr[i] = fmt.Sprintf("%v", chargingID) + } + if col == "subscriptionIDData" || col == "subscription_id_data" { + arr[i] = fmt.Sprintf("%v", subscriptionIDData) + } + if col == "subscriptionIDType" || col == "subscription_id_type" { + arr[i] = fmt.Sprintf("%v", subscriptionIDType) + } + if col == "dataVolumeUplink" || col == "data_volume_uplink" { + arr[i] = fmt.Sprintf("%v", dataVolumeUplink) + } + if col == "dataVolumeDownlink" || col == "data_volume_downlink" { + arr[i] = fmt.Sprintf("%v", dataVolumeDownlink) + } + if col == "dataTotalVolume" || col == "data_total_volume" { + arr[i] = fmt.Sprintf("%v", dataTotalVolume) + } + if col == "duration" { + arr[i] = fmt.Sprintf("%v", duration) + } + if col == "invocationTimestamp" || col == "invocation_timestamp" { + arr[i] = fmt.Sprintf("%v", invocationTimestamp) + } + if col == "userIdentifier" || col == "user_identifier" { + arr[i] = fmt.Sprintf("%v", User_Identifier) + } + if col == "sscMode" || col == "ssc_mode" { + arr[i] = fmt.Sprintf("%v", SSC_Mode) + } + if col == "dnnId" || col == "dnn_id" { + arr[i] = fmt.Sprintf("%v", DNN_ID) + } + if col == "pduType" || col == "pdu_type" { + arr[i] = fmt.Sprintf("%v", PDU_Type) + } + if col == "ratType" || col == "rat_type" { + arr[i] = fmt.Sprintf("%v", RAT_Type) + } + if col == "pduIpv4" || col == "pdu_ipv4" { + arr[i] = fmt.Sprintf("%v", PDU_IPv4) + } + if col == "pduIpv6" || col == "pdu_ipv6" { + arr[i] = fmt.Sprintf("%v", PDU_IPv6) + } + if col == "networkFunctionIPv4Address" || col == "network_function_ipv4_address" { + arr[i] = fmt.Sprintf("%v", networkFunctionIPv4Address) + } + if col == "recordNFID" || col == "record_nfId" { + arr[i] = fmt.Sprintf("%v", recordNFID) + } + if col == "recordType" || col == "record_type" { + arr[i] = fmt.Sprintf("%v", recordType) + } + if col == "recordOpeningTime" || col == "record_opening_time" { + arr[i] = fmt.Sprintf("%v", recordOpeningTime) + } + } + data = append(data, arr) + } + + err := file.WriterFileCSV(data, filePath) + + return tx.RowsAffected, err +} + +// exportIMS 导出csv +func (s BarProcessor) exportIMS(hour int, columns []string, filePath string) (int64, error) { + // 前 hour 小时 + now := time.Now() + end := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()) + start := end.Add(-time.Duration(hour) * time.Hour) + + // 查询数据 + rows := []neDataModel.CDREventIMS{} + tx := db.DB("").Model(&neDataModel.CDREventIMS{}) + tx = tx.Where("created_at >= ? and created_at <= ?", start.UnixMilli(), end.UnixMilli()) + if err := tx.Find(&rows).Error; err != nil { + return 0, err + } + if len(rows) <= 0 { + return 0, nil + } + + // 读取字典数据 CDR SIP响应代码类别类型 + dictCDRSipCode := systemService.NewSysDictData.FindByType("cdr_sip_code") + // 读取字典数据 CDR 呼叫类型 + dictCDRCallType := systemService.NewSysDictData.FindByType("cdr_call_type") + language := "en" // 语言 + + // 写入csv + data := [][]string{} + data = append(data, columns) + for _, row := range rows { + // 解析 JSON 字符串为 map + var cdrJSON map[string]interface{} + err := json.Unmarshal([]byte(row.CdrJson), &cdrJSON) + if err != nil { + logger.Warnf("CDRExport Error parsing JSON: %s", err.Error()) + continue + } + + arr := make([]string, len(columns)) + for i, col := range columns { + if col == "id" { + arr[i] = fmt.Sprintf("%d", row.ID) + } + if col == "neName" || col == "ne_name" { + arr[i] = fmt.Sprintf("%v", row.NeName) + } + // 单层json + if strings.HasPrefix("json", col) { + key := strings.TrimPrefix(col, "json") + if v, ok := cdrJSON[key]; ok && v != nil { + arr[i] = fmt.Sprintf("%v", v) + } + } + + // 记录类型 + if col == "recordType" || col == "record_type" { + if v, ok := cdrJSON["recordType"]; ok && v != nil { + arr[i] = fmt.Sprintf("%v", v) + } + } + + // 呼叫类型 + if col == "callType" || col == "call_type" { + if v, ok := cdrJSON["callType"]; ok && v != nil { + callType := "sms" + callTypeLable := "SMS" + if v, ok := cdrJSON["callType"]; ok && v != nil { + callType = v.(string) + for _, v := range dictCDRCallType { + if callType == v.DataValue { + callTypeLable = i18n.TKey(language, v.DataLabel) + break + } + } + } + arr[i] = fmt.Sprintf("%v", callTypeLable) + } + } + + // 被叫 + if col == "calledParty" || col == "called_party" { + if v, ok := cdrJSON["calledParty"]; ok && v != nil { + arr[i] = fmt.Sprintf("%v", v) + } + } + // 主叫 + if col == "callerParty" || col == "caller_party" { + if v, ok := cdrJSON["callerParty"]; ok && v != nil { + arr[i] = fmt.Sprintf("%v", v) + } + } + // 时长 + if col == "callDuration" || col == "call_duration" { + if v, ok := cdrJSON["callDuration"]; ok && v != nil { + arr[i] = fmt.Sprintf("%ds", parse.Number(v)) + } + } + // 呼叫结果 非短信都有code作为结果 sms短信都ok + if col == "cause" { + if v, ok := cdrJSON["cause"]; ok && v != nil { + cause := fmt.Sprint(v) + callResult := "Success" + for _, v := range dictCDRSipCode { + if cause == v.DataValue { + callResult = i18n.TKey(language, v.DataLabel) + break + } + } + arr[i] = fmt.Sprintf("%v", callResult) + } + } + // 呼叫时间 + if col == "seizureTime" || col == "seizure_time" { + if v, ok := cdrJSON["seizureTime"]; ok && v != nil { + if seizureTime := parse.Number(v); seizureTime > 0 { + arr[i] = date.ParseDateToStr(seizureTime, date.YYYY_MM_DDTHH_MM_SSZ) + } else { + arr[i] = fmt.Sprintf("%v", v) + } + + } + } + // 挂断时间 + if col == "releaseTime" || col == "release_time" { + if v, ok := cdrJSON["releaseTime"]; ok && v != nil { + if seizureTime := parse.Number(v); seizureTime > 0 { + arr[i] = date.ParseDateToStr(seizureTime, date.YYYY_MM_DDTHH_MM_SSZ) + } else { + arr[i] = fmt.Sprintf("%v", v) + } + + } + } + } + data = append(data, arr) + } + + err := file.WriterFileCSV(data, filePath) + + return tx.RowsAffected, err +} + +// exportSMSC 导出csv +func (s BarProcessor) exportSMSC(hour int, columns []string, filePath string) (int64, error) { + // 前 hour 小时 + now := time.Now() + end := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()) + start := end.Add(-time.Duration(hour) * time.Hour) + + // 查询数据 + rows := []neDataModel.CDREventSMSC{} + tx := db.DB("").Model(&neDataModel.CDREventSMSC{}) + tx = tx.Where("created_at >= ? and created_at <= ?", start.UnixMilli(), end.UnixMilli()) + if err := tx.Find(&rows).Error; err != nil { + return 0, err + } + if len(rows) <= 0 { + return 0, nil + } + + // 读取字典数据 CDR 原因码 + dictCDRCauseCode := systemService.NewSysDictData.FindByType("cdr_cause_code") + language := "en" + + // 写入csv + data := [][]string{} + data = append(data, columns) + for _, row := range rows { + // 解析 JSON 字符串为 map + var cdrJSON map[string]interface{} + err := json.Unmarshal([]byte(row.CdrJson), &cdrJSON) + if err != nil { + logger.Warnf("CDRExport Error parsing JSON: %s", err.Error()) + continue + } + + // 记录类型 + recordType := "" + if v, ok := cdrJSON["recordType"]; ok && v != nil { + recordType = v.(string) + } + // 服务类型 + serviceType := "" + if v, ok := cdrJSON["serviceType"]; ok && v != nil { + serviceType = v.(string) + } + // 被叫 + called := "" + if v, ok := cdrJSON["calledParty"]; ok && v != nil { + called = v.(string) + } + // 主叫 + caller := "" + if v, ok := cdrJSON["callerParty"]; ok && v != nil { + caller = v.(string) + } + // 呼叫结果 0失败,1成功 + callResult := "Fail" + if v, ok := cdrJSON["result"]; ok && v != nil { + resultVal := parse.Number(v) + if resultVal == 1 { + callResult = "Success" + } + } + // 结果原因 + if v, ok := cdrJSON["cause"]; ok && v != nil && callResult == "Fail" { + cause := fmt.Sprint(v) + for _, v := range dictCDRCauseCode { + if cause == v.DataValue { + callResult = fmt.Sprintf("%s, %s", callResult, i18n.TKey(language, v.DataLabel)) + break + } + } + } + // 取时间 + timeStr := "" + if v, ok := cdrJSON["updateTime"]; ok && v != nil { + if releaseTime := parse.Number(v); releaseTime > 0 { + timeStr = date.ParseDateToStr(releaseTime, date.YYYY_MM_DDTHH_MM_SSZ) + } else { + timeStr = v.(string) + } + } + + arr := make([]string, len(columns)) + for i, v := range columns { + if v == "id" { + arr[i] = fmt.Sprintf("%d", row.ID) + } + if v == "neName" || v == "ne_name" { + arr[i] = fmt.Sprintf("%v", row.NeName) + } + // 单层json + if strings.HasPrefix("json", v) { + key := strings.TrimPrefix(v, "json") + if v, ok := cdrJSON[key]; ok && v != nil { + arr[i] = fmt.Sprintf("%v", v) + } + } + + if v == "recordType" || v == "record_type" { + arr[i] = fmt.Sprintf("%v", recordType) + } + if v == "serviceType" || v == "service_type" { + arr[i] = fmt.Sprintf("%v", serviceType) + } + if v == "callerParty" || v == "caller_party" { + arr[i] = fmt.Sprintf("%v", caller) + } + if v == "calledParty" || v == "called_party" { + arr[i] = fmt.Sprintf("%v", called) + } + if v == "result" { + arr[i] = fmt.Sprintf("%v", callResult) + } + if v == "updateTime" || v == "update_time" { + arr[i] = fmt.Sprintf("%v", timeStr) + } + } + data = append(data, arr) + } + + err := file.WriterFileCSV(data, filePath) + + return tx.RowsAffected, err +} + +// exportSGWC 导出csv +func (s BarProcessor) exportSGWC(hour int, columns []string, filePath string) (int64, error) { + // 前 hour 小时 + now := time.Now() + end := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location()) + start := end.Add(-time.Duration(hour) * time.Hour) + + // 查询数据 + rows := []neDataModel.CDREventSMSC{} + tx := db.DB("").Model(&neDataModel.CDREventSMSC{}) + tx = tx.Where("created_at >= ? and created_at <= ?", start.UnixMilli(), end.UnixMilli()) + if err := tx.Find(&rows).Error; err != nil { + return 0, err + } + if len(rows) <= 0 { + return 0, nil + } + + // 写入csv + data := [][]string{} + data = append(data, columns) + for _, row := range rows { + // 解析 JSON 字符串为 map + var cdrJSON map[string]interface{} + err := json.Unmarshal([]byte(row.CdrJson), &cdrJSON) + if err != nil { + logger.Warnf("CDRExport Error parsing JSON: %s", err.Error()) + continue + } + + // 计费ID + chargingID := "" + if v, ok := cdrJSON["chargingID"]; ok && v != nil { + chargingID = fmt.Sprint(parse.Number(v)) + } + // IMSI + servedIMSI := "" + if v, ok := cdrJSON["servedIMSI"]; ok && v != nil { + servedIMSI = fmt.Sprint(v) + } + // MSISDN + servedMSISDN := "" + if v, ok := cdrJSON["servedMSISDN"]; ok && v != nil { + servedMSISDN = fmt.Sprint(v) + } + // pGWAddressUsed + pGWAddressUsed := "" + if v, ok := cdrJSON["pGWAddressUsed"]; ok && v != nil { + pGWAddressUsed = fmt.Sprint(v) + } + if v, ok := cdrJSON["GGSNAddress"]; ok && v != nil { + pGWAddressUsed = fmt.Sprint(v) + } + // sGWAddress + sGWAddress := "" + if v, ok := cdrJSON["sGWAddress"]; ok && v != nil { + sGWAddress = fmt.Sprint(v) + } + if v, ok := cdrJSON["SGSNAddress"]; ok && v != nil { + sGWAddress = fmt.Sprint(v) + } + // recordType + recordType := "" + if v, ok := cdrJSON["recordType"]; ok && v != nil { + recordType = fmt.Sprint(v) + } + // rATType + rATType := "" + if v, ok := cdrJSON["rATType"]; ok && v != nil { + rATType = fmt.Sprint(v) + } + // pdpPDNType + pdpPDNType := "" + if v, ok := cdrJSON["pdpPDNType"]; ok && v != nil { + pdpPDNType = fmt.Sprint(v) + } + // servedPDPPDNAddress + servedPDPPDNAddress := "" + if v, ok := cdrJSON["servedPDPPDNAddress"]; ok && v != nil { + servedPDPPDNAddress = fmt.Sprint(v) + } + // servedPDPPDNAddress + servingNodeAddress := []string{} + if v, ok := cdrJSON["servingNodeAddress"]; ok && v != nil { + for _, v := range v.([]any) { + servingNodeAddress = append(servingNodeAddress, fmt.Sprint(v)) + } + } + // servingNodeType + servingNodeType := []string{} + if v, ok := cdrJSON["servingNodeType"]; ok && v != nil { + for _, v := range v.([]any) { + if v, ok := v.(map[string]any)["servingNodeType"]; ok && v != nil { + servingNodeType = append(servingNodeType, fmt.Sprint(v)) + } + } + } + // accessPointNameNI + accessPointNameNI := "" + if v, ok := cdrJSON["accessPointNameNI"]; ok && v != nil { + accessPointNameNI = fmt.Sprint(v) + } + // causeForRecClosing + causeForRecClosing := "" + if v, ok := cdrJSON["causeForRecClosing"]; ok && v != nil { + causeForRecClosing = fmt.Sprint(v) + } + // recordSequenceNumber + recordSequenceNumber := "" + if v, ok := cdrJSON["recordSequenceNumber"]; ok && v != nil { + recordSequenceNumber = fmt.Sprint(v) + } + // localRecordSequenceNumber + localRecordSequenceNumber := "" + if v, ok := cdrJSON["localRecordSequenceNumber"]; ok && v != nil { + localRecordSequenceNumber = fmt.Sprint(v) + } + // 数据量上行链路 + var dataVolumeGPRSUplink int64 = 0 + // 数据量下行链路 + var dataVolumeGPRSDownlink int64 = 0 + if v, ok := cdrJSON["listOfTrafficVolumes"]; ok && v != nil { + usageList := v.([]any) + if len(usageList) > 0 { + for _, used := range usageList { + usedUnit := used.(map[string]any) + if dup, dupOk := usedUnit["dataVolumeGPRSUplink"]; dupOk { + dataVolumeGPRSUplink = parse.Number(dup) + } + if ddown, ddownOk := usedUnit["dataVolumeGPRSDownlink"]; ddownOk { + dataVolumeGPRSDownlink = parse.Number(ddown) + } + } + } + } + // 时长 + duration := "-" + if v, ok := cdrJSON["duration"]; ok && v != nil { + duration = fmt.Sprint(parse.Number(v)) + } + // 调用时间 + invocationTimestamp := "" + if v, ok := cdrJSON["recordOpeningTime"]; ok && v != nil { + invocationTimestamp = v.(string) + } + + arr := make([]string, len(columns)) + for i, v := range columns { + if v == "id" { + arr[i] = fmt.Sprintf("%d", row.ID) + } + if v == "neName" || v == "ne_name" { + arr[i] = fmt.Sprintf("%v", row.NeName) + } + // 单层json + if strings.HasPrefix("json", v) { + key := strings.TrimPrefix(v, "json") + if v, ok := cdrJSON[key]; ok && v != nil { + arr[i] = fmt.Sprintf("%v", v) + } + } + + if v == "chargingID" || v == "charging_id" { + arr[i] = fmt.Sprintf("%v", chargingID) + } + if v == "servedIMSI" || v == "served_imsi" { + arr[i] = fmt.Sprintf("%v", servedIMSI) + } + if v == "servedMSISDN" || v == "served_msisdn" { + arr[i] = fmt.Sprintf("%v", servedMSISDN) + } + if v == "dataVolumeGPRSUplink" || v == "data_volume_gprs_uplink" { + arr[i] = fmt.Sprintf("%v", dataVolumeGPRSUplink) + } + if v == "dataVolumeGPRSDownlink" || v == "data_volume_gprs_downlink" { + arr[i] = fmt.Sprintf("%v", dataVolumeGPRSDownlink) + } + if v == "duration" { + arr[i] = fmt.Sprintf("%v", duration) + } + if v == "invocationTimestamp" || v == "invocation_timestamp" { + arr[i] = fmt.Sprintf("%v", invocationTimestamp) + } + if v == "pgwAddressUsed" || v == "pgw_address_used" { + arr[i] = fmt.Sprintf("%v", pGWAddressUsed) + } + if v == "GGSNAddress" || v == "ggsn_address" { + arr[i] = fmt.Sprintf("%v", pGWAddressUsed) + } + if v == "sgwAddress" || v == "sgw_address" { + arr[i] = fmt.Sprintf("%v", sGWAddress) + } + if v == "SGSNAddress" || v == "sgsn_address" { + arr[i] = fmt.Sprintf("%v", sGWAddress) + } + if v == "ratType" || v == "rat_type" { + arr[i] = fmt.Sprintf("%v", rATType) + } + if v == "pdpPDNType" || v == "pdp_pdn_type" { + arr[i] = fmt.Sprintf("%v", pdpPDNType) + } + if v == "servedPDPPDNAddress" || v == "served_pdppdn_address" { + arr[i] = fmt.Sprintf("%v", servedPDPPDNAddress) + } + if v == "servingNodeAddress" || v == "serving_node_address" { + arr[i] = fmt.Sprintf("%v", strings.Join(servingNodeAddress, ",")) + } + if v == "servingNodeType" || v == "serving_node_type" { + arr[i] = fmt.Sprintf("%v", strings.Join(servingNodeType, ",")) + } + if v == "accessPointNameNI" || v == "access_point_name_ni" { + arr[i] = fmt.Sprintf("%v", accessPointNameNI) + } + if v == "causeForRecClosing" || v == "cause_for_rec_closing" { + arr[i] = fmt.Sprintf("%v", causeForRecClosing) + } + if v == "recordSequenceNumber" || v == "record_sequence_number" { + arr[i] = fmt.Sprintf("%v", recordSequenceNumber) + } + if v == "localRecordSequenceNumber" || v == "local_record_sequence_number" { + arr[i] = fmt.Sprintf("%v", localRecordSequenceNumber) + } + if v == "recordType" || v == "record_type" { + arr[i] = fmt.Sprintf("%v", recordType) + } + if v == "recordOpeningTime" || v == "record_opening_time" { + arr[i] = fmt.Sprintf("%v", invocationTimestamp) + } + } + data = append(data, arr) + } + + err := file.WriterFileCSV(data, filePath) + + return tx.RowsAffected, err +} + +// putFTP 提交到服务器ssh +func (s BarProcessor) putFTP(localFilePath string) { // 获取配置 var cfgData struct { Password string `json:"password" ` @@ -198,8 +946,6 @@ func (s BarProcessor) putFTP(filePath, fileName string) { return } - localFilePath := filepath.Join(filePath, fileName) - connSSH := ssh.ConnSSH{ User: cfgData.Username, Password: cfgData.Password, @@ -221,7 +967,7 @@ func (s BarProcessor) putFTP(filePath, fileName string) { } defer sftpClient.Close() // 远程文件 - remotePath := filepath.Join(cfgData.Dir, path.Base(filePath), fileName) + remotePath := filepath.Join(cfgData.Dir, path.Base(localFilePath), filepath.Base(localFilePath)) // 复制到远程 if err = sftpClient.CopyFileLocalToRemote(localFilePath, remotePath); err != nil { logger.Errorf("putFTP uploading error: %v", err) diff --git a/src/modules/crontask/processor/monitor_sys_resource/monitor_sys_resource.go b/src/modules/crontask/processor/monitor_sys_resource/monitor_sys_resource.go index 33a7845e..bd242c22 100644 --- a/src/modules/crontask/processor/monitor_sys_resource/monitor_sys_resource.go +++ b/src/modules/crontask/processor/monitor_sys_resource/monitor_sys_resource.go @@ -29,7 +29,7 @@ func (s *MonitorSysResourceProcessor) Execute(data any) (any, error) { s.count++ // 执行次数加一 options := data.(cron.JobData) sysJob := options.SysJob - logger.Infof("重复 %v 任务ID %d", options.Repeat, sysJob.JobId) + logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count) // 读取参数值 var params struct { diff --git a/src/modules/crontask/processor/ne_alarm_state_check/ne_alarm_state_check.go b/src/modules/crontask/processor/ne_alarm_state_check/ne_alarm_state_check.go index f3bc2bf0..cb1d4b32 100644 --- a/src/modules/crontask/processor/ne_alarm_state_check/ne_alarm_state_check.go +++ b/src/modules/crontask/processor/ne_alarm_state_check/ne_alarm_state_check.go @@ -51,7 +51,7 @@ func (s *NeAlarmStateCheckProcessor) Execute(data any) (any, error) { s.count++ // 执行次数加一 options := data.(cron.JobData) sysJob := options.SysJob - logger.Infof("重复 %v 任务ID %d", options.Repeat, sysJob.JobId) + logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count) // 返回结果,用于记录执行结果 result := map[string]any{ "count": s.count, diff --git a/src/modules/crontask/processor/ne_config_backup/ne_config_backup.go b/src/modules/crontask/processor/ne_config_backup/ne_config_backup.go index 4d4371da..e08549b3 100644 --- a/src/modules/crontask/processor/ne_config_backup/ne_config_backup.go +++ b/src/modules/crontask/processor/ne_config_backup/ne_config_backup.go @@ -27,7 +27,7 @@ func (s *NeConfigBackupProcessor) Execute(data any) (any, error) { s.count++ // 执行次数加一 options := data.(cron.JobData) sysJob := options.SysJob - logger.Infof("重复 %v 任务ID %d", options.Repeat, sysJob.JobId) + logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count) // 返回结果,用于记录执行结果 result := map[string]any{ "count": s.count, diff --git a/src/modules/crontask/processor/ne_data_udm/ne_data_udm.go b/src/modules/crontask/processor/ne_data_udm/ne_data_udm.go index a53ea560..ad16557a 100644 --- a/src/modules/crontask/processor/ne_data_udm/ne_data_udm.go +++ b/src/modules/crontask/processor/ne_data_udm/ne_data_udm.go @@ -29,7 +29,7 @@ func (s *NeDataUDM) Execute(data any) (any, error) { s.count++ // 执行次数加一 options := data.(cron.JobData) sysJob := options.SysJob - logger.Infof("重复 %v 任务ID %d", options.Repeat, sysJob.JobId) + logger.Infof("重复:%v 任务ID:%d 执行次数:%d", options.Repeat, sysJob.JobId, s.count) // 返回结果,用于记录执行结果 result := map[string]any{ "count": s.count, diff --git a/src/modules/crontask/processor/processor.go b/src/modules/crontask/processor/processor.go index e542a5bc..29aa5a30 100644 --- a/src/modules/crontask/processor/processor.go +++ b/src/modules/crontask/processor/processor.go @@ -2,11 +2,11 @@ package processor import ( "be.ems/src/framework/cron" - "be.ems/src/modules/crontask/processor/backupEtcFromNE" - "be.ems/src/modules/crontask/processor/delExpiredNeBackup" - "be.ems/src/modules/crontask/processor/deleteExpiredRecord" + processorDeleteAlarmRecord "be.ems/src/modules/crontask/processor/delete_alarm_record" + processorDeleteDataRecord "be.ems/src/modules/crontask/processor/delete_data_record" + processorDeleteKPIRecord "be.ems/src/modules/crontask/processor/delete_kpi_record" + processorDeleteNeConfigBackup "be.ems/src/modules/crontask/processor/delete_ne_config_backup" "be.ems/src/modules/crontask/processor/exportTable" - "be.ems/src/modules/crontask/processor/getStateFromNE" processorMonitorSysResource "be.ems/src/modules/crontask/processor/monitor_sys_resource" processorNeAlarmStateCheck "be.ems/src/modules/crontask/processor/ne_alarm_state_check" processorNeConfigBackup "be.ems/src/modules/crontask/processor/ne_config_backup" @@ -24,11 +24,16 @@ func InitCronQueue() { cron.CreateQueue("ne_data_udm", processorNeDataUDM.NewProcessor) // 网元告警-状态检查 cron.CreateQueue("ne_alarm_state_check", processorNeAlarmStateCheck.NewProcessor) - // delete expired NE backup file - cron.CreateQueue("delExpiredNeBackup", delExpiredNeBackup.NewProcessor) - cron.CreateQueue("deleteExpiredRecord", deleteExpiredRecord.NewProcessor) - cron.CreateQueue("backupEtcFromNE", backupEtcFromNE.NewProcessor) - cron.CreateQueue("getStateFromNE", getStateFromNE.NewProcessor) + + // 删除-表内数据记录 + cron.CreateQueue("delete_data_record", processorDeleteDataRecord.NewProcessor) + // 删除-告警数据记录 + cron.CreateQueue("delete_alarm_record", processorDeleteAlarmRecord.NewProcessor) + // 删除-KPI数据记录 + cron.CreateQueue("delete_kpi_record", processorDeleteKPIRecord.NewProcessor) + // 删除-网元配置文件定期备份 + cron.CreateQueue("delete_ne_config_backup", processorDeleteNeConfigBackup.NewProcessor) + cron.CreateQueue("exportTable", exportTable.NewProcessor) cron.CreateQueue("removeFile", removeFile.NewProcessor) }