Skip to content
This repository was archived by the owner on Sep 22, 2023. It is now read-only.

Commit f930759

Browse files
authored
Adaptation for API v5 revisions (Phase 1) (#97)
* feat: Add background task support * feat: Implement progress bar for rescanning images using bgtask API * refactor: Clean up BaseFunction inheritance using contextvars - Previously, to bind the current session with API function classes, we generated new type objects at runtime. - This has confused IDEs and type checkers. - Now type checkers can statically deduce the types for individual API function classes. - Now we use contextvars (ai.backend.client.session.api_session) to keep the reference to the current session. - There are *NO* public Session/AsyncSession API changes! - Only the API function classes need to be rewritten. - For synchronous Session, we pass the context to the separate worker thread using copy_context() whenever calling API functions, which is a light-weight operation. * fix: Remove redundant src/ai/backend/client/etcd.py which had been already copied to src/ai/backend/client/func/etcd.py * ci: Bump the lint Python version to 3.7 * ci: Remove Python 3.6 from AppVeyor * ci: Make Python 3.8 as the officially supported environment by removing it from "allow_failures" environments in Travis CI * ci: Now type errors are no longer ignored, and all remaining type errors are now fixed. * test: All test cases in tests/cli/test_cli_proxy.py are marked "xfail" because there is an upstream issue rendering those tests always failing while real-world use cases have no problems. - ref) pytest-dev/pytest-asyncio#153 BREAKING-CHANGE: Dropped Python 3.6 support. Now it requires Python 3.7 or higher.
1 parent 1ce0454 commit f930759

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1389
-1508
lines changed

.github/workflows/lint-flake8.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ jobs:
99

1010
steps:
1111
- uses: actions/checkout@v1
12-
- name: Set up Python 3.6
12+
- name: Set up Python 3.7
1313
uses: actions/setup-python@v1
1414
with:
15-
python-version: 3.6
15+
python-version: 3.7
1616
- name: Install dependencies
1717
run: |
1818
python -m pip install -U pip setuptools

.github/workflows/typecheck-mypy.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,4 @@ jobs:
2424
else
2525
echo "::add-matcher::.github/workflows/mypy-matcher.json"
2626
fi
27-
python -m mypy --no-color-output src/ai/backend || exit 0
27+
python -m mypy --no-color-output src/ai/backend

.travis.yml

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ stages:
1010

1111
# build matrix for test stage
1212
python:
13-
- "3.6"
1413
- "3.7"
1514
- "3.8"
1615
os:
@@ -47,13 +46,6 @@ jobs:
4746
username: "__token__"
4847
password:
4948
secure: "nXxyiMTQZgdLnpw+hZBm2nHtlMV9prg5bl+3lB4Q/pnWWaW4VvAU6U2Lw/gljAaD3jxOV+RWKOCdt6ZWmQc9M8Fh5mcTlq9IjcMgk0R39onsP7YP7UJUh7saqZG1EkruglCHwCjcz3XwmRyJ+GKIANDH6jRooEmGQt/b8sR0ZIuMxx9ANNPozGEIxcrEqkO2CT1NQzEYc969danjYoyRImDUyDLKTJKd5ZkC7vwmT9z1chm0oxbZMdBJbL26g3TEr7dq1gQAiiLB5lhFVxklWqYlthlWl5qvmtgcn9ZNh1OA2WF8jTwDaafXoYHOotfq82ASRZI3dOckJQM6bsEJEPh5tTIvJJNxMmPTomHCRmc8/sNfOOoPPTLhjXVGE1BxL4u3DXZt0VAw80mkQseXu9wtzNEdZqCxGlSzycyut4cLtXpWXZDN/zqDYczAPUAYeRi2XbxT06OHhczmtn7WPGp2O/HYrXzHrMjAho0tNdch/62hJycEYAMRN0iQSnB2Gs2Ja7h6WUmf6lw2P4qS8gOSKuBJ3Z5Q0glbS2m28oCDZjP6zBqCwYucMZfUqF/aKiVei0NQp1dvjKUBqMJVogesuOAvtDVo+wN3rp2pcTntEKJHqYbNL9fOzwErJM8r/ZUMGC0HkdyTcnPS7uGkRF5WlzFl1cVBNmHzburc+N4="
50-
allow_failures:
51-
- python: "3.8"
52-
os: linux
53-
fast_finish: true
54-
# exclude the duplicate default test stage
55-
exclude:
56-
- python: "3.6"
5749

