Skip to content

Commit 5c1f0f4

Browse files
authored
Arlington automation (#165)
1 parent 4e3e53b commit 5c1f0f4

File tree

1 file changed

+178
-117
lines changed

1 file changed

+178
-117
lines changed

tests/test_e2e.py

Lines changed: 178 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,32 @@ def _test_username_password(self,
102102
username=username if ".b2clogin.com" not in authority else None,
103103
)
104104

105+
def _test_device_flow(
106+
self, client_id=None, authority=None, scope=None, **ignored):
107+
assert client_id and authority and scope
108+
self.app = msal.PublicClientApplication(
109+
client_id, authority=authority)
110+
flow = self.app.initiate_device_flow(scopes=scope)
111+
assert "user_code" in flow, "DF does not seem to be provisioned: %s".format(
112+
json.dumps(flow, indent=4))
113+
logger.info(flow["message"])
114+
115+
duration = 60
116+
logger.info("We will wait up to %d seconds for you to sign in" % duration)
117+
flow["expires_at"] = min( # Shorten the time for quick test
118+
flow["expires_at"], time.time() + duration)
119+
result = self.app.acquire_token_by_device_flow(flow)
120+
self.assertLoosely( # It will skip this test if there is no user interaction
121+
result,
122+
assertion=lambda: self.assertIn('access_token', result),
123+
skippable_errors=self.app.client.DEVICE_FLOW_RETRIABLE_ERRORS)
124+
if "access_token" not in result:
125+
self.skip("End user did not complete Device Flow in time")
126+
self.assertCacheWorksForUser(result, scope, username=None)
127+
result["access_token"] = result["refresh_token"] = "************"
128+
logger.info(
129+
"%s obtained tokens: %s", self.id(), json.dumps(result, indent=4))
130+
105131

106132
THIS_FOLDER = os.path.dirname(__file__)
107133
CONFIG = os.path.join(THIS_FOLDER, "config.json")
@@ -256,29 +282,7 @@ def setUpClass(cls):
256282
cls.config = json.load(f)
257283

258284
def test_device_flow(self):
259-
scopes = self.config["scope"]
260-
self.app = msal.PublicClientApplication(
261-
self.config['client_id'], authority=self.config["authority"])
262-
flow = self.app.initiate_device_flow(scopes=scopes)
263-
assert "user_code" in flow, "DF does not seem to be provisioned: %s".format(
264-
json.dumps(flow, indent=4))
265-
logger.info(flow["message"])
266-
267-
duration = 60
268-
logger.info("We will wait up to %d seconds for you to sign in" % duration)
269-
flow["expires_at"] = min( # Shorten the time for quick test
270-
flow["expires_at"], time.time() + duration)
271-
result = self.app.acquire_token_by_device_flow(flow)
272-
self.assertLoosely( # It will skip this test if there is no user interaction
273-
result,
274-
assertion=lambda: self.assertIn('access_token', result),
275-
skippable_errors=self.app.client.DEVICE_FLOW_RETRIABLE_ERRORS)
276-
if "access_token" not in result:
277-
self.skip("End user did not complete Device Flow in time")
278-
self.assertCacheWorksForUser(result, scopes, username=None)
279-
result["access_token"] = result["refresh_token"] = "************"
280-
logger.info(
281-
"%s obtained tokens: %s", self.id(), json.dumps(result, indent=4))
285+
self._test_device_flow(**self.config)
282286

283287

284288
def get_lab_app(
@@ -334,6 +338,12 @@ def setUpClass(cls):
334338
def tearDownClass(cls):
335339
cls.session.close()
336340

341+
@classmethod
342+
def get_lab_app_object(cls, **query): # https://msidlab.com/swagger/index.html
343+
url = "https://msidlab.com/api/app"
344+
resp = cls.session.get(url, params=query)
345+
return resp.json()[0]
346+
337347
@classmethod
338348
def get_lab_user_secret(cls, lab_name="msidlab4"):
339349
lab_name = lab_name.lower()
@@ -348,67 +358,29 @@ def get_lab_user_secret(cls, lab_name="msidlab4"):
348358
def get_lab_user(cls, **query): # https://docs.msidlab.com/labapi/userapi.html
349359
resp = cls.session.get("https://msidlab.com/api/user", params=query)
350360
result = resp.json()[0]
361+
_env = query.get("azureenvironment", "").lower()
362+
authority_base = {
363+
"azureusgovernment": "https://login.microsoftonline.us/"
364+
}.get(_env, "https://login.microsoftonline.com/")
365+
scope = {
366+
"azureusgovernment": ["https://graph.microsoft.us/.default"],
367+
}.get(_env, ["https://graph.microsoft.com/.default"])
351368
return { # Mapping lab API response to our simplified configuration format
352-
"authority": "https://login.microsoftonline.com/{}.onmicrosoft.com".format(
353-
result["labName"]),
369+
"authority": authority_base + result["tenantID"],
354370
"client_id": result["appId"],
355371
"username": result["upn"],
356372
"lab_name": result["labName"],
357-
"scope": ["https://graph.microsoft.com/.default"],
373+
"scope": scope,
358374
}
359375

360-
def test_aad_managed_user(self): # Pure cloud
361-
config = self.get_lab_user(usertype="cloud")
362-
self._test_username_password(
363-
password=self.get_lab_user_secret(config["lab_name"]), **config)
364-
365-
def test_adfs4_fed_user(self):
366-
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv4")
367-
self._test_username_password(
368-
password=self.get_lab_user_secret(config["lab_name"]), **config)
369-
370-
def test_adfs3_fed_user(self):
371-
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv3")
372-
self._test_username_password(
373-
password=self.get_lab_user_secret(config["lab_name"]), **config)
374-
375-
def test_adfs2_fed_user(self):
376-
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2")
377-
self._test_username_password(
378-
password=self.get_lab_user_secret(config["lab_name"]), **config)
379-
380-
def test_adfs2019_fed_user(self):
381-
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2019")
382-
self._test_username_password(
383-
password=self.get_lab_user_secret(config["lab_name"]), **config)
384-
385-
def test_ropc_adfs2019_onprem(self):
386-
config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019")
387-
config["authority"] = "https://fs.%s.com/adfs" % config["lab_name"]
388-
config["client_id"] = "PublicClientId"
389-
config["scope"] = self.adfs2019_scopes
390-
self._test_username_password(
391-
password=self.get_lab_user_secret(config["lab_name"]), **config)
392-
393-
@unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented")
394-
def test_adfs2019_onprem_acquire_token_by_auth_code(self):
395-
"""When prompted, you can manually login using this account:
396-
397-
# https://msidlab.com/api/user?usertype=onprem&federationprovider=ADFSv2019
398-
username = "..." # The upn from the link above
399-
password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabXYZ
400-
"""
401-
scopes = self.adfs2019_scopes
402-
config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019")
376+
def _test_acquire_token_by_auth_code(
377+
self, client_id=None, authority=None, port=None, scope=None,
378+
**ignored):
379+
assert client_id and authority and port and scope
403380
(self.app, ac, redirect_uri) = _get_app_and_auth_code(
404-
# Configuration is derived from https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.7.0/tests/Microsoft.Identity.Test.Common/TestConstants.cs#L250-L259
405-
"PublicClientId",
406-
authority="https://fs.%s.com/adfs" % config["lab_name"],
407-
port=8080,
408-
scopes=scopes,
409-
)
381+
client_id, authority=authority, port=port, scopes=scope)
410382
result = self.app.acquire_token_by_authorization_code(
411-
ac, scopes, redirect_uri=redirect_uri)
383+
ac, scope, redirect_uri=redirect_uri)
412384
logger.debug(
413385
"%s: cache = %s, id_token_claims = %s",
414386
self.id(),
@@ -421,41 +393,32 @@ def test_adfs2019_onprem_acquire_token_by_auth_code(self):
421393
# Note: No interpolation here, cause error won't always present
422394
error=result.get("error"),
423395
error_description=result.get("error_description")))
424-
self.assertCacheWorksForUser(result, scopes, username=None)
425-
426-
@unittest.skipUnless(
427-
os.getenv("LAB_OBO_CLIENT_SECRET"),
428-
"Need LAB_OBO_CLIENT SECRET from https://msidlabs.vault.azure.net/secrets/TodoListServiceV2-OBO/c58ba97c34ca4464886943a847d1db56")
429-
def test_acquire_token_obo(self):
430-
# Some hardcoded, pre-defined settings
431-
obo_client_id = "f4aa5217-e87c-42b2-82af-5624dd14ee72"
432-
downstream_scopes = ["https://graph.microsoft.com/.default"]
433-
config = self.get_lab_user(usertype="cloud")
396+
self.assertCacheWorksForUser(result, scope, username=None)
434397

