Как удалить элемент из ConcurrentDictionary после окончательного завершения ContinueWith

Во-первых, может ли кто-нибудь с 1500+ «репутацией» создать тег для «ContinueWith» (и пометить им этот вопрос)? Спасибо!

Извините за длину этого поста, но я не хочу тратить время на то, чтобы кто-то пытался мне помочь, потому что я упустил важные детали. Тем не менее, это все еще может произойти. :)

Теперь подробности. Я работаю над сервисом, который подписывается на пару тем очереди ActiveMQ. Две темы в чем-то связаны. Одно из них — «обновление компании», а другое — «обновление продукта». Идентификатором для обоих является CompanyID. Тема компании включает данные из темы продукта. Требуется, поскольку другим подписчикам нужны данные о продукте, но они не хотят/не должны подписываться на тему о продукте. Поскольку мой сервис является многопоточным (требование не зависит от нашего усмотрения), по мере поступления сообщений я добавляю Task для обработки каждого из них в ConcurrentDictionary, используя AddOrUpdate, где параметр обновления просто ContinueWith (см. ниже). Это сделано для предотвращения одновременных обновлений, которые могут происходить из-за того, что эти темы и подписчики являются «устойчивыми», поэтому, если моя служба прослушивания отключится (по какой-либо причине), мы можем закончить с несколькими сообщениями (компания и / или продукт) для одного и того же CompanyID.

Теперь мой актуальный вопрос (наконец-то!) После завершения задачи (будь то одна задача или последняя в цепочке задач ContinueWith) я хочу удалить ее из ConcurrentDictionary (очевидно). Как? Я думал и получил некоторые идеи от коллег, но мне не очень нравится ни одна из них. Я не буду перечислять идеи, потому что ваш ответ может быть одной из тех идей, которые у меня есть, но которые мне не нравятся, но в конечном итоге она может оказаться лучшей.

Я попытался сжать фрагмент кода, чтобы вам не приходилось слишком много прокручивать вверх и вниз, в отличие от моего описания. :)

nrtq = не относится к вопросу

public interface IMessage
{
  long CompantId { get; set; }
  void Process();
}
public class CompanyMessage : IMessage
{ //implementation, nrtq }
public class ProductMessage : IMessage
{ //implementation, nrtq }

public class Controller
{
  private static ConcurrentDictionary<long, Task> _workers = new ConcurrentDictionary<long, Task>();
  //other needed declarations, nrtq

