@@ -109,8 +109,23 @@ def test_propagation(enable_extended_tracing):
109109 len (from_inject_spans ) >= 2
110110 ) # "Expecting at least 2 spans from the injected trace exporter"
111111 gotNames = [span .name for span in from_inject_spans ]
112+
113+ # Check if multiplexed sessions are enabled
114+ import os
115+
116+ multiplexed_enabled = (
117+ os .getenv ("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS" , "" ).lower () == "true"
118+ )
119+
120+ # Determine expected session span name based on multiplexed sessions
121+ expected_session_span_name = (
122+ "CloudSpanner.CreateMultiplexedSession"
123+ if multiplexed_enabled
124+ else "CloudSpanner.CreateSession"
125+ )
126+
112127 wantNames = [
113- "CloudSpanner.CreateSession" ,
128+ expected_session_span_name ,
114129 "CloudSpanner.Snapshot.execute_sql" ,
115130 ]
116131 assert gotNames == wantNames
@@ -392,6 +407,7 @@ def tx_update(txn):
392407 reason = "Tracing requires OpenTelemetry" ,
393408)
394409def test_database_partitioned_error ():
410+ import os
395411 from opentelemetry .trace .status import StatusCode
396412
397413 db , trace_exporter = create_db_trace_exporter ()
@@ -402,43 +418,101 @@ def test_database_partitioned_error():
402418 pass
403419
404420 got_statuses , got_events = finished_spans_statuses (trace_exporter )
405- # Check for the series of events
406- want_events = [
407- ("Acquiring session" , {"kind" : "BurstyPool" }),
408- ("Waiting for a session to become available" , {"kind" : "BurstyPool" }),
409- ("No sessions available in pool. Creating session" , {"kind" : "BurstyPool" }),
410- ("Creating Session" , {}),
411- ("Starting BeginTransaction" , {}),
412- (
421+
422+ # Check if multiplexed sessions are enabled for partitioned operations
423+ multiplexed_partitioned_enabled = (
424+ os .getenv ("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS" ) == "true"
425+ )
426+
427+ # Define expected events based on whether multiplexed sessions are enabled
428+ if multiplexed_partitioned_enabled :
429+ # When multiplexed sessions are enabled for partitioned operations,
430+ # the execution path is different - sessions manager creates multiplexed sessions directly
431+ expected_event_names = [
432+ "Creating Session" ,
433+ "Using session" ,
434+ "Starting BeginTransaction" ,
435+ "Returning session" ,
413436 "exception" ,
414- {
415- "exception.type" : "google.api_core.exceptions.InvalidArgument" ,
416- "exception.message" : "400 Table not found: NonExistent [at 1:8]\n UPDATE NonExistent SET name = 'foo' WHERE id > 1\n ^" ,
417- "exception.stacktrace" : "EPHEMERAL" ,
418- "exception.escaped" : "False" ,
419- },
420- ),
421- (
422437 "exception" ,
423- {
424- "exception.type" : "google.api_core.exceptions.InvalidArgument" ,
425- "exception.message" : "400 Table not found: NonExistent [at 1:8]\n UPDATE NonExistent SET name = 'foo' WHERE id > 1\n ^" ,
426- "exception.stacktrace" : "EPHEMERAL" ,
427- "exception.escaped" : "False" ,
428- },
429- ),
430- ]
431- assert got_events == want_events
438+ ]
439+ # Check that we have the expected events
440+ assert len (got_events ) == len (expected_event_names )
441+ for i , expected_name in enumerate (expected_event_names ):
442+ assert got_events [i ][0 ] == expected_name
443+
444+ # Verify session usage event shows multiplexed session
445+ assert got_events [1 ][1 ]["multiplexed" ] is True
446+
447+ # Verify session return event shows multiplexed session
448+ assert got_events [3 ][1 ]["multiplexed" ] is True
449+
450+ # Verify the exception details
451+ for i in [4 , 5 ]: # Both exception events
452+ assert (
453+ got_events [i ][1 ]["exception.type" ]
454+ == "google.api_core.exceptions.InvalidArgument"
455+ )
456+ assert (
457+ "Table not found: NonExistent" in got_events [i ][1 ]["exception.message" ]
458+ )
459+ else :
460+ # When multiplexed sessions are disabled, sessions manager still manages sessions
461+ # but uses regular pool sessions instead of multiplexed sessions
462+ expected_event_names = [
463+ "Acquiring session" ,
464+ "Waiting for a session to become available" ,
465+ "No sessions available in pool. Creating session" ,
466+ "Creating Session" ,
467+ "Using session" ,
468+ "Starting BeginTransaction" ,
469+ "Returning session" ,
470+ "exception" ,
471+ "exception" ,
472+ ]
432473
433- # Check for the statues.
474+ # Check that we have the expected events
475+ assert len (got_events ) == len (expected_event_names )
476+ for i , expected_name in enumerate (expected_event_names ):
477+ assert got_events [i ][0 ] == expected_name
478+
479+ # Verify pool-related events
480+ assert got_events [0 ][1 ]["kind" ] == "BurstyPool"
481+ assert got_events [1 ][1 ]["kind" ] == "BurstyPool"
482+ assert got_events [2 ][1 ]["kind" ] == "BurstyPool"
483+
484+ # Verify session usage event shows non-multiplexed session
485+ assert got_events [4 ][1 ]["multiplexed" ] is False
486+
487+ # Verify session return event shows non-multiplexed session
488+ assert got_events [6 ][1 ]["multiplexed" ] is False
489+
490+ # Verify the exception details
491+ for i in [7 , 8 ]: # Both exception events
492+ assert (
493+ got_events [i ][1 ]["exception.type" ]
494+ == "google.api_core.exceptions.InvalidArgument"
495+ )
496+ assert (
497+ "Table not found: NonExistent" in got_events [i ][1 ]["exception.message" ]
498+ )
499+
500+ # Check for the statuses.
434501 codes = StatusCode
502+
503+ # Determine expected session creation span name based on multiplexed sessions
504+ expected_session_span_name = (
505+ "CloudSpanner.CreateMultiplexedSession"
506+ if multiplexed_partitioned_enabled
507+ else "CloudSpanner.CreateSession"
508+ )
435509 want_statuses = [
436510 (
437511 "CloudSpanner.Database.execute_partitioned_pdml" ,
438512 codes .ERROR ,
439513 "InvalidArgument: 400 Table not found: NonExistent [at 1:8]\n UPDATE NonExistent SET name = 'foo' WHERE id > 1\n ^" ,
440514 ),
441- ("CloudSpanner.CreateSession" , codes .OK , None ),
515+ (expected_session_span_name , codes .OK , None ),
442516 (
443517 "CloudSpanner.ExecuteStreamingSql" ,
444518 codes .ERROR ,
0 commit comments