实时流计算系统设计与实现:数据存储-离线分析!

B站影视 2024-12-25 16:06 1

摘要:离线数据处理和分析是实时流系统中非常重要的一部分。在Lambda架构中,我们就已经看到了批处理系统对实时系统的辅助作用。而离线数据处理和分析虽然并不会直接影响实时流系统的执行,但是离线系统对实时系统也有着很多的辅助作用。这些作用包括:

离线数据处理和分析是实时流系统中非常重要的一部分。在Lambda架构中,我们就已经看到了批处理系统对实时系统的辅助作用。而离线数据处理和分析虽然并不会直接影响实时流系统的执行,但是离线系统对实时系统也有着很多的辅助作用。这些作用包括:

·数据存储和ETL处理;

·离线数据分析和模型训练;

·离线报表统计。

这些离线任务都有一个共同的模式,即数据需要存储下来,然后在这些数据基础上做各种数据处理和分析。针对此类任务,以Hadoop为基础的大数据生态为我们提供了非常好的解决方案。围绕Hadoop的一些列软件和相关资源都非常丰富,因此本书不深入展开,感兴趣的读者可以自行查阅相关内容。这里我们重点关注离线任务的3个方面:

存储、处理和分析、调度。

实时流数据经过处理和分析后,需要进行数据落地,也就是将数据存入持久化存储设备。为了将实时流处理和数据落地的逻辑分离开,最好先将实时流数据发送到Kafka消息队列,然后从Kafka消息队列拉取数据,最后将数据写入HDFS(Hadoop分布式文件系统)。从Kafka拉取消息写入HDFS的方法有很多种,Flume就是一种常用的方案,如图9-7所示。

图9-7 使用Flume将Kafka数据写入HDFS

下面是使用Flume将Kafka数据写入HDFS的配置样例。

events.sources = src1

events.channels = ch1

events.sinks = sk1events.sources.src1.type = org.apache.flume.source.kafka.KafkaSource

events.sources.src1.channels = ch1

events.sources.src1.ZookeeperConnect =

zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka

events.sources.src1.topic = events

events.sources.src1.groupId = flume

events.sources.src1.kafka.consumer.timeout.ms = 100

events.sources.src1.interceptors = json_interceptor

events.sources.src1.interceptors.json_interceptor.type =

com.alain898.flume.plugins.interceptors.JsonInterceptor$Builder

events.sources.src1.interceptors.json_interceptor.headers = timestamp

events.sources.src1.interceptors.json_interceptor.paths = $.timestamp

events.channels.ch1.type = file

events.channels.ch1.capacity = 10000

events.channels.ch1.transactionCapacity = 1000

events.channels.ch1.checkpointDir = ./checkpoint/events

events.channels.ch1.dataDirs = ./data/events

events.channels = ch1

events.sinks.sk1.type = hdfs

events.sinks.sk1.channel = ch1

events.sinks.sk1.hdfs.path = hdfs://nameservice1/flume/events/day=%Y%m%d/hour=%H

events.sinks.sk1.hdfs.filePrefix = events.

events.sinks.sk1.hdfs.fileSuffix = .json

events.sinks.sk1.hdfs.inUseSuffix = .tmp

events.sinks.sk1.hdfs.round = true

events.sinks.sk1.hdfs.roundValue = 10

events.sinks.sk1.hdfs.roundUnit = minute

events.sinks.sk1.hdfs.fileType = DataStream

events.sinks.sk1.hdfs.rollSize = 0

events.sinks.sk1.hdfs.rollCount = 0

events.sinks.sk1.hdfs.rollInterval = 300

events.sinks.sk1.hdfs.timeZone = UTC

在上面的配置中,我们需要注意以下几点。

1.小文件问题

Flume将数据写入HDFS时可以设置3种滚动条件,即按时间间隔滚动(rollInterval)、按文件大小滚动(rollSize)和按事件数滚动(rollCount)。这会造成一个问题,即如果两次滚动之间的事件数比较少,那么就会在HDFS上产生很多小文件。这虽然在功能上没什么问题,但是由于HDFS本身是针对大数据设计的文件系统,太多的小文件一方面会浪费大量的块节点,另一方面会降低MapReduce、Hive和Spark等程序的性能。所以,在设置滚动周期时,应该平衡文件大小和所能接收的时延。例如,如果HDFS的块大小是128MB,那么文件大小最好为128MB的整数倍再小一点儿。如果实在既要求数据入库的时延小,又没太多数据从而造成产生很多小文件,那么使用额外的任务周期性地将小文件合并成大文件也是很有必要的。

2.时间戳问题

Flume使用事件头部的timestamp字段作为分区时间依据。大多数情况下,我们需要使用事件发生的时间而不是Flume接收到事件的时间作为分区时间依据,所以我们需要自行定义一个时间戳拦截器将事件时间写入事件头部。如在前面的代码中,我们使用JsonInterceptor将JSON格式事件中的timestamp字段写入事件头部。

3.HDFS高可用问题

如果HDFS集群配置了高可用模式,那么Flume写入HDFS的路径就不能够直接使用具体的某台namenode服务器地址,而必须使用nameservice代替。否则当HDFS的namenode在active与standby两种状态之间切换时,Flume就不能写入数据了。

在配置好Flume代理后,使用如下命令启动Flume代理即可。

nohup bin/flume-ng agent --name events --conf ./conf --conf-file conf/events.conf -

Dflume.log.file=events.log &

