
[data] add ClickHouse sink #50377

Open

jecsand838 wants to merge 33 commits into ray-project:master from jecsand838:clickhouse_datasink

Conversation

jecsand838 (Contributor):

Why are these changes needed?

Greetings again from ElastiFlow!

This PR introduces a fully featured ClickHouse Datasink for Ray, enabling distributed writes from Ray Datasets into ClickHouse. The implementation aligns with Ray's modern Datasink lifecycle, incorporating mode-based table management (create, append, overwrite), automatic schema handling, parallel block insertion, and optional chunking for large inserts. Additionally, it improves compatibility with newer Ray versions by handling WriteResult objects in on_write_complete(), ensuring robust write tracking. These changes support high-performance, scalable data ingestion into ClickHouse, making Ray more versatile for real-time analytics and ETL workflows.

Evidence

[Screenshot: 2025-02-10 at 3:26:59 AM]

Related issue number

Follow-up on #49526
@alexeykudinkin

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

jecsand838 requested a review from a team as a code owner, February 10, 2025 09:52
jecsand838 changed the title from [data] add Clickouse sink to [data] add ClickHouse sink, Feb 10, 2025
jcotant1 added the data (Ray Data-related issues) label, Feb 11, 2025
alexeykudinkin added the go (add ONLY when ready to merge, run all tests) label, Feb 12, 2025
    )
    return False

def _generate_create_table_sql(
Contributor:

Can you help me understand why all these methods are local functions instead of module ones?

Contributor (Author):

It's all related to the issue you called out below regarding the `for block in blocks:` loop.

Comment on lines 318 to 325
if create_needed:
    create_sql = _generate_create_table_sql(
        arrow_table.schema,
        table,
        table_settings,
        create_table_template,
    )
    local_client.command(create_sql)
Contributor:

Let's lift this up into on_write_started (this should happen once upon start of the write, rather than inside the task writing individual blocks).

Contributor (Author):

This one is actually a bit tricky. I need a schema from one of the blocks in order to create the ClickHouse DDL and by extension the ClickHouse table. I couldn't seem to find a cleaner way to do that.

I added a detailed comment in the code explaining why it's there; let me know how you'd like to proceed.

Comment on lines 347 to 360
for block in blocks:
    tasks.append(
        _write_single_block.remote(
            block,
            self._table,
            self._mode,
            self._table_settings,
            self._CREATE_TABLE_TEMPLATE,
            self._max_insert_block_rows,
            self._dsn,
            self._client_settings,
            self._client_kwargs,
        )
    )
Contributor:

You don't need to do that:

  • This method's concurrency is already managed by Ray Data -- write itself is invoked per block (it just has a more generic API)
  • This would circumvent Ray Data's resource management, which might result in deadlocks, for example

Contributor (Author):

This makes a ton of sense. I went ahead and attempted to implement this better. Let me know if it needs more work.

jecsand838 (Contributor, Author):

@alexeykudinkin I just read through your comments and will have them resolved by end of this week.

jecsand838 (Contributor, Author):

@alexeykudinkin Let me know what you think of these changes and how you'd want to handle the create_table logic.

alexeykudinkin (Contributor) left a comment:

Mostly LGTM, just need to resolve the race condition when creating the table.

Comment on lines 93 to 94
precision = field.type.precision or 38
scale = field.type.scale or 10
Contributor:

Please extract to constants


    self,
    schema: pyarrow.Schema,
) -> str:
    engine = self._table_settings.get("engine", "MergeTree()")
Contributor:

Can you please create a dataclass instead with all the opts documented?

Contributor (Author):

@alexeykudinkin Added a ClickHouseTableSettings dataclass.
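A sketch of what such a dataclass might look like. The `engine` and `order_by` fields are grounded in this thread; the remaining fields are assumptions about plausible ClickHouse DDL options, and the merged ClickHouseTableSettings may differ.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ClickHouseTableSettings:
    """Table-creation options for the ClickHouse sink (illustrative).

    engine: ClickHouse table engine clause, e.g. "MergeTree()".
    order_by: expression for the ORDER BY clause; if None and the table
        already exists, the existing ORDER BY may be reused.
    partition_by: optional PARTITION BY expression (assumed field).
    primary_key: optional PRIMARY KEY expression (assumed field).
    settings: optional raw SETTINGS clause appended to the DDL (assumed field).
    """

    engine: str = "MergeTree()"
    order_by: Optional[str] = None
    partition_by: Optional[str] = None
    primary_key: Optional[str] = None
    settings: Optional[str] = None
```

Compared with a plain dict, the dataclass documents every option in one place and gives typos on option names an immediate failure instead of a silent default.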

Comment on lines 37 to 70
def _arrow_type_name(arrow_type: pyarrow.DataType) -> str:
    if pat.is_boolean(arrow_type):
        return "bool"
    elif pat.is_int8(arrow_type):
        return "int8"
    elif pat.is_int16(arrow_type):
        return "int16"
    elif pat.is_int32(arrow_type):
        return "int32"
    elif pat.is_int64(arrow_type):
        return "int64"
    elif pat.is_uint8(arrow_type):
        return "uint8"
    elif pat.is_uint16(arrow_type):
        return "uint16"
    elif pat.is_uint32(arrow_type):
        return "uint32"
    elif pat.is_uint64(arrow_type):
        return "uint64"
    elif pat.is_float16(arrow_type):
        return "float16"
    elif pat.is_float32(arrow_type):
        return "float32"
    elif pat.is_float64(arrow_type):
        return "float64"
    elif pat.is_decimal(arrow_type):
        return "decimal"
    elif pat.is_string(arrow_type) or pat.is_large_string(arrow_type):
        return "string"
    elif pat.is_binary(arrow_type) or pat.is_large_binary(arrow_type):
        return "string"
    elif pat.is_timestamp(arrow_type):
        return "timestamp"
    else:
Contributor:

You don't need this; you can just do str(pa_type).

Contributor (Author):

@alexeykudinkin Made this change.

self._table_dropped = True
# If the old table existed and no explicit "order_by" is set,
# adopt the existing ORDER BY from the old table DDL.
if table_exists and self._table_settings.get("order_by") is None:
Contributor:

Shouldn't we do this before the drop?

Contributor (Author):

@alexeykudinkin Good callout. I updated this.

if table_exists and self._table_settings.get("order_by") is None:
    existing_order_by = self._get_existing_order_by(client)
    if existing_order_by:
        self._table_settings["order_by"] = existing_order_by
Contributor:

I'd suggest reporting an error if these two diverge, rather than just silently overwriting it.

Contributor (Author):

@alexeykudinkin Added in that error.

        f"Reusing existing ORDER BY for table {self._table}: {existing_order_by}"
    )
except Exception as e:
    logger.warning(
Contributor:

nit: Should be an error since we raise

Contributor (Author):

@alexeykudinkin I updated this to log an error.

def on_write_start(self) -> None:
    client = None
    try:
        client = self._init_client()
Contributor:

Why don't we cache the client?

Contributor (Author):

Let me try seeing what the impact is when using AsyncClient. I know from my experience working with ClickHouse in other languages such as Go that the ClickHouse client generally isn't thread-safe.

i.e. ClickHouse/clickhouse-connect#141

Contributor:

It's not going to be used in a multi-threaded env -- so long as the client is pickle-serializable (which it very well might not be), it will work out of the box if we just persist it.

This is a nit though, so don't spend too much time on it

Contributor (Author):

@alexeykudinkin This is the only change I didn't get in. Any issues if I get this done in a future PR?

client_settings: Optional[Dict[str, Any]] = None,
client_kwargs: Optional[Dict[str, Any]] = None,
table_settings: Optional[Dict[str, Any]] = None,
max_insert_block_rows: Optional[int] = None,
Contributor:

(Doesn't need to be solved in this PR)

We should allow users to specify the write schema (to be able to override, for example, the auto-deduced one).

Contributor (Author):

@alexeykudinkin I implemented the user defined schema functionality.

Comment on lines 338 to 346
# For now, we're creating the table here because
# the arrow schema from a block is required to generate the
# ClickHouse DDL. The first worker that calls this
# will create the table. Subsequent workers skip it.
if not self._table_exists(client):
    create_sql = self._generate_create_table_sql(
        arrow_table.schema,
    )
    client.command(create_sql)
Contributor:

The problem with this approach, though, is that there will be a race condition between the write tasks trying to create the table here.

Contributor:

We might actually have to require the user to specify the schema (inside the write_clickhouse API).

Contributor (Author):

That's a good callout.

In a future PR, do you think it would be valuable / possible to extend on_write_start to include a sample block?

Contributor:

We're definitely re-evaluating schema handling a bit more holistically right now.

Contributor (Author):

@alexeykudinkin I resolved this by removing the logic that infers the schema from a block. Now it will rely on a user-defined schema or an existing ClickHouse table definition.

jecsand838 (Contributor, Author):

@alexeykudinkin I've read through your comments. I'll be able to address them around end of week / over the weekend.


alexeykudinkin (Contributor):

@jecsand838 Please tag me in a comment so that it pops to the top of my inbox and I can prioritize landing it.


hainesmichaelc added the community-contribution (Contributed by the community) label, Apr 4, 2025
jecsand838 and others added 5 commits, April 5, 2025 18:19
jecsand838 (Contributor, Author):

@alexeykudinkin I went ahead and got those requested changes pushed up.

Reviewers

alexeykudinkin — awaiting requested review

At least 1 approving review is required to merge this pull request.

Assignees

alexeykudinkin

Labels
community-contribution (Contributed by the community), data (Ray Data-related issues), go (add ONLY when ready to merge, run all tests)
Projects
None yet
Milestone
No milestone

4 participants
@jecsand838, @alexeykudinkin, @hainesmichaelc, @jcotant1
