Во-первых, может ли кто-нибудь с 1500+ «репутацией» создать тег для «ContinueWith» (и пометить им этот вопрос)? Спасибо!
Извините за длину этого поста, но я не хочу тратить время на то, чтобы кто-то пытался мне помочь, потому что я упустил важные детали. Тем не менее, это все еще может произойти. :)
Теперь подробности. Я работаю над сервисом, который подписывается на пару тем очереди ActiveMQ. Две темы в чем-то связаны. Одно из них — «обновление компании», а другое — «обновление продукта». Идентификатором для обоих является CompanyID. Тема компании включает данные из темы продукта. Требуется, поскольку другим подписчикам нужны данные о продукте, но они не хотят/не должны подписываться на тему о продукте. Поскольку мой сервис является многопоточным (требование не зависит от нашего усмотрения), по мере поступления сообщений я добавляю Task для обработки каждого из них в ConcurrentDictionary, используя AddOrUpdate, где параметр обновления просто ContinueWith (см. ниже). Это сделано для предотвращения одновременных обновлений, которые могут происходить из-за того, что эти темы и подписчики являются «устойчивыми», поэтому, если моя служба прослушивания отключится (по какой-либо причине), мы можем закончить с несколькими сообщениями (компания и / или продукт) для одного и того же CompanyID.
Теперь мой актуальный вопрос (наконец-то!) После завершения задачи (будь то одна задача или последняя в цепочке задач ContinueWith) я хочу удалить ее из ConcurrentDictionary (очевидно). Как? Я думал и получил некоторые идеи от коллег, но мне не очень нравится ни одна из них. Я не буду перечислять идеи, потому что ваш ответ может быть одной из тех идей, которые у меня есть, но которые мне не нравятся, но в конечном итоге она может оказаться лучшей.
Я попытался сжать фрагмент кода, чтобы вам не приходилось слишком много прокручивать вверх и вниз, в отличие от моего описания. :)
nrtq = не относится к вопросу
public interface IMessage
{
long CompantId { get; set; }
void Process();
}
public class CompanyMessage : IMessage
{ //implementation, nrtq }
public class ProductMessage : IMessage
{ //implementation, nrtq }
public class Controller
{
private static ConcurrentDictionary<long, Task> _workers = new ConcurrentDictionary<long, Task>();
//other needed declarations, nrtq
public Controller(){//constructor stuff, nrtq }
public StartSubscribers()
{
//other code, nrtq
_companySubscriber.OnMessageReceived += HandleCompanyMsg;
_productSubscriber.OnMessageReceived += HandleProductMsg;
}
private void HandleCompanyMsg(string msg)
{
try {
//other code, nrtq
QueueItUp(new CompanyMessage(message));
} catch (Exception ex) { //other code, nrtq }
}
private void HandleProductMsg(string msg)
{
try {
//other code, nrtq
QueueItUp(new ProductMessage(message));
} catch (Exception ex) { //other code, nrtq }
}
private static void QueueItUp(IMessage message)
{
_workers.AddOrUpdate(message.CompanyId,
x => {
var task = new Task(message.Process);
task.Start();
return task;
},
(x, y) => y.ContinueWith((z) => message.Process())
);
}
Спасибо!