Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions config/clients/python/template/src/credentials.py.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class CredentialConfiguration:
:param api_token: Bearer token to be sent for authentication
:param api_audience: API audience used for OAuth2
:param api_issuer: API issuer used for OAuth2
:param scopes: OAuth2 scopes to request, can be a list of strings or a space-separated string
"""

def __init__(
Expand All @@ -28,12 +29,14 @@ class CredentialConfiguration:
api_audience: str | None = None,
api_issuer: str | None = None,
api_token: str | None = None,
scopes: str | list[str] | None = None,
):
self._client_id = client_id
self._client_secret = client_secret
self._api_audience = api_audience
self._api_issuer = api_issuer
self._api_token = api_token
self._scopes = scopes


@property
Expand Down Expand Up @@ -106,6 +109,20 @@ class CredentialConfiguration:
"""
self._api_token = value

@property
def scopes(self):
"""
Return the scopes configured
"""
return self._scopes

@scopes.setter
def scopes(self, value):
"""
Update the scopes
"""
self._scopes = value


class Credentials:
"""
Expand Down
7 changes: 7 additions & 0 deletions config/clients/python/template/src/oauth2.py.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ class OAuth2Client:
'grant_type': "client_credentials",
}

# Add scope parameter if scopes are configured
if configuration.scopes is not None:
if isinstance(configuration.scopes, list):
post_params["scope"] = " ".join(configuration.scopes)
else:
post_params["scope"] = configuration.scopes

headers = urllib3.response.HTTPHeaderDict({'Accept': 'application/json', 'Content-Type': 'application/x-www-form-urlencoded', 'User-Agent': 'openfga-sdk (python) {{packageVersion}}'})

max_retry = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class OAuth2Client:
"""
configuration = self._credentials.configuration

token_url = f'https://{configuration.api_issuer}/oauth/token'
token_url = self._credentials._parse_issuer(configuration.api_issuer)

post_params = {
'client_id': configuration.client_id,
Expand All @@ -69,6 +69,13 @@ class OAuth2Client:
'grant_type': "client_credentials",
}

# Add scope parameter if scopes are configured
if configuration.scopes is not None:
if isinstance(configuration.scopes, list):
post_params["scope"] = " ".join(configuration.scopes)
else:
post_params["scope"] = configuration.scopes

headers = urllib3.response.HTTPHeaderDict({'Accept': 'application/json', 'Content-Type': 'application/x-www-form-urlencoded', 'User-Agent': 'openfga-sdk (python) {{packageVersion}}'})

max_retry = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,42 @@ class TestCredentials(IsolatedAsyncioTestCase):
credential.validate_credentials_config()
self.assertEqual(credential.method, 'client_credentials')

def test_configuration_client_credentials_with_scopes_list(self):
"""
Test credential with method client_credentials and scopes as list is valid
"""
credential = Credentials(
method="client_credentials",
configuration=CredentialConfiguration(
client_id="myclientid",
client_secret="mysecret",
api_issuer="issuer.{{sampleApiDomain}}",
api_audience="myaudience",
scopes=["read", "write", "admin"],
),
)
credential.validate_credentials_config()
self.assertEqual(credential.method, "client_credentials")
self.assertEqual(credential.configuration.scopes, ["read", "write", "admin"])

def test_configuration_client_credentials_with_scopes_string(self):
"""
Test credential with method client_credentials and scopes as string is valid
"""
credential = Credentials(
method="client_credentials",
configuration=CredentialConfiguration(
client_id="myclientid",
client_secret="mysecret",
api_issuer="issuer.{{sampleApiDomain}}",
api_audience="myaudience",
scopes="read write admin",
),
)
credential.validate_credentials_config()
self.assertEqual(credential.method, "client_credentials")
self.assertEqual(credential.configuration.scopes, "read write admin")

