Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 45 additions & 37 deletions src/jobrunner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,24 +88,35 @@ impl JobRunner {
let mut check_queue = false;
// A scratch buffer used to read from files.
let mut buf = Box::new([0; READBUF]);
// The earliest finish_by time of any running process (i.e. the process that will time out
// the soonest).
let mut next_finish_by: Option<Instant> = None;
loop {
assert!(!tmp_failure || check_queue);
// If there are jobs on the queue we haven't been able to run for temporary reasons,
// then wait a short amount of time and try again.
let mut timeout = if tmp_failure { WAIT_TIMEOUT * 1000 } else { -1 };
// If a running process will reach its timeout sooner than the timeout chosen above, only
// wait until the moment we need to send that process SIGTERM.
if let Some(fby) = next_finish_by {
let fby_timeout = fby.saturating_duration_since(Instant::now());
if timeout == -1
|| fby_timeout < Duration::from_millis(timeout.try_into().unwrap_or(0))
{
timeout = fby_timeout.as_millis().try_into().unwrap_or(c_int::MAX);
let timeout = if tmp_failure {
// If there are jobs on the queue we haven't been able to run for temporary
// reasons, then wait a short amount of time and try again.
WAIT_TIMEOUT * 1000
} else {
// The earliest finish_by time of any running process (i.e. the process that will time out
// the soonest).
let mut next_finish_by = None;
for i in 0..self.running.len() {
if let Some(Job { finish_by, .. }) = self.running[i] {
if next_finish_by.is_none() || Some(finish_by) < next_finish_by {
next_finish_by = Some(finish_by);
}
}
}
}

if let Some(x) = next_finish_by {
x.saturating_duration_since(Instant::now())
.as_millis()
.try_into()
.unwrap_or(c_int::MAX)
} else {
// No running jobs. Wait indefinitely until someone tells us to do something.
-1
}
};

poll(&mut self.pollfds, timeout).ok();

self.check_for_sighup();
Expand Down Expand Up @@ -174,11 +185,27 @@ impl JobRunner {
}
}

// Has the HTTP server told us that we should check for new jobs, and/or has SIGCHLD/SIGHUP
// been received?
match self.pollfds[self.maxjobs * 2].revents() {
Some(flags) if flags == PollFlags::POLLIN => {
check_queue = true;
// It's fine for us to drain the event pipe completely: we'll process all the
// events it contains below.
loop {
match nix::unistd::read(self.snare.event_read_fd, &mut *buf) {
Ok(0) | Err(_) => break,
Ok(_) => (),
}
}
}
_ => (),
}

// Iterate over the running jobs and:
// * If any jobs have exceeded their timeout, send them SIGTERM.
// * If there are jobs whose stderr/stdout have closed, keep waiting on them until
// they exit.
next_finish_by = None;
for i in 0..self.running.len() {
if let Some(Job {
finish_by,
Expand All @@ -188,8 +215,6 @@ impl JobRunner {
{
if finish_by <= Instant::now() {
kill(Pid::from_raw(child.id() as i32), Signal::SIGTERM).ok();
} else if next_finish_by.is_none() || Some(finish_by) < next_finish_by {
next_finish_by = Some(finish_by);
}
}

Expand Down Expand Up @@ -254,35 +279,18 @@ impl JobRunner {
}
}

// Has the HTTP server told us that we should check for new jobs, and/or has SIGCHLD/SIGHUP
// been received?
match self.pollfds[self.maxjobs * 2].revents() {
Some(flags) if flags == PollFlags::POLLIN => {
check_queue = true;
// It's fine for us to drain the event pipe completely: we'll process all the
// events it contains.
loop {
match nix::unistd::read(self.snare.event_read_fd, &mut *buf) {
Ok(0) | Err(_) => break,
Ok(_) => (),
}
}
}
_ => (),
}

// Should we check the queue? This could be because we were previously unable to empty
// it fully, or because the HTTP server has told us that there might be new jobs.
// However, it's only worth us checking the queue (which requires a lock) if there's
// space for us to run further jobs.
if check_queue && self.num_running < self.maxjobs {
match self.try_pop_queue() {
(false, false) => {
check_queue = false;
check_queue = true;
tmp_failure = false;
}
(true, false) => {
check_queue = true;
check_queue = false;
tmp_failure = false;
}
(true, true) => {
Expand Down