View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.nio.channels.ByteChannel;
33  import java.nio.channels.SelectionKey;
34  import java.util.Queue;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  import java.util.concurrent.atomic.AtomicReference;
37  import java.util.concurrent.locks.Lock;
38  
39  import javax.net.ssl.SSLContext;
40  
41  import org.apache.hc.core5.function.Callback;
42  import org.apache.hc.core5.io.CloseMode;
43  import org.apache.hc.core5.net.NamedEndpoint;
44  import org.apache.hc.core5.reactor.ssl.SSLBufferMode;
45  import org.apache.hc.core5.reactor.ssl.SSLIOSession;
46  import org.apache.hc.core5.reactor.ssl.SSLMode;
47  import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
48  import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
49  import org.apache.hc.core5.reactor.ssl.TlsDetails;
50  import org.apache.hc.core5.util.Asserts;
51  import org.apache.hc.core5.util.Timeout;
52  
53  final class InternalDataChannel extends InternalChannel implements ProtocolIOSession {
54  
55      private final IOSession ioSession;
56      private final NamedEndpoint initialEndpoint;
57      private final IOSessionListener sessionListener;
58      private final AtomicReference<SSLIOSession> tlsSessionRef;
59      private final Queue<InternalDataChannel> closedSessions;
60      private final AtomicReference<IOEventHandler> handlerRef;
61      private final AtomicBoolean connected;
62      private final AtomicBoolean closed;
63  
64      InternalDataChannel(
65              final IOSession ioSession,
66              final NamedEndpoint initialEndpoint,
67              final IOSessionListener sessionListener,
68              final Queue<InternalDataChannel> closedSessions) {
69          this.ioSession = ioSession;
70          this.initialEndpoint = initialEndpoint;
71          this.closedSessions = closedSessions;
72          this.sessionListener = sessionListener;
73          this.tlsSessionRef = new AtomicReference<>(null);
74          this.handlerRef = new AtomicReference<>(null);
75          this.connected = new AtomicBoolean(false);
76          this.closed = new AtomicBoolean(false);
77      }
78  
79      @Override
80      public String getId() {
81          return ioSession.getId();
82      }
83  
84      @Override
85      public IOEventHandler getHandler() {
86          return handlerRef.get();
87      }
88  
89      @Override
90      public NamedEndpoint getInitialEndpoint() {
91          return initialEndpoint;
92      }
93  
94      @Override
95      public void upgrade(final IOEventHandler handler) {
96          handlerRef.set(handler);
97      }
98  
99      private IOEventHandler ensureHandler() {
100         final IOEventHandler handler = handlerRef.get();
101         Asserts.notNull(handler, "IO event handler");
102         return handler;
103     }
104 
105     @Override
106     void onIOEvent(final int readyOps) throws IOException {
107         final SSLIOSession tlsSession = tlsSessionRef.get();
108         if (tlsSession != null) {
109             if (!tlsSession.isInitialized()) {
110                 tlsSession.initialize();
111                 if (sessionListener != null) {
112                     sessionListener.tlsStarted(tlsSession);
113                 }
114             }
115             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
116                 tlsSession.clearEvent(SelectionKey.OP_CONNECT);
117             }
118             if ((readyOps & SelectionKey.OP_READ) != 0) {
119                 ioSession.updateReadTime();
120                 do {
121                     tlsSession.resetReadCount();
122                     if (tlsSession.isAppInputReady()) {
123                         if (sessionListener != null) {
124                             sessionListener.inputReady(this);
125                         }
126                         final IOEventHandler handler = ensureHandler();
127                         handler.inputReady(this);
128                     }
129                     tlsSession.inboundTransport();
130                     if (sessionListener != null) {
131                         sessionListener.tlsInbound(tlsSession);
132                     }
133                 } while (tlsSession.getReadCount() > 0);
134             }
135             if ((readyOps & SelectionKey.OP_WRITE) != 0
136                     || (ioSession.getEventMask() & SelectionKey.OP_WRITE) != 0) {
137                 ioSession.updateWriteTime();
138                 if (tlsSession.isAppOutputReady()) {
139                     if (sessionListener != null) {
140                         sessionListener.outputReady(this);
141                     }
142                     final IOEventHandler handler = ensureHandler();
143                     handler.outputReady(this);
144                 }
145                 tlsSession.outboundTransport();
146                 if (sessionListener != null) {
147                     sessionListener.tlsOutbound(tlsSession);
148                 }
149             }
150         } else {
151             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
152                 ioSession.clearEvent(SelectionKey.OP_CONNECT);
153                 if (connected.compareAndSet(false, true)) {
154                     if (sessionListener != null) {
155                         sessionListener.connected(this);
156                     }
157                     final IOEventHandler handler = ensureHandler();
158                     handler.connected(this);
159                 }
160             }
161             if ((readyOps & SelectionKey.OP_READ) != 0) {
162                 ioSession.updateReadTime();
163                 if (sessionListener != null) {
164                     sessionListener.inputReady(this);
165                 }
166                 final IOEventHandler handler = ensureHandler();
167                 handler.inputReady(this);
168             }
169             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
170                 ioSession.updateWriteTime();
171                 if (sessionListener != null) {
172                     sessionListener.outputReady(this);
173                 }
174                 final IOEventHandler handler = ensureHandler();
175                 handler.outputReady(this);
176             }
177         }
178     }
179 
180     @Override
181     Timeout getTimeout() {
182         return ioSession.getSocketTimeout();
183     }
184 
185     @Override
186     void onTimeout(final Timeout timeout) throws IOException {
187         final IOEventHandler handler = ensureHandler();
188         handler.timeout(this, timeout);
189         final SSLIOSession tlsSession = tlsSessionRef.get();
190         if (tlsSession != null) {
191             if (tlsSession.isOutboundDone() && !tlsSession.isInboundDone()) {
192                 // The session failed to terminate cleanly
193                 tlsSession.close(CloseMode.IMMEDIATE);
194             }
195         }
196     }
197 
198     @Override
199     void onException(final Exception cause) {
200         final IOEventHandler handler = ensureHandler();
201         if (sessionListener != null) {
202             sessionListener.exception(this, cause);
203         }
204         handler.exception(this, cause);
205     }
206 
207     void disconnected() {
208         if (sessionListener != null) {
209             sessionListener.disconnected(this);
210         }
211         final IOEventHandler handler = ensureHandler();
212         handler.disconnected(this);
213     }
214 
215     @Override
216     public void startTls(
217             final SSLContext sslContext,
218             final NamedEndpoint endpoint,
219             final SSLBufferMode sslBufferMode,
220             final SSLSessionInitializer initializer,
221             final SSLSessionVerifier verifier) {
222         if (!tlsSessionRef.compareAndSet(null, new SSLIOSession(
223                 endpoint != null ? endpoint : initialEndpoint,
224                 ioSession,
225                 initialEndpoint != null ? SSLMode.CLIENT : SSLMode.SERVER,
226                 sslContext,
227                 sslBufferMode,
228                 initializer,
229                 verifier,
230                 new Callback<SSLIOSession>() {
231 
232                     @Override
233                     public void execute(final SSLIOSession sslSession) {
234                         if (connected.compareAndSet(false, true)) {
235                             final IOEventHandler handler = ensureHandler();
236                             try {
237                                 if (sessionListener != null) {
238                                     sessionListener.connected(InternalDataChannel.this);
239                                 }
240                                 handler.connected(InternalDataChannel.this);
241                             } catch (final Exception ex) {
242                                 if (sessionListener != null) {
243                                     sessionListener.exception(InternalDataChannel.this, ex);
244                                 }
245                                 handler.exception(InternalDataChannel.this, ex);
246                             }
247                         }
248                     }
249 
250                 }))) {
251             throw new IllegalStateException("TLS already activated");
252         }
253     }
254 
255     @SuppressWarnings("resource")
256     @Override
257     public TlsDetails getTlsDetails() {
258         final SSLIOSession sslIoSession = tlsSessionRef.get();
259         return sslIoSession != null ? sslIoSession.getTlsDetails() : null;
260     }
261 
262     @Override
263     public Lock lock() {
264         return ioSession.lock();
265     }
266 
267     private IOSession getSessionImpl() {
268         final SSLIOSession tlsSession = tlsSessionRef.get();
269         return tlsSession != null ? tlsSession : ioSession;
270     }
271 
272     @Override
273     public void close() {
274         if (closed.compareAndSet(false, true)) {
275             try {
276                 getSessionImpl().close();
277             } finally {
278                 closedSessions.add(this);
279             }
280         }
281     }
282 
283     @Override
284     public void close(final CloseMode closeMode) {
285         if (closed.compareAndSet(false, true)) {
286             try {
287                 getSessionImpl().close(closeMode);
288             } finally {
289                 closedSessions.add(this);
290             }
291         }
292     }
293 
294     @Override
295     public int getStatus() {
296         return getSessionImpl().getStatus();
297     }
298 
299     @Override
300     public boolean isClosed() {
301         return getSessionImpl().isClosed();
302     }
303 
304     @Override
305     public void enqueue(final Command command, final Command.Priority priority) {
306         getSessionImpl().enqueue(command, priority);
307     }
308 
309     @Override
310     public boolean hasCommands() {
311         return getSessionImpl().hasCommands();
312     }
313 
314     @Override
315     public Command poll() {
316         return getSessionImpl().poll();
317     }
318 
319     @Override
320     public ByteChannel channel() {
321         return getSessionImpl().channel();
322     }
323 
324     @Override
325     public SocketAddress getRemoteAddress() {
326         return ioSession.getRemoteAddress();
327     }
328 
329     @Override
330     public SocketAddress getLocalAddress() {
331         return ioSession.getLocalAddress();
332     }
333 
334     @Override
335     public int getEventMask() {
336         return getSessionImpl().getEventMask();
337     }
338 
339     @Override
340     public void setEventMask(final int ops) {
341         getSessionImpl().setEventMask(ops);
342     }
343 
344     @Override
345     public void setEvent(final int op) {
346         getSessionImpl().setEvent(op);
347     }
348 
349     @Override
350     public void clearEvent(final int op) {
351         getSessionImpl().clearEvent(op);
352     }
353 
354     @Override
355     public Timeout getSocketTimeout() {
356         return ioSession.getSocketTimeout();
357     }
358 
359     @Override
360     public void setSocketTimeout(final Timeout timeout) {
361         ioSession.setSocketTimeout(timeout);
362     }
363 
364     @Override
365     public void updateReadTime() {
366         ioSession.updateReadTime();
367     }
368 
369     @Override
370     public void updateWriteTime() {
371         ioSession.updateWriteTime();
372     }
373 
374     @Override
375     public long getLastReadTime() {
376         return ioSession.getLastReadTime();
377     }
378 
379     @Override
380     public long getLastWriteTime() {
381         return ioSession.getLastWriteTime();
382     }
383 
384     @Override
385     public String toString() {
386         final StringBuilder buf = new StringBuilder();
387         final SSLIOSession tlsSession = tlsSessionRef.get();
388         if (tlsSession != null) {
389             buf.append(tlsSession);
390         } else {
391             buf.append(ioSession);
392         }
393         final IOEventHandler handler = getHandler();
394         if (handler != null) {
395             buf.append(handler);
396         }
397         return buf.toString();
398     }
399 
400 }