使用Flume搬运少数主题的数据到HDFS还是非常方便的。但是当主题较多时,需要启动非常多的Flume代理进程,分散地管理这些任务会变得比较麻烦。除了使用这种比较“底层”的方式外,第8章讲到的消息路由服务Apache Camel也会给我们非常大的启发。通过ApacheCamel,可以统一且方便地管理数据在不同端点之间的传递,这部分解决了数据入库任务的管理问题。但是Apache Camel对这种任务管理的支持还不是一步到位的,我们依旧需要自己开发诸如集群化、监控、管理和UI之类的功能,所以我们“得寸进尺”,有没有更佳“一站式”的方案呢?这个当然可以有。诸如Apache NiFi、Apache Gobblin之类的开源工具就提供了功能更佳强大的大数据集成方案。

以Apache NiFi为例,它是一款大数据集成平台。Apache NiFi图形化界面如图9-8所示。

图9-8 Apach.NiFi图形化界面

可以说,Apache NiFi是我们理想中Apache Camel的模样,即支持可视化设计和分布式集群功能。图9-9展示了Apache NiFi集群的组成。

可以看到,Apache NiFi集群中的每个节点都是“平等”的,它们之间通过Zookeeper协调工作及共享状态。

Apache NiFi的这种集群方案非常贴合大数据集成场景,这是因为它具备以下优良特性。

图9-9 Apache NiFi集群的组成

·通过图形化界面创建、管理、监控各种ETL任务,使用起来更加直观方便。

·集群化的运行环境一方面能够集中管理各种ETL的任务,不需要像Flume或Camel那样管理零散运行实例,另一方面能够更加一致地对集群处理能力进行水平扩展。

·这是一款简单且独立于其他如YARN或Mesos等资源管理框架的集群方案,让其具有更少的依赖,部署、管理和维护起来非常方便。

总体来说,将数据从Kafka中读取出来并存储到HDFS并不是非常难的事,难的是当类似的任务变多后的管理问题。如果需要写入HDFS的Kafka主题比较少,则直接使用Flume或Camel非常方便。但是当主题非常多,变得难以管理时,不妨选择使用Apach.NiFi和Apach.Gobblin这类专门的大数据集成方案。

围绕Hadoop有关数据处理和分析的工具有很多种,这里我们只选择两个典型的离线数据处理和分析工具进行讲解,即Hive和Spark。

1.Hive

Hive是一个数据仓库工具,它将结构化数据文件映射为数据库表,并提供SQL查询功能。图9-10展示了Hive的工作原理。Hive内部将SQL语句转换为MapReduce或Tez作业,然后提交Hadoop执行,因此可以将Hive理解为MapReduce或Tez的一层SQL“皮肤”。使用Hive的好处在于其对SQL的支持,只要有SQL基础,就可以快速开始离线数据的统计分析。使用Hive时需要注意,在将数据与表绑定起来时,应该尽量使用外部表。

只有在需要创建和使用临时表时,才使用内部表。另外,临时表在用完之后一定要删除,否则这些数据会留在Hive里成为垃圾数据,越积越多,从而影响未来Hive的正常使用。

图9-10 Hive的工作原理

2.Spark

另一个更为数据分析人员所喜爱的大数据分析工具是Spark。Spark中的RDD(Resilient Distributed Datasets,分布式弹性数据集)和DataFrame这两个核心概念都是对数据的矩阵表示,因此对于大数据分析人员而言,Spark天生就是为他们量身定做的数学分析工具。Spark以RDD抽象为核心,提供了一系列的Transformation操作和Action操作API。通过这些操作组合,可以实现复杂的计算模式和分析功能。另外,Spark充分使用内存来进行操作计算,相比Hadoop最初的MapReduce,在性能表现上有了数量级的提升。时至今日,Spark已经成为了大数据分析的主流工具,我们在做离线数据分析,特别是一些复杂的分析计算(如统计学习和机器学习相关的模型训练)时,Spark都是不可多得的强有力工具。

离线任务通常是周期性定时执行的,因此需要一个能够管理离线任务执行的调度系统。比较简单的调度系统是Linux操作系统下常用的定时执行工具cron。cron工具只是一个简单的调度工具,它只支持本地调度,并且没有用户友好的管理界面。当调度任务很多时,cron任务难以管理,任务执行状态也不方便追踪。因此,我们需要功能更加强大的调度工具,如Azkaban。

Azkaban是由LinkedIn为调度Hadoop作业而开发的批处理工作流调度器,它解决了调度作业之间的相互依赖问题,并提供一个简单易用的Web用户界面来管理和追踪作业的调度和执行情况。图9-11展示了Azkaban的工作原理。WebServer提供Web界面,用户可以通过它上传、调度、监控和管理作业。ExecutorServer是作业执行的地方,当需要调度执行的任务非常多时,可以部署多台ExecutorServer。另外,在WebServer和ExecutorServer之外,Azkaban还需要用数据库来保存作业、调度和状态等各种元数据。

图9-12展示了Azkaban的作业执行历史页面。其中,event_report和add_partition这两个每小时执行一次的作业每次都是执行成功的,而另外一个每天执行一次的作业report_daily则执行失败了。

图9-11 Azkaban的工作原理

图9-12 Azkaban的作业执行历史页面

在使用Azkaban时,需要严格控制被调度执行任务的内存。如果任务占用内存过大,则一台16GB内存的ExecutorServer也不能同时启动几个任务,容易造成系统内存不足,调度任务被操作系统随机杀死。

除了Azkaban外,还有一些其他工作流调度器,如Oozie、Airflow等。这里就不再一一展开介绍了,读者可以自行查阅相关资料。

来源:大数据架构师

相关推荐