View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.IOException;
31  import java.net.ServerSocket;
32  import java.net.SocketAddress;
33  import java.nio.channels.CancelledKeyException;
34  import java.nio.channels.SelectionKey;
35  import java.nio.channels.ServerSocketChannel;
36  import java.nio.channels.SocketChannel;
37  import java.util.Collections;
38  import java.util.HashSet;
39  import java.util.Iterator;
40  import java.util.Queue;
41  import java.util.Set;
42  import java.util.concurrent.ConcurrentLinkedQueue;
43  import java.util.concurrent.ThreadFactory;
44  
45  import org.apache.http.nio.reactor.IOReactorException;
46  import org.apache.http.nio.reactor.IOReactorStatus;
47  import org.apache.http.nio.reactor.ListenerEndpoint;
48  import org.apache.http.nio.reactor.ListeningIOReactor;
49  import org.apache.http.params.HttpParams;
50  import org.apache.http.util.Asserts;
51  
52  /**
53   * Default implementation of {@link ListeningIOReactor}. This class extends
54   * {@link AbstractMultiworkerIOReactor} with capability to listen for incoming
55   * connections.
56   *
57   * @since 4.0
58   */
59  @SuppressWarnings("deprecation")
60  public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
61          implements ListeningIOReactor {
62  
63      private final Queue<ListenerEndpointImpl> requestQueue;
64      private final Set<ListenerEndpointImpl> endpoints;
65      private final Set<SocketAddress> pausedEndpoints;
66  
67      private volatile boolean paused;
68  
69      /**
70       * Creates an instance of DefaultListeningIOReactor with the given configuration.
71       *
72       * @param config I/O reactor configuration.
73       * @param threadFactory the factory to create threads.
74       *   Can be {@code null}.
75       * @throws IOReactorException in case if a non-recoverable I/O error.
76       *
77       * @since 4.2
78       */
79      public DefaultListeningIOReactor(
80              final IOReactorConfig config,
81              final ThreadFactory threadFactory) throws IOReactorException {
82          super(config, threadFactory);
83          this.requestQueue = new ConcurrentLinkedQueue<ListenerEndpointImpl>();
84          this.endpoints = Collections.synchronizedSet(new HashSet<ListenerEndpointImpl>());
85          this.pausedEndpoints = new HashSet<SocketAddress>();
86      }
87  
88      /**
89       * Creates an instance of DefaultListeningIOReactor with the given configuration.
90       *
91       * @param config I/O reactor configuration.
92       *   Can be {@code null}.
93       * @throws IOReactorException in case if a non-recoverable I/O error.
94       *
95       * @since 4.2
96       */
97      public DefaultListeningIOReactor(final IOReactorConfig config) throws IOReactorException {
98          this(config, null);
99      }
100 
101     /**
102      * Creates an instance of DefaultListeningIOReactor with default configuration.
103      *
104      * @throws IOReactorException in case if a non-recoverable I/O error.
105      *
106      * @since 4.2
107      */
108     public DefaultListeningIOReactor() throws IOReactorException {
109         this(null, null);
110     }
111 
112     /**
113      * @deprecated (4.2) use {@link DefaultListeningIOReactor#DefaultListeningIOReactor(IOReactorConfig, ThreadFactory)}
114      */
115     @Deprecated
116     public DefaultListeningIOReactor(
117             final int workerCount,
118             final ThreadFactory threadFactory,
119             final HttpParams params) throws IOReactorException {
120         this(convert(workerCount, params), threadFactory);
121     }
122 
123     /**
124      * @deprecated (4.2) use {@link DefaultListeningIOReactor#DefaultListeningIOReactor(IOReactorConfig)}
125      */
126     @Deprecated
127     public DefaultListeningIOReactor(
128             final int workerCount,
129             final HttpParams params) throws IOReactorException {
130         this(convert(workerCount, params), null);
131     }
132 
133     @Override
134     protected void cancelRequests() throws IOReactorException {
135         ListenerEndpointImpl request;
136         while ((request = this.requestQueue.poll()) != null) {
137             request.cancel();
138         }
139     }
140 
141     @Override
142     protected void processEvents(final int readyCount) throws IOReactorException {
143         if (!this.paused) {
144             processSessionRequests();
145         }
146 
147         if (readyCount > 0) {
148             final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
149             for (final SelectionKey key : selectedKeys) {
150 
151                 processEvent(key);
152 
153             }
154             selectedKeys.clear();
155         }
156     }
157 
158     private void processEvent(final SelectionKey key)
159             throws IOReactorException {
160         try {
161 
162             if (key.isAcceptable()) {
163 
164                 final ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
165                 for (;;) {
166                     SocketChannel socketChannel = null;
167                     try {
168                         socketChannel = serverChannel.accept();
169                     } catch (final IOException ex) {
170                         if (this.exceptionHandler == null ||
171                                 !this.exceptionHandler.handle(ex)) {
172                             throw new IOReactorException(
173                                     "Failure accepting connection", ex);
174                         }
175                     }
176                     if (socketChannel == null) {
177                         break;
178                     }
179                     try {
180                         prepareSocket(socketChannel.socket());
181                     } catch (final IOException ex) {
182                         if (this.exceptionHandler == null ||
183                                 !this.exceptionHandler.handle(ex)) {
184                             throw new IOReactorException(
185                                     "Failure initalizing socket", ex);
186                         }
187                     }
188                     final ChannelEntry entry = new ChannelEntry(socketChannel);
189                     addChannel(entry);
190                 }
191             }
192 
193         } catch (final CancelledKeyException ex) {
194             final ListenerEndpoint endpoint = (ListenerEndpoint) key.attachment();
195             this.endpoints.remove(endpoint);
196             key.attach(null);
197         }
198     }
199 
200     private ListenerEndpointImpl createEndpoint(final SocketAddress address) {
201         return new ListenerEndpointImpl(
202                 address,
203                 new ListenerEndpointClosedCallback() {
204 
205                     @Override
206                     public void endpointClosed(final ListenerEndpoint endpoint) {
207                         endpoints.remove(endpoint);
208                     }
209 
210                 });
211     }
212 
213     @Override
214     public ListenerEndpoint listen(final SocketAddress address) {
215         Asserts.check(this.status.compareTo(IOReactorStatus.ACTIVE) <= 0,
216                 "I/O reactor has been shut down");
217         final ListenerEndpointImpl request = createEndpoint(address);
218         this.requestQueue.add(request);
219         this.selector.wakeup();
220         return request;
221     }
222 
223     private void processSessionRequests() throws IOReactorException {
224         ListenerEndpointImpl request;
225         while ((request = this.requestQueue.poll()) != null) {
226             final SocketAddress address = request.getAddress();
227             final ServerSocketChannel serverChannel;
228             try {
229                 serverChannel = ServerSocketChannel.open();
230             } catch (final IOException ex) {
231                 throw new IOReactorException("Failure opening server socket", ex);
232             }
233             try {
234                 final ServerSocket socket = serverChannel.socket();
235                 socket.setReuseAddress(this.config.isSoReuseAddress());
236                 if (this.config.getSoTimeout() > 0) {
237                     socket.setSoTimeout(this.config.getSoTimeout());
238                 }
239                 if (this.config.getRcvBufSize() > 0) {
240                     socket.setReceiveBufferSize(this.config.getRcvBufSize());
241                 }
242                 serverChannel.configureBlocking(false);
243                 socket.bind(address, this.config.getBacklogSize());
244             } catch (final IOException ex) {
245                 closeChannel(serverChannel);
246                 request.failed(ex);
247                 if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
248                     throw new IOReactorException("Failure binding socket to address "
249                             + address, ex);
250                 } else {
251                     return;
252                 }
253             }
254             try {
255                 final SelectionKey key = serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
256                 key.attach(request);
257                 request.setKey(key);
258             } catch (final IOException ex) {
259                 closeChannel(serverChannel);
260                 throw new IOReactorException("Failure registering channel " +
261                         "with the selector", ex);
262             }
263 
264             this.endpoints.add(request);
265             request.completed(serverChannel.socket().getLocalSocketAddress());
266         }
267     }
268 
269     @Override
270     public Set<ListenerEndpoint> getEndpoints() {
271         final Set<ListenerEndpoint> set = new HashSet<ListenerEndpoint>();
272         synchronized (this.endpoints) {
273             final Iterator<ListenerEndpointImpl> it = this.endpoints.iterator();
274             while (it.hasNext()) {
275                 final ListenerEndpoint endpoint = it.next();
276                 if (!endpoint.isClosed()) {
277                     set.add(endpoint);
278                 } else {
279                     it.remove();
280                 }
281             }
282         }
283         return set;
284     }
285 
286     @Override
287     public void pause() throws IOException {
288         if (this.paused) {
289             return;
290         }
291         this.paused = true;
292         synchronized (this.endpoints) {
293             for (final ListenerEndpointImpl endpoint : this.endpoints) {
294                 if (!endpoint.isClosed()) {
295                     endpoint.close();
296                     this.pausedEndpoints.add(endpoint.getAddress());
297                 }
298             }
299             this.endpoints.clear();
300         }
301     }
302 
303     @Override
304     public void resume() throws IOException {
305         if (!this.paused) {
306             return;
307         }
308         this.paused = false;
309         for (final SocketAddress address: this.pausedEndpoints) {
310             final ListenerEndpointImpl request = createEndpoint(address);
311             this.requestQueue.add(request);
312         }
313         this.pausedEndpoints.clear();
314         this.selector.wakeup();
315     }
316 
317 }