Advanced Usage¶
Replay configuration¶
Salesforce stores events for 24 hours. Events outside the 24-hour retention period are discarded.
ReplayOption¶
A subscriber can choose which events to receive, such as all events within
the retention window or starting after a particular event. In the
Client object this can be specified with the replay parameter.
By default, the client receives only the new events sent after it subscribes;
the default value of the replay parameter is ReplayOption.NEW_EVENTS.
This high-level diagram shows how event consumers can read a stream of events by using various replay options.
If you want to receive all events within the retention window every time the
Client connects, before receiving new events, then the
ReplayOption.ALL_EVENTS value should be passed to the
Client.
async with SalesforceStreamingClient(
        consumer_key="<consumer key>",
        consumer_secret="<consumer secret>",
        username="<username>",
        password="<password>",
        replay=ReplayOption.ALL_EVENTS) as client:
    await client.subscribe("/topic/foo")
    async for message in client:
        # process message
        ...
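The effect of the different replay values can be modelled with a few lines of plain Python. This is only a sketch of the selection the server is described as making, not code from aiosfstream or Salesforce. The constants -1 and -2 are taken from the error message quoted in the Subscription errors section; the function name and the event dictionaries are made up for illustration.

```python
from typing import Dict, List

NEW_EVENTS = -1  # replay only events published after subscribing
ALL_EVENTS = -2  # replay every event still inside the retention window


def events_to_deliver(retained: List[Dict], replay_id: int) -> List[Dict]:
    """Return the retained events a new subscriber would be sent first.

    Hypothetical helper: *retained* stands for the events still inside
    the 24-hour retention window, ordered by replayId.
    """
    if replay_id == NEW_EVENTS:
        return []
    if replay_id == ALL_EVENTS:
        return list(retained)
    # Any other value is treated as the replayId of the last event seen,
    # so only the events after it are delivered.
    return [event for event in retained if event["replayId"] > replay_id]


retained = [{"replayId": 1}, {"replayId": 2}, {"replayId": 3}]
```

With the sample list above, NEW_EVENTS selects nothing from the retention window, ALL_EVENTS selects all three events, and a replay value of 1 selects the events with replayId 2 and 3.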
ReplayMarkerStorage¶
Although a fixed ReplayOption can sometimes be useful, the
real advantage of Salesforce’s replay extension comes from being able
to continue processing event messages from the point where the client left off.
To take advantage of this feature, all you have to do is
pass an object capable of storing the most recent ReplayMarker for
every channel.
Salesforce extends the event messages with replayId and createdDate
fields (called a ReplayMarker by aiosfstream).
The simplest way is to pass an object for the replay parameter that
inherits from collections.abc.MutableMapping. This can be a simple
dict or OrderedDict, or, if you want
persistent storage, a Shelf object or one of the
key-value database drivers that inherit from
collections.abc.MutableMapping.
with shelve.open("replay.db") as replay:
    async with SalesforceStreamingClient(
            consumer_key="<consumer key>",
            consumer_secret="<consumer secret>",
            username="<username>",
            password="<password>",
            replay=replay) as client:
        await client.subscribe("/topic/foo")
        async for message in client:
            # process message
            ...
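If no ready-made driver is available, a small MutableMapping of your own may be enough. The class below is a minimal sketch that keeps pickled values in SQLite. The class name, table name and the choice of pickle are assumptions made for this example, and it has not been exercised against a live Client.

```python
import pickle
import sqlite3
from collections.abc import MutableMapping


class SqliteMapping(MutableMapping):
    """Hypothetical MutableMapping that keeps pickled values in SQLite."""

    def __init__(self, path=":memory:"):
        self._db = sqlite3.connect(path)
        self._db.execute(
            "CREATE TABLE IF NOT EXISTS markers "
            "(key TEXT PRIMARY KEY, value BLOB)"
        )

    def __getitem__(self, key):
        row = self._db.execute(
            "SELECT value FROM markers WHERE key = ?", (key,)
        ).fetchone()
        if row is None:
            raise KeyError(key)
        return pickle.loads(row[0])

    def __setitem__(self, key, value):
        # INSERT OR REPLACE overwrites the previous marker of the channel
        self._db.execute(
            "INSERT OR REPLACE INTO markers (key, value) VALUES (?, ?)",
            (key, pickle.dumps(value)),
        )
        self._db.commit()

    def __delitem__(self, key):
        cursor = self._db.execute("DELETE FROM markers WHERE key = ?", (key,))
        if cursor.rowcount == 0:
            raise KeyError(key)
        self._db.commit()

    def __iter__(self):
        rows = self._db.execute("SELECT key FROM markers").fetchall()
        return iter([row[0] for row in rows])

    def __len__(self):
        return self._db.execute("SELECT COUNT(*) FROM markers").fetchone()[0]
```

An instance created with a file path instead of ":memory:" would keep the markers between runs, in the same way the Shelf object does.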
When given a collections.abc.MutableMapping object, the client receives
only new events on the first connection, and on reconnection
continues from the first message it has not yet retrieved. If you want to receive all events
from the retention window before continuing with new events, combined with the
advantage of continuation on the next reconnect, then you can pass a
DefaultMappingStorage object to the replay parameter.
with shelve.open("replay.db") as replay:
    default_mapping = DefaultMappingStorage(
        replay,
        ReplayOption.ALL_EVENTS
    )
    async with SalesforceStreamingClient(
            consumer_key="<consumer key>",
            consumer_secret="<consumer secret>",
            username="<username>",
            password="<password>",
            replay=default_mapping) as client:
        await client.subscribe("/topic/foo")
        async for message in client:
            # process message
            ...
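The behaviour described for DefaultMappingStorage boils down to "use the stored marker if there is one, otherwise fall back to a default". The helper below is a rough model of that rule and not the library's implementation. The function name is hypothetical, and -2 is used as a stand-in for ReplayOption.ALL_EVENTS, based on the error message quoted in the Subscription errors section.

```python
ALL_EVENTS = -2  # stand-in for ReplayOption.ALL_EVENTS


def initial_replay_value(storage, channel, default):
    """Pick the stored marker for *channel*, or *default* if none exists."""
    try:
        return storage[channel]
    except KeyError:
        # First connection for this channel: nothing has been stored yet
        return default


storage = {"/topic/seen": 41}
first_time = initial_replay_value(storage, "/topic/new", ALL_EVENTS)
resumed = initial_replay_value(storage, "/topic/seen", ALL_EVENTS)
```

Here first_time falls back to the default, while resumed uses the stored value, which is the combination the paragraph above describes.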
If you want complete control over how ReplayMarkers
are stored and retrieved, or you want to use your favorite database whose
driver doesn’t inherit from collections.abc.MutableMapping, then
you can provide your own ReplayMarkerStorage implementation.
class MyReplayMarkerStorage(ReplayMarkerStorage):
    async def set_replay_marker(self, subscription, replay_marker):
        # store *replay_marker* for the given *subscription*
        ...

    async def get_replay_marker(self, subscription):
        # retrieve the replay marker for the given *subscription*
        ...

replay = MyReplayMarkerStorage()
async with SalesforceStreamingClient(
        consumer_key="<consumer key>",
        consumer_secret="<consumer secret>",
        username="<username>",
        password="<password>",
        replay=replay) as client:
    await client.subscribe("/topic/foo")
    async for message in client:
        # process message
        ...
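To make the two methods more concrete, here is a minimal sketch of a store that keeps the markers in a JSON file. It deliberately does not inherit from ReplayMarkerStorage so that it runs without aiosfstream installed; a real implementation would subclass it as shown above. The class name, the JSON-serializable marker and the choice to return None for an unknown subscription are all assumptions of this example.

```python
import asyncio
import json
import os
import tempfile


class JsonFileMarkerStore:
    """Hypothetical store; a real one would subclass ReplayMarkerStorage."""

    def __init__(self, path):
        self._path = path

    def _load(self):
        # A missing file simply means that no marker has been stored yet
        if not os.path.exists(self._path):
            return {}
        with open(self._path, encoding="utf-8") as file:
            return json.load(file)

    async def set_replay_marker(self, subscription, replay_marker):
        markers = self._load()
        markers[subscription] = replay_marker
        with open(self._path, "w", encoding="utf-8") as file:
            json.dump(markers, file)

    async def get_replay_marker(self, subscription):
        # Assumption: None signals "no marker stored for this subscription"
        return self._load().get(subscription)


async def demo():
    path = os.path.join(tempfile.mkdtemp(), "markers.json")
    store = JsonFileMarkerStore(path)
    missing = await store.get_replay_marker("/topic/foo")
    await store.set_replay_marker("/topic/foo", {"replayId": 7})
    stored = await store.get_replay_marker("/topic/foo")
    return missing, stored
```

The file access here is blocking, which is acceptable for a sketch; a production store would more likely use an asynchronous database driver.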
Subscription errors¶
Events outside the 24-hour retention period are discarded. If you’re using some
form of ReplayMarkerStorage or a
MutableMapping object, and your client doesn’t
connect to the Streaming API for more than 24 hours, then it’s possible that
the client will try to continue retrieving messages from a very old message
outside the retention window. Since Salesforce no longer has the event message
that the client is trying to retrieve, the subscription fails with a
ServerError.
try:
    await client.subscribe("/topic/foo")
except ServerError as error:
    print(error.error_message)
The above code prints the following message if the client requests an event outside the retention window:
The replayId {1} you provided was invalid. Please provide a valid ID, -2
to replay all events, or -1 to replay only new events.
To recover from an error like the above, you would have to discard the
ReplayMarker for the problematic channel, and try to subscribe again.
try:
    await client.subscribe("/topic/foo")
except ServerError as error:
    del replay["/topic/foo"]
    await client.subscribe("/topic/foo")
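The same recovery step can be wrapped in a small helper when several channels are involved. The sketch below is written against a generic async subscribe callable so that it can run on its own. FakeServerError is a stand-in for ServerError, and the helper name and parameters are hypothetical.

```python
import asyncio


class FakeServerError(Exception):
    """Stand-in for the ServerError raised on an invalid replayId."""


async def subscribe_with_recovery(subscribe, replay, channel,
                                  error_type=FakeServerError):
    """Subscribe to *channel*; on failure drop its marker and retry once."""
    try:
        await subscribe(channel)
    except error_type:
        # pop() avoids a KeyError when no marker was stored for the channel
        replay.pop(channel, None)
        await subscribe(channel)


async def demo():
    replay = {"/topic/foo": 1}
    calls = []

    async def fake_subscribe(channel):
        # Fails as long as a (stale) marker exists for the channel
        calls.append(channel)
        if channel in replay:
            raise FakeServerError("invalid replayId")

    await subscribe_with_recovery(fake_subscribe, replay, "/topic/foo")
    return replay, calls
```

With the real client, the call would presumably look like subscribe_with_recovery(client.subscribe, replay, "/topic/foo", error_type=ServerError). A second failure is not caught and propagates to the caller.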
To spare you the hassle of recovering from errors like the one above, you can
pass a ReplayOption for the replay_fallback parameter. If a
subscription error occurs, then Client will try to resubscribe using
the specified ReplayOption.
with shelve.open("replay.db") as replay:
    async with SalesforceStreamingClient(
            consumer_key="<consumer key>",
            consumer_secret="<consumer secret>",
            username="<username>",
            password="<password>",
            replay=replay,
            replay_fallback=ReplayOption.ALL_EVENTS) as client:
        await client.subscribe("/topic/foo")
        async for message in client:
            # process message
            ...
ReplayMarkerStoragePolicy¶
If you’re using some form of ReplayMarkerStorage or a MutableMapping object, the
ReplayMarker values for messages are stored just before
they’re returned by the receive() method, or yielded from the
asynchronous iterator of the Client object when it is used as an asynchronous iterable.
This behaviour might not be optimal when there is a high
probability that the user’s message processing code will fail.
If that code raises an
exception, the message isn’t processed completely, but since the message’s
ReplayMarker is already stored at that point, the
failed message won’t be retrieved again the next time the Client is
opened.
To be able to retrieve unsuccessfully processed messages again, users can
take control of the point at which the ReplayMarkers
are stored. Pass the ReplayMarkerStoragePolicy.MANUAL option
to the replay_storage_policy parameter when creating the Client
and call the extract_replay_id() method of
Client.replay_storage once the message is successfully processed.
with shelve.open("replay.db") as replay:
    async with SalesforceStreamingClient(
            consumer_key="<consumer key>",
            consumer_secret="<consumer secret>",
            username="<username>",
            password="<password>",
            replay=replay,
            replay_storage_policy=ReplayMarkerStoragePolicy.MANUAL) as client:
        await client.subscribe("/topic/foo")
        async for message in client:
            # message processing code raising errors...
            await client.replay_storage.extract_replay_id(message)
The Client.replay_storage attribute can also be used as an
asynchronous context manager, which will only store the
ReplayMarker of the given message if no errors are
raised in the runtime context.
with shelve.open("replay.db") as replay:
    async with SalesforceStreamingClient(
            consumer_key="<consumer key>",
            consumer_secret="<consumer secret>",
            username="<username>",
            password="<password>",
            replay=replay,
            replay_storage_policy=ReplayMarkerStoragePolicy.MANUAL) as client:
        await client.subscribe("/topic/foo")
        async for message in client:
            async with client.replay_storage(message):
                # message processing code raising errors...
                ...
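The "store only on success" behaviour of such a context manager can be illustrated with contextlib. The following is a sketch of the idea, not the code behind Client.replay_storage. The function name is hypothetical, and the message layout (a channel field and a replayId under data.event) is an assumption about the shape of Salesforce event messages.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def store_on_success(storage, message):
    """Record the message's replayId only if the body finishes without error."""
    yield
    # Reached only when the with-block did not raise: an exception in the
    # body is re-raised at the yield above and skips the line below.
    storage[message["channel"]] = message["data"]["event"]["replayId"]


async def demo():
    storage = {}
    ok = {"channel": "/topic/foo", "data": {"event": {"replayId": 1}}}
    bad = {"channel": "/topic/foo", "data": {"event": {"replayId": 2}}}

    async with store_on_success(storage, ok):
        pass  # processing succeeds, so the marker is stored

    try:
        async with store_on_success(storage, bad):
            raise ValueError("processing failed")
    except ValueError:
        pass  # the marker of the failed message is not stored

    return storage
```

After demo() finishes, the storage still points at replayId 1, so the failed message would be requested again on the next connection.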
Network failures¶
When a Client object is opened, it will try to maintain a continuous
connection with the server in the background. If a network failure happens
while waiting to receive() messages, the client will transparently reconnect
to the server, resubscribe to the subscribed channels,
and continue to wait for incoming messages.
To avoid waiting for a server which went offline permanently, or in case of a
permanent network failure, a connection_timeout can be passed to the
Client, to limit how many seconds the client object should wait
before raising a TransportTimeoutError if it can’t
reconnect to the server.
client = SalesforceStreamingClient(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    username="<username>",
    password="<password>",
    connection_timeout=60
)
await client.open()
try:
    message = await client.receive()
except TransportTimeoutError:
    print("Connection is lost with the server. "
          "Couldn't reconnect in 60 seconds.")
The default value is 10 seconds. If you pass None as the
connection_timeout value, then the client will keep on trying indefinitely.
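The timeout rule described above (keep retrying until a deadline passes, or forever when the timeout is None) can be sketched as a plain retry loop. This is an illustration of the rule only and says nothing about how the client implements reconnection internally. ReconnectTimeout, reconnect_with_deadline and retry_delay are hypothetical names.

```python
import asyncio
import time


class ReconnectTimeout(Exception):
    """Stand-in for TransportTimeoutError in this sketch."""


async def reconnect_with_deadline(connect, timeout, retry_delay=0.01):
    """Call *connect* until it succeeds or *timeout* seconds have passed.

    A *timeout* of None means the loop keeps trying indefinitely.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        try:
            return await connect()
        except ConnectionError:
            if deadline is not None and time.monotonic() >= deadline:
                raise ReconnectTimeout(
                    f"couldn't reconnect in {timeout} seconds"
                )
            await asyncio.sleep(retry_delay)


async def demo():
    attempts = []

    async def flaky_connect():
        # Fails twice, then succeeds
        attempts.append(1)
        if len(attempts) < 3:
            raise ConnectionError("server unreachable")
        return "connected"

    result = await reconnect_with_deadline(flaky_connect, timeout=5)
    return result, len(attempts)
```

A connect callable that never succeeds would end in ReconnectTimeout once the deadline has passed, which mirrors the TransportTimeoutError case in the example above.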
Prefetching¶
When a Client is opened it will start and maintain a connection in
the background with the server. It will start to fetch messages from the
server as soon as it’s connected, even before receive() is
called.
Prefetching has the advantage that incoming messages
wait in a buffer, ready for users to consume without delay when receive()
is called.
To prevent unconsumed incoming messages from using up all available memory,
the number of prefetched messages can be limited with the
max_pending_count parameter of the Client. The default value is
100.
client = SalesforceStreamingClient(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    username="<username>",
    password="<password>",
    max_pending_count=42
)
The current number of messages waiting to be consumed can be obtained from the
Client.pending_count attribute.
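The idea of a bounded prefetch buffer can be demonstrated with a plain asyncio.Queue. The sketch below only models the concept: whether the Client uses a queue like this internally is not stated here, and the producer coroutine merely stands in for the background connection.

```python
import asyncio


async def demo(max_pending_count=2):
    # The queue plays the role of the prefetch buffer
    buffer = asyncio.Queue(maxsize=max_pending_count)

    async def producer():
        for number in range(5):
            # put() waits while the buffer is full, which caps memory use
            await buffer.put(number)

    task = asyncio.create_task(producer())
    await asyncio.sleep(0)  # let the producer run until the buffer fills
    pending_before = buffer.qsize()  # comparable to Client.pending_count
    received = [await buffer.get() for _ in range(5)]
    await task
    return pending_before, received
```

With max_pending_count=2 the producer stops after two buffered messages and only continues as the consumer takes messages out, so the messages still arrive in order.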
JSON encoder/decoder¶
Besides the standard json module, many third-party libraries offer
JSON serialization/deserialization functionality. To use a different library for
handling JSON data types, you can specify the callable to use for serialization
with the json_dumps and the callable for deserialization with the
json_loads parameters of the Client.
import ujson

client = SalesforceStreamingClient(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    username="<username>",
    password="<password>",
    json_dumps=ujson.dumps,
    json_loads=ujson.loads
)
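Since the parameters take plain callables, a customised variant of the standard json functions should work as well as a third-party library. The sketch below builds two such callables with functools.partial. It assumes that the client calls them with the data as the only positional argument, which is not confirmed here, and the names compact_dumps and decimal_loads are made up for the example.

```python
import json
from decimal import Decimal
from functools import partial

# Serializer without the default whitespace after "," and ":"
compact_dumps = partial(json.dumps, separators=(",", ":"))

# Deserializer that parses JSON floats as Decimal to avoid rounding
decimal_loads = partial(json.loads, parse_float=Decimal)

encoded = compact_dumps({"channel": "/topic/foo", "ids": [1, 2]})
decoded = decimal_loads('{"amount": 1.10}')
```

These could then be passed as json_dumps=compact_dumps and json_loads=decimal_loads when creating the client.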