-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathigraph_sp_mpi_sampled.R
88 lines (72 loc) · 2.56 KB
/
igraph_sp_mpi_sampled.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
###### Setting up libs ##########
# Check whether packages are installed.
#   mypkg: character vector of package names.
# Returns a logical vector, one entry per name, TRUE where the name appears in
# the "Package" column of installed.packages().
is.installed <- function(mypkg) {
  known_pkgs <- installed.packages()[, "Package"]
  mypkg %in% known_pkgs
}
# Make sure the per-user library exists AND is on the library search path.
# R only adds R_LIBS_USER to .libPaths() at startup if the directory already
# exists, so a directory created here has to be registered explicitly;
# otherwise packages installed into it below could not be loaded on a first run.
user_lib <- Sys.getenv("R_LIBS_USER")
dir.create(user_lib, showWarnings = FALSE, recursive = TRUE)
.libPaths(c(user_lib, .libPaths()))
library(doMPI)
# Install (only if missing) and attach each remaining dependency, in the same
# order as before: igraph first, then plyr.
for (pkg in c("igraph", "plyr")) {
  if (!is.installed(pkg)) {
    install.packages(pkg, lib = user_lib, dependencies = TRUE,
                     repos = "http://cran.uni-muenster.de/")
  }
  library(pkg, character.only = TRUE)
}
####### Define functions #######
# Inverse document frequency of one or more edge labels.
#   edge_id:       label(s) to look up (vectorised).
#   edge_cnts:     frequency table with elements `x` (distinct labels) and
#                  `freq` (their counts), e.g. the result of plyr::count().
#   edge_freq_sum: total number of edges; defaults to sum(edge_cnts$freq).
# Returns log(edge_freq_sum / freq(edge_id)); NA for labels not in edge_cnts.
idf <- function(edge_id, edge_cnts, edge_freq_sum = sum(edge_cnts$freq)) {
  # Look the label up by value. Using the label as a position into the table
  # is only correct when the labels happen to be exactly 1..k.
  frq <- edge_cnts$freq[match(edge_id, edge_cnts$x)]
  log(edge_freq_sum / frq)
}
# Invert a weight vector so that the largest input maps to 1 and smaller
# inputs map to proportionally larger values: result = 1 + max(weights) - weights.
#   weights: numeric vector.
# Returns a numeric vector of the same length (numeric(0) for empty input).
norm_weights <- function(weights) {
  # max() of an empty vector is -Inf with a warning; nothing to invert anyway.
  if (length(weights) == 0) {
    return(numeric(0))
  }
  # Plain vector arithmetic: same values as applying the inversion element by
  # element, without one function call per edge.
  1 + max(weights) - weights
}
# Mirror `x` around a maximum: the maximum itself maps to 1, and every unit
# below the maximum adds one to the result.
#   x:     numeric value(s) to invert.
#   max_x: the maximum to mirror around.
inv_x <- function(x, max_x) {
  upper <- 1 + max_x
  upper - x
}
####### Parameters #######
gfile <- "data/eta-dataset-joern-hess.data.txt.dbpgraph.gz"
####### Load graph #######
# The input is read through a gzfile() connection as a header-less,
# whitespace-separated table. Columns 1 and 2 are handed to igraph as the
# directed edge list; column 3 is kept as the per-edge value used for weighting.
message("loading data")
graph_file <- gzfile(gfile)
input <- read.csv(
  graph_file,
  header = FALSE,
  sep = "",
  stringsAsFactors = FALSE
)
g <- graph.data.frame(input[, c(1, 2)], directed = TRUE)
weights <- input[, 3]
####### Create MPI cluster ######
# Start the MPI worker cluster and register it as the backend that foreach's
# %dopar% dispatches to. `cl` is kept so the cluster can be closed at the end
# of the script.
message("register MPI cluster")
cl <- startMPIcluster()
registerDoMPI(cl)
# getDoParWorkers() reports the number of workers of the registered backend.
message(sprintf('Parallel time using doMPI on %d workers', getDoParWorkers()))
####### Start real computation #######
#Weighting edges
# Each edge is scored with the IDF of its value from column 3 of the input
# (log(total / frequency), so rare values score high). norm_weights() then
# inverts the scale (1 + max - value), which turns a high IDF into a low edge
# weight. The result is stored both on disk and as the graph's `weight`
# edge attribute.
message("computing edge weights")
cnt <- count(weights)
n <- sum(cnt$freq)
#weights_idf <- foreach (x=weights, .combine='c') %dopar% {idf(x, cnt, n)}
# idf() is given the count table and the total as two separate arguments (its
# declared signature) and is applied to the whole vector in one call: it
# consists only of vectorised indexing and log(), so no per-edge loop is needed.
weights_idf <- idf(weights, cnt, n)
weights_idf <- norm_weights(weights_idf)
edge_file <- gzfile("data/edge_weights_idf.data.gz")
write.csv(weights_idf, file = edge_file)
E(g)$weight <- weights_idf
message("Edge weights written to data/edge_weights_idf.data.gz")
#Shortest path
# For every selected start vertex, compute the distance to all vertices, sort
# the targets by distance and write the closest `topK` rows to
# data/sp_<vertex name>.gz (columns: start name, target name, distance).
message("Vertex count ", length(V(g)) )
name <- V(g)$name
# `topK` is used below but was never defined in this script. Unless it has
# already been set, keep every target (no truncation).
# NOTE(review): pick the intended cut-off.
if (!exists("topK")) {
  topK <- length(name)
}
# Vertex names are character strings, so `name <= 955` would compare them
# lexicographically ("1000" <= "955" is TRUE). Compare numerically instead;
# names that are not numbers become NA and are not selected.
# NOTE(review): assumes vertex names are numeric ids - confirm against the data.
vertex_num <- as.numeric(name)
nodes <- V(g)[which(vertex_num <= 955)]
# Iterate over plain vertex ids rather than over the vertex-sequence object.
foreach (i=as.numeric(nodes), .inorder=FALSE, .packages=c("igraph","plyr")) %dopar% {
  n1 <- V(g)[i]
  src_name <- n1$name
  fname <- sprintf("data/sp_%s.gz", src_name)
  # mode="all" follows edges in both directions. NOTE(review): per the igraph
  # docs weights=NULL uses the graph's `weight` edge attribute when present.
  val <- shortest.paths(g, n1, to=V(g), mode="all", weights=NULL)
  # val is a 1 x |V| matrix; transpose it into a distance column.
  rs <- cbind.data.frame(src_name, name, t(val))
  rs <- head(arrange(rs, rs[[3]]), n = topK)
  write.table(rs, file=gzfile(fname), quote=FALSE, row.names=FALSE, col.names=FALSE)
  #write(rs, file=gzfile(fname), append=FALSE) #ncolumns=3)
  message("Finished starting node ", i)
}
message("Finished all")
####### Cleanup #######
# Shut down the cluster and quit.
# closeCluster() is called on the cluster object created by startMPIcluster().
# NOTE(review): mpi.quit() (Rmpi) is expected to end the R session as well, so
# it should stay the last statement - confirm.
closeCluster(cl)
mpi.quit()