API Reference¶
Client¶
-
class
aiosfstream.
SalesforceStreamingClient
(*, consumer_key, consumer_secret, username, password, replay=<ReplayOption.NEW_EVENTS: -1>, replay_fallback=None, replay_storage_policy=<ReplayMarkerStoragePolicy.AUTOMATIC: 1>, connection_timeout=10.0, max_pending_count=100, sandbox=False, json_dumps=<function dumps>, json_loads=<function loads>, loop=None)[source]¶ Salesforce Streaming API client with username/password authentication
This is a convenience class which is suitable for the most common use case. To use a different authentication method, use the general
Client
class with a differentAuthenticator
Parameters: - consumer_key (
str
) – Consumer key from the Salesforce connected app definition - consumer_secret (
str
) – Consumer secret from the Salesforce connected app definition - username (
str
) – Salesforce username - password (
str
) – Salesforce password - replay (
Union
[ReplayOption
,ReplayMarkerStorage
,Mutablemapping
[str
,ReplayMarker
]]) – A ReplayOption or an object capable of storing replay ids if you want to take advantage of Salesforce’s replay extension. You can use one of theReplayOptions
, or an object that supports the MutableMapping protocol likedict
,defaultdict
,Shelf
etc. or a customReplayMarkerStorage
implementation. - replay_fallback (
Optional
[ReplayOption
]) – Replay fallback policy, for when a subscribe operation fails because a replay id was specified for a message outside the retention window - replay_storage_policy (
ReplayMarkerStoragePolicy
) – Defines at which point the replay marker of received messages will be stored - connection_timeout (
Union
[int
,float
]) – The maximum amount of time to wait for the transport to re-establish a connection with the server when the connection fails. - max_pending_count (
int
) – The maximum number of messages to prefetch from the server. If the number of prefetched messages reach this size then the connection will be suspended, until messages are consumed. If it is less than or equal to zero, the count is infinite. - sandbox (
bool
) – Marks whether the connection has to be made with a sandbox org or with a production org - json_dumps (
Callable
[[Dict
[str
,Any
]],str
]) – Function for JSON serialization, the default isjson.dumps()
- json_loads (
Callable
[[str
],Dict
[str
,Any
]]) – Function for JSON deserialization, the default isjson.loads()
- loop (
Optional
[AbstractEventLoop
]) – Eventloop
used to schedule tasks. If loop isNone
thenasyncio.get_event_loop()
is used to get the default event loop.
- consumer_key (
-
class
aiosfstream.
Client
(authenticator, *, replay=<ReplayOption.NEW_EVENTS: -1>, replay_fallback=None, replay_storage_policy=<ReplayMarkerStoragePolicy.AUTOMATIC: 1>, connection_timeout=10.0, max_pending_count=100, json_dumps=<function dumps>, json_loads=<function loads>, loop=None)[source]¶ Salesforce Streaming API client
Parameters: - authenticator (
AuthenticatorBase
) – An authenticator object - replay (
Union
[ReplayOption
,ReplayMarkerStorage
,Mutablemapping
[str
,ReplayMarker
]]) – A ReplayOption or an object capable of storing replay ids if you want to take advantage of Salesforce’s replay extension. You can use one of theReplayOptions
, or an object that supports the MutableMapping protocol likedict
,defaultdict
,Shelf
etc. or a customReplayMarkerStorage
implementation. - replay_fallback (
Optional
[ReplayOption
]) – Replay fallback policy, for when a subscribe operation fails because a replay id was specified for a message outside the retention window - replay_storage_policy (
ReplayMarkerStoragePolicy
) – Defines at which point the replay marker of received messages will be stored - connection_timeout (
Union
[int
,float
]) – The maximum amount of time to wait for the transport to re-establish a connection with the server when the connection fails. - max_pending_count (
int
) – The maximum number of messages to prefetch from the server. If the number of prefetched messages reach this size then the connection will be suspended, until messages are consumed. If it is less than or equal to zero, the count is infinite. - json_dumps (
Callable
[[Dict
[str
,Any
]],str
]) – Function for JSON serialization, the default isjson.dumps()
- json_loads (
Callable
[[str
],Dict
[str
,Any
]]) – Function for JSON deserialization, the default isjson.loads()
- loop (
Optional
[AbstractEventLoop
]) – Eventloop
used to schedule tasks. If loop isNone
thenasyncio.get_event_loop()
is used to get the default event loop.
-
coroutine
open
(self)[source]¶ Establish a connection with the Streaming API endpoint
Raises: - ClientError – If none of the connection types offered by the server are supported
- ClientInvalidOperation – If the client is already open, or in other words if it isn’t
closed
- TransportError – If a network or transport related error occurs
- ServerError – If the handshake or the first connect request gets rejected by the server.
- AuthenticationError – If the server rejects the authentication request or if a network failure occurs during the authentication
Return type: None
-
coroutine
publish
(self, channel, data)[source]¶ Publish data to the given channel
Warning
The Streaming API is implemented on top of CometD. The publish operation is a CometD operation. While it’s still a legal operation, Salesforce chose not to implement the publishing of Generic Streaming and Platform events with CometD.
You should use the REST API to generate Generic Streaming events, or use the REST or SOAP API to publish Platform events.
Parameters: Return type: Returns: Publish response
Raises: - ClientInvalidOperation – If the client is
closed
- TransportError – If a network or transport related error occurs
- ServerError – If the publish request gets rejected by the server
- ClientInvalidOperation – If the client is
-
coroutine
subscribe
(self, channel)[source]¶ Subscribe to channel
Parameters: channel (
str
) – Name of the channelRaises: - ClientInvalidOperation – If the client is
closed
- TransportError – If a network or transport related error occurs
- ServerError – If the subscribe request gets rejected by the server
Return type: None
- ClientInvalidOperation – If the client is
-
coroutine
unsubscribe
(self, channel)[source]¶ Unsubscribe from channel
Parameters: channel (
str
) – Name of the channelRaises: - ClientInvalidOperation – If the client is
closed
- TransportError – If a network or transport related error occurs
- ServerError – If the unsubscribe request gets rejected by the server
Return type: None
- ClientInvalidOperation – If the client is
-
coroutine
receive
(self)[source]¶ Wait for incoming messages from the server
Return type: Returns: Incoming message
Raises: - ClientInvalidOperation – If the client is closed, and has no more pending incoming messages
- ServerError – If the client receives a confirmation message which is not
successful
- TransportTimeoutError – If the transport can’t re-establish connection with the server in
connection_timeout
time. - ReplayError – On a message replay or replay marker storage related error
-
connection_type
¶ The current connection type in use if the client is open, otherwise
None
Return type: Optional
[ConnectionType
]
-
pending_count
¶ The number of pending incoming messages
Once
open
is called the client starts listening for messages from the server. The incoming messages are retrieved and stored in an internal queue until they get consumed by callingreceive
.Return type: int
-
coroutine
close
(self)[source] Disconnect from the CometD server
Return type: None
-
static
create_replay_storage
(replay_param)[source]¶ Create a
ReplayMarkerStorage
object based from replay_paramParameters: replay_param ( Union
[ReplayOption
,ReplayMarkerStorage
,Mutablemapping
[str
,ReplayMarker
]]) – One of the supported replay_param type objectsReturn type: Optional
[ReplayMarkerStorage
]Returns: A new ReplayMarkerStorage
object or replay_param if it’s already an instance ofReplayMarkerStorage
object, or None if replay_param is None
-
static
get_cometd_url
(instance_url)[source]¶ Get the CometD URL associated with the instance_url
Parameters: instance_url ( str
) – Salesforce instance URLReturn type: str
Returns: CometD URL associated with the instance_url
-
coroutine
open
(self)[source] Establish a connection with the Streaming API endpoint
Raises: - ClientError – If none of the connection types offered by the server are supported
- ClientInvalidOperation – If the client is already open, or in other words if it isn’t
closed
- TransportError – If a network or transport related error occurs
- ServerError – If the handshake or the first connect request gets rejected by the server.
- AuthenticationError – If the server rejects the authentication request or if a network failure occurs during the authentication
Return type: None
-
coroutine
publish
(self, channel, data)[source] Publish data to the given channel
Warning
The Streaming API is implemented on top of CometD. The publish operation is a CometD operation. While it’s still a legal operation, Salesforce chose not to implement the publishing of Generic Streaming and Platform events with CometD.
You should use the REST API to generate Generic Streaming events, or use the REST or SOAP API to publish Platform events.
Parameters: Return type: Returns: Publish response
Raises: - ClientInvalidOperation – If the client is
closed
- TransportError – If a network or transport related error occurs
- ServerError – If the publish request gets rejected by the server
- ClientInvalidOperation – If the client is
-
coroutine
receive
(self)[source] Wait for incoming messages from the server
Return type: Returns: Incoming message
Raises: - ClientInvalidOperation – If the client is closed, and has no more pending incoming messages
- ServerError – If the client receives a confirmation message which is not
successful
- TransportTimeoutError – If the transport can’t re-establish connection with the server in
connection_timeout
time. - ReplayError – On a message replay or replay marker storage related error
-
replay_fallback
= None¶ Replay fallback policy, for when a subscribe operation fails because a replay id was specified for a message outside the retention window
-
replay_storage
= None¶ ReplayMarkerStorage
instance capable of storingReplayMarker
objects
-
replay_storage_policy
= None¶ Defines at which point the
ReplayMarker
of received messages will be stored
-
coroutine
subscribe
(self, channel)[source] Subscribe to channel
Parameters: channel (
str
) – Name of the channelRaises: - ClientInvalidOperation – If the client is
closed
- TransportError – If a network or transport related error occurs
- ServerError – If the subscribe request gets rejected by the server
Return type: None
- ClientInvalidOperation – If the client is
-
coroutine
unsubscribe
(self, channel)[source] Unsubscribe from channel
Parameters: channel (
str
) – Name of the channelRaises: - ClientInvalidOperation – If the client is
closed
- TransportError – If a network or transport related error occurs
- ServerError – If the unsubscribe request gets rejected by the server
Return type: None
- ClientInvalidOperation – If the client is
- authenticator (
-
class
aiosfstream.
ReplayMarkerStoragePolicy
[source]¶ Defines the available replay marker storage policies
-
AUTOMATIC
= 1¶ Store the replay marker of messages automatically, as soon as they’re received. The downside of this approach is that the replay marker of a message will be stored (thus marking it as successfully consumed) even if the processing of the message fails in the client side code
-
MANUAL
= 2¶ Store the replay marker of messages manually, after the message has been successfully processed in client side code
-
Authenticators¶
-
class
aiosfstream.auth.
AuthenticatorBase
(sandbox=False, json_dumps=<function dumps>, json_loads=<function loads>)[source]¶ Abstract base class to serve as a base for implementing concrete authenticators
Parameters: - sandbox (
bool
) – Marks whether the authentication has to be done for a sandbox org or for a production org - json_dumps (
Callable
[[Dict
[str
,Any
]],str
]) – Function for JSON serialization, the default isjson.dumps()
- json_loads (
Callable
[[str
],Dict
[str
,Any
]]) – Function for JSON deserialization, the default isjson.loads()
- sandbox (
-
class
aiosfstream.
PasswordAuthenticator
(consumer_key, consumer_secret, username, password, sandbox=False, json_dumps=<function dumps>, json_loads=<function loads>)[source]¶ Authenticator for using the OAuth 2.0 Username-Password Flow
Parameters: - consumer_key (
str
) – Consumer key from the Salesforce connected app definition - consumer_secret (
str
) – Consumer secret from the Salesforce connected app definition - username (
str
) – Salesforce username - password (
str
) – Salesforce password - sandbox (
bool
) – Marks whether the authentication has to be done for a sandbox org or for a production org - json_dumps (
Callable
[[Dict
[str
,Any
]],str
]) – Function for JSON serialization, the default isjson.dumps()
- json_loads (
Callable
[[str
],Dict
[str
,Any
]]) – Function for JSON deserialization, the default isjson.loads()
-
client_id
= None¶ OAuth2 client id
-
client_secret
= None¶ OAuth2 client secret
-
password
= None¶ Salesforce password
-
username
= None¶ Salesforce username
- consumer_key (
-
class
aiosfstream.
RefreshTokenAuthenticator
(consumer_key, consumer_secret, refresh_token, sandbox=False, json_dumps=<function dumps>, json_loads=<function loads>)[source]¶ Authenticator for using the OAuth 2.0 Refresh Token Flow
Parameters: - consumer_key (
str
) – Consumer key from the Salesforce connected app definition - consumer_secret (
str
) – Consumer secret from the Salesforce connected app definition - refresh_token (
str
) – A refresh token obtained from Salesforce by using one of its authentication methods (for example with the OAuth 2.0 Web Server Authentication Flow) - sandbox (
bool
) – Marks whether the authentication has to be done for a sandbox org or for a production org - json_dumps (
Callable
[[Dict
[str
,Any
]],str
]) – Function for JSON serialization, the default isjson.dumps()
- json_loads (
Callable
[[str
],Dict
[str
,Any
]]) – Function for JSON deserialization, the default isjson.loads()
-
client_id
= None¶ OAuth2 client id
-
client_secret
= None¶ OAuth2 client secret
-
refresh_token
= None¶ Salesforce refresh token
- consumer_key (
Replay¶
-
class
aiosfstream.
ReplayOption
[source]¶ Replay options supported by Salesforce
-
ALL_EVENTS
= -2¶ Receive all events, including past events that are within the 24-hour retention window and new events sent after subscription
-
NEW_EVENTS
= -1¶ Receive new events that are broadcast after the client subscribes
-
-
class
aiosfstream.
ReplayMarker
[source]¶ Bases:
tuple
Class for storing a message replay id and its creation date
Create new instance of ReplayMarker(date, replay_id)
-
date
¶ Creation date of a message, as a ISO 8601 formatted datetime string or a unix timestamp as a string
-
replay_id
¶ Replay id of a message
-
-
class
aiosfstream.
ReplayMarkerStorage
[source]¶ Abstract base class for replay marker storage implementations
-
coroutine
get_replay_marker
(self, subscription)[source]¶ Retrieve a stored replay marker for the given subscription
Parameters: subscription ( str
) – Name of the subscribed channelReturn type: Optional
[ReplayMarker
]Returns: A replay marker or None
if there is nothing stored for the given subscription
-
coroutine
set_replay_marker
(self, subscription, replay_marker)[source]¶ Store the replay_marker for the given subscription
Parameters: - subscription (
str
) – Name of the subscribed channel - replay_marker (
ReplayMarker
) – A replay marker
Return type: None
- subscription (
-
coroutine
extract_replay_id
(self, message)[source]¶ Extract and store the replay id present int the message
Parameters: message ( Dict
[str
,Any
]) – An incoming broadcast messageRaises: ReplayError – If no creation date can be found in the message Return type: None
-
coroutine
__call__
(self, message)[source]¶ Return an asynchronous context manager instance for extracting the replay id from the message if no exceptions occur inside the runtime context
Parameters: message ( Dict
[str
,Any
]) – An incoming messageReturn type: Asynccontextmanager
[None
]Returns: An asynchronous context manager
-
coroutine
-
class
aiosfstream.
MappingStorage
(mapping)[source]¶ Mapping based replay marker storage
Parameters: mapping ( Mutablemapping
[str
,ReplayMarker
]) – A MutableMapping object for storing replay markers
Exceptions¶
Exception types
Exception hierarchy:
AiosfstreamException
AuthenticationError
ClientError
ClientInvalidOperation
TransportError
TransportInvalidOperation
TransportTimeoutError
TransportConnectionClosed
ServerError
ReplayError
-
exception
aiosfstream.exceptions.
AiosfstreamException
[source]¶ Base exception type.
All exceptions of the package inherit from this class.
-
exception
aiosfstream.exceptions.
ClientInvalidOperation
[source]¶ The requested operation can’t be executed on the current state of the client
-
exception
aiosfstream.exceptions.
TransportError
[source]¶ Error during the transportation of messages
-
exception
aiosfstream.exceptions.
TransportInvalidOperation
[source]¶ The requested operation can’t be executed on the current state of the transport
-
exception
aiosfstream.exceptions.
TransportConnectionClosed
[source]¶ The connection unexpectedly closed
-
exception
aiosfstream.exceptions.
ServerError
(message, response)[source]¶ Streaming API server side error
If the response contains an error field it gets parsed according to the specs
Parameters: If the response contains an error field it gets parsed according to the specs
Parameters: