Quickstart¶
Authentication¶
To connect to the Salesforce Streaming API all clients must authenticate themselves. The library supports the username-password based OAuth2 authentication flow as well as the refresh token based authentication.
Whichever technique you end up using, you must first create a Connected App on Salesforce to acquire a Consumer Key and Consumer Secret value. Which are actually the client_id and client_secret parameters in OAuth2 terminology.
Username-Password authentication¶
For username-password based authentication you can use the
SalesforceStreamingClient
class, with the Salesforce user’s
username and password:
client = SalesforceStreamingClient(
consumer_key="<consumer key>",
consumer_secret="<consumer secret>",
username="<username>",
password="<password>"
)
SalesforceStreamingClient
is actually just a convenience class,
based on Client
. It enables you to create a client object
with the most common authentication technique, without having to create a
separate PasswordAuthenticator
object. You can actually use the
Client
class to create client that would be equivalent with the
example above:
auth = PasswordAuthenticator(
consumer_key="<consumer key>",
consumer_secret="<consumer secret>",
username="<username>",
password="<password>"
)
client = Client(auth)
Refresh token authentication¶
The refresh token base authentication technique can be used by creating
a RefreshTokenAuthenticator
and passing it to the Client
class:
auth = RefreshTokenAuthenticator(
consumer_key="<consumer key>",
consumer_secret="<consumer secret>",
refresh_token="<refresh_token>"
)
client = Client(auth)
You can get a refresh token using several different authentication techniques supported by Salesforce, the most commonly used one is probably the web server authentication flow.
Authentication on sandbox orgs¶
If you’re trying to connect to a sandbox org, then you have to assign
True
to the sandbox
parameter when creating the
SalesforceStreamingClient
, PasswordAuthenticator
or
RefreshTokenAuthenticator
object. Furthermore, the name of the
sandbox should be appended to the username. For example, if a username for a
production org is user1@acme.com, and the sandbox is named test, the modified
username to log in to the sandbox is user1@acme.com.test.
client = SalesforceStreamingClient(
consumer_key="<consumer key>",
consumer_secret="<consumer secret>",
username="<username>.<sandbox_name>",
password="<password>",
sandbox=True
)
Connecting¶
After creating a Client
object the open()
method
should be called to establish a connection with the server. The connection is
closed and the session is terminated by calling the close()
method.
client = SalesforceStreamingClient(
consumer_key="<consumer key>",
consumer_secret="<consumer secret>",
username="<username>",
password="<password>"
)
await client.open()
# subscribe and receive messsages...
await client.close()
Client
objects can be also used as asynchronous context managers.
async with SalesforceStreamingClient(
consumer_key="<consumer key>",
consumer_secret="<consumer secret>",
username="<username>",
password="<password>") as client:
# subscribe and receive messsages...
Channels¶
A channel is a string that looks like a URL path such as /topic/foo
or
/topic/bar
.
For detailed guidance on how to work with PushTopics or how to create Generic Streaming Channels please consult the Streaming API documentation.
Subscriptions¶
To receive notification messages the client must subscribe to the channels it’s interested in.
await client.subscribe("/topic/foo")
If you no longer want to receive messages from one of the channels you’re subscribed to then you must unsubscribe from the channel.
await client.unsubscribe("/topic/foo")
The current set of subscriptions can be obtained from the
Client.subscriptions
attribute.
Receiving messages¶
To receive messages broadcasted by Salesforce after
subscribing to these channels the
receive()
method should be used.
message = await client.receive()
The receive()
method will wait until a message is received
or it will raise a TransportTimeoutError
in case the
connection is lost with the server and the client can’t re-establish the
connection or a ServerError
if the connection gets
closed by the server.
The client can also be used as an asynchronous iterator in a for loop to wait for incoming messages.
async for message in client:
# process message
Replay of events¶
The great thing about streaming is that the client gets instantly notified about events as they occur. The downside is that if the client becomes temporarily disconnected, due to hardware, software or network failure, then it might miss some of the messages emitted by the server. This is where Salesforce’s message durability comes in handy.
Salesforce stores events for 24 hours. Events outside the 24-hour retention
period are discarded. Salesforce extends the event messages with repalyId
and createdDate
fields (called as ReplayMarker
by aiosfstream).
These fields can be used by the client to request the missed event messages
from the server when it reconnects.
The default behavior of the client is to receive only the new events sent after
subscribing. To take advantage of message durability, all you have to do is to
pass an object capable of storing the most recent ReplayMarker
objects, so the next time the client reconnects, it can continue to process
event messages from the point where it left off. The most convenient
choice is a Shelf
object, which can store
ReplayMarkers
on the disk, between application
restarts.
with shelve.open("replay.db") as replay:
async with SalesforceStreamingClient(
consumer_key="<consumer key>",
consumer_secret="<consumer secret>",
username="<username>",
password="<password>",
replay=replay) as client:
await client.subscribe("/topic/foo")
async for message in client:
# process message
Besides Shelf
objects you can pass a lot of different kind
of objects to the replay parameter, and you can configure different aspects of
replay behavior as well. For a full description of replay configuration
options check out the Replay configuration section.