pip_services3_messaging.queues package¶
Submodules¶
- pip_services3_messaging.queues.IMessageQueue module
- pip_services3_messaging.queues.IMessageReceiver module
- pip_services3_messaging.queues.MemoryMessageQueue module
- pip_services3_messaging.queues.MessageEnvelop module
- pip_services3_messaging.queues.MessageQueue module
- pip_services3_messaging.queues.MessagingCapabilities module
Module contents¶
pip_services3_messaging.queues.__init__¶
Queues module initialization
| copyright: | Conceptual Vision Consulting LLC 2018-2019, see AUTHORS for more details. |
|---|---|
| license: | MIT, see LICENSE for more details. |
-
class
pip_services3_messaging.queues.IMessageQueue¶ Bases:
pip_services3_commons.run.IOpenable.IOpenable, pip_services3_commons.run.IClosable.IClosable
Interface for asynchronous message queues.
Not every queue implements every method. Calling an unsupported method results in a NotImplemented exception. To check whether a specific method is supported, consult MessagingCapabilities.
-
abandon(message)¶ Returns a message to the queue and makes it available for all subscribers to receive again. This method is usually used to return a message that could not be processed at the moment, so that the attempt can be repeated. Messages that cause unrecoverable errors should be removed permanently and/or sent to the dead letter queue.
Parameters: message – a message to return.
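The choice between completing, abandoning, and dead-lettering a message can be sketched as follows. This is a minimal illustration, not the library's implementation: `RecordingQueue`, `TransientError`, and `settle` are hypothetical names introduced here, and only the method names `complete`, `abandon`, and `move_to_dead_letter` come from this interface.

```python
class TransientError(Exception):
    """Hypothetical marker for failures that are worth retrying."""


class RecordingQueue:
    """Hypothetical stand-in that only records which method was called."""

    def __init__(self):
        self.calls = []

    def complete(self, message):
        self.calls.append(("complete", message))

    def abandon(self, message):
        self.calls.append(("abandon", message))

    def move_to_dead_letter(self, message):
        self.calls.append(("move_to_dead_letter", message))


def settle(queue, message, handler):
    """Run the handler and settle the message according to the outcome."""
    try:
        handler(message)
    except TransientError:
        # Could not be processed right now: return it for another attempt.
        queue.abandon(message)
    except Exception:
        # Unrecoverable failure: remove it and send it to the dead letter queue.
        queue.move_to_dead_letter(message)
    else:
        # Processed successfully: remove it permanently.
        queue.complete(message)


def fail_transient(message):
    raise TransientError("try again later")


def fail_permanent(message):
    raise ValueError("malformed payload")


queue = RecordingQueue()
settle(queue, "m1", lambda message: None)
settle(queue, "m2", fail_transient)
settle(queue, "m3", fail_permanent)
print(queue.calls)
```

Which errors count as transient is an application decision; the sketch only shows where each queue method fits.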
-
begin_listen(correlation_id, receiver)¶ Listens for incoming messages without blocking the current thread.
Parameters: - correlation_id – (optional) transaction id to trace execution through call chain.
- receiver – a receiver to receive incoming messages.
-
complete(message)¶ Permanently removes a message from the queue. This method is usually used to remove the message after successful processing.
Parameters: message – a message to remove.
-
end_listen(correlation_id)¶ Ends listening for incoming messages. When this method is called, listen unblocks the thread and execution continues.
Parameters: correlation_id – (optional) transaction id to trace execution through call chain.
-
get_capabilities()¶ Gets the queue capabilities
Returns: the queue’s capabilities object.
-
get_name()¶ Gets the queue name
Returns: the queue name.
-
listen(correlation_id, receiver)¶ Listens for incoming messages and blocks the current thread until the queue is closed.
Parameters: - correlation_id – (optional) transaction id to trace execution through call chain.
- receiver – a receiver to receive incoming messages.
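The relationship between listen, begin_listen, and end_listen can be sketched with the standard library alone. This is an illustration under assumptions, not the library's code: `SketchQueue` and `CollectingReceiver` are hypothetical, and running the blocking loop on a background thread is just one plausible way to implement non-blocking listening.

```python
import threading
from queue import Empty, Queue


class SketchQueue:
    """Hypothetical in-process queue; not the library's MemoryMessageQueue."""

    def __init__(self):
        self._messages = Queue()
        self._cancel = threading.Event()
        self._thread = None

    def send(self, correlation_id, envelop):
        self._messages.put(envelop)

    def _deliver(self, receiver):
        # Hand messages to the receiver until end_listen() sets the flag.
        while not self._cancel.is_set():
            try:
                envelop = self._messages.get(timeout=0.05)
            except Empty:
                continue
            receiver.receive_message(envelop, self)

    def listen(self, correlation_id, receiver):
        # Blocks the calling thread until end_listen() is called elsewhere.
        self._cancel.clear()
        self._deliver(receiver)

    def begin_listen(self, correlation_id, receiver):
        # Runs the same delivery loop on a background thread and returns.
        self._cancel.clear()
        self._thread = threading.Thread(
            target=self._deliver, args=(receiver,), daemon=True
        )
        self._thread.start()

    def end_listen(self, correlation_id):
        # Stops the delivery loop and waits for the background thread, if any.
        self._cancel.set()
        if self._thread is not None:
            self._thread.join()
            self._thread = None


class CollectingReceiver:
    """Hypothetical receiver that stores messages and signals arrival."""

    def __init__(self):
        self.received = []
        self.arrived = threading.Event()

    def receive_message(self, envelop, queue):
        self.received.append(envelop)
        self.arrived.set()


sketch = SketchQueue()
receiver = CollectingReceiver()
sketch.begin_listen("123", receiver)  # returns immediately
sketch.send("123", "ABC")
receiver.arrived.wait(timeout=2.0)    # wait for the background delivery
sketch.end_listen("123")              # stops the loop and joins the thread
print(receiver.received)
```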
-
move_to_dead_letter(message)¶ Permanently removes a message from the queue and sends it to dead letter queue.
Parameters: message – a message to be removed.
-
peek(correlation_id)¶ Peeks a single incoming message from the queue without removing it. If there are no messages available in the queue, it returns None.
Parameters: correlation_id – (optional) transaction id to trace execution through call chain.
Returns: a message object.
-
peek_batch(correlation_id, message_count)¶ Peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue it returns an empty list.
Parameters: - correlation_id – (optional) transaction id to trace execution through call chain.
- message_count – a maximum number of messages to peek.
Returns: a list of message objects.
-
read_message_count()¶ Reads the current number of messages in the queue to be delivered.
Returns: a number of messages
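The non-destructive behaviour of peek and peek_batch can be illustrated with a small stand-in. `PeekableQueue` is a hypothetical class backed by a `deque`; it mirrors only the method names and the empty-queue results described above, not the library's internals.

```python
from collections import deque


class PeekableQueue:
    """Hypothetical stand-in backed by a deque."""

    def __init__(self, messages):
        self._messages = deque(messages)

    def peek(self, correlation_id):
        # Look at the head without removing it; None when the queue is empty.
        return self._messages[0] if self._messages else None

    def peek_batch(self, correlation_id, message_count):
        # Up to message_count messages from the head; none are removed.
        return list(self._messages)[:message_count]

    def read_message_count(self):
        return len(self._messages)


pq = PeekableQueue(["a", "b", "c"])
head = pq.peek("123")
batch = pq.peek_batch("123", 2)
remaining = pq.read_message_count()  # peeking did not consume anything

empty = PeekableQueue([])
nothing = empty.peek("123")
no_batch = empty.peek_batch("123", 5)
```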
-
receive(correlation_id, wait_timeout)¶ Receives an incoming message and removes it from the queue.
Parameters: - correlation_id – (optional) transaction id to trace execution through call chain.
- wait_timeout – a timeout in milliseconds to wait for a message to come.
Returns: a message object.
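Because wait_timeout is given in milliseconds, an implementation built on Python's `queue.Queue` has to convert it to seconds. The sketch below shows that conversion. The free function `receive` is hypothetical, and returning None on timeout is an assumption suggested by the `None` check in the MemoryMessageQueue example rather than something this page states outright.

```python
from queue import Empty, Queue


def receive(messages, wait_timeout):
    """Return the next message, or None if none arrives within wait_timeout ms."""
    try:
        # queue.Queue.get expects seconds, so convert from milliseconds.
        return messages.get(timeout=wait_timeout / 1000.0)
    except Empty:
        return None


pending = Queue()
pending.put("ABC")
first = receive(pending, 100)   # a message is already waiting
second = receive(pending, 10)   # the queue is now empty, so this times out
```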
-
renew_lock(message, lock_timeout)¶ Renews the lock on a message, which keeps it invisible to other receivers in the queue. This method is usually used to extend the message processing time.
Parameters: - message – a message to extend its lock.
- lock_timeout – a locking timeout in milliseconds.
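One way to picture lock renewal is as bookkeeping of expiry times. The sketch below is an assumption about the semantics, not the library's implementation: `LockTable` is hypothetical, it keys locks by a plain message id, and it takes the current time as an explicit `now` argument (in milliseconds) so the behaviour is easy to follow.

```python
class LockTable:
    """Hypothetical lock bookkeeping keyed by message id; times in milliseconds."""

    def __init__(self):
        self._expires = {}

    def lock(self, message_id, now, lock_timeout):
        # Called when a message is received: hide it until the lock expires.
        self._expires[message_id] = now + lock_timeout

    def renew_lock(self, message_id, now, lock_timeout):
        # Only a message that is already locked can have its lock extended.
        if message_id in self._expires:
            self._expires[message_id] = now + lock_timeout

    def is_visible(self, message_id, now):
        # A message becomes visible to other receivers once its lock expires.
        return now >= self._expires.get(message_id, 0)


locks = LockTable()
locks.lock("m1", now=0, lock_timeout=1000)          # would expire at 1000
locks.renew_lock("m1", now=900, lock_timeout=1000)  # now expires at 1900
hidden_at_1500 = not locks.is_visible("m1", now=1500)
visible_at_1900 = locks.is_visible("m1", now=1900)
```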
-
send(correlation_id, envelop)¶ Sends a message into the queue.
Parameters: - correlation_id – (optional) transaction id to trace execution through call chain.
- envelop – a message envelop to be sent.
-
send_as_object(correlation_id, message_type, message)¶ Sends an object into the queue. Before sending, the object is converted into a JSON string and wrapped in a MessageEnvelop.
Parameters: - correlation_id – (optional) transaction id to trace execution through call chain.
- message_type – a message type
- message – an object value to be sent
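The conversion that send_as_object describes can be sketched with the standard `json` module. `wrap_object` is a hypothetical helper, and a plain dict stands in for MessageEnvelop; the real method may serialize or wrap the value differently.

```python
import json


def wrap_object(correlation_id, message_type, value):
    """Serialize value to a JSON string and wrap it in an envelope-like dict."""
    return {
        "correlation_id": correlation_id,
        "message_type": message_type,
        "message": json.dumps(value),
    }


wrapped = wrap_object("123", "order.created", {"id": 7, "items": ["a", "b"]})
# The receiving side can turn the JSON string back into an object.
restored = json.loads(wrapped["message"])
```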
-
-
class
pip_services3_messaging.queues.MessageEnvelop(correlation_id=None, message_type=None, message=None)¶ Bases:
object
Allows adding additional information to messages. A correlation id, a message id, and a message type are added to the data being sent or received. Additionally, a MessageEnvelop can reference a lock token.
Side note: a MessageEnvelop's message is stored as a buffer, so strings are converted using UTF-8 encoding.
-
correlation_id= None¶
-
message= None¶
-
message_id= None¶
-
message_type= None¶
-
reference= None¶
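The note about buffer storage can be illustrated as follows. `SketchEnvelop` is a hypothetical class that reuses the attribute names listed above; the `text` helper and the use of `uuid` for message_id are assumptions made for this sketch, not part of the documented API.

```python
import uuid


class SketchEnvelop:
    """Hypothetical envelope mirroring the attribute names listed above."""

    def __init__(self, correlation_id=None, message_type=None, message=None):
        self.correlation_id = correlation_id
        self.message_type = message_type
        self.message_id = uuid.uuid4().hex  # assumption: any unique id will do
        self.reference = None               # e.g. a lock token set by the queue
        # Strings are stored as UTF-8 bytes; other values are stored unchanged.
        if isinstance(message, str):
            message = message.encode("utf-8")
        self.message = message

    def text(self):
        """Decode the stored buffer back into a string (hypothetical helper)."""
        return None if self.message is None else self.message.decode("utf-8")


envelop = SketchEnvelop("123", "greeting", "h\u00e9llo")
stored_length = len(envelop.message)  # counts bytes, not characters
```

The accented character occupies two bytes in UTF-8, which is why the stored buffer is longer than the five-character string.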
-
-
class
pip_services3_messaging.queues.MessagingCapabilities(message_count, send, receive, peek, peek_batch, renew_lock, abandon, dead_letter, clear)¶ Bases:
object
Data object that contains the supported capabilities of a message queue. If a capability is not supported, the queue throws a NotImplemented exception.
-
can_abandon()¶ Informs if the queue is able to abandon messages.
Returns: true if queue is able to abandon.
-
can_clear()¶ Informs if the queue can be cleared.
Returns: true if queue can be cleared.
-
can_dead_letter()¶ Informs if the queue is able to send messages to dead letter queue.
Returns: true if queue is able to send messages to dead letter queue.
-
can_message_count()¶ Informs if the queue is able to read number of messages.
Returns: true if queue supports reading message count.
-
can_peek()¶ Informs if the queue is able to peek messages.
Returns: true if queue is able to peek messages.
-
can_peek_batch()¶ Informs if the queue is able to peek multiple messages in one batch.
Returns: true if queue is able to peek multiple messages in one batch.
-
can_receive()¶ Informs if the queue is able to receive messages.
Returns: true if queue is able to receive messages.
-
can_renew_lock()¶ Informs if the queue is able to renew message lock.
Returns: true if queue is able to renew message lock.
-
can_send()¶ Informs if the queue is able to send messages.
Returns: true if queue is able to send messages.
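A caller can consult the capabilities object before invoking an optional method. The sketch below assumes stand-in classes: `SketchCapabilities` copies the constructor argument order shown above but implements only two of the `can_*` methods, and `SendOnlyQueue` raises Python's built-in `NotImplementedError`, which may differ from the exception the library raises.

```python
class SketchCapabilities:
    """Hypothetical stand-in using the constructor argument order shown above."""

    def __init__(self, message_count, send, receive, peek, peek_batch,
                 renew_lock, abandon, dead_letter, clear):
        self._flags = {
            "message_count": message_count, "send": send, "receive": receive,
            "peek": peek, "peek_batch": peek_batch, "renew_lock": renew_lock,
            "abandon": abandon, "dead_letter": dead_letter, "clear": clear,
        }

    def can_send(self):
        return self._flags["send"]

    def can_peek(self):
        return self._flags["peek"]


class SendOnlyQueue:
    """Hypothetical queue that supports sending but not peeking."""

    def get_capabilities(self):
        return SketchCapabilities(False, True, False, False, False,
                                  False, False, False, False)

    def peek(self, correlation_id):
        raise NotImplementedError("peek is not supported")


def peek_if_supported(queue, correlation_id):
    """Peek only when the queue advertises the capability."""
    if queue.get_capabilities().can_peek():
        return queue.peek(correlation_id)
    return None


result = peek_if_supported(SendOnlyQueue(), "123")
```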
-
-
class
pip_services3_messaging.queues.IMessageReceiver¶ Bases:
object
Callback interface to receive incoming messages.
Example:
    class MyMessageReceiver(IMessageReceiver):
        def receive_message(self, envelop, queue):
            print("Received message: " + envelop.getMessageAsString())

    messageQueue = MemoryMessageQueue()
    # begin_listen is used here because listen blocks the current thread
    messageQueue.begin_listen("123", MyMessageReceiver())

    messageQueue.open("123")
    messageQueue.send("123", MessageEnvelop(None, "mymessage", "ABC"))
    # Console output: "Received message: ABC"
-
receive_message(message, queue)¶
-
class
pip_services3_messaging.queues.MessageQueue(name=None)¶ Bases:
pip_services3_commons.config.IConfigurable.IConfigurable, pip_services3_commons.refer.IReferenceable.IReferenceable, pip_services3_messaging.queues.IMessageQueue.IMessageQueue
Abstract message queue that is used as a basis for specific message queue implementations.
### Configuration parameters ###
- name: name of the message queue
- connection(s):
  - discovery_key: key to retrieve parameters from discovery service
  - protocol: connection protocol like http, https, tcp, udp
  - host: host name or IP address
  - port: port number
  - uri: resource URI or connection string with all parameters in it
- credential(s):
  - store_key: key to retrieve parameters from credential store
  - username: user name
  - password: user password
  - access_id: application access id
  - access_key: application secret key
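The nested parameters above are commonly written as dotted keys such as `connection.host`. The sketch below flattens a nested dictionary into that form. It assumes the dotted-key convention applies to this component; `flatten` and the sample values are hypothetical and not part of the library.

```python
def flatten(section, prefix=""):
    """Flatten nested dicts into dotted keys, e.g. {"connection.host": ...}."""
    flat = {}
    for key, value in section.items():
        name = prefix + "." + key if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, name))
        else:
            flat[name] = value
    return flat


# Sample values are placeholders chosen for illustration only.
config = {
    "name": "myqueue",
    "connection": {"protocol": "tcp", "host": "localhost", "port": 8080},
    "credential": {"username": "user", "password": "<secret>"},
}
flat = flatten(config)
```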
### References ###
- *:logger:*:*:1.0 (optional) ILogger components to pass log messages
- *:counters:*:*:1.0 (optional) ICounters components to pass collected measurements
- *:discovery:*:*:1.0 (optional) IDiscovery components to discover connection(s)
- *:credential-store:*:*:1.0 (optional) ICredentialStore components to look up credential(s)
-
begin_listen(correlation_id, receiver)¶ Listens for incoming messages without blocking the current thread.
Parameters: - correlation_id – (optional) transaction id to trace execution through call chain.
- receiver – a receiver to receive incoming messages.
-
configure(config)¶ Configures component by passing configuration parameters.
Parameters: config – configuration parameters to be set.
-
get_capabilities()¶ Gets the queue capabilities
Returns: the queue’s capabilities object.
-
get_name()¶ Gets the queue name
Returns: the queue name.
-
open(correlation_id)¶ Opens the component.
Parameters: correlation_id – (optional) transaction id to trace execution through call chain.
-
send_as_object(correlation_id, message_type, message)¶ Sends an object into the queue. Before sending, the object is converted into a JSON string and wrapped in a MessageEnvelop.
Parameters: - correlation_id – (optional) transaction id to trace execution through call chain.
- message_type – a message type
- message – an object value to be sent
-
set_references(references)¶ Sets references to dependent components.
Parameters: references – references to locate the component dependencies.
-
class
pip_services3_messaging.queues.MemoryMessageQueue(name=None)¶ Bases:
pip_services3_messaging.queues.MessageQueue.MessageQueue, pip_services3_commons.run.ICleanable.ICleanable
Message queue that sends and receives messages within the same process by using shared memory. This queue is typically used for testing to mock real queues.
### Configuration parameters ###
- name: name of the message queue
### References ###
Example:
    queue = MemoryMessageQueue("myqueue")
    queue.send("123", MessageEnvelop(None, "mymessage", "ABC"))

    message = queue.receive("123", 0)
    if message is not None:
        ...
        queue.complete(message)
-
abandon(message)¶ Returns a message to the queue and makes it available for all subscribers to receive again. This method is usually used to return a message that could not be processed at the moment, so that the attempt can be repeated. Messages that cause unrecoverable errors should be removed permanently and/or sent to the dead letter queue.
Parameters: message – a message to return.
-
clear(correlation_id)¶ Clears component state.
Parameters: correlation_id – (optional) transaction id to trace execution through call chain.
-
close(correlation_id)¶ Closes component and frees used resources.
Parameters: correlation_id – (optional) transaction id to trace execution through call chain.
-
complete(message)¶ Permanently removes a message from the queue. This method is usually used to remove the message after successful processing.
Parameters: message – a message to remove.
-
end_listen(correlation_id)¶ Ends listening for incoming messages. When this method is called, listen unblocks the thread and execution continues.
Parameters: correlation_id – (optional) transaction id to trace execution through call chain.
-
is_opened()¶ Checks if the component is opened.
Returns: true if the component has been opened and false otherwise.
-
listen(correlation_id, receiver)¶ Listens for incoming messages and blocks the current thread until the queue is closed.
Parameters: - correlation_id – (optional) transaction id to trace execution through call chain.
- receiver – a receiver to receive incoming messages.
-
move_to_dead_letter(message)¶ Permanently removes a message from the queue and sends it to dead letter queue.
Parameters: message – a message to be removed.
-
peek(correlation_id)¶ Peeks a single incoming message from the queue without removing it. If there are no messages available in the queue, it returns None.
Parameters: correlation_id – (optional) transaction id to trace execution through call chain.
Returns: a message object.
-
peek_batch(correlation_id, message_count)¶ Peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue it returns an empty list.
Parameters: - correlation_id – (optional) transaction id to trace execution through call chain.
- message_count – a maximum number of messages to peek.
Returns: a list of message objects.
-
read_message_count()¶ Reads the current number of messages in the queue to be delivered.
Returns: a number of messages
-
receive(correlation_id, wait_timeout)¶ Receives an incoming message and removes it from the queue.
Parameters: - correlation_id – (optional) transaction id to trace execution through call chain.
- wait_timeout – a timeout in milliseconds to wait for a message to come.
Returns: a message object.
-
renew_lock(message, lock_timeout)¶ Renews the lock on a message, which keeps it invisible to other receivers in the queue. This method is usually used to extend the message processing time.
Parameters: - message – a message to extend its lock.
- lock_timeout – a locking timeout in milliseconds.
-
send(correlation_id, message)¶ Sends a message into the queue.
Parameters: - correlation_id – (optional) transaction id to trace execution through call chain.
- message – a message envelop to be sent.