398+
def _test_acquire_token_obo(self, config_pca, config_cca):
435399
# 1. An app obtains a token representing a user, for our mid-tier service
436400
pca = msal.PublicClientApplication(
437-
"c0485386-1e9a-4663-bc96-7ab30656de7f", authority=config.get("authority"))
401+
config_pca["client_id"], authority=config_pca["authority"])
438402
pca_result = pca.acquire_token_by_username_password(
439-
config["username"],
440-
self.get_lab_user_secret(config["lab_name"]),
441-
scopes=[ # The OBO app's scope. Yours might be different.
442-
"api://%s/read" % obo_client_id],
403+
config_pca["username"],
404+
config_pca["password"],
405+
scopes=config_pca["scope"],
443406
)
444407
self.assertIsNotNone(
445408
pca_result.get("access_token"),
446409
"PCA failed to get AT because %s" % json.dumps(pca_result, indent=2))
447410

448411
# 2. Our mid-tier service uses OBO to obtain a token for downstream service
449412
cca = msal.ConfidentialClientApplication(
450-
obo_client_id,
451-
client_credential=os.getenv("LAB_OBO_CLIENT_SECRET"),
452-
authority=config.get("authority"),
413+
config_cca["client_id"],
414+
client_credential=config_cca["client_secret"],
415+
authority=config_cca["authority"],
453416
# token_cache= ..., # Default token cache is all-tokens-store-in-memory.
454417
# That's fine if OBO app uses short-lived msal instance per session.
455418
# Otherwise, the OBO app need to implement a one-cache-per-user setup.
456419
)
457420
cca_result = cca.acquire_token_on_behalf_of(
458-
pca_result['access_token'], downstream_scopes)
421+
pca_result['access_token'], config_cca["scope"])
459422
self.assertNotEqual(None, cca_result.get("access_token"), str(cca_result))
460423

