From f5af4fbed905e34838b10ffa6128042b3a7a00ae Mon Sep 17 00:00:00 2001
From: Laurence Tratt
Date: Tue, 20 May 2025 08:33:07 +0100
Subject: [PATCH 1/2] Simplify.

---
 src/queue.rs | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/src/queue.rs b/src/queue.rs
index 6b6206a..6812dfd 100644
--- a/src/queue.rs
+++ b/src/queue.rs
@@ -48,12 +48,7 @@ impl Queue {
 
     /// Are there any jobs in the queue?
     pub fn is_empty(&self) -> bool {
-        for v in self.q.values() {
-            if !v.is_empty() {
-                return false;
-            }
-        }
-        true
+        self.q.values().all(|x| x.is_empty())
     }
 
     /// Push a new request to the back of the queue.
From 31c5910bae4c455a971c738b26d6df62656e07e3 Mon Sep 17 00:00:00 2001
From: Laurence Tratt
Date: Tue, 20 May 2025 12:17:03 +0100
Subject: [PATCH 2/2] Split "check queue" and "temporary failure" into two.

Previously "there is stuff in the queue" caused us to initiate
"continually wake up if there's a temporary failure".
---
 src/jobrunner.rs | 41 +++++++++++++++++++++++++++++------------
 1 file changed, 29 insertions(+), 12 deletions(-)

diff --git a/src/jobrunner.rs b/src/jobrunner.rs
index 67e0a0d..c066bd2 100644
--- a/src/jobrunner.rs
+++ b/src/jobrunner.rs
@@ -81,9 +81,10 @@ impl JobRunner {
     /// Listen for new jobs on the queue and then run them.
     fn attend(&mut self) {
         self.update_pollfds();
-        // `check_queue` serves two subtly different purposes:
-        //   * Has the event pipe told us there are new jobs in the queue?
-        //   * Are there jobs in the queue from a previous round that we couldn't run yet?
+        // Have we encountered a temporary failure when trying to run a job? If so, we'll need to
+        // retry again after a timeout. Note: if this is true then `check_queue` will also be true.
+        let mut tmp_failure = false;
+        // Do we need to check the queue?
         let mut check_queue = false;
         // A scratch buffer used to read from files.
         let mut buf = Box::new([0; READBUF]);
@@ -91,9 +92,10 @@ impl JobRunner {
         // the soonest).
         let mut next_finish_by: Option = None;
         loop {
+            assert!(!tmp_failure || check_queue);
             // If there are jobs on the queue we haven't been able to run for temporary reasons,
             // then wait a short amount of time and try again.
-            let mut timeout = if check_queue { WAIT_TIMEOUT * 1000 } else { -1 };
+            let mut timeout = if tmp_failure { WAIT_TIMEOUT * 1000 } else { -1 };
             // If any processes will exceed their timeout then, if that's shorter than the above
             // timeout, only wait for enough time to pass before we need to send them SIGTERM.
             if let Some(fby) = next_finish_by {
@@ -247,6 +249,7 @@ impl JobRunner {
                             self.running[i] = None;
                             self.num_running -= 1;
                             self.update_pollfds();
+                            check_queue = true;
                         }
                     }
                 }
@@ -273,19 +276,33 @@ impl JobRunner {
             // However, it's only worth us checking the queue (which requires a lock) if there's
             // space for us to run further jobs.
             if check_queue && self.num_running < self.maxjobs {
-                check_queue = !self.try_pop_queue();
+                match self.try_pop_queue() {
+                    (false, false) => {
+                        check_queue = false;
+                        tmp_failure = false;
+                    }
+                    (true, false) => {
+                        check_queue = true;
+                        tmp_failure = false;
+                    }
+                    (true, true) => {
+                        check_queue = true;
+                        tmp_failure = true;
+                    }
+                    (false, true) => unreachable!(),
+                }
             }
         }
     }
 
-    /// Try to pop all jobs on the queue: returns `true` if it was able to do so successfully or
-    /// `false` otherwise.
-    fn try_pop_queue(&mut self) -> bool {
+    /// Try to pop all jobs on the queue: returns `(queue_is_non_empty,
+    /// encountered_temporary_failure)`.
+    fn try_pop_queue(&mut self) -> (bool, bool) {
         let snare = Arc::clone(&self.snare);
         let mut queue = snare.queue.lock().unwrap();
         loop {
-            if self.num_running == self.maxjobs && !queue.is_empty() {
-                return false;
+            if self.num_running == self.maxjobs {
+                return (!queue.is_empty(), false);
             }
             let pjob = queue.pop(|repo_id| {
                 self.running.iter().any(|jobslot| {
@@ -311,7 +328,7 @@ impl JobRunner {
                         Err(Some(qj)) => {
                             // The job couldn't be run for temporary reasons: we'll retry later.
                             queue.push_front(qj);
-                            return false;
+                            return (true, true);
                         }
                         Err(None) => {
                             // The job couldn't be run for permanent reasons: it has been consumed
@@ -326,7 +343,7 @@ impl JobRunner {
                     // We weren't able to pop any jobs from the queue, but that doesn't mean that
                     // the queue is necessarily empty: there may be `QueueKind::Sequential` jobs in
                     // it which can't be popped until others with the same path have completed.
-                    return queue.is_empty();
+                    return (!queue.is_empty(), false);
                 }
             }
         }