View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.conn;
28  
29  import java.io.IOException;
30  import java.io.InputStream;
31  
32  import org.apache.http.annotation.NotThreadSafe;
33  import org.apache.http.util.Args;
34  
35  /**
36   * A stream wrapper that triggers actions on {@link #close close()} and EOF.
37   * Primarily used to auto-release an underlying managed connection when the response
38   * body is consumed or no longer needed.
39   *
40   * @see EofSensorWatcher
41   *
42   * @since 4.0
43   */
44  // don't use FilterInputStream as the base class, we'd have to
45  // override markSupported(), mark(), and reset() to disable them
46  @NotThreadSafe
47  public class EofSensorInputStream extends InputStream implements ConnectionReleaseTrigger {
48  
49      /**
50       * The wrapped input stream, while accessible.
51       * The value changes to {@code null} when the wrapped stream
52       * becomes inaccessible.
53       */
54      protected InputStream wrappedStream;
55  
56      /**
57       * Indicates whether this stream itself is closed.
58       * If it isn't, but {@link #wrappedStream wrappedStream}
59       * is {@code null}, we're running in EOF mode.
60       * All read operations will indicate EOF without accessing
61       * the underlying stream. After closing this stream, read
62       * operations will trigger an {@link IOException IOException}.
63       *
64       * @see #isReadAllowed isReadAllowed
65       */
66      private boolean selfClosed;
67  
68      /** The watcher to be notified, if any. */
69      private final EofSensorWatcher eofWatcher;
70  
71      /**
72       * Creates a new EOF sensor.
73       * If no watcher is passed, the underlying stream will simply be
74       * closed when EOF is detected or {@link #close close} is called.
75       * Otherwise, the watcher decides whether the underlying stream
76       * should be closed before detaching from it.
77       *
78       * @param in        the wrapped stream
79       * @param watcher   the watcher for events, or {@code null} for
80       *                  auto-close behavior without notification
81       */
82      public EofSensorInputStream(final InputStream in,
83                                  final EofSensorWatcher watcher) {
84          Args.notNull(in, "Wrapped stream");
85          wrappedStream = in;
86          selfClosed = false;
87          eofWatcher = watcher;
88      }
89  
90      boolean isSelfClosed() {
91          return selfClosed;
92      }
93  
94      InputStream getWrappedStream() {
95          return wrappedStream;
96      }
97  
98      /**
99       * Checks whether the underlying stream can be read from.
100      *
101      * @return  {@code true} if the underlying stream is accessible,
102      *          {@code false} if this stream is in EOF mode and
103      *          detached from the underlying stream
104      *
105      * @throws IOException      if this stream is already closed
106      */
107     protected boolean isReadAllowed() throws IOException {
108         if (selfClosed) {
109             throw new IOException("Attempted read on closed stream.");
110         }
111         return (wrappedStream != null);
112     }
113 
114     @Override
115     public int read() throws IOException {
116         int l = -1;
117 
118         if (isReadAllowed()) {
119             try {
120                 l = wrappedStream.read();
121                 checkEOF(l);
122             } catch (final IOException ex) {
123                 checkAbort();
124                 throw ex;
125             }
126         }
127 
128         return l;
129     }
130 
131     @Override
132     public int read(final byte[] b, final int off, final int len) throws IOException {
133         int l = -1;
134 
135         if (isReadAllowed()) {
136             try {
137                 l = wrappedStream.read(b,  off,  len);
138                 checkEOF(l);
139             } catch (final IOException ex) {
140                 checkAbort();
141                 throw ex;
142             }
143         }
144 
145         return l;
146     }
147 
148     @Override
149     public int read(final byte[] b) throws IOException {
150         return read(b, 0, b.length);
151     }
152 
153     @Override
154     public int available() throws IOException {
155         int a = 0; // not -1
156 
157         if (isReadAllowed()) {
158             try {
159                 a = wrappedStream.available();
160                 // no checkEOF() here, available() can't trigger EOF
161             } catch (final IOException ex) {
162                 checkAbort();
163                 throw ex;
164             }
165         }
166 
167         return a;
168     }
169 
170     @Override
171     public void close() throws IOException {
172         // tolerate multiple calls to close()
173         selfClosed = true;
174         checkClose();
175     }
176 
177     /**
178      * Detects EOF and notifies the watcher.
179      * This method should only be called while the underlying stream is
180      * still accessible. Use {@link #isReadAllowed isReadAllowed} to
181      * check that condition.
182      * <p>
183      * If EOF is detected, the watcher will be notified and this stream
184      * is detached from the underlying stream. This prevents multiple
185      * notifications from this stream.
186      * </p>
187      *
188      * @param eof       the result of the calling read operation.
189      *                  A negative value indicates that EOF is reached.
190      *
191      * @throws IOException
192      *          in case of an IO problem on closing the underlying stream
193      */
194     protected void checkEOF(final int eof) throws IOException {
195 
196         if ((wrappedStream != null) && (eof < 0)) {
197             try {
198                 boolean scws = true; // should close wrapped stream?
199                 if (eofWatcher != null) {
200                     scws = eofWatcher.eofDetected(wrappedStream);
201                 }
202                 if (scws) {
203                     wrappedStream.close();
204                 }
205             } finally {
206                 wrappedStream = null;
207             }
208         }
209     }
210 
211     /**
212      * Detects stream close and notifies the watcher.
213      * There's not much to detect since this is called by {@link #close close}.
214      * The watcher will only be notified if this stream is closed
215      * for the first time and before EOF has been detected.
216      * This stream will be detached from the underlying stream to prevent
217      * multiple notifications to the watcher.
218      *
219      * @throws IOException
220      *          in case of an IO problem on closing the underlying stream
221      */
222     protected void checkClose() throws IOException {
223 
224         if (wrappedStream != null) {
225             try {
226                 boolean scws = true; // should close wrapped stream?
227                 if (eofWatcher != null) {
228                     scws = eofWatcher.streamClosed(wrappedStream);
229                 }
230                 if (scws) {
231                     wrappedStream.close();
232                 }
233             } finally {
234                 wrappedStream = null;
235             }
236         }
237     }
238 
239     /**
240      * Detects stream abort and notifies the watcher.
241      * There's not much to detect since this is called by
242      * {@link #abortConnection abortConnection}.
243      * The watcher will only be notified if this stream is aborted
244      * for the first time and before EOF has been detected or the
245      * stream has been {@link #close closed} gracefully.
246      * This stream will be detached from the underlying stream to prevent
247      * multiple notifications to the watcher.
248      *
249      * @throws IOException
250      *          in case of an IO problem on closing the underlying stream
251      */
252     protected void checkAbort() throws IOException {
253 
254         if (wrappedStream != null) {
255             try {
256                 boolean scws = true; // should close wrapped stream?
257                 if (eofWatcher != null) {
258                     scws = eofWatcher.streamAbort(wrappedStream);
259                 }
260                 if (scws) {
261                     wrappedStream.close();
262                 }
263             } finally {
264                 wrappedStream = null;
265             }
266         }
267     }
268 
269     /**
270      * Same as {@link #close close()}.
271      */
272     @Override
273     public void releaseConnection() throws IOException {
274         close();
275     }
276 
277     /**
278      * Aborts this stream.
279      * This is a special version of {@link #close close()} which prevents
280      * re-use of the underlying connection, if any. Calling this method
281      * indicates that there should be no attempt to read until the end of
282      * the stream.
283      */
284     @Override
285     public void abortConnection() throws IOException {
286         // tolerate multiple calls
287         selfClosed = true;
288         checkAbort();
289     }
290 
291 }
292