Параллельная обработка файлов в Scala

Предположим, мне нужно параллельно обрабатывать файлы в заданной папке. В Java я бы создал поток FolderReader для чтения имен файлов из папки и пул потоков FileProcessor. FolderReader считывает имена файлов и передает функцию обработки файлов (Runnable) исполнителю пула.

В Scala я вижу два варианта:

  • создать пул из FileProcessor актеров и запланировать функцию обработки файлов с помощью Actors.Scheduler.
  • создайте актера для каждого имени файла при чтении имен файлов.

Имеет ли это смысл? Какой лучший вариант?


person Michael    schedule 20.07.2012    source источник
comment
Параллельный доступ к жесткому диску — это плохо. Это вызывает дополнительные магнитные движения головы. Не делай этого.   -  person xiefei    schedule 20.07.2012
comment
Актеры здесь неуместны.   -  person Daniel C. Sobral    schedule 20.07.2012
comment
@DanielC.Sobral Спасибо. Не могли бы вы объяснить, почему и что использовать вместо этого?   -  person Michael    schedule 20.07.2012
comment
@Michael Актеры в Scala плохо работают с блокирующими задачами, такими как описанная. Некоторые из приведенных ниже ответов содержат альтернативы.   -  person Daniel C. Sobral    schedule 20.07.2012
comment
@DanielC.Sobral Понятно. Альтернатива Future я полагаю. Еще раз спасибо.   -  person Michael    schedule 20.07.2012
comment
Коллекции @Michael Parallel также работают.   -  person Daniel C. Sobral    schedule 20.07.2012
comment
@DanielC.Sobral Ну, теперь мне интересно, когда мне следует использовать параллельные коллекции, а когда — Futures.   -  person Michael    schedule 20.07.2012
comment
Если ваши файлы не находятся на разных устройствах или ваши усилия по обработке сильно привязаны к процессору, параллельная обработка файлов вряд ли будет полезной. Кроме того, вы используете слово «параллельный», но я думаю, что вы имеете в виду «параллельный». Актеры предназначены для параллелизма, а не для параллелизма. Фьючерсы и параллельные структуры данных предназначены для параллелизма.   -  person James Iry    schedule 20.07.2012
comment
Как правило, если между отдельными вычислениями нет взаимодействий (т. е. если проблема досадно параллельна), вам лучше использовать параллельные коллекции. Фьючерсы превосходны в случаях, когда может существовать сложная DAG зависимостей между распараллеливаемыми в противном случае вычислениями, но они излишни для смущающе параллельных задач.   -  person Dave Griffith    schedule 20.07.2012


Ответы (6)


Я предлагаю изо всех сил держаться как можно дальше от нитей. К счастью, у нас есть лучшие абстракции, которые заботятся о том, что происходит ниже, и в вашем случае мне кажется, что вам не нужно использовать актеров (пока вы можете), но вы можете использовать более простую абстракцию, называемую фьючерсами. Они являются частью библиотеки с открытым исходным кодом Akka, и я думаю, что в будущем они также станут частью стандартной библиотеки Scala.

Future[T] — это просто то, что вернет T в будущем.

Все, что вам нужно для запуска будущего, — это иметь неявный ExecutionContext, который вы можете получить из службы исполнителя Java. Тогда вы сможете насладиться элегантным API и тем фактом, что future — это монада для преобразования коллекций в коллекции futures, сбора результата и так далее. Я предлагаю вам взглянуть на http://doc.akka.io/docs/akka/2.0.1/scala/futures.html

object TestingFutures {
  implicit val executorService = Executors.newFixedThreadPool(20)
  implicit val executorContext = ExecutionContext.fromExecutorService(executorService)

  def testFutures(myList:List[String]):List[String]= {

    val listOfFutures : Future[List[String]] = Future.traverse(myList){
      aString => Future{
                        aString.reverse
                       }
     }
    val result:List[String] = Await.result(listOfFutures,1 minute)
    result

  }
}

Здесь много всего происходит:

  • Я использую Future.traverse, который получает в качестве первого параметра M[T]<:Traversable[T], а в качестве второго параметра T => Future[T] или, если вы предпочитаете Function1[T,Future[T]], возвращает Future[M[T]]
  • Я использую метод Future.apply для создания анонимного класса типа Future[T]

Есть много других причин обратить внимание на фьючерсы Akka.

  • Фьючерсы могут быть отображены, потому что они монады, т.е. вы можете связать выполнение фьючерсов в цепочку:

    Future { 3 }.map { _ * 2 }.map { _.toString }

  • Фьючерсы имеют обратный вызов: future.onComplete, onSuccess, onFailure, andThen и т. д.

  • Фьючерсы поддерживают не только обход, но и понимание

person Edmondo1984    schedule 20.07.2012

В зависимости от того, что вы делаете, это может быть так же просто, как

for(file<-files.par){
   //process the file
}
person Dave Griffith    schedule 20.07.2012
comment
Вот и я тоже сразу подумал. Рад видеть здесь ответ, ссылающийся на API параллельных коллекций. +1 - person Connor Doyle; 20.07.2012

