Разархивируйте файл в потоке данных перед чтением

Наш клиент загружает файлы в GCS, но они заархивированы. Есть ли способ, используя SDK Java Dataflow, в котором мы можем просмотреть все заархивированные файлы, разархивировать файл, объединить все полученные файлы .csv в один файл, а затем выполнить только преобразования TextIO?

ИЗМЕНИТЬ

Чтобы ответить на вопросы jkffs,

  1. Ну, на самом деле мне не нужно объединять их все в один файл, просто это было бы намного проще с точки зрения чтения.
  2. Это ZIP-файлы, а не GZ или BZ или что-то еще. Каждый ZIP содержит несколько файлов. Имена файлов не имеют большого значения, и да, я бы предпочел, чтобы TextIO прозрачно распаковывал и объединял все файлы для каждого архива.

Надеюсь, это поможет!


person iLikeBreakfast    schedule 06.10.2015    source источник
comment
Не могли бы вы объяснить, почему вы хотите объединить все файлы .csv в один перед их дальнейшей обработкой — вам нужно обрабатывать файл последовательно, а не параллельно? Есть ли после этого какая-либо распараллеливаемая обработка?   -  person jkff    schedule 06.10.2015
comment
Кроме того, можете ли вы рассказать больше о ваших zip-файлах? Это действительно .zip или .gz/.bz2? (В настоящее время TextIO поддерживает сжатие gzip и bzip2 — cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/ , однако не .zip-файлы) Если это .zip, много ли файлов внутри каждого .zip-файла или только один? Имеют ли значение имена файлов внутри архива? Например. если бы TextIO прозрачно распаковал и объединил все файлы в zip-архиве, сработало бы это для вас?   -  person jkff    schedule 06.10.2015
comment
@jkff Я обновил вопрос, добавив несколько ответов!   -  person iLikeBreakfast    schedule 10.10.2015


Ответы (2)


потому что у меня была такая же проблема, и я пришел к этому 1-летнему и довольно неполному решению. Вот полный пример того, как распаковать файлы в потоке данных Google:

public class SimpleUnzip {

private static final Logger LOG = LoggerFactory.getLogger(SimpleUnzip.class);

public static void main(String[] args){
    Pipeline p = Pipeline.create(
            PipelineOptionsFactory.fromArgs(args).withValidation().create());

    GcsUtilFactory factory = new GcsUtilFactory();
    GcsUtil util = factory.create(p.getOptions());
    try{
        List<GcsPath> gcsPaths = util.expand(GcsPath.fromUri("gs://tlogdataflow/test/*.zip"));
        List<String> paths = new ArrayList<String>();

        for(GcsPath gcsp: gcsPaths){
            paths.add(gcsp.toString());
        }
        p.apply(Create.of(paths))
            .apply(ParDo.of(new UnzipFN()));
        p.run();

        }
    catch(Exception e){
        LOG.error(e.getMessage());
        }


}

public static class UnzipFN extends DoFn<String,Long>{
    private static final long serialVersionUID = 2015166770614756341L;
    private long filesUnzipped=0;
    @Override
    public void processElement(ProcessContext c){
        String p = c.element();
        GcsUtilFactory factory = new GcsUtilFactory();
        GcsUtil u = factory.create(c.getPipelineOptions());
        byte[] buffer = new byte[100000000];
        try{
            SeekableByteChannel sek = u.open(GcsPath.fromUri(p));
            InputStream is = Channels.newInputStream(sek);
            BufferedInputStream bis = new BufferedInputStream(is);
            ZipInputStream zis = new ZipInputStream(bis);
            ZipEntry ze = zis.getNextEntry();
            while(ze!=null){
                LOG.info("Unzipping File {}",ze.getName());
                WritableByteChannel wri = u.create(GcsPath.fromUri("gs://tlogdataflow/test/" + ze.getName()), getType(ze.getName()));
                OutputStream os = Channels.newOutputStream(wri);
                int len;
                while((len=zis.read(buffer))>0){
                    os.write(buffer,0,len);
                }
                os.close();
                filesUnzipped++;
                ze=zis.getNextEntry();

            }
            zis.closeEntry();
            zis.close();

        }
        catch(Exception e){
            e.printStackTrace();
        }
    c.output(filesUnzipped);
    }

    private String getType(String fName){
        if(fName.endsWith(".zip")){
            return "application/x-zip-compressed";
        }
        else {
            return "text/plain";
        }
    }
}

}

person bigdataclown    schedule 11.11.2016

Dataflow/Apache Beam поддерживает ZIP-сжатые файлы в TextIO автоматически: TextIO.read().from(filepattern) автоматически распаковывает файлы, соответствующие шаблону файла в соответствии с их расширением, а .zip является одним из поддерживаемых форматов — в этом случае он неявно объединяет все файлы внутри .zip в один файл и анализировать строки текста из него.

Вы также можете явно указать тип сжатия, используя TextIO.read().from(filepattern).withCompressionType(...), если файлы не имеют расширения.

person jkff    schedule 14.10.2015
comment
Стоит отметить, что это только для библиотеки Java. На данный момент библиотека Python по-прежнему поддерживает только BZIP и GZIP2 (beam.apache.org/releases/pydoc/2.3.0/_modules/apache_beam/io/), и я не могу найти на github ни одного запроса на включение, связанного с добавлением поддержки. - person George S; 12.03.2020