Поток данных для программного слива конвейера

Пробовал слить пайплайн программно. Первая часть кода запускает конвейер, который вызывается с помощью отдельного потока. Затем программа некоторое время спит, а затем пытается опустошить конвейер. Я пытался запустить в Dataflow, и это не сработало. Конвейер запускается, но затем оставшаяся часть кода слива выглядит так, будто никогда не выполнялась. Пожалуйста, дайте мне знать, если это возможно.

Я попытался просмотреть ведение журнала, чтобы увидеть, какая часть программы выполняется, но похоже, что Dataflow будет регистрировать только рабочие журналы, поэтому не мог видеть многого до того, где он выполнялся. Я считаю, что код после запуска конвейера не выполняется.

DataflowRunner runner = DataflowRunner.fromOptions(options);
           DataflowPipelineJob pp = null; 
        // to run the pipeline which calls pipeline.run  
    new Thread(() -> runMethod(pp, runner, options)).start();   

        //Draining below
        try {
            Thread.sleep(360000);           
            GoogleCredential credential;
            credential = GoogleCredential.getApplicationDefault();
               if (credential.createScopedRequired()) {
                   credential = credential.createScoped(Collections.singletonList("https://www.googleapis.com/auth/cloud-platform"));
               }
               HttpTransport httpTransport;        
               httpTransport = GoogleNetHttpTransport.newTrustedTransport();
               JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
                  Dataflow client = new Dataflow.Builder(httpTransport, jsonFactory, credential)
                          .setApplicationName("Google Cloud Platform Sample 1")
                          .build(); 
               Job content = new Job();
               content.setProjectId("sample-id");
               content.setId(pp.getJobId());
               content.setRequestedState("JOB_STATE_DRAINING");
               client.projects().jobs()
                       .update("sample-id", pp.getJobId(), content)
                       .execute();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }  catch (GeneralSecurityException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
           }

person Roshan Fernando    schedule 25.08.2018    source источник


Ответы (1)


Конвейер будет истощаться до тех пор, пока в данных о рейсах есть . . Он прекратит прием данных, и когда все данные будут обработаны, задание остановится.

1- откуда вы получаете данные 2- все данные могут быть обработаны во время сна

person Nathan Nasser    schedule 28.08.2018
comment
Конвейер считывает неограниченный источник данных. Таким образом, работа выполняется бесконечно. Вопрос вот в чем слив не происходит, когда пайплайн запускается, то ждет какое-то время, а потом пытается вызвать слив. Итак, проверяем, действительно ли работает в одном и том же пайплайне запуск и последующий слив. Общий код выше. - person Roshan Fernando; 29.08.2018
comment
Чтобы убедиться, что вы используете правильный идентификатор проекта, вы должны использовать: content.setProjectId(options.getProject()); и .update(options.getProject(), pp.getJobId(), контент) - person Nathan Nasser; 30.08.2018
comment
Спасибо, попробую так - person Roshan Fernando; 30.08.2018