摘要:在当今数字化时代,实时数据处理对于企业的决策和运营至关重要。许多业务场景需要及时响应数据库中的数据变化,例如电商平台实时更新库存、金融系统实时监控交易数据等。
在当今数字化时代,实时数据处理对于企业的决策和运营至关重要。许多业务场景需要及时响应数据库中的数据变化,例如电商平台实时更新库存、金融系统实时监控交易数据等。
本文将详细介绍如何通过Debezium捕获数据库变更事件,并利用Server - Sent Events(SSE)将这些变更实时推送给前端应用。
++ ++ ++ | MySQL 数据库 | 监听变更 | SpringBoot 服务 | 推送变更 | Web 前端 | | (Binlog 模式) | ------> | (Debezium CDC) | ------> | (EventSource) | ++ ++ ++ Debezium 是一个开源的分布式平台,它能够监控数据库的变化,并将这些变化以事件流的形式发送出去。它支持多种数据库,如 MySQL、PostgreSQL 等,通过模拟数据库的复制协议来实现对数据库变更的实时捕获。Server - Sent Events(SSE)是一种允许网页自动获取服务器推送更新的技术。它基于 HTTP 协议,通过一个单向的连接,服务器可以持续向客户端发送事件流数据,非常适合实时数据推送的场景。-- 启用 Binlog(ROW 模式) SET GLOBAL log_bin = ON; SET GLOBAL binlog_format = 'ROW'; -- 创建 CDC 用户(需 REPLICATION 权限) CREATE USER 'cdc_user' IDENTIFIED BY 'cdc_pass'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user';❝
配置要点:确保 Binlog 记录行级变更
io.debezium debezium-embedded 1.6.0.Final io.debezium debezium-connector-mysql 1.6.0.Final@Slf4j@Componentpublic class BinlogListener { @Autowired private SSEService sseService; @PostConStruct public void start { Configuration config = Configuration.create .with("name", "mysql-connector-1") .with("connector.class", "io.debezium.connector.mysql.MySqlConnector") .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore") .with("offset.storage.file.filename", "D:\\usr\\debezium\\mysql-offsets.dat") .with("offset.flush.interval.ms", "10000") .with("database.server.name", "mysql-connector-1") .with("database.hostname", "localhost") .with("database.port", "3306") .with("database.user", "root") .with("database.password", "root") .with("database.server.id", "1") .with("database.include.list", "scf") .with("table.include.list", "scf.user") .with("include.schema.changes", "false") .with("snapshot.mode", "initial") .with("database.history.skip.unparseable.ddl", "true") // 忽略解析错误 .with("database.connection.attempts", "5") // 最大重试次数 .with("database.connection.backoff.ms", "10000") // 重试间隔 10s .with("database.history", "io.debezium.relational.history.FileDatabaseHistory") .with("database.history.file.filename", "D:\\usr\\debezium\\mysql-history.dat") .build; EmbeddedEngine engine = EmbeddedEngine.create .using(config) .notifying(this::handleEvent) .build; Executors.newSingleThreadExecutor.execute(engine::run); } private void handleEvent(SourceRecord record) { Struct value = (Struct) record.value; Struct after = value.getStruct("after"); // 转换为 Map 并序列化 Map dataMap = new HashMap; dataMap.put("id", after.getString("id")); dataMap.put("name", after.getString("name")); dataMap.put("age", after.getInt32("age")); sseService.broadcast(JSON.toJSONString(dataMap)); }}SSE 推送服务@Service public class SseService { private final Set emitters = ConcurrentHashMap.newKeySet; public SseEmitter subscribe { SseEmitter emitter = new SseEmitter(60_000L); emitter.onCompletion( -> emitters.remove(emitter)); emitters.add(emitter); return emitter; } public void broadcast(String data) { emitters.forEach(emitter -> { try { emitter.send(SseEmitter.event .data(data) .id(UUID.randomUUID.toString)); } catch (IOException e) { emitter.completeWithError(e); } }); } } @RestController @RequestMapping("/sse") public class SseController { @Autowired private SseService sseService; @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public SseEmitter stream { return sseService.subscribe; } } 实时数据推送测试 const eventSource = new EventSource('/sse/stream'); eventSource.onmessage = e => { const data = JSON.parse(e.data); document.getElementById('updates').innerHTML += `用户变更: ID=${data.id}, 姓名=${data.name}
`; }; eventSource.onerror = e => console.error("SSE 错误:", e);来源:散文随风想
免责声明:本站系转载,并不代表本网赞同其观点和对其真实性负责。如涉及作品内容、版权和其它问题,请在30日内与本站联系,我们将在第一时间删除内容!