星期三, 九月 26, 2012

用Parallel和foreach包玩转并行计算


众所周知,在大数据时代R语言有两个弱项,其中一个就是只能使用单线程计算。但是在2.14版本之后,R就内置了parallel包,强化了R的并行计算能力。parallel包实际上整合了之前已经比较成熟的snow包和multicore包。前者已经在之前的文章中介绍过了,而后者无法在windows下运行,所以也就先不管了。parallel包可以很容易的在计算集群上实施并行计算,在多个CPU核心的单机上,也能发挥并行计算的功能。我们今天就来探索一下parallel包在多核心单机上的使用。

parallel包的思路和lapply函数很相似,都是将输入数据分割、计算、整合结果。只不过并行计算是用到了不同的cpu来运算。下面的例子是解决欧拉问题的第14个问题

# 并行计算euler14问题
# 自定义函数以返回原始数值和步数
func <- function(x) {
    n = 1
    raw <- x
    while (x > 1) {
        x <- ifelse(x%%2==0,x/2,3*x+1)
        n = n + 1
    }
    return(c(raw,n))
}
 
library(parallel)
# 用system.time来返回计算所需时间
system.time({
    x <- 1:1e6
    cl <- makeCluster(4)  # 初始化四核心集群
    results <- parLapply(cl,x,func) # lapply的并行版本
    res.df <- do.call('rbind',results) # 整合结果
    stopCluster(cl) # 关闭集群
})
# 找到最大的步数对应的数字
res.df[which.max(res.df[,2]),1]
 
上例中关键的函数就是parLapply,其中三个参数分别是集群对象、输入参数和运算函数名。我们最后算出的结果是837799。

foreach包是revolutionanalytics公司贡献给R开源社区的一个包。它能使R中的并行计算更为方便。与sapply函数类似,foreach函数中的第一个参数是输入参数,%do%后面的对象表示运算函数,而.combine则表示运算结果的整合方式。 下面的例子即是用foreach来完成前面的同一个任务。如果要启用并行,则需要加载doParallel包,并将%do%改为%dopar%。这样一行代码就能方便的完成并行计算了。

library(foreach)
# 非并行计算方式,类似于sapply函数的功能
x <- foreach(x=1:1000,.combine='rbind') %do% func(x)
 
# 启用parallel作为foreach并行计算的后端
library(doParallel)
cl <- makeCluster(4)
registerDoParallel(cl)
# 并行计算方式
x <- foreach(x=1:1000,.combine='rbind') %dopar% func(x)
stopCluster(cl)

下面的例子是用foreach函数来进行随机森林的并行计算。我们一共要生成十万个树来组合成一个随机森林,每个核心负责生成两万五千个树。最后用combine进行组合。

# 随机森林的并行计算
library(randomForest)
 cl <- makeCluster(4)
 registerDoParallel(cl)
 rf <- foreach(ntree=rep(25000, 4), 
                  .combine=combine,
                  .packages='randomForest') %dopar%
          randomForest(Species~., data=iris, ntree=ntree)
stopCluster(cl)

并行不仅可以在建模时进行,也可以在数据整理阶段进行。之前我们提到过的plyr包也可以进行并行,前提是加载了foreach包,并且参数.parallel设置为TURE。当然不是所有的任务都能并行计算,而且并行计算前你需要改写你的代码。

参考资料:
http://stat.ethz.ch/R-manual/R-devel/library/parallel/doc/parallel.pdf
http://cran.r-project.org/web/packages/foreach/vignettes/foreach.pdf
http://cran.r-project.org/web/packages/doParallel/vignettes/gettingstartedParallel.pdf

7 条评论:

  1. 实际使用并行的时候发现一个问题,如果你在b函数里面调用一个%dopar% a()是没问题的,但是如果在c函数里面再调用b函数就会报错说无法发现函数a。
    这个逻辑有点奇怪……

    回复删除
  2. 如果使用有超线程技术的cpu,比如是2核4线程,那么在makeCluster一步应该选2还是4呢?要不要把超线程关掉再跑计算程序呢?

    回复删除
    回复
    1. 4或2应该都可以,如果4个任务放到2个核心上,它也会自动分配的。detectCores(logical = FALSE)这条函数可以检测真实核心数。

      删除
  3. 此评论已被作者删除。

    回复删除
  4. library(doParallel)
    library(SkewHyperbolic)
    library(foreach)

    #下面的函数用于算n第3个到最后一个元素的逆累积分布函数,n的第一和第二个元系是分布的参数
    qu<-function(n){
    n=matrix(n,,1);
    d=n[1,1];
    b=n[2,1];
    len=length(n);
    da=matrix(n[3:len,]);
    se=qskewhyp(da, mu=0, delta = sqrt(d), beta = b, nu = d, lower.tail = TRUE,method="integrate", log.p=FALSE,uniTol=1e-12,intTol=1e-12);
    return(t(t(se)));}

    #生成模拟数据,第一个和第二对应了参数
    y=seq(from=0.1,to=0.9,length=500);
    u=matrix(y,,1);
    u[1,]=20;
    u[2,]=0.5;
    m=matrix(u,500,15);

    #开始并行计算
    cl <- makeCluster(2)
    registerDoParallel(cl)
    system.time(foreach(i=1:15) %dopar% qu(m[,i]));



    我在以上代码中提示:错误于qu(m[, i]) : task 1 failed - "没有"qskewhyp"这个函数"

    这个函数qskewhyp是SkewHyperbolic包里面的,我写的qu这个函数对于m的单个列都可以调用,不知道是不是这个并行计算的使用还依赖于其它包的支持?

    回复删除
    回复
    1. 这是因为foreach不知道如何把qskewhyp分发到计算节点上,你可以加上包名再试试,例如SkewHyperbolic::qskewwhyp

      删除