View Javadoc

1   /*
2    * ====================================================================
3    *
4    *  Licensed to the Apache Software Foundation (ASF) under one or more
5    *  contributor license agreements.  See the NOTICE file distributed with
6    *  this work for additional information regarding copyright ownership.
7    *  The ASF licenses this file to You under the Apache License, Version 2.0
8    *  (the "License"); you may not use this file except in compliance with
9    *  the License.  You may obtain a copy of the License at
10   *
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   *
13   *  Unless required by applicable law or agreed to in writing, software
14   *  distributed under the License is distributed on an "AS IS" BASIS,
15   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   *  See the License for the specific language governing permissions and
17   *  limitations under the License.
18   * ====================================================================
19   *
20   * This software consists of voluntary contributions made by many
21   * individuals on behalf of the Apache Software Foundation.  For more
22   * information on the Apache Software Foundation, please see
23   * <http://www.apache.org/>.
24   *
25   */
26  
27  package org.apache.http.conn;
28  
29  import java.io.InputStream;
30  import java.io.IOException;
31  
32  import org.apache.http.annotation.NotThreadSafe;
33  
34  /**
35   * A stream wrapper that triggers actions on {@link #close close()} and EOF.
36   * Primarily used to auto-release an underlying
37   * {@link ManagedClientConnection connection}
38   * when the response body is consumed or no longer needed.
39   * <p>
40   * This class is based on <code>AutoCloseInputStream</code> in HttpClient 3.1,
41   * but has notable differences. It does not allow mark/reset, distinguishes
42   * different kinds of event, and does not always close the underlying stream
43   * on EOF. That decision is left to the {@link EofSensorWatcher watcher}.
44   *
45   * @see EofSensorWatcher
46   *
47   * @since 4.0
48   */
49  // don't use FilterInputStream as the base class, we'd have to
50  // override markSupported(), mark(), and reset() to disable them
51  @NotThreadSafe
52  public class EofSensorInputStream extends InputStream implements ConnectionReleaseTrigger {
53  
54      /**
55       * The wrapped input stream, while accessible.
56       * The value changes to <code>null</code> when the wrapped stream
57       * becomes inaccessible.
58       */
59      protected InputStream wrappedStream;
60  
61      /**
62       * Indicates whether this stream itself is closed.
63       * If it isn't, but {@link #wrappedStream wrappedStream}
64       * is <code>null</code>, we're running in EOF mode.
65       * All read operations will indicate EOF without accessing
66       * the underlying stream. After closing this stream, read
67       * operations will trigger an {@link IOException IOException}.
68       *
69       * @see #isReadAllowed isReadAllowed
70       */
71      private boolean selfClosed;
72  
73      /** The watcher to be notified, if any. */
74      private final EofSensorWatcher eofWatcher;
75  
76      /**
77       * Creates a new EOF sensor.
78       * If no watcher is passed, the underlying stream will simply be
79       * closed when EOF is detected or {@link #close close} is called.
80       * Otherwise, the watcher decides whether the underlying stream
81       * should be closed before detaching from it.
82       *
83       * @param in        the wrapped stream
84       * @param watcher   the watcher for events, or <code>null</code> for
85       *                  auto-close behavior without notification
86       */
87      public EofSensorInputStream(final InputStream in,
88                                  final EofSensorWatcher watcher) {
89          if (in == null) {
90              throw new IllegalArgumentException
91                  ("Wrapped stream may not be null.");
92          }
93  
94          wrappedStream = in;
95          selfClosed = false;
96          eofWatcher = watcher;
97      }
98  
99      /**
100      * Checks whether the underlying stream can be read from.
101      *
102      * @return  <code>true</code> if the underlying stream is accessible,
103      *          <code>false</code> if this stream is in EOF mode and
104      *          detached from the underlying stream
105      *
106      * @throws IOException      if this stream is already closed
107      */
108     protected boolean isReadAllowed() throws IOException {
109         if (selfClosed) {
110             throw new IOException("Attempted read on closed stream.");
111         }
112         return (wrappedStream != null);
113     }
114 
115     @Override
116     public int read() throws IOException {
117         int l = -1;
118 
119         if (isReadAllowed()) {
120             try {
121                 l = wrappedStream.read();
122                 checkEOF(l);
123             } catch (IOException ex) {
124                 checkAbort();
125                 throw ex;
126             }
127         }
128 
129         return l;
130     }
131 
132     @Override
133     public int read(byte[] b, int off, int len) throws IOException {
134         int l = -1;
135 
136         if (isReadAllowed()) {
137             try {
138                 l = wrappedStream.read(b,  off,  len);
139                 checkEOF(l);
140             } catch (IOException ex) {
141                 checkAbort();
142                 throw ex;
143             }
144         }
145 
146         return l;
147     }
148 
149     @Override
150     public int read(byte[] b) throws IOException {
151         int l = -1;
152 
153         if (isReadAllowed()) {
154             try {
155                 l = wrappedStream.read(b);
156                 checkEOF(l);
157             } catch (IOException ex) {
158                 checkAbort();
159                 throw ex;
160             }
161         }
162         return l;
163     }
164 
165     @Override
166     public int available() throws IOException {
167         int a = 0; // not -1
168 
169         if (isReadAllowed()) {
170             try {
171                 a = wrappedStream.available();
172                 // no checkEOF() here, available() can't trigger EOF
173             } catch (IOException ex) {
174                 checkAbort();
175                 throw ex;
176             }
177         }
178 
179         return a;
180     }
181 
182     @Override
183     public void close() throws IOException {
184         // tolerate multiple calls to close()
185         selfClosed = true;
186         checkClose();
187     }
188 
189     /**
190      * Detects EOF and notifies the watcher.
191      * This method should only be called while the underlying stream is
192      * still accessible. Use {@link #isReadAllowed isReadAllowed} to
193      * check that condition.
194      * <br/>
195      * If EOF is detected, the watcher will be notified and this stream
196      * is detached from the underlying stream. This prevents multiple
197      * notifications from this stream.
198      *
199      * @param eof       the result of the calling read operation.
200      *                  A negative value indicates that EOF is reached.
201      *
202      * @throws IOException
203      *          in case of an IO problem on closing the underlying stream
204      */
205     protected void checkEOF(int eof) throws IOException {
206 
207         if ((wrappedStream != null) && (eof < 0)) {
208             try {
209                 boolean scws = true; // should close wrapped stream?
210                 if (eofWatcher != null)
211                     scws = eofWatcher.eofDetected(wrappedStream);
212                 if (scws)
213                     wrappedStream.close();
214             } finally {
215                 wrappedStream = null;
216             }
217         }
218     }
219 
220     /**
221      * Detects stream close and notifies the watcher.
222      * There's not much to detect since this is called by {@link #close close}.
223      * The watcher will only be notified if this stream is closed
224      * for the first time and before EOF has been detected.
225      * This stream will be detached from the underlying stream to prevent
226      * multiple notifications to the watcher.
227      *
228      * @throws IOException
229      *          in case of an IO problem on closing the underlying stream
230      */
231     protected void checkClose() throws IOException {
232 
233         if (wrappedStream != null) {
234             try {
235                 boolean scws = true; // should close wrapped stream?
236                 if (eofWatcher != null)
237                     scws = eofWatcher.streamClosed(wrappedStream);
238                 if (scws)
239                     wrappedStream.close();
240             } finally {
241                 wrappedStream = null;
242             }
243         }
244     }
245 
246     /**
247      * Detects stream abort and notifies the watcher.
248      * There's not much to detect since this is called by
249      * {@link #abortConnection abortConnection}.
250      * The watcher will only be notified if this stream is aborted
251      * for the first time and before EOF has been detected or the
252      * stream has been {@link #close closed} gracefully.
253      * This stream will be detached from the underlying stream to prevent
254      * multiple notifications to the watcher.
255      *
256      * @throws IOException
257      *          in case of an IO problem on closing the underlying stream
258      */
259     protected void checkAbort() throws IOException {
260 
261         if (wrappedStream != null) {
262             try {
263                 boolean scws = true; // should close wrapped stream?
264                 if (eofWatcher != null)
265                     scws = eofWatcher.streamAbort(wrappedStream);
266                 if (scws)
267                     wrappedStream.close();
268             } finally {
269                 wrappedStream = null;
270             }
271         }
272     }
273 
274     /**
275      * Same as {@link #close close()}.
276      */
277     public void releaseConnection() throws IOException {
278         close();
279     }
280 
281     /**
282      * Aborts this stream.
283      * This is a special version of {@link #close close()} which prevents
284      * re-use of the underlying connection, if any. Calling this method
285      * indicates that there should be no attempt to read until the end of
286      * the stream.
287      */
288     public void abortConnection() throws IOException {
289         // tolerate multiple calls
290         selfClosed = true;
291         checkAbort();
292     }
293 
294 }
295