Разархивирайте файла в Dataflow преди четене

Клиентът ни качва файлове в GCS, но те са компресирани. Има ли някакъв начин, използвайки Java Dataflow SDK, в който можем да преминем през всички компресирани файлове, да разархивираме файла, да комбинираме всички получени .csv файлове в един файл и след това да направим само TextIO трансформациите?

РЕДАКТИРАНЕ

За да отговорите на въпросите на jkffs,

  1. Ами всъщност не трябва да ги комбинирам всички в един файл, просто би било много по-лесно от гледна точка на четене.
  2. Те са ZIP файлове, а не GZ или BZ или нещо друго. Всеки ZIP съдържа множество файлове. Имената на файловете не са наистина значими и да, всъщност бих предпочел TextIO прозрачно да декомпресира и обединява всички файлове на базата на архив.

Надявам се това да помогне!


person iLikeBreakfast    schedule 06.10.2015    source източник
comment
Бихте ли обяснили защо искате да комбинирате всички .csv файлове в един, преди да ги обработите допълнително - трябва ли да обработвате файла последователно, а не паралелно? Има ли паралелна обработка след това?   -  person jkff    schedule 06.10.2015
comment
Също така, можете ли да кажете повече за вашите zip файлове? Наистина ли са .zip или .gz/.bz2? (TextIO в момента поддържа gzip и bzip2 компресия - cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/ обаче не .zip файлове) Ако е .zip, има ли много файлове във всеки .zip файл или само един? Значими ли са имената на файловете в архива? напр. ако TextIO прозрачно декомпресира и свърже всички файлове в zip архива, това ще свърши ли работа за вас?   -  person jkff    schedule 06.10.2015
comment
@jkff Актуализирах въпроса с някои отговори!   -  person iLikeBreakfast    schedule 10.10.2015


Отговори (2)


защото имах същия проблем и стигнах само до това 1-годишно и доста непълно решение. Ето пълен пример за това как да разархивирате файлове в google dataflow:

public class SimpleUnzip {

private static final Logger LOG = LoggerFactory.getLogger(SimpleUnzip.class);

public static void main(String[] args){
    Pipeline p = Pipeline.create(
            PipelineOptionsFactory.fromArgs(args).withValidation().create());

    GcsUtilFactory factory = new GcsUtilFactory();
    GcsUtil util = factory.create(p.getOptions());
    try{
        List<GcsPath> gcsPaths = util.expand(GcsPath.fromUri("gs://tlogdataflow/test/*.zip"));
        List<String> paths = new ArrayList<String>();

        for(GcsPath gcsp: gcsPaths){
            paths.add(gcsp.toString());
        }
        p.apply(Create.of(paths))
            .apply(ParDo.of(new UnzipFN()));
        p.run();

        }
    catch(Exception e){
        LOG.error(e.getMessage());
        }


}

public static class UnzipFN extends DoFn<String,Long>{
    private static final long serialVersionUID = 2015166770614756341L;
    private long filesUnzipped=0;
    @Override
    public void processElement(ProcessContext c){
        String p = c.element();
        GcsUtilFactory factory = new GcsUtilFactory();
        GcsUtil u = factory.create(c.getPipelineOptions());
        byte[] buffer = new byte[100000000];
        try{
            SeekableByteChannel sek = u.open(GcsPath.fromUri(p));
            InputStream is = Channels.newInputStream(sek);
            BufferedInputStream bis = new BufferedInputStream(is);
            ZipInputStream zis = new ZipInputStream(bis);
            ZipEntry ze = zis.getNextEntry();
            while(ze!=null){
                LOG.info("Unzipping File {}",ze.getName());
                WritableByteChannel wri = u.create(GcsPath.fromUri("gs://tlogdataflow/test/" + ze.getName()), getType(ze.getName()));
                OutputStream os = Channels.newOutputStream(wri);
                int len;
                while((len=zis.read(buffer))>0){
                    os.write(buffer,0,len);
                }
                os.close();
                filesUnzipped++;
                ze=zis.getNextEntry();

            }
            zis.closeEntry();
            zis.close();

        }
        catch(Exception e){
            e.printStackTrace();
        }
    c.output(filesUnzipped);
    }

    private String getType(String fName){
        if(fName.endsWith(".zip")){
            return "application/x-zip-compressed";
        }
        else {
            return "text/plain";
        }
    }
}

}

person bigdataclown    schedule 11.11.2016

Dataflow / Apache Beam поддържа ZIP-компресирани файлове в TextIO автоматично: TextIO.read().from(filepattern) автоматично ще декомпресира файлове, съответстващи на файловия шаблон според тяхното разширение, а .zip е един от поддържаните формати - в този случай той имплицитно ще свърже всички файлове в .zip в един файл и анализирайте редове текст от него.

Можете също така изрично да посочите типа компресия, като използвате TextIO.read().from(filepattern).withCompressionType(...), ако файловете нямат разширение.

person jkff    schedule 14.10.2015
comment
Струва си да се отбележи, че това е само за библиотеката на Java. Засега библиотеката на Python все още поддържа само BZIP и GZIP2 (beam.apache.org/releases/pydoc/2.3.0/_modules/apache_beam/io/) и не мога да намеря никакви заявки за изтегляне в github, свързани с добавянето на поддръжка. - person George S; 12.03.2020