API Reference

Client

class aiosfstream.SalesforceStreamingClient(*, consumer_key, consumer_secret, username, password, replay=<ReplayOption.NEW_EVENTS: -1>, replay_fallback=None, replay_storage_policy=<ReplayMarkerStoragePolicy.AUTOMATIC: 1>, connection_timeout=10.0, max_pending_count=100, sandbox=False, json_dumps=<function dumps>, json_loads=<function loads>, loop=None)[source]

Salesforce Streaming API client with username/password authentication

This is a convenience class which is suitable for the most common use case. To use a different authentication method, use the general Client class with a different Authenticator

Parameters:
  • consumer_key (str) – Consumer key from the Salesforce connected app definition
  • consumer_secret (str) – Consumer secret from the Salesforce connected app definition
  • username (str) – Salesforce username
  • password (str) – Salesforce password
  • replay (Union[ReplayOption, ReplayMarkerStorage, Mutablemapping[str, ReplayMarker]]) – A ReplayOption or an object capable of storing replay ids if you want to take advantage of Salesforce’s replay extension. You can use one of the ReplayOptions, or an object that supports the MutableMapping protocol like dict, defaultdict, Shelf etc. or a custom ReplayMarkerStorage implementation.
  • replay_fallback (Optional[ReplayOption]) – Replay fallback policy, for when a subscribe operation fails because a replay id was specified for a message outside the retention window
  • replay_storage_policy (ReplayMarkerStoragePolicy) – Defines at which point the replay marker of received messages will be stored
  • connection_timeout (Union[int, float]) – The maximum amount of time to wait for the transport to re-establish a connection with the server when the connection fails.
  • max_pending_count (int) – The maximum number of messages to prefetch from the server. If the number of prefetched messages reach this size then the connection will be suspended, until messages are consumed. If it is less than or equal to zero, the count is infinite.
  • sandbox (bool) – Marks whether the connection has to be made with a sandbox org or with a production org
  • json_dumps (Callable[[Dict[str, Any]], str]) – Function for JSON serialization, the default is json.dumps()
  • json_loads (Callable[[str], Dict[str, Any]]) – Function for JSON deserialization, the default is json.loads()
  • loop (Optional[AbstractEventLoop]) – Event loop used to schedule tasks. If loop is None then asyncio.get_event_loop() is used to get the default event loop.
class aiosfstream.Client(authenticator, *, replay=<ReplayOption.NEW_EVENTS: -1>, replay_fallback=None, replay_storage_policy=<ReplayMarkerStoragePolicy.AUTOMATIC: 1>, connection_timeout=10.0, max_pending_count=100, json_dumps=<function dumps>, json_loads=<function loads>, loop=None)[source]

Salesforce Streaming API client

Parameters:
  • authenticator (AuthenticatorBase) – An authenticator object
  • replay (Union[ReplayOption, ReplayMarkerStorage, Mutablemapping[str, ReplayMarker]]) – A ReplayOption or an object capable of storing replay ids if you want to take advantage of Salesforce’s replay extension. You can use one of the ReplayOptions, or an object that supports the MutableMapping protocol like dict, defaultdict, Shelf etc. or a custom ReplayMarkerStorage implementation.
  • replay_fallback (Optional[ReplayOption]) – Replay fallback policy, for when a subscribe operation fails because a replay id was specified for a message outside the retention window
  • replay_storage_policy (ReplayMarkerStoragePolicy) – Defines at which point the replay marker of received messages will be stored
  • connection_timeout (Union[int, float]) – The maximum amount of time to wait for the transport to re-establish a connection with the server when the connection fails.
  • max_pending_count (int) – The maximum number of messages to prefetch from the server. If the number of prefetched messages reach this size then the connection will be suspended, until messages are consumed. If it is less than or equal to zero, the count is infinite.
  • json_dumps (Callable[[Dict[str, Any]], str]) – Function for JSON serialization, the default is json.dumps()
  • json_loads (Callable[[str], Dict[str, Any]]) – Function for JSON deserialization, the default is json.loads()
  • loop (Optional[AbstractEventLoop]) – Event loop used to schedule tasks. If loop is None then asyncio.get_event_loop() is used to get the default event loop.
