T - The type of the valuespublic abstract class Composable<T> extends java.lang.Object implements Pipeline<T>
map(reactor.function.Function)
and filter(reactor.function.Predicate) methods.| Modifier and Type | Field and Description |
|---|---|
static Event<java.lang.Object> |
END_EVENT |
FLUSH_EVENT| Modifier | Constructor and Description |
|---|---|
protected |
Composable(Observable observable,
Composable<U> parent) |
protected |
Composable(Observable observable,
Composable<U> parent,
Tuple2<Selector,java.lang.Object> acceptSelectorTuple,
Environment environment) |
| Modifier and Type | Method and Description |
|---|---|
Composable<T> |
add(Action<T> action)
Consume events with the passed
Action |
Composable<T> |
connect(Composable<T> composable)
Attach another
Composable to this one that will cascade the value or error received by this Composable into the next. |
Composable<T> |
connectErrors(Composable<?> composable)
Forward any error to the argument.
|
Composable<T> |
connectValues(Composable<T> composable)
Attach another
Composable to this one that will only cascade the value received by this Composable into the next. |
Composable<T> |
consume(Consumer<T> consumer)
|
Composable<T> |
consume(java.lang.Object key,
Observable observable)
Pass values accepted by this
Composable into the given Observable, notifying with the given key. |
protected Composable<T> |
consumeErrorAndFlush(Composable<?> composable)
Forward any error or flush to the argument.
|
Composable<T> |
consumeEvent(Consumer<Event<T>> consumer)
Attach a
Consumer to this Composable that will consume any values accepted by this Composable. |
Composable<T> |
consumeFlush(Flushable<?> action)
Consume flush with the passed
Flushable |
java.lang.String |
debug()
Print a debugged form of the root composable relative to this.
|
Composable<java.lang.Boolean> |
filter()
Evaluate each accepted boolean value.
|
Composable<java.lang.Boolean> |
filter(Composable<java.lang.Boolean> elseComposable)
Evaluate each accepted boolean value.
|
Composable<T> |
filter(Function<T,java.lang.Boolean> fn)
Evaluate each accepted value against the given predicate
Function. |
Composable<T> |
filter(Predicate<T> p)
Evaluate each accepted value against the given
Predicate. |
Composable<T> |
filter(Predicate<T> p,
Composable<T> elseComposable)
Evaluate each accepted value against the given
Predicate. |
Composable<T> |
flush()
Flush any cached or unprocessed values through this Stream.
|
protected java.lang.Object |
getAcceptKey()
Get the anonymous
Selector and notification key for doing accepts. |
protected Selector |
getAcceptSelector()
Get the anonymous
Selector and notification key for doing accepts. |
protected Environment |
getEnvironment()
Get the assigned
Environment. |
protected Selector |
getError()
Get the anonymous
Selector and notification key for doing errors. |
protected Selector |
getFlush()
Get the anonymous flush
Selector for batch consuming. |
protected Observable |
getObservable()
Get the current
Observable. |
protected Composable<?> |
getParent()
Get the parent
Composable for callback callback. |
<V> Composable<V> |
map(Function<T,V> fn)
Assign the given
Function to transform the incoming value T into a V and pass it into
another Composable. |
<V,C extends Composable<V>> |
mapMany(Function<T,C> fn)
Assign the given
Function to transform the incoming value T into a Composable<V> and pass
it into another Composable. |
Composable<T> |
merge(Composable<T>... composables)
this#connect(Composable) all the passed to this Composable,
merging values streams into the current pipeline. |
protected abstract <V> Composable<V> |
newComposable()
Create a
Composable that is compatible with the subclass of Composable in use. |
Composable<T> |
propagate(Supplier<T> supplier)
Create a new
Composable whose values will be generated from . |
Composable<T> |
timeout(long timeout)
Flush the parent if any or the current composable otherwise when the last notification occurred before milliseconds.
|
Composable<T> |
timeout(long timeout,
Timer timer)
Flush the parent if any or the current composable otherwise when the last notification occurred before milliseconds.
|
<E extends java.lang.Throwable> |
when(java.lang.Class<E> exceptionType,
Consumer<E> onError)
Assign an error handler to exceptions of the given type.
|
public static final Event<java.lang.Object> END_EVENT
protected Composable(@Nullable
Observable observable,
@Nullable
Composable<U> parent)
protected Composable(@Nullable
Observable observable,
@Nullable
Composable<U> parent,
@Nullable
Tuple2<Selector,java.lang.Object> acceptSelectorTuple,
@Nullable
Environment environment)
public <E extends java.lang.Throwable> Composable<T> when(@Nonnull java.lang.Class<E> exceptionType, @Nonnull Consumer<E> onError)
E - type of the exception to handleexceptionType - the type of exceptions to handleonError - the error handler for each exceptionpublic Composable<T> connect(@Nonnull Composable<T> composable)
Composable to this one that will cascade the value or error received by this Composable into the next.composable - the next Composable to cascade events topublic Composable<T> connectValues(@Nonnull Composable<T> composable)
Composable to this one that will only cascade the value received by this Composable into the next.composable - the next Composable to cascade events topublic Composable<T> consume(@Nonnull Consumer<T> consumer)
consumer - the conumer to invoke on each valuepublic Composable<T> consumeEvent(@Nonnull Consumer<Event<T>> consumer)
Consumer to this Composable that will consume any values accepted by this Composable.consumer - the conumer to invoke on each valuepublic Composable<T> consume(@Nonnull java.lang.Object key, @Nonnull Observable observable)
Composable into the given Observable, notifying with the given key.key - the key to notify onobservable - the Observable to notifypublic <V> Composable<V> map(@Nonnull Function<T,V> fn)
Function to transform the incoming value T into a V and pass it into
another Composable.V - the type of the return value of the transformation functionfn - the transformation functionComposable containing the transformed valuespublic <V,C extends Composable<V>> Composable<V> mapMany(@Nonnull Function<T,C> fn)
Function to transform the incoming value T into a Composable<V> and pass
it into another Composable.V - the type of the return value of the transformation functionfn - the transformation functionComposable containing the transformed valuespublic Composable<T> merge(Composable<T>... composables)
this#connect(Composable) all the passed to this Composable,
merging values streams into the current pipeline.composables - the the composables to connectpublic Composable<T> filter(@Nonnull Function<T,java.lang.Boolean> fn)
Function. If the predicate test succeeds, the
value is passed into the new Composable. If the predicate test fails, an exception is propagated into the
new Composable.fn - the predicate Function to test values againstComposable containing only values that pass the predicate testpublic Composable<java.lang.Boolean> filter()
Composable. If the predicate test fails, the value is ignored.Composable containing only values that pass the predicate testpublic Composable<java.lang.Boolean> filter(@Nonnull Composable<java.lang.Boolean> elseComposable)
Composable. the value is propagated into the .elseComposable - the Composable to test values againstComposable containing only values that pass the predicate testpublic Composable<T> filter(@Nonnull Predicate<T> p)
Predicate. If the predicate test succeeds, the value is
passed into the new Composable. If the predicate test fails, the value is ignored.p - the Predicate to test values againstComposable containing only values that pass the predicate testpublic Composable<T> filter(@Nonnull Predicate<T> p, Composable<T> elseComposable)
Predicate. If the predicate test succeeds, the value is
passed into the new Composable. If the predicate test fails, the value is propagated into the elseComposable.p - the Predicate to test values againstelseComposable - the optional Composable to pass rejected valuesComposable containing only values that pass the predicate testpublic Composable<T> timeout(long timeout)
timeout - the timeout in milliseconds between two notifications on this composableComposablepublic Composable<T> timeout(long timeout, Timer timer)
timeout - the timeout in milliseconds between two notifications on this composabletimer - the reactor timer to run the timeout onComposablepublic Composable<T> propagate(Supplier<T> supplier)
Composable whose values will be generated from .
Every time flush is triggered, is called.supplier - the supplier to drainComposable whose values are generated on each flushpublic Composable<T> flush()
public java.lang.String debug()
public Composable<T> add(Action<T> action)
Actionpublic Composable<T> consumeFlush(Flushable<?> action)
PipelineFlushableconsumeFlush in interface Pipeline<T>action - the action listening for flushpublic Composable<T> connectErrors(Composable<?> composable)
composable - the target sink for errores and flushesprotected Composable<T> consumeErrorAndFlush(Composable<?> composable)
composable - the target sink for errores and flushesprotected abstract <V> Composable<V> newComposable()
Composable that is compatible with the subclass of Composable in use.V - type the Composable handlesComposable compatible with the current subclass.protected Observable getObservable()
Observable.protected java.lang.Object getAcceptKey()
Selector and notification key for doing accepts.protected Selector getAcceptSelector()
Selector and notification key for doing accepts.protected Selector getFlush()
Selector for batch consuming.protected Selector getError()
Selector and notification key for doing errors.protected Composable<?> getParent()
Composable for callback callback.protected Environment getEnvironment()
Environment.