View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.conn;
28  
29  import java.io.IOException;
30  import java.io.InputStream;
31  
32  import org.apache.http.util.Args;
33  
34  /**
35   * A stream wrapper that triggers actions on {@link #close close()} and EOF.
36   * Primarily used to auto-release an underlying managed connection when the response
37   * body is consumed or no longer needed.
38   *
39   * @see EofSensorWatcher
40   *
41   * @since 4.0
42   */
43  // don't use FilterInputStream as the base class, we'd have to
44  // override markSupported(), mark(), and reset() to disable them
45  public class EofSensorInputStream extends InputStream implements ConnectionReleaseTrigger {
46  
47      /**
48       * The wrapped input stream, while accessible.
49       * The value changes to {@code null} when the wrapped stream
50       * becomes inaccessible.
51       */
52      protected InputStream wrappedStream;
53  
54      /**
55       * Indicates whether this stream itself is closed.
56       * If it isn't, but {@link #wrappedStream wrappedStream}
57       * is {@code null}, we're running in EOF mode.
58       * All read operations will indicate EOF without accessing
59       * the underlying stream. After closing this stream, read
60       * operations will trigger an {@link IOException IOException}.
61       *
62       * @see #isReadAllowed isReadAllowed
63       */
64      private boolean selfClosed;
65  
66      /** The watcher to be notified, if any. */
67      private final EofSensorWatcher eofWatcher;
68  
69      /**
70       * Creates a new EOF sensor.
71       * If no watcher is passed, the underlying stream will simply be
72       * closed when EOF is detected or {@link #close close} is called.
73       * Otherwise, the watcher decides whether the underlying stream
74       * should be closed before detaching from it.
75       *
76       * @param in        the wrapped stream
77       * @param watcher   the watcher for events, or {@code null} for
78       *                  auto-close behavior without notification
79       */
80      public EofSensorInputStream(final InputStream in,
81                                  final EofSensorWatcher watcher) {
82          Args.notNull(in, "Wrapped stream");
83          wrappedStream = in;
84          selfClosed = false;
85          eofWatcher = watcher;
86      }
87  
88      boolean isSelfClosed() {
89          return selfClosed;
90      }
91  
92      InputStream getWrappedStream() {
93          return wrappedStream;
94      }
95  
96      /**
97       * Checks whether the underlying stream can be read from.
98       *
99       * @return  {@code true} if the underlying stream is accessible,
100      *          {@code false} if this stream is in EOF mode and
101      *          detached from the underlying stream
102      *
103      * @throws IOException      if this stream is already closed
104      */
105     protected boolean isReadAllowed() throws IOException {
106         if (selfClosed) {
107             throw new IOException("Attempted read on closed stream.");
108         }
109         return (wrappedStream != null);
110     }
111 
112     @Override
113     public int read() throws IOException {
114         int readLen = -1;
115 
116         if (isReadAllowed()) {
117             try {
118                 readLen = wrappedStream.read();
119                 checkEOF(readLen);
120             } catch (final IOException ex) {
121                 checkAbort();
122                 throw ex;
123             }
124         }
125 
126         return readLen;
127     }
128 
129     @Override
130     public int read(final byte[] b, final int off, final int len) throws IOException {
131         int readLen = -1;
132 
133         if (isReadAllowed()) {
134             try {
135                 readLen = wrappedStream.read(b,  off,  len);
136                 checkEOF(readLen);
137             } catch (final IOException ex) {
138                 checkAbort();
139                 throw ex;
140             }
141         }
142 
143         return readLen;
144     }
145 
146     @Override
147     public int read(final byte[] b) throws IOException {
148         return read(b, 0, b.length);
149     }
150 
151     @Override
152     public int available() throws IOException {
153         int a = 0; // not -1
154 
155         if (isReadAllowed()) {
156             try {
157                 a = wrappedStream.available();
158                 // no checkEOF() here, available() can't trigger EOF
159             } catch (final IOException ex) {
160                 checkAbort();
161                 throw ex;
162             }
163         }
164 
165         return a;
166     }
167 
168     @Override
169     public void close() throws IOException {
170         // tolerate multiple calls to close()
171         selfClosed = true;
172         checkClose();
173     }
174 
175     /**
176      * Detects EOF and notifies the watcher.
177      * This method should only be called while the underlying stream is
178      * still accessible. Use {@link #isReadAllowed isReadAllowed} to
179      * check that condition.
180      * <p>
181      * If EOF is detected, the watcher will be notified and this stream
182      * is detached from the underlying stream. This prevents multiple
183      * notifications from this stream.
184      * </p>
185      *
186      * @param eof       the result of the calling read operation.
187      *                  A negative value indicates that EOF is reached.
188      *
189      * @throws IOException
190      *          in case of an IO problem on closing the underlying stream
191      */
192     protected void checkEOF(final int eof) throws IOException {
193 
194         final InputStream toCheckStream = wrappedStream;
195         if ((toCheckStream != null) && (eof < 0)) {
196             try {
197                 boolean scws = true; // should close wrapped stream?
198                 if (eofWatcher != null) {
199                     scws = eofWatcher.eofDetected(toCheckStream);
200                 }
201                 if (scws) {
202                     toCheckStream.close();
203                 }
204             } finally {
205                 wrappedStream = null;
206             }
207         }
208     }
209 
210     /**
211      * Detects stream close and notifies the watcher.
212      * There's not much to detect since this is called by {@link #close close}.
213      * The watcher will only be notified if this stream is closed
214      * for the first time and before EOF has been detected.
215      * This stream will be detached from the underlying stream to prevent
216      * multiple notifications to the watcher.
217      *
218      * @throws IOException
219      *          in case of an IO problem on closing the underlying stream
220      */
221     protected void checkClose() throws IOException {
222 
223         final InputStream toCloseStream = wrappedStream;
224         if (toCloseStream != null) {
225             try {
226                 boolean scws = true; // should close wrapped stream?
227                 if (eofWatcher != null) {
228                     scws = eofWatcher.streamClosed(toCloseStream);
229                 }
230                 if (scws) {
231                     toCloseStream.close();
232                 }
233             } finally {
234                 wrappedStream = null;
235             }
236         }
237     }
238 
239     /**
240      * Detects stream abort and notifies the watcher.
241      * There's not much to detect since this is called by
242      * {@link #abortConnection abortConnection}.
243      * The watcher will only be notified if this stream is aborted
244      * for the first time and before EOF has been detected or the
245      * stream has been {@link #close closed} gracefully.
246      * This stream will be detached from the underlying stream to prevent
247      * multiple notifications to the watcher.
248      *
249      * @throws IOException
250      *          in case of an IO problem on closing the underlying stream
251      */
252     protected void checkAbort() throws IOException {
253 
254         final InputStream toAbortStream = wrappedStream;
255         if (toAbortStream != null) {
256             try {
257                 boolean scws = true; // should close wrapped stream?
258                 if (eofWatcher != null) {
259                     scws = eofWatcher.streamAbort(toAbortStream);
260                 }
261                 if (scws) {
262                     toAbortStream.close();
263                 }
264             } finally {
265                 wrappedStream = null;
266             }
267         }
268     }
269 
270     /**
271      * Same as {@link #close close()}.
272      */
273     @Override
274     public void releaseConnection() throws IOException {
275         close();
276     }
277 
278     /**
279      * Aborts this stream.
280      * This is a special version of {@link #close close()} which prevents
281      * re-use of the underlying connection, if any. Calling this method
282      * indicates that there should be no attempt to read until the end of
283      * the stream.
284      */
285     @Override
286     public void abortConnection() throws IOException {
287         // tolerate multiple calls
288         selfClosed = true;
289         checkAbort();
290     }
291 
292 }
293