[data] add ClickHouse sink #50377
base: master
Conversation
```python
    )
    return False


def _generate_create_table_sql(
```
Can you help me understand why all these methods are local functions instead of module ones?
It's all related to that issue you called out below regarding the `for block in blocks:` loop.
```python
if create_needed:
    create_sql = _generate_create_table_sql(
        arrow_table.schema,
        table,
        table_settings,
        create_table_template,
    )
    local_client.command(create_sql)
```
Let's lift this up into `on_write_start` (this should happen once upon start of the write, rather than inside the task writing individual blocks).
This one is actually a bit tricky. I need a schema from one of the blocks in order to create the ClickHouse DDL and, by extension, the ClickHouse table, and I couldn't find a cleaner way to get one.
I added a detailed comment in the code explaining why it's there; let me know how you'd want to proceed.
```python
for block in blocks:
    tasks.append(
        _write_single_block.remote(
            block,
            self._table,
            self._mode,
            self._table_settings,
            self._CREATE_TABLE_TEMPLATE,
            self._max_insert_block_rows,
            self._dsn,
            self._client_settings,
            self._client_kwargs,
        )
    )
```
You don't need to do that:

- This method's concurrency is already managed by Ray Data -- `write` itself is invoked per block (it just has more generic APIs).
- This will be circumventing resource management of Ray Data, which might result in dead-locks, for example.
This makes a ton of sense. I went ahead and attempted to implement this better. Let me know if it needs more work.
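For illustration, a minimal sketch of the shape being described, assuming a `Datasink` subclass and the helper names (`_init_client`, `_insert_table`) from the diffs in this thread. Since `write` already runs once per Ray Data write task, each block can be inserted directly:

```python
from typing import Any, Iterable

from ray.data import Datasink
from ray.data.block import Block, BlockAccessor


class ClickHouseDatasink(Datasink):
    def write(self, blocks: Iterable[Block], ctx) -> Any:
        # Ray Data schedules one call per write task, so each block can be
        # inserted directly; no extra .remote() fan-out is required.
        client = self._init_client()  # assumed helper from the diffs above
        try:
            for block in blocks:
                arrow_table = BlockAccessor.for_block(block).to_arrow()
                self._insert_table(client, arrow_table)  # assumed helper
        finally:
            client.close()
        return "ok"
```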
@alexeykudinkin I just read through your comments and will have them resolved by the end of this week.
@alexeykudinkin Let me know what you think of these changes and how you'd want to handle the …
Mostly LGTM, just need to resolve the race condition when creating the table.
```python
precision = field.type.precision or 38
scale = field.type.scale or 10
```
Please extract to constants
@alexeykudinkin Done!
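For reference, a minimal sketch of what the extraction could look like (the constant names here are illustrative, not necessarily the ones used in the PR):

```python
# Defaults applied when an Arrow decimal field doesn't carry explicit values.
DEFAULT_DECIMAL_PRECISION = 38
DEFAULT_DECIMAL_SCALE = 10

precision = field.type.precision or DEFAULT_DECIMAL_PRECISION
scale = field.type.scale or DEFAULT_DECIMAL_SCALE
```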
```python
    self,
    schema: pyarrow.Schema,
) -> str:
    engine = self._table_settings.get("engine", "MergeTree()")
```
Can you please create a dataclass instead with all the opts documented?
@alexeykudinkin Added a `ClickHouseTableSettings` dataclass.
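A rough sketch of such a dataclass, keeping the `MergeTree()` default from the diff above; the other fields and defaults are assumptions about common ClickHouse DDL options rather than the exact set in the PR:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ClickHouseTableSettings:
    """Settings applied when the sink creates a ClickHouse table."""

    engine: str = "MergeTree()"         # table engine clause
    order_by: Optional[str] = None      # ORDER BY expression
    partition_by: Optional[str] = None  # optional PARTITION BY expression
    primary_key: Optional[str] = None   # optional PRIMARY KEY expression
    settings: Optional[str] = None      # optional trailing SETTINGS clause
```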
```python
def _arrow_type_name(arrow_type: pyarrow.DataType) -> str:
    if pat.is_boolean(arrow_type):
        return "bool"
    elif pat.is_int8(arrow_type):
        return "int8"
    elif pat.is_int16(arrow_type):
        return "int16"
    elif pat.is_int32(arrow_type):
        return "int32"
    elif pat.is_int64(arrow_type):
        return "int64"
    elif pat.is_uint8(arrow_type):
        return "uint8"
    elif pat.is_uint16(arrow_type):
        return "uint16"
    elif pat.is_uint32(arrow_type):
        return "uint32"
    elif pat.is_uint64(arrow_type):
        return "uint64"
    elif pat.is_float16(arrow_type):
        return "float16"
    elif pat.is_float32(arrow_type):
        return "float32"
    elif pat.is_float64(arrow_type):
        return "float64"
    elif pat.is_decimal(arrow_type):
        return "decimal"
    elif pat.is_string(arrow_type) or pat.is_large_string(arrow_type):
        return "string"
    elif pat.is_binary(arrow_type) or pat.is_large_binary(arrow_type):
        return "string"
    elif pat.is_timestamp(arrow_type):
        return "timestamp"
    else:
```
You don't need this; you can just do `str(pa_type)`.
@alexeykudinkin Made this change.
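For context on the suggestion, pyarrow types stringify directly, though a couple of Arrow's own names differ from the ones the manual mapping produced:

```python
import pyarrow as pa

print(str(pa.int64()))    # int64
print(str(pa.bool_()))    # bool
print(str(pa.string()))   # string
print(str(pa.float64()))  # double (Arrow's name, vs. "float64" in the old mapping)
```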
```python
self._table_dropped = True
# If the old table existed and no explicit "order_by" is set,
# adopt the existing ORDER BY from the old table DDL.
if table_exists and self._table_settings.get("order_by") is None:
```
Shouldn't we do this before the drop?
@alexeykudinkin Good callout. I updated this.
```python
if table_exists and self._table_settings.get("order_by") is None:
    existing_order_by = self._get_existing_order_by(client)
    if existing_order_by:
        self._table_settings["order_by"] = existing_order_by
```
I'd suggest reporting an error if these two diverge, rather than just silently overwriting it.
@alexeykudinkin Added in that error.
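A minimal sketch of that divergence check, assuming the helper names from the diffs in this thread; the exact condition and message in the PR may differ:

```python
user_order_by = self._table_settings.get("order_by")
existing_order_by = self._get_existing_order_by(client)
if (
    user_order_by is not None
    and existing_order_by is not None
    and user_order_by != existing_order_by
):
    # Fail loudly instead of silently preferring one ORDER BY over the other.
    raise ValueError(
        f"order_by {user_order_by!r} conflicts with the existing table's "
        f"ORDER BY {existing_order_by!r} for {self._table}"
    )
```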
f"Reusing existing ORDER BY for table {self._table}: {existing_order_by}" | ||
) | ||
except Exception as e: | ||
logger.warning( |
nit: Should be an error since we raise
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
@alexeykudinkin I updated this to log an error.
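i.e., roughly this shape (a sketch, not the exact code in the PR):

```python
try:
    existing_order_by = self._get_existing_order_by(client)
except Exception as e:
    # Error level is appropriate because the exception is re-raised.
    logger.error(
        f"Failed to determine existing ORDER BY for table {self._table}: {e}"
    )
    raise
```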
```python
def on_write_start(self) -> None:
    client = None
    try:
        client = self._init_client()
```
Why don't we cache the client?
Let me try seeing what the impact is when using `AsyncClient`. I know from my experience working with ClickHouse in other languages such as Go that the ClickHouse client generally isn't multi-thread safe.
It's not going to be used in a multi-threaded env -- so long as the client is pickle-serializable (which it very well might not be), it will work out of the box if we just persist it.
This is a nit though, so don't spend too much time on it.
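If it does get tackled later, one possible shape is a lazily created, pickle-safe cache. This is only a sketch, and whether the clickhouse-connect client survives pickling is exactly the open question above:

```python
import clickhouse_connect


class _CachedClientMixin:
    """Lazily create and cache the ClickHouse client, dropping it on pickle."""

    def _get_client(self):
        if getattr(self, "_client", None) is None:
            # DSN-based construction is one option; explicit kwargs work too.
            self._client = clickhouse_connect.get_client(dsn=self._dsn)
        return self._client

    def __getstate__(self):
        state = self.__dict__.copy()
        state["_client"] = None  # never ship a live connection between workers
        return state
```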
@alexeykudinkin This is the only change I didn't get in. Any issues if I get this done in a future PR?
```python
client_settings: Optional[Dict[str, Any]] = None,
client_kwargs: Optional[Dict[str, Any]] = None,
table_settings: Optional[Dict[str, Any]] = None,
max_insert_block_rows: Optional[int] = None,
```
(Doesn't need to be solved in this PR.)
We should allow users to specify the write schema (to be able to override, for example, an auto-deduced one).
@alexeykudinkin I implemented the user-defined schema functionality.
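Hypothetical usage of that functionality; the parameter names follow this discussion and may not match the final `write_clickhouse` API exactly:

```python
import pyarrow as pa
import ray

# An explicit schema overrides anything deduced from the written blocks.
schema = pa.schema(
    [
        ("event_id", pa.int64()),
        ("label", pa.string()),
    ]
)

ds = ray.data.from_items([{"event_id": 1, "label": "a"}])
ds.write_clickhouse(
    table="default.events",                               # illustrative table
    dsn="clickhouse://user:pass@localhost:8123/default",  # illustrative DSN
    schema=schema,
)
```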
```python
# For now, we're creating the table here because
# the arrow schema from a block is required to generate the
# ClickHouse DDL. The first worker that calls this
# will create the table. Subsequent workers skip it.
if not self._table_exists(client):
    create_sql = self._generate_create_table_sql(
        arrow_table.schema,
    )
    client.command(create_sql)
```
The problem with this approach, though, is that there will be a race condition between the write tasks trying to create the table here.
We might actually have to require the user to specify the schema (inside the `write_clickhouse` API).
That's a good callout.
In a future PR, do you think it would be valuable / possible to extend `on_write_start` to include a sample block?
We're definitely re-evaluating schema handling a bit more holistically right now.
@alexeykudinkin I resolved this by removing the logic that infers the schema from a block. Now it relies on a user-defined schema or an existing ClickHouse table definition.
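A condensed sketch of that resolution, with names assumed from the diffs above: the table is created once in `on_write_start`, from either the existing table definition or a user-supplied schema, so the per-task race disappears:

```python
def on_write_start(self) -> None:
    client = self._init_client()
    try:
        if self._table_exists(client):
            # Reuse the existing table definition (including its ORDER BY).
            return
        if self._schema is None:
            raise ValueError(
                "A schema is required to create a new ClickHouse table; "
                "pass one explicitly or create the table beforehand."
            )
        # Single, up-front CREATE TABLE: no per-block race condition.
        client.command(self._generate_create_table_sql(self._schema))
    finally:
        client.close()
```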
@alexeykudinkin I've read through your comments. I'll be able to address them around end of week / over the weekend.
@jecsand838 please tag me in a comment so that it pops to the top of my inbox and I can prioritize landing it.
@alexeykudinkin I went ahead and got those requested changes pushed up.
Why are these changes needed?
Greetings again from ElastiFlow!
This PR introduces a fully featured ClickHouse Datasink for Ray, enabling distributed writes from Ray Datasets into ClickHouse. The implementation aligns with Ray's modern Datasink lifecycle, incorporating mode-based table management (create, append, overwrite), automatic schema handling, parallel block insertion, and optional chunking for large inserts. It also improves compatibility with newer Ray versions by handling WriteResult objects in `on_write_complete()`, ensuring robust write tracking. These changes support high-performance, scalable data ingestion into ClickHouse, making Ray more versatile for real-time analytics and ETL workflows.

Evidence
Related issue number
Follow-up on #49526
@alexeykudinkin
Checks
- I've signed off every commit (`git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.
- If I've added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.