ENH: Implement to_iceberg #61507

Open · wants to merge 14 commits into base: main

Changes from 9 commits
2 changes: 1 addition & 1 deletion doc/source/getting_started/install.rst
@@ -308,7 +308,7 @@ Dependency Minimum Version pip ex
`zlib <https://github.com/madler/zlib>`__ hdf5 Compression for HDF5
`fastparquet <https://github.com/dask/fastparquet>`__ 2024.2.0 - Parquet reading / writing (pyarrow is default)
`pyarrow <https://github.com/apache/arrow>`__ 10.0.1 parquet, feather Parquet, ORC, and feather reading / writing
-`PyIceberg <https://py.iceberg.apache.org/>`__ 0.7.1 iceberg Apache Iceberg reading
+`PyIceberg <https://py.iceberg.apache.org/>`__ 0.7.1 iceberg Apache Iceberg reading / writing
`pyreadstat <https://github.com/Roche/pyreadstat>`__ 1.2.6 spss SPSS files (.sav) reading
`odfpy <https://github.com/eea/odfpy>`__ 1.4.1 excel Open document format (.odf, .ods, .odt) reading / writing
====================================================== ================== ================ ==========================================================
1 change: 1 addition & 0 deletions doc/source/reference/io.rst
@@ -162,6 +162,7 @@ Iceberg
:toctree: api/

read_iceberg
DataFrame.to_iceberg

.. warning:: ``read_iceberg`` is experimental and may change without warning.

2 changes: 1 addition & 1 deletion doc/source/user_guide/io.rst
@@ -29,7 +29,7 @@ The pandas I/O API is a set of top level ``reader`` functions accessed like
binary,`HDF5 Format <https://support.hdfgroup.org/documentation/hdf5/latest/_intro_h_d_f5.html>`__, :ref:`read_hdf<io.hdf5>`, :ref:`to_hdf<io.hdf5>`
binary,`Feather Format <https://github.com/wesm/feather>`__, :ref:`read_feather<io.feather>`, :ref:`to_feather<io.feather>`
binary,`Parquet Format <https://parquet.apache.org/>`__, :ref:`read_parquet<io.parquet>`, :ref:`to_parquet<io.parquet>`
-binary,`Apache Iceberg <https://iceberg.apache.org/>`__, :ref:`read_iceberg<io.iceberg>` , NA
+binary,`Apache Iceberg <https://iceberg.apache.org/>`__, :ref:`read_iceberg<io.iceberg>` , :ref:`to_iceberg<io.iceberg>`
binary,`ORC Format <https://orc.apache.org/>`__, :ref:`read_orc<io.orc>`, :ref:`to_orc<io.orc>`
binary,`Stata <https://en.wikipedia.org/wiki/Stata>`__, :ref:`read_stata<io.stata_reader>`, :ref:`to_stata<io.stata_writer>`
binary,`SAS <https://en.wikipedia.org/wiki/SAS_(software)>`__, :ref:`read_sas<io.sas_reader>` , NA
2 changes: 1 addition & 1 deletion doc/source/whatsnew/v3.0.0.rst
@@ -80,7 +80,7 @@ Other enhancements
- :py:class:`frozenset` elements in pandas objects are now natively printed (:issue:`60690`)
- Add ``"delete_rows"`` option to ``if_exists`` argument in :meth:`DataFrame.to_sql` deleting all records of the table before inserting data (:issue:`37210`).
- Added half-year offset classes :class:`HalfYearBegin`, :class:`HalfYearEnd`, :class:`BHalfYearBegin` and :class:`BHalfYearEnd` (:issue:`60928`)
-- Added support to read from Apache Iceberg tables with the new :func:`read_iceberg` function (:issue:`61383`)
+- Added support for reading from and writing to Apache Iceberg tables with the new :func:`read_iceberg` and :meth:`DataFrame.to_iceberg` functions (:issue:`61383`)
- Errors occurring during SQL I/O will now throw a generic :class:`.DatabaseError` instead of the raw Exception type from the underlying driver manager library (:issue:`60748`)
- Implemented :meth:`Series.str.isascii` and :meth:`Series.str.isascii` (:issue:`59091`)
- Improved deprecation message for offset aliases (:issue:`60820`)
48 changes: 48 additions & 0 deletions pandas/core/frame.py
@@ -3547,6 +3547,54 @@ def to_xml(

        return xml_formatter.write_output()

    def to_iceberg(
        self,
        table_identifier: str,
        catalog_name: str | None = None,
        *,
        catalog_properties: dict[str, Any] | None = None,
        location: str | None = None,
        snapshot_properties: dict[str, str] | None = None,
    ) -> None:
        """
        Write a DataFrame to an Apache Iceberg table.

        .. versionadded:: 3.0.0
Member: Could you add an experimental tag to this API as well, like we did with read_iceberg?

Member Author: Absolutely, I forgot about that. Added it now. I also expanded the iceberg user guide docs with to_iceberg, which I had also forgotten. Thanks for the review and the feedback!
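For reference, a minimal sketch of the tag being discussed, mirroring the existing read_iceberg warning in doc/source/reference/io.rst (the exact wording added in the later commit may differ):

    .. warning:: ``DataFrame.to_iceberg`` is experimental and may change without warning.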


        Parameters
        ----------
        table_identifier : str
            Table identifier.
        catalog_name : str, optional
            The name of the catalog.
        catalog_properties : dict of {str: Any}, optional
            Properties used in addition to the catalog configuration.
        location : str, optional
            Location for the table.
        snapshot_properties : dict of {str: str}, optional
            Custom properties to be added to the snapshot summary.

        See Also
        --------
        read_iceberg : Read an Apache Iceberg table.
        DataFrame.to_parquet : Write a DataFrame in Parquet format.

        Examples
        --------
        >>> df = pd.DataFrame(data={"col1": [1, 2], "col2": [4, 3]})
        >>> df.to_iceberg("my_table", catalog_name="my_catalog")  # doctest: +SKIP
        """
        from pandas.io.iceberg import to_iceberg

        to_iceberg(
            self,
            table_identifier,
            catalog_name,
            catalog_properties=catalog_properties,
            location=location,
            snapshot_properties=snapshot_properties,
        )

    # ----------------------------------------------------------------------
    @doc(INFO_DOCSTRING, **frame_sub_kwargs)
    def info(
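A hedged usage sketch of the new writer (the sqlite catalog URI, warehouse path, and pre-created ``ns`` namespace are illustrative assumptions modeled on the tests below, not part of this PR):

    import pandas as pd

    df = pd.DataFrame({"col1": [1, 2], "col2": [4, 3]})

    # Write to an Iceberg table in a local SQL-backed catalog; assumes
    # the "ns" namespace already exists in that catalog.
    df.to_iceberg(
        "ns.my_table",
        catalog_properties={"uri": "sqlite:///tmp/pyiceberg_catalog.db"},
        location="file:///tmp/iceberg_warehouse",
    )

    # Read the table back with the existing top-level reader.
    result = pd.read_iceberg(
        "ns.my_table",
        catalog_properties={"uri": "sqlite:///tmp/pyiceberg_catalog.db"},
    )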
52 changes: 51 additions & 1 deletion pandas/io/iceberg.py
@@ -10,6 +10,7 @@
def read_iceberg(
    table_identifier: str,
    catalog_name: str | None = None,
    *,
    catalog_properties: dict[str, Any] | None = None,
    row_filter: str | None = None,
    selected_fields: tuple[str] | None = None,
@@ -71,7 +72,6 @@ def read_iceberg(
"""
pyiceberg_catalog = import_optional_dependency("pyiceberg.catalog")
pyiceberg_expressions = import_optional_dependency("pyiceberg.expressions")

if catalog_properties is None:
catalog_properties = {}
catalog = pyiceberg_catalog.load_catalog(catalog_name, **catalog_properties)
@@ -91,3 +91,53 @@
        limit=limit,
    )
    return result.to_pandas()


def to_iceberg(
    df: DataFrame,
    table_identifier: str,
    catalog_name: str | None = None,
    *,
    catalog_properties: dict[str, Any] | None = None,
    location: str | None = None,
    snapshot_properties: dict[str, str] | None = None,
IsaacWarren (Jun 3, 2025): Any thoughts on adding append to match to_parquet? Something like

    append: bool = False

Then the default could use table.overwrite instead of table.append. I think it might be confusing if this doesn't match the other to_* functions.

Member Author: How does PyIceberg support it?

IsaacWarren: With the table.overwrite method.

Member Author: Of course, I didn't think about it. I'll add it, thanks for the feedback.

) -> None:
    """
    Write a DataFrame to an Apache Iceberg table.

    .. versionadded:: 3.0.0

    Parameters
    ----------
    table_identifier : str
        Table identifier.
    catalog_name : str, optional
        The name of the catalog.
    catalog_properties : dict of {str: Any}, optional
        Properties used in addition to the catalog configuration.
    location : str, optional
        Location for the table.
    snapshot_properties : dict of {str: str}, optional
        Custom properties to be added to the snapshot summary.

    See Also
    --------
    read_iceberg : Read an Apache Iceberg table.
    DataFrame.to_parquet : Write a DataFrame in Parquet format.
    """
    pa = import_optional_dependency("pyarrow")
    pyiceberg_catalog = import_optional_dependency("pyiceberg.catalog")
    if catalog_properties is None:
        catalog_properties = {}
    catalog = pyiceberg_catalog.load_catalog(catalog_name, **catalog_properties)
    arrow_table = pa.Table.from_pandas(df)
    table = catalog.create_table_if_not_exists(
        identifier=table_identifier,
        schema=arrow_table.schema,
        location=location,
        # we could add `partition_spec`, `sort_order` and `properties` in the

Review comment: I definitely think these would be great to have, but I don't really have any ideas on how to do it without just using PyIceberg objects.

Member Author: Adding them later is easy if we think of a good signature. That's why I didn't worry too much about adding them.

        # future, but it may not be trivial without exposing PyIceberg objects
    )
    if snapshot_properties is None:
        snapshot_properties = {}
    table.append(arrow_table, snapshot_properties=snapshot_properties)
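The review thread above asks for an ``append`` flag; here is a minimal, hypothetical sketch of how the write step could branch on one, assuming PyIceberg's Table.append and Table.overwrite APIs (this is the discussed follow-up, not code in this commit):

    from typing import Any

    import pyarrow as pa
    from pyiceberg.catalog import load_catalog

    def to_iceberg_sketch(
        df,  # pandas.DataFrame
        table_identifier: str,
        catalog_name: str | None = None,
        *,
        catalog_properties: dict[str, Any] | None = None,
        location: str | None = None,
        append: bool = False,  # hypothetical flag from the review thread
        snapshot_properties: dict[str, str] | None = None,
    ) -> None:
        # Load the catalog and convert the frame, as in the PR above.
        catalog = load_catalog(catalog_name, **(catalog_properties or {}))
        arrow_table = pa.Table.from_pandas(df)
        table = catalog.create_table_if_not_exists(
            identifier=table_identifier,
            schema=arrow_table.schema,
            location=location,
        )
        props = snapshot_properties or {}
        if append:
            # Keep existing rows; add the new data as a fresh snapshot.
            table.append(arrow_table, snapshot_properties=props)
        else:
            # Replace the table contents, matching other to_* writers.
            table.overwrite(arrow_table, snapshot_properties=props)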
63 changes: 61 additions & 2 deletions pandas/tests/io/test_iceberg.py
@@ -22,7 +22,7 @@
pyiceberg_catalog = pytest.importorskip("pyiceberg.catalog")
pq = pytest.importorskip("pyarrow.parquet")

-Catalog = collections.namedtuple("Catalog", ["name", "uri"])
+Catalog = collections.namedtuple("Catalog", ["name", "uri", "warehouse"])


@pytest.fixture
@@ -58,7 +58,7 @@ def catalog(request, tmp_path):

    importlib.reload(pyiceberg_catalog)  # needed to reload the config file

-    yield Catalog(name=catalog_name or "default", uri=uri)
+    yield Catalog(name=catalog_name or "default", uri=uri, warehouse=warehouse)

    if catalog_name is not None:
        config_path.unlink()
@@ -141,3 +141,62 @@ def test_read_with_limit(self, catalog):
            limit=2,
        )
        tm.assert_frame_equal(result, expected)

    def test_write(self, catalog):
        df = pd.DataFrame(
            {
                "A": [1, 2, 3],
                "B": ["foo", "foo", "foo"],
            }
        )
        df.to_iceberg(
            "ns.new_table",
            catalog_properties={"uri": catalog.uri},
            location=catalog.warehouse,
        )
        result = read_iceberg(
            "ns.new_table",
            catalog_properties={"uri": catalog.uri},
        )
        tm.assert_frame_equal(result, df)

    @pytest.mark.parametrize("catalog", ["default", "pandas_tests"], indirect=True)
    def test_write_by_catalog_name(self, catalog):
        df = pd.DataFrame(
            {
                "A": [1, 2, 3],
                "B": ["foo", "foo", "foo"],
            }
        )
        df.to_iceberg(
            "ns.new_table",
            catalog_name=catalog.name,
        )
        result = read_iceberg(
            "ns.new_table",
            catalog_name=catalog.name,
        )
        tm.assert_frame_equal(result, df)

    def test_write_existing_table(self, catalog):
        original = read_iceberg(
            "ns.my_table",
            catalog_properties={"uri": catalog.uri},
        )
        new = pd.DataFrame(
            {
                "A": [1, 2, 3],
                "B": ["foo", "foo", "foo"],
            }
        )
        expected = pd.concat([original, new], ignore_index=True)
        new.to_iceberg(
            "ns.my_table",
            catalog_properties={"uri": catalog.uri},
            location=catalog.warehouse,
        )
        result = read_iceberg(
            "ns.my_table",
            catalog_properties={"uri": catalog.uri},
        )
        tm.assert_frame_equal(result, expected)