-
Notifications
You must be signed in to change notification settings - Fork 339
feat: Add function to verify an App Check token #642
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 16 commits
25d3842
cf60bb9
3c4e191
4e84ce3
dc9cbfd
eb1725d
c5a25c2
aa98697
7e2259c
0978778
85145e1
41f93ea
5436d12
6a4815a
a592256
5b94963
89f29d3
a5290b5
c46b60b
e9148b7
b732aa6
46f22f6
2b6c7e7
e08f355
73edeb3
33f93e5
5321203
77eb730
fe30abb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,3 +12,4 @@ apikey.txt | |
htmlcov/ | ||
.pytest_cache/ | ||
.vscode/ | ||
.venv/ |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
# Copyright 2022 Google Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
"""Firebase App Check module.""" | ||
|
||
from typing import Any, Dict, List | ||
import jwt | ||
from jwt import PyJWKClient, DecodeError, InvalidKeyError | ||
from firebase_admin import _utils | ||
|
||
_APP_CHECK_ATTRIBUTE = '_app_check' | ||
|
||
def _get_app_check_service(app) -> Any: | ||
return _utils.get_app_service(app, _APP_CHECK_ATTRIBUTE, _AppCheckService) | ||
|
||
def verify_token(token: str, app=None) -> Dict[str, Any]: | ||
"""Verifies a Firebase App Check token. | ||
|
||
Args: | ||
token: A token from App Check. | ||
app: An App instance (optional). | ||
|
||
Returns: | ||
Dict[str, Any]: A token's decoded claims | ||
if the App Check token is valid; otherwise, a rejected promise. | ||
dwyfrequency marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
return _get_app_check_service(app).verify_token(token) | ||
|
||
class _AppCheckService: | ||
"""Service class that implements Firebase App Check functionality.""" | ||
|
||
_APP_CHECK_ISSUER = 'https://firebaseappcheck.googleapis.com/' | ||
_JWKS_URL = 'https://firebaseappcheck.googleapis.com/v1/jwks' | ||
_project_id = None | ||
|
||
def __init__(self, app): | ||
dwyfrequency marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# Validate and store the project_id to validate the JWT claims | ||
self._project_id = app.project_id | ||
if not self._project_id: | ||
raise ValueError( | ||
'Project ID is required to access App Check service. Either set the ' | ||
'projectId option, or use service account credentials. Alternatively, set the ' | ||
'GOOGLE_CLOUD_PROJECT environment variable.') | ||
dwyfrequency marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def verify_token(self, token: str) -> Dict[str, Any]: | ||
"""Verifies a Firebase App Check token.""" | ||
_Validators.check_string("app check token", token) | ||
|
||
# Obtain the Firebase App Check Public Keys | ||
# Note: It is not recommended to hard code these keys as they rotate, | ||
# but you should cache them for up to 6 hours. | ||
dwyfrequency marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# Default lifespan is 300 seconds (5 minutes) so we change it to 21600 seconds (6 hours). | ||
jwks_client = PyJWKClient(self._JWKS_URL, lifespan=21600) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry for not catching earlier, I think we should initialize this in the constructor and make jwks_client a class member. Otherwise, every time we call |
||
signing_key = jwks_client.get_signing_key_from_jwt(token) | ||
self._has_valid_token_headers(jwt.get_unverified_header(token)) | ||
verified_claims = self._decode_and_verify(token, signing_key.get('key')) | ||
|
||
# The token's subject will be the app ID, you may optionally filter against | ||
# an allow list | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can remove |
||
verified_claims['app_id'] = verified_claims.get('sub') | ||
return verified_claims | ||
|
||
def _has_valid_token_headers(self, headers: Any) -> None: | ||
"""Checks whether the token has valid headers for App Check.""" | ||
# Ensure the token's header has type JWT | ||
if headers.get('typ') != 'JWT': | ||
raise ValueError("The provided App Check token has an incorrect type header") | ||
# Ensure the token's header uses the algorithm RS256 | ||
algorithm = headers.get('alg') | ||
if algorithm != 'RS256': | ||
raise ValueError( | ||
'The provided App Check token has an incorrect algorithm. ' | ||
dwyfrequency marked this conversation as resolved.
Show resolved
Hide resolved
|
||
f'Expected RS256 but got {algorithm}.' | ||
) | ||
|
||
def _decode_token(self, token: str, signing_key: str, algorithms: List[str]) -> Dict[str, Any]: | ||
"""Decodes the JWT received from App Check.""" | ||
payload = {} | ||
try: | ||
payload = jwt.decode( | ||
token, | ||
signing_key, | ||
algorithms | ||
) | ||
except (DecodeError, InvalidKeyError): | ||
dwyfrequency marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ValueError( | ||
'Decoding App Check token failed. Make sure you passed the entire string JWT ' | ||
'which represents the Firebase App Check token.' | ||
) | ||
return payload | ||
|
||
def _decode_and_verify(self, token: str, signing_key: str): | ||
"""Decodes and verifies the token from App Check.""" | ||
payload = self._decode_token( | ||
token, | ||
signing_key, | ||
algorithms=["RS256"] | ||
) | ||
|
||
scoped_project_id = 'projects/' + self._project_id | ||
audience = payload.get('aud') | ||
if not isinstance(audience, list) and scoped_project_id not in audience: | ||
raise ValueError('Firebase App Check token has incorrect "aud" (audience) claim.') | ||
if not payload.get('iss').startswith(self._APP_CHECK_ISSUER): | ||
raise ValueError('Token does not contain the correct "iss" (issuer).') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we also validate the example from Node.js:
|
||
|
||
return payload | ||
|
||
class _Validators: | ||
"""A collection of data validation utilities. | ||
|
||
Methods provided in this class raise ``ValueErrors`` if any validations fail. | ||
""" | ||
|
||
@classmethod | ||
def check_string(cls, label: str, value: Any): | ||
"""Checks if the given value is a string.""" | ||
if value is None: | ||
raise ValueError('{0} "{1}" must be a non-empty string.'.format(label, value)) | ||
if not isinstance(value, str): | ||
raise ValueError('{0} "{1}" must be a string.'.format(label, value)) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
# Copyright 2022 Google Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
"""Test cases for the firebase_admin.app_check module.""" | ||
|
||
import pytest | ||
|
||
import firebase_admin | ||
from firebase_admin import app_check | ||
from tests import testutils | ||
|
||
NON_STRING_ARGS = [list(), tuple(), dict(), True, False, 1, 0] | ||
|
||
APP_ID = "1234567890" | ||
JWT_PAYLOAD_SAMPLE = { | ||
"headers": { | ||
"alg": "RS256", | ||
"typ": "JWT" | ||
}, | ||
"sub": APP_ID, | ||
"name": "John Doe", | ||
"iss": "https://firebaseappcheck.googleapis.com/", | ||
"aud": ["projects/1334"] | ||
} | ||
|
||
class TestBatch: | ||
|
||
@classmethod | ||
def setup_class(cls): | ||
cred = testutils.MockCredential() | ||
firebase_admin.initialize_app(cred, {'projectId': 'explicit-project-id'}) | ||
|
||
@classmethod | ||
def teardown_class(cls): | ||
testutils.cleanup_apps() | ||
|
||
class TestVerifyToken(TestBatch): | ||
|
||
def test_no_project_id(self): | ||
def evaluate(): | ||
app = firebase_admin.initialize_app(testutils.MockCredential(), name='no_project_id') | ||
with pytest.raises(ValueError): | ||
app_check.verify_token(token="app_check_token", app=app) | ||
testutils.run_without_project_id(evaluate) | ||
|
||
@pytest.mark.parametrize('token', NON_STRING_ARGS) | ||
def test_verify_token_with_non_string_raises_error(self, token): | ||
with pytest.raises(ValueError) as excinfo: | ||
app_check.verify_token(token) | ||
expected = 'app check token "{0}" must be a string.'.format(token) | ||
assert str(excinfo.value) == expected | ||
|
||
def test_has_valid_token_headers(self): | ||
app = firebase_admin.get_app() | ||
app_check_service = app_check._get_app_check_service(app) | ||
|
||
headers = {"alg": "RS256", 'typ': "JWT"} | ||
assert app_check_service._has_valid_token_headers(headers=headers) is None | ||
|
||
def test_has_valid_token_headers_with_incorrect_type_raises_error(self): | ||
app = firebase_admin.get_app() | ||
app_check_service = app_check._get_app_check_service(app) | ||
headers = {"alg": "RS256", 'typ': "WRONG"} | ||
with pytest.raises(ValueError) as excinfo: | ||
app_check_service._has_valid_token_headers(headers=headers) | ||
|
||
expected = 'The provided App Check token has an incorrect type header' | ||
assert str(excinfo.value) == expected | ||
|
||
def test_has_valid_token_headers_with_incorrect_algorithm_raises_error(self): | ||
app = firebase_admin.get_app() | ||
app_check_service = app_check._get_app_check_service(app) | ||
headers = {"alg": "HS256", 'typ': "JWT"} | ||
with pytest.raises(ValueError) as excinfo: | ||
app_check_service._has_valid_token_headers(headers=headers) | ||
|
||
expected = ('The provided App Check token has an incorrect algorithm. ' | ||
'Expected RS256 but got HS256.') | ||
assert str(excinfo.value) == expected | ||
|
||
def test_decode_token(self, mocker): | ||
jwt_decode_mock = mocker.patch("jwt.decode", return_value=JWT_PAYLOAD_SAMPLE) | ||
app = firebase_admin.get_app() | ||
app_check_service = app_check._get_app_check_service(app) | ||
payload = app_check_service._decode_token( | ||
token=None, | ||
signing_key="1234", | ||
algorithms=["RS256"] | ||
) | ||
|
||
jwt_decode_mock.assert_called_once_with(None, "1234", ["RS256"]) | ||
assert payload == JWT_PAYLOAD_SAMPLE.copy() | ||
|
||
def test_verify_token(self, mocker): | ||
mocker.patch("jwt.decode", return_value=JWT_PAYLOAD_SAMPLE) | ||
mocker.patch("jwt.PyJWKClient.get_signing_key_from_jwt", return_value={"key": "secret"}) | ||
mocker.patch("jwt.get_unverified_header", return_value=JWT_PAYLOAD_SAMPLE.get("headers")) | ||
app = firebase_admin.get_app() | ||
|
||
payload = app_check.verify_token("encoded", app) | ||
expected = JWT_PAYLOAD_SAMPLE.copy() | ||
expected['app_id'] = APP_ID | ||
assert payload == expected | ||
|
||
def test_verify_token_with_non_list_audience_raises_error(self, mocker): | ||
jwt_with_non_list_audience = JWT_PAYLOAD_SAMPLE.copy() | ||
jwt_with_non_list_audience["aud"] = '1234' | ||
mocker.patch("jwt.decode", return_value=jwt_with_non_list_audience) | ||
mocker.patch("jwt.PyJWKClient.get_signing_key_from_jwt", return_value={"key": "secret"}) | ||
mocker.patch("jwt.get_unverified_header", return_value=JWT_PAYLOAD_SAMPLE.get("headers")) | ||
app = firebase_admin.get_app() | ||
|
||
with pytest.raises(ValueError) as excinfo: | ||
app_check.verify_token("encoded", app) | ||
|
||
expected = 'Firebase App Check token has incorrect "aud" (audience) claim.' | ||
assert str(excinfo.value) == expected | ||
|
||
def test_verify_token_with_incorrect_issuer_raises_error(self, mocker): | ||
jwt_with_non_incorrect_issuer = JWT_PAYLOAD_SAMPLE.copy() | ||
jwt_with_non_incorrect_issuer["iss"] = "https://dwyfrequency.googleapis.com/" | ||
mocker.patch("jwt.decode", return_value=jwt_with_non_incorrect_issuer) | ||
mocker.patch("jwt.PyJWKClient.get_signing_key_from_jwt", return_value={"key": "secret"}) | ||
mocker.patch("jwt.get_unverified_header", return_value=JWT_PAYLOAD_SAMPLE.get("headers")) | ||
app = firebase_admin.get_app() | ||
|
||
with pytest.raises(ValueError) as excinfo: | ||
app_check.verify_token("encoded", app) | ||
|
||
expected = 'Token does not contain the correct "iss" (issuer).' | ||
assert str(excinfo.value) == expected |
Uh oh!
There was an error while loading. Please reload this page.