From 5b29776a97a97e2d85341f1a06f3e3822ad2878f Mon Sep 17 00:00:00 2001 From: simonzhangsz Date: Fri, 11 Oct 2024 16:33:35 +0800 Subject: [PATCH] add: scheduled task to remove exported files --- database/install/sys_dict_data1_i18n_zh.sql | 1 + database/install/sys_dict_data2_i18n_en.sql | 1 + database/install/sys_job.sql | 1 + .../upgrade/upg_sys_dict_data1_i18n_zh.sql | 1 + .../upgrade/upg_sys_dict_data2_i18n_en.sql | 1 + database/upgrade/upg_sys_job.sql | 1 + src/modules/crontask/processor/processor.go | 2 + .../processor/removeFile/removeFile.go | 159 ++++++++++++++++++ 8 files changed, 167 insertions(+) create mode 100644 src/modules/crontask/processor/removeFile/removeFile.go diff --git a/database/install/sys_dict_data1_i18n_zh.sql b/database/install/sys_dict_data1_i18n_zh.sql index b69b45d7..cbb6baf9 100644 --- a/database/install/sys_dict_data1_i18n_zh.sql +++ b/database/install/sys_dict_data1_i18n_zh.sql @@ -690,5 +690,6 @@ INSERT INTO `sys_dict_data` VALUES (2179, 2179, 'dictData.cdr_cause_code.42', ' INSERT INTO `sys_dict_data` VALUES (2180, 2180, 'dictData.cdr_cause_code.47', '资源不可用', 'i18n_zh', '', '', '1', 'supervisor', 1725877564156, '', 0, ''); INSERT INTO `sys_dict_data` VALUES (2181, 2181, 'dictData.cdr_cause_code.50', '请求的设施未订阅', 'i18n_zh', '', '', '1', 'supervisor', 1725877564156, '', 0, ''); INSERT INTO `sys_dict_data` VALUES (2182, 2182, 'job.exportSMSCCDR', '定期从短信话单表导出文件至指定目录', 'i18n_zh', '', '', '1', 'supervisor', 1721902269805, '', 0, ''); +INSERT INTO `sys_dict_data` VALUES (2183, 2183, 'job.removeExportedFiles', '定期删除指定目录过期文件', 'i18n_zh', '', '', '1', 'supervisor', 1721902269805, '', 0, ''); SET FOREIGN_KEY_CHECKS = 1; diff --git a/database/install/sys_dict_data2_i18n_en.sql b/database/install/sys_dict_data2_i18n_en.sql index 89b7e292..27eb88a7 100644 --- a/database/install/sys_dict_data2_i18n_en.sql +++ b/database/install/sys_dict_data2_i18n_en.sql @@ -690,5 +690,6 @@ INSERT INTO `sys_dict_data` VALUES (4179, 4179, 'dictData.cdr_cause_code.42', 'C INSERT INTO `sys_dict_data` VALUES (4180, 4180, 'dictData.cdr_cause_code.47', 'Resources Unavailable Unspec', 'i18n_en', '', '', '1', 'supervisor', 1725877564156, '', 0, ''); INSERT INTO `sys_dict_data` VALUES (4181, 4181, 'dictData.cdr_cause_code.50', 'Requested Facility Not Subscribed', 'i18n_en', '', '', '1', 'supervisor', 1725877564156, '', 0, ''); INSERT INTO `sys_dict_data` VALUES (4182, 4182, 'job.exportSMSCCDR', 'Export regularly from SMSC CDR table', 'i18n_en', '', '', '1', 'supervisor', 1721902269805, '', 0, ''); +INSERT INTO `sys_dict_data` VALUES (4183, 4183, 'job.removeExportedFiles', 'Regularly delete expired files in the specified directory', 'i18n_en', '', '', '1', 'supervisor', 1721902269805, '', 0, ''); SET FOREIGN_KEY_CHECKS = 1; diff --git a/database/install/sys_job.sql b/database/install/sys_job.sql index 62ac54bf..a41728d4 100644 --- a/database/install/sys_job.sql +++ b/database/install/sys_job.sql @@ -41,5 +41,6 @@ INSERT INTO `sys_job` VALUES (11, 'job.exportOperateLog', 'SYSTEM', 'exportTable INSERT INTO `sys_job` VALUES (12, 'job.exportIMSCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_ims\",\"columns\":\"id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) as record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callType\')) as call_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callerParty\')) as caller_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.calledParty\')) as called_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callDuration\')) as call_duration,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.serviceResult\')) as service_result,DATE_FORMAT(FROM_UNIXTIME(timestamp), \'%Y-%m-%d %H:%i:%s\') AS timestamp\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/ims_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1722224659251, ''); INSERT INTO `sys_job` VALUES (13, 'job.exportSMFCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_smf\",\"columns\":\"id,ne_type,ne_name,rm_uid,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) AS record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.chargingID\')) AS charging_id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.subscriberIdentifier.subscriptionIDType\')) AS subscriber_id_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.subscriberIdentifier.subscriptionIDData\')) AS subscriber_id_data,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.duration\')) AS duration,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.invocationTimestamp\')) as invocationTimestamp,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeUplink\')) AS data_volume_uplink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeDownlink\')) AS data_volume_downlink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataTotalVolume\')) AS data_total_volume,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.pDUSessionChargingInformation.pDUAddress.pDUIPv4Address\')) AS pdu_ipv4_address,timestamp\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/smf_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724309047797, ''); INSERT INTO `sys_job` VALUES (14, 'job.exportSMSCCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_smsc\",\"columns\":\"id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) as record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.serviceType\')) as service_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callerParty\')) as caller_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.calledParty\')) as called_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.result\')) as result,DATE_FORMAT(FROM_UNIXTIME(JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.updateTime\'))), \'%Y-%m-%d %H:%i:%s\') as update_time\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/smsc_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724309047797, ''); +INSERT INTO `sys_job` VALUES (15, 'job.removeExportedFiles', 'SYSTEM', 'removeFile', '[{\"filePath\":\"/usr/local/omc/backup/operate_log\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/ims_cdr\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/smf_cdr\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/smsc_cdr\",\"maxDays\":30}]', '0 10 0 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1728634085631, ''); SET FOREIGN_KEY_CHECKS = 1; diff --git a/database/upgrade/upg_sys_dict_data1_i18n_zh.sql b/database/upgrade/upg_sys_dict_data1_i18n_zh.sql index 04363e03..397642b0 100644 --- a/database/upgrade/upg_sys_dict_data1_i18n_zh.sql +++ b/database/upgrade/upg_sys_dict_data1_i18n_zh.sql @@ -697,5 +697,6 @@ REPLACE INTO `sys_dict_data` VALUES (2179, 2179, 'dictData.cdr_cause_code.42', ' REPLACE INTO `sys_dict_data` VALUES (2180, 2180, 'dictData.cdr_cause_code.47', '资源不可用', 'i18n_zh', '', '', '1', 'supervisor', 1725877564156, '', 0, ''); REPLACE INTO `sys_dict_data` VALUES (2181, 2181, 'dictData.cdr_cause_code.50', '请求的设施未订阅', 'i18n_zh', '', '', '1', 'supervisor', 1725877564156, '', 0, ''); REPLACE INTO `sys_dict_data` VALUES (2182, 2182, 'job.exportSMSCCDR', '定期从短信话单表导出文件至指定目录', 'i18n_zh', '', '', '1', 'supervisor', 1721902269805, '', 0, ''); +REPLACE INTO `sys_dict_data` VALUES (2183, 2183, 'job.removeExportedFiles', '定期删除指定目录过期文件', 'i18n_zh', '', '', '1', 'supervisor', 1721902269805, '', 0, ''); SET FOREIGN_KEY_CHECKS = 1; diff --git a/database/upgrade/upg_sys_dict_data2_i18n_en.sql b/database/upgrade/upg_sys_dict_data2_i18n_en.sql index 5bb3ed6c..5a71db25 100644 --- a/database/upgrade/upg_sys_dict_data2_i18n_en.sql +++ b/database/upgrade/upg_sys_dict_data2_i18n_en.sql @@ -692,5 +692,6 @@ REPLACE INTO `sys_dict_data` VALUES (4179, 4179, 'dictData.cdr_cause_code.42', ' REPLACE INTO `sys_dict_data` VALUES (4180, 4180, 'dictData.cdr_cause_code.47', 'Resources Unavailable Unspec', 'i18n_en', '', '', '1', 'supervisor', 1725877564156, '', 0, ''); REPLACE INTO `sys_dict_data` VALUES (4181, 4181, 'dictData.cdr_cause_code.50', 'Requested Facility Not Subscribed', 'i18n_en', '', '', '1', 'supervisor', 1725877564156, '', 0, ''); REPLACE INTO `sys_dict_data` VALUES (4182, 4182, 'job.exportSMSCCDR', 'Export regularly from SMSC CDR table', 'i18n_en', '', '', '1', 'supervisor', 1721902269805, '', 0, ''); +REPLACE INTO `sys_dict_data` VALUES (4183, 4183, 'job.removeExportedFiles', 'Regularly delete expired files in the specified directory', 'i18n_en', '', '', '1', 'supervisor', 1721902269805, '', 0, ''); SET FOREIGN_KEY_CHECKS = 1; diff --git a/database/upgrade/upg_sys_job.sql b/database/upgrade/upg_sys_job.sql index df4ca21e..e1293816 100644 --- a/database/upgrade/upg_sys_job.sql +++ b/database/upgrade/upg_sys_job.sql @@ -45,5 +45,6 @@ REPLACE INTO `sys_job` VALUES (11, 'job.exportOperateLog', 'SYSTEM', 'exportTabl REPLACE INTO `sys_job` VALUES (12, 'job.exportIMSCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_ims\",\"columns\":\"id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) as record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callType\')) as call_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callerParty\')) as caller_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.calledParty\')) as called_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callDuration\')) as call_duration,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.serviceResult\')) as service_result,DATE_FORMAT(FROM_UNIXTIME(timestamp), \'%Y-%m-%d %H:%i:%s\') AS timestamp\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/ims_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1722224659251, ''); REPLACE INTO `sys_job` VALUES (13, 'job.exportSMFCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_smf\",\"columns\":\"id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) AS record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.chargingID\')) AS charging_id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.subscriberIdentifier.subscriptionIDType\')) AS subscriber_id_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.subscriberIdentifier.subscriptionIDData\')) AS subscriber_id_data,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.duration\')) AS duration,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.invocationTimestamp\')) as invocationTimestamp,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeUplink\')) AS data_volume_uplink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeDownlink\')) AS data_volume_downlink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataTotalVolume\')) AS data_total_volume,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.pDUSessionChargingInformation.pDUAddress.pDUIPv4Address\')) AS pdu_ipv4_address,timestamp\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/smf_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724309047797, ''); REPLACE INTO `sys_job` VALUES (14, 'job.exportSMSCCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_smsc\",\"columns\":\"id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) as record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.serviceType\')) as service_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callerParty\')) as caller_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.calledParty\')) as called_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.result\')) as result,DATE_FORMAT(FROM_UNIXTIME(JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.updateTime\'))), \'%Y-%m-%d %H:%i:%s\') as update_time\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/smsc_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724309047797, ''); +REPLACE INTO `sys_job` VALUES (15, 'job.removeExportedFiles', 'SYSTEM', 'removeFile', '[{\"filePath\":\"/usr/local/omc/backup/operate_log\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/ims_cdr\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/smf_cdr\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/smsc_cdr\",\"maxDays\":30}]', '0 10 0 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1728634085631, ''); SET FOREIGN_KEY_CHECKS = 1; \ No newline at end of file diff --git a/src/modules/crontask/processor/processor.go b/src/modules/crontask/processor/processor.go index 09176717..5363e561 100644 --- a/src/modules/crontask/processor/processor.go +++ b/src/modules/crontask/processor/processor.go @@ -10,6 +10,7 @@ import ( "be.ems/src/modules/crontask/processor/getStateFromNE" processorMonitorSysResource "be.ems/src/modules/crontask/processor/monitor_sys_resource" processorNeConfigBackup "be.ems/src/modules/crontask/processor/ne_config_backup" + "be.ems/src/modules/crontask/processor/removeFile" ) // InitCronQueue 初始定时任务队列 @@ -25,4 +26,5 @@ func InitCronQueue() { cron.CreateQueue("getStateFromNE", getStateFromNE.NewProcessor) cron.CreateQueue("genNeStateAlarm", genNeStateAlarm.NewProcessor) cron.CreateQueue("exportTable", exportTable.NewProcessor) + cron.CreateQueue("removeFile", removeFile.NewProcessor) } diff --git a/src/modules/crontask/processor/removeFile/removeFile.go b/src/modules/crontask/processor/removeFile/removeFile.go new file mode 100644 index 00000000..5420b40b --- /dev/null +++ b/src/modules/crontask/processor/removeFile/removeFile.go @@ -0,0 +1,159 @@ +package removeFile + +import ( + "encoding/json" + "os" + "path/filepath" + "sort" + "time" + + "be.ems/lib/log" + "be.ems/src/framework/cron" +) + +var NewProcessor = &BarProcessor{ + progress: 0, + count: 0, +} + +// bar 队列任务处理 +type BarProcessor struct { + // 任务进度 + progress int + // 执行次数 + count int +} + +type BarParams struct { + FilePath string `json:"filePath"` // file path + MaxDays int `json:"maxDays"` + MaxFiles *int `json:"maxFiles"` // keep max files + MaxSize *int64 `json:"maxSize"` + Extras string `json:"extras"` // extras condition for where +} + +type FileInfo struct { + Path string + Info os.FileInfo +} + +func (s *BarProcessor) Execute(data any) (any, error) { + s.count++ + + options := data.(cron.JobData) + sysJob := options.SysJob + var params []BarParams + + err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms) + if err != nil { + return nil, err + } + result := []map[string]any{} + for _, param := range params { + res, _ := s.ExecuteOne(param) + result = append(result, res) + } + + // 返回结果,用于记录执行结果 + return map[string]any{ + "result": result, + }, nil +} + +func (s *BarProcessor) ExecuteOne(params BarParams) (map[string]any, error) { + var maxFiles int = 0 + var maxSize int64 = 0 + if params.MaxFiles != nil { + maxFiles = *params.MaxFiles + } + if params.MaxSize != nil { + maxSize = int64(*params.MaxSize * 1024 * 1024) + } + files, err := getFiles(params.FilePath) + if err != nil { + return map[string]any{ + "msg": "failed", + "err": err.Error(), + }, err + } + + // 获取本地时区 + loc, err := time.LoadLocation("Local") + if err != nil { + return map[string]any{ + "msg": "failed", + "err": err.Error(), + }, err + } + cutoff := time.Now().In(loc).AddDate(0, 0, -params.MaxDays) + + var oldFiles []FileInfo + for _, file := range files { + if file.Info.ModTime().Before(cutoff) { + oldFiles = append(oldFiles, file) + } + } + + // 按修改时间排序文件(最旧的在前) + sort.Slice(oldFiles, func(i, j int) bool { + return oldFiles[i].Info.ModTime().Before(oldFiles[j].Info.ModTime()) + }) + + deleted, errorDel := 0, 0 + + // 删除文件,直到满足文件总数不超过maxFiles个且总大小不超过maxSize的条件 + var totalSize int64 + for i, file := range oldFiles { + if (maxFiles > 0 && i >= maxFiles) || (maxSize > 0 && totalSize+file.Info.Size() > maxSize) { + break + } + err := os.Remove(file.Path) + if err != nil { + log.Error("Error deleting file:", file.Path, err) + errorDel++ + continue + } + totalSize += file.Info.Size() + deleted++ + } + + // 如果仍然有超过maxFiles个文件或总大小超过maxSize,继续删除最旧的文件 + remainingFiles := files + sort.Slice(remainingFiles, func(i, j int) bool { + return remainingFiles[i].Info.ModTime().Before(remainingFiles[j].Info.ModTime()) + }) + + for (maxFiles > 0 && len(remainingFiles) > maxFiles) || (maxSize > 0 && totalSize > maxSize) { + file := remainingFiles[0] + err := os.Remove(file.Path) + if err != nil { + log.Error("Error deleting file:", file.Path, err) + remainingFiles = remainingFiles[1:] + continue + } + totalSize -= file.Info.Size() + remainingFiles = remainingFiles[1:] + } + + // 返回结果,用于记录执行结果 + return map[string]any{ + "msg": "successed", + "filePath": params.FilePath, + "deleted": deleted, + "errorDel": errorDel, + }, nil +} + +func getFiles(dir string) ([]FileInfo, error) { + var files []FileInfo + err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() { + files = append(files, FileInfo{Path: path, Info: info}) + } + return nil + }) + return files, err +}