1103 lines
31 KiB
Go
1103 lines
31 KiB
Go
package canal
|
|
|
|
import (
|
|
"fmt"
|
|
"github.com/go-mysql-org/go-mysql/canal"
|
|
"proxy/logger"
|
|
. "proxy/MsgDef"
|
|
"strconv"
|
|
)
|
|
|
|
func getAcctInfoColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
|
|
var chg ChgAcctInfo
|
|
var str string
|
|
if bB {
|
|
//beC := rowData.GetBeforeColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "PRD_INST_STAS_ID":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldPrdInstStasId = str
|
|
case "PRD_INST_ID":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.PrdInstId, _ = strconv.Atoi(str)
|
|
case "SERVICE_NBR":
|
|
chg.ServiceNbr = fmt.Sprintf("%v", beC[index])
|
|
case "IF_PREPAY":
|
|
chg.IfPrepaid = fmt.Sprintf("%v", beC[index])
|
|
}
|
|
}
|
|
}
|
|
if bA {
|
|
//afC := rowData.GetAfterColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "PRD_INST_STAS_ID":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
if bB && str != chg.OldPrdInstStasId {
|
|
chg.BUpdate = true
|
|
}
|
|
chg.NewPrdInstStasId = str
|
|
case "PRD_INST_ID":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.PrdInstId, _ = strconv.Atoi(str)
|
|
case "SERVICE_NBR":
|
|
chg.ServiceNbr = fmt.Sprintf("%v", afC[index])
|
|
case "IF_PREPAY":
|
|
chg.IfPrepaid = fmt.Sprintf("%v", afC[index])
|
|
}
|
|
}
|
|
}
|
|
switch c.EventType {
|
|
case canal.InsertAction:
|
|
if chg.IfPrepaid != "1" || chg.NewPrdInstStasId != "1001" {
|
|
|
|
} else {
|
|
c.ChgAcctInfo = append(c.ChgAcctInfo, chg)
|
|
logger.CanalLog.Infof("insert into prd_prd_inst_551: [%#v]", chg)
|
|
}
|
|
case canal.DeleteAction:
|
|
if chg.IfPrepaid != "1" || chg.OldPrdInstStasId != "1001" {
|
|
|
|
} else {
|
|
c.ChgAcctInfo = append(c.ChgAcctInfo, chg)
|
|
logger.CanalLog.Infof("delete from prd_prd_inst_551: [%#v]", chg)
|
|
}
|
|
default:
|
|
if chg.IfPrepaid != "1" || !chg.BUpdate {
|
|
|
|
} else {
|
|
if chg.OldPrdInstStasId == "1001" && chg.NewPrdInstStasId != "1001" {
|
|
c.EventType = canal.DeleteAction
|
|
} else if chg.OldPrdInstStasId != "1001" && chg.NewPrdInstStasId == "1001" {
|
|
c.EventType = canal.InsertAction
|
|
}
|
|
c.ChgAcctInfo = append(c.ChgAcctInfo, chg)
|
|
logger.CanalLog.Infof("modify prd_prd_inst_551: [%#v]", chg)
|
|
}
|
|
}
|
|
}
|
|
|
|
func getOfrDetailColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
|
|
var chg ChgOfrDetail
|
|
var str string
|
|
if bB {
|
|
//beC := rowData.GetBeforeColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "OFR_DETAIL_TYPE_ID":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OfrDetailTypeId = str
|
|
case "OFR_DETAIL_INST_ID":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OfrDetailInstId, _ = strconv.Atoi(str)
|
|
case "EFF_DATE":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldEffDate = str
|
|
case "EXP_DATE":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldExpDate = str
|
|
/*case "CALC_PRIORITY":
|
|
chg.ChgOfrDetail.OldCalcPriority = column.Value*/
|
|
}
|
|
}
|
|
}
|
|
if bA {
|
|
//afC := rowData.GetAfterColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "OFR_DETAIL_TYPE_ID":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.OfrDetailTypeId = str
|
|
case "OFR_DETAIL_INST_ID":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.OfrDetailInstId, _ = strconv.Atoi(str)
|
|
case "EFF_DATE":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
if bB && str != chg.OldEffDate {
|
|
chg.BUpdate = true
|
|
}
|
|
chg.NewEffDate = str
|
|
case "EXP_DATE":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
if bB && str != chg.OldExpDate {
|
|
chg.BUpdate = true
|
|
}
|
|
chg.NewExpDate = str
|
|
/*case "CALC_PRIORITY":
|
|
if column.Updated {
|
|
chg.ChgOfrDetail.BUpdate = true
|
|
}
|
|
chg.ChgOfrDetail.NewCalcPriority = column.Value*/
|
|
}
|
|
}
|
|
}
|
|
switch c.EventType {
|
|
case canal.InsertAction:
|
|
if chg.OfrDetailTypeId != "A1" {
|
|
|
|
} else {
|
|
c.ChgOfrDetail = append(c.ChgOfrDetail, chg)
|
|
logger.CanalLog.Infof("insert into prd_ofr_detail_inst_551: [%#v]", chg)
|
|
}
|
|
case canal.DeleteAction:
|
|
if chg.OfrDetailTypeId != "A1" {
|
|
|
|
} else {
|
|
c.ChgOfrDetail = append(c.ChgOfrDetail, chg)
|
|
logger.CanalLog.Infof("delete from prd_ofr_detail_inst_551: [%#v]", chg)
|
|
}
|
|
default:
|
|
if !chg.BUpdate || chg.OfrDetailTypeId != "A1" {
|
|
|
|
} else {
|
|
c.ChgOfrDetail = append(c.ChgOfrDetail, chg)
|
|
logger.CanalLog.Infof("modify prd_ofr_detail_inst_551: [%#v]", chg)
|
|
}
|
|
}
|
|
//c.ChgOfrDetail = append(c.ChgOfrDetail, chg)
|
|
}
|
|
|
|
func getTariffColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
|
|
var chg ChgTariff
|
|
var str string
|
|
if bB {
|
|
//beC := rowData.GetBeforeColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "TARIFF_ID":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.TariffId, _ = strconv.Atoi(str)
|
|
}
|
|
}
|
|
}
|
|
if bA {
|
|
//afC := rowData.GetAfterColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "TARIFF_ID":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.TariffId, _ = strconv.Atoi(str)
|
|
}
|
|
}
|
|
}
|
|
c.ChgTariff = append(c.ChgTariff, chg)
|
|
logger.CanalLog.Infof("update tb_tariff: [%#v], type[%s]", chg, c.EventType)
|
|
}
|
|
|
|
func getOfrInfoColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
|
|
var chg ChgOfrInfo
|
|
var str string
|
|
if bB {
|
|
//beC := rowData.GetBeforeColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "OFR_TYPE_ID":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OfrTypeId = str
|
|
case "OFR_ID":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OfrId, _ = strconv.Atoi(str)
|
|
case "IF_PREPAY":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.IfPrepaid = str
|
|
case "OFR_STATE":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldOfrState = str
|
|
case "CALC_PRIORITY":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldCalcPriority = str
|
|
}
|
|
}
|
|
}
|
|
if bA {
|
|
//afC := rowData.GetAfterColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "OFR_TYPE_ID":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.OfrTypeId = str
|
|
case "OFR_ID":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.OfrId, _ = strconv.Atoi(str)
|
|
case "IF_PREPAY":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.IfPrepaid = str
|
|
case "OFR_STATE":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
if bB && str != chg.OldOfrState {
|
|
chg.BUpdate = true
|
|
}
|
|
chg.NewOfrState = str
|
|
case "CALC_PRIORITY":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
if bB && str != chg.OldCalcPriority {
|
|
chg.BUpdate = true
|
|
}
|
|
chg.NewCalcPriority = str
|
|
}
|
|
}
|
|
}
|
|
switch c.EventType {
|
|
case canal.InsertAction:
|
|
if chg.IfPrepaid != "1" || chg.OfrTypeId != "1" || chg.NewOfrState != "2" {
|
|
|
|
} else {
|
|
c.ChgOfrInfo = append(c.ChgOfrInfo, chg)
|
|
logger.CanalLog.Infof("insert into tb_ofr: [%#v]", chg)
|
|
}
|
|
case canal.DeleteAction:
|
|
if chg.IfPrepaid != "1" || chg.OfrTypeId != "1" || chg.OldOfrState != "2" {
|
|
|
|
} else {
|
|
c.ChgOfrInfo = append(c.ChgOfrInfo, chg)
|
|
logger.CanalLog.Infof("delete from tb_ofr: [%#v]", chg)
|
|
}
|
|
default:
|
|
if chg.IfPrepaid != "1" || chg.OfrTypeId != "1" || !chg.BUpdate {
|
|
|
|
} else {
|
|
if chg.OldOfrState == "2" && chg.NewOfrState != "2" {
|
|
c.EventType = canal.DeleteAction
|
|
} else if chg.OldOfrState != "2" && chg.NewOfrState == "2" {
|
|
c.EventType = canal.InsertAction
|
|
}
|
|
c.ChgOfrInfo = append(c.ChgOfrInfo, chg)
|
|
logger.CanalLog.Infof("modify tb_ofr: [%#v]", chg)
|
|
}
|
|
}
|
|
}
|
|
|
|
func getPricingStrategyColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
|
|
var chg ChgPricingStrategy
|
|
var str string
|
|
if bB {
|
|
//beC := rowData.GetBeforeColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "EVENT_PRICING_STRATEGY_ID":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.EventPricingStrategyId, _ = strconv.Atoi(str)
|
|
case "EVENT_PRIORITY":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldEventPriority = str
|
|
case "CALL_TYPE":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldCallType = str
|
|
}
|
|
}
|
|
}
|
|
if bA {
|
|
//afC := rowData.GetAfterColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "EVENT_PRICING_STRATEGY_ID":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.EventPricingStrategyId, _ = strconv.Atoi(str)
|
|
case "EVENT_PRIORITY":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
if bB && str != chg.OldEventPriority {
|
|
chg.BUpdate = true
|
|
}
|
|
chg.NewEventPriority = str
|
|
case "CALL_TYPE":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
if bB && str != chg.OldCallType {
|
|
chg.BUpdate = true
|
|
}
|
|
chg.NewCallType = str
|
|
}
|
|
}
|
|
}
|
|
switch c.EventType {
|
|
case canal.InsertAction:
|
|
c.ChgPricingStrategy = append(c.ChgPricingStrategy, chg)
|
|
logger.CanalLog.Infof("insert into tb_pricing_strategy: [%#v]", chg)
|
|
case canal.DeleteAction:
|
|
c.ChgPricingStrategy = append(c.ChgPricingStrategy, chg)
|
|
logger.CanalLog.Infof("delete from tb_pricing_strategy: [%#v]", chg)
|
|
default:
|
|
if !chg.BUpdate {
|
|
|
|
} else {
|
|
c.ChgPricingStrategy = append(c.ChgPricingStrategy, chg)
|
|
logger.CanalLog.Infof("modify tb_pricing_strategy: [%#v]", chg)
|
|
}
|
|
}
|
|
}
|
|
|
|
func getConfigAreaColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
|
|
var chg ChgConfigArea
|
|
var str string
|
|
if bB {
|
|
//beC := rowData.GetBeforeColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "area_id":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.AreaId, _ = strconv.Atoi(str)
|
|
case "area_code":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldAreaCode = str
|
|
case "state":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldState = str
|
|
}
|
|
}
|
|
}
|
|
if bA {
|
|
//afC := rowData.GetAfterColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "area_id":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.AreaId, _ = strconv.Atoi(str)
|
|
case "area_code":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
if bB && str != chg.OldAreaCode {
|
|
chg.BUpdate = true
|
|
}
|
|
chg.NewAreaCode = str
|
|
case "state":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
if bB && str != chg.OldState {
|
|
chg.BUpdate = true
|
|
}
|
|
chg.NewState = str
|
|
}
|
|
}
|
|
}
|
|
switch c.EventType {
|
|
case canal.InsertAction:
|
|
if chg.NewState != "1" {
|
|
|
|
} else {
|
|
c.ChgConfigArea = append(c.ChgConfigArea, chg)
|
|
logger.CanalLog.Infof("insert into tb_config_area: [%#v]", chg)
|
|
}
|
|
case canal.DeleteAction:
|
|
if chg.OldState != "1" {
|
|
|
|
} else {
|
|
c.ChgConfigArea = append(c.ChgConfigArea, chg)
|
|
logger.CanalLog.Infof("delete from tb_config_area: [%#v]", chg)
|
|
}
|
|
default:
|
|
if !chg.BUpdate {
|
|
|
|
} else {
|
|
if chg.OldState != "1" && chg.NewState == "1" {
|
|
c.EventType = canal.InsertAction
|
|
} else if chg.OldState == "1" && chg.NewState != "1" {
|
|
c.EventType = canal.DeleteAction
|
|
}
|
|
c.ChgConfigArea = append(c.ChgConfigArea, chg)
|
|
logger.CanalLog.Infof("modify tb_config_area: [%#v]", chg)
|
|
}
|
|
}
|
|
}
|
|
|
|
func getPricingAreaColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
|
|
var chg ChgPricingArea
|
|
var str string
|
|
if bB {
|
|
//beC := rowData.GetBeforeColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "STRATEGY_ID":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldStrategyId, _ = strconv.Atoi(str)
|
|
case "area_id":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldAreaId, _ = strconv.Atoi(str)
|
|
}
|
|
}
|
|
}
|
|
if bA {
|
|
//afC := rowData.GetAfterColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "STRATEGY_ID":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.NewStrategyId, _ = strconv.Atoi(str)
|
|
if bB && chg.NewStrategyId != chg.OldStrategyId {
|
|
chg.BUpdate = true
|
|
}
|
|
case "area_id":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.NewAreaId, _ = strconv.Atoi(str)
|
|
if bB && chg.NewStrategyId != chg.OldAreaId {
|
|
chg.BUpdate = true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
c.ChgPricingArea = append(c.ChgPricingArea, chg)
|
|
logger.CanalLog.Infof("update tb_pricing_area: [%#v], type[%s]", chg, c.EventType)
|
|
}
|
|
|
|
func getRrColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
|
|
var chg ChgRr
|
|
//c.ChgRr = make([]ChgRr, 0, 1)
|
|
var str string
|
|
if bB {
|
|
//beC := rowData.GetBeforeColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "ID":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.Id, _ = strconv.ParseInt(str, 10, 64)
|
|
case "END_TIME":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldEndTime = str
|
|
case "BEGIN_TIME":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldBeginTime = str
|
|
case "FREE_VALUE":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldFreeValue, _ = strconv.ParseInt(str, 10, 64)
|
|
case "VALUE":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldValue, _ = strconv.ParseInt(str, 10, 64)
|
|
case "PRICING_SUB_SECTION_ID":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldPricingSubSectionId = str
|
|
case "OFR_INST_ID":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldOfrInstId = str
|
|
case "OWNER_ID":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldOwnId = str
|
|
}
|
|
}
|
|
}
|
|
if bA {
|
|
//afC := rowData.GetAfterColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "ID":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.Id, _ = strconv.ParseInt(str, 10, 64)
|
|
case "END_TIME":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
if bB && str != chg.OldEndTime {
|
|
chg.BUpdate = true
|
|
}
|
|
chg.NewEndTime = str
|
|
case "BEGIN_TIME":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
if bB && str != chg.OldBeginTime {
|
|
chg.BUpdate = true
|
|
}
|
|
chg.NewBeginTime = str
|
|
case "FREE_VALUE":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.NewFreeValue, _ = strconv.ParseInt(str, 10, 64)
|
|
if bB && chg.NewFreeValue != chg.OldFreeValue {
|
|
chg.BUpdate = true
|
|
}
|
|
case "VALUE":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.NewValue, _ = strconv.ParseInt(str, 10, 64)
|
|
if bB && chg.NewValue != chg.OldValue {
|
|
chg.BUpdate = true
|
|
}
|
|
case "PRICING_SUB_SECTION_ID":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
if bB && str != chg.OldPricingSubSectionId {
|
|
chg.BUpdate = true
|
|
}
|
|
chg.NewPricingSubSectionId = str
|
|
case "OFR_INST_ID":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
if bB && str != chg.OldOfrInstId {
|
|
chg.BUpdate = true
|
|
}
|
|
chg.NewOfrInstId = str
|
|
case "OWNER_ID":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
if bB && str != chg.OldOwnId {
|
|
chg.BUpdate = true
|
|
}
|
|
chg.NewOwnId = str
|
|
}
|
|
}
|
|
}
|
|
switch c.EventType {
|
|
case canal.InsertAction:
|
|
c.ChgRr = append(c.ChgRr, chg)
|
|
logger.CanalLog.Infof("insert into rr: [%#v]", chg)
|
|
case canal.DeleteAction:
|
|
c.ChgRr = append(c.ChgRr, chg)
|
|
logger.CanalLog.Infof("delete from rr: [%#v]", chg)
|
|
default:
|
|
if !chg.BUpdate {
|
|
|
|
} else {
|
|
//if chg.NewValue >= chg.NewFreeValue {// may refund late, should not delete
|
|
// c.EventType = canal.DeleteAction
|
|
//}
|
|
if chg.OldValue >= chg.OldFreeValue && chg.NewValue < chg.NewFreeValue {
|
|
c.EventType = canal.InsertAction
|
|
}
|
|
c.ChgRr = append(c.ChgRr, chg)
|
|
logger.CanalLog.Infof("modify rr: [%#v]", chg)
|
|
}
|
|
}
|
|
}
|
|
|
|
func getHolidayColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
|
|
var chg ChgHoliday
|
|
var str string
|
|
if bB {
|
|
//beC := rowData.GetBeforeColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "state":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldState = str
|
|
case "tariff_id":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldTariffId, _ = strconv.Atoi(str)
|
|
case "tariff_seq":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldTariffSeq, _ = strconv.Atoi(str)
|
|
}
|
|
}
|
|
}
|
|
if bA {
|
|
//afC := rowData.GetAfterColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "state":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
if bB && str != chg.OldState {
|
|
chg.BUpdate = true
|
|
}
|
|
chg.NewState = str
|
|
case "tariff_id":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.NewTariffId, _ = strconv.Atoi(str)
|
|
if bB && chg.NewTariffId != chg.OldTariffId {
|
|
chg.BUpdate = true
|
|
}
|
|
case "tariff_seq":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.NewTariffSeq, _ = strconv.Atoi(str)
|
|
if bB && chg.NewTariffSeq != chg.OldTariffSeq {
|
|
chg.BUpdate = true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
switch c.EventType {
|
|
case canal.InsertAction:
|
|
if chg.NewState != "1" {
|
|
|
|
} else {
|
|
c.ChgHoliday = append(c.ChgHoliday, chg)
|
|
logger.CanalLog.Infof("insert into tb_holiday: [%#v]", chg)
|
|
}
|
|
case canal.DeleteAction:
|
|
if chg.OldState != "1" {
|
|
|
|
} else {
|
|
c.ChgHoliday = append(c.ChgHoliday, chg)
|
|
logger.CanalLog.Infof("delete from tb_holiday: [%#v]", chg)
|
|
}
|
|
default:
|
|
if chg.OldState != "1" && chg.NewState != "1" {
|
|
|
|
} else {
|
|
if chg.OldState != "1" && chg.NewState == "1" {
|
|
c.EventType = canal.InsertAction
|
|
} else if chg.OldState == "1" && chg.NewState != "1" {
|
|
c.EventType = canal.DeleteAction
|
|
}
|
|
c.ChgHoliday = append(c.ChgHoliday, chg)
|
|
logger.CanalLog.Infof("modify tb_holiday: [%#v]", chg)
|
|
}
|
|
}
|
|
}
|
|
|
|
func getBilHolidayColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
|
|
var chg ChgBilHoliday
|
|
var str string
|
|
if bB {
|
|
//beC := rowData.GetBeforeColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "HOLIDAY_ID":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.HolidayId, _ = strconv.Atoi(str)
|
|
case "STATE":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldState = str
|
|
}
|
|
}
|
|
}
|
|
if bA {
|
|
//afC := rowData.GetAfterColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "HOLIDAY_ID":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.HolidayId, _ = strconv.Atoi(str)
|
|
case "STATE":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
if bB && str != chg.OldState {
|
|
chg.BUpdate = true
|
|
}
|
|
chg.NewState = str
|
|
}
|
|
}
|
|
}
|
|
switch c.EventType {
|
|
case canal.InsertAction:
|
|
if chg.NewState != "L0R" {
|
|
|
|
} else {
|
|
c.ChgBilHoliday = append(c.ChgBilHoliday, chg)
|
|
logger.CanalLog.Infof("insert into tb_bil_holiday: [%#v]", chg)
|
|
}
|
|
case canal.DeleteAction:
|
|
if chg.OldState != "L0R" {
|
|
|
|
} else {
|
|
c.ChgBilHoliday = append(c.ChgBilHoliday, chg)
|
|
logger.CanalLog.Infof("delete from tb_bil_holiday: [%#v]", chg)
|
|
}
|
|
default:
|
|
if chg.OldState != "L0R" && chg.NewState != "L0R" {
|
|
|
|
} else {
|
|
if chg.OldState != "L0R" && chg.NewState == "L0R" {
|
|
c.EventType = canal.InsertAction
|
|
} else if chg.OldState == "L0R" && chg.NewState != "L0R" {
|
|
c.EventType = canal.DeleteAction
|
|
}
|
|
c.ChgBilHoliday = append(c.ChgBilHoliday, chg)
|
|
logger.CanalLog.Infof("modify tb_bil_holiday: [%#v]", chg)
|
|
}
|
|
}
|
|
}
|
|
|
|
func getSmsInfoColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
|
|
var chg ChgAlertSms
|
|
var str string
|
|
if bB {
|
|
//beC := rowData.GetBeforeColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "sms_alert_id":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.AlertId, _ = strconv.Atoi(str)
|
|
case "sms_called_number":
|
|
chg.ServiceNbr = fmt.Sprintf("%v", beC[index])
|
|
case "sms_content":
|
|
chg.SmsContent = fmt.Sprintf("%v", beC[index])
|
|
case "sms_state":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldState, _ = strconv.Atoi(str)
|
|
}
|
|
}
|
|
}
|
|
if bA {
|
|
//afC := rowData.GetAfterColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "sms_alert_id":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.AlertId, _ = strconv.Atoi(str)
|
|
case "sms_called_number":
|
|
chg.ServiceNbr = fmt.Sprintf("%v", afC[index])
|
|
case "sms_content":
|
|
chg.SmsContent = fmt.Sprintf("%v", afC[index])
|
|
case "sms_state":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.NewState, _ = strconv.Atoi(str)
|
|
}
|
|
}
|
|
}
|
|
switch c.EventType {
|
|
case canal.InsertAction:
|
|
if chg.NewState == 1 {
|
|
c.ChgAlertSms = append(c.ChgAlertSms, chg)
|
|
logger.CanalLog.Infof("insert into tb_sms_info: [%#v]", chg)
|
|
}
|
|
case canal.UpdateAction:
|
|
if chg.OldState != 1 && chg.NewState != 1 {
|
|
c.ChgAlertSms = append(c.ChgAlertSms, chg)
|
|
logger.CanalLog.Infof("modify tb_sms_info: [%#v]", chg)
|
|
}
|
|
}
|
|
}
|
|
|
|
func getSyncMobileColumns(c *RecordChange, bB bool, bA bool, beC []interface{}, afC []interface{}, e *canal.RowsEvent) {
|
|
var chg ChgSyncMobile
|
|
var str string
|
|
if bB {
|
|
//beC := rowData.GetBeforeColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "STATE":
|
|
str = fmt.Sprintf("%v", beC[index])
|
|
chg.OldState, _ = strconv.Atoi(str)
|
|
}
|
|
}
|
|
}
|
|
if bA {
|
|
//afC := rowData.GetAfterColumns()
|
|
for index, column := range e.Table.Columns {
|
|
switch column.Name {
|
|
case "PRE_ID":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.PreId, _ = strconv.Atoi(str)
|
|
case "OPER_TYPE":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.OperType, _ = strconv.Atoi(str)
|
|
case "STATE":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.State, _ = strconv.Atoi(str)
|
|
case "MOBILE_TYPE":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.MobileType, _ = strconv.Atoi(str)
|
|
case "SERVICE_NBR":
|
|
chg.ServiceNbr = fmt.Sprintf("%v", afC[index])
|
|
case "CUST_ID":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.CustId, _ = strconv.Atoi(str)
|
|
case "ACCT_ID":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.AcctId, _ = strconv.Atoi(str)
|
|
case "PRD_INST_ID":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.PrdInstId, _ = strconv.Atoi(str)
|
|
case "OFR_ID":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.OfrId, _ = strconv.Atoi(str)
|
|
case "BALANCE":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.Balance, _ = strconv.Atoi(str)
|
|
case "BIRTH_DATE":
|
|
chg.BirthDate = fmt.Sprintf("%v", afC[index])
|
|
case "BALANCE_EXP_DATE":
|
|
chg.BalanceExpDate = fmt.Sprintf("%v", afC[index])
|
|
case "IMSI":
|
|
chg.Imsi = fmt.Sprintf("%v", afC[index])
|
|
case "KI":
|
|
chg.Ki = fmt.Sprintf("%v", afC[index])
|
|
case "OPC":
|
|
chg.Opc = fmt.Sprintf("%v", afC[index])
|
|
case "EXP_DATE":
|
|
chg.ExpDate = fmt.Sprintf("%v", afC[index])
|
|
case "VMS_FLAG":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.VmsFlag, _ = strconv.Atoi(str)
|
|
case "CUG_ID":
|
|
str = fmt.Sprintf("%v", afC[index])
|
|
chg.CugId, _ = strconv.Atoi(str)
|
|
}
|
|
}
|
|
}
|
|
switch c.EventType {
|
|
case canal.InsertAction:
|
|
if chg.State == 1 && chg.OperType == 1 && chg.MobileType == 1 {
|
|
c.ChgSyncMobile = append(c.ChgSyncMobile, chg)
|
|
logger.CanalLog.Infof("insert into tb_sync_mobile: [%#v]", chg)
|
|
}
|
|
case canal.UpdateAction:
|
|
if chg.OldState != 1 && chg.State == 1 {
|
|
c.ChgSyncMobile = append(c.ChgSyncMobile, chg)
|
|
logger.CanalLog.Infof("modify tb_sync_mobile: [%#v]", chg)
|
|
}
|
|
}
|
|
}
|
|
|
|
func ParseAndFilterBinLog(e *canal.RowsEvent) *RecordChange {
|
|
var chg = RecordChange{EventType: e.Action, TableName: e.Table.Name}
|
|
var num int=0
|
|
if chg.EventType == canal.InsertAction || chg.EventType == canal.DeleteAction {
|
|
num = len(e.Rows)
|
|
} else {
|
|
num = len(e.Rows)/2
|
|
}
|
|
for i:=0; i<num; i++ {
|
|
switch chg.TableName {
|
|
case TbAcctInfo :
|
|
switch chg.EventType {
|
|
case canal.InsertAction:
|
|
getAcctInfoColumns(&chg, false, true, nil, e.Rows[i], e)
|
|
case canal.DeleteAction:
|
|
getAcctInfoColumns(&chg, true, false, e.Rows[i], nil, e)
|
|
default:
|
|
getAcctInfoColumns(&chg, true, true, e.Rows[2*i], e.Rows[2*i+1], e)
|
|
}
|
|
case TbOfrDetail:
|
|
switch chg.EventType {
|
|
case canal.InsertAction:
|
|
getOfrDetailColumns(&chg, false, true, nil, e.Rows[i], e)
|
|
case canal.DeleteAction:
|
|
getOfrDetailColumns(&chg, true, false, e.Rows[i], nil, e)
|
|
default:
|
|
getOfrDetailColumns(&chg, true, true, e.Rows[2*i], e.Rows[2*i+1], e)
|
|
}
|
|
case TbTariff:
|
|
switch chg.EventType {
|
|
case canal.InsertAction:
|
|
getTariffColumns(&chg, false, true, nil, e.Rows[i], e)
|
|
case canal.DeleteAction:
|
|
getTariffColumns(&chg, true, false, e.Rows[i], nil, e)
|
|
default:
|
|
getTariffColumns(&chg, true, true, e.Rows[2*i], e.Rows[2*i+1], e)
|
|
}
|
|
case TbOfrInfo:
|
|
switch chg.EventType {
|
|
case canal.InsertAction:
|
|
getOfrInfoColumns(&chg, false, true, nil, e.Rows[i], e)
|
|
case canal.DeleteAction:
|
|
getOfrInfoColumns(&chg, true, false, e.Rows[i], nil, e)
|
|
default:
|
|
getOfrInfoColumns(&chg, true, true, e.Rows[2*i], e.Rows[2*i+1], e)
|
|
}
|
|
case TbPricingStrategy:
|
|
switch chg.EventType {
|
|
case canal.InsertAction:
|
|
getPricingStrategyColumns(&chg, false, true, nil, e.Rows[i], e)
|
|
case canal.DeleteAction:
|
|
getPricingStrategyColumns(&chg, true, false, e.Rows[i], nil, e)
|
|
default:
|
|
getPricingStrategyColumns(&chg, true, true, e.Rows[2*i], e.Rows[2*i+1], e)
|
|
}
|
|
case TbConfigArea:
|
|
switch chg.EventType {
|
|
case canal.InsertAction:
|
|
getConfigAreaColumns(&chg, false, true, nil, e.Rows[i], e)
|
|
case canal.DeleteAction:
|
|
getConfigAreaColumns(&chg, true, false, e.Rows[i], nil, e)
|
|
default:
|
|
getConfigAreaColumns(&chg, true, true, e.Rows[2*i], e.Rows[2*i+1], e)
|
|
}
|
|
case TbPricingArea:
|
|
switch chg.EventType {
|
|
case canal.InsertAction:
|
|
getPricingAreaColumns(&chg, false, true, nil, e.Rows[i], e)
|
|
case canal.DeleteAction:
|
|
getPricingAreaColumns(&chg, true, false, e.Rows[i], nil, e)
|
|
default:
|
|
getPricingAreaColumns(&chg, true, true, e.Rows[2*i], e.Rows[2*i+1], e)
|
|
}
|
|
case TbRr:
|
|
switch chg.EventType {
|
|
case canal.InsertAction:
|
|
getRrColumns(&chg, false, true, nil, e.Rows[i], e)
|
|
case canal.DeleteAction:
|
|
getRrColumns(&chg, true, false, e.Rows[i], nil, e)
|
|
default:
|
|
getRrColumns(&chg, true, true, e.Rows[2*i], e.Rows[2*i+1], e)
|
|
}
|
|
case TbHoliday:
|
|
switch chg.EventType {
|
|
case canal.InsertAction:
|
|
getHolidayColumns(&chg, false, true, nil, e.Rows[i], e)
|
|
case canal.DeleteAction:
|
|
getHolidayColumns(&chg, true, false, e.Rows[i], nil, e)
|
|
default:
|
|
getHolidayColumns(&chg, true, true, e.Rows[2*i], e.Rows[2*i+1], e)
|
|
}
|
|
case TbBilHoliday:
|
|
switch chg.EventType {
|
|
case canal.InsertAction:
|
|
getBilHolidayColumns(&chg, false, true, nil, e.Rows[i], e)
|
|
case canal.DeleteAction:
|
|
getBilHolidayColumns(&chg, true, false, e.Rows[i], nil, e)
|
|
default:
|
|
getBilHolidayColumns(&chg, true, true, e.Rows[2*i], e.Rows[2*i+1], e)
|
|
}
|
|
case TbSmsInfo:
|
|
switch chg.EventType {
|
|
case canal.InsertAction:
|
|
getSmsInfoColumns(&chg, false, true, nil, e.Rows[i], e)
|
|
case canal.UpdateAction:
|
|
getSmsInfoColumns(&chg, true, true, e.Rows[2*i], e.Rows[2*i+1], e)
|
|
default:
|
|
|
|
}
|
|
case TbSyncMobile:
|
|
switch chg.EventType {
|
|
case canal.InsertAction:
|
|
getSyncMobileColumns(&chg, false, true, nil, e.Rows[i], e)
|
|
case canal.UpdateAction:
|
|
getSyncMobileColumns(&chg, true, true, e.Rows[2*i], e.Rows[2*i+1], e)
|
|
default:
|
|
|
|
}
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
//logger.CanalLog.Infof("ParseAndFilterBinLog: changes[%v]", chg)
|
|
|
|
return &chg
|
|
}
|
|
|
|
/*
|
|
func ParseAndFilterChange(eventType entry.EventType, tableName string, rowData *entry.RowData) *RecordChange {
|
|
var chg = RecordChange{EventType: string(eventType), TableName: tableName}
|
|
switch tableName {
|
|
case TbAcctInfo :
|
|
switch eventType {
|
|
case entry.EventType_INSERT:
|
|
getAcctInfoColumns(&chg, false, true, rowData)
|
|
if chg.ChgAcctInfo.IfPrepaid != "1" || chg.ChgAcctInfo.NewPrdInstStasId != "1001" {
|
|
return nil
|
|
}
|
|
case entry.EventType_DELETE:
|
|
getAcctInfoColumns(&chg, true, false, rowData)
|
|
if chg.ChgAcctInfo.IfPrepaid != "1" || chg.ChgAcctInfo.OldPrdInstStasId != "1001" {
|
|
return nil
|
|
}
|
|
default:
|
|
getAcctInfoColumns(&chg, true, true, rowData)
|
|
if chg.ChgAcctInfo.IfPrepaid != "1" || !chg.ChgAcctInfo.BUpdate {
|
|
return nil
|
|
}
|
|
}
|
|
case TbOfrDetail:
|
|
switch eventType {
|
|
case entry.EventType_INSERT:
|
|
getOfrDetailColumns(&chg, false, true, rowData)
|
|
if chg.ChgOfrDetail.OfrDetailTypeId != "A1" {
|
|
return nil
|
|
}
|
|
case entry.EventType_DELETE:
|
|
getOfrDetailColumns(&chg, true, false, rowData)
|
|
if chg.ChgOfrDetail.OfrDetailTypeId != "A1" {
|
|
return nil
|
|
}
|
|
default:
|
|
getOfrDetailColumns(&chg, true, true, rowData)
|
|
if !chg.ChgOfrDetail.BUpdate || chg.ChgOfrDetail.OfrDetailTypeId != "A1" {
|
|
return nil
|
|
}
|
|
}
|
|
case TbTariff:
|
|
switch eventType {
|
|
case entry.EventType_INSERT:
|
|
getTariffColumns(&chg, false, true, rowData)
|
|
case entry.EventType_DELETE:
|
|
getTariffColumns(&chg, true, false, rowData)
|
|
default:
|
|
getTariffColumns(&chg, true, true, rowData)
|
|
}
|
|
case TbOfrInfo:
|
|
switch eventType {
|
|
case entry.EventType_INSERT:
|
|
getOfrInfoColumns(&chg, false, true, rowData)
|
|
if chg.ChgOfrInfo.IfPrepaid != "1" || chg.ChgOfrInfo.OfrTypeId != "1" || chg.ChgOfrInfo.NewOfrState != "2" {
|
|
return nil
|
|
}
|
|
case entry.EventType_DELETE:
|
|
getOfrInfoColumns(&chg, true, false, rowData)
|
|
if chg.ChgOfrInfo.IfPrepaid != "1" || chg.ChgOfrInfo.OfrTypeId != "1" || chg.ChgOfrInfo.OldOfrState != "2" {
|
|
return nil
|
|
}
|
|
default:
|
|
getOfrInfoColumns(&chg, true, true, rowData)
|
|
if chg.ChgOfrInfo.IfPrepaid != "1" || chg.ChgOfrInfo.OfrTypeId != "1" || !chg.ChgOfrInfo.BUpdate {
|
|
return nil
|
|
}
|
|
}
|
|
case TbPricingStrategy:
|
|
switch eventType {
|
|
case entry.EventType_INSERT:
|
|
getPricingStrategyColumns(&chg, false, true, rowData)
|
|
case entry.EventType_DELETE:
|
|
getPricingStrategyColumns(&chg, true, false, rowData)
|
|
default:
|
|
getPricingStrategyColumns(&chg, true, true, rowData)
|
|
if !chg.ChgPricingStrategy.BUpdate {
|
|
return nil
|
|
}
|
|
}
|
|
case TbConfigArea:
|
|
switch eventType {
|
|
case entry.EventType_INSERT:
|
|
getConfigAreaColumns(&chg, false, true, rowData)
|
|
if chg.ChgConfigArea.NewState != "1" {
|
|
return nil
|
|
}
|
|
case entry.EventType_DELETE:
|
|
getConfigAreaColumns(&chg, true, false, rowData)
|
|
if chg.ChgConfigArea.OldState != "1" {
|
|
return nil
|
|
}
|
|
default:
|
|
getConfigAreaColumns(&chg, true, true, rowData)
|
|
if !chg.ChgConfigArea.BUpdate {
|
|
return nil
|
|
}
|
|
}
|
|
case TbPricingArea:
|
|
switch eventType {
|
|
case entry.EventType_INSERT:
|
|
getPricingAreaColumns(&chg, false, true, rowData)
|
|
case entry.EventType_DELETE:
|
|
getPricingAreaColumns(&chg, true, false, rowData)
|
|
default:
|
|
getPricingAreaColumns(&chg, true, true, rowData)
|
|
}
|
|
case TbRr:
|
|
switch eventType {
|
|
case entry.EventType_INSERT:
|
|
getRrColumns(&chg, false, true, rowData)
|
|
case entry.EventType_DELETE:
|
|
getRrColumns(&chg, true, false, rowData)
|
|
default:
|
|
getRrColumns(&chg, true, true, rowData)
|
|
if !chg.ChgRr.BUpdate {
|
|
return nil
|
|
}
|
|
}
|
|
case TbHoliday:
|
|
switch eventType {
|
|
case entry.EventType_INSERT:
|
|
getHolidayColumns(&chg, false, true, rowData)
|
|
if chg.ChgHoliday.NewState != "1" {
|
|
return nil
|
|
}
|
|
case entry.EventType_DELETE:
|
|
getHolidayColumns(&chg, true, false, rowData)
|
|
if chg.ChgHoliday.OldState != "1" {
|
|
return nil
|
|
}
|
|
default:
|
|
getHolidayColumns(&chg, true, true, rowData)
|
|
if chg.ChgHoliday.OldState != "1" && chg.ChgHoliday.NewState != "1" {
|
|
return nil
|
|
}
|
|
}
|
|
case TbBilHoliday:
|
|
switch eventType {
|
|
case entry.EventType_INSERT:
|
|
getBilHolidayColumns(&chg, false, true, rowData)
|
|
if chg.ChgBilHoliday.NewState != "L0R" {
|
|
return nil
|
|
}
|
|
case entry.EventType_DELETE:
|
|
getBilHolidayColumns(&chg, true, false, rowData)
|
|
if chg.ChgBilHoliday.OldState != "L0R" {
|
|
return nil
|
|
}
|
|
default:
|
|
getBilHolidayColumns(&chg, true, true, rowData)
|
|
if chg.ChgBilHoliday.OldState != "L0R" && chg.ChgBilHoliday.NewState != "L0R" {
|
|
return nil
|
|
}
|
|
}
|
|
default:
|
|
return nil
|
|
}
|
|
return &chg
|
|
}*/
|