Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions openfga_sdk/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class CredentialConfiguration:
:param api_token: Bearer token to be sent for authentication
:param api_audience: API audience used for OAuth2
:param api_issuer: API issuer used for OAuth2
:param scopes: OAuth2 scopes to request, can be a list of strings or a space-separated string
"""

def __init__(
Expand All @@ -39,12 +40,14 @@ def __init__(
api_audience: str | None = None,
api_issuer: str | None = None,
api_token: str | None = None,
scopes: str | list[str] | None = None,
):
self._client_id = client_id
self._client_secret = client_secret
self._api_audience = api_audience
self._api_issuer = api_issuer
self._api_token = api_token
self._scopes = scopes

@property
def client_id(self):
Expand Down Expand Up @@ -116,6 +119,20 @@ def api_token(self, value):
"""
self._api_token = value

@property
def scopes(self):
"""
Return the scopes configured
"""
return self._scopes

@scopes.setter
def scopes(self, value):
"""
Update the scopes
"""
self._scopes = value


class Credentials:
"""
Expand Down
7 changes: 7 additions & 0 deletions openfga_sdk/oauth2.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ async def _obtain_token(self, client):
"grant_type": "client_credentials",
}

# Add scope parameter if scopes are configured
if configuration.scopes is not None:
if isinstance(configuration.scopes, list):
post_params["scope"] = " ".join(configuration.scopes)
else:
post_params["scope"] = configuration.scopes

headers = urllib3.response.HTTPHeaderDict(
{
"Accept": "application/json",
Expand Down
7 changes: 7 additions & 0 deletions openfga_sdk/sync/oauth2.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ def _obtain_token(self, client):
"grant_type": "client_credentials",
}

# Add scope parameter if scopes are configured
if configuration.scopes is not None:
if isinstance(configuration.scopes, list):
post_params["scope"] = " ".join(configuration.scopes)
else:
post_params["scope"] = configuration.scopes

headers = urllib3.response.HTTPHeaderDict(
{
"Accept": "application/json",
Expand Down
36 changes: 36 additions & 0 deletions test/credentials_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,42 @@ def test_configuration_client_credentials(self):
credential.validate_credentials_config()
self.assertEqual(credential.method, "client_credentials")

def test_configuration_client_credentials_with_scopes_list(self):
"""
Test credential with method client_credentials and scopes as list is valid
"""
credential = Credentials(
method="client_credentials",
configuration=CredentialConfiguration(
client_id="myclientid",
client_secret="mysecret",
api_issuer="issuer.fga.example",
api_audience="myaudience",
scopes=["read", "write", "admin"],
),
)
credential.validate_credentials_config()
self.assertEqual(credential.method, "client_credentials")
self.assertEqual(credential.configuration.scopes, ["read", "write", "admin"])

def test_configuration_client_credentials_with_scopes_string(self):
"""
Test credential with method client_credentials and scopes as string is valid
"""
credential = Credentials(
method="client_credentials",
configuration=CredentialConfiguration(
client_id="myclientid",
client_secret="mysecret",
api_issuer="issuer.fga.example",
api_audience="myaudience",
scopes="read write admin",
),
)
credential.validate_credentials_config()
self.assertEqual(credential.method, "client_credentials")
self.assertEqual(credential.configuration.scopes, "read write admin")

def test_configuration_client_credentials_missing_config(self):
"""
Test credential with method client_credentials and configuration is missing
Expand Down
114 changes: 114 additions & 0 deletions test/oauth2_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,3 +494,117 @@ async def test_get_authentication_add_scheme_and_path(self, mock_request):
},
)
await rest_client.close()

@patch.object(rest.RESTClientObject, "request")
async def test_get_authentication_obtain_client_credentials_with_scopes_list(self, mock_request):
"""
Test getting authentication header when method is client credentials with scopes as list
"""
response_body = """
{
"expires_in": 120,
"access_token": "AABBCCDD"
}
"""
mock_request.return_value = mock_response(response_body, 200)

credentials = Credentials(
method="client_credentials",
configuration=CredentialConfiguration(
client_id="myclientid",
client_secret="mysecret",
api_issuer="issuer.fga.example",
api_audience="myaudience",
scopes=["read", "write", "admin"],
),
)
rest_client = rest.RESTClientObject(Configuration())
current_time = datetime.now()
client = OAuth2Client(credentials)
auth_header = await client.get_authentication_header(rest_client)
self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"})
self.assertEqual(client._access_token, "AABBCCDD")
self.assertGreaterEqual(
client._access_expiry_time, current_time + timedelta(seconds=120)
)
expected_header = urllib3.response.HTTPHeaderDict(
{
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "openfga-sdk (python) 0.9.5",
}
)
mock_request.assert_called_once_with(
method="POST",
url="https://issuer.fga.example/oauth/token",
headers=expected_header,
query_params=None,
body=None,
_preload_content=True,
_request_timeout=None,
post_params={
"client_id": "myclientid",
"client_secret": "mysecret",
"audience": "myaudience",
"grant_type": "client_credentials",
"scope": "read write admin",
},
)
await rest_client.close()

