Pipeline Specification

Functorium’s Usecase Pipeline system separates cross-cutting concerns into a Mediator IPipelineBehavior<TRequest, TResponse> chain. This document defines the 8 default Pipeline behaviors, custom extension points, the PipelineConfigurator API, and OpenTelemetry configuration types.

| Type | Namespace | Description |
| --- | --- | --- |
| UsecasePipelineBase<TRequest> | Functorium.Adapters.Pipelines | Common base class for all Pipelines |
| UsecaseMetricsPipeline<TRequest, TResponse> | Same | Automatic metrics collection Pipeline |
| UsecaseTracingPipeline<TRequest, TResponse> | Same | Distributed tracing Pipeline |
| UsecaseLoggingPipeline<TRequest, TResponse> | Same | Structured logging Pipeline |
| UsecaseValidationPipeline<TRequest, TResponse> | Same | FluentValidation validation Pipeline |
| UsecaseCachingPipeline<TRequest, TResponse> | Same | Query caching Pipeline |
| UsecaseExceptionPipeline<TRequest, TResponse> | Same | Exception to FinResponse.Fail conversion Pipeline |
| UsecaseTransactionPipeline<TRequest, TResponse> | Same | Transaction + UoW + domain event Pipeline |
| ICustomUsecasePipeline | Same | Custom Pipeline marker interface |
| UsecaseMetricCustomPipelineBase<TRequest> | Same | Custom metrics Pipeline base |
| UsecaseTracingCustomPipelineBase<TRequest> | Same | Custom tracing Pipeline base |
| PipelineConfigurator | Functorium.Adapters.Observabilities.Builders.Configurators | Pipeline enable/disable Fluent API |
| OpenTelemetryBuilder | Functorium.Adapters.Observabilities.Builders | OpenTelemetry configuration main Builder |
| LoggingConfigurator | Same (Configurators) | Serilog extension configuration |
| MetricsConfigurator | Same (Configurators) | Metrics extension configuration |
| TracingConfigurator | Same (Configurators) | Tracing extension configuration |
| OpenTelemetryOptions | Functorium.Adapters.Observabilities | OTLP endpoint/protocol configuration |
| ObservabilityNaming | Functorium.Adapters.Observabilities.Naming | Observability naming constants |

Pipelines execute from outside (Request side) to inside (Handler side) according to DI registration order. Different Pipelines apply to Commands and Queries.

Command execution order:

Request → Metrics → Tracing → Logging → Validation → Exception → Transaction → Custom → Handler

Query execution order:

Request → Metrics → Tracing → Logging → Validation → Caching → Exception → Custom → Handler

| Order | Pipeline | Command | Query | Description |
| --- | --- | --- | --- | --- |
| 1 | Metrics | O | O | Collects request/response counts and processing time |
| 2 | Tracing | O | O | Creates Activity Span and records tags |
| 3 | Logging | O | O | Request/response structured logging |
| 4 | Validation | O | O | FluentValidation validation |
| 5 | Caching | - | O | IMemoryCache caching when ICacheable is implemented |
| 6 | Exception | O | O | Exception to FinResponse.Fail conversion |
| 7 | Transaction | O | - | UoW.SaveChanges + domain event publishing |
| 8 | Custom | O | O | User-defined Pipeline |
| 9 | Handler | O | O | Actual Usecase Handler |

  • Transaction Pipeline applies only to Commands via the where TRequest : ICommand<TResponse> constraint.
  • Caching Pipeline applies only to Queries via the where TRequest : IQuery<TResponse> constraint.
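
The outside-to-inside ordering described above can be illustrated with a minimal, library-free sketch. It does not use Mediator or Functorium types: the `Behavior` delegate, `Compose`, and `Run` are hypothetical names introduced only to show how behaviors listed in registration order end up wrapping the handler.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PipelineOrderSketch
{
    // Hypothetical stand-in for a pipeline behavior: it receives the request
    // and a continuation that invokes the next (inner) stage.
    public delegate string Behavior(string request, Func<string, string> next);

    // Wraps the handler so that the first behavior in the list is the outermost one.
    public static Func<string, string> Compose(
        IEnumerable<Behavior> behaviors, Func<string, string> handler) =>
        behaviors.Reverse().Aggregate(
            handler,
            (next, behavior) => request => behavior(request, next));

    // Runs a three-stage chain and returns the order in which stages were entered.
    public static List<string> Run()
    {
        var trace = new List<string>();

        Behavior Named(string name) => (request, next) =>
        {
            trace.Add(name);      // record entry into this stage
            return next(request); // continue toward the handler
        };

        var pipeline = Compose(
            new[] { Named("Metrics"), Named("Tracing"), Named("Logging") },
            request => { trace.Add("Handler"); return request; });

        pipeline("PlaceOrder");
        return trace;
    }
}
```

In this sketch the stages are entered in the order Metrics, Tracing, Logging, Handler, which mirrors the first entries of the table above.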

An abstract base class inherited by all Pipelines. Provides common utilities such as request type analysis, handler name extraction, and error information extraction.

public abstract partial class UsecasePipelineBase<TRequest>

| Method | Return Type | Description |
| --- | --- | --- |
| GetRequestCategoryType<T>(T request) | string | Identifies CQRS type from request instance (command, query, unknown) |
| GetRequestCategoryType(Type requestType) | string | Identifies CQRS type from request Type |
| GetRequestHandlerPath() | string | Returns the FullName of TRequest (full path including namespace) |
| GetRequestHandler() | string | Extracts handler class name from the FullName of TRequest |
| GetRequestHandlerLower() | string | Lowercase conversion of GetRequestHandler() (for metric naming) |
| GetErrorInfo(Error error) | (string ErrorType, string ErrorCode) | Extracts type/code information from error |

  • GetRequestCategoryType determines the type by checking whether ICommandRequest<> / IQueryRequest<> interfaces are implemented.
  • GetRequestHandler() parses nested types (+) and namespaces (.) from typeof(TRequest).FullName to extract only the class name.
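
The name parsing described for GetRequestHandler() might look roughly like the following sketch. This is not the library's implementation. In particular, it assumes that for a nested request type the enclosing class name is what gets extracted. `HandlerNameSketch` and the sample type names are hypothetical.

```csharp
using System;

public static class HandlerNameSketch
{
    // Assumption: for "Namespace.Outer+Nested" the enclosing class ("Outer")
    // is treated as the handler name. The real method may behave differently.
    public static string GetHandlerName(string fullName)
    {
        // Drop the nested-type part that follows '+', if present.
        int plus = fullName.IndexOf('+');
        string outer = plus >= 0 ? fullName.Substring(0, plus) : fullName;

        // Drop the namespace part that precedes the last '.'.
        int dot = outer.LastIndexOf('.');
        return dot >= 0 ? outer.Substring(dot + 1) : outer;
    }

    // Mirrors the documented lowercase variant used for metric naming.
    public static string GetHandlerNameLower(string fullName) =>
        GetHandlerName(fullName).ToLowerInvariant();
}
```

Under these assumptions, `GetHandlerName("MyApp.Orders.PlaceOrderCommand+Request")` yields `PlaceOrderCommand`.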

Converts exceptions to FinResponse.Fail to prevent exceptions from propagating outside the Pipeline.

internal sealed class UsecaseExceptionPipeline<TRequest, TResponse>
    : UsecasePipelineBase<TRequest>, IPipelineBehavior<TRequest, TResponse>
    where TRequest : IMessage
    where TResponse : IFinResponseFactory<TResponse>

| Item | Description |
| --- | --- |
| Constraint | TResponse : IFinResponseFactory<TResponse> |
| Behavior | Catches exceptions via try-catch and returns TResponse.CreateFail(AdapterError.FromException(...)) |
| Error Type | AdapterErrorType.PipelineException |

Executes FluentValidation IValidator<TRequest> and returns FinResponse.Fail on validation failure.

internal sealed class UsecaseValidationPipeline<TRequest, TResponse>
    : UsecasePipelineBase<TRequest>, IPipelineBehavior<TRequest, TResponse>
    where TRequest : IMessage
    where TResponse : IFinResponseFactory<TResponse>

| Item | Description |
| --- | --- |
| DI Dependencies | IEnumerable<IValidator<TRequest>> |
| Behavior | Passes through next() if no Validators exist; runs all Validators otherwise |
| Error Type | AdapterErrorType.PipelineValidation(PropertyName) |
| Multiple errors | Returns Error.Many(errors) when there are 2 or more validation failures |

Records request/response information via structured logging. Automatically pushes custom attributes if IUsecaseCtxEnricher is registered in DI.

internal sealed class UsecaseLoggingPipeline<TRequest, TResponse>
    : UsecasePipelineBase<TRequest>, IPipelineBehavior<TRequest, TResponse>
    where TRequest : IMessage
    where TResponse : IFinResponse, IFinResponseFactory<TResponse>

| Item | Description |
| --- | --- |
| DI Dependencies | ILogger<UsecaseLoggingPipeline<TRequest, TResponse>>, IUsecaseCtxEnricher<TRequest, TResponse>? (optional) |
| Request log | Information level, {Layer} {Category} {CategoryType} {Handler} {Method} requesting |
| Response log (success) | Information level, responded success in {Elapsed:0.0000} ms |
| Response log (Expected error) | Warning level, responded failure ... with {@Error} |
| Response log (Exceptional error) | Error level, responded failure ... with {@Error} |

Creates distributed tracing Spans using OpenTelemetry ActivitySource.

internal sealed class UsecaseTracingPipeline<TRequest, TResponse>
    : UsecasePipelineBase<TRequest>, IPipelineBehavior<TRequest, TResponse>
    where TRequest : IMessage
    where TResponse : IFinResponse, IFinResponseFactory<TResponse>

| Item | Description |
| --- | --- |
| DI Dependencies | ActivitySource |
| Span name | {layer} {category}.{categoryType} {handler}.{method} |
| ActivityKind | Internal |
| Request tags | request.layer, request.category.name, request.category.type, request.handler.name, request.handler.method |
| Response tags (success) | response.status = success, ActivityStatusCode.Ok |
| Response tags (failure) | response.status = failure, error.type, error.code, ActivityStatusCode.Error |
| Time tag | response.elapsed (in seconds) |

Collects request counts, response counts, and processing time via OpenTelemetry Meter.

internal sealed class UsecaseMetricsPipeline<TRequest, TResponse>
    : UsecasePipelineBase<TRequest>, IPipelineBehavior<TRequest, TResponse>, IDisposable
    where TRequest : IMessage
    where TResponse : IFinResponse, IFinResponseFactory<TResponse>

| Item | Description |
| --- | --- |
| DI Dependencies | IOptions<OpenTelemetryOptions>, IMeterFactory |
| Meter name | {ServiceNamespace}.application |
| Counter (requests) | application.usecase.{categoryType}.requests (unit: {request}) |
| Counter (responses) | application.usecase.{categoryType}.responses (unit: {response}) |
| Histogram (duration) | application.usecase.{categoryType}.duration (unit: s) |
| Request tags | request.layer, request.category.name, request.category.type, request.handler.name, request.handler.method |
| Response tags (success) | Request tags + response.status = success |
| Response tags (failure) | Request tags + response.status = failure + error.type + error.code |

Automatically handles explicit transactions, UoW.SaveChanges, and domain event publishing for Command Usecases.

internal sealed class UsecaseTransactionPipeline<TRequest, TResponse>
    : UsecasePipelineBase<TRequest>, IPipelineBehavior<TRequest, TResponse>
    where TRequest : ICommand<TResponse>
    where TResponse : IFinResponse, IFinResponseFactory<TResponse>

| Item | Description |
| --- | --- |
| Constraint | TRequest : ICommand<TResponse> (Command only) |
| DI Dependencies | IUnitOfWork, IDomainEventPublisher, ILogger |
| Execution order | 1) Begin transaction → 2) Execute Handler → 3) Rollback on failure → 4) SaveChanges → 5) Commit → 6) Publish domain events |
| Failure handling | Returns TResponse.CreateFail(error) on Handler failure or SaveChanges failure, with automatic transaction rollback |
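
The execution order and failure handling listed above can be traced with a small control-flow sketch. It deliberately avoids IUnitOfWork and IDomainEventPublisher, whose exact signatures are not shown in this document. `TransactionFlowSketch` and its step labels are hypothetical and only record which steps would run.

```csharp
using System;
using System.Collections.Generic;

public static class TransactionFlowSketch
{
    // Returns the sequence of steps taken for a given outcome of the
    // handler and of SaveChanges. No real transaction is involved.
    public static List<string> Run(bool handlerSucceeds, bool saveSucceeds)
    {
        var steps = new List<string> { "BeginTransaction", "ExecuteHandler" };

        if (!handlerSucceeds)
        {
            steps.Add("Rollback"); // handler returned a failure
            return steps;
        }

        steps.Add("SaveChanges");
        if (!saveSucceeds)
        {
            steps.Add("Rollback"); // persisting changes failed
            return steps;
        }

        steps.Add("Commit");
        steps.Add("PublishDomainEvents"); // reached only after a successful commit
        return steps;
    }
}
```

In this sketch domain events are reached only on the fully successful path, matching step 6 of the documented order.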

Performs IMemoryCache-based caching for Query requests that implement ICacheable.

internal sealed class UsecaseCachingPipeline<TRequest, TResponse>
    : UsecasePipelineBase<TRequest>, IPipelineBehavior<TRequest, TResponse>
    where TRequest : IQuery<TResponse>
    where TResponse : IFinResponse, IFinResponseFactory<TResponse>

| Item | Description |
| --- | --- |
| Constraint | TRequest : IQuery<TResponse> (Query only) |
| DI Dependencies | IMemoryCache (services.AddMemoryCache() required) |
| Cache key | ICacheable.CacheKey |
| Cache duration | ICacheable.Duration (defaults to 5 minutes if null) |
| Behavior | Returns immediately on cache hit; on cache miss, executes Handler and caches only successful responses |

ICacheable interface:

public interface ICacheable
{
    string CacheKey { get; }
    TimeSpan? Duration { get; }
}
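
As an illustration, a request could implement ICacheable along the following lines. The interface is repeated here only to keep the sketch self-contained. `GetProductByIdQuery` and `CacheDurationSketch` are hypothetical names, and a real Query would presumably also implement IQuery<TResponse>, which is omitted here.

```csharp
using System;

// Copied from the interface definition above so the sketch compiles on its own.
public interface ICacheable
{
    string CacheKey { get; }
    TimeSpan? Duration { get; }
}

// Hypothetical request type; the key format is an arbitrary choice.
public sealed record GetProductByIdQuery(Guid ProductId) : ICacheable
{
    public string CacheKey => $"product:{ProductId}";

    // null means "use the default duration".
    public TimeSpan? Duration => null;
}

public static class CacheDurationSketch
{
    // Applies the documented fallback of 5 minutes when Duration is null.
    public static TimeSpan EffectiveDuration(ICacheable request) =>
        request.Duration ?? TimeSpan.FromMinutes(5);
}
```

Including the identifying parameters in CacheKey keeps responses for different requests from sharing a cache entry.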

A Custom Pipeline is a user-defined Pipeline. It executes after the default Pipelines (Exception, Validation, etc.) and immediately before the Handler. Pipelines that implement the ICustomUsecasePipeline marker interface can be registered automatically through assembly scanning.


A marker interface for Scrutor auto-discovery registration.

public interface ICustomUsecasePipeline { }

Using AddCustomPipeline<T>() explicitly registers types implementing this interface in DI.

A base class for creating per-Usecase individual Metrics. Automatically identifies the CQRS type from the TRequest type.

public abstract class UsecaseMetricCustomPipelineBase<TRequest>
    : UsecasePipelineBase<TRequest>, ICustomUsecasePipeline

| Member | Description |
| --- | --- |
| protected readonly Meter _meter | {ServiceNamespace}.application Meter instance |
| protected const string DurationUnit | "s" |
| protected const string CountUnit | "requests" |
| GetMetricName(string metricName) | Returns application.usecase.{cqrs}.{handler}.{metricName} format |
| GetMetricNameWithoutHandler(string metricName) | Returns application.usecase.{cqrs}.{metricName} format |

Constructor:

protected UsecaseMetricCustomPipelineBase(string serviceNamespace, IMeterFactory meterFactory)

RequestDuration helper:

public class RequestDuration : IDisposable
{
    public RequestDuration(Histogram<double> histogram);
    public void Dispose(); // Automatically records elapsed time to histogram
}

Use it with a using statement to measure elapsed time and record it to the Histogram automatically when the scope ends.
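
The dispose-to-record pattern can be sketched with the standard System.Diagnostics.Metrics types. `RequestDurationSketch` below is a hypothetical stand-in written for illustration, not the library's RequestDuration class. It assumes the elapsed time is recorded in seconds, in line with the "s" duration unit mentioned above.

```csharp
using System;
using System.Diagnostics;
using System.Diagnostics.Metrics;

// Hypothetical stand-in: starts timing on construction and
// records the elapsed seconds to the histogram on Dispose.
public sealed class RequestDurationSketch : IDisposable
{
    private readonly Histogram<double> _histogram;
    private readonly Stopwatch _stopwatch = Stopwatch.StartNew();

    public RequestDurationSketch(Histogram<double> histogram) => _histogram = histogram;

    public void Dispose()
    {
        _stopwatch.Stop();
        _histogram.Record(_stopwatch.Elapsed.TotalSeconds);
    }
}

public static class RequestDurationUsage
{
    // The using declaration guarantees the measurement is recorded
    // when the method exits, even if work() throws.
    public static void MeasureWork(Histogram<double> histogram, Action work)
    {
        using var duration = new RequestDurationSketch(histogram);
        work();
    }
}
```

A caller would create the histogram from its Meter (for example with `CreateHistogram<double>(name, unit: "s")`) and pass it in.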

UsecaseTracingCustomPipelineBase<TRequest>

A base class for creating per-Usecase custom Tracing. Creates custom Activities (Spans) via ActivitySource and sets standard tags.

public abstract class UsecaseTracingCustomPipelineBase<TRequest>
    : UsecasePipelineBase<TRequest>, ICustomUsecasePipeline

| Member | Description |
| --- | --- |
| protected readonly ActivitySource _activitySource | ActivitySource used for creating Activities |
| StartCustomActivity(string operationName, ActivityKind kind) | Creates custom Activity in {prefix}.{operationName} format |
| GetActivityName(string operationName) | Gets Activity name ({prefix}.{operationName}) |
| SetStandardRequestTags(Activity activity, string method) | Sets 5 standard Request tags |

Constructor:

protected UsecaseTracingCustomPipelineBase(ActivitySource activitySource)

  • Activity name prefix: {layer} {category}.{categoryType} {handler}
  • If a parent Activity.Current exists, it is created as a child Span.

PipelineConfigurator enables/disables individual Pipelines via Fluent API and adds custom Pipelines.

public class PipelineConfigurator

| Method | Return Type | Description |
| --- | --- | --- |
| UseObservability() | PipelineConfigurator | Batch enable all 4 observability types (CtxEnricher, Metrics, Tracing, Logging) |
| UseMetrics() | PipelineConfigurator | Enable Metrics Pipeline (CtxEnricher auto-included) |
| UseTracing() | PipelineConfigurator | Enable Tracing Pipeline (CtxEnricher auto-included) |
| UseLogging() | PipelineConfigurator | Enable Logging Pipeline (CtxEnricher auto-included) |
| UseValidation() | PipelineConfigurator | Enable Validation Pipeline |
| UseCaching() | PipelineConfigurator | Enable Caching Pipeline (IMemoryCache DI registration required) |
| UseException() | PipelineConfigurator | Enable Exception Pipeline |
| UseTransaction() | PipelineConfigurator | Enable Transaction Pipeline (IUnitOfWork, IDomainEventPublisher, IDomainEventCollector DI registration required) |

| Method | Return Type | Description |
| --- | --- | --- |
| WithLifetime(ServiceLifetime lifetime) | PipelineConfigurator | Set Pipeline service Lifetime (default: Scoped) |
| AddCustomPipeline<TPipeline>() | PipelineConfigurator | Explicitly register custom Pipeline by type (deterministic guarantee of Pipeline execution order) |

// Enable observability + validation + exception handling
services
    .RegisterOpenTelemetry(configuration, Assembly.GetExecutingAssembly())
    .ConfigurePipelines(pipelines => pipelines
        .UseObservability() // Batch enable CtxEnricher, Metrics, Tracing, Logging
        .UseValidation()
        .UseException())
    .Build();

// Selective enabling + custom Pipeline registration
services
    .RegisterOpenTelemetry(configuration, Assembly.GetExecutingAssembly())
    .ConfigurePipelines(pipelines => pipelines
        .UseMetrics()
        .UseTracing()
        .UseLogging()
        .UseValidation()
        .UseException()
        .UseTransaction()
        .UseCaching()
        .AddCustomPipeline<PlaceOrderCommandMetricPipeline>()
        .WithLifetime(ServiceLifetime.Scoped))
    .Build();

Inside Apply(), Pipelines are registered to IPipelineBehavior<,> in the following order.

  1. Metrics
  2. Tracing
  3. Logging
  4. Validation
  5. Caching
  6. Exception
  7. Transaction
  8. Custom (registered in the order AddCustomPipeline<T>() is called)
  9. Handler

Transaction Pipeline auto-detection: Even if UseTransaction() is called, registration is skipped if IUnitOfWork, IDomainEventPublisher, and IDomainEventCollector are not all registered in DI.


The main Builder class for OpenTelemetry configuration. Configures Serilog, Metrics, Tracing, and Pipeline settings via chaining.

public partial class OpenTelemetryBuilder

// IServiceCollection extension method
public static OpenTelemetryBuilder RegisterOpenTelemetry(
    this IServiceCollection services,
    IConfiguration configuration,
    Assembly projectAssembly)

| Method | Return Type | Description |
| --- | --- | --- |
| ConfigureLogging(Action<LoggingConfigurator> configure) | OpenTelemetryBuilder | Serilog extension configuration |
| ConfigureMetrics(Action<MetricsConfigurator> configure) | OpenTelemetryBuilder | OpenTelemetry Metrics extension configuration |
| ConfigureTracing(Action<TracingConfigurator> configure) | OpenTelemetryBuilder | OpenTelemetry Tracing extension configuration |
| ConfigurePipelines(Action<PipelineConfigurator> configure) | OpenTelemetryBuilder | Pipeline configuration (explicit opt-in required) |
| ConfigureStartupLogger(Action<ILogger> configure) | OpenTelemetryBuilder | Additional logging configuration at startup |
| WithAdapterObservability(bool enable = true) | OpenTelemetryBuilder | Enable/disable Adapter observability (default: true) |
| Build() | IServiceCollection | Returns IServiceCollection after applying all settings |

Build() applies the settings in the following order:

  1. Read OpenTelemetryOptions (IOptions<OpenTelemetryOptions>)
  2. Create Resource Attributes
  3. Configure Serilog (ReadFrom.Configuration + WriteTo.OpenTelemetry + ErrorsDestructuringPolicy)
  4. Configure CtxEnricherContext PushProperty factory
  5. Configure OpenTelemetry (Metrics + Tracing + OTLP Exporter)
  6. Register Adapter Observability (ActivitySource, IMeterFactory)
  7. Register Usecase Pipeline
  8. Register StartupLogger IHostedService

A Builder class for Serilog extension configuration.

public class LoggingConfigurator

| Member | Description |
| --- | --- |
| Options | Access property for OpenTelemetryOptions |
| AddDestructuringPolicy<TPolicy>() | Registers an IDestructuringPolicy implementation type |
| AddEnricher(ILogEventEnricher enricher) | Registers an Enricher instance |
| AddEnricher<TEnricher>() | Registers an Enricher type |
| Configure(Action<LoggerConfiguration> configure) | Direct access to LoggerConfiguration |

A Builder class for OpenTelemetry Metrics extension configuration.

public class MetricsConfigurator

| Member | Description |
| --- | --- |
| Options | Access property for OpenTelemetryOptions |
| AddMeter(string meterName) | Registers an additional Meter (wildcard supported: "MyApp.*") |
| AddInstrumentation(Action<MeterProviderBuilder> configure) | Registers additional Instrumentation |
| Configure(Action<MeterProviderBuilder> configure) | Direct access to MeterProviderBuilder |

A Builder class for OpenTelemetry Tracing extension configuration.

public class TracingConfigurator

| Member | Description |
| --- | --- |
| Options | Access property for OpenTelemetryOptions |
| AddSource(string sourceName) | Registers an additional ActivitySource (wildcard supported: "MyApp.*") |
| AddProcessor(BaseProcessor<Activity> processor) | Registers an additional Processor |
| Configure(Action<TracerProviderBuilder> configure) | Direct access to TracerProviderBuilder |

A configuration class bound to the "OpenTelemetry" section in appsettings.json.

public sealed class OpenTelemetryOptions : IStartupOptionsLogger, IOpenTelemetryOptions

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| ServiceNamespace | string | "" | Service namespace (group) |
| ServiceName | string | "" | Service name |
| ServiceVersion | string | (assembly version) | Service version (auto-configured) |
| ServiceInstanceId | string | (hostname) | Service instance ID (auto-configured) |
| CollectorEndpoint | string | "" | Unified OTLP Collector endpoint |
| TracingEndpoint | string? | null | Tracing-specific endpoint (uses CollectorEndpoint if null) |
| MetricsEndpoint | string? | null | Metrics-specific endpoint (uses CollectorEndpoint if null) |
| LoggingEndpoint | string? | null | Logging-specific endpoint (uses CollectorEndpoint if null) |
| CollectorProtocol | string | "Grpc" | Unified OTLP Protocol |
| TracingProtocol | string? | null | Tracing-specific Protocol |
| MetricsProtocol | string? | null | Metrics-specific Protocol |
| LoggingProtocol | string? | null | Logging-specific Protocol |
| SamplingRate | double | 1.0 | Tracing sampling rate (0.0 to 1.0) |
| EnablePrometheusExporter | bool | false | Enable Prometheus Exporter |

The resolution rules for individual endpoints (TracingEndpoint, MetricsEndpoint, LoggingEndpoint) are as follows.

| Value | Behavior |
| --- | --- |
| null | Uses CollectorEndpoint (default behavior) |
| "" (empty string) | Disables that signal |
| "http://..." | Uses that endpoint |

| Method | Return Type | Description |
| --- | --- | --- |
| GetTracingProtocol() | OtlpCollectorProtocol | Tracing Protocol (individual setting takes priority) |
| GetMetricsProtocol() | OtlpCollectorProtocol | Metrics Protocol (individual setting takes priority) |
| GetLoggingProtocol() | OtlpCollectorProtocol | Logging Protocol (individual setting takes priority) |
| GetTracingEndpoint() | string | Tracing endpoint (resolution rules applied) |
| GetMetricsEndpoint() | string | Metrics endpoint (resolution rules applied) |
| GetLoggingEndpoint() | string | Logging endpoint (resolution rules applied) |
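
The three-way endpoint resolution rule (null, empty string, explicit value) can be expressed as a small function. This is a sketch of the rule only. How the real Get*Endpoint() methods represent a disabled signal is not stated in this document, so returning null for "disabled" is an assumption, and `EndpointResolutionSketch` is a hypothetical name.

```csharp
#nullable enable
using System;

public static class EndpointResolutionSketch
{
    // Assumption: a disabled signal is represented here by a null result.
    public static string? Resolve(string? individualEndpoint, string collectorEndpoint) =>
        individualEndpoint switch
        {
            null => collectorEndpoint, // not set: fall back to the unified endpoint
            "" => null,                // empty string: the signal is disabled
            _ => individualEndpoint,   // explicit value: use it as-is
        };
}
```

For example, with a collector at `http://localhost:4317`, an unset TracingEndpoint resolves to the collector, while an empty MetricsEndpoint disables metrics export.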

public sealed class OtlpCollectorProtocol : SmartEnum<OtlpCollectorProtocol>

| Constant | Value | Description |
| --- | --- | --- |
| Grpc | 1 | gRPC protocol (default) |
| HttpProtobuf | 2 | HTTP/Protobuf protocol |

A FluentValidation-based options validator.

| Rule | Description |
| --- | --- |
| ServiceNamespace | Required (NotEmpty) |
| ServiceName | Required (NotEmpty) |
| Endpoints | CollectorEndpoint or at least one individual endpoint is required |
| SamplingRate | Must be in the range 0.0 to 1.0 |
| Protocol | Must be a valid SmartEnum value |

{
  "OpenTelemetry": {
    "ServiceNamespace": "mycompany.production",
    "ServiceName": "orderservice",
    "CollectorEndpoint": "http://localhost:4317",
    "CollectorProtocol": "Grpc",
    "SamplingRate": 1.0,
    "EnablePrometheusExporter": false
  }
}

Defines unified naming conventions for observability. The single source of truth for metric names, tag keys, Span names, etc.

public static partial class ObservabilityNaming

| Constant | Value | Description |
| --- | --- | --- |
| Application | "application" | Application Layer |
| Adapter | "adapter" | Adapter Layer |

| Constant | Value | Description |
| --- | --- | --- |
| Usecase | "usecase" | Usecase category |
| Repository | "repository" | Repository category |
| Event | "event" | Event category |
| Unknown | "unknown" | Unknown category |

| Constant | Value | Description |
| --- | --- | --- |
| Command | "command" | Command type |
| Query | "query" | Query type |
| Event | "event" | Event type |
| Unknown | "unknown" | Unknown type |

| Constant | Value | Description |
| --- | --- | --- |
| Success | "success" | Success |
| Failure | "failure" | Failure |

| Constant | Value | Description |
| --- | --- | --- |
| Expected | "expected" | Expected business error (IsExpected = true) |
| Exceptional | "exceptional" | Exceptional system error (IsExceptional = true) |
| Aggregate | "aggregate" | Aggregate error (ManyErrors) |

| Constant | Value | Description |
| --- | --- | --- |
| Handle | "Handle" | Usecase Handler method |
| Publish | "Publish" | Event publishing method |
| PublishTrackedEvents | "PublishTrackedEvents" | Tracked event publishing method |

| Constant | Value |
| --- | --- |
| ErrorType | "error.type" |
| ServiceNamespace | "service.namespace" |
| ServiceName | "service.name" |
| ServiceVersion | "service.version" |
| ServiceInstanceId | "service.instance.id" |
| DeploymentEnvironment | "deployment.environment" |

| Constant | Value | Purpose |
| --- | --- | --- |
| RequestMessage | "request.message" | Request message |
| RequestParams | "request.params" | Request parameters |
| RequestLayer | "request.layer" | Request layer |
| RequestCategoryName | "request.category.name" | Request category name |
| RequestCategoryType | "request.category.type" | Request category type |
| RequestHandlerName | "request.handler.name" | Request handler name |
| RequestHandlerMethod | "request.handler.method" | Request handler method |
| ResponseMessage | "response.message" | Response message |
| ResponseStatus | "response.status" | Response status |
| ResponseElapsed | "response.elapsed" | Response elapsed time |
| ErrorCode | "error.code" | Error code |

| Method | Example |
| --- | --- |
| Metrics.UsecaseRequest("command") | "application.usecase.command.requests" |
| Metrics.UsecaseResponse("query") | "application.usecase.query.responses" |
| Metrics.UsecaseDuration("command") | "application.usecase.command.duration" |
| Metrics.AdapterRequest("repository") | "adapter.repository.requests" |
| Metrics.AdapterResponse("repository") | "adapter.repository.responses" |
| Metrics.AdapterDuration("repository") | "adapter.repository.duration" |

| Method | Example |
| --- | --- |
| Spans.OperationName("adapter", "repository", "OrderRepository", "GetById") | "adapter repository OrderRepository.GetById" |
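
The name formats implied by the examples above can be reproduced with plain string interpolation. The following sketch is inferred from those example values only. `NamingSketch` is a hypothetical class, not the ObservabilityNaming implementation, and it covers just a few of the listed methods.

```csharp
using System;

public static class NamingSketch
{
    // "application.usecase.{categoryType}.requests"
    public static string UsecaseRequest(string categoryType) =>
        $"application.usecase.{categoryType}.requests";

    // "adapter.{category}.duration"
    public static string AdapterDuration(string category) =>
        $"adapter.{category}.duration";

    // "{layer} {category} {handler}.{method}"
    public static string OperationName(
        string layer, string category, string handler, string method) =>
        $"{layer} {category} {handler}.{method}";
}
```

Centralizing the formats in one place is what lets metric names, tag keys, and Span names stay consistent across Pipelines and Adapters.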

| Scope | ID | Name |
| --- | --- | --- |
| Application Request | 1001 | application.request |
| Application Response (Success) | 1002 | application.response.success |
| Application Response (Warning) | 1003 | application.response.warning |
| Application Response (Error) | 1004 | application.response.error |
| Adapter Request | 2001 | adapter.request |
| Adapter Response (Success) | 2002 | adapter.response.success |
| Adapter Response (Warning) | 2003 | adapter.response.warning |
| Adapter Response (Error) | 2004 | adapter.response.error |