Заполнить карту потокобезопасным способом и передать эту карту другому методу из фонового потока?

У меня есть класс ниже, в котором метод add будет вызываться несколькими потоками для заполнения channelMessageHolder CHM потокобезопасным способом.

В том же классе у меня есть фоновый поток, который запускается каждые 30 секунд и вызывает метод send, передавая данные из channelMessageHolder.

public class Processor {
  private final ScheduledExecutorService executorService = Executors
      .newSingleThreadScheduledExecutor();
  private final AtomicReference<ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>> channelMessageHolder =
                new AtomicReference<>(new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>());

  private Processor() {
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        send();
      }
    }, 0, 30, TimeUnit.SECONDS);
  }

  // this will be called by only single background thread
  private void send(ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>> messageByChannels) {
    for(Entry<Channel, ConcurrentLinkedQueue<Message>> entry : messageByChannels.entrySet()) {
      Channel channel = entry.getKey();
      ConcurrentLinkedQueue<Message> messageHolder = entry.getValue();

      while (!messageHolder.isEmpty()) {
        Message message = messageHolder.poll();
        ....
        // process this and send to database
      }      
    }
  }

  // called by multiple threads
  public void add(final Channel channel, final Message message) {
    // populate channelMessageHolder in a thread safe way
  }
}

Вопрос

Как вы можете видеть, channelMessageHolder уже присутствует в моем классе Processor, поэтому мне нужно явно передавать данные из этой карты каждые 30 секунд для отправки метода? Или я могу напрямую использовать его в своем методе отправки?

Путаница заключается в том, что если я напрямую использую его в своем методе отправки, то он будет заполняться несколькими потоками одновременно, поэтому я использую метод getAndSet AtomicReference, чтобы передать его методу send.

Дайте мне знать, если то, что я делаю, неправильно, и есть ли лучший способ сделать это?


person user1950349    schedule 04.02.2017    source источник


Ответы (2)


Как вы можете видеть, channelMessageHolder уже присутствует в моем классе Processor, поэтому мне нужно явно передавать данные из этой карты каждые 30 секунд для отправки метода? Или я могу напрямую использовать его в своем методе отправки?

Вы, безусловно, можете использовать его непосредственно в методе send(), и вам не нужна оболочка AtomicReference, поскольку ConcurrentHashMap уже синхронизирована. Вам нужно беспокоиться о том, чтобы ваши объекты ключей и значений на карте правильно синхронизировались. Я предполагаю, что Channel неизменяемо, а ConcurrentLinkedQueue является параллельным, поэтому у вас все должно быть хорошо.

// no need for AtomicReference
private final ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>> channelMessageHolder =
     new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>();

ConcurrentHashMap заботится о синхронизации для вас, поэтому потоки-производители могут добавлять в него элементы в то же время, когда ваш поток-отправитель отправляет элементы без конфликтов. AtomicReference нужен только в том случае, если вы пытаетесь разделить несинхронизированный класс между несколькими потоками.

Путаница заключается в том, что если я напрямую использую его в своем методе отправки, то он будет заполняться несколькими потоками одновременно, поэтому я использую метод getAndSet AtomicReference для передачи его в метод отправки.

Правильно, но это нормально. Несколько потоков будут добавлять сообщения в файл ConcurrentLinkedQueue. Каждые 30 секунд ваш фоновый поток запускается, получает Channel, удаляется из очереди и затем отправляет сообщения, находящиеся в очереди в этот момент. ConcurrentLinkedQueue защищает от условий гонки между производителем и потребителем.

Проблема, с которой вы сталкиваетесь в своем коде, заключается в том, что он не является реентерабельным, поскольку он использует несколько вызовов очереди:

