Notification Listener

A notification listener exposes a number of endpoints, each of which contains a set of methods. Each method corresponds to a notification priority.

To create a notification listener, you supply a transport, a list of targets and a list of endpoints.

A transport can be obtained simply by calling the get_transport() method:

transport = messaging.get_transport(conf)

which will load the appropriate transport driver according to the user’s messaging configuration. See get_transport() for more details.

The target supplied when creating a notification listener expresses the topic and - optionally - the exchange to listen on. See Target for more details on these attributes.

Notification listeners have start(), stop() and wait() methods to begin handling requests, stop handling requests and wait for all in-process requests to complete.

Each notification listener is associated with an executor which integrates the listener with a specific I/O handling framework. Currently, there are blocking and eventlet executors available.

A simple example of a notification listener with multiple endpoints might be:

from oslo.config import cfg
from oslo import messaging

class NotificationEndpoint(object):
    def warn(self, ctxt, publisher_id, event_type, payload, metadata):
        do_something(payload)

class ErrorEndpoint(object):
    def error(self, ctxt, publisher_id, event_type, payload, metadata):
        do_something(payload)

transport = messaging.get_transport(cfg.CONF)
targets = [
    messaging.Target(topic='notifications'),
    messaging.Target(topic='notifications_bis'),
]
endpoints = [
    NotificationEndpoint(),
    ErrorEndpoint(),
]
server = messaging.get_notification_listener(transport, targets, endpoints)
server.start()
server.wait()

A notifier sends a notification on a topic with a priority. The notification listener receives the notification if its topic has been set in one of the targets and if an endpoint implements a method named after the priority.

Parameters to endpoint methods are the request context supplied by the client, the publisher_id of the notification message, the event_type, the payload and metadata. The metadata parameter is a mapping containing a unique message_id and a timestamp.
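The matching rule and the parameter list described above can be illustrated with a small stand-alone sketch. It does not use oslo.messaging: the dispatch() helper, the endpoint classes and the sample values are all hypothetical, and the library's own dispatcher may work differently.

```python
# Illustrative stand-in only: this is NOT oslo.messaging's dispatcher.
class WarnEndpoint(object):
    def __init__(self):
        self.seen = []

    def warn(self, ctxt, publisher_id, event_type, payload, metadata):
        # metadata is a mapping with a unique message_id and a timestamp
        self.seen.append((event_type, metadata['message_id']))


class ErrorEndpoint(object):
    def __init__(self):
        self.seen = []

    def error(self, ctxt, publisher_id, event_type, payload, metadata):
        self.seen.append((event_type, metadata['message_id']))


def dispatch(endpoints, priority, ctxt, publisher_id, event_type,
             payload, metadata):
    """Call the method named after the priority on each endpoint that has one."""
    called = 0
    for endpoint in endpoints:
        method = getattr(endpoint, priority, None)
        if callable(method):
            method(ctxt, publisher_id, event_type, payload, metadata)
            called += 1
    return called


warn_ep, error_ep = WarnEndpoint(), ErrorEndpoint()
# Made-up sample values for the illustration.
metadata = {'message_id': 'hypothetical-id-1',
            'timestamp': '2014-01-01 00:00:00'}
handled_by = dispatch([warn_ep, error_ep], 'warn', {}, 'compute.host1',
                      'instance.resize', {'instance': 'abc'}, metadata)
```

In this sketch only WarnEndpoint is called, because ErrorEndpoint has no method named warn.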

By supplying a serializer object, a listener can deserialize a request context and arguments from - and serialize return values to - primitive types.
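As a rough sketch of what such a serializer object might look like, the class below converts a request context to and from a plain dict. The four method names are an assumption about the Serializer interface (see the Serializer page for the authoritative definition); RequestContext and ContextSerializer are hypothetical names, and the class is duck-typed so that the example runs without the library.

```python
class RequestContext(object):
    """Hypothetical application-level context object."""

    def __init__(self, request_id, user):
        self.request_id = request_id
        self.user = user


class ContextSerializer(object):
    """Duck-typed sketch; method names are assumed, not confirmed here."""

    def serialize_context(self, ctxt):
        # Turn the context object into primitive types.
        return {'request_id': ctxt.request_id, 'user': ctxt.user}

    def deserialize_context(self, ctxt):
        # Rebuild the context object from the primitive form.
        return RequestContext(ctxt['request_id'], ctxt['user'])

    def serialize_entity(self, ctxt, entity):
        # Arguments and return values pass through unchanged in this sketch.
        return entity

    def deserialize_entity(self, ctxt, entity):
        return entity


serializer = ContextSerializer()
wire = serializer.serialize_context(RequestContext('req-1', 'alice'))
restored = serializer.deserialize_context(wire)
```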

An endpoint method can explicitly return messaging.NotificationResult.HANDLED to acknowledge a message or messaging.NotificationResult.REQUEUE to requeue the message.

The message is acknowledged only if every endpoint returns either messaging.NotificationResult.HANDLED or None.
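The acknowledgement rule can be written down as a one-line check. This is a minimal sketch under stated assumptions: the NotificationResult class below is a stand-in with made-up values, and should_acknowledge() is a hypothetical helper, not a function provided by the library.

```python
class NotificationResult(object):
    # Stand-in for messaging.NotificationResult; the values are made up.
    HANDLED = 'handled'
    REQUEUE = 'requeue'


def should_acknowledge(results):
    """Return True only if every endpoint returned HANDLED or None."""
    return all(r in (NotificationResult.HANDLED, None) for r in results)


# Two endpoints, both satisfied: the message would be acknowledged.
ack_all = should_acknowledge([None, NotificationResult.HANDLED])
# One endpoint asks for a requeue: the message would not be acknowledged.
ack_mixed = should_acknowledge([NotificationResult.HANDLED,
                                NotificationResult.REQUEUE])
```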

Note that not all transport drivers implement support for requeueing. In order to use this feature, applications should assert that the feature is available by passing allow_requeue=True to get_notification_listener(). If the driver does not support requeueing, it will raise NotImplementedError at this point.

oslo.messaging.get_notification_listener(transport, targets, endpoints, executor='blocking', serializer=None, allow_requeue=False)

Construct a notification listener.

The executor parameter controls how incoming messages will be received and dispatched. By default, the simplest executor is used - the blocking executor.

Parameters:
  • transport (Transport) – the messaging transport
  • targets (list of Target) – the exchanges and topics to listen on
  • endpoints (list) – a list of endpoint objects
  • executor (str) – name of a message executor - e.g. ‘eventlet’, ‘blocking’
  • serializer (Serializer) – an optional entity serializer
  • allow_requeue (bool) – whether NotificationResult.REQUEUE support is needed
Raises: NotImplementedError

class oslo.messaging.MessageHandlingServer(transport, dispatcher, executor='blocking')

Server for handling messages.

Connect a transport to a dispatcher that knows how to process the message, using an executor that knows how the app wants to create new tasks.

start()

Start handling incoming messages.

This method causes the server to begin polling the transport for incoming messages and passing them to the dispatcher. Message processing will continue until the stop() method is called.

The executor controls how the server integrates with the application's I/O handling strategy - it may choose to poll for messages in a new process, thread or co-operatively scheduled coroutine or simply by registering a callback with an event loop. Similarly, the executor may choose to dispatch messages in a new thread, coroutine or simply the current thread. An RPCServer subclass is available for each I/O strategy supported by the library, so choose the subclass appropriate for your program.

stop()

Stop handling incoming messages.

Once this method returns, no new incoming messages will be handled by the server. However, the server may still be in the process of handling some messages.

wait()

Wait for message processing to complete.

After calling stop(), there may still be some existing messages which have not been completely processed. The wait() method blocks until all message processing has completed.
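The start(), stop() and wait() sequence can be tried out with a toy server built on a worker thread. ToyServer and its deliver() method are hypothetical and exist only for this illustration; the sketch mimics the documented behaviour and is not how MessageHandlingServer or its executors are implemented.

```python
import queue
import threading
import time


class ToyServer(object):
    """Hypothetical stand-in that mimics start(), stop() and wait()."""

    def __init__(self, handler):
        self._handler = handler
        self._incoming = queue.Queue()
        self._running = threading.Event()
        self._thread = None

    def deliver(self, message):
        # Test hook standing in for a message arriving on the transport.
        self._incoming.put(message)

    def start(self):
        self._running.set()
        self._thread = threading.Thread(target=self._poll)
        self._thread.start()

    def _poll(self):
        while self._running.is_set():
            try:
                message = self._incoming.get(timeout=0.05)
            except queue.Empty:
                continue
            self._handler(message)

    def stop(self):
        # No new messages are picked up once the current one finishes.
        self._running.clear()

    def wait(self):
        # Block until in-flight processing has completed.
        if self._thread is not None:
            self._thread.join()


processed = []
server = ToyServer(processed.append)
server.start()
server.deliver('notification-1')
while not processed:
    time.sleep(0.01)  # give the worker thread time to handle the message
server.stop()
server.wait()
```

Calling wait() after stop() is what guarantees that the worker has finished before the program moves on.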

oslo.messaging.get_local_context(ctxt)

Retrieve the RPC endpoint request context for the current thread.

This method allows any code running in the context of a dispatched RPC endpoint method to retrieve the context for this request.

This is commonly used for logging so that, for example, you can include the request ID, user and tenant in every message logged from an RPC endpoint method.

Returns: the context for the request dispatched in the current thread
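The idea of a per-thread request context used for logging can be sketched with threading.local from the standard library. The helper names below (set_local_context, get_local_context_sketch, log_line) are hypothetical, and the library's own mechanism may differ; the sketch only shows why each thread sees its own request context.

```python
import threading

# Per-thread storage: each thread sees only the values it set itself.
_local = threading.local()


def set_local_context(ctxt):
    """Hypothetical helper: remember the context for the current thread."""
    _local.ctxt = ctxt


def get_local_context_sketch():
    """Hypothetical stand-in: return the current thread's context, or None."""
    return getattr(_local, 'ctxt', None)


def log_line(text):
    # Prefix the text with the request ID of the current thread, if any.
    ctxt = get_local_context_sketch() or {}
    return '[req %s] %s' % (ctxt.get('request_id', '-'), text)


results = {}


def worker(name, ctxt):
    set_local_context(ctxt)
    results[name] = log_line('handled')


threads = [
    threading.Thread(target=worker, args=('a', {'request_id': 'req-a'})),
    threading.Thread(target=worker, args=('b', {'request_id': 'req-b'})),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The main thread never set a context, so it falls back to '-'.
main_line = log_line('outside any request')
```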
