package canal import ( "github.com/go-mysql-org/go-mysql/canal" "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication" mdb "proxy/Nmysql" rds "proxy/Nredis" "proxy/config" //"github.com/siddontang/go-log/log" //"strconv" "proxy/logger" ) type MyEventHandler struct { canal.DummyEventHandler } // 监听数据记录 func (h *MyEventHandler) OnRow(e *canal.RowsEvent) error { logger.CanalLog.Infof("OnRow: %s.%s %s %v", e.Table.Schema, e.Table.Name, e.Action, e.Rows) /*if !config.Config.CanalServer.Standalone && rds.CheckIfRdbMaster() == false { logger.CanalLog.Warnf("not stand alone, and change to Slave!") cnl.Close() return nil }*/ c := ParseAndFilterBinLog(e) updateRedisTable(c) for columnIndex, curColumn := range e.Table.Columns { logger.CanalLog.Debugf("row info: %v %v %v", curColumn.Name, columnIndex, e.Rows[len(e.Rows)-1][columnIndex]) } //cnl.Close()// if slave return nil } // 创建,更改,重命名或者删除表时触发,通常会需要清除与表相关的数据,如缓存。 It will be called before OnDDL. func (h *MyEventHandler) OnTableChanged(header *replication.EventHeader, schema string, table string) error { logger.CanalLog.Infof("OnTableChanged: %s %s", schema, table) return nil } // 监听binlog日志的变化文件与记录的位置 func (h *MyEventHandler) OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error { // if force == true, 立即同步位置 rds.RdbSetBinLogPos(pos.Name, pos.Pos) logger.CanalLog.Infof("OnPosSynced: Name[%v] Pos[%v], force[%t]", pos.Name, pos.Pos, force) return nil } // 当产生新的binlog日志后触发(在达到内存的使用限制后(默认为1GB),会开启另一个文件,每个新文件的名称后都会有一个增量.) func (h *MyEventHandler) OnRotate(header *replication.EventHeader, r *replication.RotateEvent) error { // record := fmt.Sprintf("On Rotate: %v \n", &mysql.Position{Name: string(r.NextLogName), Pos: uint32(r.Position)}) // binlog的记录位置,新binlog的文件名 logger.CanalLog.Infof("On Rotate: Pos[%v] NextLogName[%v] \n", r.Position, r.NextLogName) return nil } // Create alter drop truncate(删除当前表再新建一个一模一样的表结构) func (h *MyEventHandler) OnDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error { // binlog日志的变化文件与记录的位置 logger.CanalLog.Infof("OnDDL: Name[%v] Pos[%v]\n", nextPos.Name, nextPos.Pos) logger.CanalLog.Infof("%v\n %v\n %v\n %v\n %v\n", queryEvent.ExecutionTime, // 猜是执行时间,但测试显示0 string(queryEvent.Schema), // 库名 string(queryEvent.Query), // 变更的sql语句 string(queryEvent.StatusVars[:]), //测试显示乱码 queryEvent.SlaveProxyID) // 从库代理ID? return nil } func (h *MyEventHandler) String() string { return "MyEventHandler" } var cnl *canal.Canal func StartMyCanal(addr string, username string, password string) { cfg := canal.NewDefaultConfig() cfg.Addr = addr //"192.168.1.211:3306" cfg.User = username cfg.Password = password // We only care table canal_test in test db cfg.Dump.TableDB = "boss" cfg.Dump.Tables = []string{"tb_prd_ofr_detail_inst_551", "tb_bil_tariff", "tb_prd_ofr", "tb_bil_evt_pricing_strategy", "config_area", "tb_bil_pricing_area", "ratable_history", "tb_bil_holiday_rel", "tb_bil_holiday"}//, "tb_prd_prd_inst_551"} cfg.IncludeTableRegex = make([]string, 0, 1) cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.tb_prd_prd_inst_551") cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.tb_prd_ofr_detail_inst_551") cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.tb_bil_tariff") cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.tb_prd_ofr") cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.tb_bil_evt_pricing_strategy") cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.config_area") cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.tb_bil_pricing_area") cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.ratable_history") cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.tb_bil_holiday_rel") cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.tb_bil_holiday") if config.Config.CronCfg.Enabled && config.Config.CronCfg.NtfSms != "" { cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.tb_sms_info") } if config.Config.Rest.Enabled { cfg.Dump.Tables = append(cfg.Dump.Tables, "tb_sync_mobile") cfg.IncludeTableRegex = append(cfg.IncludeTableRegex, "boss\\.tb_sync_mobile") } c, err := canal.NewCanal(cfg) if err != nil { logger.CanalLog.Errorf("NewCanal, err: %v", err) return } // Register a handler to handle RowsEvent c.SetEventHandler(&MyEventHandler{}) var pos *mysql.Position if config.Config.CanalServer.Reinit { pos = QueryBinLogPos(c) if pos == nil { logger.CanalLog.Errorln("fail to get binlog position.") return } logger.CanalLog.Infoln("start to sync mysql DB.") CurState = "loading" if config.Config.CanalServer.FlushBeforeInit { rds.ClrTariffAndBundle() } _ = mdb.LoadAcctTblFromMysql() _ = mdb.LoadAlertSmsFromMysql() //_ = mdb.LoadCreateAcctFromMysql() config.Config.CanalServer.Reinit = false config.SavePcfCfg() logger.CanalLog.Infoln("finish sync mysql DB...") CurState = "synchronize" } else { ret := rds.RdbGetBinLogPos() if ret == nil { logger.CanalLog.Warnln("fail to get redis binlog position, query current position from mysql.") pos = QueryBinLogPos(c) if pos == nil { logger.CanalLog.Errorln("fail to get binlog position.") return } } else { pos = &mysql.Position{Name: ret.File, Pos: ret.Position} } } // Start canal cnl = c logger.CanalLog.Infof("Go run") err = c.RunFrom(*pos) if err != nil { logger.CanalLog.Errorf("RunFrom err: %v", err) } else { logger.CanalLog.Warnf("Canal exit!!!!!!") c.Close() } /* 从头开始监听 err = c.Run() if err != nil { logger.CanalLog.Infof("Run err: %v", err) }*/ // mysql-bin.000004, 1027 // startPos := mysql.Position{Name: "mysql-bin.000004", Pos: 1027} // c.RunFrom(startPos) } func QueryBinLogPos(c *canal.Canal) *mysql.Position { var pos mysql.Position ret, err := c.Execute("SHOW MASTER STATUS;") if err != nil { logger.CanalLog.Errorf("SHOW MASTER STATUS err: %v", err) return nil } pos.Name, err = ret.Resultset.GetStringByName(0, "File") if err != nil { logger.CanalLog.Errorf("Get binlog file name err: %v", err) return nil } posTmp, err := ret.Resultset.GetUintByName(0, "Position") if err != nil { logger.CanalLog.Errorf("Get binlog position err: %v", err) return nil } pos.Pos = uint32(posTmp) logger.CanalLog.Infof("Current Position: %v", pos) rds.RdbSetBinLogPos(pos.Name, pos.Pos) return &pos }