"""Authenticatior class implementations"""
from abc import abstractmethod
from http import HTTPStatus
import reprlib
import json
from typing import Optional, Tuple
from aiocometd import AuthExtension
from aiocometd.typing import JsonObject, JsonLoader, JsonDumper, Payload, \
Headers
from aiohttp import ClientSession
from aiohttp.client_exceptions import ClientError
from aiosfstream.exceptions import AuthenticationError
TOKEN_URL = "https://login.salesforce.com/services/oauth2/token"
SANDBOX_TOKEN_URL = "https://test.salesforce.com/services/oauth2/token"
# pylint: disable=too-many-instance-attributes
[docs]class AuthenticatorBase(AuthExtension):
"""Abstract base class to serve as a base for implementing concrete
authenticators"""
def __init__(self, sandbox: bool = False,
json_dumps: JsonDumper = json.dumps,
json_loads: JsonLoader = json.loads) -> None:
"""
:param sandbox: Marks whether the authentication has to be done \
for a sandbox org or for a production org
:param json_dumps: Function for JSON serialization, the default is \
:func:`json.dumps`
:param json_loads: Function for JSON deserialization, the default is \
:func:`json.loads`
"""
#: Marks whether the authentication has to be done for a sandbox org \
#: or for a production org
self._sandbox = sandbox
#: Salesforce session ID that can be used with the web services API
self.access_token: Optional[str] = None
#: Value is Bearer for all responses that include an access token
self.token_type: Optional[str] = None
#: A URL indicating the instance of the user’s org
self.instance_url: Optional[str] = None
#: Identity URL that can be used to both identify the user and query \
#: for more information about the user
self.id: Optional[str] = None # pylint: disable=invalid-name
#: Base64-encoded HMAC-SHA256 signature signed with the consumer’s \
#: private key containing the concatenated ID and issued_at. Use to \
#: verify that the identity URL hasn’t changed since the server sent it
self.signature: Optional[str] = None
#: Timestamp when the signature was created
self.issued_at: Optional[str] = None
#: Function for JSON serialization
self.json_dumps = json_dumps
#: Function for JSON deserialization
self.json_loads = json_loads
@property
def _token_url(self) -> str:
"""The URL that should be used for token requests"""
if self._sandbox:
return SANDBOX_TOKEN_URL
return TOKEN_URL
async def outgoing(self, payload: Payload, headers: Headers) -> None:
"""Process outgoing *payload* and *headers*
Called just before a payload is sent to insert the ``Authorization`` \
header value.
:param payload: List of outgoing messages
:param headers: Headers to send
:raise AuthenticationError: If the value of :py:attr:`~token_type` or \
:py:attr:`~access_token` is ``None``. In other words, it's raised if \
the method is called without authenticating first.
"""
if self.token_type is None or self.access_token is None:
raise AuthenticationError("Unknown token_type and access_token "
"values. Method called without "
"authenticating first.")
headers["Authorization"] = self.token_type + " " + self.access_token
async def incoming(self, payload: Payload,
headers: Optional[Headers] = None) -> None:
pass
async def authenticate(self) -> None:
"""Called on initialization and after a failed authentication attempt
:raise AuthenticationError: If the server rejects the authentication \
request or if a network failure occurs
"""
try:
status_code, response_data = await self._authenticate()
except ClientError as error:
raise AuthenticationError("Network request failed") from error
if status_code != HTTPStatus.OK:
self.access_token = None
self.token_type = None
self.instance_url = None
self.id = None
self.signature = None
self.issued_at = None
raise AuthenticationError("Authentication failed", response_data)
self.__dict__.update(response_data)
@abstractmethod
async def _authenticate(self) -> Tuple[int, JsonObject]:
"""Authenticate the user
:return: The status code and response data from the server's response
:raise aiohttp.client_exceptions.ClientError: If a network failure \
occurs
"""
# pylint: enable=too-many-instance-attributes
# pylint: disable=too-many-arguments
[docs]class PasswordAuthenticator(AuthenticatorBase):
"""Authenticator for using the OAuth 2.0 Username-Password Flow"""
def __init__(self, consumer_key: str, consumer_secret: str,
username: str, password: str, sandbox: bool = False,
json_dumps: JsonDumper = json.dumps,
json_loads: JsonLoader = json.loads) -> None:
"""
:param consumer_key: Consumer key from the Salesforce connected \
app definition
:param consumer_secret: Consumer secret from the Salesforce \
connected app definition
:param username: Salesforce username
:param password: Salesforce password
:param sandbox: Marks whether the authentication has to be done \
for a sandbox org or for a production org
:param json_dumps: Function for JSON serialization, the default is \
:func:`json.dumps`
:param json_loads: Function for JSON deserialization, the default is \
:func:`json.loads`
"""
super().__init__(sandbox=sandbox,
json_dumps=json_dumps,
json_loads=json_loads)
#: OAuth2 client id
self.client_id = consumer_key
#: OAuth2 client secret
self.client_secret = consumer_secret
#: Salesforce username
self.username = username
#: Salesforce password
self.password = password
def __repr__(self) -> str:
"""Formal string representation"""
cls_name = type(self).__name__
return f"{cls_name}(consumer_key={reprlib.repr(self.client_id)}," \
f"consumer_secret={reprlib.repr(self.client_secret)}, " \
f"username={reprlib.repr(self.username)}, " \
f"password={reprlib.repr(self.password)})"
async def _authenticate(self) -> Tuple[int, JsonObject]:
async with ClientSession(json_serialize=self.json_dumps) as session:
data = {
"grant_type": "password",
"client_id": self.client_id,
"client_secret": self.client_secret,
"username": self.username,
"password": self.password
}
response = await session.post(self._token_url, data=data)
response_data = await response.json(loads=self.json_loads)
return response.status, response_data
[docs]class RefreshTokenAuthenticator(AuthenticatorBase):
"""Authenticator for using the OAuth 2.0 Refresh Token Flow"""
def __init__(self, consumer_key: str, consumer_secret: str,
refresh_token: str, sandbox: bool = False,
json_dumps: JsonDumper = json.dumps,
json_loads: JsonLoader = json.loads) -> None:
"""
:param consumer_key: Consumer key from the Salesforce connected \
app definition
:param consumer_secret: Consumer secret from the Salesforce \
connected app definition
:param refresh_token: A refresh token obtained from Salesforce \
by using one of its authentication methods (for example with the \
OAuth 2.0 Web Server Authentication Flow)
:param sandbox: Marks whether the authentication has to be done \
for a sandbox org or for a production org
:param json_dumps: Function for JSON serialization, the default is \
:func:`json.dumps`
:param json_loads: Function for JSON deserialization, the default is \
:func:`json.loads`
"""
super().__init__(sandbox=sandbox,
json_dumps=json_dumps,
json_loads=json_loads)
#: OAuth2 client id
self.client_id = consumer_key
#: OAuth2 client secret
self.client_secret = consumer_secret
#: Salesforce refresh token
self.refresh_token = refresh_token
def __repr__(self) -> str:
"""Formal string representation"""
cls_name = type(self).__name__
return f"{cls_name}(consumer_key={reprlib.repr(self.client_id)}," \
f"consumer_secret={reprlib.repr(self.client_secret)}, " \
f"refresh_token={reprlib.repr(self.refresh_token)})"
async def _authenticate(self) -> Tuple[int, JsonObject]:
async with ClientSession(json_serialize=self.json_dumps) as session:
data = {
"grant_type": "refresh_token",
"client_id": self.client_id,
"client_secret": self.client_secret,
"refresh_token": self.refresh_token
}
response = await session.post(self._token_url, data=data)
response_data = await response.json(loads=self.json_loads)
return response.status, response_data
# pylint: enable=too-many-arguments