1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.io.IOException;
31 import java.net.SocketAddress;
32 import java.nio.ByteBuffer;
33 import java.nio.channels.ByteChannel;
34 import java.nio.channels.SelectionKey;
35 import java.util.Objects;
36 import java.util.Queue;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.ConcurrentMap;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicReference;
41 import java.util.concurrent.locks.Lock;
42
43 import javax.net.ssl.SSLContext;
44 import javax.net.ssl.SSLSession;
45
46 import org.apache.hc.core5.concurrent.CallbackContribution;
47 import org.apache.hc.core5.concurrent.FutureCallback;
48 import org.apache.hc.core5.function.Decorator;
49 import org.apache.hc.core5.io.CloseMode;
50 import org.apache.hc.core5.net.NamedEndpoint;
51 import org.apache.hc.core5.reactor.ssl.SSLBufferMode;
52 import org.apache.hc.core5.reactor.ssl.SSLIOSession;
53 import org.apache.hc.core5.reactor.ssl.SSLMode;
54 import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
55 import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
56 import org.apache.hc.core5.reactor.ssl.TlsDetails;
57 import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
58 import org.apache.hc.core5.util.Args;
59 import org.apache.hc.core5.util.Asserts;
60 import org.apache.hc.core5.util.TextUtils;
61 import org.apache.hc.core5.util.Timeout;
62
63 final class InternalDataChannel extends InternalChannel implements ProtocolIOSession {
64
65 private final IOSession ioSession;
66 private final NamedEndpoint initialEndpoint;
67 private final Decorator<IOSession> ioSessionDecorator;
68 private final IOSessionListener sessionListener;
69 private final Queue<InternalDataChannel> closedSessions;
70 private final AtomicReference<SSLIOSession> tlsSessionRef;
71 private final AtomicReference<IOSession> currentSessionRef;
72 private final AtomicReference<IOEventHandler> eventHandlerRef;
73 private final ConcurrentMap<String, ProtocolUpgradeHandler> protocolUpgradeHandlerMap;
74 private final AtomicBoolean closed;
75
76 InternalDataChannel(
77 final IOSession ioSession,
78 final NamedEndpoint initialEndpoint,
79 final Decorator<IOSession> ioSessionDecorator,
80 final IOSessionListener sessionListener,
81 final Queue<InternalDataChannel> closedSessions) {
82 this.ioSession = ioSession;
83 this.initialEndpoint = initialEndpoint;
84 this.closedSessions = closedSessions;
85 this.ioSessionDecorator = ioSessionDecorator;
86 this.sessionListener = sessionListener;
87 this.tlsSessionRef = new AtomicReference<>();
88 this.currentSessionRef = new AtomicReference<>(
89 ioSessionDecorator != null ? ioSessionDecorator.decorate(ioSession) : ioSession);
90 this.eventHandlerRef = new AtomicReference<>();
91 this.protocolUpgradeHandlerMap = new ConcurrentHashMap<>();
92 this.closed = new AtomicBoolean();
93 }
94
95 @Override
96 public String getId() {
97 return ioSession.getId();
98 }
99
100 @Override
101 public NamedEndpoint getInitialEndpoint() {
102 return initialEndpoint;
103 }
104
105 @Override
106 public IOEventHandler getHandler() {
107 return eventHandlerRef.get();
108 }
109
110 @Override
111 public void upgrade(final IOEventHandler handler) {
112 final IOSession currentSession = currentSessionRef.get();
113 currentSession.upgrade(handler);
114 eventHandlerRef.set(handler);
115 }
116
117 private IOEventHandler ensureHandler(final IOSession session) {
118 final IOEventHandler handler = session.getHandler();
119 Asserts.notNull(handler, "IO event handler");
120 return handler;
121 }
122
123 @Override
124 void onIOEvent(final int readyOps) throws IOException {
125 if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
126 final IOSession currentSession = currentSessionRef.get();
127 currentSession.clearEvent(SelectionKey.OP_CONNECT);
128 if (tlsSessionRef.get() == null) {
129 if (sessionListener != null) {
130 sessionListener.connected(currentSession);
131 }
132 final IOEventHandler handler = ensureHandler(currentSession);
133 handler.connected(currentSession);
134 }
135 }
136 if ((readyOps & SelectionKey.OP_READ) != 0) {
137 final IOSession currentSession = currentSessionRef.get();
138 currentSession.updateReadTime();
139 if (sessionListener != null) {
140 sessionListener.inputReady(currentSession);
141 }
142 final IOEventHandler handler = ensureHandler(currentSession);
143 handler.inputReady(currentSession, null);
144 }
145 if ((readyOps & SelectionKey.OP_WRITE) != 0
146 || (ioSession.getEventMask() & SelectionKey.OP_WRITE) != 0) {
147 final IOSession currentSession = currentSessionRef.get();
148 currentSession.updateWriteTime();
149 if (sessionListener != null) {
150 sessionListener.outputReady(currentSession);
151 }
152 final IOEventHandler handler = ensureHandler(currentSession);
153 handler.outputReady(currentSession);
154 }
155 }
156
157 @Override
158 Timeout getTimeout() {
159 final IOSession currentSession = currentSessionRef.get();
160 return currentSession.getSocketTimeout();
161 }
162
163 @Override
164 void onTimeout(final Timeout timeout) throws IOException {
165 final IOSession currentSession = currentSessionRef.get();
166 if (sessionListener != null) {
167 sessionListener.timeout(currentSession);
168 }
169 final IOEventHandler handler = ensureHandler(currentSession);
170 handler.timeout(currentSession, timeout);
171 }
172
173 @Override
174 void onException(final Exception cause) {
175 final IOSession currentSession = currentSessionRef.get();
176 if (sessionListener != null) {
177 sessionListener.exception(currentSession, cause);
178 }
179 final IOEventHandler handler = currentSession.getHandler();
180 if (handler != null) {
181 handler.exception(currentSession, cause);
182 }
183 }
184
185 void onTLSSessionStart(final SSLIOSession sslSession) {
186 final IOSession currentSession = currentSessionRef.get();
187 if (sessionListener != null) {
188 sessionListener.connected(currentSession);
189 }
190 }
191
192 void onTLSSessionEnd(final SSLIOSession sslSession) {
193 if (closed.compareAndSet(false, true)) {
194 closedSessions.add(this);
195 }
196 }
197
198 void disconnected() {
199 final IOSession currentSession = currentSessionRef.get();
200 if (sessionListener != null) {
201 sessionListener.disconnected(currentSession);
202 }
203 final IOEventHandler handler = currentSession.getHandler();
204 if (handler != null) {
205 handler.disconnected(currentSession);
206 }
207 }
208
209 @Override
210 public void startTls(
211 final SSLContext sslContext,
212 final NamedEndpoint endpoint,
213 final SSLBufferMode sslBufferMode,
214 final SSLSessionInitializer initializer,
215 final SSLSessionVerifier verifier,
216 final Timeout handshakeTimeout) {
217 startTls(sslContext, endpoint, sslBufferMode, initializer, verifier, handshakeTimeout, null);
218 }
219
220 @Override
221 public void startTls(
222 final SSLContext sslContext,
223 final NamedEndpoint endpoint,
224 final SSLBufferMode sslBufferMode,
225 final SSLSessionInitializer initializer,
226 final SSLSessionVerifier verifier,
227 final Timeout handshakeTimeout,
228 final FutureCallback<TransportSecurityLayer> callback) {
229 final SSLIOSession sslioSession = new SSLIOSession(
230 endpoint != null ? endpoint : initialEndpoint,
231 ioSession,
232 initialEndpoint != null ? SSLMode.CLIENT : SSLMode.SERVER,
233 sslContext,
234 sslBufferMode,
235 initializer,
236 verifier,
237 handshakeTimeout,
238 this::onTLSSessionStart,
239 this::onTLSSessionEnd,
240 new CallbackContribution<SSLSession>(callback) {
241
242 @Override
243 public void completed(final SSLSession sslSession) {
244 if (callback != null) {
245 callback.completed(InternalDataChannel.this);
246 }
247 }
248
249 });
250 if (tlsSessionRef.compareAndSet(null, sslioSession)) {
251 currentSessionRef.set(ioSessionDecorator != null ? ioSessionDecorator.decorate(sslioSession) : sslioSession);
252 } else {
253 throw new IllegalStateException("TLS already activated");
254 }
255 try {
256 if (sessionListener != null) {
257 sessionListener.startTls(sslioSession);
258 }
259 sslioSession.beginHandshake(this);
260 } catch (final Exception ex) {
261 onException(ex);
262 }
263 }
264
265 @SuppressWarnings("resource")
266 @Override
267 public TlsDetails getTlsDetails() {
268 final SSLIOSession sslIoSession = tlsSessionRef.get();
269 return sslIoSession != null ? sslIoSession.getTlsDetails() : null;
270 }
271
272 @Override
273 public Lock getLock() {
274 return ioSession.getLock();
275 }
276
277 @Override
278 public void close() {
279 close(CloseMode.GRACEFUL);
280 }
281
282 @Override
283 public void close(final CloseMode closeMode) {
284 final IOSession currentSession = currentSessionRef.get();
285 if (closeMode == CloseMode.IMMEDIATE) {
286 closed.set(true);
287 currentSession.close(closeMode);
288 } else {
289 if (closed.compareAndSet(false, true)) {
290 try {
291 currentSession.close(closeMode);
292 } finally {
293 closedSessions.add(this);
294 }
295 }
296 }
297 }
298
299 @Override
300 public IOSession.Status getStatus() {
301 final IOSession currentSession = currentSessionRef.get();
302 return currentSession.getStatus();
303 }
304
305 @Override
306 public boolean isOpen() {
307 final IOSession currentSession = currentSessionRef.get();
308 return currentSession.isOpen();
309 }
310
311 @Override
312 public void enqueue(final Command command, final Command.Priority priority) {
313 final IOSession currentSession = currentSessionRef.get();
314 currentSession.enqueue(command, priority);
315 }
316
317 @Override
318 public boolean hasCommands() {
319 final IOSession currentSession = currentSessionRef.get();
320 return currentSession.hasCommands();
321 }
322
323 @Override
324 public Command poll() {
325 final IOSession currentSession = currentSessionRef.get();
326 return currentSession.poll();
327 }
328
329 @Override
330 public ByteChannel channel() {
331 final IOSession currentSession = currentSessionRef.get();
332 return currentSession.channel();
333 }
334
335 @Override
336 public SocketAddress getRemoteAddress() {
337 return ioSession.getRemoteAddress();
338 }
339
340 @Override
341 public SocketAddress getLocalAddress() {
342 return ioSession.getLocalAddress();
343 }
344
345 @Override
346 public int getEventMask() {
347 final IOSession currentSession = currentSessionRef.get();
348 return currentSession.getEventMask();
349 }
350
351 @Override
352 public void setEventMask(final int ops) {
353 final IOSession currentSession = currentSessionRef.get();
354 currentSession.setEventMask(ops);
355 }
356
357 @Override
358 public void setEvent(final int op) {
359 final IOSession currentSession = currentSessionRef.get();
360 currentSession.setEvent(op);
361 }
362
363 @Override
364 public void clearEvent(final int op) {
365 final IOSession currentSession = currentSessionRef.get();
366 currentSession.clearEvent(op);
367 }
368
369 @Override
370 public Timeout getSocketTimeout() {
371 return ioSession.getSocketTimeout();
372 }
373
374 @Override
375 public void setSocketTimeout(final Timeout timeout) {
376 ioSession.setSocketTimeout(timeout);
377 }
378
379 @Override
380 public int read(final ByteBuffer dst) throws IOException {
381 final IOSession currentSession = currentSessionRef.get();
382 return currentSession.read(dst);
383 }
384
385 @Override
386 public int write(final ByteBuffer src) throws IOException {
387 final IOSession currentSession = currentSessionRef.get();
388 return currentSession.write(src);
389 }
390
391 @Override
392 public void updateReadTime() {
393 ioSession.updateReadTime();
394 }
395
396 @Override
397 public void updateWriteTime() {
398 ioSession.updateWriteTime();
399 }
400
401 @Override
402 public long getLastReadTime() {
403 return ioSession.getLastReadTime();
404 }
405
406 @Override
407 public long getLastWriteTime() {
408 return ioSession.getLastWriteTime();
409 }
410
411 @Override
412 public long getLastEventTime() {
413 return ioSession.getLastEventTime();
414 }
415
416 @Override
417 public void switchProtocol(final String protocolId, final FutureCallback<ProtocolIOSession> callback) {
418 Args.notEmpty(protocolId, "Application protocol ID");
419 final ProtocolUpgradeHandler upgradeHandler = protocolUpgradeHandlerMap.get(TextUtils.toLowerCase(protocolId));
420 if (upgradeHandler != null) {
421 upgradeHandler.upgrade(this, callback);
422 } else {
423 throw new IllegalStateException("Unsupported protocol: " + protocolId);
424 }
425 }
426
427 @Override
428 public void registerProtocol(final String protocolId, final ProtocolUpgradeHandler upgradeHandler) {
429 Args.notEmpty(protocolId, "Application protocol ID");
430 Args.notNull(upgradeHandler, "Protocol upgrade handler");
431 protocolUpgradeHandlerMap.put(TextUtils.toLowerCase(protocolId), upgradeHandler);
432 }
433
434 @Override
435 public String toString() {
436 final IOSession currentSession = currentSessionRef.get();
437 return Objects.toString(currentSession != null ? currentSession: ioSession, null);
438 }
439
440 }