Arrow Flight RPC#

Arrow Flight is an RPC framework for efficient transfer of Arrow data over the network.

See also

Flight protocol documentation

Documentation of the Flight protocol, including how to use Flight conceptually.

Java Cookbook

Recipes for using Arrow Flight in Java.

Writing a Flight Service#

Flight servers implement the FlightProducer interface. For convenience, they can subclass NoOpFlightProducer instead, which offers default implementations of all the RPC methods.

public class TutorialFlightProducer implements FlightProducer {
    @Override
    // Override methods or use NoOpFlightProducer for only methods needed
}

Each RPC method always takes a CallContext for common parameters. To indicate failure, pass an exception to the “listener” if present, or else throw an exception.

// Server
@Override
public void listFlights(CallContext context, Criteria criteria, StreamListener<FlightInfo> listener) {
    // ...
    listener.onError(
        CallStatus.UNAUTHENTICATED
            .withDescription("Custom UNAUTHENTICATED description message.")
            .toRuntimeException());
    // ...
}

// Client
try {
    Iterable<FlightInfo> flightInfosBefore = flightClient.listFlights(Criteria.ALL);
    // ...
} catch (FlightRuntimeException e) {
    // Catch UNAUTHENTICATED exception
}

To start a server, create a Location to specify where to listen, and then create a FlightServer with an instance of a producer. Calling FlightServer.start starts the server, but won’t block the rest of the program. Call FlightServer.awaitTermination to block until the server stops.

class TutorialFlightProducer implements FlightProducer {
    @Override
    // Override methods or use NoOpFlightProducer for only methods needed
}

Location location = Location.forGrpcInsecure("0.0.0.0", 0);
try (
    BufferAllocator allocator = new RootAllocator();
    FlightServer server = FlightServer.builder(allocator, location, new TutorialFlightProducer()).build();
) {
    server.start();
    System.out.println("Server listening on port " + server.getPort());
    server.awaitTermination();
} catch (Exception e) {
    e.printStackTrace();
}

Server listening on port 58104

Using the Flight Client#

To connect to a Flight service, create a FlightClient with a location.

Location location = Location.forGrpcInsecure("0.0.0.0", 58104);
try (
    BufferAllocator allocator = new RootAllocator();
    FlightClient client = FlightClient.builder(allocator, location).build()
) {
    // ... Consume operations exposed by Flight server
} catch (Exception e) {
    e.printStackTrace();
}

Cancellation and Timeouts#

When making a call, clients can optionally provide CallOptions. This allows clients to set a timeout on calls. Also, some objects returned by client RPC calls expose a cancel method which allows terminating a call early.

Location location = Location.forGrpcInsecure("0.0.0.0", 58609);
try (
    BufferAllocator allocator = new RootAllocator();
    FlightClient tutorialFlightClient = FlightClient.builder(allocator, location).build()
) {
    Iterator<Result> resultIterator = tutorialFlightClient.doAction(
        new Action("test-timeout"),
        CallOptions.timeout(2, TimeUnit.SECONDS));
} catch (Exception e) {
    e.printStackTrace();
}

On the server side, timeouts are transparent. For cancellation, the server needs to either register a callback with setOnCancelHandler or manually poll isCancelled to check whether the client has cancelled the call and, if so, break out of any processing the server is currently doing.

// Client
Location location = Location.forGrpcInsecure("0.0.0.0", 58609);
try (
    BufferAllocator allocator = new RootAllocator();
    FlightClient tutorialFlightClient = FlightClient.builder(allocator, location).build()
) {
    // Use the client declared above to open the stream
    try (FlightStream flightStream = tutorialFlightClient.getStream(new Ticket(new byte[]{}))) {
        // ...
        flightStream.cancel("tutorial-cancel", new Exception("Testing cancellation option!"));
    }
} catch (Exception e) {
    e.printStackTrace();
}

// Server
@Override
public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
    // ...
    listener.setOnCancelHandler(() -> {
        // Implement logic to handle cancellation option
    });
}
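The server snippet above registers a callback; the polling alternative mentioned earlier can be sketched without any Flight dependency. In this minimal sketch, the BooleanSupplier is an assumed stand-in for a check such as listener.isCancelled(), and produceBatches and the "batch-N" strings are hypothetical names used only for illustration. It shows the shape of the loop, not a real producer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

public class CancellationPollingSketch {

    /**
     * Produces up to maxBatches batches, checking the cancellation flag
     * before each one. The supplier is a stand-in (assumption) for a call
     * such as listener.isCancelled() inside a real getStream implementation.
     */
    static List<String> produceBatches(int maxBatches, BooleanSupplier isCancelled) {
        List<String> sent = new ArrayList<>();
        for (int i = 0; i < maxBatches; i++) {
            if (isCancelled.getAsBoolean()) {
                break; // the client cancelled: stop doing work for this call
            }
            sent.add("batch-" + i); // hypothetical placeholder for sending data
        }
        return sent;
    }

    public static void main(String[] args) {
        // Simulated client whose cancellation becomes visible on the fourth poll.
        int[] polls = {0};
        List<String> sent = produceBatches(10, () -> polls[0]++ >= 3);
        System.out.println(sent); // prints [batch-0, batch-1, batch-2]
    }
}
```

Polling once per batch keeps the check cheap while bounding how much work is wasted after a cancellation; a callback is the better fit when the server is blocked waiting on something else.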

Enabling TLS#

TLS can be enabled when setting up a server by providing a certificate and key pair to FlightServer.Builder.useTls.

On the client side, use Location.forGrpcTls to create the Location for the client.

Enabling Authentication#

Warning

Authentication is insecure without enabling TLS.

Handshake-based authentication can be enabled by implementing ServerAuthHandler. Authentication consists of two parts: on initial client connection, the server and client authentication implementations can perform any negotiation needed. The client authentication handler then provides a token that will be attached to future calls.

The client sends data to be validated through ClientAuthHandler.authenticate. The server validates the data received through ServerAuthHandler.authenticate.
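The two-part flow described above (a one-time handshake, then a token attached to later calls) can be modelled in plain Java. This is only a sketch under stated assumptions: ServerAuth and ClientAuth are hypothetical classes, not the Flight ServerAuthHandler and ClientAuthHandler interfaces, and the handshake is collapsed into a direct method call instead of a network exchange.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

public class HandshakeAuthSketch {

    /** Hypothetical server side: checks credentials once, then checks tokens per call. */
    static final class ServerAuth {
        private final Map<String, String> passwords = new HashMap<>();
        private final Map<String, String> tokenToUser = new HashMap<>();

        ServerAuth(String user, String password) {
            passwords.put(user, password);
        }

        /** Handshake step: issues a token only if the credentials match. */
        Optional<byte[]> authenticate(String user, String password) {
            if (!password.equals(passwords.get(user))) {
                return Optional.empty();
            }
            String token = UUID.randomUUID().toString();
            tokenToUser.put(token, user);
            return Optional.of(token.getBytes(StandardCharsets.UTF_8));
        }

        /** Per-call step: maps a token back to the caller's identity, if known. */
        Optional<String> isValid(byte[] token) {
            if (token == null) {
                return Optional.empty();
            }
            return Optional.ofNullable(tokenToUser.get(new String(token, StandardCharsets.UTF_8)));
        }
    }

    /** Hypothetical client side: performs the handshake and remembers the token. */
    static final class ClientAuth {
        private byte[] callToken;

        boolean authenticate(ServerAuth server, String user, String password) {
            Optional<byte[]> token = server.authenticate(user, password);
            callToken = token.orElse(null);
            return token.isPresent();
        }

        /** Token that would be attached to every subsequent call. */
        byte[] getCallToken() {
            return callToken;
        }
    }

    public static void main(String[] args) {
        ServerAuth server = new ServerAuth("alice", "s3cret");
        ClientAuth client = new ClientAuth();
        System.out.println("handshake ok: " + client.authenticate(server, "alice", "s3cret"));
        System.out.println("identity for later calls: " + server.isValid(client.getCallToken()));
    }
}
```

Because the token travels with every call, anyone who can read the traffic can replay it, which is why the warning above about enabling TLS applies.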

Custom Middleware#

Servers and clients support custom middleware (or interceptors) that are called on every request and can modify the request in a limited fashion. To write one, implement the FlightServerMiddleware or FlightClientMiddleware interface.

Middleware are fairly limited, but they can add headers to a request/response. On the server, they can inspect incoming headers and fail the request; hence, they can be used to implement custom authentication methods.
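The header-based pattern just described can be illustrated with a small plain-Java model. Everything here is an assumption made for illustration: ServerMiddleware is a hypothetical interface (not FlightServerMiddleware), headers are modelled as a simple Map, and the "x-api-key" and "x-served-by" header names are invented.

```java
import java.util.HashMap;
import java.util.Map;

public class HeaderMiddlewareSketch {

    /** Hypothetical stand-in for a per-call server middleware. */
    interface ServerMiddleware {
        /** Inspect incoming headers; throw to fail the request. */
        void onCallStarted(Map<String, String> incomingHeaders);

        /** Add headers to the response before it is sent. */
        void onBeforeSendingHeaders(Map<String, String> outgoingHeaders);
    }

    /** Rejects calls that lack the expected API key and tags every response. */
    static final class ApiKeyMiddleware implements ServerMiddleware {
        private final String expectedKey;

        ApiKeyMiddleware(String expectedKey) {
            this.expectedKey = expectedKey;
        }

        @Override
        public void onCallStarted(Map<String, String> incomingHeaders) {
            if (!expectedKey.equals(incomingHeaders.get("x-api-key"))) {
                throw new SecurityException("UNAUTHENTICATED: missing or invalid x-api-key");
            }
        }

        @Override
        public void onBeforeSendingHeaders(Map<String, String> outgoingHeaders) {
            outgoingHeaders.put("x-served-by", "tutorial-server");
        }
    }

    /** Runs one simulated call through the middleware and returns the response headers. */
    static Map<String, String> handleCall(ServerMiddleware middleware, Map<String, String> requestHeaders) {
        middleware.onCallStarted(requestHeaders);
        Map<String, String> responseHeaders = new HashMap<>();
        middleware.onBeforeSendingHeaders(responseHeaders);
        return responseHeaders;
    }

    public static void main(String[] args) {
        Map<String, String> request = new HashMap<>();
        request.put("x-api-key", "k1");
        System.out.println(handleCall(new ApiKeyMiddleware("k1"), request));
    }
}
```

The middleware never sees the request body, only headers, which matches the "fairly limited" scope described above.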

Adding Services#

Servers can add other gRPC services. For example, to add the Health Check service:

final HealthStatusManager statusManager = new HealthStatusManager();
final Consumer<NettyServerBuilder> consumer = (builder) -> {
    builder.addService(statusManager.getHealthService());
};
final Location location = forGrpcInsecure(LOCALHOST, 5555);
try (
    BufferAllocator a = new RootAllocator(Long.MAX_VALUE);
    Producer producer = new Producer(a);
    FlightServer s = FlightServer.builder(a, location, producer)
        .transportHint("grpc.builderConsumer", consumer).build().start();
) {
    Channel channel = NettyChannelBuilder.forAddress(location.toSocketAddress()).usePlaintext().build();
    HealthCheckResponse response = HealthGrpc.newBlockingStub(channel).check(HealthCheckRequest.getDefaultInstance());
    System.out.println(response.getStatus());
}

Flight best practices#