View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.nio.support;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  
34  import org.apache.hc.core5.concurrent.FutureCallback;
35  import org.apache.hc.core5.http.EntityDetails;
36  import org.apache.hc.core5.http.Header;
37  import org.apache.hc.core5.http.HttpException;
38  import org.apache.hc.core5.http.HttpResponse;
39  import org.apache.hc.core5.http.HttpStatus;
40  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
41  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
42  import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
43  import org.apache.hc.core5.http.nio.CapacityChannel;
44  import org.apache.hc.core5.http.nio.DataStreamChannel;
45  import org.apache.hc.core5.http.nio.RequestChannel;
46  import org.apache.hc.core5.http.protocol.HttpContext;
47  import org.apache.hc.core5.util.Args;
48  
49  /**
50   * Basic {@link AsyncClientExchangeHandler} implementation that makes use
51   * of {@link AsyncRequestProducer} to generate request message
52   * and {@link AsyncResponseConsumer} to process the response message returned by the server.
53   *
54   * @since 5.0
55   */
56  public final class BasicClientExchangeHandler<T> implements AsyncClientExchangeHandler {
57  
58      private final AsyncRequestProducer requestProducer;
59      private final AsyncResponseConsumer<T> responseConsumer;
60      private final AtomicBoolean completed;
61      private final FutureCallback<T> resultCallback;
62      private final AtomicBoolean outputTerminated;
63  
64      public BasicClientExchangeHandler(
65              final AsyncRequestProducer requestProducer,
66              final AsyncResponseConsumer<T> responseConsumer,
67              final FutureCallback<T> resultCallback) {
68          this.requestProducer = Args.notNull(requestProducer, "Request producer");
69          this.responseConsumer = Args.notNull(responseConsumer, "Response consumer");
70          this.completed = new AtomicBoolean(false);
71          this.resultCallback = resultCallback;
72          this.outputTerminated = new AtomicBoolean(false);
73      }
74  
75      @Override
76      public void produceRequest(final RequestChannel requestChannel, final HttpContext httpContext) throws HttpException, IOException {
77          requestProducer.sendRequest(requestChannel, httpContext);
78      }
79  
80      @Override
81      public int available() {
82          return requestProducer.available();
83      }
84  
85      @Override
86      public void produce(final DataStreamChannel channel) throws IOException {
87          if (outputTerminated.get()) {
88              channel.endStream();
89              return;
90          }
91          requestProducer.produce(channel);
92      }
93  
94      @Override
95      public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
96          responseConsumer.informationResponse(response, httpContext);
97      }
98  
99      @Override
100     public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
101         if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
102             outputTerminated.set(true);
103             requestProducer.releaseResources();
104         }
105         responseConsumer.consumeResponse(response, entityDetails, httpContext, new FutureCallback<T>() {
106 
107             @Override
108             public void completed(final T result) {
109                 if (completed.compareAndSet(false, true)) {
110                     try {
111                         if (resultCallback != null) {
112                             resultCallback.completed(result);
113                         }
114                     } finally {
115                         internalReleaseResources();
116                     }
117                 }
118             }
119 
120             @Override
121             public void failed(final Exception ex) {
122                 if (completed.compareAndSet(false, true)) {
123                     try {
124                         if (resultCallback != null) {
125                             resultCallback.failed(ex);
126                         }
127                     } finally {
128                         internalReleaseResources();
129                     }
130                 }
131             }
132 
133             @Override
134             public void cancelled() {
135                 if (completed.compareAndSet(false, true)) {
136                     try {
137                         if (resultCallback != null) {
138                             resultCallback.cancelled();
139                         }
140                     } finally {
141                         internalReleaseResources();
142                     }
143                 }
144             }
145 
146         });
147     }
148 
149     @Override
150     public void cancel() {
151         if (completed.compareAndSet(false, true)) {
152             try {
153                 if (resultCallback != null) {
154                     resultCallback.cancelled();
155                 }
156             } finally {
157                 internalReleaseResources();
158             }
159         }
160     }
161 
162     @Override
163     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
164         responseConsumer.updateCapacity(capacityChannel);
165     }
166 
167     @Override
168     public void consume(final ByteBuffer src) throws IOException {
169         responseConsumer.consume(src);
170     }
171 
172     @Override
173     public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
174         responseConsumer.streamEnd(trailers);
175     }
176 
177     @Override
178     public void failed(final Exception cause) {
179         try {
180             requestProducer.failed(cause);
181             responseConsumer.failed(cause);
182         } finally {
183             if (completed.compareAndSet(false, true)) {
184                 try {
185                     if (resultCallback != null) {
186                         resultCallback.failed(cause);
187                     }
188                 } finally {
189                     internalReleaseResources();
190                 }
191             }
192         }
193     }
194 
195     private void internalReleaseResources() {
196         requestProducer.releaseResources();
197         responseConsumer.releaseResources();
198     }
199 
200     @Override
201     public void releaseResources() {
202     }
203 
204 }