PHP Unconference Europe 2015

AMQPQueue::consume

(PECL amqp >= Unknown)

AMQPQueue::consumeThe consume purpose

Description

public array AMQPQueue::consume ([ array $options = array() ] )

This is a blocking function, in that the function will not return until the minimum number of of messages as specified by the min are read off of the queue. To use a non-blocking function, see

Parameters

options

options is a an array of consume options. The keys used in the options array are: min, max, and ack. All other keys will be ignored.

For each missing option, the extension will check the ini settings or use the default value.

Return Values

An array containing the messages consumed. The number of returned messages will be at least the number given by min in the options array. But not more than the number given by max.

Examples

Example #1 AMQPQueue::consume() example

<?php

/* Create a connection using all default credentials: */
$connection = new AMQPConnection();
$connection->connect();

/* create a queue object */
$queue = new AMQPQueue($connection);

//declare the queue
$queue->declare('myqueue');

$options = array(
 
'min' => 1,
 
'max' => 10,
 
'ack' => true
);

//get the messages
$messages $queue->consume($options);

foreach (
$messages as $message) {
 echo 
$message['message_body'];
}

?>

add a note add a note

User Contributed Notes 5 notes

up
2
pinepain at gmail dot com
1 year ago
Be careful using this function with non-zero amqp.timeout (you may check at AMQPConnection::getTimeout), because it looks like timeout value says how long to wait for a new message from broker before die in way like

Fatal error: Uncaught exception 'AMQPConnectionException' with message 'Resource temporarily unavailable' in /path/to/your/file.php:12
Stack trace:
#0  /path/to/your/file.php(12): AMQPQueue->consume(Object(Closure), 128)
#1 {main}
  thrown in /path/to/your/file.php on line 12

As for notes about blocking, system resources greediness and so and so, you can investigate how it works by looking in  amqp_queue.c  for read_message_from_channel C function declaration and PHP_METHOD(amqp_queue_class, consume) method declaration. For me it works perfectly without any uncommon resources usage or I/O performance degradation under the load of 10k 64b message per second with delivery time for less than 0.001 sec.

OS: FreeBSD *** 8.2-RELEASE FreeBSD 8.2-RELEASE #0: Sat Mar ****** 2011     root@*****:****  amd64
PHP: PHP Version => 5.3.10, Suhosin Patch 0.9.10, Zend Engine v2.3.0

php AMQP extnsion:

amqp

Version => 1.0.9
Revision => $Revision: 327551 $
Compiled => Dec 2* 2012 @ *****
AMQP protocol version => 0-9-1
librabbitmq version => 0.2.0

Directive => Local Value => Master Value
amqp.auto_ack => 0 => 0
amqp.host => localhost => localhost
amqp.login => guest => guest
amqp.password => guest => guest
amqp.port => 5672 => 5672
amqp.prefetch_count => 3 => 3
amqp.timeout => 0 => 0
amqp.vhost => / => /

AMQP broker: RabbitMQ 3.0.1, Erlang R14B04

Definitely, such loop will block main thread, but due to single-thread PHP nature it's completely normal behavior. To exit this consumption loop your callback function or method (i prefer to use closures, btw) should return FALSE.

The benefit of this function is that you don't have manually iteration for all messages, and what is more important, if there is no unprocessed messages in queue it will wait for such for you.

So you have just to run you consumer (one or many) and optionally time to time check whether they still alive just for reason if you are not sure about callbacks or memory-limit-critical stuff
up
1
liuxiangchao at ometworks dot com
1 year ago
To consume ALL messages stored DURABLE exchanges, you will need to  set channel's prefetch size parameter to 0:

<?php $channel->setPrefetchCount(0); ?>
up
0
Laurent
2 years ago
Be careful using consume() function on AMQP. It will catch all Exception and fall down in infinite loop (message will not be marked as readed and reput in queue)
up
0
peter dot colclough at toolstation dot com
3 years ago
Using AMQP_consume, against a RabbitMQ server, actually stuffs memory. It will work in a loop, or on a constant recall, so long as your exchange/queue and messages are set to durable. However, it will alo make the system unusable within a couple of minutes.

Using get(), all is fine. I think this may be a bugette in teh PHP access code... ff to take a look.
up
0
hlopetz at gmail dot com
3 years ago
you shouldn't use AMQPQueue::consume() if you have to get _all_ incoming messages. you'll get only "max" number of messages and the queue will be destroyed then.

for my amqp.so v0.2.2 this weird behavior is true.

use AMQPQueue::get() and use "count" param instead.
To Top