1. 导入 Spark 和 GraphX 相关的库:
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
2. 创建 SparkConf 和 SparkContext:
val conf = new SparkConf().setAppName("GraphXExample").setMaster("local[*]")
val sc = new SparkContext(conf)
3. 创建顶点和边的 RDD:
// 顶点 RDD
val vertices: RDD[(VertexId, String)] = sc.parallelize(Seq(
(1L, "Alice"),
(2L, "Bob"),
(3L, "Charlie"),
// ...
))
// 边 RDD
val edges: RDD[Edge[Int]] = sc.parallelize(Seq(
Edge(1L, 2L, 1), // 表示顶点 1 到顶点 2 的边,权重为 1
Edge(2L, 3L, 2),
// ...
))
4. 创建 Graph 对象:
val defaultVertex = ("Default")
val graph = Graph(vertices, edges, defaultVertex)
5. 基本图操作:
- 顶点和边的数量:
val numVertices = graph.numVertices
val numEdges = graph.numEdges
- 获取顶点和边的信息:
graph.vertices.collect().foreach(println)
graph.edges.collect().foreach(println)
6. 图的转换操作:
- 顶点和边的过滤:
val subgraph = graph.subgraph(vpred = (id, attr) => attr.startsWith("A"))
- 计算顶点的出度和入度:
val inDegrees: VertexRDD[Int] = graph.inDegrees
val outDegrees: VertexRDD[Int] = graph.outDegrees
7. 图算法:
- PageRank 算法:
val ranks = graph.pageRank(0.1).vertices
- 最短路径算法:
val sourceId: VertexId = 1
val shortestPaths = graph.shortestPaths.landmarks(Seq(sourceId)).run()
- 连通性算法:
val connectedComponents = graph.connectedComponents().vertices
8. 关闭 SparkContext:
sc.stop()
这是一个简单的 GraphX 示例,展示了如何创建图、执行基本的图操作和算法。在实际应用中,你可能需要根据具体的图处理任务选择适当的算法和操作。请注意,GraphX 需要在 Spark 集群上运行,上述示例中使用了本地模式(setMaster("local[*]"))用于本地调试。在实际环境中,你需要将其配置为连接到你的 Spark 集群。
转载请注明出处:http://www.pingtaimeng.com/article/detail/9373/Spark