ENH: Implement to_iceberg #61507
base: main
Changes from all commits: 1875fb5, b31ae80, 373138c, a83c62f, 4246d17, fd728e0, f926ffd, 1290931, 08b234c, 43255a7, e62ef3a, 1c4de5b, 21e6e9b, dfc76ca
@@ -10,6 +10,7 @@
def read_iceberg(
    table_identifier: str,
    catalog_name: str | None = None,
    *,
    catalog_properties: dict[str, Any] | None = None,
    row_filter: str | None = None,
    selected_fields: tuple[str] | None = None,
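The bare `*` added to the signature makes every parameter after `catalog_name` keyword-only. A minimal sketch of the effect (a hypothetical stand-in function, not the pandas implementation):

```python
def read_iceberg_sketch(table_identifier, catalog_name=None, *, row_filter=None):
    # Hypothetical stand-in mirroring the signature change: everything
    # after the bare `*` must be passed by keyword.
    return (table_identifier, catalog_name, row_filter)

# The keyword form works:
result = read_iceberg_sketch("db.table", "my_catalog", row_filter="id > 10")

# Passing row_filter positionally now raises TypeError:
try:
    read_iceberg_sketch("db.table", "my_catalog", "id > 10")
    result_positional = "accepted"
except TypeError:
    result_positional = "rejected"
```

Making these options keyword-only keeps the call sites readable and lets later options be reordered without breaking callers.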
@@ -21,6 +22,8 @@ def read_iceberg(
    """
    Read an Apache Iceberg table into a pandas DataFrame.

    .. versionadded:: 3.0.0

    .. warning::

        read_iceberg is experimental and may change without warning.
@@ -71,7 +74,6 @@ def read_iceberg(
    """
    pyiceberg_catalog = import_optional_dependency("pyiceberg.catalog")
    pyiceberg_expressions = import_optional_dependency("pyiceberg.expressions")

    if catalog_properties is None:
        catalog_properties = {}
    catalog = pyiceberg_catalog.load_catalog(catalog_name, **catalog_properties)

@@ -91,3 +93,59 @@ def read_iceberg(
        limit=limit,
    )
    return result.to_pandas()
def to_iceberg(
    df: DataFrame,
    table_identifier: str,
    catalog_name: str | None = None,
    *,
    catalog_properties: dict[str, Any] | None = None,
    location: str | None = None,
    append: bool = False,
    snapshot_properties: dict[str, str] | None = None,
) -> None:

Review thread (on the append parameter):
Reviewer: "Any thoughts on adding append to match to_parquet? Something like append: bool = False. Then this could default to table.overwrite instead of append. I think it might be confusing if this doesn't match other to_* functions."
Author: "How does PyIceberg support it?"
Reviewer: "With the table.overwrite method."
Author: "Of course, I didn't think about it. I'll add it, thanks for the feedback."
    """
    Write a DataFrame to an Apache Iceberg table.

    .. versionadded:: 3.0.0

    Parameters
    ----------
    table_identifier : str
        Table identifier.
    catalog_name : str, optional
        The name of the catalog.
    catalog_properties : dict of {str: str}, optional
        The properties that are used next to the catalog configuration.
    location : str, optional
        Location for the table.
    append : bool, default False
        If ``True``, append data to the table, instead of replacing the content.
    snapshot_properties : dict of {str: str}, optional
        Custom properties to be added to the snapshot summary.

    See Also
    --------
    read_iceberg : Read an Apache Iceberg table.
    DataFrame.to_parquet : Write a DataFrame in Parquet format.
    """
    pa = import_optional_dependency("pyarrow")
    pyiceberg_catalog = import_optional_dependency("pyiceberg.catalog")
    if catalog_properties is None:
        catalog_properties = {}
    catalog = pyiceberg_catalog.load_catalog(catalog_name, **catalog_properties)
    arrow_table = pa.Table.from_pandas(df)
    table = catalog.create_table_if_not_exists(
        identifier=table_identifier,
        schema=arrow_table.schema,
        location=location,
        # we could add `partition_spec`, `sort_order` and `properties` in the
        # future, but it may not be trivial without exposing PyIceberg objects
    )
    if snapshot_properties is None:
        snapshot_properties = {}
    if append:
        table.append(arrow_table, snapshot_properties=snapshot_properties)
    else:
        table.overwrite(arrow_table, snapshot_properties=snapshot_properties)

Review thread (on the partition_spec/sort_order comment):
Reviewer: "I definitely think these would be great to have but I don't really have any ideas on how to do it without just using PyIceberg objects."
Author: "Adding them later is easy if we think of a good signature. That's why I didn't worry too much about adding them."
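The append/overwrite dispatch at the end of the function can be exercised in isolation with a stub table object. The names below (`StubTable`, `write`) are hypothetical stand-ins for illustration; the real code calls PyIceberg's table append/overwrite methods:

```python
class StubTable:
    """Hypothetical stand-in for an Iceberg table that records calls."""

    def __init__(self):
        self.calls = []

    def append(self, data, snapshot_properties=None):
        self.calls.append(("append", snapshot_properties))

    def overwrite(self, data, snapshot_properties=None):
        self.calls.append(("overwrite", snapshot_properties))


def write(table, data, append=False, snapshot_properties=None):
    # Mirrors the dispatch in to_iceberg: overwrite the table content
    # by default, append only when explicitly requested.
    if snapshot_properties is None:
        snapshot_properties = {}
    if append:
        table.append(data, snapshot_properties=snapshot_properties)
    else:
        table.overwrite(data, snapshot_properties=snapshot_properties)


t = StubTable()
write(t, "rows")               # default path: overwrite
write(t, "rows", append=True)  # opt-in path: append
modes = [call[0] for call in t.calls]
```

Defaulting to overwrite matches the semantics of the other `to_*` writers (e.g. `to_parquet` replaces the target), which was the point raised in the review thread above.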
Review thread:
Reviewer: "Could you add an experimental tag to this API as well, like we did with read_iceberg?"
Author: "Absolutely, I forgot about that. Added it now. I also expanded the user guide docs of iceberg with to_iceberg, which I also had forgotten. Thanks for the review and the feedback!"