
Scheduler

Proportional credit-weight task scheduling.

Proportional fair-share scheduler for assigning tasks to idle agents.

Uses a purely deterministic algorithm with zero LLM calls, so the entire token budget is spent on agent work rather than on deciding which work to do.

The scheduling algorithm runs in two phases each time an idle agent needs a task:

  1. Min-task guarantee -- Projects that have completed zero tasks in the current scheduling window are prioritized first. This ensures every active project gets at least one task assigned before proportional allocation kicks in.

  2. Deficit-based proportional allocation -- Among projects that already have at least one completion, the scheduler picks the project whose actual token usage ratio is furthest below its target ratio (derived from credit_weight). This gradually converges each project toward its fair share of total agent time.
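The two phases above can be sketched as a single two-level sort key. This is a minimal illustration under stated assumptions: `ProjectStats` and `rank_projects` are hypothetical names, not the module's API, and the sketch leaves out the concurrency, budget, and workspace checks.

```python
from dataclasses import dataclass


@dataclass
class ProjectStats:
    """Hypothetical stand-in for the per-project inputs the ranking needs."""
    name: str
    credit_weight: float
    tokens_used: int
    completed_in_window: int


def rank_projects(projects: list[ProjectStats]) -> list[str]:
    """Order projects by (min-task guarantee, deficit), mirroring the two phases."""
    total_weight = sum(p.credit_weight for p in projects)
    total_tokens = sum(p.tokens_used for p in projects) or 1  # avoid div/0

    def key(p: ProjectStats) -> tuple[int, float]:
        # Phase 1: 0 means "still owed its guaranteed task", so it sorts first.
        needs_guarantee = 0 if p.completed_in_window == 0 else 1
        # Phase 2: a negative deficit means under-served relative to its weight.
        deficit = p.tokens_used / total_tokens - p.credit_weight / total_weight
        return (needs_guarantee, deficit)

    return [p.name for p in sorted(projects, key=key)]


projects = [
    ProjectStats("A", credit_weight=3, tokens_used=1000, completed_in_window=4),
    ProjectStats("B", credit_weight=1, tokens_used=500, completed_in_window=2),
    ProjectStats("C", credit_weight=1, tokens_used=900, completed_in_window=0),
]
print(rank_projects(projects))  # ['C', 'A', 'B']
```

Project C has used tokens but has not completed anything in the window, so it ranks first even though it has the largest positive deficit. A and B are then ordered purely by deficit.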

Both phases respect per-project concurrency limits (max_concurrent_agents), per-project / global budget caps, and workspace availability (a project with all workspaces locked cannot receive new assignments even if it has quota).

Key design properties:

  • Pure function -- the scheduler takes a snapshot (SchedulerState) and returns actions with zero side effects, zero LLM calls, and zero I/O.
  • Starvation-free -- min_task_guarantee ensures every active project eventually receives at least one task per scheduling window (subject to the concurrency, budget, and workspace limits above).
  • Convergent -- deficit-based proportional allocation gradually steers each project toward its fair share; short-term imbalances self-correct over multiple scheduling rounds.
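A toy simulation can illustrate the convergence property. The sketch makes two simplifying assumptions that real workloads do not satisfy: each round hands a fixed `chunk` of tokens to the single most under-served project, and completions are ignored. `simulate` is a hypothetical helper. It shows the self-correcting tendency but does not prove convergence for the real scheduler.

```python
def simulate(weights: dict[str, float], usage: dict[str, int],
             rounds: int, chunk: int = 100) -> dict[str, float]:
    """Give `chunk` tokens per round to the most under-served project; return final ratios."""
    usage = dict(usage)  # don't mutate the caller's dict
    total_weight = sum(weights.values())
    for _ in range(rounds):
        total_tokens = sum(usage.values()) or 1
        # Most negative deficit (actual ratio - target ratio) wins this round.
        winner = min(
            weights,
            key=lambda name: usage[name] / total_tokens - weights[name] / total_weight,
        )
        usage[winner] += chunk
    total_tokens = sum(usage.values())
    return {name: usage[name] / total_tokens for name in usage}


ratios = simulate({"A": 3, "B": 1}, {"A": 1000, "B": 500}, rounds=200)
print({name: round(r, 3) for name, r in ratios.items()})
```

Starting from the 66.7% / 33.3% split, A wins rounds until its share reaches 75%. After that, the two projects alternate, and A's share stays within one chunk's worth of the 75% target.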

Concrete example of deficit-based scheduling:

Projects: A (weight=3), B (weight=1)
Total weight = 4 → target ratios: A=75%, B=25%
Current window usage: A=1000 tokens, B=500 tokens
Total tokens = 1500 → actual ratios: A=66.7%, B=33.3%

Deficits:  A = 66.7% - 75% = -8.3%  (under-served)
           B = 33.3% - 25% = +8.3%  (over-served)

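→ Assuming both projects already have at least one completion in the window (so the min-task guarantee does not reorder them), Project A sorts first because its deficit is more negative.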
→ The scheduler assigns A's highest-priority READY task next.

Over multiple rounds, this converges: A will keep getting priority
until its actual usage ratio approaches 75%.
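The arithmetic in the example can be reproduced directly. This snippet uses plain dicts instead of the real `Project` objects and computes deficits the same way as `project_sort_key` in the source listing, as actual ratio minus target ratio.

```python
weights = {"A": 3, "B": 1}
usage = {"A": 1000, "B": 500}

total_weight = sum(weights.values())  # 4
total_tokens = sum(usage.values())    # 1500

# deficit = actual ratio - target ratio; negative means under-served
deficits = {
    name: usage[name] / total_tokens - weights[name] / total_weight
    for name in weights
}
for name, d in deficits.items():
    print(f"{name}: {d:+.1%}")  # A: -8.3%, then B: +8.3%

order = sorted(weights, key=lambda name: deficits[name])
print(order)  # ['A', 'B']
```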

Time complexity: O(A × P × log P) per cycle, where A = idle agents and P = active projects. Both are typically small (<10), so scheduling is effectively instant.

Integration with the orchestrator:

The orchestrator's ``_schedule()`` method builds a ``SchedulerState``
snapshot from DB queries each cycle, passes it to ``Scheduler.schedule()``,
and receives back a list of ``AssignAction`` objects.  The orchestrator
then launches background asyncio tasks for each assignment.

See ``src/orchestrator.py::_schedule()`` for snapshot construction.
See ``specs/scheduler-and-budget.md`` for the full specification.
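The hand-off to background work might look roughly like the sketch below. `run_assignment` and `apply_actions` are hypothetical stand-ins, not the orchestrator's real methods. The sketch awaits all tasks with `asyncio.gather` only so the demo can collect results. A long-running orchestrator would more likely keep references to the created tasks and let them run while it continues its loop.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class AssignAction:
    """Stand-in mirroring the documented fields."""
    agent_id: str
    task_id: str
    project_id: str


async def run_assignment(action: AssignAction) -> str:
    """Hypothetical stand-in for starting an agent on a task."""
    await asyncio.sleep(0)  # real code would update the DB, prepare a workspace, etc.
    return f"{action.agent_id} -> {action.task_id}"


async def apply_actions(actions: list[AssignAction]) -> list[str]:
    # One background task per assignment, so a slow agent doesn't block the others.
    tasks = [asyncio.create_task(run_assignment(a)) for a in actions]
    return await asyncio.gather(*tasks)


actions = [
    AssignAction("agent-1", "t-7", "proj-a"),
    AssignAction("agent-2", "t-9", "proj-b"),
]
print(asyncio.run(apply_actions(actions)))  # ['agent-1 -> t-7', 'agent-2 -> t-9']
```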

Classes

AssignAction dataclass

AssignAction(agent_id: str, task_id: str, project_id: str)

A scheduling decision: assign one specific task to one specific agent.

This is the output type of the scheduler -- a list of these actions is returned each scheduling round, one per idle agent that received work. The orchestrator is responsible for actually executing the assignment (updating the database, starting the agent process, etc.).
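Each round yields at most one action per idle agent, and the scheduler tracks already-assigned agents and tasks. A test could therefore check that no agent or task appears twice in a round's output. `check_round` is a hypothetical test helper, and the dataclass below only mirrors the documented signature.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass(frozen=True)
class AssignAction:
    """Stand-in mirroring the documented fields."""
    agent_id: str
    task_id: str
    project_id: str


def check_round(actions: list[AssignAction]) -> list[str]:
    """Return invariant violations for one scheduling round (empty list = OK)."""
    problems = []
    for attr, counts in (
        ("agent_id", Counter(a.agent_id for a in actions)),
        ("task_id", Counter(a.task_id for a in actions)),
    ):
        for value, n in counts.items():
            if n > 1:
                problems.append(f"{attr} {value!r} assigned {n} times")
    return problems


ok = [AssignAction("a1", "t1", "p"), AssignAction("a2", "t2", "p")]
bad = ok + [AssignAction("a1", "t3", "p")]
print(check_round(ok))   # []
print(check_round(bad))  # ["agent_id 'a1' assigned 2 times"]
```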

SchedulerState dataclass

SchedulerState(projects: list[Project], tasks: list[Task], agents: list[Agent], project_token_usage: dict[str, int], project_active_agent_counts: dict[str, int], tasks_completed_in_window: dict[str, int], project_available_workspaces: dict[str, int] = dict(), workspace_locks: dict[str, str | None] = dict(), global_budget: int | None = None, global_tokens_used: int = 0)

A snapshot of all system state the scheduler needs to make decisions.

The scheduler is a pure function: given a SchedulerState, it returns a list of AssignActions with no side effects. This stateless/functional design makes the algorithm easy to test and reason about -- the orchestrator builds this snapshot each tick, and the scheduler never touches the database or any external resource.

All "window" fields (token usage, completed counts) are scoped to the rolling_window_hours configured in the scheduling config. The rolling window acts as a "forgetting" mechanism: old usage ages out, so a project that was over-served yesterday can still receive its fair allocation today. The orchestrator computes these fields from DB queries restricted to records newer than ``time.time() - window_hours * 3600`` (the window length converted to seconds).

Scheduler

Functions

schedule staticmethod
schedule(state: SchedulerState) -> list[AssignAction]

Assign READY tasks to idle agents using proportional fair-share.

Algorithm steps:

  1. Bail out early if the global token budget is exhausted.
  2. Collect idle agents and group READY tasks by project.
  3. For each idle agent (in order), rank active projects by:
     a. Min-task guarantee -- projects with zero completions in the window sort first (phase 1).
     b. Deficit -- among the rest, the project whose actual token usage is furthest below its credit_weight share sorts first (phase 2).
  4. Walk the ranked project list; skip any project that has hit its budget cap or concurrency limit, or that has no available workspace. Pick the highest-priority READY task (whose preferred workspace is not locked) from the first eligible project.
  5. Record the assignment and move to the next idle agent.

Returns a list of ``AssignAction`` objects, one per agent that was matched with a task. May be empty if no work can be assigned.

Source code in src/scheduler.py
@staticmethod
def schedule(state: SchedulerState) -> list[AssignAction]:
    """Assign READY tasks to idle agents using proportional fair-share.

    Algorithm steps:
    1. Bail out early if the global token budget is exhausted.
    2. Collect idle agents and group READY tasks by project.
    3. For each idle agent (in order), rank active projects by:
       a. Min-task guarantee -- projects with zero completions in the
          window sort first (phase 1).
       b. Deficit -- among the rest, the project whose actual token
          usage is furthest below its ``credit_weight`` share sorts
          first (phase 2).
    4. Walk the ranked project list; skip any project that has hit its
       budget cap or concurrency limit, or that has no available
       workspace.  Pick the highest-priority READY task (whose
       preferred workspace is not locked) from the first eligible
       project.
    5. Record the assignment and move to the next idle agent.

    Returns a list of :class:`AssignAction` -- one per agent that was
    matched with a task.  May be empty if no work can be assigned.
    """
    # Check global budget
    if (
        state.global_budget is not None
        and state.global_tokens_used >= state.global_budget
    ):
        return []

    idle_agents = [a for a in state.agents if a.state == AgentState.IDLE]
    if not idle_agents:
        return []

    # Group ready tasks by project
    ready_by_project: dict[str, list[Task]] = {}
    for task in state.tasks:
        if task.status == TaskStatus.READY:
            ready_by_project.setdefault(task.project_id, []).append(task)

    # Sort tasks within each project by priority (lower = higher priority),
    # then by creation order (id as a proxy for FIFO within same priority).
    # This determines which task the scheduler picks when a project is selected.
    for tasks in ready_by_project.values():
        tasks.sort(key=lambda t: (t.priority, t.id))

    # Filter to active projects with ready tasks
    active_projects = [
        p for p in state.projects
        if p.status == ProjectStatus.ACTIVE and p.id in ready_by_project
    ]
    if not active_projects:
        return []

    # Calculate totals for proportional ratio computation.
    # ``total_weight`` is the denominator for target ratios (each
    # project's target = credit_weight / total_weight).
    # ``total_tokens`` is the denominator for actual ratios (each
    # project's actual = tokens_used / total_tokens).
    # We clamp total_tokens to at least 1 to avoid division by zero
    # during the first scheduling round before any tokens are used.
    total_weight = sum(p.credit_weight for p in active_projects)
    total_tokens = sum(state.project_token_usage.values()) or 1  # avoid div/0

    # Track assignments made in this scheduling round.  These sets
    # prevent double-assignment: an agent or task matched once won't be
    # considered again in the same round.  ``round_agent_counts`` is a
    # mutable copy of the live counts so that assignments within this
    # round are reflected in subsequent concurrency-limit checks.
    actions: list[AssignAction] = []
    assigned_agents: set[str] = set()
    assigned_tasks: set[str] = set()
    round_agent_counts: dict[str, int] = dict(state.project_active_agent_counts)

    for agent in idle_agents:
        if agent.id in assigned_agents:
            continue

        # Sort projects by scheduling priority using a two-level key:
        #
        # Level 1 — Min-task guarantee (binary):
        #   Projects with zero completions in the window sort first
        #   (has_guarantee=0).  This ensures starvation prevention:
        #   every active project gets at least one task before
        #   proportional allocation kicks in.
        #
        # Level 2 — Deficit score (continuous):
        #   Among projects at the same guarantee level, the one whose
        #   actual token usage ratio is furthest *below* its target
        #   ratio (derived from credit_weight) sorts first.  A negative
        #   deficit means the project is under-served relative to its
        #   weight; a positive deficit means over-served.
        #
        # Together these produce a fair ordering: starved projects go
        # first, then under-served projects, then over-served ones.
        def project_sort_key(p: Project) -> tuple[int, float]:
            completed = state.tasks_completed_in_window.get(p.id, 0)
            has_guarantee = 1 if completed > 0 else 0  # 0 = needs guarantee (sorts first)
            target_ratio = p.credit_weight / total_weight
            actual_ratio = state.project_token_usage.get(p.id, 0) / total_tokens
            deficit = actual_ratio - target_ratio  # negative = below target
            return (has_guarantee, deficit)

        sorted_projects = sorted(active_projects, key=project_sort_key)

        for project in sorted_projects:
            # Check per-project budget
            if (
                project.budget_limit is not None
                and state.project_token_usage.get(project.id, 0)
                >= project.budget_limit
            ):
                continue

            # Check concurrency limit
            current_agents = round_agent_counts.get(project.id, 0)
            if current_agents >= project.max_concurrent_agents:
                continue

            # Skip projects with no available workspaces.
            # Workspace availability is a hard physical constraint: each
            # agent execution needs an exclusive workspace lock, so we
            # can't assign more tasks than there are unlocked workspaces.
            # When project_available_workspaces is empty (e.g. in tests),
            # this check is skipped — the orchestrator handles the
            # "no workspace" case gracefully in _prepare_workspace.
            if (
                state.project_available_workspaces
                and state.project_available_workspaces.get(project.id, 0) <= 0
            ):
                continue

            # Pick highest priority ready task not yet assigned
            # Also filter out tasks whose preferred workspace is locked
            available = [
                t for t in ready_by_project.get(project.id, [])
                if t.id not in assigned_tasks
                and _workspace_available(t, state.workspace_locks)
            ]
            if not available:
                continue

            task = available[0]
            actions.append(AssignAction(
                agent_id=agent.id,
                task_id=task.id,
                project_id=project.id,
            ))
            assigned_agents.add(agent.id)
            assigned_tasks.add(task.id)
            round_agent_counts[project.id] = current_agents + 1
            break

    return actions