取消
显示结果 
搜索替代 
您的意思是: 
cancel
1882
查看次数
0
有帮助
2
评论
julianchen
Spotlight
Spotlight

引言:让我们来学习Apache KafkaSpark Streaming的整合概念,以及两种不同配置方法的优缺点。


KafkaSparkStreaming的整合

我们在将Apache KafkaSpark Streaming整合的实战过程中,一般可以选用两种方面来配置Spark Streaming,并接收来自Kafka的数据。第一种是利用接收器和Kafka的高级API;而第二种新的方法则并不使用接收器。这两种方法在性能特征和语义保持上,有着不同的编程模式。


下面让我们来详细探究一下这两种方法。

基于接收器的方法

此法运用接收器(Receiver)来接收数据。而接收器是利用Kafka的高级消费者(consumerAPI来实现的。此外,接收到的数据会被存储在Spark的各个执行器(executor)中。然后由Spark Streaming所启动的作业来处理数据。

但是在出现失败时,这种方法的默认配置可能会丢失数据。因此,我们必须在Spark Streaming中额外地启用预写日志(write-ahead log),以确保数据的零丢失。它将所有接收到的Kafka数据,同步地保存到某个分布式文件系统的预写日志中,以便在出现失败时恢复所有的数据。

下面,我们将讨论如何在Kafka-Spark Streaming应用中,使用该基于接收器的方法。

链接

现在,先将您的Kafka streaming应用与如下的artifact相链接,对于ScalaJava类型的应用,我们会用到SBTSimple Build Tool)和Maven(一种构建工具)的各种项目定义

groupId = org.apache.spark

artifactId =spark-streaming-kafka-0-8_2.11

version = 2.2.0

而对于Python类型的应用,我们必须在部署自己的应用时,额外添加上述库、及其各种依赖项。

编程

随后,我们在streaming应用的代码中,通过导入KafkaUtils,来创建一项DStream输入:

importorg.apache.spark.streaming.kafka._

val kafkaStream =KafkaUtils.createStream(streamingContext,

[ZK quorum], [consumer group id],[per-topic number of Kafka partitions to consume])

同样,通过使用createStream的各种变形方式,我们可以制定出不同的键/值类,及其对应的解码类。

部署

通常情况下,对于任何Spark应用而言,您都可以使用spark-submit来发布自己的应用。当然,就具体的ScalaJavaPython应用来说,它们在细节上会略有不同。

其中,由于Python应用缺少SBTMaven的项目管理,我们可以使用–packages spark-streaming-kafka-0-8_2.11、及其各个依赖项,直接添加到spark-submit处。

./bin/spark-submit--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 ...

此外,我们还可以从Maven的存储库中下载Maven artifactspark-streaming-Kafka-0-8-assembly所对应的JAR包,然后使用-jars,将其添加到spark-submit处。

直接方法(无接收器)

