diff --git a/agt-module-license/agt-module-license-server/pom.xml b/agt-module-license/agt-module-license-server/pom.xml index 85a965f..655094d 100644 --- a/agt-module-license/agt-module-license-server/pom.xml +++ b/agt-module-license/agt-module-license-server/pom.xml @@ -126,6 +126,19 @@ tika-core + + com.alibaba.otter + canal.client + 1.1.8 + + + + com.alibaba.otter + canal.protocol + 1.1.8 + + + diff --git a/agt-module-license/agt-module-license-server/src/main/java/org/agt/module/license/framework/canal/CanalRunner.java b/agt-module-license/agt-module-license-server/src/main/java/org/agt/module/license/framework/canal/CanalRunner.java new file mode 100644 index 0000000..58fe50b --- /dev/null +++ b/agt-module-license/agt-module-license-server/src/main/java/org/agt/module/license/framework/canal/CanalRunner.java @@ -0,0 +1,17 @@ +package org.agt.module.license.framework.canal; + +import lombok.RequiredArgsConstructor; +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class CanalRunner implements CommandLineRunner { + + private final CanalService canalService; + + @Override + public void run(String... args) { + canalService.startCanalListener(); + } +} diff --git a/agt-module-license/agt-module-license-server/src/main/java/org/agt/module/license/framework/canal/CanalService.java b/agt-module-license/agt-module-license-server/src/main/java/org/agt/module/license/framework/canal/CanalService.java new file mode 100644 index 0000000..37758c1 --- /dev/null +++ b/agt-module-license/agt-module-license-server/src/main/java/org/agt/module/license/framework/canal/CanalService.java @@ -0,0 +1,106 @@ +package org.agt.module.license.framework.canal; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.alibaba.otter.canal.client.CanalConnectors; +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.alibaba.otter.canal.protocol.Message; +import jakarta.annotation.Resource; +import org.agt.module.license.service.license.LicenseServiceImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.stereotype.Service; + +import java.net.InetSocketAddress; +import java.util.List; + +@EnableAsync +@Service +public class CanalService { + + private static final Logger log = LoggerFactory.getLogger(CanalService.class); + + @Resource + private LicenseServiceImpl licenseService; + + @Async + public void startCanalListener() { + // 创建连接 + CanalConnector connector = CanalConnectors.newSingleConnector( + new InetSocketAddress("127.0.0.1", 11111), + "example", // Canal instance 名称 + "", ""); // 用户名/密码(可为空) + + try { + connector.connect(); + connector.subscribe("agt-cloud\\.crm_license"); // 订阅库(正则方式) + connector.rollback(); + + System.out.println("✅ Canal client 启动成功,开始监听数据库变化..."); + + while (true) { + // 拉取数据,每次最多 100 条 + Message message = connector.getWithoutAck(100); + long batchId = message.getId(); + + if (batchId == -1 || message.getEntries().isEmpty()) { + Thread.sleep(1000); + } else { + handleEntry(message.getEntries()); + } + connector.ack(batchId); // 提交确认 + } + + } catch (Exception e) { + log.error("canal启动失败:{}", e.getMessage(), e); + } finally { + connector.disconnect(); + } + } + + private void handleEntry(List entries) throws Exception { + for (CanalEntry.Entry entry : entries) { + if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) { + continue; + } + + CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); + CanalEntry.EventType eventType = rowChange.getEventType(); + + String tableName = entry.getHeader().getTableName(); + System.out.printf("📢 [监听到] 数据库表:%s,事件类型:%s%n", tableName, eventType); + + for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { + switch (eventType) { + case INSERT: + break; + case UPDATE: + printColumns("BEFORE UPDATE", rowData.getBeforeColumnsList()); + printColumns("AFTER UPDATE", rowData.getAfterColumnsList()); + // 更新后的业务逻辑 + if ("crm_license".equals(tableName)) { + rowData.getAfterColumnsList().forEach(column -> { + if ("state".equals(column.getName()) && "3".equals(column.getValue()) && column.getUpdated()) { + licenseService.genLicenseTask(); + } + }); + } + break; + case DELETE: + break; + default: + break; + } + } + } + } + + private void printColumns(String label, List columns) { + System.out.println("==== " + label + " ===="); + for (CanalEntry.Column column : columns) { + System.out.printf("%s = %s (updated=%s)%n", + column.getName(), column.getValue(), column.getUpdated()); + } + } +} diff --git a/agt-module-license/agt-module-license-server/src/main/java/org/agt/module/license/service/license/LicenseTask.java b/agt-module-license/agt-module-license-server/src/main/java/org/agt/module/license/service/license/LicenseTask.java index b6ef293..248800c 100644 --- a/agt-module-license/agt-module-license-server/src/main/java/org/agt/module/license/service/license/LicenseTask.java +++ b/agt-module-license/agt-module-license-server/src/main/java/org/agt/module/license/service/license/LicenseTask.java @@ -79,7 +79,7 @@ public class LicenseTask { @Scheduled(cron = "*/5 * * * * ?") public void task() { - licenseService.genLicenseTask(); +// licenseService.genLicenseTask(); } @Scheduled(cron = "0 0 0 * * ?") diff --git a/agt-server/src/main/resources/application-local.yaml b/agt-server/src/main/resources/application-local.yaml index 744c31d..555db95 100644 --- a/agt-server/src/main/resources/application-local.yaml +++ b/agt-server/src/main/resources/application-local.yaml @@ -258,4 +258,10 @@ spring: urls: ldap://192.168.88.205 base: dc=agrandtech,dc=com username: uid=root,cn=users,dc=agrandtech,dc=com - password: Tian7989! \ No newline at end of file + password: Tian7989! + +canal: + server: 127.0.0.1:11111 + destination: example # 对应 canal server 配置的 instance 名称 + username: "" + password: ""