View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketAddress;
34  import java.net.UnknownHostException;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.SelectionKey;
37  import java.nio.channels.SocketChannel;
38  import java.util.Queue;
39  import java.util.Set;
40  import java.util.concurrent.ConcurrentLinkedQueue;
41  import java.util.concurrent.ThreadFactory;
42  
43  import org.apache.http.annotation.ThreadSafe;
44  import org.apache.http.nio.reactor.ConnectingIOReactor;
45  import org.apache.http.nio.reactor.IOReactorException;
46  import org.apache.http.nio.reactor.IOReactorStatus;
47  import org.apache.http.nio.reactor.SessionRequest;
48  import org.apache.http.nio.reactor.SessionRequestCallback;
49  import org.apache.http.params.HttpParams;
50  import org.apache.http.util.Asserts;
51  
52  /**
53   * Default implementation of {@link ConnectingIOReactor}. This class extends
54   * {@link AbstractMultiworkerIOReactor} with capability to connect to remote
55   * hosts.
56   *
57   * @since 4.0
58   */
59  @SuppressWarnings("deprecation")
60  @ThreadSafe // public methods only
61  public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
62          implements ConnectingIOReactor {
63  
64      private final Queue<SessionRequestImpl> requestQueue;
65  
66      private long lastTimeoutCheck;
67  
68      /**
69       * Creates an instance of DefaultConnectingIOReactor with the given configuration.
70       *
71       * @param config I/O reactor configuration.
72       * @param threadFactory the factory to create threads.
73       *   Can be <code>null</code>.
74       * @throws IOReactorException in case if a non-recoverable I/O error.
75       *
76       * @since 4.2
77       */
78      public DefaultConnectingIOReactor(
79              final IOReactorConfig config,
80              final ThreadFactory threadFactory) throws IOReactorException {
81          super(config, threadFactory);
82          this.requestQueue = new ConcurrentLinkedQueue<SessionRequestImpl>();
83          this.lastTimeoutCheck = System.currentTimeMillis();
84      }
85  
86      /**
87       * Creates an instance of DefaultConnectingIOReactor with the given configuration.
88       *
89       * @param config I/O reactor configuration.
90       *   Can be <code>null</code>.
91       * @throws IOReactorException in case if a non-recoverable I/O error.
92       *
93       * @since 4.2
94       */
95      public DefaultConnectingIOReactor(final IOReactorConfig config) throws IOReactorException {
96          this(config, null);
97      }
98  
99      /**
100      * Creates an instance of DefaultConnectingIOReactor with default configuration.
101      *
102      * @throws IOReactorException in case if a non-recoverable I/O error.
103      *
104      * @since 4.2
105      */
106     public DefaultConnectingIOReactor() throws IOReactorException {
107         this(null, null);
108     }
109 
110     /**
111      * @deprecated (4.2) use {@link DefaultConnectingIOReactor#DefaultConnectingIOReactor(IOReactorConfig, ThreadFactory)}
112      */
113     @Deprecated
114     public DefaultConnectingIOReactor(
115             final int workerCount,
116             final ThreadFactory threadFactory,
117             final HttpParams params) throws IOReactorException {
118         this(convert(workerCount, params), threadFactory);
119     }
120 
121     /**
122      * @deprecated (4.2) use {@link DefaultConnectingIOReactor#DefaultConnectingIOReactor(IOReactorConfig)}
123      */
124     @Deprecated
125     public DefaultConnectingIOReactor(
126             final int workerCount,
127             final HttpParams params) throws IOReactorException {
128         this(convert(workerCount, params), null);
129     }
130 
131     @Override
132     protected void cancelRequests() throws IOReactorException {
133         SessionRequestImpl request;
134         while ((request = this.requestQueue.poll()) != null) {
135             request.cancel();
136         }
137     }
138 
139     @Override
140     protected void processEvents(final int readyCount) throws IOReactorException {
141         processSessionRequests();
142 
143         if (readyCount > 0) {
144             final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
145             for (final SelectionKey key : selectedKeys) {
146 
147                 processEvent(key);
148 
149             }
150             selectedKeys.clear();
151         }
152 
153         final long currentTime = System.currentTimeMillis();
154         if ((currentTime - this.lastTimeoutCheck) >= this.selectTimeout) {
155             this.lastTimeoutCheck = currentTime;
156             final Set<SelectionKey> keys = this.selector.keys();
157             processTimeouts(keys);
158         }
159     }
160 
161     private void processEvent(final SelectionKey key) {
162         try {
163 
164             if (key.isConnectable()) {
165 
166                 final SocketChannel channel = (SocketChannel) key.channel();
167                 // Get request handle
168                 final SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
169                 final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
170 
171                 // Finish connection process
172                 try {
173                     channel.finishConnect();
174                 } catch (final IOException ex) {
175                     sessionRequest.failed(ex);
176                 }
177                 key.cancel();
178                 key.attach(null);
179                 if (!sessionRequest.isCompleted()) {
180                     addChannel(new ChannelEntry(channel, sessionRequest));
181                 } else {
182                     try {
183                         channel.close();
184                     } catch (IOException ignore) {
185                     }
186                 }
187             }
188 
189         } catch (final CancelledKeyException ex) {
190             final SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
191             key.attach(null);
192             if (requestHandle != null) {
193                 final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
194                 if (sessionRequest != null) {
195                     sessionRequest.cancel();
196                 }
197             }
198         }
199     }
200 
201     private void processTimeouts(final Set<SelectionKey> keys) {
202         final long now = System.currentTimeMillis();
203         for (final SelectionKey key : keys) {
204             final Object attachment = key.attachment();
205 
206             if (attachment instanceof SessionRequestHandle) {
207                 final SessionRequestHandle handle = (SessionRequestHandle) key.attachment();
208                 final SessionRequestImpl sessionRequest = handle.getSessionRequest();
209                 final int timeout = sessionRequest.getConnectTimeout();
210                 if (timeout > 0) {
211                     if (handle.getRequestTime() + timeout < now) {
212                         sessionRequest.timeout();
213                     }
214                 }
215             }
216 
217         }
218     }
219 
220     public SessionRequest connect(
221             final SocketAddress remoteAddress,
222             final SocketAddress localAddress,
223             final Object attachment,
224             final SessionRequestCallback callback) {
225         Asserts.check(this.status.compareTo(IOReactorStatus.ACTIVE) <= 0,
226             "I/O reactor has been shut down");
227         final SessionRequestImpl sessionRequest = new SessionRequestImpl(
228                 remoteAddress, localAddress, attachment, callback);
229         sessionRequest.setConnectTimeout(this.config.getConnectTimeout());
230 
231         this.requestQueue.add(sessionRequest);
232         this.selector.wakeup();
233 
234         return sessionRequest;
235     }
236 
237     private void validateAddress(final SocketAddress address) throws UnknownHostException {
238         if (address == null) {
239             return;
240         }
241         if (address instanceof InetSocketAddress) {
242             final InetSocketAddress endpoint = (InetSocketAddress) address;
243             if (endpoint.isUnresolved()) {
244                 throw new UnknownHostException(endpoint.getHostName());
245             }
246         }
247     }
248 
249     private void processSessionRequests() throws IOReactorException {
250         SessionRequestImpl request;
251         while ((request = this.requestQueue.poll()) != null) {
252             if (request.isCompleted()) {
253                 continue;
254             }
255             final SocketChannel socketChannel;
256             try {
257                 socketChannel = SocketChannel.open();
258             } catch (final IOException ex) {
259                 throw new IOReactorException("Failure opening socket", ex);
260             }
261             try {
262                 socketChannel.configureBlocking(false);
263                 validateAddress(request.getLocalAddress());
264                 validateAddress(request.getRemoteAddress());
265 
266                 if (request.getLocalAddress() != null) {
267                     final Socket sock = socketChannel.socket();
268                     sock.setReuseAddress(this.config.isSoReuseAddress());
269                     sock.bind(request.getLocalAddress());
270                 }
271                 prepareSocket(socketChannel.socket());
272                 final boolean connected = socketChannel.connect(request.getRemoteAddress());
273                 if (connected) {
274                     final ChannelEntry entry = new ChannelEntry(socketChannel, request);
275                     addChannel(entry);
276                     continue;
277                 }
278             } catch (final IOException ex) {
279                 closeChannel(socketChannel);
280                 request.failed(ex);
281                 return;
282             }
283 
284             final SessionRequestHandle requestHandle = new SessionRequestHandle(request);
285             try {
286                 final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT,
287                         requestHandle);
288                 request.setKey(key);
289             } catch (final IOException ex) {
290                 closeChannel(socketChannel);
291                 throw new IOReactorException("Failure registering channel " +
292                         "with the selector", ex);
293             }
294         }
295     }
296 
297 }