From 88b175d0823185cea18a9e820ea6542abffcc314 Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Wed, 1 Jul 2026 12:04:20 -0700 Subject: [PATCH 1/4] BQAA (Java): preview-readiness fixes (redaction, table bootstrap, drop stats, reliability) Addresses google/adk-java#1316. Highlights: - P1: secret redaction in JsonFormatter; empty-STATE_DELTA guard; DAY time-partitioning + retry-after-failure table bootstrap; root_agent_name initialization; no-current-agent guard. - P2: drop-row accounting via getDropStats(); StreamWriter regional routing; JVM shutdown-hook drain; schema auto-upgrade always diffs (+ drift warning). - P3: config numeric validation; view-identifier validation; truncation depth guard; hot-path INFO->FINE; EventData license header + Javadoc fix. Not built/tested locally; unit tests still to add. Deep-semantic parity items (span-tree-under-OTel, full ADK envelope, long-running tool resume, off-thread first-event ensure) intentionally deferred; see PR description. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../agentanalytics/BatchProcessor.java | 22 ++++ .../BigQueryAgentAnalyticsPlugin.java | 121 +++++++++++++++--- .../agentanalytics/BigQueryLoggerConfig.java | 18 ++- .../plugins/agentanalytics/BigQueryUtils.java | 50 ++++++-- .../adk/plugins/agentanalytics/EventData.java | 18 ++- .../plugins/agentanalytics/JsonFormatter.java | 42 +++++- .../plugins/agentanalytics/PluginState.java | 44 ++++++- .../plugins/agentanalytics/TraceManager.java | 28 +++- 8 files changed, 295 insertions(+), 48 deletions(-) diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/BatchProcessor.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/BatchProcessor.java index 68d3f4c6d..8278e0ad6 100644 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/BatchProcessor.java +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/BatchProcessor.java @@ -28,6 +28,7 @@ import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError; import com.google.cloud.bigquery.storage.v1.StreamWriter; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; @@ -37,6 +38,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.arrow.memory.BufferAllocator; @@ -68,6 +70,11 @@ class BatchProcessor implements AutoCloseable { private final Schema arrowSchema; private final VectorSchemaRoot root; + // Drop accounting so hosts can programmatically detect lost analytics rows. + private final AtomicLong droppedQueueFull = new AtomicLong(); + private final AtomicLong droppedAppendError = new AtomicLong(); + private final AtomicLong droppedSerializationError = new AtomicLong(); + public BatchProcessor( StreamWriter writer, int batchSize, @@ -105,6 +112,7 @@ public void start() { public void append(Map row) { if (!queue.offer(row)) { + droppedQueueFull.incrementAndGet(); logger.warning("BigQuery event queue is full, dropping event."); return; } @@ -139,6 +147,7 @@ public void flush() { try (ArrowRecordBatch recordBatch = new VectorUnloader(root).getRecordBatch()) { AppendRowsResponse result = writer.append(recordBatch).get(); if (result.hasError()) { + droppedAppendError.addAndGet(batch.size()); logger.severe("BigQuery append error: " + result.getError().getMessage()); for (var error : result.getRowErrorsList()) { logger.severe( @@ -153,6 +162,7 @@ public void flush() { Thread.currentThread().interrupt(); } if (e.getCause() instanceof AppendSerializationError ase) { + droppedSerializationError.addAndGet(batch.size()); logger.log( Level.SEVERE, "Failed to write batch to BigQuery due to serialization error", ase); Map rowIndexToErrorMessage = ase.getRowIndexToErrorMessage(); @@ -167,6 +177,7 @@ public void flush() { "AppendSerializationError occurred, but no row-specific errors were provided."); } } else { + droppedAppendError.addAndGet(batch.size()); logger.log(Level.SEVERE, "Failed to write batch to BigQuery", e); } } finally { @@ -247,6 +258,17 @@ private void populateVector(FieldVector vector, int index, Object value) { } } + /** + * Returns a snapshot of dropped-row counters keyed by reason ({@code queue_full}, {@code + * append_error}, {@code serialization_error}). Non-zero values indicate lost analytics rows. + */ + ImmutableMap getDropStats() { + return ImmutableMap.of( + "queue_full", droppedQueueFull.get(), + "append_error", droppedAppendError.get(), + "serialization_error", droppedSerializationError.get()); + } + @Override public void close() { while (this.queue != null && !this.queue.isEmpty()) { diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java index 5775d6d4d..a0cbe2275 100644 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java @@ -51,6 +51,7 @@ import com.google.cloud.bigquery.Table; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.TimePartitioning; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -65,10 +66,12 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import org.jspecify.annotations.Nullable; @@ -105,6 +108,9 @@ public BigQueryAgentAnalyticsPlugin(BigQueryLoggerConfig config) throws IOExcept public BigQueryAgentAnalyticsPlugin(BigQueryLoggerConfig config, BigQuery bigQuery) throws IOException { this(config, bigQuery, new PluginState(config)); + // Register on the public construction paths only (not the package-private test constructor), + // so a host that never calls close() still gets a best-effort drain at JVM exit. + registerShutdownHook(); } BigQueryAgentAnalyticsPlugin(BigQueryLoggerConfig config, BigQuery bigQuery, PluginState state) { @@ -114,6 +120,31 @@ public BigQueryAgentAnalyticsPlugin(BigQueryLoggerConfig config, BigQuery bigQue this.state = state; } + private void registerShutdownHook() { + Runtime.getRuntime() + .addShutdownHook( + new Thread( + () -> { + try { + state + .close() + .blockingAwait(config.shutdownTimeout().toMillis(), TimeUnit.MILLISECONDS); + } catch (RuntimeException e) { + logger.log(Level.WARNING, "Error draining BQAA analytics on JVM shutdown", e); + } + }, + "bq-analytics-shutdown")); + } + + /** + * Returns aggregated dropped-row counters keyed by reason ({@code queue_full}, {@code + * append_error}, {@code serialization_error}). Non-zero values indicate analytics rows that never + * reached BigQuery. + */ + public ImmutableMap getDropStats() { + return state.getDropStats(); + } + private static BigQuery createBigQuery(BigQueryLoggerConfig config) throws IOException { BigQueryOptions.Builder builder = BigQueryOptions.newBuilder(); builder.setHeaderProvider( @@ -133,24 +164,36 @@ private void ensureTableExistsOnce() { if (!tableEnsured) { synchronized (tableEnsuredLock) { if (!tableEnsured) { - // Table creation is expensive, so we only do it once per plugin instance. - tableEnsured = true; - ensureTableExists(bigQuery, config); + // Only mark the table as ensured after a successful setup, so a transient first-run + // failure (auth blip, missing dataset, quota) is retried on subsequent events instead + // of permanently disabling table creation/upgrade for this plugin instance. + if (ensureTableExists(bigQuery, config)) { + tableEnsured = true; + } } } } } - private void ensureTableExists(BigQuery bigQuery, BigQueryLoggerConfig config) { + /** Returns true if the events table is present (created or already existed) and ready. */ + private boolean ensureTableExists(BigQuery bigQuery, BigQueryLoggerConfig config) { TableId tableId = TableId.of(config.projectId(), config.datasetId(), config.tableName()); Schema schema = BigQuerySchema.getEventsSchema(); + boolean tableReady = false; try { Table table = bigQuery.getTable(tableId); - logger.info("BigQuery table: " + tableId); + logger.fine("BigQuery table: " + tableId); if (table == null) { logger.info("Creating BigQuery table: " + tableId); StandardTableDefinition.Builder tableDefinitionBuilder = - StandardTableDefinition.newBuilder().setSchema(schema); + StandardTableDefinition.newBuilder() + .setSchema(schema) + // Day-partition on the event timestamp for cost/pruning parity with the Python + // plugin. Time-filtered analytics queries prune partitions instead of full scans. + .setTimePartitioning( + TimePartitioning.newBuilder(TimePartitioning.Type.DAY) + .setField("timestamp") + .build()); if (!config.clusteringFields().isEmpty()) { tableDefinitionBuilder.setClustering( Clustering.newBuilder().setFields(config.clusteringFields()).build()); @@ -161,10 +204,21 @@ private void ensureTableExists(BigQuery bigQuery, BigQueryLoggerConfig config) { ImmutableMap.of( BigQuerySchema.SCHEMA_VERSION_LABEL_KEY, BigQuerySchema.SCHEMA_VERSION)) .build(); - bigQuery.create(tableInfo); + try { + bigQuery.create(tableInfo); + } catch (BigQueryException e) { + // Another writer may have created the table concurrently; treat that as success. + String msg = e.getMessage(); + if (msg != null && msg.toLowerCase(Locale.ROOT).contains("already exists")) { + logger.info("BigQuery table already exists (concurrent create): " + tableId); + } else { + throw e; + } + } } else if (config.autoSchemaUpgrade()) { maybeUpgradeSchema(bigQuery, table); } + tableReady = true; } catch (BigQueryException e) { processBigQueryException(e, "Failed to check or create/upgrade BigQuery table: " + tableId); } catch (RuntimeException e) { @@ -178,6 +232,7 @@ private void ensureTableExists(BigQuery bigQuery, BigQueryLoggerConfig config) { } catch (RuntimeException e) { logger.log(Level.WARNING, "Failed to create/update BigQuery views for table: " + tableId, e); } + return tableReady; } private void processBigQueryException(BigQueryException e, String logMessage) { @@ -234,7 +289,7 @@ private Completable logEvent( Map row = new HashMap<>(); row.put("timestamp", Instant.now()); row.put("event_type", eventType); - row.put("agent", invocationContext.agent().name()); + row.put("agent", resolveAgentName(invocationContext)); row.put("session_id", invocationContext.session().id()); row.put("invocation_id", invocationContext.invocationId()); row.put("user_id", invocationContext.userId()); @@ -295,6 +350,22 @@ private Completable logEvent( return Completable.complete(); } + /** + * Resolves the agent name defensively. Workflow-driven callbacks may have no current agent; fall + * back to a sentinel rather than letting an NPE drop the row. + */ + private static String resolveAgentName(InvocationContext invocationContext) { + try { + BaseAgent agent = invocationContext.agent(); + if (agent != null && agent.name() != null) { + return agent.name(); + } + } catch (RuntimeException e) { + // Fall through to the sentinel below. + } + return "unknown"; + } + private ResolvedTraceIds getResolvedTraceIds( InvocationContext invocationContext, Optional eventData) { TraceManager traceManager = state.getTraceManager(invocationContext.invocationId()); @@ -329,6 +400,9 @@ private Map getAttributes( EventData eventData, InvocationContext invocationContext) { Map attributes = new HashMap<>(eventData.extraAttributes()); TraceManager traceManager = state.getTraceManager(invocationContext.invocationId()); + // Populate the root agent name from the invocation context if it has not been set yet, so + // attributes.root_agent_name is a real name rather than the sentinel default. + traceManager.initTraceIfNeeded(invocationContext); attributes.put("root_agent_name", traceManager.getRootAgentName()); eventData.model().ifPresent(m -> attributes.put("model", m)); eventData.modelVersion().ifPresent(mv -> attributes.put("model_version", mv)); @@ -446,19 +520,24 @@ public Maybe onEventCallback(InvocationContext invocationContext, Event e if (state.isProcessed(invocationContext.invocationId())) { return Maybe.empty(); } - EventData.Builder eventDataBuilder = - EventData.builder() - .setExtraAttributes( - ImmutableMap.builder() - .put("state_delta", event.actions().stateDelta()) - .put("author", event.author()) - .buildOrThrow()); - Completable logCompletable = - logEvent( - "STATE_DELTA", - invocationContext, - event.content().orElse(null), - Optional.of(eventDataBuilder.build())); + // Only emit STATE_DELTA when there is an actual state change, matching the Python plugin + // (which does not write a STATE_DELTA row for events with an empty state delta). + Completable logCompletable = Completable.complete(); + if (!event.actions().stateDelta().isEmpty()) { + EventData.Builder eventDataBuilder = + EventData.builder() + .setExtraAttributes( + ImmutableMap.builder() + .put("state_delta", event.actions().stateDelta()) + .put("author", event.author()) + .buildOrThrow()); + logCompletable = + logEvent( + "STATE_DELTA", + invocationContext, + event.content().orElse(null), + Optional.of(eventDataBuilder.build())); + } if (event.content().isPresent() && event.content().get().parts().isPresent()) { for (Part part : event.content().get().parts().get()) { diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryLoggerConfig.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryLoggerConfig.java index 92a35b7d7..ec05c2616 100644 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryLoggerConfig.java +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryLoggerConfig.java @@ -222,7 +222,23 @@ public abstract Builder contentFormatter( @CanIgnoreReturnValue public abstract Builder credentials(Credentials credentials); - public abstract BigQueryLoggerConfig build(); + abstract BigQueryLoggerConfig autoBuild(); + + public BigQueryLoggerConfig build() { + BigQueryLoggerConfig config = autoBuild(); + if (config.batchSize() <= 0) { + throw new IllegalArgumentException("batchSize must be positive, got " + config.batchSize()); + } + if (config.queueMaxSize() <= 0) { + throw new IllegalArgumentException( + "queueMaxSize must be positive, got " + config.queueMaxSize()); + } + if (config.maxContentLength() <= 0) { + throw new IllegalArgumentException( + "maxContentLength must be positive, got " + config.maxContentLength()); + } + return config; + } } /** Retry configuration for BigQuery writes. */ diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryUtils.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryUtils.java index 9d8be582a..93582c067 100644 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryUtils.java +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryUtils.java @@ -16,8 +16,6 @@ package com.google.adk.plugins.agentanalytics; -import static com.google.adk.plugins.agentanalytics.BigQuerySchema.SCHEMA_VERSION; -import static com.google.adk.plugins.agentanalytics.BigQuerySchema.SCHEMA_VERSION_LABEL_KEY; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static java.util.stream.Collectors.toCollection; @@ -179,8 +177,27 @@ static String getVersionHeaderValue() { return FRAMEWORK_PREFIX + "/" + Version.JAVA_ADK_VERSION; } + private static final java.util.regex.Pattern SAFE_IDENTIFIER = + java.util.regex.Pattern.compile("[A-Za-z0-9_\\-]+"); + + private static boolean isSafeIdentifier(String id) { + return id != null && SAFE_IDENTIFIER.matcher(id).matches(); + } + /** Creates and/or replaces the analytics views in BigQuery. */ static void createAnalyticsViews(BigQuery bigQuery, BigQueryLoggerConfig config) { + // View DDL is assembled by string interpolation; refuse to build it if any operator-supplied + // identifier contains characters (backticks, quotes, dots, semicolons) that could break or + // redirect the statement. + if (!isSafeIdentifier(config.projectId()) + || !isSafeIdentifier(config.datasetId()) + || !isSafeIdentifier(config.tableName()) + || !isSafeIdentifier(config.viewPrefix())) { + logger.warning( + "Skipping analytics view creation: project/dataset/table/viewPrefix contains characters" + + " that are unsafe to interpolate into DDL."); + return; + } for (Map.Entry> entry : EVENT_VIEW_DEFS.entrySet()) { String eventType = entry.getKey(); ImmutableList extraCols = entry.getValue(); @@ -215,17 +232,11 @@ static void createAnalyticsViews(BigQuery bigQuery, BigQueryLoggerConfig config) } } - /** Adds missing columns to an existing table if the schema version has changed. */ + /** Adds missing columns to an existing table if the actual schema is behind the desired schema. */ static void maybeUpgradeSchema(BigQuery bigQuery, Table existingTable) { - String storedVersion = - Optional.ofNullable(existingTable.getLabels()) - .map(labels -> labels.get(SCHEMA_VERSION_LABEL_KEY)) - .orElse(""); - - if (storedVersion.equals(SCHEMA_VERSION)) { - return; - } - + // Always diff the actual table schema against the desired schema rather than trusting the + // stored version label alone: a table stamped with the current label can still be missing + // columns (e.g. it was created by an older build), and those must be reconciled. SchemaDiff diff = schemaFieldsMatch( existingTable.getDefinition().getSchema().getFields(), @@ -314,6 +325,21 @@ private static SchemaDiff schemaFieldsMatch(FieldList existing, FieldList desire .setType(StandardSQLTypeName.STRUCT, FieldList.of(mergedSub)) .build()); } + } else if (!desiredField + .getType() + .getStandardType() + .equals(existingField.getType().getStandardType())) { + // Additive auto-upgrade cannot reconcile a type change on an existing column. Surface it + // instead of silently ignoring it, since it will otherwise appear later as opaque Storage + // Write append failures. + logger.warning( + String.format( + "Incompatible schema drift on column '%s': table has %s but the plugin expects %s." + + " This cannot be auto-upgraded; writes may fail until the column is fixed" + + " manually.", + desiredField.getName(), + existingField.getType().getStandardType(), + desiredField.getType().getStandardType())); } } return new SchemaDiff(ImmutableList.copyOf(newFields), ImmutableList.copyOf(updatedRecords)); diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/EventData.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/EventData.java index 8fd95a070..351ca3e28 100644 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/EventData.java +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/EventData.java @@ -1,3 +1,19 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.google.adk.plugins.agentanalytics; import com.google.auto.value.AutoValue; @@ -6,7 +22,7 @@ import java.util.Map; import java.util.Optional; -/** Typed container for structured fields passed to _log_event. */ +/** Typed container for structured fields passed to the plugin's event-logging method. */ @AutoValue abstract class EventData { abstract Optional spanIdOverride(); diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/JsonFormatter.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/JsonFormatter.java index 58049a9ef..ca514fefc 100644 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/JsonFormatter.java +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/JsonFormatter.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.auto.value.AutoValue; import com.google.common.base.Utf8; +import com.google.common.collect.ImmutableSet; import java.util.IdentityHashMap; import java.util.Map; import java.util.Set; @@ -36,6 +37,23 @@ final class JsonFormatter { static final ObjectMapper mapper = new ObjectMapper().findAndRegisterModules(); static final String TRUNCATION_SUFFIX = "...[truncated]"; static final String CYCLE_DETECTED_MESSAGE = "[cycle detected]"; + static final String MAX_DEPTH_MESSAGE = "[max depth exceeded]"; + static final String REDACTED_MESSAGE = "[REDACTED]"; + // Guard against unbounded recursion on deeply nested (non-cyclic) payloads. + static final int MAX_TRUNCATE_DEPTH = 200; + + // Keys whose values are redacted before logging. Mirrors the Python BQAA plugin's + // _SENSITIVE_KEYS (OAuth tokens / secrets); matching is case-insensitive, plus any + // key prefixed with "temp:" (ADK temporary session state). + private static final ImmutableSet SENSITIVE_KEYS = + ImmutableSet.of( + "client_secret", "access_token", "refresh_token", "id_token", "api_key", "password"); + private static final String TEMP_KEY_PREFIX = "temp:"; + + private static boolean isSensitiveKey(String key) { + String lower = key.toLowerCase(java.util.Locale.ROOT); + return SENSITIVE_KEYS.contains(lower) || lower.startsWith(TEMP_KEY_PREFIX); + } @AutoValue abstract static class TruncationResult { @@ -55,12 +73,14 @@ static TruncationResult smartTruncate(Object obj, int maxLength) { } try { if (obj instanceof JsonNode jsonNode) { - return recursiveSmartTruncate(jsonNode, maxLength, newSetFromMap(new IdentityHashMap<>())); + return recursiveSmartTruncate( + jsonNode, maxLength, newSetFromMap(new IdentityHashMap<>()), 0); } return recursiveSmartTruncate( - mapper.valueToTree(obj), maxLength, newSetFromMap(new IdentityHashMap<>())); + mapper.valueToTree(obj), maxLength, newSetFromMap(new IdentityHashMap<>()), 0); } catch (IllegalArgumentException e) { - // Fallback for types that mapper can't handle directly as a tree + // Fallback for types that mapper can't handle directly as a tree. + logger.fine("smartTruncate falling back to string conversion: " + e.getMessage()); return truncateWithStatus(safeToString(obj), maxLength); } } @@ -87,7 +107,10 @@ static String safeToString(Object obj) { } private static TruncationResult recursiveSmartTruncate( - JsonNode node, int maxLength, Set visited) { + JsonNode node, int maxLength, Set visited, int depth) { + if (depth > MAX_TRUNCATE_DEPTH) { + return TruncationResult.create(mapper.valueToTree(MAX_DEPTH_MESSAGE), true); + } if (node.isContainerNode()) { if (visited.contains(node)) { return TruncationResult.create(mapper.valueToTree(CYCLE_DETECTED_MESSAGE), true); @@ -106,7 +129,14 @@ private static TruncationResult recursiveSmartTruncate( ObjectNode newNode = mapper.createObjectNode(); Set> properties = node.properties(); for (Map.Entry entry : properties) { - TruncationResult res = recursiveSmartTruncate(entry.getValue(), maxLength, visited); + // Redact sensitive values without descending into them. Per parity with the + // Python plugin, redaction does not set the is_truncated flag. + if (isSensitiveKey(entry.getKey())) { + newNode.set(entry.getKey(), mapper.valueToTree(REDACTED_MESSAGE)); + continue; + } + TruncationResult res = + recursiveSmartTruncate(entry.getValue(), maxLength, visited, depth + 1); newNode.set(entry.getKey(), res.node()); isTruncated = isTruncated || res.isTruncated(); } @@ -114,7 +144,7 @@ private static TruncationResult recursiveSmartTruncate( } else if (node.isArray()) { ArrayNode newNode = mapper.createArrayNode(); for (JsonNode element : node) { - TruncationResult res = recursiveSmartTruncate(element, maxLength, visited); + TruncationResult res = recursiveSmartTruncate(element, maxLength, visited, depth + 1); newNode.add(res.node()); isTruncated = isTruncated || res.isTruncated(); } diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/PluginState.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/PluginState.java index d1826ec5e..680052df7 100644 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/PluginState.java +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/PluginState.java @@ -20,6 +20,7 @@ import io.reactivex.rxjava3.core.Completable; import java.io.IOException; import java.util.Collection; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; @@ -64,6 +65,11 @@ class PluginState { private final Parser parser; private final ConcurrentHashMap>> pendingTasks = new ConcurrentHashMap<>(); + // Drop counters accumulated from BatchProcessors that have already been closed/removed, so the + // aggregate survives per-invocation processor churn. + private final AtomicLong droppedQueueFull = new AtomicLong(); + private final AtomicLong droppedAppendError = new AtomicLong(); + private final AtomicLong droppedSerializationError = new AtomicLong(); PluginState(BigQueryLoggerConfig config) throws IOException { this.config = config; @@ -106,7 +112,7 @@ ScheduledExecutorService getExecutor() { boolean isProcessed(String invocationId) { boolean isProcessed = processedInvocations.getIfPresent(invocationId) != null; if (isProcessed) { - logger.info("Invocation ID: " + invocationId + " already processed"); + logger.fine("Invocation ID: " + invocationId + " already processed"); } return isProcessed; } @@ -142,6 +148,9 @@ protected StreamWriter createWriter() { .setTraceId(BigQueryUtils.getVersionHeaderValue() + ":" + UUID.randomUUID()) .setRetrySettings(retrySettings) .setWriterSchema(BigQuerySchema.getArrowSchema()) + // Route Storage Write append RPCs to the dataset's region. Without this, appends to any + // location other than the US multi-region can fail with stream-not-found errors. + .setLocation(config.location()) .build(); } catch (Exception e) { throw new VerifyException("Failed to create StreamWriter for " + streamName, e); @@ -236,7 +245,7 @@ Completable ensureInvocationCompleted(String invocationId) { Completable.fromCompletionStage( CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]))); } - logger.info("Waiting for pending tasks to complete for invocation ID: " + invocationId); + logger.fine("Waiting for pending tasks to complete for invocation ID: " + invocationId); return tasksState .timeout(config.shutdownTimeout().toMillis(), MILLISECONDS) .doOnError( @@ -257,17 +266,45 @@ Completable ensureInvocationCompleted(String invocationId) { BatchProcessor processor = removeProcessor(invocationId); if (processor != null) { processor.flush(); + foldDropStats(processor); processor.close(); } TraceManager traceManager = removeTraceManager(invocationId); if (traceManager != null) { traceManager.clearStack(); } - logger.info("Removing pending tasks for invocation ID: " + invocationId); + logger.fine("Removing pending tasks for invocation ID: " + invocationId); pendingTasks.remove(invocationId); }); } + private void foldDropStats(BatchProcessor processor) { + Map stats = processor.getDropStats(); + droppedQueueFull.addAndGet(stats.getOrDefault("queue_full", 0L)); + droppedAppendError.addAndGet(stats.getOrDefault("append_error", 0L)); + droppedSerializationError.addAndGet(stats.getOrDefault("serialization_error", 0L)); + } + + /** + * Aggregated dropped-row counters across closed and still-live BatchProcessors. Non-zero values + * indicate analytics rows that never reached BigQuery. + */ + ImmutableMap getDropStats() { + long queueFull = droppedQueueFull.get(); + long appendError = droppedAppendError.get(); + long serializationError = droppedSerializationError.get(); + for (BatchProcessor processor : getBatchProcessors()) { + Map stats = processor.getDropStats(); + queueFull += stats.getOrDefault("queue_full", 0L); + appendError += stats.getOrDefault("append_error", 0L); + serializationError += stats.getOrDefault("serialization_error", 0L); + } + return ImmutableMap.of( + "queue_full", queueFull, + "append_error", appendError, + "serialization_error", serializationError); + } + Completable close() { ImmutableList> tasks = pendingTasks.values().stream().flatMap(Set::stream).collect(toImmutableList()); @@ -290,6 +327,7 @@ Completable close() { .doFinally( () -> { for (BatchProcessor processor : getBatchProcessors()) { + foldDropStats(processor); processor.close(); } for (TraceManager traceManager : getTraceManagers()) { diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/TraceManager.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/TraceManager.java index a02ea00b4..00155990b 100644 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/TraceManager.java +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/TraceManager.java @@ -43,8 +43,10 @@ public final class TraceManager { private static final Logger logger = Logger.getLogger(TraceManager.class.getName()); + static final String DEFAULT_ROOT_AGENT_NAME = "_bq_analytics_root_agent_name"; + private final ConcurrentLinkedDeque records = new ConcurrentLinkedDeque<>(); - private String rootAgentName = "_bq_analytics_root_agent_name"; + private String rootAgentName = DEFAULT_ROOT_AGENT_NAME; private String activeInvocationId = "_bq_analytics_active_invocation_id"; private final Tracer tracer; @@ -103,8 +105,26 @@ public String getRootAgentName() { } public void initTrace(InvocationContext context) { - String rootAgentName = context.agent().rootAgent().name(); - this.rootAgentName = rootAgentName; + var rootAgent = context.agent().rootAgent(); + if (rootAgent != null && rootAgent.name() != null) { + this.rootAgentName = rootAgent.name(); + } + } + + /** + * Sets the root agent name from the invocation context if it is still the sentinel default. + * Null-safe: workflow-driven callbacks with no current agent leave the sentinel in place for a + * later event to resolve. + */ + public void initTraceIfNeeded(InvocationContext context) { + if (!DEFAULT_ROOT_AGENT_NAME.equals(rootAgentName)) { + return; + } + try { + initTrace(context); + } catch (RuntimeException e) { + // Leave the sentinel; a subsequent event may be able to resolve the root agent. + } } public String getTraceId(InvocationContext context) { @@ -173,7 +193,7 @@ public void ensureInvocationSpan(InvocationContext context) { if (currentInv.equals(activeInvocationId)) { return; } - logger.info("Clearing stale span records from previous invocation."); + logger.fine("Clearing stale span records from previous invocation."); clearStack(); } From 11d2fab3117528d56856fc534088ff9ac13f5706 Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Wed, 1 Jul 2026 12:20:01 -0700 Subject: [PATCH 2/4] Address PR #1318 review: harden table-ensure, drop stats, drift, license, agent fallback - F1: maybeUpgradeSchema now returns success; ensureTableExists only marks the table ready when create/existing/upgrade all succeed (failed upgrade retries). - F2: add missing Apache license header to PluginState.java. - F3: fold drop counters AFTER BatchProcessor.close() so final-drain drops count. - F4: schema drift check now also compares Field.Mode (was type-only), incl. nested fields via recursion. - F5: agent column falls back to Event.author (via EventData.fallbackAgentName) on source-event rows (STATE_DELTA/A2A/AGENT_RESPONSE) before the "unknown" sentinel, matching the Python plugin. Still untested locally; unit tests to follow. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../BigQueryAgentAnalyticsPlugin.java | 54 +++++++++++++----- .../plugins/agentanalytics/BigQueryUtils.java | 56 +++++++++++++------ .../adk/plugins/agentanalytics/EventData.java | 6 ++ .../plugins/agentanalytics/PluginState.java | 22 +++++++- 4 files changed, 105 insertions(+), 33 deletions(-) diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java index a0cbe2275..5571008eb 100644 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java @@ -215,10 +215,14 @@ private boolean ensureTableExists(BigQuery bigQuery, BigQueryLoggerConfig config throw e; } } + tableReady = true; } else if (config.autoSchemaUpgrade()) { - maybeUpgradeSchema(bigQuery, table); + // Only treat the table as ready if the schema upgrade actually succeeded, so a failed + // upgrade is retried on a later event instead of being masked by tableEnsured=true. + tableReady = maybeUpgradeSchema(bigQuery, table); + } else { + tableReady = true; } - tableReady = true; } catch (BigQueryException e) { processBigQueryException(e, "Failed to check or create/upgrade BigQuery table: " + tableId); } catch (RuntimeException e) { @@ -289,7 +293,7 @@ private Completable logEvent( Map row = new HashMap<>(); row.put("timestamp", Instant.now()); row.put("event_type", eventType); - row.put("agent", resolveAgentName(invocationContext)); + row.put("agent", resolveAgentName(invocationContext, eventData)); row.put("session_id", invocationContext.session().id()); row.put("invocation_id", invocationContext.invocationId()); row.put("user_id", invocationContext.userId()); @@ -352,18 +356,28 @@ private Completable logEvent( /** * Resolves the agent name defensively. Workflow-driven callbacks may have no current agent; fall - * back to a sentinel rather than letting an NPE drop the row. + * back to the event author (via {@code EventData.fallbackAgentName}, mirroring the Python plugin) + * and finally to a sentinel, rather than letting an NPE drop the row. */ - private static String resolveAgentName(InvocationContext invocationContext) { + private static String resolveAgentName( + InvocationContext invocationContext, Optional eventData) { try { BaseAgent agent = invocationContext.agent(); if (agent != null && agent.name() != null) { return agent.name(); } } catch (RuntimeException e) { - // Fall through to the sentinel below. + // Fall through to the author/sentinel fallback below. + } + return eventData.flatMap(EventData::fallbackAgentName).orElse("unknown"); + } + + private static EventData.Builder withFallbackAgent( + EventData.Builder builder, @Nullable String author) { + if (author != null && !author.isEmpty()) { + builder.setFallbackAgentName(author); } - return "unknown"; + return builder; } private ResolvedTraceIds getResolvedTraceIds( @@ -525,12 +539,14 @@ public Maybe onEventCallback(InvocationContext invocationContext, Event e Completable logCompletable = Completable.complete(); if (!event.actions().stateDelta().isEmpty()) { EventData.Builder eventDataBuilder = - EventData.builder() - .setExtraAttributes( - ImmutableMap.builder() - .put("state_delta", event.actions().stateDelta()) - .put("author", event.author()) - .buildOrThrow()); + withFallbackAgent( + EventData.builder() + .setExtraAttributes( + ImmutableMap.builder() + .put("state_delta", event.actions().stateDelta()) + .put("author", event.author()) + .buildOrThrow()), + event.author()); logCompletable = logEvent( "STATE_DELTA", @@ -618,7 +634,11 @@ public Maybe onEventCallback(InvocationContext invocationContext, Event e invocationContext, contentObject, a2aTruncated.isTruncated() || contentTruncated, - Optional.of(EventData.builder().setExtraAttributes(extraAttributes).build()))); + Optional.of( + withFallbackAgent( + EventData.builder().setExtraAttributes(extraAttributes), + event.author()) + .build()))); } } @@ -653,7 +673,11 @@ public Maybe onEventCallback(InvocationContext invocationContext, Event e invocationContext, visibleContent, false, - Optional.of(EventData.builder().setExtraAttributes(extraAttributes).build()))); + Optional.of( + withFallbackAgent( + EventData.builder().setExtraAttributes(extraAttributes), + event.author()) + .build()))); } } diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryUtils.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryUtils.java index 93582c067..47ce486b0 100644 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryUtils.java +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryUtils.java @@ -233,7 +233,7 @@ static void createAnalyticsViews(BigQuery bigQuery, BigQueryLoggerConfig config) } /** Adds missing columns to an existing table if the actual schema is behind the desired schema. */ - static void maybeUpgradeSchema(BigQuery bigQuery, Table existingTable) { + static boolean maybeUpgradeSchema(BigQuery bigQuery, Table existingTable) { // Always diff the actual table schema against the desired schema rather than trusting the // stored version label alone: a table stamped with the current label can still be missing // columns (e.g. it was created by an older build), and those must be reconciled. @@ -242,7 +242,12 @@ static void maybeUpgradeSchema(BigQuery bigQuery, Table existingTable) { existingTable.getDefinition().getSchema().getFields(), BigQuerySchema.getEventsSchema().getFields()); - if (!diff.newTopLevelFields().isEmpty() || !diff.updatedRecordFields().isEmpty()) { + if (diff.newTopLevelFields().isEmpty() && diff.updatedRecordFields().isEmpty()) { + // Nothing to reconcile; the table already satisfies the desired schema. + return true; + } + + { ImmutableMap updatedFields = diff.updatedRecordFields().stream().collect(toImmutableMap(Field::getName, f -> f)); ImmutableSet updatedNames = updatedFields.keySet(); @@ -281,9 +286,11 @@ static void maybeUpgradeSchema(BigQuery bigQuery, Table existingTable) { .build(); var unused = bigQuery.update(updatedTable); + return true; } catch (BigQueryException e) { logger.log( Level.WARNING, "Schema auto-upgrade failed for " + existingTable.getTableId(), e); + return false; } } } @@ -325,26 +332,43 @@ private static SchemaDiff schemaFieldsMatch(FieldList existing, FieldList desire .setType(StandardSQLTypeName.STRUCT, FieldList.of(mergedSub)) .build()); } - } else if (!desiredField - .getType() - .getStandardType() - .equals(existingField.getType().getStandardType())) { - // Additive auto-upgrade cannot reconcile a type change on an existing column. Surface it - // instead of silently ignoring it, since it will otherwise appear later as opaque Storage + } else { + // Additive auto-upgrade cannot reconcile a type or mode change on an existing column + // (including nested non-STRUCT fields, since this method recurses into STRUCTs). Surface + // it instead of silently ignoring it, since it otherwise appears later as opaque Storage // Write append failures. - logger.warning( - String.format( - "Incompatible schema drift on column '%s': table has %s but the plugin expects %s." - + " This cannot be auto-upgraded; writes may fail until the column is fixed" - + " manually.", - desiredField.getName(), - existingField.getType().getStandardType(), - desiredField.getType().getStandardType())); + boolean typeDrift = + !desiredField + .getType() + .getStandardType() + .equals(existingField.getType().getStandardType()); + boolean modeDrift = !modesEqual(existingField.getMode(), desiredField.getMode()); + if (typeDrift || modeDrift) { + logger.warning( + String.format( + "Incompatible schema drift on column '%s': table has %s/%s but the plugin expects" + + " %s/%s. This cannot be auto-upgraded; writes may fail until the column is" + + " fixed manually.", + desiredField.getName(), + existingField.getType().getStandardType(), + normalizeMode(existingField.getMode()), + desiredField.getType().getStandardType(), + normalizeMode(desiredField.getMode()))); + } } } return new SchemaDiff(ImmutableList.copyOf(newFields), ImmutableList.copyOf(updatedRecords)); } + // BigQuery leaves Field.getMode() null to mean NULLABLE; normalize before comparing. + private static Field.Mode normalizeMode(Field.Mode mode) { + return mode == null ? Field.Mode.NULLABLE : mode; + } + + private static boolean modesEqual(Field.Mode a, Field.Mode b) { + return normalizeMode(a) == normalizeMode(b); + } + private record SchemaDiff( ImmutableList newTopLevelFields, ImmutableList updatedRecordFields) {} diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/EventData.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/EventData.java index 351ca3e28..41d0ee312 100644 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/EventData.java +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/EventData.java @@ -47,6 +47,10 @@ abstract class EventData { abstract Optional traceIdOverride(); + // Fallback name for the `agent` column when the InvocationContext has no current agent (e.g. + // workflow-driven callbacks). Mirrors the Python plugin's fallback to Event.author. + abstract Optional fallbackAgentName(); + static Builder builder() { return new AutoValue_EventData.Builder().setStatus("OK").setExtraAttributes(ImmutableMap.of()); } @@ -75,6 +79,8 @@ abstract static class Builder { abstract Builder setTraceIdOverride(String value); + abstract Builder setFallbackAgentName(String value); + abstract EventData build(); } } diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/PluginState.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/PluginState.java index 680052df7..241528823 100644 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/PluginState.java +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/PluginState.java @@ -1,3 +1,19 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.google.adk.plugins.agentanalytics; import static com.google.adk.plugins.agentanalytics.BigQueryUtils.getVersionHeaderValue; @@ -266,8 +282,9 @@ Completable ensureInvocationCompleted(String invocationId) { BatchProcessor processor = removeProcessor(invocationId); if (processor != null) { processor.flush(); - foldDropStats(processor); processor.close(); + // Fold after close() so rows dropped during the final drain are also counted. + foldDropStats(processor); } TraceManager traceManager = removeTraceManager(invocationId); if (traceManager != null) { @@ -327,8 +344,9 @@ Completable close() { .doFinally( () -> { for (BatchProcessor processor : getBatchProcessors()) { - foldDropStats(processor); processor.close(); + // Fold after close() so rows dropped during the final drain are also counted. + foldDropStats(processor); } for (TraceManager traceManager : getTraceManagers()) { traceManager.clearStack(); From 08fe97044f55587be8eae51622ed5cae54b38a53 Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Wed, 1 Jul 2026 15:44:36 -0700 Subject: [PATCH 3/4] Preserve internal span tree under ambient OTel + add unit tests Deferred #1316 P1 item: span_id/parent_span_id now always come from the BQAA internal execution tree (getCurrentSpanAndParent) instead of preferring ambient OpenTelemetry span IDs, so parent_span_id always references another logged row rather than an unlogged framework span. Ambient OTel still governs trace_id via getTraceId. The completed/error callbacks (LLM/tool) and getCompletedEventData now always record the popped internal span, and the now-dead getAmbientSpanAndParent() helper is removed. Tests (JUnit4/vintage, matching existing style): - JsonFormatterTest: sensitive-key redaction (top-level, case-insensitive + temp: prefix, nested; is_truncated unaffected) and recursion depth guard. - BigQueryLoggerConfigTest (new): build() rejects non-positive batchSize/queueMaxSize/maxContentLength. - TraceManagerTest: initTraceIfNeeded sets root agent name; null-agent keeps the sentinel. Still not executed locally; CI verifies. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../BigQueryAgentAnalyticsPlugin.java | 58 ++++++++--------- .../plugins/agentanalytics/TraceManager.java | 17 ----- .../BigQueryLoggerConfigTest.java | 55 ++++++++++++++++ .../agentanalytics/JsonFormatterTest.java | 64 +++++++++++++++++++ .../agentanalytics/TraceManagerTest.java | 15 +++++ 5 files changed, 161 insertions(+), 48 deletions(-) create mode 100644 core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryLoggerConfigTest.java diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java index 5571008eb..eb1cb62e5 100644 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java @@ -387,8 +387,11 @@ private ResolvedTraceIds getResolvedTraceIds( eventData .flatMap(EventData::traceIdOverride) .orElseGet(() -> traceManager.getTraceId(invocationContext)); - Optional ambientSpanIds = traceManager.getAmbientSpanAndParent(); - SpanIds spanIds = ambientSpanIds.orElse(traceManager.getCurrentSpanAndParent()); + // span_id / parent_span_id must reference the BQAA internal execution tree (spans that are + // written as rows), not an ambient OpenTelemetry framework span that is never logged as a row. + // Otherwise parent_span_id would dangle. Ambient OTel still governs trace_id (via getTraceId) + // for cross-system correlation. + SpanIds spanIds = traceManager.getCurrentSpanAndParent(); return new ResolvedTraceIds( traceId, @@ -481,15 +484,13 @@ private Optional getCompletedEventData(InvocationContext invocationCo EventData.Builder eventDataBuilder = EventData.builder(); eventDataBuilder.setTraceIdOverride(traceId); eventDataBuilder.setLatency(popped.get().duration()); - // Only override span IDs when no ambient OTel span exists. - // Keep STARTING/COMPLETED pairs consistent. - if (!traceManager.hasAmbientSpan()) { - if (parentSpanId.isPresent()) { - eventDataBuilder.setParentSpanIdOverride(parentSpanId.get()); - } - if (popped.get().spanId() != null) { - eventDataBuilder.setSpanIdOverride(popped.get().spanId()); - } + // Always record the internal execution-tree span so the STARTING/COMPLETED pair stays + // internally joinable and parent_span_id references a logged row, regardless of ambient OTel. + if (parentSpanId.isPresent()) { + eventDataBuilder.setParentSpanIdOverride(parentSpanId.get()); + } + if (popped.get().spanId() != null) { + eventDataBuilder.setSpanIdOverride(popped.get().spanId()); } return Optional.of(eventDataBuilder.build()); } @@ -860,8 +861,9 @@ public Maybe afterModelCallback( } } - boolean hasAmbient = traceManager.hasAmbientSpan(); - boolean useOverride = isPopped && !hasAmbient; + // Always record the internal execution-tree span for the final response so parent_span_id + // references a logged row, regardless of any ambient OpenTelemetry span. + boolean useOverride = isPopped; EventData.Builder eventDataBuilder = EventData.builder(); if (!duration.isZero()) { @@ -909,19 +911,17 @@ public Maybe onModelErrorCallback( SpanIds spanIds = traceManager.getCurrentSpanAndParent(); String parentSpanId = spanIds.spanId().orElse(null); - boolean hasAmbient = traceManager.hasAmbientSpan(); EventData.Builder eventDataBuilder = EventData.builder().setStatus("ERROR").setErrorMessage(error.getMessage()); if (popped.isPresent()) { eventDataBuilder.setLatency(popped.get().duration()); } - if (!hasAmbient) { - if (spanId != null) { - eventDataBuilder.setSpanIdOverride(spanId); - } - if (parentSpanId != null) { - eventDataBuilder.setParentSpanIdOverride(parentSpanId); - } + // Always record the internal execution-tree span so parent_span_id references a logged row. + if (spanId != null) { + eventDataBuilder.setSpanIdOverride(spanId); + } + if (parentSpanId != null) { + eventDataBuilder.setParentSpanIdOverride(parentSpanId); } return logEvent("LLM_ERROR", invocationContext, null, Optional.of(eventDataBuilder.build())) .andThen(Maybe.empty()); @@ -966,16 +966,14 @@ public Maybe> afterToolCallback( getToolOrigin(tool)); SpanIds spanIds = traceManager.getCurrentSpanAndParent(); - boolean hasAmbient = traceManager.hasAmbientSpan(); EventData.Builder eventDataBuilder = EventData.builder(); if (popped.isPresent()) { eventDataBuilder.setLatency(popped.get().duration()); } - if (!hasAmbient) { - popped.ifPresent(p -> eventDataBuilder.setSpanIdOverride(p.spanId())); - spanIds.spanId().ifPresent(eventDataBuilder::setParentSpanIdOverride); - } + // Always record the internal execution-tree span so parent_span_id references a logged row. + popped.ifPresent(p -> eventDataBuilder.setSpanIdOverride(p.spanId())); + spanIds.spanId().ifPresent(eventDataBuilder::setParentSpanIdOverride); return logEvent( "TOOL_COMPLETED", @@ -1008,17 +1006,15 @@ public Maybe> onToolErrorCallback( .buildOrThrow(); SpanIds spanIds = traceManager.getCurrentSpanAndParent(); - boolean hasAmbient = traceManager.hasAmbientSpan(); EventData.Builder eventDataBuilder = EventData.builder().setStatus("ERROR").setErrorMessage(error.getMessage()); if (popped.isPresent()) { eventDataBuilder.setLatency(popped.get().duration()); } - if (!hasAmbient) { - popped.ifPresent(p -> eventDataBuilder.setSpanIdOverride(p.spanId())); - spanIds.spanId().ifPresent(eventDataBuilder::setParentSpanIdOverride); - } + // Always record the internal execution-tree span so parent_span_id references a logged row. + popped.ifPresent(p -> eventDataBuilder.setSpanIdOverride(p.spanId())); + spanIds.spanId().ifPresent(eventDataBuilder::setParentSpanIdOverride); return logEvent( "TOOL_ERROR", diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/TraceManager.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/TraceManager.java index 00155990b..d7f1acb44 100644 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/TraceManager.java +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/TraceManager.java @@ -24,7 +24,6 @@ import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; -import io.opentelemetry.sdk.trace.ReadableSpan; import java.time.Duration; import java.time.Instant; import java.util.Iterator; @@ -245,22 +244,6 @@ public SpanIds getCurrentSpanAndParent() { return SpanIds.create(spanId, parentId); } - Optional getAmbientSpanAndParent() { - Span ambient = Span.current(); - if (!ambient.getSpanContext().isValid()) { - return Optional.empty(); - } - String spanId = ambient.getSpanContext().getSpanId(); - String parentSpanId = null; - if (ambient instanceof ReadableSpan readableSpan) { - SpanContext parentCtx = readableSpan.getParentSpanContext(); - if (parentCtx != null && parentCtx.isValid()) { - parentSpanId = parentCtx.getSpanId(); - } - } - return Optional.of(SpanIds.create(spanId, parentSpanId)); - } - public Optional getCurrentSpanId() { if (records.isEmpty()) { return Optional.empty(); diff --git a/core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryLoggerConfigTest.java b/core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryLoggerConfigTest.java new file mode 100644 index 000000000..1dce6b890 --- /dev/null +++ b/core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryLoggerConfigTest.java @@ -0,0 +1,55 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.adk.plugins.agentanalytics; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class BigQueryLoggerConfigTest { + + private static BigQueryLoggerConfig.Builder validBuilder() { + return BigQueryLoggerConfig.builder().projectId("test-project"); + } + + @Test + public void build_validConfig_succeeds() { + BigQueryLoggerConfig config = validBuilder().build(); + assertEquals("test-project", config.projectId()); + assertEquals(1, config.batchSize()); + assertEquals(10000, config.queueMaxSize()); + } + + @Test + public void build_nonPositiveBatchSize_throws() { + assertThrows(IllegalArgumentException.class, () -> validBuilder().batchSize(0).build()); + } + + @Test + public void build_nonPositiveQueueMaxSize_throws() { + assertThrows(IllegalArgumentException.class, () -> validBuilder().queueMaxSize(0).build()); + } + + @Test + public void build_nonPositiveMaxContentLength_throws() { + assertThrows(IllegalArgumentException.class, () -> validBuilder().maxContentLength(-1).build()); + } +} diff --git a/core/src/test/java/com/google/adk/plugins/agentanalytics/JsonFormatterTest.java b/core/src/test/java/com/google/adk/plugins/agentanalytics/JsonFormatterTest.java index 3ade94093..7eed192cb 100644 --- a/core/src/test/java/com/google/adk/plugins/agentanalytics/JsonFormatterTest.java +++ b/core/src/test/java/com/google/adk/plugins/agentanalytics/JsonFormatterTest.java @@ -272,4 +272,68 @@ public void smartTruncate_withCycle_detectsCycle() { assertTrue(result.isTruncated()); assertEquals("[cycle detected]", result.node().get("child").asText()); } + + @Test + public void smartTruncate_redactsSensitiveTopLevelKeys() { + ImmutableMap map = + ImmutableMap.of("api_key", "sk-secret", "password", "hunter2", "keep", "value"); + JsonFormatter.TruncationResult result = JsonFormatter.smartTruncate(map, 5000); + + JsonNode node = result.node(); + assertEquals("[REDACTED]", node.get("api_key").asText()); + assertEquals("[REDACTED]", node.get("password").asText()); + assertEquals("value", node.get("keep").asText()); + // Redaction must not flip the truncation flag. + assertFalse(result.isTruncated()); + } + + @Test + public void smartTruncate_redactsCaseInsensitiveAndTempPrefixKeys() { + ImmutableMap map = + ImmutableMap.of("Access_Token", "abc", "temp:scratch", "xyz", "keep", "ok"); + JsonNode node = JsonFormatter.smartTruncate(map, 5000).node(); + + assertEquals("[REDACTED]", node.get("Access_Token").asText()); + assertEquals("[REDACTED]", node.get("temp:scratch").asText()); + assertEquals("ok", node.get("keep").asText()); + } + + @Test + public void smartTruncate_redactsNestedSensitiveKeys() { + ImmutableMap map = + ImmutableMap.of("outer", ImmutableMap.of("client_secret", "s", "ok", "v")); + JsonNode node = JsonFormatter.smartTruncate(map, 5000).node(); + + assertEquals("[REDACTED]", node.get("outer").get("client_secret").asText()); + assertEquals("v", node.get("outer").get("ok").asText()); + } + + @Test + public void smartTruncate_depthGuard_replacesDeepSubtreeWithSentinel() { + java.util.Map root = new java.util.HashMap<>(); + java.util.Map cur = root; + for (int i = 0; i < 300; i++) { + java.util.Map next = new java.util.HashMap<>(); + cur.put("child", next); + cur = next; + } + + JsonFormatter.TruncationResult result = JsonFormatter.smartTruncate(root, 5000); + assertTrue(result.isTruncated()); + + JsonNode node = result.node(); + boolean foundSentinel = false; + for (int i = 0; i < 400; i++) { + JsonNode child = node.get("child"); + if (child == null) { + break; + } + if (child.isTextual() && JsonFormatter.MAX_DEPTH_MESSAGE.equals(child.asText())) { + foundSentinel = true; + break; + } + node = child; + } + assertTrue("Expected the max-depth sentinel in the deep chain", foundSentinel); + } } diff --git a/core/src/test/java/com/google/adk/plugins/agentanalytics/TraceManagerTest.java b/core/src/test/java/com/google/adk/plugins/agentanalytics/TraceManagerTest.java index b7ce7823f..5526f4404 100644 --- a/core/src/test/java/com/google/adk/plugins/agentanalytics/TraceManagerTest.java +++ b/core/src/test/java/com/google/adk/plugins/agentanalytics/TraceManagerTest.java @@ -227,4 +227,19 @@ public void clearStack_doesNothing_whenRecordsIsEmpty() { traceManager.clearStack(); assertTrue(traceManager.getCurrentSpanAndParent().spanId().isEmpty()); } + + @Test + public void initTraceIfNeeded_setsRootAgentNameFromContext() { + assertEquals(TraceManager.DEFAULT_ROOT_AGENT_NAME, traceManager.getRootAgentName()); + traceManager.initTraceIfNeeded(mockContext); + assertEquals("test-agent", traceManager.getRootAgentName()); + } + + @Test + public void initTraceIfNeeded_nullAgent_keepsSentinel() { + InvocationContext ctx = mock(InvocationContext.class); + when(ctx.agent()).thenReturn(null); + traceManager.initTraceIfNeeded(ctx); + assertEquals(TraceManager.DEFAULT_ROOT_AGENT_NAME, traceManager.getRootAgentName()); + } } From 3dfcdb883c1fe9a5b6aa8b6e4e74d7ce1fb81aba Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Wed, 1 Jul 2026 16:27:29 -0700 Subject: [PATCH 4/4] Address review: align STATE_DELTA tests with empty-delta skip, warn on STRUCT mode drift - Tests no longer expect a STATE_DELTA row for events that never set a state delta; onEventCallback_populatesCorrectFields now sets a real stateDelta and asserts it lands in attributes, and a new test pins down that empty deltas emit no row. - schemaFieldsMatch now checks mode drift on STRUCT columns themselves (e.g. NULLABLE vs REPEATED content_parts) before recursing into subfields, via a shared warnOnIncompatibleDrift helper; added a regression test capturing the warning. Co-Authored-By: Claude Fable 5 --- .../plugins/agentanalytics/BigQueryUtils.java | 52 ++++++----- .../BigQueryAgentAnalyticsPluginTest.java | 86 ++++++++++++++----- 2 files changed, 92 insertions(+), 46 deletions(-) diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryUtils.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryUtils.java index 47ce486b0..1c8760057 100644 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryUtils.java +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryUtils.java @@ -232,7 +232,9 @@ static void createAnalyticsViews(BigQuery bigQuery, BigQueryLoggerConfig config) } } - /** Adds missing columns to an existing table if the actual schema is behind the desired schema. */ + /** + * Adds missing columns to an existing table if the actual schema is behind the desired schema. + */ static boolean maybeUpgradeSchema(BigQuery bigQuery, Table existingTable) { // Always diff the actual table schema against the desired schema rather than trusting the // stored version label alone: a table stamped with the current label can still be missing @@ -310,6 +312,9 @@ private static SchemaDiff schemaFieldsMatch(FieldList existing, FieldList desire } else if (desiredField.getType().getStandardType().equals(StandardSQLTypeName.STRUCT) && existingField.getType().getStandardType().equals(StandardSQLTypeName.STRUCT) && desiredField.getSubFields() != null) { + // Mode drift on the STRUCT column itself (e.g. NULLABLE vs REPEATED) is just as + // un-upgradeable as on a scalar; check it before recursing into subfields. + warnOnIncompatibleDrift(existingField, desiredField); SchemaDiff subDiff = schemaFieldsMatch(existingField.getSubFields(), desiredField.getSubFields()); @@ -333,33 +338,34 @@ private static SchemaDiff schemaFieldsMatch(FieldList existing, FieldList desire .build()); } } else { - // Additive auto-upgrade cannot reconcile a type or mode change on an existing column - // (including nested non-STRUCT fields, since this method recurses into STRUCTs). Surface - // it instead of silently ignoring it, since it otherwise appears later as opaque Storage - // Write append failures. - boolean typeDrift = - !desiredField - .getType() - .getStandardType() - .equals(existingField.getType().getStandardType()); - boolean modeDrift = !modesEqual(existingField.getMode(), desiredField.getMode()); - if (typeDrift || modeDrift) { - logger.warning( - String.format( - "Incompatible schema drift on column '%s': table has %s/%s but the plugin expects" - + " %s/%s. This cannot be auto-upgraded; writes may fail until the column is" - + " fixed manually.", - desiredField.getName(), - existingField.getType().getStandardType(), - normalizeMode(existingField.getMode()), - desiredField.getType().getStandardType(), - normalizeMode(desiredField.getMode()))); - } + warnOnIncompatibleDrift(existingField, desiredField); } } return new SchemaDiff(ImmutableList.copyOf(newFields), ImmutableList.copyOf(updatedRecords)); } + // Additive auto-upgrade cannot reconcile a type or mode change on an existing column + // (including nested non-STRUCT fields, since schemaFieldsMatch recurses into STRUCTs). Surface + // it instead of silently ignoring it, since it otherwise appears later as opaque Storage + // Write append failures. + private static void warnOnIncompatibleDrift(Field existingField, Field desiredField) { + boolean typeDrift = + !desiredField.getType().getStandardType().equals(existingField.getType().getStandardType()); + boolean modeDrift = !modesEqual(existingField.getMode(), desiredField.getMode()); + if (typeDrift || modeDrift) { + logger.warning( + String.format( + "Incompatible schema drift on column '%s': table has %s/%s but the plugin expects" + + " %s/%s. This cannot be auto-upgraded; writes may fail until the column is" + + " fixed manually.", + desiredField.getName(), + existingField.getType().getStandardType(), + normalizeMode(existingField.getMode()), + desiredField.getType().getStandardType(), + normalizeMode(desiredField.getMode()))); + } + } + // BigQuery leaves Field.getMode() null to mean NULLABLE; normalize before comparing. private static Field.Mode normalizeMode(Field.Mode mode) { return mode == null ? Field.Mode.NULLABLE : mode; diff --git a/core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPluginTest.java b/core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPluginTest.java index a4def5bf7..d2a040647 100644 --- a/core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPluginTest.java +++ b/core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPluginTest.java @@ -29,6 +29,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -545,6 +546,7 @@ public void onEventCallback_populatesCorrectFields() throws Exception { Event event = Event.builder() .author("agent_author") + .actions(EventActions.builder().stateDelta(ImmutableMap.of("key", "new_value")).build()) .content(Content.fromParts(Part.fromText("event content"))) .build(); @@ -556,10 +558,22 @@ public void onEventCallback_populatesCorrectFields() throws Exception { assertEquals("agent_name", row.get("agent")); ObjectNode attributes = (ObjectNode) row.get("attributes"); assertEquals("agent_author", attributes.get("author").asText()); + assertEquals("new_value", attributes.get("state_delta").get("key").asText()); assertTrue(row.get("content").toString().contains("event content")); assertEquals(false, row.get("is_truncated")); } + @Test + public void onEventCallback_emptyStateDelta_doesNotEmitStateDelta() throws Exception { + Event event = Event.builder().author("agent_author").build(); + + plugin.onEventCallback(mockInvocationContext, event).blockingSubscribe(); + + assertNull( + "No STATE_DELTA row should be emitted for an empty state delta", + state.getBatchProcessor("invocation_id").queue.poll()); + } + @Test public void onEventCallback_withA2AMetadata_emitsA2AInteraction() throws Exception { Event event = @@ -581,10 +595,6 @@ public void onEventCallback_withA2AMetadata_emitsA2AInteraction() throws Excepti .toArray(new CompletableFuture[0])) .join(); - Map stateDeltaRow = state.getBatchProcessor("invocation_id").queue.poll(); - assertNotNull(stateDeltaRow); - assertEquals("STATE_DELTA", stateDeltaRow.get("event_type")); - Map a2aRow = state.getBatchProcessor("invocation_id").queue.poll(); assertNotNull("A2A_INTERACTION row not found in queue", a2aRow); assertEquals("A2A_INTERACTION", a2aRow.get("event_type")); @@ -624,10 +634,6 @@ public void onEventCallback_agentResponse_emitsAgentResponse() throws Exception .toArray(new CompletableFuture[0])) .join(); - Map stateDeltaRow = state.getBatchProcessor("invocation_id").queue.poll(); - assertNotNull(stateDeltaRow); - assertEquals("STATE_DELTA", stateDeltaRow.get("event_type")); - Map agentResponseRow = state.getBatchProcessor("invocation_id").queue.poll(); assertNotNull("AGENT_RESPONSE row not found in queue", agentResponseRow); assertEquals("AGENT_RESPONSE", agentResponseRow.get("event_type")); @@ -670,10 +676,6 @@ public void onEventCallback_skipSummarizationAndFunctionCall_doesNotEmitAgentRes .toArray(new CompletableFuture[0])) .join(); - Map stateDeltaRow = state.getBatchProcessor("invocation_id").queue.poll(); - assertNotNull(stateDeltaRow); - assertEquals("STATE_DELTA", stateDeltaRow.get("event_type")); - Map nextRow = state.getBatchProcessor("invocation_id").queue.poll(); assertNull("No AGENT_RESPONSE row should be emitted", nextRow); } @@ -698,10 +700,6 @@ public void onEventCallback_longRunningToolIdsPresent_doesNotEmitAgentResponse() .toArray(new CompletableFuture[0])) .join(); - Map stateDeltaRow = state.getBatchProcessor("invocation_id").queue.poll(); - assertNotNull(stateDeltaRow); - assertEquals("STATE_DELTA", stateDeltaRow.get("event_type")); - Map nextRow = state.getBatchProcessor("invocation_id").queue.poll(); assertNull("No AGENT_RESPONSE row should be emitted", nextRow); } @@ -725,9 +723,6 @@ public void onEventCallback_withA2ARequestOnlyMetadata_emitsA2AInteraction() thr .toArray(new CompletableFuture[0])) .join(); - Map stateDeltaRow = state.getBatchProcessor("invocation_id").queue.poll(); - assertNotNull(stateDeltaRow); - Map a2aRow = state.getBatchProcessor("invocation_id").queue.poll(); assertNotNull("A2A_INTERACTION row not found in queue", a2aRow); assertEquals("A2A_INTERACTION", a2aRow.get("event_type")); @@ -781,10 +776,6 @@ protected StreamWriter createWriter() { .toArray(new CompletableFuture[0])) .join(); - // Consume STATE_DELTA - Map stateDeltaRow = customState.getBatchProcessor("invocation_id").queue.poll(); - assertNotNull(stateDeltaRow); - // Get AGENT_RESPONSE Map agentResponseRow = customState.getBatchProcessor("invocation_id").queue.poll(); @@ -1173,6 +1164,55 @@ public void maybeUpgradeSchema_addsNewNestedField() throws Exception { verify(mockBigQuery).update(any(Table.class)); } + @Test + public void maybeUpgradeSchema_warnsOnStructModeDrift() throws Exception { + Table mockTable = mock(Table.class); + when(mockTable.getTableId()).thenReturn(TableId.of("project", "dataset", "table")); + when(mockTable.getLabels()).thenReturn(ImmutableMap.of()); + + // Existing table has 'content_parts' as a NULLABLE STRUCT instead of the expected REPEATED + ImmutableList initialFields = + BigQuerySchema.getEventsSchema().getFields().stream() + .map( + f -> + f.getName().equals("content_parts") + ? f.toBuilder() + .setMode(com.google.cloud.bigquery.Field.Mode.NULLABLE) + .build() + : f) + .collect(toImmutableList()); + + StandardTableDefinition tableDefinition = + StandardTableDefinition.newBuilder() + .setSchema(com.google.cloud.bigquery.Schema.of(initialFields)) + .build(); + when(mockTable.getDefinition()).thenReturn(tableDefinition); + + Logger logger = Logger.getLogger(BigQueryUtils.class.getName()); + Handler mockLogHandler = mock(Handler.class); + logger.addHandler(mockLogHandler); + try { + BigQueryUtils.maybeUpgradeSchema(mockBigQuery, mockTable); + } finally { + logger.removeHandler(mockLogHandler); + } + + ArgumentCaptor captor = ArgumentCaptor.forClass(LogRecord.class); + verify(mockLogHandler, atLeastOnce()).publish(captor.capture()); + assertTrue( + "Should have warned about STRUCT mode drift on content_parts", + captor.getAllValues().stream() + .anyMatch( + record -> + Objects.equals(record.getLevel(), Level.WARNING) + && record + .getMessage() + .contains("Incompatible schema drift on column 'content_parts'"))); + + // Mode drift alone is not auto-upgradeable, so no table update should be attempted. + verify(mockBigQuery, never()).update(any(Table.class)); + } + @Test public void createAnalyticsViews_executesQueries() throws Exception { BigQueryUtils.createAnalyticsViews(mockBigQuery, config);