while (!messageHolder.isEmpty()) {
    Message message = messageHolder.poll();

Это работает в вашем случае, потому что похоже, что только один поток удаляется из очереди, но следующий код лучше:

while (true) {
    // only one call to the concurrent queue
    Message message = messageHolder.poll();
    if (message == null) {
        break;
    }
    ...
}
person Gray    schedule 04.02.2017
comment
Можете ли вы объяснить, почему нам не нужна атомарная ссылка? Я подумал, что если нам нужно, чтобы все потоки видели одни и те же данные в одном и том же CHM, нам это нужно? Также я обновил свой вопрос (в разделе обновлений) полным кодом, который я использую. Я просто хотел убедиться, что получаю правильную обратную связь, поэтому решил проверить полный код, который буду использовать. Я выполняю несколько каналов параллельно, и у меня будет максимум 5-6 каналов. И Channel - это класс перечисления в моем коде. - person user1950349; 05.02.2017
comment
Кроме того, если мы используем while true, то он будет работать вечно, и в этом случае нам не нужен фоновый поток, который запускается каждые 30 секунд? Потому что в моем случае я сомневаюсь, что CHM будет пустым в любой момент времени, он всегда будет полон, поэтому я запускал каждые 30 секунд, но с бесконечным циклом он будет продолжать работать вечно, и следующий фоновый поток не получит забил, я полагаю. - person user1950349; 05.02.2017
comment
AtomicReference был бы необходим @user1950349, если бы вы пытались поделиться несинхронизированным объектом. Поскольку CHM уже синхронизирован внутри, несколько потоков могут совместно использовать его, не беспокоясь о синхронизации данных самостоятельно. Если бы вы использовали стандартный HashMap, вам пришлось бы использовать AtomicReference. - person Gray; 05.02.2017
comment
Ну, если он никогда не бывает пустым, то в какой-то момент производители заполнят память, верно @user1950349? Кроме того, если while (!messageHolder.isEmpty()) { всегда верно, тогда ваш код также будет вечно зацикливаться, верно? Я просто меняю тест, чтобы он был более реентерабельным, поскольку он выполняет одну операцию. - person Gray; 05.02.2017
comment
Ясно.. Да, вы правы.. Также вы видите какую-либо проблему в обновленном коде, где я выполняю несколько каналов параллельно. Я думаю, что я сделал это правильно, но хотел убедиться? Также в том же коде я использую два исполнителя, один для запуска фонового потока, а другой для параллельного выполнения нескольких каналов. Это должно быть так, и это правильно, или я могу использовать здесь одного исполнителя? - person user1950349; 05.02.2017
comment
Пожалуйста, не обновляйте свой код, потому что это делает недействительными ответы для потомков. Не стесняйтесь задавать еще один вопрос с новым кодом @user1950349. - person Gray; 05.02.2017
comment
конечно понял. Я удалил это .. Запомню это в следующий раз. Спасибо, что держишь меня в узде. Я уже задавал вопрос здесь. - person user1950349; 05.02.2017

Или я могу напрямую использовать его в своем методе отправки, ничего не передавая

Вы должны иметь возможность напрямую использовать его в методе send, сказав channelMessageHolder.getAndSet(new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>()) в начале метода send без каких-либо проблем.

Тем не менее, Java 8 добавил новый метод computeIfAbsent в класс ConcurrentHashMap, что означает, что вам на самом деле не нужен тот AtomicReference, который вы используете.

person CKing    schedule 04.02.2017
comment
Я все еще на Java 7, поэтому я не могу использовать computeIfAbsent. Также что такое состояние гонки и как я могу его исправить? И мой метод отправки будет вызываться только одним фоновым потоком. - person user1950349; 04.02.2017
comment
Должен ли я использовать здесь какую-либо другую структуру данных вместо CLQ? Я начал использовать его из-за его безопасности потоков - person user1950349; 04.02.2017
comment
@ user1950349 Плохо. Я неправильно прочитал newSingleThreadScheduledExecutor как newScheduledThreadPool. Там не будет условия гонки в том месте, где я сказал, что это будет в таком случае. Безопасность потоков — сложная тема, и было бы трудно давать дальнейшие советы, не понимая, чего вы пытаетесь достичь. Тем не менее, вместо этого вы можете использовать одну из реализаций BlockingQueue. - person CKing; 04.02.2017
comment
Конечно понял. Я также обновил вопрос. Можете ли вы проверить и сообщить мне, если это то, что вы имели в виду в методе отправки? - person user1950349; 04.02.2017
comment
@ user1950349 Да. Это изменение должно работать так же, как и раньше. - person CKing; 04.02.2017