Messaging & Integration Packages

Comprehensive reference for Muonroi's async messaging, external integrations, and service communication packages. These packages layer tenant awareness, security, observability, and resilience into RabbitMQ, Kafka, HTTP, gRPC, and SignalR transports.


Muonroi.Messaging.Abstractions

NuGet: Muonroi.Messaging.Abstractions | Tier: OSS/Commercial | Distribution: NuGet public feed

Purpose

Defines core abstractions for message-driven architectures: events, routing contracts, outbox patterns, and saga support. No transport dependencies — purely interfaces and domain models.

Key Types

Type | Kind | Purpose
DomainEvent | Abstract class | Base for in-process events; implements INotification for MediatR
IntegrationEvent | Abstract class | Base for cross-service events (published to message bus)
IMessageRouter<TMessage> | Interface | Custom routing logic before message reaches consumer
IRoutingDecision / RoutingDecision | Interface + record | Routing outcome: pass-through, redirect, or reject
IMuonroiMessageEnvelope / MuonroiMessageEnvelope | Interface + class | Standardized message headers: tenant, user, correlation, token
IMuonroiSaga | Interface | Extends MassTransit.ISaga + ITenantScoped; requires TenantId, CreationTime, LastModificationTime
IMessageRoutingRule<T> | Interface | Legacy routing contract (backward compatible)
IOutboxRelayService | Interface | Exposes outbox relay operations
EventOutbox | Entity | Persistent outbox table schema: Id, EventName, EventType, EventContent, Status, ErrorMessage, CreationTime
EventOutboxStatus | Enum | Pending, Published, Failed
IEventOutboxStore | Interface | Manages EventOutbox persistence
IdempotentAttribute | Attribute | Marks message handlers for idempotency enforcement

Integration Event Base

public abstract class IntegrationEvent
{
    public string AggregateId { get; protected set; } = string.Empty;
    public DateTime OccurredOn { get; } = DateTime.UtcNow;
}

Domain Event Example

public class OrderCreatedDomainEvent : DomainEvent
{
    public string OrderId { get; init; }
    public string CustomerId { get; init; }
}

DI Registration

// No explicit registration required — used by dependent packages
// Muonroi.Messaging.MassTransit consumes these abstractions

Muonroi.Messaging.MassTransit

NuGet: Muonroi.Messaging.MassTransit | Tier: Commercial | Feature: Premium.MessageBus | Distribution: Private NuGet feed

Purpose

Production-ready message bus implementation on MassTransit. Wires tenant context, security, quota enforcement, rule-based routing, persistent outbox, and saga support into a single cohesive pipeline for RabbitMQ and Kafka.

Key Components

Core Registration

Class | Purpose
MassTransitHandler | DI extension methods: AddMessageBus(), AddOutboxRelay()
BusType | Enum: RabbitMq, Kafka
MessageBusConfigs | Configuration POCO with runtime, outbox relay, quota, routing options
MessageBusRuntimeConfigs | Retry policy, prefetch, deduplication, endpoint naming
OutboxRelayConfigs | Polling interval, batch size, retry thresholds

Consume Filters

Applied in order to every incoming message:

Filter | Responsibility
AmqpContextConsumeFilter<T> | Extracts headers, then resolves ISystemExecutionContext (tenant, user, correlation)
TenantContextConsumeFilter<T> | Validates tenant policy; enforces multi-tenant constraints
RuleEngineRoutingFilter<T> | Evaluates IMessageRouter<T> + Redis FEEL routes (if enabled)
EcsConsumeLoggingFilter<T> | OpenTelemetry tracing + metrics + structured logging
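
The order matters because later filters rely on what earlier ones established (for example, tenant validation needs the context that the header-extraction step resolved). The following is only a minimal sketch of that ordering idea using plain delegates; it does not use MassTransit's filter interfaces, and `ConsumeStep` and `FilterPipeline` are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a consume filter: does its own work, then calls next().
public delegate Task ConsumeStep(IDictionary<string, object> context, Func<Task> next);

public static class FilterPipeline
{
    // Composes the steps so that steps[0] runs first and the handler runs last.
    public static Func<IDictionary<string, object>, Task> Build(
        IReadOnlyList<ConsumeStep> steps,
        Func<IDictionary<string, object>, Task> handler)
    {
        Func<IDictionary<string, object>, Task> pipeline = handler;

        // Wrap from the innermost step outwards so the first step ends up outermost.
        for (int i = steps.Count - 1; i >= 0; i--)
        {
            ConsumeStep step = steps[i];
            Func<IDictionary<string, object>, Task> next = pipeline;
            pipeline = ctx => step(ctx, () => next(ctx));
        }

        return pipeline;
    }
}
```

A step that never calls `next()` short-circuits the pipeline, which is one way a routing or tenant-policy step could reject a message before it reaches the consumer.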

Publish/Send Filters

Filter | Responsibility
MuonroiContextPublishFilter<T> | Auto-stamps all context headers (tenant, user, token, correlation)
MuonroiContextSendFilter<T> | Same as publish for direct sends
TenantQuotaMessagingFilter<T> | Enforces quota limits (if enabled)
EcsPublishLoggingFilter<T> | OpenTelemetry tracing for outbound publishes
EcsSendLoggingFilter<T> | OpenTelemetry tracing for direct sends

Consumer Base Class

public abstract class MuonroiConsumerBase<TMessage>(
    ISystemExecutionContextAccessor contextAccessor,
    IMLog<TMessage> log,
    ILicenseGuard? licenseGuard = null)
    : IConsumer<TMessage>
    where TMessage : class

Guarantees:

  • License feature check: Premium.MessageBus is verified before any processing
  • ISystemExecutionContext is already resolved by the filter pipeline when HandleAsync runs
  • Standardized error logging with tenant, user, message ID
  • OperationCanceledException re-thrown cleanly (no error log)

Outbox Relay

Class | Purpose
OutboxRelayBackgroundService | IHostedService polling EventOutbox rows; publishes Pending rows and marks them Published or Failed
MEventOutboxDbContext | DbContext base with SaveWithOutboxAsync() for atomic save + outbox insertion
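
To make the Pending, Published, Failed lifecycle concrete, here is a rough in-memory sketch of one relay pass. It is not the library's implementation: `OutboxRow`, `OutboxStatus` and `OutboxRelaySketch` are hypothetical, and the rule that a row stays Pending until a retry budget is exhausted is an assumption about how a setting like MaxRetryFailedCount might behave.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public enum OutboxStatus { Pending, Published, Failed }

// Hypothetical in-memory row; the real EventOutbox entity has more columns.
public sealed class OutboxRow
{
    public string EventName { get; init; } = string.Empty;
    public OutboxStatus Status { get; set; } = OutboxStatus.Pending;
    public int Attempts { get; set; }
    public string? ErrorMessage { get; set; }
}

public static class OutboxRelaySketch
{
    // Processes one batch of pending rows and returns how many were published.
    public static async Task<int> RelayBatchAsync(
        IEnumerable<OutboxRow> rows,
        Func<OutboxRow, Task> publish,
        int batchSize,
        int maxAttempts)
    {
        int published = 0;
        List<OutboxRow> batch = rows
            .Where(r => r.Status == OutboxStatus.Pending)
            .Take(batchSize)
            .ToList();

        foreach (OutboxRow row in batch)
        {
            try
            {
                await publish(row);
                row.Status = OutboxStatus.Published;
                row.ErrorMessage = null;
                published++;
            }
            catch (Exception ex)
            {
                row.Attempts++;
                row.ErrorMessage = ex.Message;

                // Assumed policy: stay Pending for the next poll until the budget is spent.
                if (row.Attempts >= maxAttempts)
                {
                    row.Status = OutboxStatus.Failed;
                }
            }
        }

        return published;
    }
}
```

A hosted service would call something like `RelayBatchAsync` once per polling interval and persist the status changes in the same database as the business data.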

Bus Configuration

Configurator | Transport | Purpose
RabbitMqBusConfigurator | RabbitMQ | AMQP endpoint setup, publisher confirmation, heartbeat
KafkaBusConfigurator | Kafka | Topic naming, consumer group, SASL auth

Configuration Schema

{
  "MessageBusConfigs": {
    "BusType": "RabbitMq|Kafka",
    "RabbitMq": {
      "Host": "localhost",
      "Port": 5672,
      "VirtualHost": "/",
      "Username": "guest",
      "Password": "guest",
      "HeartbeatSeconds": 30,
      "PublisherConfirmation": true,
      "UseSsl": false,
      "UseQuorumQueues": false
    },
    "Kafka": {
      "Host": "localhost:9092",
      "Topic": "muonroi-events",
      "GroupId": "consumer-group",
      "ClientId": "muonroi-messagebus",
      "SecurityProtocol": "SaslSsl|Plaintext|Ssl|SaslPlaintext",
      "SaslMechanism": "Plain|ScramSha256|ScramSha512",
      "SaslUsername": "user",
      "SaslPassword": "password"
    },
    "Runtime": {
      "RetryCount": 3,
      "RetryIntervalMs": 500,
      "PrefetchCount": 32,
      "ConcurrentMessageLimit": 16,
      "EnableInMemoryOutbox": true,
      "EndpointPrefix": "my-service"
    },
    "OutboxRelay": {
      "Enabled": true,
      "PollingIntervalMs": 5000,
      "BatchSize": 100,
      "MaxRetryFailedCount": 5
    },
    "MaskAccessTokenInHeaders": false,
    "EnableQuotaEnforcement": false,
    "EnableRuleEngineRouting": false,
    "EnableRedisRoutingTable": false
  }
}

DI Registration

// Program.cs
builder.Services.AddMessageBus(
    builder.Configuration,
    consumersAssembly: typeof(Program).Assembly);

// Optional: persistent outbox relay
builder.Services.AddOutboxRelay();

Usage Examples

Publishing Messages (Auto-Context)

public class OrderService(IPublishEndpoint publisher)
{
    public async Task CreateOrderAsync(CreateOrderCommand cmd, CancellationToken ct)
    {
        // Headers auto-stamped by MuonroiContextPublishFilter:
        // TenantId, UserId, CorrelationId, SourceType, SentAt, AccessToken
        await publisher.Publish(
            new OrderCreated { OrderId = cmd.OrderId, Amount = cmd.Amount },
            ct);
    }
}

Publishing with Explicit Context

public class BackgroundJobService(
    IPublishEndpoint publisher,
    ISystemExecutionContextAccessor contextAccessor,
    ITenantContextPolicy tenantPolicy)
{
    public async Task ProcessAsync(CancellationToken ct)
    {
        await publisher.PublishWithAuthContext(
            new OrderProcessed { OrderId = "123" },
            contextAccessor,
            tenantPolicy,
            ct);
    }
}

Consuming Messages

public class OrderCreatedConsumer(
    ISystemExecutionContextAccessor contextAccessor,
    IMLog<OrderCreated> log,
    ILicenseGuard? licenseGuard = null)
    : MuonroiConsumerBase<OrderCreated>(contextAccessor, log, licenseGuard)
{
    protected override async Task HandleAsync(
        ConsumeContext<OrderCreated> context,
        ISystemExecutionContext executionContext,
        CancellationToken cancellationToken)
    {
        Log.Info("Processing order {OrderId} for tenant {TenantId}",
            context.Message.OrderId,
            executionContext.TenantId);

        // Business logic: save to database, trigger downstream tasks, etc.
        await Task.Delay(100, cancellationToken);
    }
}

Persistent Outbox Pattern

public class MyDbContext(DbContextOptions<MyDbContext> options, IMediator mediator)
    : MEventOutboxDbContext(options, mediator)
{
    public DbSet<Order> Orders => Set<Order>();
}

// In handler or service:
public async Task PlaceOrderAsync(PlaceOrderCommand cmd, CancellationToken ct)
{
    var order = new Order { Id = cmd.OrderId, Amount = cmd.Amount };
    _dbContext.Orders.Add(order);

    // Atomic: saves entity + enqueues EventOutbox row in one transaction
    await _dbContext.SaveWithOutboxAsync(
        new OrderCreated { OrderId = cmd.OrderId },
        _jsonService,
        ct);
}

Saga State Persistence

public class OrderSagaDbContext(
    DbContextOptions<OrderSagaDbContext> options,
    IMediator mediator,
    IMDateTimeService dateTimeService,
    ISystemExecutionContextAccessor contextAccessor)
    : MSagaDbContext(
        options,
        mediator,
        dateTimeService: dateTimeService,
        executionContextAccessor: contextAccessor)
{
    public DbSet<OrderSagaState> OrderSagas => Set<OrderSagaState>();
}

public class OrderSagaState : IMuonroiSaga
{
    public Guid CorrelationId { get; set; }
    public string? TenantId { get; set; }
    public DateTime CreationTime { get; set; }
    public DateTime? LastModificationTime { get; set; }
    public string CurrentState { get; set; } = "Pending";
    public Guid OrderId { get; set; }
}

Custom Message Routing

public sealed class OrderCreatedRouter : IMessageRouter<OrderCreated>
{
    public int Order => 0;
    public string Code => "order.route";

    public Task<IRoutingDecision> RouteAsync(
        OrderCreated message,
        IRoutingContext context,
        CancellationToken ct = default)
    {
        // The explicit type argument is needed because Task<T> is not covariant:
        // a Task<RoutingDecision> cannot be returned as Task<IRoutingDecision>.
        if (context.TenantId == "acme-corp")
        {
            return Task.FromResult<IRoutingDecision>(RoutingDecision.RedirectTo(
                "rabbitmq://rabbitmq-acme/acme-orders",
                "Route to tenant-specific broker"));
        }

        return Task.FromResult<IRoutingDecision>(RoutingDecision.PassThrough);
    }
}

// DI registration:
builder.Services.AddMessageRouter<OrderCreated, OrderCreatedRouter>();

Observability

OpenTelemetry Sources:

Source | Activities | Meter | Attributes
MassTransit | W3C context propagation (auto) | - | -
Muonroi.BuildingBlock.MessageBus | messagebus.consume, messagebus.publish, messagebus.send | messagebus_messages_total, messagebus_errors_total, messagebus_operation_duration_ms | messaging.operation, messaging.message_type, messaging.destination, messaging.system, tenant.id, correlation.id

Health Checks

app.MapHealthChecks("/health");
// Health check registered as "rabbitmq" or "kafka" depending on BusType

Muonroi.Integration.Abstractions

NuGet: Muonroi.Integration.Abstractions | Tier: OSS | Distribution: NuGet public feed

Purpose

Core contracts for external service integrations: connectors (HTTP, Email, Slack, etc.), credential/config management, and rule engine task execution.

Key Types

Type | Kind | Purpose
IServiceTaskConnector | Interface | Core connector contract: ExecuteAsync(), TestConnectionAsync(), GetConfigSchema(), Metadata
ConnectorMetadata | Class | Connector identity: type, display name, category, icon, description, credential requirement
ConnectorContext | Class | Execution context: Config (JsonDocument), Credentials (dict), TenantId, RuleEngineContext
ConnectorResult | Class | Execution outcome: Success, OutputFacts, ErrorMessage, StatusCode, Duration
ConnectorResilienceOptions | Class | Retry, circuit breaker, timeout configuration
IConnectorRegistry | Interface | Discovers + resolves connectors by type
IConnectorConfigStore | Interface | Persists connector configurations
IConnectorCredentialStore | Interface | Persists encrypted connector credentials

ConnectorResult Examples

// Success with output facts
ConnectorResult.Ok(
    new() { ["apiResponse"] = jsonData, ["httpStatusCode"] = 200 },
    statusCode: 200,
    duration: TimeSpan.FromMilliseconds(150));

// Failure
ConnectorResult.Fail(
    "API returned 500: Internal Server Error",
    statusCode: 500,
    duration: TimeSpan.FromMilliseconds(2000));

Muonroi.Integration.Connectors

NuGet: Muonroi.Integration.Connectors | Tier: Commercial | Distribution: Private NuGet feed

Purpose

Production connectors for common external services: HTTP/REST, Slack, Email (SMTP), Redis, SQL databases. Each implements IServiceTaskConnector with metadata, execution, and connection testing.

Built-in Connectors

HttpConnector

Type: "http"

Executes HTTP/REST requests (GET, POST, PUT, DELETE, PATCH) with header/body templating and response mapping.

Config Schema:

{
  "url": "https://api.example.com/data",
  "method": "POST",
  "headers": {
    "X-Custom": "value"
  },
  "body": "{{ json(items) }}",
  "contentType": "application/json",
  "responseMapping": {
    "userId": "user.id",
    "userName": "user.name"
  },
  "timeout": 30
}

Credentials:

{
  "authorization": "Bearer token123",
  "apiKey": "key123",
  "apiKeyHeader": "X-API-Key"
}

Output Facts:

  • httpStatusCode: int
  • httpResponseBody: string
  • httpSuccess: bool
  • Mapped properties per responseMapping
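
As an illustration of how a responseMapping entry such as "userId": "user.id" could turn into an output fact, the sketch below walks a dotted path through the response JSON. This is an assumption about the mapping semantics, not the connector's actual code; `ResponseMappingSketch` is a hypothetical helper, and array indexing or escaping in paths is not handled.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class ResponseMappingSketch
{
    // Resolves each dotted path against the response body; unresolved paths are skipped.
    public static Dictionary<string, object?> Map(
        string responseJson,
        IReadOnlyDictionary<string, string> mapping)
    {
        using JsonDocument doc = JsonDocument.Parse(responseJson);
        var facts = new Dictionary<string, object?>();

        foreach (KeyValuePair<string, string> entry in mapping)
        {
            JsonElement current = doc.RootElement;
            bool found = true;

            foreach (string segment in entry.Value.Split('.'))
            {
                if (current.ValueKind != JsonValueKind.Object ||
                    !current.TryGetProperty(segment, out JsonElement next))
                {
                    found = false;
                    break;
                }

                current = next;
            }

            if (found)
            {
                // Strings are unwrapped; other values keep their raw JSON text.
                facts[entry.Key] = current.ValueKind == JsonValueKind.String
                    ? current.GetString()
                    : current.GetRawText();
            }
        }

        return facts;
    }
}
```

Values are copied out as strings before the `JsonDocument` is disposed, so the returned dictionary does not reference pooled memory.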

SlackWebhookConnector

Type: "slack"

Sends messages to Slack via incoming webhooks. Supports channel override.

Config Schema:

{
  "text": "Order #{{ orderId }} approved by {{ userName }}",
  "channel": "#orders"
}

Credentials:

{
  "webhookUrl": "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXX"
}

Output Facts:

  • slackSent: bool
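
The {{ name }} placeholders in the text field suggest simple fact substitution. A minimal sketch of that idea follows, assuming placeholders are plain identifiers looked up in a fact dictionary; `TemplateSketch` is hypothetical, and function-style expressions such as {{ json(items) }} are deliberately left untouched because their semantics are not described here.

```csharp
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

public static class TemplateSketch
{
    // Matches {{ identifier }} with optional whitespace inside the braces.
    private static readonly Regex Placeholder =
        new(@"\{\{\s*([A-Za-z_][A-Za-z0-9_]*)\s*\}\}", RegexOptions.Compiled);

    // Replaces each placeholder with the matching fact; unknown names are left as-is.
    public static string Render(string template, IReadOnlyDictionary<string, object?> facts) =>
        Placeholder.Replace(template, match =>
            facts.TryGetValue(match.Groups[1].Value, out object? value)
                ? value?.ToString() ?? string.Empty
                : match.Value);
}
```

Leaving unknown placeholders visible, rather than replacing them with an empty string, makes a missing fact easy to spot in the delivered message.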

SmtpConnector

Type: "email"

Sends emails via configured SMTP server. Supports template rendering.

Config Schema:

{
  "to": "{{ customerEmail }}",
  "cc": "orders@example.com",
  "subject": "Order Confirmation #{{ orderId }}",
  "body": "Thank you for your order...",
  "isHtml": true
}

Credentials:

{
  "smtpHost": "smtp.example.com",
  "smtpPort": 587,
  "username": "mailer@example.com",
  "password": "password123",
  "fromAddress": "no-reply@example.com",
  "enableTls": true
}

RedisConnector

Type: "redis"

Executes Redis commands (GET, SET, LPUSH, HSET, etc.) with key/value templating.

Config Schema:

{
  "command": "SET",
  "key": "user:{{ userId }}",
  "value": "{{ json(userData) }}",
  "ttlSeconds": 3600
}

Output Facts:

  • redisResult: string (command result)
  • redisSuccess: bool

SqlQueryConnector

Type: "sql"

Executes SQL queries (SELECT, INSERT, UPDATE) with parameterized statements.

Config Schema:

{
  "query": "SELECT * FROM orders WHERE customer_id = @customerId",
  "parameters": {
    "@customerId": "{{ customerId }}"
  },
  "timeout": 30
}

Output Facts:

  • sqlRows: array of row objects (SELECT)
  • sqlRowsAffected: int (INSERT/UPDATE)

DI Registration

// Automatic registration
services.AddConnectorRegistry();
// Registers: HttpConnector, SlackWebhookConnector, SmtpConnector, RedisConnector, SqlQueryConnector

Custom Connector Example

using System.Diagnostics;
using System.Text.Json;

public sealed class CustomApiConnector : IServiceTaskConnector
{
    // Reuse one HttpClient; creating a client per call can exhaust sockets under load.
    private static readonly HttpClient Client = new();

    public ConnectorMetadata Metadata => new()
    {
        Type = "custom-api",
        DisplayName = "Custom API",
        Category = "API",
        Description = "Connects to proprietary API",
        RequiresCredentials = true
    };

    public async Task<ConnectorResult> ExecuteAsync(ConnectorContext context, CancellationToken ct)
    {
        var sw = Stopwatch.StartNew();
        var apiKey = context.Credentials.GetValueOrDefault("apiKey");
        var url = context.Config.RootElement.GetProperty("url").GetString();

        try
        {
            // Set the header per request so one tenant's key never lingers on the shared client.
            using var request = new HttpRequestMessage(HttpMethod.Get, url);
            request.Headers.TryAddWithoutValidation("Authorization", $"Bearer {apiKey}");
            using var response = await Client.SendAsync(request, ct);
            sw.Stop();

            if (response.IsSuccessStatusCode)
            {
                var content = await response.Content.ReadAsStringAsync(ct);
                return ConnectorResult.Ok(
                    new() { ["response"] = content },
                    (int)response.StatusCode,
                    sw.Elapsed);
            }

            return ConnectorResult.Fail("API error", (int)response.StatusCode, sw.Elapsed);
        }
        catch (Exception ex)
        {
            sw.Stop();
            return ConnectorResult.Fail(ex.Message, duration: sw.Elapsed);
        }
    }

    public async Task<bool> TestConnectionAsync(ConnectorContext context, CancellationToken ct)
    {
        try
        {
            var result = await ExecuteAsync(context, ct);
            return result.Success;
        }
        catch { return false; }
    }

    public JsonElement GetConfigSchema()
    {
        // Dispose the document; Clone() detaches the returned element from it.
        using var schema = JsonDocument.Parse("""
            {
              "type": "object",
              "properties": {
                "url": { "type": "string", "format": "uri" }
              },
              "required": ["url"]
            }
            """);
        return schema.RootElement.Clone();
    }
}

// Register custom connector:
builder.Services.AddSingleton<IServiceTaskConnector, CustomApiConnector>();

Muonroi.Integration.Persistence

NuGet: Muonroi.Integration.Persistence | Tier: Commercial | Distribution: Private NuGet feed

Purpose

EF Core persistence layer for connector configurations and encrypted credentials. Provides tenant-scoped storage with automatic query filtering.

Key Types

Type | Kind | Purpose
ConnectorDbContext | DbContext | EF context for configs + credentials; applies TenantId query filter
ConnectorConfigEntity | Entity | Schema: Id, TenantId, ConnectorType, Name, ConfigJson, CredentialId, Status, CreatedAt
ConnectorCredentialEntity | Entity | Schema: Id, TenantId, Name, EncryptedValues, CreatedAt
EfConnectorConfigStore | Class | Implements IConnectorConfigStore via EF
EfConnectorCredentialStore | Class | Implements IConnectorCredentialStore via EF; handles encryption/decryption
ConnectorPersistenceRegistration | Class | DI extension: AddConnectorPersistence()

DI Registration

builder.Services.AddConnectorPersistence(builder.Configuration);
// Registers ConnectorDbContext, EfConnectorConfigStore, EfConnectorCredentialStore

Usage

public class ConnectorService(
    IConnectorConfigStore configStore,
    IConnectorCredentialStore credentialStore)
{
    public async Task SaveConnectorAsync(string tenantId, string type, JsonDocument config, CancellationToken ct)
    {
        var entity = new ConnectorConfigEntity
        {
            Id = Guid.NewGuid().ToString(),
            TenantId = tenantId,
            ConnectorType = type,
            Name = "My HTTP Connector",
            ConfigJson = config.RootElement.GetRawText(),
            Status = "active"
        };

        await configStore.AddAsync(entity, ct);
        await configStore.SaveChangesAsync(ct);
    }
}

Muonroi.Grpc

NuGet: Muonroi.Grpc | Tier: Commercial | Feature: Premium.Grpc | Distribution: Private NuGet feed

Purpose

gRPC service registration (server + named clients) with built-in tenant awareness, auth context forwarding, resilience policies (retry, timeout, circuit breaker), and OpenTelemetry tracing.

Key Components

Server

Class | Purpose
GrpcServerInterceptor | Server-side interceptor: tenant resolution, auth validation
GrpcServerExtensions | AddGrpcServer() DI extension

Client

Class | Purpose
GrpcClientAuthInterceptor | Client-side interceptor: forwards auth token + tenant ID
GrpcClientTelemetryInterceptor | Client-side interceptor: OpenTelemetry tracing
GrpcHandler | Named client factory
GrpcRateLimiter | Server-side rate limiting per API key / tenant

Configuration

Class | Purpose
GrpcServicesConfig | Root config container
GrpcServerConfig | Server options: message sizes, compression, gRPC-Web, mTLS, rate limiting
GrpcClientDefaultsConfig | Default client options: timeout, retries, backoff, message sizes, forwarding flags
GrpcServiceConfig | Per-service client options: URI, retry policy, forwarding overrides
GrpcMethodPolicyConfig | Per-method policy: timeout, retry, backoff
GrpcRateLimitConfig | Server rate limiting: RPM per API key / tenant

Configuration Schema

{
  "GrpcServicesConfig": {
    "Server": {
      "EnableDetailedErrors": true,
      "MaxSendMessageSizeBytes": 104857600,
      "MaxReceiveMessageSizeBytes": 104857600,
      "ResponseCompressionAlgorithm": "gzip",
      "ResponseCompressionLevel": "Optimal",
      "EnableGrpcWeb": false,
      "EnableGrpcWebForAllServices": false,
      "EnableJsonTranscoding": false,
      "RequireMutualTls": false,
      "AllowedClientCertificateThumbprints": [],
      "RateLimit": {
        "Enabled": true,
        "RequestsPerMinutePerApiKey": 600,
        "RequestsPerMinutePerTenant": 1200
      }
    },
    "ClientDefaults": {
      "TimeoutSeconds": 10,
      "RetryCount": 3,
      "InitialBackoffSeconds": 1,
      "MaxBackoffSeconds": 8,
      "LoadBalancingPolicy": "pick_first",
      "MaxReceiveMessageSizeBytes": 104857600,
      "MaxSendMessageSizeBytes": 104857600,
      "ForwardAuthToken": false,
      "ForwardTenantId": true
    },
    "Services": {
      "PaymentService": {
        "Uri": "https://payment.example.com:5001",
        "TimeoutSeconds": 20,
        "RetryCount": 5,
        "ForwardAuthToken": true,
        "Methods": {
          "ProcessPayment": {
            "TimeoutSeconds": 30
          }
        }
      }
    }
  }
}
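
The RateLimit settings above express budgets in requests per minute per key. One simple way to enforce such a budget is a fixed one-minute window per key, sketched below. The algorithm is an assumption chosen for brevity (GrpcRateLimiter may well use a sliding window or token bucket), and `FixedWindowLimiter` is a hypothetical class.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical fixed-window limiter; not the library's GrpcRateLimiter.
public sealed class FixedWindowLimiter
{
    private readonly int _requestsPerMinute;
    private readonly Dictionary<string, (long Window, int Count)> _counters = new();
    private readonly object _gate = new();

    public FixedWindowLimiter(int requestsPerMinute) => _requestsPerMinute = requestsPerMinute;

    // Returns true while the caller identified by `key` is under its per-minute budget.
    public bool TryAcquire(string key, DateTimeOffset now)
    {
        long window = now.ToUnixTimeSeconds() / 60; // index of the current minute

        lock (_gate)
        {
            _counters.TryGetValue(key, out (long Window, int Count) entry);

            // A counter from an earlier minute no longer applies.
            int count = entry.Window == window ? entry.Count : 0;
            if (count >= _requestsPerMinute)
            {
                return false;
            }

            _counters[key] = (window, count + 1);
            return true;
        }
    }
}
```

With two instances, one keyed by API key (600 per minute in the sample config) and one keyed by tenant ID (1200 per minute), a request would be admitted only when both return true.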

DI Registration

services.AddGrpcServer();
services.AddGrpcClients(configuration, new Dictionary<string, Type>
{
    { "PaymentService", typeof(Payment.PaymentClient) },
    { "NotificationService", typeof(Notification.NotificationClient) }
});

BaseGrpcService Usage

public class PaymentGrpcService(
    ISystemExecutionContextAccessor contextAccessor,
    ILicenseGuard licenseGuard,
    Payment.PaymentClient client)
    : BaseGrpcService(contextAccessor, licenseGuard: licenseGuard)
{
    private readonly Payment.PaymentClient _client = client;

    public async Task<PaymentResponse> ProcessPaymentAsync(decimal amount, CancellationToken ct)
    {
        return await CallGrpcServiceAsync(
            nameof(ProcessPaymentAsync),
            meta => _client.ProcessPaymentAsync(
                new PaymentRequest { Amount = amount },
                meta,
                cancellationToken: ct), // forward the caller's token to the generated client
            null);
    }
}

What CallGrpcServiceAsync provides:

  • Metadata creation: correlation ID, tenant ID, API key, auth token
  • Retry policy: exponential backoff, max 3 attempts
  • Timeout: 10 seconds by default
  • Circuit breaker: opens after 5 failures, half-open after 30 seconds
  • OpenTelemetry tracing: activity per call with status tags
  • License validation: Premium.Grpc feature check
  • Telemetry: request duration, status code, tenant ID
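
The retry and timeout behaviour listed above can be pictured with a small stand-alone helper. This is a sketch under assumptions, not the code inside CallGrpcServiceAsync: `RetrySketch` is hypothetical, `retryCount` is treated as the number of retries after the first attempt, the timeout applies per attempt, and the circuit breaker is omitted.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RetrySketch
{
    // Delay before retry number `attempt` (1-based): initial * 2^(attempt - 1), capped at max.
    public static TimeSpan Backoff(int attempt, TimeSpan initial, TimeSpan max)
    {
        double seconds = initial.TotalSeconds * Math.Pow(2, attempt - 1);
        return TimeSpan.FromSeconds(Math.Min(seconds, max.TotalSeconds));
    }

    public static async Task<T> ExecuteAsync<T>(
        Func<CancellationToken, Task<T>> call,
        int retryCount,
        TimeSpan initial,
        TimeSpan max,
        TimeSpan timeout,
        CancellationToken ct = default)
    {
        for (int attempt = 0; ; attempt++)
        {
            // Each attempt gets its own timeout, linked to the caller's token.
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            cts.CancelAfter(timeout);

            try
            {
                return await call(cts.Token);
            }
            catch (Exception) when (attempt < retryCount && !ct.IsCancellationRequested)
            {
                // Retries remain and the caller has not cancelled: wait, then loop again.
                await Task.Delay(Backoff(attempt + 1, initial, max), ct);
            }
        }
    }
}
```

With the sample ClientDefaults (InitialBackoffSeconds 1, MaxBackoffSeconds 8), this schedule would wait 1, 2 and 4 seconds between attempts, and any further retries would wait 8 seconds.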

Forwarded Metadata

When enabled in config, gRPC client calls automatically append:

authorization: Bearer <token>          (if ForwardAuthToken = true)
x-tenant-id: <tenant-id>               (if ForwardTenantId = true)
x-correlation-id: <correlation-id>     (always, when present)

Existing headers are preserved and not overwritten.
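
The "preserved and not overwritten" rule amounts to an add-if-absent check per header. A minimal sketch of that rule follows, using a plain dictionary in place of gRPC call metadata; `MetadataSketch` and its parameters are hypothetical.

```csharp
using System;
using System.Collections.Generic;

public static class MetadataSketch
{
    // Adds the forwarded headers without replacing anything the caller already set.
    public static void Forward(
        IDictionary<string, string> headers,
        string? token,
        string? tenantId,
        string? correlationId,
        bool forwardAuthToken,
        bool forwardTenantId)
    {
        if (forwardAuthToken && !string.IsNullOrEmpty(token))
        {
            AddIfAbsent(headers, "authorization", $"Bearer {token}");
        }

        if (forwardTenantId)
        {
            AddIfAbsent(headers, "x-tenant-id", tenantId);
        }

        // The correlation ID is forwarded whenever one is present.
        AddIfAbsent(headers, "x-correlation-id", correlationId);
    }

    private static void AddIfAbsent(IDictionary<string, string> headers, string key, string? value)
    {
        if (!string.IsNullOrEmpty(value) && !headers.ContainsKey(key))
        {
            headers[key] = value;
        }
    }
}
```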


Muonroi.Http

NuGet: Muonroi.Http | Tier: Commercial | Distribution: Private NuGet feed

Purpose

HTTP/REST client infrastructure with resilience, auth context forwarding, correlation ID propagation, and standardized error handling.

Key Components

Class | Purpose
BaseApiService | Abstract base for API services with resilience pipeline support
AuthenticateHeaderHandler | DelegatingHandler: auto-injects Authorization header
CorrelationIdHandler | DelegatingHandler: auto-injects X-Correlation-ID header

DI Registration

services.AddHttpClient<IPaymentApi, PaymentApiService>()
    .AddHttpMessageHandler<AuthenticateHeaderHandler>()
    .AddHttpMessageHandler<CorrelationIdHandler>();
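
For readers unfamiliar with DelegatingHandler, the sketch below shows the general shape of a handler that stamps a correlation header on outgoing requests. It is illustrative only: `SketchCorrelationIdHandler` is a hypothetical class, the ID source is injected as a delegate, and Muonroi's CorrelationIdHandler may obtain and apply the ID differently. `EchoHandler` exists solely so the handler can be exercised without a network call.

```csharp
using System;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical handler that adds X-Correlation-ID when the request does not carry one.
public sealed class SketchCorrelationIdHandler : DelegatingHandler
{
    private readonly Func<string> _currentCorrelationId;

    public SketchCorrelationIdHandler(Func<string> currentCorrelationId) =>
        _currentCorrelationId = currentCorrelationId;

    protected override Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request,
        CancellationToken cancellationToken)
    {
        if (!request.Headers.Contains("X-Correlation-ID"))
        {
            request.Headers.TryAddWithoutValidation("X-Correlation-ID", _currentCorrelationId());
        }

        // Hand the request to the next handler in the chain.
        return base.SendAsync(request, cancellationToken);
    }
}

// Terminal handler for local experiments: returns 200 and echoes the request back.
public sealed class EchoHandler : HttpMessageHandler
{
    protected override Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request,
        CancellationToken cancellationToken) =>
        Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK) { RequestMessage = request });
}
```

Handlers added with AddHttpMessageHandler run in registration order on the way out, so in the registration above the Authorization header is applied before the correlation header.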

BaseApiService Usage

public class PaymentApiService(
    IHttpClientFactory httpClientFactory,
    IAuthenticateInfoContext authContext,
    IMLog<BaseApiService> logger)
    : BaseApiService(httpClientFactory, authContext, logger)
{
    public async Task<PaymentResponse> ChargeAsync(
        decimal amount,
        ResiliencePipeline<HttpResponseMessage> pipeline,
        CancellationToken ct)
    {
        // Dispose the request (and its content) once the call has completed
        using var request = new HttpRequestMessage(HttpMethod.Post, "https://api.payment.com/charges")
        {
            Content = new StringContent(
                JsonSerializer.Serialize(new { amount }),
                Encoding.UTF8,
                "application/json")
        };

        return await SendAsync<PaymentResponse>(
            "PaymentClient",
            request,
            pipeline,
            ct);
    }
}

Muonroi.SignalR

NuGet: Muonroi.SignalR | Tier: Commercial | Distribution: Private NuGet feed

Purpose

SignalR hub registration with tenant-aware connection filtering, automatic context resolution, and multi-tenant enforcement.

Key Components

Class | Purpose
SignalRServiceCollectionExtensions | AddSignalRWithTenant() DI extension
TenantHubFilter | IHubFilter: resolves + validates tenant ID for each hub invocation
MUiEngineHub | Schema change notification hub (rule engine runtime hot-reload)
IUiEngineSchemaNotifier | Sends schema changes to connected clients

DI Registration

services.AddSignalRWithTenant(configuration);
// Automatically configures tenant filtering if MultiTenantConfigs:Enabled = true

Custom Hub Example

public class ChatHub(ISystemExecutionContextAccessor contextAccessor) : Hub
{
    public async Task SendMessage(string groupName, string message)
    {
        string? tenantId = contextAccessor.Get().TenantId;
        string userId = contextAccessor.Get().UserId ?? "unknown";

        // Send to the tenant-scoped group; Clients.All would reach every tenant's connections
        await Clients.Group($"{tenantId}:{groupName}").SendAsync("Receive", tenantId, userId, message);
    }

    public async Task JoinGroup(string groupName)
    {
        // Prefix the group with the tenant ID so group names never collide across tenants
        string? tenantId = contextAccessor.Get().TenantId;
        string groupKey = $"{tenantId}:{groupName}";
        await Groups.AddToGroupAsync(Context.ConnectionId, groupKey);
    }
}

// Startup:
app.MapHub<ChatHub>("/hubs/chat");

Tenant Validation

When MultiTenantConfigs.Enabled = true:

  1. TenantHubFilter resolves the tenant from headers/claims
  2. If the user's claims carry a tenant ID, it is validated against the resolved tenant ID
  3. Missing tenant ID throws HubException("Tenant ID is required.")
  4. Tenant mismatch throws HubException("Tenant mismatch.")
  5. TenantContext.CurrentTenantId is set for the invocation
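
The validation steps above reduce to a small decision function. The sketch below captures that logic in isolation; it is not TenantHubFilter itself. `TenantValidationSketch` is hypothetical, it throws InvalidOperationException instead of HubException so that it has no SignalR dependency, and the ordinal (case-sensitive) comparison is an assumption.

```csharp
using System;

public static class TenantValidationSketch
{
    // Returns the tenant ID to use for the invocation, or throws when validation fails.
    public static string Validate(string? resolvedTenantId, string? claimsTenantId)
    {
        if (string.IsNullOrWhiteSpace(resolvedTenantId))
        {
            throw new InvalidOperationException("Tenant ID is required.");
        }

        // A claims tenant is optional, but when present it must match the resolved one.
        if (!string.IsNullOrWhiteSpace(claimsTenantId) &&
            !string.Equals(claimsTenantId, resolvedTenantId, StringComparison.Ordinal))
        {
            throw new InvalidOperationException("Tenant mismatch.");
        }

        return resolvedTenantId;
    }
}
```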

Schema Notifier (Rule Engine Integration)

public class RuleEngineSchemaService(IUiEngineSchemaNotifier notifier)
{
    public async Task NotifySchemaChange(string tenantId, CancellationToken ct)
    {
        await notifier.NotifySchemaChangeAsync(tenantId, ct);
        // Sends real-time update to all connected UI clients for the tenant
    }
}