I/O: Adapter for Apache Iceberg #444
Conversation
coderabbitai bot commented Jun 4, 2025 • edited
Walkthrough

The changes introduce Apache Iceberg integration for CrateDB Toolkit, adding new CLI commands for loading data from and saving data to Iceberg tables. This includes a new module for Iceberg I/O, updates to the CLI for symmetrical import/export functionality, documentation for Iceberg support, and an optional dependency on pyiceberg.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant CLI
    participant StandaloneCluster
    participant IcebergIO
    User->>CLI: cratedb-toolkit io load table ...
    CLI->>StandaloneCluster: load_table(source, target, ...)
    StandaloneCluster->>IcebergIO: from_iceberg(source_url, cratedb_url)
    IcebergIO-->>StandaloneCluster: Data loaded into CrateDB
    User->>CLI: cratedb-toolkit io save table ...
    CLI->>StandaloneCluster: save_table(source, target, ...)
    StandaloneCluster->>IcebergIO: to_iceberg(source_url, target_url)
    IcebergIO-->>StandaloneCluster: Data exported to Iceberg
```
📜 Recent review details

Configuration used: CodeRabbit UI

📒 Files selected for processing (6)

🚧 Files skipped from review as they are similar to previous changes (2)

🧰 Additional context used

🪛 Ruff (0.11.9)
cratedb_toolkit/io/cli.py
118-118: Remove `print` (T201)
cratedb_toolkit/cluster/core.py
621-621: Found commented-out code. Remove commented-out code (ERA001)
cratedb_toolkit/io/iceberg.py
66-66: Line too long (139 > 120) (E501)
74-74: Line too long (124 > 120) (E501)
106-106: Found commented-out code. Remove commented-out code (ERA001)
106-106: Line too long (143 > 120) (E501)
115-115: Line too long (139 > 120) (E501)
120-120: Remove `print` (T201)

🪛 GitHub Check: codecov/patch
cratedb_toolkit/io/cli.py: [warning] 81, 112-113, 116-118, 121, 126
cratedb_toolkit/cluster/core.py: [warning] 574, 619-620, 623-624, 626, 628
cratedb_toolkit/io/iceberg.py: [warning] 27-30, 37, 48, 51-53, 55, 70, 73, 77-81, 84-85, 87-88, 95, 118-120, 123, 126-127, 133-134

🪛 Pylint (3.3.7)
cratedb_toolkit/io/cli.py
[convention] 89-89: Line too long (116/100) (C0301)
[convention] 90-90: Line too long (113/100) (C0301)
[convention] 91-91: Line too long (105/100) (C0301)
[convention] 92-92: Line too long (106/100) (C0301)
[refactor] 95-95: Too many arguments (10/5) (R0913)
[refactor] 95-95: Too many positional arguments (10/5) (R0917)
[warning] 96-96: Unused argument 'ctx' (W0613)
cratedb_toolkit/cluster/core.py
[convention] 573-573: Line too long (102/100) (C0301)
[convention] 607-607: Line too long (107/100) (C0301)
[refactor] 623-626: Unnecessary "else" after "return", remove the "else" and de-indent the code inside it (R1705)
[warning] 607-607: Unused argument 'source' (W0613)
[warning] 607-607: Unused argument 'transformation' (W0613)
cratedb_toolkit/io/iceberg.py
[convention] 66-66: Line too long (139/100) (C0301)
[convention] 74-74: Line too long (124/100) (C0301)
[convention] 106-106: Line too long (143/100) (C0301)
[convention] 115-115: Line too long (139/100) (C0301)
[convention] 1-1: Missing module docstring (C0114)
[error] 4-4: Unable to import 'polars' (E0401)
[error] 5-5: Unable to import 'pyarrow.parquet' (E0401)
[error] 6-6: Unable to import 'sqlalchemy' (E0401)
[error] 8-8: Unable to import 'pyiceberg.catalog' (E0401)
[error] 9-9: Unable to import 'sqlalchemy_cratedb' (E0401)
[convention] 20-20: Missing class docstring (C0115)
[convention] 26-26: Missing function or method docstring (C0116)
[convention] 36-36: Missing function or method docstring (C0116)
[convention] 47-47: Missing function or method docstring (C0116)
[convention] 50-50: Missing function or method docstring (C0116)
[refactor] 51-55: Unnecessary "else" after "return", remove the "else" and de-indent the code inside it (R1705)
[warning] 73-75: Use lazy % formatting in logging functions (W1203)
[warning] 81-81: Use lazy % formatting in logging functions (W1203)
[warning] 58-58: Unused argument 'progress' (W0613)
[warning] 109-109: Unused argument 'source_url' (W0613)
[warning] 109-109: Unused argument 'progress' (W0613)

🪛 GitHub Actions: Tests: Common
cratedb_toolkit/io/cli.py
[error] 118-118: Ruff: T201
cratedb_toolkit/cluster/core.py
[error] 621-621: Ruff: ERA001 Found commented-out code. Remove commented-out code.
cratedb_toolkit/io/iceberg.py
[error] 4-4: ModuleNotFoundError: No module named 'polars'. The Python module 'polars' is missing and required by the test run.

🪛 GitHub Actions: Tests: DynamoDB
cratedb_toolkit/io/iceberg.py
[error] 4-4: ModuleNotFoundError: No module named 'polars' - The Python module 'polars' is missing, causing the pytest run to fail.

🪛 LanguageTool
doc/io/iceberg/index.md
[uncategorized] ~1-~1: A punctuation mark might be missing here. (AI_EN_LECTOR_MISSING_PUNCTUATION)
[uncategorized] ~5-~5: This verb may not be in the correct form. Consider using a different form for this context. (AI_EN_LECTOR_REPLACEMENT_VERB_FORM)

🔇 Additional comments (4)
Actionable comments posted: 7
🧹 Nitpick comments (5)
cratedb_toolkit/io/iceberg.py (4)
1-13: Add module docstring. The module lacks a docstring explaining its purpose and functionality. Consider adding a module-level docstring to describe the Iceberg integration capabilities.
```diff
+"""
+Apache Iceberg integration for CrateDB Toolkit.
+
+This module provides functionality to transfer data between Iceberg tables
+and CrateDB databases, supporting both import and export operations.
+"""
 import dataclasses
 import logging
```

🧰 Tools
🪛 Pylint (3.3.7)
[convention] 1-1: Missing module docstring
(C0114)
[error] 3-3: Unable to import 'sqlalchemy'
(E0401)
[error] 4-4: Unable to import 'polars'
(E0401)
[error] 5-5: Unable to import 'pyarrow.parquet'
(E0401)
[error] 7-7: Unable to import 'pyiceberg.catalog'
(E0401)
[error] 8-8: Unable to import 'sqlalchemy_cratedb'
(E0401)
🪛 GitHub Actions: Tests: DynamoDB
[error] 4-4: ModuleNotFoundError: No module named 'polars'. The Python module 'polars' is missing, causing pytest to fail.
🪛 GitHub Actions: Tests: Common
[error] 1-1: Ruff formatting check failed. File would be reformatted.
31-39: Consider making catalog configuration flexible. The catalog configuration is hardcoded to use SQLite with specific paths. This limits flexibility for different Iceberg deployments.
```diff
 def load_catalog(self) -> Catalog:
+    """Load the Iceberg catalog with appropriate configuration."""
+    # TODO: Consider accepting catalog configuration as parameters
+    # to support different catalog types (Hive, REST, etc.)
     return load_catalog(
         self.catalog,
         **{
             'type': 'sql',
             "uri": f"sqlite:///{self.path}/pyiceberg_catalog.db",
             "warehouse": f"file://{self.path}",
         },
     )
```

🧰 Tools
🪛 Pylint (3.3.7)
[convention] 31-31: Missing function or method docstring
(C0116)
61-62: Fix line length issues. Multiple lines exceed the maximum line length of 120 characters.
```diff
-    ctk load table "file+iceberg:var/lib/iceberg/default.db/taxi_dataset/metadata/00001-dc8e5ed2-dc29-4e39-b2e4-019e466af4c3.metadata.json"
+    ctk load table "file+iceberg:var/lib/iceberg/default.db/taxi_dataset/metadata/\
+00001-dc8e5ed2-dc29-4e39-b2e4-019e466af4c3.metadata.json"
```

```diff
-    logger.info(f"Iceberg address: Path: {iceberg_address.path}, catalog: {iceberg_address.catalog}, table: {iceberg_address.table}")
+    logger.info(
+        "Iceberg address: Path: %s, catalog: %s, table: %s",
+        iceberg_address.path, iceberg_address.catalog, iceberg_address.table
+    )
```

Also applies to: 68-68
🧰 Tools
🪛 Ruff (0.11.9)
61-61: Line too long (139 > 120)
(E501)
🪛 Pylint (3.3.7)
[convention] 61-61: Line too long (139/100)
(C0301)
74-74: Use lazy formatting in logging. Use lazy % formatting instead of f-strings in logging functions for better performance.
```diff
-    logger.info(f"Target address: {cratedb_address}")
+    logger.info("Target address: %s", cratedb_address)
```

🧰 Tools
🪛 Pylint (3.3.7)
[warning] 74-74: Use lazy % formatting in logging functions
(W1203)
cratedb_toolkit/io/cli.py (1)
95-106: Remove unused parameter. The `ctx` parameter is not used in the function.

```diff
 def save_table(
-    ctx: click.Context,
     url: str,
     cluster_id: str,
     cluster_name: str,
     cluster_url: str,
     schema: str,
     table: str,
     format_: str,
     compression: str,
     transformation: t.Union[Path, None],
 ):
```

Note: If you remove the `ctx` parameter, also remove `@click.pass_context` from line 94.
🪛 Pylint (3.3.7)
[refactor] 95-95: Too many arguments (10/5)
(R0913)
[refactor] 95-95: Too many positional arguments (10/5)
(R0917)
[warning] 96-96: Unused argument 'ctx'
(W0613)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- cratedb_toolkit/cli.py (2 hunks)
- cratedb_toolkit/cluster/core.py (3 hunks)
- cratedb_toolkit/io/cli.py (2 hunks)
- cratedb_toolkit/io/iceberg.py (1 hunks)
- doc/io/iceberg/index.md (1 hunks)
- pyproject.toml (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
cratedb_toolkit/cli.py (1)
- cratedb_toolkit/io/cli.py (2): cli_load (21-25), cli_save (77-81)

cratedb_toolkit/io/cli.py (3)
- cratedb_toolkit/util/cli.py (2): boot_click (16-27), make_command (75-89)
- cratedb_toolkit/cluster/core.py (2): DatabaseCluster (631-684), create (651-684)
- cratedb_toolkit/model.py (3): schema (174-178), TableAddress (182-201), InputOutputResource (205-212)
🪛 LanguageTool
doc/io/iceberg/index.md
[uncategorized] ~1-~1: A punctuation mark might be missing here.
Context: (iceberg)= # Apache Iceberg I/O ## About Import and ...
(AI_EN_LECTOR_MISSING_PUNCTUATION)
[uncategorized] ~5-~5: This verb may not be in the correct form. Consider using a different form for this context.
Context: ...Apache Iceberg I/O ## About Import and export data into/from Iceberg tables, for huma...
(AI_EN_LECTOR_REPLACEMENT_VERB_FORM)
🪛 Pylint (3.3.7)
cratedb_toolkit/cluster/core.py
[convention] 573-573: Line too long (102/100)
(C0301)
[convention] 607-607: Line too long (107/100)
(C0301)
[refactor] 623-626: Unnecessary "else" after "return", remove the "else" and de-indent the code inside it
(R1705)
[warning] 607-607: Unused argument 'source'
(W0613)
[warning] 607-607: Unused argument 'transformation'
(W0613)
cratedb_toolkit/io/cli.py
[convention] 89-89: Line too long (116/100)
(C0301)
[convention] 90-90: Line too long (113/100)
(C0301)
[convention] 91-91: Line too long (105/100)
(C0301)
[convention] 92-92: Line too long (106/100)
(C0301)
[refactor] 95-95: Too many arguments (10/5)
(R0913)
[refactor] 95-95: Too many positional arguments (10/5)
(R0917)
[warning] 96-96: Unused argument 'ctx'
(W0613)
cratedb_toolkit/io/iceberg.py
[convention] 29-29: Line too long (135/100)
(C0301)
[convention] 61-61: Line too long (139/100)
(C0301)
[convention] 68-68: Line too long (133/100)
(C0301)
[convention] 89-89: Line too long (150/100)
(C0301)
[convention] 93-93: Line too long (143/100)
(C0301)
[convention] 102-102: Line too long (139/100)
(C0301)
[convention] 1-1: Missing module docstring
(C0114)
[error] 3-3: Unable to import 'sqlalchemy'
(E0401)
[error] 4-4: Unable to import 'polars'
(E0401)
[error] 5-5: Unable to import 'pyarrow.parquet'
(E0401)
[error] 7-7: Unable to import 'pyiceberg.catalog'
(E0401)
[error] 8-8: Unable to import 'sqlalchemy_cratedb'
(E0401)
[convention] 19-19: Missing class docstring
(C0115)
[convention] 25-25: Missing function or method docstring
(C0116)
[convention] 31-31: Missing function or method docstring
(C0116)
[convention] 42-42: Missing function or method docstring
(C0116)
[convention] 45-45: Missing function or method docstring
(C0116)
[refactor] 46-50: Unnecessary "else" after "return", remove the "else" and de-indent the code inside it
(R1705)
[warning] 68-68: Use lazy % formatting in logging functions
(W1203)
[warning] 74-74: Use lazy % formatting in logging functions
(W1203)
[warning] 53-53: Unused argument 'progress'
(W0613)
[warning] 96-96: Unused argument 'source_url'
(W0613)
[warning] 96-96: Unused argument 'progress'
(W0613)
🪛 Ruff (0.11.9)
cratedb_toolkit/cluster/core.py
621-621: Found commented-out code
Remove commented-out code
(ERA001)
cratedb_toolkit/io/cli.py
118-118: `print` found
Remove `print`
(T201)
cratedb_toolkit/io/iceberg.py
29-29: Line too long (135 > 120)
(E501)
61-61: Line too long (139 > 120)
(E501)
68-68: Line too long (133 > 120)
(E501)
89-89: Line too long (150 > 120)
(E501)
93-93: Found commented-out code
Remove commented-out code
(ERA001)
93-93: Line too long (143 > 120)
(E501)
102-102: Line too long (139 > 120)
(E501)
107-107: `print` found
Remove `print`
(T201)
🪛 GitHub Actions: Tests: Common
cratedb_toolkit/cluster/core.py
[error] 1-1: Ruff formatting check failed. File would be reformatted.
cratedb_toolkit/io/iceberg.py
[error] 1-1: Ruff formatting check failed. File would be reformatted.
🪛 GitHub Actions: Tests: DynamoDB
cratedb_toolkit/io/iceberg.py
[error] 4-4: ModuleNotFoundError: No module named 'polars'. The Python module 'polars' is missing, causing pytest to fail.
🔇 Additional comments (7)
pyproject.toml (1)
176-176: ✅ Verification successful

Verify the version constraint for the pyiceberg dependency.

The dependency addition looks correct, but please verify that the version constraint `<0.10` is appropriate for the current Iceberg integration features.
What is the latest stable version of pyiceberg and are there any recent breaking changes that would affect the version constraint <0.10?

💡 Result:
The latest stable version of pyiceberg is 0.9.1, which was released on April 30, 2025[1]. This release is part of the Apache Iceberg ecosystem, which is an open table format for huge analytic datasets.
Recent Changes in PyIceberg
PyIceberg has undergone significant development since its first release in September 2023, as mentioned during the Iceberg Summit 2025 by Fokko Driesprong[5]. The most recent version (0.9.1) follows several earlier versions with notable changes.
Breaking Changes
There are a few breaking changes that would affect users with version constraints <0.10:
Table.name Behavior Change: In a previous release, the behavior of `Table.name` was changed to return the table name without the catalog name. This was part of a broader effort to remove references to the catalog name in pyiceberg[6].

Dependency Requirements: PyIceberg has removed upper bound restrictions for dependency libraries, allowing early testing of new versions. This includes removing Python library version upper bound restrictions, allowing for Python 3.13 support[6].
Python Version Support: While not directly in pyiceberg, but relevant to the Python ecosystem, it's worth noting that Python 3.8 support is being removed in related projects as it reached EOL in October 2024. The OpenMetadata project, for example, now only supports Python 3.9, 3.10, and 3.11 for their ingestion framework[7].
Features in Recent Versions
Recent versions of pyiceberg have added several new capabilities:
- Support for UPSERT operations directly within PyIceberg
- Dynamic overwrites optimization
- Extended table updates with new actions
- Support for Iceberg V3 spec
- Cross-region read support for S3FileIO operations
- Integration with pandas, polars, and other data processing libraries[6]
If you're using a version constraint of `<0.10`, you'll need to be aware of these changes, particularly the behavior change in `Table.name`, which might require code adjustments.

Citations:
- 1:https://pypi.org/project/pyiceberg/
- 2:https://py.iceberg.apache.org
- 3:https://iceberg.apache.org/releases/
- 4:https://github.com/apache/iceberg-python
- 5:https://www.youtube.com/watch?v=TIJMWHplBRU
- 6:https://github.com/apache/iceberg-python/releases
- 7:https://docs.open-metadata.org/v1.1.x/releases/all-releases
- 8:https://pypi.org/project/pyiceberg/0.4.0rc2
- 9:https://estuary.dev/blog/getting-started-pyiceberg/
- 10:https://pyodide.org/en/stable/project/changelog.html
pyiceberg <0.10 constraint is valid—note upcoming breaking changes
The constraint `<0.10` will install the latest stable 0.9.1 release, but please review and test for these recent changes before rolling out:
- Table.name now returns only the table name (catalog prefix removed)
- Python 3.8 support is dropped in the ecosystem (ensure Python ≥3.9)
- Upstream dependencies no longer have upper‐bound restrictions (may pull newer library versions)
No changes to the current version constraint are required.
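The claim that `<0.10` admits 0.9.1 but excludes a future 0.10.x can be checked mechanically. A minimal sketch using numeric version tuples (a real resolver such as `packaging.specifiers` also handles pre-release tags, which this deliberately ignores):

```python
def version_tuple(v: str):
    """Parse a dotted numeric version like '0.9.1' into a comparable tuple."""
    return tuple(int(part) for part in v.split("."))

def satisfies_upper_bound(version: str, bound: str) -> bool:
    """Check an exclusive '<bound' constraint via lexicographic tuple comparison."""
    return version_tuple(version) < version_tuple(bound)

print(satisfies_upper_bound("0.9.1", "0.10"))   # True: (0, 9, 1) < (0, 10)
print(satisfies_upper_bound("0.10.0", "0.10"))  # False: (0, 10, 0) > (0, 10)
```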
doc/io/iceberg/index.md (1)
1-12: Documentation looks good and clearly introduces the Iceberg I/O functionality. The documentation structure and content are appropriate for introducing the new Apache Iceberg integration features.
🧰 Tools
🪛 LanguageTool
[uncategorized] ~1-~1: A punctuation mark might be missing here.
Context: (iceberg)= # Apache Iceberg I/O ## About Import and ... (AI_EN_LECTOR_MISSING_PUNCTUATION)
[uncategorized] ~5-~5: This verb may not be in the correct form. Consider using a different form for this context.
Context: ...Apache Iceberg I/O ## About Import and export data into/from Iceberg tables, for huma... (AI_EN_LECTOR_REPLACEMENT_VERB_FORM)
cratedb_toolkit/cli.py (2)
12-13: Good architectural improvement separating load and save operations.

The split of the CLI import into separate `cli_load` and `cli_save` functions properly implements the symmetrical import/export functionality.
34-35: Correct implementation of separate CLI commands. The registration of distinct "load" and "save" commands enables users to access both import and export functionality independently.
cratedb_toolkit/cluster/core.py (2)
23-23: Import addition looks correct.

The import of the `from_iceberg` and `to_iceberg` functions is properly placed and supports the new Iceberg integration functionality.
573-575: Iceberg scheme handling implemented correctly.

The conditional logic properly detects Iceberg URLs and delegates to the appropriate `from_iceberg` function.
🪛 Pylint (3.3.7)
[convention] 573-573: Line too long (102/100)
(C0301)
cratedb_toolkit/io/cli.py (1)
72-82: Well-structured command group for data export.

The new `cli_save` command group properly mirrors the structure of `cli_load` and provides a clear separation of concerns between loading and saving operations.
```python
def save_table(
    self, source: TableAddress, target: InputOutputResource, transformation: t.Union[Path, None] = None
) -> "StandaloneCluster":
    """
    Export data from a database table on a standalone CrateDB Server.

    Synopsis
    --------
    export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo
    ctk load table influxdb2://example:token@localhost:8086/testdrive/demo
    ctk load table mongodb://localhost:27017/testdrive/demo
    """
    source_url = self.address.dburi
    target_url_obj = URL(target.url)
    # source_url = source.url
    if target_url_obj.scheme.startswith("iceberg") or target_url_obj.scheme.endswith("iceberg"):
        return to_iceberg(source_url, target.url)
    else:
        raise NotImplementedError(f"Exporting resource not implemented yet: {target_url_obj}")
    return self
```
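The scheme check in `save_table` is meant to match both `iceberg://…` and `file+iceberg:…` style URLs. A minimal stand-in using the stdlib URL parser (the real code uses a `URL` helper class, so mirroring its behavior here is an assumption) shows the dispatch logic:

```python
from urllib.parse import urlsplit

def is_iceberg_url(url: str) -> bool:
    """Match schemes such as 'iceberg' and 'file+iceberg'."""
    scheme = urlsplit(url).scheme
    return scheme.startswith("iceberg") or scheme.endswith("iceberg")

print(is_iceberg_url("iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset"))  # True
print(is_iceberg_url("file+iceberg:var/lib/iceberg/metadata.json"))                       # True
print(is_iceberg_url("mongodb://localhost:27017/testdrive/demo"))                         # False
```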
Fix multiple issues in the save_table method.

The method has several issues that need to be addressed:

- Unused arguments: the `source` and `transformation` parameters are not used
- Commented-out code: Line 621 contains commented code that should be removed
- Unnecessary else clause: The else block after return is redundant
- Line length violations: Lines exceed the 100-character limit
- Formatting issues: The pipeline indicates this file needs reformatting

Apply this diff to fix the issues:

```diff
 def save_table(
-    self, source: TableAddress, target: InputOutputResource, transformation: t.Union[Path, None] = None
+    self, source: TableAddress, target: InputOutputResource
 ) -> "StandaloneCluster":
     """
     Export data from a database table on a standalone CrateDB Server.

     Synopsis
     --------
     export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo
-    ctk load table influxdb2://example:token@localhost:8086/testdrive/demo
-    ctk load table mongodb://localhost:27017/testdrive/demo
+    ctk save table iceberg://...
     """
     source_url = self.address.dburi
     target_url_obj = URL(target.url)
-    # source_url = source.url
-    if target_url_obj.scheme.startswith("iceberg") or target_url_obj.scheme.endswith("iceberg"):
+    if (target_url_obj.scheme.startswith("iceberg") or
+            target_url_obj.scheme.endswith("iceberg")):
         return to_iceberg(source_url, target.url)
-    else:
-        raise NotImplementedError(f"Exporting resource not implemented yet: {target_url_obj}")
-
-    return self
+
+    raise NotImplementedError(f"Exporting resource not implemented yet: {target_url_obj}")
```
Additionally, run the formatter to fix the pipeline failure:

```shell
#!/bin/bash
# Fix formatting issues
ruff format cratedb_toolkit/cluster/core.py
```
🧰 Tools
🪛 Ruff (0.11.9)
621-621: Found commented-out code
Remove commented-out code
(ERA001)
🪛 Pylint (3.3.7)
[convention] 607-607: Line too long (107/100)
(C0301)
[refactor] 623-626: Unnecessary "else" after "return", remove the "else" and de-indent the code inside it
(R1705)
[warning] 607-607: Unused argument 'source'
(W0613)
[warning] 607-607: Unused argument 'transformation'
(W0613)
🤖 Prompt for AI Agents

In cratedb_toolkit/cluster/core.py around lines 606 to 629, fix the save_table method by removing the unused parameters source and transformation, deleting the commented-out code on line 621, eliminating the unnecessary else clause after the return statement, and breaking long lines to respect the 100-character limit. After these code changes, run the ruff formatter on the file to resolve formatting issues and pass the pipeline.

```python
@property
def identifier(self):
    return (self.catalog, self.table)

def load_table(self) -> pl.LazyFrame:
    if self.catalog is not None:
        catalog = self.load_catalog()
        return catalog.load_table(self.identifier).to_polars()
    else:
        return pl.scan_iceberg(self.path)
```
🛠️ Refactor suggestion

Simplify control flow and add documentation.

The code has an unnecessary `else` after `return` and lacks documentation.

```diff
 @property
 def identifier(self):
+    """Return the catalog-table identifier tuple."""
     return (self.catalog, self.table)

 def load_table(self) -> pl.LazyFrame:
+    """Load the Iceberg table as a Polars LazyFrame."""
     if self.catalog is not None:
         catalog = self.load_catalog()
         return catalog.load_table(self.identifier).to_polars()
-    else:
-        return pl.scan_iceberg(self.path)
+    return pl.scan_iceberg(self.path)
```
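The R1705 refactor is purely behavior-preserving: once a branch returns, the `else` only adds indentation. A tiny self-contained illustration (function names invented for the demo):

```python
def pick_with_else(value):
    # Style flagged by R1705: else after a returning branch.
    if value is not None:
        return value
    else:
        return "fallback"

def pick_flat(value):
    # Same behavior, one level of indentation less.
    if value is not None:
        return value
    return "fallback"

# Both variants agree on every input.
for candidate in (None, "x", 0):
    assert pick_with_else(candidate) == pick_flat(candidate)
print("equivalent")
```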
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
@property
def identifier(self):
    """Return the catalog-table identifier tuple."""
    return (self.catalog, self.table)

def load_table(self) -> pl.LazyFrame:
    """Load the Iceberg table as a Polars LazyFrame."""
    if self.catalog is not None:
        catalog = self.load_catalog()
        return catalog.load_table(self.identifier).to_polars()
    return pl.scan_iceberg(self.path)
```
🧰 Tools
🪛 Pylint (3.3.7)
[convention] 42-42: Missing function or method docstring
(C0116)
[convention] 45-45: Missing function or method docstring
(C0116)
[refactor] 46-50: Unnecessary "else" after "return", remove the "else" and de-indent the code inside it
(R1705)
🤖 Prompt for AI Agents
In cratedb_toolkit/io/iceberg.py around lines 41 to 51, remove the unnecessary else block after the return statement in load_table to simplify control flow. Add a docstring to the load_table method explaining its purpose and behavior, including what it returns and under what conditions it loads from catalog or path.
```python
table.collect(streaming=True).to_pandas().to_sql(
    name=cratedb_table.table, schema=cratedb_table.schema, con=engine, if_exists="replace", index=False, chunksize=CHUNK_SIZE, method=insert_bulk,
)

# Note: This was much slower.
# table.to_polars().collect(streaming=True).write_database(table_name=table_address.fullname, connection=engine, if_table_exists="replace")
```
🛠️ Refactor suggestion

Remove commented-out code and fix line length.

The code contains a commented-out implementation and line-length issues.

```diff
 table.collect(streaming=True).to_pandas().to_sql(
-    name=cratedb_table.table, schema=cratedb_table.schema, con=engine, if_exists="replace", index=False, chunksize=CHUNK_SIZE, method=insert_bulk,
+    name=cratedb_table.table,
+    schema=cratedb_table.schema,
+    con=engine,
+    if_exists="replace",
+    index=False,
+    chunksize=CHUNK_SIZE,
+    method=insert_bulk,
 )
-
-# Note: This was much slower.
-# table.to_polars().collect(streaming=True).write_database(table_name=table_address.fullname, connection=engine, if_table_exists="replace")
```
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
table.collect(streaming=True).to_pandas().to_sql(
    name=cratedb_table.table,
    schema=cratedb_table.schema,
    con=engine,
    if_exists="replace",
    index=False,
    chunksize=CHUNK_SIZE,
    method=insert_bulk,
)
```
🧰 Tools
🪛 Ruff (0.11.9)
89-89: Line too long (150 > 120)
(E501)
93-93: Found commented-out code
Remove commented-out code
(ERA001)
93-93: Line too long (143 > 120)
(E501)
🪛 Pylint (3.3.7)
[convention] 89-89: Line too long (150/100)
(C0301)
[convention] 93-93: Line too long (143/100)
(C0301)
🤖 Prompt for AI Agents
In cratedb_toolkit/io/iceberg.py around lines 88 to 94, remove the commented-outcode block that uses the polars library for writing to the database, as it is nolonger needed. Also, reformat the long line calling to_sql to ensure it does notexceed the preferred line length, breaking it into multiple lines for betterreadability.| defto_iceberg(source_url,target_url,progress:bool=False): | ||
| """ | ||
| Synopsis | ||
| -------- | ||
| export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo | ||
| ctk load table "iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset" | ||
| ctk save table "file+iceberg:var/lib/iceberg/default.db/taxi_dataset/metadata/00001-dc8e5ed2-dc29-4e39-b2e4-019e466af4c3.metadata.json" | ||
| """ | ||
| iceberg_address=IcebergAddress.from_url(target_url) | ||
| catalog=iceberg_address.load_catalog() | ||
| print("catalog:",catalog) | ||
| # https://py.iceberg.apache.org/#write-a-pyarrow-dataframe | ||
| df=pq.read_table("tmp/yellow_tripdata_2023-01.parquet") | ||
| # Create a new Iceberg table. | ||
| catalog.create_namespace_if_not_exists("default") | ||
| table=catalog.create_table_if_not_exists( | ||
| "default.taxi_dataset", | ||
| schema=df.schema, | ||
| ) | ||
| # Append the dataframe to the table. | ||
| table.append(df) | ||
| len(table.scan().to_arrow()) |
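The `iceberg://…?catalog=…&table=…` URL shown in the synopsis decomposes naturally into path, catalog, and table. A stdlib-only sketch of such a parser (the actual `IcebergAddress.from_url` implementation is not part of this hunk, so field names and handling here are assumptions):

```python
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urlsplit, parse_qs

@dataclass
class IcebergAddressSketch:
    """Hypothetical decomposition of an iceberg:// URL."""
    path: str
    catalog: Optional[str]
    table: Optional[str]

    @classmethod
    def from_url(cls, url: str) -> "IcebergAddressSketch":
        parts = urlsplit(url)
        query = parse_qs(parts.query)
        return cls(
            path=parts.netloc + parts.path,
            catalog=query.get("catalog", [None])[0],
            table=query.get("table", [None])[0],
        )

addr = IcebergAddressSketch.from_url("iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset")
print(addr.path, addr.catalog, addr.table)  # ./var/lib/iceberg/ default taxi_dataset
```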
Complete the implementation and fix multiple issues.

The `to_iceberg` function has several critical issues:

- Hardcoded file path instead of using the `source_url` parameter
- Print statement instead of logging
- Unused parameters: `source_url` and `progress`
- Dangling expression at line 121
- Line length issues in docstring

```diff
-def to_iceberg(source_url, target_url, progress: bool = False):
+def to_iceberg(source_url: str, target_url: str, progress: bool = False):
     """
+    Export data from CrateDB to an Iceberg table.
+
     Synopsis
     --------
     export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo
     ctk load table "iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset"
-    ctk save table "file+iceberg:var/lib/iceberg/default.db/taxi_dataset/metadata/00001-dc8e5ed2-dc29-4e39-b2e4-019e466af4c3.metadata.json"
+    ctk save table "file+iceberg:var/lib/iceberg/default.db/taxi_dataset/metadata/\
+00001-dc8e5ed2-dc29-4e39-b2e4-019e466af4c3.metadata.json"
     """
     iceberg_address = IcebergAddress.from_url(target_url)
     catalog = iceberg_address.load_catalog()
-    print("catalog:", catalog)
+    logger.info("Loading catalog: %s", catalog)

     # https://py.iceberg.apache.org/#write-a-pyarrow-dataframe
-    df = pq.read_table("tmp/yellow_tripdata_2023-01.parquet")
+    # TODO: Implement reading from source_url (CrateDB) instead of a hardcoded file.
+    df = pq.read_table(source_url)  # This should read from CrateDB

     # Create a new Iceberg table.
     catalog.create_namespace_if_not_exists("default")
     table = catalog.create_table_if_not_exists(
         "default.taxi_dataset",
         schema=df.schema,
     )

     # Append the dataframe to the table.
     table.append(df)
-    len(table.scan().to_arrow())
+    row_count = len(table.scan().to_arrow())
+    logger.info("Appended %d rows to Iceberg table", row_count)
```
Would you like me to help implement the complete functionality to read data from CrateDB using the source_url parameter?
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
def to_iceberg(source_url: str, target_url: str, progress: bool = False):
    """
    Export data from CrateDB to an Iceberg table.

    Synopsis
    --------
    export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo
    ctk load table "iceberg://./var/lib/iceberg/?catalog=default&table=taxi_dataset"
    ctk save table "file+iceberg:var/lib/iceberg/default.db/taxi_dataset/metadata/\
00001-dc8e5ed2-dc29-4e39-b2e4-019e466af4c3.metadata.json"
    """
    iceberg_address = IcebergAddress.from_url(target_url)
    catalog = iceberg_address.load_catalog()
    logger.info("Loading catalog: %s", catalog)

    # https://py.iceberg.apache.org/#write-a-pyarrow-dataframe
    # TODO: Implement reading from source_url (CrateDB) instead of hardcoded file
    df = pq.read_table(source_url)  # This should read from CrateDB

    # Create a new Iceberg table.
    catalog.create_namespace_if_not_exists("default")
    table = catalog.create_table_if_not_exists(
        "default.taxi_dataset",
        schema=df.schema,
    )

    # Append the dataframe to the table.
    table.append(df)
    row_count = len(table.scan().to_arrow())
    logger.info("Appended %d rows to Iceberg table", row_count)
```
🧰 Tools
🪛 Ruff (0.11.9)
102-102: Line too long (139 > 120)
(E501)
107-107: `print` found
Remove `print`
(T201)
🪛 Pylint (3.3.7)
[convention] 102-102: Line too long (139/100)
(C0301)
[warning] 96-96: Unused argument 'source_url'
(W0613)
[warning] 96-96: Unused argument 'progress'
(W0613)
🤖 Prompt for AI Agents
In cratedb_toolkit/io/iceberg.py from lines 96 to 121, the to_iceberg function has multiple issues: it uses a hardcoded file path instead of the source_url parameter to read data, it uses print instead of proper logging, the parameters source_url and progress are unused, there is a dangling expression at the end, and the docstring lines are too long. Fix these by replacing the hardcoded path with source_url to read the parquet file, replace print with a logger call, utilize the progress parameter if applicable or remove it if not needed, remove the dangling expression or assign its result properly, and reformat the docstring to respect line length limits. This will complete and clean up the function implementation.

```python
    print("target:", target)

    # Dispatch "load table" operation.
    cluster = DatabaseCluster.create(
        cluster_id=cluster_id,
        cluster_name=cluster_name,
        cluster_url=cluster_url,
    )
    cluster.save_table(source=source, target=target, transformation=transformation)
```
Remove debug print statement and fix comment.
The code contains a debug print statement and an incorrect comment.
```diff
     # Encapsulate source and target parameters.
     source = TableAddress(schema=schema, table=table)
     target = InputOutputResource(url=url, format=format_, compression=compression)
-    print("target:", target)

-    # Dispatch "load table" operation.
+    # Dispatch "save table" operation.
     cluster = DatabaseCluster.create(
         cluster_id=cluster_id,
         cluster_name=cluster_name,
         cluster_url=cluster_url,
     )
     cluster.save_table(source=source, target=target, transformation=transformation)
```
🧰 Tools
🪛 Ruff (0.11.9)
118-118: `print` found
Remove `print`
(T201)
🤖 Prompt for AI Agents
In cratedb_toolkit/io/cli.py around lines 118 to 126, remove the debug print statement printing "target:" and update the comment above the cluster creation to accurately describe the operation being performed instead of "Dispatch 'load table' operation." Ensure the comment clearly reflects the purpose of creating the DatabaseCluster and saving the table.

```python
@click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema where to import the data")
@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table where to import the data")
@click.option("--format", "format_", type=str, required=False, help="File format of the import resource")
@click.option("--compression", type=str, required=False, help="Compression format of the import resource")
```
Fix help text to reflect export functionality.
The help text incorrectly refers to "import" when this command is for exporting data.
```diff
-@click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema where to import the data")
-@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table where to import the data")
-@click.option("--format", "format_", type=str, required=False, help="File format of the import resource")
-@click.option("--compression", type=str, required=False, help="Compression format of the import resource")
+@click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema from which to export the data")
+@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table from which to export the data")
+@click.option("--format", "format_", type=str, required=False, help="File format of the export resource")
+@click.option("--compression", type=str, required=False, help="Compression format of the export resource")
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
@click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema from which to export the data")
@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table from which to export the data")
@click.option("--format", "format_", type=str, required=False, help="File format of the export resource")
@click.option("--compression", type=str, required=False, help="Compression format of the export resource")
```
🧰 Tools
🪛 Pylint (3.3.7)
[convention] 89-89: Line too long (116/100)
(C0301)
[convention] 90-90: Line too long (113/100)
(C0301)
[convention] 91-91: Line too long (105/100)
(C0301)
[convention] 92-92: Line too long (106/100)
(C0301)
🤖 Prompt for AI Agents
In cratedb_toolkit/io/cli.py around lines 89 to 92, the help text for the command options incorrectly mentions "import" instead of "export." Update the help strings to correctly describe that these options relate to exporting data, changing phrases like "where to import the data" to "where to export the data" and similarly adjusting other help messages to reflect export functionality.
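The envvar-backed option pattern these decorators use is easy to verify in isolation with click's test runner. The command name `save` and the echoed output below are illustrative only, not the toolkit's actual CLI:

```python
import click
from click.testing import CliRunner


@click.command()
@click.option("--schema", envvar="CRATEDB_SCHEMA", help="Schema from which to export the data")
@click.option("--table", envvar="CRATEDB_TABLE", help="Table from which to export the data")
def save(schema, table):
    # Echo which table would be exported.
    click.echo(f"export {schema}.{table}")


runner = CliRunner()
# When the options are omitted, click falls back to the environment variables.
result = runner.invoke(save, env={"CRATEDB_SCHEMA": "testdrive", "CRATEDB_TABLE": "demo"})
print(result.output)  # → export testdrive.demo
```

Because `envvar` fallback happens inside click, the help text is the only place users learn which environment variables apply, which is why the import/export wording matters.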
About
Adding support to import and export Apache Iceberg tables.
References