View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.util;
28  
29  import java.io.IOException;
30  import java.io.InterruptedIOException;
31  import java.util.concurrent.locks.Condition;
32  import java.util.concurrent.locks.ReentrantLock;
33  
34  import org.apache.http.annotation.ThreadSafe;
35  import org.apache.http.nio.ContentDecoder;
36  import org.apache.http.nio.IOControl;
37  
38  /**
39   * Implementation of the {@link ContentInputBuffer} interface that can be
40   * shared by multiple threads, usually the I/O dispatch of an I/O reactor and
41   * a worker thread.
42   * <p>
43   * The I/O dispatch thread is expect to transfer data from {@link ContentDecoder} to the buffer
44   *   by calling {@link #consumeContent(ContentDecoder)}.
45   * <p>
46   * The worker thread is expected to read the data from the buffer by calling
47   *   {@link #read()} or {@link #read(byte[], int, int)} methods.
48   * <p>
49   * In case of an abnormal situation or when no longer needed the buffer must be shut down
50   * using {@link #shutdown()} method.
51   *
52   * @since 4.0
53   */
54  @ThreadSafe
55  public class SharedInputBuffer extends ExpandableBuffer implements ContentInputBuffer {
56  
57      private final ReentrantLock lock;
58      private final Condition condition;
59  
60      private volatile IOControl ioctrl;
61      private volatile boolean shutdown = false;
62      private volatile boolean endOfStream = false;
63  
64      /**
65       * @deprecated (4.3) use {@link SharedInputBuffer#SharedInputBuffer(int, ByteBufferAllocator)}
66       */
67      @Deprecated
68      public SharedInputBuffer(final int buffersize, final IOControl ioctrl, final ByteBufferAllocator allocator) {
69          super(buffersize, allocator);
70          this.ioctrl = ioctrl;
71          this.lock = new ReentrantLock();
72          this.condition = this.lock.newCondition();
73      }
74  
75      /**
76       * @since 4.3
77       */
78      public SharedInputBuffer(final int buffersize, final ByteBufferAllocator allocator) {
79          super(buffersize, allocator);
80          this.lock = new ReentrantLock();
81          this.condition = this.lock.newCondition();
82      }
83  
84      /**
85       * @since 4.3
86       */
87      public SharedInputBuffer(final int buffersize) {
88          this(buffersize, HeapByteBufferAllocator.INSTANCE);
89      }
90  
91      @Override
92      public void reset() {
93          if (this.shutdown) {
94              return;
95          }
96          this.lock.lock();
97          try {
98              clear();
99              this.endOfStream = false;
100         } finally {
101             this.lock.unlock();
102         }
103     }
104 
105     /**
106      * @deprecated (4.3) use {@link #consumeContent(ContentDecoder, IOControl)}
107      */
108     @Override
109     @Deprecated
110     public int consumeContent(final ContentDecoder decoder) throws IOException {
111         return consumeContent(decoder, null);
112     }
113 
114     /**
115      * @since 4.3
116      */
117     public int consumeContent(final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
118         if (this.shutdown) {
119             return -1;
120         }
121         this.lock.lock();
122         try {
123             if (ioctrl != null) {
124                 this.ioctrl = ioctrl;
125             }
126             setInputMode();
127             int totalRead = 0;
128             int bytesRead;
129             while ((bytesRead = decoder.read(this.buffer)) > 0) {
130                 totalRead += bytesRead;
131             }
132             if (bytesRead == -1 || decoder.isCompleted()) {
133                 this.endOfStream = true;
134             }
135             if (!this.buffer.hasRemaining()) {
136                 if (this.ioctrl != null) {
137                     this.ioctrl.suspendInput();
138                 }
139             }
140             this.condition.signalAll();
141 
142             if (totalRead > 0) {
143                 return totalRead;
144             } else {
145                 if (this.endOfStream) {
146                     return -1;
147                 } else {
148                     return 0;
149                 }
150             }
151         } finally {
152             this.lock.unlock();
153         }
154     }
155 
156     @Override
157     public boolean hasData() {
158         this.lock.lock();
159         try {
160             return super.hasData();
161         } finally {
162             this.lock.unlock();
163         }
164     }
165 
166     @Override
167     public int available() {
168         this.lock.lock();
169         try {
170             return super.available();
171         } finally {
172             this.lock.unlock();
173         }
174     }
175 
176     @Override
177     public int capacity() {
178         this.lock.lock();
179         try {
180             return super.capacity();
181         } finally {
182             this.lock.unlock();
183         }
184     }
185 
186     @Override
187     public int length() {
188         this.lock.lock();
189         try {
190             return super.length();
191         } finally {
192             this.lock.unlock();
193         }
194     }
195 
196     protected void waitForData() throws IOException {
197         this.lock.lock();
198         try {
199             try {
200                 while (!super.hasData() && !this.endOfStream) {
201                     if (this.shutdown) {
202                         throw new InterruptedIOException("Input operation aborted");
203                     }
204                     if (this.ioctrl != null) {
205                         this.ioctrl.requestInput();
206                     }
207                     this.condition.await();
208                 }
209             } catch (final InterruptedException ex) {
210                 throw new IOException("Interrupted while waiting for more data");
211             }
212         } finally {
213             this.lock.unlock();
214         }
215     }
216 
217     public void close() {
218         if (this.shutdown) {
219             return;
220         }
221         this.endOfStream = true;
222         this.lock.lock();
223         try {
224             this.condition.signalAll();
225         } finally {
226             this.lock.unlock();
227         }
228     }
229 
230     public void shutdown() {
231         if (this.shutdown) {
232             return;
233         }
234         this.shutdown = true;
235         this.lock.lock();
236         try {
237             this.condition.signalAll();
238         } finally {
239             this.lock.unlock();
240         }
241     }
242 
243     protected boolean isShutdown() {
244         return this.shutdown;
245     }
246 
247     protected boolean isEndOfStream() {
248         return this.shutdown || (!hasData() && this.endOfStream);
249     }
250 
251     @Override
252     public int read() throws IOException {
253         if (this.shutdown) {
254             return -1;
255         }
256         this.lock.lock();
257         try {
258             if (!hasData()) {
259                 waitForData();
260             }
261             if (isEndOfStream()) {
262                 return -1;
263             }
264             return this.buffer.get() & 0xff;
265         } finally {
266             this.lock.unlock();
267         }
268     }
269 
270     @Override
271     public int read(final byte[] b, final int off, final int len) throws IOException {
272         if (this.shutdown) {
273             return -1;
274         }
275         if (b == null) {
276             return 0;
277         }
278         this.lock.lock();
279         try {
280             if (!hasData()) {
281                 waitForData();
282             }
283             if (isEndOfStream()) {
284                 return -1;
285             }
286             setOutputMode();
287             int chunk = len;
288             if (chunk > this.buffer.remaining()) {
289                 chunk = this.buffer.remaining();
290             }
291             this.buffer.get(b, off, chunk);
292             return chunk;
293         } finally {
294             this.lock.unlock();
295         }
296     }
297 
298     public int read(final byte[] b) throws IOException {
299         if (this.shutdown) {
300             return -1;
301         }
302         if (b == null) {
303             return 0;
304         }
305         return read(b, 0, b.length);
306     }
307 
308 }