|
import logging
import urllib
import urllib.parse
from pathlib import Path
from typing import Any, Dict, Generator, List, NewType, Optional, Sequence, Union

import httpcore
import httpx

from data_loader import DataLoader
from reference import Reference
from resolved_schemas import ResolvedSchemas
from resolver_types import SchemaData
| 13 | + |
class SchemaResolver:
    """Load a root schema document and gather the external documents it references.

    The root document is either a local file (given as a ``pathlib.Path``) or
    a URL (given as a string). ``$ref`` entries that point outside the current
    document are fetched and collected, keyed by the reference value with its
    ``#fragment`` removed.
    """

    def __init__(self, url_or_path: Union[str, Path]):
        """Remember where the root document lives.

        Args:
            url_or_path: A ``Path`` for a local root document; any other
                truthy value is stored as the root document's URL.

        Raises:
            ValueError: If ``url_or_path`` is falsy (``None`` or empty string).
        """
        if not url_or_path:
            raise ValueError('Invalid document root reference, it shall be an remote url or local file path')

        self._root_path: Optional[Path] = None
        self._root_path_dir: Optional[Path] = None
        self._root_url: Optional[str] = None
        self._root_url_scheme: Optional[str] = None

        if isinstance(url_or_path, Path):
            self._root_path = url_or_path.absolute()
            self._root_path_dir = self._root_path.parent
        else:
            self._root_url = url_or_path
            self._root_url_scheme = urllib.parse.urlparse(url_or_path).scheme

    def resolve(self, recursive: bool = True) -> ResolvedSchemas:
        """Fetch the root document and the external documents it references.

        Args:
            recursive: When true, references found inside fetched external
                documents are followed as well.

        Returns:
            A ``ResolvedSchemas`` built from the root document, the external
            documents keyed by reference path, and the list of error messages
            for references that could not be gathered.

        Raises:
            Exception: Any failure while loading the root document itself
                propagates; only external reference failures are collected.
        """
        external_schemas: Dict[str, SchemaData] = {}
        # A real list: _resolve_schema_references appends to it in place.
        errors: List[str] = []

        if self._root_path is not None:
            root_schema = self._fetch_remote_file_path(self._root_path)
        else:
            root_schema = self._fetch_url_reference(self._root_url)

        self._resolve_schema_references(root_schema, external_schemas, errors, recursive)
        return ResolvedSchemas(root_schema, external_schemas, errors)

    def _resolve_schema_references(
        self,
        root: SchemaData,
        external_schemas: Dict[str, SchemaData],
        errors: List[str],
        recursive: bool,
    ) -> None:
        """Fetch every non-local reference found in ``root``.

        Results are written into ``external_schemas`` and failure messages are
        appended to ``errors``; both arguments are mutated in place and nothing
        is returned.

        Args:
            root: The document to scan for ``$ref`` entries.
            external_schemas: Already fetched documents, keyed by reference
                path (the reference value up to the first ``#``).
            errors: Collector for failure messages.
            recursive: When true, each newly fetched document is scanned too.
        """
        for ref in self._lookup_schema_references(root):
            if ref.is_local_ref():
                continue

            try:
                path = ref.value.split('#')[0]
                if path in external_schemas:
                    # Already fetched; this also stops reference cycles.
                    continue

                if ref.is_url_reference():
                    external_schemas[path] = self._fetch_url_reference(path)
                else:
                    external_schemas[path] = self._fetch_remote_reference(path)

                if recursive:
                    # NOTE(review): relative references inside this external
                    # document are still resolved against the root location
                    # (see _fetch_remote_reference), not against the document
                    # that contains them.
                    self._resolve_schema_references(external_schemas[path], external_schemas, errors, recursive)

            except Exception:
                # Best effort: record the failure and keep processing the
                # remaining references.
                message = 'Failed to gather external reference data of {0}'.format(ref.value)
                errors.append(message)
                logging.exception(message)

    def _fetch_remote_reference(self, relative_path: str) -> SchemaData:
        """Load a reference given relative to the root document's location.

        With a local root the path is joined to the root file's directory;
        otherwise it is joined to the root URL.
        """
        if self._root_path is not None:
            abs_path = self._root_path_dir.joinpath(relative_path)
            return self._fetch_remote_file_path(abs_path)

        abs_url = urllib.parse.urljoin(self._root_url, relative_path)
        return self._fetch_url_reference(abs_url)

    def _fetch_remote_file_path(self, path: Path) -> SchemaData:
        """Read the file at ``path`` and hand its bytes to ``DataLoader.load``."""
        logging.info('Fetching remote ref file path > %s', path)
        return DataLoader.load(str(path), path.read_bytes())

    def _fetch_url_reference(self, url: str) -> SchemaData:
        """Download ``url`` and hand the response body to ``DataLoader.load``.

        A protocol-relative URL (``//host/path``) is completed with the root
        URL's scheme.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status,
                so an error page is never parsed as a schema.
        """
        if url.startswith('//'):
            # urlparse() returns the scheme without the ':' separator, and a
            # local-file root has no scheme at all; fall back to https then.
            scheme = self._root_url_scheme or 'https'
            url = '{0}:{1}'.format(scheme, url)

        logging.info('Fetching remote ref url > %s', url)
        response = httpx.get(url)
        response.raise_for_status()
        return DataLoader.load(url, response.content)

    def _lookup_schema_references(self, attr: Any) -> Generator[Reference, None, None]:
        """Yield a ``Reference`` for every string ``$ref`` value nested in ``attr``.

        Dicts and lists are walked recursively; any other value yields nothing.
        """
        if isinstance(attr, dict):
            for key, val in attr.items():
                # Only string values are references. Anything else under a
                # '$ref' key (e.g. a property literally named "$ref") is
                # walked like any other nested structure.
                if key == '$ref' and isinstance(val, str):
                    yield Reference(val)
                else:
                    yield from self._lookup_schema_references(val)

        elif isinstance(attr, list):
            for val in attr:
                yield from self._lookup_schema_references(val)
| 98 | + |
0 commit comments