Обработка событий и параллелизм Npgsql

Я хотел бы заменить существующую реализацию событий на такую, которая может обрабатывать события по мере их поступления, при необходимости одновременно. К сожалению, я никогда раньше не реализовывал какой-либо параллелизм, но нужно же с чего-то начинать, а?

Я читал об Функциональном реактивном программировании. Несмотря на то, что примеры выглядят относительно просто, я не понимаю, как применять эти примеры в своем приложении. На самом деле, я даже не уверен, является ли это наиболее подходящим способом достижения параллелизма здесь.

Я также каким-то образом смог бы использовать библиотеку параллельных задач (TPL), поскольку она предлагает управление потоками, и тогда мне не нужно слишком беспокоиться о порождении слишком большого количества потоков. К сожалению, примеры в приведенной выше ссылке не содержат примеров, использующих TPL.

Мой текущий код ниже. Я все еще новичок в программировании, и мне понадобится рука, чтобы пройти. Извиняюсь за беспокойство. :(

open System
open System.Threading
open Npgsql

// This application is a Windows service. PostgreSQL sends a notice whenever a new row has been added to a table. 

// The next function processes new rows, aka tasks.
let private processTask () =
    EventLog.writeEventLog "Information" "Received new task notification."
    // Task processing yet to be implemented.

// A connection to PostgreSQL that stays open while service is running. Receives notifications.
let newNotifyConnection (host : string, username : string, password : string, database : string) = 
    let connectionString = sprintf "Host=%s;Username=%s;Password=%s;Database=%s;ContinuousProcessing=true;Keepalive=120;CommandTimeout=0" host username password database
    new NpgsqlConnection(connectionString)

let private notifyConnection = newNotifyConnection Settings.npgsqlConnection
let private listen = new NpgsqlCommand("LISTEN newtask", notifyConnection)

// Event for receiving and processing notifications.
let private onNotification (sender : obj) (e : NpgsqlNotificationEventArgs) =
    processTask()

let private notificationEventHandler = new NotificationEventHandler(onNotification)

// Run this function when service starts.
let startWorker () =
    notifyConnection.Notification.AddHandler(notificationEventHandler)
    notifyConnection.Open()
    listen.ExecuteNonQuery() |> ignore

// Run this function when service stops.
let stopWorker () =
    listen.Dispose()
    notifyConnection.Dispose()

person KeeperB5    schedule 22.09.2015    source источник


Ответы (1)


Если вы хотите пойти по пути функционального программирования сначала с F # и RX, я рекомендую установить FSharp.Control.Reactive, доступный в Nuget. Это значительно упростит работу с RX, включая добавление дополнительных методов в модуль Observable (F# по умолчанию содержит здесь только подмножество).

Для меня ключевой метод в вашем коде приведен ниже, поскольку именно он связывает обратный вызов с источником события.

let startWorker () =
    notifyConnection.Notification.AddHandler(notificationEventHandler)
    notifyConnection.Open()
    listen.ExecuteNonQuery() |> ignore

Хитрость заключается в том, чтобы дать вашему объекту notifyConnection объект обратного вызова, который RX предоставляет вам, так сказать, соединяя миры между библиотекой Npgsql и инфраструктурой RX.

Что-то типа:

open System
open System.Reactive.Linq

let observableCreate subscriptionFunction = Observable.Create(new Func<_, IDisposable>(subscriptionFunction))

let sourceObservable = observableCreate  (fun observer -> 
    // Your notifyConnection object will now direct all events to the observer provided
    notifyConnection.Notification.AddHandler(observer.OnNext)
    notifyConnection.Open()
    listen.ExecuteNonQuery() |> ignore
    // RX includes the CompositeDisposable class which you can also use
    { new IDisposable with 
          x.Dispose() =
              listen.Dispose()
              notifyConnection.Dispose() })

use subscription = sourceObservable.Subscribe(notifyConnectionHandler)

Обратите внимание, что пакет FSharp.Control.Reactive Nuget предоставляет функцию Observable.create. (EDIT: было указано, что пакет Reactive не имеет оболочки для Observable.Create, поэтому я определил это выше себя).

Вышеприведенное будет (при подписке - с использованием метода Subscribe()): - Открыть соединение - Выполнить запрос - Создать одноразовый объект для использования при удалении подписки.

Что более интересно, так это то, что вы можете связать наблюдаемое и скомпоновать его с другими, используя методы модуля Observable, позволяющие создавать реактивные программы. Вероятно, было бы разумно прочитать RX, если вы заинтересованы в работе с IEvent/IObservable в F#.

person akara    schedule 23.09.2015
comment
Я постараюсь изучить Rx в течение следующей недели или около того. Ваш практический пример выглядит очень удобным для новичка вроде меня, большое спасибо. :) - person KeeperB5; 24.09.2015
comment
Я установил пакет FSharp.Control.Reactive, но в Observable нет «создать». - person KeeperB5; 24.09.2015
comment
Ваше право - глядя на код GitHub, это, вероятно, один из немногих методов, не обернутых библиотекой. Однако определить его относительно легко. Отредактирую свой пост с определением. - person akara; 24.09.2015