Skip to content

Commit 4ce3b85

Browse files
authored
Merge pull request #4072 from tiif/blockop
Implement blocking unnamed_socket
2 parents 955ef09 + 1f77130 commit 4ce3b85

11 files changed

+391
-114
lines changed

src/tools/miri/src/concurrency/thread.rs

+2
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@ pub enum BlockReason {
159159
Epoll,
160160
/// Blocked on eventfd.
161161
Eventfd,
162+
/// Blocked on unnamed_socket.
163+
UnnamedSocket,
162164
}
163165

164166
/// The state of a thread.

src/tools/miri/src/shims/unix/unnamed_socket.rs

+157-80
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ struct AnonSocket {
3636
/// This flag is set to `true` if the peer's `readbuf` is non-empty at the time
3737
/// of closure.
3838
peer_lost_data: Cell<bool>,
39+
/// A list of thread ids blocked because the buffer was empty.
40+
/// Once another thread writes some bytes, these threads will be unblocked.
41+
blocked_read_tid: RefCell<Vec<ThreadId>>,
42+
/// A list of thread ids blocked because the buffer was full.
43+
/// Once another thread reads some bytes, these threads will be unblocked.
44+
blocked_write_tid: RefCell<Vec<ThreadId>>,
3945
is_nonblock: bool,
4046
}
4147

@@ -83,7 +89,7 @@ impl FileDescription for AnonSocket {
8389

8490
fn read<'tcx>(
8591
&self,
86-
_self_ref: &FileDescriptionRef,
92+
self_ref: &FileDescriptionRef,
8793
_communicate_allowed: bool,
8894
ptr: Pointer,
8995
len: usize,
@@ -100,33 +106,21 @@ impl FileDescription for AnonSocket {
100106
// corresponding ErrorKind variant.
101107
throw_unsup_format!("reading from the write end of a pipe");
102108
};
103-
if readbuf.borrow().buf.is_empty() {
104-
if self.peer_fd().upgrade().is_none() {
105-
// Socketpair with no peer and empty buffer.
106-
// 0 bytes successfully read indicates end-of-file.
107-
return ecx.return_read_success(ptr, &[], 0, dest);
108-
} else {
109-
if self.is_nonblock {
110-
// Non-blocking socketpair with writer and empty buffer.
111-
// https://linux.die.net/man/2/read
112-
// EAGAIN or EWOULDBLOCK can be returned for socket,
113-
// POSIX.1-2001 allows either error to be returned for this case.
114-
// Since there is no ErrorKind for EAGAIN, WouldBlock is used.
115-
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
116-
} else {
117-
// Blocking socketpair with writer and empty buffer.
118-
// FIXME: blocking is currently not supported
119-
throw_unsup_format!("socketpair/pipe/pipe2 read: blocking isn't supported yet");
120-
}
121-
}
109+
110+
if readbuf.borrow().buf.is_empty() && self.is_nonblock {
111+
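// Non-blocking socketpair with empty buffer. NOTE(review): the peer is not checked here, so a closed peer also yields EAGAIN rather than EOF — confirm this is intended.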
112+
// https://linux.die.net/man/2/read
113+
// EAGAIN or EWOULDBLOCK can be returned for socket,
114+
// POSIX.1-2001 allows either error to be returned for this case.
115+
// Since there is no ErrorKind for EAGAIN, WouldBlock is used.
116+
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
122117
}
123-
// TODO: We might need to decide what to do if peer_fd is closed when read is blocked.
124-
anonsocket_read(self, self.peer_fd().upgrade(), len, ptr, dest, ecx)
118+
anonsocket_read(self_ref.downgrade(), len, ptr, dest.clone(), ecx)
125119
}
126120

127121
fn write<'tcx>(
128122
&self,
129-
_self_ref: &FileDescriptionRef,
123+
self_ref: &FileDescriptionRef,
130124
_communicate_allowed: bool,
131125
ptr: Pointer,
132126
len: usize,
@@ -153,16 +147,11 @@ impl FileDescription for AnonSocket {
153147
};
154148
let available_space =
155149
MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
156-
if available_space == 0 {
157-
if self.is_nonblock {
158-
// Non-blocking socketpair with a full buffer.
159-
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
160-
} else {
161-
// Blocking socketpair with a full buffer.
162-
throw_unsup_format!("socketpair/pipe/pipe2 write: blocking isn't supported yet");
163-
}
150+
if available_space == 0 && self.is_nonblock {
151+
// Non-blocking socketpair with a full buffer.
152+
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
164153
}
165-
anonsocket_write(available_space, &peer_fd, ptr, len, dest, ecx)
154+
anonsocket_write(self_ref.downgrade(), ptr, len, dest.clone(), ecx)
166155
}
167156

168157
fn as_unix(&self) -> &dyn UnixFileDescription {
@@ -172,81 +161,161 @@ impl FileDescription for AnonSocket {
172161

173162
/// Write to AnonSocket based on the space available and return the written byte size.
174163
fn anonsocket_write<'tcx>(
175-
available_space: usize,
176-
peer_fd: &FileDescriptionRef,
164+
weak_self_ref: WeakFileDescriptionRef,
177165
ptr: Pointer,
178166
len: usize,
179-
dest: &MPlaceTy<'tcx>,
167+
dest: MPlaceTy<'tcx>,
180168
ecx: &mut MiriInterpCx<'tcx>,
181169
) -> InterpResult<'tcx> {
170+
let Some(self_ref) = weak_self_ref.upgrade() else {
171+
// FIXME: We should raise a deadlock error if the self_ref upgrade failed.
172+
throw_unsup_format!("This will be a deadlock error in future")
173+
};
174+
let self_anonsocket = self_ref.downcast::<AnonSocket>().unwrap();
175+
let Some(peer_fd) = self_anonsocket.peer_fd().upgrade() else {
176+
// If the upgrade from Weak to Rc fails, it indicates that all read ends have been
177+
// closed.
178+
return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, &dest);
179+
};
182180
let Some(writebuf) = &peer_fd.downcast::<AnonSocket>().unwrap().readbuf else {
183181
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
184182
// corresponding ErrorKind variant.
185183
throw_unsup_format!("writing to the reading end of a pipe")
186184
};
187-
let mut writebuf = writebuf.borrow_mut();
188185

189-
// Remember this clock so `read` can synchronize with us.
190-
ecx.release_clock(|clock| {
191-
writebuf.clock.join(clock);
192-
});
193-
// Do full write / partial write based on the space available.
194-
let actual_write_size = len.min(available_space);
195-
let bytes = ecx.read_bytes_ptr_strip_provenance(ptr, Size::from_bytes(len))?;
196-
writebuf.buf.extend(&bytes[..actual_write_size]);
186+
let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
187+
188+
if available_space == 0 {
189+
// Blocking socketpair with a full buffer.
190+
let dest = dest.clone();
191+
self_anonsocket.blocked_write_tid.borrow_mut().push(ecx.active_thread());
192+
ecx.block_thread(
193+
BlockReason::UnnamedSocket,
194+
None,
195+
callback!(
196+
@capture<'tcx> {
197+
weak_self_ref: WeakFileDescriptionRef,
198+
ptr: Pointer,
199+
len: usize,
200+
dest: MPlaceTy<'tcx>,
201+
}
202+
@unblock = |this| {
203+
anonsocket_write(weak_self_ref, ptr, len, dest, this)
204+
}
205+
),
206+
);
207+
} else {
208+
let mut writebuf = writebuf.borrow_mut();
209+
// Remember this clock so `read` can synchronize with us.
210+
ecx.release_clock(|clock| {
211+
writebuf.clock.join(clock);
212+
});
213+
// Do full write / partial write based on the space available.
214+
let actual_write_size = len.min(available_space);
215+
let bytes = ecx.read_bytes_ptr_strip_provenance(ptr, Size::from_bytes(len))?;
216+
writebuf.buf.extend(&bytes[..actual_write_size]);
197217

198-
// Need to stop accessing peer_fd so that it can be notified.
199-
drop(writebuf);
218+
// Need to stop accessing peer_fd so that it can be notified.
219+
drop(writebuf);
200220

201-
// Notification should be provided for peer fd as it became readable.
202-
// The kernel does this even if the fd was already readable before, so we follow suit.
203-
ecx.check_and_update_readiness(peer_fd)?;
221+
// Notification should be provided for peer fd as it became readable.
222+
// The kernel does this even if the fd was already readable before, so we follow suit.
223+
ecx.check_and_update_readiness(&peer_fd)?;
224+
let peer_anonsocket = peer_fd.downcast::<AnonSocket>().unwrap();
225+
// Unblock all threads that are currently blocked on peer_fd's read.
226+
let waiting_threads = std::mem::take(&mut *peer_anonsocket.blocked_read_tid.borrow_mut());
227+
// FIXME: We can randomize the order of unblocking.
228+
for thread_id in waiting_threads {
229+
ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
230+
}
204231

205-
ecx.return_write_success(actual_write_size, dest)
232+
return ecx.return_write_success(actual_write_size, &dest);
233+
}
234+
interp_ok(())
206235
}
207236

208237
/// Read from AnonSocket and return the number of bytes read.
209238
fn anonsocket_read<'tcx>(
210-
anonsocket: &AnonSocket,
211-
peer_fd: Option<FileDescriptionRef>,
239+
weak_self_ref: WeakFileDescriptionRef,
212240
len: usize,
213241
ptr: Pointer,
214-
dest: &MPlaceTy<'tcx>,
242+
dest: MPlaceTy<'tcx>,
215243
ecx: &mut MiriInterpCx<'tcx>,
216244
) -> InterpResult<'tcx> {
217-
let mut bytes = vec![0; len];
245+
let Some(self_ref) = weak_self_ref.upgrade() else {
246+
// FIXME: We should raise a deadlock error if the self_ref upgrade failed.
247+
throw_unsup_format!("This will be a deadlock error in future")
248+
};
249+
let self_anonsocket = self_ref.downcast::<AnonSocket>().unwrap();
218250

219-
let Some(readbuf) = &anonsocket.readbuf else {
251+
let Some(readbuf) = &self_anonsocket.readbuf else {
220252
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
221253
// corresponding ErrorKind variant.
222254
throw_unsup_format!("reading from the write end of a pipe")
223255
};
224-
let mut readbuf = readbuf.borrow_mut();
225-
226-
// Synchronize with all previous writes to this buffer.
227-
// FIXME: this over-synchronizes; a more precise approach would be to
228-
// only sync with the writes whose data we will read.
229-
ecx.acquire_clock(&readbuf.clock);
230-
231-
// Do full read / partial read based on the space available.
232-
// Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior.
233-
let actual_read_size = readbuf.buf.read(&mut bytes[..]).unwrap();
234-
235-
// Need to drop before others can access the readbuf again.
236-
drop(readbuf);
237-
238-
// A notification should be provided for the peer file description even when it can
239-
// only write 1 byte. This implementation is not compliant with the actual Linux kernel
240-
// implementation. For optimization reasons, the kernel will only mark the file description
241-
// as "writable" when it can write more than a certain number of bytes. Since we
242-
// don't know what that *certain number* is, we will provide a notification every time
243-
// a read is successful. This might result in our epoll emulation providing more
244-
// notifications than the real system.
245-
if let Some(peer_fd) = peer_fd {
246-
ecx.check_and_update_readiness(&peer_fd)?;
247-
}
248256

249-
ecx.return_read_success(ptr, &bytes, actual_read_size, dest)
257+
if readbuf.borrow_mut().buf.is_empty() {
258+
if self_anonsocket.peer_fd().upgrade().is_none() {
259+
// Socketpair with no peer and empty buffer.
260+
// 0 bytes successfully read indicates end-of-file.
261+
return ecx.return_read_success(ptr, &[], 0, &dest);
262+
} else {
263+
// Blocking socketpair with writer and empty buffer.
264+
let weak_self_ref = weak_self_ref.clone();
265+
self_anonsocket.blocked_read_tid.borrow_mut().push(ecx.active_thread());
266+
ecx.block_thread(
267+
BlockReason::UnnamedSocket,
268+
None,
269+
callback!(
270+
@capture<'tcx> {
271+
weak_self_ref: WeakFileDescriptionRef,
272+
len: usize,
273+
ptr: Pointer,
274+
dest: MPlaceTy<'tcx>,
275+
}
276+
@unblock = |this| {
277+
anonsocket_read(weak_self_ref, len, ptr, dest, this)
278+
}
279+
),
280+
);
281+
}
282+
} else {
283+
let mut bytes = vec![0; len];
284+
let mut readbuf = readbuf.borrow_mut();
285+
// Synchronize with all previous writes to this buffer.
286+
// FIXME: this over-synchronizes; a more precise approach would be to
287+
// only sync with the writes whose data we will read.
288+
ecx.acquire_clock(&readbuf.clock);
289+
290+
// Do full read / partial read based on the space available.
291+
// Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior.
292+
let actual_read_size = readbuf.buf.read(&mut bytes[..]).unwrap();
293+
294+
// Need to drop before others can access the readbuf again.
295+
drop(readbuf);
296+
297+
// A notification should be provided for the peer file description even when it can
298+
// only write 1 byte. This implementation is not compliant with the actual Linux kernel
299+
// implementation. For optimization reasons, the kernel will only mark the file description
300+
// as "writable" when it can write more than a certain number of bytes. Since we
301+
// don't know what that *certain number* is, we will provide a notification every time
302+
// a read is successful. This might result in our epoll emulation providing more
303+
// notifications than the real system.
304+
if let Some(peer_fd) = self_anonsocket.peer_fd().upgrade() {
305+
ecx.check_and_update_readiness(&peer_fd)?;
306+
let peer_anonsocket = peer_fd.downcast::<AnonSocket>().unwrap();
307+
// Unblock all threads that are currently blocked on peer_fd's write.
308+
let waiting_threads =
309+
std::mem::take(&mut *peer_anonsocket.blocked_write_tid.borrow_mut());
310+
// FIXME: We can randomize the order of unblocking.
311+
for thread_id in waiting_threads {
312+
ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
313+
}
314+
};
315+
316+
return ecx.return_read_success(ptr, &bytes, actual_read_size, &dest);
317+
}
318+
interp_ok(())
250319
}
251320

252321
impl UnixFileDescription for AnonSocket {
@@ -360,12 +429,16 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
360429
readbuf: Some(RefCell::new(Buffer::new())),
361430
peer_fd: OnceCell::new(),
362431
peer_lost_data: Cell::new(false),
432+
blocked_read_tid: RefCell::new(Vec::new()),
433+
blocked_write_tid: RefCell::new(Vec::new()),
363434
is_nonblock: is_sock_nonblock,
364435
});
365436
let fd1 = fds.new_ref(AnonSocket {
366437
readbuf: Some(RefCell::new(Buffer::new())),
367438
peer_fd: OnceCell::new(),
368439
peer_lost_data: Cell::new(false),
440+
blocked_read_tid: RefCell::new(Vec::new()),
441+
blocked_write_tid: RefCell::new(Vec::new()),
369442
is_nonblock: is_sock_nonblock,
370443
});
371444

@@ -424,12 +497,16 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
424497
readbuf: Some(RefCell::new(Buffer::new())),
425498
peer_fd: OnceCell::new(),
426499
peer_lost_data: Cell::new(false),
500+
blocked_read_tid: RefCell::new(Vec::new()),
501+
blocked_write_tid: RefCell::new(Vec::new()),
427502
is_nonblock,
428503
});
429504
let fd1 = fds.new_ref(AnonSocket {
430505
readbuf: None,
431506
peer_fd: OnceCell::new(),
432507
peer_lost_data: Cell::new(false),
508+
blocked_read_tid: RefCell::new(Vec::new()),
509+
blocked_write_tid: RefCell::new(Vec::new()),
433510
is_nonblock,
434511
});
435512

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
//@ignore-target: windows # No libc socketpair on Windows
2+
//~^ERROR: deadlocked
3+
//~^^ERROR: deadlocked
4+
// This test depends on a deterministic schedule.
5+
//@compile-flags: -Zmiri-preemption-rate=0
6+
//@error-in-other-file: deadlock
7+
8+
use std::thread;
9+
10+
// Test the behaviour of a thread that blocks on read, gets unblocked, and then blocks again.
11+
12+
// The expected execution is
13+
// 1. Thread 1 blocks.
14+
// 2. Thread 2 blocks.
15+
// 3. Thread 3 unblocks both thread 1 and thread 2.
16+
// 4. Thread 1 reads.
17+
// 5. Thread 2's `read` can never complete -> deadlocked.
18+
19+
fn main() {
20+
let mut fds = [-1, -1];
21+
let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
22+
assert_eq!(res, 0);
23+
let thread1 = thread::spawn(move || {
24+
// Let this thread block on read.
25+
let mut buf: [u8; 3] = [0; 3];
26+
let res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) };
27+
assert_eq!(res, 3);
28+
assert_eq!(&buf, "abc".as_bytes());
29+
});
30+
let thread2 = thread::spawn(move || {
31+
// Let this thread block on read.
32+
let mut buf: [u8; 3] = [0; 3];
33+
let res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) };
34+
//~^ERROR: deadlocked
35+
assert_eq!(res, 3);
36+
assert_eq!(&buf, "abc".as_bytes());
37+
});
38+
let thread3 = thread::spawn(move || {
39+
// Write some bytes; this wakes up every thread blocked on read (thread1 and thread2).
40+
let data = "abc".as_bytes().as_ptr();
41+
let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) };
42+
assert_eq!(res, 3);
43+
});
44+
thread1.join().unwrap();
45+
thread2.join().unwrap();
46+
thread3.join().unwrap();
47+
}

0 commit comments

Comments
 (0)