CEP Engine Guide
Complex Event Processing (CEP) fills the gap between single-request rules and behavior that only becomes meaningful over time. Use the Muonroi CEP package when a decision depends on a sequence of events, a time window, or a correlation key such as customer id, account id, session id, or device id.
Use stateless rules when a single payload already contains everything needed to decide. Use CEP when the answer depends on "how many times", "within how long", or "for which correlated stream".
Examples that fit CEP well:
- fraud detection: more than N transactions in X seconds
- operational alerting: repeated failures inside a short sliding window
- session behavior: too many login attempts for the same user or IP
- IoT monitoring: repeated threshold breaches for one device
- notification suppression: collapse bursts of equivalent events into one action
Examples that do not need CEP:
- a loan approval decision based on one application payload
- a price calculation using only the current cart
- a permission check for one request
What ships today
The current package exposes five main pieces:
- CepEngine<T> in Muonroi.RuleEngine.CEP for in-memory event window evaluation
- ICepConfigRepository plus InMemoryCepConfigRepository and EfCoreCepConfigRepository for tenant-aware config CRUD
- CepWindowBuilder for readable DSL-style configuration and runtime window construction
- CepController under /api/v1/rule-engine/cep for CRUD and simulation
- AddCepWeb() for registering the repository, controller, database migrator (only when a relational connection string is configured), and UI manifest contributor
The current implementation is still intentionally explicit, but it is no longer only a demo surface:
- event windows are held in memory per CepEngine<T> instance
- config storage flows through ICepConfigRepository
- the default repository is tenant-aware and process-local in memory
- hosts can switch config persistence to PostgreSQL or SQL Server without changing controller contracts
- OpenTelemetry-style activity and meter hooks are emitted through CepMetrics
- simulation sorts incoming events by timestamp before evaluation
- supported window types are Sliding and Tumbling
That means the package is useful today for operator tooling, embedded CEP logic, and production hosts that need durable CEP configuration. Event windows themselves are still in-memory runtime state. If you need cross-node event buffering or distributed window coordination, add those concerns in the host application around the package instead of assuming they exist in the package already.
Core concepts
Events
An event is a payload plus:
- a correlation key
- a UTC timestamp
The runtime type is:
```csharp
public record CepEvent<T>(string Key, DateTime Timestamp, T Value);
```
The key is what isolates one stream from another. If you use customer id as the key, each customer gets a separate window. If you use device id, each device gets a separate window.
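Here is a minimal standalone sketch that makes key isolation concrete. It is not the package's code. The local Append function and the dictionary of timestamp lists are illustrative stand-ins for the per-key list that CepEngine<T>.AddEvent maintains. The sketch assumes one ordered list per key and case-sensitive keys.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical per-key store: one timestamp list per correlation key.
// Ordinal comparison means "Customer-42" and "customer-42" are different streams here.
var streams = new Dictionary<string, List<DateTime>>(StringComparer.Ordinal);

// Appends a timestamp to the stream for `key` and returns that stream's size.
int Append(string key, DateTime timestampUtc)
{
    if (!streams.TryGetValue(key, out var events))
    {
        events = new List<DateTime>();
        streams[key] = events;
    }
    events.Add(timestampUtc);
    return events.Count;
}

DateTime t0 = new(2026, 3, 9, 10, 0, 0, DateTimeKind.Utc);
int a1 = Append("customer-42", t0);
int a2 = Append("customer-42", t0.AddSeconds(5));
int b1 = Append("customer-7", t0.AddSeconds(6));

// customer-7's first event does not see customer-42's history.
Console.WriteLine($"customer-42: {a2}, customer-7: {b1}"); // customer-42: 2, customer-7: 1
```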
Window size
windowSize defines how much history participates in the current evaluation.
Examples:
- TimeSpan.FromSeconds(30) for rapid fraud bursts
- TimeSpan.FromMinutes(5) for operational error spikes
- TimeSpan.FromHours(1) for slower quota or abuse signals
Time to live (TTL)
ttl controls how long old events remain in the internal list before pruning. If you omit it, the engine uses windowSize.
In practice:
- use ttl == windowSize when the active window is all you care about
- use a larger ttl only when you need slightly longer retention for replay-like behavior or late-arriving events
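Below is a minimal sketch of TTL pruning over one key's sorted timestamps. It mirrors the documented default, where ttl falls back to windowSize. It also makes two assumptions of its own: age is measured against the newest event for the key, and an event exactly ttl old is kept. The package may choose a different reference point or boundary.

```csharp
using System;
using System.Collections.Generic;

// Returns the timestamps that survive pruning. ttl defaults to windowSize,
// mirroring the documented behavior when ttl is omitted.
List<DateTime> Prune(List<DateTime> sortedEvents, TimeSpan windowSize, TimeSpan? ttl = null)
{
    TimeSpan retention = ttl ?? windowSize;
    if (sortedEvents.Count == 0) return new List<DateTime>();

    // Assumption of this sketch: age is relative to the newest event for the key.
    DateTime newest = sortedEvents[^1];
    DateTime cutoff = newest - retention;
    return sortedEvents.FindAll(t => t >= cutoff);
}

DateTime t0 = new(2026, 3, 9, 10, 0, 0, DateTimeKind.Utc);
var events = new List<DateTime> { t0, t0.AddSeconds(40), t0.AddSeconds(90), t0.AddSeconds(100) };

var keptDefault = Prune(events, TimeSpan.FromSeconds(30));                         // ttl = window
var keptLonger = Prune(events, TimeSpan.FromSeconds(30), TimeSpan.FromMinutes(2)); // longer retention
Console.WriteLine($"{keptDefault.Count} {keptLonger.Count}"); // 2 4
```

With a 30-second window and no ttl, only the two events from the last 30 seconds survive. With ttl set to two minutes, all four are kept.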
Correlation key
Choose the smallest key that matches the decision boundary.
Good keys:
- account-123
- user-42
- tenant-a:terminal-9
Bad keys:
- "default" for all traffic in a multi-tenant stream
- a random GUID per event when you actually need user-level correlation
Quick start in code
The low-level engine does not require ASP.NET. You can use it in an app service, message consumer, or background job.
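```csharp
using Muonroi.RuleEngine.CEP;

// TransactionEvent is your own payload type, for example:
// public sealed record TransactionEvent(decimal Amount, string Channel);
var engine = new CepEngine<TransactionEvent>(
    TimeSpan.FromSeconds(30),   // window size
    WindowType.Sliding,
    ttl: TimeSpan.FromMinutes(2));

// AddEvent stores the event under its key and returns the current window for that key.
IReadOnlyList<CepEvent<TransactionEvent>> window = engine.AddEvent(
    key: "customer-42",
    value: new TransactionEvent(250m, "card"),
    timestamp: DateTime.UtcNow);

if (window.Count >= 3)
{
    Console.WriteLine("Fraud alert candidate.");
}
```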
Typical host-level pattern:
- receive event
- choose correlation key
- append to CepEngine<T>
- inspect returned window
- trigger a Muonroi rule evaluation or business action
ASP.NET quick start
Register the web endpoints in an ASP.NET host:
```csharp
using Muonroi.RuleEngine.CEP;

builder.Services.AddControllers();
builder.Services.AddCepWeb();
```
AddCepWeb() registers:
- ICepConfigRepository with the in-memory tenant-aware repository
- the controller assembly
- the CEP UI manifest contributor used by the UI engine catalog
To persist CEP configs in PostgreSQL or SQL Server, pass options:
```csharp
using Muonroi.RuleEngine.CEP;

builder.Services.AddCepWeb(options =>
{
    options.PostgresConnectionString = builder.Configuration.GetConnectionString("RuleEngineDb");
    options.Schema = "ruleengine";
});
```
Supported built-in persistence modes:
- default: InMemoryCepConfigRepository
- PostgreSQL: EfCoreCepConfigRepository
- SQL Server: EfCoreCepConfigRepository
When a relational connection string is configured, AddCepWeb() also wires CepConfigDatabaseMigrator so the CEP config table is created or migrated at startup.
The controller base route is:
/api/v1/rule-engine/cep
Window types
Sliding window
Sliding windows answer "what happened in the last X time units relative to this event?"
Current implementation:
```csharp
private List<CepEvent<T>> GetSliding(List<CepEvent<T>> list, DateTime current)
{
    // Window is (current - windowSize, current]: start exclusive, current inclusive.
    DateTime start = current - windowSize;
    return [.. list.Where(e => e.Timestamp > start && e.Timestamp <= current)];
}
```
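The boundary rule is easiest to see on concrete timestamps. This standalone sketch copies the GetSliding predicate into a local function, with windowSize turned into a parameter. It then evaluates that function for a hypothetical set of three events.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Same predicate as GetSliding: keep timestamps in (current - windowSize, current].
List<DateTime> Sliding(IEnumerable<DateTime> timestamps, DateTime current, TimeSpan windowSize)
{
    DateTime start = current - windowSize;
    return timestamps.Where(t => t > start && t <= current).ToList();
}

DateTime t0 = new(2026, 3, 9, 10, 0, 0, DateTimeKind.Utc);
var events = new[] { t0, t0.AddSeconds(10), t0.AddSeconds(30) };

// Evaluated at t0 + 30s with a 30-second window: t0 sits exactly on the
// exclusive start boundary, so only two events remain.
var window = Sliding(events, t0.AddSeconds(30), TimeSpan.FromSeconds(30));
Console.WriteLine(window.Count); // 2
```

Because the start is exclusive, an event exactly windowSize older than the current one is no longer counted.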
Use sliding windows for:
- fraud spikes
- repeated retries
- burst detection
- rate limits
Tumbling window
Tumbling windows answer "which fixed bucket does this event belong to?"
Current implementation:
```csharp
private List<CepEvent<T>> GetTumbling(List<CepEvent<T>> list, DateTime current)
{
    // Floor the current timestamp to a multiple of windowSize to find the bucket start.
    long windowTicks = windowSize.Ticks;
    long startTicks = current.Ticks / windowTicks * windowTicks;
    DateTime start = new(startTicks, DateTimeKind.Utc);
    DateTime end = start + windowSize;
    // Bucket is [start, end): start inclusive, end exclusive.
    return [.. list.Where(e => e.Timestamp >= start && e.Timestamp < end)];
}
```
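The same approach works for GetTumbling. This sketch reuses its bucket arithmetic on a one-minute window. Buckets are aligned to multiples of windowSize counted in ticks from DateTime.MinValue, so a one-minute window always starts on a whole minute, regardless of when the first event arrived. The event timestamps are hypothetical.

```csharp
using System;
using System.Linq;

// Same bucket arithmetic as GetTumbling: [floor(current), floor(current) + windowSize).
(DateTime Start, DateTime End) Bucket(DateTime current, TimeSpan windowSize)
{
    long windowTicks = windowSize.Ticks;
    long startTicks = current.Ticks / windowTicks * windowTicks;
    DateTime bucketStart = new(startTicks, DateTimeKind.Utc);
    return (bucketStart, bucketStart + windowSize);
}

DateTime Utc(int h, int m, int s) => new(2026, 3, 9, h, m, s, DateTimeKind.Utc);

var (start, end) = Bucket(Utc(10, 0, 45), TimeSpan.FromMinutes(1));
var events = new[] { Utc(9, 59, 50), Utc(10, 0, 0), Utc(10, 0, 59), Utc(10, 1, 0) };
int count = events.Count(t => t >= start && t < end);

Console.WriteLine($"{start:HH:mm:ss}-{end:HH:mm:ss} count={count}"); // 10:00:00-10:01:00 count=2
```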
Use tumbling windows for:
- fixed reporting buckets
- batch notifications per minute
- billing or quota summaries that should not overlap
Choosing between them
| Need | Prefer |
|---|---|
| "last 30 seconds from now" | Sliding |
| "current minute bucket" | Tumbling |
| overlapping alert conditions | Sliding |
| non-overlapping aggregation buckets | Tumbling |
Out-of-order events
The current engine supports out-of-order arrival by inserting events into the per-key list using a binary search. That matters if your source transport retries or delivers late messages.
Operational implications:
- you still need trustworthy UTC timestamps
- if timestamps are wrong, the engine still sorts them, so it produces consistently wrong windows instead of failing
- late events can change the counts reported by subsequent evaluations, so a window you already acted on may later look different
If the event source can be minutes late, choose a TTL that reflects that reality and test the behavior explicitly.
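Here is a minimal sketch of sorted insertion with binary search. It uses List<T>.BinarySearch, which returns the bitwise complement of the insertion index when the value is not found. The package's own insertion code is not shown in this guide. Placing ties after existing equal timestamps is an assumption of this sketch.

```csharp
using System;
using System.Collections.Generic;

// Inserts `timestamp` into an ascending list, keeping it sorted.
void InsertSorted(List<DateTime> sorted, DateTime timestamp)
{
    int index = sorted.BinarySearch(timestamp);
    if (index < 0)
    {
        index = ~index; // complement = index of the first larger element
    }
    else
    {
        // Equal timestamp found: move past all equal entries so arrival
        // order is preserved among ties (an assumption of this sketch).
        while (index < sorted.Count && sorted[index] == timestamp) index++;
    }
    sorted.Insert(index, timestamp);
}

DateTime t0 = new(2026, 3, 9, 10, 0, 0, DateTimeKind.Utc);
var stream = new List<DateTime>();
InsertSorted(stream, t0.AddSeconds(20));
InsertSorted(stream, t0.AddSeconds(40));
InsertSorted(stream, t0.AddSeconds(10)); // arrives late
Console.WriteLine(string.Join(", ", stream.ConvertAll(t => t.ToString("HH:mm:ss"))));
// 10:00:10, 10:00:20, 10:00:40
```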
CEP controller API
The built-in controller is useful for operator-facing tooling and manual simulation. It now reads and writes through ICepConfigRepository, so hosts can replace the default repository without changing controller contracts.
List configs
```http
GET /api/v1/rule-engine/cep
```
Returns all config entries ordered by Name.
Get one config
```http
GET /api/v1/rule-engine/cep/{id}
```
Returns 404 if the config id does not exist.
Save or update a config
```http
PUT /api/v1/rule-engine/cep/{id}
Content-Type: application/json

{
  "name": "fraud-window",
  "windowType": "Sliding",
  "windowSizeSeconds": 30,
  "timeToLiveSeconds": 120
}
```
Validation rules enforced by the controller:
- name is required
- windowType must parse to the WindowType enum
The controller stamps UpdatedAtUtc using IMDateTimeService.UtcNow().
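When configs are pushed from a deployment script or admin tool, a client can check the two documented validation rules before calling PUT. This sketch assumes a strict, case-sensitive match against the window type names listed in this guide (Sliding, Tumbling). Because this check is stricter than the server, it never lets through a value the controller would reject. The HttpClient and its BaseAddress are placeholders you supply. The id is inserted into the path as-is, so choose URL-safe ids.

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Deliberately strict mirror of the documented WindowType values.
string[] knownWindowTypes = { "Sliding", "Tumbling" };

// Returns null when the payload passes the documented controller rules,
// otherwise a human-readable reason.
string? Validate(string? name, string windowType)
{
    if (string.IsNullOrWhiteSpace(name)) return "name is required";
    if (Array.IndexOf(knownWindowTypes, windowType) < 0) return $"unknown windowType '{windowType}'";
    return null;
}

// Anonymous-type property names become the camelCase JSON field names.
string BuildBody(string name, string windowType, int windowSizeSeconds, int timeToLiveSeconds) =>
    JsonSerializer.Serialize(new { name, windowType, windowSizeSeconds, timeToLiveSeconds });

// Sends the PUT; `client.BaseAddress` must point at your host.
Task<HttpResponseMessage> SaveAsync(HttpClient client, string id, string json) =>
    client.PutAsync($"/api/v1/rule-engine/cep/{id}",
        new StringContent(json, Encoding.UTF8, "application/json"));

string body = BuildBody("fraud-window", "Sliding", 30, 120);
Console.WriteLine(body);
// {"name":"fraud-window","windowType":"Sliding","windowSizeSeconds":30,"timeToLiveSeconds":120}
```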
Simulate a config
```http
POST /api/v1/rule-engine/cep/{id}/simulate
Content-Type: application/json

{
  "events": [
    {
      "key": "customer-42",
      "timestampUtc": "2026-03-09T10:00:00Z",
      "payload": { "amount": 120, "channel": "card" }
    },
    {
      "key": "customer-42",
      "timestampUtc": "2026-03-09T10:00:10Z",
      "payload": { "amount": 140, "channel": "card" }
    }
  ]
}
```
Response shape:
- config.id
- config.name
- processedEvents
- windows[]
Each windows[] item includes:
- key
- timestampUtc
- count
The current simulation endpoint is intentionally count-oriented. If you need richer simulation output, extend the host API with domain-specific summaries rather than overloading the generic package.
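When testing a config, it can help to predict the simulate endpoint's counts offline. This sketch assumes one windows[] entry per input event and a Sliding config. It reuses the documented behavior: sort by timestamp, then evaluate each event against its key's history. TTL pruning is not modeled. Treat it as an approximation for test expectations, not the endpoint's code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Predicts (key, timestampUtc, count) rows for a Sliding config.
List<(string Key, DateTime TimestampUtc, int Count)> Simulate(
    IEnumerable<(string Key, DateTime TimestampUtc)> events, TimeSpan windowSize)
{
    var seen = new Dictionary<string, List<DateTime>>();
    var result = new List<(string Key, DateTime TimestampUtc, int Count)>();

    // Sort first, as the endpoint does; OrderBy is stable for equal timestamps.
    foreach (var ev in events.OrderBy(x => x.TimestampUtc))
    {
        if (!seen.TryGetValue(ev.Key, out var list)) seen[ev.Key] = list = new List<DateTime>();
        list.Add(ev.TimestampUtc);

        // Sliding rule: (timestamp - windowSize, timestamp].
        DateTime start = ev.TimestampUtc - windowSize;
        result.Add((ev.Key, ev.TimestampUtc, list.Count(t => t > start && t <= ev.TimestampUtc)));
    }
    return result;
}

DateTime t0 = new(2026, 3, 9, 10, 0, 0, DateTimeKind.Utc);
var predicted = Simulate(new[]
{
    ("customer-42", t0.AddSeconds(10)),
    ("customer-42", t0), // out of order on purpose; sorting fixes it
}, TimeSpan.FromSeconds(30));

foreach (var r in predicted) Console.WriteLine($"{r.Key} {r.TimestampUtc:HH:mm:ss} count={r.Count}");
// customer-42 10:00:00 count=1
// customer-42 10:00:10 count=2
```

For the two-event sample request and the 30-second fraud-window config, the sketch predicts counts of 1 and 2.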
Production persistence
The package now supports durable CEP configuration out of the box through EfCoreCepConfigRepository.
What becomes durable when you configure PostgreSQL or SQL Server:
- CEP config CRUD done through CepController
- tenant-scoped config lookup through ICepConfigRepository
- restart-safe config storage
What remains in-memory runtime state:
- event buffers inside each CepEngine<T> instance
- active window membership and counts
- cross-node correlation behavior
Recommended production pattern:
- keep CepEngine<T> close to the service that owns the event stream
- persist CEP configs with AddCepWeb(options => ...)
- reload or resolve config through ICepConfigRepository when operator changes are applied
- use the controller simulation flow as an operator convenience layer
- use the FraudDetection sample as the reference host when bootstrapping a CEP-backed API
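If the host keeps its own tenant-scoped view of each config, for example to build one engine per tenant and config, a small host-level record usually works well:
```csharp
public sealed record TenantCepConfig(
    string TenantId,
    string ConfigId,
    string Name,
    string WindowType,
    int WindowSizeSeconds,
    int TimeToLiveSeconds);
```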
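To turn a stored TenantCepConfig into engine parameters, the seconds fields need to become TimeSpan values. This sketch covers that conversion only. It treats TimeToLiveSeconds = 0 as "use the window size" and rejects a TTL shorter than the window. Both are policies of this sketch, not documented package rules. The resulting values would then go into new CepEngine<T>(windowSize, windowType, ttl: ttl), with WindowType parsed from the stored string.

```csharp
using System;

// Converts stored seconds into the TimeSpans an engine factory needs.
// A TimeToLiveSeconds of 0 falls back to the window size, mirroring the
// engine's documented default (the 0 convention itself is an assumption).
(TimeSpan WindowSize, TimeSpan Ttl) ToTimeSpans(int windowSizeSeconds, int timeToLiveSeconds)
{
    if (windowSizeSeconds <= 0)
        throw new ArgumentOutOfRangeException(nameof(windowSizeSeconds), "Window size must be positive.");

    TimeSpan window = TimeSpan.FromSeconds(windowSizeSeconds);
    TimeSpan retention = timeToLiveSeconds == 0 ? window : TimeSpan.FromSeconds(timeToLiveSeconds);

    // Policy of this sketch: a TTL shorter than the window would prune events
    // that still belong to the active window, so reject it.
    if (retention < window)
        throw new ArgumentOutOfRangeException(nameof(timeToLiveSeconds), "TTL must not be shorter than the window.");
    return (window, retention);
}

var (windowSize, ttl) = ToTimeSpans(30, 120);
Console.WriteLine($"{windowSize.TotalSeconds}s window, {ttl.TotalSeconds}s ttl"); // 30s window, 120s ttl
```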
If you need multi-node state, externalize event buffering and coordination. The package gives you durable config, not distributed window state.
Observability and OpenTelemetry
The CEP package publishes a dedicated ActivitySource and Meter through CepMetrics.
Built-in telemetry:
- ActivitySource("Muonroi.CEP")
- counter cep.events.processed
- counter cep.window.evaluations
- histogram cep.window.event.count
- counter cep.config.reads
- counter cep.config.writes
What you can observe today:
- ASP.NET request traces around CepController through your normal host instrumentation
- CEP evaluation traces and metrics through CepMetrics
- your own application counters around emitted alerts, dropped events, or matched windows
Recommended host instrumentation:
```csharp
builder.Services.AddMuonroiOpenTelemetry(builder.Configuration);
```
Then add custom counters around host-specific CEP decisions:
```csharp
private static readonly Meter Meter = new("MyCompany.Cep");
private static readonly Counter<long> Alerts = Meter.CreateCounter<long>("cep.alerts.emitted");
```
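Here is a runnable version of the custom-counter idea, using the standard System.Diagnostics.Metrics API and the cep.alerts.emitted name from the metrics list below. The MeterListener only demonstrates that measurements are recorded. In a real host, make sure your OpenTelemetry metrics pipeline includes the MyCompany.Cep meter. How AddMuonroiOpenTelemetry selects meters is not covered here. The meter name and the "config" tag are examples.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

// Host-owned meter and counter (names are examples, not package names).
var meter = new Meter("MyCompany.Cep");
Counter<long> alerts = meter.CreateCounter<long>("cep.alerts.emitted");

// In-process listener, handy in tests; in production an exporter reads the meter instead.
long observed = 0;
using var listener = new MeterListener();
listener.InstrumentPublished = (instrument, l) =>
{
    if (instrument.Meter.Name == "MyCompany.Cep") l.EnableMeasurementEvents(instrument);
};
listener.SetMeasurementEventCallback<long>((instrument, value, tags, state) => observed += value);
listener.Start();

// Tag by config id rather than correlation key: per-customer tags explode cardinality.
alerts.Add(1, new KeyValuePair<string, object?>("config", "fraud-window"));
alerts.Add(1, new KeyValuePair<string, object?>("config", "fraud-window"));

Console.WriteLine(observed); // 2
```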
Practical metrics to track:
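- cep.events.processed (built in, through CepMetrics)
- cep.window.evaluations (built in, through CepMetrics)
- cep.alerts.emitted (host-defined)
- cep.simulation.requests (host-defined)
- cep.late.events (host-defined)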
For the platform-level observability setup, see the Observability Guide.
Multi-tenant isolation
There are two different isolation layers to think about:
Event-key isolation inside one engine instance
CepEngine<T> isolates streams by the key argument. Events for tenant-a:user-1 do not mix with tenant-a:user-2.
Tenant isolation at the host boundary
The package now persists CEP configs in a tenant-aware way through ICepConfigRepository. If you are hosting CEP for many tenants:
- include tenant id in the correlation key or engine partition when the stream itself is multi-tenant
- let ICepConfigRepository resolve config by the current execution context's tenant
- never reuse a generic "default" runtime key across unrelated tenants
- secure admin APIs with tenant-aware authorization
Good pattern:
```text
config id: tenant-a/fraud-window
event key: tenant-a:customer-42
```
Bad pattern:
```text
config id: fraud-window
event key: customer-42
```
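A small helper keeps the good pattern consistent across producers. BuildKey is a hypothetical host-side sketch, not a package API. It refuses a ':' inside the tenant id, because "a:b" + "c" and "a" + "b:c" would otherwise produce the same key. It also refuses the shared "default" key called out above.

```csharp
using System;

// Builds "tenant:entity" correlation keys and refuses values that would
// let unrelated streams collide.
string BuildKey(string tenantId, string entityId)
{
    if (string.IsNullOrWhiteSpace(tenantId) || string.IsNullOrWhiteSpace(entityId))
        throw new ArgumentException("Tenant id and entity id are both required.");
    if (tenantId.Contains(':'))
        throw new ArgumentException("Tenant id must not contain the ':' separator.", nameof(tenantId));
    if (string.Equals(entityId, "default", StringComparison.OrdinalIgnoreCase))
        throw new ArgumentException("A shared 'default' key mixes unrelated streams.", nameof(entityId));
    return $"{tenantId}:{entityId}";
}

Console.WriteLine(BuildKey("tenant-a", "customer-42")); // tenant-a:customer-42
```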
Integrating CEP with the rule engine
CEP is usually a trigger, not the final decision engine. A typical flow is:
- CEP receives events
- active window crosses a threshold
- the app creates a fact payload
- Muonroi rule engine evaluates the downstream rule set
- the app emits an alert or action
Example:
```csharp
// request is your own TransactionEvent payload; CustomerId, Channel and
// TimestampUtc are assumed properties of that type.
IReadOnlyList<CepEvent<TransactionEvent>> window = cepEngine.AddEvent(
    request.CustomerId,
    request,
    request.TimestampUtc);

if (window.Count >= 3)
{
    Dictionary<string, object?> facts = new(StringComparer.OrdinalIgnoreCase)
    {
        ["transactionCountInWindow"] = window.Count,
        ["customerId"] = request.CustomerId,
        ["channel"] = request.Channel
    };

    // Then pass facts into your Muonroi ruleset or orchestrator.
}
```
This pairing works especially well when:
- CEP decides whether a burst is suspicious
- stateless rules decide what to do next
- decision tables decide which escalation path to take
For the core rules runtime, see the Rule Engine Guide.
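The threshold check in the integration example is true for every event while a burst stays at or above the threshold, so the downstream rule set runs repeatedly. When the action should fire once per burst (the notification-suppression case from the top of this guide), one option is to trigger only on the upward crossing. ShouldTrigger is a hypothetical host helper, and its per-key state is never evicted in this sketch.

```csharp
using System;
using System.Collections.Generic;

// Remembers whether each key is currently at or above the threshold.
var aboveThreshold = new Dictionary<string, bool>();

// Returns true only when a key's window count crosses the threshold upward.
bool ShouldTrigger(string key, int windowCount, int threshold)
{
    bool wasAbove = aboveThreshold.TryGetValue(key, out bool prev) && prev;
    bool isAbove = windowCount >= threshold;
    aboveThreshold[key] = isAbove;
    return isAbove && !wasAbove;
}

// Window counts as a burst grows, drains, then grows again.
int[] counts = { 1, 2, 3, 4, 5, 2, 3 };
var fired = new List<int>();
for (int i = 0; i < counts.Length; i++)
{
    if (ShouldTrigger("customer-42", counts[i], threshold: 3)) fired.Add(i);
}
Console.WriteLine(string.Join(",", fired)); // 2,6
```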
Troubleshooting
Window counts look too low
Check:
- timestamps are UTC
- the correct correlation key is used
- windowSizeSeconds is large enough for the expected burst
Counts jump unexpectedly
Likely causes:
- out-of-order events arriving late
- a tumbling window was used where a sliding window was expected
- different tenants or users accidentally share the same key
Config disappears after restart
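That is expected when AddCepWeb() runs with its default InMemoryCepConfigRepository, because configs then live only in process memory. Configure PostgreSQL or SQL Server persistence through AddCepWeb(options => ...) so configs are stored by EfCoreCepConfigRepository and survive restarts.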
Multi-node instances disagree
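This is expected. Each node keeps its own in-memory event windows, so events for one key that land on different nodes are counted separately. When you scale out, either route every event for a given key to the same node or externalize event buffering and coordination in the host.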
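One way to keep all events for a key on the same node is to route by a stable hash of the correlation key, for example when picking a queue partition or a consumer. This is a host-side sketch, not a package feature. It uses FNV-1a because that hash is deterministic across processes, whereas string.GetHashCode() is randomized per process in .NET. Plain modulo routing moves most keys when the node count changes, which resets their in-memory windows. Consistent hashing reduces that churn.

```csharp
using System;
using System.Text;

// 32-bit FNV-1a over the UTF-8 bytes: stable across processes and restarts.
uint Fnv1a(string key)
{
    uint hash = 2166136261;          // FNV offset basis
    foreach (byte b in Encoding.UTF8.GetBytes(key))
    {
        hash ^= b;
        hash *= 16777619;            // FNV prime; wraps in unchecked arithmetic
    }
    return hash;
}

// Picks the node index (0..nodeCount-1) that should own all events for `key`.
int OwnerNode(string key, int nodeCount)
{
    if (nodeCount <= 0) throw new ArgumentOutOfRangeException(nameof(nodeCount));
    return (int)(Fnv1a(key) % (uint)nodeCount);
}

Console.WriteLine(OwnerNode("tenant-a:customer-42", 3));
```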