From 2ae78d92c1bd204b05b791c08023f37698967e0a Mon Sep 17 00:00:00 2001 From: Laurence Tratt Date: Fri, 13 Jun 2025 23:16:29 +0100 Subject: [PATCH] Simplify timeout checks and remove a race condition. This commit simplifies how we calculate when `poll` should return. Whilst doing this I realised that the HTTP server check is in the wrong place and is subject to a race condition: we could empty the queue, and then get stuck not processing anything for an arbitrary period of time. --- src/jobrunner.rs | 82 ++++++++++++++++++++++++++---------------------- 1 file changed, 45 insertions(+), 37 deletions(-) diff --git a/src/jobrunner.rs b/src/jobrunner.rs index c066bd2..6607dfb 100644 --- a/src/jobrunner.rs +++ b/src/jobrunner.rs @@ -88,24 +88,35 @@ impl JobRunner { let mut check_queue = false; // A scratch buffer used to read from files. let mut buf = Box::new([0; READBUF]); - // The earliest finish_by time of any running process (i.e. the process that will timeout - // the soonest). - let mut next_finish_by: Option<Instant> = None; loop { assert!(!tmp_failure || check_queue); - // If there are jobs on the queue we haven't been able to run for temporary reasons, - // then wait a short amount of time and try again. - let mut timeout = if tmp_failure { WAIT_TIMEOUT * 1000 } else { -1 }; - // If any processes will exceed their timeout then, if that's shorter than the above - // timeout, only wait for enough time to pass before we need to send them SIGTERM. - if let Some(fby) = next_finish_by { - let fby_timeout = fby.saturating_duration_since(Instant::now()); - if timeout == -1 - || fby_timeout < Duration::from_millis(timeout.try_into().unwrap_or(0)) - { - timeout = fby_timeout.as_millis().try_into().unwrap_or(c_int::MAX); + let timeout = if tmp_failure { + // If there are jobs on the queue we haven't been able to run for temporary + // reasons, then wait a short amount of time and try again. 
+ WAIT_TIMEOUT * 1000 + } else { + // The earliest finish_by time of any running process (i.e. the process that will timeout + // the soonest). + let mut next_finish_by = None; + for i in 0..self.running.len() { + if let Some(Job { finish_by, .. }) = self.running[i] { + if next_finish_by.is_none() || Some(finish_by) < next_finish_by { + next_finish_by = Some(finish_by); + } + } } - } + + if let Some(x) = next_finish_by { + x.saturating_duration_since(Instant::now()) + .as_millis() + .try_into() + .unwrap_or(c_int::MAX) + } else { + // No running jobs. Wait indefinitely until someone tells us to do something. + -1 + } + }; + poll(&mut self.pollfds, timeout).ok(); self.check_for_sighup(); @@ -174,11 +185,27 @@ impl JobRunner { } } + // Has the HTTP server told us that we should check for new jobs and/or SIGCHLD/SIGHUP + // has been received? + match self.pollfds[self.maxjobs * 2].revents() { + Some(flags) if flags == PollFlags::POLLIN => { + check_queue = true; + // It's fine for us to drain the event pipe completely: we'll process all the + // events it contains below. + loop { + match nix::unistd::read(self.snare.event_read_fd, &mut *buf) { + Ok(0) | Err(_) => break, + Ok(_) => (), + } + } + } + _ => (), + } + // Iterate over the running jobs and: // * If any jobs have exceeded their timeout, send them SIGTERM. // * If there are jobs whose stderr/stdout have closed, keep waiting on them until // they exit. - next_finish_by = None; for i in 0..self.running.len() { if let Some(Job { finish_by, @@ -188,8 +215,6 @@ impl JobRunner { { if finish_by <= Instant::now() { kill(Pid::from_raw(child.id() as i32), Signal::SIGTERM).ok(); - } else if next_finish_by.is_none() || Some(finish_by) < next_finish_by { - next_finish_by = Some(finish_by); } } @@ -254,23 +279,6 @@ impl JobRunner { } } - // Has the HTTP server told us that we should check for new jobs and/or SIGCHLD/SIGHUP - // has been received? 
-            match self.pollfds[self.maxjobs * 2].revents() {
-                Some(flags) if flags == PollFlags::POLLIN => {
-                    check_queue = true;
-                    // It's fine for us to drain the event pipe completely: we'll process all the
-                    // events it contains.
-                    loop {
-                        match nix::unistd::read(self.snare.event_read_fd, &mut *buf) {
-                            Ok(0) | Err(_) => break,
-                            Ok(_) => (),
-                        }
-                    }
-                }
-                _ => (),
-            }
-
             // Should we check the queue? This could be because we were previously unable to empty
             // it fully, or because the HTTP server has told us that there might be new jobs.
             // However, it's only worth us checking the queue (which requires a lock) if there's
@@ -278,11 +286,16 @@ impl JobRunner {
             if check_queue && self.num_running < self.maxjobs {
                 match self.try_pop_queue() {
                     (false, false) => {
+                        // NOTE(review): flag meanings inferred from usage; confirm against
+                        // `try_pop_queue`. Nothing is left to retry, so stop checking the queue
+                        // until an event-pipe wakeup sets `check_queue` again.
                         check_queue = false;
                         tmp_failure = false;
                     }
                     (true, false) => {
+                        // Presumably jobs remain, but not because of a temporary failure: keep
+                        // `check_queue` set so they are retried once `num_running < maxjobs`.
                         check_queue = true;
                         tmp_failure = false;
                     }
                     (true, true) => {