Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 29 additions & 12 deletions src/jobrunner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,21 @@ impl JobRunner {
/// Listen for new jobs on the queue and then run them.
fn attend(&mut self) {
self.update_pollfds();
// `check_queue` serves two subtly different purposes:
// * Has the event pipe told us there are new jobs in the queue?
// * Are there jobs in the queue from a previous round that we couldn't run yet?
// Have we encountered a temporary failure when trying to run a job? If so, we'll need to
// retry again after a timeout. Note: if this is true then `check_queue` will also be true.
let mut tmp_failure = false;
// Do we need to check the queue?
let mut check_queue = false;
// A scratch buffer used to read from files.
let mut buf = Box::new([0; READBUF]);
// The earliest finish_by time of any running process (i.e. the process that will timeout
// the soonest).
let mut next_finish_by: Option<Instant> = None;
loop {
assert!(!tmp_failure || check_queue);
// If there are jobs on the queue we haven't been able to run for temporary reasons,
// then wait a short amount of time and try again.
let mut timeout = if check_queue { WAIT_TIMEOUT * 1000 } else { -1 };
let mut timeout = if tmp_failure { WAIT_TIMEOUT * 1000 } else { -1 };
// If any processes will exceed their timeout then, if that's shorter than the above
// timeout, only wait for enough time to pass before we need to send them SIGTERM.
if let Some(fby) = next_finish_by {
Expand Down Expand Up @@ -247,6 +249,7 @@ impl JobRunner {
self.running[i] = None;
self.num_running -= 1;
self.update_pollfds();
check_queue = true;
}
}
}
Expand All @@ -273,19 +276,33 @@ impl JobRunner {
// However, it's only worth us checking the queue (which requires a lock) if there's
// space for us to run further jobs.
if check_queue && self.num_running < self.maxjobs {
check_queue = !self.try_pop_queue();
match self.try_pop_queue() {
(false, false) => {
check_queue = false;
tmp_failure = false;
}
(true, false) => {
check_queue = true;
tmp_failure = false;
}
(true, true) => {
check_queue = true;
tmp_failure = true;
}
(false, true) => unreachable!(),
}
}
}
}

/// Try to pop all jobs on the queue: returns `true` if it was able to do so successfully or
/// `false` otherwise.
fn try_pop_queue(&mut self) -> bool {
/// Try to pop all jobs on the queue: returns `(queue_is_empty,
/// encountered_temporary_failure)`.
fn try_pop_queue(&mut self) -> (bool, bool) {
let snare = Arc::clone(&self.snare);
let mut queue = snare.queue.lock().unwrap();
loop {
if self.num_running == self.maxjobs && !queue.is_empty() {
return false;
if self.num_running == self.maxjobs {
return (queue.is_empty(), false);
}
let pjob = queue.pop(|repo_id| {
self.running.iter().any(|jobslot| {
Expand All @@ -311,7 +328,7 @@ impl JobRunner {
Err(Some(qj)) => {
// The job couldn't be run for temporary reasons: we'll retry later.
queue.push_front(qj);
return false;
return (true, true);
}
Err(None) => {
// The job couldn't be run for permanent reasons: it has been consumed
Expand All @@ -326,7 +343,7 @@ impl JobRunner {
// We weren't able to pop any jobs from the queue, but that doesn't mean that
// the queue is necessarily empty: there may be `QueueKind::Sequential` jobs in
// it which can't be popped until others with the same path have completed.
return queue.is_empty();
return (queue.is_empty(), false);
}
}
}
Expand Down
7 changes: 1 addition & 6 deletions src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,7 @@ impl Queue {

/// Are there any jobs in the queue?
pub fn is_empty(&self) -> bool {
for v in self.q.values() {
if !v.is_empty() {
return false;
}
}
true
self.q.values().all(|x| x.is_empty())
}

/// Push a new request to the back of the queue.
Expand Down