coroutine open(self)[source]

Establish a connection with the Streaming API endpoint

Raises:
  • ClientError – If none of the connection types offered by the server are supported
  • ClientInvalidOperation – If the client is already open, or in other words if it isn’t closed
  • TransportError – If a network or transport related error occurs
  • ServerError – If the handshake or the first connect request gets rejected by the server.
  • AuthenticationError – If the server rejects the authentication request or if a network failure occurs during the authentication
Return type:

None

coroutine close(self)[source]

Disconnect from the CometD server

Return type:None
coroutine publish(self, channel, data)[source]

Publish data to the given channel

Warning

The Streaming API is implemented on top of CometD. The publish operation is a CometD operation. While it’s still a legal operation, Salesforce chose not to implement the publishing of Generic Streaming and Platform events with CometD.

You should use the REST API to generate Generic Streaming events, or use the REST or SOAP API to publish Platform events.

Parameters:
  • channel (str) – Name of the channel
  • data (Dict[str, Any]) – Data to send to the server
Return type:

Dict[str, Any]

Returns:

Publish response

Raises:
coroutine subscribe(self, channel)[source]

Subscribe to channel

Parameters:

channel (str) – Name of the channel

Raises:
Return type:

None

coroutine unsubscribe(self, channel)[source]

Unsubscribe from channel

Parameters:

channel (str) – Name of the channel

Raises:
Return type:

None

coroutine receive(self)[source]

Wait for incoming messages from the server

Return type:

Dict[str, Any]

Returns:

Incoming message

Raises:
  • ClientInvalidOperation – If the client is closed, and has no more pending incoming messages
  • ServerError – If the client receives a confirmation message which is not successful
  • TransportTimeoutError – If the transport can’t re-establish connection with the server in connection_timeout time.
  • ReplayError – On a message replay or replay marker storage related error
closed

Marks whether the client is open or closed

Return type:bool
subscriptions

Set of subscribed channels

Return type:Set[str]
connection_type

The current connection type in use if the client is open, otherwise None

Return type:Optional[ConnectionType]
pending_count

The number of pending incoming messages

Once open is called the client starts listening for messages from the server. The incoming messages are retrieved and stored in an internal queue until they get consumed by calling receive.

Return type:int
has_pending_messages

Marks whether the client has any pending incoming messages

Return type:bool
coroutine close(self)[source]

Disconnect from the CometD server

Return type:None
static create_replay_storage(replay_param)[source]

Create a ReplayMarkerStorage object based from replay_param

Parameters:replay_param (Union[ReplayOption, ReplayMarkerStorage, Mutablemapping[str, ReplayMarker]]) – One of the supported replay_param type objects
Return type:Optional[ReplayMarkerStorage]
Returns:A new ReplayMarkerStorage object or replay_param if it’s already an instance of ReplayMarkerStorage object, or None if replay_param is None
static get_cometd_url(instance_url)[source]

Get the CometD URL associated with the instance_url

Parameters:instance_url (str) – Salesforce instance URL
Return type:str
Returns:CometD URL associated with the instance_url
coroutine open(self)[source]

Establish a connection with the Streaming API endpoint

Raises:
  • ClientError – If none of the connection types offered by the server are supported
  • ClientInvalidOperation – If the client is already open, or in other words if it isn’t closed
  • TransportError – If a network or transport related error occurs
  • ServerError – If the handshake or the first connect request gets rejected by the server.
  • AuthenticationError – If the server rejects the authentication request or if a network failure occurs during the authentication
Return type:

None

coroutine publish(self, channel, data)[source]

Publish data to the given channel

Warning

The Streaming API is implemented on top of CometD. The publish operation is a CometD operation. While it’s still a legal operation, Salesforce chose not to implement the publishing of Generic Streaming and Platform events with CometD.

You should use the REST API to generate Generic Streaming events, or use the REST or SOAP API to publish Platform events.

