/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation. For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package org.apache.http.conn;

import java.io.IOException;
import java.io.InputStream;

import org.apache.http.util.Args;

/**
 * An input stream decorator that fires callbacks when the wrapped stream
 * reaches EOF, is {@link #close closed}, or is {@link #abortConnection aborted}.
 * Primarily used to auto-release an underlying managed connection when the
 * response body is consumed or no longer needed.
 * <p>
 * After any of these events the wrapper detaches from the wrapped stream
 * (the reference is set to {@code null}), so the watcher is notified at
 * most once by this stream.
 * </p>
 *
 * @see EofSensorWatcher
 *
 * @since 4.0
 */
// don't use FilterInputStream as the base class, we'd have to
// override markSupported(), mark(), and reset() to disable them
public class EofSensorInputStream extends InputStream implements ConnectionReleaseTrigger {

    /**
     * The wrapped input stream for as long as it is accessible.
     * Set to {@code null} once this wrapper has detached from it.
     */
    protected InputStream wrappedStream;

    /**
     * Tells whether this wrapper itself has been closed or aborted.
     * When this is {@code false} but {@link #wrappedStream wrappedStream}
     * is {@code null}, the wrapper is in EOF mode: reads report EOF
     * without touching the underlying stream. Once this flag is set,
     * read operations fail with an {@link IOException IOException}.
     *
     * @see #isReadAllowed isReadAllowed
     */
    private boolean selfClosed;

    /** Receiver of EOF / close / abort notifications; may be {@code null}. */
    private final EofSensorWatcher eofWatcher;

    /**
     * Wraps the given stream in a new EOF sensor.
     * Without a watcher, the wrapped stream is simply closed on EOF,
     * {@link #close close} or abort. With a watcher, the watcher's return
     * value decides whether the wrapped stream gets closed before this
     * wrapper detaches from it.
     *
     * @param in        the stream to wrap, must not be {@code null}
     * @param watcher   the watcher to notify, or {@code null} for
     *                  auto-close behavior without notification
     */
    public EofSensorInputStream(final InputStream in,
                                final EofSensorWatcher watcher) {
        Args.notNull(in, "Wrapped stream");
        this.wrappedStream = in;
        this.selfClosed = false;
        this.eofWatcher = watcher;
    }

    boolean isSelfClosed() {
        return this.selfClosed;
    }

    InputStream getWrappedStream() {
        return this.wrappedStream;
    }

    /**
     * Tells whether read operations may be delegated to the wrapped stream.
     *
     * @return  {@code true} while the wrapped stream is still attached,
     *          {@code false} if this wrapper is in EOF mode and has
     *          detached from the wrapped stream
     *
     * @throws IOException  if this stream has already been closed
     */
    protected boolean isReadAllowed() throws IOException {
        if (selfClosed) {
            throw new IOException("Attempted read on closed stream.");
        }
        return wrappedStream != null;
    }

    @Override
    public int read() throws IOException {
        // EOF mode: report end of stream without touching the wrapped stream
        if (!isReadAllowed()) {
            return -1;
        }
        try {
            final int value = wrappedStream.read();
            checkEOF(value);
            return value;
        } catch (final IOException ex) {
            // a failed read aborts the stream; the original failure is rethrown
            checkAbort();
            throw ex;
        }
    }

    @Override
    public int read(final byte[] b, final int off, final int len) throws IOException {
        // EOF mode: report end of stream without touching the wrapped stream
        if (!isReadAllowed()) {
            return -1;
        }
        try {
            final int count = wrappedStream.read(b, off, len);
            checkEOF(count);
            return count;
        } catch (final IOException ex) {
            // a failed read aborts the stream; the original failure is rethrown
            checkAbort();
            throw ex;
        }
    }

    @Override
    public int read(final byte[] b) throws IOException {
        return read(b, 0, b.length);
    }

