View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.IOException;
31  import java.net.ServerSocket;
32  import java.net.SocketAddress;
33  import java.nio.channels.CancelledKeyException;
34  import java.nio.channels.SelectionKey;
35  import java.nio.channels.ServerSocketChannel;
36  import java.nio.channels.SocketChannel;
37  import java.util.Collections;
38  import java.util.HashSet;
39  import java.util.Iterator;
40  import java.util.Queue;
41  import java.util.Set;
42  import java.util.concurrent.ConcurrentLinkedQueue;
43  import java.util.concurrent.ThreadFactory;
44  
45  import org.apache.http.annotation.ThreadSafe;
46  import org.apache.http.nio.reactor.IOReactorException;
47  import org.apache.http.nio.reactor.IOReactorStatus;
48  import org.apache.http.nio.reactor.ListenerEndpoint;
49  import org.apache.http.nio.reactor.ListeningIOReactor;
50  import org.apache.http.params.HttpParams;
51  import org.apache.http.util.Asserts;
52  
53  /**
54   * Default implementation of {@link ListeningIOReactor}. This class extends
55   * {@link AbstractMultiworkerIOReactor} with capability to listen for incoming
56   * connections.
57   *
58   * @since 4.0
59   */
60  @SuppressWarnings("deprecation")
61  @ThreadSafe // public methods only
62  public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
63          implements ListeningIOReactor {
64  
65      private final Queue<ListenerEndpointImpl> requestQueue;
66      private final Set<ListenerEndpointImpl> endpoints;
67      private final Set<SocketAddress> pausedEndpoints;
68  
69      private volatile boolean paused;
70  
71      /**
72       * Creates an instance of DefaultListeningIOReactor with the given configuration.
73       *
74       * @param config I/O reactor configuration.
75       * @param threadFactory the factory to create threads.
76       *   Can be <code>null</code>.
77       * @throws IOReactorException in case if a non-recoverable I/O error.
78       *
79       * @since 4.2
80       */
81      public DefaultListeningIOReactor(
82              final IOReactorConfig config,
83              final ThreadFactory threadFactory) throws IOReactorException {
84          super(config, threadFactory);
85          this.requestQueue = new ConcurrentLinkedQueue<ListenerEndpointImpl>();
86          this.endpoints = Collections.synchronizedSet(new HashSet<ListenerEndpointImpl>());
87          this.pausedEndpoints = new HashSet<SocketAddress>();
88      }
89  
90      /**
91       * Creates an instance of DefaultListeningIOReactor with the given configuration.
92       *
93       * @param config I/O reactor configuration.
94       *   Can be <code>null</code>.
95       * @throws IOReactorException in case if a non-recoverable I/O error.
96       *
97       * @since 4.2
98       */
99      public DefaultListeningIOReactor(final IOReactorConfig config) throws IOReactorException {
100         this(config, null);
101     }
102 
103     /**
104      * Creates an instance of DefaultListeningIOReactor with default configuration.
105      *
106      * @throws IOReactorException in case if a non-recoverable I/O error.
107      *
108      * @since 4.2
109      */
110     public DefaultListeningIOReactor() throws IOReactorException {
111         this(null, null);
112     }
113 
114     /**
115      * @deprecated (4.2) use {@link DefaultListeningIOReactor#DefaultListeningIOReactor(IOReactorConfig, ThreadFactory)}
116      */
117     @Deprecated
118     public DefaultListeningIOReactor(
119             final int workerCount,
120             final ThreadFactory threadFactory,
121             final HttpParams params) throws IOReactorException {
122         this(convert(workerCount, params), threadFactory);
123     }
124 
125     /**
126      * @deprecated (4.2) use {@link DefaultListeningIOReactor#DefaultListeningIOReactor(IOReactorConfig)}
127      */
128     @Deprecated
129     public DefaultListeningIOReactor(
130             final int workerCount,
131             final HttpParams params) throws IOReactorException {
132         this(convert(workerCount, params), null);
133     }
134 
135     @Override
136     protected void cancelRequests() throws IOReactorException {
137         ListenerEndpointImpl request;
138         while ((request = this.requestQueue.poll()) != null) {
139             request.cancel();
140         }
141     }
142 
143     @Override
144     protected void processEvents(final int readyCount) throws IOReactorException {
145         if (!this.paused) {
146             processSessionRequests();
147         }
148 
149         if (readyCount > 0) {
150             final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
151             for (final SelectionKey key : selectedKeys) {
152 
153                 processEvent(key);
154 
155             }
156             selectedKeys.clear();
157         }
158     }
159 
160     private void processEvent(final SelectionKey key)
161             throws IOReactorException {
162         try {
163 
164             if (key.isAcceptable()) {
165 
166                 final ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
167                 for (;;) {
168                     SocketChannel socketChannel = null;
169                     try {
170                         socketChannel = serverChannel.accept();
171                     } catch (final IOException ex) {
172                         if (this.exceptionHandler == null ||
173                                 !this.exceptionHandler.handle(ex)) {
174                             throw new IOReactorException(
175                                     "Failure accepting connection", ex);
176                         }
177                     }
178                     if (socketChannel == null) {
179                         break;
180                     }
181                     try {
182                         prepareSocket(socketChannel.socket());
183                     } catch (final IOException ex) {
184                         if (this.exceptionHandler == null ||
185                                 !this.exceptionHandler.handle(ex)) {
186                             throw new IOReactorException(
187                                     "Failure initalizing socket", ex);
188                         }
189                     }
190                     final ChannelEntry entry = new ChannelEntry(socketChannel);
191                     addChannel(entry);
192                 }
193             }
194 
195         } catch (final CancelledKeyException ex) {
196             final ListenerEndpoint endpoint = (ListenerEndpoint) key.attachment();
197             this.endpoints.remove(endpoint);
198             key.attach(null);
199         }
200     }
201 
202     private ListenerEndpointImpl createEndpoint(final SocketAddress address) {
203         return new ListenerEndpointImpl(
204                 address,
205                 new ListenerEndpointClosedCallback() {
206 
207                     public void endpointClosed(final ListenerEndpoint endpoint) {
208                         endpoints.remove(endpoint);
209                     }
210 
211                 });
212     }
213 
214     public ListenerEndpoint listen(final SocketAddress address) {
215         Asserts.check(this.status.compareTo(IOReactorStatus.ACTIVE) <= 0,
216                 "I/O reactor has been shut down");
217         final ListenerEndpointImpl request = createEndpoint(address);
218         this.requestQueue.add(request);
219         this.selector.wakeup();
220         return request;
221     }
222 
223     private void processSessionRequests() throws IOReactorException {
224         ListenerEndpointImpl request;
225         while ((request = this.requestQueue.poll()) != null) {
226             final SocketAddress address = request.getAddress();
227             final ServerSocketChannel serverChannel;
228             try {
229                 serverChannel = ServerSocketChannel.open();
230             } catch (final IOException ex) {
231                 throw new IOReactorException("Failure opening server socket", ex);
232             }
233             try {
234                 final ServerSocket socket = serverChannel.socket();
235                 socket.setReuseAddress(this.config.isSoReuseAddress());
236                 if (this.config.getSoTimeout() > 0) {
237                     socket.setSoTimeout(this.config.getSoTimeout());
238                 }
239                 if (this.config.getRcvBufSize() > 0) {
240                     socket.setReceiveBufferSize(this.config.getRcvBufSize());
241                 }
242                 serverChannel.configureBlocking(false);
243                 socket.bind(address);
244             } catch (final IOException ex) {
245                 closeChannel(serverChannel);
246                 request.failed(ex);
247                 if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
248                     throw new IOReactorException("Failure binding socket to address "
249                             + address, ex);
250                 } else {
251                     return;
252                 }
253             }
254             try {
255                 final SelectionKey key = serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
256                 key.attach(request);
257                 request.setKey(key);
258             } catch (final IOException ex) {
259                 closeChannel(serverChannel);
260                 throw new IOReactorException("Failure registering channel " +
261                         "with the selector", ex);
262             }
263 
264             this.endpoints.add(request);
265             request.completed(serverChannel.socket().getLocalSocketAddress());
266         }
267     }
268 
269     public Set<ListenerEndpoint> getEndpoints() {
270         final Set<ListenerEndpoint> set = new HashSet<ListenerEndpoint>();
271         synchronized (this.endpoints) {
272             final Iterator<ListenerEndpointImpl> it = this.endpoints.iterator();
273             while (it.hasNext()) {
274                 final ListenerEndpoint endpoint = it.next();
275                 if (!endpoint.isClosed()) {
276                     set.add(endpoint);
277                 } else {
278                     it.remove();
279                 }
280             }
281         }
282         return set;
283     }
284 
285     public void pause() throws IOException {
286         if (this.paused) {
287             return;
288         }
289         this.paused = true;
290         synchronized (this.endpoints) {
291             for (final ListenerEndpointImpl endpoint : this.endpoints) {
292                 if (!endpoint.isClosed()) {
293                     endpoint.close();
294                     this.pausedEndpoints.add(endpoint.getAddress());
295                 }
296             }
297             this.endpoints.clear();
298         }
299     }
300 
301     public void resume() throws IOException {
302         if (!this.paused) {
303             return;
304         }
305         this.paused = false;
306         for (final SocketAddress address: this.pausedEndpoints) {
307             final ListenerEndpointImpl request = createEndpoint(address);
308             this.requestQueue.add(request);
309         }
310         this.pausedEndpoints.clear();
311         this.selector.wakeup();
312     }
313 
314 }