44from datetime import timedelta
55from typing import Any
66
7+ import sentry_sdk
78from celery import Task
89from django .utils import timezone
910
@@ -512,21 +513,24 @@ def process_delayed_workflows(
512513 """
513514 Grab workflows, groups, and data condition groups from the Redis buffer, evaluate the "slow" conditions in a bulk snuba query, and fire them if they pass
514515 """
515- project = fetch_project (project_id )
516- if not project :
517- return
516+ with sentry_sdk .start_span (op = "delayed_workflow.prepare_data" ):
517+ project = fetch_project (project_id )
518+ if not project :
519+ return
518520
519- workflow_event_dcg_data = fetch_group_to_event_data (project_id , Workflow , batch_key )
521+ workflow_event_dcg_data = fetch_group_to_event_data (project_id , Workflow , batch_key )
520522
521- # Get mappings from DataConditionGroups to other info
522- dcg_to_groups , trigger_group_to_dcg_model = get_dcg_group_workflow_detector_data (
523- workflow_event_dcg_data
524- )
525- dcg_to_workflow = trigger_group_to_dcg_model [DataConditionHandler .Group .WORKFLOW_TRIGGER ].copy ()
526- dcg_to_workflow .update (trigger_group_to_dcg_model [DataConditionHandler .Group .ACTION_FILTER ])
523+ # Get mappings from DataConditionGroups to other info
524+ dcg_to_groups , trigger_group_to_dcg_model = get_dcg_group_workflow_detector_data (
525+ workflow_event_dcg_data
526+ )
527+ dcg_to_workflow = trigger_group_to_dcg_model [
528+ DataConditionHandler .Group .WORKFLOW_TRIGGER
529+ ].copy ()
530+ dcg_to_workflow .update (trigger_group_to_dcg_model [DataConditionHandler .Group .ACTION_FILTER ])
527531
528- _ , workflows_to_envs = fetch_workflows_envs (list (dcg_to_workflow .values ()))
529- data_condition_groups = fetch_data_condition_groups (list (dcg_to_groups .keys ()))
532+ _ , workflows_to_envs = fetch_workflows_envs (list (dcg_to_workflow .values ()))
533+ data_condition_groups = fetch_data_condition_groups (list (dcg_to_groups .keys ()))
530534
531535 logger .info (
532536 "delayed_workflow.workflows" ,
@@ -537,10 +541,11 @@ def process_delayed_workflows(
537541 },
538542 )
539543
540- # Get unique query groups to query Snuba
541- condition_groups = get_condition_query_groups (
542- data_condition_groups , dcg_to_groups , dcg_to_workflow , workflows_to_envs
543- )
544+ with sentry_sdk .start_span (op = "delayed_workflow.get_condition_query_groups" ):
545+ # Get unique query groups to query Snuba
546+ condition_groups = get_condition_query_groups (
547+ data_condition_groups , dcg_to_groups , dcg_to_workflow , workflows_to_envs
548+ )
544549
545550 if not condition_groups :
546551 return
@@ -556,7 +561,8 @@ def process_delayed_workflows(
556561 "project_id" : project_id ,
557562 },
558563 )
559- condition_group_results = get_condition_group_results (condition_groups )
564+ with sentry_sdk .start_span (op = "delayed_workflow.get_condition_group_results" ):
565+ condition_group_results = get_condition_group_results (condition_groups )
560566
561567 serialized_results = {
562568 str (query ): count_dict for query , count_dict in condition_group_results .items ()
@@ -566,30 +572,40 @@ def process_delayed_workflows(
566572 extra = {"condition_group_results" : serialized_results , "project_id" : project_id },
567573 )
568574
569- # Evaluate DCGs
570- groups_to_dcgs = get_groups_to_fire (
571- data_condition_groups ,
572- workflows_to_envs ,
573- dcg_to_workflow ,
574- dcg_to_groups ,
575- condition_group_results ,
576- )
575+ with sentry_sdk .start_span (op = "delayed_workflow.get_groups_to_fire" ):
576+ # Evaluate DCGs
577+ groups_to_dcgs = get_groups_to_fire (
578+ data_condition_groups ,
579+ workflows_to_envs ,
580+ dcg_to_workflow ,
581+ dcg_to_groups ,
582+ condition_group_results ,
583+ )
577584
578585 logger .info (
579586 "delayed_workflow.groups_to_fire" ,
580587 extra = {"groups_to_dcgs" : groups_to_dcgs , "project_id" : project_id },
581588 )
582589
583- dcg_group_to_event_data , event_ids , occurrence_ids = parse_dcg_group_event_data (
584- workflow_event_dcg_data , groups_to_dcgs
585- )
586- group_to_groupevent = get_group_to_groupevent (
587- dcg_group_to_event_data , list (groups_to_dcgs .keys ()), event_ids , occurrence_ids , project_id
588- )
590+ with sentry_sdk .start_span (op = "delayed_workflow.get_group_to_groupevent" ):
591+ dcg_group_to_event_data , event_ids , occurrence_ids = parse_dcg_group_event_data (
592+ workflow_event_dcg_data , groups_to_dcgs
593+ )
594+ group_to_groupevent = get_group_to_groupevent (
595+ dcg_group_to_event_data ,
596+ list (groups_to_dcgs .keys ()),
597+ event_ids ,
598+ occurrence_ids ,
599+ project_id ,
600+ )
589601
590- fire_actions_for_groups (groups_to_dcgs , trigger_group_to_dcg_model , group_to_groupevent )
602+ with sentry_sdk .start_span (op = "delayed_workflow.fire_actions" ):
603+ fire_actions_for_groups (groups_to_dcgs , trigger_group_to_dcg_model , group_to_groupevent )
591604
592- cleanup_redis_buffer (project_id , workflow_event_dcg_data , batch_key )
605+ with sentry_sdk .start_span (
606+ op = "delayed_workflow.cleanup_redis_buffer" ,
607+ ):
608+ cleanup_redis_buffer (project_id , workflow_event_dcg_data , batch_key )
593609
594610
595611@delayed_processing_registry .register ("delayed_workflow" )
0 commit comments