Как прочитать огромный файл CSV в Mule

Я использую Mule Studio 3.4.0 Community Edition. У меня есть большая проблема с тем, как анализировать большой CSV-файл, поступающий с File Endpoint. Сценарий таков, что у меня есть 3 файла CSV, и я бы поместил содержимое файлов в базу данных. Но когда я пытаюсь загрузить огромный файл (около 144 МБ), я получаю исключение «OutOfMemory». В качестве решения я подумал о том, чтобы разделить/разделить мой большой CSV на CSV меньшего размера (я не знаю, является ли это решение лучшим), или попытаться найти способ обработки CSV без исключения.

<file:connector name="File" autoDelete="true" streaming="true" validateConnections="true" doc:name="File"/>

<flow name="CsvToFile" doc:name="CsvToFile">
        <file:inbound-endpoint path="src/main/resources/inbox" moveToDirectory="src/main/resources/processed"  responseTimeout="10000" doc:name="CSV" connector-ref="File">
            <file:filename-wildcard-filter pattern="*.csv" caseSensitive="true"/>
        </file:inbound-endpoint>
        <component class="it.aizoon.grpBuyer.AddMessageProperty" doc:name="Add Message Property"/>
        <choice doc:name="Choice">
            <when expression="INVOCATION:nome_file=azienda" evaluator="header">
                <jdbc-ee:csv-to-maps-transformer delimiter="," mappingFile="src/main/resources/companies-csv-format.xml" ignoreFirstRecord="true" doc:name="CSV2Azienda"/>
                <jdbc-ee:outbound-endpoint exchange-pattern="one-way" queryKey="InsertAziende" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database Azienda">
                    <jdbc-ee:query key="InsertAziende" value="INSERT INTO aw006_azienda VALUES (#[map-payload:AW006_ID], #[map-payload:AW006_ID_CLIENTE], #[map-payload:AW006_RAGIONE_SOCIALE])"/>
                </jdbc-ee:outbound-endpoint>
            </when>
            <when expression="INVOCATION:nome_file=servizi" evaluator="header">
                <jdbc-ee:csv-to-maps-transformer delimiter="," mappingFile="src/main/resources/services-csv-format.xml" ignoreFirstRecord="true" doc:name="CSV2Servizi"/>
                <jdbc-ee:outbound-endpoint exchange-pattern="one-way" queryKey="InsertServizi" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database Servizi">
                    <jdbc-ee:query key="InsertServizi" value="INSERT INTO ctrl_aemd_unb_servizi VALUES (#[map-payload:CTRL_ID_TIPO_OPERAZIONE], #[map-payload:CTRL_DESCRIZIONE], #[map-payload:CTRL_COD_SERVIZIO])"/>
                </jdbc-ee:outbound-endpoint>
            </when>
            <when expression="INVOCATION:nome_file=richiesta" evaluator="header">
                <jdbc-ee:csv-to-maps-transformer delimiter="," mappingFile="src/main/resources/requests-csv-format.xml" ignoreFirstRecord="true" doc:name="CSV2Richiesta"/>
                <jdbc-ee:outbound-endpoint exchange-pattern="one-way" queryKey="InsertRichieste" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database Richiesta">
                    <jdbc-ee:query key="InsertRichieste" value="INSERT INTO ctrl_aemd_unb_richiesta VALUES (#[map-payload:CTRL_ID_CONTROLLER], #[map-payload:CTRL_NUM_RICH_VENDITORE], #[map-payload:CTRL_VENDITORE], #[map-payload:CTRL_CANALE_VENDITORE], #[map-payload:CTRL_CODICE_SERVIZIO], #[map-payload:CTRL_STATO_AVANZ_SERVIZIO], #[map-payload:CTRL_DATA_INSERIMENTO])"/>
                </jdbc-ee:outbound-endpoint>
            </when>
        </choice>   
    </flow>

Пожалуйста, я не знаю, как решить эту проблему. Заранее спасибо за любую помощь


person Paride Letizia    schedule 06.05.2013    source источник


Ответы (2)


Как сказал SteveS, csv-to-maps-transformer может попытаться загрузить весь файл в память перед его обработкой. Что вы можете попробовать сделать, так это разделить CSV-файл на более мелкие части и отправить эти части VM для индивидуальной обработки. Сначала создайте компонент для выполнения этого первого шага:

public class CSVReader implements Callable{
    @Override
    public Object onCall(MuleEventContext eventContext) throws Exception {

        InputStream fileStream = (InputStream) eventContext.getMessage().getPayload();
        DataInputStream ds = new DataInputStream(fileStream);
        BufferedReader br = new BufferedReader(new InputStreamReader(ds));

        MuleClient muleClient = eventContext.getMuleContext().getClient();

        String line;
        while ((line = br.readLine()) != null) {
            muleClient.dispatch("vm://in", line, null);
        }

        fileStream.close();
        return null;
    }
}

Затем разделите основной поток на два

<file:connector name="File" 
    workDirectory="yourWorkDirPath" autoDelete="false" streaming="true"/>