5850
notifications:
5951
webhooks:

appveyor.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
version: 1.0.dev{build}
22
environment:
33
matrix:
4-
- PYTHON: "C:\\Python36"
5-
- PYTHON: "C:\\Python36-x64"
64
- PYTHON: "C:\\Python37"
75
- PYTHON: "C:\\Python37-x64"
86
- PYTHON: "C:\\Python38"

changes/97.breaking

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Drop support for Python 3.6

changes/97.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Support APIv5's new GraphQL schema and kernel/session naming changes

setup.cfg

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,7 @@ norecursedirs = venv virtualenv .git
1313
timeout = 5
1414
markers =
1515
integration: Test cases that require real manager (and agents) to be running on http://localhost:8081.
16+
17+
[mypy]
18+
ignore_missing_imports = true
19+
namespace_packages = true

setup.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
'tabulate~=0.8.6',
2121
'tqdm~=4.42',
2222
'yarl~=1.4.2',
23+
'typing-extensions~=3.7.4',
2324
]
2425
build_requires = [
2526
'wheel>=0.34.2',
@@ -82,7 +83,6 @@ def read_src_version():
8283
'Intended Audience :: Developers',
8384
'Programming Language :: Python',
8485
'Programming Language :: Python :: 3',
85-
'Programming Language :: Python :: 3.6',
8686
'Programming Language :: Python :: 3.7',
8787
'Programming Language :: Python :: 3.8',
8888
'Operating System :: POSIX',
@@ -94,7 +94,7 @@ def read_src_version():
9494
],
9595
package_dir={'': 'src'},
9696
packages=find_namespace_packages(where='src', include='ai.backend.*'),
97-
python_requires='>=3.6',
97+
python_requires='>=3.7',
9898
setup_requires=setup_requires,
9999
install_requires=install_requires,
100100
extras_require={

src/ai/backend/client/cli/admin/images.py

Lines changed: 51 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
1+
import json
12
import sys
3+
24
import click
35
from tabulate import tabulate
6+
from tqdm import tqdm
47

58
from . import admin
6-
from ...session import Session
9+
from ...compat import asyncio_run
10+
from ...session import Session, AsyncSession
711
from ..pretty import print_done, print_warn, print_fail, print_error
812

913

1014
@admin.command()
1115
@click.option('--operation', is_flag=True, help='Get operational images only')
12-
def images(operation):
13-
'''
16+
def images(operation: bool) -> None:
17+
"""
1418
Show the list of registered images in this cluster.
15-
'''
19+
"""
1620
fields = [
1721
('Name', 'name'),
1822
('Registry', 'registry'),
@@ -32,33 +36,61 @@ def images(operation):
3236
print_warn('There are no registered images.')
3337
return
3438
print(tabulate((item.values() for item in items),
35-
headers=(item[0] for item in fields),
39+
headers=[item[0] for item in fields],
3640
floatfmt=',.0f'))
3741

3842

3943
@admin.command()
4044
@click.option('-r', '--registry', type=str, default=None,
4145
help='The name (usually hostname or "lablup") '
4246
'of the Docker registry configured.')
43-
def rescan_images(registry):
44-
'''Update the kernel image metadata from all configured docker registries.'''
45-
with Session() as session:
46-
try:
47-
result = session.Image.rescan_images(registry)
48-
except Exception as e:
49-
print_error(e)
50-
sys.exit(1)
51-
if result['ok']:
52-
print_done("Updated the image metadata from the configured registries.")
53-
else:
54-
print_fail(f"Rescanning has failed: {result['msg']}")
47+
def rescan_images(registry: str) -> None:
48+
"""
49+
Update the kernel image metadata from all configured docker registries.
50+
"""
51+
52+
async def rescan_images_impl(registry: str) -> None:
53+
async with AsyncSession() as session:
54+
try:
55+
result = await session.Image.rescan_images(registry)
56+
except Exception as e:
57+
print_error(e)
58+
sys.exit(1)
59+
if not result['ok']:
60+
print_fail(f"Failed to begin registry scanning: {result['msg']}")
61+
sys.exit(1)
62+
print_done("Started updating the image metadata from the configured registries.")
63+
task_id = result['task_id']
64+
bgtask = session.BackgroundTask(task_id)
65+
try:
66+
completion_msg_func = lambda: print_done("Finished registry scanning.")
67+
with tqdm(unit='image') as pbar:
68+
async with bgtask.listen_events() as response:
69+
async for ev in response:
70+
data = json.loads(ev.data)
71+
if ev.event == 'task_updated':
72+
pbar.total = data['total_progress']
73+
pbar.write(data['message'])
74+
pbar.update(data['current_progress'] - pbar.n)
75+
elif ev.event == 'task_failed':
76+
error_msg = data['message']
77+
completion_msg_func = \
78+
lambda: print_fail(f"Error occurred: {error_msg}")
79+
elif ev.event == 'task_cancelled':
80+
completion_msg_func = \
81+
lambda: print_warn("Registry scanning has been "
82+
"cancelled in the middle.")
83+
finally:
84+
completion_msg_func()
85+
86+
asyncio_run(rescan_images_impl(registry))
5587

5688

5789
@admin.command()
5890
@click.argument('alias', type=str)
5991
@click.argument('target', type=str)
6092
def alias_image(alias, target):
61-
'''Add an image alias.'''
93+
"""Add an image alias."""
6294
with Session() as session:
6395
try:
6496
result = session.Image.alias_image(alias, target)
@@ -74,7 +106,7 @@ def alias_image(alias, target):
74106
@admin.command()
75107
@click.argument('alias', type=str)
76108
def dealias_image(alias):
77-
'''Remove an image alias.'''
109+
"""Remove an image alias."""
78110
with Session() as session:
79111
try:
80112
result = session.Image.dealias_image(alias)

src/ai/backend/client/cli/app.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ class ProxyRunnerContext:
128128
protocol: str
129129
host: str
130130
port: int
131-
args: Dict[str, str]
131+
args: Dict[str, Union[None, str, List[str]]]
132132
envs: Dict[str, str]
133133
api_session: Optional[AsyncSession]
134134
local_server: Optional[asyncio.AbstractServer]
@@ -150,7 +150,7 @@ def __init__(self, host: str, port: int,
150150
self.exit_code = 0
151151

152152
self.args, self.envs = {}, {}
153-
if len(args) > 0:
153+
if args is not None and len(args) > 0:
154154
for argline in args:
155155
tokens = []
156156
for token in shlex.shlex(argline,
@@ -168,7 +168,7 @@ def __init__(self, host: str, port: int,
168168
self.args[tokens[0]] = tokens[1]
169169
else:
170170
self.args[tokens[0]] = tokens[1:]
171-
if len(envs) > 0:
171+
if envs is not None and len(envs) > 0:
172172
for envline in envs:
173173
split = envline.strip().split('=', maxsplit=2)
174174
if len(split) == 2:
@@ -178,6 +178,7 @@ def __init__(self, host: str, port: int,
178178

179179
async def handle_connection(self, reader: asyncio.StreamReader,
180180
writer: asyncio.StreamWriter) -> None:
181+
assert self.api_session is not None
181182
p = WSProxy(self.api_session, self.session_name,
182183
self.app_name, self.protocol,
183184
self.args, self.envs,
@@ -232,6 +233,7 @@ async def __aexit__(self, *exc_info) -> None:
232233
print_info("Shutting down....")
233234
self.local_server.close()
234235
await self.local_server.wait_closed()
236+
assert self.api_session is not None
235237
await self.api_session.__aexit__(*exc_info)
236238
assert self.api_session.closed
237239
if self.local_server is not None:

src/ai/backend/client/cli/proxy.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
1+
from __future__ import annotations
2+
13
import asyncio
24
import json
35
import re
6+
from typing import (
7+
Union,
8+
Tuple,
9+
AsyncIterator,
10+
)
411

512
import aiohttp
613
from aiohttp import web
@@ -19,6 +26,8 @@ class WebSocketProxy:
1926
'upstream_buffer', 'upstream_buffer_task',
2027
)
2128

29+
upstream_buffer: asyncio.Queue[Tuple[Union[str, bytes], aiohttp.WSMsgType]]
30+
2231
def __init__(self, up_conn: aiohttp.ClientWebSocketResponse,
2332
down_conn: web.WebSocketResponse):
2433
self.up_conn = up_conn
@@ -182,18 +191,15 @@ async def websocket_handler(request):
182191
reason="Internal Server Error")
183192

184193

185-
async def startup_proxy(app):
194+
async def proxy_context(app: web.Application) -> AsyncIterator[None]:
186195
app['client_session'] = AsyncSession()
187-
188-
189-
async def cleanup_proxy(app):
190-
await app['client_session'].close()
196+
async with app['client_session']:
197+
yield
191198

192199

193200
def create_proxy_app():
194201
app = web.Application()
195-
app.on_startup.append(startup_proxy)
196-
app.on_cleanup.append(cleanup_proxy)
202+
app.cleanup_ctx.append(proxy_context)
197203

198204
app.router.add_route("GET", r'/stream/{path:.*$}', websocket_handler)
199205
app.router.add_route("GET", r'/wsproxy/{path:.*$}', websocket_handler)

src/ai/backend/client/cli/run.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from ..compat import asyncio_run, current_loop
2323
from ..exceptions import BackendError, BackendAPIError
2424
from ..session import Session, AsyncSession, is_legacy_server
25-
from ..utils import undefined
25+
from ..types import undefined
2626
from .pretty import (
2727
print_info, print_wait, print_done, print_error, print_fail, print_warn,
2828
format_info,
@@ -881,7 +881,7 @@ def start(image, name, owner, # base args
881881
help='Set the owner of the target session explicitly.')
882882
# job scheduling options
883883
@click.option('--type', 'type_', metavar='SESSTYPE',
884-
type=click.Choice(['batch', 'interactive', undefined]),
884+
type=click.Choice(['batch', 'interactive', undefined]), # type: ignore
885885
default=undefined,
886886
help='Either batch or interactive')
887887
@click.option('-i', '--image', default=undefined,
@@ -1145,9 +1145,9 @@ def events(name, owner_access_key):
11451145
async def _run_events():
11461146
async with AsyncSession() as session:
11471147
compute_session = session.ComputeSession(name, owner_access_key)
1148-
async with compute_session.stream_events() as sse_response:
1149-
async for ev in sse_response.fetch_events():
1150-
print(click.style(ev['event'], fg='cyan', bold=True), json.loads(ev['data']))
1148+
async with compute_session.listen_events() as response:
1149+
async for ev in response:
1150+
print(click.style(ev.event, fg='cyan', bold=True), json.loads(ev.data))
11511151

11521152
try:
11531153
asyncio_run(_run_events())

src/ai/backend/client/compat.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,16 @@
1-
'''
1+
"""
22
A compatibility module for backported codes from Python 3.6+ standard library.
3-
'''
3+
"""
44

55
import asyncio
66

7+
__all__ = (
8+
'current_loop',
9+
'all_tasks',
10+
'asyncio_run',
11+
'asyncio_run_forever',
12+
)
13+
714

815
if hasattr(asyncio, 'get_running_loop'): # Python 3.7+
916
current_loop = asyncio.get_running_loop
@@ -60,11 +67,11 @@ def _asyncio_run(coro, *, debug=False):
6067

6168

6269
def asyncio_run_forever(server_context, *, debug=False):
63-
'''
70+
"""
6471
A proposed-but-not-implemented asyncio.run_forever() API based on
6572
@vxgmichel's idea.
6673
See discussions on https://github.com/python/asyncio/pull/465
67-
'''
74+
"""
6875
loop = asyncio.new_event_loop()
6976
asyncio.set_event_loop(loop)
7077
loop.set_debug(debug)

0 commit comments

Comments
 (0)