NiFi 1.1.1 протестирован как на Windows 7, так и на RHEL 7.
Фоновый поток находится здесь.
Я создал службу контроллера DBCPConnectionPool, указывающую на базу данных SQL Server, я могу извлекать данные из таблицы и записывать их на локальный диск (ExecuteSQL -> ConvertAvroToJSON -> PutFile).
Мой код:
public byte[] getMaxLSN(Connection connection, String containerDB) {
String dbMaxLSN = "{? = CALL sys.fn_cdc_get_max_lsn()}";
byte[] maxLSN = null;
try (final CallableStatement cstmt = connection.prepareCall(dbMaxLSN);) {
cstmt.registerOutParameter(1, java.sql.JDBCType.BINARY);
cstmt.execute();
if (cstmt.getBytes(1) == null || cstmt.getBytes(1).length <= 0) {
System.out.println("Coudln't retrieve the max lsn for the db "
+ containerDB);
} else {
maxLSN = cstmt.getBytes(1);
}
} catch (SQLException sqlException) {
System.out.println("sqlException !!!");
sqlException.printStackTrace();
}
return maxLSN;
}
Проблема возникает, когда я использую пул как свойство в своем пользовательском процессоре. В коде процессора мне нужно вызвать функцию в базе данных, но это приводит к исключению SQLException, указывающему на драйвер JDBC. Обратите внимание, что тот же драйвер правильно работает в автономном коде Java (предоставляется в фоновом потоке, чтобы не загромождать этот пост), и я получаю возвращаемое значение из функции. Я подозреваю, что служба контроллера настроена неправильно — она может выполнять запросы на выборку, но когда код вызывает функцию, она генерирует исключение. Что мне не хватает?
Process or SQL exception in <configure logger template to pick the code location>
2017-03-17 09:25:30,717 ERROR [Timer-Driven Process Thread-6] c.s.d.processors.SQLServerCDCProcessor
org.apache.nifi.processor.exception.ProcessException: Coudln't retrieve the max lsn for the db test
at com.datalake.processors.SQLServerCDCProcessor$SQLServerCDCUtils.getMaxLSN(SQLServerCDCProcessor.java:692) ~[nifi-NiFiCDCPoC-processors-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
at com.datalake.processors.SQLServerCDCProcessor.getChangedTableQueries(SQLServerCDCProcessor.java:602) ~[nifi-NiFiCDCPoC-processors-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
at com.datalake.processors.SQLServerCDCProcessor.onTrigger(SQLServerCDCProcessor.java:249) ~[nifi-NiFiCDCPoC-processors-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) [nifi-api-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1099) [nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.1.1.jar:1.1.1]
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.1.1.jar:1.1.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_71]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_71]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_71]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_71]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_71]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_71]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_71]
Caused by: java.sql.SQLFeatureNotSupportedException: registerOutParameter not implemented
at java.sql.CallableStatement.registerOutParameter(CallableStatement.java:2613) ~[na:1.8.0_71]
at com.datalake.processors.SQLServerCDCProcessor$SQLServerCDCUtils.getMaxLSN(SQLServerCDCProcessor.java:677) ~[nifi-NiFiCDCPoC-processors-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
... 14 common frames omitted