Skip to content

Package: TunnelFilter$ConnectCompletionHandler

TunnelFilter$ConnectCompletionHandler

nameinstructionbranchcomplexitylinemethod
TunnelFilter.ConnectCompletionHandler(TunnelFilter, FilterChainContext)
M: 9 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
cancelled()
M: 7 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
completed(Connection)
M: 19 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 5 C: 0
0%
M: 1 C: 0
0%
failed(Throwable)
M: 7 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 3 C: 0
0%
M: 1 C: 0
0%
resumeContext()
M: 4 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 2 C: 0
0%
M: 1 C: 0
0%
updated(Connection)
M: 1 C: 0
0%
M: 0 C: 0
100%
M: 1 C: 0
0%
M: 1 C: 0
0%
M: 1 C: 0
0%

Coverage

1: /*
2: * Copyright (c) 2009, 2020 Oracle and/or its affiliates. All rights reserved.
3: *
4: * This program and the accompanying materials are made available under the
5: * terms of the Eclipse Distribution License v. 1.0, which is available at
6: * http://www.eclipse.org/org/documents/edl-v10.php.
7: *
8: * SPDX-License-Identifier: BSD-3-Clause
9: */
10:
11: package org.glassfish.grizzly.samples.tunnel;
12:
13: import java.io.IOException;
14: import java.net.InetSocketAddress;
15: import java.net.SocketAddress;
16: import java.util.logging.Level;
17: import java.util.logging.Logger;
18:
19: import org.glassfish.grizzly.CompletionHandler;
20: import org.glassfish.grizzly.Connection;
21: import org.glassfish.grizzly.Grizzly;
22: import org.glassfish.grizzly.SocketConnectorHandler;
23: import org.glassfish.grizzly.WriteHandler;
24: import org.glassfish.grizzly.asyncqueue.AsyncQueueWriter;
25: import org.glassfish.grizzly.attributes.Attribute;
26: import org.glassfish.grizzly.filterchain.BaseFilter;
27: import org.glassfish.grizzly.filterchain.FilterChainContext;
28: import org.glassfish.grizzly.filterchain.NextAction;
29:
30: /**
31: * Simple tunneling filter, which maps input of one connection to the output of another and vise versa.
32: *
33: * @author Alexey Stashok
34: */
35: public class TunnelFilter extends BaseFilter {
36: private static final Logger logger = Grizzly.logger(TunnelFilter.class);
37:
38: private final Attribute<Connection> peerConnectionAttribute = Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute("TunnelFilter.peerConnection");
39:
40: // Transport, which will be used to create peer connection
41: private final SocketConnectorHandler transport;
42:
43: // Destination address for peer connections
44: private final SocketAddress redirectAddress;
45:
46: public TunnelFilter(SocketConnectorHandler transport, String host, int port) {
47: this(transport, new InetSocketAddress(host, port));
48: }
49:
50: public TunnelFilter(SocketConnectorHandler transport, SocketAddress redirectAddress) {
51: this.transport = transport;
52: this.redirectAddress = redirectAddress;
53: }
54:
55: /**
56: * This method will be called, once {@link Connection} has some available data
57: */
58: @SuppressWarnings("unchecked")
59: @Override
60: public NextAction handleRead(final FilterChainContext ctx) throws IOException {
61: logger.log(Level.FINEST, "Connection: {0} handleRead: {1}", new Object[] { ctx.getConnection(), ctx.getMessage() });
62:
63: final Connection connection = ctx.getConnection();
64: final Connection peerConnection = peerConnectionAttribute.get(connection);
65:
66: // if connection is closed - stop the execution
67: if (!connection.isOpen()) {
68: return ctx.getStopAction();
69: }
70:
71: final NextAction suspendNextAction = ctx.getSuspendAction();
72:
73: // if peerConnection wasn't created - create it (usually happens on first connection request)
74: if (peerConnection == null) {
75: // "Peer connect" phase could take some time - so execute it in non-blocking mode
76:
77: // Connect peer connection and register completion handler
78: transport.connect(redirectAddress, new ConnectCompletionHandler(ctx));
79:
80: // return suspend status
81: return suspendNextAction;
82: }
83:
84: final Object message = ctx.getMessage();
85:
86: // if peer connection is already created - just forward data to peer
87: redirectToPeer(ctx, peerConnection, message);
88:
89: final AsyncQueueWriter writer = (AsyncQueueWriter) connection.getTransport().getWriter(false);
90:
91: if (writer.canWrite(peerConnection)) {
92: return ctx.getStopAction();
93: }
94:
95: // Make sure we don't overload peer's output buffer and do not cause OutOfMemoryError
96: ctx.suspend();
97: writer.notifyWritePossible(peerConnection, new WriteHandler() {
98:
99: @Override
100: public void onWritePossible() throws Exception {
101: finish();
102: }
103:
104: @Override
105: public void onError(Throwable t) {
106: finish();
107: }
108:
109: private void finish() {
110: ctx.resumeNext();
111: }
112: });
113:
114: // return ctx.getStopAction();
115: return suspendNextAction;
116: }
117:
118: /**
119: * This method will be called, to notify about {@link Connection} closing.
120: */
121: @Override
122: public NextAction handleClose(FilterChainContext ctx) throws IOException {
123: final Connection connection = ctx.getConnection();
124: final Connection peerConnection = peerConnectionAttribute.get(connection);
125:
126: // Close peer connection as well, if it wasn't closed before
127: if (peerConnection != null && peerConnection.isOpen()) {
128: peerConnection.closeSilently();
129: }
130:
131: return ctx.getInvokeAction();
132: }
133:
134: /**
135: * Redirect data from {@link Connection} to its peer.
136: *
137: * @param context {@link FilterChainContext}
138: * @param peerConnection peer {@link Connection}
139: * @throws IOException
140: */
141: @SuppressWarnings("unchecked")
142: private static void redirectToPeer(final FilterChainContext context, final Connection peerConnection, Object message) throws IOException {
143:
144: final Connection srcConnection = context.getConnection();
145: logger.log(Level.FINE, "Redirecting from {0} to {1} message: {2}",
146: new Object[] { srcConnection.getPeerAddress(), peerConnection.getPeerAddress(), message });
147:
148: peerConnection.write(message);
149: }
150:
151: /**
152: * Peer connect {@link CompletionHandler}
153: */
154: private class ConnectCompletionHandler implements CompletionHandler<Connection> {
155: private final FilterChainContext context;
156:
157: private ConnectCompletionHandler(FilterChainContext context) {
158: this.context = context;
159: }
160:
161: @Override
162: public void cancelled() {
163: context.getConnection().closeSilently();
164: resumeContext();
165: }
166:
167: @Override
168: public void failed(Throwable throwable) {
169: context.getConnection().closeSilently();
170: resumeContext();
171: }
172:
173: /**
174: * If peer was successfully connected - map both connections to each other.
175: */
176: @Override
177: public void completed(Connection peerConnection) {
178: final Connection connection = context.getConnection();
179:
180: // Map connections
181: peerConnectionAttribute.set(connection, peerConnection);
182: peerConnectionAttribute.set(peerConnection, connection);
183:
184: // Resume filter chain execution
185: resumeContext();
186: }
187:
188: @Override
189: public void updated(Connection peerConnection) {
190: }
191:
192: /**
193: * Resume {@link org.glassfish.grizzly.filterchain.FilterChain} execution on stage, where it was earlier suspended.
194: */
195: private void resumeContext() {
196: context.resume();
197: }
198: }
199: }