    @Override
    public int available() throws IOException {
        // in EOF mode nothing is available; the answer is 0, not -1
        if (!isReadAllowed()) {
            return 0;
        }
        try {
            // no checkEOF() here, available() can't trigger EOF
            return wrappedStream.available();
        } catch (final IOException ex) {
            checkAbort();
            throw ex;
        }
    }

    @Override
    public void close() throws IOException {
        // repeated calls are tolerated: checkClose() is a no-op once detached
        this.selfClosed = true;
        checkClose();
    }

    /**
     * Reacts to a read result that signals EOF.
     * Intended to be called only while the wrapped stream is still
     * attached; {@link #isReadAllowed isReadAllowed} checks that condition.
     * <p>
     * On EOF the watcher, if any, is notified and decides whether the
     * wrapped stream is closed; without a watcher it is always closed.
     * In either case this wrapper then detaches from the wrapped stream,
     * which prevents repeated notifications.
     * </p>
     *
     * @param eof   the value returned by the read operation;
     *              a negative value means EOF was reached
     *
     * @throws IOException
     *          in case of an IO problem on closing the underlying stream
     */
    protected void checkEOF(final int eof) throws IOException {
        final InputStream attached = wrappedStream;
        if (eof >= 0 || attached == null) {
            return;
        }
        try {
            // no watcher means: always close the wrapped stream
            final boolean closeWrapped =
                    eofWatcher == null || eofWatcher.eofDetected(attached);
            if (closeWrapped) {
                attached.close();
            }
        } finally {
            // detach even if the watcher or close() failed
            wrappedStream = null;
        }
    }

    /**
     * Handles a graceful close of this stream; called by {@link #close close}.
     * Nothing happens if this wrapper has already detached from the wrapped
     * stream. Otherwise the watcher, if any, is notified and decides
     * whether the wrapped stream is closed; without a watcher it is always
     * closed. Afterwards this wrapper detaches from the wrapped stream,
     * which prevents repeated notifications.
     *
     * @throws IOException
     *          in case of an IO problem on closing the underlying stream
     */
    protected void checkClose() throws IOException {
        final InputStream attached = wrappedStream;
        if (attached == null) {
            return;
        }
        try {
            // no watcher means: always close the wrapped stream
            final boolean closeWrapped =
                    eofWatcher == null || eofWatcher.streamClosed(attached);
            if (closeWrapped) {
                attached.close();
            }
        } finally {
            // detach even if the watcher or close() failed
            wrappedStream = null;
        }
    }

    /**
     * Handles an abort of this stream; called by
     * {@link #abortConnection abortConnection} and by the read operations
     * of this class when the wrapped stream throws an
     * {@link IOException IOException}.
     * Nothing happens if this wrapper has already detached from the wrapped
     * stream. Otherwise the watcher, if any, is notified and decides
     * whether the wrapped stream is closed; without a watcher it is always
     * closed. Afterwards this wrapper detaches from the wrapped stream,
     * which prevents repeated notifications.
     *
     * @throws IOException
     *          in case of an IO problem on closing the underlying stream
     */
    protected void checkAbort() throws IOException {
        final InputStream attached = wrappedStream;
        if (attached == null) {
            return;
        }
        try {
            // no watcher means: always close the wrapped stream
            final boolean closeWrapped =
                    eofWatcher == null || eofWatcher.streamAbort(attached);
            if (closeWrapped) {
                attached.close();
            }
        } finally {
            // detach even if the watcher or close() failed
            wrappedStream = null;
        }
    }

    /**
     * Same as {@link #close close()}.
     */
    @Override
    public void releaseConnection() throws IOException {
        close();
    }

    /**
     * Aborts this stream.
     * This is a special version of {@link #close close()} which prevents
     * re-use of the underlying connection, if any. Calling this method
     * indicates that there should be no attempt to read until the end of
     * the stream.
     */
    @Override
    public void abortConnection() throws IOException {
        // repeated calls are tolerated: checkAbort() is a no-op once detached
        this.selfClosed = true;
        checkAbort();
    }

}