/* $NetBSD: qmgr_job.c,v 1.2 2017/02/14 01:16:47 christos Exp $ */
/*++
/* NAME
/* qmgr_job 3
/* SUMMARY
/* per-transport jobs
/* SYNOPSIS
/* #include "qmgr.h"
/*
/* QMGR_JOB *qmgr_job_obtain(message, transport)
/* QMGR_MESSAGE *message;
/* QMGR_TRANSPORT *transport;
/*
/* void qmgr_job_free(job)
/* QMGR_JOB *job;
/*
/* void qmgr_job_move_limits(job)
/* QMGR_JOB *job;
/*
/* QMGR_ENTRY *qmgr_job_entry_select(transport)
/* QMGR_TRANSPORT *transport;
/*
/* void qmgr_job_blocker_update(queue)
/* QMGR_QUEUE *queue;
/* DESCRIPTION
/* These routines add/delete/manipulate per-transport jobs.
/* Each job corresponds to a specific transport and message.
/* Each job has a peer list containing all pending delivery
/* requests for that message.
/*
/* qmgr_job_obtain() finds an existing job for named message and
/* transport combination. New empty job is created if no existing can
/* be found. In either case, the job is prepared for assignment of
/* (more) message recipients.
/*
/* qmgr_job_free() disposes of a per-transport job after all
/* its entries have been taken care of. It is an error to dispose
/* of a job that is still in use.
/*
/* qmgr_job_entry_select() attempts to find the next entry suitable
/* for delivery. The job preempting algorithm is also exercised.
/* If necessary, an attempt to read more recipients into core is made.
/* This can result in creation of more job, queue and entry structures.
/*
/* qmgr_job_blocker_update() updates the status of blocked
/* jobs after a decrease in the queue's concurrency level,
/* after the queue is throttled, or after the queue is resumed
/* from suspension.
/*
/* qmgr_job_move_limits() takes care of proper distribution of the
/* per-transport recipients limit among the per-transport jobs.
/* Should be called whenever a job's recipient slot becomes available.
/* DIAGNOSTICS
/* Panic: consistency check failure.
/* LICENSE
/* .ad
/* .fi
/* The Secure Mailer license must be distributed with this software.
/* AUTHOR(S)
/* Patrik Rak
/* patrik@raxoft.cz
/*--*/
/* System library. */
#include <sys_defs.h>
/* Utility library. */
#include <msg.h>
#include <htable.h>
#include <mymalloc.h>
#include <sane_time.h>
/* Application-specific. */
#include "qmgr.h"
/* Forward declarations */
static void qmgr_job_pop(QMGR_JOB *);
/* Helper macros */
#define HAS_ENTRIES(job) ((job)->selected_entries < (job)->read_entries)
/*
* The MIN_ENTRIES macro may underestimate a lot but we can't use message->rcpt_unread
* because we don't know if all those unread recipients go to our transport yet.
*/
#define MIN_ENTRIES(job) ((job)->read_entries)
#define MAX_ENTRIES(job) ((job)->read_entries + (job)->message->rcpt_unread)
#define RESET_CANDIDATE_CACHE(transport) ((transport)->candidate_cache_current = 0)
#define IS_BLOCKER(job,transport) ((job)->blocker_tag == (transport)->blocker_tag)
/* qmgr_job_create - create and initialize message job structure */
static QMGR_JOB *qmgr_job_create(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{
QMGR_JOB *job;
job = (QMGR_JOB *) mymalloc(sizeof(QMGR_JOB));
job->message = message;
QMGR_LIST_APPEND(message->job_list, job, message_peers);
htable_enter(transport->job_byname, message->queue_id, (void *) job);
job->transport = transport;
QMGR_LIST_INIT(job->transport_peers);
QMGR_LIST_INIT(job->time_peers);
job->stack_parent = 0;
QMGR_LIST_INIT(job->stack_children);
QMGR_LIST_INIT(job->stack_siblings);
job->stack_level = -1;
job->blocker_tag = 0;
job->peer_byname = htable_create(0);
QMGR_LIST_INIT(job->peer_list);
job->slots_used = 0;
job->slots_available = 0;
job->selected_entries = 0;
job->read_entries = 0;
job->rcpt_count = 0;
job->rcpt_limit = 0;
return (job);
}
/* qmgr_job_link - append the job to the job lists based on the time it was queued */
static void qmgr_job_link(QMGR_JOB *job)
{
QMGR_TRANSPORT *transport = job->transport;
QMGR_MESSAGE *message = job->message;
QMGR_JOB *prev, *next, *list_prev, *list_next, *unread, *current;
int delay;
/*
* Sanity checks.
*/
if (job->stack_level >= 0)
msg_panic("qmgr_job_link: already on the job lists (%d)", job->stack_level);
/*
* Traverse the time list and the scheduler list from the end and stop
* when we found job older than the one being linked.
*
* During the traversals keep track if we have come across either the
* current job or the first unread job on the job list. If this is the
* case, these pointers will be adjusted below as required.
*
* Although both lists are exactly the same when only jobs on the stack
* level zero are considered, it's easier to traverse them separately.
* Otherwise it's impossible to keep track of the current job pointer
* effectively.
*
* This may look inefficient but under normal operation it is expected that
* the loops will stop right away, resulting in normal list appends
* below. However, this code is necessary for reviving retired jobs and
* for jobs which are created long after the first chunk of recipients
* was read in-core (either of these can happen only for multi-transport
* messages).
*
* XXX Note that we test stack_parent rather than stack_level below. This
* subtle difference allows us to enqueue the job in correct time order
* with respect to orphaned children even after their original parent on
* level zero is gone. Consequently, the early loop stop in candidate
* selection works reliably, too. These are the reasons why we care to
* bother with children adoption at all.
*/
current = transport->job_current;
for (next = 0, prev = transport->job_list.prev; prev;
next = prev, prev = prev->transport_peers.prev) {
if (prev->stack_parent == 0) {
delay = message->queued_time - prev->message->queued_time;
if (delay >= 0)
break;
}
if (current == prev)
current = 0;
}
list_prev = prev;
list_next = next;
unread = transport->job_next_unread;
for (next = 0, prev = transport->job_bytime.prev; prev;
next = prev, prev = prev->time_peers.prev) {
delay = message->queued_time - prev->message->queued_time;
if (delay >= 0)
break;
if (unread == prev)
unread = 0;
}
/*
* Link the job into the proper place on the job lists and mark it so we
* know it has been linked.
*/
job->stack_level = 0;
QMGR_LIST_LINK(transport->job_list, list_prev, job, list_next, transport_peers);
QMGR_LIST_LINK(transport->job_bytime, prev, job, next, time_peers);
/*
* Update the current job pointer if necessary.
*/
if (current == 0)
transport->job_current = job;
/*
* Update the pointer to the first unread job on the job list and steal
* the unused recipient slots from the old one.
*/
if (unread == 0) {
unread = transport->job_next_unread;
transport->job_next_unread = job;
if (unread != 0)
qmgr_job_move_limits(unread);
}
/*
* Get as much recipient slots as possible. The excess will be returned
* to the transport pool as soon as the exact amount required is known
* (which is usually after all recipients have been read in core).
*/
if (transport->rcpt_unused > 0) {
job->rcpt_limit += transport->rcpt_unused;
message->rcpt_limit += transport->rcpt_unused;
transport->rcpt_unused = 0;
}
}
/* qmgr_job_find - lookup job associated with named message and transport */
static QMGR_JOB *qmgr_job_find(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{
/*
* Instead of traversing the message job list, we use single per
* transport hash table. This is better (at least with respect to memory
* usage) than having single hash table (usually almost empty) for each
* message.
*/
return ((QMGR_JOB *) htable_find(transport->job_byname, message->queue_id));
}
/* qmgr_job_obtain - find/create the appropriate job and make it ready for new recipients */
QMGR_JOB *qmgr_job_obtain(QMGR_MESSAGE *message, QMGR_TRANSPORT *transport)
{
QMGR_JOB *job;
/*
* Try finding an existing job, reviving it if it was already retired.
* Create a new job for this transport/message combination otherwise. In
* either case, the job ends linked on the job lists.
*/
if ((job = qmgr_job_find(message, transport)) == 0)
job = qmgr_job_create(message, transport);
if (job->stack_level < 0)
qmgr_job_link(job);
/*
* Reset the candidate cache because of the new expected recipients. Make
* sure the job is not marked as a blocker for the same reason. Note that
* this can result in having a non-blocker followed by more blockers.
* Consequently, we can't just update the current job pointer, we have to
* reset it. Fortunately qmgr_job_entry_select() will easily deal with
* this and will lookup the real current job for us.
*/
RESET_CANDIDATE_CACHE(transport);
if (IS_BLOCKER(job, transport)) {
job->blocker_tag = 0;
transport->job_current = transport->job_list.next;
}
return (job);
}
/* qmgr_job_move_limits - move unused recipient slots to the next unread job */
void qmgr_job_move_limits(QMGR_JOB *job)
{
QMGR_TRANSPORT *transport = job->transport;
QMGR_MESSAGE *message = job->message;
QMGR_JOB *next = transport->job_next_unread;
int rcpt_unused, msg_rcpt_unused;
/*
* Find next unread job on the job list if necessary. Cache it for later.
* This makes the amortized efficiency of this routine O(1) per job. Note
* that we use the time list whose ordering doesn't change over time.
*/
if (job == next) {
for (next = next->time_peers.next; next; next = next->time_peers.next)
if (next->message->rcpt_offset != 0)
break;
transport->job_next_unread = next;
}
/*
* Calculate the number of available unused slots.
*/
rcpt_unused = job->rcpt_limit - job->rcpt_count;
msg_rcpt_unused = message->rcpt_limit - message->rcpt_count;
if (msg_rcpt_unused < rcpt_unused)
rcpt_unused = msg_rcpt_unused;
/*
* Transfer the unused recipient slots back to the transport pool and to
* the next not-fully-read job. Job's message limits are adjusted
* accordingly. Note that the transport pool can be negative if we used
* some of the rcpt_per_stack slots.
*/
if (rcpt_unused > 0) {
job->rcpt_limit -= rcpt_unused;
message->rcpt_limit -= rcpt_unused;
transport->rcpt_unused += rcpt_unused;
if (next != 0 && (rcpt_unused = transport->rcpt_unused) > 0) {
next->rcpt_limit += rcpt_unused;
next->message->rcpt_limit += rcpt_unused;
transport->rcpt_unused = 0;
}
}
}
/* qmgr_job_parent_gone - take care of orphaned stack children */
static void qmgr_job_parent_gone(QMGR_JOB *job, QMGR_JOB *parent)
{
QMGR_JOB *child;
while ((child = job->stack_children.next) != 0) {
QMGR_LIST_UNLINK(job->stack_children, QMGR_JOB *, child, stack_siblings);
if (parent != 0)
QMGR_LIST_APPEND(parent->stack_children, child, stack_siblings);
child->stack_parent = parent;
}
}
/* qmgr_job_unlink - unlink the job from the job lists */
static void qmgr_job_unlink(QMGR_JOB *job)
{
const char *myname = "qmgr_job_unlink";
QMGR_TRANSPORT *transport = job->transport;
/*
* Sanity checks.
*/
if (job->stack_level != 0)
msg_panic("%s: non-zero stack level (%d)", myname, job->stack_level);
if (job->stack_parent != 0)
msg_panic("%s: parent present", myname);
if (job->stack_siblings.next != 0)
msg_panic("%s: siblings present", myname);
/*
* Make sure that children of job on zero stack level are informed that
* their parent is gone too.
*/
qmgr_job_parent_gone(job, 0);
/*
* Update the current job pointer if necessary.
*/
if (transport->job_current == job)
transport->job_current = job->transport_peers.next;
/*
* Invalidate the candidate selection cache if necessary.
*/
if (job == transport->candidate_cache
|| job == transport->candidate_cache_current)
RESET_CANDIDATE_CACHE(transport);
/*
* Remove the job from the job lists and mark it as unlinked.
*/
QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
QMGR_LIST_UNLINK(transport->job_bytime, QMGR_JOB *, job, time_peers);
job->stack_level = -1;
}
/* qmgr_job_retire - remove the job from the job lists while waiting for recipients to deliver */
static void qmgr_job_retire(QMGR_JOB *job)
{
if (msg_verbose)
msg_info("qmgr_job_retire: %s", job->message->queue_id);
/*
* Pop the job from the job stack if necessary.
*/
if (job->stack_level > 0)
qmgr_job_pop(job);
/*
* Make sure this job is not cached as the next unread job for this
* transport. The qmgr_entry_done() will make sure that the slots donated
* by this job are moved back to the transport pool as soon as possible.
*/
qmgr_job_move_limits(job);
/*
* Remove the job from the job lists. Note that it remains on the message
* job list, though, and that it can be revived by using
* qmgr_job_obtain(). Also note that the available slot counter is left
* intact.
*/
qmgr_job_unlink(job);
}
/* qmgr_job_free - release the job structure */
void qmgr_job_free(QMGR_JOB *job)
{
const char *myname = "qmgr_job_free";
QMGR_MESSAGE *message = job->message;
QMGR_TRANSPORT *transport = job->transport;
if (msg_verbose)
msg_info("%s: %s %s", myname, message->queue_id, transport->name);
/*
* Sanity checks.
*/
if (job->rcpt_count)
msg_panic("%s: non-zero recipient count (%d)", myname, job->rcpt_count);
/*
* Pop the job from the job stack if necessary.
*/
if (job->stack_level > 0)
qmgr_job_pop(job);
/*
* Return any remaining recipient slots back to the recipient slots pool.
*/
qmgr_job_move_limits(job);
if (job->rcpt_limit)
msg_panic("%s: recipient slots leak (%d)", myname, job->rcpt_limit);
/*
* Unlink and discard the structure. Check if the job is still linked on
* the job lists or if it was already retired before unlinking it.
*/
if (job->stack_level >= 0)
qmgr_job_unlink(job);
QMGR_LIST_UNLINK(message->job_list, QMGR_JOB *, job, message_peers);
htable_delete(transport->job_byname, message->queue_id, (void (*) (void *)) 0);
htable_free(job->peer_byname, (void (*) (void *)) 0);
myfree((void *) job);
}
/* qmgr_job_count_slots - maintain the delivery slot counters */
static void qmgr_job_count_slots(QMGR_JOB *job)
{
/*
* Count the number of delivery slots used during the delivery of the
* selected job. Also count the number of delivery slots available for
* its preemption.
*
* Despite its trivial look, this is one of the key parts of the theory
* behind this preempting scheduler.
*/
job->slots_available++;
job->slots_used++;
/*
* If the selected job is not the original current job, reset the
* candidate cache because the change above have slightly increased the
* chance of this job becoming a candidate next time.
*
* Don't expect that the change of the current jobs this turn will render
* the candidate cache invalid the next turn - it can happen that the
* next turn the original current job will be selected again and the
* cache would be considered valid in such case.
*/
if (job != job->transport->candidate_cache_current)
RESET_CANDIDATE_CACHE(job->transport);
}
/* qmgr_job_candidate - find best job candidate for preempting given job */
static QMGR_JOB *qmgr_job_candidate(QMGR_JOB *current)
{
QMGR_TRANSPORT *transport = current->transport;
QMGR_JOB *job, *best_job = 0;
double score, best_score = 0.0;
int max_slots, max_needed_entries, max_total_entries;
int delay;
time_t now = sane_time();
/*
* Fetch the result directly from the cache if the cache is still valid.
*
* Note that we cache negative results too, so the cache must be invalidated
* by resetting the cached current job pointer, not the candidate pointer
* itself.
*
* In case the cache is valid and contains no candidate, we can ignore the
* time change, as it affects only which candidate is the best, not if
* one exists. However, this feature requires that we no longer relax the
* cache resetting rules, depending on the automatic cache timeout.
*/
if (transport->candidate_cache_current == current
&& (transport->candidate_cache_time == now
|| transport->candidate_cache == 0))
return (transport->candidate_cache);
/*
* Estimate the minimum amount of delivery slots that can ever be
* accumulated for the given job. All jobs that won't fit into these
* slots are excluded from the candidate selection.
*/
max_slots = (MIN_ENTRIES(current) - current->selected_entries
+ current->slots_available) / transport->slot_cost;
/*
* Select the candidate with best time_since_queued/total_recipients
* score. In addition to jobs which don't meet the max_slots limit, skip
* also jobs which don't have any selectable entries at the moment.
*
* Instead of traversing the whole job list we traverse it just from the
* current job forward. This has several advantages. First, we skip some
* of the blocker jobs and the current job itself right away. But the
* really important advantage is that we are sure that we don't consider
* any jobs that are already stack children of the current job. Thanks to
* this we can easily include all encountered jobs which are leaf
* children of some of the preempting stacks as valid candidates. All we
* need to do is to make sure we do not include any of the stack parents.
* And, because the leaf children are not ordered by the time since
* queued, we have to exclude them from the early loop end test.
*
* However, don't bother searching if we can't find anything suitable
* anyway.
*/
if (max_slots > 0) {
for (job = current->transport_peers.next; job; job = job->transport_peers.next) {
if (job->stack_children.next != 0 || IS_BLOCKER(job, transport))
continue;
max_total_entries = MAX_ENTRIES(job);
max_needed_entries = max_total_entries - job->selected_entries;
delay = now - job->message->queued_time + 1;
if (max_needed_entries > 0 && max_needed_entries <= max_slots) {
score = (double) delay / max_total_entries;
if (score > best_score) {
best_score = score;
best_job = job;
}
}
/*
* Stop early if the best score is as good as it can get.
*/
if (delay <= best_score && job->stack_level == 0)
break;
}
}
/*
* Cache the result for later use.
*/
transport->candidate_cache = best_job;
transport->candidate_cache_current = current;
transport->candidate_cache_time = now;
return (best_job);
}
/* qmgr_job_preempt - preempt large message with smaller one */
static QMGR_JOB *qmgr_job_preempt(QMGR_JOB *current)
{
const char *myname = "qmgr_job_preempt";
QMGR_TRANSPORT *transport = current->transport;
QMGR_JOB *job, *prev;
int expected_slots;
int rcpt_slots;
/*
* Suppress preempting completely if the current job is not big enough to
* accumulate even the minimal number of slots required.
*
* Also, don't look for better job candidate if there are no available slots
* yet (the count can get negative due to the slot loans below).
*/
if (current->slots_available <= 0
|| MAX_ENTRIES(current) < transport->min_slots * transport->slot_cost)
return (current);
/*
* Find best candidate for preempting the current job.
*
* Note that the function also takes care that the candidate fits within the
* number of delivery slots which the current job is still able to
* accumulate.
*/
if ((job = qmgr_job_candidate(current)) == 0)
return (current);
/*
* Sanity checks.
*/
if (job == current)
msg_panic("%s: attempt to preempt itself", myname);
if (job->stack_children.next != 0)
msg_panic("%s: already on the job stack (%d)", myname, job->stack_level);
if (job->stack_level < 0)
msg_panic("%s: not on the job list (%d)", myname, job->stack_level);
/*
* Check if there is enough available delivery slots accumulated to
* preempt the current job.
*
* The slot loaning scheme improves the average message response time. Note
* that the loan only allows the preemption happen earlier, though. It
* doesn't affect how many slots have to be "paid" - in either case the
* full number of slots required has to be accumulated later before the
* current job can be preempted again.
*/
expected_slots = MAX_ENTRIES(job) - job->selected_entries;
if (current->slots_available / transport->slot_cost + transport->slot_loan
< expected_slots * transport->slot_loan_factor / 100.0)
return (current);
/*
* Preempt the current job.
*
* This involves placing the selected candidate in front of the current job
* on the job list and updating the stack parent/child/sibling pointers
* appropriately. But first we need to make sure that the candidate is
* taken from its previous job stack which it might be top of.
*/
if (job->stack_level > 0)
qmgr_job_pop(job);
QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
prev = current->transport_peers.prev;
QMGR_LIST_LINK(transport->job_list, prev, job, current, transport_peers);
job->stack_parent = current;
QMGR_LIST_APPEND(current->stack_children, job, stack_siblings);
job->stack_level = current->stack_level + 1;
/*
* Update the current job pointer and explicitly reset the candidate
* cache.
*/
transport->job_current = job;
RESET_CANDIDATE_CACHE(transport);
/*
* Since the single job can be preempted by several jobs at the same
* time, we have to adjust the available slot count now to prevent using
* the same slots multiple times. To do that we subtract the number of
* slots the preempting job will supposedly use. This number will be
* corrected later when that job is popped from the stack to reflect the
* number of slots really used.
*
* As long as we don't need to keep track of how many slots were really
* used, we can (ab)use the slots_used counter for counting the
* difference between the real and expected amounts instead of the
* absolute amount.
*/
current->slots_available -= expected_slots * transport->slot_cost;
job->slots_used = -expected_slots;
/*
* Add part of extra recipient slots reserved for preempting jobs to the
* new current job if necessary.
*
* Note that transport->rcpt_unused is within <-rcpt_per_stack,0> in such
* case.
*/
if (job->message->rcpt_offset != 0) {
rcpt_slots = (transport->rcpt_per_stack + transport->rcpt_unused + 1) / 2;
job->rcpt_limit += rcpt_slots;
job->message->rcpt_limit += rcpt_slots;
transport->rcpt_unused -= rcpt_slots;
}
if (msg_verbose)
msg_info("%s: %s by %s, level %d", myname, current->message->queue_id,
job->message->queue_id, job->stack_level);
return (job);
}
/* qmgr_job_pop - remove the job from its job preemption stack */
static void qmgr_job_pop(QMGR_JOB *job)
{
const char *myname = "qmgr_job_pop";
QMGR_TRANSPORT *transport = job->transport;
QMGR_JOB *parent;
if (msg_verbose)
msg_info("%s: %s", myname, job->message->queue_id);
/*
* Sanity checks.
*/
if (job->stack_level <= 0)
msg_panic("%s: not on the job stack (%d)", myname, job->stack_level);
/*
* Adjust the number of delivery slots available to preempt job's parent.
* Note that the -= actually adds back any unused slots, as we have
* already subtracted the expected amount of slots from both counters
* when we did the preemption.
*
* Note that we intentionally do not adjust slots_used of the parent. Doing
* so would decrease the maximum per message inflation factor if the
* preemption appeared near the end of parent delivery.
*
* For the same reason we do not adjust parent's slots_available if the
* parent is not the original parent that was preempted by this job
* (i.e., the original parent job has already completed).
*
* This is another key part of the theory behind this preempting scheduler.
*/
if ((parent = job->stack_parent) != 0
&& job->stack_level == parent->stack_level + 1)
parent->slots_available -= job->slots_used * transport->slot_cost;
/*
* Remove the job from its parent's children list.
*/
if (parent != 0) {
QMGR_LIST_UNLINK(parent->stack_children, QMGR_JOB *, job, stack_siblings);
job->stack_parent = 0;
}
/*
* If there is a parent, let it adopt all those orphaned children.
* Otherwise at least notify the children that their parent is gone.
*/
qmgr_job_parent_gone(job, parent);
/*
* Put the job back to stack level zero.
*/
job->stack_level = 0;
/*
* Explicitly reset the candidate cache. It's not worth trying to skip
* this under some complicated conditions - in most cases the popped job
* is the current job so we would have to reset it anyway.
*/
RESET_CANDIDATE_CACHE(transport);
/*
* Here we leave the remaining work involving the proper placement on the
* job list to the caller. The most important reason for this is that it
* allows us not to look up where exactly to place the job.
*
* The caller is also made responsible for invalidating the current job
* cache if necessary.
*/
#if 0
QMGR_LIST_UNLINK(transport->job_list, QMGR_JOB *, job, transport_peers);
QMGR_LIST_LINK(transport->job_list, some_prev, job, some_next, transport_peers);
if (transport->job_current == job)
transport->job_current = job->transport_peers.next;
#endif
}
/* qmgr_job_peer_select - select next peer suitable for delivery */
static QMGR_PEER *qmgr_job_peer_select(QMGR_JOB *job)
{
QMGR_PEER *peer;
QMGR_MESSAGE *message = job->message;
/*
* Try reading in more recipients. We do that as soon as possible
* (almost, see below), to make sure there is enough new blood pouring
* in. Otherwise single recipient for slow destination might starve the
* entire message delivery, leaving lot of fast destination recipients
* sitting idle in the queue file.
*
* Ideally we would like to read in recipients whenever there is a space,
* but to prevent excessive I/O, we read them only when enough time has
* passed or we can read enough of them at once.
*
* Note that even if we read the recipients few at a time, the message
* loading code tries to put them to existing recipient entries whenever
* possible, so the per-destination recipient grouping is not grossly
* affected.
*
* XXX Workaround for logic mismatch. The message->refcount test needs
* explanation. If the refcount is zero, it means that qmgr_active_done()
* is being completed asynchronously. In such case, we can't read in
* more recipients as bad things would happen after qmgr_active_done()
* continues processing. Note that this results in the given job being
* stalled for some time, but fortunately this particular situation is so
* rare that it is not critical. Still we seek for better solution.
*/
if (message->rcpt_offset != 0
&& message->refcount > 0
&& (message->rcpt_limit - message->rcpt_count >= job->transport->refill_limit
|| (message->rcpt_limit > message->rcpt_count
&& sane_time() - message->refill_time >= job->transport->refill_delay)))
qmgr_message_realloc(message);
/*
* Get the next suitable peer, if there is any.
*/
if (HAS_ENTRIES(job) && (peer = qmgr_peer_select(job)) != 0)
return (peer);
/*
* There is no suitable peer in-core, so try reading in more recipients
* if possible. This is our last chance to get suitable peer before
* giving up on this job for now.
*
* XXX For message->refcount, see above.
*/
if (message->rcpt_offset != 0
&& message->refcount > 0
&& message->rcpt_limit > message->rcpt_count) {
qmgr_message_realloc(message);
if (HAS_ENTRIES(job))
return (qmgr_peer_select(job));
}
return (0);
}
/* qmgr_job_entry_select - select next entry suitable for delivery */
QMGR_ENTRY *qmgr_job_entry_select(QMGR_TRANSPORT *transport)
{
QMGR_JOB *job, *next;
QMGR_PEER *peer;
QMGR_ENTRY *entry;
/*
* Get the current job if there is one.
*/
if ((job = transport->job_current) == 0)
return (0);
/*
* Exercise the preempting algorithm if enabled.
*
* The slot_cost equal to 1 causes the algorithm to degenerate and is
* therefore disabled too.
*/
if (transport->slot_cost >= 2)
job = qmgr_job_preempt(job);
/*
* Select next entry suitable for delivery. In case the current job can't
* provide one because of the per-destination concurrency limits, we mark
* it as a "blocker" job and continue with the next job on the job list.
*
* Note that the loop also takes care of getting the "stall" jobs (job with
* no entries currently available) out of the way if necessary. Stall
* jobs can appear in case of multi-transport messages whose recipients
* don't fit in-core at once. Some jobs created by such message may have
* only few recipients and would stay on the job list until all other
* jobs of that message are delivered, blocking precious recipient slots
* available to this transport. Or it can happen that the job has some
* more entries but suddenly they all get deferred. Whatever the reason,
* we retire such jobs below if we happen to come across some.
*/
for ( /* empty */ ; job; job = next) {
next = job->transport_peers.next;
/*
* Don't bother if the job is known to have no available entries
* because of the per-destination concurrency limits.
*/
if (IS_BLOCKER(job, transport))
continue;
if ((peer = qmgr_job_peer_select(job)) != 0) {
/*
* We have found a suitable peer. Select one of its entries and
* adjust the delivery slot counters.
*/
entry = qmgr_entry_select(peer);
qmgr_job_count_slots(job);
/*
* Remember the current job for the next time so we don't have to
* crawl over all those blockers again. They will be reconsidered
* when the concurrency limit permits.
*/
transport->job_current = job;
/*
* In case we selected the very last job entry, remove the job
* from the job lists right now.
*
* This action uses the assumption that once the job entry has been
* selected, it can be unselected only before the message ifself
* is deferred. Thus the job with all entries selected can't
* re-appear with more entries available for selection again
* (without reading in more entries from the queue file, which in
* turn invokes qmgr_job_obtain() which re-links the job back on
* the lists if necessary).
*
* Note that qmgr_job_move_limits() transfers the recipients slots
* correctly even if the job is unlinked from the job list thanks
* to the job_next_unread caching.
*/
if (!HAS_ENTRIES(job) && job->message->rcpt_offset == 0)
qmgr_job_retire(job);
/*
* Finally. Hand back the fruit of our tedious effort.
*/
return (entry);
} else if (HAS_ENTRIES(job)) {
/*
* The job can't be selected due the concurrency limits. Mark it
* together with its queues so we know they are blocking the job
* list and they get the appropriate treatment. In particular,
* all blockers will be reconsidered when one of the problematic
* queues will accept more deliveries. And the job itself will be
* reconsidered if it is assigned some more entries.
*/
job->blocker_tag = transport->blocker_tag;
for (peer = job->peer_list.next; peer; peer = peer->peers.next)
if (peer->entry_list.next != 0)
peer->queue->blocker_tag = transport->blocker_tag;
} else {
/*
* The job is "stalled". Retire it until it either gets freed or
* gets more entries later.
*/
qmgr_job_retire(job);
}
}
/*
* We have not found any entry we could use for delivery. Well, things
* must have changed since this transport was selected for asynchronous
* allocation. Never mind. Clear the current job pointer and reluctantly
* report back that we have failed in our task.
*/
transport->job_current = 0;
return (0);
}
/* qmgr_job_blocker_update - update "blocked job" status */
void qmgr_job_blocker_update(QMGR_QUEUE *queue)
{
QMGR_TRANSPORT *transport = queue->transport;
/*
* If the queue was blocking some of the jobs on the job list, check if
* the concurrency limit has lifted. If there are still some pending
* deliveries, give it a try and unmark all transport blockers at once.
* The qmgr_job_entry_select() will do the rest. In either case make sure
* the queue is not marked as a blocker anymore, with extra handling of
* queues which were declared dead.
*
* Note that changing the blocker status also affects the candidate cache.
* Most of the cases would be automatically recognized by the current job
* change, but we play safe and reset the cache explicitly below.
*
* Keeping the transport blocker tag odd is an easy way to make sure the tag
* never matches jobs that are not explicitly marked as blockers.
*/
if (queue->blocker_tag == transport->blocker_tag) {
if (queue->window > queue->busy_refcount && queue->todo.next != 0) {
transport->blocker_tag += 2;
transport->job_current = transport->job_list.next;
transport->candidate_cache_current = 0;
}
if (queue->window > queue->busy_refcount || QMGR_QUEUE_THROTTLED(queue))
queue->blocker_tag = 0;
}
}