View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketAddress;
34  import java.net.UnknownHostException;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.SelectionKey;
37  import java.nio.channels.SocketChannel;
38  import java.util.Iterator;
39  import java.util.Queue;
40  import java.util.Set;
41  import java.util.concurrent.ConcurrentLinkedQueue;
42  import java.util.concurrent.ThreadFactory;
43  
44  import org.apache.http.annotation.ThreadSafe;
45  import org.apache.http.nio.reactor.ConnectingIOReactor;
46  import org.apache.http.nio.reactor.IOReactorException;
47  import org.apache.http.nio.reactor.IOReactorStatus;
48  import org.apache.http.nio.reactor.SessionRequest;
49  import org.apache.http.nio.reactor.SessionRequestCallback;
50  import org.apache.http.params.HttpParams;
51  
52  /**
53   * Default implementation of {@link ConnectingIOReactor}. This class extends
54   * {@link AbstractMultiworkerIOReactor} with capability to connect to remote
55   * hosts.
56   *
57   * @since 4.0
58   */
59  @ThreadSafe // public methods only
60  public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
61          implements ConnectingIOReactor {
62  
63      private final Queue<SessionRequestImpl> requestQueue;
64  
65      private long lastTimeoutCheck;
66  
67      /**
68       * Creates an instance of DefaultConnectingIOReactor with the given configuration.
69       *
70       * @param config I/O reactor configuration.
71       * @param threadFactory the factory to create threads.
72       *   Can be <code>null</code>.
73       * @throws IOReactorException in case if a non-recoverable I/O error.
74       *
75       * @since 4.2
76       */
77      public DefaultConnectingIOReactor(
78              final IOReactorConfig config,
79              final ThreadFactory threadFactory) throws IOReactorException {
80          super(config, threadFactory);
81          this.requestQueue = new ConcurrentLinkedQueue<SessionRequestImpl>();
82          this.lastTimeoutCheck = System.currentTimeMillis();
83      }
84  
85      /**
86       * Creates an instance of DefaultConnectingIOReactor with the given configuration.
87       *
88       * @param config I/O reactor configuration.
89       *   Can be <code>null</code>.
90       * @throws IOReactorException in case if a non-recoverable I/O error.
91       *
92       * @since 4.2
93       */
94      public DefaultConnectingIOReactor(final IOReactorConfig config) throws IOReactorException {
95          this(config, null);
96      }
97  
98      /**
99       * Creates an instance of DefaultConnectingIOReactor with default configuration.
100      *
101      * @throws IOReactorException in case if a non-recoverable I/O error.
102      *
103      * @since 4.2
104      */
105     public DefaultConnectingIOReactor() throws IOReactorException {
106         this(null, null);
107     }
108 
109     /**
110      * @deprecated (4.2) use {@link DefaultConnectingIOReactor#DefaultConnectingIOReactor(IOReactorConfig, ThreadFactory)}
111      */
112     @Deprecated
113     public DefaultConnectingIOReactor(
114             int workerCount,
115             final ThreadFactory threadFactory,
116             final HttpParams params) throws IOReactorException {
117         this(convert(workerCount, params), threadFactory);
118     }
119 
120     /**
121      * @deprecated (4.2) use {@link DefaultConnectingIOReactor#DefaultConnectingIOReactor(IOReactorConfig)}
122      */
123     @Deprecated
124     public DefaultConnectingIOReactor(
125             int workerCount,
126             final HttpParams params) throws IOReactorException {
127         this(convert(workerCount, params), null);
128     }
129 
130     @Override
131     protected void cancelRequests() throws IOReactorException {
132         SessionRequestImpl request;
133         while ((request = this.requestQueue.poll()) != null) {
134             request.cancel();
135         }
136     }
137 
138     @Override
139     protected void processEvents(int readyCount) throws IOReactorException {
140         processSessionRequests();
141 
142         if (readyCount > 0) {
143             Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
144             for (Iterator<SelectionKey> it = selectedKeys.iterator(); it.hasNext(); ) {
145 
146                 SelectionKey key = it.next();
147                 processEvent(key);
148 
149             }
150             selectedKeys.clear();
151         }
152 
153         long currentTime = System.currentTimeMillis();
154         if ((currentTime - this.lastTimeoutCheck) >= this.selectTimeout) {
155             this.lastTimeoutCheck = currentTime;
156             Set<SelectionKey> keys = this.selector.keys();
157             processTimeouts(keys);
158         }
159     }
160 
161     private void processEvent(final SelectionKey key) {
162         try {
163 
164             if (key.isConnectable()) {
165 
166                 SocketChannel channel = (SocketChannel) key.channel();
167                 // Get request handle
168                 SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
169                 SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
170 
171                 // Finish connection process
172                 try {
173                     channel.finishConnect();
174                 } catch (IOException ex) {
175                     sessionRequest.failed(ex);
176                 }
177                 key.cancel();
178                 if (channel.isConnected()) {
179                     try {
180                         try {
181                             prepareSocket(channel.socket());
182                         } catch (IOException ex) {
183                             if (this.exceptionHandler == null
184                                     || !this.exceptionHandler.handle(ex)) {
185                                 throw new IOReactorException(
186                                         "Failure initalizing socket", ex);
187                             }
188                         }
189                         ChannelEntry entry = new ChannelEntry(channel, sessionRequest);
190                         addChannel(entry);
191                     } catch (IOException ex) {
192                         sessionRequest.failed(ex);
193                     }
194                 }
195             }
196 
197         } catch (CancelledKeyException ex) {
198             key.attach(null);
199         }
200     }
201 
202     private void processTimeouts(final Set<SelectionKey> keys) {
203         long now = System.currentTimeMillis();
204         for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext();) {
205             SelectionKey key = it.next();
206             Object attachment = key.attachment();
207 
208             if (attachment instanceof SessionRequestHandle) {
209                 SessionRequestHandle handle = (SessionRequestHandle) key.attachment();
210                 SessionRequestImpl sessionRequest = handle.getSessionRequest();
211                 int timeout = sessionRequest.getConnectTimeout();
212                 if (timeout > 0) {
213                     if (handle.getRequestTime() + timeout < now) {
214                         sessionRequest.timeout();
215                     }
216                 }
217             }
218 
219         }
220     }
221 
222     public SessionRequest connect(
223             final SocketAddress remoteAddress,
224             final SocketAddress localAddress,
225             final Object attachment,
226             final SessionRequestCallback callback) {
227 
228         if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
229             throw new IllegalStateException("I/O reactor has been shut down");
230         }
231         SessionRequestImpl sessionRequest = new SessionRequestImpl(
232                 remoteAddress, localAddress, attachment, callback);
233         sessionRequest.setConnectTimeout(this.config.getConnectTimeout());
234 
235         this.requestQueue.add(sessionRequest);
236         this.selector.wakeup();
237 
238         return sessionRequest;
239     }
240 
241     private void validateAddress(final SocketAddress address) throws UnknownHostException {
242         if (address == null) {
243             return;
244         }
245         if (address instanceof InetSocketAddress) {
246             InetSocketAddress endpoint = (InetSocketAddress) address;
247             if (endpoint.isUnresolved()) {
248                 throw new UnknownHostException(endpoint.getHostName());
249             }
250         }
251     }
252 
253     private void processSessionRequests() throws IOReactorException {
254         SessionRequestImpl request;
255         while ((request = this.requestQueue.poll()) != null) {
256             if (request.isCompleted()) {
257                 continue;
258             }
259             SocketChannel socketChannel;
260             try {
261                 socketChannel = SocketChannel.open();
262             } catch (IOException ex) {
263                 throw new IOReactorException("Failure opening socket", ex);
264             }
265             try {
266                 socketChannel.configureBlocking(false);
267                 validateAddress(request.getLocalAddress());
268                 validateAddress(request.getRemoteAddress());
269 
270                 if (request.getLocalAddress() != null) {
271                     Socket sock = socketChannel.socket();
272                     sock.setReuseAddress(this.config.isSoReuseAddress());
273                     sock.bind(request.getLocalAddress());
274                 }
275                 boolean connected = socketChannel.connect(request.getRemoteAddress());
276                 if (connected) {
277                     prepareSocket(socketChannel.socket());
278                     ChannelEntry entry = new ChannelEntry(socketChannel, request);
279                     addChannel(entry);
280                     return;
281                 }
282             } catch (IOException ex) {
283                 closeChannel(socketChannel);
284                 request.failed(ex);
285                 return;
286             }
287 
288             SessionRequestHandle requestHandle = new SessionRequestHandle(request);
289             try {
290                 SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT,
291                         requestHandle);
292                 request.setKey(key);
293             } catch (IOException ex) {
294                 closeChannel(socketChannel);
295                 throw new IOReactorException("Failure registering channel " +
296                         "with the selector", ex);
297             }
298         }
299     }
300 
301 }