Quickstart

Authentication

To connect to the Salesforce Streaming API, all clients must authenticate. The library supports the username-password OAuth2 authentication flow as well as refresh token based authentication.

Whichever technique you use, you must first create a Connected App on Salesforce to obtain a Consumer Key and a Consumer Secret. In OAuth2 terminology, these are the client_id and client_secret parameters.

Username-Password authentication

For username-password based authentication you can use the SalesforceStreamingClient class, with the Salesforce user’s username and password:

from aiosfstream import SalesforceStreamingClient

client = SalesforceStreamingClient(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    username="<username>",
    password="<password>"
)

SalesforceStreamingClient is a convenience class based on Client. It lets you create a client object with the most common authentication technique without having to create a separate PasswordAuthenticator object. You can also use the Client class directly to create a client equivalent to the example above:

from aiosfstream import Client, PasswordAuthenticator

auth = PasswordAuthenticator(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    username="<username>",
    password="<password>"
)
client = Client(auth)

Refresh token authentication

To use refresh token based authentication, create a RefreshTokenAuthenticator and pass it to the Client class:

from aiosfstream import Client, RefreshTokenAuthenticator

auth = RefreshTokenAuthenticator(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    refresh_token="<refresh_token>"
)
client = Client(auth)

You can get a refresh token using several different authentication techniques supported by Salesforce. The most commonly used one is probably the web server authentication flow.
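As a rough sketch of the first step of the web server flow, the snippet below builds the authorization URL a user would open to grant access. It assumes the standard Salesforce production login host and its OAuth2 authorize endpoint. The redirect URI is a placeholder that has to match the callback URL configured in your Connected App, and the helper name build_authorize_url is made up for illustration. Exchanging the resulting authorization code for a refresh token is a separate request to the token endpoint and is not shown here.

```python
from urllib.parse import urlencode, urlsplit, parse_qs

# Assumption: the production login host; sandboxes use test.salesforce.com.
AUTHORIZE_URL = "https://login.salesforce.com/services/oauth2/authorize"


def build_authorize_url(consumer_key: str, redirect_uri: str) -> str:
    """Return the URL the user opens to approve the Connected App."""
    query = urlencode({
        "response_type": "code",      # ask for an authorization code
        "client_id": consumer_key,    # the Connected App's Consumer Key
        "redirect_uri": redirect_uri, # placeholder, must match the app's callback URL
    })
    return f"{AUTHORIZE_URL}?{query}"


url = build_authorize_url("<consumer key>", "https://localhost/callback")
params = parse_qs(urlsplit(url).query)
print(params["response_type"])
```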

Connecting

After creating a Client object, call the open() method to establish a connection with the server. Calling the close() method closes the connection and terminates the session.

client = SalesforceStreamingClient(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    username="<username>",
    password="<password>"
)
await client.open()
# subscribe and receive messages...
await client.close()

Client objects can also be used as asynchronous context managers.

async with SalesforceStreamingClient(
        consumer_key="<consumer key>",
        consumer_secret="<consumer secret>",
        username="<username>",
        password="<password>") as client:
    # subscribe and receive messages...
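The practical benefit of the context manager form is that the connection is closed even if the body raises an exception. The sketch below illustrates this with a made-up DemoClient class that only records which methods were called. It is not aiosfstream code and assumes nothing about the library's internals beyond the open()/close() pairing described above. It also shows asyncio.run() as one way to drive such coroutine code from a script.

```python
import asyncio


class DemoClient:
    """Hypothetical stand-in for a streaming client; not part of aiosfstream."""

    def __init__(self):
        self.events = []

    async def open(self):
        self.events.append("open")

    async def close(self):
        self.events.append("close")

    async def __aenter__(self):
        # Entering the block opens the connection.
        await self.open()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Leaving the block closes it, whether or not an error occurred.
        await self.close()


async def main():
    client = DemoClient()
    try:
        async with client:
            client.events.append("work")
            raise RuntimeError("simulated failure while processing")
    except RuntimeError:
        pass
    return client.events


events = asyncio.run(main())
print(events)
```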

Channels

A channel is a string that looks like a URL path such as /topic/foo or /topic/bar.

For detailed guidance on working with PushTopics or creating Generic Streaming Channels, please consult the Streaming API documentation.

Subscriptions

To receive notification messages, the client must subscribe to the channels it’s interested in.

await client.subscribe("/topic/foo")

If you no longer want to receive messages from a channel you’re subscribed to, unsubscribe from it.

await client.unsubscribe("/topic/foo")

The current set of subscriptions can be obtained from the Client.subscriptions attribute.

Receiving messages

To receive messages broadcast by Salesforce after subscribing to channels, use the receive() method.

message = await client.receive()

The receive() method waits until a message is received. It raises a TransportTimeoutError if the connection to the server is lost and the client can’t re-establish it, or a ServerError if the server closes the connection.

The client can also be used as an asynchronous iterator in an async for loop to wait for incoming messages.

async for message in client:
    # process message

Replay of events

The great thing about streaming is that the client gets instantly notified about events as they occur. The downside is that if the client becomes temporarily disconnected, due to hardware, software or network failure, then it might miss some of the messages emitted by the server. This is where Salesforce’s message durability comes in handy.

Salesforce stores events for 24 hours and discards events older than this retention period. It extends event messages with replayId and createdDate fields (which aiosfstream refers to together as a ReplayMarker). The client can use these fields to request missed event messages from the server when it reconnects.
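To make the two fields concrete, here is a minimal sketch that pulls them out of a received message. It assumes a PushTopic-style layout in which both fields sit under data.event; other event types may place them differently. The sample payload is invented, and the Marker tuple and extract_marker helper are illustrative names, not aiosfstream's own ReplayMarker class or API.

```python
from typing import NamedTuple, Optional


class Marker(NamedTuple):
    """Illustrative pair of fields; not aiosfstream's ReplayMarker class."""
    date: str
    replay_id: int


def extract_marker(message: dict) -> Optional[Marker]:
    """Return the replay fields of a message, or None if they are absent."""
    event = message.get("data", {}).get("event", {})
    if "replayId" not in event or "createdDate" not in event:
        return None
    return Marker(date=event["createdDate"], replay_id=event["replayId"])


# Sample payload invented for illustration (PushTopic-style layout assumed).
sample = {
    "channel": "/topic/foo",
    "data": {
        "event": {
            "createdDate": "2019-01-01T00:00:00.000Z",
            "replayId": 7,
            "type": "created",
        },
        "sobject": {"Id": "001000000000001AAA", "Name": "Example"},
    },
}

marker = extract_marker(sample)
print(marker)
```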

The default behavior of the client is to receive only the new events sent after subscribing. To take advantage of message durability, all you have to do is pass an object capable of storing the most recent ReplayMarker objects, so that the next time the client reconnects, it can continue processing event messages from the point where it left off. The most convenient choice is a Shelf object, which can store ReplayMarkers on disk between application restarts.

import shelve

with shelve.open("replay.db") as replay:

    async with SalesforceStreamingClient(
        consumer_key="<consumer key>",
        consumer_secret="<consumer secret>",
        username="<username>",
        password="<password>",
        replay=replay) as client:

        await client.subscribe("/topic/foo")

        async for message in client:
            # process message
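A Shelf suits this role because it behaves like a dictionary whose contents survive on disk. The standard-library sketch below stores a value, reopens the file as a restarted application would, and reads the value back. How aiosfstream lays out its entries inside the mapping is not described here, so the channel-keyed (createdDate, replayId) tuple is only an assumption for illustration.

```python
import os
import shelve
import tempfile

# Hypothetical marker value: a (createdDate, replayId) pair keyed by channel.
channel = "/topic/foo"
marker = ("2019-01-01T00:00:00.000Z", 7)

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "replay.db")

    # First "run": store the most recent marker for the channel.
    with shelve.open(path) as replay:
        replay[channel] = marker

    # Second "run": the value is still there after reopening the file.
    with shelve.open(path) as replay:
        restored = replay[channel]

print(restored)
```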

Besides Shelf objects, you can pass many other kinds of objects to the replay parameter, and you can configure different aspects of replay behavior as well. For a full description of the replay configuration options, see the Replay configuration section.