在基于接收器的方法之后,新的一种无接收器式“直接”方法诞生了。此法提供了更强大的端到端保证。它定期查询Kafka在每个topic+分区(partition)中的最新偏移量,而不再使用接收器去接收数据。同时,它也定义了要在每个批次中处理的不同偏移范围。特别是在那些处理数据的作业被启动时,其简单消费者(consumerAPI就会被用于读取Kafka中预定义的偏移范围。可见,此过程类似于从某个文件系统中读取各种文件。

注:针对ScalaJava APISpark在其1.3版本中引入了此功能;而针对Python API,它在其1.4版本中同样引入了该功能。

下面,我们将讨论如何在Streaming应用中使用该方法,并深入了解更多有关消费者API的链接:

链接

当然,这种方法仅被ScalaJava应用所支持,并且通过如下artifact来链接STBMaven项目。

groupId = org.apache.spark

artifactId =spark-streaming-kafka-0-8_2.11

version = 2.2.0

编程

随后,我们在streaming应用的代码中,通过导入KafkaUtils,来创建一项DStream输入:

importorg.apache.spark.streaming.kafka._

val directKafkaStream =KafkaUtils.createDirectStream[

[key class], [value class], [key decoderclass], [value decoder class] ](

streamingContext, [map of Kafkaparameters], [set of topics to consume])

我们必须在Kafka的参数中,指定metadata.broker.listbootstrap.servers,以便它能够在默认情况下,从各个Kafka分区的最新偏移量开始消费。当然,如果您在Kafka的参数中将auto.offset.reset配置为最小,那么它就会从最小的偏移开始消费。

此外,通过使用KafkaUtils.createDirectStream的各种变形方式,我们能够从任意偏移量开始消费。当然,我们也可以在每一个批次中,按照如下的方式去消费Kafka的偏移量。

// Hold a reference to thecurrent offset ranges, so downstream can use it

var offsetRanges =Array.empty[OffsetRange]

directKafkaStream.transform{ rdd =>

offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

rdd

}.map {

...

}.foreachRDD { rdd =>

for (o <- offsetRanges) {

println(s"${o.topic} ${o.partition}${o.fromOffset} ${o.untilOffset}")

}

...

}

如果您想使用基于ZookeeperKafka监控工具(https://data-flair.training/blogs/zookeeper-in-kafka/),来显示streaming应用的进度,那么您也可以自行将其更新到Zookeeper中。

部署

该方面的部署过程与基于接收器的方法类似,此处就不赘述了。

直接方法的优点

Spark StreamingKafka整合的角度而言,第二种方法较第一种方法有着如下的优点:

简化并行

无需创建与合并多个输入的Kafka streamshttps://data-flair.training/blogs/kafka-streams/)。但是,Sparking Streaming会创建同样多的RDDResilient Distributed Datasets,弹性分布式数据集)分区,以供多个Kafka分区使用直接的方法进行消费。这些分区也会并行地从Kafka中读取数据。因此我们可以说:在KafkaRDD分区之间存在更容易被理解和调整的、一对一的映射关系。

效率

为了实现数据的零丢失,第一种方法需要将数据存储在预写日志中,以供进一步复制数据。此方法的效率实际上是比较低的,因为数据被Kafka和预写日志实际复制了两次。而在直接的方法中,由于没有了接收器,因此不需要预先写入日志,此问题也就迎刃而解了。只要您拥有足够多的Kafka数据保留,各种消息就能够从Kafka中被恢复回来。

准确到位的语义

在第一种方法中,我们使用Kafka的高级API,在Zookeeper中存储被消费的偏移量。然而,这种传统的、从Kafka中消费数据的方式,虽然能够确保数据的零丢失,但是在某些失败情况下,数据可能会被小概率地消费两次。实际上,这种情况源自那些被Spark Streaming可靠地接收到的数据,与Zookeeper跟踪到的偏移量之间所产生的不一致性。因此在第二种方法中,我们不再使用Zookeeper,而只是使用一个简单的Kafka APISpark Streaming通过其各个检查点(checkpoints),来跟踪不同的偏移量,籍此消除了Spark StreamingZookeeper之间的不一致性。

可见,就算出现了失败的情况,那些记录也都会被Spark Streaming有效地、准确地一次性接收。它能够确保我们的输出操作,即:将数据保存到外部数据存储库时,各种保存结果和偏移量的幂等性、和原子事务性,这同时也有助于实现准确到位的语义。

不过,这种方法也有一个缺点:由于它不会在Zookeeper中更新各种偏移量,因此那些基于ZookeeperKafka监控工具将无法显示进度。当然,您也可以自行访问每个批次中由此方法处理的偏移量,并更新到Zookeeper之中。

结论

通过上述讨论,我们学到了KafkaSpark Streaming整合的全体概念。同时,我们也讨论了Kafka-Spark Streaming的两种不同配置方法:接收器方法和直接方法,以及直接方法的几项优点。

【原标题】Apache Kafka + Spark Streaming Integration (作者: Rinu Gour )

原文链接:https://dzone.com/articles/apache-kafka-spark-streaming-integration

评论
one-time
Level 13
Level 13
感谢楼主分享,谢谢~
likuo
Spotlight
Spotlight
认真学习。
入门指南

使用上面的搜索栏输入关键字、短语或问题,搜索问题的答案。

我们希望您在这里的旅程尽可能顺利,因此这里有一些链接可以帮助您快速熟悉思科社区:









快捷链接