pip_services3_messaging.queues package

Module contents

pip_services3_messaging.queues.__init__

Queues module initialization

copyright:Conceptual Vision Consulting LLC 2018-2019, see AUTHORS for more details.
license:MIT, see LICENSE for more details.
class pip_services3_messaging.queues.IMessageQueue

Bases: pip_services3_commons.run.IOpenable.IOpenable, pip_services3_commons.run.IClosable.IClosable

Interface for asynchronous message queues.

Not all queues may implement all the methods. Attempt to call non-supported method will result in NotImplemented exception. To verify if specific method is supported consult with [[MessagingCapabilities]].

abandon(message)

Returnes message into the queue and makes it available for all subscribers to receive it again. This method is usually used to return a message which could not be processed at the moment to repeat the attempt. Messages that cause unrecoverable errors shall be removed permanently or/and send to dead letter queue.

Parameters:message – a message to return.
begin_listen(correlation_id, receiver)

Listens for incoming messages without blocking the current thread.

Parameters:
  • correlation_id – (optional) transaction id to trace execution through call chain.
  • receiver – a receiver to receive incoming messages.
complete(message)

Permanently removes a message from the queue. This method is usually used to remove the message after successful processing.

Parameters:message – a message to remove.
end_listen(correlation_id)

Ends listening for incoming messages. When this method is call [[listen]] unblocks the thread and execution continues.

Parameters:correlation_id – (optional) transaction id to trace execution through call chain.
get_capabilities()

Gets the queue capabilities

Returns:the queue’s capabilities object.
get_name()

Gets the queue name

Returns:the queue name.
listen(correlation_id, receiver)

Listens for incoming messages and blocks the current thread until queue is closed.

Parameters:
  • correlation_id – (optional) transaction id to trace execution through call chain.
  • receiver – a receiver to receive incoming messages.
move_to_dead_letter(message)

Permanently removes a message from the queue and sends it to dead letter queue.

Parameters:message – a message to be removed.
peek(correlation_id)

Peeks a single incoming message from the queue without removing it. If there are no messages available in the queue it returns null.

Parameters:correlation_id – (optional) transaction id to trace execution through call chain.
Returns:a message object.
peek_batch(correlation_id, message_count)

Peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue it returns an empty list.

Parameters:
  • correlation_id – (optional) transaction id to trace execution through call chain.
  • message_count – a maximum number of messages to peek.
Returns:

a list of message objects.

read_message_count()

Reads the current number of messages in the queue to be delivered.

Returns:a number of messages
receive(correlation_id, wait_timeout)

Receives an incoming message and removes it from the queue.

Parameters:
  • correlation_id – (optional) transaction id to trace execution through call chain.
  • wait_timeout – a timeout in milliseconds to wait for a message to come.
Returns:

a message object.

renew_lock(message, lock_timeout)

Renews a lock on a message that makes it invisible from other receivers in the queue. This method is usually used to extend the message processing time.

Parameters:
  • message – a message to extend its lock.
  • lock_timeout – a locking timeout in milliseconds.
send(correlation_id, envelop)

Sends a message into the queue.

Parameters:
  • correlation_id – (optional) transaction id to trace execution through call chain.
  • envelop – a message envelop to be sent.
send_as_object(correlation_id, message_type, message)

Sends an object into the queue. Before sending the object is converted into JSON string and wrapped in a [[MessageEnvelop]].

Parameters:
  • correlation_id – (optional) transaction id to trace execution through call chain.
  • message_type – a message type
  • message – an object value to be sent
class pip_services3_messaging.queues.MessageEnvelop(correlation_id=None, message_type=None, message=None)

Bases: object

Allows adding additional information to messages. A correlation id, message id, and a message type are added to the data being sent/received. Additionally, a MessageEnvelope can reference a lock token.

Side note: a MessageEnvelope’s message is stored as a buffer, so strings are converted using utf8 conversions.

correlation_id = None
message = None
message_id = None
message_type = None
reference = None
class pip_services3_messaging.queues.MessagingCapabilities(message_count, send, receive, peek, peek_batch, renew_lock, abandon, dead_letter, clear)

Bases: object

Data object that contains supported capabilities of a message queue. If certain capability is not supported a queue will throw NotImplemented exception.

can_abandon()

Informs if the queue is able to abandon messages.

Returns:true if queue is able to abandon.
can_clear()

Informs if the queue can be cleared.

Returns:true if queue can be cleared.
can_dead_letter()

Informs if the queue is able to send messages to dead letter queue.

Returns:true if queue is able to send messages to dead letter queue.
can_message_count()

Informs if the queue is able to read number of messages.

Returns:true if queue supports reading message count.
can_peek()

Informs if the queue is able to peek messages.

Returns:true if queue is able to peek messages.
can_peek_batch()

Informs if the queue is able to peek multiple messages in one batch.

Returns:true if queue is able to peek multiple messages in one batch.
can_receive()

Informs if the queue is able to receive messages.

Returns:true if queue is able to receive messages.
can_renew_lock()

Informs if the queue is able to renew message lock.

Returns:true if queue is able to renew message lock.
can_send()

Informs if the queue is able to send messages.

Returns:true if queue is able to send messages.
class pip_services3_messaging.queues.IMessageReceiver

Bases: object

Callback interface to receive incoming messages.

Example:
class MyMessageReceiver(IMessageReceiver):
def receive_message(self, envelop, queue):
print “Received message: ” + envelop.getMessageAsString()

messageQueue = MemoryMessageQueue() messageQueue.listen(“123”, MyMessageReceiver())

