001    /* 
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     *  contributor license agreements.  See the NOTICE file distributed with
004     *  this work for additional information regarding copyright ownership.
005     *  The ASF licenses this file to You under the Apache License, Version 2.0
006     *  (the "License"); you may not use this file except in compliance with
007     *  the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     *  Unless required by applicable law or agreed to in writing, software
012     *  distributed under the License is distributed on an "AS IS" BASIS,
013     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     *  See the License for the specific language governing permissions and
015     *  limitations under the License.
016     *
017     */
018    
019    package org.apache.commons.exec;
020    
021    import org.apache.commons.exec.util.DebugUtils;
022    
023    import java.io.IOException;
024    import java.io.InputStream;
025    import java.io.OutputStream;
026    
027    /**
028     * Copies standard output and error of subprocesses to standard output and error
029     * of the parent process. If output or error stream are set to null, any feedback
030     * from that stream will be lost. 
031     */
032    public class PumpStreamHandler implements ExecuteStreamHandler {
033    
034        private Thread outputThread;
035    
036        private Thread errorThread;
037    
038        private Thread inputThread;
039    
040        private final OutputStream out;
041    
042        private final OutputStream err;
043    
044        private final InputStream input;
045    
046        private InputStreamPumper inputStreamPumper;
047        
048        /**
049         * Construct a new <CODE>PumpStreamHandler</CODE>.
050         */
051        public PumpStreamHandler() {
052            this(System.out, System.err);
053        }
054    
055        /**
056         * Construct a new <CODE>PumpStreamHandler</CODE>.
057         *
058         * @param outAndErr
059         *            the output/error <CODE>OutputStream</CODE>.
060         */
061        public PumpStreamHandler(final OutputStream outAndErr) {
062            this(outAndErr, outAndErr);
063        }
064        
065        /**
066         * Construct a new <CODE>PumpStreamHandler</CODE>.
067         *
068         * @param out
069         *            the output <CODE>OutputStream</CODE>.
070         * @param err
071         *            the error <CODE>OutputStream</CODE>.
072         */
073        public PumpStreamHandler(final OutputStream out, final OutputStream err) {
074            this(out, err, null);
075        }
076    
077        /**
078         * Construct a new <CODE>PumpStreamHandler</CODE>.
079         * 
080         * @param out
081         *            the output <CODE>OutputStream</CODE>.
082         * @param err
083         *            the error <CODE>OutputStream</CODE>.
084         * @param input
085         *            the input <CODE>InputStream</CODE>.
086         */
087        public PumpStreamHandler(final OutputStream out, final OutputStream err,
088                final InputStream input) {
089    
090            this.out = out;
091            this.err = err;
092            this.input = input;
093        }
094    
095        /**
096         * Set the <CODE>InputStream</CODE> from which to read the standard output
097         * of the process.
098         * 
099         * @param is
100         *            the <CODE>InputStream</CODE>.
101         */
102        public void setProcessOutputStream(final InputStream is) {
103            if (out != null) {
104                createProcessOutputPump(is, out);
105            }
106        }
107    
108        /**
109         * Set the <CODE>InputStream</CODE> from which to read the standard error
110         * of the process.
111         * 
112         * @param is
113         *            the <CODE>InputStream</CODE>.
114         */
115        public void setProcessErrorStream(final InputStream is) {
116            if (err != null) {
117                createProcessErrorPump(is, err);
118            }
119        }
120    
121        /**
122         * Set the <CODE>OutputStream</CODE> by means of which input can be sent
123         * to the process.
124         * 
125         * @param os
126         *            the <CODE>OutputStream</CODE>.
127         */
128        public void setProcessInputStream(final OutputStream os) {
129            if (input != null) {
130                if (input == System.in) {
131                    inputThread = createSystemInPump(input, os);
132            } else {
133                    inputThread = createPump(input, os, true);
134                }        } 
135            else {
136                try {
137                    os.close();
138                } catch (IOException e) {
139                    String msg = "Got exception while closing output stream";
140                    DebugUtils.handleException(msg ,e);
141                }
142            }
143        }
144            
145        /**
146         * Start the <CODE>Thread</CODE>s.
147         */
148        public void start() {
149            if (outputThread != null) {
150                outputThread.start();
151            }
152            if (errorThread != null) {
153                errorThread.start();
154            }
155            if (inputThread != null) {
156                inputThread.start();
157            }
158        }
159    
160        /**
161         * Stop pumping the streams.
162         */
163        public void stop() {
164    
165            if (inputStreamPumper != null) {
166                inputStreamPumper.stopProcessing();
167            }
168    
169            if (outputThread != null) {
170                try {
171                    outputThread.join();
172                    outputThread = null;
173                } catch (InterruptedException e) {
174                    // ignore
175                }
176            }
177    
178            if (errorThread != null) {
179                try {
180                    errorThread.join();
181                    errorThread = null;
182                } catch (InterruptedException e) {
183                    // ignore
184                }
185            }
186    
187            if (inputThread != null) {
188                try {
189                    inputThread.join();
190                    inputThread = null;
191                } catch (InterruptedException e) {
192                    // ignore
193                }
194            }
195    
196             if (err != null && err != out) {
197                 try {
198                     err.flush();
199                 } catch (IOException e) {
200                     String msg = "Got exception while flushing the error stream : " + e.getMessage();
201                     DebugUtils.handleException(msg ,e);
202                 }
203             }
204    
205             if (out != null) {
206                 try {
207                     out.flush();
208                 } catch (IOException e) {
209                     String msg = "Got exception while flushing the output stream";
210                     DebugUtils.handleException(msg ,e);
211                 }
212             }
213        }
214    
215        /**
216         * Get the error stream.
217         * 
218         * @return <CODE>OutputStream</CODE>.
219         */
220        protected OutputStream getErr() {
221            return err;
222        }
223    
224        /**
225         * Get the output stream.
226         * 
227         * @return <CODE>OutputStream</CODE>.
228         */
229        protected OutputStream getOut() {
230            return out;
231        }
232    
233        /**
234         * Create the pump to handle process output.
235         * 
236         * @param is
237         *            the <CODE>InputStream</CODE>.
238         * @param os
239         *            the <CODE>OutputStream</CODE>.
240         */
241        protected void createProcessOutputPump(final InputStream is,
242                final OutputStream os) {
243            outputThread = createPump(is, os);
244        }
245    
246        /**
247         * Create the pump to handle error output.
248         * 
249         * @param is
250         *            the <CODE>InputStream</CODE>.
251         * @param os
252         *            the <CODE>OutputStream</CODE>.
253         */
254        protected void createProcessErrorPump(final InputStream is,
255                final OutputStream os) {
256            errorThread = createPump(is, os);
257        }
258    
259        /**
260         * Creates a stream pumper to copy the given input stream to the given
261         * output stream.
262         *
263         * @param is the input stream to copy from
264         * @param os the output stream to copy into
265         * @return the stream pumper thread
266         */
267        protected Thread createPump(final InputStream is, final OutputStream os) {
268            return createPump(is, os, false);
269        }
270    
271        /**
272         * Creates a stream pumper to copy the given input stream to the given
273         * output stream.
274         *
275         * @param is the input stream to copy from
276         * @param os the output stream to copy into
277         * @param closeWhenExhausted close the output stream when the input stream is exhausted
278         * @return the stream pumper thread
279         */
280        protected Thread createPump(final InputStream is, final OutputStream os,
281                final boolean closeWhenExhausted) {
282            final Thread result = new Thread(new StreamPumper(is, os,
283                    closeWhenExhausted));
284            result.setDaemon(true);
285            return result;
286        }
287    
288    
289        /**
290         * Creates a stream pumper to copy the given input stream to the given
291         * output stream.
292         *
293         * @param is the System.in input stream to copy from
294         * @param os the output stream to copy into
295         * @return the stream pumper thread
296         */
297        private Thread createSystemInPump(InputStream is, OutputStream os) {
298            inputStreamPumper = new InputStreamPumper(is, os);
299            final Thread result = new Thread(inputStreamPumper);
300            result.setDaemon(true);
301            return result;
302        }
303    
304    }