Dissociated IPC Protocol#

Warning

Experimental: The Dissociated IPC Protocol is experimental in its current form. Based on feedback and usage, the protocol definition may change until it is fully standardized.

Rationale#

The Arrow IPC format describes a protocol for transferring Arrow data as a stream of record batches. This protocol expects a continuous stream of bytes divided into discrete messages (using a length prefix and continuation indicator). Each discrete message consists of two portions:

  • A Flatbuffers header message

  • A series of bytes consisting of the flattened and packed body buffers (some message types, like Schema messages, do not have this section). This is referred to as the message body in the IPC format spec.

For most cases, the existing IPC format is sufficiently efficient:

  • Receiving data in the IPC format allows zero-copy use of the body buffer bytes; no deserialization is required to form Arrow Arrays.

  • An IPC file can be memory-mapped because the format is location agnostic and the bytes of the file are exactly what is expected in memory.

However, there are use cases that the existing format does not handle well:

  • Constructing the IPC record batch message requires allocating a contiguous chunk of bytes and copying all of the data buffers into it, packed together back-to-back. This pessimizes the common case of wrapping existing, directly consumable data into an IPC message.

  • Even if Arrow data is located in memory accessible across process boundaries or transports (such as UCX), there is no standard way to specify that shared location to consumers which could take advantage of it.

  • Arrow data located on a non-CPU device (such as a GPU) cannot be sent using Arrow IPC without either copying the data back to the host or copying the Flatbuffers metadata bytes into device memory.

    • By the same token, receiving IPC messages into device memory would require copying the Flatbuffers metadata back to the host CPU. This is because the IPC format interleaves data and metadata in a single stream.

This protocol attempts to solve these use cases in an efficient manner.

Goals#

  • Define a generic protocol for passing Arrow IPC data, not tied to any particular transport, that also allows for utilizing non-CPU device memory, shared memory, and newer “high performance” transports such as UCX or libfabric.

    • This allows for the data in the body to be kept on non-CPU devices (like GPUs) without expensive device-to-host copies.

  • Allow for using Flight RPC purely for control flow by separating the stream of IPC metadata from IPC body bytes.

Definitions#

IPC Metadata#

The Flatbuffers message bytes that encompass the header of an Arrow IPC message.

Tag#

A little-endian uint64 value used for flow control and used in determining how to interpret the body of a message. Specific bits can be masked to allow identifying messages by only a portion of the tag, leaving the rest of the bits to be used for control flow or other message metadata. Some transports, such as UCX, have built-in support for such tag values and will provide them in CPU memory regardless of whether or not the body of the message may reside on a non-CPU device.
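Masked tag matching can be illustrated with a few lines of Python. This is only a sketch: the helper names `tag_matches` and `tag_to_wire` are hypothetical, and the mask shown is the one this document later uses for matching on the sequence number in the low 32 bits.

```python
# Hypothetical helpers for illustration; they are not part of any Arrow or UCX API.

SEQUENCE_MASK = 0x00000000FFFFFFFF  # selects the low 32 bits of a 64-bit tag


def tag_matches(tag: int, expected: int, mask: int) -> bool:
    """Return True when tag and expected agree on every bit selected by mask."""
    return (tag & mask) == (expected & mask)


def tag_to_wire(tag: int) -> bytes:
    """Serialize a tag as the 8 little-endian bytes of a uint64."""
    return tag.to_bytes(8, "little")


# A tag whose high byte carries extra information still matches on its low 32 bits.
example_tag = (1 << 56) | 7
print(tag_matches(example_tag, 7, SEQUENCE_MASK))
print(tag_to_wire(example_tag).hex())
```

Transports with native tag support typically perform this comparison themselves when given a tag and a mask; the sketch only shows the arithmetic involved.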

Sequence Number#

A little-endian, 4-byte unsigned integer starting at 0 for a stream, indicating the sequence order of messages. It is also used to identify specific messages to tie the IPC metadata header to its corresponding body, since the metadata and body can be sent across separate pipes/streams/transports.

If a sequence number reaches UINT32_MAX, it should be allowed to roll over, as it is unlikely that enough unprocessed messages would be waiting to cause an overlap of sequence numbers.

The sequence number serves two purposes: to identify corresponding metadata and tagged body data messages, and to ensure we do not rely on messages having to arrive in order. A client should use the sequence number to correctly order messages as they arrive for processing.
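One way a client might restore order is to buffer early arrivals until the next expected sequence number shows up. The following is a minimal sketch under that assumption; the `SequenceReorderer` class is hypothetical and not part of the protocol or any reference implementation.

```python
class SequenceReorderer:
    """Hypothetical helper: releases messages in sequence order, wrapping at 2**32."""

    def __init__(self, first: int = 0):
        self._next = first   # next sequence number to release
        self._pending = {}   # messages that arrived ahead of their turn

    def push(self, seq: int, message):
        """Store one message and return every message that is now in order."""
        self._pending[seq] = message
        ready = []
        while self._next in self._pending:
            ready.append(self._pending.pop(self._next))
            # Roll over after UINT32_MAX, as the text above allows.
            self._next = (self._next + 1) & 0xFFFFFFFF
        return ready


reorderer = SequenceReorderer()
print(reorderer.push(1, "batch-1"))   # held back until message 0 arrives
print(reorderer.push(0, "schema"))    # releases both, in order
```

A dictionary keyed by sequence number keeps the lookup cheap and makes rollover a matter of masking the counter.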

The Protocol#

A reference example implementation utilizing libcudf and UCX can be found in the arrow-experiments repo.

Requirements#

A transport implementing this protocol MUST provide two pieces of functionality:

  • Message sending

    • Delimited messages (like gRPC) as opposed to non-delimited streams (like plain TCP without further framing).

    • Alternatively, a framing mechanism like the encapsulated message format for the IPC protocol can be used while leaving out the body bytes.

  • Tagged message sending

    • Sending a message that has an attached little-endian, unsigned 64-bit integral tag for control flow. A tag like this allows control flow to operate on a message whose body is on a non-CPU device without requiring the message itself to get copied off of the device.

URI Specification#

When providing a URI to a consumer to contact for use with this protocol (such as via the Location URI for Flight), the URI should specify a scheme, like ucx: or fabric:, that is easily identifiable. In addition, the URI should encode the following URI query parameters:

Note

As this protocol matures, this document will get updated with commonly recognized transport schemes that get used with it.

  • want_data - REQUIRED - uint64 integer value

    • This value should be used to tag an initial message to the server to initiate a data transfer. The body of the initiating message should be an opaque binary identifier of the data stream being requested (like the Ticket in the Flight RPC protocol).

  • free_data - OPTIONAL - uint64 integer value

    • If the server might send messages using offsets / addresses for remote memory access or shared memory locations, the URI should include this parameter. This value is used to tag messages sent from the client to the data server, containing specific offsets / addresses which were provided and are no longer required by the client (i.e. any operations that directly reference those memory locations, such as copying the remote data into local memory, have been completed).

  • remote_handle - OPTIONAL - base64-encoded string

    • When working with shared memory or remote memory, this value indicates any required handle or identifier that is necessary for accessing the memory.

      • Using UCX, this would be an rkey value.

      • With CUDA IPC, this would be the value of the base GPU pointer or memory handle, and subsequent addresses would be offsets from this base pointer.
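As an illustration of the parameters above, here is a minimal sketch of parsing such a URI with the Python standard library. Several details are assumptions rather than anything this document specifies: the `DissociatedLocation` container and `parse_location` function are hypothetical names, the uint64 values are assumed to be written as decimal text, and `remote_handle` is assumed to use standard base64 that has been percent-encoded where necessary.

```python
import base64
from dataclasses import dataclass
from typing import Optional
from urllib.parse import parse_qs, urlsplit

UINT64_MAX = 0xFFFFFFFFFFFFFFFF


@dataclass
class DissociatedLocation:  # hypothetical container, not defined by the protocol
    scheme: str
    netloc: str
    want_data: int
    free_data: Optional[int] = None
    remote_handle: Optional[bytes] = None


def _uint64(text: str, name: str) -> int:
    value = int(text)  # assumption: values are encoded as decimal text
    if not 0 <= value <= UINT64_MAX:
        raise ValueError(f"{name} does not fit in a uint64: {text}")
    return value


def parse_location(uri: str) -> DissociatedLocation:
    parts = urlsplit(uri)
    query = parse_qs(parts.query)
    if "want_data" not in query:
        raise ValueError("missing required want_data parameter")
    free = query.get("free_data")
    handle = query.get("remote_handle")
    return DissociatedLocation(
        scheme=parts.scheme,
        netloc=parts.netloc,
        want_data=_uint64(query["want_data"][0], "want_data"),
        free_data=_uint64(free[0], "free_data") if free else None,
        # assumption: standard base64 alphabet, percent-encoded in the URI
        remote_handle=base64.b64decode(handle[0]) if handle else None,
    )


# The host, port, and tag values below are made up for the example.
print(parse_location("ucx://example-host:13337?want_data=1&free_data=2"))
```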

Handling of Backpressure#

Currently this proposal does not specify any way to manage the backpressure of messages to throttle for memory and bandwidth reasons. For now, this will be transport-defined rather than locking into something sub-optimal.

As usage among different transports and libraries grows, common patterns are expected to emerge that will allow for a generic, but efficient, way to handle backpressure across different use cases.

Note

While the protocol itself is transport agnostic, the current usage and examples have only been tested using UCX and libfabric transports so far.

Protocol Description#

There are two possibilities that can occur:

  1. The streams of metadata and body data are sent across separate connections

sequenceDiagram
    participant D as Data Stream
    participant C as Client
    participant M as Metadata Stream
    activate C
    C-->>+M: TaggedMessage(server.want_data, bytes=ID_of_desired_data)
    C-->>+D: TaggedMessage(server.want_data, bytes=ID_of_desired_data)
    M-->>C: Message(bytes([1]) + le_bytes(sequence_number) + schema_metadata)
    loop each batch
        par
            M-->>C: Message(bytes([1]) + le_bytes(sequence_number) + batch_metadata)
        and
            alt
                D-->>C: TaggedMessage((bytes[0] << 56) | le_bytes(sequence_number),<br/>bytes=batch_data)
            else
                D-->>C: TaggedMessage((bytes[1] << 56) | le_bytes(sequence_number),<br/>bytes=uint64_pairs)
            end
        end
    end
    M-->>C: Message(bytes([0]) + le_bytes(sequence_number))
    deactivate M
    loop
        C-->>D: TaggedMessage(server.free_data, bytes=uint64_list)
    end
    deactivate D
    deactivate C

  2. The streams of metadata and body data are sent simultaneously across the same connection

sequenceDiagram
    participant C as Client
    participant S as Server
    activate C
    C-->>+S: TaggedMessage(server.want_data, bytes=ID_of_desired_data)
    S-->>C: Message(bytes([1]) + le_bytes(sequence_number) + schema_metadata)
    par
        loop each chunk
            S-->>C: Message(bytes([1]) + le_bytes(sequence_number) + batch_metadata)
        end
        S-->>C: Message(bytes([0]) + le_bytes(sequence_number))
    and
        loop each chunk
            alt
                S-->>C: TaggedMessage((bytes[0] << 56) | le_bytes(sequence_number),<br/>bytes=batch_data)
            else
                S-->>C: TaggedMessage((bytes[1] << 56) | le_bytes(sequence_number),<br/>bytes=uint64_pairs)
            end
        end
    end
    loop
        C-->>S: TaggedMessage(server.free_data, bytes=uint64_list)
    end
    deactivate S
    deactivate C

Server Sequence#

There can be either a single server handling both the IPC Metadata stream and the Body data streams, or separate servers for handling the IPC Metadata and the body data. This allows for streaming of data across either a single transport pipe or two pipes if desired.

Metadata Stream Sequence#

The standing state of the server is waiting for a tagged message with a specific <want_data> tag value to initiate a transfer. This <want_data> value is defined by the server and propagated to any clients via the URI they are provided. This protocol does not prescribe any particular value so that it will not interfere with any other existing protocols that rely on tag values. The body of that message will contain an opaque, binary identifier to indicate a particular dataset / data stream to send.

Note

For instance, the ticket that was passed with a FlightInfo message would be the body of this message. Because it is opaque, it can be anything the server wants to use. The URI and identifier do not need to be given to the client via Flight RPC, but could come across from any transport or protocol desired.

Upon receiving a <want_data> request, the server should respond by sending a stream of messages consisting of the following:

block-beta
    columns 8
    block:P["\n\n\n\nPrefix"]:5
        T["Message type\nByte 0"]
        S["Sequence number\nBytes 1-4"]
    end
    H["Flatbuffer bytes\nRest of the message"]:3

  • A 5-byte prefix

    • The first byte of the message indicates the type of message. Currently there are only two allowed message types (more types may be added in the future):

      0. End of Stream

      1. Flatbuffers IPC Metadata Message

    • The next 4 bytes are a little-endian, unsigned 32-bit integer indicating the sequence number of the message. The first message in the stream (which MUST always be a schema message) MUST have a sequence number of 0. Each subsequent message MUST increment the number by 1.

  • The full Flatbuffers bytes of an Arrow IPC header

As defined in the Arrow IPC format, each metadata message can represent a chunk of data ordictionaries for use by the stream of data.

After sending the last metadata message, the server MUST indicate the end of the stream by sending a message consisting of exactly 5 bytes:

  • The first byte is 0, indicating an End of Stream message

  • The last 4 bytes are the sequence number (4-byte, unsigned integer in little-endian byte order)

Data Stream Sequence#

If a single server is handling both the data and metadata streams, then the data messages should begin being sent to the client in parallel with the metadata messages. Otherwise, as with the metadata sequence, the standing state of the server is to wait for a tagged message with the <want_data> tag value, whose body indicates the dataset / data stream to send to the client.

For each IPC message in the stream of data, a tagged message MUST be sent on the data stream if that message has a body (i.e. a Record Batch or Dictionary message). The tag for each message should be structured as follows:

block-beta
    columns 8
    S["Sequence number\nBytes 0-3"]:4
    U["Unused (Reserved)\nBytes 4-6"]:3
    T["Message type\nByte 7"]:1

  • The least significant 4 bytes (bits 0 - 31) of the tag should be the unsigned 32-bit, little-endian sequence number of the message.

  • The most significant byte (bits 56 - 63) of the tag indicates the message body type as an 8-bit unsigned integer. Currently only two message types are specified, but more can be added as needed to expand the protocol:

    0. The body contains the raw body buffer bytes as a packed buffer (i.e. the standard IPC format body bytes)

    1. The body contains a series of unsigned, little-endian 64-bit integer pairs to represent either shared or remote memory, schematically structured as

      • The first two integers (i.e. the first 16 bytes) represent the total size (in bytes) of all buffers and the number of buffers in this message (and thus the number of following pairs of uint64)

      • Each subsequent pair of uint64 values is an address / offset followed by the length of that particular buffer.

  • All unspecified bits (bits 32 - 55) of the tag are reserved for future use by potential updates to this protocol. For now they MUST be 0.
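A server-side sketch of the tag layout and of the address-based body follows. All names here (`build_tag`, `parse_tag`, `pack_buffer_list`, and the constants) are hypothetical, and the total size is assumed to be the plain sum of the buffer lengths, which the text does not spell out.

```python
import struct

# Body type values carried in the most significant byte of the tag.
BODY_PACKED = 0    # raw IPC body bytes
BODY_OFFSETS = 1   # uint64 pairs describing shared/remote buffers

RESERVED_MASK = 0x00FFFFFF00000000  # bits 32 - 55, which must be 0 for now


def build_tag(seq: int, body_type: int) -> int:
    """Place the sequence number in bits 0-31 and the body type in bits 56-63."""
    if not 0 <= seq <= 0xFFFFFFFF:
        raise ValueError("sequence number must fit in 32 bits")
    if not 0 <= body_type <= 0xFF:
        raise ValueError("body type must fit in 8 bits")
    return (body_type << 56) | seq


def parse_tag(tag: int):
    """Return (sequence_number, body_type), rejecting non-zero reserved bits."""
    if tag & RESERVED_MASK:
        raise ValueError("reserved tag bits must be 0")
    return tag & 0xFFFFFFFF, tag >> 56


def pack_buffer_list(buffers) -> bytes:
    """Encode (address_or_offset, length) pairs as a body of type 1."""
    total = sum(length for _, length in buffers)  # assumption: plain sum of lengths
    flat = [value for pair in buffers for value in pair]
    return struct.pack(f"<{2 + len(flat)}Q", total, len(buffers), *flat)


tag = build_tag(5, BODY_OFFSETS)
print(hex(tag), parse_tag(tag))
print(pack_buffer_list([(4096, 16), (8192, 32)]).hex())
```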

Note

Any shared/remote memory addresses that are sent across MUST be kept alive by the server until a corresponding tagged <free_data> message is received. If the client disconnects before sending any <free_data> messages, it can be assumed to be safe to clean up the memory if desired by the server.

After sending the last tagged IPC body message, the server should maintain the connection and wait for tagged <free_data> messages. The structure of these <free_data> messages is simple: one or more unsigned, little-endian 64-bit integers which indicate the addresses/offsets that can be freed.

Once there are no more outstanding addresses to be freed, the work for this stream is complete.
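The <free_data> body and the server's bookkeeping around it might look like the sketch below. The `OutstandingBuffers` class and the function names are hypothetical; the wire format (one or more little-endian uint64 values) is the only part taken from the text.

```python
import struct


def encode_free_data(addresses) -> bytes:
    """Pack one or more addresses / offsets as little-endian uint64 values."""
    if not addresses:
        raise ValueError("a free_data message carries at least one address")
    return struct.pack(f"<{len(addresses)}Q", *addresses)


def decode_free_data(body: bytes):
    """Unpack the addresses / offsets from a free_data message body."""
    if len(body) == 0 or len(body) % 8:
        raise ValueError("body must be a non-empty multiple of 8 bytes")
    return list(struct.unpack(f"<{len(body) // 8}Q", body))


class OutstandingBuffers:
    """Hypothetical server-side tracker of memory the client may still be using."""

    def __init__(self):
        self._live = set()

    def share(self, address: int) -> None:
        self._live.add(address)       # must stay alive until freed by the client

    def release(self, body: bytes) -> None:
        for address in decode_free_data(body):
            self._live.discard(address)

    def done(self) -> bool:
        return not self._live         # True once the stream's work is complete


tracker = OutstandingBuffers()
tracker.share(4096)
tracker.share(8192)
tracker.release(encode_free_data([4096, 8192]))
print(tracker.done())
```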

Client Sequence#

A client for this protocol needs to concurrently handle both the data and metadata streams of messages, which may either both come from the same server or from different servers. Below is a flowchart showing how a client might handle the metadata and data streams:

graph LR
client((Client))-->c1{{Send #60;want_data#gt; Msg}}
subgraph meta [Meta Message]
    direction LR
    m1[/Msg Type #40;byte 0#41;<br/>Seq Num #40;bytes 1-4#41;/]-- type 1 -->m2[[Process IPC Header]]
    m2-- IPC has body -->m3[Get Corresponding<br/>Tagged Msg]
    m2-- Schema Msg -->m4[/Store Schema/]
    m1-- type 0 -->e[Indicate End of Stream]
end
subgraph data [Data Stream]
    direction LR
    d1[Request Msg<br/>for Seq Num]-->d2{Most Significant<br/>Byte}
    d2-- 0 -->d3[Construct from<br/>Metadata and Body]
    d2-- 1 -->d4[Get shared/remote<br/>buffers]
    d4 -->d5[Construct from<br/>Metadata and buffers]
    d3 & d5 -->e2[Output Batch]
end
client -- recv untagged msg --> meta
client -- get tagged msg --> data

  1. First the client sends a tagged message using the <want_data> value it was provided in the URI as the tag, and the opaque ID as the body.

    • If the metadata and data servers are separate, then a <want_data> message needs to be sent separately to each.

    • In either scenario, the metadata and data streams can be processed concurrently and/or asynchronously depending on the nature of the transports.

  2. For each untagged message the client receives in the metadata stream:

    • The first byte of the message indicates whether it is an End of Stream message (value 0) or a metadata message (value 1).

    • The next 4 bytes are the sequence number of the message, an unsigned 32-bit integer in little-endian byte order.

    • If it is not an End of Stream message, the remaining bytes are the IPC Flatbuffer bytes, which can be interpreted as normal.

      • If the message has a body (i.e. Record Batch or Dictionary message) then the client should retrieve a tagged message from the Data Stream using the same sequence number.

    • If it is an End of Stream message, then it is safe to close the metadata connection if there are no gaps in the sequence numbers received.

  3. When a metadata message that requires a body is received, the tag mask of 0x00000000FFFFFFFF should be used alongside the sequence number to match the message regardless of the higher bytes (i.e. we only care about matching the lower 4 bytes to the sequence number).

    • Once received, the most significant byte’s value determines how the client processes the body data:

      • If the most significant byte is 0: the body of the message is the raw IPC packed body buffers, allowing it to easily be processed with the corresponding metadata header bytes.

      • If the most significant byte is 1: the body of the message will consist of a series of pairs of unsigned, 64-bit integers in little-endian byte order.

        • The first two integers represent 1) the total size of all the body buffers together, to allow for easy allocation if an intermediate buffer is needed, and 2) the number of buffers being sent (nbuf).

        • The rest of the message will be nbuf pairs of integers, one for each buffer. Each pair is 1) the address / offset of the buffer and 2) the length of that buffer. Memory can then be retrieved via shared or remote memory routines based on the underlying transport. These addresses / offsets MUST be retained so they can be sent back in <free_data> messages later, indicating to the server that the client no longer needs the shared memory.

  4. Once an End of Stream message is received, the client should process any remaining unprocessed IPC metadata messages.

  5. Once the client no longer needs individual memory addresses / offsets (in the case where the server has sent these rather than the full body bytes), the client should send corresponding <free_data> messages to the server so that the memory can be freed.

    • A single <free_data> message consists of an arbitrary number of unsigned 64-bit integer values, representing the addresses / offsets which can be freed. The number is arbitrary so that a client can choose whether to send multiple messages to free multiple addresses or to coalesce multiple addresses into fewer messages (thus making the protocol less “chatty” if desired).
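On the client side, steps 3 and 5 above could be sketched as follows. The function names are hypothetical, and the `max_per_message` limit is an illustrative knob for coalescing rather than anything the protocol defines. The total size is returned as received without being checked against the individual lengths, since the text only describes it as an allocation aid.

```python
import struct


def decode_buffer_list(body: bytes):
    """Decode a body whose tag has a most significant byte of 1.

    Returns (total_size, [(address_or_offset, length), ...]).
    """
    if len(body) < 16 or len(body) % 16:
        raise ValueError("body must be a whole number of uint64 pairs")
    total_size, nbuf = struct.unpack_from("<2Q", body, 0)
    if len(body) != 16 * (1 + nbuf):
        raise ValueError("buffer count does not match the body length")
    values = struct.unpack_from(f"<{2 * nbuf}Q", body, 16)
    # Even positions are addresses / offsets, odd positions are lengths.
    return total_size, list(zip(values[0::2], values[1::2]))


def coalesce_free_data(addresses, max_per_message: int):
    """Group addresses into free_data bodies of at most max_per_message values."""
    if max_per_message < 1:
        raise ValueError("max_per_message must be at least 1")
    bodies = []
    for start in range(0, len(addresses), max_per_message):
        chunk = addresses[start:start + max_per_message]
        bodies.append(struct.pack(f"<{len(chunk)}Q", *chunk))
    return bodies


body = struct.pack("<6Q", 48, 2, 4096, 16, 8192, 32)
total, buffers = decode_buffer_list(body)
to_free = [address for address, _ in buffers]   # retained for free_data later
print(total, buffers, len(coalesce_free_data(to_free, max_per_message=8)))
```

Sending one address per message keeps memory held on the server for the shortest time, while larger groups reduce the number of messages; which trade-off is better depends on the transport.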

Continuing Development#

If you decide to try this protocol in your own environments and systems, we’d love feedback and to learn about your use case. As this is currently an experimental protocol, we need real-world usage in order to improve it and find the right generalizations to standardize on across transports.

Please chime in using the Arrow Developers Mailing list: https://arrow.apache.org/community/#mailing-lists