
ENH: Implement to_iceberg #61507


Open · wants to merge 14 commits into main
2 changes: 1 addition & 1 deletion doc/source/getting_started/install.rst
@@ -308,7 +308,7 @@ Dependency Minimum Version pip ex
`zlib <https://github.com/madler/zlib>`__ hdf5 Compression for HDF5
`fastparquet <https://github.com/dask/fastparquet>`__ 2024.2.0 - Parquet reading / writing (pyarrow is default)
`pyarrow <https://github.com/apache/arrow>`__ 10.0.1 parquet, feather Parquet, ORC, and feather reading / writing
`PyIceberg <https://py.iceberg.apache.org/>`__ 0.7.1 iceberg Apache Iceberg reading
`PyIceberg <https://py.iceberg.apache.org/>`__ 0.7.1 iceberg Apache Iceberg reading / writing
`pyreadstat <https://github.com/Roche/pyreadstat>`__ 1.2.6 spss SPSS files (.sav) reading
`odfpy <https://github.com/eea/odfpy>`__ 1.4.1 excel Open document format (.odf, .ods, .odt) reading / writing
====================================================== ================== ================ ==========================================================
1 change: 1 addition & 0 deletions doc/source/reference/io.rst
@@ -162,6 +162,7 @@ Iceberg
:toctree: api/

read_iceberg
DataFrame.to_iceberg

.. warning:: ``read_iceberg`` is experimental and may change without warning.

27 changes: 25 additions & 2 deletions doc/source/user_guide/io.rst
@@ -29,7 +29,7 @@ The pandas I/O API is a set of top level ``reader`` functions accessed like
binary,`HDF5 Format <https://support.hdfgroup.org/documentation/hdf5/latest/_intro_h_d_f5.html>`__, :ref:`read_hdf<io.hdf5>`, :ref:`to_hdf<io.hdf5>`
binary,`Feather Format <https://github.com/wesm/feather>`__, :ref:`read_feather<io.feather>`, :ref:`to_feather<io.feather>`
binary,`Parquet Format <https://parquet.apache.org/>`__, :ref:`read_parquet<io.parquet>`, :ref:`to_parquet<io.parquet>`
binary,`Apache Iceberg <https://iceberg.apache.org/>`__, :ref:`read_iceberg<io.iceberg>` , NA
binary,`Apache Iceberg <https://iceberg.apache.org/>`__, :ref:`read_iceberg<io.iceberg>` , :ref:`to_iceberg<io.iceberg>`
binary,`ORC Format <https://orc.apache.org/>`__, :ref:`read_orc<io.orc>`, :ref:`to_orc<io.orc>`
binary,`Stata <https://en.wikipedia.org/wiki/Stata>`__, :ref:`read_stata<io.stata_reader>`, :ref:`to_stata<io.stata_writer>`
binary,`SAS <https://en.wikipedia.org/wiki/SAS_(software)>`__, :ref:`read_sas<io.sas_reader>` , NA
@@ -5417,7 +5417,7 @@ engines to safely work with the same tables at the same time.

Iceberg supports predicate pushdown and column pruning, which are available to pandas
users via the ``row_filter`` and ``selected_fields`` parameters of the :func:`~pandas.read_iceberg`
function. This is convenient to extract from large tables a subset that fits in memory asa
function. This is convenient to extract from large tables a subset that fits in memory as a
pandas ``DataFrame``.
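
For example, a minimal sketch (the table name, catalog name, filter expression, and
field names are illustrative):

.. code-block:: python

    df = pd.read_iceberg(
        "my_table",
        catalog_name="my_catalog",
        row_filter="year >= 2024",
        selected_fields=("id", "amount"),
    )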

Internally, pandas uses PyIceberg_ to query Iceberg.
@@ -5497,6 +5497,29 @@ parameter:
Reading a particular snapshot is also possible by providing the snapshot ID as an argument to
``snapshot_id``.
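
For example (the snapshot ID below is illustrative):

.. code-block:: python

    df = pd.read_iceberg(
        "my_table",
        catalog_name="my_catalog",
        snapshot_id=8271651913974912931,
    )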

A ``DataFrame`` can be saved to Iceberg with the :meth:`DataFrame.to_iceberg`
method:

.. code-block:: python

df.to_iceberg("my_table", catalog_name="my_catalog")

The catalog is specified in the same way as for :func:`read_iceberg`, using the
``catalog_name`` and ``catalog_properties`` parameters.
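
For example, a sketch passing the catalog configuration explicitly (the SQLite URI is
illustrative, mirroring the test fixtures in this pull request):

.. code-block:: python

    df.to_iceberg(
        "my_table",
        catalog_properties={"uri": "sqlite:///pyiceberg_catalog.db"},
    )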

The location of the table can be specified with the ``location`` parameter:

.. code-block:: python

df.to_iceberg(
"my_table",
catalog_name="my_catalog",
location="s3://my-data-lake/my-iceberg-tables",
)

It is possible to add properties to the table snapshot by passing a dictionary to the
``snapshot_properties`` parameter.
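
For example (the property key and value are illustrative; any mapping of strings to
strings is accepted):

.. code-block:: python

    df.to_iceberg(
        "my_table",
        catalog_name="my_catalog",
        snapshot_properties={"written-by": "pandas"},
    )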

More information about the Iceberg format can be found in the `Apache Iceberg official
page <https://iceberg.apache.org/>`__.

2 changes: 1 addition & 1 deletion doc/source/whatsnew/v3.0.0.rst
@@ -79,7 +79,7 @@ Other enhancements
- :py:class:`frozenset` elements in pandas objects are now natively printed (:issue:`60690`)
- Add ``"delete_rows"`` option to ``if_exists`` argument in :meth:`DataFrame.to_sql` deleting all records of the table before inserting data (:issue:`37210`).
- Added half-year offset classes :class:`HalfYearBegin`, :class:`HalfYearEnd`, :class:`BHalfYearBegin` and :class:`BHalfYearEnd` (:issue:`60928`)
- Added support to read from Apache Iceberg tables with the new :func:`read_iceberg` function (:issue:`61383`)
- Added support for reading from and writing to Apache Iceberg tables with the new :func:`read_iceberg` and :meth:`DataFrame.to_iceberg` functions (:issue:`61383`)
- Errors occurring during SQL I/O will now throw a generic :class:`.DatabaseError` instead of the raw Exception type from the underlying driver manager library (:issue:`60748`)
- Implemented :meth:`Series.str.isascii` and :meth:`Series.str.isascii` (:issue:`59091`)
- Improved deprecation message for offset aliases (:issue:`60820`)
56 changes: 56 additions & 0 deletions pandas/core/frame.py
@@ -3547,6 +3547,62 @@ def to_xml(

return xml_formatter.write_output()

def to_iceberg(
self,
table_identifier: str,
catalog_name: str | None = None,
*,
catalog_properties: dict[str, Any] | None = None,
location: str | None = None,
append: bool = False,
snapshot_properties: dict[str, str] | None = None,
) -> None:
"""
Write a DataFrame to an Apache Iceberg table.

.. versionadded:: 3.0.0
Member: Could you add an experimental tag to this API as well like we did with read_iceberg?

Member Author: Absolutely, I forgot about that. Added it now. I also expanded the user guide docs of iceberg with to_iceberg, which I also had forgotten. Thanks for the review and the feedback!


.. warning::

to_iceberg is experimental and may change without warning.

Parameters
----------
table_identifier : str
Table identifier.
catalog_name : str, optional
The name of the catalog.
catalog_properties : dict of {str: str}, optional
The properties that are used next to the catalog configuration.
location : str, optional
Location for the table.
append : bool, default False
If ``True``, append data to the table, instead of replacing the content.
snapshot_properties : dict of {str: str}, optional
Custom properties to be added to the snapshot summary.

See Also
--------
read_iceberg : Read an Apache Iceberg table.
DataFrame.to_parquet : Write a DataFrame in Parquet format.

Examples
--------
>>> df = pd.DataFrame(data={"col1": [1, 2], "col2": [4, 3]})
>>> df.to_iceberg("my_table", catalog_name="my_catalog") # doctest: +SKIP
"""
from pandas.io.iceberg import to_iceberg

to_iceberg(
self,
table_identifier,
catalog_name,
catalog_properties=catalog_properties,
location=location,
append=append,
snapshot_properties=snapshot_properties,
)

# ----------------------------------------------------------------------
@doc(INFO_DOCSTRING, **frame_sub_kwargs)
def info(
60 changes: 59 additions & 1 deletion pandas/io/iceberg.py
@@ -10,6 +10,7 @@
def read_iceberg(
table_identifier: str,
catalog_name: str | None = None,
*,
catalog_properties: dict[str, Any] | None = None,
row_filter: str | None = None,
selected_fields: tuple[str] | None = None,
@@ -21,6 +22,8 @@ def read_iceberg(
"""
Read an Apache Iceberg table into a pandas DataFrame.

.. versionadded:: 3.0.0

.. warning::

read_iceberg is experimental and may change without warning.
@@ -71,7 +74,6 @@
"""
pyiceberg_catalog = import_optional_dependency("pyiceberg.catalog")
pyiceberg_expressions = import_optional_dependency("pyiceberg.expressions")

if catalog_properties is None:
catalog_properties = {}
catalog = pyiceberg_catalog.load_catalog(catalog_name, **catalog_properties)
@@ -91,3 +93,59 @@
limit=limit,
)
return result.to_pandas()


def to_iceberg(
df: DataFrame,
table_identifier: str,
catalog_name: str | None = None,
*,
catalog_properties: dict[str, Any] | None = None,
location: str | None = None,
append: bool = False,
snapshot_properties: dict[str, str] | None = None,
IsaacWarren (Jun 3, 2025): Any thoughts on adding append to match to_parquet? Something like

    append: bool = False

Then this could default to table.overwrite instead of append. I think it might be confusing if this doesn't match other to_* functions.

Member Author: How does PyIceberg support it?

IsaacWarren: With the table.overwrite method.

Member Author: Of course, I didn't think about it. I'll add it, thanks for the feedback.

) -> None:
"""
Write a DataFrame to an Apache Iceberg table.

.. versionadded:: 3.0.0

Parameters
----------
table_identifier : str
Table identifier.
catalog_name : str, optional
The name of the catalog.
catalog_properties : dict of {str: str}, optional
The properties that are used next to the catalog configuration.
location : str, optional
Location for the table.
append : bool, default False
If ``True``, append data to the table, instead of replacing the content.
snapshot_properties : dict of {str: str}, optional
Custom properties to be added to the snapshot summary.

See Also
--------
read_iceberg : Read an Apache Iceberg table.
DataFrame.to_parquet : Write a DataFrame in Parquet format.
"""
pa = import_optional_dependency("pyarrow")
pyiceberg_catalog = import_optional_dependency("pyiceberg.catalog")
if catalog_properties is None:
catalog_properties = {}
catalog = pyiceberg_catalog.load_catalog(catalog_name, **catalog_properties)
arrow_table = pa.Table.from_pandas(df)
table = catalog.create_table_if_not_exists(
identifier=table_identifier,
schema=arrow_table.schema,
location=location,
# we could add `partition_spec`, `sort_order` and `properties` in the
# future, but it may not be trivial without exposing PyIceberg objects
)

IsaacWarren: I definitely think these would be great to have but I don't really have any ideas on how to do it without just using PyIceberg objects.

Member Author: Adding them later is easy if we think of a good signature. That's why I didn't worry too much about adding them.

if snapshot_properties is None:
snapshot_properties = {}
if append:
table.append(arrow_table, snapshot_properties=snapshot_properties)
else:
table.overwrite(arrow_table, snapshot_properties=snapshot_properties)
83 changes: 81 additions & 2 deletions pandas/tests/io/test_iceberg.py
@@ -22,7 +22,7 @@
pyiceberg_catalog = pytest.importorskip("pyiceberg.catalog")
pq = pytest.importorskip("pyarrow.parquet")

Catalog = collections.namedtuple("Catalog", ["name", "uri"])
Catalog = collections.namedtuple("Catalog", ["name", "uri", "warehouse"])


@pytest.fixture
@@ -58,7 +58,7 @@ def catalog(request, tmp_path):

importlib.reload(pyiceberg_catalog) # needed to reload the config file

yield Catalog(name=catalog_name or "default", uri=uri)
yield Catalog(name=catalog_name or "default", uri=uri, warehouse=warehouse)

if catalog_name is not None:
config_path.unlink()
@@ -141,3 +141,82 @@ def test_read_with_limit(self, catalog):
limit=2,
)
tm.assert_frame_equal(result, expected)

def test_write(self, catalog):
df = pd.DataFrame(
{
"A": [1, 2, 3],
"B": ["foo", "foo", "foo"],
}
)
df.to_iceberg(
"ns.new_table",
catalog_properties={"uri": catalog.uri},
location=catalog.warehouse,
)
result = read_iceberg(
"ns.new_table",
catalog_properties={"uri": catalog.uri},
)
tm.assert_frame_equal(result, df)

@pytest.mark.parametrize("catalog", ["default", "pandas_tests"], indirect=True)
def test_write_by_catalog_name(self, catalog):
df = pd.DataFrame(
{
"A": [1, 2, 3],
"B": ["foo", "foo", "foo"],
}
)
df.to_iceberg(
"ns.new_table",
catalog_name=catalog.name,
)
result = read_iceberg(
"ns.new_table",
catalog_name=catalog.name,
)
tm.assert_frame_equal(result, df)

def test_write_existing_table_with_append_true(self, catalog):
original = read_iceberg(
"ns.my_table",
catalog_properties={"uri": catalog.uri},
)
new = pd.DataFrame(
{
"A": [1, 2, 3],
"B": ["foo", "foo", "foo"],
}
)
expected = pd.concat([original, new], ignore_index=True)
new.to_iceberg(
"ns.my_table",
catalog_properties={"uri": catalog.uri},
location=catalog.warehouse,
append=True,
)
result = read_iceberg(
"ns.my_table",
catalog_properties={"uri": catalog.uri},
)
tm.assert_frame_equal(result, expected)

def test_write_existing_table_with_append_false(self, catalog):
df = pd.DataFrame(
{
"A": [1, 2, 3],
"B": ["foo", "foo", "foo"],
}
)
df.to_iceberg(
"ns.my_table",
catalog_properties={"uri": catalog.uri},
location=catalog.warehouse,
append=False,
)
result = read_iceberg(
"ns.my_table",
catalog_properties={"uri": catalog.uri},
)
tm.assert_frame_equal(result, df)