public class ChannelOperations<INBOUND extends NettyInbound,OUTBOUND extends NettyOutbound> extends Object implements NettyInbound, NettyOutbound, Connection, CoreSubscriber<Void>, ChannelOperationsId
Modifier and Type | Class | Description |
---|---|---|
static interface | ChannelOperations.OnSetup | A ChannelOperations factory |
Nested classes/interfaces inherited from interface reactor.core.Disposable: Disposable.Composite, Disposable.Swap
Modifier | Constructor | Description |
---|---|---|
protected | ChannelOperations(ChannelOperations<INBOUND,OUTBOUND> replaced) | |
public | ChannelOperations(Connection connection, ConnectionObserver listener) | Create a new ChannelOperations attached to the Channel. |
Modifier and Type | Method | Description |
---|---|---|
static void | addMetricsHandler(Channel ch, ChannelMetricsRecorder recorder, SocketAddress remoteAddress, boolean onServer) | Add NettyPipeline.ChannelMetricsHandler to the channel pipeline. |
static void | addReactiveBridge(Channel ch, ChannelOperations.OnSetup opsFactory, ConnectionObserver listener) | Add NettyPipeline.ReactiveBridge handler at the end of Channel pipeline. |
protected void | afterInboundComplete() | React after inbound completion (last packet) |
ByteBufAllocator | alloc() | Returns the assigned ByteBufAllocator. |
<T extends Connection> T | as(Class<T> clazz) | Return an existing Connection that must match the given type wrapper or null. |
protected String | asDebugLogMessage(Object o) | Transforms the object to a string for debug logs. |
String | asLongText() | The long string is a combination of the id of the underlying connection, local and remote addresses, and in case of HTTP, the serial number of the request received on that connection. |
String | asShortText() | The short string is a combination of the id of the underlying connection and in case of HTTP, the serial number of the request received on that connection. |
Channel | channel() | Returns the underlying Channel. |
protected Connection | connection() | Return the delegate IO Connection for low-level IO access |
Context | currentContext() | |
void | discard() | Drop pending content and complete inbound. |
protected void | discardWhenNoReceiver() | Drop pending content and complete inbound. |
void | dispose() | Releases or closes the underlying Channel |
CoreSubscriber<Void> | disposeSubscriber() | Returns a CoreSubscriber that will dispose on complete or error |
protected String | formatName() | Return formatted name of this operation |
static ChannelOperations<?,?> | get(Channel ch) | Return the current Channel bound ChannelOperations or null if none |
NettyInbound | inbound() | Return the NettyInbound read API from this connection. |
protected String | initShortId() | |
boolean | isDisposed() | |
boolean | isInboundCancelled() | Return true if inbound traffic is not expected anymore |
protected boolean | isInboundComplete() | Return true if inbound traffic is not incoming or expected anymore. |
boolean | isInboundDisposed() | Return true if inbound traffic is not incoming or expected anymore. |
boolean | isPersistent() | Return false if it will force a close on terminal protocol events thus defeating any pooling strategy. Return true (default) if it will release on terminal protocol events thus keeping alive the channel if possible. |
boolean | isSubscriptionDisposed() | Return true if dispose subscription has been terminated |
ConnectionObserver | listener() | Return the available parent ConnectionObserver for user-facing lifecycle handling |
void | onComplete() | |
Mono<Void> | onDispose() | Returns an observing Mono that completes when the shutdown succeeds, or terminates with an error otherwise. |
Connection | onDispose(Disposable onDispose) | Assigns a Disposable to be invoked when the channel is closed. |
void | onError(Throwable t) | |
protected void | onInboundCancel() | React on inbound cancel (receive() subscriber cancelled) |
protected void | onInboundClose() | React on inbound close (channel closed prematurely) |
protected void | onInboundComplete() | React on inbound completion (last packet) |
protected void | onInboundError(Throwable err) | React on inbound error |
protected void | onInboundNext(ChannelHandlerContext ctx, Object msg) | React on inbound Channel.read() |
void | onNext(Void aVoid) | |
protected void | onOutboundComplete() | React on inbound/outbound completion (last packet) |
protected void | onOutboundError(Throwable err) | React on inbound/outbound error |
void | onSubscribe(Subscription s) | |
Mono<Void> | onTerminate() | Return a Mono succeeding when a ChannelOperations has been terminated |
protected void | onWritabilityChanged() | React on Channel writability change. |
NettyOutbound | outbound() | Return the NettyOutbound write API from this connection. |
ByteBufFlux | receive() | A Flux extension that allows for extra decoding operators |
Flux<?> | receiveObject() | An Object inbound Flux |
NettyOutbound | send(Publisher<? extends ByteBuf> dataStream, Predicate<ByteBuf> predicate) | Sends data to the peer, listens for any error on write and closes on terminal signal (complete or error). |
NettyOutbound | sendObject(Object message) | Sends data to the peer, listens for any error on write and closes on terminal signal (complete or error). |
NettyOutbound | sendObject(Publisher<?> dataStream, Predicate<Object> predicate) | Sends an object through Netty pipeline. |
<S> NettyOutbound | sendUsing(Callable<? extends S> sourceInput, BiFunction<? super Connection,? super S,?> mappedInput, Consumer<? super S> sourceCleanup) | Binds a send to a starting/cleanup lifecycle |
protected void | terminate() | Final release/close (last packet) |
String | toString() | |
ChannelOperations<INBOUND,OUTBOUND> | withConnection(Consumer<? super Connection> withConnection) | Calls the passed callback with a Connection to operate on the underlying Channel state. |
protected Throwable | wrapInboundError(Throwable err) | Wrap an inbound error |
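Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface reactor.netty.NettyOutbound: neverComplete, send, sendByteArray, sendFile, sendFile, sendFileChunked, sendGroups, sendObject, sendString, sendString, subscribe, then, then, then
Methods inherited from interface reactor.netty.Connection: addHandler, addHandler, addHandlerFirst, addHandlerFirst, addHandlerLast, addHandlerLast, bind, from, markPersistent, onReadIdle, onWriteIdle, rebind, removeHandler, replaceHandler
Methods inherited from interface reactor.netty.DisposableChannel: address, disposeNow, disposeNow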
protected ChannelOperations(ChannelOperations<INBOUND,OUTBOUND> replaced)
public ChannelOperations(Connection connection, ConnectionObserver listener)
Create a new ChannelOperations attached to the Channel. Attach the NettyPipeline.ReactiveBridge handle.
Parameters:
connection - the new Connection connection
listener - the events callback
public static void addReactiveBridge(Channel ch, ChannelOperations.OnSetup opsFactory, ConnectionObserver listener)
Add NettyPipeline.ReactiveBridge handler at the end of Channel pipeline. The bridge will buffer outgoing write and pass along incoming read to the current get(Channel).
Parameters:
ch - the channel to bridge
opsFactory - the operations factory to invoke on channel active
listener - the listener to forward connection events to
public static void addMetricsHandler(Channel ch, ChannelMetricsRecorder recorder, @Nullable SocketAddress remoteAddress, boolean onServer)
Add NettyPipeline.ChannelMetricsHandler to the channel pipeline.
Parameters:
ch - the channel
recorder - the configured metrics recorder
remoteAddress - the remote address
onServer - true if ChannelMetricsRecorder is for the server, false if it is for the client
@Nullable public static ChannelOperations<?,?> get(Channel ch)
Return the current Channel bound ChannelOperations or null if none
Parameters:
ch - the current Channel
Returns:
the current Channel bound ChannelOperations or null if none
@Nullable public <T extends Connection> T as(Class<T> clazz)
Description copied from interface: Connection
Return an existing Connection that must match the given type wrapper or null.
Specified by:
as in interface Connection
Parameters:
clazz - connection type to match to
Returns:
Connection reference or null
public ByteBufAllocator alloc()
Description copied from interface: NettyOutbound
Returns the assigned ByteBufAllocator.
Specified by:
alloc in interface NettyOutbound
Returns:
the ByteBufAllocator
public NettyInbound inbound()
Description copied from interface: Connection
Return the NettyInbound read API from this connection. If Connection has not been configured with a supporting bridge, receive operations will be unavailable.
Specified by:
inbound in interface Connection
Returns:
the NettyInbound read API from this connection.
public NettyOutbound outbound()
Description copied from interface: Connection
Return the NettyOutbound write API from this connection. If Connection has not been configured with a supporting bridge, send operations will be unavailable.
Specified by:
outbound in interface Connection
Returns:
the NettyOutbound write API from this connection.
public final Channel channel()
Description copied from interface: DisposableChannel
Returns the underlying Channel. Direct interaction might be considered insecure if that affects the underlying I/O processing such as read, write or close or state such as pipeline handler addition/removal.
Specified by:
channel in interface DisposableChannel
Returns:
the underlying Channel
public ChannelOperations<INBOUND,OUTBOUND> withConnection(Consumer<? super Connection> withConnection)
Description copied from interface: NettyInbound
Calls the passed callback with a Connection to operate on the underlying Channel state. This allows for chaining inbound API.
Specified by:
withConnection in interface NettyInbound
withConnection in interface NettyOutbound
Parameters:
withConnection - connection callback
Returns:
the Connection
public void dispose()
Description copied from interface: DisposableChannel
Releases or closes the underlying Channel
Specified by:
dispose in interface Disposable
dispose in interface DisposableChannel
public CoreSubscriber<Void> disposeSubscriber()
Description copied from interface: DisposableChannel
Returns a CoreSubscriber that will dispose on complete or error
Specified by:
disposeSubscriber in interface DisposableChannel
public final boolean isDisposed()
Specified by:
isDisposed in interface Disposable
isDisposed in interface DisposableChannel
public final boolean isSubscriptionDisposed()
Return true if dispose subscription has been terminated
public final Mono<Void> onDispose()
Description copied from interface: DisposableChannel
Returns an observing Mono that completes when the shutdown succeeds, or terminates with an error otherwise.
Specified by:
onDispose in interface DisposableChannel
Returns:
a Mono that completes if the shutdown succeeds, or terminates with an error otherwise
public Connection onDispose(Disposable onDispose)
Description copied from interface: DisposableChannel
Assigns a Disposable to be invoked when the channel is closed.
Specified by:
onDispose in interface Connection
onDispose in interface DisposableChannel
Parameters:
onDispose - the close event handler
public final void onComplete()
Specified by:
onComplete in interface Subscriber<Void>
public final void onError(Throwable t)
Specified by:
onError in interface Subscriber<Void>
public final void onNext(Void aVoid)
Specified by:
onNext in interface Subscriber<Void>
public final void onSubscribe(Subscription s)
Specified by:
onSubscribe in interface Subscriber<Void>
onSubscribe in interface CoreSubscriber<Void>
public Flux<?> receiveObject()
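Description copied from interface: NettyInbound
An Object inbound Flux
Specified by:
receiveObject in interface NettyInbound
Returns:
an Object inbound Flux
public ByteBufFlux receive()
Description copied from interface: NettyInbound
A Flux extension that allows for extra decoding operators
Specified by:
receive in interface NettyInbound
Returns:
a ByteBufFlux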
public NettyOutbound send(Publisher<? extends ByteBuf> dataStream, Predicate<ByteBuf> predicate)
Description copied from interface: NettyOutbound
Sends data to the peer, listens for any error on write and closes on terminal signal (complete|error). A new NettyOutbound type (or the same) for typed send sequences.
Note: Nesting any send* method is not supported.
Specified by:
send in interface NettyOutbound
Parameters:
dataStream - the dataStream publishing OUT items to write on this channel
predicate - that returns true if explicit flush operation is needed after that buffer
Returns:
a NettyOutbound to append further send. It will emit a complete signal upon a successful sequence write (e.g. after "flush") or any error during write.
public NettyOutbound sendObject(Publisher<?> dataStream, Predicate<Object> predicate)
Description copied from interface: NettyOutbound
Sends an object through Netty pipeline. If given a Publisher, sends all signals, flushing on complete by default. Writes occur in FIFO sequence.
Note: Nesting any send* method is not supported.
Specified by:
sendObject in interface NettyOutbound
Parameters:
dataStream - the dataStream publishing items to write on this channel or a simple pojo supported by configured Netty handlers
predicate - that returns true if explicit flush operation is needed after that object
public NettyOutbound sendObject(Object message)
Description copied from interface: NettyOutbound
Sends data to the peer, listens for any error on write and closes on terminal signal (complete|error).
Note: Nesting any send* method is not supported.
Specified by:
sendObject in interface NettyOutbound
Parameters:
message - the object to publish
Returns:
a Mono to signal successful sequence write (e.g. after "flush") or any error during write
public <S> NettyOutbound sendUsing(Callable<? extends S> sourceInput, BiFunction<? super Connection,? super S,?> mappedInput, Consumer<? super S> sourceCleanup)
Description copied from interface: NettyOutbound
Binds a send to a starting/cleanup lifecycle
Note: Nesting any send* method is not supported.
Specified by:
sendUsing in interface NettyOutbound
Type Parameters:
S - state type
Parameters:
sourceInput - state generator
mappedInput - input to send
sourceCleanup - state cleaner
Returns:
a NettyOutbound
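The flush predicate taken by send and sendObject above can be pictured with a small plain-Java model. This is only a sketch, not Reactor Netty code: FlushPredicateSketch and its write method are hypothetical names, a List stands in for the Publisher, and it assumes that whatever is still pending is flushed on completion, as the sendObject description states.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical plain-Java model of the flush contract; it does not call Reactor Netty.
final class FlushPredicateSketch {

    // Returns the batches that would be flushed, one inner list per flush.
    static <T> List<List<T>> write(List<T> items, Predicate<T> flushAfter) {
        List<List<T>> flushed = new ArrayList<>();
        List<T> pending = new ArrayList<>();
        for (T item : items) {              // items are written in FIFO order
            pending.add(item);
            if (flushAfter.test(item)) {    // explicit flush requested after this item
                flushed.add(pending);
                pending = new ArrayList<>();
            }
        }
        if (!pending.isEmpty()) {           // assumed: remaining writes flush on completion
            flushed.add(pending);
        }
        return flushed;
    }

    public static void main(String[] args) {
        // Flush right after "b"; "c" and "d" go out together on completion.
        System.out.println(write(List.of("a", "b", "c", "d"), s -> s.equals("b")));
        // prints [[a, b], [c, d]]
    }
}
```

A predicate that always returns true would model flushing after every item, while one that always returns false would leave a single flush at completion.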
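public final Mono<Void> onTerminate()
Return a Mono succeeding when a ChannelOperations has been terminated
Specified by:
onTerminate in interface Connection
Returns:
a Mono succeeding when a ChannelOperations has been terminated
public final ConnectionObserver listener()
Return the available parent ConnectionObserver for user-facing lifecycle handling
Returns:
the available parent ConnectionObserver for user-facing lifecycle handling
public final void discard()
Drop pending content and complete inbound.
protected final void discardWhenNoReceiver()
Drop pending content and complete inbound.
public final boolean isInboundCancelled()
Return true if inbound traffic is not expected anymore
public final boolean isInboundDisposed()
Return true if inbound traffic is not incoming or expected anymore.
protected final boolean isInboundComplete()
Return true if inbound traffic is not incoming or expected anymore.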
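protected void onInboundNext(ChannelHandlerContext ctx, Object msg)
React on inbound Channel.read()
Parameters:
ctx - the context
msg - the read payload
protected void onInboundCancel()
React on inbound cancel (receive() subscriber cancelled)
protected void onInboundComplete()
React on inbound completion (last packet)
protected void afterInboundComplete()
React after inbound completion (last packet)
protected void onInboundClose()
React on inbound close (channel closed prematurely)
protected void onOutboundComplete()
React on inbound/outbound completion (last packet)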
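protected void onOutboundError(Throwable err)
React on inbound/outbound error
Parameters:
err - the Throwable cause
protected final void terminate()
Final release/close (last packet)
protected final void onInboundError(Throwable err)
React on inbound error
Parameters:
err - the Throwable cause
protected final Connection connection()
Return the delegate IO Connection for low-level IO access
Returns:
the delegate IO Connection for low-level IO access
protected final String formatName()
Return formatted name of this operation
protected String initShortId()
protected Throwable wrapInboundError(Throwable err)
Wrap an inbound error
Parameters:
err - the Throwable cause
protected String asDebugLogMessage(Object o)
Transforms the object to a string for debug logs.
Parameters:
o - the object to be transformed
protected void onWritabilityChanged()
React on Channel writability change.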
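public boolean isPersistent()
Description copied from interface: Connection
Return false if it will force a close on terminal protocol events thus defeating any pooling strategy. Return true (default) if it will release on terminal protocol events thus keeping alive the channel if possible.
Specified by:
isPersistent in interface Connection
Returns:
whether the Connection will be disposed on terminal handler event
public Context currentContext()
Specified by:
currentContext in interface CoreSubscriber<Void>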
public String asShortText()
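Description copied from interface: ChannelOperationsId
The short string is a combination of the id of the underlying connection and in case of HTTP, the serial number of the request received on that connection.
Format of the short string:
<CONNECTION_ID>-<REQUEST_NUMBER>
Example:
<CONNECTION_ID>: 329c6ffd
<REQUEST_NUMBER>: 5
Result: 329c6ffd-5
Specified by:
asShortText in interface ChannelOperationsId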
public String asLongText()
Description copied from interface: ChannelOperationsId
The long string is a combination of the id of the underlying connection, local and remote addresses, and in case of HTTP, the serial number of the request received on that connection.
Format of the long string:
<CONNECTION_ID>-<REQUEST_NUMBER>, L:<LOCAL_ADDRESS> <CONNECTION_OPENED_CLOSED> R:<REMOTE_ADDRESS>
Example:
Opened connection
<CONNECTION_ID>: 329c6ffd
<REQUEST_NUMBER>: 5
<LOCAL_ADDRESS>: /0:0:0:0:0:0:0:1:64286
<CONNECTION_OPENED_CLOSED>: - (opened)
<REMOTE_ADDRESS>: /0:0:0:0:0:0:0:1:64284
Result: 329c6ffd-5, L:/0:0:0:0:0:0:0:1:64286 - R:/0:0:0:0:0:0:0:1:64284
Closed connection
<CONNECTION_ID>: 329c6ffd
<REQUEST_NUMBER>: 5
<LOCAL_ADDRESS>: /0:0:0:0:0:0:0:1:64286
<CONNECTION_OPENED_CLOSED>: ! (closed)
<REMOTE_ADDRESS>: /0:0:0:0:0:0:0:1:64284
Result: 329c6ffd-5, L:/0:0:0:0:0:0:0:1:64286 ! R:/0:0:0:0:0:0:0:1:64284
Specified by:
asLongText in interface ChannelOperationsId
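The short and long formats above can be reproduced with plain string concatenation, which may help when parsing or matching these ids in logs. The sketch below only mirrors the documented layout; ChannelIdTextSketch, shortText and longText are hypothetical names, not part of the library, and it assumes a request number is always present (the HTTP case).

```java
// Hypothetical helper that rebuilds the documented id layouts; not library code.
final class ChannelIdTextSketch {

    // <CONNECTION_ID>-<REQUEST_NUMBER>
    static String shortText(String connectionId, int requestNumber) {
        return connectionId + "-" + requestNumber;
    }

    // <CONNECTION_ID>-<REQUEST_NUMBER>, L:<LOCAL_ADDRESS> <CONNECTION_OPENED_CLOSED> R:<REMOTE_ADDRESS>
    static String longText(String connectionId, int requestNumber,
                           String localAddress, String remoteAddress, boolean opened) {
        String marker = opened ? "-" : "!";   // "-" for an opened connection, "!" for a closed one
        return shortText(connectionId, requestNumber)
                + ", L:" + localAddress + " " + marker + " R:" + remoteAddress;
    }

    public static void main(String[] args) {
        System.out.println(shortText("329c6ffd", 5));
        // prints 329c6ffd-5
        System.out.println(longText("329c6ffd", 5,
                "/0:0:0:0:0:0:0:1:64286", "/0:0:0:0:0:0:0:1:64284", false));
        // prints 329c6ffd-5, L:/0:0:0:0:0:0:0:1:64286 ! R:/0:0:0:0:0:0:0:1:64284
    }
}
```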