We've reviewed leader election and replication in Seq clusters. While these are tricky to get right, and involve nontrivial code, they're dwarfed in complexity by work sharing, which is the basis of Seq's scale-out functionality.
Seq clusters ingest data at a modest rate over long periods of time, and are then called on to answer queries about that data, which even at MB/min ingestion levels can easily add up to GB- or TB-sized data sets. Scale-out is how each node in a Seq cluster can contribute some of its local processing power and I/O bandwidth to produce a result faster than any one node could on its own.
But work sharing isn't a single problem:
- How is a query broken up into independent pieces that can run on different nodes?
- How does work get to a node with the data required to perform it?
- How does the system decide that a node has capacity to run some work, and how much?
- How do nodes at different Seq versions safely share work, while query language features and functions might differ between them?
- How does cancellation propagate? Failures?
- How are results sent from one node to another?
- How do we unpick the mess when something goes wrong, or runs too slowly?
There's no way to bake all of this into one blog post, but if we run through each of these briefly, you'll have a good chance of mentally filling in the blanks. I love working on this stuff. It's the most interesting thing I think we've ever shipped, so if you're interested in more content around this, let me know and I'll gladly write it!
Query planning and materialization
How is a query broken up into independent pieces that can run on different nodes?
Seq's query engine is dataflow-based. Given a query like:
select count(*)
from stream
where @Elapsed > 1000
group by @Resource.service.name
The system breaks the query down into vertexes containing operations, and edges linking them. Each vertex runs independently, with data flowing along the edges, until the result arrives at the outermost vertex, by which point it looks like the result set you see in your browser.
The underlying system is SQL-based, so even simple searches like "StackOverflow" or "AccessViolation" end up translated into SQL-style queries that are planned and executed as dataflows.
There are two distinct phases of query planning in the system. The "logical" phase breaks the query into layers, which are sets of vertexes of the same type. Layers are associated with a partitioning strategy that governs where in the query plan parallelism is allowed, and a merge strategy describing how results from vertexes in the layer are combined.
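As a rough mental model (the type and variant names below are illustrative, not the exact ones in the codebase), a logical plan might look something like this:
// Sketch only: a logical plan as a stack of layers, each carrying a
// partitioning strategy and a merge strategy.
pub struct LogicalPlan {
    pub layers: Vec<Layer>,
}

pub struct Layer {
    // Where parallelism is allowed for this layer's vertexes.
    pub partition: PartitionStrategy,
    // How results from this layer's vertexes are combined.
    pub merge: MergeStrategy,
}

pub enum PartitionStrategy {
    // A single vertex; no parallelism in this layer.
    None,
    // One vertex per partition of the underlying data.
    Unconstrained,
}

pub enum MergeStrategy {
    // Results can be combined in any order (e.g. grouped sums).
    Unordered,
    // Results must be combined in partition order.
    Ordered,
}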
Putting explain in front of the query in Seq will show the logical query plan, including what optimizations were applied:
explain select count(*)
from stream
where @Elapsed > 1000
group by @Resource.service.name
Query plans in Seq are fully-declarative, which means there's a lot of nitty-gritty detail in the query plan about how things run in each layer:
![(receive) (send) (transform) columns: [col(0), coalesce(col(1), 0)] (grouping) key: [col(0)] group: reduce operations: [sum(col(1))] (receive) merge: unordered (send) partition: unconstrained (memoize) computation key: [p{stream,(timestamp(id()) - @st) > 1000,[coalesce(@ra.service.name, null)],()},g{[col(0)],r{[count(true)]}}] (grouping) key: [col(0)] group: reduce operations: [count(true)] (records) source id: stream bounds: .. condition: (timestamp(id()) - col(0)) > 1000 condition source: [@st] projection: [coalesce(col(1), null)] projection source: [@st] + [@ra.service.name] applied optimizations: - propagate constant columns - parallelize reducer - detect cacheable aggregation - compute record source columns - asynchronously compute the final result](https://blog.datalust.co/content/images/2025/05/image-23.png)
Query plans use indentation to separate layers. In this plan we can see there are three layers:
- the root (output) layer just receives rows from a channel,
- the next layer performs a grouped sum operation, and
- the final layer of vertexes reads records from stream to produce intermediate grouped sums over each partition of the underlying data store.
The second phase of query planning is what we term "materialization", and the output is a complete description of the dataflow that the system will actually run.
Given the state of the event store (in the transaction executing the query), which portions of which data files need to be processed? For each layer of the query plan, the data is divided into partitions, and each partition results in a vertex. There can be a lot of these!
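In very rough pseudocode (the snapshot and helper names here are invented, and the real materializer does a lot more bookkeeping), this step walks each layer and emits one vertex per partition:
// Sketch only: one vertex per (layer, partition) pair, with each vertex
// stamped with the file ids and ranges its partition covers.
fn materialize(plan: &LogicalPlan, snapshot: &StoreSnapshot) -> Vec<VertexData> {
    let mut vertexes = Vec::new();
    for layer in &plan.layers {
        // Which parts of which data files does this layer need to read,
        // split according to the layer's partitioning strategy?
        for partition in snapshot.partitions_for(layer) {
            vertexes.push(layer.vertex_for(&partition));
        }
    }
    vertexes
}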
The materialized dataflow for the query above looks something like this:

Each vertex is stamped with a set of file ids and ranges. To get a feel for what constitutes a vertex, here's its wire representation in all of its rusty glory:
pub struct VertexData {
    pub job_id: u128,
    pub diagnostic_ctxt: Option<DiagnosticCtxtData>,
    pub fixed_now: Option<u64>,
    pub input: VertexInputData,
    pub blocks: Vec<Block>,
    pub output: OutgoingEdgeData,
    pub requirements: Vec<String>,
}
I've dropped off the ubiquitous #[derive(..)] attributes on this struct, and throughout the article — they're a bit too noisy for unaccustomed eyes to skim over.
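The Block type isn't expanded here either; as a rough mental model (the field names below are my own shorthand, not the exact wire format), each block points the vertex at a slice of a data file:
// Sketch only: approximately what a block carries; the real type differs.
pub struct Block {
    // The data file this block reads from.
    pub file_id: u128,
    // The byte range within that file to process.
    pub byte_range: Range<u64>,
}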
After materialization, the vertexes are dropped onto what we half-jokingly call the "job pile" (it's close enough to a queue, but not quite!). This leads on to the next question...
The job pile and available files
How does work get to a node with the data required to perform it?
Each node talks to every other node; using the "vertexes" protocol (alongside "snapshots", already discussed, and "edges" covered below), the calling node asks the node at the other end of the line for work. Making nodes poll for work reduces the chance of the leader node pushing work to an overloaded or faulty worker.
When a request reaches the current leader node, it carries a delta of the files available on that worker since the last request:
pub struct ReceiveVertexesRequest {
    pub available_files: Vec<AvailableFileChange>,
    ..
}

pub enum AvailableFileChange {
    Add(AvailableFile),
    Update(AvailableFile),
    Remove(u128),
}

pub struct AvailableFile {
    pub id: u128,
    pub byte_range: Option<Range<u64>>,
    pub id_range: Option<Range<u128>>,
}
There's quite a bit of stuff tacked onto this request; one thing we've tried hard to do is keep protocols ultra-simple, so there's only one request type and one response type for the vertexes protocol. We'll look at a few more fields of this request type later on.
The available_files field provides enough information for the server to scan the job pile and find vertexes that can be executed by the client. Since the client makes the request within a read transaction, we're guaranteed that if vertexes are sent back, the advertised file ids and ranges will still be there.
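Conceptually, the scan looks something like this; a simplified sketch building on the hypothetical Block fields above (the real matching also takes capacity and features into account, which we'll get to next):
// Simplified sketch: a vertex can go to the requesting node if every block
// it needs is covered by a file range that node has advertised. A missing
// byte_range is treated here as "the whole file is available".
fn runnable_on(vertex: &VertexData, available: &[AvailableFile]) -> bool {
    vertex.blocks.iter().all(|block| {
        available.iter().any(|file| {
            file.id == block.file_id
                && file.byte_range.as_ref().map_or(true, |range| {
                    range.start <= block.byte_range.start
                        && block.byte_range.end <= range.end
                })
        })
    })
}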
Capacity advertising
How does the system decide that a node has capacity to run some work, and how much?
This is where pulling on one thread started to unravel the whole sweater 😅. Earlier Seq versions allocated threads for each running query, and if the server ended up oversubscribed, would rely on timeouts to put the brakes on and get utilization back to an acceptable level.
This scheme didn't map well to nodes running in a cluster. The ideal picture is one where each node has a known total capacity, knows how much of that capacity is free, and can advertise that capacity when requesting work from other nodes.
So, Seq switched from using dedicated threads for each query to a fixed-size pool of query threads. We initially implemented a reservation mechanism: when sending a ReceiveVertexesRequest, the client node would "reserve" a subset of idle threads so that the advertised capacity was guaranteed to be available when vertexes were returned. In the end, this had implications that slowed down the thread pool, and introduced new failure modes, for only marginal benefit. So, we scrapped it and switched to treating available capacity as a (very accurate) estimate.
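As a sketch (QueryThreadPool and its method are invented names, and the available_capacity field is shown in full in the next section), the capacity in each request is just a snapshot of the pool at the moment the request is built:
// Sketch only: capacity is sampled from the fixed-size query thread pool
// when the request is built. It's an estimate, because threads may be
// claimed or freed before the response comes back.
fn build_request(pool: &QueryThreadPool) -> ReceiveVertexesRequest {
    ReceiveVertexesRequest {
        available_capacity: pool.idle_counts_by_worker_group(),
        ..
    }
}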
There's an interesting scheduling problem faced by dataflow systems, in that dataflows can often only make progress when vertexes on both sides of an edge are scheduled. The solution we chose is to reserve a certain number of threads in each pool for specific logical "layers" in the dataflow (and to carefully order vertexes). The mechanism we use to make this work across the network ties in with the next topic: the system of "features" supported by logical workers, and the requirements field on vertexes...
Worker features
How do nodes at different Seq versions safely share work, while query language features and functions might differ between them?
Let's take a closer look at how available capacity is reported by nodes:
pub struct ReceiveVertexesRequest {
    ..
    pub available_capacity: Vec<AvailableCapacity>,
    ..
}

pub struct AvailableCapacity {
    features: Vec<String>,
    count: u64,
}
Capacity is split into worker groups, and each worker group declares the "features" it can provide. (We'd have called these "capabilities", but the codebase already uses that term extensively in its component system, so "features" was the next best thing!)
Among these are the query language versions available on the node; a Seq version that can run Seq 2025.1 queries will report ["v1"], and if we introduce some new functions in 2025.2, in a backwards-compatible way, this might report ["v1", "v2"]. A vertex produced by a node requiring only "v1" features will list that in its requirements:
pub struct VertexData {
    ..
    pub requirements: Vec<String>,
}
A vertex that uses "v2" features will list this instead. This simple system is enough that when queued work is matched against available worker capacity, the system can continue distributing work as widely as possible while also keeping mixed clusters healthy during upgrades.
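The matching rule itself is as simple as it sounds; in sketch form, a vertex can only be handed to a worker group whose feature list covers all of the vertex's requirements:
// Sketch of the matching rule: the worker group must have spare capacity,
// and every requirement the vertex lists must appear in its feature list.
fn can_run(vertex: &VertexData, capacity: &AvailableCapacity) -> bool {
    capacity.count > 0
        && vertex
            .requirements
            .iter()
            .all(|required| capacity.features.contains(required))
}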
Timeouts/cancellation, and failures
How does cancellation propagate? Failures?
The ReceiveVertexesRequest and ReceiveVertexesResponse types carry failed_jobs and inactive_jobs to communicate failures from the worker back to the leader node, and cancellations from the leader back to other workers that might be running vertexes from failed or canceled jobs:
pub struct ReceiveVertexesRequest {
    ..
    pub failed_jobs: Vec<u128>,
}

pub struct ReceiveVertexesResponse {
    ..
    pub inactive_jobs: Vec<u128>,
}
(A "job" is the unit of scheduling; from the runtime's perspective it's just a bag of vertexes, although these form a dataflow when assembled.)
This scheme seems very, very simple and narrow - how does the leader node identify which vertex failed, or how does a user receive information about the failure in an error message?
At one point, the system did account for exactly these things, but through a series of purges we removed these features:
- Failed vertexes aren't retried — keeping the system live and stable is more important than completing any one job, and failures are rare enough that retries add little value. The leader doesn't need to know which vertex failed, only that one failed and that the dataflow needs to be torn down
- The kinds of errors that make sense to a user are generated early in the process, when query syntax and semantics are analyzed; the kinds of failures that might occur on a worker running a vertex (e.g. transient I/O errors) aren't directly useful to an end user
- The cluster diagnostic instance provides much more detailed query tracing and error reporting than any in-band error propagation scheme could
Knocking these things out of the network protocols and in-process channels was a big win: distributed error propagation is hard, which would be surprising if it weren't such a widely-known problem. Reducing the scope of error propagation to a broad per-job indication of failure helped us build a more predictable and reliable system than we otherwise could.
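In sketch form (LeaderState and its methods are invented here), the round-trip is: a worker reports the jobs it has seen fail, the leader tears those dataflows down, and the response tells workers which jobs are no longer active so any in-flight vertexes can be dropped:
// Sketch only: how failed_jobs and inactive_jobs round-trip between a
// worker and the leader. LeaderState is a stand-in for the real scheduler.
fn handle_vertexes_request(
    leader: &mut LeaderState,
    request: &ReceiveVertexesRequest,
) -> ReceiveVertexesResponse {
    // Any job a worker reports as failed is torn down; individual vertexes
    // aren't retried.
    for job_id in &request.failed_jobs {
        leader.tear_down(*job_id);
    }
    ReceiveVertexesResponse {
        // Workers drop vertexes belonging to jobs in this list.
        inactive_jobs: leader.inactive_job_ids(),
        ..
    }
}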
Channels and the edge network
How are results sent from one node to another?
Another clustering requirement that had far-reaching effects on the rest of the system is result passing. In this context, results are streams of fixed-size rows.
In a single-node system, this is all pretty straightforward: the relationship between a consumer vertex and a producer is a channel (think flume, or System.Threading.Channels in .NET). Because Seq is frequently deployed as a single-node system, we didn't want to slow things down with another layer of indirection between producers and consumers to deal with routing data around the cluster.
What we ended up with is a library of channel types that support partitioning and know up-front how many partitions make up a more complex channel type, such as the ordered multiple-producer-single-consumer (MPSC) channel that underlies the incremental searches you see on the Seq Events screen.

Initially, when a query starts executing vertexes on either side of a channel like this, the channel is created. If the query continues to execute on the same node, senders and receivers for the channel are allocated locally.
If it turns out that only one or two senders end up running locally, and the rest of the senders and receivers run elsewhere, the MPSC channel can be broken apart into its constituent partitions, and the pieces running locally are attached to network-layer proxies that send or receive rows to the right machine elsewhere in the cluster:

The "edges" network is a point-to-point protocol that shares its wire representation with the in-memory format that transports rows between threads, keeping format conversion to a minimum.
Although the edge network would be a great match in some ways for an unframed streaming protocol - really just writing bytes to a socket - things get trickier when potentially thousands of edges are involved in a single query. There's a connection establishment cost for each socket, and scenarios involving large numbers of TCP connections are fraught with dangers and footguns. Instead, each node-to-node edge network connection uses a single TCP connection, and results are multiplexed over the connection using chunking.
When a chunk arrives at a node, its header identifies the edge it belongs to, and the corresponding channel is looked up to receive the rows contained within the chunk.
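Roughly (Chunk and RowSender here are illustrative stand-ins, not the real types), the receiving side is a header lookup followed by a hand-off into the local channel:
use std::collections::HashMap;

// Sketch only: demultiplexing chunks on the receiving node. The chunk
// header names the edge; the rows go to that edge's local channel.
pub struct Chunk {
    pub edge_id: u128,
    pub row_bytes: Vec<u8>,
}

fn receive_chunk(channels: &mut HashMap<u128, RowSender>, chunk: Chunk) {
    if let Some(sender) = channels.get_mut(&chunk.edge_id) {
        // Wire and in-memory row formats match, so no conversion is needed.
        sender.send_rows(chunk.row_bytes);
    }
}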
Visualizing work sharing
How do we unpick the mess when something goes wrong, or runs too slowly?
We've completed our 1000-foot flyover of work sharing. It's complex, highly-asynchronous, and despite being a lot of fun to work on, it's impossible to debug without the right tools.
Early in the development of Seq 2025.1 we realized distributed, hierarchical tracing is an excellent fit for visualizing distributed query execution. Although nodes don't communicate using HTTP, and chains of causality are only loosely-linked, the execution of work on the cluster maps extremely closely to the Gantt/waterfall-style traces we can collect and display in the Seq UI. In clustered scenarios, we recommend setting up a separate Seq instance to receive trace data via OTLP.
On the leader node, query tracing is just like any other tracing up until the point we schedule vertexes in the dataflow. To trace the parsing, planning, and materialization phases of query execution, we use emit's lovely #[emit::span(..)] attribute:
/**
Parse a `query`.
*/
#[emit::span(err_lvl: "error", "parse")]
pub fn parse_query(query: &str) -> Result<Query, Error> {
    ..
}
Emit is a direct OTLP (OpenTelemetry Protocol) emitter for Rust that's being developed by Ashley Mannix, who is also a core Seq engineer. The focus of emit is on imposing as few layers and as little abstraction overhead as possible. We're somewhat "power users" of OTLP, and at a given point we know pretty much exactly what details we want stamped into an OpenTelemetry span or log record, so emit is perfect for us.
Once we start distributing vertexes, we need to propagate the same kind of trace information that the W3C traceparent header would provide in an HTTP request. That info gets packed into VertexData::diagnostic_ctxt:
pub struct VertexData {
    ..
    pub diagnostic_ctxt: Option<DiagnosticCtxtData>,
    ..
}
When a node picks up a vertex for execution, we use emit to push information about the current trace and parent span onto the node's local context stack for the duration of processing.
Rather than let all vertexes execute in a kind of span soup, Seq pre-generates span ids and sets parent ids so that the hierarchy of spans in a trace follows the direction of dataflow. Seq also generates a condensed description of the operations in each span to serve as the span name.
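DiagnosticCtxtData itself isn't shown in this post; as a rough sketch (the field names here are mine, not the actual wire format), it carries the same pieces a W3C traceparent header would, with span ids chosen up-front by the leader so that parent/child links follow the dataflow:
// Sketch only: roughly the trace information carried per vertex. Field
// names are illustrative; the real DiagnosticCtxtData differs.
pub struct DiagnosticCtxtData {
    pub trace_id: u128,
    // The span id pre-generated by the leader for this vertex's execution.
    pub span_id: u64,
    // The parent span, chosen so the span hierarchy follows the dataflow.
    pub parent_span_id: Option<u64>,
}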
Here's the final result, showing a query scheduled on NODE02 run across NODE02, NODE03, and NODE04 (clock drift and all 😁):

And with that, it's time to wrap up this tour. We hope you enjoyed it! If you're interested to learn more and haven't already read them, you can zip back now to the leader election and data replication posts that precede this one.