@@ -58,6 +58,7 @@ def worker_title(title):
58
58
59
59
class WorkerInteractor :
60
60
SHUTDOWN_MARK = object ()
61
+ QUEUE_REPLACED_MARK = object ()
61
62
62
63
def __init__ (self , config , channel ):
63
64
self .config = config
@@ -72,6 +73,15 @@ def __init__(self, config, channel):
72
73
def _make_queue (self ):
73
74
return self .channel .gateway .execmodel .queue .Queue ()
74
75
76
+ def _get_next_item_index (self ):
77
+ """Gets the next item from test queue. Handles the case when the queue
78
+ is replaced concurrently in another thread.
79
+ """
80
+ result = self .torun .get ()
81
+ while result is self .QUEUE_REPLACED_MARK :
82
+ result = self .torun .get ()
83
+ return result
84
+
75
85
def sendevent (self , name , ** kwargs ):
76
86
self .log ("sending" , name , kwargs )
77
87
self .channel .send ((name , kwargs ))
@@ -136,19 +146,22 @@ def old_queue_get_nowait_noraise():
136
146
self .torun .put (i )
137
147
138
148
self .sendevent ("unscheduled" , indices = stolen )
149
+ old_queue .put (self .QUEUE_REPLACED_MARK )
139
150
140
151
@pytest .hookimpl
141
152
def pytest_runtestloop (self , session ):
142
153
self .log ("entering main loop" )
143
154
self .channel .setcallback (self .handle_command , endmarker = self .SHUTDOWN_MARK )
144
- self .nextitem_index = self .torun . get ()
155
+ self .nextitem_index = self ._get_next_item_index ()
145
156
while self .nextitem_index is not self .SHUTDOWN_MARK :
146
157
self .run_one_test ()
147
158
return True
148
159
149
160
def run_one_test (self ):
161
+ self .item_index = self .nextitem_index
162
+ self .nextitem_index = self ._get_next_item_index ()
163
+
150
164
items = self .session .items
151
- self .item_index , self .nextitem_index = self .nextitem_index , self .torun .get ()
152
165
item = items [self .item_index ]
153
166
if self .nextitem_index is self .SHUTDOWN_MARK :
154
167
nextitem = None
0 commit comments