Spark

資料分析 GraphX 處理 (2)

基本介紹

教學目標

透過 GraphX 建立有向圖建立實作和基本操作。

使用教學

以下的練習主要是以下圖為例進行有向圖的建立實作和基本操作。

開啟 Spark 之 Scala 程式語言命令介面 (目前 GraphX 支援 Scala 程式語言。)

1
$ ./bin/spark-shell

載入 GraphX 函式庫。

1
2
3
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

建立有向圖

建立有向圖的點和邊的陣列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
val vertexArray = Array(
(1L, ("Alice", 28)),
(2L, ("Bob", 27)),
(3L, ("Charlie", 65)),
(4L, ("David", 42)),
(5L, ("Ed", 55)),
(6L, ("Fran", 50))
)
val edgeArray = Array(
Edge(2L, 1L, 7),
Edge(2L, 4L, 2),
Edge(3L, 2L, 4),
Edge(3L, 6L, 3),
Edge(4L, 1L, 1),
Edge(5L, 2L, 2),
Edge(5L, 3L, 8),
Edge(5L, 6L, 3)
)

透過 sc.parallelize 以點和邊的陣列建立 RDDs 。

1
2
val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)
val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)

開始建立有向圖 Graph[V, E] 。

1
val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)

基本操作

顯示符合條件的所有點。

1
2
3
for ((id,(name,age)) <- graph.vertices.filter { case (id,(name,age)) => age > 30 }.collect) {
println(s"$name is $age")
}

解釋點與點之間的有向關係。

1
2
3
for (triplet <- graph.triplets.filter(t => t.attr > 5).collect) {
println(s"${triplet.srcAttr._1} loves ${triplet.dstAttr._1}")
}

建立子圖並且顯示。

1
2
3
4
val subgraph = graph.subgraph(vpred = {case(id, (name,age)) =>age >= 30})
for ((id,(name,age)) <- subgraph.vertices.filter { case (id,(name,age)) => age > 30 }.collect) {
println(s"$name is $age")
}

相關資源

資料分析 GraphX 處理 (1)

基本介紹

教學目標

初步了解 GraphX 的重點概念和基本操作。

重點概念

基本上 GraphX 針對圖的處理可以分為兩大部分,分別為圖的儲存和圖的處理。

圖的儲存

主要有 Vertices 、 Edges 和 Triplets 三種基本結構,並在 Spark 中皆將資料儲存至 RDD ,具備 Immutable、Distributed 和 Fault-Tolerant 關鍵特性。一開始透過 GraphLoader 載入圖的資料文件,同時透過不同的分區策略進行圖的儲存優化,大部分應用以 EdgePartition2d 效果最佳。

圖的處理

主要是採用 Pregel 的處理模型,演算法主要會實作 vertexProgram 、 sendMessage 和 messageCombiner 三個處理函數,然後傳遞給 Pregel 處理,像是 PageRank 演算法。

統一流程

一般來說,Hadoop 和 Spark 主要談論的皆是 Data-Parallel 計算,然而透過 Graph-Parallel 計算理論上應該要非常有效率,可惜為了實現目標針對圖的處理皆要經過許多流程,依賴許多原始資料、表格和圖等資訊,此時透過 GraphX 專案就能處一處理 Data-Parallel 和 Graph-Parallel 計算,彼此之間是基於 RDD 進行資料傳遞。

從原始資料透過 ETL 產生初始的圖,接著切割子圖之後,進行計算,最後分析,等待流程結束之後,再重複執行同樣流程,為了實現目標對對圖的處理產生出最佳的結果,如下圖所示。

使用教學

開啟 Spark 之 Scala 程式語言命令介面 (目前 GraphX 支援 Scala 程式語言。)

1
$ ./bin/spark-shell

載入 GraphX 函式庫。

1
2
3
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

載入圖的資料文件。

1
val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")

進行 PageRank 演算法。

1
val ranks = graph.pageRank(0.0001).vertices

輸出圖的處理之後結果。

1
println(ranks.collect().mkString("\n"))

進行內容資料對應。

1
2
3
4
5
6
7
val users = sc.textFile("graphx/data/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
case (id, (username, rank)) => (username, rank)
}

輸出內容資料對應之後結果。

1
println(ranksByUsername.collect().mkString("\n"))

相關資源

資料分析 Spark 處理 (1)

基本介紹

教學目標

學習如何在 Ubuntu 作業系統伺服器中安裝 Spark,接著透過 Python 語言以 Spark 進行資料基本操作。

使用教學

套件安裝

請先在 Amazon EC2 中開啟至少 t2.small 等級以上實體機器。

安裝 Java 套件。

1
2
3
$ sudo apt-add-repository ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get install oracle-java7-installer

安裝 Git 套件。

1
$ sudo apt-get install git

Spark 安裝

下載 Spark 套件。

1
2
$ wget http://d3kbcqa49mib13.cloudfront.net/spark-1.2.0.tgz
$ tar xvf spark-1.2.0.tgz

安裝 Spark 套件。

1
2
$ cd spark-1.2.0/
$ sbt/sbt assembly

Python 操作

執行 pyspark ,透過 Python 語言以 Spark 進行資料基本操作。

1
2
$ cd spark-1.2.0/
$ ./bin/pyspark

載入文字檔轉換成彈性分散式資料集,簡稱資料集 (Resilient Distributed Datasets,RDD) ,再透過函式計算資料集的數量。

1
2
>>> textFile = sc.textFile("README.md")
>>> textFile.count()

透過函式計算資料集中包括 Spark 的數量。

1
2
>>> linesWithSpark = textFile.filter(lambda line: "Spark" in line)
>>> linesWithSpark.count()

透過匿名函式 (lambdas) 取得最多字元的行數。

1
2
>>> textFile = sc.textFile("README.md")
>>> textFile.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a > b) else b)

相關資源