spark ssc.textFileStream не передает файлы из каталога

Я пытаюсь выполнить приведенный ниже код, используя eclipse (с maven conf) с 2 рабочими, и каждый из них имеет 2 ядра, или также пробовал с помощью spark-submit.

public class StreamingWorkCount implements Serializable {

    public static void main(String[] args) {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
        JavaStreamingContext jssc = new JavaStreamingContext(
                "spark://192.168.1.19:7077", "JavaWordCount",
                new Duration(1000));
        JavaDStream<String> trainingData = jssc.textFileStream(
                "/home/bdi-user/kaushal-drive/spark/data/training").cache();
        trainingData.foreach(new Function<JavaRDD<String>, Void>() {

            public Void call(JavaRDD<String> rdd) throws Exception {
                List<String> output = rdd.collect();
                System.out.println("Sentences Collected from files " + output);
                return null;
            }
        });

        trainingData.print();
        jssc.start();
        jssc.awaitTermination();
    }
}

И журнал этого кода

15/01/22 21:57:13 INFO FileInputDStream: New files at time 1421944033000 ms:

15/01/22 21:57:13 INFO JobScheduler: Added jobs for time 1421944033000 ms
15/01/22 21:57:13 INFO JobScheduler: Starting job streaming job 1421944033000 ms.0 from job set of time 1421944033000 ms
15/01/22 21:57:13 INFO SparkContext: Starting job: foreach at StreamingKMean.java:33
15/01/22 21:57:13 INFO DAGScheduler: Job 3 finished: foreach at StreamingKMean.java:33, took 0.000094 s
Sentences Collected from files []
-------------------------------------------
15/01/22 21:57:13 INFO JobScheduler: Finished job streaming job 1421944033000 ms.0 from job set of time 1421944033000 ms
Time: 1421944033000 ms
-------------------------------------------15/01/22 21:57:13 INFO JobScheduler: Starting job streaming job 1421944033000 ms.1 from job set of time 1421944033000 ms


15/01/22 21:57:13 INFO JobScheduler: Finished job streaming job 1421944033000 ms.1 from job set of time 1421944033000 ms
15/01/22 21:57:13 INFO JobScheduler: Total delay: 0.028 s for time 1421944033000 ms (execution: 0.013 s)
15/01/22 21:57:13 INFO MappedRDD: Removing RDD 5 from persistence list
15/01/22 21:57:13 INFO BlockManager: Removing RDD 5
15/01/22 21:57:13 INFO FileInputDStream: Cleared 0 old files that were older than 1421943973000 ms: 
15/01/22 21:57:13 INFO FileInputDStream: Cleared 0 old files that were older than 1421943973000 ms: 
15/01/22 21:57:13 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()

Проблема в том, что я не получаю данные из файла, который находится в каталоге. Помогите пожалуйста мне.


person Kaushal    schedule 22.01.2015    source источник
comment
Столкнулся с точно такой же проблемой на машине с Windows. Пожалуйста, предложите   -  person Gaurav Khare    schedule 08.09.2016
comment
Я думаю, что это работает только в HDFS, а не в локальной файловой системе.   -  person Gaurav Khare    schedule 08.09.2016
comment
см. также этот вопрос (и ответ): stackoverflow.com/questions/33704326/spark- проблема с потоковой передачей файлов   -  person Oliver Hummel    schedule 02.12.2019


Ответы (7)


Попробуйте это с другим каталогом, а затем скопируйте эти файлы в этот каталог во время выполнения задания.

person pzecevic    schedule 22.01.2015
comment
да, я также пробовал это с другим каталогом. я не понял, в чем проблема и как отлаживать, даже то, что не отображается в журнале. - person Kaushal; 22.01.2015
comment
Но был ли каталог пустым, когда вы начали работу? - person pzecevic; 22.01.2015
comment
На самом деле некоторые файлы уже есть, и я также копирую некоторые файлы, когда начинаю свою работу. - person Kaushal; 22.01.2015
comment
Это каталог HDFS или локальный? - person pzecevic; 22.01.2015
comment
Это должна быть HDFS, и вы должны копировать файлы в нее во время работы программы. - person pzecevic; 22.01.2015
comment
Вы имеете в виду, что каталог должен быть пустым при запуске, а когда я начинаю свою работу, он копируется в hdfs. - person Kaushal; 22.01.2015
comment
да, @pzecevic, ты прав. Spark обрабатывал только те файлы, которые были скопированы в HDFS после выполнения задания, он не читал предыдущие файлы, находящиеся в каталоге. - person Kaushal; 23.01.2015

была такая же проблема. Вот мой код:

lines = jssc.textFileStream("file:///Users/projects/spark/test/data');

TextFileSTream очень чувствителен; в итоге я сделал следующее:

1. Run Spark program
2. touch datafile
3. mv datafile datafile2
4. mv datafile2  /Users/projects/spark/test/data

и это сделало это.

person matthieu lieber    schedule 09.10.2015
comment
Я использую строки Windows = jssc.textFileStream(file:///c:/data'); lines.foreachRDD(file=›{file.foreach(fc=›{println(fc)})}) Я не получаю вывод. как это решить? - person Gnana; 26.03.2018

Я думаю, вам нужно добавить схему, т.е. file:// или hdfs:// перед вашим путем.


Отмена редактирования моего комментария, потому что: На самом деле file:// и hdfs:// нужно добавить "перед" путем, поэтому общий путь становится file:///tmp/file.txt или hdfs:///user/data. Если в конфигурации не задан NameNode, последний должен быть hdfs://host:port/user/data.

person tgpfeiffer    schedule 23.01.2015
comment
используя HDFS, это работает, но когда я использую локальную файловую систему с префиксом «file:///» (spark не поддерживает file://), она не работает. - person Kaushal; 23.01.2015
comment
Это может быть связано с тем, что вы используете кластер, и указанный путь должен быть доступен для всех исполнителей Spark, т. е. недостаточно, чтобы драйвер Spark мог получить к нему доступ. - person tgpfeiffer; 26.01.2015

JavaDoc предлагает функцию только потоковой передачи новых файлов.

Ссылка: https://spark.apache.org/docs/1.0.1/api/java/org/apache/spark/streaming/api/java/JavaStreamingContext.html#textFileStream(java.lang.String)

Создайте поток ввода, который отслеживает файловую систему, совместимую с Hadoop, на наличие новых файлов и считывает их как текстовые файлы (используя ключ как LongWritable, значение как Text и формат ввода как TextInputFormat). Файлы должны быть записаны в отслеживаемый каталог путем «перемещения» их из другого места в той же файловой системе. Имена файлов, начинающиеся с . игнорируются.

person Anil G    schedule 25.04.2016

textFileStream может отслеживать папку только тогда, когда файлы в папке добавляются или обновляются.

Если вы просто хотите читать файлы, вы можете использовать SparkContext.textFile.

person wzktravel    schedule 29.11.2016

Вы должны принять во внимание, что Spark Streaming будет читать только новые файлы в каталоге, а не обновленные (если они находятся в каталоге), а также все они должны иметь одинаковый формат.

Источник

person froblesmartin    schedule 11.03.2017

Я часами ломал голову, и что сработало для меня, так это

person Adelin    schedule 29.01.2021