Files
selfcare/proxy/canal/client.go
2025-03-25 09:46:16 +08:00

161 lines
4.9 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package canal
import (
//"log"
"os"
"time"
"github.com/golang/protobuf/proto"
"github.com/withlin/canal-go/client"
pbe "github.com/withlin/canal-go/protocol/entry"
"proxy/logger"
)
func ConectCanalServer(ipaddr string) {
// 192.168.199.17 替换成你的canal server的地址
// example 替换成-e canal.destinations=example 你自己定义的名字
connector := client.NewSimpleCanalConnector(ipaddr, 11111, "", "", "example", 60000, 60*60*1000)
err := connector.Connect()
if err != nil {
logger.CanalLog.Errorln(err)
os.Exit(1)
}
// https://github.com/alibaba/canal/wiki/AdminGuide
//mysql 数据解析关注的表Perl正则表达式.
//
//多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
//
//常见例子:
//
// 1. 所有表:.* or .*\\..*
// 2. canal schema下所有表 canal\\..*
// 3. canal下的以canal打头的表canal\\.canal.*
// 4. canal schema下的一张表canal\\.test1
// 5. 多个规则组合使用canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
filter := "canal.instance.filter.regex=boss.tb_prd_prd_inst_551,boss.tb_prd_ofr_detail_inst_551,boss.tb_bil_tariff,boss.tb_prd_ofr,boss.tb_bil_evt_pricing_strategy,boss.config_area,boss.tb_bil_pricing_area,boss.ratable_history,boss.tb_bil_holiday_rel,boss.tb_bil_holiday"
//err = connector.Subscribe(".*\\..*")
err = connector.Subscribe(filter)
if err != nil {
logger.CanalLog.Errorln(err)
os.Exit(1)
}
var counter int= 0
for {
message, err := connector.Get(100, nil, nil)
if err != nil {
logger.CanalLog.Errorln(err)
os.Exit(1)
}
batchId := message.Id
if batchId == -1 || len(message.Entries) <= 0 {
time.Sleep(200 * time.Millisecond)
counter++
if counter == 300 {
logger.CanalLog.Infoln("===Idle===")
counter = 0
}
continue
}
printEntry(message.Entries)
counter = 0
}
}
/*
func updateRedisTable(oneR *RecordChange) {
var synType int
if oneR.EventType == pbe.EventType_INSERT {
synType = 1
} else if oneR.EventType == pbe.EventType_DELETE {
synType = 3
} else {
synType = 2
}
switch oneR.TableName {
case TbAcctInfo:
mdb.UpdateRedisAcctTable(synType, oneR.ChgAcctInfo.PrdInstId, oneR.ChgAcctInfo.ServiceNbr)
case TbOfrDetail:
mdb.UpdateRedisAcctTableOfr(synType, oneR.ChgOfrDetail.OfrDetailInstId)
case TbTariff:
mdb.UpdateRedisTariffTable(synType, oneR.ChgTariff.TariffId)
case TbOfrInfo:
mdb.UpdateRedisTariffTableByOfr(synType, oneR.ChgOfrInfo.OfrId)
case TbPricingStrategy:
mdb.UpdateRedisTariffTableByStrategy(synType, oneR.ChgPricingStrategy.EventPricingStrategyId)
case TbConfigArea:// update prefix table, while update config_area table
mdb.UpdateRedisPrefixTableByConfigArea(synType, oneR.ChgConfigArea.AreaId, oneR.ChgConfigArea.OldAreaCode, oneR.ChgConfigArea.NewAreaCode)
case TbPricingArea:
mdb.UpdateRedisPrefixTable(synType, oneR.ChgPricingArea.NewStrategyId, oneR.ChgPricingArea.NewAreaId)
case TbRr:
if rrId > 0 {
mdb.UpdateRedisRrTable(synType, rrId, ratableVal, usedVal, beginTime, endTime)
} else {
if ofrId > 0 {// ofr: CALC_PRIORITY
mdb.UpdateRdbRrOfrPriority(ofrId, tariffSeq)
} else if strategyId > 0 {// strategy: EVENT_PRIORITY
mdb.UpdateRdbRrStrategyPriority(strategyId, tariffSeq)
}
}
case TbHoliday:
mdb.UpdateRedisHolidayDisTableByTariffId(synType, TariffId, tariffSeq)
case TbBilHoliday:
mdb.UpdateRedisHolidayDisTableByHolidayId(synType, holidayId)
default:
}
}*/
func printEntry(entrys []pbe.Entry) {
for _, entry := range entrys {
if entry.GetEntryType() == pbe.EntryType_TRANSACTIONBEGIN || entry.GetEntryType() == pbe.EntryType_TRANSACTIONEND {
continue
}
rowChange := new(pbe.RowChange)
err := proto.Unmarshal(entry.GetStoreValue(), rowChange)
checkError(err)
if rowChange != nil {
eventType := rowChange.GetEventType()
header := entry.GetHeader()
logger.CanalLog.Infof("================> binlog[%s : %d],name[%s,%s], eventType: %s", header.GetLogfileName(), header.GetLogfileOffset(), header.GetSchemaName(), header.GetTableName(), header.GetEventType())
for _, rowData := range rowChange.GetRowDatas() {
/*oneRecordChg := ParseAndFilterChange(eventType, header.GetTableName(), rowData)
if oneRecordChg == nil {
continue
}
updateRedisTable(oneRecordChg)*/
if eventType == pbe.EventType_DELETE {
printColumn(rowData.GetBeforeColumns())
} else if eventType == pbe.EventType_INSERT {
printColumn(rowData.GetAfterColumns())
} else {
logger.CanalLog.Infoln("-------> before")
printColumn(rowData.GetBeforeColumns())
logger.CanalLog.Infoln("-------> after")
printColumn(rowData.GetAfterColumns())
}
}
}
}
}
func printColumn(columns []*pbe.Column) {
for _, col := range columns {
logger.CanalLog.Infof("%s : %s update= %t", col.GetName(), col.GetValue(), col.GetUpdated())
}
}
func checkError(err error) {
if err != nil {
logger.CanalLog.Errorf("Fatal error: %s", err.Error())
os.Exit(1)
}
}