291 lines
10 KiB
PHP
291 lines
10 KiB
PHP
<?php
|
|
|
|
declare(strict_types=1);
|
|
|
|
namespace OpenTelemetry\SDK\Trace\SpanProcessor;
|
|
|
|
use function assert;
|
|
use function count;
|
|
use InvalidArgumentException;
|
|
use OpenTelemetry\API\Behavior\LogsMessagesTrait;
|
|
use OpenTelemetry\API\Metrics\MeterProviderInterface;
|
|
use OpenTelemetry\API\Metrics\ObserverInterface;
|
|
use OpenTelemetry\Context\Context;
|
|
use OpenTelemetry\Context\ContextInterface;
|
|
use OpenTelemetry\SDK\Common\Future\CancellationInterface;
|
|
use OpenTelemetry\SDK\Common\Time\ClockInterface;
|
|
use OpenTelemetry\SDK\Trace\ReadableSpanInterface;
|
|
use OpenTelemetry\SDK\Trace\ReadWriteSpanInterface;
|
|
use OpenTelemetry\SDK\Trace\SpanDataInterface;
|
|
use OpenTelemetry\SDK\Trace\SpanExporterInterface;
|
|
use OpenTelemetry\SDK\Trace\SpanProcessorInterface;
|
|
use SplQueue;
|
|
use function sprintf;
|
|
use Throwable;
|
|
|
|
class BatchSpanProcessor implements SpanProcessorInterface
|
|
{
|
|
use LogsMessagesTrait;
|
|
|
|
public const DEFAULT_SCHEDULE_DELAY = 5000;
|
|
public const DEFAULT_EXPORT_TIMEOUT = 30000;
|
|
public const DEFAULT_MAX_QUEUE_SIZE = 2048;
|
|
public const DEFAULT_MAX_EXPORT_BATCH_SIZE = 512;
|
|
|
|
private const ATTRIBUTES_PROCESSOR = ['processor' => 'batching'];
|
|
private const ATTRIBUTES_QUEUED = self::ATTRIBUTES_PROCESSOR + ['state' => 'queued'];
|
|
private const ATTRIBUTES_PENDING = self::ATTRIBUTES_PROCESSOR + ['state' => 'pending'];
|
|
private const ATTRIBUTES_PROCESSED = self::ATTRIBUTES_PROCESSOR + ['state' => 'processed'];
|
|
private const ATTRIBUTES_DROPPED = self::ATTRIBUTES_PROCESSOR + ['state' => 'dropped'];
|
|
private const ATTRIBUTES_FREE = self::ATTRIBUTES_PROCESSOR + ['state' => 'free'];
|
|
|
|
private SpanExporterInterface $exporter;
|
|
private ClockInterface $clock;
|
|
private int $maxQueueSize;
|
|
private int $scheduledDelayNanos;
|
|
private int $maxExportBatchSize;
|
|
private bool $autoFlush;
|
|
private ContextInterface $exportContext;
|
|
|
|
private ?int $nextScheduledRun = null;
|
|
private bool $running = false;
|
|
private int $dropped = 0;
|
|
private int $processed = 0;
|
|
private int $batchId = 0;
|
|
private int $queueSize = 0;
|
|
/** @var list<SpanDataInterface> */
|
|
private array $batch = [];
|
|
/** @var SplQueue<list<SpanDataInterface>> */
|
|
private SplQueue $queue;
|
|
/** @var SplQueue<array{int, string, ?CancellationInterface, bool, ContextInterface}> */
|
|
private SplQueue $flush;
|
|
|
|
private bool $closed = false;
|
|
|
|
public function __construct(
|
|
SpanExporterInterface $exporter,
|
|
ClockInterface $clock,
|
|
int $maxQueueSize = self::DEFAULT_MAX_QUEUE_SIZE,
|
|
int $scheduledDelayMillis = self::DEFAULT_SCHEDULE_DELAY,
|
|
int $exportTimeoutMillis = self::DEFAULT_EXPORT_TIMEOUT,
|
|
int $maxExportBatchSize = self::DEFAULT_MAX_EXPORT_BATCH_SIZE,
|
|
bool $autoFlush = true,
|
|
?MeterProviderInterface $meterProvider = null
|
|
) {
|
|
if ($maxQueueSize <= 0) {
|
|
throw new InvalidArgumentException(sprintf('Maximum queue size (%d) must be greater than zero', $maxQueueSize));
|
|
}
|
|
if ($scheduledDelayMillis <= 0) {
|
|
throw new InvalidArgumentException(sprintf('Scheduled delay (%d) must be greater than zero', $scheduledDelayMillis));
|
|
}
|
|
if ($exportTimeoutMillis <= 0) {
|
|
throw new InvalidArgumentException(sprintf('Export timeout (%d) must be greater than zero', $exportTimeoutMillis));
|
|
}
|
|
if ($maxExportBatchSize <= 0) {
|
|
throw new InvalidArgumentException(sprintf('Maximum export batch size (%d) must be greater than zero', $maxExportBatchSize));
|
|
}
|
|
if ($maxExportBatchSize > $maxQueueSize) {
|
|
throw new InvalidArgumentException(sprintf('Maximum export batch size (%d) must be less than or equal to maximum queue size (%d)', $maxExportBatchSize, $maxQueueSize));
|
|
}
|
|
|
|
$this->exporter = $exporter;
|
|
$this->clock = $clock;
|
|
$this->maxQueueSize = $maxQueueSize;
|
|
$this->scheduledDelayNanos = $scheduledDelayMillis * 1_000_000;
|
|
$this->maxExportBatchSize = $maxExportBatchSize;
|
|
$this->autoFlush = $autoFlush;
|
|
|
|
$this->exportContext = Context::getCurrent();
|
|
$this->queue = new SplQueue();
|
|
$this->flush = new SplQueue();
|
|
|
|
if ($meterProvider === null) {
|
|
return;
|
|
}
|
|
|
|
$meter = $meterProvider->getMeter('io.opentelemetry.sdk');
|
|
$meter
|
|
->createObservableUpDownCounter(
|
|
'otel.trace.span_processor.spans',
|
|
'{spans}',
|
|
'The number of sampled spans received by the span processor',
|
|
)
|
|
->observe(function (ObserverInterface $observer): void {
|
|
$queued = $this->queue->count() * $this->maxExportBatchSize + count($this->batch);
|
|
$pending = $this->queueSize - $queued;
|
|
$processed = $this->processed;
|
|
$dropped = $this->dropped;
|
|
|
|
$observer->observe($queued, self::ATTRIBUTES_QUEUED);
|
|
$observer->observe($pending, self::ATTRIBUTES_PENDING);
|
|
$observer->observe($processed, self::ATTRIBUTES_PROCESSED);
|
|
$observer->observe($dropped, self::ATTRIBUTES_DROPPED);
|
|
});
|
|
$meter
|
|
->createObservableUpDownCounter(
|
|
'otel.trace.span_processor.queue.limit',
|
|
'{spans}',
|
|
'The queue size limit',
|
|
)
|
|
->observe(function (ObserverInterface $observer): void {
|
|
$observer->observe($this->maxQueueSize, self::ATTRIBUTES_PROCESSOR);
|
|
});
|
|
$meter
|
|
->createObservableUpDownCounter(
|
|
'otel.trace.span_processor.queue.usage',
|
|
'{spans}',
|
|
'The current queue usage',
|
|
)
|
|
->observe(function (ObserverInterface $observer): void {
|
|
$queued = $this->queue->count() * $this->maxExportBatchSize + count($this->batch);
|
|
$pending = $this->queueSize - $queued;
|
|
$free = $this->maxQueueSize - $this->queueSize;
|
|
|
|
$observer->observe($queued, self::ATTRIBUTES_QUEUED);
|
|
$observer->observe($pending, self::ATTRIBUTES_PENDING);
|
|
$observer->observe($free, self::ATTRIBUTES_FREE);
|
|
});
|
|
}
|
|
|
|
public function onStart(ReadWriteSpanInterface $span, ContextInterface $parentContext): void
|
|
{
|
|
}
|
|
|
|
public function onEnd(ReadableSpanInterface $span): void
|
|
{
|
|
if ($this->closed) {
|
|
return;
|
|
}
|
|
if (!$span->getContext()->isSampled()) {
|
|
return;
|
|
}
|
|
|
|
if ($this->queueSize === $this->maxQueueSize) {
|
|
$this->dropped++;
|
|
|
|
return;
|
|
}
|
|
|
|
$this->queueSize++;
|
|
$this->batch[] = $span->toSpanData();
|
|
$this->nextScheduledRun ??= $this->clock->now() + $this->scheduledDelayNanos;
|
|
|
|
if (count($this->batch) === $this->maxExportBatchSize) {
|
|
$this->enqueueBatch();
|
|
}
|
|
if ($this->autoFlush) {
|
|
$this->flush();
|
|
}
|
|
}
|
|
|
|
public function forceFlush(?CancellationInterface $cancellation = null): bool
|
|
{
|
|
if ($this->closed) {
|
|
return false;
|
|
}
|
|
|
|
return $this->flush(__FUNCTION__, $cancellation);
|
|
}
|
|
|
|
public function shutdown(?CancellationInterface $cancellation = null): bool
|
|
{
|
|
if ($this->closed) {
|
|
return false;
|
|
}
|
|
|
|
$this->closed = true;
|
|
|
|
return $this->flush(__FUNCTION__, $cancellation);
|
|
}
|
|
|
|
public static function builder(SpanExporterInterface $exporter): BatchSpanProcessorBuilder
|
|
{
|
|
return new BatchSpanProcessorBuilder($exporter);
|
|
}
|
|
|
|
private function flush(?string $flushMethod = null, ?CancellationInterface $cancellation = null): bool
|
|
{
|
|
if ($flushMethod !== null) {
|
|
$flushId = $this->batchId + $this->queue->count() + (int) (bool) $this->batch;
|
|
$this->flush->enqueue([$flushId, $flushMethod, $cancellation, !$this->running, Context::getCurrent()]);
|
|
}
|
|
|
|
if ($this->running) {
|
|
return false;
|
|
}
|
|
|
|
$success = true;
|
|
$exception = null;
|
|
$this->running = true;
|
|
|
|
try {
|
|
for (;;) {
|
|
while (!$this->flush->isEmpty() && $this->flush->bottom()[0] <= $this->batchId) {
|
|
[, $flushMethod, $cancellation, $propagateResult, $context] = $this->flush->dequeue();
|
|
$scope = $context->activate();
|
|
|
|
try {
|
|
$result = $this->exporter->$flushMethod($cancellation);
|
|
if ($propagateResult) {
|
|
$success = $result;
|
|
}
|
|
} catch (Throwable $e) {
|
|
if ($propagateResult) {
|
|
$exception = $e;
|
|
} else {
|
|
self::logError(sprintf('Unhandled %s error', $flushMethod), ['exception' => $e]);
|
|
}
|
|
} finally {
|
|
$scope->detach();
|
|
}
|
|
}
|
|
|
|
if (!$this->shouldFlush()) {
|
|
break;
|
|
}
|
|
|
|
if ($this->queue->isEmpty()) {
|
|
$this->enqueueBatch();
|
|
}
|
|
$batchSize = count($this->queue->bottom());
|
|
$this->batchId++;
|
|
$scope = $this->exportContext->activate();
|
|
|
|
try {
|
|
$this->exporter->export($this->queue->dequeue())->await();
|
|
} catch (Throwable $e) {
|
|
self::logError('Unhandled export error', ['exception' => $e]);
|
|
} finally {
|
|
$this->processed += $batchSize;
|
|
$this->queueSize -= $batchSize;
|
|
$scope->detach();
|
|
}
|
|
}
|
|
} finally {
|
|
$this->running = false;
|
|
}
|
|
|
|
if ($exception !== null) {
|
|
throw $exception;
|
|
}
|
|
|
|
return $success;
|
|
}
|
|
|
|
private function shouldFlush(): bool
|
|
{
|
|
return !$this->flush->isEmpty()
|
|
|| $this->autoFlush && !$this->queue->isEmpty()
|
|
|| $this->autoFlush && $this->nextScheduledRun !== null && $this->clock->now() > $this->nextScheduledRun;
|
|
}
|
|
|
|
private function enqueueBatch(): void
|
|
{
|
|
assert($this->batch !== []);
|
|
|
|
$this->queue->enqueue($this->batch);
|
|
$this->batch = [];
|
|
$this->nextScheduledRun = null;
|
|
}
|
|
}
|