|
13 | 13 | # limitations under the License. |
14 | 14 |
|
15 | 15 | import os |
| 16 | +from typing import Iterator |
16 | 17 | import uuid |
17 | 18 |
|
18 | | -from googleapiclient import errors |
| 19 | +from googleapiclient import errors # type: ignore |
19 | 20 | import pytest |
20 | | -from retrying import retry |
| 21 | +from retrying import retry # type: ignore |
21 | 22 |
|
22 | 23 | import access |
23 | 24 | import service_accounts |
24 | 25 |
|
25 | | - |
26 | 26 | # Setting up variables for testing |
27 | 27 | GCLOUD_PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] |
28 | 28 |
|
29 | 29 | # specifying a sample role to be assigned |
30 | 30 | GCP_ROLE = "roles/owner" |
31 | 31 |
|
32 | 32 |
|
33 | | -def retry_if_conflict(exception): |
34 | | - return (isinstance(exception, errors.HttpError) |
35 | | - and 'There were concurrent policy changes' in str(exception)) |
| 33 | +def retry_if_conflict(exception: Exception) -> bool: |
| 34 | + return isinstance( |
| 35 | + exception, errors.HttpError |
| 36 | + ) and "There were concurrent policy changes" in str(exception) |
36 | 37 |
|
37 | 38 |
|
38 | 39 | @pytest.fixture(scope="module") |
39 | | -def test_member(): |
| 40 | +def test_member() -> Iterator[str]: |
40 | 41 | # section to create service account to test policy updates. |
41 | 42 | # we use the first portion of uuid4 because full version is too long. |
42 | | - name = "python-test-" + str(uuid.uuid4()).split('-')[0] |
| 43 | + name = "python-test-" + str(uuid.uuid4()).split("-")[0] |
43 | 44 | email = name + "@" + GCLOUD_PROJECT + ".iam.gserviceaccount.com" |
44 | 45 | member = "serviceAccount:" + email |
45 | | - service_accounts.create_service_account( |
46 | | - GCLOUD_PROJECT, name, "Py Test Account" |
47 | | - ) |
| 46 | + service_accounts.create_service_account(GCLOUD_PROJECT, name, "Py Test Account") |
48 | 47 |
|
49 | 48 | yield member |
50 | 49 |
|
51 | 50 | # deleting the service account created above |
52 | 51 | service_accounts.delete_service_account(email) |
53 | 52 |
|
54 | 53 |
|
55 | | -def test_get_policy(capsys): |
| 54 | +def test_get_policy(capsys: pytest.LogCaptureFixture) -> None: |
56 | 55 | access.get_policy(GCLOUD_PROJECT, version=3) |
57 | 56 | out, _ = capsys.readouterr() |
58 | 57 | assert "etag" in out |
59 | 58 |
|
60 | 59 |
|
61 | | -def test_modify_policy_add_role(test_member, capsys): |
62 | | - @retry(wait_exponential_multiplier=1000, wait_exponential_max=10000, |
63 | | - stop_max_attempt_number=5, retry_on_exception=retry_if_conflict) |
64 | | - def test_call(): |
| 60 | +def test_modify_policy_add_role( |
| 61 | + test_member: str, capsys: pytest.LogCaptureFixture |
| 62 | +) -> None: |
| 63 | + @retry( |
| 64 | + wait_exponential_multiplier=1000, |
| 65 | + wait_exponential_max=10000, |
| 66 | + stop_max_attempt_number=5, |
| 67 | + retry_on_exception=retry_if_conflict, |
| 68 | + ) |
| 69 | + def test_call() -> None: |
65 | 70 | policy = access.get_policy(GCLOUD_PROJECT, version=3) |
66 | 71 | access.modify_policy_add_role(policy, GCLOUD_PROJECT, test_member) |
67 | 72 | out, _ = capsys.readouterr() |
68 | 73 | assert "etag" in out |
| 74 | + |
69 | 75 | test_call() |
70 | 76 |
|
71 | 77 |
|
72 | | -def test_modify_policy_remove_member(test_member, capsys): |
73 | | - @retry(wait_exponential_multiplier=1000, wait_exponential_max=10000, |
74 | | - stop_max_attempt_number=5, retry_on_exception=retry_if_conflict) |
75 | | - def test_call(): |
| 78 | +def test_modify_policy_remove_member(test_member: str, capsys: pytest.LogCaptureFixture) -> None: |
| 79 | + @retry( |
| 80 | + wait_exponential_multiplier=1000, |
| 81 | + wait_exponential_max=10000, |
| 82 | + stop_max_attempt_number=5, |
| 83 | + retry_on_exception=retry_if_conflict, |
| 84 | + ) |
| 85 | + def test_call() -> None: |
76 | 86 | policy = access.get_policy(GCLOUD_PROJECT, version=3) |
77 | 87 | access.modify_policy_remove_member(policy, GCP_ROLE, test_member) |
78 | 88 | out, _ = capsys.readouterr() |
79 | 89 | assert "iam.gserviceaccount.com" in out |
| 90 | + |
80 | 91 | test_call() |
81 | 92 |
|
82 | 93 |
|
83 | | -def test_set_policy(capsys): |
84 | | - @retry(wait_exponential_multiplier=1000, wait_exponential_max=10000, |
85 | | - stop_max_attempt_number=5, retry_on_exception=retry_if_conflict) |
86 | | - def test_call(): |
| 94 | +def test_set_policy(capsys: pytest.LogCaptureFixture) -> None: |
| 95 | + @retry( |
| 96 | + wait_exponential_multiplier=1000, |
| 97 | + wait_exponential_max=10000, |
| 98 | + stop_max_attempt_number=5, |
| 99 | + retry_on_exception=retry_if_conflict, |
| 100 | + ) |
| 101 | + def test_call() -> None: |
87 | 102 | policy = access.get_policy(GCLOUD_PROJECT, version=3) |
88 | 103 | access.set_policy(GCLOUD_PROJECT, policy) |
89 | 104 | out, _ = capsys.readouterr() |
90 | 105 | assert "etag" in out |
| 106 | + |
91 | 107 | test_call() |
92 | 108 |
|
93 | 109 |
|
94 | | -def test_permissions(capsys): |
| 110 | +def test_permissions(capsys: pytest.LogCaptureFixture) -> None: |
95 | 111 | access.test_permissions(GCLOUD_PROJECT) |
96 | 112 | out, _ = capsys.readouterr() |
97 | 113 | assert "permissions" in out |
0 commit comments