QST: How can I use df.to_sql() with SQLAlchemy AsyncSession/AsyncEngine #51633
Comments
from __future__ import annotations

from asyncio import current_task, run
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING

import pandas as pd
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_scoped_session,
    create_async_engine,
)
from sqlalchemy.ext.automap import AutomapBase, automap_base
from sqlalchemy.orm import sessionmaker

if TYPE_CHECKING:
    from collections.abc import AsyncGenerator

automap: AutomapBase = automap_base()
engine = create_async_engine(...)
maker = sessionmaker(engine, class_=AsyncSession)
scope = async_scoped_session(maker, current_task)

chunk_size: int = ...
tablename: str = ...
data = pd.DataFrame(...)


@asynccontextmanager
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    async with scope() as session:
        yield session


async def main():
    async with get_session() as session:
        conn = await session.connection()
        # run_sync hands the callable a synchronous Connection,
        # which is what to_sql expects for its con argument.
        await conn.run_sync(
            lambda sync_conn: data.to_sql(
                tablename,
                con=sync_conn,
                # ... (remaining to_sql arguments)
            ),
        )
        await session.commit()


if __name__ == "__main__":
    run(main())

from __future__ import annotations

from asyncio import current_task, run
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING

import numpy as np
import pandas as pd
from sqlalchemy import insert
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_scoped_session,
    create_async_engine,
)
from sqlalchemy.ext.automap import AutomapBase, automap_base
from sqlalchemy.orm import sessionmaker

if TYPE_CHECKING:
    from collections.abc import AsyncGenerator

automap: AutomapBase = automap_base()
engine = create_async_engine(...)
maker = sessionmaker(engine, class_=AsyncSession)
scope = async_scoped_session(maker, current_task)

chunk_size: int = ...
tablename: str = ...
data = pd.DataFrame(...)


@asynccontextmanager
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    async with scope() as session:
        yield session


async def main():
    async with get_session() as session:
        conn = await session.connection()
        # Reflect the existing tables through the synchronous connection.
        await conn.run_sync(lambda sync_conn: automap.prepare(autoload_with=sync_conn))
        mapper = getattr(automap.classes, tablename)
        stmt = insert(mapper)
        # Insert the rows in chunks of at most chunk_size records.
        for _, group in data.groupby(np.arange(data.shape[0], dtype=int) // chunk_size):
            await session.execute(stmt, group.to_dict("records"))
        await session.commit()


if __name__ == "__main__":
    run(main())
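The chunked insert in the second snippet depends on grouping rows by their position integer-divided by chunk_size. Below is a minimal sketch of that step on its own, with no database involved. The helper name iter_record_chunks and the demo DataFrame are hypothetical and only there for illustration.

```python
import numpy as np
import pandas as pd


def iter_record_chunks(frame: pd.DataFrame, chunk_size: int):
    """Yield lists of row dicts with at most chunk_size rows each.

    Hypothetical helper: it isolates the groupby idiom from the snippet above.
    """
    # Row positions 0..chunk_size-1 get label 0, the next chunk_size rows
    # get label 1, and so on; groupby then splits the frame by label.
    labels = np.arange(len(frame)) // chunk_size
    for _, group in frame.groupby(labels):
        yield group.to_dict("records")


# Made-up demo data: five rows split into chunks of two.
demo = pd.DataFrame({"id": range(5), "name": list("abcde")})
chunks = list(iter_record_chunks(demo, chunk_size=2))
print([len(chunk) for chunk in chunks])
```

With five rows and chunk_size=2 this yields chunks of 2, 2 and 1 rows. Each chunk is a list of dicts, which is the shape passed as the second argument to session.execute(stmt, ...) in the snippet above.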
@phi-friday thanks. It works for me.
Would this still be the case with the updates to the project dependencies over the past year? Thanks again for all the work here!
Research
I have searched the [pandas] tag on StackOverflow for similar questions.
I have asked my usage related question on StackOverflow.
Link to question on StackOverflow
https://stackoverflow.com/questions/75566585/how-can-i-use-df-to-sql-with-sqlalchemy-asyncsession-asyncegine
Question about pandas
I am expecting df.to_sql() to work in a similar way when the engine is an AsyncEngine.