QST: How can I use df.to_sql() with SQLAlchemy AsyncSession/AsyncEngine #51633

Closed
2 tasks done
lamhktommy opened this issue Feb 25, 2023 · 3 comments
Labels
Needs Triage Issue that has not been reviewed by a pandas team member Usage Question

Comments

@lamhktommy

lamhktommy commented Feb 25, 2023

Research

  • I have searched the [pandas] tag on StackOverflow for similar questions.

  • I have asked my usage related question on StackOverflow.

Link to question on StackOverflow

https://stackoverflow.com/questions/75566585/how-can-i-use-df-to-sql-with-sqlalchemy-asyncsession-asyncegine

Question about pandas

df.to_sql("temp_data", engine)

I expect something similar to work when engine is an AsyncEngine.

@lamhktommy lamhktommy added Needs Triage Issue that has not been reviewed by a pandas team member Usage Question labels Feb 25, 2023
@phi-friday

pandas' SQLAlchemy support was written against a fairly old version of SQLAlchemy, so it is not fully compatible with more recently added features. However, you can try either of the following approaches:

  1. use to_sql
from __future__ import annotations

from asyncio import current_task, run
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING

import pandas as pd
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_scoped_session,
    create_async_engine,
)
from sqlalchemy.ext.automap import AutomapBase, automap_base
from sqlalchemy.orm import sessionmaker

if TYPE_CHECKING:
    from collections.abc import AsyncGenerator

automap: AutomapBase = automap_base()
engine = create_async_engine(...)
maker = sessionmaker(engine, class_=AsyncSession)
scope = async_scoped_session(maker, current_task)

chunk_size: int = ...
tablename: str = ...
data = pd.DataFrame(...)


@asynccontextmanager
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    async with scope() as session:
        yield session


async def main():
    async with get_session() as session:
        conn = await session.connection()
        await conn.run_sync(
            lambda sync_conn: data.to_sql(
                tablename,
                con=sync_conn,
                # ... (any further to_sql keyword arguments)
            ),
        )
        await session.commit()


if __name__ == "__main__":
    run(main())
  2. use execute
from __future__ import annotations

from asyncio import current_task, run
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING

import numpy as np
import pandas as pd
from sqlalchemy import insert
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_scoped_session,
    create_async_engine,
)
from sqlalchemy.ext.automap import AutomapBase, automap_base
from sqlalchemy.orm import sessionmaker

if TYPE_CHECKING:
    from collections.abc import AsyncGenerator

automap: AutomapBase = automap_base()
engine = create_async_engine(...)
maker = sessionmaker(engine, class_=AsyncSession)
scope = async_scoped_session(maker, current_task)

chunk_size: int = ...
tablename: str = ...
data = pd.DataFrame(...)


@asynccontextmanager
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    async with scope() as session:
        yield session


async def main():
    async with get_session() as session:
        conn = await session.connection()
        await conn.run_sync(lambda sync_conn: automap.prepare(autoload_with=sync_conn))
        mapper = getattr(automap.classes, tablename)
        stmt = insert(mapper)

        for _, group in data.groupby(np.arange(data.shape[0], dtype=int) // chunk_size):
            await session.execute(stmt, group.to_dict("records"))
        await session.commit()


if __name__ == "__main__":
    run(main())
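
The chunking step in the execute variant can be looked at in isolation. The following is a minimal sketch, not part of the original answer: the helper name `iter_record_chunks` and the sample frame are made up for illustration. It shows how floor-dividing row positions by the chunk size yields group labels, so that each group becomes one batch of row dictionaries suitable for passing to `session.execute(stmt, batch)`.

```python
import numpy as np
import pandas as pd


def iter_record_chunks(frame: pd.DataFrame, chunk_size: int):
    """Yield lists of row dicts, each holding at most chunk_size rows."""
    # Positions 0..n-1 floor-divided by chunk_size give labels 0, 0, ..., 1, 1, ...
    labels = np.arange(len(frame)) // chunk_size
    for _, group in frame.groupby(labels):
        yield group.to_dict("records")


# Illustrative data only: five rows split into chunks of two.
frame = pd.DataFrame({"id": range(5), "value": list("abcde")})
chunks = list(iter_record_chunks(frame, chunk_size=2))
print([len(chunk) for chunk in chunks])  # [2, 2, 1]
```

Grouping by position rather than by the index means the split does not depend on the frame having a default RangeIndex.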

to_sql is the more familiar approach, but it does not work with SQLAlchemy 2.x because of bug #51015.
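
Since the question asks about passing an AsyncEngine, here is a shorter sketch of the to_sql variant that skips the session and uses the engine directly. It is an illustration under assumptions, not a tested recipe for every setup: the `sqlite+aiosqlite` URL, the `write_frame` helper, and the sample data are hypothetical, and the `if_exists`/`index` arguments are just one reasonable choice. It also assumes a pandas/SQLAlchemy pairing that is not affected by the bug mentioned above.

```python
import asyncio

import pandas as pd
from sqlalchemy.ext.asyncio import AsyncConnection, create_async_engine


async def write_frame(conn: AsyncConnection, frame: pd.DataFrame, table: str) -> None:
    # run_sync hands the callable a synchronous Connection, which to_sql accepts.
    await conn.run_sync(
        lambda sync_conn: frame.to_sql(
            table, con=sync_conn, if_exists="append", index=False
        )
    )


async def main() -> None:
    # Hypothetical URL: an in-memory SQLite database via the aiosqlite driver.
    engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    frame = pd.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})
    # engine.begin() opens a transaction and commits it when the block exits.
    async with engine.begin() as conn:
        await write_frame(conn, frame, "temp_data")
    await engine.dispose()


# To try it (requires the aiosqlite driver): asyncio.run(main())
```

Note that to_sql itself still runs synchronously inside run_sync, so a large insert blocks that connection until it finishes. Only the surrounding code is asynchronous.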

@lamhktommy
Author

@phi-friday Thanks, it works for me.

@ryanmerolle

Would this still be the case with the updates to the project dependencies over the past year? Thanks again for all the work here!
