View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.IOException;
31  import java.net.ServerSocket;
32  import java.net.SocketAddress;
33  import java.nio.channels.CancelledKeyException;
34  import java.nio.channels.SelectionKey;
35  import java.nio.channels.ServerSocketChannel;
36  import java.nio.channels.SocketChannel;
37  import java.util.Collections;
38  import java.util.HashSet;
39  import java.util.Iterator;
40  import java.util.Queue;
41  import java.util.Set;
42  import java.util.concurrent.ConcurrentLinkedQueue;
43  import java.util.concurrent.ThreadFactory;
44  
45  import org.apache.http.annotation.ThreadSafe;
46  import org.apache.http.nio.reactor.IOReactorException;
47  import org.apache.http.nio.reactor.IOReactorStatus;
48  import org.apache.http.nio.reactor.ListenerEndpoint;
49  import org.apache.http.nio.reactor.ListeningIOReactor;
50  import org.apache.http.params.HttpParams;
51  
52  /**
53   * Default implementation of {@link ListeningIOReactor}. This class extends
54   * {@link AbstractMultiworkerIOReactor} with capability to listen for incoming
55   * connections.
56   *
57   * @since 4.0
58   */
59  @ThreadSafe // public methods only
60  public class DefaultListeningIOReactor extends AbstractMultiworkerIOReactor
61          implements ListeningIOReactor {
62  
63      private final Queue<ListenerEndpointImpl> requestQueue;
64      private final Set<ListenerEndpointImpl> endpoints;
65      private final Set<SocketAddress> pausedEndpoints;
66  
67      private volatile boolean paused;
68  
69      /**
70       * Creates an instance of DefaultListeningIOReactor with the given configuration.
71       *
72       * @param config I/O reactor configuration.
73       * @param threadFactory the factory to create threads.
74       *   Can be <code>null</code>.
75       * @throws IOReactorException in case if a non-recoverable I/O error.
76       *
77       * @since 4.2
78       */
79      public DefaultListeningIOReactor(
80              final IOReactorConfig config,
81              final ThreadFactory threadFactory) throws IOReactorException {
82          super(config, threadFactory);
83          this.requestQueue = new ConcurrentLinkedQueue<ListenerEndpointImpl>();
84          this.endpoints = Collections.synchronizedSet(new HashSet<ListenerEndpointImpl>());
85          this.pausedEndpoints = new HashSet<SocketAddress>();
86      }
87  
88      /**
89       * Creates an instance of DefaultListeningIOReactor with the given configuration.
90       *
91       * @param config I/O reactor configuration.
92       *   Can be <code>null</code>.
93       * @throws IOReactorException in case if a non-recoverable I/O error.
94       *
95       * @since 4.2
96       */
97      public DefaultListeningIOReactor(final IOReactorConfig config) throws IOReactorException {
98          this(config, null);
99      }
100 
101     /**
102      * Creates an instance of DefaultListeningIOReactor with default configuration.
103      *
104      * @throws IOReactorException in case if a non-recoverable I/O error.
105      *
106      * @since 4.2
107      */
108     public DefaultListeningIOReactor() throws IOReactorException {
109         this(null, null);
110     }
111 
112     /**
113      * @deprecated (4.2) use {@link DefaultListeningIOReactor#DefaultListeningIOReactor(IOReactorConfig, ThreadFactory)}
114      */
115     @Deprecated
116     public DefaultListeningIOReactor(
117             int workerCount,
118             final ThreadFactory threadFactory,
119             final HttpParams params) throws IOReactorException {
120         this(convert(workerCount, params), threadFactory);
121     }
122 
123     /**
124      * @deprecated (4.2) use {@link DefaultListeningIOReactor#DefaultListeningIOReactor(IOReactorConfig)}
125      */
126     @Deprecated
127     public DefaultListeningIOReactor(
128             int workerCount,
129             final HttpParams params) throws IOReactorException {
130         this(convert(workerCount, params), null);
131     }
132 
133     @Override
134     protected void cancelRequests() throws IOReactorException {
135         ListenerEndpointImpl request;
136         while ((request = this.requestQueue.poll()) != null) {
137             request.cancel();
138         }
139     }
140 
141     @Override
142     protected void processEvents(int readyCount) throws IOReactorException {
143         if (!this.paused) {
144             processSessionRequests();
145         }
146 
147         if (readyCount > 0) {
148             Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
149             for (Iterator<SelectionKey> it = selectedKeys.iterator(); it.hasNext(); ) {
150 
151                 SelectionKey key = it.next();
152                 processEvent(key);
153 
154             }
155             selectedKeys.clear();
156         }
157     }
158 
159     private void processEvent(final SelectionKey key)
160             throws IOReactorException {
161         try {
162 
163             if (key.isAcceptable()) {
164 
165                 ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
166                 for (;;) {
167                     SocketChannel socketChannel = null;
168                     try {
169                         socketChannel = serverChannel.accept();
170                     } catch (IOException ex) {
171                         if (this.exceptionHandler == null ||
172                                 !this.exceptionHandler.handle(ex)) {
173                             throw new IOReactorException(
174                                     "Failure accepting connection", ex);
175                         }
176                     }
177                     if (socketChannel == null) {
178                         break;
179                     }
180                     try {
181                         prepareSocket(socketChannel.socket());
182                     } catch (IOException ex) {
183                         if (this.exceptionHandler == null ||
184                                 !this.exceptionHandler.handle(ex)) {
185                             throw new IOReactorException(
186                                     "Failure initalizing socket", ex);
187                         }
188                     }
189                     ChannelEntry entry = new ChannelEntry(socketChannel);
190                     addChannel(entry);
191                 }
192             }
193 
194         } catch (CancelledKeyException ex) {
195             ListenerEndpoint endpoint = (ListenerEndpoint) key.attachment();
196             this.endpoints.remove(endpoint);
197             key.attach(null);
198         }
199     }
200 
201     private ListenerEndpointImpl createEndpoint(final SocketAddress address) {
202         ListenerEndpointImpl endpoint = new ListenerEndpointImpl(
203                 address,
204                 new ListenerEndpointClosedCallback() {
205 
206                     public void endpointClosed(final ListenerEndpoint endpoint) {
207                         endpoints.remove(endpoint);
208                     }
209 
210                 });
211         return endpoint;
212     }
213 
214     public ListenerEndpoint listen(final SocketAddress address) {
215         if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
216             throw new IllegalStateException("I/O reactor has been shut down");
217         }
218         ListenerEndpointImpl request = createEndpoint(address);
219         this.requestQueue.add(request);
220         this.selector.wakeup();
221         return request;
222     }
223 
224     private void processSessionRequests() throws IOReactorException {
225         ListenerEndpointImpl request;
226         while ((request = this.requestQueue.poll()) != null) {
227             SocketAddress address = request.getAddress();
228             ServerSocketChannel serverChannel;
229             try {
230                 serverChannel = ServerSocketChannel.open();
231             } catch (IOException ex) {
232                 throw new IOReactorException("Failure opening server socket", ex);
233             }
234             try {
235                 ServerSocket socket = serverChannel.socket();
236                 socket.setReuseAddress(this.config.isSoReuseAddress());
237                 serverChannel.configureBlocking(false);
238                 socket.bind(address);
239             } catch (IOException ex) {
240                 closeChannel(serverChannel);
241                 request.failed(ex);
242                 if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
243                     throw new IOReactorException("Failure binding socket to address "
244                             + address, ex);
245                 } else {
246                     return;
247                 }
248             }
249             try {
250                 SelectionKey key = serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
251                 key.attach(request);
252                 request.setKey(key);
253             } catch (IOException ex) {
254                 closeChannel(serverChannel);
255                 throw new IOReactorException("Failure registering channel " +
256                         "with the selector", ex);
257             }
258 
259             this.endpoints.add(request);
260             request.completed(serverChannel.socket().getLocalSocketAddress());
261         }
262     }
263 
264     public Set<ListenerEndpoint> getEndpoints() {
265         Set<ListenerEndpoint> set = new HashSet<ListenerEndpoint>();
266         synchronized (this.endpoints) {
267             Iterator<ListenerEndpointImpl> it = this.endpoints.iterator();
268             while (it.hasNext()) {
269                 ListenerEndpoint endpoint = it.next();
270                 if (!endpoint.isClosed()) {
271                     set.add(endpoint);
272                 } else {
273                     it.remove();
274                 }
275             }
276         }
277         return set;
278     }
279 
280     public void pause() throws IOException {
281         if (this.paused) {
282             return;
283         }
284         this.paused = true;
285         synchronized (this.endpoints) {
286             Iterator<ListenerEndpointImpl> it = this.endpoints.iterator();
287             while (it.hasNext()) {
288                 ListenerEndpoint endpoint = it.next();
289                 if (!endpoint.isClosed()) {
290                     endpoint.close();
291                     this.pausedEndpoints.add(endpoint.getAddress());
292                 }
293             }
294             this.endpoints.clear();
295         }
296     }
297 
298     public void resume() throws IOException {
299         if (!this.paused) {
300             return;
301         }
302         this.paused = false;
303         for (SocketAddress address: this.pausedEndpoints) {
304             ListenerEndpointImpl request = createEndpoint(address);
305             this.requestQueue.add(request);
306         }
307         this.pausedEndpoints.clear();
308         this.selector.wakeup();
309     }
310 
311 }