Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 27 additions & 56 deletions src/uucore/src/lib/features/pipes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,6 @@ pub fn drain_pipe(pipe: &PipeReader, dest: &impl AsFd, len: usize) -> PipeRes {
Ok(Ok(()))
}

/// check that source is FUSE
/// we fallback to read() at FUSE <https://github.com/uutils/coreutils/issues/9609>
#[inline]
pub fn might_fuse(source: &impl AsFd) -> bool {
rustix::fs::fstatfs(source).map_or(true, |stats| stats.f_type == 0x6573_5546) // FUSE magic number, too many platform specific clippy warning with const
}

/// force-splice source to dest even both of them are not pipe via broker pipe
///
/// throughput is better than direct splice for the case one of in/output is pipe by unknown reason
Expand Down Expand Up @@ -125,59 +118,37 @@ pub fn send_n_bytes(input: impl AsFd, target: impl AsFd, n: u64) -> std::io::Res
}
let mut n = n;
let mut bytes_written: u64 = 0;
let succeed_or_fuse = loop {
if n == 0 {
// avoid unnecessary syscall
return Ok(bytes_written);
}
match splice(&input, &target, n as usize) {
Ok(0) => break true,
Ok(s) => {
n -= s as u64;
bytes_written += s as u64;
if let Some((broker_r, broker_w)) = PIPE_CACHE
.get_or_init(|| {
// use std::io::pipe to avoid unnecessary fcntl
let pair = std::io::pipe().ok()?;
if pipe_size > KERNEL_DEFAULT_PIPE_SIZE {
let _ = fcntl_setpipe_size(&pair.0, pipe_size);
}
_ => break false, // input or output is not pipe
}
};
let succeed_or_fuse = succeed_or_fuse
|| if let Some((broker_r, broker_w)) = PIPE_CACHE
.get_or_init(|| {
// use std::io::pipe to avoid unnecessary fcntl
let pair = std::io::pipe().ok()?;
if pipe_size > KERNEL_DEFAULT_PIPE_SIZE {
let _ = fcntl_setpipe_size(&pair.0, pipe_size);
}
Some(pair)
})
.as_ref()
{
// todo: create fn splice_bounded_broker
loop {
if n == 0 {
return Ok(bytes_written);
}
match splice(&input, &broker_w, n as usize) {
Ok(0) => break true,
Ok(s) => {
n -= s as u64;
bytes_written += s as u64;
if drain_pipe(broker_r, &target, s)?.is_err() {
break false;
}
Some(pair)
})
.as_ref()
{
// todo: create fn splice_bounded_broker
loop {
if n == 0 {
return Ok(bytes_written);
}
match splice(&input, &broker_w, n as usize) {
Ok(0) => return Ok(bytes_written),
Ok(s) => {
n -= s as u64;
bytes_written += s as u64;
if drain_pipe(broker_r, &target, s)?.is_err() {
break;
}
_ => break false,
}
_ => break,
}
} else {
false
};
// do not always fallback to write for fuse, or 2 Ctrl+D is required to exit on tty
// todo: move fuse patch to callers
if !succeed_or_fuse || might_fuse(&input) {
// remove buffering from this fallback by RawReader, or order of output would be wrong with multiple input
bytes_written += std::io::copy(&mut RawReader(input).take(n), &mut RawWriter(target))?;
}

}
};
// fallback. remove buffering from this fallback by RawReader, or order of output would be wrong with multiple input
bytes_written += std::io::copy(&mut RawReader(input).take(n), &mut RawWriter(target))?;
Ok(bytes_written)
}

Expand Down
Loading