Package pybaz :: Package backends :: Module knotted
[frames | no frames]

Source Code for Module pybaz.backends.knotted

  1  # arch-tag: 0abcff18-da5d-49f0-a03e-1d866a86b5cd 
  2  # Copyright (C) 2004 Canonical Ltd. 
  3  #               Author: David Allouche <david@canonical.com> 
  4  # 
  5  #    This program is free software; you can redistribute it and/or modify 
  6  #    it under the terms of the GNU General Public License as published by 
  7  #    the Free Software Foundation; either version 2 of the License, or 
  8  #    (at your option) any later version. 
  9  # 
 10  #    This program is distributed in the hope that it will be useful, 
 11  #    but WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 13  #    GNU General Public License for more details. 
 14  # 
 15  #    You should have received a copy of the GNU General Public License 
 16  #    along with this program; if not, write to the Free Software 
 17  #    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
 18   
 19  """Twisted process handling 
 20   
 21  The name of this module, "knotted", was suggested by Andrew Bennetts 
 22  as a way to avoid the name clash with the "twisted" top-level package:: 
 23   
 24      <spiv> I'd say "knotted" would be good. 
 25      <spiv> And then you can tell people to "get knotted" ;) 
 26   
 27  I disclaim all responsibility for the bad taste of Twisted developers. 
 28  """ 
 29   
 30  import Queue 
 31  from twisted.internet import protocol 
 32  from pybaz import errors 
 33  import commandline 
 34   
 35   
 36  __all__ = ['TwistedSpawningStrategy'] 
 37   
 38   
 39  import logging 
 40  logging = logging.getLogger('pybaz.knotted') 
 41   
 42   
def make_exec_problem(data, command, args, expected, chdir):
    """Build the `errors.ExecProblem` describing a failed command.

    `data` must unpack into exactly three items: the exit status, the
    signal, and the collected standard error lines.  The remaining
    arguments are handed unchanged to `DummyProcess`.

    The exception is returned, not raised; raising it is left to the
    caller.
    """
    exit_status, signal_number, stderr_lines = data
    failed_process = DummyProcess(
        command, args, expected, chdir,
        exit_status, signal_number, stderr_lines)
    return errors.ExecProblem(failed_process)
47 48
class TwistedSpawningStrategy(commandline.SpawningStrategy):
    """Spawning strategy that runs commands through the Twisted reactor.

    Each command is started with `reactor.spawnProcess`, scheduled through
    `reactor.callFromThread`, and reports its events on a `Queue.Queue`.
    The public methods block on that queue, so they are meant to be called
    from a thread other than the reactor's I/O thread (this is asserted in
    `_spawn` when `__debug__` is true).
    """

    def __init__(self, command, logger):
        """Remember the executable to run and the logger to report to.

        `logger` must provide `log_command`, `log_output` and `log_error`.
        """
        self._command = command
        self._logger = logger

    def _spawn(self, args, chdir):
        """Schedule the command in the reactor and return its event queue.

        `args` are the command-line arguments (without the program name),
        `chdir` is passed to `spawnProcess` as the `path` argument.
        """
        if __debug__:
            from twisted.python import threadable
            # Blocking on the queue from the reactor thread would prevent
            # the reactor from ever filling it.
            assert not threadable.isInIOThread()
        from twisted.internet import reactor
        # The queue size must be unlimited, otherwise the blocking
        # put() in the reactor could cause a deadlock.
        queue = Queue.Queue(0)  # unlimited queue
        process_protocol = ProcessProtocol(queue)
        argv = [self._command]
        argv.extend(args)
        logging.debug("%#x spawning args=%r, chdir=%r", id(queue), argv, chdir)
        reactor.callFromThread(
            reactor.spawnProcess,
            process_protocol, self._command, args=argv, env=None, path=chdir)
        return queue

    def status_cmd(self, args, chdir, expected):
        """Run the command to completion and return its exit status.

        Standard output and standard error are forwarded to the logger.
        The status is returned when it is in `expected` and the process
        was not terminated by a signal; otherwise the exception built by
        `make_exec_problem` is raised.  An unknown message kind on the
        queue raises `AssertionError`.
        """
        queue = self._spawn(args, chdir)
        logging.debug("%#x await termination", id(queue))
        while True:
            msg = queue.get(block=True)
            logging.debug("%#x event %.60r", id(queue), msg)
            kind = msg[0]
            if kind == 'start':
                self._logger.log_command(self._command, args)
            elif kind == 'output':
                text = ''.join(msg[1])
                self._logger.log_output(text)
            elif kind == 'error':
                text = ''.join(msg[1])
                self._logger.log_error(text)
            elif kind == 'terminate':
                if msg[1] in expected and msg[2] is None:
                    return msg[1]
                raise make_exec_problem(
                    msg[1:], self._command, args, expected, chdir)
            else:
                # Format the message explicitly: exception constructors do
                # not interpolate their arguments the way logging calls do.
                raise AssertionError('bad message: %r' % (msg,))

    def sequence_cmd(self, args, chdir, expected):
        """Start the command and return a `SequenceCmd` over its output."""
        queue = self._spawn(args, chdir)
        return SequenceCmd(
            queue, self._command, args, chdir, expected, self._logger)

    def status_text_cmd(self, args, chdir, expected):
        """Run the command and return ``(status, text)``.

        `text` is the whole standard output, with line terminators kept.
        """
        queue = self._spawn(args, chdir)
        seq = SequenceCmd(
            queue, self._command, args, chdir, expected, self._logger)
        seq.strip = False  # keep the newlines so the join rebuilds the text
        text = ''.join(seq)
        return seq.status, text
