T - the type of the values in the streampublic class Stream<T> extends Composable<T>
Stream is a stateless event processor that provides methods for attaching Consumers to consume the values passing through the stream. Some methods like
map(reactor.function.Function), filter(reactor.function.Predicate),
first(), last(), and reduce(reactor.function.Function, Object)
return new Stream Streams whose values are the result of transformation functions,
filtering, and the like.
Typically, new Stream Streams aren't created directly. To create a Stream,
create a DeferredStreamSpec and configure it with the appropriate Environment,
Dispatcher, and other settings, then call Deferred.compose(), which will
return a Stream that handles the values passed into the Deferred.
| Modifier and Type | Field and Description |
|---|---|
protected int |
batchSize |
END_EVENTFLUSH_EVENT| Constructor and Description |
|---|
Stream(Observable observable,
int batchSize,
Composable<?> parent,
Environment environment)
Create a new Stream that will use the
Observable to pass its values to registered
handlers. |
Stream(Observable observable,
int batchSize,
Composable<?> parent,
Tuple2<Selector,java.lang.Object> acceptSelector,
Environment environment)
Create a new Stream that will use the
Observable to pass its values to registered
handlers. |
| Modifier and Type | Method and Description |
|---|---|
Stream<T> |
buffer()
Use
Observable.batchNotify(Object) to group dispatching by the current batch size. |
Stream<T> |
buffer(int batchSize)
Use
Observable.batchNotify(Object) to group dispatch by the passed . |
Stream<T> |
bufferWithErrors()
Collect incoming values into a
List that will be pushed separately into the returned Stream
every time batchSize has been reached. |
Stream<T> |
bufferWithErrors(int batchSize)
Collect incoming values into a
List that will be pushed separately into the returned Stream every
time batchSize has been reached. |
Stream<java.util.List<T>> |
collect()
Collect incoming values into a
List that will be pushed into the returned Stream every time batchSize or flush is triggered has been reached. |
Stream<java.util.List<T>> |
collect(int batchSize)
Collect incoming values into a
List that will be pushed into the returned Stream every time batchSize has been reached. |
Stream<T> |
connect(Composable<T> consumer)
Attach another
Composable to this one that will cascade the value or error received by this Composable into the next. |
Stream<T> |
connectErrors(Composable<?> composable)
Forward any error to the argument.
|
Stream<T> |
connectValues(Composable<T> composable)
Attach another
Composable to this one that will only cascade the value received by this Composable into the next. |
Stream<T> |
consume(Consumer<T> consumer)
|
Stream<T> |
consume(java.lang.Object key,
Observable observable)
Pass values accepted by this
Composable into the given Observable, notifying with the given key. |
Stream<T> |
consumeFlush(Flushable<?> consumer)
Consume flush with the passed
Flushable |
Stream<T> |
count(Stream<java.lang.Long> stream)
Count accepted events for each batch (every flush) and pass each accumulated long to the .
|
Stream<T> |
distinct()
Create a new
Stream that filters out consecutive equals values. |
Stream<java.lang.Boolean> |
filter()
Evaluate each accepted boolean value.
|
Stream<java.lang.Boolean> |
filter(Composable<java.lang.Boolean> elseComposable)
Evaluate each accepted boolean value.
|
Stream<T> |
filter(Function<T,java.lang.Boolean> p)
Evaluate each accepted value against the given predicate
Function. |
Stream<T> |
filter(Predicate<T> p)
Evaluate each accepted value against the given
Predicate. |
Stream<T> |
filter(Predicate<T> p,
Composable<T> elseComposable)
Evaluate each accepted value against the given
Predicate. |
Stream<T> |
first()
Create a new
Stream whose values will be only the first value of each batch. |
Stream<T> |
first(int batchSize)
Create a new
Stream whose values will be only the first value of each batch. |
Stream<T> |
flush()
Flush any cached or unprocessed values through this Stream.
|
Stream<T> |
flushWhen(Predicate<T> predicate)
Consume values and trigger flush when matches.
|
Stream<T> |
last()
Create a new
Stream whose values will be only the last value of each batch. |
Stream<T> |
last(int batchSize)
Create a new
Stream whose values will be only the last value of each batch. |
<V> Stream<V> |
map(Function<T,V> fn)
Assign the given
Function to transform the incoming value T into a V and pass it into
another Composable. |
<V,C extends Composable<V>> |
mapMany(Function<T,C> fn)
Assign the given
Function to transform the incoming value T into a Composable<V> and pass
it into another Composable. |
Stream<T> |
merge(Composable<T>... composables)
this#connect(Composable) all the passed to this Composable,
merging values streams into the current pipeline. |
Stream<java.util.List<T>> |
movingWindow(int period,
int backlog)
Collect incoming values into an internal array, providing a
List that will be pushed into the returned
Stream every specified in milliseconds. |
Stream<java.util.List<T>> |
movingWindow(int period,
java.util.concurrent.TimeUnit timeUnit,
int backlog)
Collect incoming values into an internal array, providing a
List that will be pushed into the returned
Stream every specified time from the and a . |
Stream<java.util.List<T>> |
movingWindow(int period,
java.util.concurrent.TimeUnit timeUnit,
int delay,
int backlog)
Collect incoming values into an internal array, providing a
List that will be pushed into the returned
Stream every specified time from the and a after an initial
in milliseconds. |
Stream<java.util.List<T>> |
movingWindow(int period,
java.util.concurrent.TimeUnit timeUnit,
int delay,
int backlog,
Timer timer)
Collect incoming values into an internal array, providing a
List that will be pushed into the returned
Stream every specified time from the and a after an initial
in milliseconds. |
protected <V> Stream<V> |
newComposable()
Create a
Composable that is compatible with the subclass of Composable in use. |
protected <V> Stream<V> |
newComposable(int batchSize) |
Stream<T> |
propagate(java.lang.Iterable<T> iterable)
Create a new
Stream whose values will be each iterated item from
Every time flush is triggered, is drained. |
Stream<T> |
propagate(Supplier<T> supplier)
Create a new
Composable whose values will be generated from . |
<A> Stream<A> |
reduce(Function<Tuple2<T,A>,A> fn)
Reduce the values passing through this
Stream into an object A. |
<A> Stream<A> |
reduce(Function<Tuple2<T,A>,A> fn,
A initial)
Reduce the values passing through this
Stream into an object A. |
<A> Stream<A> |
reduce(Function<Tuple2<T,A>,A> fn,
Supplier<A> accumulators,
int batchSize)
Reduce the values passing through this
Stream into an object A. |
<A> Stream<A> |
scan(Function<Tuple2<T,A>,A> fn)
Scan the values passing through this
Stream into an object A. |
<A> Stream<A> |
scan(Function<Tuple2<T,A>,A> fn,
A initial)
Scan the values passing through this
Stream into an object A. |
<A> Stream<A> |
scan(Function<Tuple2<T,A>,A> fn,
Supplier<A> accumulators)
Scan the values passing through this
Stream into an object A. |
<V> Stream<V> |
split()
Create a new
Stream whose values will be each element E of any Iterable |
<V> Stream<V> |
split(int batchSize)
Create a new
Stream whose values will be each element E of any Iterable |
Tap<T> |
tap()
Create a
Tap that maintains a reference to the last value seen by this Stream. |
Stream<T> |
timeout(long timeout)
Flush the parent if any or the current composable otherwise when the last notification occurred before milliseconds.
|
Stream<T> |
timeout(long timeout,
Timer timer)
Flush the parent if any or the current composable otherwise when the last notification occurred before milliseconds.
|
<E extends java.lang.Throwable> |
when(java.lang.Class<E> exceptionType,
Consumer<E> onError)
Assign an error handler to exceptions of the given type.
|
Stream<java.util.List<T>> |
window(int period)
Collect incoming values into a
List that will be pushed into the returned Stream every specified
time from the in milliseconds. |
Stream<java.util.List<T>> |
window(int period,
java.util.concurrent.TimeUnit timeUnit)
Collect incoming values into a
List that will be pushed into the returned Stream every specified
time from the and a . |
Stream<java.util.List<T>> |
window(int period,
java.util.concurrent.TimeUnit timeUnit,
int delay)
Collect incoming values into a
List that will be pushed into the returned Stream every specified
time from the , after an initial in milliseconds. |
Stream<java.util.List<T>> |
window(int period,
java.util.concurrent.TimeUnit timeUnit,
int delay,
Timer timer)
Collect incoming values into a
List that will be pushed into the returned Stream every specified
time from the , after an initial in milliseconds. |
add, consumeErrorAndFlush, consumeEvent, debug, getAcceptKey, getAcceptSelector, getEnvironment, getError, getFlush, getObservable, getParentpublic Stream(@Nullable
Observable observable,
int batchSize,
@Nullable
Composable<?> parent,
@Nullable
Environment environment)
Observable to pass its values to registered
handlers.
The stream will batch values into batches of the given
batchSize, affecting the values that are passed to the first() and last() substreams. A size of -1 indicates that the stream should not be batched.
flush() must be called to pass the initial values
to those handlers. The stream will accept errors from the given parent.observable - The observable used to drive event handlersbatchSize - The size of the batches, or -1 for no batchingparent - The stream's parent. May be nullpublic Stream(@Nullable
Observable observable,
int batchSize,
@Nullable
Composable<?> parent,
Tuple2<Selector,java.lang.Object> acceptSelector,
@Nullable
Environment environment)
Observable to pass its values to registered
handlers.
The stream will batch values into batches of the given batchSize, affecting the values that are passed to
the first() and last() substreams. A size of -1 indicates that the stream should not be
batched.
flush() must be called to pass the initial values
to those handlers. The stream will accept errors from the given parent.observable - The observable used to drive event handlersbatchSize - The size of the batches, or -1 for no batchingparent - The stream's parent. May be nullacceptSelector - The tuple Selector/Key to accept values on this observable. May be nullpublic Stream<T> consume(@Nonnull Consumer<T> consumer)
Composableconsume in class Composable<T>consumer - the conumer to invoke on each valuepublic Stream<T> connect(@Nonnull Composable<T> consumer)
ComposableComposable to this one that will cascade the value or error received by this Composable into the next.connect in class Composable<T>consumer - the next Composable to cascade events topublic Stream<T> consume(@Nonnull java.lang.Object key, @Nonnull Observable observable)
ComposableComposable into the given Observable, notifying with the given key.consume in class Composable<T>key - the key to notify onobservable - the Observable to notifypublic Stream<T> flush()
Composablepublic <E extends java.lang.Throwable> Stream<T> when(@Nonnull java.lang.Class<E> exceptionType, @Nonnull Consumer<E> onError)
Composablewhen in class Composable<T>E - type of the exception to handleexceptionType - the type of exceptions to handleonError - the error handler for each exceptionpublic <V> Stream<V> map(@Nonnull Function<T,V> fn)
ComposableFunction to transform the incoming value T into a V and pass it into
another Composable.map in class Composable<T>V - the type of the return value of the transformation functionfn - the transformation functionComposable containing the transformed valuespublic <V,C extends Composable<V>> Stream<V> mapMany(@Nonnull Function<T,C> fn)
ComposableFunction to transform the incoming value T into a Composable<V> and pass
it into another Composable.mapMany in class Composable<T>V - the type of the return value of the transformation functionfn - the transformation functionComposable containing the transformed valuespublic Stream<java.lang.Boolean> filter()
ComposableComposable. If the predicate test fails, the value is ignored.filter in class Composable<T>Composable containing only values that pass the predicate testpublic Stream<java.lang.Boolean> filter(@Nonnull Composable<java.lang.Boolean> elseComposable)
ComposableComposable. the value is propagated into the .filter in class Composable<T>elseComposable - the Composable to test values againstComposable containing only values that pass the predicate testpublic Stream<T> filter(@Nonnull Function<T,java.lang.Boolean> p)
ComposableFunction. If the predicate test succeeds, the
value is passed into the new Composable. If the predicate test fails, an exception is propagated into the
new Composable.filter in class Composable<T>p - the predicate Function to test values againstComposable containing only values that pass the predicate testpublic Stream<T> filter(@Nonnull Predicate<T> p)
ComposablePredicate. If the predicate test succeeds, the value is
passed into the new Composable. If the predicate test fails, the value is ignored.filter in class Composable<T>p - the Predicate to test values againstComposable containing only values that pass the predicate testpublic Stream<T> filter(@Nonnull Predicate<T> p, Composable<T> elseComposable)
ComposablePredicate. If the predicate test succeeds, the value is
passed into the new Composable. If the predicate test fails, the value is propagated into the elseComposable.filter in class Composable<T>p - the Predicate to test values againstelseComposable - the optional Composable to pass rejected valuesComposable containing only values that pass the predicate testpublic Stream<T> connectValues(@Nonnull Composable<T> composable)
ComposableComposable to this one that will only cascade the value received by this Composable into the next.connectValues in class Composable<T>composable - the next Composable to cascade events topublic Stream<T> timeout(long timeout, Timer timer)
Composabletimeout in class Composable<T>timeout - the timeout in milliseconds between two notifications on this composabletimer - the reactor timer to run the timeout onComposablepublic Stream<T> timeout(long timeout)
Composabletimeout in class Composable<T>timeout - the timeout in milliseconds between two notifications on this composableComposablepublic Stream<T> merge(Composable<T>... composables)
Composablethis#connect(Composable) all the passed to this Composable,
merging values streams into the current pipeline.merge in class Composable<T>composables - the the composables to connectpublic Stream<T> consumeFlush(@Nonnull Flushable<?> consumer)
PipelineFlushableconsumeFlush in interface Pipeline<T>consumeFlush in class Composable<T>consumer - the action listening for flushpublic Stream<T> connectErrors(Composable<?> composable)
ComposableconnectErrors in class Composable<T>composable - the target sink for errores and flushespublic Stream<T> propagate(java.lang.Iterable<T> iterable)
Stream whose values will be each iterated item from
Every time flush is triggered, is drained.iterable - the iterable to drainStream whose values are the iterated one on flushpublic Stream<T> propagate(Supplier<T> supplier)
ComposableComposable whose values will be generated from .
Every time flush is triggered, is called.propagate in class Composable<T>supplier - the supplier to drainComposable whose values are generated on each flushpublic Stream<T> flushWhen(Predicate<T> predicate)
predicate - the test returning true to trigger flushpublic Stream<T> first()
Stream whose values will be only the first value of each batch. Requires a batchSize
to
have been set.
When a new batch is triggered, the first value of that next batch will be pushed into this Stream.
Stream whose values are the first value of each batchpublic Stream<T> first(int batchSize)
Stream whose values will be only the first value of each batch. Requires a batchSize
to
have been set.
When a new batch is triggered, the first value of that next batch will be pushed into this Stream.
batchSize - the batch size to useStream whose values are the first value of each batch)public Stream<T> last()
Stream whose values will be only the last value of each batch. Requires a batchSizeStream whose values are the last value of each batchpublic Stream<T> last(int batchSize)
Stream whose values will be only the last value of each batch. Requires a batchSizebatchSize - the batch size to useStream whose values are the last value of each batchpublic Stream<T> distinct()
Stream that filters out consecutive equals values.Stream whose values are the last value of each batchpublic <V> Stream<V> split()
Stream whose values will be each element E of any Iterable
When a new batch is triggered, the last value of that next batch will be pushed into this Stream.
Stream whose values result from the iterable inputpublic <V> Stream<V> split(int batchSize)
Stream whose values will be each element E of any Iterable
When a new batch is triggered, the last value of that next batch will be pushed into this Stream.
batchSize - the batch size to useStream whose values result from the iterable inputpublic Stream<T> buffer()
Observable.batchNotify(Object) to group dispatching by the current batch size.
When a new batch is triggered, the last value of that next batch will be pushed into this Stream.Stream whose values result from the iterable inputpublic Stream<T> buffer(int batchSize)
Observable.batchNotify(Object) to group dispatch by the passed .
When a new batch is triggered, the last value of that next batch will be pushed into this Stream.batchSize - the batch size to useStream whose values result from the iterable inputpublic Stream<T> bufferWithErrors()
List that will be pushed separately into the returned Stream
every time batchSize has been reached. All errors are also captured until current batchSize or flush is called.Stream whose values of all values in this batchpublic Stream<T> bufferWithErrors(int batchSize)
List that will be pushed separately into the returned Stream every
time batchSize has been reached. All errors are also captured until batchSize or flush is called.batchSize - the collected sizeStream whose values are all values in this batchpublic Stream<java.util.List<T>> collect()
List that will be pushed into the returned Stream every time batchSize or flush is triggered has been reached.Stream whose values are a List of all values in this batchpublic Stream<java.util.List<T>> collect(int batchSize)
List that will be pushed into the returned Stream every time batchSize has been reached.batchSize - the collected sizeStream whose values are a List of all values in this batchpublic Stream<java.util.List<T>> window(int period)
List that will be pushed into the returned Stream every specified
time from the in milliseconds. The window runs on a timer from the stream this#environment.period - the time period when each window close and flush the attached consumerStream whose values are a List of all values in this windowpublic Stream<java.util.List<T>> movingWindow(int period, int backlog)
List that will be pushed into the returned
Stream every specified in milliseconds. The window runs on a timer from the stream this#environment. After accepting of items, every old item will be dropped. Resulting List will be at most items long.period - the time period when each window close and flush the attached consumerbacklog - maximum amount of items to keepStream whose values are a List of all values in this windowpublic Stream<java.util.List<T>> window(int period, java.util.concurrent.TimeUnit timeUnit)
List that will be pushed into the returned Stream every specified
time from the and a . The window
runs on a timer from the stream this#environment.period - the time period when each window close and flush the attached consumertimeUnit - the time unit used for the periodStream whose values are a List of all values in this windowpublic Stream<java.util.List<T>> movingWindow(int period, java.util.concurrent.TimeUnit timeUnit, int backlog)
List that will be pushed into the returned
Stream every specified time from the and a .period - the time period when each window close and flush the attached consumertimeUnit - the time unit used for the periodbacklog - maximum amount of items to keepStream whose values are a List of all values in this windowpublic Stream<java.util.List<T>> window(int period, java.util.concurrent.TimeUnit timeUnit, int delay)
List that will be pushed into the returned Stream every specified
time from the , after an initial in milliseconds. The window
runs on a timer from the stream this#environment.period - the time period when each window close and flush the attached consumertimeUnit - the time unit used for the perioddelay - the initial delay in millisecondsStream whose values are a List of all values in this windowpublic Stream<java.util.List<T>> movingWindow(int period, java.util.concurrent.TimeUnit timeUnit, int delay, int backlog)
List that will be pushed into the returned
Stream every specified time from the and a after an initial
in milliseconds.period - the time period when each window close and flush the attached consumertimeUnit - the time unit used for the perioddelay - the initial delay in millisecondsbacklog - maximum amount of items to keepStream whose values are a List of all values in this windowpublic Stream<java.util.List<T>> window(int period, java.util.concurrent.TimeUnit timeUnit, int delay, Timer timer)
List that will be pushed into the returned Stream every specified
time from the , after an initial in milliseconds. The window
runs on a supplied .period - the time period when each window close and flush the attached consumertimeUnit - the time unit used for the perioddelay - the initial delay in millisecondstimer - the reactor timer to run the window onStream whose values are a List of all values in this windowpublic Stream<java.util.List<T>> movingWindow(int period, java.util.concurrent.TimeUnit timeUnit, int delay, int backlog, Timer timer)
List that will be pushed into the returned
Stream every specified time from the and a after an initial
in milliseconds.period - the time period when each window close and flush the attached consumertimeUnit - the time unit used for the perioddelay - the initial delay in millisecondsbacklog - maximum amount of items to keeptimer - the reactor timer to run the window onStream whose values are a List of all values in this windowpublic <A> Stream<A> reduce(@Nonnull Function<Tuple2<T,A>,A> fn, A initial)
Stream into an object A. The given initial object will be
passed to the function's Tuple2 argument.A - the type of the reduced objectfn - the reduce functioninitial - the initial argument to pass to the reduce functionStream whose values contain only the reduced objectspublic <A> Stream<A> reduce(@Nonnull Function<Tuple2<T,A>,A> fn, @Nullable Supplier<A> accumulators, int batchSize)
Stream into an object A. The given Supplier will be
used to produce initial accumulator objects either on the first reduce call, in the case of an unbounded Stream, or on the first value of each batch, if a batchSize is set.
In an unbounded Stream, the accumulated value will be published on the returned Stream on flush
only. But when a batchSize has been, the accumulated
value will only be published on the new Stream at the end of each batch. On the next value (the first of
the next batch), the Supplier is called again for a new accumulator object and the reduce starts over with
a new accumulator.
A - the type of the reduced objectfn - the reduce functionaccumulators - the Supplier that will provide accumulatorsbatchSize - the batch size to useStream whose values contain only the reduced objectspublic <A> Stream<A> reduce(@Nonnull Function<Tuple2<T,A>,A> fn)
Stream into an object A.A - the type of the reduced objectfn - the reduce functionStream whose values contain only the reduced objectspublic <A> Stream<A> scan(@Nonnull Function<Tuple2<T,A>,A> fn, A initial)
Stream into an object A. The given initial object will be
passed to the function's Tuple2 argument. Behave like Reduce but triggers downstream Stream for every
transformation.A - the type of the reduced objectfn - the scan functioninitial - the initial argument to pass to the reduce functionStream whose values contain only the reduced objectspublic <A> Stream<A> scan(@Nonnull Function<Tuple2<T,A>,A> fn, @Nullable Supplier<A> accumulators)
Stream into an object A. The given Supplier will be
used to produce initial accumulator objects either on the first reduce call, in the case of an unbounded Stream, or on the first value of each batch, if a batchSize is set.
The accumulated value will be published on the returned Stream every time
a
value is accepted.
A - the type of the reduced objectfn - the scan functionaccumulators - the Supplier that will provide accumulatorsStream whose values contain only the reduced objectspublic <A> Stream<A> scan(@Nonnull Function<Tuple2<T,A>,A> fn)
Stream into an object A.A - the type of the reduced objectfn - the reduce functionStream whose values contain only the reduced objectspublic Stream<T> count(Stream<java.lang.Long> stream)
stream - the stream to consume accumulated number of accepted event between 2 flushesprotected <V> Stream<V> newComposable()
ComposableComposable that is compatible with the subclass of Composable in use.newComposable in class Composable<T>V - type the Composable handlesComposable compatible with the current subclass.protected <V> Stream<V> newComposable(int batchSize)