Skip to content

Observability Implementation Results

List of Observable wrappers generated by the [GenerateObservablePort] Source Generator.

Original ClassObservable WrapperPort
AIModelRepositoryInMemoryAIModelRepositoryInMemoryObservableIAIModelRepository
DeploymentRepositoryInMemoryDeploymentRepositoryInMemoryObservableIDeploymentRepository
AssessmentRepositoryInMemoryAssessmentRepositoryInMemoryObservableIAssessmentRepository
IncidentRepositoryInMemoryIncidentRepositoryInMemoryObservableIIncidentRepository
UnitOfWorkInMemoryUnitOfWorkInMemoryObservableIUnitOfWork
Original ClassObservable WrapperPort
AIModelRepositoryEfCoreAIModelRepositoryEfCoreObservableIAIModelRepository
DeploymentRepositoryEfCoreDeploymentRepositoryEfCoreObservableIDeploymentRepository
AssessmentRepositoryEfCoreAssessmentRepositoryEfCoreObservableIAssessmentRepository
IncidentRepositoryEfCoreIncidentRepositoryEfCoreObservableIIncidentRepository
UnitOfWorkEfCoreUnitOfWorkEfCoreObservableIUnitOfWork
Original ClassObservable WrapperPort
AIModelQueryInMemoryAIModelQueryInMemoryObservableIAIModelQuery
AIModelDetailQueryInMemoryAIModelDetailQueryInMemoryObservableIModelDetailQuery
DeploymentQueryInMemoryDeploymentQueryInMemoryObservableIDeploymentQuery
DeploymentDetailQueryInMemoryDeploymentDetailQueryInMemoryObservableIDeploymentDetailQuery
IncidentQueryInMemoryIncidentQueryInMemoryObservableIIncidentQuery
Original ClassIO PatternPort
ModelHealthCheckServiceTimeout + CatchIModelHealthCheckService
ModelMonitoringServiceRetry + ScheduleIModelMonitoringService
ParallelComplianceCheckServiceFork + awaitAllIParallelComplianceCheckService
ModelRegistryServiceBracketIModelRegistryService

Observable Port Total: 19 (InMemory Repository 5 + EfCore Repository 5 + Query 5 + External Service 4)


The Functorium Pipeline’s UseCtxEnricher() middleware automatically propagates properties of Command/Query Requests and DomainEvents as ctx.* fields.

Commandctx.* FieldPillar
RegisterModelCommandctx.register_model_command.request.nameDefault(L+T)
ctx.register_model_command.request.versionDefault(L+T)
ctx.register_model_command.request.purposeLogging
CreateDeploymentCommandctx.create_deployment_command.request.model_idDefault(L+T)
ctx.create_deployment_command.request.environmentAll(L+T+M)
ctx.create_deployment_command.request.drift_thresholdDefault+MetricsValue
ReportIncidentCommandctx.report_incident_command.request.severityAll(L+T+M)
ctx.report_incident_command.request.descriptionLogging
Eventctx.* FieldPillar
AIModel.RiskClassifiedEventctx.risk_classified_event.model_idDefault(L+T)
ctx.risk_classified_event.new_risk_tierDefault(L+T)
ModelIncident.ReportedEventctx.reported_event.severityAll(L+T+M)
ctx.reported_event.deployment_idDefault(L+T)

Observability Pipeline Configuration Status

Section titled “Observability Pipeline Configuration Status”
services
.RegisterOpenTelemetry(configuration, AssemblyReference.Assembly)
.ConfigurePipelines(pipelines => pipelines
.UseObservability() // Batch enable CtxEnricher + Metrics + Tracing + Logging
.UseValidation()
.UseException())
.Build();

UseObservability() batch enables all 4 observability types. The remaining pipelines are registered via explicit opt-in:

OrderMiddlewareCollection TargetOutput
1UseObservability() -> MetricsCounter(requests, responses), Histogram(duration)Prometheus / OTLP
2UseObservability() -> TracingActivity Span (entry/exit/tags)Jaeger / OTLP
3UseObservability() -> CtxEnricherRequest/Response/Event propertiesLogContext + Activity.SetTag + MetricsTagContext
4UseObservability() -> LoggingStructured logs (Serilog)Console / OTLP
5UseValidation()FluentValidation-based request validationValidation Error
6UseException()Exception -> DomainError/AdapterError conversionerror.type, error.code tags
services.AddMediator(options =>
{
options.ServiceLifetime = ServiceLifetime.Scoped;
options.NotificationPublisherType =
typeof(ObservableDomainEventNotificationPublisher);
});
services.RegisterDomainEventPublisher();
services.RegisterDomainEventHandlersFromAssembly(
AiGovernance.Application.AssemblyReference.Assembly);

ObservableDomainEventNotificationPublisher automatically performs metrics, tracing, and logging when publishing DomainEvents. EventHandler observability is provided through Mediator’s NotificationPublisherType.


IO patterns and observability integration status for the 4 external services.

IO.liftAsync -> .Timeout(10s) -> .Catch(TimedOut -> fallback) -> .Catch(Exceptional -> error)
|
[GenerateObservablePort] -> Observable wrapper auto-generated
|
adapter.external_service.requests (Counter)
adapter.external_service.duration (Histogram)
adapter.external_service.responses (Counter + error.type)
IO.liftAsync -> .Retry(exponential|jitter|recurs|maxDelay) -> .Catch(Exceptional -> error)
|
Observable wrapper: observes only the final result
Retry attempt count tracked via internal logging

ParallelComplianceCheckService (Fork + awaitAll)

Section titled “ParallelComplianceCheckService (Fork + awaitAll)”
CriterionNames.Map(name => CheckSingle(name).Fork()) -> awaitAll(forks) -> .Map(aggregate)
|
Observable wrapper: observes the entire parallel check result as a single call
Individual criterion checks execute inside Fork (not individually observed)
acquireSession.Bracket(Use: lookupModel, Fin: releaseSession) -> .Catch(Exceptional -> error)
|
Observable wrapper: observes the entire Bracket as a single call
Acquire/Use/Release sub-steps tracked via internal logging

ItemCount
Observable Port (InMemory Repository)5
Observable Port (EfCore Repository)5
Observable Port (Query)5
Observable Port (External Service)4
Observable Port Total19
Pipeline Middleware5 (Metrics, Tracing, CtxEnricher, Logging, Exception)
DomainEvent ObservabilityObservableDomainEventNotificationPublisher
ctx.* MetricsTag Fields2 (Environment, Severity)
ctx.* MetricsValue Fields1 (DriftThreshold)
ctx.* Default(L+T) Fields8+ (ModelId, DeploymentId, Name, Version, etc.)
ctx.* Logging Only Fields2 (Purpose, Description)