add: scheduled task to remove exported files
This commit is contained in:
@@ -690,5 +690,6 @@ INSERT INTO `sys_dict_data` VALUES (2179, 2179, 'dictData.cdr_cause_code.42', '
|
||||
INSERT INTO `sys_dict_data` VALUES (2180, 2180, 'dictData.cdr_cause_code.47', '资源不可用', 'i18n_zh', '', '', '1', 'supervisor', 1725877564156, '', 0, '');
|
||||
INSERT INTO `sys_dict_data` VALUES (2181, 2181, 'dictData.cdr_cause_code.50', '请求的设施未订阅', 'i18n_zh', '', '', '1', 'supervisor', 1725877564156, '', 0, '');
|
||||
INSERT INTO `sys_dict_data` VALUES (2182, 2182, 'job.exportSMSCCDR', '定期从短信话单表导出文件至指定目录', 'i18n_zh', '', '', '1', 'supervisor', 1721902269805, '', 0, '');
|
||||
INSERT INTO `sys_dict_data` VALUES (2183, 2183, 'job.removeExportedFiles', '定期删除指定目录过期文件', 'i18n_zh', '', '', '1', 'supervisor', 1721902269805, '', 0, '');
|
||||
|
||||
SET FOREIGN_KEY_CHECKS = 1;
|
||||
|
||||
@@ -690,5 +690,6 @@ INSERT INTO `sys_dict_data` VALUES (4179, 4179, 'dictData.cdr_cause_code.42', 'C
|
||||
INSERT INTO `sys_dict_data` VALUES (4180, 4180, 'dictData.cdr_cause_code.47', 'Resources Unavailable Unspec', 'i18n_en', '', '', '1', 'supervisor', 1725877564156, '', 0, '');
|
||||
INSERT INTO `sys_dict_data` VALUES (4181, 4181, 'dictData.cdr_cause_code.50', 'Requested Facility Not Subscribed', 'i18n_en', '', '', '1', 'supervisor', 1725877564156, '', 0, '');
|
||||
INSERT INTO `sys_dict_data` VALUES (4182, 4182, 'job.exportSMSCCDR', 'Export regularly from SMSC CDR table', 'i18n_en', '', '', '1', 'supervisor', 1721902269805, '', 0, '');
|
||||
INSERT INTO `sys_dict_data` VALUES (4183, 4183, 'job.removeExportedFiles', 'Regularly delete expired files in the specified directory', 'i18n_en', '', '', '1', 'supervisor', 1721902269805, '', 0, '');
|
||||
|
||||
SET FOREIGN_KEY_CHECKS = 1;
|
||||
|
||||
@@ -41,5 +41,6 @@ INSERT INTO `sys_job` VALUES (11, 'job.exportOperateLog', 'SYSTEM', 'exportTable
|
||||
INSERT INTO `sys_job` VALUES (12, 'job.exportIMSCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_ims\",\"columns\":\"id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) as record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callType\')) as call_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callerParty\')) as caller_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.calledParty\')) as called_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callDuration\')) as call_duration,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.serviceResult\')) as service_result,DATE_FORMAT(FROM_UNIXTIME(timestamp), \'%Y-%m-%d %H:%i:%s\') AS timestamp\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/ims_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1722224659251, '');
|
||||
INSERT INTO `sys_job` VALUES (13, 'job.exportSMFCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_smf\",\"columns\":\"id,ne_type,ne_name,rm_uid,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) AS record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.chargingID\')) AS charging_id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.subscriberIdentifier.subscriptionIDType\')) AS subscriber_id_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.subscriberIdentifier.subscriptionIDData\')) AS subscriber_id_data,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.duration\')) AS duration,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.invocationTimestamp\')) as invocationTimestamp,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeUplink\')) AS data_volume_uplink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeDownlink\')) AS data_volume_downlink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataTotalVolume\')) AS data_total_volume,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.pDUSessionChargingInformation.pDUAddress.pDUIPv4Address\')) AS pdu_ipv4_address,timestamp\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/smf_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724309047797, '');
|
||||
INSERT INTO `sys_job` VALUES (14, 'job.exportSMSCCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_smsc\",\"columns\":\"id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) as record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.serviceType\')) as service_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callerParty\')) as caller_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.calledParty\')) as called_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.result\')) as result,DATE_FORMAT(FROM_UNIXTIME(JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.updateTime\'))), \'%Y-%m-%d %H:%i:%s\') as update_time\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/smsc_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724309047797, '');
|
||||
INSERT INTO `sys_job` VALUES (15, 'job.removeExportedFiles', 'SYSTEM', 'removeFile', '[{\"filePath\":\"/usr/local/omc/backup/operate_log\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/ims_cdr\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/smf_cdr\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/smsc_cdr\",\"maxDays\":30}]', '0 10 0 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1728634085631, '');
|
||||
|
||||
SET FOREIGN_KEY_CHECKS = 1;
|
||||
|
||||
@@ -697,5 +697,6 @@ REPLACE INTO `sys_dict_data` VALUES (2179, 2179, 'dictData.cdr_cause_code.42', '
|
||||
REPLACE INTO `sys_dict_data` VALUES (2180, 2180, 'dictData.cdr_cause_code.47', '资源不可用', 'i18n_zh', '', '', '1', 'supervisor', 1725877564156, '', 0, '');
|
||||
REPLACE INTO `sys_dict_data` VALUES (2181, 2181, 'dictData.cdr_cause_code.50', '请求的设施未订阅', 'i18n_zh', '', '', '1', 'supervisor', 1725877564156, '', 0, '');
|
||||
REPLACE INTO `sys_dict_data` VALUES (2182, 2182, 'job.exportSMSCCDR', '定期从短信话单表导出文件至指定目录', 'i18n_zh', '', '', '1', 'supervisor', 1721902269805, '', 0, '');
|
||||
REPLACE INTO `sys_dict_data` VALUES (2183, 2183, 'job.removeExportedFiles', '定期删除指定目录过期文件', 'i18n_zh', '', '', '1', 'supervisor', 1721902269805, '', 0, '');
|
||||
|
||||
SET FOREIGN_KEY_CHECKS = 1;
|
||||
|
||||
@@ -692,5 +692,6 @@ REPLACE INTO `sys_dict_data` VALUES (4179, 4179, 'dictData.cdr_cause_code.42', '
|
||||
REPLACE INTO `sys_dict_data` VALUES (4180, 4180, 'dictData.cdr_cause_code.47', 'Resources Unavailable Unspec', 'i18n_en', '', '', '1', 'supervisor', 1725877564156, '', 0, '');
|
||||
REPLACE INTO `sys_dict_data` VALUES (4181, 4181, 'dictData.cdr_cause_code.50', 'Requested Facility Not Subscribed', 'i18n_en', '', '', '1', 'supervisor', 1725877564156, '', 0, '');
|
||||
REPLACE INTO `sys_dict_data` VALUES (4182, 4182, 'job.exportSMSCCDR', 'Export regularly from SMSC CDR table', 'i18n_en', '', '', '1', 'supervisor', 1721902269805, '', 0, '');
|
||||
REPLACE INTO `sys_dict_data` VALUES (4183, 4183, 'job.removeExportedFiles', 'Regularly delete expired files in the specified directory', 'i18n_en', '', '', '1', 'supervisor', 1721902269805, '', 0, '');
|
||||
|
||||
SET FOREIGN_KEY_CHECKS = 1;
|
||||
|
||||
@@ -45,5 +45,6 @@ REPLACE INTO `sys_job` VALUES (11, 'job.exportOperateLog', 'SYSTEM', 'exportTabl
|
||||
REPLACE INTO `sys_job` VALUES (12, 'job.exportIMSCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_ims\",\"columns\":\"id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) as record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callType\')) as call_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callerParty\')) as caller_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.calledParty\')) as called_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callDuration\')) as call_duration,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.serviceResult\')) as service_result,DATE_FORMAT(FROM_UNIXTIME(timestamp), \'%Y-%m-%d %H:%i:%s\') AS timestamp\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/ims_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1722224659251, '');
|
||||
REPLACE INTO `sys_job` VALUES (13, 'job.exportSMFCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_smf\",\"columns\":\"id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) AS record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.chargingID\')) AS charging_id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.subscriberIdentifier.subscriptionIDType\')) AS subscriber_id_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.subscriberIdentifier.subscriptionIDData\')) AS subscriber_id_data,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.duration\')) AS duration,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.invocationTimestamp\')) as invocationTimestamp,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeUplink\')) AS data_volume_uplink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataVolumeDownlink\')) AS data_volume_downlink,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.listOfMultipleUnitUsage[*].usedUnitContainer[*].dataTotalVolume\')) AS data_total_volume,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.pDUSessionChargingInformation.pDUAddress.pDUIPv4Address\')) AS pdu_ipv4_address,timestamp\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/smf_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724309047797, '');
|
||||
REPLACE INTO `sys_job` VALUES (14, 'job.exportSMSCCDR', 'SYSTEM', 'exportTable', '{\"duration\":1,\"tableName\":\"cdr_event_smsc\",\"columns\":\"id,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.recordType\')) as record_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.serviceType\')) as service_type,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.callerParty\')) as caller_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.calledParty\')) as called_party,JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.result\')) as result,DATE_FORMAT(FROM_UNIXTIME(JSON_UNQUOTE(JSON_EXTRACT(cdr_json,\'$.updateTime\'))), \'%Y-%m-%d %H:%i:%s\') as update_time\",\"timeCol\":\"timestamp\",\"timeUnit\":\"second\",\"extras\":\"\",\"filePath\":\"/usr/local/omc/backup/smsc_cdr\"}', '0 0 0/1 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1724309047797, '');
|
||||
REPLACE INTO `sys_job` VALUES (15, 'job.removeExportedFiles', 'SYSTEM', 'removeFile', '[{\"filePath\":\"/usr/local/omc/backup/operate_log\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/ims_cdr\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/smf_cdr\",\"maxDays\":30},{\"filePath\":\"/usr/local/omc/backup/smsc_cdr\",\"maxDays\":30}]', '0 10 0 * * ?', '3', '0', '1', '1', 'supervisor', 1698478134842, 'admin', 1728634085631, '');
|
||||
|
||||
SET FOREIGN_KEY_CHECKS = 1;
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"be.ems/src/modules/crontask/processor/getStateFromNE"
|
||||
processorMonitorSysResource "be.ems/src/modules/crontask/processor/monitor_sys_resource"
|
||||
processorNeConfigBackup "be.ems/src/modules/crontask/processor/ne_config_backup"
|
||||
"be.ems/src/modules/crontask/processor/removeFile"
|
||||
)
|
||||
|
||||
// InitCronQueue 初始定时任务队列
|
||||
@@ -25,4 +26,5 @@ func InitCronQueue() {
|
||||
cron.CreateQueue("getStateFromNE", getStateFromNE.NewProcessor)
|
||||
cron.CreateQueue("genNeStateAlarm", genNeStateAlarm.NewProcessor)
|
||||
cron.CreateQueue("exportTable", exportTable.NewProcessor)
|
||||
cron.CreateQueue("removeFile", removeFile.NewProcessor)
|
||||
}
|
||||
|
||||
159
src/modules/crontask/processor/removeFile/removeFile.go
Normal file
159
src/modules/crontask/processor/removeFile/removeFile.go
Normal file
@@ -0,0 +1,159 @@
|
||||
package removeFile
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"be.ems/lib/log"
|
||||
"be.ems/src/framework/cron"
|
||||
)
|
||||
|
||||
var NewProcessor = &BarProcessor{
|
||||
progress: 0,
|
||||
count: 0,
|
||||
}
|
||||
|
||||
// bar 队列任务处理
|
||||
type BarProcessor struct {
|
||||
// 任务进度
|
||||
progress int
|
||||
// 执行次数
|
||||
count int
|
||||
}
|
||||
|
||||
type BarParams struct {
|
||||
FilePath string `json:"filePath"` // file path
|
||||
MaxDays int `json:"maxDays"`
|
||||
MaxFiles *int `json:"maxFiles"` // keep max files
|
||||
MaxSize *int64 `json:"maxSize"`
|
||||
Extras string `json:"extras"` // extras condition for where
|
||||
}
|
||||
|
||||
type FileInfo struct {
|
||||
Path string
|
||||
Info os.FileInfo
|
||||
}
|
||||
|
||||
func (s *BarProcessor) Execute(data any) (any, error) {
|
||||
s.count++
|
||||
|
||||
options := data.(cron.JobData)
|
||||
sysJob := options.SysJob
|
||||
var params []BarParams
|
||||
|
||||
err := json.Unmarshal([]byte(sysJob.TargetParams), ¶ms)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result := []map[string]any{}
|
||||
for _, param := range params {
|
||||
res, _ := s.ExecuteOne(param)
|
||||
result = append(result, res)
|
||||
}
|
||||
|
||||
// 返回结果,用于记录执行结果
|
||||
return map[string]any{
|
||||
"result": result,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *BarProcessor) ExecuteOne(params BarParams) (map[string]any, error) {
|
||||
var maxFiles int = 0
|
||||
var maxSize int64 = 0
|
||||
if params.MaxFiles != nil {
|
||||
maxFiles = *params.MaxFiles
|
||||
}
|
||||
if params.MaxSize != nil {
|
||||
maxSize = int64(*params.MaxSize * 1024 * 1024)
|
||||
}
|
||||
files, err := getFiles(params.FilePath)
|
||||
if err != nil {
|
||||
return map[string]any{
|
||||
"msg": "failed",
|
||||
"err": err.Error(),
|
||||
}, err
|
||||
}
|
||||
|
||||
// 获取本地时区
|
||||
loc, err := time.LoadLocation("Local")
|
||||
if err != nil {
|
||||
return map[string]any{
|
||||
"msg": "failed",
|
||||
"err": err.Error(),
|
||||
}, err
|
||||
}
|
||||
cutoff := time.Now().In(loc).AddDate(0, 0, -params.MaxDays)
|
||||
|
||||
var oldFiles []FileInfo
|
||||
for _, file := range files {
|
||||
if file.Info.ModTime().Before(cutoff) {
|
||||
oldFiles = append(oldFiles, file)
|
||||
}
|
||||
}
|
||||
|
||||
// 按修改时间排序文件(最旧的在前)
|
||||
sort.Slice(oldFiles, func(i, j int) bool {
|
||||
return oldFiles[i].Info.ModTime().Before(oldFiles[j].Info.ModTime())
|
||||
})
|
||||
|
||||
deleted, errorDel := 0, 0
|
||||
|
||||
// 删除文件,直到满足文件总数不超过maxFiles个且总大小不超过maxSize的条件
|
||||
var totalSize int64
|
||||
for i, file := range oldFiles {
|
||||
if (maxFiles > 0 && i >= maxFiles) || (maxSize > 0 && totalSize+file.Info.Size() > maxSize) {
|
||||
break
|
||||
}
|
||||
err := os.Remove(file.Path)
|
||||
if err != nil {
|
||||
log.Error("Error deleting file:", file.Path, err)
|
||||
errorDel++
|
||||
continue
|
||||
}
|
||||
totalSize += file.Info.Size()
|
||||
deleted++
|
||||
}
|
||||
|
||||
// 如果仍然有超过maxFiles个文件或总大小超过maxSize,继续删除最旧的文件
|
||||
remainingFiles := files
|
||||
sort.Slice(remainingFiles, func(i, j int) bool {
|
||||
return remainingFiles[i].Info.ModTime().Before(remainingFiles[j].Info.ModTime())
|
||||
})
|
||||
|
||||
for (maxFiles > 0 && len(remainingFiles) > maxFiles) || (maxSize > 0 && totalSize > maxSize) {
|
||||
file := remainingFiles[0]
|
||||
err := os.Remove(file.Path)
|
||||
if err != nil {
|
||||
log.Error("Error deleting file:", file.Path, err)
|
||||
remainingFiles = remainingFiles[1:]
|
||||
continue
|
||||
}
|
||||
totalSize -= file.Info.Size()
|
||||
remainingFiles = remainingFiles[1:]
|
||||
}
|
||||
|
||||
// 返回结果,用于记录执行结果
|
||||
return map[string]any{
|
||||
"msg": "successed",
|
||||
"filePath": params.FilePath,
|
||||
"deleted": deleted,
|
||||
"errorDel": errorDel,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func getFiles(dir string) ([]FileInfo, error) {
|
||||
var files []FileInfo
|
||||
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !info.IsDir() {
|
||||
files = append(files, FileInfo{Path: path, Info: info})
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return files, err
|
||||
}
|
||||
Reference in New Issue
Block a user