View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.nio.entity;
28  
29  import java.io.IOException;
30  import java.io.InterruptedIOException;
31  import java.nio.ByteBuffer;
32  import java.util.concurrent.locks.ReentrantLock;
33  
34  import org.apache.hc.core5.annotation.Contract;
35  import org.apache.hc.core5.annotation.ThreadingBehavior;
36  import org.apache.hc.core5.http.nio.DataStreamChannel;
37  
38  /**
39   * @since 5.0
40   */
41  @Contract(threading = ThreadingBehavior.SAFE)
42  public final class SharedOutputBuffer extends AbstractSharedBuffer implements ContentOutputBuffer {
43  
44      private volatile DataStreamChannel dataStreamChannel;
45      private volatile boolean hasCapacity;
46  
47      public SharedOutputBuffer(final ReentrantLock lock, final int initialBufferSize) {
48          super(lock, initialBufferSize);
49          this.hasCapacity = false;
50      }
51  
52      public SharedOutputBuffer(final int buffersize) {
53          this(new ReentrantLock(), buffersize);
54      }
55  
56      public void flush(final DataStreamChannel channel) throws IOException {
57          lock.lock();
58          try {
59              dataStreamChannel = channel;
60              hasCapacity = true;
61              setOutputMode();
62              if (buffer().hasRemaining()) {
63                  dataStreamChannel.write(buffer());
64              }
65              if (!buffer().hasRemaining() && endStream) {
66                  dataStreamChannel.endStream();
67              }
68              condition.signalAll();
69          } finally {
70              lock.unlock();
71          }
72      }
73  
74      private void ensureNotAborted() throws InterruptedIOException {
75          if (aborted) {
76              throw new InterruptedIOException("Operation aborted");
77          }
78      }
79  
80      @Override
81      public void write(final byte[] b, final int off, final int len) throws IOException {
82          final ByteBuffer src = ByteBuffer.wrap(b, off, len);
83          lock.lock();
84          try {
85              ensureNotAborted();
86              setInputMode();
87              while (src.hasRemaining()) {
88                  // always buffer small chunks
89                  if (src.remaining() < 1024 && buffer().remaining() > src.remaining()) {
90                      buffer().put(src);
91                  } else {
92                      if (buffer().position() > 0 || dataStreamChannel == null) {
93                          waitFlush();
94                      }
95                      if (buffer().position() == 0 && dataStreamChannel != null) {
96                          final int bytesWritten = dataStreamChannel.write(src);
97                          if (bytesWritten == 0) {
98                              hasCapacity = false;
99                              waitFlush();
100                         }
101                     }
102                 }
103             }
104         } finally {
105             lock.unlock();
106         }
107     }
108 
109     @Override
110     public void write(final int b) throws IOException {
111         lock.lock();
112         try {
113             ensureNotAborted();
114             setInputMode();
115             if (!buffer().hasRemaining()) {
116                 waitFlush();
117             }
118             buffer().put((byte)b);
119         } finally {
120             lock.unlock();
121         }
122     }
123 
124     @Override
125     public void writeCompleted() throws IOException {
126         if (endStream) {
127             return;
128         }
129         lock.lock();
130         try {
131             if (!endStream) {
132                 endStream = true;
133                 if (dataStreamChannel != null) {
134                     setOutputMode();
135                     if (buffer().hasRemaining()) {
136                         dataStreamChannel.requestOutput();
137                     } else {
138                         dataStreamChannel.endStream();
139                     }
140                 }
141             }
142         } finally {
143             lock.unlock();
144         }
145     }
146 
147     private void waitFlush() throws InterruptedIOException {
148         setOutputMode();
149         if (dataStreamChannel != null) {
150             dataStreamChannel.requestOutput();
151         }
152         ensureNotAborted();
153         while (buffer().hasRemaining() || !hasCapacity) {
154             try {
155                 condition.await();
156             } catch (final InterruptedException ex) {
157                 Thread.currentThread().interrupt();
158                 throw new InterruptedIOException(ex.getMessage());
159             }
160             ensureNotAborted();
161         }
162         setInputMode();
163     }
164 
165 }