Python: Pika

Installing

python3 -m pip install pika

Connection Class

Connection Adapters

BlockingConnection

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

Channel

Create Exchange

exchange_declare   (exchange                # string - exchange name
                    ,exchange_type          # string - exchange type ["direct", ...]
                    ,passive=False          # boolean - only check to see if the exchange exists
                    ,durable=False          # boolean - whether or not the exchange will survive a broker restart
                    ,auto_delete=False      # boolean - whether or not to delete the exchange when no queues are bound to it
                    ,internal=False         # boolean - whether or not the exchange can only be published to by other exchanges
                    ,arguments=None
                    ,callback=None)

Delete Exchange

# tbd

Declare a Queue, Create if Necessary

queue_declare   (queue                  # string - queue name; if empty string, the broker will create a unique queue name
                ,passive=False          # boolean - only check to see if the queue exists
                ,durable=False          # boolean - whether or not the queue will survive a broker restart
                ,exclusive=False        # boolean - only allow access by current connection (consumer)
                ,auto_delete=False      # boolean - delete after consumer cancels or disconnects
                ,arguments=None         # Dictionary - custom key/value arguments for queue
                ,callback=None)         # callable - callback(pika.frame.Method) for method Queue.DeclareOk

Delete Queue

# tbd

Perge Queue (Perge All Messages)

# tbd

Bind a Queue to an Exchange

queue_bind  (queue              # string - queue name to bind to
            ,exchange           # string - exchange name to bind to
            ,routing_key=None   # string - routing key to bind on
            ,arguments=None     # Dictionary - optional arguments
            ,callback=None)     # callable - callback (pika.frame.Method) for method Queue.BindOk

Unbind a Queue from an Exchange

# tbd

Publish a Message [to an exchange]

basic_publish   (exchange           # string - the exchange to publish to
                ,routing_key        # string
                ,body               # bytes - message body
                ,properties=None    # properties(pika.spec.BasicProperties)
                ,mandatory=False)
# sending a persistent msg 
# - persistent messages will survive a broker re-start

channel.basic_publish(exchange='foo'
                    ,routing_key='bar'
                    ,body="hello, world!"
                    ,properties=pika.BasicProperties(delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE)
                    )

Consume Messages - via callback

#
# RETURN
# ======
# consumer tag (string) which csn be used to cancel the consumer
#
# RAISES
# ======
# ValueError 

basic_consume   (queue                  # string - queue to consume from
                ,on_message_callback    # callable - the function to be called when a message is received. see signature below
                ,auto_ack=False         # boolean - whether or not automatic acknowledgement mode will be used.
                ,exclusive=False        # boolean - whether or not other consumers will be allowed to consume from the queue
                ,consumer_tag=None      # string a user specified consumer tag
                ,arguments=None         # Dictionary - custom key/value pair arguments for the consumer
                ,callback=None)         # callable - callback (pica.frame.Method) for method Basic.ConsumeOk
# callback definition example
def callback    (ch             # channel
                ,method
                ,properties
                ,body           # bytes
                ):

    global num_msg_rcvd

    num_msg_rcvd += 1

    print ("PID=%d. Received |%r|. MSG#=%d" % (pid,body,num_msg_rcvd))

    ch.basic_ack(delivery_tag = method.delivery_tag)
    print ("PID=%d. Acknowledgement Sent" % (pid))

# register a callback
channel.basic_consume (queue=queue,on_message_callback=callback)

# start consuming from queue
channel.start_consuming()

Acknowledge (Ack) a Message(s)

# tbd

Negative Acknowledge (Nack) a Message(s)

#
# multiple, delivery_tag
# ======================
# - see also basic_reject() - very similar (except also supports multiple)
# - multiple=True, delivery_tag is "up to and including" - allowing multiple messages to be acknowledged via a single call
# - multiple=False, delivery_tag refers to a single message. 
# - multiple=True, delivery_tag=0 indicates an acknowledgement of all outstanding messages
#
# requeue
# =======
# - True - the broker will attempt to requeue the message
# - False - the message will be discarded or dead-lettered
#

basic_nack  (delivery_tag=0     # integer - the server-assigned delivery tag
            ,multiple=False     # boolean - see above
            ,requeue=True)      # boolean - see above

Reject a Message(s)

#
# see nack 
# - same functionality, only reject does not support multiple
#

basic_reject   (delivery_tag=0
                    ,requeue=True)

Publisher Confirms

#
# callback signature
# callback(pica.frame.Method) - where method_frame contains either method spec.Basic.Ack or spec.Basic.Nack
#
# RAISES
# ======
# ValueError
#

confirm_delivery(ack_nack_callback  # callable - callback for delivery confirmations 
                ,callback=None)     # callable - callback(pika.frame.Method) for method Confirm.SelecdtOk

Hello, World!

#!/usr/bin/python3

//
// send.py
//

import pika

queue='hello_world_queue'

connection = pika.BlockingConnection(pika.ConnecdtionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue=queue)

txt = "Hello, World!"

channel.basic_publish(exchange''                # default exchange - msgs will be routed to queue with name = routing_key
                    ,routing_key=queue
                    ,body=txt
                    )

print ("send.py: Sent %s" % (txt))

connection.close()
//
//  receive.py
//

import sys
import pika

queue='hello_world_queue'

def callback (ch, method, properties, body):

    print ("Received %r" % body)

    ch.basic_ack(delivery_tag = method.delivery_tag)

    print ("Acknowledgement Sent")


def main():

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    
    # creating a queue is idempotent
    print ("creating queue '%s'" % (queue))
    channel.queue_declare(queue=queue)
    
    channel.basic_consume (queue=queue,on_message_callback=callback)
    
    print ("Waiting for messages. To exit press CTRL+C")
    
    channel.start_consuming()
    
    connection.close()


if __name__ == '__main__':

    try:
        main()

    except KeyboardInterrupt:

        print ("Goodbye")

        sys.exit(0)`

Acknowledgements