Skip to content

Commit fd9ee13

Browse files
committed
Replace long callback interval with request polling handled in renderer.
1 parent 3a207ce commit fd9ee13

File tree

12 files changed

+571
-319
lines changed

12 files changed

+571
-319
lines changed

dash/_callback.py

Lines changed: 194 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import collections
22
from functools import wraps
33

4+
import flask
5+
46
from .dependencies import (
57
handle_callback_args,
68
handle_grouped_callback_args,
79
Output,
810
)
9-
from .exceptions import PreventUpdate
11+
from .exceptions import PreventUpdate, WildcardInLongCallback, DuplicateCallback
1012

1113
from ._grouping import (
1214
flatten_grouping,
@@ -17,9 +19,11 @@
1719
create_callback_id,
1820
stringify_id,
1921
to_json,
22+
coerce_to_list,
2023
)
2124

2225
from . import _validate
26+
from .long_callback.managers import BaseLongCallbackManager
2327

2428

2529
class NoUpdate:
@@ -30,15 +34,28 @@ def to_plotly_json(self): # pylint: disable=no-self-use
3034

3135
@staticmethod
3236
def is_no_update(obj):
33-
return obj == {"_dash_no_update": "_dash_no_update"}
37+
return isinstance(obj, NoUpdate) or obj == {
38+
"_dash_no_update": "_dash_no_update"
39+
}
3440

3541

3642
GLOBAL_CALLBACK_LIST = []
3743
GLOBAL_CALLBACK_MAP = {}
3844
GLOBAL_INLINE_SCRIPTS = []
3945

4046

41-
def callback(*_args, **_kwargs):
47+
def callback(
48+
*_args,
49+
long=False,
50+
long_interval=1000,
51+
long_progress=None,
52+
long_progress_default=None,
53+
long_running=None,
54+
long_cancel=None,
55+
long_manager=None,
56+
long_cache_args_to_ignore=None,
57+
**_kwargs,
58+
):
4259
"""
4360
Normally used as a decorator, `@dash.callback` provides a server-side
4461
callback relating the values of one or more `Output` items to one or
@@ -56,15 +73,79 @@ def callback(*_args, **_kwargs):
5673
not to fire when its outputs are first added to the page. Defaults to
5774
`False` and unlike `app.callback` is not configurable at the app level.
5875
"""
76+
77+
long_spec = None
78+
79+
if long:
80+
long_spec = {
81+
"interval": long_interval,
82+
}
83+
84+
if long_manager:
85+
long_spec["manager"] = long_manager
86+
87+
if long_progress:
88+
long_spec["progress"] = coerce_to_list(long_progress)
89+
validate_long_inputs(long_spec["progress"])
90+
91+
if long_progress_default:
92+
long_spec["progressDefault"] = coerce_to_list(long_progress_default)
93+
94+
if not len(long_spec["progress"]) == len(long_spec["progressDefault"]):
95+
raise Exception(
96+
"Progress and progress default needs to be of same length"
97+
)
98+
99+
if long_running:
100+
long_spec["running"] = coerce_to_list(long_running)
101+
validate_long_inputs(x[0] for x in long_spec["running"])
102+
103+
if long_cancel:
104+
cancel_inputs = coerce_to_list(long_cancel)
105+
validate_long_inputs(cancel_inputs)
106+
107+
cancels_output = [Output(c.component_id, "id") for c in cancel_inputs]
108+
109+
try:
110+
111+
@callback(cancels_output, cancel_inputs, prevent_initial_call=True)
112+
def cancel_call(*_):
113+
job_ids = flask.request.args.getlist("cancelJob")
114+
manager = long_manager or flask.g.long_callback_manager
115+
if job_ids:
116+
for job_id in job_ids:
117+
manager.terminate_job(int(job_id))
118+
return NoUpdate()
119+
120+
except DuplicateCallback:
121+
pass # Already a callback to cancel, will get the proper jobs from the store.
122+
123+
long_spec["cancel"] = [c.to_dict() for c in cancel_inputs]
124+
125+
if long_cache_args_to_ignore:
126+
long_spec["cache_args_to_ignore"] = long_cache_args_to_ignore
127+
59128
return register_callback(
60129
GLOBAL_CALLBACK_LIST,
61130
GLOBAL_CALLBACK_MAP,
62131
False,
63132
*_args,
64133
**_kwargs,
134+
long=long_spec,
65135
)
66136

67137

