View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.reactor;
28  
29  import java.util.ArrayDeque;
30  import java.util.HashSet;
31  import java.util.Queue;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentMap;
35  import java.util.concurrent.Future;
36  import java.util.concurrent.atomic.AtomicBoolean;
37  
38  import org.apache.hc.core5.annotation.Contract;
39  import org.apache.hc.core5.annotation.ThreadingBehavior;
40  import org.apache.hc.core5.concurrent.ComplexFuture;
41  import org.apache.hc.core5.concurrent.FutureCallback;
42  import org.apache.hc.core5.function.Callback;
43  import org.apache.hc.core5.http.ConnectionClosedException;
44  import org.apache.hc.core5.io.GracefullyCloseable;
45  import org.apache.hc.core5.io.ShutdownType;
46  import org.apache.hc.core5.util.Args;
47  import org.apache.hc.core5.util.Asserts;
48  import org.apache.hc.core5.util.TimeValue;
49  import org.apache.hc.core5.util.Timeout;
50  
51  /**
52   * @since 5.0
53   */
54  @Contract(threading = ThreadingBehavior.SAFE)
55  public abstract class AbstractIOSessionPool<T> implements GracefullyCloseable {
56  
57      private final ConcurrentMap<T, PoolEntry> sessionPool;
58      private final AtomicBoolean closed;
59  
60      public AbstractIOSessionPool() {
61          super();
62          this.sessionPool = new ConcurrentHashMap<>();
63          this.closed = new AtomicBoolean(false);
64      }
65  
66      protected abstract Future<IOSession> connectSession(
67              T namedEndpoint,
68              Timeout requestTimeout,
69              FutureCallback<IOSession> callback);
70  
71      protected abstract void validateSession(
72              IOSession ioSession,
73              Callback<Boolean> callback);
74  
75      protected abstract void closeSession(
76              IOSession ioSession,
77              ShutdownType shutdownType);
78  
79      @Override
80      public final void shutdown(final ShutdownType shutdownType) {
81          if (closed.compareAndSet(false, true)) {
82              for (final PoolEntry poolEntry : sessionPool.values()) {
83                  synchronized (poolEntry) {
84                      if (poolEntry.session != null) {
85                          closeSession(poolEntry.session, shutdownType);
86                          poolEntry.session = null;
87                      }
88                      if (poolEntry.sessionFuture != null) {
89                          poolEntry.sessionFuture.cancel(true);
90                          poolEntry.sessionFuture = null;
91                      }
92                      for (;;) {
93                          final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
94                          if (callback != null) {
95                              callback.cancelled();
96                          } else {
97                              break;
98                          }
99                      }
100                 }
101             }
102             sessionPool.clear();
103         }
104     }
105 
106     @Override
107     public final void close() {
108         shutdown(ShutdownType.GRACEFUL);
109     }
110 
111     PoolEntry getPoolEntry(final T endpoint) {
112         PoolEntry poolEntry = sessionPool.get(endpoint);
113         if (poolEntry == null) {
114             final PoolEntry newPoolEntry = new PoolEntry();
115             poolEntry = sessionPool.putIfAbsent(endpoint, newPoolEntry);
116             if (poolEntry == null) {
117                 poolEntry = newPoolEntry;
118             }
119         }
120         return poolEntry;
121     }
122 
123     public final Future<IOSession> getSession(
124             final T endpoint,
125             final Timeout requestTimeout,
126             final FutureCallback<IOSession> callback) {
127         Args.notNull(endpoint, "Endpoint");
128         Asserts.check(!closed.get(), "Connection pool shut down");
129         final ComplexFuture<IOSession> future = new ComplexFuture<>(callback);
130         final PoolEntry poolEntry = getPoolEntry(endpoint);
131         getSessionInternal(poolEntry, false, endpoint, requestTimeout, new FutureCallback<IOSession>() {
132 
133             @Override
134             public void completed(final IOSession ioSession) {
135                 validateSession(ioSession, new Callback<Boolean>() {
136 
137                     @Override
138                     public void execute(final Boolean result) {
139                         if (result) {
140                             future.completed(ioSession);
141                         } else {
142                             getSessionInternal(poolEntry, true, endpoint, requestTimeout, new FutureCallback<IOSession>() {
143 
144                                 @Override
145                                 public void completed(final IOSession ioSession) {
146                                     future.completed(ioSession);
147                                 }
148 
149                                 @Override
150                                 public void failed(final Exception ex) {
151                                     future.failed(ex);
152                                 }
153 
154                                 @Override
155                                 public void cancelled() {
156                                     future.cancel();
157                                 }
158 
159                             });
160                         }
161                     }
162 
163                 });
164             }
165 
166             @Override
167             public void failed(final Exception ex) {
168                 future.failed(ex);
169             }
170 
171             @Override
172             public void cancelled() {
173                 future.cancel();
174             }
175 
176         });
177         return future;
178     }
179 
180     private void getSessionInternal(
181             final PoolEntry poolEntry,
182             final boolean requestNew,
183             final T namedEndpoint,
184             final Timeout requestTimeout,
185             final FutureCallback<IOSession> callback) {
186         synchronized (poolEntry) {
187             if (poolEntry.session != null && requestNew) {
188                 closeSession(poolEntry.session, ShutdownType.GRACEFUL);
189                 poolEntry.session = null;
190             }
191             if (poolEntry.session != null && poolEntry.session.isClosed()) {
192                 poolEntry.session = null;
193             }
194             if (poolEntry.session != null) {
195                 callback.completed(poolEntry.session);
196             } else {
197                 poolEntry.requestQueue.add(callback);
198                 if (poolEntry.sessionFuture == null) {
199                     poolEntry.sessionFuture = connectSession(
200                             namedEndpoint,
201                             requestTimeout,
202                             new FutureCallback<IOSession>() {
203 
204                                 @Override
205                                 public void completed(final IOSession result) {
206                                     synchronized (poolEntry) {
207                                         poolEntry.session = result;
208                                         poolEntry.sessionFuture = null;
209                                         for (;;) {
210                                             final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
211                                             if (callback != null) {
212                                                 callback.completed(result);
213                                             } else {
214                                                 break;
215                                             }
216                                         }
217                                     }
218                                 }
219 
220                                 @Override
221                                 public void failed(final Exception ex) {
222                                     synchronized (poolEntry) {
223                                         poolEntry.session = null;
224                                         poolEntry.sessionFuture = null;
225                                         for (;;) {
226                                             final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
227                                             if (callback != null) {
228                                                 callback.failed(ex);
229                                             } else {
230                                                 break;
231                                             }
232                                         }
233                                     }
234                                 }
235 
236                                 @Override
237                                 public void cancelled() {
238                                     failed(new ConnectionClosedException("Connection request cancelled"));
239                                 }
240 
241                             });
242                 }
243             }
244         }
245     }
246 
247     public final void enumAvailable(final Callback<IOSession> callback) {
248         for (final PoolEntry poolEntry: sessionPool.values()) {
249             if (poolEntry.session != null) {
250                 synchronized (poolEntry) {
251                     if (poolEntry.session != null) {
252                         callback.execute(poolEntry.session);
253                         if (poolEntry.session.isClosed()) {
254                             poolEntry.session = null;
255                         }
256                     }
257                 }
258             }
259         }
260     }
261 
262     public final void closeIdle(final TimeValue idleTime) {
263         final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMillis() : 0);
264         for (final PoolEntry poolEntry: sessionPool.values()) {
265             if (poolEntry.session != null) {
266                 synchronized (poolEntry) {
267                     if (poolEntry.session != null && poolEntry.session.getLastReadTime() <= deadline) {
268                         closeSession(poolEntry.session, ShutdownType.GRACEFUL);
269                         poolEntry.session = null;
270                     }
271                 }
272             }
273         }
274     }
275 
276     public final Set<T> getRoutes() {
277         return new HashSet<>(sessionPool.keySet());
278     }
279 
280     @Override
281     public String toString() {
282         final StringBuilder buffer = new StringBuilder();
283         buffer.append("I/O sessions: ");
284         buffer.append(sessionPool.size());
285         return buffer.toString();
286     }
287 
288     static class PoolEntry {
289 
290         final Queue<FutureCallback<IOSession>> requestQueue;
291         volatile Future<IOSession> sessionFuture;
292         volatile IOSession session;
293 
294         PoolEntry() {
295             this.requestQueue = new ArrayDeque<>();
296         }
297 
298     }
299 
300 }