Functorium’s Usecase Pipeline system separates cross-cutting concerns into a Mediator IPipelineBehavior<TRequest, TResponse> chain. This document defines the 8 default Pipeline behaviors, custom extension points, the PipelineConfigurator API, and OpenTelemetry configuration types.
| Type | Namespace | Description |
| --- | --- | --- |
| `UsecasePipelineBase<TRequest>` | `Functorium.Adapters.Pipelines` | Common base class for all Pipelines |
| `UsecaseMetricsPipeline<TRequest, TResponse>` | `Functorium.Adapters.Pipelines` | Automatic metrics collection Pipeline |
| `UsecaseTracingPipeline<TRequest, TResponse>` | `Functorium.Adapters.Pipelines` | Distributed tracing Pipeline |
| `UsecaseLoggingPipeline<TRequest, TResponse>` | `Functorium.Adapters.Pipelines` | Structured logging Pipeline |
| `UsecaseValidationPipeline<TRequest, TResponse>` | `Functorium.Adapters.Pipelines` | FluentValidation validation Pipeline |
| `UsecaseCachingPipeline<TRequest, TResponse>` | `Functorium.Adapters.Pipelines` | Query caching Pipeline |
| `UsecaseExceptionPipeline<TRequest, TResponse>` | `Functorium.Adapters.Pipelines` | Exception to `FinResponse.Fail` conversion Pipeline |
| `UsecaseTransactionPipeline<TRequest, TResponse>` | `Functorium.Adapters.Pipelines` | Transaction + UoW + domain event Pipeline |
| `ICustomUsecasePipeline` | `Functorium.Adapters.Pipelines` | Custom Pipeline marker interface |
| `UsecaseMetricCustomPipelineBase<TRequest>` | `Functorium.Adapters.Pipelines` | Custom metrics Pipeline base |
| `UsecaseTracingCustomPipelineBase<TRequest>` | `Functorium.Adapters.Pipelines` | Custom tracing Pipeline base |
| `PipelineConfigurator` | `Functorium.Adapters.Observabilities.Builders.Configurators` | Pipeline enable/disable Fluent API |
| `OpenTelemetryBuilder` | `Functorium.Adapters.Observabilities.Builders` | OpenTelemetry configuration main Builder |
| `LoggingConfigurator` | `Functorium.Adapters.Observabilities.Builders.Configurators` | Serilog extension configuration |
| `MetricsConfigurator` | `Functorium.Adapters.Observabilities.Builders.Configurators` | Metrics extension configuration |
| `TracingConfigurator` | `Functorium.Adapters.Observabilities.Builders.Configurators` | Tracing extension configuration |
| `OpenTelemetryOptions` | `Functorium.Adapters.Observabilities` | OTLP endpoint/protocol configuration |
| `ObservabilityNaming` | `Functorium.Adapters.Observabilities.Naming` | Observability naming constants |
Pipelines execute from outside (Request side) to inside (Handler side) according to DI registration order. Different Pipelines apply to Commands and Queries.
Command execution order:
Request → Metrics → Tracing → Logging → Validation → Exception → Transaction → Custom → Handler
Query execution order:
Request → Metrics → Tracing → Logging → Validation → Caching → Exception → Custom → Handler
| Order | Pipeline | Command | Query | Description |
| --- | --- | --- | --- | --- |
| 1 | Metrics | Yes | Yes | Collects request/response counts and processing time |
| 2 | Tracing | Yes | Yes | Creates Activity Span and records tags |
| 3 | Logging | Yes | Yes | Request/response structured logging |
| 4 | Validation | Yes | Yes | FluentValidation validation |
| 5 | Caching | No | Yes | IMemoryCache caching when ICacheable is implemented |
| 6 | Exception | Yes | Yes | Exception to `FinResponse.Fail` conversion |
| 7 | Transaction | Yes | No | UoW.SaveChanges + domain event publishing |
| 8 | Custom | Yes | Yes | User-defined Pipeline |
| 9 | Handler | Yes | Yes | Actual Usecase Handler |
Transaction Pipeline applies only to Commands via the where TRequest : ICommand<TResponse> constraint.
Caching Pipeline applies only to Queries via the where TRequest : IQuery<TResponse> constraint.
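The outside-to-inside ordering described above can be illustrated with a small, self-contained sketch. It does not use the Mediator or Functorium types: `Behavior`, `PipelineOrderSketch`, `Compose`, and `Named` are hypothetical names introduced only to show how the first registered behavior ends up as the outermost wrapper around the Handler.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a pipeline behavior: it receives the request and a
// delegate that invokes the rest of the chain.
public delegate string Behavior(string request, Func<string, string> next);

public static class PipelineOrderSketch
{
    // Wraps the handler so that the first registered behavior becomes the outermost one.
    public static Func<string, string> Compose(
        IEnumerable<Behavior> behaviorsInRegistrationOrder,
        Func<string, string> handler)
    {
        return behaviorsInRegistrationOrder
            .Reverse()
            .Aggregate(handler, (next, behavior) => request => behavior(request, next));
    }

    // Creates a behavior that records its name on entry, then calls the next step.
    public static Behavior Named(string name, List<string> trace) =>
        (request, next) =>
        {
            trace.Add(name);
            return next(request);
        };
}
```

Composing behaviors named after the Command order and invoking the resulting delegate fills the trace list in registration order, with the Handler entry last.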
UsecasePipelineBase<TRequest> is the abstract base class inherited by all Pipelines. It provides common utilities such as request type analysis, handler name extraction, and error information extraction.

    public abstract partial class UsecasePipelineBase<TRequest>

| Method | Return Type | Description |
| --- | --- | --- |
| `GetRequestCategoryType<T>(T request)` | `string` | Identifies the CQRS type from a request instance (command, query, unknown) |
| `GetRequestCategoryType(Type requestType)` | `string` | Identifies the CQRS type from a request Type |
| `GetRequestHandlerPath()` | `string` | Returns the FullName of TRequest (full path including namespace) |
| `GetRequestHandler()` | `string` | Extracts the handler class name from the FullName of TRequest |
| `GetRequestHandlerLower()` | `string` | Lowercase conversion of `GetRequestHandler()` (for metric naming) |
| `GetErrorInfo(Error error)` | `(string ErrorType, string ErrorCode)` | Extracts type/code information from an error |

GetRequestCategoryType determines the type by checking whether ICommandRequest<> / IQueryRequest<> interfaces are implemented.
GetRequestHandler() parses nested types (+) and namespaces (.) from typeof(TRequest).FullName to extract only the class name.
UsecaseExceptionPipeline converts exceptions to FinResponse.Fail so that exceptions do not propagate outside the Pipeline.

    internal sealed class UsecaseExceptionPipeline<TRequest, TResponse>
        : UsecasePipelineBase<TRequest>, IPipelineBehavior<TRequest, TResponse>
        where TRequest : IMessage
        where TResponse : IFinResponseFactory<TResponse>

| Item | Description |
| --- | --- |
| Constraint | `TResponse : IFinResponseFactory<TResponse>` |
| Behavior | Catches exceptions via try-catch and returns `TResponse.CreateFail(AdapterError.FromException(...))` |
| Error Type | `AdapterErrorType.PipelineException` |
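The try-catch conversion described above can be sketched without the library types. `Outcome<T>` and `ExceptionBoundarySketch.Guard` below are hypothetical stand-ins for a FinResponse-style result and the Pipeline body; the error string format is an assumption of this sketch, not the shape produced by `AdapterError.FromException`.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a FinResponse-style result; not the library's type.
public sealed record Outcome<T>(bool IsSuccess, T? Value, string? Error)
{
    public static Outcome<T> Succeed(T value) => new(true, value, null);
    public static Outcome<T> Fail(string error) => new(false, default, error);
}

public static class ExceptionBoundarySketch
{
    // Runs the rest of the chain and turns any thrown exception into a failed
    // result, so that callers only ever observe a result value.
    public static async Task<Outcome<T>> Guard<T>(Func<Task<Outcome<T>>> next)
    {
        try
        {
            return await next();
        }
        catch (Exception ex)
        {
            return Outcome<T>.Fail($"{ex.GetType().Name}: {ex.Message}");
        }
    }
}
```

Because the failure is returned as a value, the behaviors registered outside this one (Metrics, Tracing, Logging) can record it like any other failed response.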
UsecaseValidationPipeline executes the FluentValidation IValidator<TRequest> instances and returns FinResponse.Fail on validation failure.

    internal sealed class UsecaseValidationPipeline<TRequest, TResponse>
        : UsecasePipelineBase<TRequest>, IPipelineBehavior<TRequest, TResponse>
        where TRequest : IMessage
        where TResponse : IFinResponseFactory<TResponse>

| Item | Description |
| --- | --- |
| DI Dependencies | `IEnumerable<IValidator<TRequest>>` |
| Behavior | Passes through to `next()` if no Validators exist; runs all Validators otherwise |
| Error Type | `AdapterErrorType.PipelineValidation(PropertyName)` |
| Multiple errors | Returns `Error.Many(errors)` when there are 2 or more validation failures |

UsecaseLoggingPipeline records request/response information via structured logging. It automatically pushes custom attributes if an IUsecaseCtxEnricher is registered in DI.

    internal sealed class UsecaseLoggingPipeline<TRequest, TResponse>
        : UsecasePipelineBase<TRequest>, IPipelineBehavior<TRequest, TResponse>
        where TRequest : IMessage
        where TResponse : IFinResponse, IFinResponseFactory<TResponse>

| Item | Description |
| --- | --- |
| DI Dependencies | `ILogger<UsecaseLoggingPipeline<TRequest, TResponse>>`, `IUsecaseCtxEnricher<TRequest, TResponse>?` (optional) |
| Request log | Information level, `{Layer} {Category} {CategoryType} {Handler} {Method} requesting` |
| Response log (success) | Information level, `responded success in {Elapsed:0.0000} ms` |
| Response log (Expected error) | Warning level, `responded failure ... with {@Error}` |
| Response log (Exceptional error) | Error level, `responded failure ... with {@Error}` |

UsecaseTracingPipeline creates distributed tracing Spans using an OpenTelemetry ActivitySource.

    internal sealed class UsecaseTracingPipeline<TRequest, TResponse>
        : UsecasePipelineBase<TRequest>, IPipelineBehavior<TRequest, TResponse>
        where TRequest : IMessage
        where TResponse : IFinResponse, IFinResponseFactory<TResponse>

| Item | Description |
| --- | --- |
| DI Dependencies | `ActivitySource` |
| Span name | `{layer} {category}.{categoryType} {handler}.{method}` |
| ActivityKind | `Internal` |
| Request tags | `request.layer`, `request.category.name`, `request.category.type`, `request.handler.name`, `request.handler.method` |
| Response tags (success) | `response.status = success`, `ActivityStatusCode.Ok` |
| Response tags (failure) | `response.status = failure`, `error.type`, `error.code`, `ActivityStatusCode.Error` |
| Time tag | `response.elapsed` (in seconds) |

UsecaseMetricsPipeline collects request counts, response counts, and processing time via an OpenTelemetry Meter.

    internal sealed class UsecaseMetricsPipeline<TRequest, TResponse>
        : UsecasePipelineBase<TRequest>, IPipelineBehavior<TRequest, TResponse>, IDisposable
        where TRequest : IMessage
        where TResponse : IFinResponse, IFinResponseFactory<TResponse>

| Item | Description |
| --- | --- |
| DI Dependencies | `IOptions<OpenTelemetryOptions>`, `IMeterFactory` |
| Meter name | `{ServiceNamespace}.application` |
| Counter (requests) | `application.usecase.{categoryType}.requests` (unit: `{request}`) |
| Counter (responses) | `application.usecase.{categoryType}.responses` (unit: `{response}`) |
| Histogram (duration) | `application.usecase.{categoryType}.duration` (unit: `s`) |
| Request tags | `request.layer`, `request.category.name`, `request.category.type`, `request.handler.name`, `request.handler.method` |
| Response tags (success) | Request tags + `response.status = success` |
| Response tags (failure) | Request tags + `response.status = failure` + `error.type` + `error.code` |

UsecaseTransactionPipeline automatically handles explicit transactions, UoW.SaveChanges, and domain event publishing for Command Usecases.

    internal sealed class UsecaseTransactionPipeline<TRequest, TResponse>
        : UsecasePipelineBase<TRequest>, IPipelineBehavior<TRequest, TResponse>
        where TRequest : ICommand<TResponse>
        where TResponse : IFinResponse, IFinResponseFactory<TResponse>

| Item | Description |
| --- | --- |
| Constraint | `TRequest : ICommand<TResponse>` (Command only) |
| DI Dependencies | `IUnitOfWork`, `IDomainEventPublisher`, `ILogger` |
| Execution order | 1) Begin transaction → 2) Execute Handler → 3) Rollback on failure → 4) SaveChanges → 5) Commit → 6) Publish domain events |
| Failure handling | Returns `TResponse.CreateFail(error)` on Handler failure or SaveChanges failure, with automatic transaction rollback |
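The execution order in the table can be traced with a minimal control-flow sketch. It models each step as a log entry rather than calling IUnitOfWork or IDomainEventPublisher; `TransactionFlowSketch` and its delegate parameters are hypothetical and exist only to show where the two rollback paths branch off.

```csharp
using System;
using System.Collections.Generic;

public static class TransactionFlowSketch
{
    // handler and saveChanges return true on success. Every step taken is
    // appended to log so that the ordering can be inspected afterwards.
    public static bool Execute(Func<bool> handler, Func<bool> saveChanges, List<string> log)
    {
        log.Add("BeginTransaction");

        log.Add("ExecuteHandler");
        if (!handler())
        {
            log.Add("Rollback"); // Handler failure: nothing is saved or published
            return false;
        }

        log.Add("SaveChanges");
        if (!saveChanges())
        {
            log.Add("Rollback"); // SaveChanges failure: the transaction is undone
            return false;
        }

        log.Add("Commit");
        log.Add("PublishDomainEvents"); // only reached after a successful commit
        return true;
    }
}
```

In this sketch domain events are published only on the fully successful path, which matches step 6 coming after Commit in the documented order.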
UsecaseCachingPipeline performs IMemoryCache-based caching for Query requests that implement ICacheable.

    internal sealed class UsecaseCachingPipeline<TRequest, TResponse>
        : UsecasePipelineBase<TRequest>, IPipelineBehavior<TRequest, TResponse>
        where TRequest : IQuery<TResponse>
        where TResponse : IFinResponse, IFinResponseFactory<TResponse>

| Item | Description |
| --- | --- |
| Constraint | `TRequest : IQuery<TResponse>` (Query only) |
| DI Dependencies | `IMemoryCache` (`services.AddMemoryCache()` required) |
| Cache key | `ICacheable.CacheKey` |
| Cache duration | `ICacheable.Duration` (defaults to 5 minutes if null) |
| Behavior | Returns immediately on cache hit; on cache miss, executes the Handler and caches only successful responses |

ICacheable interface:

    public interface ICacheable
    {
        string CacheKey { get; }
        TimeSpan? Duration { get; }
    }
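The hit/miss behavior described above can be sketched with a dictionary standing in for IMemoryCache. Everything here is illustrative: `ICacheableQuery`, `GetProductQuery`, and `CachingSketch` are hypothetical names, the `string` cache key type is an assumption, and the current time is passed in explicitly so that expiry is easy to follow.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical mirror of the ICacheable contract described above.
public interface ICacheableQuery
{
    string CacheKey { get; }
    TimeSpan? Duration { get; }
}

public sealed record GetProductQuery(int ProductId) : ICacheableQuery
{
    public string CacheKey => $"product:{ProductId}";
    public TimeSpan? Duration => null; // null falls back to the default duration
}

// Dictionary-backed stand-in for IMemoryCache, kept minimal for illustration.
public sealed class CachingSketch
{
    private static readonly TimeSpan DefaultDuration = TimeSpan.FromMinutes(5);
    private readonly Dictionary<string, (string Value, DateTimeOffset ExpiresAt)> _entries = new();

    // handler returns (succeeded, value); only successful values are cached.
    public string? Execute(
        ICacheableQuery query,
        Func<(bool Succeeded, string? Value)> handler,
        DateTimeOffset now)
    {
        if (_entries.TryGetValue(query.CacheKey, out var hit) && hit.ExpiresAt > now)
        {
            return hit.Value; // cache hit: the handler is not invoked
        }

        var (succeeded, value) = handler();
        if (succeeded && value is not null)
        {
            _entries[query.CacheKey] = (value, now + (query.Duration ?? DefaultDuration));
        }
        return value;
    }
}
```

A second call with the same key inside the five-minute window returns the stored value without running the handler, while a failed handler result leaves the cache untouched.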
A custom Pipeline is a Pipeline defined by the user. It executes after the default Pipelines (Exception, Validation, etc.) and just before the Handler. Implementing the ICustomUsecasePipeline marker interface enables automatic registration through assembly scanning.
ICustomUsecasePipeline is a marker interface for Scrutor auto-discovery registration.

    public interface ICustomUsecasePipeline { }

Calling AddCustomPipeline<T>() explicitly registers a type implementing this interface in DI.
UsecaseMetricCustomPipelineBase<TRequest> is a base class for creating individual per-Usecase metrics. It automatically identifies the CQRS type from the TRequest type.

    public abstract class UsecaseMetricCustomPipelineBase<TRequest>
        : UsecasePipelineBase<TRequest>, ICustomUsecasePipeline

| Member | Description |
| --- | --- |
| `protected readonly Meter _meter` | `{ServiceNamespace}.application` Meter instance |
| `protected const string DurationUnit` | `"s"` |
| `protected const string CountUnit` | `"requests"` |
| `GetMetricName(string metricName)` | Returns a name in `application.usecase.{cqrs}.{handler}.{metricName}` format |
| `GetMetricNameWithoutHandler(string metricName)` | Returns a name in `application.usecase.{cqrs}.{metricName}` format |

Constructor:

    protected UsecaseMetricCustomPipelineBase(string serviceNamespace, IMeterFactory meterFactory)

RequestDuration helper:

    public class RequestDuration : IDisposable
    {
        public RequestDuration(Histogram<double> histogram);
        public void Dispose(); // Automatically records elapsed time to histogram
    }

RequestDuration is used with the using statement so that time measurement and Histogram recording happen automatically.
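The dispose-to-record pattern can be sketched with the standard System.Diagnostics.Metrics types. `DurationScope` below is a hypothetical re-creation written for illustration, not the library's RequestDuration; the meter and instrument names are made up, and a MeterListener is attached only so that the recorded value can be observed.

```csharp
using System;
using System.Diagnostics;
using System.Diagnostics.Metrics;

// Sketch of a RequestDuration-style helper; the library's implementation may differ.
public sealed class DurationScope : IDisposable
{
    private readonly Histogram<double> _histogram;
    private readonly long _startTimestamp = Stopwatch.GetTimestamp();

    public DurationScope(Histogram<double> histogram) => _histogram = histogram;

    // Records the elapsed time in seconds when the using scope ends.
    public void Dispose()
    {
        double seconds = (Stopwatch.GetTimestamp() - _startTimestamp) / (double)Stopwatch.Frequency;
        _histogram.Record(seconds);
    }
}

public static class DurationScopeDemo
{
    // Measures one empty scope and returns the value the histogram received.
    public static double MeasureOnce()
    {
        using var meter = new Meter("sketch.application");
        Histogram<double> histogram =
            meter.CreateHistogram<double>("application.usecase.command.sketch.duration", unit: "s");

        double recorded = -1;
        using var listener = new MeterListener();
        listener.InstrumentPublished = (instrument, l) =>
        {
            if (instrument.Meter == meter) l.EnableMeasurementEvents(instrument);
        };
        listener.SetMeasurementEventCallback<double>(
            (instrument, value, tags, state) => recorded = value);
        listener.Start();

        using (new DurationScope(histogram))
        {
            // The measured work would run here.
        }
        return recorded;
    }
}
```

Tying the recording to Dispose means the duration is captured even when the measured code returns early or throws.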
UsecaseTracingCustomPipelineBase<TRequest> is a base class for creating custom per-Usecase tracing. It creates custom Activities (Spans) via an ActivitySource and sets the standard tags.

    public abstract class UsecaseTracingCustomPipelineBase<TRequest>
        : UsecasePipelineBase<TRequest>, ICustomUsecasePipeline

| Member | Description |
| --- | --- |
| `protected readonly ActivitySource _activitySource` | ActivitySource used for creating Activities |
| `StartCustomActivity(string operationName, ActivityKind kind)` | Creates a custom Activity named in `{prefix}.{operationName}` format |
| `GetActivityName(string operationName)` | Gets the Activity name (`{prefix}.{operationName}`) |
| `SetStandardRequestTags(Activity activity, string method)` | Sets the 5 standard Request tags |

Constructor:

    protected UsecaseTracingCustomPipelineBase(ActivitySource activitySource)

Activity name prefix: {layer} {category}.{categoryType} {handler}
If a parent Activity.Current exists, the custom Activity is created as a child Span.
PipelineConfigurator enables or disables individual Pipelines via a Fluent API and adds custom Pipelines.

    public class PipelineConfigurator

| Method | Return Type | Description |
| --- | --- | --- |
| `UseObservability()` | `PipelineConfigurator` | Batch-enables all 4 observability types (CtxEnricher, Metrics, Tracing, Logging) |
| `UseMetrics()` | `PipelineConfigurator` | Enables the Metrics Pipeline (CtxEnricher auto-included) |
| `UseTracing()` | `PipelineConfigurator` | Enables the Tracing Pipeline (CtxEnricher auto-included) |
| `UseLogging()` | `PipelineConfigurator` | Enables the Logging Pipeline (CtxEnricher auto-included) |
| `UseValidation()` | `PipelineConfigurator` | Enables the Validation Pipeline |
| `UseCaching()` | `PipelineConfigurator` | Enables the Caching Pipeline (IMemoryCache DI registration required) |
| `UseException()` | `PipelineConfigurator` | Enables the Exception Pipeline |
| `UseTransaction()` | `PipelineConfigurator` | Enables the Transaction Pipeline (IUnitOfWork, IDomainEventPublisher, IDomainEventCollector DI registration required) |

| Method | Return Type | Description |
| --- | --- | --- |
| `WithLifetime(ServiceLifetime lifetime)` | `PipelineConfigurator` | Sets the Pipeline service lifetime (default: Scoped) |
| `AddCustomPipeline<TPipeline>()` | `PipelineConfigurator` | Explicitly registers a custom Pipeline by type (guarantees a deterministic Pipeline execution order) |

    // Enable observability + validation + exception handling
    services
        .RegisterOpenTelemetry(configuration, Assembly.GetExecutingAssembly())
        .ConfigurePipelines(pipelines => pipelines
            .UseObservability()   // Batch enable CtxEnricher, Metrics, Tracing, Logging
            .UseValidation()
            .UseException())
        .Build();

    // Selective enabling + custom Pipeline registration
    services
        .RegisterOpenTelemetry(configuration, Assembly.GetExecutingAssembly())
        .ConfigurePipelines(pipelines => pipelines
            .AddCustomPipeline<PlaceOrderCommandMetricPipeline>()
            .WithLifetime(ServiceLifetime.Scoped))
        .Build();

Inside Apply(), Pipelines are registered to IPipelineBehavior<,> in the following order.
1. Metrics
2. Tracing
3. Logging
4. Validation
5. Caching
6. Exception
7. Transaction
8. Custom (registered in the order AddCustomPipeline<T>() is called)
9. Handler
Transaction Pipeline auto-detection: Even if UseTransaction() is called, registration is skipped if IUnitOfWork, IDomainEventPublisher, and IDomainEventCollector are not all registered in DI.
OpenTelemetryBuilder is the main Builder class for OpenTelemetry configuration. It configures Serilog, Metrics, Tracing, and Pipeline settings via method chaining.

    public partial class OpenTelemetryBuilder

    // IServiceCollection extension method
    public static OpenTelemetryBuilder RegisterOpenTelemetry(
        this IServiceCollection services,
        IConfiguration configuration,
        Assembly projectAssembly)

| Method | Return Type | Description |
| --- | --- | --- |
| `ConfigureLogging(Action<LoggingConfigurator> configure)` | `OpenTelemetryBuilder` | Serilog extension configuration |
| `ConfigureMetrics(Action<MetricsConfigurator> configure)` | `OpenTelemetryBuilder` | OpenTelemetry Metrics extension configuration |
| `ConfigureTracing(Action<TracingConfigurator> configure)` | `OpenTelemetryBuilder` | OpenTelemetry Tracing extension configuration |
| `ConfigurePipelines(Action<PipelineConfigurator> configure)` | `OpenTelemetryBuilder` | Pipeline configuration (explicit opt-in required) |
| `ConfigureStartupLogger(Action<ILogger> configure)` | `OpenTelemetryBuilder` | Additional logging configuration at startup |
| `WithAdapterObservability(bool enable = true)` | `OpenTelemetryBuilder` | Enables/disables Adapter observability (default: true) |
| `Build()` | `IServiceCollection` | Returns IServiceCollection after applying all settings |

Build() applies the settings through the following steps:

1. Read OpenTelemetryOptions (IOptions<OpenTelemetryOptions>)
2. Create Resource Attributes
3. Configure Serilog (ReadFrom.Configuration + WriteTo.OpenTelemetry + ErrorsDestructuringPolicy)
4. Configure CtxEnricherContext PushProperty factory
5. Configure OpenTelemetry (Metrics + Tracing + OTLP Exporter)
6. Register Adapter Observability (ActivitySource, IMeterFactory)
7. Register Usecase Pipeline
8. Register StartupLogger IHostedService

LoggingConfigurator is a Builder class for Serilog extension configuration.

    public class LoggingConfigurator

| Member | Description |
| --- | --- |
| `Options` | Property for accessing `OpenTelemetryOptions` |
| `AddDestructuringPolicy<TPolicy>()` | Registers an `IDestructuringPolicy` implementation type |
| `AddEnricher(ILogEventEnricher enricher)` | Registers an Enricher instance |
| `AddEnricher<TEnricher>()` | Registers an Enricher type |
| `Configure(Action<LoggerConfiguration> configure)` | Direct access to `LoggerConfiguration` |

MetricsConfigurator is a Builder class for OpenTelemetry Metrics extension configuration.

    public class MetricsConfigurator

| Member | Description |
| --- | --- |
| `Options` | Property for accessing `OpenTelemetryOptions` |
| `AddMeter(string meterName)` | Registers an additional Meter (wildcard supported: `"MyApp.*"`) |
| `AddInstrumentation(Action<MeterProviderBuilder> configure)` | Registers additional Instrumentation |
| `Configure(Action<MeterProviderBuilder> configure)` | Direct access to `MeterProviderBuilder` |

TracingConfigurator is a Builder class for OpenTelemetry Tracing extension configuration.

    public class TracingConfigurator

| Member | Description |
| --- | --- |
| `Options` | Property for accessing `OpenTelemetryOptions` |
| `AddSource(string sourceName)` | Registers an additional ActivitySource (wildcard supported: `"MyApp.*"`) |
| `AddProcessor(BaseProcessor<Activity> processor)` | Registers an additional Processor |
| `Configure(Action<TracerProviderBuilder> configure)` | Direct access to `TracerProviderBuilder` |

OpenTelemetryOptions is a configuration class bound to the "OpenTelemetry" section in appsettings.json.

    public sealed class OpenTelemetryOptions : IStartupOptionsLogger, IOpenTelemetryOptions

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| `ServiceNamespace` | `string` | `""` | Service namespace (group) |
| `ServiceName` | `string` | `""` | Service name |
| `ServiceVersion` | `string` | (assembly version) | Service version (auto-configured) |
| `ServiceInstanceId` | `string` | (hostname) | Service instance ID (auto-configured) |
| `CollectorEndpoint` | `string` | `""` | Unified OTLP Collector endpoint |
| `TracingEndpoint` | `string?` | `null` | Tracing-specific endpoint (uses CollectorEndpoint if null) |
| `MetricsEndpoint` | `string?` | `null` | Metrics-specific endpoint (uses CollectorEndpoint if null) |
| `LoggingEndpoint` | `string?` | `null` | Logging-specific endpoint (uses CollectorEndpoint if null) |
| `CollectorProtocol` | `string` | `"Grpc"` | Unified OTLP Protocol |
| `TracingProtocol` | `string?` | `null` | Tracing-specific Protocol |
| `MetricsProtocol` | `string?` | `null` | Metrics-specific Protocol |
| `LoggingProtocol` | `string?` | `null` | Logging-specific Protocol |
| `SamplingRate` | `double` | `1.0` | Tracing sampling rate (0.0 to 1.0) |
| `EnablePrometheusExporter` | `bool` | `false` | Enables the Prometheus Exporter |

The resolution rules for individual endpoints (TracingEndpoint, MetricsEndpoint, LoggingEndpoint) are as follows.
| Value | Behavior |
| --- | --- |
| `null` | Uses CollectorEndpoint (default behavior) |
| `""` (empty string) | Disables that signal |
| `"http://..."` | Uses that endpoint |
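The three resolution rules can be written as a single switch expression. `EndpointResolutionSketch.Resolve` is a hypothetical helper, and returning `null` to mean "signal disabled" is an assumption of this sketch; how the library's own getters represent a disabled signal is not stated here.

```csharp
#nullable enable
public static class EndpointResolutionSketch
{
    // Returns the endpoint to export to, or null when the signal is disabled.
    public static string? Resolve(string? individualEndpoint, string collectorEndpoint) =>
        individualEndpoint switch
        {
            null => collectorEndpoint, // not set: fall back to the unified collector
            "" => null,                // empty string: the signal is disabled
            _ => individualEndpoint    // an explicit endpoint takes priority
        };
}
```

Keeping `null` and the empty string distinct is what allows one signal to be switched off while the others keep using the unified collector.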
| Method | Return Type | Description |
| --- | --- | --- |
| `GetTracingProtocol()` | `OtlpCollectorProtocol` | Tracing Protocol (individual setting takes priority) |
| `GetMetricsProtocol()` | `OtlpCollectorProtocol` | Metrics Protocol (individual setting takes priority) |
| `GetLoggingProtocol()` | `OtlpCollectorProtocol` | Logging Protocol (individual setting takes priority) |
| `GetTracingEndpoint()` | `string` | Tracing endpoint (resolution rules applied) |
| `GetMetricsEndpoint()` | `string` | Metrics endpoint (resolution rules applied) |
| `GetLoggingEndpoint()` | `string` | Logging endpoint (resolution rules applied) |

    public sealed class OtlpCollectorProtocol : SmartEnum<OtlpCollectorProtocol>

| Constant | Value | Description |
| --- | --- | --- |
| `Grpc` | 1 | gRPC protocol (default) |
| `HttpProtobuf` | 2 | HTTP/Protobuf protocol |

OpenTelemetryOptions is checked by a FluentValidation-based options validator that applies the following rules.

| Rule | Description |
| --- | --- |
| `ServiceNamespace` | Required (NotEmpty) |
| `ServiceName` | Required (NotEmpty) |
| Endpoints | CollectorEndpoint or at least one individual endpoint is required |
| `SamplingRate` | Must be in the range 0.0 to 1.0 |
| Protocol | Must be a valid SmartEnum value |

appsettings.json example:

    {
      "OpenTelemetry": {
        "ServiceNamespace": "mycompany.production",
        "ServiceName": "orderservice",
        "CollectorEndpoint": "http://localhost:4317",
        "CollectorProtocol": "Grpc",
        "EnablePrometheusExporter": false
      }
    }

ObservabilityNaming defines the unified naming conventions for observability. It is the single source of truth for metric names, tag keys, Span names, and related identifiers.

    public static partial class ObservabilityNaming

Layer values:

| Constant | Value | Description |
| --- | --- | --- |
| `Application` | `"application"` | Application Layer |
| `Adapter` | `"adapter"` | Adapter Layer |

Category values:

| Constant | Value | Description |
| --- | --- | --- |
| `Usecase` | `"usecase"` | Usecase category |
| `Repository` | `"repository"` | Repository category |
| `Event` | `"event"` | Event category |
| `Unknown` | `"unknown"` | Unknown category |

Category type values:

| Constant | Value | Description |
| --- | --- | --- |
| `Command` | `"command"` | Command type |
| `Query` | `"query"` | Query type |
| `Event` | `"event"` | Event type |
| `Unknown` | `"unknown"` | Unknown type |

Response status values:

| Constant | Value | Description |
| --- | --- | --- |
| `Success` | `"success"` | Success |
| `Failure` | `"failure"` | Failure |

Error type values:

| Constant | Value | Description |
| --- | --- | --- |
| `Expected` | `"expected"` | Expected business error (IsExpected = true) |
| `Exceptional` | `"exceptional"` | Exceptional system error (IsExceptional = true) |
| `Aggregate` | `"aggregate"` | Aggregate error (ManyErrors) |

Method names:

| Constant | Value | Description |
| --- | --- | --- |
| `Handle` | `"Handle"` | Usecase Handler method |
| `Publish` | `"Publish"` | Event publishing method |
| `PublishTrackedEvents` | `"PublishTrackedEvents"` | Tracked event publishing method |

Standard OpenTelemetry attribute keys:

| Constant | Value |
| --- | --- |
| `ErrorType` | `"error.type"` |
| `ServiceNamespace` | `"service.namespace"` |
| `ServiceName` | `"service.name"` |
| `ServiceVersion` | `"service.version"` |
| `ServiceInstanceId` | `"service.instance.id"` |
| `DeploymentEnvironment` | `"deployment.environment"` |

Custom attribute keys:

| Constant | Value | Purpose |
| --- | --- | --- |
| `RequestMessage` | `"request.message"` | Request message |
| `RequestParams` | `"request.params"` | Request parameters |
| `RequestLayer` | `"request.layer"` | Request layer |
| `RequestCategoryName` | `"request.category.name"` | Request category name |
| `RequestCategoryType` | `"request.category.type"` | Request category type |
| `RequestHandlerName` | `"request.handler.name"` | Request handler name |
| `RequestHandlerMethod` | `"request.handler.method"` | Request handler method |
| `ResponseMessage` | `"response.message"` | Response message |
| `ResponseStatus` | `"response.status"` | Response status |
| `ResponseElapsed` | `"response.elapsed"` | Response elapsed time |
| `ErrorCode` | `"error.code"` | Error code |

Metric name helpers:

| Method | Example |
| --- | --- |
| `Metrics.UsecaseRequest("command")` | `"application.usecase.command.requests"` |
| `Metrics.UsecaseResponse("query")` | `"application.usecase.query.responses"` |
| `Metrics.UsecaseDuration("command")` | `"application.usecase.command.duration"` |
| `Metrics.AdapterRequest("repository")` | `"adapter.repository.requests"` |
| `Metrics.AdapterResponse("repository")` | `"adapter.repository.responses"` |
| `Metrics.AdapterDuration("repository")` | `"adapter.repository.duration"` |

Span name helper:

| Method | Example |
| --- | --- |
| `Spans.OperationName("adapter", "repository", "OrderRepository", "GetById")` | `"adapter repository OrderRepository.GetById"` |
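The name formats implied by the metric and span examples above can be reproduced with plain string interpolation. `NamingSketch` is a hypothetical class written only to make the patterns explicit; it is not the ObservabilityNaming implementation, and only the formats visible in the examples are covered.

```csharp
public static class NamingSketch
{
    // application.usecase.{categoryType}.requests
    public static string UsecaseRequest(string categoryType) =>
        $"application.usecase.{categoryType}.requests";

    // adapter.{category}.duration
    public static string AdapterDuration(string category) =>
        $"adapter.{category}.duration";

    // {layer} {category} {handler}.{method}
    public static string OperationName(string layer, string category, string handler, string method) =>
        $"{layer} {category} {handler}.{method}";
}
```

Centralizing the formats in one place keeps metric names and span names aligned across Pipelines and Adapters.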
Log event IDs:

| Scope | ID | Name |
| --- | --- | --- |
| Application Request | 1001 | `application.request` |
| Application Response (Success) | 1002 | `application.response.success` |
| Application Response (Warning) | 1003 | `application.response.warning` |
| Application Response (Error) | 1004 | `application.response.error` |
| Adapter Request | 2001 | `adapter.request` |
| Adapter Response (Success) | 2002 | `adapter.response.success` |
| Adapter Response (Warning) | 2003 | `adapter.response.warning` |
| Adapter Response (Error) | 2004 | `adapter.response.error` |