diff --git a/src/jobrunner.rs b/src/jobrunner.rs index 67e0a0d..c066bd2 100644 --- a/src/jobrunner.rs +++ b/src/jobrunner.rs @@ -81,9 +81,10 @@ impl JobRunner { /// Listen for new jobs on the queue and then run them. fn attend(&mut self) { self.update_pollfds(); - // `check_queue` serves two subtly different purposes: - // * Has the event pipe told us there are new jobs in the queue? - // * Are there jobs in the queue from a previous round that we couldn't run yet? + // Have we encountered a temporary failure when trying to run a job? If so, we'll need to + // retry again after a timeout. Note: if this is true then `check_queue` will also be true. + let mut tmp_failure = false; + // Do we need to check the queue? let mut check_queue = false; // A scratch buffer used to read from files. let mut buf = Box::new([0; READBUF]); @@ -91,9 +92,10 @@ impl JobRunner { // the soonest). let mut next_finish_by: Option = None; loop { + assert!(!tmp_failure || check_queue); // If there are jobs on the queue we haven't been able to run for temporary reasons, // then wait a short amount of time and try again. - let mut timeout = if check_queue { WAIT_TIMEOUT * 1000 } else { -1 }; + let mut timeout = if tmp_failure { WAIT_TIMEOUT * 1000 } else { -1 }; // If any processes will exceed their timeout then, if that's shorter than the above // timeout, only wait for enough time to pass before we need to send them SIGTERM. if let Some(fby) = next_finish_by { @@ -247,6 +249,7 @@ impl JobRunner { self.running[i] = None; self.num_running -= 1; self.update_pollfds(); + check_queue = true; } } } @@ -273,19 +276,33 @@ impl JobRunner { // However, it's only worth us checking the queue (which requires a lock) if there's // space for us to run further jobs. if check_queue && self.num_running < self.maxjobs { - check_queue = !self.try_pop_queue(); + match self.try_pop_queue() { + (false, false) => { + check_queue = false; + tmp_failure = false; + } + (true, false) => { + check_queue = true; + tmp_failure = false; + } + (true, true) => { + check_queue = true; + tmp_failure = true; + } + (false, true) => unreachable!(), + } } } } - /// Try to pop all jobs on the queue: returns `true` if it was able to do so successfully or - /// `false` otherwise. - fn try_pop_queue(&mut self) -> bool { + /// Try to pop all jobs on the queue: returns `(queue_is_empty, + /// encountered_temporary_failure)`. + fn try_pop_queue(&mut self) -> (bool, bool) { let snare = Arc::clone(&self.snare); let mut queue = snare.queue.lock().unwrap(); loop { - if self.num_running == self.maxjobs && !queue.is_empty() { - return false; + if self.num_running == self.maxjobs { + return (queue.is_empty(), false); } let pjob = queue.pop(|repo_id| { self.running.iter().any(|jobslot| { @@ -311,7 +328,7 @@ impl JobRunner { Err(Some(qj)) => { // The job couldn't be run for temporary reasons: we'll retry later. queue.push_front(qj); - return false; + return (true, true); } Err(None) => { // The job couldn't be run for permanent reasons: it has been consumed @@ -326,7 +343,7 @@ impl JobRunner { // We weren't able to pop any jobs from the queue, but that doesn't mean that // the queue is necessarily empty: there may be `QueueKind::Sequential` jobs in // it which can't be popped until others with the same path have completed. - return queue.is_empty(); + return (queue.is_empty(), false); } } } diff --git a/src/queue.rs b/src/queue.rs index 6b6206a..6812dfd 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -48,12 +48,7 @@ impl Queue { /// Are there any jobs in the queue? pub fn is_empty(&self) -> bool { - for v in self.q.values() { - if !v.is_empty() { - return false; - } - } - true + self.q.values().all(|x| x.is_empty()) } /// Push a new request to the back of the queue.