본문으로 건너뛰기

관측성 구현 결과

[GenerateObservablePort] Source Generator가 생성한 Observable 래퍼 목록입니다.

원본 클래스Observable 래퍼포트
AIModelRepositoryInMemoryAIModelRepositoryInMemoryObservableIAIModelRepository
DeploymentRepositoryInMemoryDeploymentRepositoryInMemoryObservableIDeploymentRepository
AssessmentRepositoryInMemoryAssessmentRepositoryInMemoryObservableIAssessmentRepository
IncidentRepositoryInMemoryIncidentRepositoryInMemoryObservableIIncidentRepository
UnitOfWorkInMemoryUnitOfWorkInMemoryObservableIUnitOfWork
원본 클래스Observable 래퍼포트
AIModelRepositoryEfCoreAIModelRepositoryEfCoreObservableIAIModelRepository
DeploymentRepositoryEfCoreDeploymentRepositoryEfCoreObservableIDeploymentRepository
AssessmentRepositoryEfCoreAssessmentRepositoryEfCoreObservableIAssessmentRepository
IncidentRepositoryEfCoreIncidentRepositoryEfCoreObservableIIncidentRepository
UnitOfWorkEfCoreUnitOfWorkEfCoreObservableIUnitOfWork
원본 클래스Observable 래퍼포트
AIModelQueryInMemoryAIModelQueryInMemoryObservableIAIModelQuery
AIModelDetailQueryInMemoryAIModelDetailQueryInMemoryObservableIModelDetailQuery
DeploymentQueryInMemoryDeploymentQueryInMemoryObservableIDeploymentQuery
DeploymentDetailQueryInMemoryDeploymentDetailQueryInMemoryObservableIDeploymentDetailQuery
IncidentQueryInMemoryIncidentQueryInMemoryObservableIIncidentQuery
원본 클래스IO 패턴포트
ModelHealthCheckServiceTimeout + CatchIModelHealthCheckService
ModelMonitoringServiceRetry + ScheduleIModelMonitoringService
ParallelComplianceCheckServiceFork + awaitAllIParallelComplianceCheckService
ModelRegistryServiceBracketIModelRegistryService

Observable Port 합계: 19개 (InMemory Repository 5 + EfCore Repository 5 + Query 5 + External Service 4)


Functorium Pipeline의 UseCtxEnricher() 미들웨어가 Command/Query Request와 DomainEvent의 프로퍼티를 자동으로 ctx.* 필드로 전파합니다.

Commandctx.* 필드Pillar
RegisterModelCommandctx.register_model_command.request.nameDefault(L+T)
ctx.register_model_command.request.versionDefault(L+T)
ctx.register_model_command.request.purposeLogging
CreateDeploymentCommandctx.create_deployment_command.request.model_idDefault(L+T)
ctx.create_deployment_command.request.environmentAll(L+T+M)
ctx.create_deployment_command.request.drift_thresholdDefault+MetricsValue
ReportIncidentCommandctx.report_incident_command.request.severityAll(L+T+M)
ctx.report_incident_command.request.descriptionLogging
Eventctx.* 필드Pillar
AIModel.RiskClassifiedEventctx.risk_classified_event.model_idDefault(L+T)
ctx.risk_classified_event.new_risk_tierDefault(L+T)
ModelIncident.ReportedEventctx.reported_event.severityAll(L+T+M)
ctx.reported_event.deployment_idDefault(L+T)

services
.RegisterOpenTelemetry(configuration, AssemblyReference.Assembly)
.ConfigurePipelines(pipelines => pipelines
.UseObservability() // CtxEnricher + Metrics + Tracing + Logging 일괄 활성화
.UseValidation()
.UseException())
.Build();

UseObservability()는 관측성 4종을 일괄 활성화합니다. 나머지 파이프라인은 명시적 opt-in으로 등록합니다:

순서미들웨어수집 대상출력
1UseObservability() → MetricsCounter(requests, responses), Histogram(duration)Prometheus / OTLP
2UseObservability() → TracingActivity Span (진입/종료/태그)Jaeger / OTLP
3UseObservability() → CtxEnricherRequest/Response/Event 프로퍼티LogContext + Activity.SetTag + MetricsTagContext
4UseObservability() → Logging구조화 로그 (Serilog)Console / OTLP
5UseValidation()FluentValidation 기반 요청 검증Validation Error
6UseException()예외 -> DomainError/AdapterError 변환error.type, error.code 태그
services.AddMediator(options =>
{
options.ServiceLifetime = ServiceLifetime.Scoped;
options.NotificationPublisherType =
typeof(ObservableDomainEventNotificationPublisher);
});
services.RegisterDomainEventPublisher();
services.RegisterDomainEventHandlersFromAssembly(
AiGovernance.Application.AssemblyReference.Assembly);

ObservableDomainEventNotificationPublisher는 DomainEvent 발행 시 자동으로 메트릭, 트레이싱, 로깅을 수행합니다. EventHandler 관측성은 Mediator의 NotificationPublisherType으로 제공됩니다.


4종 외부 서비스의 IO 패턴과 관측성 통합 현황입니다.

IO.liftAsync → .Timeout(10s) → .Catch(TimedOut → fallback) → .Catch(Exceptional → error)
[GenerateObservablePort] → Observable 래퍼 자동 생성
adapter.external_service.requests (Counter)
adapter.external_service.duration (Histogram)
adapter.external_service.responses (Counter + error.type)
IO.liftAsync → .Retry(exponential|jitter|recurs|maxDelay) → .Catch(Exceptional → error)
Observable 래퍼: 최종 결과만 관측
재시도 시도 횟수는 내부 로깅으로 추적

ParallelComplianceCheckService (Fork + awaitAll)

섹션 제목: “ParallelComplianceCheckService (Fork + awaitAll)”
CriterionNames.Map(name => CheckSingle(name).Fork()) → awaitAll(forks) → .Map(aggregate)
Observable 래퍼: 전체 병렬 체크 결과를 하나의 호출로 관측
개별 기준 체크는 Fork 내부에서 실행 (개별 관측 안함)
acquireSession.Bracket(Use: lookupModel, Fin: releaseSession) → .Catch(Exceptional → error)
Observable 래퍼: Bracket 전체를 하나의 호출로 관측
Acquire/Use/Release 세부 단계는 내부 로깅으로 추적

항목수량
Observable Port (InMemory Repository)5
Observable Port (EfCore Repository)5
Observable Port (Query)5
Observable Port (External Service)4
Observable Port 합계19
Pipeline 미들웨어5 (Metrics, Tracing, CtxEnricher, Logging, Exception)
DomainEvent 관측성ObservableDomainEventNotificationPublisher
ctx.* MetricsTag 필드2 (Environment, Severity)
ctx.* MetricsValue 필드1 (DriftThreshold)
ctx.* Default(L+T) 필드8+ (ModelId, DeploymentId, Name, Version 등)
ctx.* Logging only 필드2 (Purpose, Description)