Очередь Rabbitmq с разветвлением процесса в PHP

У меня есть простой работник очереди, основанный на стандартном классе AMQP из PHP. Он работает с RabbitMQ в качестве сервера. У меня есть класс очереди для инициализации соединения AMQP с RabbitMQ. Все отлично работает с приведенным ниже кодом:

$queue = new Queue('myQueue');

 while($envelope = $queue->getEnvelope()) {
   $command = unserialize($envelope->getBody());

   if ($command instanceof QueueCommand) {
     try {
       if ($command->execute()) {
         $queue->ack($envelope->getDeliveryTag());
       }
     } catch (Exception $exc) {
       // an error occurred so do some processing to deal with it
     }
   }
 }

Однако я хотел разветвить выполнение команды очереди, но в этом случае очередь снова и снова становится бесконечной с первой командой. Я не могу подтвердить RabbitMQ, что сообщение было получено с помощью $ queue-> ack (); Моя разветвленная версия (упрощенная только с одним дочерним элементом для тестирования) выглядит так:

$queue = new Queue('myQueue');

while($envelope = $queue->getEnvelope()) {
  $command = unserialize($envelope->getBody());

  if ($command instanceof QueueCommand) {
    $pid = pcntl_fork();

    if ($pid) {
      //parent proces
      //wait for child
      pcntl_waitpid($pid, $status, WUNTRACED);

      if($status > 0) {
        // an error occurred so do some processing to deal with it
      } else {
        //remove Command from queue
        $queue->ack($envelope->getDeliveryTag());
      }
    } else {
      //child process
      try {
        if ($command->execute()) {
          exit(0);
        }
      } catch (Exception $exc) {
        exit(1);
      }
    }
  }
}

любая помощь будет оценена ...


person Grzegorz Motyl    schedule 07.09.2012    source источник


Ответы (1)


Я наконец решил проблему! Мне пришлось запустить команду ack из дочернего процесса, так работает! Это правильный код:

$queue = new Queue('myQueue');

while($envelope = $queue->getEnvelope()) {
  $command = unserialize($envelope->getBody());

  if ($command instanceof QueueCommand) {
    $pid = pcntl_fork();

    if ($pid) {
      //parent proces
      //wit for child
      pcntl_waitpid($pid, $status, WUNTRACED);

      if($status > 0) {
        // an error occurred so do some processing to deal with it
      } else {
        // sucess
      }
    } else {
      //child process
      try {
        if ($command->execute()) {
          $queue->ack($envelope->getDeliveryTag());
          exit(0);
        }
      } catch (Exception $exc) {
        exit(1);
      }
    }
  }
}
person Grzegorz Motyl    schedule 07.09.2012