Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions documentation/components/bridges/symfony-telemetry-bundle.md
Original file line number Diff line number Diff line change
Expand Up @@ -1024,8 +1024,22 @@ flow_telemetry:
messenger:
enabled: true
context_propagation: true # Propagate context across message boundaries
propagation_style: link # How the consumer span relates to the producer span
```

When `context_propagation` is enabled, `propagation_style` controls how a consumed message's span
relates to the producing (publishing) span:

- `link` (default) — the consumer span stays in the worker's own trace (under the `messenger:consume`
console span) and carries a span link back to the producer span. Producer and consumer get separate,
clean traces connected by a link. Recommended for decoupled, batch, or long-delay queues, where
continuing the trace would otherwise absorb the entire queue wait into a single span's duration.
- `continue` — the consumer span adopts the producer's trace and becomes its child, so
publish → queue → consume is one continuous distributed trace. Fine for fast, 1:1 processing.

`propagation_style` has no effect when `context_propagation` is `false` (there is nothing to relate to).
The producer side is identical in both modes — the telemetry stamp is always written on dispatch.

#### Twig

Traces Twig template rendering.
Expand Down Expand Up @@ -1580,6 +1594,7 @@ flow_telemetry:
messenger:
enabled: true
context_propagation: true
propagation_style: link
dbal:
enabled: true
log_sql: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Flow\Bridge\Symfony\TelemetryBundle\DependencyInjection\Compiler\ProfilerSignalCapturePass;
use Flow\Bridge\Symfony\TelemetryBundle\DependencyInjection\Compiler\Psr18ClientTelemetryPass;
use Flow\Bridge\Symfony\TelemetryBundle\Exception\RuntimeException;
use Flow\Bridge\Symfony\TelemetryBundle\Instrumentation\Messenger\MessengerTracePropagation;
use Flow\Bridge\Symfony\TelemetryBundle\Resource\Detector\SymfonyDeploymentDetector;
use Flow\Bridge\Telemetry\OTLP\Exporter\OTLPExporter;
use Flow\Bridge\Telemetry\OTLP\Serializer\JsonSerializer;
Expand Down Expand Up @@ -497,6 +498,13 @@ public function configure(DefinitionConfigurator $definition): void
->info('Enable context propagation across message boundaries (requires propagator)')
->defaultTrue()
->end()
->enumNode('propagation_style')
->info('When context propagation is enabled, how the consumer span relates to the producer span: '
. '"link" (default) keeps the consumer in the worker\'s own trace and links back to the producer; '
. '"continue" makes the consumer a child in the producer\'s trace.')
->values(['continue', 'link'])
->defaultValue('link')
->end()
->end()
->end()
->arrayNode('twig')
Expand Down Expand Up @@ -707,7 +715,7 @@ public function configure(DefinitionConfigurator $definition): void
}

/**
* @param array{resource: array{detectors?: array{enabled?: bool, static?: array{cache?: array{enabled?: bool, path?: null|string}, os?: array{enabled?: bool}, host?: array{enabled?: bool}, service?: array{enabled?: bool}, deployment?: array{enabled?: bool}, environment?: array{enabled?: bool}}, dynamic?: array{process?: array{enabled?: bool}}}, custom?: array<string, mixed>}, clock_service_id?: null|string, framework_logger?: null|string, capture_framework_channels?: bool, channel_attribute_target?: 'scope'|'signal'|'both', context_storage?: array{type?: string, service_id?: null|string}, propagator?: array{type?: string, service_id?: null|string}, exporters?: array<string, array<string, mixed>>, error_handlers?: array<string, array<string, mixed>>, tracer_provider?: array<string, mixed>, meter_provider?: array<string, mixed>, logger_provider?: array<string, mixed>, instrumentation?: array{http_kernel?: array{enabled?: bool, exclude_paths?: array<array{path: string, method?: null|string}>, context_propagation?: bool}, console?: array{enabled?: bool, exclude_commands?: array<string>}, messenger?: array{enabled?: bool, context_propagation?: bool}, twig?: array{enabled?: bool, trace_templates?: bool, trace_blocks?: bool, trace_macros?: bool, exclude_templates?: array<string>}, http_client?: array{enabled?: bool, exclude_clients?: array<string>}, psr18_client?: array{enabled?: bool, exclude_clients?: array<string>}, dbal?: array{enabled?: bool, log_sql?: bool, max_sql_length?: int, exclude_connections?: array<string>}, cache?: array{enabled?: bool, exclude_pools?: array<string>}}, profiler?: array{enabled?: bool|null, capture_logs?: bool}, tracers?: array<string, array{version?: string, schema_url?: null|string, attributes?: array{scope?: array<string, mixed>, signal?: array<string, mixed>}}>, meters?: array<string, array{version?: string, schema_url?: null|string, attributes?: array{scope?: array<string, mixed>, signal?: array<string, mixed>}}>, loggers?: array<string, array{version?: string, schema_url?: null|string, attributes?: array{scope?: array<string, mixed>, signal?: array<string, mixed>}}>} $config
* @param array{resource: array{detectors?: array{enabled?: bool, static?: array{cache?: array{enabled?: bool, path?: null|string}, os?: array{enabled?: bool}, host?: array{enabled?: bool}, service?: array{enabled?: bool}, deployment?: array{enabled?: bool}, environment?: array{enabled?: bool}}, dynamic?: array{process?: array{enabled?: bool}}}, custom?: array<string, mixed>}, clock_service_id?: null|string, framework_logger?: null|string, capture_framework_channels?: bool, channel_attribute_target?: 'scope'|'signal'|'both', context_storage?: array{type?: string, service_id?: null|string}, propagator?: array{type?: string, service_id?: null|string}, exporters?: array<string, array<string, mixed>>, error_handlers?: array<string, array<string, mixed>>, tracer_provider?: array<string, mixed>, meter_provider?: array<string, mixed>, logger_provider?: array<string, mixed>, instrumentation?: array{http_kernel?: array{enabled?: bool, exclude_paths?: array<array{path: string, method?: null|string}>, context_propagation?: bool}, console?: array{enabled?: bool, exclude_commands?: array<string>}, messenger?: array{enabled?: bool, context_propagation?: bool, propagation_style?: 'continue'|'link'}, twig?: array{enabled?: bool, trace_templates?: bool, trace_blocks?: bool, trace_macros?: bool, exclude_templates?: array<string>}, http_client?: array{enabled?: bool, exclude_clients?: array<string>}, psr18_client?: array{enabled?: bool, exclude_clients?: array<string>}, dbal?: array{enabled?: bool, log_sql?: bool, max_sql_length?: int, exclude_connections?: array<string>}, cache?: array{enabled?: bool, exclude_pools?: array<string>}}, profiler?: array{enabled?: bool|null, capture_logs?: bool}, tracers?: array<string, array{version?: string, schema_url?: null|string, attributes?: array{scope?: array<string, mixed>, signal?: array<string, mixed>}}>, meters?: array<string, array{version?: string, schema_url?: null|string, attributes?: array{scope?: array<string, mixed>, signal?: array<string, mixed>}}>, loggers?: array<string, array{version?: string, schema_url?: null|string, attributes?: array{scope?: array<string, mixed>, signal?: array<string, mixed>}}>} $config
*/
#[Override]
public function loadExtension(array $config, ContainerConfigurator $container, ContainerBuilder $builder): void
Expand Down Expand Up @@ -2526,7 +2534,7 @@ private function registerGlobalServices(array $config, ContainerBuilder $builder
}

/**
* @param array{http_kernel?: array{enabled?: bool, exclude_routes?: array<string>, exclude_paths?: array<array{path: string, method?: null|string}>, context_propagation?: bool}, console?: array{enabled?: bool, exclude_commands?: array<string>}, messenger?: array{enabled?: bool, context_propagation?: bool}, twig?: array{enabled?: bool, trace_templates?: bool, trace_blocks?: bool, trace_macros?: bool, exclude_templates?: array<string>}, http_client?: array{enabled?: bool, exclude_clients?: array<string>}, psr18_client?: array{enabled?: bool, exclude_clients?: array<string>}, dbal?: array{enabled?: bool, log_sql?: bool, max_sql_length?: int, exclude_connections?: array<string>}, cache?: array{enabled?: bool, exclude_pools?: array<string>}} $config
* @param array{http_kernel?: array{enabled?: bool, exclude_routes?: array<string>, exclude_paths?: array<array{path: string, method?: null|string}>, context_propagation?: bool}, console?: array{enabled?: bool, exclude_commands?: array<string>}, messenger?: array{enabled?: bool, context_propagation?: bool, propagation_style?: 'continue'|'link'}, twig?: array{enabled?: bool, trace_templates?: bool, trace_blocks?: bool, trace_macros?: bool, exclude_templates?: array<string>}, http_client?: array{enabled?: bool, exclude_clients?: array<string>}, psr18_client?: array{enabled?: bool, exclude_clients?: array<string>}, dbal?: array{enabled?: bool, log_sql?: bool, max_sql_length?: int, exclude_connections?: array<string>}, cache?: array{enabled?: bool, exclude_pools?: array<string>}} $config
*/
private function registerInstrumentation(array $config, ContainerConfigurator $container, ContainerBuilder $builder): void
{
Expand Down Expand Up @@ -2569,6 +2577,7 @@ private function registerInstrumentation(array $config, ContainerConfigurator $c
$definition = $builder->getDefinition('flow.telemetry.messenger.middleware');
$definition->setArgument(1, new Reference('flow.telemetry.context_storage'));
$definition->setArgument(2, new Reference('flow.telemetry.propagator'));
$definition->setArgument(3, MessengerTracePropagation::from($messengerConfig['propagation_style'] ?? 'link'));
}
}

Expand Down Expand Up @@ -2884,7 +2893,7 @@ private function registerNamedExporters(array $config, ContainerBuilder $builder
}

/**
* @param array{http_kernel?: array{enabled?: bool, exclude_routes?: array<string>, exclude_paths?: array<array{path: string, method?: null|string}>, context_propagation?: bool}, console?: array{enabled?: bool, exclude_commands?: array<string>}, messenger?: array{enabled?: bool, context_propagation?: bool}, twig?: array{enabled?: bool, trace_templates?: bool, trace_blocks?: bool, trace_macros?: bool, exclude_templates?: array<string>}, http_client?: array{enabled?: bool, exclude_clients?: array<string>}, psr18_client?: array{enabled?: bool, exclude_clients?: array<string>}, dbal?: array{enabled?: bool, log_sql?: bool, max_sql_length?: int, exclude_connections?: array<string>}, cache?: array{enabled?: bool, exclude_pools?: array<string>}} $config
* @param array{http_kernel?: array{enabled?: bool, exclude_routes?: array<string>, exclude_paths?: array<array{path: string, method?: null|string}>, context_propagation?: bool}, console?: array{enabled?: bool, exclude_commands?: array<string>}, messenger?: array{enabled?: bool, context_propagation?: bool, propagation_style?: 'continue'|'link'}, twig?: array{enabled?: bool, trace_templates?: bool, trace_blocks?: bool, trace_macros?: bool, exclude_templates?: array<string>}, http_client?: array{enabled?: bool, exclude_clients?: array<string>}, psr18_client?: array{enabled?: bool, exclude_clients?: array<string>}, dbal?: array{enabled?: bool, log_sql?: bool, max_sql_length?: int, exclude_connections?: array<string>}, cache?: array{enabled?: bool, exclude_pools?: array<string>}} $config
*/
private function registerParameterOnlyInstrumentation(array $config, ContainerBuilder $builder): void
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);

namespace Flow\Bridge\Symfony\TelemetryBundle\Instrumentation\Messenger;

/**
* How a consumed message's span relates to the producing (publishing) span when
* trace context is propagated across the Messenger transport.
*/
enum MessengerTracePropagation: string
{
/**
* The consumer span adopts the producer's trace and becomes its child, so
* publish -> queue -> consume is one continuous distributed trace.
*/
case Continuation = 'continue';

/**
* The consumer span stays in the worker's own trace and carries a span link
* back to the producer span. Producer and consumer get separate, clean traces
* connected by a link (recommended for decoupled / batch / long-delay queues).
*/
case Link = 'link';
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Flow\Telemetry\Telemetry;
use Flow\Telemetry\Tracer\SpanContext;
use Flow\Telemetry\Tracer\SpanKind;
use Flow\Telemetry\Tracer\SpanLink;
use Flow\Telemetry\Tracer\SpanStatus;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
Expand All @@ -31,6 +32,7 @@ public function __construct(
private Telemetry $telemetry,
private ?ContextStorage $contextStorage = null,
private ?Propagator $propagator = null,
private MessengerTracePropagation $propagation = MessengerTracePropagation::Link,
) {}

public function handle(Envelope $envelope, StackInterface $stack): Envelope
Expand All @@ -47,10 +49,6 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope

$isReceived = $receivedStamp !== null;

if ($isReceived) {
$this->extractContext($envelope);
}

$kind = $isReceived ? SpanKind::CONSUMER : SpanKind::PRODUCER;
$operation = $isReceived ? 'receive' : 'send';

Expand All @@ -73,7 +71,34 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
$attributes['messaging.message.id'] = (string) $transportIdStamp->getId();
}

$span = $tracer->span($spanName, $kind, $attributes);
$remote = $isReceived ? $this->extractRemoteContext($envelope) : null;
$links = [];

if ($remote !== null && $remote->spanContext !== null) {
$remoteSpanContext = $remote->spanContext;
$remoteBaggage = $remote->baggage;

if ($this->propagation === MessengerTracePropagation::Continuation) {
$context = Context::withTraceId($remoteSpanContext->traceId)->withActiveSpan($remoteSpanContext->spanId);

if ($remoteBaggage !== null) {
$context = $context->withBaggage($remoteBaggage);
}

$this->contextStorage?->attach($context);
} else {
$links[] = SpanLink::create(
SpanContext::createRemote($remoteSpanContext->traceId, $remoteSpanContext->spanId),
['messaging.operation.type' => 'process'],
);

if ($remoteBaggage !== null && $this->contextStorage !== null) {
$this->contextStorage->attach($this->contextStorage->current()->withBaggage($remoteBaggage));
}
}
}

$span = $tracer->span($spanName, $kind, $attributes, $links);

if (!$isReceived) {
$envelope = $this->injectContext($envelope);
Expand All @@ -94,33 +119,19 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
}
}

private function extractContext(Envelope $envelope): void
private function extractRemoteContext(Envelope $envelope): ?PropagationContext
{
if ($this->contextStorage === null || $this->propagator === null) {
return;
return null;
}

$stamp = $envelope->last(TelemetryStamp::class);

if (!$stamp instanceof TelemetryStamp) {
return;
return null;
}

$carrier = new TelemetryStampCarrier($stamp);
$propagationContext = $this->propagator->extract($carrier);

$spanContext = $propagationContext->spanContext;

if ($spanContext !== null) {
$context = Context::withTraceId($spanContext->traceId);
$context = $context->withActiveSpan($spanContext->spanId);

if ($propagationContext->baggage !== null) {
$context = $context->withBaggage($propagationContext->baggage);
}

$this->contextStorage->attach($context);
}
return $this->propagator->extract(new TelemetryStampCarrier($stamp));
}

private function getShortClassName(string $className): string
Expand Down
Loading
Loading