Python: Pika
pika
is a Python module which implements the AMQP 0-9-1 protocol.
- It is typically used to interact with RabbitMQ
Installing
python3 -m pip install pika
Connection Class
- The
Connection
class handles communication with RabbitMQ
- It is the base class that all Connection Adapters extend
- This class should not be invoked directly, but rather through the use of an adapter such as:
BlockingConnection
or SelectConnection
Connection Adapters
BlockingConnection
BlockingConnection
creates a layer on top of pika's asynchronous code providing methods
that will block until their response has returned
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
Channel
- The Channel class provides methods for interacting with an AMQP Channel.
- Here are the Channel class Methods:
Create Exchange
exchange_declare (exchange # string - exchange name
,exchange_type # string - exchange type ["direct", ...]
,passive=False # boolean - only check to see if the exchange exists
,durable=False # boolean - whether or not the exchange will survive a broker restart
,auto_delete=False # boolean - whether or not to delete the exchange when no queues are bound to it
,internal=False # boolean - whether or not the exchange can only be published to by other exchanges
,arguments=None
,callback=None)
Delete Exchange
# tbd
Declare a Queue, Create if Necessary
queue_declare (queue # string - queue name; if empty string, the broker will create a unique queue name
,passive=False # boolean - only check to see if the queue exists
,durable=False # boolean - whether or not the queue will survive a broker restart
,exclusive=False # boolean - only allow access by current connection (consumer)
,auto_delete=False # boolean - delete after consumer cancels or disconnects
,arguments=None # Dictionary - custom key/value arguments for queue
,callback=None) # callable - callback(pika.frame.Method) for method Queue.DeclareOk
Delete Queue
# tbd
Purge Queue (Purge All Messages)
# tbd
Bind a Queue to an Exchange
queue_bind (queue # string - queue name to bind to
,exchange # string - exchange name to bind to
,routing_key=None # string - routing key to bind on
,arguments=None # Dictionary - optional arguments
,callback=None) # callable - callback (pika.frame.Method) for method Queue.BindOk
Unbind a Queue from an Exchange
# tbd
Publish a Message [to an exchange]
basic_publish (exchange # string - the exchange to publish to
,routing_key # string
,body # bytes - message body
,properties=None # properties(pika.spec.BasicProperties)
,mandatory=False)
# sending a persistent msg
# - persistent messages will survive a broker re-start
channel.basic_publish(exchange='foo'
,routing_key='bar'
,body="hello, world!"
,properties=pika.BasicProperties(delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE)
)
Consume Messages - via callback
#
# RETURN
# ======
# consumer tag (string) which can be used to cancel the consumer
#
# RAISES
# ======
# ValueError
basic_consume (queue # string - queue to consume from
,on_message_callback # callable - the function to be called when a message is received. see signature below
,auto_ack=False # boolean - whether or not automatic acknowledgement mode will be used.
,exclusive=False # boolean - whether or not other consumers will be allowed to consume from the queue
,consumer_tag=None # string - a user-specified consumer tag
,arguments=None # Dictionary - custom key/value pair arguments for the consumer
,callback=None) # callable - callback (pika.frame.Method) for method Basic.ConsumeOk
# callback definition example
def callback (ch          # channel
             ,method      # delivery info; method.delivery_tag identifies the message to ack
             ,properties
             ,body        # bytes
             ):
    """Example on_message_callback: count, print, and acknowledge one message.

    Relies on two module-level names that must be defined before the first
    message arrives: ``num_msg_rcvd`` (an integer counter) and ``pid``
    (an integer used only in the log output).
    """
    global num_msg_rcvd
    num_msg_rcvd += 1
    print ("PID=%d. Received |%r|. MSG#=%d" % (pid,body,num_msg_rcvd))
    # explicit ack - required because basic_consume is registered with auto_ack=False (the default)
    ch.basic_ack(delivery_tag = method.delivery_tag)
    print ("PID=%d. Acknowledgement Sent" % (pid))
# register a callback
channel.basic_consume (queue=queue,on_message_callback=callback)
# start consuming from queue
channel.start_consuming()
Acknowledge (Ack) a Message(s)
# tbd
Negative Acknowledge (Nack) a Message(s)
#
# multiple, delivery_tag
# ======================
# - see also basic_reject() - very similar, except that basic_nack also supports multiple
# - multiple=True, delivery_tag is "up to and including" - allowing multiple messages to be acknowledged via a single call
# - multiple=False, delivery_tag refers to a single message.
# - multiple=True, delivery_tag=0 indicates an acknowledgement of all outstanding messages
#
# requeue
# =======
# - True - the broker will attempt to requeue the message
# - False - the message will be discarded or dead-lettered
#
basic_nack (delivery_tag=0 # integer - the server-assigned delivery tag
,multiple=False # boolean - see above
,requeue=True) # boolean - see above
Reject a Message(s)
#
# see nack
# - same functionality, only reject does not support multiple
#
basic_reject (delivery_tag=0
,requeue=True)
Publisher Confirms
#
# callback signature
# callback(method_frame) - where method_frame is a pika.frame.Method containing either method spec.Basic.Ack or spec.Basic.Nack
#
# RAISES
# ======
# ValueError
#
confirm_delivery(ack_nack_callback # callable - callback for delivery confirmations
,callback=None) # callable - callback(pika.frame.Method) for method Confirm.SelectOk
Hello, World!
#!/usr/bin/python3
#
# send.py
#
# Publishes a single "Hello, World!" message to a queue on the RabbitMQ
# broker running on localhost, then closes the connection.
#
import pika

queue = 'hello_world_queue'

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# make sure the queue exists before publishing to it
channel.queue_declare(queue=queue)

txt = "Hello, World!"
channel.basic_publish(exchange=''  # default exchange - msgs will be routed to queue with name = routing_key
                     ,routing_key=queue
                     ,body=txt
                     )
print ("send.py: Sent %s" % (txt))
connection.close()
#
# receive.py
#
# Consumes messages from the queue on the RabbitMQ broker running on
# localhost, acknowledging each one, until CTRL+C is pressed.
#
import sys
import pika

queue = 'hello_world_queue'


def callback (ch, method, properties, body):
    """Print a received message body and acknowledge it on the channel."""
    print ("Received %r" % body)
    ch.basic_ack(delivery_tag = method.delivery_tag)
    print ("Acknowledgement Sent")


def main():
    """Connect to the broker, declare the queue, and consume from it."""
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    # creating a queue is idempotent
    print ("creating queue '%s'" % (queue))
    channel.queue_declare(queue=queue)
    channel.basic_consume (queue=queue,on_message_callback=callback)
    print ("Waiting for messages. To exit press CTRL+C")
    # blocks here, dispatching each message to callback
    channel.start_consuming()
    connection.close()


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print ("Goodbye")
        sys.exit(0)
Acknowledgements