1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 """Twisted process handling
20
21 The name of this module, "knotted", was suggested by Andrew Bennetts
22 as a way to avoid the name clash with the"twisted" top-level package::
23
24 <spiv> I'd say "knotted" would be good.
25 <spiv> And then you can tell people to "get knotted" ;)
26
27 I disclaim all responsibility for the bad taste of Twisted developers.
28 """
29
30 import Queue
31 from twisted.internet import protocol
32 from pybaz import errors
33 import commandline
34
35
36 __all__ = ['TwistedSpawningStrategy']
37
38
39 import logging
40 logging = logging.getLogger('pybaz.knotted')
41
42
47
48
50
54
55 - def _spawn(self, args, chdir):
56 if __debug__:
57 from twisted.python import threadable
58 assert not threadable.isInIOThread()
59 from twisted.internet import reactor
60
61
62 queue = Queue.Queue(0)
63 process_protocol = ProcessProtocol(queue)
64 argv = [self._command]
65 argv.extend(args)
66 logging.debug("%#x spawning args=%r, chdir=%r", id(queue), argv, chdir)
67 reactor.callFromThread(
68 reactor.spawnProcess,
69 process_protocol, self._command, args=argv, env=None, path=chdir)
70 return queue
71
73 queue = self._spawn(args, chdir)
74 logging.debug("%#x await termination", id(queue))
75 while True:
76 msg = queue.get(block=True)
77 logging.debug("%#x event %.60r", id(queue), msg)
78 if msg[0] == 'start':
79 self._logger.log_command(self._command, args)
80 elif msg[0] == 'output':
81 text = ''.join(msg[1])
82 self._logger.log_output(text)
83 elif msg[0] == 'error':
84 text = ''.join(msg[1])
85 self._logger.log_error(text)
86 elif msg[0] == 'terminate':
87 if msg[1] in expected and msg[2] is None:
88 return msg[1]
89 else:
90 raise make_exec_problem(
91 msg[1:], self._command, args, expected, chdir)
92 else:
93 raise AssertionError('bad message: %r', msg)
94
96 queue = self._spawn(args, chdir)
97 return SequenceCmd(
98 queue, self._command, args, chdir, expected, self._logger)
99
100 - def status_text_cmd(self, args, chdir, expected):
101 queue = self._spawn(args, chdir)
102 seq = SequenceCmd(
103 queue, self._command, args, chdir, expected, self._logger)
104 seq.strip = False
105 text = ''.join(seq)
106 return seq.status, text
107
109
110 - def __init__(self, queue, command, args, chdir, expected, logger):
111 self._queue = queue
112 self._command = command
113 self._logger = logger
114 self._args = args
115 self._chdir = chdir
116 self._expected = expected
117 self._buffer = []
118 self._status = None
119 self.finished = False
120 self.strip = True
121
124
126 msg = None
127 while True:
128 if self.finished:
129 raise StopIteration
130 if self._buffer:
131 line = self._buffer.pop(0)
132 if self.strip:
133 return line.rstrip('\n')
134 else:
135 return line
136 if msg is None:
137 logging.debug("%#x read output", id(self._queue))
138 msg = self._queue.get(block=True)
139 logging.debug("%#x event %.60r", id(self._queue), msg)
140 if msg[0] == 'start':
141 self._logger.log_command(self._command, self._args)
142 elif msg[0] == 'output':
143 self._buffer = msg[1] + self._buffer
144 elif msg[0] == 'error':
145 text = ''.join(msg[1])
146 self._logger.log_error(text)
147 elif msg[0] == 'terminate':
148 self.finished = True
149 if msg[1] in self._expected and msg[2] is None:
150 self._status = msg[1]
151 else:
152 raise make_exec_problem(
153 msg[1:], self._command, self._args,
154 self._expected, self._chdir)
155 else:
156 raise AssertionError('bad message: %r', msg)
157
159 if self._status is None:
160 raise AttributeError, "no status, process has not finished"
161 return self._status
162 status = property(_get_status)
163
164
166 """Dummy object providing the same interface as `forkexec.ChildProcess`."""
167
168 - def __init__(self, command, args, expected, chdir, status, signal, error):
169 self.command = command
170 self.args = args
171 self.chdir = chdir
172 self.expected = expected
173 self.error = ''.join(error)
174 self.status = status
175 self.signal = signal
176
177
179
181 self.__queue = queue
182 self.__out_buffer = str()
183 self.__err_buffer = str()
184 self.error_lines = []
185 self.status = None
186
188 """The process has started."""
189 self.__queue.put(('start',), block=True)
190
192 self.__queue.put(('output', lines), block=True)
193
195 self.__queue.put(('error', lines), block=True)
196
198 """The process has produced data on standard output."""
199 data = self.__out_buffer + data
200 lines = data.splitlines(True)
201 if lines[-1].endswith('\n'):
202 self.__out_buffer = str()
203 complete_lines = lines
204 else:
205 self.__out_buffer = lines[-1]
206 complete_lines = lines[:-1]
207 self.__send_output(complete_lines)
208
210 """The process has produced data on standard error."""
211 data = self.__err_buffer + data
212 lines = data.splitlines(True)
213 if lines[-1].endswith('\n'):
214 self.__err_buffer = str()
215 complete_lines = lines
216 else:
217 self.__err_buffer = lines[-1]
218 complete_lines = lines[:-1]
219 self.__send_error(complete_lines)
220 self.error_lines.extend(complete_lines)
221
223 """The process has closed standard output."""
224 if self.__out_buffer:
225 self.__send_output([self.__out_buffer])
226 self.__out_buffer = str()
227
229 """The process has closed standard error."""
230 if self.__err_buffer:
231 self.__send_error([self.__err_buffer])
232 self.error_lines.append(self.__err_buffer)
233 self.__err_buffer = str()
234
236 """The process has terminated."""
237 signal = reason.value.signal
238 status = reason.value.exitCode
239 msg = ('terminate', status, signal, self.error_lines)
240 self.__queue.put(msg, block=True)
241