[data] add ClickHouse sink #50377
Conversation
)
    return False

def _generate_create_table_sql(
Can you help me understand why all these methods are local functions instead of module ones?
It's all related to that issue you called out below regarding the `for block in blocks:` loop.
if create_needed:
    create_sql = _generate_create_table_sql(
        arrow_table.schema,
        table,
        table_settings,
        create_table_template,
    )
    local_client.command(create_sql)
Let's lift this up into `on_write_started` (this should happen once upon start of the writing, rather than inside the task writing individual blocks).
This one is actually a bit tricky. I need a schema from one of the blocks in order to create the ClickHouse DDL and by extension the ClickHouse table. I couldn't seem to find a cleaner way to do that.
I added a detailed comment in the code explaining why it's there; let me know how you'd want to proceed.
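For context, here is a rough sketch (not the PR's actual code) of why the DDL generation currently sits inside the per-block write task: the Arrow schema only becomes available once a block arrives, so the table is created lazily from the first block seen. `_generate_create_table_sql` and `insert_arrow` come from the diff above; the simplified signature, the `EXISTS` probe, and the client construction via `dsn` are assumptions.

```python
import clickhouse_connect
from ray.data.block import Block, BlockAccessor


def _write_single_block(block: Block, table: str, table_settings, create_table_template, dsn: str):
    # Sketch only: derive the CREATE TABLE DDL from this block's Arrow schema,
    # since on_write_started() runs before any block (and thus any schema) exists.
    arrow_table = BlockAccessor.for_block(block).to_arrow()
    client = clickhouse_connect.get_client(dsn=dsn)

    # Several write tasks may race to create the table, so the generated DDL
    # should be idempotent (CREATE TABLE IF NOT EXISTS). The EXISTS probe is
    # assumed to return a 0/1 value here.
    if int(client.command(f"EXISTS {table}")) == 0:
        create_sql = _generate_create_table_sql(
            arrow_table.schema, table, table_settings, create_table_template
        )  # helper from the diff above
        client.command(create_sql)

    client.insert_arrow(table, arrow_table)
```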
for block in blocks:
    tasks.append(
        _write_single_block.remote(
            block,
            self._table,
            self._mode,
            self._table_settings,
            self._CREATE_TABLE_TEMPLATE,
            self._max_insert_block_rows,
            self._dsn,
            self._client_settings,
            self._client_kwargs,
        )
    )
You don't need to do that:
- This method's concurrency is already managed by Ray Data -- `write` itself is invoked per block (it just has more generic APIs).
- This will be circumventing resource management of Ray Data, which might result in dead-locks, for example.
This makes a ton of sense. I went ahead and attempted to implement this better. Let me know if it needs more work.
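A hedged sketch of what that simplification looks like, assuming the standard Ray Data `Datasink.write(blocks, ctx)` entry point; the attribute names mirror the diff above, and the exact final implementation may differ. The function below stands in for the Datasink's `write` method body.

```python
from typing import Iterable

import clickhouse_connect
from ray.data.block import Block, BlockAccessor
from ray.data._internal.execution.interfaces import TaskContext


def write(self, blocks: Iterable[Block], ctx: TaskContext) -> int:
    # Sketch: handle the blocks given to this task directly. Ray Data already
    # schedules one write() call per task with its own resource accounting,
    # so spawning nested .remote() tasks here would bypass that management.
    client = clickhouse_connect.get_client(dsn=self._dsn, **(self._client_kwargs or {}))
    total_rows = 0
    for block in blocks:
        arrow_table = BlockAccessor.for_block(block).to_arrow()
        client.insert_arrow(self._table, arrow_table)
        total_rows += arrow_table.num_rows
    return total_rows
```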
@alexeykudinkin I just read through your comments and will have them resolved by end of this week.
@alexeykudinkin Let me know what you think of these changes and how you'd want to handle the
@alexeykudinkin I went ahead and got those requested changes pushed up.
LGTM, please address the comments regarding running validations before we apply any changes and we're ready to ship this one!
if self._table_settings.order_by is not None:
    order_by = self._table_settings.order_by
else:
    order_by = _pick_best_arrow_field_for_order_by(schema)
Do we really need to force an ORDER BY clause on the table? Does it carry some perf benefits?
jecsand838 commented on Apr 18, 2025 (edited)
@alexeykudinkin Yes, I'd definitely recommend forcing the `ORDER BY` clause. `ORDER BY` clauses are required for ClickHouse table creation because they implicitly define the primary key while being crucial for query performance and storage efficiency. If it's omitted, ClickHouse raises an error.
The reason `ORDER BY` clauses are so important in ClickHouse is that they basically play a dual role of:
- Defining the sorting key (which is critical to query performance in ClickHouse tables)
- Implicitly defining the primary key (i.e. the sparse primary index)
In contrast to most databases, ClickHouse's concept of a "primary key" is not about a uniqueness constraint. Instead, the primary key in ClickHouse informs a small and completely in-memory sparse index that marks one index entry per data granule (i.e., groupings of 8192 records by default). At query time, ClickHouse then does a binary search over these marks to quickly identify which granules might contain matching rows. It's actually pretty cool how they engineered it.
That translates into massive performance benefits for:
- Range queries, since ClickHouse prunes whole granules at once if their key range doesn't overlap with the filter.
- Compression and cache hit rates, due to rows with similar values in the sorting key being physically stored in the same files, improving column-level compression.
Here's some documentation on the topic if you're curious:
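Separately from those docs, and purely for illustration, here's roughly the kind of DDL the sink ends up emitting; the table and columns below are made up, not the sink's actual output, but they show the `ORDER BY` doing double duty as the sorting key and the implicit sparse primary index.

```python
# Hypothetical example table -- not the generated output of this sink.
create_sql = """
CREATE TABLE IF NOT EXISTS events (
    event_time DateTime64(3),
    user_id    UInt64,
    payload    String
) ENGINE = MergeTree()
ORDER BY (event_time, user_id)  -- sorting key == implicit primary key
"""
```

With the default granule size of 8192 rows, a billion-row table's primary index holds roughly 1e9 / 8192 ≈ 122k marks, which is why it comfortably stays in memory.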
# The result from 'EXISTS table_name' is [[1]] if it exists,
# or [[0]] if it does not.
if result and result.result_rows:
    return result.result_rows[0][0] == 1
nit: Would suggest reducing the brittleness of this by explicitly handling the shape of the response instead of assuming it (unless it's part of the API).
@alexeykudinkin I think this is a pretty good call out. I went ahead and attempted to make this section of the code less brittle and more flexible. Let me know what you think.
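Something along these lines is one way to handle the shape explicitly rather than indexing blindly; the `_table_exists` helper and the defensive parsing below are illustrative assumptions, not necessarily the exact code that was pushed. The `query().result_rows` access matches the diff above.

```python
def _table_exists(client, table_name: str) -> bool:
    # 'EXISTS <table>' conventionally yields a single row with a single 0/1
    # cell, but we validate the shape instead of assuming it.
    result = client.query(f"EXISTS {table_name}")
    rows = getattr(result, "result_rows", None)
    if not rows or not rows[0]:
        return False  # empty or unexpected response: treat as "not found"
    try:
        return int(rows[0][0]) == 1
    except (TypeError, ValueError):
        return False
```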
if self._schema is None:
    raise ValueError(
        f"Overwriting table {self._table} requires a user-provided schema."
    )
Let's lift validation as the first op -- we shouldn't drop the table if we fail the op
@alexeykudinkin I pushed this change.
if self._schema is None:
    raise ValueError(
        f"Table {self._table} does not exist in mode='APPEND' and "
        "no schema was provided. Cannot create the table without a schema."
    )
Let's lift this check all the way to the top to avoid duplication in every branch
@alexeykudinkin I got this change in as well.
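Taken together, the two comments above amount to doing the schema validation once, up front, before anything destructive happens. A rough sketch of that ordering follows; `SinkMode`, `_table_exists`, and the attribute names are stand-ins rather than the PR's exact identifiers.

```python
def on_write_started(self) -> None:
    # Validate up front, before anything destructive happens.
    table_exists = self._table_exists()
    needs_schema = self._mode == SinkMode.OVERWRITE or (
        self._mode == SinkMode.APPEND and not table_exists
    )
    if needs_schema and self._schema is None:
        raise ValueError(
            f"mode={self._mode!r} on table {self._table} requires a "
            "user-provided schema."
        )

    # Only after validation passes do we touch the existing table.
    if self._mode == SinkMode.OVERWRITE:
        self._client.command(f"DROP TABLE IF EXISTS {self._table}")
```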
if (
    self._max_insert_block_rows
    and row_count > self._max_insert_block_rows
):
    offset = 0
    while offset < row_count:
        slice_end = min(offset + self._max_insert_block_rows, row_count)
        chunk = arrow_table.slice(offset, slice_end - offset)
        client.insert_arrow(self._table, chunk)
        total_inserted += chunk.num_rows
        offset = slice_end
else:
    client.insert_arrow(self._table, arrow_table)
    total_inserted += row_count
This chunking could be unified:
offsets = list(range(0, block.num_rows, max_chunk_size))
for idx in range(len(offsets) - 1):
    block.slice(offsets[idx], offsets[idx + 1])
@alexeykudinkin I pushed the chunking unification changes up in my latest commit.
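For reference, here is a self-contained take on the unified chunking with the edge cases handled explicitly (note that `pyarrow.Table.slice(offset, length)` takes a length, not an end index, and simply truncates past the end of the table). The helper name and signature are illustrative, not the committed code.

```python
from typing import Optional

import pyarrow as pa


def _insert_in_chunks(client, table: str, arrow_table: pa.Table, max_rows: Optional[int]) -> int:
    """Insert `arrow_table` into `table`, split into chunks of at most `max_rows` rows."""
    total = arrow_table.num_rows
    chunk_size = max_rows or total or 1  # no limit (or empty table) -> single pass
    inserted = 0
    for offset in range(0, total, chunk_size):
        chunk = arrow_table.slice(offset, chunk_size)  # truncated at the table end
        client.insert_arrow(table, chunk)
        inserted += chunk.num_rows
    return inserted
```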
@alexeykudinkin Awesome! I'll get those final comments addressed in the next day or so.
@alexeykudinkin I just pushed up changes addressing your follow-up comments. Let me know if you come across any other updates that need to be made. Also, I went ahead and thoroughly tested the current state of the ClickHouse Datasink against a live ClickHouse instance. Everything is running perfectly. I made sure to cover create, append, and overwrite modes. Here's a screenshot from those tests:
Merged commit d06c5d2 into ray-project:master
@jecsand838 thank you very much for all of your efforts and amazing contributions!
@jecsand838 Thank you for these contributions.
## Why are these changes needed?

Greetings again from ElastiFlow!

This PR introduces a fully-featured ClickHouse Datasink for Ray, enabling distributed writes from Ray Datasets into ClickHouse. The implementation aligns with Ray's modern Datasink lifecycle, incorporating mode-based table management (create, append, overwrite), automatic schema handling, parallel block insertion, and optional chunking for large inserts. Additionally, it enhances compatibility with newer Ray versions by handling WriteResult objects in `on_write_complete()`, ensuring robust write tracking. These changes are essential for supporting high-performance, scalable data ingestion into ClickHouse, making Ray more versatile for real-time analytics and ETL workflows.

### Evidence

<img width="1726" alt="Screenshot 2025-02-10 at 3 26 59 AM" src="https://github.com/user-attachments/assets/8bfdce09-0bc5-416b-bf48-fba6aabf7da3"/>

## Related issue number

Follow-up on ray-project#49526
@alexeykudinkin

## Checks

- [x] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [x] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [x] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [x] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
  - [x] Unit tests
  - [ ] Release tests
  - [ ] This PR is not tested :(

Signed-off-by: Connor Sanders <connor@elastiflow.com>
Co-authored-by: Alexey Kudinkin <alexey.kudinkin@gmail.com>