引言:让我们来学习Apache Kafka与Spark Streaming的整合概念,以及两种不同配置方法的优缺点。
Kafka与SparkStreaming的整合
我们在将Apache Kafka与Spark Streaming整合的实战过程中,一般可以选用两种方面来配置Spark Streaming,并接收来自Kafka的数据。第一种是利用接收器和Kafka的高级API;而第二种新的方法则并不使用接收器。这两种方法在性能特征和语义保持上,有着不同的编程模式。
下面让我们来详细探究一下这两种方法。
基于接收器的方法
此法运用接收器(Receiver)来接收数据。而接收器是利用Kafka的高级消费者(consumer)API来实现的。此外,接收到的数据会被存储在Spark的各个执行器(executor)中。然后由Spark Streaming所启动的作业来处理数据。
但是在出现失败时,这种方法的默认配置可能会丢失数据。因此,我们必须在Spark Streaming中额外地启用预写日志(write-ahead log),以确保数据的零丢失。它将所有接收到的Kafka数据,同步地保存到某个分布式文件系统的预写日志中,以便在出现失败时恢复所有的数据。
下面,我们将讨论如何在Kafka-Spark Streaming应用中,使用该基于接收器的方法。
链接
现在,先将您的Kafka streaming应用与如下的artifact相链接,对于Scala和Java类型的应用,我们会用到SBT(Simple Build Tool)和Maven(一种构建工具)的各种项目定义。
groupId = org.apache.spark
artifactId =spark-streaming-kafka-0-8_2.11
version = 2.2.0
而对于Python类型的应用,我们必须在部署自己的应用时,额外添加上述库、及其各种依赖项。
编程
随后,我们在streaming应用的代码中,通过导入KafkaUtils,来创建一项DStream输入:
importorg.apache.spark.streaming.kafka._
val kafkaStream =KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id],[per-topic number of Kafka partitions to consume])
同样,通过使用createStream的各种变形方式,我们可以制定出不同的键/值类,及其对应的解码类。
部署
通常情况下,对于任何Spark应用而言,您都可以使用spark-submit来发布自己的应用。当然,就具体的Scala、Java和Python应用来说,它们在细节上会略有不同。
其中,由于Python应用缺少SBT和Maven的项目管理,我们可以使用–packages spark-streaming-kafka-0-8_2.11、及其各个依赖项,直接添加到spark-submit处。
./bin/spark-submit--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 ...
此外,我们还可以从Maven的存储库中下载Maven artifact的spark-streaming-Kafka-0-8-assembly所对应的JAR包,然后使用-jars,将其添加到spark-submit处。
直接方法(无接收器)
在基于接收器的方法之后,新的一种无接收器式“直接”方法诞生了。此法提供了更强大的端到端保证。它定期查询Kafka在每个topic+分区(partition)中的最新偏移量,而不再使用接收器去接收数据。同时,它也定义了要在每个批次中处理的不同偏移范围。特别是在那些处理数据的作业被启动时,其简单消费者(consumer)API就会被用于读取Kafka中预定义的偏移范围。可见,此过程类似于从某个文件系统中读取各种文件。
注:针对Scala和Java API,Spark在其1.3版本中引入了此功能;而针对Python API,它在其1.4版本中同样引入了该功能。
下面,我们将讨论如何在Streaming应用中使用该方法,并深入了解更多有关消费者API的链接:
链接
当然,这种方法仅被Scala和Java应用所支持,并且通过如下artifact来链接STB和Maven项目。
groupId = org.apache.spark
artifactId =spark-streaming-kafka-0-8_2.11
version = 2.2.0
编程
随后,我们在streaming应用的代码中,通过导入KafkaUtils,来创建一项DStream输入:
importorg.apache.spark.streaming.kafka._
val directKafkaStream =KafkaUtils.createDirectStream[
[key class], [value class], [key decoderclass], [value decoder class] ](
streamingContext, [map of Kafkaparameters], [set of topics to consume])
我们必须在Kafka的参数中,指定metadata.broker.list或bootstrap.servers,以便它能够在默认情况下,从各个Kafka分区的最新偏移量开始消费。当然,如果您在Kafka的参数中将auto.offset.reset配置为最小,那么它就会从最小的偏移开始消费。
此外,通过使用KafkaUtils.createDirectStream的各种变形方式,我们能够从任意偏移量开始消费。当然,我们也可以在每一个批次中,按照如下的方式去消费Kafka的偏移量。
// Hold a reference to thecurrent offset ranges, so downstream can use it
var offsetRanges =Array.empty[OffsetRange]
directKafkaStream.transform{ rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map {
...
}.foreachRDD { rdd =>
for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition}${o.fromOffset} ${o.untilOffset}")
}
...
}
如果您想使用基于Zookeeper的Kafka监控工具(https://data-flair.training/blogs/zookeeper-in-kafka/),来显示streaming应用的进度,那么您也可以自行将其更新到Zookeeper中。
部署
该方面的部署过程与基于接收器的方法类似,此处就不赘述了。
直接方法的优点
就Spark Streaming与Kafka整合的角度而言,第二种方法较第一种方法有着如下的优点:
简化并行
无需创建与合并多个输入的Kafka streams(https://data-flair.training/blogs/kafka-streams/)。但是,Sparking Streaming会创建同样多的RDD(Resilient Distributed Datasets,弹性分布式数据集)分区,以供多个Kafka分区使用直接的方法进行消费。这些分区也会并行地从Kafka中读取数据。因此我们可以说:在Kafka和RDD分区之间存在更容易被理解和调整的、一对一的映射关系。
效率
为了实现数据的零丢失,第一种方法需要将数据存储在预写日志中,以供进一步复制数据。此方法的效率实际上是比较低的,因为数据被Kafka和预写日志实际复制了两次。而在直接的方法中,由于没有了接收器,因此不需要预先写入日志,此问题也就迎刃而解了。只要您拥有足够多的Kafka数据保留,各种消息就能够从Kafka中被恢复回来。
准确到位的语义
在第一种方法中,我们使用Kafka的高级API,在Zookeeper中存储被消费的偏移量。然而,这种传统的、从Kafka中消费数据的方式,虽然能够确保数据的零丢失,但是在某些失败情况下,数据可能会被小概率地消费两次。实际上,这种情况源自那些被Spark Streaming可靠地接收到的数据,与Zookeeper跟踪到的偏移量之间所产生的不一致性。因此在第二种方法中,我们不再使用Zookeeper,而只是使用一个简单的Kafka API。Spark Streaming通过其各个检查点(checkpoints),来跟踪不同的偏移量,籍此消除了Spark Streaming和Zookeeper之间的不一致性。
可见,就算出现了失败的情况,那些记录也都会被Spark Streaming有效地、准确地一次性接收。它能够确保我们的输出操作,即:将数据保存到外部数据存储库时,各种保存结果和偏移量的幂等性、和原子事务性,这同时也有助于实现准确到位的语义。
不过,这种方法也有一个缺点:由于它不会在Zookeeper中更新各种偏移量,因此那些基于Zookeeper的Kafka监控工具将无法显示进度。当然,您也可以自行访问每个批次中由此方法处理的偏移量,并更新到Zookeeper之中。
结论
通过上述讨论,我们学到了Kafka与Spark Streaming整合的全体概念。同时,我们也讨论了Kafka-Spark Streaming的两种不同配置方法:接收器方法和直接方法,以及直接方法的几项优点。
【原标题】Apache Kafka + Spark Streaming Integration (作者: Rinu Gour )
原文链接:https://dzone.com/articles/apache-kafka-spark-streaming-integration