Skip to content

Commit c9e8bf7

Browse files
committed
Merge remote-tracking branch 'upstream/master'
2 parents 7c777fe + 0c2076c commit c9e8bf7

File tree

72 files changed

+2658
-397
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+2658
-397
lines changed

samza-api/src/main/java/org/apache/samza/application/SamzaApplication.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,29 @@
2323

2424

2525
/**
26-
* The base interface for all user-implemented applications in Samza.
26+
* A {@link SamzaApplication} describes the inputs, outputs, state, configuration and the logic
27+
* for processing data from one or more streaming sources.
2728
* <p>
28-
* The main processing logic of the user application should be implemented in {@link SamzaApplication#describe(ApplicationDescriptor)}
29-
* method. Sub-classes {@link StreamApplication} and {@link TaskApplication} are specific interfaces for applications
30-
* written in high-level DAG and low-level task APIs, respectively.
29+
* This is the base {@link SamzaApplication}. Implement a {@link StreamApplication} to describe the
30+
* processing logic using Samza's High Level API in terms of {@link org.apache.samza.operators.MessageStream}
31+
* operators, or a {@link TaskApplication} to describe it using Samza's Low Level API in terms of per-message
32+
* processing logic.
33+
* <p>
34+
* A {@link SamzaApplication} implementation must have a no-argument constructor, which will be used by the framework
35+
* to create new instances and call {@link #describe(ApplicationDescriptor)}.
36+
* <p>
37+
* Per-container context may be managed using {@link org.apache.samza.context.ApplicationContainerContext} and
38+
* set using {@link ApplicationDescriptor#withApplicationContainerContextFactory}. Similarly, per-task context
39+
* may be managed using {@link org.apache.samza.context.ApplicationTaskContext} and set using
40+
* {@link ApplicationDescriptor#withApplicationTaskContextFactory}.
3141
*/
3242
@InterfaceStability.Evolving
3343
public interface SamzaApplication<S extends ApplicationDescriptor> {
3444

3545
/**
36-
* Describes the user processing logic via {@link ApplicationDescriptor}
46+
* Describes the inputs, outputs, state, configuration and processing logic using the provided {@code appDescriptor}.
3747
*
38-
* @param appDesc the {@link ApplicationDescriptor} object to describe user application logic
48+
* @param appDescriptor the {@link ApplicationDescriptor} to use for describing the application.
3949
*/
40-
void describe(S appDesc);
50+
void describe(S appDescriptor);
4151
}

samza-api/src/main/java/org/apache/samza/application/StreamApplication.java

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -21,56 +21,59 @@
2121
import org.apache.samza.annotation.InterfaceStability;
2222
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
2323