Parameters:
  • channel (str) – Name of the channel
  • data (Dict[str, Any]) – Data to send to the server
Return type:

Dict[str, Any]

Returns:

Publish response

Raises:
coroutine receive(self)[source]

Wait for incoming messages from the server

Return type:

Dict[str, Any]

Returns:

Incoming message

Raises:
  • ClientInvalidOperation – If the client is closed, and has no more pending incoming messages
  • ServerError – If the client receives a confirmation message which is not successful
  • TransportTimeoutError – If the transport can’t re-establish connection with the server in connection_timeout time.
  • ReplayError – On a message replay or replay marker storage related error
replay_fallback = None

Replay fallback policy, for when a subscribe operation fails because a replay id was specified for a message outside the retention window

replay_storage = None

ReplayMarkerStorage instance capable of storing ReplayMarker objects

replay_storage_policy = None

Defines at which point the ReplayMarker of received messages will be stored

coroutine subscribe(self, channel)[source]

Subscribe to channel

Parameters:

channel (str) – Name of the channel

Raises:
Return type:

None

coroutine unsubscribe(self, channel)[source]

Unsubscribe from channel

Parameters:

channel (str) – Name of the channel

Raises:
Return type:

None

class aiosfstream.ReplayMarkerStoragePolicy[source]

Defines the available replay marker storage policies

AUTOMATIC = 1

Store the replay marker of messages automatically, as soon as they’re received. The downside of this approach is that the replay marker of a message will be stored (thus marking it as successfully consumed) even if the processing of the message fails in the client side code

MANUAL = 2

Store the replay marker of messages manually, after the message has been successfully processed in client side code

Authenticators

class aiosfstream.auth.AuthenticatorBase(sandbox=False, json_dumps=<function dumps>, json_loads=<function loads>)[source]

Abstract base class to serve as a base for implementing concrete authenticators

Parameters:
class aiosfstream.PasswordAuthenticator(consumer_key, consumer_secret, username, password, sandbox=False, json_dumps=<function dumps>, json_loads=<function loads>)[source]

Authenticator for using the OAuth 2.0 Username-Password Flow

Parameters:
  • consumer_key (str) – Consumer key from the Salesforce connected app definition
  • consumer_secret (str) – Consumer secret from the Salesforce connected app definition
  • username (str) – Salesforce username
  • password (str) – Salesforce password
  • sandbox (bool) – Marks whether the authentication has to be done for a sandbox org or for a production org
  • json_dumps (Callable[[Dict[str, Any]], str]) – Function for JSON serialization, the default is json.dumps()
  • json_loads (Callable[[str], Dict[str, Any]]) – Function for JSON deserialization, the default is json.loads()
client_id = None

OAuth2 client id

client_secret = None

OAuth2 client secret

password = None

Salesforce password

username = None

Salesforce username

class aiosfstream.RefreshTokenAuthenticator(consumer_key, consumer_secret, refresh_token, sandbox=False, json_dumps=<function dumps>, json_loads=<function loads>)[source]

Authenticator for using the OAuth 2.0 Refresh Token Flow

Parameters:
  • consumer_key (str) – Consumer key from the Salesforce connected app definition
  • consumer_secret (str) – Consumer secret from the Salesforce connected app definition
  • refresh_token (str) – A refresh token obtained from Salesforce by using one of its authentication methods (for example with the OAuth 2.0 Web Server Authentication Flow)
  • sandbox (bool) – Marks whether the authentication has to be done for a sandbox org or for a production org
  • json_dumps (Callable[[Dict[str, Any]], str]) – Function for JSON serialization, the default is json.dumps()
  • json_loads (Callable[[str], Dict[str, Any]]) – Function for JSON deserialization, the default is json.loads()
client_id = None

OAuth2 client id

client_secret = None

OAuth2 client secret

refresh_token = None

Salesforce refresh token

Replay

class aiosfstream.ReplayOption[source]

Replay options supported by Salesforce

ALL_EVENTS = -2

Receive all events, including past events that are within the 24-hour retention window and new events sent after subscription

