package Nmysql import ( . "proxy/MsgDef" rds "proxy/Nredis" //"strconv" "database/sql" _ "github.com/go-sql-driver/mysql" "os" l4g "proxy/logger" //"github.com/sirupsen/logrus" "time" ) var MySqlDb *sql.DB func OpenMysql(userName, passwd, addr string) error { var err error MySqlDb, err = sql.Open("mysql", userName + ":" + passwd + "@tcp(" + addr + ")/boss") if err == nil { /*bSyn, _ := rds.CheckIfNeedToSyncDb() if bSyn { //l4g.MysqlLog.Infoln("start to sync mysql DB.") //LoadAcctTblFromMysql() //l4g.MysqlLog.Infoln("finish sync mysql DB...") } else {*/ l4g.MysqlLog.Infoln("do not sync mysql DB.") //} //go TriggerEventProcess() return nil } else { l4g.MysqlLog.Errorln("connect mysql DB err: %v", err) } os.Exit(1) return err } func LoadAlertSmsFromMysql() error { queryStr := "select sms_alert_id, sms_called_number, sms_content from tb_sms_info where sms_state=1;" rows, err := MySqlDb.Query(queryStr) if err != nil { l4g.MysqlLog.Errorf("query tb_sms_info table fail: %v", err) return err } defer rows.Close() var count int = 0 var alertId int var serviceNbr, smsContent string for rows.Next(){ if err = rows.Scan(&alertId, &serviceNbr, &smsContent); err != nil { l4g.MysqlLog.Errorf("query row of tb_sms_info fail: %v", err) //_ = rows.Close() return err } err = rds.RdbSetAlertSmsRecord(alertId, serviceNbr, smsContent) if err != nil { l4g.MysqlLog.Errorf("Insert redis tb_sms_info, alertId[%d], msisdn[%s], sms[%s] fail: %v", alertId, serviceNbr, smsContent, err) } count++ //l4g.MysqlLog.Debugf("Insert redis tb_sms_info, alertId[%d], msisdn[%s], sms[%s] succ", alertId, serviceNbr, smsContent) } //_ = rows.Close() l4g.MysqlLog.Debugf("total tb_sms_info num: %d", count) return err } func LoadCreateAcctFromMysql() error { queryStr := "select PRE_ID, OPER_TYPE, IMSI, SERVICE_NBR, KI, OPC, CUST_ID, ACCT_ID, PRD_INST_ID, MOBILE_TYPE, VMS_FLAG, BIRTH_DATE, BALANCE, BALANCE_EXP_DATE, OFR_ID, EXP_DATE, CUG_ID " queryStr += "from tb_sync_mobile where STATE=1 AND OPER_TYPE=1 AND MOBILE_TYPE=1;" rows, err := MySqlDb.Query(queryStr) if err != nil { l4g.MysqlLog.Errorf("query tb_sync_mobile table fail: %v", err) return err } defer rows.Close() var count int = 0 for rows.Next(){ var ca ChgSyncMobile if err = rows.Scan(&ca.PreId, &ca.OperType, &ca.Imsi, &ca.ServiceNbr, &ca.Ki, &ca.Opc, &ca.CustId, &ca.AcctId, &ca.PrdInstId, &ca.MobileType, &ca.VmsFlag, &ca.BirthDate, &ca.Balance, &ca.BalanceExpDate, &ca.OfrId, &ca.ExpDate, &ca.CugId); err != nil { l4g.MysqlLog.Errorf("query row of tb_sync_mobile fail: %v", err) return err } err = rds.RdbSetCreateAcctRecord(&ca) if err != nil { l4g.MysqlLog.Errorf("Insert redis tb_sync_mobile, msisdn[%s], fail: %v", ca.ServiceNbr, err) } count++ //l4g.MysqlLog.Debugf("Insert redis tb_sync_mobile, msisdn[%s] succ", ca.ServiceNbr) } l4g.MysqlLog.Debugf("total tb_sync_mobile num: %d", count) return err } func LoadAcctTblFromMysql() error { queryStr := "SELECT p.PRD_INST_ID,p.PRD_ID AS acct_type,p.SERVICE_NBR,p.PRD_INST_STAS_ID,p.ACCT_ID,p.OWN_CUST_ID,b.ofr_id, a.EFF_DATE, a.EXP_DATE, b.OFR_INST_ID " queryStr += "FROM tb_prd_prd_inst_551 p " queryStr += "INNER JOIN tb_prd_ofr_detail_inst_551 a ON a.ofr_detail_inst_ref_id = p.PRD_INST_ID " queryStr += "INNER JOIN tb_prd_ofr_inst_551 b ON a.ofr_inst_id = b.ofr_inst_id " queryStr += "INNER JOIN tb_prd_ofr c ON b.ofr_id = c.ofr_id " queryStr += "WHERE a.OFR_DETAIL_TYPE_ID = 'A1' AND p.IF_PREPAY = 1 AND p.PRD_INST_STAS_ID = 1001 AND c.ofr_type_id = '1' "// IN (1001, 1202, 1205) " queryStr += "AND a.EXP_DATE > SYSDATE() AND a.EFF_DATE < SYSDATE();" rows, err := MySqlDb.Query(queryStr) if err != nil { l4g.MysqlLog.Errorf("query acct table fail: %v", err) return err } //defer rows.Close() usrList := make([]AcctData, 0, 1) var count int = 0 var prdInstId, acctType, prdInstStasId, acctId, custId, ofrId, ofrInstId int var serviceNbr, effDate, expDate string ofrIdSet := make(map[int]void) for rows.Next(){ if err = rows.Scan(&prdInstId, &acctType, &serviceNbr, &prdInstStasId, &acctId, &custId, &ofrId, &effDate, &expDate, &ofrInstId); err != nil { l4g.MysqlLog.Errorf("query row of acct fail: %v", err) _ = rows.Close() return err } // insert redis db: acct table acctData := AcctData{PrdInstId: prdInstId, AcctType: acctType, ServiceNbr: serviceNbr, PrdInstStasId: prdInstStasId, AcctId: acctId, CustId: custId, OfrId: ofrId, EffTime: effDate, ExpTime: expDate, OfrInstId: ofrInstId} err = rds.RdbSetAcctRecord(&acctData) if err != nil { l4g.MysqlLog.Errorf("Insert redis record[%v] fail: %v", acctData, err) } ofrIdSet[acctData.OfrId] = member usrList = append(usrList, acctData) count++ //l4g.Debugf("name: %s, code: %s", name, code) } _ = rows.Close() for k := range ofrIdSet { insertRdbTariffByOfrId(k) } var serviceNbrs string for _, v := range usrList { // insert redis db: rr table by prd_inst_id if serviceNbrs == "" { serviceNbrs = "'" + v.ServiceNbr + "'" } else { serviceNbrs += "," + "'" + v.ServiceNbr + "'" } // insertRdbRrByServiceNbr(v.ServiceNbr, v.PrdInstId, false) } if serviceNbrs != "" { insertRdbRrByServiceNbrIn(serviceNbrs, true) } l4g.MysqlLog.Debugf("total acct num: %d", count) return err } func LoadOneAcctFromMysql(msisdn string) error { queryStr := "SELECT p.PRD_INST_ID,p.PRD_ID AS acct_type,p.SERVICE_NBR,p.PRD_INST_STAS_ID,p.ACCT_ID,p.OWN_CUST_ID,b.ofr_id, a.EFF_DATE, a.EXP_DATE, b.OFR_INST_ID " queryStr += "FROM tb_prd_prd_inst_551 p " queryStr += "INNER JOIN tb_prd_ofr_detail_inst_551 a ON a.ofr_detail_inst_ref_id = p.PRD_INST_ID " queryStr += "INNER JOIN tb_prd_ofr_inst_551 b ON a.ofr_inst_id = b.ofr_inst_id " queryStr += "INNER JOIN tb_prd_ofr c ON b.ofr_id = c.ofr_id " queryStr += "WHERE a.OFR_DETAIL_TYPE_ID = 'A1' AND p.IF_PREPAY = 1 AND p.PRD_INST_STAS_ID = 1001 AND c.ofr_type_id = '1' "// IN (1001, 1202, 1205) " queryStr += "AND p.SERVICE_NBR=" + msisdn + " AND a.EXP_DATE > SYSDATE() AND a.EFF_DATE < SYSDATE();" rows, err := MySqlDb.Query(queryStr) if err != nil { l4g.MysqlLog.Errorf("query acct table fail: %v", err) return err } //defer rows.Close() usrList := make([]AcctData, 0, 1) var count int = 0 var prdInstId, acctType, prdInstStasId, acctId, custId, ofrId, ofrInstId int var serviceNbr, effDate, expDate string ofrIdSet := make(map[int]void) for rows.Next(){ if err = rows.Scan(&prdInstId, &acctType, &serviceNbr, &prdInstStasId, &acctId, &custId, &ofrId, &effDate, &expDate, &ofrInstId); err != nil { l4g.MysqlLog.Errorf("query row of acct fail: %v", err) _ = rows.Close() return err } // insert redis db: acct table acctData := AcctData{PrdInstId: prdInstId, AcctType: acctType, ServiceNbr: serviceNbr, PrdInstStasId: prdInstStasId, AcctId: acctId, CustId: custId, OfrId: ofrId, EffTime: effDate, ExpTime: expDate, OfrInstId: ofrInstId} err = rds.RdbSetAcctRecord(&acctData) if err != nil { l4g.MysqlLog.Errorf("Insert redis record[%v] fail: %v", acctData, err) } ofrIdSet[acctData.OfrId] = member usrList = append(usrList, acctData) count++ //l4g.Debugf("name: %s, code: %s", name, code) } _ = rows.Close() for k := range ofrIdSet { insertRdbTariffByOfrId(k) } var serviceNbrs string for _, v := range usrList { // insert redis db: rr table by prd_inst_id if serviceNbrs == "" { serviceNbrs = "'" + v.ServiceNbr + "'" } else { serviceNbrs += "," + "'" + v.ServiceNbr + "'" } // insertRdbRrByServiceNbr(v.ServiceNbr, v.PrdInstId, false) } if serviceNbrs != "" { insertRdbRrByServiceNbrIn(serviceNbrs, true) } l4g.MysqlLog.Debugf("total acct num: %d", count) return err } func updateRdbAcctTblByOfrDetailInstId(synType int, ofrDetailInstId int) (int, error) { queryStr := "SELECT p.PRD_INST_ID,p.PRD_ID AS acct_type,p.SERVICE_NBR,p.PRD_INST_STAS_ID,p.ACCT_ID,p.OWN_CUST_ID,b.ofr_id,a.EFF_DATE,a.EXP_DATE, b.OFR_INST_ID, c.OFR_TYPE_ID " queryStr += "FROM tb_prd_prd_inst_551 p " queryStr += "INNER JOIN tb_prd_ofr_detail_inst_551 a ON a.ofr_detail_inst_ref_id = p.PRD_INST_ID " queryStr += "INNER JOIN tb_prd_ofr_inst_551 b ON a.ofr_inst_id = b.ofr_inst_id " queryStr += "INNER JOIN tb_prd_ofr c ON b.ofr_id = c.ofr_id " queryStr += "WHERE a.OFR_DETAIL_TYPE_ID = 'A1' AND a.ofr_detail_inst_id = ? " queryStr += "AND p.IF_PREPAY = 1 and p.PRD_INST_STAS_ID IN (1001) "//AND c.ofr_type_id = '1' "// , 1202, 1205, 1101);" queryStr += "AND a.EXP_DATE > SYSDATE();"// AND a.EFF_DATE < SYSDATE() stmt, _ := MySqlDb.Prepare(queryStr) defer stmt.Close() rows, err := stmt.Query(ofrDetailInstId) if err != nil { l4g.MysqlLog.Errorf("Query acct data by ofr_detail_inst_id[%d] error: %v", ofrDetailInstId, err) return 0, err } defer rows.Close() var acctData AcctData var count int = 0 var prdInstId, acctType, prdInstStasId, acctId, custId, ofrId, ofrInstId int var serviceNbr, effDate, expDate, ofrTypeId string for rows.Next(){ if err = rows.Scan(&prdInstId, &acctType, &serviceNbr, &prdInstStasId, &acctId, &custId, &ofrId, &effDate, &expDate, &ofrInstId, &ofrTypeId); err != nil { l4g.MysqlLog.Errorf("query row of acct fail: %s", err) return 0, err } if ofrTypeId == "1" { // insert redis db: acct table acctData = AcctData{PrdInstId: prdInstId, AcctType: acctType, ServiceNbr: serviceNbr, PrdInstStasId: prdInstStasId, AcctId: acctId, CustId: custId, OfrId: ofrId, EffTime: effDate, ExpTime: expDate, OfrInstId: ofrInstId} if synType == 3 { _ = rds.RdbDelAcctOfr(serviceNbr, ofrId) } else { err = rds.RdbSetAcctRecord(&acctData) if err != nil { l4g.MysqlLog.Errorf("Insert redis record[%v] fail: %v", acctData, err) } } } // insert redis db: tariff table by ofr_id //insert_rdb_tariff_by_ofr_id(ofr_id) // insert redis db: rr table by prd_inst_id //insert_rdb_rr_by_service_nbr(service_nbr, prd_inst_id) count++ //l4g.Debugf("name: %s, code: %s", name, code) } l4g.MysqlLog.Debugf("total acct num: %d", count) return count, err } func insertRdbTariffByOfrId(ofrId int) error { // check if already exist if rds.RdbCheckIfOfrExist(ofrId) { return nil } // set ofr Id err := rds.RdbSetOfrRecord(ofrId) if err != nil { return err } // query tariff queryStr := "SELECT DISTINCT " queryStr += "g.TARIFF_ID, g.TARIFF_SEQ, g.MEASURE_DOMAIN, time_to_sec(g.START_REF_VALUE) startTime, time_to_sec(g.END_REF_VALUE) endTime, " queryStr += "g.SCALED_RATE_VALUE_ID feeUnit, g.rate_unit rateUnit, c.CALC_PRIORITY,h.EVENT_PRIORITY,h.event_pricing_strategy_id," queryStr += "h.EVENT_PRICING_STRATEGY_NAME,c.ofr_id,c.OFR_NAME " queryStr += "FROM " queryStr += "tb_prd_ofr c " queryStr += "INNER JOIN tb_bil_pricing_combine_group d ON c.pricing_plan_id = d.pricing_plan_id " queryStr += "INNER JOIN tb_bil_pricing_combine_gp_list e ON d.pricing_combine_group_id = e.pricing_combine_group_id " queryStr += "INNER JOIN tb_bil_pricing_section f ON e.event_pricing_strategy_id = f.event_pricing_strategy_id AND f.TARIFF_ID IS NOT NULL " queryStr += "INNER JOIN tb_bil_tariff g ON f.pricing_section_id = g.pricing_section_id " queryStr += "INNER JOIN tb_bil_evt_pricing_strategy h ON e.event_pricing_strategy_id = h.event_pricing_strategy_id " queryStr += "LEFT JOIN tb_bil_strategy_resource_rel sr ON sr.EVENT_PRICING_STRATEGY_ID=h.EVENT_PRICING_STRATEGY_ID " queryStr += "WHERE g.MEASURE_DOMAIN in('01','02','03') " queryStr += "AND c.ofr_type_id = '1' " queryStr += "AND c.ofr_id = ? " queryStr += "AND sr.STRATEGY_RATABLE_RELATION_ID IS NULL;" stmt, _ := MySqlDb.Prepare(queryStr) defer stmt.Close() rows, err := stmt.Query(ofrId) if err != nil { l4g.MysqlLog.Errorf("Query tariff data of ofr_id[%d] error: %v", ofrId, err) return err } //defer rows.Close() tList := make([]TariffData, 0, 1) var count int = 0 for rows.Next() { var tariff TariffData err = rows.Scan(&tariff.TariffId, &tariff.TariffSeq, &tariff.MeasureDomain, &tariff.StartTime, &tariff.EndTime, &tariff.FeeUnit, &tariff.RateUnit, &tariff.CalcPriority, &tariff.EventPriority, &tariff.StrategyId, &tariff.StrategyName, &tariff.OfrId, &tariff.OfrName) if err != nil { l4g.MysqlLog.Errorf("Scan data error: %v", err) break } else { tList = append(tList, tariff) count++ l4g.MysqlLog.Debugf("Get tariff info: %v", tariff) //return nil } } _ = rows.Close() for _, v := range tList { rds.RdbSetTariffRecord(&v) insertRdbPrefixRecord(v.StrategyId) insertRdbHolidayDiscountRecord(v.TariffId, v.TariffSeq) } l4g.MysqlLog.Debugf("Get tariff num[%d]!", count) return nil } func updateRdbTariffByTariffId(tariffId int) error { // query tariff queryStr := "SELECT DISTINCT " queryStr += "g.TARIFF_ID, g.MEASURE_DOMAIN, time_to_sec(g.START_REF_VALUE) startTime, time_to_sec(g.END_REF_VALUE) endTime, " queryStr += "g.SCALED_RATE_VALUE_ID feeUnit, g.rate_unit rateUnit, c.CALC_PRIORITY,h.EVENT_PRIORITY,h.event_pricing_strategy_id," queryStr += "h.EVENT_PRICING_STRATEGY_NAME,c.ofr_id,c.OFR_NAME " queryStr += "FROM " queryStr += "tb_prd_ofr c " queryStr += "INNER JOIN tb_bil_pricing_combine_group d ON c.pricing_plan_id = d.pricing_plan_id " queryStr += "INNER JOIN tb_bil_pricing_combine_gp_list e ON d.pricing_combine_group_id = e.pricing_combine_group_id " queryStr += "INNER JOIN tb_bil_pricing_section f ON e.event_pricing_strategy_id = f.event_pricing_strategy_id AND f.TARIFF_ID IS NOT NULL " queryStr += "INNER JOIN tb_bil_tariff g ON f.pricing_section_id = g.pricing_section_id " queryStr += "INNER JOIN tb_bil_evt_pricing_strategy h ON e.event_pricing_strategy_id = h.event_pricing_strategy_id " queryStr += "LEFT JOIN tb_bil_strategy_resource_rel sr ON sr.EVENT_PRICING_STRATEGY_ID=h.EVENT_PRICING_STRATEGY_ID " queryStr += "WHERE g.MEASURE_DOMAIN in('01','02','03') " queryStr += "AND c.ofr_type_id = '1' AND c.OFR_STATE = '2' " queryStr += "AND g.TARIFF_ID = ? " queryStr += "AND sr.STRATEGY_RATABLE_RELATION_ID IS NULL;" stmt, _ := MySqlDb.Prepare(queryStr) defer stmt.Close() rows, err := stmt.Query(tariffId) defer rows.Close() if err != nil { l4g.MysqlLog.Errorf("Query tariff data by tariff_id[%d] error: %v", tariffId, err) return err } var count int = 0 var tariff TariffData for rows.Next() { err = rows.Scan(&tariff.TariffId, &tariff.MeasureDomain, &tariff.StartTime, &tariff.EndTime, &tariff.FeeUnit, &tariff.RateUnit, &tariff.CalcPriority, &tariff.EventPriority, &tariff.StrategyId, &tariff.StrategyName, &tariff.OfrId, &tariff.OfrName) if err != nil { l4g.MysqlLog.Errorf("Scan data error: %v", err) break } else { rds.RdbSetTariffRecord(&tariff) insertRdbPrefixRecord(tariff.StrategyId) count++ l4g.MysqlLog.Debugf("Get tariff info: %v", tariff) break //return nil } } l4g.MysqlLog.Debugf("Get tariff num[%d]!", count) // check if already exist if rds.RdbCheckIfOfrExist(tariff.OfrId) { return nil } // set ofr Id err = rds.RdbSetOfrRecord(tariff.OfrId) if err != nil { return err } return nil } func updateRdbTariffByOfrId(ofrId int) error { // query tariff queryStr := "SELECT DISTINCT " queryStr += "g.TARIFF_ID, g.MEASURE_DOMAIN, time_to_sec(g.START_REF_VALUE) startTime, time_to_sec(g.END_REF_VALUE) endTime, " queryStr += "g.SCALED_RATE_VALUE_ID feeUnit, g.rate_unit rateUnit, c.CALC_PRIORITY,h.EVENT_PRIORITY,h.event_pricing_strategy_id," queryStr += "h.EVENT_PRICING_STRATEGY_NAME,c.ofr_id,c.OFR_NAME " queryStr += "FROM " queryStr += "tb_prd_ofr c " queryStr += "INNER JOIN tb_bil_pricing_combine_group d ON c.pricing_plan_id = d.pricing_plan_id " queryStr += "INNER JOIN tb_bil_pricing_combine_gp_list e ON d.pricing_combine_group_id = e.pricing_combine_group_id " queryStr += "INNER JOIN tb_bil_pricing_section f ON e.event_pricing_strategy_id = f.event_pricing_strategy_id AND f.TARIFF_ID IS NOT NULL " queryStr += "INNER JOIN tb_bil_tariff g ON f.pricing_section_id = g.pricing_section_id " queryStr += "INNER JOIN tb_bil_evt_pricing_strategy h ON e.event_pricing_strategy_id = h.event_pricing_strategy_id " queryStr += "LEFT JOIN tb_bil_strategy_resource_rel sr ON sr.EVENT_PRICING_STRATEGY_ID=h.EVENT_PRICING_STRATEGY_ID " queryStr += "WHERE g.MEASURE_DOMAIN in('01','02','03') " queryStr += "AND c.ofr_type_id = '1' AND c.OFR_STATE = '2' " queryStr += "AND c.ofr_id = ? " queryStr += "AND sr.STRATEGY_RATABLE_RELATION_ID IS NULL;" stmt, _ := MySqlDb.Prepare(queryStr) defer stmt.Close() rows, err := stmt.Query(ofrId) defer rows.Close() if err != nil { l4g.MysqlLog.Errorf("Query tariff data by tariff_id[%d] error: %v", ofrId, err) return err } var count int = 0 var tariff TariffData for rows.Next() { err = rows.Scan(&tariff.TariffId, &tariff.MeasureDomain, &tariff.StartTime, &tariff.EndTime, &tariff.FeeUnit, &tariff.RateUnit, &tariff.CalcPriority, &tariff.EventPriority, &tariff.StrategyId, &tariff.StrategyName, &tariff.OfrId, &tariff.OfrName) if err != nil { l4g.MysqlLog.Errorf("Scan data error: %v", err) break } else { rds.RdbSetTariffRecord(&tariff) insertRdbPrefixRecord(tariff.StrategyId) count++ l4g.MysqlLog.Debugf("Get tariff info: %v", tariff) break //return nil } } l4g.MysqlLog.Debugf("Get tariff num[%d]!", count) // check if already exist if rds.RdbCheckIfOfrExist(tariff.OfrId) { return nil } // set ofr Id err = rds.RdbSetOfrRecord(tariff.OfrId) if err != nil { return err } return nil } func updateRdbTariffByStrategyId(strategyId int) error { // query tariff queryStr := "SELECT DISTINCT " queryStr += "g.TARIFF_ID, g.MEASURE_DOMAIN, time_to_sec(g.START_REF_VALUE) startTime, time_to_sec(g.END_REF_VALUE) endTime, " queryStr += "g.SCALED_RATE_VALUE_ID feeUnit, g.rate_unit rateUnit, c.CALC_PRIORITY,h.EVENT_PRIORITY,h.event_pricing_strategy_id," queryStr += "h.EVENT_PRICING_STRATEGY_NAME,c.ofr_id,c.OFR_NAME " queryStr += "FROM " queryStr += "tb_prd_ofr c " queryStr += "INNER JOIN tb_bil_pricing_combine_group d ON c.pricing_plan_id = d.pricing_plan_id " queryStr += "INNER JOIN tb_bil_pricing_combine_gp_list e ON d.pricing_combine_group_id = e.pricing_combine_group_id " queryStr += "INNER JOIN tb_bil_pricing_section f ON e.event_pricing_strategy_id = f.event_pricing_strategy_id AND f.TARIFF_ID IS NOT NULL " queryStr += "INNER JOIN tb_bil_tariff g ON f.pricing_section_id = g.pricing_section_id " queryStr += "INNER JOIN tb_bil_evt_pricing_strategy h ON e.event_pricing_strategy_id = h.event_pricing_strategy_id " queryStr += "LEFT JOIN tb_bil_strategy_resource_rel sr ON sr.EVENT_PRICING_STRATEGY_ID=h.EVENT_PRICING_STRATEGY_ID " queryStr += "WHERE g.MEASURE_DOMAIN in('01','02','03') " queryStr += "AND c.ofr_type_id = '1' AND c.OFR_STATE = '2' " queryStr += "AND h.event_pricing_strategy_id = ? " queryStr += "AND sr.STRATEGY_RATABLE_RELATION_ID IS NULL;" stmt, _ := MySqlDb.Prepare(queryStr) defer stmt.Close() rows, err := stmt.Query(strategyId) defer rows.Close() if err != nil { l4g.MysqlLog.Errorf("Query tariff data by tariff_id[%d] error: %v", strategyId, err) return err } var count int = 0 var tariff TariffData for rows.Next() { err = rows.Scan(&tariff.TariffId, &tariff.MeasureDomain, &tariff.StartTime, &tariff.EndTime, &tariff.FeeUnit, &tariff.RateUnit, &tariff.CalcPriority, &tariff.EventPriority, &tariff.StrategyId, &tariff.StrategyName, &tariff.OfrId, &tariff.OfrName) if err != nil { l4g.MysqlLog.Errorf("Scan data error: %v", err) break } else { rds.RdbSetTariffRecord(&tariff) //insertRdbPrefixRecord(tariff.StrategyId) count++ l4g.MysqlLog.Debugf("Get tariff info: %v", tariff) break //return nil } } l4g.MysqlLog.Debugf("Get tariff num[%d]!", count) // check if already exist if rds.RdbCheckIfOfrExist(tariff.OfrId) { return nil } // set ofr Id err = rds.RdbSetOfrRecord(tariff.OfrId) if err != nil { return err } return nil } func insertRdbPrefixRecord(strategyId int) error { stmt, _ := MySqlDb.Prepare("SELECT DISTINCT ca.area_name, ca.area_code FROM TB_BIL_PRICING_AREA ar INNER JOIN config_area ca ON ca.area_id = ar.area_id WHERE ar.STRATEGY_ID = ?;") defer stmt.Close() rows, err := stmt.Query(strategyId) defer rows.Close() if err != nil { l4g.MysqlLog.Errorf("Query prefix info of strategy[%d], error: %v", strategyId, err) return err } var count int = 0 var prefix PrefixData prefix.StrategyId = strategyId for rows.Next() { err = rows.Scan(&prefix.AreaName, &prefix.AreaCode) if err != nil { l4g.MysqlLog.Errorf("Scan prefix error: %v", err) break } else { rds.RdbSetPrefixRecord(&prefix) count++ l4g.MysqlLog.Debugf("Get prefix info of strategy[%d], prefix name[%s], code[%s].", strategyId, prefix.AreaName, prefix.AreaCode) //return nil } } l4g.MysqlLog.Debugf("Get prefix num[%d]!", count) return nil } /* HD table import SELECT hr.ofr_id, hr.tariff_id, hr.tariff_seq, hr.holiday_group, hr.state hr_state, hr.holiday_discount, hd.HOLIDAY_ID, hd.STATE hd_state, hd.HOLIDAY_PRIORITY, hd.HOLIDAY_NAME, hd.HOLIDAY_TYPE, hd.HOLIDAY_BEGIN_DATE, hd.HOLIDAY_END_DATE FROM tb_bil_holiday_rel hr INNER JOIN tb_bil_holiday hd ON hr.holiday_group = hd.HOLIDAY_GROUP WHERE hr.tariff_id=? AND hr.tariff_seq=? AND hr.state = 1 AND hd.STATE = 'L0R'; */ func insertRdbHolidayDiscountRecord(tariffId int, tariffSeq int) error { // query holiday discount queryStr := "SELECT DISTINCT hr.ofr_id, hr.tariff_id, hr.tariff_seq, hr.holiday_group, hr.state hr_state, hr.holiday_discount, " queryStr += "hd.HOLIDAY_ID, hd.STATE hd_state, hd.HOLIDAY_PRIORITY, hd.HOLIDAY_NAME, hd.HOLIDAY_TYPE, hd.HOLIDAY_BEGIN_DATE, hd.HOLIDAY_END_DATE " queryStr += "FROM tb_bil_holiday_rel hr " queryStr += "INNER JOIN tb_bil_holiday hd ON hr.holiday_group = hd.HOLIDAY_GROUP " queryStr += "WHERE hr.tariff_id=? AND hr.tariff_seq=? AND hr.state = 1 AND hd.STATE = 'L0R';" stmt, _ := MySqlDb.Prepare(queryStr) defer stmt.Close() rows, err := stmt.Query(tariffId, tariffSeq) defer rows.Close() if err != nil { l4g.MysqlLog.Errorf("Query Holiday discount by tariff_id[%d] error: %v", tariffId, err) return err } var count int = 0 var hd HolidayDiscountData for rows.Next() { err = rows.Scan(&hd.OfrId, &hd.TariffId, &hd.TariffSeq, &hd.HdGroup, &hd.HrState, &hd.HrDiscount, &hd.HdId, &hd.HdState, &hd.HdPriority, &hd.HdName, &hd.HdType, &hd.HdBeginDate, &hd.HdEndDate) if err != nil { l4g.MysqlLog.Errorf("Scan Holiday discount error: %v", err) break } else { rds.RdbSetHolidayDiscountRecord(&hd) count++ l4g.MysqlLog.Debugf("Set Holiday discount of tariff[%d], hd: %v", tariffId, hd) } } l4g.MysqlLog.Debugf("Set holiday discount num[%d] of tariff[%d]!", count, tariffId) return nil } /* HD table import SELECT hr.ofr_id, hr.tariff_id, hr.tariff_seq, hr.holiday_group, hr.state hr_state, hr.holiday_discount, hd.HOLIDAY_ID, hd.STATE hd_state, hd.HOLIDAY_PRIORITY, hd.HOLIDAY_NAME, hd.HOLIDAY_TYPE, hd.HOLIDAY_BEGIN_DATE, hd.HOLIDAY_END_DATE FROM tb_bil_holiday_rel hr INNER JOIN tb_bil_holiday hd ON hr.holiday_group = hd.HOLIDAY_GROUP WHERE hr.tariff_id=? AND hr.tariff_seq=? AND hr.state = 1 AND hd.STATE = 'L0R'; */ func insertRdbHolidayDiscountRecordByHd(holidayId int) error { // query holiday discount queryStr := "SELECT DISTINCT hr.ofr_id, hr.tariff_id, hr.tariff_seq, hr.holiday_group, hr.state hr_state, hr.holiday_discount, " queryStr += "hd.HOLIDAY_ID, hd.STATE hd_state, hd.HOLIDAY_PRIORITY, hd.HOLIDAY_NAME, hd.HOLIDAY_TYPE, hd.HOLIDAY_BEGIN_DATE, hd.HOLIDAY_END_DATE " queryStr += "FROM tb_bil_holiday_rel hr " queryStr += "INNER JOIN tb_bil_holiday hd ON hr.holiday_group = hd.HOLIDAY_GROUP " queryStr += "WHERE hd.HOLIDAY_ID=? AND hr.state = 1 AND hd.STATE = 'L0R';" stmt, _ := MySqlDb.Prepare(queryStr) defer stmt.Close() rows, err := stmt.Query(holidayId) defer rows.Close() if err != nil { l4g.MysqlLog.Errorf("Query Holiday discount by holiday_id[%d] error: %v", holidayId, err) return err } var count int = 0 var hd HolidayDiscountData for rows.Next() { err = rows.Scan(&hd.OfrId, &hd.TariffId, &hd.TariffSeq, &hd.HdGroup, &hd.HrState, &hd.HrDiscount, &hd.HdId, &hd.HdState, &hd.HdPriority, &hd.HdName, &hd.HdType, &hd.HdBeginDate, &hd.HdEndDate) if err != nil { l4g.MysqlLog.Errorf("Scan Holiday discount error: %v", err) break } else { rds.RdbSetHolidayDiscountRecord(&hd) count++ l4g.MysqlLog.Debugf("Set Holiday discount of holiday id[%d], error: %v", holidayId, err) } } l4g.MysqlLog.Debugf("Set holiday discount num[%d] of holiday id[%d]!", count, holidayId) return nil } func updateRdbPrefixRecord(synType, strategyId, areaId int) error { stmt, _ := MySqlDb.Prepare("SELECT area_name, area_code FROM config_area WHERE area_id = ?;") defer stmt.Close() rows, err := stmt.Query(areaId) defer rows.Close() if err != nil { l4g.MysqlLog.Errorf("Query config_area of area_id[%d], error: %v", areaId, err) return err } var count int = 0 var prefix PrefixData prefix.StrategyId = strategyId for rows.Next() { err = rows.Scan(&prefix.AreaName, &prefix.AreaCode) if err != nil { l4g.MysqlLog.Errorf("Scan prefix error: %v", err) break } else { if synType == 3 { rds.RdbDelPrefixRecord(strategyId, prefix.AreaCode) return nil } else { rds.RdbSetPrefixRecord(&prefix) } count++ l4g.MysqlLog.Debugf("Get prefix info of strategy[%d], error: %v", strategyId, err) break //return nil } } l4g.MysqlLog.Debugf("Update prefix num[%d]!", count) return nil } func updateRdbPrefixRecordByConfigArea(synType, areaId int, oAreaCode, nAreaCode string) error { stmt, _ := MySqlDb.Prepare("SELECT STRATEGY_ID FROM TB_BIL_PRICING_AREA WHERE area_id = ?;") defer stmt.Close() rows, err := stmt.Query(areaId) defer rows.Close() if err != nil { l4g.MysqlLog.Errorf("Query strategy_id by area_id[%d], error: %v", areaId, err) return err } var count int = 0 var prefix PrefixData for rows.Next() { err = rows.Scan(&prefix.StrategyId) if err != nil { l4g.MysqlLog.Errorf("Scan strategy_id error: %v", err) break } else { switch synType { case 1: prefix.AreaCode = nAreaCode rds.RdbSetPrefixRecord(&prefix) case 2: rds.RdbDelPrefixRecord(prefix.StrategyId, oAreaCode) prefix.AreaCode = nAreaCode rds.RdbSetPrefixRecord(&prefix) default: rds.RdbDelPrefixRecord(prefix.StrategyId, oAreaCode) } count++ } } l4g.MysqlLog.Debugf("Update prefix set num[%d]!", count) return nil } func insertRdbRrByServiceNbrIn(serviceNbrs string, bUpdatePrefix bool) error { queryStr := "SELECT DISTINCT p.PRD_INST_ID, p.SERVICE_NBR, rr.ID ratableId, rr.PRICING_SUB_SECTION_ID, rr.OFR_INST_ID, rr.BEGIN_TIME, rr.END_TIME, " queryStr += "rr.FREE_VALUE AS ratableValue, rr.`VALUE` usedRatable, g.MEASURE_DOMAIN, c.OFR_ID, c.OFR_NAME, g.TARIFF_ID, " queryStr += "TIME_TO_SEC(g.START_REF_VALUE), TIME_TO_SEC(g.END_REF_VALUE), c.CALC_PRIORITY, h.EVENT_PRIORITY, h.event_pricing_strategy_id, g.SCALED_RATE_VALUE_ID, g.RATE_UNIT " queryStr += "FROM tb_bil_tariff g " queryStr += "INNER JOIN ratable_history rr ON rr.PRICING_SUB_SECTION_ID = g.PRICING_SECTION_ID " queryStr += "INNER JOIN tb_prd_ofr_inst_551 b ON rr.ofr_inst_id = b.ofr_inst_id " queryStr += "INNER JOIN tb_prd_ofr c ON b.ofr_id = c.ofr_id " queryStr += "INNER JOIN tb_prd_prd_inst_551 p ON b.ACCT_ID = p.ACCT_ID " queryStr += "INNER JOIN tb_bil_pricing_combine_group d ON c.pricing_plan_id = d.pricing_plan_id " queryStr += "INNER JOIN tb_bil_pricing_combine_gp_list e ON d.pricing_combine_group_id = e.pricing_combine_group_id " queryStr += "INNER JOIN tb_bil_pricing_section f ON e.event_pricing_strategy_id = f.event_pricing_strategy_id AND f.TARIFF_ID IS NOT NULL " queryStr += "INNER JOIN tb_bil_evt_pricing_strategy h ON e.event_pricing_strategy_id = h.event_pricing_strategy_id " queryStr += "WHERE c.IF_PREPAY=1 AND f.pricing_section_id = g.pricing_section_id " queryStr += "AND p.SERVICE_NBR IN (" + serviceNbrs + ") AND p.PRD_INST_STAS_ID IN ( 1001 ) AND rr.FREE_VALUE > rr.`VALUE` " queryStr += "AND (STR_TO_DATE(rr.END_TIME, '%Y%m%d%H%i%s') >= SYSDATE());" stmt, _ := MySqlDb.Prepare(queryStr) defer stmt.Close() rows, err := stmt.Query() defer rows.Close() if err != nil { l4g.MysqlLog.Errorf("Query rr data error: %v", err) return err } var count int = 0 for rows.Next() { var rr RrData err = rows.Scan(&rr.PrdInstId, &rr.ServiceNbr, &rr.RrId, &rr.PricingSectionId, &rr.OfrInstId, &rr.BeginTime, &rr.EndTime, &rr.FreeValue, &rr.UsedValue, &rr.MeasureDomain, &rr.OfrId, &rr.OfrName, &rr.TariffId, &rr.StartRefTime, &rr.EndRefTime, &rr.CalcPriority, &rr.EventPriority, &rr.StrategyId, &rr.UnitFee, &rr.RateUnit) if err != nil { l4g.MysqlLog.Errorf("Scan data error: %v", err) break } else { // insert redis db: rr table by prd_inst_id rds.RdbSetRrRecord(&rr) if bUpdatePrefix { insertRdbPrefixRecord(rr.StrategyId) } count++ l4g.MysqlLog.Debugf("Get rr info: %v", rr) //return nil } } l4g.MysqlLog.Debugf("Get rr num[%d]!", count) return nil } func insertRdbRrByServiceNbr(serviceNbr string, prdInstId int, bUpdatePrefix bool) error { queryStr := "SELECT DISTINCT rr.ID ratableId, rr.PRICING_SUB_SECTION_ID, rr.OFR_INST_ID, rr.BEGIN_TIME, rr.END_TIME, " queryStr += "rr.FREE_VALUE AS ratableValue, rr.`VALUE` usedRatable, g.MEASURE_DOMAIN, c.OFR_ID, c.OFR_NAME, g.TARIFF_ID, " queryStr += "TIME_TO_SEC(g.START_REF_VALUE), TIME_TO_SEC(g.END_REF_VALUE), c.CALC_PRIORITY, h.EVENT_PRIORITY, h.event_pricing_strategy_id, g.SCALED_RATE_VALUE_ID, g.RATE_UNIT " queryStr += "FROM tb_bil_tariff g " queryStr += "INNER JOIN ratable_history rr ON rr.PRICING_SUB_SECTION_ID = g.PRICING_SECTION_ID " queryStr += "INNER JOIN tb_prd_ofr_inst_551 b ON rr.ofr_inst_id = b.ofr_inst_id " queryStr += "INNER JOIN tb_prd_ofr c ON b.ofr_id = c.ofr_id " queryStr += "INNER JOIN tb_prd_prd_inst_551 p ON b.ACCT_ID = p.ACCT_ID " queryStr += "INNER JOIN tb_bil_pricing_combine_group d ON c.pricing_plan_id = d.pricing_plan_id " queryStr += "INNER JOIN tb_bil_pricing_combine_gp_list e ON d.pricing_combine_group_id = e.pricing_combine_group_id " queryStr += "INNER JOIN tb_bil_pricing_section f ON e.event_pricing_strategy_id = f.event_pricing_strategy_id AND f.TARIFF_ID IS NOT NULL " queryStr += "INNER JOIN tb_bil_evt_pricing_strategy h ON e.event_pricing_strategy_id = h.event_pricing_strategy_id " queryStr += "WHERE c.IF_PREPAY=1 AND f.pricing_section_id = g.pricing_section_id " queryStr += "AND p.SERVICE_NBR = ? AND p.PRD_INST_STAS_ID IN ( 1001 ) AND rr.FREE_VALUE > rr.`VALUE` " queryStr += "AND (STR_TO_DATE(rr.END_TIME, '%Y%m%d%H%i%s') >= SYSDATE());" stmt, _ := MySqlDb.Prepare(queryStr) defer stmt.Close() rows, err := stmt.Query(serviceNbr) defer rows.Close() if err != nil { l4g.MysqlLog.Errorf("Query rr data error: %v", err) return err } var count int = 0 var rr RrData rr.PrdInstId, rr.ServiceNbr = prdInstId, serviceNbr for rows.Next() { err = rows.Scan(&rr.RrId, &rr.PricingSectionId, &rr.OfrInstId, &rr.BeginTime, &rr.EndTime, &rr.FreeValue, &rr.UsedValue, &rr.MeasureDomain, &rr.OfrId, &rr.OfrName, &rr.TariffId, &rr.StartRefTime, &rr.EndRefTime, &rr.CalcPriority, &rr.EventPriority, &rr.StrategyId, &rr.UnitFee, &rr.RateUnit) if err != nil { l4g.MysqlLog.Errorf("Scan data error: %v", err) break } else { // insert redis db: rr table by prd_inst_id rds.RdbSetRrRecord(&rr) if bUpdatePrefix { insertRdbPrefixRecord(rr.StrategyId) } count++ l4g.MysqlLog.Debugf("Get rr info: %v", rr) //return nil } } l4g.MysqlLog.Debugf("Get rr num[%d]!", count) return nil } func insertRdbRrByRrId(rr_id int64) (int, error) { query_str := "SELECT DISTINCT rr.PRICING_SUB_SECTION_ID, rr.OFR_INST_ID, rr.BEGIN_TIME, rr.END_TIME, p.SERVICE_NBR, p.PRD_INST_ID, " query_str += "IFNULL(rr.FREE_VALUE, 0) AS ratableValue, IFNULL(rr.`VALUE`, 0) AS usedRatable, g.MEASURE_DOMAIN, c.OFR_ID, c.OFR_NAME, g.TARIFF_ID, " query_str += "TIME_TO_SEC(g.START_REF_VALUE), TIME_TO_SEC(g.END_REF_VALUE), c.CALC_PRIORITY, h.EVENT_PRIORITY, h.event_pricing_strategy_id, g.SCALED_RATE_VALUE_ID, g.RATE_UNIT " query_str += "FROM tb_bil_tariff g " query_str += "INNER JOIN ratable_history rr ON rr.PRICING_SUB_SECTION_ID = g.PRICING_SECTION_ID " query_str += "INNER JOIN tb_prd_ofr_inst_551 b ON rr.ofr_inst_id = b.ofr_inst_id " query_str += "INNER JOIN tb_prd_ofr c ON b.ofr_id = c.ofr_id " query_str += "INNER JOIN tb_prd_prd_inst_551 p ON b.ACCT_ID = p.ACCT_ID " query_str += "INNER JOIN tb_bil_pricing_combine_group d ON c.pricing_plan_id = d.pricing_plan_id " query_str += "INNER JOIN tb_bil_pricing_combine_gp_list e ON d.pricing_combine_group_id = e.pricing_combine_group_id " query_str += "INNER JOIN tb_bil_pricing_section f ON e.event_pricing_strategy_id = f.event_pricing_strategy_id AND f.TARIFF_ID IS NOT NULL " query_str += "INNER JOIN tb_bil_evt_pricing_strategy h ON e.event_pricing_strategy_id = h.event_pricing_strategy_id " query_str += "WHERE c.IF_PREPAY=1 AND f.pricing_section_id = g.pricing_section_id " query_str += "AND rr.ID = ? AND STR_TO_DATE(rr.END_TIME, '%Y%m%d%H%i%s') > SYSDATE();"// AND p.PRD_INST_STAS_ID IN ( 1001 ) stmt, _ := MySqlDb.Prepare(query_str) defer stmt.Close() rows, err := stmt.Query(rr_id) defer rows.Close() if err != nil { l4g.MysqlLog.Errorf("Query rr data error: %v", err) return 0, err } var count int = 0 var rr RrData rr.RrId = rr_id for rows.Next() { err = rows.Scan(&rr.PricingSectionId, &rr.OfrInstId, &rr.BeginTime, &rr.EndTime, &rr.ServiceNbr, &rr.PrdInstId, &rr.FreeValue, &rr.UsedValue, &rr.MeasureDomain, &rr.OfrId, &rr.OfrName, &rr.TariffId, &rr.StartRefTime, &rr.EndRefTime, &rr.CalcPriority, &rr.EventPriority, &rr.StrategyId, &rr.UnitFee, &rr.RateUnit) if err != nil { l4g.MysqlLog.Errorf("Scan data error: %v", err) break } else { //if strings.Compare(rr.EndTime, time.Now().Format("20060102150405")) <= 0 {// delete RR, rr.UsedValue >= rr.FreeValue || // rds.Rdb_del_rr_record(&rr) // return nil //} // update redis db: rr table by prd_inst_id rds.RdbSetRrRecord(&rr)// "Rr:"+rr.ServiceNbr+":"+rr.MeasureDomain+":"+strconv.Itoa(rr.RrId) insertRdbPrefixRecord(rr.StrategyId) count++ l4g.MysqlLog.Debugf("Get rr info: %v", rr) return count, nil //return nil } } l4g.MysqlLog.Debugf("set rr num[%d], id[%d]!", count, rr_id) return count, nil } func UpdateRdbRrOfrPriority(ofrId, priority int) error { queryStr := "SELECT DISTINCT rr.ID, p.SERVICE_NBR, g.MEASURE_DOMAIN " queryStr += "FROM tb_bil_tariff g " queryStr += "INNER JOIN ratable_history rr ON rr.PRICING_SUB_SECTION_ID = g.PRICING_SECTION_ID " queryStr += "INNER JOIN tb_prd_ofr_inst_551 b ON rr.ofr_inst_id = b.ofr_inst_id " queryStr += "INNER JOIN tb_prd_ofr c ON b.ofr_id = c.ofr_id " queryStr += "INNER JOIN tb_prd_prd_inst_551 p ON b.ACCT_ID = p.ACCT_ID " queryStr += "INNER JOIN tb_bil_pricing_combine_group d ON c.pricing_plan_id = d.pricing_plan_id " queryStr += "INNER JOIN tb_bil_pricing_combine_gp_list e ON d.pricing_combine_group_id = e.pricing_combine_group_id " queryStr += "INNER JOIN tb_bil_pricing_section f ON e.event_pricing_strategy_id = f.event_pricing_strategy_id AND f.TARIFF_ID IS NOT NULL " queryStr += "INNER JOIN tb_bil_evt_pricing_strategy h ON e.event_pricing_strategy_id = h.event_pricing_strategy_id " queryStr += "WHERE c.IF_PREPAY=1 AND f.pricing_section_id = g.pricing_section_id " queryStr += "AND c.ofr_id = ? AND p.PRD_INST_STAS_ID IN ( 1001 ) AND STR_TO_DATE(rr.END_TIME, '%Y%m%d%H%i%s') > SYSDATE();" stmt, _ := MySqlDb.Prepare(queryStr) defer stmt.Close() rows, err := stmt.Query(ofrId) defer rows.Close() if err != nil { l4g.MysqlLog.Errorf("Query rr data error: %v", err) return err } //var count int = 0 var rr RrData rr.CalcPriority = priority for rows.Next() { err = rows.Scan(&rr.RrId, &rr.ServiceNbr, &rr.MeasureDomain) if err != nil { l4g.MysqlLog.Errorf("Scan data error: %v", err) break } else { //if rr.UsedValue >= rr.FreeValue || strings.Compare(rr.EndTime, time.Now().Format("20060102150405")) <= 0 {// delete RR // rds.Rdb_del_rr_record(&rr) //} // update redis db: rr table Event Priority rds.RdbSetOfrCalcPriority(&rr)// "Rr:"+rr.ServiceNbr+":"+rr.MeasureDomain+":"+strconv.Itoa(rr.RrId) //count++ //l4g.Debugf("Get rr info: %v", rr) //return nil //return nil } } //l4g.Debugf("Get rr num[%d]!", count) return nil } func UpdateRdbRrStrategyPriority(strategyId, priority int) error { queryStr := "SELECT DISTINCT rr.ID, p.SERVICE_NBR, g.MEASURE_DOMAIN " queryStr += "FROM tb_bil_tariff g " queryStr += "INNER JOIN ratable_history rr ON rr.PRICING_SUB_SECTION_ID = g.PRICING_SECTION_ID " queryStr += "INNER JOIN tb_prd_ofr_inst_551 b ON rr.ofr_inst_id = b.ofr_inst_id " queryStr += "INNER JOIN tb_prd_ofr c ON b.ofr_id = c.ofr_id " queryStr += "INNER JOIN tb_prd_prd_inst_551 p ON b.ACCT_ID = p.ACCT_ID " queryStr += "INNER JOIN tb_bil_pricing_combine_group d ON c.pricing_plan_id = d.pricing_plan_id " queryStr += "INNER JOIN tb_bil_pricing_combine_gp_list e ON d.pricing_combine_group_id = e.pricing_combine_group_id " queryStr += "INNER JOIN tb_bil_pricing_section f ON e.event_pricing_strategy_id = f.event_pricing_strategy_id AND f.TARIFF_ID IS NOT NULL " queryStr += "INNER JOIN tb_bil_evt_pricing_strategy h ON e.event_pricing_strategy_id = h.event_pricing_strategy_id " queryStr += "WHERE c.IF_PREPAY=1 AND f.pricing_section_id = g.pricing_section_id " queryStr += "AND h.event_pricing_strategy_id = ? AND p.PRD_INST_STAS_ID IN ( 1001 ) AND STR_TO_DATE(rr.END_TIME, '%Y%m%d%H%i%s') > SYSDATE();" stmt, _ := MySqlDb.Prepare(queryStr) defer stmt.Close() rows, err := stmt.Query(strategyId) defer rows.Close() if err != nil { l4g.MysqlLog.Errorf("Query rr data error: %v", err) return err } var count int = 0 var rr RrData rr.EventPriority = priority for rows.Next() { err = rows.Scan(&rr.RrId, &rr.ServiceNbr, &rr.MeasureDomain) if err != nil { l4g.MysqlLog.Errorf("Scan data error: %v", err) break } else { //if rr.UsedValue >= rr.FreeValue || strings.Compare(rr.EndTime, time.Now().Format("20060102150405")) <= 0 {// delete RR // rds.Rdb_del_rr_record(&rr) // return nil //} // update redis db: rr table Event Priority rds.RdbSetStrategyEventPriority(&rr)// "Rr:"+rr.ServiceNbr+":"+rr.MeasureDomain+":"+strconv.Itoa(rr.RrId) count++ //l4g.Debugf("Update Rr info: %v", rr) //return nil //return nil } } l4g.MysqlLog.Debugf("Update event priority of rr num[%d]!", count) return nil } func TriggerEventProcess() { for { scanNewEvent() time.Sleep(time.Duration(300)*time.Millisecond) } } func scanNewEvent() { //if rds.CheckIfRdbMaster() != true {// only sync on Master Redis // return //} queryStr := "SELECT syn_id, tbl_id, syn_type, IFNULL(strategy_id,0) AS strategy_id, IFNULL(area_id,0) AS area_id, IFNULL(prd_inst_id,0) AS prd_inst_id, " queryStr += "IFNULL(service_nbr,'') AS service_nbr, IFNULL(ofr_id,0) AS ofr_id, " queryStr += "IFNULL(ofr_detail_inst_id,0) AS ofr_detail_inst_id, IFNULL(holiday_id,0) AS holiday_id, IFNULL(tariff_id,0) AS tariff_id, IFNULL(tariff_seq,0) AS tariff_seq, " queryStr += "IFNULL(rr_id,0) AS rr_id, IFNULL(o_area_code,'') AS o_area_code, IFNULL(n_area_code,'') AS n_area_code, " queryStr += "IFNULL(ratable_value,0) AS ratable_value, IFNULL(used_value,0) AS used_value, IFNULL(begin_time,'') AS begin_time, " queryStr += "IFNULL(end_time,'') AS end_time FROM tb_sync_prepaid_tariff WHERE state=0 ORDER BY syn_id ASC;" stmt, _ := MySqlDb.Prepare(queryStr) defer stmt.Close() rows, err := stmt.Query()// query all adsl cdr defer rows.Close() if err != nil { l4g.MysqlLog.Errorf("Query trigger event error: %v", err) return } var synId, tblId, synType, strategyId, areaId, prdInstId, ofrDetailInstId, holidayId, tariffId, tariffSeq, rrId, ofrId int var serviceNbr, oAreaCode, nAreaCode, beginTime, endTime string var ratableVal, usedVal int64 for rows.Next() { err = rows.Scan(&synId, &tblId, &synType, &strategyId, &areaId, &prdInstId, &serviceNbr, &ofrId, &ofrDetailInstId, &holidayId, &tariffId, &tariffSeq, &rrId, &oAreaCode, &nAreaCode, &ratableVal, &usedVal, &beginTime, &endTime) if err != nil { l4g.MysqlLog.Errorf("Scan trigger event error: %v", err) break } l4g.MysqlLog.Debugf("read syn_id[%d], table[%d], op[%d]", synId, tblId, synType) // update redis table switch tblId { case 1: //UpdateRedisAcctTable(synType, prdInstId, serviceNbr) case 2: //UpdateRedisAcctTableOfr(synType, ofrDetailInstId) case 3: //UpdateRedisTariffTable(synType, tariffId) case 9: //UpdateRedisTariffTableByOfr(synType, ofrId) case 10: //UpdateRedisTariffTableByStrategy(synType, strategyId) case 4:// update prefix table, while update config_area table //UpdateRedisPrefixTableByConfigArea(synType, areaId, oAreaCode, nAreaCode) case 5: //UpdateRedisPrefixTable(synType, strategyId, areaId) case 6: if rrId > 0 { //UpdateRedisRrTable(synType, int64(rrId), ratableVal, usedVal, beginTime, endTime) } else { if ofrId > 0 {// ofr: CALC_PRIORITY //UpdateRdbRrOfrPriority(ofrId, tariffSeq) } else if strategyId > 0 {// strategy: EVENT_PRIORITY //UpdateRdbRrStrategyPriority(strategyId, tariffSeq) } } case 7: //UpdateRedisHolidayDisTableByTariffId(synType, tariffId, tariffSeq) case 8: //UpdateRedisHolidayDisTableByHolidayId(synType, holidayId) default: } // updateEventState(synId) } } func UpdateRedisRrTable(synType int, c *ChgRr) { switch synType { case 1: go func() { t1Chnl := time.After(2 * time.Second) select { case <-t1Chnl: n, err := insertRdbRrByRrId(c.Id) if err==nil && n==0 { l4g.MysqlLog.Warnf("insertRdbRrByRrId fail, id[%d], try again 60s later", c.Id) timeChannel := time.After(60 * time.Second) select { case <-timeChannel: insertRdbRrByRrId(c.Id) } } } }() case 2: ret, _ := rds.RdbUpdateRrRecordOnly(c.Id, c.NewFreeValue, c.NewValue, c.NewBeginTime, c.NewEndTime) if ret == 0 { insertRdbRrByRrId(c.Id) } case 3: rds.RdbDelRrRecordByRrId(c.Id) default: } } func UpdateRedisTariffTable(synType int, r *ChgTariff) { if synType == 1 || synType == 2 { updateRdbTariffByTariffId(r.TariffId) } else { rds.RdbDelTariffRecord(r.TariffId) } } func UpdateRedisTariffTableByOfr(synType int, r *ChgOfrInfo) { if synType == 1 || synType == 2 { updateRdbTariffByOfrId(r.OfrId) } else { rds.RdbDelTariffRecordByOfr(r.OfrId) } } func UpdateRedisTariffTableByStrategy(synType int, r *ChgPricingStrategy) { if synType == 1 || synType == 2 { updateRdbTariffByStrategyId(r.EventPricingStrategyId) } else { // TODO??? //rds.RdbDelTariffRecord(strategyId) } } func UpdateRedisPrefixTable(synType int, r *ChgPricingArea) { if synType == 3 { updateRdbPrefixRecord(synType, r.OldStrategyId, r.OldAreaId) } else { updateRdbPrefixRecord(synType, r.NewStrategyId, r.NewAreaId) } } func UpdateRedisPrefixTableByConfigArea(synType int, r *ChgConfigArea) { /*if synType == 1 { updateRdbTariffByStrategyId(strategyId) } else if synType == 2 { // TODO??? //rds.RdbDelTariffRecord(strategyId) } else { deleteRdb }*/ updateRdbPrefixRecordByConfigArea(synType, r.AreaId, r.OldAreaCode, r.NewAreaCode) } func UpdateRdbAcctTblByPrdInstId(prdInstId int, bUpdateRr bool) error { queryStr := "SELECT p.PRD_INST_ID,p.PRD_ID AS acct_type,p.SERVICE_NBR,p.PRD_INST_STAS_ID,p.ACCT_ID,p.OWN_CUST_ID,b.ofr_id,a.EFF_DATE,a.EXP_DATE,b.OFR_INST_ID " queryStr += "FROM tb_prd_prd_inst_551 p " queryStr += "INNER JOIN tb_prd_ofr_detail_inst_551 a ON a.ofr_detail_inst_ref_id = p.PRD_INST_ID " queryStr += "INNER JOIN tb_prd_ofr_inst_551 b ON a.ofr_inst_id = b.ofr_inst_id " queryStr += "INNER JOIN tb_prd_ofr c ON b.ofr_id = c.ofr_id " queryStr += "WHERE a.OFR_DETAIL_TYPE_ID = 'A1' AND p.PRD_INST_ID = ? " queryStr += "AND p.IF_PREPAY = 1 and p.PRD_INST_STAS_ID IN (1001) AND c.ofr_type_id = '1' "// , 1202, 1205, 1101);" queryStr += "AND a.EXP_DATE > SYSDATE() AND a.EFF_DATE < SYSDATE() LIMIT 1;" stmt, _ := MySqlDb.Prepare(queryStr) defer stmt.Close() rows, err := stmt.Query(prdInstId) if err != nil { l4g.MysqlLog.Errorf("Query acct data by prd_inst_id[%d] error: %v", prdInstId, err) return err } defer rows.Close() var acctData AcctData var count int = 0 var acctType, prdInstStasId, acctId, custId, ofrId, ofrInstId int var serviceNbr, effDate, expDate string for rows.Next(){ if err = rows.Scan(&prdInstId, &acctType, &serviceNbr, &prdInstStasId, &acctId, &custId, &ofrId, &effDate, &expDate, &ofrInstId); err != nil { l4g.MysqlLog.Errorf("query row of acct fail: %s", err) return err } // insert redis db: acct table acctData = AcctData{PrdInstId: prdInstId, AcctType: acctType, ServiceNbr: serviceNbr, PrdInstStasId: prdInstStasId, AcctId: acctId, CustId: custId, OfrId: ofrId, EffTime: effDate, ExpTime: expDate, OfrInstId: ofrInstId} err = rds.RdbSetAcctRecord(&acctData) if err != nil { l4g.MysqlLog.Errorf("Insert redis record[%v] fail: %v", acctData, err) return err } else { // insert redis db: tariff table by ofr_id //insert_rdb_tariff_by_ofr_id(ofr_id) // insert redis db: rr table by prd_inst_id if bUpdateRr { insertRdbRrByServiceNbr(serviceNbr, prdInstId, true) } count++ //l4g.Debugf("name: %s, code: %s", name, code) } } //if count == 0 { // TODO delete this prd_inst_id related service //} l4g.MysqlLog.Debugf("total acct num: %d", count) return err } func UpdateRedisAcctTable(synType int, row *ChgAcctInfo) { if synType == 1 { UpdateRdbAcctTblByPrdInstId(row.PrdInstId, true) } else if synType == 2 { UpdateRdbAcctTblByPrdInstId(row.PrdInstId, false) } else { rds.RdbDelAcctRecord(row.ServiceNbr) //rds.RdbDelRrRecordByServiceNbr(row.ServiceNbr) } } func UpdateRedisAcctTableOfr(synType int, row *ChgOfrDetail) { if synType == 3 { updateRdbAcctTblByOfrDetailInstId(synType, row.OfrDetailInstId) } else { go func() { t1Chnl := time.After(5 * time.Second) select { case <-t1Chnl: n, err := updateRdbAcctTblByOfrDetailInstId(synType, row.OfrDetailInstId) if err == nil && n == 0 {// will fail for bundle l4g.MysqlLog.Warnf("UpdateRedisAcctTableOfr fail, type[%d], OfrDetailInstId[%d], try again 60s later", synType, row.OfrDetailInstId) timeChannel := time.After(60 * time.Second) select { case <-timeChannel: updateRdbAcctTblByOfrDetailInstId(synType, row.OfrDetailInstId) } } } }() } } func UpdateRedisHolidayDisTableByTariffId(synType int, r *ChgHoliday) { if synType == 1 { insertRdbHolidayDiscountRecord(r.NewTariffId, r.NewTariffSeq) } else if synType == 2 { if r.OldTariffId != r.NewTariffId { rds.RdbDelHolidayDiscountRecord(r.OldTariffId) } insertRdbHolidayDiscountRecord(r.NewTariffId, r.NewTariffSeq) } else { rds.RdbDelHolidayDiscountRecord(r.OldTariffId) } } func UpdateRedisHolidayDisTableByHolidayId(syn_type int, r *ChgBilHoliday) { if syn_type == 1 { insertRdbHolidayDiscountRecordByHd(r.HolidayId) } else if syn_type == 2 { insertRdbHolidayDiscountRecordByHd(r.HolidayId) } else { rds.RdbDelHolidayDiscountRecordByHd(r.HolidayId) } } func UpdateRedisAlertSms(synType int, r *ChgAlertSms) { if synType == 1 || synType == 2 { _ = rds.RdbSetAlertSmsRecord(r.AlertId, r.ServiceNbr, r.SmsContent) } } func UpdateRedisSyncMobile(synType int, r *ChgSyncMobile) { if synType == 1 || synType == 2 { _ = rds.RdbSetCreateAcctRecord(r) } } func updateEventState(id int) error { stmt, _ := MySqlDb.Prepare("UPDATE tb_sync_prepaid_tariff SET state = 1 WHERE syn_id = ?;") defer stmt.Close() _, err := stmt.Exec(id) if err != nil { l4g.MysqlLog.Errorf("Update trigger event err: %v", err) return err } l4g.MysqlLog.Debugf("Update trigger event succ!") return nil }