如何基于ESB实现实时数据同步DAP-上

B站影视 2025-01-15 17:40 3

摘要:DAP数据分析平台是面向企业数据治理和数据分析的大数据应用平台,通过企业数仓建设和数据可视化分析,实现企业数据的整合归集、标准统一,构建数据资产,支撑企业业务管理和决策。在强化数据整合、管控的同时,实现数据的集成联动,通过流动的数据服务于业务,产生业务价值,实

DAP数据分析平台是面向企业数据治理和数据分析的大数据应用平台,通过企业数仓建设和数据可视化分析,实现企业数据的整合归集、标准统一,构建数据资产,支撑企业业务管理和决策。在强化数据整合、管控的同时,实现数据的集成联动,通过流动的数据服务于业务,产生业务价值,实现数据价值的最大化。

在数据集成整合的过程中,DAP平台需要从各个内外部系统数据源中采集各类数据,包括结构化数据、半结构化数据和非结构化数据等,同时为了满足不同业务和使用的需要,在采集方式上需要实现实时采集和离线采集相结合的方式。本文将分为上下两篇,上篇主要介绍Kafka部署过程,下篇将主要介绍Flink部署及整体测试过程。

DAP数据分析平台作为数据中台方案的核心产品,提供全方面的数据体系建设,实际企业数据整合归集的同时深度挖掘数据价值,通过数据支撑企业决策、推动业务运营。

1.产品方案

DAP平台主要面对企业数据治理和数据分析需求,一般会结合ESB、MDM形成数据中台或轻量级数据中台方案,轻量级数据中台主要通过数仓建设实现数据分析和数据应用,完整的数据中台会在轻量级数据中台基础上添加主数据治理的内容

DAP平台作为数据中台的核心产品,承担了数仓建设、数据分析、数据应用等核心业务需求,通过ODS、DWD、DWS、ADS构建数仓的的分层体系。同时通过ESB总线平台实现源头系统到数仓的数据采集、加工、转换、汇总,为DAP数仓提供数据支持。而MDM在实现主数据治理的同时,也为DAP的数仓提供维表支持。通过数据中台的建设实现企业数据的整合汇聚,构建数据中心,基于数据进行分析展现、透视业务、价值挖掘,有效支撑企业的业务发展

2.功能说明

DAP平台在功能上包括了数仓建设、数据分析、数据资产、数据服务、算法预测等内容。

1.数仓建设:包括ODS建设、数仓建设等内容,通过数据采集、加工、转换、汇总的过程实现从源头系统到数仓的建设;

2.数据分析:基于数仓构建数据集、立方体、指标集等分析模型,通过DAP预置的可视化组件实现可视化分析与联动穿透,从而支持企业数据的查看以及业务管理;

3.资产目录:基于数仓数据构建数据资产体系,将企业数据构建成数据资产,用于企业数据的管理、价值分析以及数据共享;

4.数据服务:根据配置的各类分析模型自动构建数据服务接口,实现对外进行数据提供和应用;

5.算法模型:通过平台预置的各类算法对数据进行训练与模拟,构建算法模型对象,从而实现数据的预测与价值挖掘,支持业务层面的数据应用;

6.质量安全:通过数据加密、脱敏策略实现对关键数据、敏感数据的加密、脱敏处理,保证数据使用过程中的数据安全性。

3.实时集成

DAP在从源头系统采集数据时,一般分为实时采集和离线采集两种方式,实时采集即实时获取源头系统变化的数据,数据实时性高、频率高,离线采集即定时采集数据,通过定时轮询的方式获取增量数据。本次主要介绍实时采集的模式,DAP从源头系统采集数据是通过ESB数据总线实现的,而ESB中预置了Flink的采集方式,本次涉及的采集流程如下图:

Kafka部署

本次主要介绍Kafka的虚拟机部署方式,如果整体产品方案采用k8s容器化部署的方式,建议直接通过UMC部署Kafka,更加方便进行Kafka的部署和管理。

1.部署过程

1.下载kafka安装包,上传

2.解压:tar -zxvf kafka_2.12-3.5.1.tgz

3.启动zookeeper:nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties > zookeeper.log 2>&1 &

4.查看端口:netstat -nlp|grep 2181

5.启动Kafka:./bin/kafka-server-start.sh ./config/server.properties,后台启动:nohup ./bin/kafka-server-start.sh ./config/server.properties > kafka.log 2>&1 &

6.端口监听:

2.测试验证

通过Kafka自带的Topic、生产者、消费者命令创建相关对象,手动测试Kafka的数据订阅消费操作。

> > > > 创建Topic

1.打开一个新的shell窗口

2.进入kafka目录

3.创建topic:./bin/kafka-topics.sh --create --topic test001 --bootstrap-server 127.0.0.1:9092

4.查看所有的topic:./bin/kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092

> > > > 启动消费者

1.打开一个新的shell窗口或者在创建topic窗口

2.进入kafka目录

3.启动消费者:./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test001 --from-beginning

> > > > 启动生产者

1.打开一个新的shell窗口或者在创建topic窗口,但不能占用消费者窗口

2.进入kafka目录

3.启动生产者:./bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test001

4.生产者输入测试内容:

5.消费者查看:

3.安全认证

通过ESB连接Kafka以及在生产环境中,Kafka的访问需要设置密码,保证Kafka的访问安全,所以通过调整Kafka的配置添加安全认证机制。

> > > > 配置调整

1.进入kafka目录

2.新建kafka_server_jaas.conf文件:vi config/kafka_server_jaas.conf

1)username、password为broker内部通信的用户名密码;

2)user_xxx其中xxx必须和username配置的用户名一致,密码也一致(这个用户就是管理员用户,不受鉴权控制)

3)user_producer和user_consumer为创建的生产者和消费者用户;

4)在实际应用时,admin、producer、consumer都可以同时作为生产者或消费者用户,并且可以用同一个用户;

3.修改kafka配置文件:vi config/server.properties

1)advertised.listeners定义了kafka broker对外宣布的监听器列表,如果有多个监听器,它们之间用逗号分隔;

2)inter.broker.listener.name 应该设置为 advertised.listeners 中定义的一个监听器名称,要求完全匹配(区分大小写);

3)如果需要外部服务器访问,可以不写ip,只写端口;

4)经过实测,如果advertised.listeners配置多个,必须配置对应多个listeners,并且保证多个listeners的端口不同,参考:

4.修改kafka启动文件:vi bin/kafka-server-start.sh

5.重启kafka:./bin/kafka-server-stop.sh & nohup ./bin/kafka-server-start.sh ./config/server.properties > kafka.log 2>&1 &

6.查看日志:tail -f kafka.log

> > > > 测试验证

1.启动消费者:

1)使用admin、producer、consumer用户均可,注意密码对应

2.启动生产者:

1)使用admin、producer、consumer用户均可,注意密码对应

3.发送测试

基于ESB数据总线进行数据采集是DAP平台实现数据采集、加工、转换、汇总的基础,而根据实际业务、需求以及数据类型的不同,采集方式也会有所区别,本次数据针对结构化数据的实时采集进行介绍,通过Flink与Kafka结合的方式实现的采集过程。本篇主要介绍了Kafka部署过程,下篇将主要介绍Flink部署及整体测试过程,敬请关注。

本文由@数通畅联原创,欢迎转发,仅供学习交流使用,引用请注明出处!谢谢~

来源:数通畅联

相关推荐