<flow name="CsvToFile" doc:name="Split and dispatch">
    <file:inbound-endpoint path="inboxPath"
        moveToDirectory="processedPath" pollingFrequency="60000"
        doc:name="CSV" connector-ref="File">
        <file:filename-wildcard-filter pattern="*.csv"
            caseSensitive="true" />
    </file:inbound-endpoint>
    <component class="it.aizoon.grpBuyer.AddMessageProperty" doc:name="Add Message Property" />
    <component class="com.dgonza.CSVReader" doc:name="Split the file and dispatch every line to VM" />
</flow>

<flow name="storeInDatabase" doc:name="receive lines and store in database">
    <vm:inbound-endpoint exchange-pattern="one-way"
        path="in" doc:name="VM" />
    <Choice>
        .
        .
        Your JDBC Stuff
        .
        .
    <Choice />
</flow>

Сохраните текущую конфигурацию file-connector, чтобы включить потоковую передачу. С помощью этого решения данные csv можно обрабатывать без необходимости сначала загружать весь файл в память. ХТН

person Daniel    schedule 06.05.2013
comment
Большое спасибо SteveS и Daniel, я попробую это решение. - person Paride Letizia; 07.05.2013
comment
Привет, я пытался использовать вашу схему, но, ХОТЯ я могу вставить несколько сотен строк, в какой-то момент я получаю это сообщение: - person Paride Letizia; 07.05.2013
comment
INFO 2013-05-07 18:23:18,379 [[splitmultithread].FileSplitter.receiver.02] org.mule.transport.file.FileMessageReceiver: получена блокировка файла: C:\workspace_3.4\splitmultithread\src\main\ resources\inbox\richiesta.csv ОШИБКА 2013-05-07 18:24:00,144 [[splitmultithread].storeInDatabase.stage1.04] org.mule.processor.AsyncWorkListener: работа вызвала исключение для «workCompleted». Выполняемая работа: org.mule.processor.AsyncInterceptingMessageProcessor$AsyncMessageProcessorWorker@3bc752 - person Paride Letizia; 07.05.2013
comment
Какова ценность pollingFrequency для вашего file:inbound-endpoint? Попробуйте увеличить это значение. Возможно, эта конечная точка пытается прочитать файл, который все еще обрабатывается. - person Daniel; 07.05.2013
comment
Да, Даниил, это проблема. Знаете ли вы, как отключить опрос или, что лучше, как активировать конечную точку файла только тогда, когда в каталоге есть файл? А не каждые х секунд? - person Paride Letizia; 07.05.2013
comment
Вы не можете отключить опрос для file:inbound-endpoint. Чтобы избежать подобных проблем, настройте workDirectory в файле file:connector. Он переместит файл в другой каталог перед его обработкой, и мул больше не будет пытаться заблокировать его. Наконец, когда поток будет закрыт, файл будет перемещен в каталог, настроенный в file:inbound-endpoint - person Daniel; 07.05.2013
comment
Хорошо, вы можете опубликовать небольшой пример того, что вы говорите? Просто небольшой фрагмент кода. Спасибо, Даниил, большое - person Paride Letizia; 08.05.2013
comment
Просто делаю то, что вы предложили - person Paride Letizia; 08.05.2013
comment
Здорово! Обновлен мой ответ, добавив конфигурацию соединителя. Если это решило вашу проблему, отметьте ответ как принятый. - person Daniel; 08.05.2013
comment
Привет, Даниил, у меня другая проблема. Если я помещаю для обработки более одного файла csv, мул не обрабатывает один файл за раз... как я могу решить эту проблему? Эта проблема вызвана тем, что система не ждет, чтобы обработать один файл, а затем начинает с другого .... Я схожу с ума. Пожалуйста помогите. Ваше решение работает, если у меня есть только один CSV - person Paride Letizia; 08.05.2013
comment
Будет лучше, если вы опубликуете еще один вопрос с этой новой проблемой, объяснив ее более подробно. - person Daniel; 08.05.2013
comment
Я решил. Если вы хотите обрабатывать каждый файл за раз, вы должны добавить processingStrategy=synchronous в поток CSV для базы данных. - person Paride Letizia; 08.05.2013

Я считаю, что csv-to-maps-transformer заставит весь файл помещаться в память. Поскольку вы имеете дело с одним большим файлом, лично я бы просто написал класс Java для его обработки. Конечная точка File передаст файловый поток вашему пользовательскому преобразователю. Затем вы можете установить соединение JDBC и выбирать информацию построчно, не загружая весь файл. Я использовал OpenCSV для синтаксического анализа CSV. Таким образом, ваш класс Java будет содержать что-то вроде следующего:

protected Object doTransform(Object src, String enc) throws TransformerException {  

    try {
        //Make a JDBC connection here

        //Now read and parse the CSV

        FileReader csvFileData = (FileReader) src;


        BufferedReader br = new BufferedReader(csvFileData);
        CSVReader reader = new CSVReader(br);

        //Read the CSV file and add the row to the appropriate List(s)
        String[] nextLine;
        while ((nextLine = reader.readNext()) != null) {
            //Push your data into the database through your JDBC connection
        }
        //Close connection.

               }catch (Exception e){
    }
person SteveS    schedule 06.05.2013
comment
Прежде всего, я хотел бы поблагодарить вас за помощь, как SteveS, так и Дэниела. Я попробую ваше решение, надеюсь, вы будете здесь, если у меня возникнут проблемы. Спасибо - person Paride Letizia; 07.05.2013