Skip to content

Commit 82fefaf

Browse files
adierkingrokhinip
authored andcommitted
event: implement source muxing on Windows
Implement the cases in `_dispatch_unote_register_muxed()` and `_dispatch_unote_unregister_muxed()` for when multiple event sources are open on a handle and we need to combine them. The test suite doesn't hit these codepaths anywhere and we haven't run into issues with Foundation yet, so I added a dispatch_io_muxed test which opens multiple sources on a file/pipe/socket and checks that events fire correctly. Signed-off-by: Rokhini Prabhu <[email protected]>
1 parent 708257f commit 82fefaf

File tree

1 file changed

+82
-26
lines changed

1 file changed

+82
-26
lines changed

src/event/event_windows.c

+82-26
Original file line numberDiff line numberDiff line change
@@ -174,27 +174,62 @@ _dispatch_muxnote_create(dispatch_unote_t du,
174174
}
175175

176176
static void
177-
_dispatch_muxnote_stop(dispatch_muxnote_t dmn)
177+
_dispatch_muxnote_disarm_events(dispatch_muxnote_t dmn,
178+
enum _dispatch_muxnote_events events)
178179
{
179-
if (dmn->dmn_thread) {
180-
// Keep trying to cancel ReadFile() until the thread exits
181-
os_atomic_store(&dmn->dmn_stop, true, relaxed);
182-
SetEvent(dmn->dmn_event);
183-
do {
184-
CancelIoEx((HANDLE)dmn->dmn_ident, /* lpOverlapped */ NULL);
185-
} while (WaitForSingleObject(dmn->dmn_thread, 1) == WAIT_TIMEOUT);
186-
CloseHandle(dmn->dmn_thread);
187-
dmn->dmn_thread = NULL;
188-
}
189-
if (dmn->dmn_threadpool_wait) {
190-
SetThreadpoolWait(dmn->dmn_threadpool_wait, NULL, NULL);
191-
WaitForThreadpoolWaitCallbacks(dmn->dmn_threadpool_wait,
192-
/* fCancelPendingCallbacks */ FALSE);
193-
CloseThreadpoolWait(dmn->dmn_threadpool_wait);
194-
dmn->dmn_threadpool_wait = NULL;
195-
}
196-
if (dmn->dmn_handle_type == DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET) {
197-
WSAEventSelect((SOCKET)dmn->dmn_ident, NULL, 0);
180+
long lNetworkEvents;
181+
dmn->dmn_events &= ~events;
182+
switch (dmn->dmn_handle_type) {
183+
case DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID:
184+
DISPATCH_INTERNAL_CRASH(0, "invalid handle");
185+
186+
case DISPATCH_MUXNOTE_HANDLE_TYPE_FILE:
187+
break;
188+
189+
case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE:
190+
if ((events & DISPATCH_MUXNOTE_EVENT_READ) && dmn->dmn_thread) {
191+
// Keep trying to cancel ReadFile() until the thread exits
192+
os_atomic_store(&dmn->dmn_stop, true, relaxed);
193+
SetEvent(dmn->dmn_event);
194+
do {
195+
CancelIoEx((HANDLE)dmn->dmn_ident, /* lpOverlapped */ NULL);
196+
} while (WaitForSingleObject(dmn->dmn_thread, 1) == WAIT_TIMEOUT);
197+
CloseHandle(dmn->dmn_thread);
198+
dmn->dmn_thread = NULL;
199+
}
200+
break;
201+
202+
case DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET:
203+
lNetworkEvents = dmn->dmn_network_events;
204+
if (events & DISPATCH_MUXNOTE_EVENT_READ) {
205+
lNetworkEvents &= ~FD_READ;
206+
}
207+
if (events & DISPATCH_MUXNOTE_EVENT_WRITE) {
208+
lNetworkEvents &= ~FD_WRITE;
209+
}
210+
if (lNetworkEvents == dmn->dmn_network_events) {
211+
break;
212+
}
213+
int iResult;
214+
if (lNetworkEvents & (FD_READ | FD_WRITE)) {
215+
iResult = WSAEventSelect((SOCKET)dmn->dmn_ident,
216+
(WSAEVENT)dmn->dmn_event, lNetworkEvents);
217+
} else {
218+
lNetworkEvents = 0;
219+
iResult = WSAEventSelect((SOCKET)dmn->dmn_ident, NULL, 0);
220+
}
221+
if (iResult != 0) {
222+
DISPATCH_INTERNAL_CRASH(WSAGetLastError(), "WSAEventSelect");
223+
}
224+
dmn->dmn_network_events = lNetworkEvents;
225+
if (!lNetworkEvents && dmn->dmn_threadpool_wait) {
226+
SetThreadpoolWait(dmn->dmn_threadpool_wait, NULL, NULL);
227+
WaitForThreadpoolWaitCallbacks(dmn->dmn_threadpool_wait,
228+
/* fCancelPendingCallbacks */ FALSE);
229+
CloseThreadpoolWait(dmn->dmn_threadpool_wait);
230+
dmn->dmn_threadpool_wait = NULL;
231+
}
232+
break;
198233
}
199234
}
200235

@@ -389,8 +424,16 @@ _dispatch_io_trigger(dispatch_muxnote_t dmn)
389424
}
390425
if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_WRITE) {
391426
_dispatch_muxnote_retain(dmn);
392-
DWORD available =
427+
DWORD available;
428+
if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_READ) {
429+
// We can't query a pipe which has a read source open on it
430+
// because the ReadFile() in the background thread might cause
431+
// NtQueryInformationFile() to block
432+
available = 1;
433+
} else {
434+
available =
393435
_dispatch_pipe_write_availability((HANDLE)dmn->dmn_ident);
436+
}
394437
bSuccess = PostQueuedCompletionStatus(hPort, available,
395438
(ULONG_PTR)DISPATCH_PORT_PIPE_HANDLE_WRITE,
396439
(LPOVERLAPPED)dmn);
@@ -487,8 +530,12 @@ _dispatch_unote_register_muxed(dispatch_unote_t du)
487530
dmn = _dispatch_unote_muxnote_find(dmb, du._du->du_ident,
488531
du._du->du_filter);
489532
if (dmn) {
490-
WIN_PORT_ERROR();
491-
DISPATCH_INTERNAL_CRASH(0, "muxnote updating is not supported");
533+
if (events & ~dmn->dmn_events) {
534+
dmn->dmn_events |= events;
535+
if (_dispatch_io_trigger(dmn) == FALSE) {
536+
return false;
537+
}
538+
}
492539
} else {
493540
dmn = _dispatch_muxnote_create(du, events);
494541
if (!dmn) {
@@ -551,9 +598,18 @@ _dispatch_unote_unregister_muxed(dispatch_unote_t du)
551598
}
552599
dul->du_muxnote = NULL;
553600

554-
LIST_REMOVE(dmn, dmn_list);
555-
_dispatch_muxnote_stop(dmn);
556-
_dispatch_muxnote_release(dmn);
601+
enum _dispatch_muxnote_events disarmed = 0;
602+
if (LIST_EMPTY(&dmn->dmn_readers_head)) {
603+
disarmed |= DISPATCH_MUXNOTE_EVENT_READ;
604+
}
605+
if (LIST_EMPTY(&dmn->dmn_writers_head)) {
606+
disarmed |= DISPATCH_MUXNOTE_EVENT_WRITE;
607+
}
608+
_dispatch_muxnote_disarm_events(dmn, disarmed);
609+
if (!dmn->dmn_events) {
610+
LIST_REMOVE(dmn, dmn_list);
611+
_dispatch_muxnote_release(dmn);
612+
}
557613

558614
_dispatch_unote_state_set(du, DU_STATE_UNREGISTERED);
559615
return true;

0 commit comments

Comments
 (0)