Arrow Flight RPC

Arrow Flight is an RPC framework for efficient transfer of Flight data over the network.

See also

Flight protocol documentation

Documentation of the Flight protocol, including how to use Flight conceptually.

Flight API documentation

C++ API documentation listing all of the various client and server types.

C++ Cookbook

Recipes for using Arrow Flight in C++.

Writing a Flight Service

Servers are subclasses of arrow::flight::FlightServerBase. To implement individual RPCs, override the RPC methods on this class.

```cpp
class MyFlightServer : public FlightServerBase {
  Status ListFlights(const ServerCallContext& context, const Criteria* criteria,
                     std::unique_ptr<FlightListing>* listings) override {
    std::vector<FlightInfo> flights = ...;
    *listings = std::unique_ptr<FlightListing>(new SimpleFlightListing(flights));
    return Status::OK();
  }
};
```

Each RPC method always takes an arrow::flight::ServerCallContext for common parameters and returns an arrow::Status to indicate success or failure. Flight-specific error codes can be returned via arrow::flight::MakeFlightError().

RPC methods that return a value in addition to a status use an out parameter, as in the ListFlights example above. Often, helper classes provide basic implementations of these out parameters. For instance, in that example, arrow::flight::SimpleFlightListing uses a vector of arrow::flight::FlightInfo objects as the result of a ListFlights RPC.

To start a server, create an arrow::flight::Location to specify where to listen, wrap it in an arrow::flight::FlightServerOptions, and call arrow::flight::FlightServerBase::Init(). This starts the server but does not block the rest of the program. Use arrow::flight::FlightServerBase::SetShutdownOnSignals() to enable stopping the server if an interrupt signal is received, then call arrow::flight::FlightServerBase::Serve() to block until the server stops.

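```cpp
#include <csignal>
#include <iostream>
#include <memory>

#include <arrow/flight/server.h>
#include <arrow/util/logging.h>

// MyFlightServer is the FlightServerBase subclass defined above.
int main() {
  // Create the server instance (a null pointer cannot be initialized)
  std::unique_ptr<arrow::flight::FlightServerBase> server =
      std::make_unique<MyFlightServer>();
  // Listen to all interfaces on a free port
  arrow::flight::Location location =
      arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0).ValueOrDie();
  arrow::flight::FlightServerOptions options(location);

  // Start the server
  ARROW_CHECK_OK(server->Init(options));
  // Exit with a clean error code (0) on SIGTERM
  ARROW_CHECK_OK(server->SetShutdownOnSignals({SIGTERM}));

  std::cout << "Server listening on localhost:" << server->port() << std::endl;
  ARROW_CHECK_OK(server->Serve());
  return 0;
}
```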

Using the Flight Client

To connect to a Flight service, create an instance of arrow::flight::FlightClient by calling arrow::flight::FlightClient::Connect().

Each RPC method returns an arrow::Result to indicate the success or failure of the request, and the result object if the request succeeded. Some calls are streaming calls, so they return a reader and/or a writer object; the final call status isn’t known until the stream is completed.

Cancellation and Timeouts

When making a call, clients can optionally provide FlightCallOptions. This allows clients to set a timeout on calls or provide custom HTTP headers, among other features. Also, some objects returned by client RPC calls expose a Cancel method, which allows terminating a call early.

On the server side, no additional code is needed to implement timeouts. For cancellation, the server needs to manually poll ServerCallContext::is_cancelled to check whether the client has cancelled the call, and if so, break out of any processing the server is currently doing.

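Enabling TLS

TLS can be enabled when setting up a server by listening on a grpc+tls location (created with Location::ForGrpcTls) and providing a certificate and key pair, via FlightServerOptions::tls_certificates, to FlightServerBase::Init.

On the client side, use Location::ForGrpcTls to construct the arrow::flight::Location to connect to.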

Enabling Authentication

Warning

Authentication is insecure without enabling TLS.

Handshake-based authentication can be enabled by implementing ServerAuthHandler and providing it to the server through FlightServerOptions::auth_handler when the server is initialized.

Authentication consists of two parts: on initial client connection, the server and client authentication implementations can perform any negotiation needed. The client authentication handler then provides a token that will be attached to future calls. This is done by calling Authenticate with the desired client authentication implementation.

On each RPC thereafter, the client handler’s token is automatically added to the call in the request headers. The server authentication handler validates the token and provides the identity of the client. On the server, this identity can be obtained from the arrow::flight::ServerCallContext.

Custom Middleware

Servers and clients support custom middleware (or interceptors) that are called on every request and can modify the request in a limited fashion. These can be implemented by subclassing ServerMiddleware and ClientMiddleware, then providing factories for them (ServerMiddlewareFactory and ClientMiddlewareFactory) when creating the server or client.

Middleware are fairly limited, but they can add headers to a request/response. On the server, they can inspect incoming headers and fail the request; hence, they can be used to implement custom authentication methods.

Best practices

gRPC

When using the default gRPC transport, options can be passed to it via arrow::flight::FlightClientOptions::generic_options. For example:

```cpp
auto options = FlightClientOptions::Defaults();
// Set the period after which a keepalive ping is sent on transport.
options.generic_options.emplace_back(GRPC_ARG_KEEPALIVE_TIME_MS, 60000);
```
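```python
import pyarrow.flight

# Set the period after which a keepalive ping is sent on transport.
# Python takes the gRPC key string itself, not the name of the C macro.
generic_options = [("grpc.keepalive_time_ms", 60000)]
client = pyarrow.flight.FlightClient(server_uri, generic_options=generic_options)
```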

Also see best gRPC practices and available gRPC keys.

Re-use clients whenever possible

Creating and closing clients requires setup and teardown on both the client and server side, which takes time away from actually handling RPCs. Reuse clients whenever possible to avoid this. Note that clients are thread-safe, so a single client can be shared across multiple threads.

Don’t round-robin load balance

Round-robin load balancing means every client can have an open connection to every server, causing an unexpected number of open connections and depleting server resources.

Debugging connection issues

When facing unexpected disconnects on long-running connections, use netstat to monitor the number of open connections. If the number of connections is much greater than the number of clients, it might cause issues.

For debugging, certain environment variables enable logging in gRPC. For example, running the program under env GRPC_VERBOSITY=info GRPC_TRACE=http will print the initial headers (on both sides), so you can see whether gRPC established the connection. It will also print when a message is sent, so you can tell whether the connection is open.

gRPC may not report connection errors until a call is actually made. Hence, to detect connection errors when creating a client, some sort of dummy RPC should be made.

Memory management

Flight tries to reuse allocations made by gRPC to avoid redundant data copies. However, experience shows that such data is frequently misaligned. Some use cases might require data to have data type-specific alignment (for example, for the data buffer of an Int32 array to be aligned on a 4-byte boundary), which can be enforced by setting arrow::ipc::IpcReadOptions::ensure_alignment to arrow::ipc::Alignment::kDataTypeSpecificAlignment. This uses the arrow::ipc::IpcReadOptions::memory_pool to allocate memory with aligned addresses, but only for misaligned data. However, this means that misaligned data received via Flight is copied.

Unless gRPC data is copied as described above, allocations made by gRPC may not be tracked by the Arrow memory pool, and memory usage behavior, such as whether free memory is returned to the system, depends on the allocator that gRPC uses (usually the system allocator).

A quick way of testing: attach to the process with a debugger and call malloc_trim, or call ReleaseUnused on the system pool. If memory usage drops, then memory allocated by gRPC or by the application was likely being held on to by the system allocator. This can be adjusted in platform-specific ways; see the investigation in ARROW-16697 for an example of how this works on Linux/glibc. glibc malloc can be explicitly told to dump caches.

Excessive traffic

gRPC will spawn threads for concurrent clients, up to its maximum thread quota. Those threads are not necessarily cleaned up (a “cached thread pool” in Java parlance). glibc malloc clears some per-thread state, and the default tuning never clears caches in some workloads.

gRPC’s default behaviour allows one server to accept many connections from many different clients, but if requests do a lot of work (as they may under Flight), the server may not be able to keep up. Configuring clients to retry with backoff (and potentially connect to a different node) gives more consistent quality of service.

```cpp
auto options = FlightClientOptions::Defaults();
// Set the minimum time between subsequent connection attempts.
options.generic_options.emplace_back(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 2000);
```
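```python
import pyarrow.flight

# Set the minimum time between subsequent connection attempts.
# Python takes the gRPC key string itself, not the name of the C macro.
generic_options = [("grpc.min_reconnect_backoff_ms", 2000)]
client = pyarrow.flight.FlightClient(server_uri, generic_options=generic_options)
```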

Limiting DoPut Batch Size

You may wish to limit the maximum batch size a client can submit to a server through DoPut, to prevent a request from taking up too much memory on the server. On the client side, set arrow::flight::FlightClientOptions::write_size_limit_bytes. On the server side, set the gRPC option GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH. The client-side option will return an error that can be retried with smaller batches, while the server-side limit will close out the connection. Setting both can be wise, since the former provides a better user experience but the latter may be necessary to defend against impolite clients.

Closing unresponsive connections

  1. A stale call can be closed using arrow::flight::FlightCallOptions::stop_token. This requires recording the stop token at call establishment time.

    ```cpp
    StopSource stop_source;
    FlightCallOptions options;
    options.stop_token = stop_source.token();
    stop_source.RequestStop(Status::Cancelled("StopSource"));
    flight_client->DoAction(options, {});
    ```
  2. Use call timeouts. (This is a general gRPC best practice.)

    ```cpp
    FlightCallOptions options;
    options.timeout = TimeoutDuration{0.2};
    Status status = client->GetFlightInfo(options, FlightDescriptor{}).status();
    ```

    ```java
    // CallOptions.timeout takes a whole-number duration, so 0.2 s is given in milliseconds.
    Iterator<Result> results = client.doAction(new Action("hang"),
        CallOptions.timeout(200, TimeUnit.MILLISECONDS));
    ```

    ```python
    options = pyarrow.flight.FlightCallOptions(timeout=0.2)
    result = client.do_action(action, options=options)
    ```
  3. Client timeouts are not great for long-running streaming calls, where it may be hard to choose a timeout for the entire operation. Instead, what is often desired is a per-read or per-write timeout so that the operation fails if it isn’t making progress. This can be implemented with a background thread that calls Cancel() on a timer, with the main thread resetting the timer every time an operation completes successfully. For a fully worked-out example, see the Cookbook.

    Note

    There is a long-standing ticket for a per-write/per-read timeout instead of a per-call timeout (ARROW-6062), but this is not (easily) possible to implement with the blocking gRPC API.
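A minimal, standard-library-only sketch of the timer from item 3. The Watchdog class is hypothetical (it is not the Cookbook's code): it invokes a callback once if Reset() is not called within the timeout. For a Flight stream, the callback would call Cancel() on the reader, for example FlightStreamReader::Cancel().

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical watchdog: calls on_timeout once if Reset() is not called in time.
class Watchdog {
 public:
  Watchdog(std::chrono::milliseconds timeout, std::function<void()> on_timeout)
      : timeout_(timeout),
        on_timeout_(std::move(on_timeout)),
        deadline_(std::chrono::steady_clock::now() + timeout),
        thread_([this] { Run(); }) {}

  ~Watchdog() { Stop(); }

  // Call after every successful read or write to push the deadline back.
  void Reset() {
    std::lock_guard<std::mutex> lock(mutex_);
    deadline_ = std::chrono::steady_clock::now() + timeout_;
  }

  // Stops the background thread without firing; safe to call more than once.
  void Stop() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      stopped_ = true;
    }
    cv_.notify_all();
    if (thread_.joinable()) thread_.join();
  }

  bool fired() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return fired_;
  }

 private:
  void Run() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (!stopped_) {
      auto deadline = deadline_;  // copy: Reset() may move deadline_ while we wait
      if (std::chrono::steady_clock::now() >= deadline) {
        fired_ = true;
        lock.unlock();
        on_timeout_();  // runs on the watchdog thread, without holding the lock
        return;
      }
      cv_.wait_until(lock, deadline);
    }
  }

  const std::chrono::milliseconds timeout_;
  const std::function<void()> on_timeout_;
  mutable std::mutex mutex_;
  std::condition_variable cv_;
  std::chrono::steady_clock::time_point deadline_;
  bool stopped_ = false;
  bool fired_ = false;
  std::thread thread_;  // declared last so it starts after the other members exist
};
```

In a read loop, one would construct Watchdog watchdog(std::chrono::seconds(30), [&reader] { reader->Cancel(); }), call watchdog.Reset() after each successful reader->Next(), and call watchdog.Stop() once the stream ends. A stalled stream is then cancelled, and the pending read returns an error instead of blocking indefinitely.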