Commit 0574b8b

ABANDON select/poll
1 parent 6e98416 commit 0574b8b

1 file changed: +48 -185 lines changed

git/cmd.py (+48 -185)
@@ -4,48 +4,43 @@
 # This module is part of GitPython and is released under
 # the BSD License: http://www.opensource.org/licenses/bsd-license.php
 
-import os
-import sys
-import select
-import logging
-import threading
-import errno
-import mmap
-
-from git.odict import OrderedDict
 from contextlib import contextmanager
+import io
+import logging
+import os
 import signal
-import subprocess
 from subprocess import (
     call,
     Popen,
     PIPE
 )
+import subprocess
+import sys
+import threading
 
-
-from .util import (
-    LazyMixin,
-    stream_copy,
-)
-from .exc import (
-    GitCommandError,
-    GitCommandNotFound
-)
 from git.compat import (
     string_types,
     defenc,
     force_bytes,
     PY3,
-    bchr,
     # just to satisfy flake8 on py3
     unicode,
     safe_decode,
     is_posix,
     is_win,
 )
-import io
-from _io import UnsupportedOperation
 from git.exc import CommandError
+from git.odict import OrderedDict
+
+from .exc import (
+    GitCommandError,
+    GitCommandNotFound
+)
+from .util import (
+    LazyMixin,
+    stream_copy,
+)
+
 
 execute_kwargs = set(('istream', 'with_keep_cwd', 'with_extended_output',
                       'with_exceptions', 'as_process', 'stdout_as_string',
@@ -57,13 +52,6 @@
 
 __all__ = ('Git',)
 
-if PY3:
-    _bchr = bchr
-else:
-    def _bchr(c):
-        return c
-# get custom byte character handling
-
 
 # ==============================================================================
 ## @name Utilities
@@ -73,8 +61,7 @@ def _bchr(c):
 
 def handle_process_output(process, stdout_handler, stderr_handler, finalizer, decode_streams=True):
     """Registers for notifications to lean that process output is ready to read, and dispatches lines to
-    the respective line handlers. We are able to handle carriage returns in case progress is sent by that
-    mean. For performance reasons, we only apply this to stderr.
+    the respective line handlers.
     This function returns once the finalizer returns
 
     :return: result of finalizer
@@ -88,160 +75,36 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer, de
         Set it to False if `universal_newline == True` (then streams are in text-mode)
         or if decoding must happen later (i.e. for Diffs).
     """
-    if decode_streams:
-        ZERO = b''
-        LF = b'\n'
-        CR = b'\r'
-    else:
-        ZERO = u''
-        LF = u'\n'
-        CR = u'\r'
-
-    def _parse_lines_from_buffer(buf):
-        line = ZERO
-        bi = 0
-        lb = len(buf)
-        while bi < lb:
-            char = buf[bi]
-            bi += 1
-
-            if char in (LF, CR) and line:
-                yield bi, line + LF
-                line = ZERO
-            else:
-                line += char
-            # END process parsed line
-        # END while file is not done reading
-    # end
-
-    def _read_lines_from_fno(fno, last_buf_list):
-        buf = fno.read(mmap.PAGESIZE)
-        buf = last_buf_list[0] + buf
-
-        bi = 0
-        for bi, line in _parse_lines_from_buffer(buf):
-            yield line
-        # for each line to parse from the buffer
-
-        # keep remainder
-        last_buf_list[0] = buf[bi:]
-
-    def _dispatch_single_line(line, handler, decode):
-        if decode:
-            line = line.decode(defenc)
-        if line and handler:
-            handler(line)
-        # end dispatch helper
-    # end single line helper
-
-    def _dispatch_lines(fno, handler, buf_list, decode):
-        lc = 0
-        for line in _read_lines_from_fno(fno, buf_list):
-            _dispatch_single_line(line, handler, decode)
-            lc += 1
-        # for each line
-        return lc
-    # end
-
-    def _deplete_buffer(fno, handler, buf_list, decode):
-        lc = 0
-        while True:
-            line_count = _dispatch_lines(fno, handler, buf_list, decode)
-            lc += line_count
-            if line_count == 0:
-                break
-        # end deplete buffer
-
-        if buf_list[0]:
-            _dispatch_single_line(buf_list[0], handler, decode)
-            lc += 1
-        # end
-
-        return lc
-    # end
-
-    try:
-        outfn = process.stdout.fileno()
-        errfn = process.stderr.fileno()
-        poll = select.poll()  # @UndefinedVariable
-    except (UnsupportedOperation, AttributeError):
-        # Oh ... probably we are on windows. or TC mockap provided for streams.
-        # Anyhow, select.select() can only handle sockets, we have files
-        # The only reliable way to do this now is to use threads and wait for both to finish
-        def pump_stream(cmdline, name, stream, is_decode, handler):
-            try:
-                for line in stream:
-                    if handler:
-                        if is_decode:
-                            line = line.decode(defenc)
-                        handler(line)
-            except Exception as ex:
-                log.error("Pumping %r of cmd(%s) failed due to: %r", name, cmdline, ex)
-                raise CommandError(['<%s-pump>' % name] + cmdline, ex)
-            finally:
-                stream.close()
-
-        cmdline = getattr(process, 'args', '')  # PY3+ only
-        if not isinstance(cmdline, (tuple, list)):
-            cmdline = cmdline.split()
-        threads = []
-        for name, stream, handler in (
-                ('stdout', process.stdout, stdout_handler),
-                ('stderr', process.stderr, stderr_handler),
-        ):
-            t = threading.Thread(target=pump_stream,
-                                 args=(cmdline, name, stream, decode_streams, handler))
-            t.setDaemon(True)
-            t.start()
-            threads.append(t)
-
-        for t in threads:
-            t.join()
-    else:
-        # poll is preferred, as select is limited to file handles up to 1024 ... . This could otherwise be
-        # an issue for us, as it matters how many handles our own process has
-        fdmap = {outfn: (process.stdout, stdout_handler, [ZERO], decode_streams),
-                 errfn: (process.stderr, stderr_handler, [ZERO], decode_streams)}
-
-        READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR  # @UndefinedVariable
-        CLOSED = select.POLLHUP | select.POLLERR  # @UndefinedVariable
-
-        poll.register(process.stdout, READ_ONLY)
-        poll.register(process.stderr, READ_ONLY)
-
-        closed_streams = set()
-        while True:
-            # no timeout
-
-            try:
-                poll_result = poll.poll()
-            except select.error as e:
-                if e.args[0] == errno.EINTR:
-                    continue
-                raise
-            # end handle poll exception
-
-            for fd, result in poll_result:
-                if result & CLOSED:
-                    closed_streams.add(fd)
-                else:
-                    _dispatch_lines(*fdmap[fd])
-                # end handle closed stream
-            # end for each poll-result tuple
-
-            if len(closed_streams) == len(fdmap):
-                break
-            # end its all done
-        # end endless loop
-
-        # Depelete all remaining buffers
-        for fno, args in fdmap.items():
-            _deplete_buffer(*args)
-        # end for each file handle
-
-        for fno in fdmap.keys():
-            poll.unregister(fno)
-        # end don't forget to unregister !
+    # Use 2 "pupm" threads and wait for both to finish.
+    def pump_stream(cmdline, name, stream, is_decode, handler):
+        try:
+            for line in stream:
+                if handler:
+                    if is_decode:
+                        line = line.decode(defenc)
+                    handler(line)
+        except Exception as ex:
+            log.error("Pumping %r of cmd(%s) failed due to: %r", name, cmdline, ex)
+            raise CommandError(['<%s-pump>' % name] + cmdline, ex)
+        finally:
+            stream.close()
+
+    cmdline = getattr(process, 'args', '')  # PY3+ only
+    if not isinstance(cmdline, (tuple, list)):
+        cmdline = cmdline.split()
+    threads = []
+    for name, stream, handler in (
+            ('stdout', process.stdout, stdout_handler),
+            ('stderr', process.stderr, stderr_handler),
+    ):
+        t = threading.Thread(target=pump_stream,
+                             args=(cmdline, name, stream, decode_streams, handler))
+        t.setDaemon(True)
+        t.start()
+        threads.append(t)
+
+    for t in threads:
+        t.join()
 
     return finalizer(process)
 
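For orientation, a minimal usage sketch of the simplified handle_process_output() as it stands after this commit. It is not part of the commit itself: the repository path, the line handlers, and the lambda finalizer are hypothetical; only handle_process_output and its signature come from git/cmd.py. With select/poll abandoned, both output streams are always drained by two daemon pump threads, and the call returns whatever the finalizer returns.

import subprocess

from git.cmd import handle_process_output

# Any line-oriented git command works here; the repository path is hypothetical.
proc = subprocess.Popen(
    ['git', 'rev-list', '--all'],
    cwd='/path/to/repo',
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

def on_stdout(line):
    # Called once per decoded stdout line, from the stdout pump thread.
    print('OUT: %s' % line.rstrip())

def on_stderr(line):
    # Called once per decoded stderr line, from the stderr pump thread.
    print('ERR: %s' % line.rstrip())

# Blocks until both pump threads have joined, then returns finalizer(proc).
status = handle_process_output(proc, on_stdout, on_stderr, finalizer=lambda p: p.wait())
print('exit status: %d' % status)

Since decode_streams defaults to True, the handlers receive text decoded with defenc; pass decode_streams=False when the process was started with universal_newlines=True or when decoding must happen later.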