  public Controller(){//constructor stuff, nrtq }

  public StartSubscribers()
  {
    //other code, nrtq
    _companySubscriber.OnMessageReceived += HandleCompanyMsg;
    _productSubscriber.OnMessageReceived += HandleProductMsg;
  }

  private void HandleCompanyMsg(string msg)
  {
    try {
      //other code, nrtq
      QueueItUp(new CompanyMessage(message));
    } catch (Exception ex) { //other code, nrtq }
  }

  private void HandleProductMsg(string msg)
  {
    try {
      //other code, nrtq
      QueueItUp(new ProductMessage(message));
    } catch (Exception ex) { //other code, nrtq }
  }

  private static void QueueItUp(IMessage message)
  {
    _workers.AddOrUpdate(message.CompanyId,
      x => {
        var task = new Task(message.Process);
        task.Start();
        return task;
      },
      (x, y) => y.ContinueWith((z) => message.Process())
    );
  }

Спасибо!


person Andrew Steitz    schedule 14.06.2012    source источник
comment
Не могли бы вы просто подключить финальное продолжение, которое удаляет ключ словаря? (Спасибо за сокращение кода, кстати!)   -  person usr    schedule 14.06.2012
comment
@usr - вот проблема, с которой я столкнулся. Хотелось бы услышать, есть ли у вас решение. КОГДА я прикреплю окончательный ContWith? Я не знаю, сколько заданий у меня будет в итоге. Пример: я добавляю первую задачу (и ContWith, чтобы удалить ключ). B4 это сделано, я получаю еще 2 и продолжаю с ними, но они ПОСЛЕ Del. Задача Del выполняется. Во время выполнения задачи для Msg2 я получаю новое сообщение (#4) для того же идентификатора, но идентификатора больше нет в Dict, поэтому я запускаю задачу NEW, которая теперь может выполняться одновременно с задачей для Msg3. Если бы существовал InsertContWithBeforeLastContWith, это было бы здорово! :)   -  person Andrew Steitz    schedule 14.06.2012
comment
Или FinalContinue, который поместит задачу в цепочку, но всегда будет держать ее в конце, чтобы любые новые задачи ContinueWith выполнялись перед ней.   -  person Andrew Steitz    schedule 14.06.2012
comment
Не существует такой вещи, как цепочка продолжения. Это произвольный ориентированный ациклический граф. Если вам нужно поместить что-то в определенное место, библиотека не может помочь с этим, потому что в этой модели конец цепочки не определен четко. Вам нужно как-то определить, что вы не собираетесь добавлять новое продолжение.   -  person usr    schedule 14.06.2012


Ответы (1)


Я не буду «принимать» этот ответ какое-то время, потому что мне не терпится увидеть, сможет ли кто-нибудь еще найти лучшее решение.

Коллега придумал решение, которое я немного подправил. Да, я знаю об иронии (?) использования оператора lock с оператором ConcurrentDictionary. У меня действительно нет времени прямо сейчас, чтобы посмотреть, будет ли лучший тип коллекции для использования. По сути, вместо того, чтобы просто делать ContinueWith() для существующих задач, мы заменяем задачу самой собой плюс еще одну задачу, прикрепляемую в конце, используя ContinueWith().

Какая разница? Рад, что вы спросили! :) Если бы мы только что выполнили ContinueWith(), то !worker.Value.IsCompleted вернуло бы true, как только первая задача в цепочке будет завершена. Однако, заменив задачу двумя (или более) связанными задачами, то, что касается коллекции, будет только одна задача, и !worker.Value.IsCompleted не вернет true, пока все задачи в цепочке завершены.

Признаюсь, я был немного обеспокоен заменой задачи самой собой+(новой задачей), потому что что, если задача окажется запущенной во время ее замены. Что ж, я тестировал живые дневные светы из этого и не столкнулся ни с какими проблемами. Я считаю, что происходит то, что, поскольку задача выполняется в своем собственном потоке, а коллекция просто содержит указатель на нее, работающая задача не затрагивается. Заменив его на саму + (новая задача), мы сохраняем указатель на исполняемый поток и получаем «уведомление», когда оно завершено, чтобы следующая задача могла «продолжиться», или IsCompleted возвращает true.

Кроме того, то, как работает цикл «очистки» и где он расположен, означает, что у нас будут «завершенные» задачи, висящие в коллекции, но только до следующего запуска «очистки», то есть в следующий раз, когда сообщение получено. Опять же, я провел много тестов, чтобы увидеть, не вызову ли я проблемы с памятью из-за этого, но моя служба никогда не использовала более 20 МБ ОЗУ, даже при обработке сотен сообщений в секунду. Нам пришлось бы получать довольно большие сообщения и иметь много длительных задач, чтобы это когда-либо вызывало проблему, но об этом следует помнить, поскольку ваша ситуация может отличаться.

Как и выше, в приведенном ниже коде nrtq = не относится к вопросу.

public interface IMessage
{
  long CompantId { get; set; }
  void Process();
}
public class CompanyMessage : IMessage
{ //implementation, nrtq }
public class ProductMessage : IMessage
{ //implementation, nrtq }

public class Controller
{
  private static ConcurrentDictionary<long, Task> _workers = new ConcurrentDictionary<long, Task>();
  //other needed declarations, nrtq

  public Controller(){//constructor stuff, nrtq }

  public StartSubscribers()
  {
    //other code, nrtq
    _companySubscriber.OnMessageReceived += HandleCompanyMsg;
    _productSubscriber.OnMessageReceived += HandleProductMsg;
  }

  private void HandleCompanyMsg(string msg)
  {
    //other code, nrtq
    QueueItUp(new CompanyMessage(message));
  }

  private void HandleProductMsg(string msg)
  {
    //other code, nrtq
    QueueItUp(new ProductMessage(message));
  }

  private static void QueueItUp(IMessage message)
  {
    lock(_workers)
    {
      foreach (var worker in Workers)
      {
        if (!worker.Value.IsCompleted) continue;
        Task task;
        Workers.TryRemove(worker.Key, out task);
      }
      var id = message.CompanyId;
      if (_workers.ContainsKey(id))
        _workers[id] = _workers[id].ContinueWith(x => message.Process());
      else
      {
        var task = new Task(y => message.Process(), id);
        _workers.TryAdd(id, task);
        task.Start();
      }
    }
  }
person Andrew Steitz    schedule 19.06.2012