Платформа сервера Java для прослушивания операторов PostgreSQL NOTIFY

Мне нужно написать сервер, который прослушивает операторы PostgreSQL NOTIFY и рассматривает каждое уведомление как запрос на обслуживание (на самом деле это больше похоже на задачу для обработки). Мои основные требования:

1) Механизм опроса PGConnection (в идеале это должен быть прослушиватель, но в реализации PgJDBC нам необходимо опрашивать ожидающие уведомления. Справочник)

2) Выполнить обратный вызов на основе «запроса» (используя имя канала в уведомлении NOTIFY) в отдельном потоке.

3) Имеет встроенный механизм управления потоками (создание/удаление потоков, когда задача обрабатывается/завершается, ставится в очередь, когда одновременно обрабатывается слишком много задач и т. д.)

Требования 1 и 2 — это то, что мне легко реализовать самому. Но я бы предпочел не писать управление потоками самостоятельно.

Существует ли существующая структура, отвечающая этим требованиям? Дополнительным преимуществом было бы то, что фреймворк автоматически генерирует статистику запросов.


person Aman    schedule 26.12.2011    source источник


Ответы (1)


Честно говоря, требование 3, вероятно, можно было бы легко удовлетворить, просто используя стандартные реализации ExecutorService от Executors, что позволит вам, например, получить пул потоков фиксированного размера и отправить им работу в виде реализаций Runnable или Callable. Они будут иметь дело с кровавыми деталями создания потоков до предела и т. Д. Затем вы можете попросить своего слушателя реализовать тонкий слой Runnable для сбора статистики и т. д.

Что-то типа:

private final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
private final NotificationCallback callback;
private int waiting, executing, succeeded, failed;

public void pollAndDispatch() {
   Notification notification;
   while ((notification = pollDatabase()) != null) {
      final Notification ourNotification = notification;
      incrementWaitingCount();
      threadPool.submit(new Runnable() {
         public void run() {
           waitingToExecuting();
           try {
             callback.processNotification(ourNotification);
             executionCompleted();
           } catch (Exception e) {
             executionFailed();
             LOG.error("Exeception thrown while processing notification: " + ourNotification, e);
           }
         }
      });
   }
}
// check PGconn for notification and return it, or null if none received
protected Notification pollDatabase() { ... }
// maintain statistics
private synchronized void incrementWaitingCount() { ++waiting; }
private synchronized void waitingToExecuting() { --waiting; ++executing; }
private synchronized void executionCompleted() { --executing; ++succeeded; }
private synchronized void executionFailed() { --executing; ++failed; }

Если вы хотите проявить изюминку, поместите уведомления в очередь JMS и используйте ее инфраструктуру для прослушивания новых элементов и их обработки.

person araqnid    schedule 27.12.2011
comment
Я огляделся, и написать это самому показалось хорошей идеей. Спасибо за ответ, и фрагмент кода был действительно полезен. - person Aman; 29.12.2011