Skip to content

RuntimeError(“Task … running at … got Future … attached to a different loop” #207

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
david-shiko opened this issue Apr 16, 2021 · 9 comments

Comments

@david-shiko
Copy link

david-shiko commented Apr 16, 2021

How to fix an error?

Test file var 1

import pytest
from httpx import AsyncClient
import main
from tests import conftest


@pytest.mark.asyncio()
async def test_root():
    async with AsyncClient(app=main.app, base_url="http://test") as ac:
        response = await ac.post("/register", json={
            "phone_number": "+7 931 964 0000",
            "full_name": "Чехов Антон Павлович"
        })
    assert response.status_code == 201

test result var 1

$ python -m pytest tests/main_test.py 
============================================================================================================================================== test session starts ===============================================================================================================================================
platform linux -- Python 3.8.5, pytest-6.2.3, py-1.10.0, pluggy-0.13.1
rootdir: /home/david/PycharmProjects/work
plugins: asyncio-0.14.0
collected 1 item                                                                                                                                                                                                                                                                                                 

tests/main_test.py 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> PDB set_trace (IO-capturing turned off) >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> /home/david/PycharmProjects/work/routers/auth.py(35)register()
-> services.error_handler(error)
(Pdb) ll
 18     @router.post("/register", status_code=201)
 19     async def register(
 20             user: schemas.UserCreate,
 21             postgres_session: database.SessionLocal = Depends(database.get_db)):
 22         try:
 23             if not await postgres_session.execute(statement=models.users.select().where(
 24                     models.users.c.phone_number == user.phone_number)):  # If user not exists yet:
 25                 secret_code = services.create_secret_code()
 26                 # await services.send_sms_code(phone_number=user.phone_number, secret_code=secret_code)
 27                 print(secret_code)
 28                 database.redis_full_names.set(name=user.phone_number, value=user.full_name, )
 29                 database.redis_secret_codes.set(name=secret_code, value=user.phone_number, )
 30                 return f'Данные для входа отправлены на номер {user.phone_number}'
 31             else:
 32                 raise config.fastapi_http_errors['already_exists_409']
 33         except Exception as error:
 34             pdb.set_trace()
 35  ->         services.error_handler(error)
(Pdb) error
RuntimeError("Task <Task pending name='Task-5' coro=<test_root() running at /home/david/PycharmProjects/work/tests/main_test.py:10> cb=[_run_until_complete_cb() at /usr/lib/python3.8/asyncio/base_events.py:184]> got Future <Future pending cb=[Protocol._on_waiter_completed()]> attached to a different loop")
(Pdb) 

test file var 2 (conftest.event_loop userd)

import pytest
from httpx import AsyncClient
import main
from tests import conftest


@pytest.mark.asyncio()
async def test_root():
    async with AsyncClient(app=main.app, base_url="http://test") as ac:
        response = conftest.event_loop(await ac.post("/register", json={
            "phone_number": "+7 931 964 0000",
            "full_name": "Чехов Антон Павлович"
        }))
    assert response.status_code == 201

test result var 2

# Function is the same
RuntimeError("Task <Task pending name='Task-5' coro=<test_root() running at /home/david/PycharmProjects/work/tests/main_test.py:10> cb=[_run_until_complete_cb() at /usr/lib/python3.8/asyncio/base_events.py:184]> got Future <Future pending cb=[Protocol._on_waiter_completed()]> attached to a different loop")

Main.py file

import fastapi
import uvicorn
from fastapi.middleware.cors import CORSMiddleware
# MY MODULES
import config
from routers import users, auth, claims, tickets, operators, assignments, clinics
from services import verify_credentials as services_verify_credentials
import database


app = fastapi.FastAPI()


app.add_middleware(
    middleware_class=CORSMiddleware,
    allow_origins=config.origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.debug = False


@app.on_event("startup")
async def startup_event():
    async with database.engine.begin() as conn:
        await conn.run_sync(database.Base.metadata.create_all)


if __name__ == "__main__" or 1:
    app.include_router(
        router=users.router,
        tags=["users"],
        prefix="/users",
    )

    app.include_router(
        router=auth.router,
        tags=["auth"],
    )

    many similar routers...

    uvicorn.run(app, host="0.0.0.0", port=8000)
@Tinche
Copy link
Member

Tinche commented Apr 16, 2021

@david-shiko I'm going to need main.py too, the simplest version that triggers it

@david-shiko
Copy link
Author

@Tinche Added to question

@david-shiko
Copy link
Author

@Tinche Do you will fix it or any solution? I jsut waiting to apply this in my work :)

@xnuinside
Copy link

must be @pytest.mark.asyncio not @pytest.mark.asyncio() as in your sample

@xnuinside
Copy link

and 'if name == "main" or 1:' mean code below will be always executed, that mean that you run uvicorn server during tests (when main imported in tests), this is why you get second event loop and this error

@david-shiko
Copy link
Author

Solution:

  1. Remove or from if __name__ == ... Because an app should run a test.
  2. Change @pytest.mark.asyncio() to @pytest.mark.asyncio.
  3. Move routers out of if branch in main file (otherwise app will be created automatically by tests without routers.

@Tinche
Copy link
Member

Tinche commented Apr 21, 2021

@david-shiko I see someone got to this before me, did it solve your issue?

@eboddington
Copy link

eboddington commented Oct 14, 2021

I am getting the same error message, so hopefully it's okay to continue this thread and add my example (let me know if I should create a new issue instead).

Below is the minimum example I've been able to reproduce it with. The interesting thing is that if I only have one test (e.g. comment out test_hello_world) then it passes, leading me to believe that it's some kind of issue with resources not being cleaned up between tests.

main.py:

from functools import lru_cache

from fastapi import FastAPI, Depends
from sqlalchemy import Column, Integer, String, select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker

app = FastAPI()

Base = declarative_base()


class Foo(Base):
    __tablename__ = "foo"
    id = Column(Integer, primary_key=True)
    data = Column(String)


@lru_cache()
def get_engine():
    engine = create_async_engine(
        "postgresql+asyncpg://postgres:postgres@localhost/postgres", echo=True, future=True
    )
    return engine


@lru_cache
def get_session_maker():
    async_session = sessionmaker(
        get_engine(), expire_on_commit=False, class_=AsyncSession
    )
    return async_session


@app.get("/hello")
async def hello_world():
    return "Hello, world!"


@app.get("/foo")
async def get_foo(async_session=Depends(get_session_maker)):
    async with async_session() as session:
        session.add(Foo(data="test"))
        await session.commit()

        result = await session.execute(select(Foo).where(Foo.data == "test"))
        foo = result.scalar_one()

    return foo.id

test_main.py:

import pytest
from httpx import AsyncClient

from main import app, Base, get_engine


@pytest.fixture()
async def client() -> AsyncClient:
    engine = get_engine()
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

    async with AsyncClient(app=app, base_url="http://test") as c:
        yield c


pytestmark = pytest.mark.asyncio


async def test_hello_world(client):
    r = await client.get("/hello")

    assert r.status_code == 200
    assert r.json() == "Hello, world!"


async def test_foo(client):
    r = await client.get("/foo")

    assert r.status_code == 200
    assert r.json() == 1

Edit

Fixed by adding a fixture to create an event loop:

@pytest.fixture(scope="session")
def event_loop() -> Generator:
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()

Thanks to this blog post!

@asvetlov asvetlov closed this as completed Jan 8, 2022
@asvetlov
Copy link
Contributor

asvetlov commented Jan 8, 2022

Congratulations!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants