library(rhdfs)
library(rmr2)
# Set up rhdfs before any other hdfs.* call is made.
hdfs.init()
# Delete whatever is stored at the job's output location
# ("/user/output/lm.output", the same path the job is later asked to write to).
# NOTE(review): presumably required because the job cannot write into an
# existing output path, and presumably harmless when the path is absent -- confirm.
hdfs.delete("/user/output/lm.output")
# Map step: fit a simple linear regression per input record and emit its slope.
#
# Each record is expected to look like "<key>#<x1,x2,...>#<y1,y2,...>".
#
# Args:
#   k:     incoming key; ignored, the key is taken from the record itself.
#   lines: character vector of raw text records. rmr2's text input format may
#          deliver several lines per call, so every element is processed.
#
# Returns:
#   keyval() pairing each record's key with the slope of lm(y ~ x) for it.
map <- function(k, lines) {
  # Split every record into its "#"-separated fields: key, x values, y values.
  records <- strsplit(lines, "#", fixed = TRUE)

  keys <- vapply(records, function(fields) fields[1], character(1))

  slopes <- vapply(records, function(fields) {
    x <- as.numeric(unlist(strsplit(fields[2], ",", fixed = TRUE)))
    y <- as.numeric(unlist(strsplit(fields[3], ",", fixed = TRUE)))
    # Named `fit` rather than `lm` so the model does not shadow stats::lm().
    fit <- lm(y ~ x)
    # The second coefficient is the slope (the first is the intercept).
    fit$coefficients[[2]]
  }, numeric(1))

  keyval(keys, slopes)
}
# Reduce step: pass the values collected for a key through unchanged.
#
# Args:
#   key:   the key the values were grouped under (not used in the result).
#   lmres: the values emitted by the map step for that key.
#
# Returns:
#   `lmres` as received.
# NOTE(review): returning the bare values rather than keyval(key, lmres)
# presumably drops the key from the job output -- confirm this is intended.
reduce <- function(key, lmres) {
  # keyval(key, lmres)
  lmres
}
# Launch the MapReduce job using the `map` and `reduce` functions defined in
# this file, reading and writing plain text.
#
# Args:
#   input:  location of the input data handed to mapreduce().
#   output: location for the results; NULL (the default) is passed through
#           to mapreduce() unchanged.
#
# Returns:
#   Whatever mapreduce() returns.
wordcount <- function(input, output = NULL) {
  mapreduce(
    input = input,
    input.format = "text",
    output = output,
    output.format = "text",
    map = map,
    reduce = reduce
  )
}
# HDFS locations for the job, built relative to a common root directory.
# String literals use plain double quotes; typographic quotes are not valid
# R string delimiters.
hdfs.root <- "/user"
hdfs.data <- file.path(hdfs.root, "input/lm.input")
hdfs.out <- file.path(hdfs.root, "output/lm.output")
# Run the job and keep whatever wordcount() returns.
out <- wordcount(hdfs.data, hdfs.out)