View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.util;
28  
29  import java.io.IOException;
30  import java.io.InterruptedIOException;
31  import java.util.concurrent.locks.Condition;
32  import java.util.concurrent.locks.ReentrantLock;
33  
34  import org.apache.http.annotation.ThreadSafe;
35  import org.apache.http.nio.ContentDecoder;
36  import org.apache.http.nio.IOControl;
37  
38  /**
39   * Implementation of the {@link ContentInputBuffer} interface that can be
40   * shared by multiple threads, usually the I/O dispatch of an I/O reactor and
41   * a worker thread.
42   * <p>
43   * The I/O dispatch thread is expect to transfer data from {@link ContentDecoder} to the buffer
44   *   by calling {@link #consumeContent(ContentDecoder)}.
45   * <p>
46   * The worker thread is expected to read the data from the buffer by calling
47   *   {@link #read()} or {@link #read(byte[], int, int)} methods.
48   * <p>
49   * In case of an abnormal situation or when no longer needed the buffer must be shut down
50   * using {@link #shutdown()} method.
51   *
52   * @since 4.0
53   */
54  @ThreadSafe
55  public class SharedInputBuffer extends ExpandableBuffer implements ContentInputBuffer {
56  
57      private final ReentrantLock lock;
58      private final Condition condition;
59  
60      private volatile IOControl ioctrl;
61      private volatile boolean shutdown = false;
62      private volatile boolean endOfStream = false;
63  
64      /**
65       * @deprecated (4.3) use {@link SharedInputBuffer#SharedInputBuffer(int, ByteBufferAllocator)}
66       */
67      @Deprecated
68      public SharedInputBuffer(final int buffersize, final IOControl ioctrl, final ByteBufferAllocator allocator) {
69          super(buffersize, allocator);
70          this.ioctrl = ioctrl;
71          this.lock = new ReentrantLock();
72          this.condition = this.lock.newCondition();
73      }
74  
75      /**
76       * @since 4.3
77       */
78      public SharedInputBuffer(final int buffersize, final ByteBufferAllocator allocator) {
79          super(buffersize, allocator);
80          this.lock = new ReentrantLock();
81          this.condition = this.lock.newCondition();
82      }
83  
84      /**
85       * @since 4.3
86       */
87      public SharedInputBuffer(final int buffersize) {
88          this(buffersize, HeapByteBufferAllocator.INSTANCE);
89      }
90  
91      public void reset() {
92          if (this.shutdown) {
93              return;
94          }
95          this.lock.lock();
96          try {
97              clear();
98              this.endOfStream = false;
99          } finally {
100             this.lock.unlock();
101         }
102     }
103 
104     /**
105      * @deprecated (4.3) use {@link #consumeContent(ContentDecoder, IOControl)}
106      */
107     @Deprecated
108     public int consumeContent(final ContentDecoder decoder) throws IOException {
109         return consumeContent(decoder, null);
110     }
111 
112     /**
113      * @since 4.3
114      */
115     public int consumeContent(final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
116         if (this.shutdown) {
117             return -1;
118         }
119         this.lock.lock();
120         try {
121             if (ioctrl != null) {
122                 this.ioctrl = ioctrl;
123             }
124             setInputMode();
125             int totalRead = 0;
126             int bytesRead;
127             while ((bytesRead = decoder.read(this.buffer)) > 0) {
128                 totalRead += bytesRead;
129             }
130             if (bytesRead == -1 || decoder.isCompleted()) {
131                 this.endOfStream = true;
132             }
133             if (!this.buffer.hasRemaining()) {
134                 if (this.ioctrl != null) {
135                     this.ioctrl.suspendInput();
136                 }
137             }
138             this.condition.signalAll();
139 
140             if (totalRead > 0) {
141                 return totalRead;
142             } else {
143                 if (this.endOfStream) {
144                     return -1;
145                 } else {
146                     return 0;
147                 }
148             }
149         } finally {
150             this.lock.unlock();
151         }
152     }
153 
154     @Override
155     public boolean hasData() {
156         this.lock.lock();
157         try {
158             return super.hasData();
159         } finally {
160             this.lock.unlock();
161         }
162     }
163 
164     @Override
165     public int available() {
166         this.lock.lock();
167         try {
168             return super.available();
169         } finally {
170             this.lock.unlock();
171         }
172     }
173 
174     @Override
175     public int capacity() {
176         this.lock.lock();
177         try {
178             return super.capacity();
179         } finally {
180             this.lock.unlock();
181         }
182     }
183 
184     @Override
185     public int length() {
186         this.lock.lock();
187         try {
188             return super.length();
189         } finally {
190             this.lock.unlock();
191         }
192     }
193 
194     protected void waitForData() throws IOException {
195         this.lock.lock();
196         try {
197             try {
198                 while (!super.hasData() && !this.endOfStream) {
199                     if (this.shutdown) {
200                         throw new InterruptedIOException("Input operation aborted");
201                     }
202                     if (this.ioctrl != null) {
203                         this.ioctrl.requestInput();
204                     }
205                     this.condition.await();
206                 }
207             } catch (final InterruptedException ex) {
208                 throw new IOException("Interrupted while waiting for more data");
209             }
210         } finally {
211             this.lock.unlock();
212         }
213     }
214 
215     public void close() {
216         if (this.shutdown) {
217             return;
218         }
219         this.endOfStream = true;
220         this.lock.lock();
221         try {
222             this.condition.signalAll();
223         } finally {
224             this.lock.unlock();
225         }
226     }
227 
228     public void shutdown() {
229         if (this.shutdown) {
230             return;
231         }
232         this.shutdown = true;
233         this.lock.lock();
234         try {
235             this.condition.signalAll();
236         } finally {
237             this.lock.unlock();
238         }
239     }
240 
241     protected boolean isShutdown() {
242         return this.shutdown;
243     }
244 
245     protected boolean isEndOfStream() {
246         return this.shutdown || (!hasData() && this.endOfStream);
247     }
248 
249     public int read() throws IOException {
250         if (this.shutdown) {
251             return -1;
252         }
253         this.lock.lock();
254         try {
255             if (!hasData()) {
256                 waitForData();
257             }
258             if (isEndOfStream()) {
259                 return -1;
260             }
261             return this.buffer.get() & 0xff;
262         } finally {
263             this.lock.unlock();
264         }
265     }
266 
267     public int read(final byte[] b, final int off, final int len) throws IOException {
268         if (this.shutdown) {
269             return -1;
270         }
271         if (b == null) {
272             return 0;
273         }
274         this.lock.lock();
275         try {
276             if (!hasData()) {
277                 waitForData();
278             }
279             if (isEndOfStream()) {
280                 return -1;
281             }
282             setOutputMode();
283             int chunk = len;
284             if (chunk > this.buffer.remaining()) {
285                 chunk = this.buffer.remaining();
286             }
287             this.buffer.get(b, off, chunk);
288             return chunk;
289         } finally {
290             this.lock.unlock();
291         }
292     }
293 
294     public int read(final byte[] b) throws IOException {
295         if (this.shutdown) {
296             return -1;
297         }
298         if (b == null) {
299             return 0;
300         }
301         return read(b, 0, b.length);
302     }
303 
304 }