Today I'd like to talk about how to mine maximal cliques from a graph with Spark GraphX. Many people may not be familiar with this, so I have put together the following notes in the hope that you will get something out of them.
Spark GraphX does not ship with a maximal clique mining algorithm.
The maximal clique algorithms available today are serial ones, based on the Bron–Kerbosch algorithm.
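For orientation, the Bron–Kerbosch recursion with pivoting looks roughly like the sketch below; the CliqueFinder class at the end of this article is a variant of the same idea. The function and parameter names (bronKerbosch, adj, report) are mine, chosen for illustration only.

```scala
// Minimal sketch of Bron–Kerbosch with pivoting (illustrative, not the code used below).
// r: current clique, p: candidate vertices, x: already-processed vertices,
// adj: adjacency map of the undirected graph, report: callback invoked for each maximal clique.
def bronKerbosch(r: Set[Long], p0: Set[Long], x0: Set[Long],
                 adj: Map[Long, Set[Long]],
                 report: Set[Long] => Unit): Unit = {
  var p = p0
  var x = x0
  if (p.isEmpty && x.isEmpty) {
    report(r)                                    // r cannot be extended: it is a maximal clique
  } else {
    val pivot = (p ++ x).maxBy(v => adj(v).size) // any pivot is correct; a high-degree one prunes more branches
    for (v <- p -- adj(pivot)) {                 // only branch on vertices not adjacent to the pivot
      bronKerbosch(r + v, p & adj(v), x & adj(v), adj, report)
      p -= v
      x += v
    }
  }
}
```

Correctness does not depend on which pivot is chosen; the pivot only prunes redundant branches of the recursion.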
#### Approach ####
Spark GraphX does provide a connected-components algorithm. Connected components and maximal cliques are both concepts defined on undirected graphs, and every maximal clique lies entirely inside a single connected component.
So: use Spark GraphX to find the connected components, then run a serial maximal-clique algorithm inside each component (pseudo-parallelization).
For densely connected data the resulting components can be very large, and the serial maximal-clique algorithm still takes a long time on them. Pruning is used here to shrink the data first, but for large graphs the room for optimization is limited.
A truly parallel maximal clique algorithm is still what we really want.
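The pruning relies on a simple fact: a vertex with degree below count - 1 cannot be part of a clique with count vertices, so it and its incident edges can be dropped. A minimal sketch of one pruning pass is shown below; the names (pruneOnce, edges, keep) are illustrative, and the real loop in the code further down repeats this until the vertex set stops shrinking fast enough or numIter is reached.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// One degree-based pruning pass over an undirected edge list stored as (src, dst) with src < dst.
// Sketch only; the actual job broadcasts the surviving vertex set and loops until `percent` or `numIter` stops it.
def pruneOnce(sc: SparkContext, edges: RDD[(Long, Long)], count: Int): RDD[(Long, Long)] = {
  val keep = edges
    .flatMap { case (u, v) => Seq((u, 1), (v, 1)) } // count each endpoint once per edge
    .reduceByKey(_ + _)                             // vertex -> degree
    .filter { case (_, deg) => deg >= count - 1 }   // too few neighbours => cannot sit in a clique of `count` vertices
    .keys
    .collect()
    .toSet
  val bc = sc.broadcast(keep)                       // small enough to broadcast after filtering
  edges.filter { case (u, v) => bc.value.contains(u) && bc.value.contains(v) }
}
```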
#### Configuration file ####
```properties
graph_data_path=hdfs://localhost/graph_data
out_path=hdfs://localhost/clique
ck_path=hdfs://localhost/checkpoint
# number of pruning iterations
numIter=50
# minimum number of vertices a reported maximal clique must have
count=3
# maximal clique algorithm: 1 = hand-written, 2 = jgrapht
algorithm=2
# vertices remaining after a pruning pass, as a percentage of the previous pass;
# if 90% of the data is still left after pruning, pruning is no longer effective and the loop stops
percent=90
spark.master=local
spark.app.name=graph
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.yarn.executor.memoryOverhead=20480
spark.yarn.driver.memoryOverhead=20480
spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC
spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC
spark.driver.maxResultSize=10g
spark.default.parallelism=60
```
#### Sample data ####
{"src":"0","dst":"1"} {"src":"0","dst":"2"} {"src":"0","dst":"3"} {"src":"1","dst":"0"} {"src":"2","dst":"1"} {"src":"3","dst":"5"} {"src":"4","dst":"6"} {"src":"5","dst":"4"} {"src":"6","dst":"5"} {"src":"3","dst":"2"} {"src":"2","dst":"3"} {"src":"6","dst":"4"} {"src":"3","dst":"4"} {"src":"4","dst":"3"} {"src":"2","dst":"6"} {"src":"6","dst":"2"} {"src":"6","dst":"7"} {"src":"7","dst":"6"}
#### Sample graph ####

(Figure: the undirected graph built from the sample edges above.)
#### Output ####
```
0,1,2
0,2,3
3,4,5
4,5,6
```
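As a quick sanity check (this snippet is mine, not part of the original job), the same four cliques can be reproduced locally by feeding the deduplicated, undirected sample edges straight into jgrapht's BronKerboschCliqueFinder, the same class used by find2 below:

```scala
import org.jgrapht.alg.BronKerboschCliqueFinder
import org.jgrapht.graph.{DefaultEdge, SimpleGraph}
import scala.collection.JavaConverters._

// Undirected edges of the sample graph, deduplicated to (smaller, larger) pairs.
val edges = Seq((0, 1), (0, 2), (0, 3), (1, 2), (2, 3), (2, 6),
                (3, 4), (3, 5), (4, 5), (4, 6), (5, 6), (6, 7))
val g = new SimpleGraph[String, DefaultEdge](classOf[DefaultEdge])
edges.foreach { case (u, v) =>
  g.addVertex(u.toString)
  g.addVertex(v.toString)
  g.addEdge(u.toString, v.toString)
}
val cliques = new BronKerboschCliqueFinder(g).getAllMaximalCliques.asScala
cliques.filter(_.size >= 3).foreach(println)  // [0, 1, 2], [0, 2, 3], [3, 4, 5], [4, 5, 6] in some order
```

The full enumeration also yields {2,6} and {6,7}, but those fall below count = 3 vertices and are filtered out.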
#### Code ####
```scala
import java.util
import java.util.Properties

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import org.jgrapht.alg.BronKerboschCliqueFinder
import org.jgrapht.graph.{DefaultEdge, SimpleGraph}

import scala.collection.JavaConverters._
import scala.collection.mutable

object ApplicationTitan {

  def main(args: Array[String]) {
    val prop = new Properties()
    prop.load(getClass.getResourceAsStream("/config.properties"))

    val graph_data_path = prop.getProperty("graph_data_path")
    val out_path = prop.getProperty("out_path")
    val ck_path = prop.getProperty("ck_path")
    val count = Integer.parseInt(prop.getProperty("count"))
    val numIter = Integer.parseInt(prop.getProperty("numIter"))
    val algorithm = Integer.parseInt(prop.getProperty("algorithm"))
    val percent = Integer.parseInt(prop.getProperty("percent"))
    val conf = new SparkConf()

    // remove the previous output directory, if any
    try {
      Runtime.getRuntime.exec("hdfs dfs -rm -r " + out_path)
      // Runtime.getRuntime.exec("cmd.exe /C rd /s /q " + out_path)
    } catch {
      case ex: Exception => ex.printStackTrace(System.out)
    }

    // every "spark.*" property in the config file becomes a SparkConf setting
    prop.stringPropertyNames().asScala.foreach(s => {
      if (s.startsWith("spark")) {
        conf.set(s, prop.getProperty(s))
      }
    })
    conf.registerKryoClasses(Array(getClass))

    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    sc.setCheckpointDir(ck_path)
    val sqlc = new SQLContext(sc)

    try {
      val e_df = sqlc.read
        // .json(graph_data_path)
        .parquet(graph_data_path)

      // normalise every edge to (smaller id, larger id) and deduplicate,
      // turning the directed input into an undirected edge list
      var e_rdd = e_df
        .mapPartitions(it => {
          it.map({
            case Row(dst: String, src: String) =>
              val src_long = src.toLong
              val dst_long = dst.toLong
              if (src_long < dst_long) (src_long, dst_long) else (dst_long, src_long)
          })
        }).distinct()
      e_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

      var bc: Broadcast[Set[Long]] = null
      var iter = 0
      var bc_size = 0

      // pruning: repeatedly drop vertices whose degree is below count - 1,
      // since they cannot belong to a clique of `count` vertices
      while (iter <= numIter) {
        val temp = e_rdd
          .flatMap(x => List((x._1, 1), (x._2, 1)))
          .reduceByKey((x, y) => x + y)
          .filter(x => x._2 >= count - 1)
          .mapPartitions(it => it.map(x => x._1))
        val bc_value = temp.collect().toSet
        bc = sc.broadcast(bc_value)
        e_rdd = e_rdd.filter(x => bc.value.contains(x._1) && bc.value.contains(x._2))
        e_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
        iter += 1
        // stop early once a pass keeps more than `percent` % of the previous vertex set
        if (bc_size != 0 && bc_value.size >= bc_size * percent / 100) {
          println("total iter : " + iter)
          iter = Int.MaxValue
        }
        bc_size = bc_value.size
      }

      // build the graph
      val edge: RDD[Edge[Long]] = e_rdd.mapPartitions(it => it.map(x => Edge(x._1, x._2)))
      val graph = Graph.fromEdges(edge, 0,
        StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER)

      // connected components
      val cc = graph.connectedComponents().vertices
      cc.persist(StorageLevel.MEMORY_AND_DISK_SER)

      cc.join(e_rdd)
        // key = random 0-9 prefix + component id: the salt spreads large components
        // over several reducers in the first aggregation
        .mapPartitions(it => it.map(x =>
          ((math.random * 10).toInt.toString.concat(x._2._1.toString), (x._1, x._2._2))))
        .aggregateByKey(List[(Long, Long)]())((list, v) => list :+ v, (list1, list2) => list1 ::: list2)
        // strip the salt and merge the partial edge lists of each component
        .mapPartitions(it => it.map(x => (x._1.substring(1), x._2)))
        .aggregateByKey(List[(Long, Long)]())((list1, list2) => list1 ::: list2, (list3, list4) => list3 ::: list4)
        .filter(x => x._2.size >= count - 1)
        // run the serial maximal clique algorithm inside each component
        .flatMap(x => {
          if (algorithm == 1) find(x, count) else find2(x, count)
        })
        // format each clique as a comma-separated vertex list
        .mapPartitions(it => {
          it.map(set => {
            var temp = ""
            set.asScala.foreach(x => temp += x + ",")
            temp.substring(0, temp.length - 1)
          })
        })
        // .coalesce(1)
        .saveAsTextFile(out_path)
    } catch {
      case ex: Exception => ex.printStackTrace(System.out)
    }
    sc.stop()
  }

  // hand-written maximal clique algorithm (algorithm = 1)
  def find(x: (String, List[(Long, Long)]), count: Int): mutable.Set[util.Set[String]] = {
    println(x._1 + "|s|" + x._2.size)
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
    val neighbors = new util.HashMap[String, util.Set[String]]
    val finder = new CliqueFinder(neighbors, count)
    x._2.foreach(r => {
      val v1 = r._1.toString
      val v2 = r._2.toString
      if (neighbors.containsKey(v1)) {
        neighbors.get(v1).add(v2)
      } else {
        val temp = new util.HashSet[String]()
        temp.add(v2)
        neighbors.put(v1, temp)
      }
      if (neighbors.containsKey(v2)) {
        neighbors.get(v2).add(v1)
      } else {
        val temp = new util.HashSet[String]()
        temp.add(v1)
        neighbors.put(v2, temp)
      }
    })
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
    finder.findMaxCliques().asScala
  }

  // maximal clique algorithm from jgrapht (algorithm = 2)
  def find2(x: (String, List[(Long, Long)]), count: Int): Set[util.Set[String]] = {
    println(x._1 + "|s|" + x._2.size)
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
    val to_clique = new SimpleGraph[String, DefaultEdge](classOf[DefaultEdge])
    x._2.foreach(r => {
      val v1 = r._1.toString
      val v2 = r._2.toString
      to_clique.addVertex(v1)
      to_clique.addVertex(v2)
      to_clique.addEdge(v1, v2)
    })
    val finder = new BronKerboschCliqueFinder(to_clique)
    val list = finder.getAllMaximalCliques.asScala
    var result = Set[util.Set[String]]()
    list.foreach(x => {
      if (x.size() >= count) result = result + x
    })
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
    result
  }
}
```
The hand-written maximal clique finder used when algorithm=1 (CliqueFinder.java):
```java
import java.util.*;

/**
 * Bron–Kerbosch maximal clique finder.
 *
 * @author mopspecial@gmail.com
 * @date 2017/7/31
 */
public class CliqueFinder {
    private Map<String, Set<String>> neighbors;
    private Set<String> nodes;
    private Set<Set<String>> maxCliques = new HashSet<>();
    private Integer minSize;

    public CliqueFinder(Map<String, Set<String>> neighbors, Integer minSize) {
        this.neighbors = neighbors;
        this.nodes = neighbors.keySet();
        this.minSize = minSize;
    }

    // outer level of Bron–Kerbosch: visit the candidates in degeneracy order, recursing with bk2
    private void bk3(Set<String> clique, List<String> candidates, List<String> excluded) {
        if (candidates.isEmpty() && excluded.isEmpty()) {
            if (!clique.isEmpty() && clique.size() >= minSize) {
                maxCliques.add(clique);
            }
            return;
        }

        for (String s : degeneracy_order(candidates)) {
            List<String> new_candidates = new ArrayList<>(candidates);
            new_candidates.retainAll(neighbors.get(s));

            List<String> new_excluded = new ArrayList<>(excluded);
            new_excluded.retainAll(neighbors.get(s));

            Set<String> nextClique = new HashSet<>(clique);
            nextClique.add(s);
            bk2(nextClique, new_candidates, new_excluded);
            candidates.remove(s);
            excluded.add(s);
        }
    }

    // recursive Bron–Kerbosch step with pivoting: candidates adjacent to the pivot are skipped
    private void bk2(Set<String> clique, List<String> candidates, List<String> excluded) {
        if (candidates.isEmpty() && excluded.isEmpty()) {
            if (!clique.isEmpty() && clique.size() >= minSize) {
                maxCliques.add(clique);
            }
            return;
        }
        String pivot = pick_random(candidates);
        if (pivot == null) {
            pivot = pick_random(excluded);
        }

        List<String> tempc = new ArrayList<>(candidates);
        tempc.removeAll(neighbors.get(pivot));

        for (String s : tempc) {
            List<String> new_candidates = new ArrayList<>(candidates);
            new_candidates.retainAll(neighbors.get(s));

            List<String> new_excluded = new ArrayList<>(excluded);
            new_excluded.retainAll(neighbors.get(s));

            Set<String> nextClique = new HashSet<>(clique);
            nextClique.add(s);
            bk2(nextClique, new_candidates, new_excluded);
            candidates.remove(s);
            excluded.add(s);
        }
    }

    // repeatedly pick the remaining vertex of minimum degree
    private List<String> degeneracy_order(List<String> innerNodes) {
        List<String> result = new ArrayList<>();
        Map<String, Integer> deg = new HashMap<>();
        for (String node : innerNodes) {
            deg.put(node, neighbors.get(node).size());
        }
        while (!deg.isEmpty()) {
            Integer min = Collections.min(deg.values());
            String minKey = null;
            for (String key : deg.keySet()) {
                if (deg.get(key).equals(min)) {
                    minKey = key;
                    break;
                }
            }
            result.add(minKey);
            deg.remove(minKey);
            for (String k : neighbors.get(minKey)) {
                if (deg.containsKey(k)) {
                    deg.put(k, deg.get(k) - 1);
                }
            }
        }
        return result;
    }

    // despite the name, simply returns the first element (or null if the list is empty)
    private String pick_random(List<String> random) {
        if (random != null && !random.isEmpty()) {
            return random.get(0);
        } else {
            return null;
        }
    }

    public Set<Set<String>> findMaxCliques() {
        this.bk3(new HashSet<>(), new ArrayList<>(nodes), new ArrayList<>());
        return maxCliques;
    }

    // small self-test on the adjacency of the sample graph
    public static void main(String[] args) {
        Map<String, Set<String>> neighbors = new HashMap<>();
        neighbors.put("0", new HashSet<>(Arrays.asList("1", "2", "3")));
        neighbors.put("1", new HashSet<>(Arrays.asList("0", "2")));
        neighbors.put("2", new HashSet<>(Arrays.asList("0", "1", "3", "6")));
        neighbors.put("3", new HashSet<>(Arrays.asList("0", "2", "4", "5")));
        neighbors.put("4", new HashSet<>(Arrays.asList("3", "5", "6")));
        neighbors.put("5", new HashSet<>(Arrays.asList("3", "4", "6")));
        neighbors.put("6", new HashSet<>(Arrays.asList("2", "4", "5")));
        neighbors.put("7", new HashSet<>(Arrays.asList("6")));
        CliqueFinder finder = new CliqueFinder(neighbors, 3);
        finder.bk3(new HashSet<>(), new ArrayList<>(neighbors.keySet()), new ArrayList<>());
        System.out.println(finder.maxCliques);
    }
}
```
Having read the above, do you have a better understanding of how to mine maximal cliques from a graph with Spark GraphX? If you would like to learn more, follow the 创新互联 industry news channel. Thanks for your support.