Npgsql обработка на събития и паралелност

Бих искал да заменя съществуващата си реализация на събития с такава, която може да обработва събития, когато постъпват, едновременно, ако е необходимо. За съжаление никога преди не съм прилагал някакъв вид паралелност, но трябва да започна отнякъде, нали?

Четох за Функционално реактивно програмиране. Въпреки сравнително простите изглеждащи примери, аз съм объркан как трябва да приложа тези примери в моето приложение. Всъщност дори не съм сигурен дали това е най-подходящият начин за постигане на едновременност тук.

Също така по някакъв начин бих могъл да използвам Task Parallel Library (TPL), тъй като предлага управление на нишки, тогава не е нужно да се тревожа твърде много за раждането на твърде много нишки. За съжаление примерите в горната връзка не съдържат такива, които използват TPL.

Текущият ми код е по-долу. Все още съм начинаещ програмист и ще имам нужда от малко държане за ръка, за да премина. Извинения за неприятностите. :(

open System
open System.Threading
open Npgsql

// This application is a Windows service. PostgreSQL sends a notice whenever a new row has been added to a table. 

// The next function processes new rows, aka tasks.
let private processTask () =
    EventLog.writeEventLog "Information" "Received new task notification."
    // Task processing yet to be implemented.

// A connection to PostgreSQL that stays open while service is running. Receives notifications.
let newNotifyConnection (host : string, username : string, password : string, database : string) = 
    let connectionString = sprintf "Host=%s;Username=%s;Password=%s;Database=%s;ContinuousProcessing=true;Keepalive=120;CommandTimeout=0" host username password database
    new NpgsqlConnection(connectionString)

let private notifyConnection = newNotifyConnection Settings.npgsqlConnection
let private listen = new NpgsqlCommand("LISTEN newtask", notifyConnection)

// Event for receiving and processing notifications.
let private onNotification (sender : obj) (e : NpgsqlNotificationEventArgs) =
    processTask()

let private notificationEventHandler = new NotificationEventHandler(onNotification)

// Run this function when service starts.
let startWorker () =
    notifyConnection.Notification.AddHandler(notificationEventHandler)
    notifyConnection.Open()
    listen.ExecuteNonQuery() |> ignore

// Run this function when service stops.
let stopWorker () =
    listen.Dispose()
    notifyConnection.Dispose()

person KeeperB5    schedule 22.09.2015    source източник


Отговори (1)


Ако желаете да тръгнете по пътя на функционалното програмиране първо с F# и RX, препоръчвам да инсталирате FSharp.Control.Reactive, наличен от Nuget. Това ще направи работата с RX много по-лесна, включително добавяне на повече методи към модула Observable (F# по подразбиране съдържа само подмножество тук).

За мен ключовият метод във вашия код е по-долу, тъй като това е, което свързва обратното извикване с източника на събитието.

let startWorker () =
    notifyConnection.Notification.AddHandler(notificationEventHandler)
    notifyConnection.Open()
    listen.ExecuteNonQuery() |> ignore

Номерът е да дадете на вашия обект notifyConnection обекта за обратно извикване, който RX ви предоставя, свързвайки световете между библиотеката Npgsql и RX рамката, така да се каже.

Нещо като:

open System
open System.Reactive.Linq

let observableCreate subscriptionFunction = Observable.Create(new Func<_, IDisposable>(subscriptionFunction))

let sourceObservable = observableCreate  (fun observer -> 
    // Your notifyConnection object will now direct all events to the observer provided
    notifyConnection.Notification.AddHandler(observer.OnNext)
    notifyConnection.Open()
    listen.ExecuteNonQuery() |> ignore
    // RX includes the CompositeDisposable class which you can also use
    { new IDisposable with 
          x.Dispose() =
              listen.Dispose()
              notifyConnection.Dispose() })

use subscription = sourceObservable.Subscribe(notifyConnectionHandler)

Обърнете внимание, че пакетът FSharp.Control.Reactive Nuget предоставя функцията Observable.create. (РЕДАКТИРАНЕ: Беше посочено, че пакетът Reactive няма обвивка за Observable.Create, така че аз го дефинирах по-горе).

Горното ще (при абонамент - с помощта на метода Subscribe()): - Отваряне на връзката - Изпълнение на заявката - Създаване на еднократна употреба, която да се използва, когато абонаментът бъде изхвърлен.

По-интересното е, че можете да свържете наблюдаемото и да го композирате с други, като използвате методите в модула Observable, което ви позволява да създавате реактивни програми. Вероятно е разумно да прочетете за RX, ако се интересувате от работа с IEvent/IObservable във F#.

person akara    schedule 23.09.2015
comment
Ще се опитам да разгледам Rx през следващата седмица или така. Вашият практически пример изглежда наистина удобен за начинаещ като мен, благодаря ви много. :) - person KeeperB5; 24.09.2015
comment
Инсталирах пакет FSharp.Control.Reactive, но в Observable няма „create“. - person KeeperB5; 24.09.2015
comment
Ваше право - гледайки кода на GitHub, това вероятно е един от единствените методи, които не са обвити от библиотеката. Въпреки това е относително лесно да се определи. Ще редактирам публикацията си с определението. - person akara; 24.09.2015