461424
# 3. Now the OBO app can simply store downstream token(s) in same session.
@@ -465,12 +428,93 @@ def test_acquire_token_obo(self):
465428
# Assuming you already did that (which is not shown in this test case),
466429
# the following part shows one of the ways to obtain an AT from cache.
467430
username = cca_result.get("id_token_claims", {}).get("preferred_username")
468-
self.assertEqual(config["username"], username)
431+
self.assertEqual(config_cca["username"], username)
469432
if username: # A precaution so that we won't use other user's token
470433
account = cca.get_accounts(username=username)[0]
471-
result = cca.acquire_token_silent(downstream_scopes, account)
434+
result = cca.acquire_token_silent(config_cca["scope"], account)
472435
self.assertEqual(cca_result["access_token"], result["access_token"])
473436

437+
def _test_acquire_token_by_client_secret(
438+
self, client_id=None, client_secret=None, authority=None, scope=None,
439+
**ignored):
440+
assert client_id and client_secret and authority and scope
441+
app = msal.ConfidentialClientApplication(
442+
client_id, client_credential=client_secret, authority=authority)
443+
result = app.acquire_token_for_client(scope)
444+
self.assertIsNotNone(result.get("access_token"), "Got %s instead" % result)
445+
446+
447+
class WorldWideTestCase(LabBasedTestCase):
448+
449+
def test_aad_managed_user(self): # Pure cloud
450+
config = self.get_lab_user(usertype="cloud")
451+
config["password"] = self.get_lab_user_secret(config["lab_name"])
452+
self._test_username_password(**config)
453+
454+
def test_adfs4_fed_user(self):
455+
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv4")
456+
config["password"] = self.get_lab_user_secret(config["lab_name"])
457+
self._test_username_password(**config)
458+
459+
def test_adfs3_fed_user(self):
460+
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv3")
461+
config["password"] = self.get_lab_user_secret(config["lab_name"])
462+
self._test_username_password(**config)
463+
464+
def test_adfs2_fed_user(self):
465+
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2")
466+
config["password"] = self.get_lab_user_secret(config["lab_name"])
467+
self._test_username_password(**config)
468+
469+
def test_adfs2019_fed_user(self):
470+
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2019")
471+
config["password"] = self.get_lab_user_secret(config["lab_name"])
472+
self._test_username_password(**config)
473+
474+
def test_ropc_adfs2019_onprem(self):
475+
# Configuration is derived from https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.7.0/tests/Microsoft.Identity.Test.Common/TestConstants.cs#L250-L259
476+
config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019")
477+
config["authority"] = "https://fs.%s.com/adfs" % config["lab_name"]
478+
config["client_id"] = "PublicClientId"
479+
config["scope"] = self.adfs2019_scopes
480+
config["password"] = self.get_lab_user_secret(config["lab_name"])
481+
self._test_username_password(**config)
482+
483+
@unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented")
484+
def test_adfs2019_onprem_acquire_token_by_auth_code(self):
485+
"""When prompted, you can manually login using this account:
486+
487+
# https://msidlab.com/api/user?usertype=onprem&federationprovider=ADFSv2019
488+
username = "..." # The upn from the link above
489+
password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabXYZ
490+
"""
491+
config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019")
492+
config["authority"] = "https://fs.%s.com/adfs" % config["lab_name"]
493+
config["client_id"] = "PublicClientId"
494+
config["scope"] = self.adfs2019_scopes
495+
config["port"] = 8080
496+
self._test_acquire_token_by_auth_code(**config)
497+
498+
@unittest.skipUnless(
499+
os.getenv("LAB_OBO_CLIENT_SECRET"),
500+
"Need LAB_OBO_CLIENT SECRET from https://msidlabs.vault.azure.net/secrets/TodoListServiceV2-OBO/c58ba97c34ca4464886943a847d1db56")
501+
def test_acquire_token_obo(self):
502+
config = self.get_lab_user(usertype="cloud")
503+
504+
config_cca = {}
505+
config_cca.update(config)
506+
config_cca["client_id"] = "f4aa5217-e87c-42b2-82af-5624dd14ee72"
507+
config_cca["scope"] = ["https://graph.microsoft.com/.default"]
508+
config_cca["client_secret"] = os.getenv("LAB_OBO_CLIENT_SECRET")
509+
510+
config_pca = {}
511+
config_pca.update(config)
512+
config_pca["client_id"] = "c0485386-1e9a-4663-bc96-7ab30656de7f"
513+
config_pca["password"] = self.get_lab_user_secret(config_pca["lab_name"])
514+
config_pca["scope"] = ["api://%s/read" % config_cca["client_id"]]
515+
516+
self._test_acquire_token_obo(config_pca, config_cca)
517+
474518
def _build_b2c_authority(self, policy):
475519
base = "https://msidlabb2c.b2clogin.com/msidlabb2c.onmicrosoft.com"
476520
return base + "/" + policy # We do not support base + "?p=" + policy
@@ -484,29 +528,12 @@ def test_b2c_acquire_token_by_auth_code(self):
484528
# This won't work https://msidlab.com/api/user?usertype=b2c
485529
password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabb2c
486530
"""
487-
scopes = ["https://msidlabb2c.onmicrosoft.com/msaapp/user_impersonation"]
488-
(self.app, ac, redirect_uri) = _get_app_and_auth_code(
489-
"b876a048-55a5-4fc5-9403-f5d90cb1c852",
490-
client_secret=self.get_lab_user_secret("MSIDLABB2C-MSAapp-AppSecret"),
531+
self._test_acquire_token_by_auth_code(
491532
authority=self._build_b2c_authority("B2C_1_SignInPolicy"),
533+
client_id="b876a048-55a5-4fc5-9403-f5d90cb1c852",
492534
port=3843, # Lab defines 4 of them: [3843, 4584, 4843, 60000]
493-
scopes=scopes,
535+
scope=["https://msidlabb2c.onmicrosoft.com/msaapp/user_impersonation"]
494536
)
495-
result = self.app.acquire_token_by_authorization_code(
496-
ac, scopes, redirect_uri=redirect_uri)
497-
logger.debug(
498-
"%s: cache = %s, id_token_claims = %s",
499-
self.id(),
500-
json.dumps(self.app.token_cache._cache, indent=4),
501-
json.dumps(result.get("id_token_claims"), indent=4),
502-
)
503-
self.assertIn(
504-
"access_token", result,
505-
"{error}: {error_description}".format(
506-
# Note: No interpolation here, cause error won't always present
507-
error=result.get("error"),
508-
error_description=result.get("error_description")))
509-
self.assertCacheWorksForUser(result, scopes, username=None)
510537

511538
def test_b2c_acquire_token_by_ropc(self):
512539
self._test_username_password(
@@ -517,6 +544,40 @@ def test_b2c_acquire_token_by_ropc(self):
517544
scope=["https://msidlabb2c.onmicrosoft.com/msidlabb2capi/read"],
518545
)
519546

547+
548+
class ArlingtonCloudTestCase(LabBasedTestCase):
549+
environment = "azureusgovernment"
550+
551+
def test_acquire_token_by_ropc(self):
552+
config = self.get_lab_user(azureenvironment=self.environment)
553+
config["password"] = self.get_lab_user_secret(config["lab_name"])
554+
self._test_username_password(**config)
555+
556+
def test_acquire_token_by_client_secret(self):
557+
config = self.get_lab_user(usertype="cloud", azureenvironment=self.environment, publicClient="no")
558+
config["client_secret"] = self.get_lab_user_secret("ARLMSIDLAB1-IDLASBS-App-CC-Secret")
559+
self._test_acquire_token_by_client_secret(**config)
560+
561+
def test_acquire_token_obo(self):
562+
config_cca = self.get_lab_user(
563+
usertype="cloud", azureenvironment=self.environment, publicClient="no")
564+
config_cca["scope"] = ["https://graph.microsoft.us/.default"]
565+
config_cca["client_secret"] = self.get_lab_user_secret("ARLMSIDLAB1-IDLASBS-App-CC-Secret")
566+
567+
config_pca = self.get_lab_user(usertype="cloud", azureenvironment=self.environment, publicClient="yes")
568+
obo_app_object = self.get_lab_app_object(
569+
usertype="cloud", azureenvironment=self.environment, publicClient="no")
570+
config_pca["password"] = self.get_lab_user_secret(config_pca["lab_name"])
571+
config_pca["scope"] = ["{app_uri}/files.read".format(app_uri=obo_app_object.get("identifierUris"))]
572+
573+
self._test_acquire_token_obo(config_pca, config_cca)
574+
575+
def test_acquire_token_device_flow(self):
576+
config = self.get_lab_user(usertype="cloud", azureenvironment=self.environment, publicClient="yes")
577+
config["scope"] = ["user.read"]
578+
self._test_device_flow(**config)
579+
580+
520581
if __name__ == "__main__":
521582
unittest.main()
522583

0 commit comments

Comments
 (0)