1 /*
2 * ====================================================================
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing,
14 * software distributed under the License is distributed on an
15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16 * KIND, either express or implied. See the License for the
17 * specific language governing permissions and limitations
18 * under the License.
19 * ====================================================================
20 *
21 * This software consists of voluntary contributions made by many
22 * individuals on behalf of the Apache Software Foundation. For more
23 * information on the Apache Software Foundation, please see
24 * <http://www.apache.org/>.
25 *
26 */
27 package org.apache.http.conn;
28
29 import java.io.IOException;
30 import java.io.InputStream;
31
32 import org.apache.http.util.Args;
33
34 /**
35 * A stream wrapper that triggers actions on {@link #close close()} and EOF.
36 * Primarily used to auto-release an underlying managed connection when the response
37 * body is consumed or no longer needed.
38 *
39 * @see EofSensorWatcher
40 *
41 * @since 4.0
42 */
43 // don't use FilterInputStream as the base class, we'd have to
44 // override markSupported(), mark(), and reset() to disable them
45 public class EofSensorInputStream extends InputStream implements ConnectionReleaseTrigger {
46
47 /**
48 * The wrapped input stream, while accessible.
49 * The value changes to {@code null} when the wrapped stream
50 * becomes inaccessible.
51 */
52 protected InputStream wrappedStream;
53
54 /**
55 * Indicates whether this stream itself is closed.
56 * If it isn't, but {@link #wrappedStream wrappedStream}
57 * is {@code null}, we're running in EOF mode.
58 * All read operations will indicate EOF without accessing
59 * the underlying stream. After closing this stream, read
60 * operations will trigger an {@link IOException IOException}.
61 *
62 * @see #isReadAllowed isReadAllowed
63 */
64 private boolean selfClosed;
65
66 /** The watcher to be notified, if any. */
67 private final EofSensorWatcher eofWatcher;
68
69 /**
70 * Creates a new EOF sensor.
71 * If no watcher is passed, the underlying stream will simply be
72 * closed when EOF is detected or {@link #close close} is called.
73 * Otherwise, the watcher decides whether the underlying stream
74 * should be closed before detaching from it.
75 *
76 * @param in the wrapped stream
77 * @param watcher the watcher for events, or {@code null} for
78 * auto-close behavior without notification
79 */
80 public EofSensorInputStream(final InputStream in,
81 final EofSensorWatcher watcher) {
82 Args.notNull(in, "Wrapped stream");
83 wrappedStream = in;
84 selfClosed = false;
85 eofWatcher = watcher;
86 }
87
88 boolean isSelfClosed() {
89 return selfClosed;
90 }
91
92 InputStream getWrappedStream() {
93 return wrappedStream;
94 }
95
96 /**
97 * Checks whether the underlying stream can be read from.
98 *
99 * @return {@code true} if the underlying stream is accessible,
100 * {@code false} if this stream is in EOF mode and
101 * detached from the underlying stream
102 *
103 * @throws IOException if this stream is already closed
104 */
105 protected boolean isReadAllowed() throws IOException {
106 if (selfClosed) {
107 throw new IOException("Attempted read on closed stream.");
108 }
109 return (wrappedStream != null);
110 }
111
112 @Override
113 public int read() throws IOException {
114 int readLen = -1;
115
116 if (isReadAllowed()) {
117 try {
118 readLen = wrappedStream.read();
119 checkEOF(readLen);
120 } catch (final IOException ex) {
121 checkAbort();
122 throw ex;
123 }
124 }
125
126 return readLen;
127 }
128
129 @Override
130 public int read(final byte[] b, final int off, final int len) throws IOException {
131 int readLen = -1;
132
133 if (isReadAllowed()) {
134 try {
135 readLen = wrappedStream.read(b, off, len);
136 checkEOF(readLen);
137 } catch (final IOException ex) {
138 checkAbort();
139 throw ex;
140 }
141 }
142
143 return readLen;
144 }
145
146 @Override
147 public int read(final byte[] b) throws IOException {
148 return read(b, 0, b.length);
149 }
150
151 @Override
152 public int available() throws IOException {
153 int a = 0; // not -1
154
155 if (isReadAllowed()) {
156 try {
157 a = wrappedStream.available();
158 // no checkEOF() here, available() can't trigger EOF
159 } catch (final IOException ex) {
160 checkAbort();
161 throw ex;
162 }
163 }
164
165 return a;
166 }
167
168 @Override
169 public void close() throws IOException {
170 // tolerate multiple calls to close()
171 selfClosed = true;
172 checkClose();
173 }
174
175 /**
176 * Detects EOF and notifies the watcher.
177 * This method should only be called while the underlying stream is
178 * still accessible. Use {@link #isReadAllowed isReadAllowed} to
179 * check that condition.
180 * <p>
181 * If EOF is detected, the watcher will be notified and this stream
182 * is detached from the underlying stream. This prevents multiple
183 * notifications from this stream.
184 * </p>
185 *
186 * @param eof the result of the calling read operation.
187 * A negative value indicates that EOF is reached.
188 *
189 * @throws IOException
190 * in case of an IO problem on closing the underlying stream
191 */
192 protected void checkEOF(final int eof) throws IOException {
193
194 final InputStream toCheckStream = wrappedStream;
195 if ((toCheckStream != null) && (eof < 0)) {
196 try {
197 boolean scws = true; // should close wrapped stream?
198 if (eofWatcher != null) {
199 scws = eofWatcher.eofDetected(toCheckStream);
200 }
201 if (scws) {
202 toCheckStream.close();
203 }
204 } finally {
205 wrappedStream = null;
206 }
207 }
208 }
209
210 /**
211 * Detects stream close and notifies the watcher.
212 * There's not much to detect since this is called by {@link #close close}.
213 * The watcher will only be notified if this stream is closed
214 * for the first time and before EOF has been detected.
215 * This stream will be detached from the underlying stream to prevent
216 * multiple notifications to the watcher.
217 *
218 * @throws IOException
219 * in case of an IO problem on closing the underlying stream
220 */
221 protected void checkClose() throws IOException {
222
223 final InputStream toCloseStream = wrappedStream;
224 if (toCloseStream != null) {
225 try {
226 boolean scws = true; // should close wrapped stream?
227 if (eofWatcher != null) {
228 scws = eofWatcher.streamClosed(toCloseStream);
229 }
230 if (scws) {
231 toCloseStream.close();
232 }
233 } finally {
234 wrappedStream = null;
235 }
236 }
237 }
238
239 /**
240 * Detects stream abort and notifies the watcher.
241 * There's not much to detect since this is called by
242 * {@link #abortConnection abortConnection}.
243 * The watcher will only be notified if this stream is aborted
244 * for the first time and before EOF has been detected or the
245 * stream has been {@link #close closed} gracefully.
246 * This stream will be detached from the underlying stream to prevent
247 * multiple notifications to the watcher.
248 *
249 * @throws IOException
250 * in case of an IO problem on closing the underlying stream
251 */
252 protected void checkAbort() throws IOException {
253
254 final InputStream toAbortStream = wrappedStream;
255 if (toAbortStream != null) {
256 try {
257 boolean scws = true; // should close wrapped stream?
258 if (eofWatcher != null) {
259 scws = eofWatcher.streamAbort(toAbortStream);
260 }
261 if (scws) {
262 toAbortStream.close();
263 }
264 } finally {
265 wrappedStream = null;
266 }
267 }
268 }
269
270 /**
271 * Same as {@link #close close()}.
272 */
273 @Override
274 public void releaseConnection() throws IOException {
275 close();
276 }
277
278 /**
279 * Aborts this stream.
280 * This is a special version of {@link #close close()} which prevents
281 * re-use of the underlying connection, if any. Calling this method
282 * indicates that there should be no attempt to read until the end of
283 * the stream.
284 */
285 @Override
286 public void abortConnection() throws IOException {
287 // tolerate multiple calls
288 selfClosed = true;
289 checkAbort();
290 }
291
292 }
293