View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.compat;
28  
29  import java.io.IOException;
30  import java.io.InterruptedIOException;
31  import java.nio.ByteBuffer;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  import java.util.concurrent.locks.ReentrantLock;
34  
35  import org.apache.hc.core5.annotation.Internal;
36  import org.apache.hc.core5.http.nio.DataStreamChannel;
37  import org.apache.hc.core5.http.nio.support.classic.ContentOutputBuffer;
38  import org.apache.hc.core5.util.Timeout;
39  
40  /**
41   * TODO: to be replaced by core functionality
42   */
43  @Internal
44  final class SharedOutputBuffer extends AbstractSharedBuffer implements ContentOutputBuffer {
45  
46      private final AtomicBoolean endStreamPropagated;
47      private volatile DataStreamChannel dataStreamChannel;
48      private volatile boolean hasCapacity;
49  
50      public SharedOutputBuffer(final ReentrantLock lock, final int initialBufferSize) {
51          super(lock, initialBufferSize);
52          this.hasCapacity = false;
53          this.endStreamPropagated = new AtomicBoolean();
54      }
55  
56      public SharedOutputBuffer(final int bufferSize) {
57          this(new ReentrantLock(), bufferSize);
58      }
59  
60      public void flush(final DataStreamChannel channel) throws IOException {
61          lock.lock();
62          try {
63              dataStreamChannel = channel;
64              hasCapacity = true;
65              setOutputMode();
66              if (buffer().hasRemaining()) {
67                  dataStreamChannel.write(buffer());
68              }
69              if (!buffer().hasRemaining() && endStream) {
70                  propagateEndStream();
71              }
72              condition.signalAll();
73          } finally {
74              lock.unlock();
75          }
76      }
77  
78      private void ensureNotAborted() throws InterruptedIOException {
79          if (aborted) {
80              throw new InterruptedIOException("Operation aborted");
81          }
82      }
83  
84      /**
85       * @since 5.4
86       */
87      public void write(final byte[] b, final int off, final int len, final Timeout timeout) throws IOException {
88          final ByteBuffer src = ByteBuffer.wrap(b, off, len);
89          lock.lock();
90          try {
91              ensureNotAborted();
92              setInputMode();
93              while (src.hasRemaining()) {
94                  // always buffer small chunks
95                  if (src.remaining() < 1024 && buffer().remaining() > src.remaining()) {
96                      buffer().put(src);
97                  } else {
98                      if (buffer().position() > 0 || dataStreamChannel == null) {
99                          waitFlush(timeout);
100                     }
101                     if (buffer().position() == 0 && dataStreamChannel != null) {
102                         final int bytesWritten = dataStreamChannel.write(src);
103                         if (bytesWritten == 0) {
104                             hasCapacity = false;
105                             waitFlush(timeout);
106                         }
107                     }
108                 }
109             }
110         } finally {
111             lock.unlock();
112         }
113     }
114 
115     @Override
116     public void write(final byte[] b, final int off, final int len) throws IOException {
117         write(b, off, len, null);
118     }
119 
120     /**
121      * @since 5.4
122      */
123     public void write(final int b, final Timeout timeout) throws IOException {
124         lock.lock();
125         try {
126             ensureNotAborted();
127             setInputMode();
128             if (!buffer().hasRemaining()) {
129                 waitFlush(timeout);
130             }
131             buffer().put((byte)b);
132         } finally {
133             lock.unlock();
134         }
135     }
136 
137     @Override
138     public void write(final int b) throws IOException {
139         write(b, null);
140     }
141 
142     /**
143      * @since 5.4
144      */
145     public void writeCompleted(final Timeout timeout) throws IOException {
146         if (endStream) {
147             return;
148         }
149         lock.lock();
150         try {
151             if (!endStream) {
152                 endStream = true;
153                 if (dataStreamChannel != null) {
154                     setOutputMode();
155                     if (buffer().hasRemaining()) {
156                         dataStreamChannel.requestOutput();
157                         waitEndStream(timeout);
158                     } else {
159                         propagateEndStream();
160                     }
161                 }
162             }
163         } finally {
164             lock.unlock();
165         }
166     }
167 
168     @Override
169     public void writeCompleted() throws IOException {
170         writeCompleted(null);
171     }
172 
173     private void waitFlush(final Timeout timeout) throws InterruptedIOException {
174         if (dataStreamChannel != null) {
175             dataStreamChannel.requestOutput();
176         }
177         setOutputMode();
178         while (buffer().hasRemaining() || !hasCapacity) {
179             ensureNotAborted();
180             waitForSignal(timeout);
181         }
182         setInputMode();
183     }
184 
185     private void waitEndStream(final Timeout timeout) throws InterruptedIOException {
186         if (dataStreamChannel != null) {
187             dataStreamChannel.requestOutput();
188         }
189         while (!endStreamPropagated.get() && !aborted) {
190             waitForSignal(timeout);
191         }
192     }
193 
194     private void waitForSignal(final Timeout timeout) throws InterruptedIOException {
195         try {
196             if (timeout == null) {
197                 condition.await();
198             } else {
199                 if (!condition.await(timeout.getDuration(), timeout.getTimeUnit())) {
200                     aborted = true;
201                     throw new InterruptedIOException("Timeout blocked waiting for output (" + timeout + ")");
202                 }
203             }
204         } catch (final InterruptedException ex) {
205             Thread.currentThread().interrupt();
206             throw new InterruptedIOException(ex.getMessage());
207         }
208     }
209 
210     private void propagateEndStream() throws IOException {
211         if (endStreamPropagated.compareAndSet(false, true)) {
212             dataStreamChannel.endStream();
213         }
214     }
215 
216 }