"""Replay extension classes"""
from collections import namedtuple, abc
from abc import abstractmethod
from enum import IntEnum, unique
import reprlib
from aiocometd import Extension
from aiocometd.constants import MetaChannel
[docs]@unique
class ReplayOption(IntEnum):
"""Replay options supported by Salesforce"""
#: Receive new events that are broadcast after the client subscribes
NEW_EVENTS = -1
#: Receive all events, including past events that are within the 24-hour
#: retention window and new events sent after subscription
ALL_EVENTS = -2
#: Class for storing a message replay id and its creation date
#:
#: :param str date: Creation date of a message, as a ISO 8601 formatted \
#: datetime string
#: :param int replay_id: Replay id of a message
ReplayMarker = namedtuple("ReplayMarker", "date, replay_id")
[docs]class ReplayMarkerStorage(Extension):
"""Abstract base class for replay marker storage implementations"""
def __init__(self):
super().__init__()
self.replay_fallback = None
async def incoming(self, payload, headers=None):
pass
async def outgoing(self, payload, headers):
for message in payload:
# if the outgoing message is a subscribe message, then insert the
# stored replay id into the message
if message["channel"] == MetaChannel.SUBSCRIBE:
await self.insert_replay_id(message)
async def insert_replay_id(self, message):
"""Insert the stored replay id into the *message*
:param dict message: An outgoing, ``/meta/subscribe`` message
"""
# get the name of the channel, that the message is trying to subscribe
# to
subscription = message["subscription"]
# if there is a replay fallback set, then it must be used as the replay
# id in order to successfully subscribe
if self.replay_fallback:
replay_id = self.replay_fallback
self.replay_fallback = None
# otherwise get the stored replay id
else:
replay_id = await self.get_replay_id(subscription)
# if the replay id is None, then we do not yet have a replay id for the
# given subscription, so don't add anything to the message
if replay_id:
if "ext" not in message:
message["ext"] = {}
message["ext"]["replay"] = {subscription: replay_id}
@staticmethod
def get_message_date(message):
"""Return the creation date of the *message*
:param dict message: An incoming message
:return: Creation date as an ISO 8601 formatted datetime string
:rtype: str
"""
# get the creation date of the message from a PushTopic message
# structure if it exists, or read it from a PlatfromEvent message
# structure
return (message["data"]["event"].get("createdDate") or
message["data"]["payload"].get("CreatedDate"))
async def extract_replay_id(self, message):
"""Extract and store the replay id present int the *message*
:param dict message: An incoming broadcast message
"""
# get the name of the subscription
subscription = message["channel"]
# create the replay marker object from the creation date and the
# actual id
marker = ReplayMarker(date=self.get_message_date(message),
replay_id=message["data"]["event"]["replayId"])
# get the last, stored, replay marker
last_marker = await self.get_replay_marker(subscription)
# only store the extracted replay marker, if there is no replay \
# marker for the subscription yet, or if the stored replay marker is\
# older then the extracted one or it has the same data (otherwise,
# we're seeing a replayed message, and in that case, it shouldn't be
# stored)
if not last_marker or last_marker.date <= marker.date:
await self.set_replay_marker(subscription, marker)
async def get_replay_id(self, subscription):
"""Retrieve a stored replay id for the given *subscription*
:param str subscription: Name of the subscribed channel
:return: A replay id or ``None`` if there is nothing stored for \
the given *subscription*
:rtype: int
"""
marker = await self.get_replay_marker(subscription)
if marker:
return marker.replay_id
return None
[docs] @abstractmethod
async def get_replay_marker(self, subscription):
"""Retrieve a stored replay marker for the given *subscription*
:param str subscription: Name of the subscribed channel
:return: A replay marker or ``None`` if there is nothing stored for \
the given *subscription*
:rtype: ReplayMarker or None
"""
[docs] @abstractmethod
async def set_replay_marker(self, subscription, replay_marker):
"""Store the *replay_marker* for the given *subscription*
:param str subscription: Name of the subscribed channel
:param ReplayMarker replay_marker: A replay marker
"""
[docs]class MappingStorage(ReplayMarkerStorage):
"""Mapping based replay marker storage"""
def __init__(self, mapping):
"""
:param mapping: A MutableMapping object for storing replay markers
:type mapping: collections.abc.MutableMapping
"""
if not isinstance(mapping, abc.MutableMapping):
raise TypeError("mapping parameter should be an instance of "
"MutableMapping.")
super().__init__()
#: A MutableMapping object for storing replay markers
self.mapping = mapping
def __repr__(self):
"""Formal string representation"""
cls_name = type(self).__name__
return f"{cls_name}(mapping={reprlib.repr(self.mapping)})"
async def set_replay_marker(self, subscription, replay_marker):
self.mapping[subscription] = replay_marker
async def get_replay_marker(self, subscription):
try:
return self.mapping[subscription]
except KeyError:
return None
class DefaultReplayIdMixin: # pylint: disable=too-few-public-methods
"""A mixin class that will return a default, constant replay id if
there is not replay marker for the given subscription"""
def __init__(self, default_id, **kwargs):
"""
:param int default_id: A replay id
"""
super().__init__(**kwargs)
#: A replay id
self.default_id = default_id
async def get_replay_id(self, subscription):
"""Retrieve a stored replay id for the given *subscription*
:param str subscription: Name of the subscribed channel
:return: The default, constant replay id if there is not replay \
marker for the given subscription
:rtype: int
"""
marker = await self.get_replay_marker(subscription)
if marker:
return marker.replay_id
return self.default_id
[docs]class ConstantReplayId(DefaultReplayIdMixin, ReplayMarkerStorage):
"""A replay marker storage which will return a constant replay id for
every subscription
.. note::
This implementations doesn't actually stores anything for later
retrieval.
"""
def __repr__(self):
"""Formal string representation"""
cls_name = type(self).__name__
return f"{cls_name}(default_id={reprlib.repr(self.default_id)})"
async def set_replay_marker(self, subscription, replay_marker):
pass
async def get_replay_marker(self, subscription):
return None
[docs]class DefaultMappingStorage(DefaultReplayIdMixin, MappingStorage):
"""Mapping based replay marker storage which will return a defualt
replay id if there is not replay marker for the given subscription """
def __init__(self, mapping, default_id):
"""
:param mapping: A MutableMapping object for storing replay markers
:type mapping: collections.abc.MutableMapping
:param int default_id: A replay id
"""
super().__init__(mapping=mapping, default_id=default_id)
def __repr__(self):
"""Formal string representation"""
cls_name = type(self).__name__
return f"{cls_name}(mapping={reprlib.repr(self.mapping)}, " \
f"default_id={reprlib.repr(self.default_id)})"