View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.util;
28  
29  import java.io.IOException;
30  import java.io.InterruptedIOException;
31  import java.util.concurrent.locks.Condition;
32  import java.util.concurrent.locks.ReentrantLock;
33  
34  import org.apache.http.annotation.ThreadingBehavior;
35  import org.apache.http.annotation.Contract;
36  import org.apache.http.nio.ContentDecoder;
37  import org.apache.http.nio.IOControl;
38  
39  /**
40   * Implementation of the {@link ContentInputBuffer} interface that can be
41   * shared by multiple threads, usually the I/O dispatch of an I/O reactor and
42   * a worker thread.
43   * <p>
44   * The I/O dispatch thread is expect to transfer data from {@link ContentDecoder} to the buffer
45   *   by calling {@link #consumeContent(ContentDecoder)}.
46   * <p>
47   * The worker thread is expected to read the data from the buffer by calling
48   *   {@link #read()} or {@link #read(byte[], int, int)} methods.
49   * <p>
50   * In case of an abnormal situation or when no longer needed the buffer must be shut down
51   * using {@link #shutdown()} method.
52   *
53   * @since 4.0
54   */
55  @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
56  public class SharedInputBuffer extends ExpandableBuffer implements ContentInputBuffer {
57  
58      private final ReentrantLock lock;
59      private final Condition condition;
60  
61      private volatile IOControl ioctrl;
62      private volatile boolean shutdown = false;
63      private volatile boolean endOfStream = false;
64  
65      /**
66       * @deprecated (4.3) use {@link SharedInputBuffer#SharedInputBuffer(int, ByteBufferAllocator)}
67       */
68      @Deprecated
69      public SharedInputBuffer(final int buffersize, final IOControl ioctrl, final ByteBufferAllocator allocator) {
70          super(buffersize, allocator);
71          this.ioctrl = ioctrl;
72          this.lock = new ReentrantLock();
73          this.condition = this.lock.newCondition();
74      }
75  
76      /**
77       * @since 4.3
78       */
79      public SharedInputBuffer(final int buffersize, final ByteBufferAllocator allocator) {
80          super(buffersize, allocator);
81          this.lock = new ReentrantLock();
82          this.condition = this.lock.newCondition();
83      }
84  
85      /**
86       * @since 4.3
87       */
88      public SharedInputBuffer(final int buffersize) {
89          this(buffersize, HeapByteBufferAllocator.INSTANCE);
90      }
91  
92      @Override
93      public void reset() {
94          if (this.shutdown) {
95              return;
96          }
97          this.lock.lock();
98          try {
99              clear();
100             this.endOfStream = false;
101         } finally {
102             this.lock.unlock();
103         }
104     }
105 
106     /**
107      * @deprecated (4.3) use {@link #consumeContent(ContentDecoder, IOControl)}
108      */
109     @Override
110     @Deprecated
111     public int consumeContent(final ContentDecoder decoder) throws IOException {
112         return consumeContent(decoder, null);
113     }
114 
115     /**
116      * @since 4.3
117      */
118     public int consumeContent(final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
119         if (this.shutdown) {
120             return -1;
121         }
122         this.lock.lock();
123         try {
124             if (ioctrl != null) {
125                 this.ioctrl = ioctrl;
126             }
127             setInputMode();
128             int totalRead = 0;
129             int bytesRead;
130             while ((bytesRead = decoder.read(this.buffer)) > 0) {
131                 totalRead += bytesRead;
132             }
133             if (bytesRead == -1 || decoder.isCompleted()) {
134                 this.endOfStream = true;
135             }
136             if (!this.buffer.hasRemaining()) {
137                 if (this.ioctrl != null) {
138                     this.ioctrl.suspendInput();
139                 }
140             }
141             this.condition.signalAll();
142 
143             if (totalRead > 0) {
144                 return totalRead;
145             } else {
146                 if (this.endOfStream) {
147                     return -1;
148                 } else {
149                     return 0;
150                 }
151             }
152         } finally {
153             this.lock.unlock();
154         }
155     }
156 
157     @Override
158     public boolean hasData() {
159         this.lock.lock();
160         try {
161             return super.hasData();
162         } finally {
163             this.lock.unlock();
164         }
165     }
166 
167     @Override
168     public int available() {
169         this.lock.lock();
170         try {
171             return super.available();
172         } finally {
173             this.lock.unlock();
174         }
175     }
176 
177     @Override
178     public int capacity() {
179         this.lock.lock();
180         try {
181             return super.capacity();
182         } finally {
183             this.lock.unlock();
184         }
185     }
186 
187     @Override
188     public int length() {
189         this.lock.lock();
190         try {
191             return super.length();
192         } finally {
193             this.lock.unlock();
194         }
195     }
196 
197     protected void waitForData() throws IOException {
198         this.lock.lock();
199         try {
200             try {
201                 while (!super.hasData() && !this.endOfStream) {
202                     if (this.shutdown) {
203                         throw new InterruptedIOException("Input operation aborted");
204                     }
205                     if (this.ioctrl != null) {
206                         this.ioctrl.requestInput();
207                     }
208                     this.condition.await();
209                 }
210             } catch (final InterruptedException ex) {
211                 throw new IOException("Interrupted while waiting for more data");
212             }
213         } finally {
214             this.lock.unlock();
215         }
216     }
217 
218     public void close() {
219         if (this.shutdown) {
220             return;
221         }
222         this.endOfStream = true;
223         this.lock.lock();
224         try {
225             this.condition.signalAll();
226         } finally {
227             this.lock.unlock();
228         }
229     }
230 
231     public void shutdown() {
232         if (this.shutdown) {
233             return;
234         }
235         this.shutdown = true;
236         this.lock.lock();
237         try {
238             this.condition.signalAll();
239         } finally {
240             this.lock.unlock();
241         }
242     }
243 
244     protected boolean isShutdown() {
245         return this.shutdown;
246     }
247 
248     protected boolean isEndOfStream() {
249         return this.shutdown || (!hasData() && this.endOfStream);
250     }
251 
252     @Override
253     public int read() throws IOException {
254         if (this.shutdown) {
255             return -1;
256         }
257         this.lock.lock();
258         try {
259             if (!hasData()) {
260                 waitForData();
261             }
262             if (isEndOfStream()) {
263                 return -1;
264             }
265             return this.buffer.get() & 0xff;
266         } finally {
267             this.lock.unlock();
268         }
269     }
270 
271     @Override
272     public int read(final byte[] b, final int off, final int len) throws IOException {
273         if (this.shutdown) {
274             return -1;
275         }
276         if (b == null) {
277             return 0;
278         }
279         this.lock.lock();
280         try {
281             if (!hasData()) {
282                 waitForData();
283             }
284             if (isEndOfStream()) {
285                 return -1;
286             }
287             setOutputMode();
288             int chunk = len;
289             if (chunk > this.buffer.remaining()) {
290                 chunk = this.buffer.remaining();
291             }
292             this.buffer.get(b, off, chunk);
293             return chunk;
294         } finally {
295             this.lock.unlock();
296         }
297     }
298 
299     public int read(final byte[] b) throws IOException {
300         if (this.shutdown) {
301             return -1;
302         }
303         if (b == null) {
304             return 0;
305         }
306         return read(b, 0, b.length);
307     }
308 
309 }