def test_configuration_client_credentials_missing_config(self):
"""
Test credential with method client_credentials and configuration is missing
Expand Down Expand Up @@ -131,7 +167,6 @@ class TestCredentials(IsolatedAsyncioTestCase):
with self.assertRaises(openfga_sdk.ApiValueError):
credential.validate_credentials_config()


class TestCredentialsIssuer(IsolatedAsyncioTestCase):
def setUp(self):
# Setup a basic configuration that can be modified per test case
Expand Down
114 changes: 114 additions & 0 deletions config/clients/python/template/test/oauth2_test.py.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -484,3 +484,117 @@ This is not a JSON response
},
)
await rest_client.close()

@patch.object(rest.RESTClientObject, "request")
async def test_get_authentication_obtain_client_credentials_with_scopes_list(self, mock_request):
"""
Test getting authentication header when method is client credentials with scopes as list
"""
response_body = """
{
"expires_in": 120,
"access_token": "AABBCCDD"
}
"""
mock_request.return_value = mock_response(response_body, 200)

credentials = Credentials(
method="client_credentials",
configuration=CredentialConfiguration(
client_id="myclientid",
client_secret="mysecret",
api_issuer="issuer.fga.example",
api_audience="myaudience",
scopes=["read", "write", "admin"],
),
)
rest_client = rest.RESTClientObject(Configuration())
current_time = datetime.now()
client = OAuth2Client(credentials)
auth_header = await client.get_authentication_header(rest_client)
self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"})
self.assertEqual(client._access_token, "AABBCCDD")
self.assertGreaterEqual(
client._access_expiry_time, current_time + timedelta(seconds=120)
)
expected_header = urllib3.response.HTTPHeaderDict(
{
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "openfga-sdk (python) {{packageVersion}}",
}
)
mock_request.assert_called_once_with(
method="POST",
url="https://issuer.fga.example/oauth/token",
headers=expected_header,
query_params=None,
body=None,
_preload_content=True,
_request_timeout=None,
post_params={
"client_id": "myclientid",
"client_secret": "mysecret",
"audience": "myaudience",
"grant_type": "client_credentials",
"scope": "read write admin",
},
)
await rest_client.close()

@patch.object(rest.RESTClientObject, "request")
async def test_get_authentication_obtain_client_credentials_with_scopes_string(self, mock_request):
"""
Test getting authentication header when method is client credentials with scopes as string
"""
response_body = """
{
"expires_in": 120,
"access_token": "AABBCCDD"
}
"""
mock_request.return_value = mock_response(response_body, 200)

credentials = Credentials(
method="client_credentials",
configuration=CredentialConfiguration(
client_id="myclientid",
client_secret="mysecret",
api_issuer="issuer.fga.example",
api_audience="myaudience",
scopes="read write admin",
),
)
rest_client = rest.RESTClientObject(Configuration())
current_time = datetime.now()
client = OAuth2Client(credentials)
auth_header = await client.get_authentication_header(rest_client)
self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"})
self.assertEqual(client._access_token, "AABBCCDD")
self.assertGreaterEqual(
client._access_expiry_time, current_time + timedelta(seconds=120)
)
expected_header = urllib3.response.HTTPHeaderDict(
{
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "openfga-sdk (python) {{packageVersion}}",
}
)
mock_request.assert_called_once_with(
method="POST",
url="https://issuer.fga.example/oauth/token",
headers=expected_header,
query_params=None,
body=None,
_preload_content=True,
_request_timeout=None,
post_params={
"client_id": "myclientid",
"client_secret": "mysecret",
"audience": "myaudience",
"grant_type": "client_credentials",
"scope": "read write admin",
},
)
await rest_client.close()
116 changes: 115 additions & 1 deletion config/clients/python/template/test/sync/oauth2_test.py.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,120 @@ class TestOAuth2Client(IsolatedAsyncioTestCase):
)
rest_client.close()

@patch.object(rest.RESTClientObject, "request")
def test_get_authentication_obtain_client_credentials_with_scopes_list(self, mock_request):
"""
Test getting authentication header when method is client credentials with scopes as list
"""
response_body = """
{
"expires_in": 120,
"access_token": "AABBCCDD"
}
"""
mock_request.return_value = mock_response(response_body, 200)

credentials = Credentials(
method="client_credentials",
configuration=CredentialConfiguration(
client_id="myclientid",
client_secret="mysecret",
api_issuer="issuer.fga.example",
api_audience="myaudience",
scopes=["read", "write", "admin"],
),
)
rest_client = rest.RESTClientObject(Configuration())
current_time = datetime.now()
client = OAuth2Client(credentials)
auth_header = client.get_authentication_header(rest_client)
self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"})
self.assertEqual(client._access_token, "AABBCCDD")
self.assertGreaterEqual(
client._access_expiry_time, current_time + timedelta(seconds=120)
)
expected_header = urllib3.response.HTTPHeaderDict(
{
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "openfga-sdk (python) {{packageVersion}}",
}
)
mock_request.assert_called_once_with(
method="POST",
url="https://issuer.fga.example/oauth/token",
headers=expected_header,
query_params=None,
body=None,
_preload_content=True,
_request_timeout=None,
post_params={
"client_id": "myclientid",
"client_secret": "mysecret",
"audience": "myaudience",
"grant_type": "client_credentials",
"scope": "read write admin",
},
)
rest_client.close()

@patch.object(rest.RESTClientObject, "request")
def test_get_authentication_obtain_client_credentials_with_scopes_string(self, mock_request):
"""
Test getting authentication header when method is client credentials with scopes as string
"""
response_body = """
{
"expires_in": 120,
"access_token": "AABBCCDD"
}
"""
mock_request.return_value = mock_response(response_body, 200)

credentials = Credentials(
method="client_credentials",
configuration=CredentialConfiguration(
client_id="myclientid",
client_secret="mysecret",
api_issuer="issuer.fga.example",
api_audience="myaudience",
scopes="read write admin",
),
)
rest_client = rest.RESTClientObject(Configuration())
current_time = datetime.now()
client = OAuth2Client(credentials)
auth_header = client.get_authentication_header(rest_client)
self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"})
self.assertEqual(client._access_token, "AABBCCDD")
self.assertGreaterEqual(
client._access_expiry_time, current_time + timedelta(seconds=120)
)
expected_header = urllib3.response.HTTPHeaderDict(
{
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "openfga-sdk (python) {{packageVersion}}",
}
)
mock_request.assert_called_once_with(
method="POST",
url="https://issuer.fga.example/oauth/token",
headers=expected_header,
query_params=None,
body=None,
_preload_content=True,
_request_timeout=None,
post_params={
"client_id": "myclientid",
"client_secret": "mysecret",
"audience": "myaudience",
"grant_type": "client_credentials",
"scope": "read write admin",
},
)
rest_client.close()

@patch.object(rest.RESTClientObject, "request")
def test_get_authentication_obtain_client_credentials_failed(self, mock_request):
"""
Expand Down Expand Up @@ -212,7 +326,7 @@ This is not a JSON response
rest_client.close()

@patch.object(rest.RESTClientObject, "request")
async def test_get_authentication_retries_5xx_responses(self, mock_request):
def test_get_authentication_retries_5xx_responses(self, mock_request):
"""
Receiving a 5xx response from the server should be retried
"""
Expand Down
Loading