Files
selfcare/proxy/canal/msg.go
2025-03-25 09:46:16 +08:00

1103 lines
31 KiB
Go

package canal
import (
"fmt"
"github.com/go-mysql-org/go-mysql/canal"
"proxy/logger"
. "proxy/MsgDef"
"strconv"
)
// getAcctInfoColumns reads the PRD_INST_STAS_ID, PRD_INST_ID, SERVICE_NBR and
// IF_PREPAY columns of one changed row and appends the result to
// c.ChgAcctInfo when the row passes the filter for the current c.EventType.
//
// bB and bA select whether beC (before image) and afC (after image) are read.
// Cells are located by matching column names in e.Table.Columns and indexing
// the image at the same position. Only rows whose IF_PREPAY is "1" are kept:
//   - insert: the new status must be "1001";
//   - delete: the old status must be "1001";
//   - otherwise: the status must differ between both images; c.EventType is
//     rewritten to delete/insert when the status leaves/enters "1001".
func getAcctInfoColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
	const watched = "1001"

	var rec ChgAcctInfo
	// text renders one cell of a row image with fmt's default formatting.
	text := func(row []interface{}, i int) string { return fmt.Sprint(row[i]) }

	if bB {
		for i, col := range e.Table.Columns {
			switch col.Name {
			case "PRD_INST_STAS_ID":
				rec.OldPrdInstStasId = text(beC, i)
			case "PRD_INST_ID":
				// The Atoi error is discarded; whatever Atoi returns is stored.
				rec.PrdInstId, _ = strconv.Atoi(text(beC, i))
			case "SERVICE_NBR":
				rec.ServiceNbr = text(beC, i)
			case "IF_PREPAY":
				rec.IfPrepaid = text(beC, i)
			}
		}
	}

	if bA {
		for i, col := range e.Table.Columns {
			switch col.Name {
			case "PRD_INST_STAS_ID":
				rec.NewPrdInstStasId = text(afC, i)
				// A change can only be detected when a before image was read.
				if bB && rec.NewPrdInstStasId != rec.OldPrdInstStasId {
					rec.BUpdate = true
				}
			case "PRD_INST_ID":
				rec.PrdInstId, _ = strconv.Atoi(text(afC, i))
			case "SERVICE_NBR":
				rec.ServiceNbr = text(afC, i)
			case "IF_PREPAY":
				rec.IfPrepaid = text(afC, i)
			}
		}
	}

	// Every branch below requires IF_PREPAY == "1".
	if rec.IfPrepaid != "1" {
		return
	}

	switch c.EventType {
	case canal.InsertAction:
		if rec.NewPrdInstStasId == watched {
			c.ChgAcctInfo = append(c.ChgAcctInfo, rec)
			logger.CanalLog.Infof("insert into prd_prd_inst_551: [%#v]", rec)
		}
	case canal.DeleteAction:
		if rec.OldPrdInstStasId == watched {
			c.ChgAcctInfo = append(c.ChgAcctInfo, rec)
			logger.CanalLog.Infof("delete from prd_prd_inst_551: [%#v]", rec)
		}
	default:
		if !rec.BUpdate {
			return
		}
		was := rec.OldPrdInstStasId == watched
		is := rec.NewPrdInstStasId == watched
		switch {
		case was && !is:
			c.EventType = canal.DeleteAction
		case !was && is:
			c.EventType = canal.InsertAction
		}
		c.ChgAcctInfo = append(c.ChgAcctInfo, rec)
		logger.CanalLog.Infof("modify prd_prd_inst_551: [%#v]", rec)
	}
}
// getOfrDetailColumns reads OFR_DETAIL_TYPE_ID, OFR_DETAIL_INST_ID, EFF_DATE
// and EXP_DATE of one changed row and appends the result to c.ChgOfrDetail
// when the row passes the filter.
//
// bB and bA select whether beC (before image) and afC (after image) are read;
// cells are found by column name in e.Table.Columns and read positionally.
// Only rows whose OFR_DETAIL_TYPE_ID is "A1" are kept. Inserts and deletes are
// kept unconditionally; any other event type is kept only when EFF_DATE or
// EXP_DATE differs between the two images. CALC_PRIORITY handling is
// currently disabled for this table.
func getOfrDetailColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
	var rec ChgOfrDetail
	// text renders one cell of a row image with fmt's default formatting.
	text := func(row []interface{}, i int) string { return fmt.Sprint(row[i]) }

	if bB {
		for i, col := range e.Table.Columns {
			switch col.Name {
			case "OFR_DETAIL_TYPE_ID":
				rec.OfrDetailTypeId = text(beC, i)
			case "OFR_DETAIL_INST_ID":
				// The Atoi error is discarded; whatever Atoi returns is stored.
				rec.OfrDetailInstId, _ = strconv.Atoi(text(beC, i))
			case "EFF_DATE":
				rec.OldEffDate = text(beC, i)
			case "EXP_DATE":
				rec.OldExpDate = text(beC, i)
			}
		}
	}

	if bA {
		for i, col := range e.Table.Columns {
			switch col.Name {
			case "OFR_DETAIL_TYPE_ID":
				rec.OfrDetailTypeId = text(afC, i)
			case "OFR_DETAIL_INST_ID":
				rec.OfrDetailInstId, _ = strconv.Atoi(text(afC, i))
			case "EFF_DATE":
				rec.NewEffDate = text(afC, i)
				if bB && rec.NewEffDate != rec.OldEffDate {
					rec.BUpdate = true
				}
			case "EXP_DATE":
				rec.NewExpDate = text(afC, i)
				if bB && rec.NewExpDate != rec.OldExpDate {
					rec.BUpdate = true
				}
			}
		}
	}

	// Every branch below requires the "A1" detail type.
	if rec.OfrDetailTypeId != "A1" {
		return
	}

	switch c.EventType {
	case canal.InsertAction:
		c.ChgOfrDetail = append(c.ChgOfrDetail, rec)
		logger.CanalLog.Infof("insert into prd_ofr_detail_inst_551: [%#v]", rec)
	case canal.DeleteAction:
		c.ChgOfrDetail = append(c.ChgOfrDetail, rec)
		logger.CanalLog.Infof("delete from prd_ofr_detail_inst_551: [%#v]", rec)
	default:
		if rec.BUpdate {
			c.ChgOfrDetail = append(c.ChgOfrDetail, rec)
			logger.CanalLog.Infof("modify prd_ofr_detail_inst_551: [%#v]", rec)
		}
	}
}
// getTariffColumns records the TARIFF_ID of one changed row in c.ChgTariff.
//
// bB and bA select whether beC (before image) and afC (after image) are read;
// when both are read the after image wins. The column is found by name in
// e.Table.Columns and read positionally. The row is always appended and
// logged, whatever the event type.
func getTariffColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
	var rec ChgTariff

	// tariffID scans one row image for TARIFF_ID and stores it in rec.
	// The Atoi error is discarded; whatever Atoi returns is stored.
	tariffID := func(row []interface{}) {
		for i, col := range e.Table.Columns {
			if col.Name == "TARIFF_ID" {
				rec.TariffId, _ = strconv.Atoi(fmt.Sprint(row[i]))
			}
		}
	}

	if bB {
		tariffID(beC)
	}
	if bA {
		tariffID(afC)
	}

	c.ChgTariff = append(c.ChgTariff, rec)
	logger.CanalLog.Infof("update tb_tariff: [%#v], type[%s]", rec, c.EventType)
}
// getOfrInfoColumns reads OFR_TYPE_ID, OFR_ID, IF_PREPAY, OFR_STATE and
// CALC_PRIORITY of one changed row and appends the result to c.ChgOfrInfo
// when the row passes the filter.
//
// bB and bA select whether beC (before image) and afC (after image) are read;
// cells are found by column name in e.Table.Columns and read positionally.
// Only rows with IF_PREPAY "1" and OFR_TYPE_ID "1" are kept:
//   - insert: the new OFR_STATE must be "2";
//   - delete: the old OFR_STATE must be "2";
//   - otherwise: OFR_STATE or CALC_PRIORITY must differ between both images;
//     c.EventType is rewritten to delete/insert when the state leaves/enters "2".
func getOfrInfoColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
	const watched = "2"

	var rec ChgOfrInfo
	// text renders one cell of a row image with fmt's default formatting.
	text := func(row []interface{}, i int) string { return fmt.Sprint(row[i]) }

	if bB {
		for i, col := range e.Table.Columns {
			switch col.Name {
			case "OFR_TYPE_ID":
				rec.OfrTypeId = text(beC, i)
			case "OFR_ID":
				// The Atoi error is discarded; whatever Atoi returns is stored.
				rec.OfrId, _ = strconv.Atoi(text(beC, i))
			case "IF_PREPAY":
				rec.IfPrepaid = text(beC, i)
			case "OFR_STATE":
				rec.OldOfrState = text(beC, i)
			case "CALC_PRIORITY":
				rec.OldCalcPriority = text(beC, i)
			}
		}
	}

	if bA {
		for i, col := range e.Table.Columns {
			switch col.Name {
			case "OFR_TYPE_ID":
				rec.OfrTypeId = text(afC, i)
			case "OFR_ID":
				rec.OfrId, _ = strconv.Atoi(text(afC, i))
			case "IF_PREPAY":
				rec.IfPrepaid = text(afC, i)
			case "OFR_STATE":
				rec.NewOfrState = text(afC, i)
				if bB && rec.NewOfrState != rec.OldOfrState {
					rec.BUpdate = true
				}
			case "CALC_PRIORITY":
				rec.NewCalcPriority = text(afC, i)
				if bB && rec.NewCalcPriority != rec.OldCalcPriority {
					rec.BUpdate = true
				}
			}
		}
	}

	// Every branch below requires IF_PREPAY == "1" and OFR_TYPE_ID == "1".
	if rec.IfPrepaid != "1" || rec.OfrTypeId != "1" {
		return
	}

	switch c.EventType {
	case canal.InsertAction:
		if rec.NewOfrState == watched {
			c.ChgOfrInfo = append(c.ChgOfrInfo, rec)
			logger.CanalLog.Infof("insert into tb_ofr: [%#v]", rec)
		}
	case canal.DeleteAction:
		if rec.OldOfrState == watched {
			c.ChgOfrInfo = append(c.ChgOfrInfo, rec)
			logger.CanalLog.Infof("delete from tb_ofr: [%#v]", rec)
		}
	default:
		if !rec.BUpdate {
			return
		}
		was := rec.OldOfrState == watched
		is := rec.NewOfrState == watched
		switch {
		case was && !is:
			c.EventType = canal.DeleteAction
		case !was && is:
			c.EventType = canal.InsertAction
		}
		c.ChgOfrInfo = append(c.ChgOfrInfo, rec)
		logger.CanalLog.Infof("modify tb_ofr: [%#v]", rec)
	}
}
// getPricingStrategyColumns reads EVENT_PRICING_STRATEGY_ID, EVENT_PRIORITY
// and CALL_TYPE of one changed row and appends the result to
// c.ChgPricingStrategy.
//
// bB and bA select whether beC (before image) and afC (after image) are read;
// cells are found by column name in e.Table.Columns and read positionally.
// Inserts and deletes are always kept; any other event type is kept only when
// EVENT_PRIORITY or CALL_TYPE differs between the two images.
func getPricingStrategyColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
	var rec ChgPricingStrategy
	// text renders one cell of a row image with fmt's default formatting.
	text := func(row []interface{}, i int) string { return fmt.Sprint(row[i]) }

	if bB {
		for i, col := range e.Table.Columns {
			switch col.Name {
			case "EVENT_PRICING_STRATEGY_ID":
				// The Atoi error is discarded; whatever Atoi returns is stored.
				rec.EventPricingStrategyId, _ = strconv.Atoi(text(beC, i))
			case "EVENT_PRIORITY":
				rec.OldEventPriority = text(beC, i)
			case "CALL_TYPE":
				rec.OldCallType = text(beC, i)
			}
		}
	}

	if bA {
		for i, col := range e.Table.Columns {
			switch col.Name {
			case "EVENT_PRICING_STRATEGY_ID":
				rec.EventPricingStrategyId, _ = strconv.Atoi(text(afC, i))
			case "EVENT_PRIORITY":
				rec.NewEventPriority = text(afC, i)
				if bB && rec.NewEventPriority != rec.OldEventPriority {
					rec.BUpdate = true
				}
			case "CALL_TYPE":
				rec.NewCallType = text(afC, i)
				if bB && rec.NewCallType != rec.OldCallType {
					rec.BUpdate = true
				}
			}
		}
	}

	switch c.EventType {
	case canal.InsertAction:
		c.ChgPricingStrategy = append(c.ChgPricingStrategy, rec)
		logger.CanalLog.Infof("insert into tb_pricing_strategy: [%#v]", rec)
	case canal.DeleteAction:
		c.ChgPricingStrategy = append(c.ChgPricingStrategy, rec)
		logger.CanalLog.Infof("delete from tb_pricing_strategy: [%#v]", rec)
	default:
		if rec.BUpdate {
			c.ChgPricingStrategy = append(c.ChgPricingStrategy, rec)
			logger.CanalLog.Infof("modify tb_pricing_strategy: [%#v]", rec)
		}
	}
}
// getConfigAreaColumns reads area_id, area_code and state of one changed row
// and appends the result to c.ChgConfigArea when the row passes the filter.
//
// bB and bA select whether beC (before image) and afC (after image) are read;
// cells are found by column name in e.Table.Columns and read positionally.
//   - insert: the new state must be "1";
//   - delete: the old state must be "1";
//   - otherwise: area_code or state must differ between both images;
//     c.EventType is rewritten to insert/delete when the state enters/leaves "1".
func getConfigAreaColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
	const watched = "1"

	var rec ChgConfigArea
	// text renders one cell of a row image with fmt's default formatting.
	text := func(row []interface{}, i int) string { return fmt.Sprint(row[i]) }

	if bB {
		for i, col := range e.Table.Columns {
			switch col.Name {
			case "area_id":
				// The Atoi error is discarded; whatever Atoi returns is stored.
				rec.AreaId, _ = strconv.Atoi(text(beC, i))
			case "area_code":
				rec.OldAreaCode = text(beC, i)
			case "state":
				rec.OldState = text(beC, i)
			}
		}
	}

	if bA {
		for i, col := range e.Table.Columns {
			switch col.Name {
			case "area_id":
				rec.AreaId, _ = strconv.Atoi(text(afC, i))
			case "area_code":
				rec.NewAreaCode = text(afC, i)
				if bB && rec.NewAreaCode != rec.OldAreaCode {
					rec.BUpdate = true
				}
			case "state":
				rec.NewState = text(afC, i)
				if bB && rec.NewState != rec.OldState {
					rec.BUpdate = true
				}
			}
		}
	}

	switch c.EventType {
	case canal.InsertAction:
		if rec.NewState == watched {
			c.ChgConfigArea = append(c.ChgConfigArea, rec)
			logger.CanalLog.Infof("insert into tb_config_area: [%#v]", rec)
		}
	case canal.DeleteAction:
		if rec.OldState == watched {
			c.ChgConfigArea = append(c.ChgConfigArea, rec)
			logger.CanalLog.Infof("delete from tb_config_area: [%#v]", rec)
		}
	default:
		if !rec.BUpdate {
			return
		}
		was := rec.OldState == watched
		is := rec.NewState == watched
		switch {
		case !was && is:
			c.EventType = canal.InsertAction
		case was && !is:
			c.EventType = canal.DeleteAction
		}
		c.ChgConfigArea = append(c.ChgConfigArea, rec)
		logger.CanalLog.Infof("modify tb_config_area: [%#v]", rec)
	}
}
// getPricingAreaColumns reads STRATEGY_ID and area_id of one changed row and
// appends the result to c.ChgPricingArea.
//
// bB and bA select whether beC (before image) and afC (after image) are read;
// cells are found by column name in e.Table.Columns and read positionally.
// BUpdate is set when both images are read and either column differs between
// them. The row is always appended and logged, whatever the event type.
func getPricingAreaColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
	var chg ChgPricingArea
	var str string
	if bB {
		for index, column := range e.Table.Columns {
			switch column.Name {
			case "STRATEGY_ID":
				str = fmt.Sprintf("%v", beC[index])
				// The Atoi error is discarded; whatever Atoi returns is stored.
				chg.OldStrategyId, _ = strconv.Atoi(str)
			case "area_id":
				str = fmt.Sprintf("%v", beC[index])
				chg.OldAreaId, _ = strconv.Atoi(str)
			}
		}
	}
	if bA {
		for index, column := range e.Table.Columns {
			switch column.Name {
			case "STRATEGY_ID":
				str = fmt.Sprintf("%v", afC[index])
				chg.NewStrategyId, _ = strconv.Atoi(str)
				if bB && chg.NewStrategyId != chg.OldStrategyId {
					chg.BUpdate = true
				}
			case "area_id":
				str = fmt.Sprintf("%v", afC[index])
				chg.NewAreaId, _ = strconv.Atoi(str)
				// Compare the area id with its own previous value. The earlier
				// code compared NewStrategyId with OldAreaId, which flagged
				// (or missed) updates based on two unrelated columns.
				if bB && chg.NewAreaId != chg.OldAreaId {
					chg.BUpdate = true
				}
			}
		}
	}
	c.ChgPricingArea = append(c.ChgPricingArea, chg)
	logger.CanalLog.Infof("update tb_pricing_area: [%#v], type[%s]", chg, c.EventType)
}
// getRrColumns reads ID, END_TIME, BEGIN_TIME, FREE_VALUE, VALUE,
// PRICING_SUB_SECTION_ID, OFR_INST_ID and OWNER_ID of one changed row and
// appends the result to c.ChgRr.
//
// bB and bA select whether beC (before image) and afC (after image) are read;
// cells are found by column name in e.Table.Columns and read positionally.
// Inserts and deletes are always kept. Any other event type is kept only when
// one of the tracked columns (other than ID) differs between the two images;
// it is rewritten to an insert when VALUE was >= FREE_VALUE before and is
// < FREE_VALUE after. The opposite transition is not turned into a delete
// (a refund may arrive late).
func getRrColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
	var rec ChgRr
	// text renders one cell of a row image with fmt's default formatting.
	text := func(row []interface{}, i int) string { return fmt.Sprint(row[i]) }
	// num parses one cell as a base-10 int64; the parse error is discarded and
	// whatever ParseInt returns is used.
	num := func(row []interface{}, i int) int64 {
		v, _ := strconv.ParseInt(text(row, i), 10, 64)
		return v
	}

	if bB {
		for i, col := range e.Table.Columns {
			switch col.Name {
			case "ID":
				rec.Id = num(beC, i)
			case "END_TIME":
				rec.OldEndTime = text(beC, i)
			case "BEGIN_TIME":
				rec.OldBeginTime = text(beC, i)
			case "FREE_VALUE":
				rec.OldFreeValue = num(beC, i)
			case "VALUE":
				rec.OldValue = num(beC, i)
			case "PRICING_SUB_SECTION_ID":
				rec.OldPricingSubSectionId = text(beC, i)
			case "OFR_INST_ID":
				rec.OldOfrInstId = text(beC, i)
			case "OWNER_ID":
				rec.OldOwnId = text(beC, i)
			}
		}
	}

	if bA {
		for i, col := range e.Table.Columns {
			// differs is true when the column changed between the two images.
			differs := false
			switch col.Name {
			case "ID":
				rec.Id = num(afC, i)
			case "END_TIME":
				rec.NewEndTime = text(afC, i)
				differs = rec.NewEndTime != rec.OldEndTime
			case "BEGIN_TIME":
				rec.NewBeginTime = text(afC, i)
				differs = rec.NewBeginTime != rec.OldBeginTime
			case "FREE_VALUE":
				rec.NewFreeValue = num(afC, i)
				differs = rec.NewFreeValue != rec.OldFreeValue
			case "VALUE":
				rec.NewValue = num(afC, i)
				differs = rec.NewValue != rec.OldValue
			case "PRICING_SUB_SECTION_ID":
				rec.NewPricingSubSectionId = text(afC, i)
				differs = rec.NewPricingSubSectionId != rec.OldPricingSubSectionId
			case "OFR_INST_ID":
				rec.NewOfrInstId = text(afC, i)
				differs = rec.NewOfrInstId != rec.OldOfrInstId
			case "OWNER_ID":
				rec.NewOwnId = text(afC, i)
				differs = rec.NewOwnId != rec.OldOwnId
			}
			// A change only counts when a before image was read as well.
			if bB && differs {
				rec.BUpdate = true
			}
		}
	}

	switch c.EventType {
	case canal.InsertAction:
		c.ChgRr = append(c.ChgRr, rec)
		logger.CanalLog.Infof("insert into rr: [%#v]", rec)
	case canal.DeleteAction:
		c.ChgRr = append(c.ChgRr, rec)
		logger.CanalLog.Infof("delete from rr: [%#v]", rec)
	default:
		if !rec.BUpdate {
			return
		}
		if rec.OldValue >= rec.OldFreeValue && rec.NewValue < rec.NewFreeValue {
			c.EventType = canal.InsertAction
		}
		c.ChgRr = append(c.ChgRr, rec)
		logger.CanalLog.Infof("modify rr: [%#v]", rec)
	}
}
// getHolidayColumns reads state, tariff_id and tariff_seq of one changed row
// and appends the result to c.ChgHoliday when the row passes the filter.
//
// bB and bA select whether beC (before image) and afC (after image) are read;
// cells are found by column name in e.Table.Columns and read positionally.
//   - insert: the new state must be "1";
//   - delete: the old state must be "1";
//   - otherwise: the old or the new state must be "1"; c.EventType is
//     rewritten to insert/delete when the state enters/leaves "1".
//
// BUpdate is computed for the record but is not part of the filter here.
func getHolidayColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
	const watched = "1"

	var rec ChgHoliday
	// text renders one cell of a row image with fmt's default formatting.
	text := func(row []interface{}, i int) string { return fmt.Sprint(row[i]) }
	// num parses one cell with Atoi; the error is discarded and whatever Atoi
	// returns is used.
	num := func(row []interface{}, i int) int {
		v, _ := strconv.Atoi(text(row, i))
		return v
	}

	if bB {
		for i, col := range e.Table.Columns {
			switch col.Name {
			case "state":
				rec.OldState = text(beC, i)
			case "tariff_id":
				rec.OldTariffId = num(beC, i)
			case "tariff_seq":
				rec.OldTariffSeq = num(beC, i)
			}
		}
	}

	if bA {
		for i, col := range e.Table.Columns {
			switch col.Name {
			case "state":
				rec.NewState = text(afC, i)
				if bB && rec.NewState != rec.OldState {
					rec.BUpdate = true
				}
			case "tariff_id":
				rec.NewTariffId = num(afC, i)
				if bB && rec.NewTariffId != rec.OldTariffId {
					rec.BUpdate = true
				}
			case "tariff_seq":
				rec.NewTariffSeq = num(afC, i)
				if bB && rec.NewTariffSeq != rec.OldTariffSeq {
					rec.BUpdate = true
				}
			}
		}
	}

	was := rec.OldState == watched
	is := rec.NewState == watched

	switch c.EventType {
	case canal.InsertAction:
		if is {
			c.ChgHoliday = append(c.ChgHoliday, rec)
			logger.CanalLog.Infof("insert into tb_holiday: [%#v]", rec)
		}
	case canal.DeleteAction:
		if was {
			c.ChgHoliday = append(c.ChgHoliday, rec)
			logger.CanalLog.Infof("delete from tb_holiday: [%#v]", rec)
		}
	default:
		if !was && !is {
			return
		}
		switch {
		case !was && is:
			c.EventType = canal.InsertAction
		case was && !is:
			c.EventType = canal.DeleteAction
		}
		c.ChgHoliday = append(c.ChgHoliday, rec)
		logger.CanalLog.Infof("modify tb_holiday: [%#v]", rec)
	}
}
// getBilHolidayColumns reads HOLIDAY_ID and STATE of one changed row and
// appends the result to c.ChgBilHoliday when the row passes the filter.
//
// bB and bA select whether beC (before image) and afC (after image) are read;
// cells are found by column name in e.Table.Columns and read positionally.
//   - insert: the new state must be "L0R";
//   - delete: the old state must be "L0R";
//   - otherwise: the old or the new state must be "L0R"; c.EventType is
//     rewritten to insert/delete when the state enters/leaves "L0R".
//
// BUpdate is computed for the record but is not part of the filter here.
func getBilHolidayColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
	const watched = "L0R"

	var rec ChgBilHoliday
	// text renders one cell of a row image with fmt's default formatting.
	text := func(row []interface{}, i int) string { return fmt.Sprint(row[i]) }

	if bB {
		for i, col := range e.Table.Columns {
			switch col.Name {
			case "HOLIDAY_ID":
				// The Atoi error is discarded; whatever Atoi returns is stored.
				rec.HolidayId, _ = strconv.Atoi(text(beC, i))
			case "STATE":
				rec.OldState = text(beC, i)
			}
		}
	}

	if bA {
		for i, col := range e.Table.Columns {
			switch col.Name {
			case "HOLIDAY_ID":
				rec.HolidayId, _ = strconv.Atoi(text(afC, i))
			case "STATE":
				rec.NewState = text(afC, i)
				if bB && rec.NewState != rec.OldState {
					rec.BUpdate = true
				}
			}
		}
	}

	was := rec.OldState == watched
	is := rec.NewState == watched

	switch c.EventType {
	case canal.InsertAction:
		if is {
			c.ChgBilHoliday = append(c.ChgBilHoliday, rec)
			logger.CanalLog.Infof("insert into tb_bil_holiday: [%#v]", rec)
		}
	case canal.DeleteAction:
		if was {
			c.ChgBilHoliday = append(c.ChgBilHoliday, rec)
			logger.CanalLog.Infof("delete from tb_bil_holiday: [%#v]", rec)
		}
	default:
		if !was && !is {
			return
		}
		switch {
		case !was && is:
			c.EventType = canal.InsertAction
		case was && !is:
			c.EventType = canal.DeleteAction
		}
		c.ChgBilHoliday = append(c.ChgBilHoliday, rec)
		logger.CanalLog.Infof("modify tb_bil_holiday: [%#v]", rec)
	}
}
// getSmsInfoColumns reads sms_alert_id, sms_called_number, sms_content and
// sms_state of one changed row and appends the result to c.ChgAlertSms when
// the row passes the filter.
//
// bB and bA select whether beC (before image) and afC (after image) are read;
// cells are found by column name in e.Table.Columns and read positionally.
// For fields without an old/new pair the after image wins when both are read.
//   - insert: kept when the new sms_state is 1;
//   - update: kept when neither the old nor the new sms_state is 1;
//   - any other event type: ignored.
func getSmsInfoColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
	var rec ChgAlertSms
	// text renders one cell of a row image with fmt's default formatting.
	text := func(row []interface{}, i int) string { return fmt.Sprint(row[i]) }
	// num parses one cell with Atoi; the error is discarded and whatever Atoi
	// returns is used.
	num := func(row []interface{}, i int) int {
		v, _ := strconv.Atoi(text(row, i))
		return v
	}

	if bB {
		for i, col := range e.Table.Columns {
			switch col.Name {
			case "sms_alert_id":
				rec.AlertId = num(beC, i)
			case "sms_called_number":
				rec.ServiceNbr = text(beC, i)
			case "sms_content":
				rec.SmsContent = text(beC, i)
			case "sms_state":
				rec.OldState = num(beC, i)
			}
		}
	}

	if bA {
		for i, col := range e.Table.Columns {
			switch col.Name {
			case "sms_alert_id":
				rec.AlertId = num(afC, i)
			case "sms_called_number":
				rec.ServiceNbr = text(afC, i)
			case "sms_content":
				rec.SmsContent = text(afC, i)
			case "sms_state":
				rec.NewState = num(afC, i)
			}
		}
	}

	var logFormat string
	switch c.EventType {
	case canal.InsertAction:
		if rec.NewState != 1 {
			return
		}
		logFormat = "insert into tb_sms_info: [%#v]"
	case canal.UpdateAction:
		// NOTE(review): inserts are kept when the new state IS 1, yet updates
		// are kept only when the new state is NOT 1. This may be intended or a
		// typo for "NewState == 1" — confirm with the consumer of ChgAlertSms.
		if rec.OldState == 1 || rec.NewState == 1 {
			return
		}
		logFormat = "modify tb_sms_info: [%#v]"
	default:
		return
	}

	c.ChgAlertSms = append(c.ChgAlertSms, rec)
	logger.CanalLog.Infof(logFormat, rec)
}
// getSyncMobileColumns reads the tracked columns of one changed row and
// appends the result to c.ChgSyncMobile when the row passes the filter.
//
// bB and bA select whether beC (before image) and afC (after image) are read;
// cells are found by column name in e.Table.Columns and read positionally.
// Only STATE is taken from the before image (as OldState); every other field
// comes from the after image.
//   - insert: kept when STATE, OPER_TYPE and MOBILE_TYPE are all 1;
//   - update: kept when the old STATE is not 1 and the new STATE is 1;
//   - any other event type: ignored.
func getSyncMobileColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
	var rec ChgSyncMobile
	// text renders one cell of a row image with fmt's default formatting.
	text := func(row []interface{}, i int) string { return fmt.Sprint(row[i]) }
	// num parses one cell with Atoi; the error is discarded and whatever Atoi
	// returns is used.
	num := func(row []interface{}, i int) int {
		v, _ := strconv.Atoi(text(row, i))
		return v
	}

	if bB {
		for i, col := range e.Table.Columns {
			if col.Name == "STATE" {
				rec.OldState = num(beC, i)
			}
		}
	}

	if bA {
		for i, col := range e.Table.Columns {
			switch col.Name {
			// Numeric columns.
			case "PRE_ID":
				rec.PreId = num(afC, i)
			case "OPER_TYPE":
				rec.OperType = num(afC, i)
			case "STATE":
				rec.State = num(afC, i)
			case "MOBILE_TYPE":
				rec.MobileType = num(afC, i)
			case "CUST_ID":
				rec.CustId = num(afC, i)
			case "ACCT_ID":
				rec.AcctId = num(afC, i)
			case "PRD_INST_ID":
				rec.PrdInstId = num(afC, i)
			case "OFR_ID":
				rec.OfrId = num(afC, i)
			case "BALANCE":
				rec.Balance = num(afC, i)
			case "VMS_FLAG":
				rec.VmsFlag = num(afC, i)
			case "CUG_ID":
				rec.CugId = num(afC, i)
			// Columns kept as text.
			case "SERVICE_NBR":
				rec.ServiceNbr = text(afC, i)
			case "BIRTH_DATE":
				rec.BirthDate = text(afC, i)
			case "BALANCE_EXP_DATE":
				rec.BalanceExpDate = text(afC, i)
			case "IMSI":
				rec.Imsi = text(afC, i)
			case "KI":
				rec.Ki = text(afC, i)
			case "OPC":
				rec.Opc = text(afC, i)
			case "EXP_DATE":
				rec.ExpDate = text(afC, i)
			}
		}
	}

	var logFormat string
	switch c.EventType {
	case canal.InsertAction:
		if rec.State != 1 || rec.OperType != 1 || rec.MobileType != 1 {
			return
		}
		logFormat = "insert into tb_sync_mobile: [%#v]"
	case canal.UpdateAction:
		if rec.OldState == 1 || rec.State != 1 {
			return
		}
		logFormat = "modify tb_sync_mobile: [%#v]"
	default:
		return
	}

	c.ChgSyncMobile = append(c.ChgSyncMobile, rec)
	logger.CanalLog.Infof(logFormat, rec)
}
// ParseAndFilterBinLog converts one canal rows event into a RecordChange by
// running the table-specific column extractor over every changed row.
//
// The returned RecordChange starts with EventType = e.Action and
// TableName = e.Table.Name; the extractors append the rows that pass their
// filters and may rewrite EventType (an update can be reclassified as an
// insert or a delete). nil is returned when the table has no extractor,
// including when the event carries no rows.
//
// Row layout: for insert and delete events every entry of e.Rows is one row
// image; for every other action e.Rows holds (before, after) pairs, and a
// trailing unpaired entry is ignored. The SMS and sync-mobile tables only
// handle inserts and updates.
func ParseAndFilterBinLog(e *canal.RowsEvent) *RecordChange {
	chg := RecordChange{EventType: e.Action, TableName: e.Table.Name}

	// Resolve the extractor once; it does not depend on the row.
	var extract func(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent)
	insertUpdateOnly := false
	switch chg.TableName {
	case TbAcctInfo:
		extract = getAcctInfoColumns
	case TbOfrDetail:
		extract = getOfrDetailColumns
	case TbTariff:
		extract = getTariffColumns
	case TbOfrInfo:
		extract = getOfrInfoColumns
	case TbPricingStrategy:
		extract = getPricingStrategyColumns
	case TbConfigArea:
		extract = getConfigAreaColumns
	case TbPricingArea:
		extract = getPricingAreaColumns
	case TbRr:
		extract = getRrColumns
	case TbHoliday:
		extract = getHolidayColumns
	case TbBilHoliday:
		extract = getBilHolidayColumns
	case TbSmsInfo:
		extract = getSmsInfoColumns
		insertUpdateOnly = true
	case TbSyncMobile:
		extract = getSyncMobileColumns
		insertUpdateOnly = true
	default:
		return nil
	}

	// Dispatch on the binlog action, never on chg.EventType: the extractors
	// rewrite chg.EventType, and dispatching on the rewritten value made the
	// remaining rows of a multi-row update read e.Rows[i] instead of their
	// own (before, after) pair.
	action := e.Action
	switch {
	case action == canal.InsertAction:
		for _, row := range e.Rows {
			extract(&chg, false, true, nil, row, e)
		}
	case action == canal.DeleteAction:
		if insertUpdateOnly {
			break
		}
		for _, row := range e.Rows {
			extract(&chg, true, false, row, nil, e)
		}
	case insertUpdateOnly && action != canal.UpdateAction:
		// These tables ignore everything except inserts and updates.
	default:
		// final keeps the first reclassification made by any row, which is
		// the value EventType ended up with before this fix as well.
		final := action
		for i := 0; i+1 < len(e.Rows); i += 2 {
			// Each pair must be filtered as an update, so undo whatever the
			// previous pair did to EventType before calling the extractor.
			chg.EventType = action
			extract(&chg, true, true, e.Rows[i], e.Rows[i+1], e)
			if final == action && chg.EventType != action {
				final = chg.EventType
			}
		}
		chg.EventType = final
	}

	return &chg
}
/*
func ParseAndFilterChange(eventType entry.EventType, tableName string, rowData *entry.RowData) *RecordChange {
var chg = RecordChange{EventType: string(eventType), TableName: tableName}
switch tableName {
case TbAcctInfo :
switch eventType {
case entry.EventType_INSERT:
getAcctInfoColumns(&chg, false, true, rowData)
if chg.ChgAcctInfo.IfPrepaid != "1" || chg.ChgAcctInfo.NewPrdInstStasId != "1001" {
return nil
}
case entry.EventType_DELETE:
getAcctInfoColumns(&chg, true, false, rowData)
if chg.ChgAcctInfo.IfPrepaid != "1" || chg.ChgAcctInfo.OldPrdInstStasId != "1001" {
return nil
}
default:
getAcctInfoColumns(&chg, true, true, rowData)
if chg.ChgAcctInfo.IfPrepaid != "1" || !chg.ChgAcctInfo.BUpdate {
return nil
}
}
case TbOfrDetail:
switch eventType {
case entry.EventType_INSERT:
getOfrDetailColumns(&chg, false, true, rowData)
if chg.ChgOfrDetail.OfrDetailTypeId != "A1" {
return nil
}
case entry.EventType_DELETE:
getOfrDetailColumns(&chg, true, false, rowData)
if chg.ChgOfrDetail.OfrDetailTypeId != "A1" {
return nil
}
default:
getOfrDetailColumns(&chg, true, true, rowData)
if !chg.ChgOfrDetail.BUpdate || chg.ChgOfrDetail.OfrDetailTypeId != "A1" {
return nil
}
}
case TbTariff:
switch eventType {
case entry.EventType_INSERT:
getTariffColumns(&chg, false, true, rowData)
case entry.EventType_DELETE:
getTariffColumns(&chg, true, false, rowData)
default:
getTariffColumns(&chg, true, true, rowData)
}
case TbOfrInfo:
switch eventType {
case entry.EventType_INSERT:
getOfrInfoColumns(&chg, false, true, rowData)
if chg.ChgOfrInfo.IfPrepaid != "1" || chg.ChgOfrInfo.OfrTypeId != "1" || chg.ChgOfrInfo.NewOfrState != "2" {
return nil
}
case entry.EventType_DELETE:
getOfrInfoColumns(&chg, true, false, rowData)
if chg.ChgOfrInfo.IfPrepaid != "1" || chg.ChgOfrInfo.OfrTypeId != "1" || chg.ChgOfrInfo.OldOfrState != "2" {
return nil
}
default:
getOfrInfoColumns(&chg, true, true, rowData)
if chg.ChgOfrInfo.IfPrepaid != "1" || chg.ChgOfrInfo.OfrTypeId != "1" || !chg.ChgOfrInfo.BUpdate {
return nil
}
}
case TbPricingStrategy:
switch eventType {
case entry.EventType_INSERT:
getPricingStrategyColumns(&chg, false, true, rowData)
case entry.EventType_DELETE:
getPricingStrategyColumns(&chg, true, false, rowData)
default:
getPricingStrategyColumns(&chg, true, true, rowData)
if !chg.ChgPricingStrategy.BUpdate {
return nil
}
}
case TbConfigArea:
switch eventType {
case entry.EventType_INSERT:
getConfigAreaColumns(&chg, false, true, rowData)
if chg.ChgConfigArea.NewState != "1" {
return nil
}
case entry.EventType_DELETE:
getConfigAreaColumns(&chg, true, false, rowData)
if chg.ChgConfigArea.OldState != "1" {
return nil
}
default:
getConfigAreaColumns(&chg, true, true, rowData)
if !chg.ChgConfigArea.BUpdate {
return nil
}
}
case TbPricingArea:
switch eventType {
case entry.EventType_INSERT:
getPricingAreaColumns(&chg, false, true, rowData)
case entry.EventType_DELETE:
getPricingAreaColumns(&chg, true, false, rowData)
default:
getPricingAreaColumns(&chg, true, true, rowData)
}
case TbRr:
switch eventType {
case entry.EventType_INSERT:
getRrColumns(&chg, false, true, rowData)
case entry.EventType_DELETE:
getRrColumns(&chg, true, false, rowData)
default:
getRrColumns(&chg, true, true, rowData)
if !chg.ChgRr.BUpdate {
return nil
}
}
case TbHoliday:
switch eventType {
case entry.EventType_INSERT:
getHolidayColumns(&chg, false, true, rowData)
if chg.ChgHoliday.NewState != "1" {
return nil
}
case entry.EventType_DELETE:
getHolidayColumns(&chg, true, false, rowData)
if chg.ChgHoliday.OldState != "1" {
return nil
}
default:
getHolidayColumns(&chg, true, true, rowData)
if chg.ChgHoliday.OldState != "1" && chg.ChgHoliday.NewState != "1" {
return nil
}
}
case TbBilHoliday:
switch eventType {
case entry.EventType_INSERT:
getBilHolidayColumns(&chg, false, true, rowData)
if chg.ChgBilHoliday.NewState != "L0R" {
return nil
}
case entry.EventType_DELETE:
getBilHolidayColumns(&chg, true, false, rowData)
if chg.ChgBilHoliday.OldState != "L0R" {
return nil
}
default:
getBilHolidayColumns(&chg, true, true, rowData)
if chg.ChgBilHoliday.OldState != "L0R" && chg.ChgBilHoliday.NewState != "L0R" {
return nil
}
}
default:
return nil
}
return &chg
}*/