messageQueue.open(“123”) messageQueue.send(“123”, MessageEnvelop(None, “mymessage”, “ABC”)) // Output in console: “ABC”

receive_message(message, queue)
class pip_services3_messaging.queues.MessageQueue(name=None)

Bases: pip_services3_commons.config.IConfigurable.IConfigurable, pip_services3_commons.refer.IReferenceable.IReferenceable, pip_services3_messaging.queues.IMessageQueue.IMessageQueue

Abstract message queue.

Abstract message queue that is used as a basis for specific message queue implementations.

### Configuration parameters ###

  • name: name of the message queue
  • connection(s):
    • discovery_key: key to retrieve parameters from discovery service
    • protocol: connection protocol like http, https, tcp, udp
    • host: host name or IP address
    • port: port number
    • uri: resource URI or connection string with all parameters in it
  • credential(s):
  • store_key: key to retrieve parameters from credential store
  • username: user name
  • password: user password
  • access_id: application access id
  • access_key: application secret key

### References ###

  • :logger::*:1.0 (optional) ILogger components to pass log messages
  • :counters::*:1.0 (optional) ICounters components to pass collected measurements
  • :discovery::*:1.0 (optional) IDiscovery components to discover connection(s)
  • :credential-store::*:1.0 (optional) ICredentialStore componetns to lookup credential(s)
begin_listen(correlation_id, receiver)

Listens for incoming messages without blocking the current thread.

Parameters:
  • correlation_id – (optional) transaction id to trace execution through call chain.
  • receiver – a receiver to receive incoming messages.
configure(config)

Configures component by passing configuration parameters.

Parameters:config – configuration parameters to be set.
get_capabilities()

Gets the queue capabilities

Returns:the queue’s capabilities object.
get_name()

Gets the queue name

Returns:the queue name.
open(correlation_id)

Opens the component.

Parameters:correlation_id – (optional) transaction id to trace execution through call chain.
send_as_object(correlation_id, message_type, message)

Sends an object into the queue. Before sending the object is converted into JSON string and wrapped in a [[MessageEnvelop]].

Parameters:
  • correlation_id – (optional) transaction id to trace execution through call chain.
  • message_type – a message type
  • message – an object value to be sent
set_references(references)

Sets references to dependent components.

Parameters:references – references to locate the component dependencies.
class pip_services3_messaging.queues.MemoryMessageQueue(name=None)

Bases: pip_services3_messaging.queues.MessageQueue.MessageQueue, pip_services3_commons.run.ICleanable.ICleanable

Message queue that sends and receives messages within the same process by using shared memory. This queue is typically used for testing to mock real queues.

### Configuration parameters ###

  • name: name of the message queue

### References ###

  • :logger::*:1.0 (optional) ILogger components to pass log messages
  • :counters::*:1.0 (optional) ICounters components to pass collected measurements
Example:

queue = MessageQueue(“myqueue”) queue.send(“123”, MessageEnvelop(None, “mymessage”, “ABC”))

message = queue.receive(“123”, 0) if message != None:

… queue.complete(“123”, message)
class LockedMessage

Bases: object

lock_expiration = None
abandon(message)

Returnes message into the queue and makes it available for all subscribers to receive it again. This method is usually used to return a message which could not be processed at the moment to repeat the attempt. Messages that cause unrecoverable errors shall be removed permanently or/and send to dead letter queue.

Parameters:message – a message to return.
clear(correlation_id)

Clears component state.

Parameters:correlation_id – (optional) transaction id to trace execution through call chain.
close(correlation_id)

Closes component and frees used resources.

Parameters:correlation_id – (optional) transaction id to trace execution through call chain.
complete(message)

Permanently removes a message from the queue. This method is usually used to remove the message after successful processing.

Parameters:message – a message to remove.
end_listen(correlation_id)

Ends listening for incoming messages. When this method is call [[listen]] unblocks the thread and execution continues.

Parameters:correlation_id – (optional) transaction id to trace execution through call chain.
is_opened()

Checks if the component is opened.

Returns:true if the component has been opened and false otherwise.
listen(correlation_id, receiver)

Listens for incoming messages and blocks the current thread until queue is closed.

Parameters:
  • correlation_id – (optional) transaction id to trace execution through call chain.
  • receiver – a receiver to receive incoming messages.
move_to_dead_letter(message)

Permanently removes a message from the queue and sends it to dead letter queue.

Parameters:message – a message to be removed.
peek(correlation_id)

Peeks a single incoming message from the queue without removing it. If there are no messages available in the queue it returns null.

Parameters:correlation_id – (optional) transaction id to trace execution through call chain.
Returns:a message object.
peek_batch(correlation_id, message_count)

Peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue it returns an empty list.

Parameters:
  • correlation_id – (optional) transaction id to trace execution through call chain.
  • message_count – a maximum number of messages to peek.
Returns:

a list of message objects.

read_message_count()

Reads the current number of messages in the queue to be delivered.

Returns:a number of messages
receive(correlation_id, wait_timeout)

Receives an incoming message and removes it from the queue.

Parameters:
  • correlation_id – (optional) transaction id to trace execution through call chain.
  • wait_timeout – a timeout in milliseconds to wait for a message to come.
Returns:

a message object.

renew_lock(message, lock_timeout)

Renews a lock on a message that makes it invisible from other receivers in the queue. This method is usually used to extend the message processing time.

Parameters:
  • message – a message to extend its lock.
  • lock_timeout – a locking timeout in milliseconds.
send(correlation_id, message)

Sends a message into the queue.

Parameters:
  • correlation_id – (optional) transaction id to trace execution through call chain.
  • envelop – a message envelop to be sent.