1 /*
2 * ====================================================================
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing,
14 * software distributed under the License is distributed on an
15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16 * KIND, either express or implied. See the License for the
17 * specific language governing permissions and limitations
18 * under the License.
19 * ====================================================================
20 *
21 * This software consists of voluntary contributions made by many
22 * individuals on behalf of the Apache Software Foundation. For more
23 * information on the Apache Software Foundation, please see
24 * <http://www.apache.org/>.
25 *
26 */
27 package org.apache.http.conn;
28
29 import java.io.IOException;
30 import java.io.InputStream;
31
32 import org.apache.http.annotation.NotThreadSafe;
33 import org.apache.http.util.Args;
34
35 /**
36 * A stream wrapper that triggers actions on {@link #close close()} and EOF.
37 * Primarily used to auto-release an underlying managed connection when the response
38 * body is consumed or no longer needed.
39 *
40 * @see EofSensorWatcher
41 *
42 * @since 4.0
43 */
44 // don't use FilterInputStream as the base class, we'd have to
45 // override markSupported(), mark(), and reset() to disable them
46 @NotThreadSafe
47 public class EofSensorInputStream extends InputStream implements ConnectionReleaseTrigger {
48
49 /**
50 * The wrapped input stream, while accessible.
51 * The value changes to <code>null</code> when the wrapped stream
52 * becomes inaccessible.
53 */
54 protected InputStream wrappedStream;
55
56 /**
57 * Indicates whether this stream itself is closed.
58 * If it isn't, but {@link #wrappedStream wrappedStream}
59 * is <code>null</code>, we're running in EOF mode.
60 * All read operations will indicate EOF without accessing
61 * the underlying stream. After closing this stream, read
62 * operations will trigger an {@link IOException IOException}.
63 *
64 * @see #isReadAllowed isReadAllowed
65 */
66 private boolean selfClosed;
67
68 /** The watcher to be notified, if any. */
69 private final EofSensorWatcher eofWatcher;
70
71 /**
72 * Creates a new EOF sensor.
73 * If no watcher is passed, the underlying stream will simply be
74 * closed when EOF is detected or {@link #close close} is called.
75 * Otherwise, the watcher decides whether the underlying stream
76 * should be closed before detaching from it.
77 *
78 * @param in the wrapped stream
79 * @param watcher the watcher for events, or <code>null</code> for
80 * auto-close behavior without notification
81 */
82 public EofSensorInputStream(final InputStream in,
83 final EofSensorWatcher watcher) {
84 Args.notNull(in, "Wrapped stream");
85 wrappedStream = in;
86 selfClosed = false;
87 eofWatcher = watcher;
88 }
89
90 boolean isSelfClosed() {
91 return selfClosed;
92 }
93
94 InputStream getWrappedStream() {
95 return wrappedStream;
96 }
97
98 /**
99 * Checks whether the underlying stream can be read from.
100 *
101 * @return <code>true</code> if the underlying stream is accessible,
102 * <code>false</code> if this stream is in EOF mode and
103 * detached from the underlying stream
104 *
105 * @throws IOException if this stream is already closed
106 */
107 protected boolean isReadAllowed() throws IOException {
108 if (selfClosed) {
109 throw new IOException("Attempted read on closed stream.");
110 }
111 return (wrappedStream != null);
112 }
113
114 @Override
115 public int read() throws IOException {
116 int l = -1;
117
118 if (isReadAllowed()) {
119 try {
120 l = wrappedStream.read();
121 checkEOF(l);
122 } catch (final IOException ex) {
123 checkAbort();
124 throw ex;
125 }
126 }
127
128 return l;
129 }
130
131 @Override
132 public int read(final byte[] b, final int off, final int len) throws IOException {
133 int l = -1;
134
135 if (isReadAllowed()) {
136 try {
137 l = wrappedStream.read(b, off, len);
138 checkEOF(l);
139 } catch (final IOException ex) {
140 checkAbort();
141 throw ex;
142 }
143 }
144
145 return l;
146 }
147
148 @Override
149 public int read(final byte[] b) throws IOException {
150 return read(b, 0, b.length);
151 }
152
153 @Override
154 public int available() throws IOException {
155 int a = 0; // not -1
156
157 if (isReadAllowed()) {
158 try {
159 a = wrappedStream.available();
160 // no checkEOF() here, available() can't trigger EOF
161 } catch (final IOException ex) {
162 checkAbort();
163 throw ex;
164 }
165 }
166
167 return a;
168 }
169
170 @Override
171 public void close() throws IOException {
172 // tolerate multiple calls to close()
173 selfClosed = true;
174 checkClose();
175 }
176
177 /**
178 * Detects EOF and notifies the watcher.
179 * This method should only be called while the underlying stream is
180 * still accessible. Use {@link #isReadAllowed isReadAllowed} to
181 * check that condition.
182 * <br/>
183 * If EOF is detected, the watcher will be notified and this stream
184 * is detached from the underlying stream. This prevents multiple
185 * notifications from this stream.
186 *
187 * @param eof the result of the calling read operation.
188 * A negative value indicates that EOF is reached.
189 *
190 * @throws IOException
191 * in case of an IO problem on closing the underlying stream
192 */
193 protected void checkEOF(final int eof) throws IOException {
194
195 if ((wrappedStream != null) && (eof < 0)) {
196 try {
197 boolean scws = true; // should close wrapped stream?
198 if (eofWatcher != null) {
199 scws = eofWatcher.eofDetected(wrappedStream);
200 }
201 if (scws) {
202 wrappedStream.close();
203 }
204 } finally {
205 wrappedStream = null;
206 }
207 }
208 }
209
210 /**
211 * Detects stream close and notifies the watcher.
212 * There's not much to detect since this is called by {@link #close close}.
213 * The watcher will only be notified if this stream is closed
214 * for the first time and before EOF has been detected.
215 * This stream will be detached from the underlying stream to prevent
216 * multiple notifications to the watcher.
217 *
218 * @throws IOException
219 * in case of an IO problem on closing the underlying stream
220 */
221 protected void checkClose() throws IOException {
222
223 if (wrappedStream != null) {
224 try {
225 boolean scws = true; // should close wrapped stream?
226 if (eofWatcher != null) {
227 scws = eofWatcher.streamClosed(wrappedStream);
228 }
229 if (scws) {
230 wrappedStream.close();
231 }
232 } finally {
233 wrappedStream = null;
234 }
235 }
236 }
237
238 /**
239 * Detects stream abort and notifies the watcher.
240 * There's not much to detect since this is called by
241 * {@link #abortConnection abortConnection}.
242 * The watcher will only be notified if this stream is aborted
243 * for the first time and before EOF has been detected or the
244 * stream has been {@link #close closed} gracefully.
245 * This stream will be detached from the underlying stream to prevent
246 * multiple notifications to the watcher.
247 *
248 * @throws IOException
249 * in case of an IO problem on closing the underlying stream
250 */
251 protected void checkAbort() throws IOException {
252
253 if (wrappedStream != null) {
254 try {
255 boolean scws = true; // should close wrapped stream?
256 if (eofWatcher != null) {
257 scws = eofWatcher.streamAbort(wrappedStream);
258 }
259 if (scws) {
260 wrappedStream.close();
261 }
262 } finally {
263 wrappedStream = null;
264 }
265 }
266 }
267
268 /**
269 * Same as {@link #close close()}.
270 */
271 public void releaseConnection() throws IOException {
272 close();
273 }
274
275 /**
276 * Aborts this stream.
277 * This is a special version of {@link #close close()} which prevents
278 * re-use of the underlying connection, if any. Calling this method
279 * indicates that there should be no attempt to read until the end of
280 * the stream.
281 */
282 public void abortConnection() throws IOException {
283 // tolerate multiple calls
284 selfClosed = true;
285 checkAbort();
286 }
287
288 }
289