@patch.object(rest.RESTClientObject, "request")
async def test_get_authentication_obtain_client_credentials_with_scopes_string(self, mock_request):
"""
Test getting authentication header when method is client credentials with scopes as string
"""
response_body = """
{
"expires_in": 120,
"access_token": "AABBCCDD"
}
"""
mock_request.return_value = mock_response(response_body, 200)

credentials = Credentials(
method="client_credentials",
configuration=CredentialConfiguration(
client_id="myclientid",
client_secret="mysecret",
api_issuer="issuer.fga.example",
api_audience="myaudience",
scopes="read write admin",
),
)
rest_client = rest.RESTClientObject(Configuration())
current_time = datetime.now()
client = OAuth2Client(credentials)
auth_header = await client.get_authentication_header(rest_client)
self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"})
self.assertEqual(client._access_token, "AABBCCDD")
self.assertGreaterEqual(
client._access_expiry_time, current_time + timedelta(seconds=120)
)
expected_header = urllib3.response.HTTPHeaderDict(
{
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "openfga-sdk (python) 0.9.5",
}
)
mock_request.assert_called_once_with(
method="POST",
url="https://issuer.fga.example/oauth/token",
headers=expected_header,
query_params=None,
body=None,
_preload_content=True,
_request_timeout=None,
post_params={
"client_id": "myclientid",
"client_secret": "mysecret",
"audience": "myaudience",
"grant_type": "client_credentials",
"scope": "read write admin",
},
)
await rest_client.close()
113 changes: 113 additions & 0 deletions test/sync/oauth2_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,119 @@ def test_get_authentication_obtain_client_credentials(self, mock_request):
)
rest_client.close()

@patch.object(rest.RESTClientObject, "request")
def test_get_authentication_obtain_client_credentials_with_scopes_list(self, mock_request):
"""
Test getting authentication header when method is client credentials with scopes as list
"""
response_body = """
{
"expires_in": 120,
"access_token": "AABBCCDD"
}
"""
mock_request.return_value = mock_response(response_body, 200)

credentials = Credentials(
method="client_credentials",
configuration=CredentialConfiguration(
client_id="myclientid",
client_secret="mysecret",
api_issuer="issuer.fga.example",
api_audience="myaudience",
scopes=["read", "write", "admin"],
),
)
rest_client = rest.RESTClientObject(Configuration())
current_time = datetime.now()
client = OAuth2Client(credentials)
auth_header = client.get_authentication_header(rest_client)
self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"})
self.assertEqual(client._access_token, "AABBCCDD")
self.assertGreaterEqual(
client._access_expiry_time, current_time + timedelta(seconds=120)
)
expected_header = urllib3.response.HTTPHeaderDict(
{
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "openfga-sdk (python) 0.9.5",
}
)
mock_request.assert_called_once_with(
method="POST",
url="https://issuer.fga.example/oauth/token",
headers=expected_header,
query_params=None,
body=None,
_preload_content=True,
_request_timeout=None,
post_params={
"client_id": "myclientid",
"client_secret": "mysecret",
"audience": "myaudience",
"grant_type": "client_credentials",
"scope": "read write admin",
},
)
rest_client.close()

@patch.object(rest.RESTClientObject, "request")
def test_get_authentication_obtain_client_credentials_with_scopes_string(self, mock_request):
"""
Test getting authentication header when method is client credentials with scopes as string
"""
response_body = """
{
"expires_in": 120,
"access_token": "AABBCCDD"
}
"""
mock_request.return_value = mock_response(response_body, 200)

credentials = Credentials(
method="client_credentials",
configuration=CredentialConfiguration(
client_id="myclientid",
client_secret="mysecret",
api_issuer="issuer.fga.example",
api_audience="myaudience",
scopes="read write admin",
),
)
rest_client = rest.RESTClientObject(Configuration())
current_time = datetime.now()
client = OAuth2Client(credentials)
auth_header = client.get_authentication_header(rest_client)
self.assertEqual(auth_header, {"Authorization": "Bearer AABBCCDD"})
self.assertEqual(client._access_token, "AABBCCDD")
self.assertGreaterEqual(
client._access_expiry_time, current_time + timedelta(seconds=120)
)
expected_header = urllib3.response.HTTPHeaderDict(
{
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "openfga-sdk (python) 0.9.5",
}
)
mock_request.assert_called_once_with(
method="POST",
url="https://issuer.fga.example/oauth/token",
headers=expected_header,
query_params=None,
body=None,
_preload_content=True,
_request_timeout=None,
post_params={
"client_id": "myclientid",
"client_secret": "mysecret",
"audience": "myaudience",
"grant_type": "client_credentials",
"scope": "read write admin",
},
)

@patch.object(rest.RESTClientObject, "request")
def test_get_authentication_obtain_client_credentials_failed(self, mock_request):
"""
Expand Down