24+
2425
/**
25-
* Describes and initializes the transforms for processing message streams and generating results in high-level API.
26+
* A {@link StreamApplication} describes the inputs, outputs, state, configuration and the processing logic
27+
* in Samza's High Level API.
28+
* <p>
29+
* A typical {@link StreamApplication} implementation consists of the following stages:
30+
* <ol>
31+
* <li>Configuring the inputs, outputs and state (tables) using the appropriate
32+
* {@link org.apache.samza.system.descriptors.SystemDescriptor}s,
33+
* {@link org.apache.samza.system.descriptors.InputDescriptor}s,
34+
* {@link org.apache.samza.system.descriptors.OutputDescriptor}s and
35+
* {@link org.apache.samza.table.descriptors.TableDescriptor}s.
36+
* <li>Obtaining the corresponding
37+
* {@link org.apache.samza.operators.MessageStream}s,
38+
* {@link org.apache.samza.operators.OutputStream}s and
39+
* {@link org.apache.samza.table.Table}s from the provided {@link StreamApplicationDescriptor}.
40+
* <li>Defining the processing logic using operators and functions on the streams and tables thus obtained.
41+
* E.g., {@link org.apache.samza.operators.MessageStream#filter(org.apache.samza.operators.functions.FilterFunction)}
42+
* </ol>
2643
* <p>
27-
* The following example removes page views older than 1 hour from the input stream:
44+
* The following example {@link StreamApplication} removes page views older than 1 hour from the input stream:
2845
* <pre>{@code
2946
* public class PageViewFilter implements StreamApplication {
30-
* public void describe(StreamAppDescriptor appDesc) {
31-
* KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
47+
* public void describe(StreamApplicationDescriptor appDescriptor) {
48+
* KafkaSystemDescriptor trackingSystemDescriptor = new KafkaSystemDescriptor("tracking");
3249
* KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
33-
* trackingSystem.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
34-
*
50+
* trackingSystemDescriptor.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
3551
* KafkaOutputDescriptor<PageViewEvent> outputStreamDescriptor =
36-
* trackingSystem.getOutputDescriptor("recentPageViewEvent", new JsonSerdeV2<>(PageViewEvent.class)));
52+
* trackingSystemDescriptor.getOutputDescriptor("recentPageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
3753
*
38-
* MessageStream<PageViewEvent> pageViewEvents = appDesc.getInputStream(inputStreamDescriptor);
39-
* OutputStream<PageViewEvent> recentPageViewEvents = appDesc.getOutputStream(outputStreamDescriptor);
54+
* MessageStream<PageViewEvent> pageViewEvents = appDescriptor.getInputStream(inputStreamDescriptor);
55+
* OutputStream<PageViewEvent> recentPageViewEvents = appDescriptor.getOutputStream(outputStreamDescriptor);
4056
*
4157
* pageViewEvents
4258
* .filter(m -> m.getCreationTime() > System.currentTimeMillis() - Duration.ofHours(1).toMillis())
4359
* .sendTo(recentPageViewEvents);
4460
* }
4561
* }
4662
* }</pre>
47-
*<p>
48-
* The example above can be run using an ApplicationRunner:
49-
* <pre>{@code
50-
* public static void main(String[] args) {
51-
* CommandLine cmdLine = new CommandLine();
52-
* Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
53-
* PageViewFilter app = new PageViewFilter();
54-
* ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config);
55-
* runner.run();
56-
* runner.waitForFinish();
57-
* }
58-
* }</pre>
59-
*
63+
* <p>
64+
* All operator function implementations used in a {@link StreamApplication} must be {@link java.io.Serializable}. Any
65+
* context required within an operator function may be managed by implementing the
66+
* {@link org.apache.samza.operators.functions.InitableFunction#init} and
67+
* {@link org.apache.samza.operators.functions.ClosableFunction#close} methods in the function implementation.
68+
* <p>
69+
* Functions may implement the {@link org.apache.samza.operators.functions.ScheduledFunction} interface
70+
* to schedule and receive periodic callbacks from the Samza framework.
6071
* <p>
6172
* Implementation Notes: Currently {@link StreamApplication}s are wrapped in a {@link org.apache.samza.task.StreamTask}
6273
* during execution. The execution planner will generate a serialized DAG which will be deserialized in each
6374
* {@link org.apache.samza.task.StreamTask} instance used for processing incoming messages. Execution is synchronous
64-
* and thread-safe within each {@link org.apache.samza.task.StreamTask}.
65-
*
66-
* <p>
67-
* A {@link StreamApplication} implementation must have a proper fully-qualified class name and a default constructor
68-
* with no parameters to ensure successful instantiation in both local and remote environments.
69-
* Functions implemented for transforms in StreamApplications ({@link org.apache.samza.operators.functions.MapFunction},
70-
* {@link org.apache.samza.operators.functions.FilterFunction} for e.g.) are initable and closable. They are initialized
71-
* before messages are delivered to them and closed after their execution when the {@link org.apache.samza.task.StreamTask}
72-
* instance is closed. See {@link org.apache.samza.operators.functions.InitableFunction} and {@link org.apache.samza.operators.functions.ClosableFunction}.
73-
* Function implementations are required to be {@link java.io.Serializable}.
75+
* and thread-safe within each {@link org.apache.samza.task.StreamTask}. Multiple tasks may process their
76+
* messages concurrently depending on the job parallelism configuration.
7477
*/
7578
@InterfaceStability.Evolving
7679
public interface StreamApplication extends SamzaApplication<StreamApplicationDescriptor> {

samza-api/src/main/java/org/apache/samza/application/TaskApplication.java

Lines changed: 27 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -23,64 +23,49 @@
2323

2424

2525
/**
26-
* Describes and initializes the transforms for processing message streams and generating results in low-level API. Your
27-
* application is expected to implement this interface.
26+
* A {@link TaskApplication} describes the inputs, outputs, state, configuration and the processing logic
27+
* in Samza's Low Level API.
28+
* A typical {@link TaskApplication} implementation consists of the following stages:
29+
* <ol>
30+
* <li>Configuring the inputs, outputs and state (tables) using the appropriate
31+
* {@link org.apache.samza.system.descriptors.SystemDescriptor}s,
32+
* {@link org.apache.samza.system.descriptors.StreamDescriptor}s and
33+
* {@link org.apache.samza.table.descriptors.TableDescriptor}s.
34+
* <li>Adding these descriptors to the provided {@link TaskApplicationDescriptor}.
35+
* <li>Defining the processing logic by implementing a {@link org.apache.samza.task.StreamTask} or
36+
* {@link org.apache.samza.task.AsyncStreamTask} that operates on each
37+
* {@link org.apache.samza.system.IncomingMessageEnvelope} one at a time.
38+
* <li>Setting a {@link org.apache.samza.task.TaskFactory} using
39+
* {@link TaskApplicationDescriptor#setTaskFactory(org.apache.samza.task.TaskFactory)} that creates instances of the
40+
* task above. The {@link org.apache.samza.task.TaskFactory} implementation must be {@link java.io.Serializable}.
41+
* </ol>
2842
* <p>
29-
* The following example removes page views older than 1 hour from the input stream:
43+
* The following example {@link TaskApplication} removes page views older than 1 hour from the input stream:
3044
* <pre>{@code
3145
* public class PageViewFilter implements TaskApplication {
32-
* public void describe(TaskAppDescriptor appDesc) {
33-
* KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor(PageViewTask.SYSTEM);
46+
* public void describe(TaskApplicationDescriptor appDescriptor) {
47+
* KafkaSystemDescriptor trackingSystemDescriptor = new KafkaSystemDescriptor("tracking");
3448
* KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
35-
* trackingSystem.getInputDescriptor(PageViewTask.TASK_INPUT, new JsonSerdeV2<>(PageViewEvent.class));
36-
*
49+
* trackingSystemDescriptor.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
3750
* KafkaOutputDescriptor<PageViewEvent> outputStreamDescriptor =
38-
* trackingSystem.getOutputDescriptor(PageViewTask.TASK_OUTPUT, new JsonSerdeV2<>(PageViewEvent.class)));
51+
* trackingSystemDescriptor.getOutputDescriptor("recentPageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
3952
*
40-
* appDesc.addInputStream(inputStreamDescriptor);
41-
* appDesc.addOutputStream(outputStreamDescriptor);
42-
* appDesc.setTaskFactory((StreamTaskFactory) () -> new PageViewTask());
53+
* appDescriptor.addInputStream(inputStreamDescriptor);
54+
* appDescriptor.addOutputStream(outputStreamDescriptor);
55+
* appDescriptor.setTaskFactory((StreamTaskFactory) () -> new PageViewTask());
4356
* }
4457
* }
4558
*
4659
* public class PageViewTask implements StreamTask {
47-
* final static String TASK_INPUT = "pageViewEvents";
48-
* final static String TASK_OUTPUT = "recentPageViewEvents";
49-
* final static String SYSTEM = "kafka";
50-
*
51-
* public void process(IncomingMessageEnvelope message, MessageCollector collector,
52-
* TaskCoordinator coordinator) {
60+
* public void process(IncomingMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
5361
* PageViewEvent m = (PageViewEvent) message.getValue();
5462
* if (m.getCreationTime() > System.currentTimeMillis() - Duration.ofHours(1).toMillis()) {
55-
* collector.send(new OutgoingMessageEnvelope(new SystemStream(SYSTEM, TASK_OUTPUT),
56-
* message.getKey(), message.getKey(), m));
63+
* collector.send(new OutgoingMessageEnvelope(
64+
* new SystemStream("tracking", "recentPageViewEvent"), message.getKey(), message.getKey(), m));
5765
* }
5866
* }
5967
* }
6068
* }</pre>
61-
*
62-
*<p>
63-
* The example above can be run using an ApplicationRunner:
64-
* <pre>{@code
65-
* public static void main(String[] args) {
66-
* CommandLine cmdLine = new CommandLine();
67-
* Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
68-
* PageViewFilter app = new PageViewFilter();
69-
* ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config);
70-
* runner.run();
71-
* runner.waitForFinish();
72-
* }
73-
* }</pre>
74-
*
75-
* <p>
76-
* Implementation Notes: {@link TaskApplication} allow users to instantiate {@link org.apache.samza.task.StreamTask} or
77-
* {@link org.apache.samza.task.AsyncStreamTask} when describing the processing logic. A new {@link TaskApplicationDescriptor }
78-
* instance will be created and described by the user-defined {@link TaskApplication} when planning the execution.
79-
* {@link org.apache.samza.task.TaskFactory} is required to be serializable.
80-
*
81-
* <p>
82-
* The user-implemented {@link TaskApplication} class must be a class with proper fully-qualified class name and
83-
* a default constructor with no parameters to ensure successful instantiation in both local and remote environments.
8469
*/
8570
@InterfaceStability.Evolving
8671
public interface TaskApplication extends SamzaApplication<TaskApplicationDescriptor> {

samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -29,32 +29,42 @@
2929

3030

3131
/**
32-
* The interface class to describe the configuration, input and output streams, and processing logic in a
32+
* An {@link ApplicationDescriptor} contains the description of inputs, outputs, state, configuration and the
33+
* processing logic for a {@link org.apache.samza.application.SamzaApplication}.
34+
* <p>
35+
* This is the base {@link ApplicationDescriptor} and provides functionality common to all
3336
* {@link org.apache.samza.application.SamzaApplication}s.
37+
* {@link org.apache.samza.application.StreamApplication#describe} will provide access to a
38+
* {@link StreamApplicationDescriptor} with additional functionality for describing High Level API applications.
39+
* Similarly, {@link org.apache.samza.application.TaskApplication#describe} will provide access to a
40+
* {@link TaskApplicationDescriptor} with additional functionality for describing Low Level API applications.
3441
* <p>
35-
* Sub-classes {@link StreamApplicationDescriptor} and {@link TaskApplicationDescriptor} are specific interfaces for
36-
* applications written in high-level {@link org.apache.samza.application.StreamApplication} and low-level
37-
* {@link org.apache.samza.application.TaskApplication} APIs, respectively.
38-
*
39-
* @param <S> sub-class of user application descriptor.
42+
* Use the {@link ApplicationDescriptor} to set the container scope context factory using
43+
* {@link ApplicationDescriptor#withApplicationContainerContextFactory}, and task scope context factory using
44+
* {@link ApplicationDescriptor#withApplicationTaskContextFactory}. Please note that the terms {@code container}
45+
* and {@code task} here refer to the units of physical and logical parallelism, not the programming API.
4046
*/
4147
@InterfaceStability.Evolving
4248
public interface ApplicationDescriptor<S extends ApplicationDescriptor> {
4349

4450
/**
45-
* Get the {@link Config} of the application
46-
* @return config of the application
51+
* Get the configuration for the application.
52+
* @return config for the application
4753
*/
4854
Config getConfig();
4955

5056
/**
51-
* Sets the default SystemDescriptor to use for the application. This is equivalent to setting
52-
* {@code job.default.system} and its properties in configuration.
57+
* Sets the {@link SystemDescriptor} for the default system for the application.
58+
* <p>
59+
* The default system is used by the framework for creating any internal (e.g., coordinator, changelog, checkpoint)
60+
* streams. In an {@link org.apache.samza.application.StreamApplication}, it is also used for creating any
61+
* intermediate streams; e.g., those created by the {@link org.apache.samza.operators.MessageStream#partitionBy} and
62+
* {@link org.apache.samza.operators.MessageStream#broadcast} operators.
5363
* <p>
5464
* If the default system descriptor is set, it must be set <b>before</b> creating any input/output/intermediate streams.
5565
*
56-
* @param defaultSystemDescriptor the default system descriptor to use
57-
* @return type {@code S} of {@link ApplicationDescriptor} with {@code defaultSystemDescriptor} set as its default system
66+
* @param defaultSystemDescriptor the {@link SystemDescriptor} for the default system for the application
67+
* @return this {@link ApplicationDescriptor}
5868
*/
5969
S withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor);
6070

@@ -64,10 +74,11 @@ public interface ApplicationDescriptor<S extends ApplicationDescriptor> {
6474
* context can be accessed through the {@link org.apache.samza.context.Context}.
6575
* <p>
6676
* Setting this is optional.
77+
* <p>
78+
* The provided {@code factory} instance must be {@link java.io.Serializable}.
6779
*
6880
* @param factory the {@link ApplicationContainerContextFactory} for this application
69-
* @return type {@code S} of {@link ApplicationDescriptor} with {@code factory} set as its
70-
* {@link ApplicationContainerContextFactory}
81+
* @return this {@link ApplicationDescriptor}
7182
*/
7283
S withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> factory);
7384

@@ -77,31 +88,37 @@ public interface ApplicationDescriptor<S extends ApplicationDescriptor> {
7788
* accessed through the {@link org.apache.samza.context.Context}.
7889
* <p>
7990
* Setting this is optional.
91+
* <p>
92+
* The provided {@code factory} instance must be {@link java.io.Serializable}.
8093
*
8194
* @param factory the {@link ApplicationTaskContextFactory} for this application
82-
* @return type {@code S} of {@link ApplicationDescriptor} with {@code factory} set as its
83-
* {@link ApplicationTaskContextFactory}
95+
* @return this {@link ApplicationDescriptor}
8496
*/
8597
S withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> factory);
8698

8799
/**
88100
* Sets the {@link ProcessorLifecycleListenerFactory} for this application.
89-
*
90-
* <p>Setting a {@link ProcessorLifecycleListenerFactory} is optional to a user application. It allows users to
101+
* <p>
102+
* Setting a {@link ProcessorLifecycleListenerFactory} is optional to a user application. It allows users to
91103
* plug in optional code to be invoked in different stages before/after the main processing logic is started/stopped in
92104
* the application.
105+
* <p>
106+
* The provided {@code listenerFactory} instance must be {@link java.io.Serializable}.
93107
*
94108
* @param listenerFactory the user implemented {@link ProcessorLifecycleListenerFactory} that creates lifecycle listener
95109
* with callback methods before and after the start/stop of each StreamProcessor in the application
96-
* @return type {@code S} of {@link ApplicationDescriptor} with {@code listenerFactory} set as its {@link ProcessorLifecycleListenerFactory}
110+
* @return this {@link ApplicationDescriptor}
97111
*/
98112
S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listenerFactory);
99113

100114
/**
101-
* Sets a set of customized {@link MetricsReporterFactory}s in the application
115+
* Sets the {@link org.apache.samza.metrics.MetricsReporterFactory}s for creating the
116+
* {@link org.apache.samza.metrics.MetricsReporter}s to use for the application.
117+
* <p>
118+
* The provided {@link MetricsReporterFactory} instances must be {@link java.io.Serializable}.
102119
*
103-
* @param reporterFactories the map of customized {@link MetricsReporterFactory}s to be used
104-
* @return type {@code S} of {@link ApplicationDescriptor} with {@code reporterFactories}
120+
* @param reporterFactories a map of {@link org.apache.samza.metrics.MetricsReporter} names to their factories.
121+
* @return this {@link ApplicationDescriptor}
105122
*/
106123
S withMetricsReporterFactories(Map<String, MetricsReporterFactory> reporterFactories);
107124

0 commit comments

Comments
 (0)