Interface Gatherer<T,A,R>

Type Parameters:
T - the type of input elements to the gatherer operation
A - the potentially mutable state type of the gatherer operation (often hidden as an implementation detail)
R - the type of output elements from the gatherer operation

public interface Gatherer<T,A,R>
An intermediate operation that transforms a stream of input elements into a stream of output elements, optionally applying a final action when the end of the upstream is reached. The transformation may be stateless or stateful, and may buffer input before producing any output.

Gatherer operations can be performed either sequentially or, if a combiner function is supplied, in parallel.

There are many examples of gathering operations, including but not limited to: grouping elements into batches (windowing functions); de-duplicating consecutively similar elements; incremental accumulation functions (prefix scan); and incremental reordering functions. The class Gatherers provides implementations of common gathering operations.

API Note:

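A Gatherer is specified by four functions that work together to process input elements, optionally using intermediate state, and optionally perform a final action at the end of input. They are:
  • creating a new, potentially mutable, state (initializer())
  • integrating a new input element (integrator())
  • combining two states into one (combiner())
  • performing an optional final action (finisher())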

Each invocation of initializer(), integrator(), combiner(), and finisher() must return a semantically identical result.

Implementations of Gatherer must not capture, retain, or expose to other threads the references to the state instance or the downstream Gatherer.Downstream for longer than the duration of the invocation of the method to which they are passed.

Performing a gathering operation with a Gatherer should produce a result equivalent to:

    Gatherer.Downstream<? super R> downstream = ...;
    A state = gatherer.initializer().get();
    for (T t : data) {
        gatherer.integrator().integrate(state, t, downstream);
    }
    gatherer.finisher().accept(state, downstream);
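The equivalence above can be tried out by driving a gatherer by hand. The following is a minimal sketch, not the library's evaluation strategy: `ManualGatherDemo`, `gatherSequentially`, and `demo` are hypothetical names, and the loop additionally stops when the integrator returns false, which the implementation requirements treat as the end of input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;

class ManualGatherDemo {
    // Hypothetical helper: evaluates a gatherer sequentially over a list.
    static <T, A, R> List<R> gatherSequentially(Gatherer<T, A, R> gatherer, List<T> data) {
        List<R> output = new ArrayList<>();
        // A downstream that collects every pushed element and never rejects.
        Gatherer.Downstream<R> downstream = output::add;
        Gatherer.Integrator<A, T, R> integrator = gatherer.integrator();
        A state = gatherer.initializer().get();
        for (T t : data) {
            // A false return is treated as "no more elements wanted".
            if (!integrator.integrate(state, t, downstream)) {
                break;
            }
        }
        gatherer.finisher().accept(state, downstream);
        return output;
    }

    static List<List<Integer>> demo() {
        Gatherer<Integer, ?, List<Integer>> pairs = Gatherers.windowFixed(2);
        return gatherSequentially(pairs, List.of(1, 2, 3, 4, 5));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [[1, 2], [3, 4], [5]]
    }
}
```

The trailing window `[5]` is produced by the finisher, which is why the final `accept` call matters even after the loop has consumed all input.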

However, the library is free to partition the input, perform the integrations on the partitions, and then use the combiner function to combine the partial results to achieve a gathering operation. (Whether this performs better or worse depends on the specific gathering operation and on the relative cost of the integrator and combiner functions.)
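As an illustration of a gatherer that supplies a combiner, here is a small sketch that sums its input and emits the total from the finisher. The names `ParallelSumDemo`, `Sum`, `sum`, and `sumOfOneToHundred` are made up for this example; whether parallel evaluation actually pays off for such a cheap integrator is not something this sketch tries to show.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.IntStream;

class ParallelSumDemo {
    // Hypothetical mutable state: one instance per partition.
    static final class Sum { long total; }

    static Gatherer<Integer, Sum, Long> sum() {
        return Gatherer.<Integer, Sum, Long>of(
            Sum::new,                                        // initializer
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                state.total += element;                      // integrate, emit nothing yet
                return true;
            }),
            (left, right) -> {                               // combiner: merge two partitions
                left.total += right.total;
                return left;
            },
            (state, downstream) -> downstream.push(state.total) // finisher: emit the total
        );
    }

    static List<Long> sumOfOneToHundred(boolean parallel) {
        var numbers = IntStream.rangeClosed(1, 100).boxed();
        return (parallel ? numbers.parallel() : numbers).gather(sum()).toList();
    }

    public static void main(String[] args) {
        System.out.println(sumOfOneToHundred(false)); // [5050]
        System.out.println(sumOfOneToHundred(true));  // [5050]
    }
}
```

Because addition is associative, merging partial sums in the combiner gives the same total as the sequential loop.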

In addition to the predefined implementations in Gatherers, the static factory methods of(...) and ofSequential(...) can be used to construct gatherers. For example, you could create a gatherer that implements the equivalent of Stream.map(java.util.function.Function) with:

    public static <T, R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {
        return Gatherer.of(
            (unused, element, downstream) -> // integrator
                downstream.push(mapper.apply(element))
        );
    }

Gatherers are designed to be composed; two or more Gatherers can be composed into a single Gatherer using the andThen(Gatherer) method.

    // using the implementation of `map` as seen above
    Gatherer<Integer, ?, Integer> increment = map(i -> i + 1);
    Gatherer<Object, ?, String> toString = map(i -> i.toString());
    Gatherer<Integer, ?, String> incrementThenToString = increment.andThen(toString);

As an example, a Gatherer implementing a sequential prefix scan could be written as follows:

    public static <T, R> Gatherer<T, ?, R> scan(
        Supplier<R> initial,
        BiFunction<? super R, ? super T, ? extends R> scanner) {

        class State {
            R current = initial.get();
        }

        return Gatherer.<T, State, R>ofSequential(
             State::new,
             Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                 state.current = scanner.apply(state.current, element);
                 return downstream.push(state.current);
             })
        );
    }

Example of usage:

    // will contain: ["1", "12", "123", "1234", "12345", "123456", "1234567", "12345678", "123456789"]
    List<String> numberStrings =
        Stream.of(1,2,3,4,5,6,7,8,9)
              .gather(
                  scan(() -> "", (string, number) -> string + number)
               )
              .toList();

Implementation Requirements:
Libraries that implement transformations based on Gatherer, such as Stream.gather(Gatherer), must adhere to the following constraints:
  • Gatherers whose initializer is defaultInitializer() are considered to be stateless, and invoking their initializer is optional.
  • Gatherers whose integrator is an instance of Gatherer.Integrator.Greedy can be assumed not to short-circuit, and the return value of invoking Gatherer.Integrator.integrate(Object, Object, Downstream) does not need to be inspected.
  • The first argument passed to the integration function, both arguments passed to the combiner function, and the argument passed to the finisher function must be the result of a previous invocation of the initializer or combiner functions.
  • The implementation should not do anything with the result of any of the initializer or combiner functions other than to pass them again to the integrator, combiner, or finisher functions.
  • Once a state object is passed to the combiner or finisher function, it is never passed to the integrator function again.
  • When the integrator function returns false, it shall be interpreted just as if there were no more elements to pass it.
  • For parallel evaluation, the gathering implementation must ensure that the input is properly partitioned, that partitions are processed in isolation, and that combining happens only after integration is complete for both partitions.
  • Gatherers whose combiner is defaultCombiner() may only be evaluated sequentially. All other combiners allow the operation to be parallelized by initializing each partition in isolation, invoking the integrator until it returns false, joining each partition's state using the combiner, and then invoking the finisher on the joined state. Outputs and state later in the input sequence will be discarded if processing an earlier partition short-circuits.
  • Gatherers whose finisher is defaultFinisher() are considered to not have an end-of-stream hook, and invoking their finisher is optional.
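To make the short-circuiting constraint concrete, the sketch below builds a gatherer whose integrator returns false once it has forwarded a fixed number of elements. `TakeDemo`, `take`, `Counter`, and `firstThree` are hypothetical names chosen for this illustration. The integrator is deliberately a plain lambda rather than one wrapped with `Gatherer.Integrator.ofGreedy`, since a greedy integrator is assumed not to short-circuit.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class TakeDemo {
    // Hypothetical gatherer: forwards at most n elements, then signals "no more input".
    static <T> Gatherer<T, ?, T> take(long n) {
        class Counter { long seen; }
        return Gatherer.<T, Counter, T>ofSequential(
            Counter::new,
            (counter, element, downstream) -> {
                if (counter.seen >= n) {
                    return false;               // covers n == 0
                }
                counter.seen++;
                // Stop if downstream rejects, or once the n-th element has been pushed.
                return downstream.push(element) && counter.seen < n;
            }
        );
    }

    static List<Integer> firstThree() {
        // The source is infinite; evaluation ends because the integrator returns false.
        return Stream.iterate(1, i -> i + 1).gather(TakeDemo.<Integer>take(3)).toList();
    }

    public static void main(String[] args) {
        System.out.println(firstThree()); // [1, 2, 3]
    }
}
```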
Since:
24
  • Method Details

    • initializer

      default Supplier<A> initializer()
      A function that produces an instance of the intermediate state used for this gathering operation.
      Implementation Requirements:
      The implementation in this interface returns defaultInitializer().
      Returns:
      A function that produces an instance of the intermediate state used for this gathering operation
    • integrator

      Gatherer.Integrator<A,T,R> integrator()
      A function which integrates provided elements, potentially using the provided intermediate state, optionally producing output to the provided Gatherer.Downstream.
      Returns:
      a function which integrates provided elements, potentially using the provided state, optionally producing output to the provided Downstream
    • combiner

      default BinaryOperator<A> combiner()
      A function which accepts two intermediate states and combines them into one.
      Implementation Requirements:
      The implementation in this interface returns defaultCombiner().
      Returns:
      a function which accepts two intermediate states and combines them into one
    • finisher

      default BiConsumer<A,Gatherer.Downstream<? super R>> finisher()
      A function which accepts the final intermediate state and a Gatherer.Downstream object, allowing a final action to be performed at the end of input elements.
      Implementation Requirements:
      The implementation in this interface returns defaultFinisher().
      Returns:
      a function which transforms the intermediate result to the final result(s) which are then passed on to the provided Downstream
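Since integrator() is the only method without a default, a Gatherer can be written by implementing the interface directly. The sketch below does so and checks that the other three accessors fall back to the shared default instances. `DefaultsDemo`, `LENGTHS`, `usesDefaults`, and `lengths` are names invented for this example.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class DefaultsDemo {
    // Only integrator() is implemented; initializer(), combiner() and finisher() are inherited.
    static final Gatherer<String, Void, Integer> LENGTHS = new Gatherer<String, Void, Integer>() {
        @Override
        public Gatherer.Integrator<Void, String, Integer> integrator() {
            // The state argument is unused because this gatherer is stateless.
            return (unused, element, downstream) -> downstream.push(element.length());
        }
    };

    static boolean usesDefaults() {
        // The default instances are documented to always be the same, so identity checks work.
        return LENGTHS.initializer() == Gatherer.<Void>defaultInitializer()
            && LENGTHS.combiner() == Gatherer.<Void>defaultCombiner()
            && LENGTHS.finisher() == Gatherer.<Void, Integer>defaultFinisher();
    }

    static List<Integer> lengths() {
        return Stream.of("a", "bb", "ccc").gather(LENGTHS).toList();
    }

    public static void main(String[] args) {
        System.out.println(usesDefaults()); // true
        System.out.println(lengths());      // [1, 2, 3]
    }
}
```

Because the combiner is the default one, such a gatherer is only evaluated sequentially; the factory method of(...) is the more convenient route when parallel evaluation is wanted.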
    • andThen

      default <RR> Gatherer<T,?,RR> andThen(Gatherer<? super R, ?, ? extends RR> that)
      Returns a composed Gatherer which connects the output of this Gatherer to the input of that Gatherer.
      Implementation Requirements:
      The implementation in this interface returns a new Gatherer which is semantically equivalent to the combination of this and that gatherer.
      Type Parameters:
      RR - The type of output of that Gatherer
      Parameters:
      that - the other gatherer
      Returns:
      a composed Gatherer which connects the output of this Gatherer to the input of that Gatherer
      Throws:
      NullPointerException - if the argument is null
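A short sketch of composition on a real stream: the output type of the first gatherer (a list of integers) becomes the input type of the second. `AndThenDemo`, `windowSums`, `pairs`, `sumEach`, and `pairSums` are illustrative names; `Gatherers.windowFixed` is one of the predefined gatherers.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class AndThenDemo {
    static List<Integer> windowSums() {
        // First stage: group the input into windows of two elements.
        Gatherer<Integer, ?, List<Integer>> pairs = Gatherers.windowFixed(2);
        // Second stage: a stateless gatherer that maps each window to its sum.
        Gatherer<List<Integer>, ?, Integer> sumEach = Gatherer.<List<Integer>, Integer>of(
            (unused, window, downstream) ->
                downstream.push(window.stream().mapToInt(Integer::intValue).sum()));
        // Composition: the output of pairs feeds the input of sumEach.
        Gatherer<Integer, ?, Integer> pairSums = pairs.andThen(sumEach);
        return Stream.of(1, 2, 3, 4, 5).gather(pairSums).toList();
    }

    public static void main(String[] args) {
        System.out.println(windowSums()); // [3, 7, 5]
    }
}
```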
    • defaultInitializer

      static <A> Supplier<A> defaultInitializer()
      Returns an initializer which is the default initializer of a Gatherer. The returned initializer identifies that the owner Gatherer is stateless.
      Implementation Requirements:
      This method always returns the same instance.
      Type Parameters:
      A - the type of the state of the returned initializer
      Returns:
      the instance of the default initializer
    • defaultCombiner

      static <A> BinaryOperator<A> defaultCombiner()
      Returns a combiner which is the default combiner of a Gatherer. The returned combiner identifies that the owning Gatherer must only be evaluated sequentially.
      Implementation Requirements:
      This method always returns the same instance.
      Type Parameters:
      A - the type of the state of the returned combiner
      Returns:
      the instance of the default combiner
    • defaultFinisher

      static <A,R> BiConsumer<A,Gatherer.Downstream<? super R>> defaultFinisher()
      Returns a finisher which is the default finisher of a Gatherer. The returned finisher identifies that the owning Gatherer performs no additional actions at the end of input.
      Implementation Requirements:
      This method always returns the same instance.
      Type Parameters:
      A - the type of the state of the returned finisher
      R - the type of the Downstream of the returned finisher
      Returns:
      the instance of the default finisher
    • ofSequential

      static <T,R> Gatherer<T,Void,R> ofSequential(Gatherer.Integrator<Void,T,R> integrator)
      Returns a new, sequential, and stateless Gatherer described by the given integrator.
      Type Parameters:
      T - the type of input elements for the new gatherer
      R - the type of results for the new gatherer
      Parameters:
      integrator - the integrator function for the new gatherer
      Returns:
      the new Gatherer
      Throws:
      NullPointerException - if the argument is null
    • ofSequential

      static <T,R> Gatherer<T,Void,R> ofSequential(Gatherer.Integrator<Void,T,R> integrator, BiConsumer<Void,Gatherer.Downstream<? super R>> finisher)
      Returns a new, sequential, and stateless Gatherer described by the given integrator and finisher.
      Type Parameters:
      T - the type of input elements for the new gatherer
      R - the type of results for the new gatherer
      Parameters:
      integrator - the integrator function for the new gatherer
      finisher - the finisher function for the new gatherer
      Returns:
      the new Gatherer
      Throws:
      NullPointerException - if any argument is null
    • ofSequential

      static <T,A,R> Gatherer<T,A,R> ofSequential(Supplier<A> initializer, Gatherer.Integrator<A,T,R> integrator)
      Returns a new, sequential Gatherer described by the given initializer and integrator.
      Type Parameters:
      T - the type of input elements for the new gatherer
      A - the type of state for the new gatherer
      R - the type of results for the new gatherer
      Parameters:
      initializer - the initializer function for the new gatherer
      integrator - the integrator function for the new gatherer
      Returns:
      the new Gatherer
      Throws:
      NullPointerException - if any argument is null
    • ofSequential

      static <T,A,R> Gatherer<T,A,R> ofSequential(Supplier<A> initializer, Gatherer.Integrator<A,T,R> integrator, BiConsumer<A,Gatherer.Downstream<? super R>> finisher)
      Returns a new, sequential Gatherer described by the given initializer, integrator, and finisher.
      Type Parameters:
      T - the type of input elements for the new gatherer
      A - the type of state for the new gatherer
      R - the type of results for the new gatherer
      Parameters:
      initializer - the initializer function for the new gatherer
      integrator - the integrator function for the new gatherer
      finisher - the finisher function for the new gatherer
      Returns:
      the new Gatherer
      Throws:
      NullPointerException - if any argument is null
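The three-argument form is useful when the last piece of output can only be produced once the input has ended. As a sketch under that assumption, the gatherer below collapses consecutive equal elements into runs and flushes the final run from the finisher. `RunLengthDemo`, `Run`, `runLengths`, `State`, and `demo` are hypothetical names for this illustration.

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class RunLengthDemo {
    // Hypothetical output type: a value and how many times it occurred consecutively.
    record Run<T>(T value, int length) {}

    static <T> Gatherer<T, ?, Run<T>> runLengths() {
        class State { T current; int length; }
        return Gatherer.<T, State, Run<T>>ofSequential(
            State::new,
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                if (state.length > 0 && Objects.equals(state.current, element)) {
                    state.length++;             // still inside the current run
                    return true;
                }
                // A new run starts: emit the previous one, if there was one.
                boolean more = state.length == 0
                    || downstream.push(new Run<T>(state.current, state.length));
                state.current = element;
                state.length = 1;
                return more;
            }),
            (state, downstream) -> {
                // End of input: the run in progress has not been emitted yet.
                if (state.length > 0) {
                    downstream.push(new Run<T>(state.current, state.length));
                }
            }
        );
    }

    static List<Run<String>> demo() {
        return Stream.of("a", "a", "b", "c", "c", "c")
                     .gather(RunLengthDemo.<String>runLengths())
                     .toList();
    }

    public static void main(String[] args) {
        // [Run[value=a, length=2], Run[value=b, length=1], Run[value=c, length=3]]
        System.out.println(demo());
    }
}
```

A sequential gatherer fits here because a run may straddle any partition boundary, which would make a correct combiner considerably more involved.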
    • of

      static <T,R> Gatherer<T,Void,R> of(Gatherer.Integrator<Void,T,R> integrator)
      Returns a new, parallelizable, and stateless Gatherer described by the given integrator.
      Type Parameters:
      T - the type of input elements for the new gatherer
      R - the type of results for the new gatherer
      Parameters:
      integrator - the integrator function for the new gatherer
      Returns:
      the new Gatherer
      Throws:
      NullPointerException - if any argument is null
    • of

      static <T,R> Gatherer<T,Void,R> of(Gatherer.Integrator<Void,T,R> integrator, BiConsumer<Void,Gatherer.Downstream<? super R>> finisher)
      Returns a new, parallelizable, and stateless Gatherer described by the given integrator and finisher.
      Type Parameters:
      T - the type of input elements for the new gatherer
      R - the type of results for the new gatherer
      Parameters:
      integrator - the integrator function for the new gatherer
      finisher - the finisher function for the new gatherer
      Returns:
      the new Gatherer
      Throws:
      NullPointerException - if any argument is null
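A stateless gatherer can still act at the end of input. The following sketch passes every element through unchanged and uses the finisher to append one trailing element. `TerminatorDemo`, `withTerminator`, and `demo` are hypothetical names, and the example is only exercised on a sequential stream.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class TerminatorDemo {
    // Hypothetical gatherer: forwards all elements, then emits one extra element at the end.
    static <T> Gatherer<T, ?, T> withTerminator(T terminator) {
        return Gatherer.<T, T>of(
            // Integrator: no state is needed, so the first argument is ignored.
            Gatherer.Integrator.ofGreedy((unused, element, downstream) -> downstream.push(element)),
            // Finisher: runs once the upstream is exhausted.
            (unused, downstream) -> downstream.push(terminator)
        );
    }

    static List<String> demo() {
        return Stream.of("a", "b")
                     .gather(TerminatorDemo.<String>withTerminator("END"))
                     .toList();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a, b, END]
    }
}
```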
    • of

      static <T,A,R> Gatherer<T,A,R> of(Supplier<A> initializer, Gatherer.Integrator<A,T,R> integrator, BinaryOperator<A> combiner, BiConsumer<A,Gatherer.Downstream<? super R>> finisher)
      Returns a new, parallelizable Gatherer described by the given initializer, integrator, combiner, and finisher.
      Type Parameters:
      T - the type of input elements for the new gatherer
      A - the type of state for the new gatherer
      R - the type of results for the new gatherer
      Parameters:
      initializer - the initializer function for the new gatherer
      integrator - the integrator function for the new gatherer
      combiner - the combiner function for the new gatherer
      finisher - the finisher function for the new gatherer
      Returns:
      the new Gatherer
      Throws:
      NullPointerException - if any argument is null