==================
Live Log Streaming
==================

Debusine lets you view the full log of a task after it finishes. But while a
task is still running, there is nothing to see. You have to wait until it
completes before you can tell whether anything went wrong. For long-running
tasks like package builds, this means you might wait twenty minutes only to
discover a failure that happened in the first two.

This blueprint describes how to make task output visible in real time, while
the task is still running.

Architecture Overview
=====================

The change involves three layers working together:

1. **Worker**: as a task runs, the worker reads its output line by line and
   sends each line to the server over a new WebSocket connection, using a new
   message type called ``log_line``.

2. **Server**: as ``log_line`` messages arrive, the server writes them into a
   Redis Stream, one entry per line, keyed by the work request ID. Any number
   of readers can then consume from that stream independently.

3. **Browser**: a small JavaScript snippet on the task detail page subscribes
   to a server endpoint that reads from the Redis Stream and pushes lines down
   to the browser as they arrive. The page shows the output appearing line by
   line, without any refresh.

The overall flow looks like this::

    Worker subprocess
        |
        | (line by line)
        v
    Worker WebSocket client -- log_line message --> Server WebSocket handler
                                                        |
                                                        | xadd
                                                        v
                                                    Redis Stream task:logs:{id}
                                                        |
                                                        | xread (blocking)
                                                        v
                                                    Django streaming view (new WebSocket consumer)
                                                        |
                                                        | WebSocket
                                                        v
                                                    Browser (task page)

    SERVER task (runs on Celery worker, no WebSocket connection)
        |
        | xadd (directly)
        v
    Redis Stream task:logs:{id}  (same path from here onward)

``SERVER`` tasks run on a Celery worker and do not have a WebSocket
connection to the server. They write log lines directly to Redis instead of
routing them through the worker WebSocket handler. From the Redis Stream
onward, the flow is identical to the worker path above.
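
Because both paths converge on the same ``xadd``, it may help to route them
through one small helper. The sketch below assumes a hypothetical
``append_log_line`` function (not an existing Debusine API) that both the
worker WebSocket handler and ``SERVER`` tasks would call; ``r`` is any client
exposing redis-py's ``xadd`` and ``expire`` methods. The 10,000-entry cap and
the 24-hour expiry are the values proposed elsewhere in this blueprint;
refreshing the expiry on every write is an assumption of this sketch.

.. code-block:: python

   from datetime import datetime, timezone
   from typing import Any, Optional, Protocol

   # Values proposed in this blueprint; both are tunable.
   STREAM_MAXLEN = 10_000
   KEY_TTL_SECONDS = 60 * 60 * 24  # 24 hours
   VALID_STREAMS = {"stdout", "stderr", "internal"}


   class StreamWriter(Protocol):
       """The subset of a redis-py client (e.g. ``redis.Redis``) used here."""

       def xadd(self, name: str, fields: dict, maxlen: Optional[int] = None,
                approximate: bool = True) -> Any: ...

       def expire(self, name: str, time: int) -> Any: ...


   def log_key(work_request_id: int) -> str:
       """Redis key of the live log stream for one work request."""
       return f"task:logs:{work_request_id}"


   def append_log_line(r: StreamWriter, work_request_id: int, line: str,
                       stream: str, timestamp: Optional[str] = None) -> Any:
       """Hypothetical helper: append one log line, return the entry ID.

       Worker lines arrive with the worker's own timestamp; SERVER tasks can
       omit it and get the current UTC time instead.
       """
       if stream not in VALID_STREAMS:
           raise ValueError(f"unknown stream {stream!r}")
       if timestamp is None:
           timestamp = datetime.now(timezone.utc).isoformat(timespec="seconds")
       key = log_key(work_request_id)
       entry_id = r.xadd(
           key,
           {"line": line, "timestamp": timestamp, "stream": stream},
           maxlen=STREAM_MAXLEN,
           approximate=True,  # "~" trimming: cheaper than an exact cap
       )
       # Assumption: the safety-net expiry is refreshed on every write.
       r.expire(key, KEY_TTL_SECONDS)
       return entry_id

Keeping the key format and the trimming policy in one place means the worker
path and the ``SERVER`` task path cannot drift apart.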
Worker-Side Changes
===================

The worker already runs tasks as subprocesses. Tasks are defined under
``debusine/task/`` and are executed through the executor layer at
``debusine/task/executor/``. The change here is to capture output from that
execution incrementally, rather than waiting for the task to finish.

There are two possible approaches to capturing the output:

* **Option A: Stream from the subprocess directly**: if the executor runs
  subprocesses in a way that exposes stdout/stderr as a stream, we can wrap
  them in a ``TextIOWrapper`` and read line by line, forwarding each line over
  the WebSocket as it arrives. This may require some restructuring of the
  executor API. With this approach, each job yields one stream for stdout and
  another for stderr.

* **Option B: Watch the log file**: if the executor already writes output to
  a log file on disk, the worker can tail that file while the task runs and
  forward new lines as they appear. This avoids touching the executor
  internals and may be simpler to integrate. It also opens the door to
  streaming multiple log files per task in the future, since the watcher is
  not tied to a single subprocess stream.

The chosen approach is **Option A**. The stdout and stderr lines are merged
into a single combined log stream per job, with each line tagged by its
source.

**Option B** was considered and discarded. While it avoids touching the
executor internals, it introduces filesystem indirection and makes it harder
to reason about ordering and completeness. The simpler, more direct approach
is preferred.

The stream should include more than just the subprocess output. Internal
worker events, such as setting up the executor, downloading input artifacts,
and uploading result artifacts, are useful context for someone watching a
task run. These events will be written into the same stream as stdout and
stderr, tagged with the ``internal`` identifier so the browser can display or
filter them separately.
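
A minimal sketch of Option A, assuming a plain ``subprocess.Popen`` call
rather than Debusine's real executor API, whose details are not covered here.
It also includes the bounded buffer described below, between the reader and
the WebSocket sender. ``LogBuffer``, ``run_and_capture`` and the default
``maxsize`` of 1000 are hypothetical names and values. A separate sender
thread (not shown) would call ``get()`` in a loop, wrap each item in a
``log_line`` message, and send ``log_line_end`` with the exit code once
``get()`` returns ``None``.

.. code-block:: python

   import subprocess
   import threading
   from collections import deque
   from datetime import datetime, timezone
   from typing import Deque, List, Optional, Tuple

   # (stream, line, timestamp)
   LogItem = Tuple[str, str, str]


   def _now() -> str:
       return datetime.now(timezone.utc).isoformat(timespec="seconds")


   class LogBuffer:
       """Bounded, thread-safe buffer between the task and the sender.

       ``put`` never blocks: when the buffer is full, the buffered lines are
       dropped and counted, and the next ``get`` returns a synthetic
       "[N messages skipped]" notice before any newer line.
       """

       def __init__(self, maxsize: int = 1000) -> None:
           self._items: Deque[LogItem] = deque()
           self._maxsize = maxsize
           self._skipped = 0
           self._closed = False
           self._cond = threading.Condition()

       def put(self, stream: str, line: str) -> None:
           with self._cond:
               if len(self._items) >= self._maxsize:
                   # Drop the backlog instead of blocking the running task.
                   self._skipped += len(self._items)
                   self._items.clear()
               self._items.append((stream, line, _now()))
               self._cond.notify()

       def close(self) -> None:
           """Signal that no more lines will be added."""
           with self._cond:
               self._closed = True
               self._cond.notify_all()

       def get(self) -> Optional[LogItem]:
           """Block for the next item; return None once closed and drained."""
           with self._cond:
               self._cond.wait_for(
                   lambda: self._items or self._skipped or self._closed)
               if self._skipped:
                   count, self._skipped = self._skipped, 0
                   return ("internal", f"[{count} messages skipped]", _now())
               if self._items:
                   return self._items.popleft()
               return None


   def run_and_capture(argv: List[str], buffer: LogBuffer) -> int:
       """Run argv, feeding its stdout and stderr into buffer line by line."""
       buffer.put("internal", f"Running {argv[0]}")
       # text=True wraps both pipes in io.TextIOWrapper; iterating yields lines.
       proc = subprocess.Popen(argv, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE, text=True)

       def pump(pipe, stream: str) -> None:
           with pipe:
               for line in pipe:
                   buffer.put(stream, line.rstrip("\n"))

       # One reader per pipe, so a full stderr pipe cannot stall the child
       # while we are waiting on stdout (or vice versa).
       readers = [threading.Thread(target=pump, args=(proc.stdout, "stdout")),
                  threading.Thread(target=pump, args=(proc.stderr, "stderr"))]
       for reader in readers:
           reader.start()
       exit_code = proc.wait()
       for reader in readers:
           reader.join()
       buffer.close()
       return exit_code

The child's own output buffering still applies: a program that block-buffers
stdout when it is not attached to a terminal will emit lines in bursts rather
than one at a time.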
For each line captured, the worker sends a ``log_line`` message to the server
over a WebSocket connection. The message looks like this:

.. code-block:: json

   {
     "type": "log_line",
     "work_request_id": 42,
     "line": "SAMPLE LINE FOR LOG",
     "timestamp": "2026-05-09T14:23:01+00:00",
     "stream": "stdout"
   }

The ``stream`` field distinguishes between ``stdout``, ``stderr``, and
``internal`` lines, so the server and browser can display them differently if
needed (for example, showing stderr lines in a different colour).

The worker does not send every line directly to the WebSocket as soon as it
arrives. There is a buffer between the running task and the WebSocket sender.
Under normal conditions, lines move through the buffer and are forwarded
promptly. But if the buffer fills up faster than the WebSocket can drain it
(for example, when a task emits output at very high speed), the worker drops
the buffered lines rather than blocking the task, and sends a synthetic
message instead:

.. code-block:: json

   {
     "type": "log_line",
     "work_request_id": 42,
     "line": "[15 messages skipped]",
     "timestamp": "2026-05-09T14:23:01+00:00",
     "stream": "internal"
   }

This keeps the worker from falling behind or consuming unbounded memory,
while still giving the user a visible signal that some output was lost.

If the WebSocket connection drops mid-task, the worker should not crash. It
should log the failure locally and continue running the task. Losing the live
stream is acceptable, but losing the task result is not.

When the task finishes, the worker sends a final ``log_line_end`` message
with the exit code, so the server knows the stream is complete:

.. code-block:: json

   {
     "type": "log_line_end",
     "work_request_id": 42,
     "exit_code": 0,
     "timestamp": "2026-05-09T14:25:00+00:00"
   }

For reference, here are representative examples of each message variant the
worker may send:

*stdout line*: normal task output:

.. code-block:: json

   {
     "type": "log_line",
     "work_request_id": 42,
     "line": "Building package foo 1.2.3...",
     "timestamp": "2026-05-09T14:23:01+00:00",
     "stream": "stdout"
   }

*stderr line*: error or diagnostic output from the subprocess:

.. code-block:: json

   {
     "type": "log_line",
     "work_request_id": 42,
     "line": "warning: deprecated function used",
     "timestamp": "2026-05-09T14:23:05+00:00",
     "stream": "stderr"
   }

*internal line*: worker lifecycle events (setup, artifact upload, etc.):

.. code-block:: json

   {
     "type": "log_line",
     "work_request_id": 42,
     "line": "Downloading input artifact foo.dsc",
     "timestamp": "2026-05-09T14:22:58+00:00",
     "stream": "internal"
   }

*skipped-messages notice*: emitted when the buffer overflows:

.. code-block:: json

   {
     "type": "log_line",
     "work_request_id": 42,
     "line": "[15 messages skipped]",
     "timestamp": "2026-05-09T14:23:10+00:00",
     "stream": "internal"
   }

Server-Side: Storing Logs in Redis
==================================

When the server receives a ``log_line`` message, it writes the line into a
Redis Stream. Each work request gets its own stream, keyed by its ID:

.. code-block:: text

   task:logs:{work_request_id}

So for work request 42, the key would be ``task:logs:42``.

Each entry in the stream stores three fields:

.. code-block:: text

   line      -> the text of the log line
   timestamp -> ISO 8601 UTC timestamp from the worker
   stream    -> "stdout", "stderr", or "internal"

Redis Streams are a good fit here because they are an ordered, persistent
log. A consumer that connects late can still request all retained entries
from the beginning by starting from ID ``0``. This is different from Redis
Pub/Sub, where late consumers miss anything sent before they connected, which
would be wrong for a task log where you want the full output from the start.

To prevent the stream from growing without bound for very verbose tasks, the
server applies a maximum length when writing:

.. code-block:: python

   # Trim to roughly the newest 10,000 entries ("~" trimming is cheaper than exact).
   r.xadd(f"task:logs:{work_request_id}", entry, maxlen=10000, approximate=True)

When the task completes, the server uploads the full log content as an
artifact. After that, the Redis key is deleted, since the durable copy now
lives in the artifact store.

Server-Side: The Streaming View
===============================

The server needs a view that a browser can subscribe to and receive log lines
from as they arrive. This view reads from the Redis Stream for the requested
work request and pushes each line down to the client.

Debusine already has two WebSocket consumers: one for the worker connection,
and one for clients waiting for a job to complete. The streaming view will be
a **third WebSocket consumer**, added to the same file as the existing two.
It handles browser connections that want to watch a running task's output.

This transport was chosen over Server-Sent Events because WebSockets are
bidirectional, leaving the door open for client-to-server feedback in the
future, such as backpressure signalling or explicit acknowledgements from the
browser.

Given a ``work_request_id``, the view:

1. Checks that the requesting user has permission to view that work request.
2. Opens a blocking ``xread`` loop on ``task:logs:{work_request_id}``,
   starting from ID ``0`` to get all lines from the beginning.
3. Sends each line to the client as it arrives.
4. Stops when the work request status moves to ``completed`` or ``aborted``,
   and closes the connection.

If the work request is already completed when the browser subscribes, the
view reads the full stream from Redis (if the key still exists) or falls back
to the artifact. This way, the same view works for both live and
recently-finished tasks.

.. note::

   The WebSocket protocol between the server and the browser (the exact
   message format, event types, and connection lifecycle) will be designed in
   detail at a later stage, once the worker and server storage layers are in
   place.
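
Steps 2 to 4 of the view can be sketched as a plain generator, independent of
how the consumer is wired into Django. This is a sketch under stated
assumptions: ``follow_log`` and ``is_finished`` are hypothetical names, the
client is assumed to be created with ``decode_responses=True`` and to return
redis-py's list-shaped (RESP2) ``xread`` result, and every ``log_line`` entry
is assumed to be written before the work request status changes. The
permission check and the artifact fallback are left out.

.. code-block:: python

   from typing import Any, Callable, Dict, Iterator, Optional, Protocol


   class StreamReader(Protocol):
       """The subset of a redis-py client used here."""

       def xread(self, streams: Dict[str, Any], count: Optional[int] = None,
                 block: Optional[int] = None) -> Any: ...


   def follow_log(r: StreamReader, work_request_id: int,
                  is_finished: Callable[[], bool],
                  block_ms: int = 5000, batch: int = 100) -> Iterator[dict]:
       """Yield every entry of the task's log stream, then stop once done."""
       key = f"task:logs:{work_request_id}"
       last_id = "0"  # start from the beginning, so late subscribers see it all
       while True:
           # Sample the status *before* reading: if the task had already
           # finished, every line was written before this read, so an empty
           # result means the stream is fully drained.
           finished = is_finished()
           # Block for new entries while the task runs; only drain once done.
           response = r.xread({key: last_id}, count=batch,
                              block=None if finished else block_ms)
           got_any = False
           for _name, entries in response or []:
               for entry_id, fields in entries:
                   last_id = entry_id
                   got_any = True
                   yield fields
           if finished and not got_any:
               return

Checking the status before the read, rather than after it, avoids losing
lines that arrive between an empty read and the status change. Because the
loop blocks, an asynchronous consumer would need an async Redis client or a
worker thread; which fits Debusine best is left open here.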
Log Persistence
===============

When a task finishes, the live Redis Stream has served its purpose. The full
log needs to move somewhere durable before the stream is cleaned up.

At that point, the existing system already collects the task output and
uploads it as an artifact attached to the work request. That behavior does
not change. After the existing artifact upload completes successfully, the
Redis key is deleted, since the durable copy now lives in the artifact store.

A hard expiry is also set on the Redis key as a safety net, independent of
whether the artifact upload succeeds. This prevents orphaned streams from
accumulating in Redis if something goes wrong silently:

.. code-block:: python

   r.expire(f"task:logs:{work_request_id}", 60 * 60 * 24)  # 24 hours

Browser-Side
============

The worker and server changes in the previous sections are the core of the
feature. The browser side makes it user-visible.

The task detail page gets a small vanilla JavaScript snippet, with no
framework and no build step. When the page loads for a task that is currently
``running``, the snippet opens a WebSocket connection to the new streaming
consumer and appends each incoming message to the page.

A few small things are worth handling:

* **Auto-scroll**: the page should scroll to the bottom as new lines arrive,
  so the user always sees the latest output without manual scrolling.

* **stderr styling**: if the server includes the ``stream`` field in each
  WebSocket message, lines from stderr can be given a different style (a
  muted colour or a small label) so they are visually distinct from stdout.

* **Already-completed tasks**: if the page loads for a task that just
  finished, the same endpoint serves the full log from Redis or from the
  artifact, so the snippet does not need any special case.