View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.async;
28  
29  import java.net.SocketAddress;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.Executors;
32  import java.util.concurrent.Future;
33  import java.util.concurrent.ThreadFactory;
34  import java.util.concurrent.TimeUnit;
35  import java.util.concurrent.atomic.AtomicReference;
36  
37  import org.apache.hc.core5.concurrent.FutureCallback;
38  import org.apache.hc.core5.function.Supplier;
39  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
40  import org.apache.hc.core5.io.CloseMode;
41  import org.apache.hc.core5.net.NamedEndpoint;
42  import org.apache.hc.core5.reactor.ConnectionInitiator;
43  import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
44  import org.apache.hc.core5.reactor.IOReactorStatus;
45  import org.apache.hc.core5.reactor.IOSession;
46  import org.apache.hc.core5.util.TimeValue;
47  import org.apache.hc.core5.util.Timeout;
48  import org.slf4j.Logger;
49  import org.slf4j.LoggerFactory;
50  
51  abstract class AbstractHttpAsyncClientBase extends CloseableHttpAsyncClient implements ConnectionInitiator {
52  
53      enum Status { READY, RUNNING, TERMINATED }
54  
55      private static final Logger LOG = LoggerFactory.getLogger(AbstractHttpAsyncClientBase.class);
56  
57      private final AsyncPushConsumerRegistry pushConsumerRegistry;
58      private final DefaultConnectingIOReactor ioReactor;
59      private final ExecutorService executorService;
60      private final AtomicReference<Status> status;
61  
62      AbstractHttpAsyncClientBase(
63              final DefaultConnectingIOReactor ioReactor,
64              final AsyncPushConsumerRegistry pushConsumerRegistry,
65              final ThreadFactory threadFactory) {
66          super();
67          this.ioReactor = ioReactor;
68          this.pushConsumerRegistry = pushConsumerRegistry;
69          this.executorService = Executors.newSingleThreadExecutor(threadFactory);
70          this.status = new AtomicReference<>(Status.READY);
71      }
72  
73      @Override
74      public final void start() {
75          if (status.compareAndSet(Status.READY, Status.RUNNING)) {
76              executorService.execute(ioReactor::start);
77          }
78      }
79  
80      /**
81       * @deprecated Use {@link org.apache.hc.core5.http.impl.routing.RequestRouter}
82       * at the construction time
83       */
84      @Deprecated
85      @Override
86      public void register(final String hostname, final String uriPattern, final Supplier<AsyncPushConsumer> supplier) {
87          pushConsumerRegistry.register(hostname, uriPattern, supplier);
88      }
89  
90      boolean isRunning() {
91          return status.get() == Status.RUNNING;
92      }
93  
94      @Override
95      public Future<IOSession> connect(
96              final NamedEndpoint remoteEndpoint,
97              final SocketAddress remoteAddress,
98              final SocketAddress localAddress,
99              final Timeout timeout,
100             final Object attachment,
101             final FutureCallback<IOSession> callback) {
102         return ioReactor.connect(remoteEndpoint, remoteAddress, localAddress, timeout, attachment, callback);
103     }
104 
105     ConnectionInitiator getConnectionInitiator() {
106         return ioReactor;
107     }
108 
109     @Override
110     public final IOReactorStatus getStatus() {
111         return ioReactor.getStatus();
112     }
113 
114     @Override
115     public final void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
116         ioReactor.awaitShutdown(waitTime);
117     }
118 
119     @Override
120     public final void initiateShutdown() {
121         if (LOG.isDebugEnabled()) {
122             LOG.debug("Initiating shutdown");
123         }
124         ioReactor.initiateShutdown();
125     }
126 
127     void internalClose(final CloseMode closeMode) {
128     }
129 
130     @Override
131     public final void close(final CloseMode closeMode) {
132         if (LOG.isDebugEnabled()) {
133             LOG.debug("Shutdown {}", closeMode);
134         }
135         ioReactor.initiateShutdown();
136         ioReactor.close(closeMode);
137         if (closeMode == CloseMode.GRACEFUL) {
138             executorService.shutdown();
139             try {
140                 if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) {
141                     executorService.shutdownNow();
142                 }
143             } catch (final InterruptedException ignore) {
144                 Thread.currentThread().interrupt();
145             }
146         } else {
147             executorService.shutdownNow();
148         }
149         internalClose(closeMode);
150     }
151 
152     @Override
153     public void close() {
154         close(CloseMode.GRACEFUL);
155     }
156 
157 }