from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# 创建 SparkContext,这是 Spark 通信的入口点
sc = SparkContext("local[2]", "SparkStreamingExample")
# 创建 StreamingContext,每隔1秒处理一次数据
ssc = StreamingContext(sc, 1)
# 此处添加你的 Spark Streaming 代码,例如从 TCP socket 读取实时数据,并执行一些转换和操作
# 启动 StreamingContext
ssc.start()
# 等待终止
ssc.awaitTermination()
上述代码的关键部分如下:
1. 创建 SparkContext 对象(通常缩写为 sc)。SparkContext 是 Spark 与集群通信的入口点,你需要在创建 StreamingContext 之前先创建它。
2. 创建 StreamingContext 对象(通常缩写为 ssc)。在创建时,需要传递 SparkContext 对象和批处理间隔(例如,每秒处理一次数据)。
3. 在 StreamingContext 中添加你的 Spark Streaming 代码,例如从 TCP socket 读取实时数据,并执行一些转换和操作。这部分代码将在 ssc.start() 之前执行。
4. 启动 StreamingContext 对象,使其开始处理实时数据。
5. 使用 ssc.awaitTermination() 使程序保持运行状态,直到手动停止或出现错误。
确保你的 Spark 环境已正确设置,例如 Spark 的二进制文件路径和版本与代码中使用的相匹配。此外,如果你运行在分布式集群上,你需要提供适当的集群管理器和相关配置。
这只是一个简单的示例,实际中你可能需要根据你的需求进行更多的设置和配置。
转载请注明出处:http://www.pingtaimeng.com/article/detail/9347/Spark