В идеале вы должны использовать двух актеров. Один для чтения списка файлов, а другой для фактического чтения файла.

Вы запускаете процесс, просто отправляя одно «стартовое» сообщение первому действующему лицу. Затем актор может прочитать список файлов и отправить сообщение второму актору. Затем второй актор читает файл и обрабатывает его содержимое.

Наличие нескольких акторов, что может показаться сложным, на самом деле хорошо в том смысле, что у вас есть куча объектов, взаимодействующих друг с другом, как в теоретической объектно-ориентированной системе.

Редактировать: вы ДЕЙСТВИТЕЛЬНО не должны выполнять параллельное чтение одного файла.

person darth10    schedule 20.07.2012

Я собирался точно написать, что сделал @Edmondo1984, но он меня опередил. :) Я во многом поддерживаю его предложение. Я также предлагаю вам прочитать документацию для Akka 2.0.2. . Кроме того, я приведу вам немного более конкретный пример:

import akka.dispatch.{ExecutionContext, Future, Await}
import akka.util.duration._
import java.util.concurrent.Executors
import java.io.File

val execService = Executors.newCachedThreadPool()
implicit val execContext = ExecutionContext.fromExecutorService(execService)

val tmp = new File("/tmp/")
val files = tmp.listFiles()
val workers = files.map { f =>
  Future {
    f.getAbsolutePath()
  }
}.toSeq
val result = Future.sequence(workers)
result.onSuccess {
  case filenames =>
    filenames.foreach { fn =>
      println(fn)
    }
}

// Artificial just to make things work for the example
Thread.sleep(100)
execContext.shutdown()

Здесь я использую sequence вместо traverse, но разница будет зависеть от ваших потребностей.

Иди с Будущим, мой друг; Актер в данном случае является более болезненным подходом.


person Derek Wyatt    schedule 20.07.2012
comment
Есть ли какие-либо существенные изменения по сравнению с 2.0.1? - person Edmondo1984; 20.07.2012
comment
Не то, чтобы я знал об этом, но всегда смотреть на последние и самые лучшие вещи — это хорошо. - person Derek Wyatt; 20.07.2012
comment
Обратите внимание, что Futures теперь являются частью самой Scala std lib. Вышеприведенное работает REPL, если вы замените akka._ на import scala.concurrent._ - person Matthew Fellows; 06.10.2013

Но если использовать актеров, что в этом плохого?

Если нам нужно читать/писать в какой-то файл свойств. Вот мой пример Java. Но все же с Akka Actors.

Допустим, у нас есть актор ActorFile, представляющий один файл. Хм.. Возможно это не может представлять собой один файл. Правильно? (было бы неплохо, если бы это было возможно). Итак, тогда он представляет несколько файлов, таких как PropertyFilesActor, тогда:

Почему бы не использовать что-то вроде этого:

public class PropertyFilesActor extends UntypedActor {

    Map<String, String> filesContent = new LinkedHashMap<String, String>();

    { // here we should use real files of cource
        filesContent.put("file1.xml", "");
        filesContent.put("file2.xml", "");
    }

    @Override
    public void onReceive(Object message) throws Exception {

        if (message instanceof WriteMessage)  {
            WriteMessage writeMessage = (WriteMessage) message;
            String content = filesContent.get(writeMessage.fileName);
            String newContent = content + writeMessage.stringToWrite;
            filesContent.put(writeMessage.fileName, newContent);
        }

        else if (message instanceof ReadMessage) {
            ReadMessage readMessage = (ReadMessage) message;
            String currentContent = filesContent.get(readMessage.fileName);
            // Send the current content back to the sender
            getSender().tell(new ReadMessage(readMessage.fileName, currentContent), getSelf());
        }

        else unhandled(message);

    }

}

...сообщение будет отправлено с параметром (fileName)

У него есть собственный in-box, принимающий такие сообщения, как:

  1. WriteLine(имя файла, строка)
  2. ReadLine(имя файла, строка)

Эти сообщения будут сохраняться в in-box в порядке, одно за другим. Актер будет выполнять свою работу, получая сообщения из ящика - сохраняя/читая, и тем временем отправляя обратную связь sender ! message обратно.

Таким образом, скажем, если мы пишем в файл свойств и отправляем, показывая содержимое на веб-странице. Мы можем начать показывать страницу (сразу после того, как мы отправили сообщение для сохранения данных в файл) и, как только мы получили обратную связь, обновить часть страницы данными из только что обновленного файла (с помощью ajax).

person ses    schedule 15.05.2013

Ну так хватай свои файлы и втыкай их в параллельную структуру

scala> new java.io.File("/tmp").listFiles.par
res0: scala.collection.parallel.mutable.ParArray[java.io.File] = ParArray( ... )

Потом...

scala> res0 map (_.length)
res1: scala.collection.parallel.mutable.ParArray[Long] = ParArray(4943, 1960, 4208, 103266, 363 ... )
person oxbow_lakes    schedule 20.07.2012