138+
def validate_long_inputs(deps):
139+
for dep in deps:
140+
if dep.has_wildcard():
141+
raise WildcardInLongCallback(
142+
f"""
143+
long callbacks does not support dependencies with
144+
pattern-matching ids
145+
Received: {repr(dep)}\n"""
146+
)
147+
148+
68149
def clientside_callback(clientside_function, *args, **kwargs):
69150
return register_clientside_callback(
70151
GLOBAL_CALLBACK_LIST,
@@ -87,6 +168,7 @@ def insert_callback(
87168
state,
88169
inputs_state_indices,
89170
prevent_initial_call,
171+
long=None,
90172
):
91173
if prevent_initial_call is None:
92174
prevent_initial_call = config_prevent_initial_callbacks
@@ -98,19 +180,26 @@ def insert_callback(
98180
"state": [c.to_dict() for c in state],
99181
"clientside_function": None,
100182
"prevent_initial_call": prevent_initial_call,
183+
"long": long
184+
and {
185+
"interval": long["interval"],
186+
},
101187
}
188+
102189
callback_map[callback_id] = {
103190
"inputs": callback_spec["inputs"],
104191
"state": callback_spec["state"],
105192
"outputs_indices": outputs_indices,
106193
"inputs_state_indices": inputs_state_indices,
194+
"long": long,
107195
}
108196
callback_list.append(callback_spec)
109197

110198
return callback_id
111199

112200

113-
def register_callback(
201+
# pylint: disable=R0912
202+
def register_callback( # pylint: disable=R0914
114203
callback_list, callback_map, config_prevent_initial_callbacks, *_args, **_kwargs
115204
):
116205
(
@@ -129,6 +218,8 @@ def register_callback(
129218
insert_output = flatten_grouping(output)
130219
multi = True
131220

221+
long = _kwargs.get("long")
222+
132223
output_indices = make_grouping_by_index(output, list(range(grouping_len(output))))
133224
callback_id = insert_callback(
134225
callback_list,
@@ -140,23 +231,118 @@ def register_callback(
140231
flat_state,
141232
inputs_state_indices,
142233
prevent_initial_call,
234+
long=long,
143235
)
144236

145237
# pylint: disable=too-many-locals
146238
def wrap_func(func):
239+
240+
if long is not None:
241+
long_key = BaseLongCallbackManager.register_func(
242+
func, long.get("progress") is not None
243+
)
244+
147245
@wraps(func)
148246
def add_context(*args, **kwargs):
149247
output_spec = kwargs.pop("outputs_list")
248+
callback_manager = long.get(
249+
"manager", kwargs.pop("long_callback_manager", None)
250+
)
150251
_validate.validate_output_spec(insert_output, output_spec, Output)
151252

152253
func_args, func_kwargs = _validate.validate_and_group_input_args(
153254
args, inputs_state_indices
154255
)
155256

156-
# don't touch the comment on the next line - used by debugger
157-
output_value = func(*func_args, **func_kwargs) # %% callback invoked %%
257+
response = {"multi": True}
258+
259+
if long is not None:
260+
progress_outputs = long.get("progress")
261+
cache_key = flask.request.args.get("cacheKey")
262+
job_id = flask.request.args.get("job")
263+
264+
current_key = callback_manager.build_cache_key(
265+
func,
266+
# Inputs provided as dict is kwargs.
267+
func_args if func_args else func_kwargs,
268+
long.get("cache_args_to_ignore", []),
269+
)
270+
271+
if not cache_key:
272+
cache_key = current_key
273+
274+
job_fn = callback_manager.func_registry.get(long_key)
275+
276+
job = callback_manager.call_job_fn(
277+
cache_key,
278+
job_fn,
279+
args,
280+
)
281+
282+
data = {
283+
"cacheKey": cache_key,
284+
"job": job,
285+
}
286+
287+
running = long.get("running")
288+
289+
if running:
290+
data["running"] = {str(r[0]): r[1] for r in running}
291+
data["runningOff"] = {str(r[0]): r[2] for r in running}
292+
cancel = long.get("cancel")
293+
if cancel:
294+
data["cancel"] = cancel
295+
296+
progress_default = long.get("progressDefault")
297+
if progress_default:
298+
data["progressDefault"] = {
299+
str(o): x
300+
for o, x in zip(progress_outputs, progress_default)
301+
}
302+
return to_json(data)
303+
else:
304+
if progress_outputs:
305+
# Get the progress before the result as it would be erased after the results.
306+
progress = callback_manager.get_progress(cache_key)
307+
if progress:
308+
response["progress"] = {
309+
str(x): progress[i]
310+
for i, x in enumerate(progress_outputs)
311+
}
312+
313+
output_value = callback_manager.get_result(cache_key, job_id)
314+
# Must get job_running after get_result since get_results terminates it.
315+
job_running = callback_manager.job_running(job_id)
316+
if not job_running and output_value is callback_manager.UNDEFINED:
317+
# Job canceled -> no output to close the loop.
318+
output_value = NoUpdate()
319+
320+
elif (
321+
isinstance(output_value, dict)
322+
and "long_callback_error" in output_value
323+
):
324+
error = output_value.get("long_callback_error")
325+
raise Exception(
326+
f"An error occurred inside a long callback: {error['msg']}\n{error['tb']}"
327+
)
328+
329+
if job_running and output_value is not callback_manager.UNDEFINED:
330+
# cached results.
331+
callback_manager.terminate_job(job_id)
332+
333+
if multi and isinstance(output_value, (list, tuple)):
334+
output_value = [
335+
NoUpdate() if NoUpdate.is_no_update(r) else r
336+
for r in output_value
337+
]
338+
339+
if output_value is callback_manager.UNDEFINED:
340+
return to_json(response)
341+
else:
342+
# don't touch the comment on the next line - used by debugger
343+
output_value = func(*func_args, **func_kwargs) # %% callback invoked %%
158344

159-
if isinstance(output_value, NoUpdate):
345+
if NoUpdate.is_no_update(output_value):
160346
raise PreventUpdate
161347

162348
if not multi:
@@ -191,7 +377,7 @@ def add_context(*args, **kwargs):
191377
if not has_update:
192378
raise PreventUpdate
193379

194-
response = {"response": component_ids, "multi": True}
380+
response["response"] = component_ids
195381

196382
try:
197383
jsonResponse = to_json(response)

dash/_utils.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,3 +217,9 @@ def gen_salt(chars):
217217
return "".join(
218218
secrets.choice(string.ascii_letters + string.digits) for _ in range(chars)
219219
)
220+
221+
222+
def coerce_to_list(obj):
223+
if not isinstance(obj, (list, tuple)):
224+
return [obj]
225+
return obj

0 commit comments

Comments
 (0)