NEW_EVENTS = -1

Receive new events that are broadcast after the client subscribes

class aiosfstream.ReplayMarker[source]

Bases: tuple

Class for storing a message replay id and its creation date

Create new instance of ReplayMarker(date, replay_id)

date

Creation date of a message, as a ISO 8601 formatted datetime string

replay_id

Replay id of a message

class aiosfstream.ReplayMarkerStorage[source]

Abstract base class for replay marker storage implementations

coroutine get_replay_marker(self, subscription)[source]

Retrieve a stored replay marker for the given subscription

Parameters:subscription (str) – Name of the subscribed channel
Return type:Optional[ReplayMarker]
Returns:A replay marker or None if there is nothing stored for the given subscription
coroutine set_replay_marker(self, subscription, replay_marker)[source]

Store the replay_marker for the given subscription

Parameters:
  • subscription (str) – Name of the subscribed channel
  • replay_marker (ReplayMarker) – A replay marker
Return type:

None

coroutine extract_replay_id(self, message)[source]

Extract and store the replay id present int the message

Parameters:message (Dict[str, Any]) – An incoming broadcast message
Raises:ReplayError – If no creation date can be found in the message
Return type:None
coroutine __call__(self, message)[source]

Return an asynchronous context manager instance for extracting the replay id from the message if no exceptions occur inside the runtime context

Parameters:message (Dict[str, Any]) – An incoming message
Return type:Asynccontextmanager[None]
Returns:An asynchronous context manager
class aiosfstream.MappingStorage(mapping)[source]

Mapping based replay marker storage

Parameters:mapping (Mutablemapping[str, ReplayMarker]) – A MutableMapping object for storing replay markers
class aiosfstream.DefaultMappingStorage(mapping, default_id)[source]

Mapping based replay marker storage which will return a defualt replay id if there is not replay marker for the given subscription

Parameters:
  • mapping (Mutablemapping[str, ReplayMarker]) – A MutableMapping object for storing replay markers
  • default_id (int) – A replay id
class aiosfstream.ConstantReplayId(default_id, **kwargs)[source]

A replay marker storage which will return a constant replay id for every subscription

Note

This implementations doesn’t actually stores anything for later retrieval.

Parameters:default_id (int) – A replay id

Exceptions

Exception types

Exception hierarchy:

AiosfstreamException
    AuthenticationError
    ClientError
        ClientInvalidOperation
    TransportError
        TransportInvalidOperation
        TransportTimeoutError
        TransportConnectionClosed
    ServerError
    ReplayError
exception aiosfstream.exceptions.AiosfstreamException[source]

Base exception type.

All exceptions of the package inherit from this class.

exception aiosfstream.exceptions.AuthenticationError[source]

Authentication failure

exception aiosfstream.exceptions.ClientError[source]

Client side error

exception aiosfstream.exceptions.ClientInvalidOperation[source]

The requested operation can’t be executed on the current state of the client

exception aiosfstream.exceptions.TransportError[source]

Error during the transportation of messages

exception aiosfstream.exceptions.TransportInvalidOperation[source]

The requested operation can’t be executed on the current state of the transport

exception aiosfstream.exceptions.TransportTimeoutError[source]

Transport timeout

exception aiosfstream.exceptions.TransportConnectionClosed[source]

The connection unexpectedly closed

exception aiosfstream.exceptions.ServerError(message, response)[source]

Streaming API server side error

If the response contains an error field it gets parsed according to the specs

Parameters:
  • message (str) – Error description
  • response (dict) – Server response message

If the response contains an error field it gets parsed according to the specs

Parameters:
message

Error description

Return type:str
response

Server response message

Return type:Optional[Dict[str, Any]]
error

Error field in the response

Return type:Optional[str]
error_code

Error code part of the error code part of the error, message field

Return type:Optional[int]
error_args

Arguments part of the error, message field

Return type:Optional[List[str]]
error_message

Description part of the error, message field

Return type:Optional[str]
exception aiosfstream.exceptions.ReplayError[source]

Message replay related error