View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.IOException;
31  import java.net.SocketAddress;
32  import java.nio.channels.ByteChannel;
33  import java.nio.channels.SelectionKey;
34  import java.util.Queue;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  import java.util.concurrent.atomic.AtomicReference;
37  import java.util.concurrent.locks.Lock;
38  
39  import javax.net.ssl.SSLContext;
40  
41  import org.apache.hc.core5.function.Callback;
42  import org.apache.hc.core5.io.CloseMode;
43  import org.apache.hc.core5.net.NamedEndpoint;
44  import org.apache.hc.core5.reactor.ssl.SSLBufferMode;
45  import org.apache.hc.core5.reactor.ssl.SSLIOSession;
46  import org.apache.hc.core5.reactor.ssl.SSLMode;
47  import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
48  import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
49  import org.apache.hc.core5.reactor.ssl.TlsDetails;
50  import org.apache.hc.core5.util.Asserts;
51  
52  final class InternalDataChannel extends InternalChannel implements ProtocolIOSession {
53  
54      private final IOSession ioSession;
55      private final NamedEndpoint initialEndpoint;
56      private final IOSessionListener sessionListener;
57      private final AtomicReference<SSLIOSession> tlsSessionRef;
58      private final Queue<InternalDataChannel> closedSessions;
59      private final AtomicReference<IOEventHandler> handlerRef;
60      private final AtomicBoolean connected;
61      private final AtomicBoolean closed;
62  
63      InternalDataChannel(
64              final IOSession ioSession,
65              final NamedEndpoint initialEndpoint,
66              final IOSessionListener sessionListener,
67              final Queue<InternalDataChannel> closedSessions) {
68          this.ioSession = ioSession;
69          this.initialEndpoint = initialEndpoint;
70          this.closedSessions = closedSessions;
71          this.sessionListener = sessionListener;
72          this.tlsSessionRef = new AtomicReference<>(null);
73          this.handlerRef = new AtomicReference<>(null);
74          this.connected = new AtomicBoolean(false);
75          this.closed = new AtomicBoolean(false);
76      }
77  
78      @Override
79      public String getId() {
80          return ioSession.getId();
81      }
82  
83  
84      @Override
85      public IOEventHandler getHandler() {
86          return handlerRef.get();
87      }
88  
89      @Override
90      public void upgrade(final IOEventHandler handler) {
91          handlerRef.set(handler);
92      }
93  
94      private IOEventHandler ensureHandler() {
95          final IOEventHandler handler = handlerRef.get();
96          Asserts.notNull(handler, "IO event handler");
97          return handler;
98      }
99  
100     @Override
101     void onIOEvent(final int readyOps) throws IOException {
102         final SSLIOSession tlsSession = tlsSessionRef.get();
103         if (tlsSession != null) {
104             if (!tlsSession.isInitialized()) {
105                 tlsSession.initialize();
106                 if (sessionListener != null) {
107                     sessionListener.tlsStarted(tlsSession);
108                 }
109             }
110             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
111                 ioSession.clearEvent(SelectionKey.OP_CONNECT);
112             }
113             if ((readyOps & SelectionKey.OP_READ) != 0) {
114                 ioSession.updateReadTime();
115                 do {
116                     tlsSession.resetReadCount();
117                     if (tlsSession.isAppInputReady()) {
118                         if (sessionListener != null) {
119                             sessionListener.inputReady(this);
120                         }
121                         final IOEventHandler handler = ensureHandler();
122                         handler.inputReady(this);
123                     }
124                     tlsSession.inboundTransport();
125                     if (sessionListener != null) {
126                         sessionListener.tlsInbound(tlsSession);
127                     }
128                 } while (tlsSession.getReadCount() > 0);
129             }
130             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
131                 ioSession.updateWriteTime();
132                 if (tlsSession.isAppOutputReady()) {
133                     if (sessionListener != null) {
134                         sessionListener.outputReady(this);
135                     }
136                     final IOEventHandler handler = ensureHandler();
137                     handler.outputReady(this);
138                 }
139                 tlsSession.outboundTransport();
140                 if (sessionListener != null) {
141                     sessionListener.tlsOutbound(tlsSession);
142                 }
143             }
144         } else {
145             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
146                 ioSession.clearEvent(SelectionKey.OP_CONNECT);
147                 if (connected.compareAndSet(false, true)) {
148                     if (sessionListener != null) {
149                         sessionListener.connected(this);
150                     }
151                     final IOEventHandler handler = ensureHandler();
152                     handler.connected(this);
153                 }
154             }
155             if ((readyOps & SelectionKey.OP_READ) != 0) {
156                 ioSession.updateReadTime();
157                 if (sessionListener != null) {
158                     sessionListener.inputReady(this);
159                 }
160                 final IOEventHandler handler = ensureHandler();
161                 handler.inputReady(this);
162             }
163             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
164                 ioSession.updateWriteTime();
165                 if (sessionListener != null) {
166                     sessionListener.outputReady(this);
167                 }
168                 final IOEventHandler handler = ensureHandler();
169                 handler.outputReady(this);
170             }
171         }
172     }
173 
174     @Override
175     int getTimeoutMillis() {
176         return ioSession.getSocketTimeoutMillis();
177     }
178 
179     @Override
180     void onTimeout(final int timeoutMillis) throws IOException {
181         final IOEventHandler handler = ensureHandler();
182         handler.timeout(this, timeoutMillis);
183         final SSLIOSession tlsSession = tlsSessionRef.get();
184         if (tlsSession != null) {
185             if (tlsSession.isOutboundDone() && !tlsSession.isInboundDone()) {
186                 // The session failed to terminate cleanly
187                 tlsSession.close(CloseMode.IMMEDIATE);
188             }
189         }
190     }
191 
192     @Override
193     void onException(final Exception cause) {
194         final IOEventHandler handler = ensureHandler();
195         if (sessionListener != null) {
196             sessionListener.exception(this, cause);
197         }
198         handler.exception(this, cause);
199     }
200 
201     void disconnected() {
202         if (sessionListener != null) {
203             sessionListener.disconnected(this);
204         }
205         final IOEventHandler handler = ensureHandler();
206         handler.disconnected(this);
207     }
208 
209     @Override
210     public void startTls(
211             final SSLContext sslContext,
212             final NamedEndpoint endpoint,
213             final SSLBufferMode sslBufferMode,
214             final SSLSessionInitializer initializer,
215             final SSLSessionVerifier verifier) {
216         if (!tlsSessionRef.compareAndSet(null, new SSLIOSession(
217                 endpoint != null ? endpoint : initialEndpoint,
218                 ioSession,
219                 initialEndpoint != null ? SSLMode.CLIENT : SSLMode.SERVER,
220                 sslContext,
221                 sslBufferMode,
222                 initializer,
223                 verifier,
224                 new Callback<SSLIOSession>() {
225 
226                     @Override
227                     public void execute(final SSLIOSession sslSession) {
228                         if (connected.compareAndSet(false, true)) {
229                             final IOEventHandler handler = ensureHandler();
230                             try {
231                                 if (sessionListener != null) {
232                                     sessionListener.connected(InternalDataChannel.this);
233                                 }
234                                 handler.connected(InternalDataChannel.this);
235                             } catch (final Exception ex) {
236                                 if (sessionListener != null) {
237                                     sessionListener.exception(InternalDataChannel.this, ex);
238                                 }
239                                 handler.exception(InternalDataChannel.this, ex);
240                             }
241                         }
242                     }
243 
244                 }))) {
245             throw new IllegalStateException("TLS already activated");
246         }
247     }
248 
249     @Override
250     public TlsDetails getTlsDetails() {
251         final SSLIOSession sslIoSession = tlsSessionRef.get();
252         return sslIoSession != null ? sslIoSession.getTlsDetails() : null;
253     }
254 
255     @Override
256     public Lock lock() {
257         return ioSession.lock();
258     }
259 
260     private IOSession getSessionImpl() {
261         final SSLIOSession tlsSession = tlsSessionRef.get();
262         return tlsSession != null ? tlsSession : ioSession;
263     }
264 
265     @Override
266     public void close() {
267         if (closed.compareAndSet(false, true)) {
268             try {
269                 getSessionImpl().close();
270             } finally {
271                 closedSessions.add(this);
272             }
273         }
274     }
275 
276     @Override
277     public void close(final CloseMode closeMode) {
278         if (closed.compareAndSet(false, true)) {
279             try {
280                 getSessionImpl().close(closeMode);
281             } finally {
282                 closedSessions.add(this);
283             }
284         }
285     }
286 
287     @Override
288     public int getStatus() {
289         return getSessionImpl().getStatus();
290     }
291 
292     @Override
293     public boolean isClosed() {
294         return getSessionImpl().isClosed();
295     }
296 
297     @Override
298     public void enqueue(final Command command, final Command.Priority priority) {
299         getSessionImpl().enqueue(command, priority);
300     }
301 
302     @Override
303     public boolean hasCommands() {
304         return getSessionImpl().hasCommands();
305     }
306 
307     @Override
308     public Command poll() {
309         return getSessionImpl().poll();
310     }
311 
312     @Override
313     public ByteChannel channel() {
314         return getSessionImpl().channel();
315     }
316 
317     @Override
318     public SocketAddress getRemoteAddress() {
319         return ioSession.getRemoteAddress();
320     }
321 
322     @Override
323     public SocketAddress getLocalAddress() {
324         return ioSession.getLocalAddress();
325     }
326 
327     @Override
328     public int getEventMask() {
329         return getSessionImpl().getEventMask();
330     }
331 
332     @Override
333     public void setEventMask(final int ops) {
334         getSessionImpl().setEventMask(ops);
335     }
336 
337     @Override
338     public void setEvent(final int op) {
339         getSessionImpl().setEvent(op);
340     }
341 
342     @Override
343     public void clearEvent(final int op) {
344         getSessionImpl().clearEvent(op);
345     }
346 
347     @Override
348     public int getSocketTimeoutMillis() {
349         return ioSession.getSocketTimeoutMillis();
350     }
351 
352     @Override
353     public void setSocketTimeoutMillis(final int timeout) {
354         ioSession.setSocketTimeoutMillis(timeout);
355     }
356 
357     @Override
358     public void updateReadTime() {
359         ioSession.updateReadTime();
360     }
361 
362     @Override
363     public void updateWriteTime() {
364         ioSession.updateWriteTime();
365     }
366 
367     @Override
368     public long getLastReadTimeMillis() {
369         return ioSession.getLastReadTimeMillis();
370     }
371 
372     @Override
373     public long getLastWriteTime() {
374         return ioSession.getLastWriteTime();
375     }
376 
377     @Override
378     public String toString() {
379         return getSessionImpl().toString();
380     }
381 
382 }