View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.util;
28  
29  import java.io.IOException;
30  import java.io.InterruptedIOException;
31  import java.util.concurrent.locks.Condition;
32  import java.util.concurrent.locks.ReentrantLock;
33  
34  import org.apache.http.annotation.ThreadSafe;
35  import org.apache.http.nio.ContentDecoder;
36  import org.apache.http.nio.IOControl;
37  
38  /**
39   * Implementation of the {@link ContentInputBuffer} interface that can be
40   * shared by multiple threads, usually the I/O dispatch of an I/O reactor and
41   * a worker thread.
42   * <p>
43   * The I/O dispatch thread is expected to transfer data from {@link ContentDecoder} to the buffer
44   *   by calling {@link #consumeContent(ContentDecoder)}.
45   * <p>
46   * The worker thread is expected to read the data from the buffer by calling
47   *   {@link #read()} or {@link #read(byte[], int, int)} methods.
48   * <p>
49   * In case of an abnormal situation or when no longer needed the buffer must be shut down
50   * using {@link #shutdown()} method.
51   *
52   * @since 4.0
53   */
54  @ThreadSafe
55  public class SharedInputBuffer extends ExpandableBuffer implements ContentInputBuffer {
56  
57      private final IOControl ioctrl;
58      private final ReentrantLock lock;
59      private final Condition condition;
60  
61      private volatile boolean shutdown = false;
62      private volatile boolean endOfStream = false;
63  
64      public SharedInputBuffer(int buffersize, final IOControl ioctrl, final ByteBufferAllocator allocator) {
65          super(buffersize, allocator);
66          if (ioctrl == null) {
67              throw new IllegalArgumentException("I/O content control may not be null");
68          }
69          this.ioctrl = ioctrl;
70          this.lock = new ReentrantLock();
71          this.condition = this.lock.newCondition();
72      }
73  
74      public void reset() {
75          if (this.shutdown) {
76              return;
77          }
78          this.lock.lock();
79          try {
80              clear();
81              this.endOfStream = false;
82          } finally {
83              this.lock.unlock();
84          }
85      }
86  
87      public int consumeContent(final ContentDecoder decoder) throws IOException {
88          if (this.shutdown) {
89              return -1;
90          }
91          this.lock.lock();
92          try {
93              setInputMode();
94              int totalRead = 0;
95              int bytesRead;
96              while ((bytesRead = decoder.read(this.buffer)) > 0) {
97                  totalRead += bytesRead;
98              }
99              if (bytesRead == -1 || decoder.isCompleted()) {
100                 this.endOfStream = true;
101             }
102             if (!this.buffer.hasRemaining()) {
103                 this.ioctrl.suspendInput();
104             }
105             this.condition.signalAll();
106 
107             if (totalRead > 0) {
108                 return totalRead;
109             } else {
110                 if (this.endOfStream) {
111                     return -1;
112                 } else {
113                     return 0;
114                 }
115             }
116         } finally {
117             this.lock.unlock();
118         }
119     }
120 
121     @Override
122     public boolean hasData() {
123         this.lock.lock();
124         try {
125             return super.hasData();
126         } finally {
127             this.lock.unlock();
128         }
129     }
130 
131     @Override
132     public int available() {
133         this.lock.lock();
134         try {
135             return super.available();
136         } finally {
137             this.lock.unlock();
138         }
139     }
140 
141     @Override
142     public int capacity() {
143         this.lock.lock();
144         try {
145             return super.capacity();
146         } finally {
147             this.lock.unlock();
148         }
149     }
150 
151     @Override
152     public int length() {
153         this.lock.lock();
154         try {
155             return super.length();
156         } finally {
157             this.lock.unlock();
158         }
159     }
160 
161     protected void waitForData() throws IOException {
162         this.lock.lock();
163         try {
164             try {
165                 while (!super.hasData() && !this.endOfStream) {
166                     if (this.shutdown) {
167                         throw new InterruptedIOException("Input operation aborted");
168                     }
169                     this.ioctrl.requestInput();
170                     this.condition.await();
171                 }
172             } catch (InterruptedException ex) {
173                 throw new IOException("Interrupted while waiting for more data");
174             }
175         } finally {
176             this.lock.unlock();
177         }
178     }
179 
180     public void close() {
181         if (this.shutdown) {
182             return;
183         }
184         this.endOfStream = true;
185         this.lock.lock();
186         try {
187             this.condition.signalAll();
188         } finally {
189             this.lock.unlock();
190         }
191     }
192 
193     public void shutdown() {
194         if (this.shutdown) {
195             return;
196         }
197         this.shutdown = true;
198         this.lock.lock();
199         try {
200             this.condition.signalAll();
201         } finally {
202             this.lock.unlock();
203         }
204     }
205 
206     protected boolean isShutdown() {
207         return this.shutdown;
208     }
209 
210     protected boolean isEndOfStream() {
211         return this.shutdown || (!hasData() && this.endOfStream);
212     }
213 
214     public int read() throws IOException {
215         if (this.shutdown) {
216             return -1;
217         }
218         this.lock.lock();
219         try {
220             if (!hasData()) {
221                 waitForData();
222             }
223             if (isEndOfStream()) {
224                 return -1;
225             }
226             return this.buffer.get() & 0xff;
227         } finally {
228             this.lock.unlock();
229         }
230     }
231 
232     public int read(final byte[] b, int off, int len) throws IOException {
233         if (this.shutdown) {
234             return -1;
235         }
236         if (b == null) {
237             return 0;
238         }
239         this.lock.lock();
240         try {
241             if (!hasData()) {
242                 waitForData();
243             }
244             if (isEndOfStream()) {
245                 return -1;
246             }
247             setOutputMode();
248             int chunk = len;
249             if (chunk > this.buffer.remaining()) {
250                 chunk = this.buffer.remaining();
251             }
252             this.buffer.get(b, off, chunk);
253             return chunk;
254         } finally {
255             this.lock.unlock();
256         }
257     }
258 
259     public int read(final byte[] b) throws IOException {
260         if (this.shutdown) {
261             return -1;
262         }
263         if (b == null) {
264             return 0;
265         }
266         return read(b, 0, b.length);
267     }
268 
269 }