198 lines
6.7 KiB
Go
198 lines
6.7 KiB
Go
package canal
|
||
|
||
import (
|
||
"github.com/go-mysql-org/go-mysql/canal"
|
||
"github.com/go-mysql-org/go-mysql/mysql"
|
||
"github.com/go-mysql-org/go-mysql/replication"
|
||
mdb "proxy/Nmysql"
|
||
rds "proxy/Nredis"
|
||
"proxy/config"
|
||
|
||
//"github.com/siddontang/go-log/log"
|
||
//"strconv"
|
||
|
||
"proxy/logger"
|
||
)
|
||
|
||
type MyEventHandler struct {
|
||
canal.DummyEventHandler
|
||
}
|
||
|
||
// 监听数据记录
|
||
func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error {
|
||
|
||
logger.CanalLog.Infof("OnRow: %s.%s %s %v", e.Table.Schema, e.Table.Name, e.Action, e.Rows)
|
||
|
||
/*if !config.Config.CanalServer.Standalone && rds.CheckIfRdbMaster() == false {
|
||
logger.CanalLog.Warnf("not stand alone, and change to Slave!")
|
||
cnl.Close()
|
||
return nil
|
||
}*/
|
||
c := ParseAndFilterBinLog(e)
|
||
updateRedisTable(c)
|
||
for columnIndex, curColumn := range e.Table.Columns {
|
||
logger.CanalLog.Debugf("row info: %v %v %v", curColumn.Name, columnIndex, e.Rows[len(e.Rows)-1][columnIndex])
|
||
}
|
||
//cnl.Close()// if slave
|
||
return nil
|
||
}
|
||
|
||
// 创建,更改,重命名或者删除表时触发,通常会需要清除与表相关的数据,如缓存。 It will be called before OnDDL.
|
||
func (h *MyEventHandler) OnTableChanged(schema string, table string) error {
|
||
//
|
||
logger.CanalLog.Infof("OnTableChanged: %s %s", schema, table)
|
||
return nil
|
||
}
|
||
|
||
// 监听binlog日志的变化文件与记录的位置
|
||
func (h *MyEventHandler) OnPosSynced(pos mysql.Position, set mysql.GTIDSet, force bool) error {// update and save
|
||
// if force == true, 立即同步位置
|
||
rds.RdbSetBinLogPos(pos.Name, pos.Pos)
|
||
logger.CanalLog.Infof("OnPosSynced: Name[%v] Pos[%v], force[%t]", pos.Name, pos.Pos, force)
|
||
return nil
|
||
}
|
||
|
||
// 当产生新的binlog日志后触发(在达到内存的使用限制后(默认为1GB),会开启另一个文件,每个新文件的名称后都会有一个增量.)
|
||
func (h *MyEventHandler) OnRotate(r *replication.RotateEvent) error {
|
||
// record := fmt.Sprintf("On Rotate: %v \n", &mysql.Position{Name: string(r.NextLogName), Pos: uint32(r.Position)})
|
||
// binlog的记录位置,新binlog的文件名
|
||
logger.CanalLog.Infof("On Rotate: Pos[%v] NextLogName[%v] \n", r.Position, r.NextLogName)
|
||
return nil
|
||
}
|
||
|
||
// Create alter drop truncate(删除当前表再新建一个一模一样的表结构)
|
||
func (h *MyEventHandler) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error {
|
||
// binlog日志的变化文件与记录的位置
|
||
logger.CanalLog.Infof("OnDDL: Name[%v] Pos[%v]\n", nextPos.Name, nextPos.Pos)
|
||
logger.CanalLog.Infof("%v\n %v\n %v\n %v\n %v\n",
|
||
queryEvent.ExecutionTime,// 猜是执行时间,但测试显示0
|
||
string(queryEvent.Schema),// 库名
|
||
string(queryEvent.Query),// 变更的sql语句
|
||
string(queryEvent.StatusVars[:]),//测试显示乱码
|
||
queryEvent.SlaveProxyID)// 从库代理ID?
|
||
return nil
|
||
}
|
||
|
||
func (h *MyEventHandler) String() string {
|
||
return "MyEventHandler"
|
||
}
|
||
|
||
// cnl holds the canal instance that StartMyCanal assigns just before it
// starts running the canal.
var cnl *canal.Canal
|
||
func StartMyCanal(addr string, username string, password string) {
|
||
cfg := canal.NewDefaultConfig()
|
||
cfg.Addr = addr//"192.168.1.211:3306"
|
||
cfg.User = username
|
||
cfg.Password = password
|
||
// We only care table canal_test in test db
|
||
cfg.Dump.TableDB = "boss"
|
||
cfg.Dump.Tables = []string{"tb_prd_ofr_detail_inst_551",
|
||
"tb_bil_tariff", "tb_prd_ofr", "tb_bil_evt_pricing_strategy", "config_area",
|
||
"tb_bil_pricing_area", "ratable_history", "tb_bil_holiday_rel", "tb_bil_holiday"}//, "tb_prd_prd_inst_551"}
|
||
cfg.IncludeTableRegex = make([]string, 0, 1)
|
||
cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.tb_prd_prd_inst_551")
|
||
cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.tb_prd_ofr_detail_inst_551")
|
||
cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.tb_bil_tariff")
|
||
cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.tb_prd_ofr")
|
||
cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.tb_bil_evt_pricing_strategy")
|
||
cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.config_area")
|
||
cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.tb_bil_pricing_area")
|
||
cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.ratable_history")
|
||
cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.tb_bil_holiday_rel")
|
||
cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.tb_bil_holiday")
|
||
if config.Config.CronCfg.Enabled && config.Config.CronCfg.NtfSms != "" {
|
||
|
||
cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.tb_sms_info")
|
||
}
|
||
if config.Config.Rest.Enabled {
|
||
cfg.Dump.Tables = append(cfg.Dump.Tables, "tb_sync_mobile")
|
||
cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.tb_sync_mobile")
|
||
}
|
||
|
||
c, err := canal.NewCanal(cfg)
|
||
if err != nil {
|
||
logger.CanalLog.Errorf("NewCanal, err: %v", err)
|
||
return
|
||
}
|
||
|
||
// Register a handler to handle RowsEvent
|
||
c.SetEventHandler(&MyEventHandler{})
|
||
|
||
var pos *mysql.Position
|
||
if config.Config.CanalServer.Reinit {
|
||
pos = QueryBinLogPos(c)
|
||
if pos == nil {
|
||
logger.CanalLog.Errorln("fail to get binlog position.")
|
||
return
|
||
}
|
||
|
||
logger.CanalLog.Infoln("start to sync mysql DB.")
|
||
CurState = "loading"
|
||
if config.Config.CanalServer.FlushBeforeInit {
|
||
rds.ClrTariffAndBundle()
|
||
}
|
||
_ = mdb.LoadAcctTblFromMysql()
|
||
_ = mdb.LoadAlertSmsFromMysql()
|
||
//_ = mdb.LoadCreateAcctFromMysql()
|
||
config.Config.CanalServer.Reinit = false
|
||
config.SavePcfCfg()
|
||
logger.CanalLog.Infoln("finish sync mysql DB...")
|
||
CurState = "synchronize"
|
||
} else {
|
||
ret := rds.RdbGetBinLogPos()
|
||
if ret == nil {
|
||
logger.CanalLog.Warnln("fail to get redis binlog position, query current position from mysql.")
|
||
pos = QueryBinLogPos(c)
|
||
if pos == nil {
|
||
logger.CanalLog.Errorln("fail to get binlog position.")
|
||
return
|
||
}
|
||
} else {
|
||
pos = &mysql.Position{Name: ret.File, Pos: ret.Position}
|
||
}
|
||
}
|
||
|
||
// Start canal
|
||
cnl = c
|
||
logger.CanalLog.Infof("Go run")
|
||
err = c.RunFrom(*pos)
|
||
if err != nil {
|
||
logger.CanalLog.Errorf("RunFrom err: %v", err)
|
||
} else {
|
||
logger.CanalLog.Warnf("Canal exit!!!!!!")
|
||
c.Close()
|
||
}
|
||
|
||
/* 从头开始监听
|
||
err = c.Run()
|
||
if err != nil {
|
||
logger.CanalLog.Infof("Run err: %v", err)
|
||
}*/
|
||
|
||
// mysql-bin.000004, 1027
|
||
// startPos := mysql.Position{Name: "mysql-bin.000004", Pos: 1027}
|
||
// c.RunFrom(startPos)
|
||
}
|
||
|
||
func QueryBinLogPos(c *canal.Canal) *mysql.Position {
|
||
var pos mysql.Position
|
||
ret, err := c.Execute("SHOW MASTER STATUS;")
|
||
if err != nil {
|
||
logger.CanalLog.Errorf("SHOW MASTER STATUS err: %v", err)
|
||
return nil
|
||
}
|
||
pos.Name, err = ret.Resultset.GetStringByName(0, "File")
|
||
if err != nil {
|
||
logger.CanalLog.Errorf("Get binlog file name err: %v", err)
|
||
return nil
|
||
}
|
||
posTmp, err := ret.Resultset.GetUintByName(0, "Position")
|
||
if err != nil {
|
||
logger.CanalLog.Errorf("Get binlog position err: %v", err)
|
||
return nil
|
||
}
|
||
pos.Pos = uint32(posTmp)
|
||
logger.CanalLog.Infof("Current Position: %v", pos)
|
||
rds.RdbSetBinLogPos(pos.Name, pos.Pos)
|
||
return &pos
|
||
}
|