107
class SequenceCmd(object):
    """Iterator over the standard output lines of a spawned command.

    Messages are read from `queue` (blocking) as they are needed.  Output
    lines are yielded one at a time, standard error is forwarded to the
    logger, and the exit status becomes available through `status` once
    the 'terminate' message has been processed.

    Public attributes:

    - `finished`: true once the 'terminate' message has been seen.
    - `strip`: when true (the default), the trailing newline characters
      are removed from each yielded line.
    """

    def __init__(self, queue, command, args, chdir, expected, logger):
        self._queue = queue
        self._command = command
        self._logger = logger
        self._args = args
        self._chdir = chdir
        self._expected = expected
        self._buffer = []  # output lines received but not yet yielded
        self._status = None
        self.finished = False
        self.strip = True

    def __iter__(self):
        return self

    def next(self):
        """Return the next output line, blocking until one is available.

        Raises `StopIteration` once the process has terminated with an
        expected status, the exception built by `make_exec_problem` if it
        terminated with an unexpected status or a signal, and
        `AssertionError` for an unknown message kind.
        """
        msg = None
        while True:
            if self.finished:
                raise StopIteration
            if self._buffer:
                line = self._buffer.pop(0)
                if self.strip:
                    return line.rstrip('\n')
                return line
            if msg is None:
                # Only announce the read once per call, not once per message.
                logging.debug("%#x read output", id(self._queue))
            msg = self._queue.get(block=True)
            logging.debug("%#x event %.60r", id(self._queue), msg)
            kind = msg[0]
            if kind == 'start':
                self._logger.log_command(self._command, self._args)
            elif kind == 'output':
                self._buffer = msg[1] + self._buffer
            elif kind == 'error':
                text = ''.join(msg[1])
                self._logger.log_error(text)
            elif kind == 'terminate':
                # Mark the iterator finished before possibly raising, so
                # that later calls stop instead of blocking on the queue.
                self.finished = True
                if msg[1] in self._expected and msg[2] is None:
                    self._status = msg[1]
                else:
                    raise make_exec_problem(
                        msg[1:], self._command, self._args,
                        self._expected, self._chdir)
            else:
                # Format the message explicitly: exception constructors do
                # not interpolate their arguments the way logging calls do.
                raise AssertionError('bad message: %r' % (msg,))

    # Python 3 spelling of the iterator protocol; harmless on Python 2.
    __next__ = next

    def _get_status(self):
        if self._status is None:
            raise AttributeError("no status, process has not finished")
        return self._status
    status = property(_get_status)
163 164
class DummyProcess(object):
    """Stand-in exposing the attributes of a `forkexec.ChildProcess`.

    It only records how a command was run and how it ended; the standard
    error lines are joined into the single string `error`.
    """

    def __init__(self, command, args, expected, chdir, status, signal, error):
        stderr_text = ''.join(error)
        # How the command was invoked.
        self.command, self.args = command, args
        self.chdir, self.expected = chdir, expected
        # How the command ended.
        self.error = stderr_text
        self.status, self.signal = status, signal
176 177
class ProcessProtocol(protocol.ProcessProtocol):
    """Forward the events of a child process to a queue.

    The following tuples are put on the queue:

    - ``('start',)`` when the process has started;
    - ``('output', lines)`` for complete lines of standard output;
    - ``('error', lines)`` for complete lines of standard error;
    - ``('terminate', status, signal, error_lines)`` when it has ended.

    Incomplete trailing lines are kept in a buffer until the rest of the
    line arrives or the corresponding stream is closed.
    """

    def __init__(self, queue):
        self.__queue = queue
        self.__out_buffer = ''  # pending partial line of standard output
        self.__err_buffer = ''  # pending partial line of standard error
        self.error_lines = []   # every standard error line sent so far
        self.status = None      # not updated anywhere in this class

    def connectionMade(self):
        """The process has started."""
        self.__queue.put(('start',), block=True)

    def __send_output(self, lines):
        self.__queue.put(('output', lines), block=True)

    def __send_error(self, lines):
        self.__queue.put(('error', lines), block=True)

    def __split_complete(self, data):
        """Split `data` into ``(complete_lines, remainder)``.

        `remainder` is the trailing text that is not yet terminated by a
        newline, or the empty string.  Empty `data` yields ``([], '')``
        instead of failing on the lookup of the last line.
        """
        lines = data.splitlines(True)
        if not lines or lines[-1].endswith('\n'):
            return lines, ''
        return lines[:-1], lines[-1]

    def outReceived(self, data):
        """The process has produced data on standard output."""
        complete_lines, self.__out_buffer = self.__split_complete(
            self.__out_buffer + data)
        self.__send_output(complete_lines)

    def errReceived(self, data):
        """The process has produced data on standard error."""
        complete_lines, self.__err_buffer = self.__split_complete(
            self.__err_buffer + data)
        self.__send_error(complete_lines)
        self.error_lines.extend(complete_lines)

    def outConnectionLost(self):
        """The process has closed standard output."""
        if self.__out_buffer:
            # Flush the last line, which had no terminating newline.
            self.__send_output([self.__out_buffer])
            self.__out_buffer = ''

    def errConnectionLost(self):
        """The process has closed standard error."""
        if self.__err_buffer:
            # Flush the last line, which had no terminating newline.
            self.__send_error([self.__err_buffer])
            self.error_lines.append(self.__err_buffer)
            self.__err_buffer = ''

    def processEnded(self, reason):
        """The process has terminated."""
        signal = reason.value.signal
        status = reason.value.exitCode
        msg = ('terminate', status, signal, self.error_lines)
        self.__queue.put(msg, block=True)
241