Споделена памет в паралел foreach в R

Описание на проблема:

Имам голяма матрица c, заредена в RAM памет. Целта ми е чрез паралелна обработка да имам достъп само за четене до него. Въпреки това, когато създавам връзките, или използвам doSNOW, doMPI, big.matrix и т.н., използваното количество ram се увеличава драстично.

Има ли начин правилно да се създаде споделена памет, откъдето всички процеси могат да четат, без да се създава локално копие на всички данни?

Пример:

libs<-function(libraries){# Installs missing libraries and then load them
  for (lib in libraries){
    if( !is.element(lib, .packages(all.available = TRUE)) ) {
      install.packages(lib)
    }
    library(lib,character.only = TRUE)
  }
}

libra<-list("foreach","parallel","doSNOW","bigmemory")
libs(libra)

#create a matrix of size 1GB aproximatelly
c<-matrix(runif(10000^2),10000,10000)
#convert it to bigmatrix
x<-as.big.matrix(c)
# get a description of the matrix
mdesc <- describe(x)
# Create the required connections    
cl <- makeCluster(detectCores ())
registerDoSNOW(cl)
out<-foreach(linID = 1:10, .combine=c) %dopar% {
  #load bigmemory
  require(bigmemory)
  # attach the matrix via shared memory??
  m <- attach.big.matrix(mdesc)
  #dummy expression to test data aquisition
  c<-m[1,1]
}
closeAllConnections()

RAM: Използване на RAM по време на ‹code›foreach‹/code› в изображението по-горе може да откриете, че паметта се увеличава много, докато foreach свърши и бъде освободена.


person Stanislav    schedule 22.07.2015    source източник
comment
Имам абсолютно същия проблем в момента и съм силно заинтересован от решение. Също така забелязах, че се правят копия, вместо да се споделя паметта.   -  person NoBackingDown    schedule 23.07.2015


Отговори (2)


Мисля, че решението на проблема може да се види от публикацията на Стив Уестън, авторът на пакета foreach, тук. Там той заявява:

Пакетът doParallel автоматично ще експортира променливи към работниците, които са посочени в цикъла foreach.

Така че мисля, че проблемът е, че във вашия код вашата голяма матрица c е посочена в присвояването c<-m[1,1]. Просто опитайте xyz <- m[1,1] вместо това и вижте какво ще се случи.

Ето пример с big.matrix с файлова поддръжка:

#create a matrix of size 1GB aproximatelly
n <- 10000
m <- 10000
c <- matrix(runif(n*m),n,m)
#convert it to bigmatrix
x <- as.big.matrix(x = c, type = "double", 
                 separated = FALSE, 
                 backingfile = "example.bin", 
                 descriptorfile = "example.desc")
# get a description of the matrix
mdesc <- describe(x)
# Create the required connections    
cl <- makeCluster(detectCores ())
registerDoSNOW(cl)
## 1) No referencing
out <- foreach(linID = 1:4, .combine=c) %dopar% {
  t <- attach.big.matrix("example.desc")
  for (i in seq_len(30L)) {
    for (j in seq_len(m)) {
      y <- t[i,j]
    }
  }
  return(0L)
}

въведете описание на изображението тук

## 2) Referencing
out <- foreach(linID = 1:4, .combine=c) %dopar% {
  invisible(c) ## c is referenced and thus exported to workers
  t <- attach.big.matrix("example.desc")
  for (i in seq_len(30L)) {
    for (j in seq_len(m)) {
      y <- t[i,j]
    }
  }
  return(0L)
}
closeAllConnections()

въведете описание на изображението тук

person NoBackingDown    schedule 24.07.2015
comment
Не можах да видя, че c<-m[1,1] всъщност зарежда c, тъй като очаквах, че това ще генерира нова променлива вместо , добре го прочетох. Това означава, че всъщност паметта е споделена и аз губя времето си в проучване на различни опции заради c. Благодаря ви много за помощта! PS: Не мисля, че кодът по-долу invisible някога се изпълнява. - person Stanislav; 24.07.2015
comment
@Stanislav Съгласен съм, че това е малко неочаквано поведение. Ако отговорът ми разреши проблема ви, ще се радвам да обмислите дали да го приемете. - person NoBackingDown; 24.07.2015
comment
@Stanislav Този отговор е правилен, трябва да сте сигурни какво всъщност изнасяте на работниците. Като цяло е добра практика да нямате еднакви имена на променливи вътре и извън циклите, освен ако всъщност не променяте един и същ обект. - person cdeterman; 24.07.2015

Като алтернатива, ако сте на Linux/Mac и искате CoW споделена памет, използвайте forks. Първо заредете всичките си данни в основната нишка и след това стартирайте работещи нишки (разклонения) с обща функция mcparallel от пакета parallel.

Можете да съберете техните резултати с mccollect или с използването на наистина споделена памет, като използвате библиотеката Rdsm, като това:

library(parallel)
library(bigmemory) #for shared variables
shared<-bigmemory::big.matrix(nrow = size, ncol = 1, type = 'double')
shared[1]<-1 #Init shared memory with some number

job<-mcparallel({shared[1]<-23}) #...change it in another forked thread
shared[1,1] #...and confirm that it gets changed
# [1] 23

Можете да потвърдите, че стойността наистина се актуализира във фонов режим, ако забавите записа:

fn<-function()
{
  Sys.sleep(1) #One second delay
  shared[1]<-11
}

job<-mcparallel(fn())
shared[1] #Execute immediately after last command
# [1] 23
aaa[1,1] #Execute after one second
# [1] 11
mccollect() #To destroy all forked processes (and possibly collect their output)

За да контролирате паралелността и да избегнете условията на състезание, използвайте ключалки:

library(synchronicity) #for locks
m<-boost.mutex() #Lets create a mutex "m"

bad.incr<-function() #This function doesn't protect the shared resource with locks:
{
  a<-shared[1]
  Sys.sleep(1)
  shared[1]<-a+1
}

good.incr<-function()
{
  lock(m)
  a<-shared[1]
  Sys.sleep(1)
  shared[1]<-a+1
  unlock(m)
}

shared[1]<-1
for (i in 1:5) job<-mcparallel(bad.incr())
shared[1] #You can verify, that the value didn't get increased 5 times due to race conditions

mccollect() #To clear all threads, not to get the values
shared[1]<-1
for (i in 1:5) job<-mcparallel(good.incr())
shared[1] #As expected, eventualy after 5 seconds of waiting you get the 6
#[1] 6 

mccollect()

Редактиране:

Опростих малко зависимостите, като размених Rdsm::mgrmakevar в bigmemory::big.matrix. mgrmakevar така или иначе вътрешно извиква big.matrix и не се нуждаем от нищо повече.

person Adam Ryczkowski    schedule 22.06.2016