[data] add ClickHouse sink #50377


Merged

bveeramani merged 40 commits into ray-project:master from jecsand838:clickhouse_datasink on Apr 18, 2025

Conversation

jecsand838 (Contributor):

Why are these changes needed?

Greetings again from ElastiFlow!

This PR introduces a fully-featured ClickHouse Datasink for Ray, enabling distributed writes from Ray Datasets into ClickHouse. The implementation aligns with Ray's modern Datasink lifecycle, incorporating mode-based table management (create, append, overwrite), automatic schema handling, parallel block insertion, and optional chunking for large inserts. Additionally, it enhances compatibility with newer Ray versions by handling WriteResult objects in on_write_complete(), ensuring robust write tracking. These changes are essential for supporting high-performance, scalable data ingestion into ClickHouse, making Ray more versatile for real-time analytics and ETL workflows.
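For context, here's a minimal usage sketch of what writing a Dataset through the sink could look like. The write_clickhouse entry point, DSN format, and parameter names are assumptions inferred from this description rather than a confirmed API surface:

    import ray

    ds = ray.data.read_parquet("s3://my-bucket/events/")  # any Ray Dataset

    # Hypothetical entry point and parameters, inferred from the PR description.
    ds.write_clickhouse(
        table="default.events",
        dsn="http://user:password@localhost:8123/default",
        mode="append",                  # mode-based table management: create/append/overwrite
        max_insert_block_rows=500_000,  # optional chunking for large inserts
    )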

Evidence

[Screenshot 2025-02-10 at 3 26 59 AM]

Related issue number

Follow-up on #49526
@alexeykudinkin

Checks

  • [x] I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • [x] I've run scripts/format.sh to lint the changes in this PR.
  • [x] I've included any doc changes needed for https://docs.ray.io/en/master/.
    • [x] I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • [x] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • [x] Unit tests
    • [ ] Release tests
    • [ ] This PR is not tested :(

jecsand838 requested a review from a team as a code owner on February 10, 2025 09:52.

jecsand838 changed the title from "[data] add Clickouse sink" to "[data] add ClickHouse sink" on Feb 10, 2025.

jcotant1 added the data (Ray Data-related issues) label on Feb 11, 2025.

alexeykudinkin added the go (add ONLY when ready to merge, run all tests) label on Feb 12, 2025.
    )
    return False

    def _generate_create_table_sql(
alexeykudinkin (Contributor):

Can you help me understand why all these methods are local functions instead of module ones?

jecsand838 (Contributor, Author):

It's all related to that issue you called out below regarding the `for block in blocks:` loop.

Comment on lines 318 to 325:

    if create_needed:
        create_sql = _generate_create_table_sql(
            arrow_table.schema,
            table,
            table_settings,
            create_table_template,
        )
        local_client.command(create_sql)
alexeykudinkin (Contributor):

Let's lift this up into on_write_started (this should happen once upon start of the writing, rather than inside the task writing individual blocks).

jecsand838 (Contributor, Author):

This one is actually a bit tricky. I need a schema from one of the blocks in order to create the ClickHouse DDL and, by extension, the ClickHouse table. I couldn't find a cleaner way to do that.

I added a detailed comment in the code explaining why it's there; let me know how you'd like to proceed.

Comment on lines 347 to 360:

    for block in blocks:
        tasks.append(
            _write_single_block.remote(
                block,
                self._table,
                self._mode,
                self._table_settings,
                self._CREATE_TABLE_TEMPLATE,
                self._max_insert_block_rows,
                self._dsn,
                self._client_settings,
                self._client_kwargs,
            )
        )
alexeykudinkin (Contributor):

You don't need to do that:

  • This method's concurrency is already managed by Ray Data; write itself is invoked per block (it just has more generic APIs).
  • This would circumvent Ray Data's resource management, which might result in deadlocks, for example.

jecsand838 (Contributor, Author):

This makes a ton of sense. I went ahead and attempted to implement this better. Let me know if it needs more work.
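Putting the two review threads above together (lazy table creation from the first block's schema, and no manual task fan-out), a minimal sketch of the revised write path could look like this. The _init_client helper and the flag handling are illustrative assumptions; _generate_create_table_sql and the attribute names come from the diff context, and this is not the merged implementation:

    from ray.data.block import BlockAccessor


    class ClickHouseDatasink:  # sketch only; the real sink subclasses Ray's Datasink
        def write(self, blocks, ctx):
            # Ray Data already invokes `write` per write task and manages its
            # concurrency and resources, so no manual `.remote()` fan-out is needed.
            client = self._init_client()  # assumed helper wrapping the ClickHouse client
            total_inserted = 0
            for block in blocks:
                arrow_table = BlockAccessor.for_block(block).to_arrow()
                # Lazy table creation: the CREATE TABLE DDL needs a concrete
                # schema, which is only known once the first block arrives.
                if self._needs_create:
                    create_sql = _generate_create_table_sql(
                        arrow_table.schema,
                        self._table,
                        self._table_settings,
                        self._CREATE_TABLE_TEMPLATE,
                    )
                    client.command(create_sql)
                    self._needs_create = False
                client.insert_arrow(self._table, arrow_table)
                total_inserted += arrow_table.num_rows
            return total_inserted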

jecsand838 (Contributor, Author):

@alexeykudinkin I just read through your comments and will have them resolved by end of this week.

jecsand838 (Contributor, Author):

@alexeykudinkin Let me know what you think of these changes and how you'd want to handle the create_table logic.

jecsand838 (Contributor, Author):

@alexeykudinkin I went ahead and got those requested changes pushed up.


alexeykudinkin (Contributor) left a review:

LGTM, please address the comments regarding running validations before we apply any changes and we're ready to ship this one!

    if self._table_settings.order_by is not None:
        order_by = self._table_settings.order_by
    else:
        order_by = _pick_best_arrow_field_for_order_by(schema)
alexeykudinkin (Contributor):

Do we really need to force an ORDER BY clause on the table? Does it carry some perf benefits?

jecsand838 (Contributor, Author) commented on Apr 18, 2025 (edited):

@alexeykudinkin Yes, I'd definitely recommend forcing the ORDER BY clause. ORDER BY clauses are required for ClickHouse table creation because they implicitly define the primary key while being crucial for query performance and storage efficiency. If omitted, ClickHouse raises an error.

The reason that ORDER BY clauses are so important in ClickHouse is that they play a dual role:

  1. Defining the sorting key (which is critical to query performance in ClickHouse tables)
  2. Implicitly defining the primary key (i.e., the sparse primary index)

In contrast to most databases, ClickHouse's concept of a "primary key" is not about a uniqueness constraint. Instead, the primary key in ClickHouse informs a small and completely in-memory sparse index that marks one index entry per data granule (i.e., groupings of 8192 records by default). At query time, ClickHouse then does a binary search over these marks to quickly identify which granules might contain matching rows. It's actually pretty cool how they engineered it.

That translates into massive performance benefits for:

  1. Range queries, since ClickHouse prunes whole granules at once if their key range doesn't overlap the filter.
  2. Compression and cache hit rates, since rows with similar sorting-key values are physically stored in the same files, improving column-level compression.

There's some good documentation on the topic if you're curious.
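To make that dual role concrete, here's an illustrative DDL sketch. The table and columns are made up, and the clickhouse-connect client is assumed, since the diff excerpts in this thread use its command/insert_arrow/result_rows APIs:

    import clickhouse_connect

    client = clickhouse_connect.get_client(host="localhost")
    client.command(
        """
        CREATE TABLE IF NOT EXISTS default.events (
            ts   DateTime,
            user String,
            val  Float64
        )
        ENGINE = MergeTree()
        ORDER BY (ts, user)  -- sorting key, and implicitly the sparse primary index
        """
    )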

    # The result from 'EXISTS table_name' is [[1]] if it exists,
    # or [[0]] if it does not.
    if result and result.result_rows:
        return result.result_rows[0][0] == 1
alexeykudinkin (Contributor):

nit: I'd suggest reducing the brittleness here by explicitly handling the shape of the response instead of assuming it (unless it's part of the API).

jecsand838 (Contributor, Author):

@alexeykudinkin I think this is a pretty good call out. I went ahead and attempted to make this section of the code less brittle and more flexible. Let me know what you think.
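For illustration, one way to handle the response shape explicitly looks roughly like this (a sketch, not the merged code; clickhouse-connect's query()/result_rows API is assumed):

    def _table_exists(client, table: str) -> bool:
        result = client.query(f"EXISTS TABLE {table}")
        rows = getattr(result, "result_rows", None)
        # Fail loudly on an unexpected shape instead of silently indexing into it.
        if not rows or not rows[0]:
            raise RuntimeError(
                f"Unexpected response shape from EXISTS TABLE {table!r}: {rows!r}"
            )
        return bool(rows[0][0])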

Comment on lines 301 to 304:

    if self._schema is None:
        raise ValueError(
            f"Overwriting table {self._table} requires a user-provided schema."
        )
alexeykudinkin (Contributor):

Let's lift validation to be the first op; we shouldn't drop the table if the op then fails.

jecsand838 (Contributor, Author):

@alexeykudinkin I pushed this change.
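The fix amounts to running the schema validation before any destructive DDL; a minimal sketch under the same assumed helper names as above:

    def _prepare_overwrite(self, client) -> None:
        # Validate first: a missing schema must never drop an existing table.
        if self._schema is None:
            raise ValueError(
                f"Overwriting table {self._table} requires a user-provided schema."
            )
        client.command(f"DROP TABLE IF EXISTS {self._table}")
        client.command(
            _generate_create_table_sql(
                self._schema,
                self._table,
                self._table_settings,
                self._CREATE_TABLE_TEMPLATE,
            )
        )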

Comment on lines 348 to 352:

    if self._schema is None:
        raise ValueError(
            f"Table {self._table} does not exist in mode='APPEND' and "
            "no schema was provided. Cannot create the table without a schema."
        )
alexeykudinkin (Contributor):

Let's lift this check all the way to the top to avoid duplication in every branch

jecsand838 (Contributor, Author):

@alexeykudinkin I got this change in as well.

Comment on lines 381 to 394:

    if (
        self._max_insert_block_rows
        and row_count > self._max_insert_block_rows
    ):
        offset = 0
        while offset < row_count:
            slice_end = min(offset + self._max_insert_block_rows, row_count)
            chunk = arrow_table.slice(offset, slice_end - offset)
            client.insert_arrow(self._table, chunk)
            total_inserted += chunk.num_rows
            offset = slice_end
    else:
        client.insert_arrow(self._table, arrow_table)
        total_inserted += row_count
alexeykudinkin (Contributor):

This chunking could be unified:

    offsets = list(range(0, block.num_rows, max_chunk_size))
    for idx in range(len(offsets) - 1):
        block.slice(offsets[idx], offsets[idx + 1])

jecsand838 (Contributor, Author):

@alexeykudinkin I pushed the chunking unification changes up in my latest commit.
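For reference, the suggestion above is pseudocode: as written it would drop the final partial chunk, and PyArrow's Table.slice(offset, length) takes a length (clamped to the remaining rows) rather than an end index. A corrected sketch of the unified chunking:

    def _iter_insert_chunks(arrow_table, max_chunk_rows):
        """Yield arrow_table in chunks of at most max_chunk_rows rows."""
        if not max_chunk_rows:
            yield arrow_table
            return
        for offset in range(0, arrow_table.num_rows, max_chunk_rows):
            # slice() clamps the length to however many rows remain.
            yield arrow_table.slice(offset, max_chunk_rows)

    # Single insert path regardless of block size:
    # for chunk in _iter_insert_chunks(arrow_table, self._max_insert_block_rows):
    #     client.insert_arrow(self._table, chunk)
    #     total_inserted += chunk.num_rows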

jecsand838 (Contributor, Author):

@alexeykudinkin Awesome! I'll get those final comments addressed in the next day or so.

jecsand838 and others added 5 commits on April 17, 2025.
jecsand838 (Contributor, Author):

@alexeykudinkin I just pushed up changes addressing your follow-up comments. Let me know if you come across any other updates that need to be made.

Also I went ahead and thoroughly tested the current state of the ClickHouse Datasink against a live ClickHouse instance. Everything is running perfectly. I made sure to cover create, append, and overwrite modes.

Here's a screenshot from those tests:

[Screenshot 2025-04-18 at 12 55 02 AM]

bveeramani merged commit d06c5d2 into ray-project:master on Apr 18, 2025.

5 checks passed
alexeykudinkin (Contributor):

@jecsand838 thank you very much for all of your efforts and amazing contributions!


gvspraveen (Contributor):

@jecsand838 Thank you for these contributions.


ktyxx pushed a commit to ktyxx/ray that referenced this pull request on Apr 29, 2025.

vickytsang pushed a commit to ROCm/ray that referenced this pull request on May 5, 2025.

jecsand838 deleted the clickhouse_datasink branch on May 5, 2025 21:39.

zhaoch23 pushed a commit to Bye-legumes/ray that referenced this pull request on May 14, 2025.
Reviewers

alexeykudinkin (approved these changes)

Assignees

alexeykudinkin

Labels

community-backlog, community-contribution (Contributed by the community), data (Ray Data-related issues), go (add ONLY when ready to merge, run all tests)

Projects

None yet

Milestone

No milestone

6 participants

@jecsand838 @alexeykudinkin @gvspraveen @bveeramani @hainesmichaelc @jcotant1
