Package reactor.netty.channel
Class ChannelOperations<INBOUND extends NettyInbound,OUTBOUND extends NettyOutbound>
java.lang.Object
reactor.netty.channel.ChannelOperations<INBOUND,OUTBOUND>
- All Implemented Interfaces:
Publisher<Void>
,Subscriber<Void>
,CoreSubscriber<Void>
,Disposable
,ChannelOperationsId
,Connection
,DisposableChannel
,NettyInbound
,NettyOutbound
- Direct Known Subclasses:
HttpOperations
public class ChannelOperations<INBOUND extends NettyInbound,OUTBOUND extends NettyOutbound>
extends Object
implements NettyInbound, NettyOutbound, Connection, CoreSubscriber<Void>, ChannelOperationsId
- Since:
- 0.6
- Author:
- Stephane Maldini
-
Nested Class Summary
Nested ClassesNested classes/interfaces inherited from interface reactor.core.Disposable
Disposable.Composite, Disposable.Swap
-
Constructor Summary
ConstructorsModifierConstructorDescriptionprotected
ChannelOperations
(ChannelOperations<INBOUND, OUTBOUND> replaced) ChannelOperations
(Connection connection, ConnectionObserver listener) Create a newChannelOperations
attached to theChannel
. -
Method Summary
Modifier and TypeMethodDescriptionstatic void
addMetricsHandler
(Channel ch, ChannelMetricsRecorder recorder, @Nullable SocketAddress remoteAddress, boolean onServer) AddNettyPipeline.ChannelMetricsHandler
to the channel pipeline.static void
addReactiveBridge
(Channel ch, ChannelOperations.OnSetup opsFactory, ConnectionObserver listener) AddNettyPipeline.ReactiveBridge
handler at the end ofChannel
pipeline.protected void
React after inbound completion (last packet).alloc()
Returns the assignedByteBufAllocator
.<T extends Connection>
@Nullable TReturn an existingConnection
that must match the given type wrapper or null.protected String
Transforms the object to a string for debug logs.The long string is a combination of the id of the underlying connection, local and remote addresses, and in case of HTTP, the serial number of the request received on that connection.The short string is a combination of the id of the underlying connection and in case of HTTP, the serial number of the request received on that connection.final Channel
channel()
Returns the underlyingChannel
.protected final Connection
Return the delegate IOConnection
for low-level IO access.final void
discard()
Drop pending content and complete inbound.protected final void
Drop pending content and complete inbound.void
dispose()
Releases or closes the underlyingChannel
.Returns aCoreSubscriber
that will dispose on complete or error.protected final String
Return formatted name of this operation.static @Nullable ChannelOperations<?,
?> Return the currentChannel
boundChannelOperations
or null if none.inbound()
Return theNettyInbound
read API from this connection.protected String
final boolean
final boolean
Return true if inbound traffic is not expected anymore.protected final boolean
Return true if inbound traffic is not incoming or expected anymore.final boolean
Return true if inbound traffic is not incoming or expected anymore.boolean
Return false if it will force a close on terminal protocol events thus defeating any pooling strategy Return true (default) if it will release on terminal protocol events thus keeping alive the channel if possible.final boolean
Return true if dispose subscription has been terminated.final ConnectionObserver
listener()
Return the available parentConnectionObserver
for user-facing lifecycle handling.final void
Returns an observingMono
terminating with success when shutdown successfully or error.onDispose
(Disposable onDispose) Assigns aDisposable
to be invoked when the channel is closed.final void
protected void
React on inbound cancel (receive() subscriber cancelled).protected void
React on inbound close (channel closed prematurely).protected void
React on inbound completion (last packet).protected final void
onInboundError
(Throwable err) React on inbound error.protected void
onInboundNext
(ChannelHandlerContext ctx, Object msg) React on inboundChannel.read()
.final void
protected void
React on inbound/outbound completion (last packet).protected void
onOutboundError
(Throwable err) React on inbound/outbound error.final void
Return a Mono succeeding when aChannelOperations
has been terminated.protected void
React on an unprocessed outbound error.protected void
React on Channel writability change.outbound()
Return theNettyOutbound
write API from this connection.receive()
AFlux
extension that allows for extra decoding operators.Flux<?>
a Object inboundFlux
.Sends data to the peer, listens for any error on write and closes on terminal signal (complete|error).sendObject
(Object message) Sends data to the peer, listens for any error on write and closes on terminal signal (complete|error).sendObject
(Publisher<?> dataStream, Predicate<Object> predicate) Sends an object through Netty pipeline.<S> NettyOutbound
sendUsing
(Callable<? extends S> sourceInput, BiFunction<? super Connection, ? super S, ?> mappedInput, Consumer<? super S> sourceCleanup) Binds a send to a starting/cleanup lifecycleprotected final void
Final release/close (last packet).toString()
withConnection
(Consumer<? super Connection> withConnection) Calls the passed callback with aConnection
to operate on the underlyingChannel
state.protected Throwable
Wrap an inbound error.Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface reactor.netty.Connection
addHandler, addHandler, addHandlerFirst, addHandlerFirst, addHandlerLast, addHandlerLast, bind, markPersistent, onReadIdle, onWriteIdle, rebind, removeHandler, replaceHandler
Methods inherited from interface reactor.netty.DisposableChannel
address, disposeNow, disposeNow
Methods inherited from interface reactor.netty.NettyOutbound
neverComplete, send, sendByteArray, sendFile, sendFile, sendFileChunked, sendGroups, sendObject, sendString, sendString, subscribe, then, then, then
-
Constructor Details
-
ChannelOperations
-
ChannelOperations
Create a newChannelOperations
attached to theChannel
. Attach theNettyPipeline.ReactiveBridge
handle.- Parameters:
connection
- the newConnection
connectionlistener
- the events callback
-
-
Method Details
-
addReactiveBridge
public static void addReactiveBridge(Channel ch, ChannelOperations.OnSetup opsFactory, ConnectionObserver listener) AddNettyPipeline.ReactiveBridge
handler at the end ofChannel
pipeline. The bridge will buffer outgoing write and pass along incoming read to the currentget(Channel)
.- Parameters:
ch
- the channel to bridgeopsFactory
- the operations factory to invoke on channel activelistener
- the listener to forward connection events to
-
addMetricsHandler
public static void addMetricsHandler(Channel ch, ChannelMetricsRecorder recorder, @Nullable SocketAddress remoteAddress, boolean onServer) AddNettyPipeline.ChannelMetricsHandler
to the channel pipeline.- Parameters:
ch
- the channelrecorder
- the configured metrics recorderremoteAddress
- the remote addressonServer
- true ifChannelMetricsRecorder
is for the server, false if it is for the client
-
get
Return the currentChannel
boundChannelOperations
or null if none.- Parameters:
ch
- the currentChannel
- Returns:
- the current
Channel
boundChannelOperations
or null if none
-
as
Description copied from interface:Connection
Return an existingConnection
that must match the given type wrapper or null.- Specified by:
as
in interfaceConnection
- Parameters:
clazz
- connection type to match to- Returns:
- a matching
Connection
reference or null
-
alloc
Description copied from interface:NettyOutbound
Returns the assignedByteBufAllocator
.- Specified by:
alloc
in interfaceNettyOutbound
- Returns:
- the
ByteBufAllocator
-
inbound
Description copied from interface:Connection
Return theNettyInbound
read API from this connection. IfConnection
has not been configured with a supporting bridge, receive operations will be unavailable.- Specified by:
inbound
in interfaceConnection
- Returns:
- the
NettyInbound
read API from this connection.
-
outbound
Description copied from interface:Connection
Return theNettyOutbound
write API from this connection. IfConnection
has not been configured with a supporting bridge, send operations will be unavailable.- Specified by:
outbound
in interfaceConnection
- Returns:
- the
NettyOutbound
read API from this connection.
-
channel
Description copied from interface:DisposableChannel
Returns the underlyingChannel
. Direct interaction might be considered insecure if that affects the underlying I/O processing such as read, write or close or state such as pipeline handler addition/removal.- Specified by:
channel
in interfaceDisposableChannel
- Returns:
- the underlying
Channel
-
withConnection
public ChannelOperations<INBOUND,OUTBOUND> withConnection(Consumer<? super Connection> withConnection) Description copied from interface:NettyInbound
Calls the passed callback with aConnection
to operate on the underlyingChannel
state. This allows for chaining inbound API.- Specified by:
withConnection
in interfaceNettyInbound
- Specified by:
withConnection
in interfaceNettyOutbound
- Parameters:
withConnection
- connection callback- Returns:
- the
Connection
-
dispose
public void dispose()Description copied from interface:DisposableChannel
Releases or closes the underlyingChannel
.- Specified by:
dispose
in interfaceDisposable
- Specified by:
dispose
in interfaceDisposableChannel
-
disposeSubscriber
Description copied from interface:DisposableChannel
Returns aCoreSubscriber
that will dispose on complete or error.- Specified by:
disposeSubscriber
in interfaceDisposableChannel
-
isDisposed
public final boolean isDisposed()- Specified by:
isDisposed
in interfaceDisposable
- Specified by:
isDisposed
in interfaceDisposableChannel
-
isSubscriptionDisposed
public final boolean isSubscriptionDisposed()Return true if dispose subscription has been terminated.- Returns:
- true if dispose subscription has been terminated
-
onDispose
Description copied from interface:DisposableChannel
Returns an observingMono
terminating with success when shutdown successfully or error.- Specified by:
onDispose
in interfaceDisposableChannel
- Returns:
- a
Mono
terminating with success if shutdown successfully or error
-
onDispose
Description copied from interface:DisposableChannel
Assigns aDisposable
to be invoked when the channel is closed.- Specified by:
onDispose
in interfaceConnection
- Specified by:
onDispose
in interfaceDisposableChannel
- Parameters:
onDispose
- the close event handler- Returns:
- this
-
onComplete
public final void onComplete()- Specified by:
onComplete
in interfaceSubscriber<INBOUND extends NettyInbound>
-
onError
- Specified by:
onError
in interfaceSubscriber<INBOUND extends NettyInbound>
-
onNext
- Specified by:
onNext
in interfaceSubscriber<INBOUND extends NettyInbound>
-
onSubscribe
- Specified by:
onSubscribe
in interfaceCoreSubscriber<INBOUND extends NettyInbound>
- Specified by:
onSubscribe
in interfaceSubscriber<INBOUND extends NettyInbound>
-
receiveObject
Description copied from interface:NettyInbound
a Object inboundFlux
.- Specified by:
receiveObject
in interfaceNettyInbound
- Returns:
- a Object inbound
Flux
-
receive
Description copied from interface:NettyInbound
AFlux
extension that allows for extra decoding operators.- Specified by:
receive
in interfaceNettyInbound
- Returns:
- a new
ByteBufFlux
-
send
Description copied from interface:NettyOutbound
Sends data to the peer, listens for any error on write and closes on terminal signal (complete|error).A new
NettyOutbound
type (or the same) for typed send sequences.Note: Nesting any send* method is not supported.
- Specified by:
send
in interfaceNettyOutbound
- Parameters:
dataStream
- the dataStream publishing OUT items to write on this channelpredicate
- that returns true if explicit flush operation is needed after that buffer- Returns:
- A new
NettyOutbound
to append further send. It will emit a complete signal successful sequence write (e.g. after "flush") or any error during write.
-
sendObject
Description copied from interface:NettyOutbound
Sends an object through Netty pipeline. If type ofPublisher
, sends all signals, flushing on complete by default. Write occur in FIFO sequence.Note: Nesting any send* method is not supported.
- Specified by:
sendObject
in interfaceNettyOutbound
- Parameters:
dataStream
- the dataStream publishing items to write on this channel or a simple pojo supported by configured Netty handlerspredicate
- that returns true if explicit flush operation is needed after that object- Returns:
- A Publisher to signal successful sequence write (e.g. after "flush") or any error during write
-
sendObject
Description copied from interface:NettyOutbound
Sends data to the peer, listens for any error on write and closes on terminal signal (complete|error).Note: Nesting any send* method is not supported.
- Specified by:
sendObject
in interfaceNettyOutbound
- Parameters:
message
- the object to publish- Returns:
- A
Mono
to signal successful sequence write (e.g. after "flush") or any error during write
-
sendUsing
public <S> NettyOutbound sendUsing(Callable<? extends S> sourceInput, BiFunction<? super Connection, ? super S, ?> mappedInput, Consumer<? super S> sourceCleanup) Description copied from interface:NettyOutbound
Binds a send to a starting/cleanup lifecycleNote: Nesting any send* method is not supported.
- Specified by:
sendUsing
in interfaceNettyOutbound
- Type Parameters:
S
- state type- Parameters:
sourceInput
- state generatormappedInput
- input to sendsourceCleanup
- state cleaner- Returns:
- a new
NettyOutbound
-
onTerminate
Return a Mono succeeding when aChannelOperations
has been terminated.- Specified by:
onTerminate
in interfaceConnection
- Returns:
- a Mono succeeding when a
ChannelOperations
has been terminated
-
listener
Return the available parentConnectionObserver
for user-facing lifecycle handling.- Returns:
- the available parent
ConnectionObserver
for user-facing lifecycle handling
-
toString
-
discard
public final void discard()Drop pending content and complete inbound. Always discard content regardless whether there is a receiver. -
discardWhenNoReceiver
protected final void discardWhenNoReceiver()Drop pending content and complete inbound. Discard content only in case there is no receiver. -
isInboundCancelled
public final boolean isInboundCancelled()Return true if inbound traffic is not expected anymore.- Returns:
- true if inbound traffic is not expected anymore
-
isInboundDisposed
public final boolean isInboundDisposed()Return true if inbound traffic is not incoming or expected anymore. The buffered data is consumed.- Returns:
- true if inbound traffic is not incoming or expected anymore. The buffered data is consumed
-
isInboundComplete
protected final boolean isInboundComplete()Return true if inbound traffic is not incoming or expected anymore. The buffered data might still not be consumed.- Returns:
- true if inbound traffic is not incoming or expected anymore. The buffered data might still not be consumed.
-
onInboundNext
React on inboundChannel.read()
.- Parameters:
ctx
- the contextmsg
- the read payload
-
onInboundCancel
protected void onInboundCancel()React on inbound cancel (receive() subscriber cancelled). -
onInboundComplete
protected void onInboundComplete()React on inbound completion (last packet). -
afterInboundComplete
protected void afterInboundComplete()React after inbound completion (last packet). -
onInboundClose
protected void onInboundClose()React on inbound close (channel closed prematurely). -
onOutboundComplete
protected void onOutboundComplete()React on inbound/outbound completion (last packet). -
onOutboundError
React on inbound/outbound error.- Parameters:
err
- theThrowable
cause
-
terminate
protected final void terminate()Final release/close (last packet). -
onInboundError
React on inbound error.- Parameters:
err
- theThrowable
cause
-
connection
Return the delegate IOConnection
for low-level IO access.- Returns:
- the delegate IO
Connection
for low-level IO access
-
formatName
Return formatted name of this operation.- Returns:
- formatted name of this operation
-
initShortId
-
wrapInboundError
Wrap an inbound error.- Parameters:
err
- theThrowable
cause
-
asDebugLogMessage
Transforms the object to a string for debug logs.- Parameters:
o
- the object to be transformed- Returns:
- the string to be logged
- Since:
- 1.0.24
-
onWritabilityChanged
protected void onWritabilityChanged()React on Channel writability change.- Since:
- 1.0.37
-
onUnprocessedOutboundError
React on an unprocessed outbound error.- Since:
- 1.0.39
-
isPersistent
public boolean isPersistent()Description copied from interface:Connection
Return false if it will force a close on terminal protocol events thus defeating any pooling strategy Return true (default) if it will release on terminal protocol events thus keeping alive the channel if possible.- Specified by:
isPersistent
in interfaceConnection
- Returns:
- whether the underlying
Connection
will be disposed on terminal handler event
-
currentContext
- Specified by:
currentContext
in interfaceCoreSubscriber<INBOUND extends NettyInbound>
-
asShortText
Description copied from interface:ChannelOperationsId
The short string is a combination of the id of the underlying connection and in case of HTTP, the serial number of the request received on that connection.Format of the short string:
<CONNECTION_ID>-<REQUEST_NUMBER>
Example:
<CONNECTION_ID>: 329c6ffd <REQUEST_NUMBER>: 5 Result: 329c6ffd-5
- Specified by:
asShortText
in interfaceChannelOperationsId
-
asLongText
Description copied from interface:ChannelOperationsId
The long string is a combination of the id of the underlying connection, local and remote addresses, and in case of HTTP, the serial number of the request received on that connection.Format of the long string:
<CONNECTION_ID>-<REQUEST_NUMBER>, L:<LOCAL_ADDRESS> <CONNECTION_OPENED_CLOSED> R:<REMOTE_ADDRESS>
Example:
Opened connection <CONNECTION_ID>: 329c6ffd <REQUEST_NUMBER>: 5 <LOCAL_ADDRESS>: /0:0:0:0:0:0:0:1:64286 <CONNECTION_OPENED_CLOSED>: - (opened) <REMOTE_ADDRESS>: /0:0:0:0:0:0:0:1:64284 Result: 329c6ffd-5, L:/0:0:0:0:0:0:0:1:64286 - R:/0:0:0:0:0:0:0:1:64284 Closed connection <CONNECTION_ID>: 329c6ffd <REQUEST_NUMBER>: 5 <LOCAL_ADDRESS>: /0:0:0:0:0:0:0:1:64286 <CONNECTION_OPENED_CLOSED>: ! (closed) <REMOTE_ADDRESS>: /0:0:0:0:0:0:0:1:64284 Result: 329c6ffd-5, L:/0:0:0:0:0:0:0:1:64286 ! R:/0:0:0:0:0:0:0:1:64284
- Specified by:
asLongText
in interfaceChannelOperationsId
-