View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.IOException;
31  import java.net.ServerSocket;
32  import java.net.SocketAddress;
33  import java.nio.channels.CancelledKeyException;
34  import java.nio.channels.SelectionKey;
35  import java.nio.channels.ServerSocketChannel;
36  import java.nio.channels.SocketChannel;
37  import java.util.Collections;
38  import java.util.HashSet;
39  import java.util.Iterator;
40  import java.util.Queue;
41  import java.util.Set;
42  import java.util.concurrent.ConcurrentLinkedQueue;
43  import java.util.concurrent.ThreadFactory;
44  
45  import org.apache.http.annotation.ThreadSafe;
46  import org.apache.http.nio.reactor.IOReactorException;
47  import org.apache.http.nio.reactor.IOReactorStatus;
48  import org.apache.http.nio.reactor.ListenerEndpoint;
49  import org.apache.http.nio.reactor.ListeningIOReactor;
50  import org.apache.http.params.HttpParams;
51  import org.apache.http.util.Asserts;
52  
53  /**
54   * Default implementation of {@link ListeningIOReactor}. This class extends
55   * {@link AbstractMultiworkerIOReactor} with capability to listen for incoming
56   * connections.
57   *
58   * @since 4.0
59   */
60  @SuppressWarnings("deprecation")
61  @ThreadSafe // public methods only
62  public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
63          implements ListeningIOReactor {
64  
65      private final Queue<ListenerEndpointImpl> requestQueue;
66      private final Set<ListenerEndpointImpl> endpoints;
67      private final Set<SocketAddress> pausedEndpoints;
68  
69      private volatile boolean paused;
70  
71      /**
72       * Creates an instance of DefaultListeningIOReactor with the given configuration.
73       *
74       * @param config I/O reactor configuration.
75       * @param threadFactory the factory to create threads.
76       *   Can be <code>null</code>.
77       * @throws IOReactorException in case if a non-recoverable I/O error.
78       *
79       * @since 4.2
80       */
81      public DefaultListeningIOReactor(
82              final IOReactorConfig config,
83              final ThreadFactory threadFactory) throws IOReactorException {
84          super(config, threadFactory);
85          this.requestQueue = new ConcurrentLinkedQueue<ListenerEndpointImpl>();
86          this.endpoints = Collections.synchronizedSet(new HashSet<ListenerEndpointImpl>());
87          this.pausedEndpoints = new HashSet<SocketAddress>();
88      }
89  
90      /**
91       * Creates an instance of DefaultListeningIOReactor with the given configuration.
92       *
93       * @param config I/O reactor configuration.
94       *   Can be <code>null</code>.
95       * @throws IOReactorException in case if a non-recoverable I/O error.
96       *
97       * @since 4.2
98       */
99      public DefaultListeningIOReactor(final IOReactorConfig config) throws IOReactorException {
100         this(config, null);
101     }
102 
103     /**
104      * Creates an instance of DefaultListeningIOReactor with default configuration.
105      *
106      * @throws IOReactorException in case if a non-recoverable I/O error.
107      *
108      * @since 4.2
109      */
110     public DefaultListeningIOReactor() throws IOReactorException {
111         this(null, null);
112     }
113 
114     /**
115      * @deprecated (4.2) use {@link DefaultListeningIOReactor#DefaultListeningIOReactor(IOReactorConfig, ThreadFactory)}
116      */
117     @Deprecated
118     public DefaultListeningIOReactor(
119             final int workerCount,
120             final ThreadFactory threadFactory,
121             final HttpParams params) throws IOReactorException {
122         this(convert(workerCount, params), threadFactory);
123     }
124 
125     /**
126      * @deprecated (4.2) use {@link DefaultListeningIOReactor#DefaultListeningIOReactor(IOReactorConfig)}
127      */
128     @Deprecated
129     public DefaultListeningIOReactor(
130             final int workerCount,
131             final HttpParams params) throws IOReactorException {
132         this(convert(workerCount, params), null);
133     }
134 
135     @Override
136     protected void cancelRequests() throws IOReactorException {
137         ListenerEndpointImpl request;
138         while ((request = this.requestQueue.poll()) != null) {
139             request.cancel();
140         }
141     }
142 
143     @Override
144     protected void processEvents(final int readyCount) throws IOReactorException {
145         if (!this.paused) {
146             processSessionRequests();
147         }
148 
149         if (readyCount > 0) {
150             final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
151             for (final SelectionKey key : selectedKeys) {
152 
153                 processEvent(key);
154 
155             }
156             selectedKeys.clear();
157         }
158     }
159 
160     private void processEvent(final SelectionKey key)
161             throws IOReactorException {
162         try {
163 
164             if (key.isAcceptable()) {
165 
166                 final ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
167                 for (;;) {
168                     SocketChannel socketChannel = null;
169                     try {
170                         socketChannel = serverChannel.accept();
171                     } catch (final IOException ex) {
172                         if (this.exceptionHandler == null ||
173                                 !this.exceptionHandler.handle(ex)) {
174                             throw new IOReactorException(
175                                     "Failure accepting connection", ex);
176                         }
177                     }
178                     if (socketChannel == null) {
179                         break;
180                     }
181                     try {
182                         prepareSocket(socketChannel.socket());
183                     } catch (final IOException ex) {
184                         if (this.exceptionHandler == null ||
185                                 !this.exceptionHandler.handle(ex)) {
186                             throw new IOReactorException(
187                                     "Failure initalizing socket", ex);
188                         }
189                     }
190                     final ChannelEntry entry = new ChannelEntry(socketChannel);
191                     addChannel(entry);
192                 }
193             }
194 
195         } catch (final CancelledKeyException ex) {
196             final ListenerEndpoint endpoint = (ListenerEndpoint) key.attachment();
197             this.endpoints.remove(endpoint);
198             key.attach(null);
199         }
200     }
201 
202     private ListenerEndpointImpl createEndpoint(final SocketAddress address) {
203         return new ListenerEndpointImpl(
204                 address,
205                 new ListenerEndpointClosedCallback() {
206 
207                     @Override
208                     public void endpointClosed(final ListenerEndpoint endpoint) {
209                         endpoints.remove(endpoint);
210                     }
211 
212                 });
213     }
214 
215     @Override
216     public ListenerEndpoint listen(final SocketAddress address) {
217         Asserts.check(this.status.compareTo(IOReactorStatus.ACTIVE) <= 0,
218                 "I/O reactor has been shut down");
219         final ListenerEndpointImpl request = createEndpoint(address);
220         this.requestQueue.add(request);
221         this.selector.wakeup();
222         return request;
223     }
224 
225     private void processSessionRequests() throws IOReactorException {
226         ListenerEndpointImpl request;
227         while ((request = this.requestQueue.poll()) != null) {
228             final SocketAddress address = request.getAddress();
229             final ServerSocketChannel serverChannel;
230             try {
231                 serverChannel = ServerSocketChannel.open();
232             } catch (final IOException ex) {
233                 throw new IOReactorException("Failure opening server socket", ex);
234             }
235             try {
236                 final ServerSocket socket = serverChannel.socket();
237                 socket.setReuseAddress(this.config.isSoReuseAddress());
238                 if (this.config.getSoTimeout() > 0) {
239                     socket.setSoTimeout(this.config.getSoTimeout());
240                 }
241                 if (this.config.getRcvBufSize() > 0) {
242                     socket.setReceiveBufferSize(this.config.getRcvBufSize());
243                 }
244                 serverChannel.configureBlocking(false);
245                 socket.bind(address, this.config.getBacklogSize());
246             } catch (final IOException ex) {
247                 closeChannel(serverChannel);
248                 request.failed(ex);
249                 if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
250                     throw new IOReactorException("Failure binding socket to address "
251                             + address, ex);
252                 } else {
253                     return;
254                 }
255             }
256             try {
257                 final SelectionKey key = serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
258                 key.attach(request);
259                 request.setKey(key);
260             } catch (final IOException ex) {
261                 closeChannel(serverChannel);
262                 throw new IOReactorException("Failure registering channel " +
263                         "with the selector", ex);
264             }
265 
266             this.endpoints.add(request);
267             request.completed(serverChannel.socket().getLocalSocketAddress());
268         }
269     }
270 
271     @Override
272     public Set<ListenerEndpoint> getEndpoints() {
273         final Set<ListenerEndpoint> set = new HashSet<ListenerEndpoint>();
274         synchronized (this.endpoints) {
275             final Iterator<ListenerEndpointImpl> it = this.endpoints.iterator();
276             while (it.hasNext()) {
277                 final ListenerEndpoint endpoint = it.next();
278                 if (!endpoint.isClosed()) {
279                     set.add(endpoint);
280                 } else {
281                     it.remove();
282                 }
283             }
284         }
285         return set;
286     }
287 
288     @Override
289     public void pause() throws IOException {
290         if (this.paused) {
291             return;
292         }
293         this.paused = true;
294         synchronized (this.endpoints) {
295             for (final ListenerEndpointImpl endpoint : this.endpoints) {
296                 if (!endpoint.isClosed()) {
297                     endpoint.close();
298                     this.pausedEndpoints.add(endpoint.getAddress());
299                 }
300             }
301             this.endpoints.clear();
302         }
303     }
304 
305     @Override
306     public void resume() throws IOException {
307         if (!this.paused) {
308             return;
309         }
310         this.paused = false;
311         for (final SocketAddress address: this.pausedEndpoints) {
312             final ListenerEndpointImpl request = createEndpoint(address);
313             this.requestQueue.add(request);
314         }
315         this.pausedEndpoints.clear();
316         this.selector.wakeup();
317     }
318 
319 }