树叶云Apache Kafka教程:Apache Kafka 与Spark的集成

在本章中,我们将讨论如何将Apache Kafka与Spark Streaming API集成。

关于Spark

Spark Streaming API支持实时数据流的可扩展,高吞吐量,容错流处理。数据可以从诸如Kafka,Flume,Twitter等许多源中提取,并且可以使用复杂的算法来处理,例如地图,缩小,连接和窗口等高级功能。最后,处理的数据可以推送到文件系统,数据库和活动仪表板。弹性分布式数据集(RDD)是Spark的基本数据结构。它是一个不可变的分布式对象集合。RDD中的每个数据集划分为逻辑分区,可以在集群的不同节点上计算。

与Spark集成

Kafka是Spark流式传输的潜在消息传递和集成平台。Kafka充当实时数据流的中心枢纽,并使用Spark Streaming中的复杂算法进行处理。一旦数据被处理,Spark Streaming可以将结果发布到另一个Kafka主题或存储在HDFS,数据库或仪表板中。下图描述了概念流程。

现在,让我们详细了解Kafka-Spark API。

SparkConf API

它表示Spark应用程序的配置。用于将各种Spark参数设置为键值对。

SparkConf 类有以下方法 –

  • set(string key,string value) – 设置配置变量。

  • remove(string key) – 从配置中移除密钥。

  • setAppName(string name) – 设置应用程序的应用程序名称。

  • get(string key) – get key

StreamingContext API

这是Spark功能的主要入口点。SparkContext表示到Spark集群的连接,可用于在集群上创建RDD,累加器和广播变量。签名的定义如下所示。

public StreamingContext(String master, String appName, Duration batchDuration, 
   String sparkHome, scala.collection.Seq<String> jars, 
   scala.collection.Map<String,String> environment)
  • – 要连接的群集网址(例如mesos:// host:port,spark:// host:port,local [4])。

  • appName – 作业的名称,以显示在集群Web UI上

  • batchDuration – 流式数据将被分成批次的时间间隔

public StreamingContext(SparkConf conf, Duration batchDuration)

通过提供新的SparkContext所需的配置创建StreamingContext。

  • conf – Spark参数

  • batchDuration – 流式数据将被分成批次的时间间隔

KafkaUtils API

KafkaUtils API用于将Kafka集群连接到Spark流。此API具有如下定义的显着方法 createStream

public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
   StreamingContext ssc, String zkQuorum, String groupId,
   scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)

上面显示的方法用于创建从Kafka Brokers提取消息的输入流。

  • ssc – StreamingContext对象。

  • zkQuorum – Zookeeper quorum。

  • groupId – 此消费者的组ID。

  • 主题 – 返回要消费的主题的地图。

  • storageLevel – 用于存储接收的对象的存储级别。

KafkaUtils API有另一个方法createDirectStream,用于创建一个输入流,直接从Kafka Brokers拉取消息,而不使用任何接收器。这个流可以保证来自Kafka的每个消息都包含在转换中一次。

示例应用程序在Scala中完成。要编译应用程序,请下载并安装 sbt ,scala构建工具(类似于maven)。主要应用程序代码如下所示。

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
   def main(args: Array[String]) {
      if (args.length < 4) {
         System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
         System.exit(1)
      }

      val Array(zkQuorum, group, topics, numThreads) = args
      val sparkConf = new SparkConf().setAppName("KafkaWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      ssc.checkpoint("checkpoint")

      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L))
         .reduceByKeyAndWindow(_ &plus; _, _ - _, Minutes(10), Seconds(2), 2)
      wordCounts.print()

      ssc.start()
      ssc.awaitTermination()
   }
}

构建脚本

spark-kafka集成取决于Spark,Spark流和Spark与Kafka的集成jar。创建一个新文件 build.sbt ,并指定应用程序详细信息及其依赖关系。在编译和打包应用程序时, sbt 将下载所需的jar。

name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

编译/包装

运行以下命令以编译和打包应用程序的jar文件。我们需要将jar文件提交到spark控制台以运行应用程序。

sbt package

提交到Spark

启动Kafka Producer CLI(在上一章中解释),创建一个名为 my-first-topic 的新主题,并提供一些样本消息,如下所示。

Another spark test message

运行以下命令将应用程序提交到spark控制台。

/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>

此应用程序的示例输出如下所示。

spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..

文章来源网络,作者:运维,如若转载,请注明出处:https://shuyeidc.com/wp/251466.html<

(0)
运维的头像运维
上一篇2025-04-28 23:56
下一篇 2025-04-28 23:57

相关推荐

  • 个人主题怎么制作?

    制作个人主题是一个将个人风格、兴趣或专业领域转化为视觉化或结构化内容的过程,无论是用于个人博客、作品集、社交媒体账号还是品牌形象,核心都是围绕“个人特色”展开,以下从定位、内容规划、视觉设计、技术实现四个维度,详细拆解制作个人主题的完整流程,明确主题定位:找到个人特色的核心主题定位是所有工作的起点,需要先回答……

    2025-11-20
    0
  • 社群营销管理关键是什么?

    社群营销的核心在于通过建立有温度、有价值、有归属感的社群,实现用户留存、转化和品牌传播,其管理需贯穿“目标定位-内容运营-用户互动-数据驱动-风险控制”全流程,以下从五个维度展开详细说明:明确社群定位与目标社群管理的首要任务是精准定位,需明确社群的核心价值(如行业交流、产品使用指导、兴趣分享等)、目标用户画像……

    2025-11-20
    0
  • 香港公司网站备案需要什么材料?

    香港公司进行网站备案是一个涉及多部门协调、流程相对严谨的过程,尤其需兼顾中国内地与香港两地的监管要求,由于香港公司注册地与中国内地不同,其网站若主要服务内地用户或使用内地服务器,需根据服务器位置、网站内容性质等,选择对应的备案路径(如工信部ICP备案或公安备案),以下从备案主体资格、流程步骤、材料准备、注意事项……

    2025-11-20
    0
  • 如何企业上云推广

    企业上云已成为数字化转型的核心战略,但推广过程中需结合行业特性、企业痛点与市场需求,构建系统性、多维度的推广体系,以下从市场定位、策略设计、执行落地及效果优化四个维度,详细拆解企业上云推广的实践路径,精准定位:明确目标企业与核心价值企业上云并非“一刀切”的方案,需先锁定目标客户群体,提炼差异化价值主张,客户分层……

    2025-11-20
    0
  • PS设计搜索框的实用技巧有哪些?

    在PS中设计一个美观且功能性的搜索框需要结合创意构思、视觉设计和用户体验考量,以下从设计思路、制作步骤、细节优化及交互预览等方面详细说明,帮助打造符合需求的搜索框,设计前的规划明确使用场景:根据网站或APP的整体风格确定搜索框的调性,例如极简风适合细线条和纯色,科技感适合渐变和发光效果,电商类则可能需要突出搜索……

    2025-11-20
    0

发表回复

您的邮箱地址不会被公开。必填项已用 * 标注