From 8c7319df9bec520c553900463236b0293e99bed6 Mon Sep 17 00:00:00 2001 From: Mahmoud Ben Hassine Date: Tue, 23 Jan 2018 08:57:43 +0100 Subject: [PATCH 1/2] BATCH-1767: fix optimistic locking exception in multi-threaded step Currently, when the commit of the chunk fails in a multi-threaded step, the step execution is rolled back to a previous, eventually obsolete version. This previous version might be obsolete because it could be modified by another thread. In this case, a OptimisticLockingFailureException is thrown when trying to persist the step execution leaving the step in an UNKNOWN state while it should be FAILED. This commit fixes the issue by refreshing the step execution to the latest correctly persisted version before applying the step contribution so that the contribution is applied on a fresh correct state. Resolves BATCH-1767 --- .../batch/core/step/tasklet/TaskletStep.java | 26 +- ...tiThreadedTaskletStepIntegrationTests.java | 251 ++++++++++++++++++ .../src/main/asciidoc/scalability.adoc | 14 +- 3 files changed, 282 insertions(+), 9 deletions(-) create mode 100644 spring-batch-core/src/test/java/org/springframework/batch/core/step/item/MultiThreadedTaskletStepIntegrationTests.java diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java index 077a9c530d..c1c9242829 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/tasklet/TaskletStep.java @@ -19,6 +19,7 @@ import org.apache.commons.logging.LogFactory; import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.ChunkListener; +import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobInterruptedException; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.StepExecution; @@ -339,6 +340,7 @@ private class ChunkTransactionCallback extends TransactionSynchronizationAdapter private boolean stepExecutionUpdated = false; private StepExecution oldVersion; + private ExecutionContext oldExecutionContext; private boolean locked = false; @@ -360,6 +362,7 @@ public void afterCompletion(int status) { logger.info("Commit failed while step execution data was already updated. " + "Reverting to old version."); copy(oldVersion, stepExecution); + stepExecution.setExecutionContext(oldExecutionContext); if (status == TransactionSynchronization.STATUS_ROLLED_BACK) { rollback(stepExecution); } @@ -371,7 +374,6 @@ public void afterCompletion(int status) { logger.error("Rolling back with transaction in unknown state"); rollback(stepExecution); stepExecution.upgradeStatus(BatchStatus.UNKNOWN); - stepExecution.setTerminateOnly(); } } finally { @@ -397,8 +399,7 @@ public RepeatStatus doInTransaction(TransactionStatus status) { // In case we need to push it back to its old value // after a commit fails... - oldVersion = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution()); - copy(stepExecution, oldVersion); + oldExecutionContext = new ExecutionContext(stepExecution.getExecutionContext()); try { @@ -433,6 +434,23 @@ public RepeatStatus doInTransaction(TransactionStatus status) { Thread.currentThread().interrupt(); } + // Refresh stepExecution to the latest correctly persisted + // state in order to apply the contribution on the latest version + String stepName = stepExecution.getStepName(); + JobExecution jobExecution = stepExecution.getJobExecution(); + StepExecution lastStepExecution = getJobRepository() + .getLastStepExecution(jobExecution.getJobInstance(), stepName); + if (lastStepExecution != null && + !lastStepExecution.getVersion().equals(stepExecution.getVersion())) { + copy(lastStepExecution, stepExecution); + } + + // Take a copy of the stepExecution in case we need to + // undo the current contribution to the in memory instance + // if the commit fails + oldVersion = new StepExecution(stepName, jobExecution); + copy(stepExecution, oldVersion); + // Apply the contribution to the step // even if unsuccessful if (logger.isDebugEnabled()) { @@ -499,11 +517,9 @@ private void rollback(StepExecution stepExecution) { } private void copy(final StepExecution source, final StepExecution target) { - target.setVersion(source.getVersion()); target.setWriteCount(source.getWriteCount()); target.setFilterCount(source.getFilterCount()); target.setCommitCount(source.getCommitCount()); - target.setExecutionContext(new ExecutionContext(source.getExecutionContext())); } } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/MultiThreadedTaskletStepIntegrationTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/MultiThreadedTaskletStepIntegrationTests.java new file mode 100644 index 0000000000..633b7e9c70 --- /dev/null +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/MultiThreadedTaskletStepIntegrationTests.java @@ -0,0 +1,251 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.core.step.item; + +import org.junit.Test; +import org.springframework.batch.core.BatchStatus; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.batch.core.listener.JobExecutionListenerSupport; +import org.springframework.batch.core.step.tasklet.TaskletStep; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; +import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.support.DefaultTransactionStatus; + +import javax.sql.DataSource; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * Tests for the behavior of a multi-threaded TaskletStep. + * + * @author Mahmoud Ben Hassine + */ +public class MultiThreadedTaskletStepIntegrationTests { + + @Test + public void testMultiThreadedTaskletExecutionWhenNoErrors() throws Exception { + // given + Class[] configurationClasses = {JobConfiguration.class, TransactionManagerConfiguration.class}; + ApplicationContext context = new AnnotationConfigApplicationContext(configurationClasses); + JobLauncher jobLauncher = context.getBean(JobLauncher.class); + Job job = context.getBean(Job.class); + + // when + JobExecution jobExecution = jobLauncher.run(job, new JobParameters()); + + // then + assertNotNull(jobExecution); + assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus()); + StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next(); + assertEquals(BatchStatus.COMPLETED, stepExecution.getStatus()); + assertEquals(0, stepExecution.getFailureExceptions().size()); + } + + @Test + public void testMultiThreadedTaskletExecutionWhenCommitFails() throws Exception { + // given + Class[] configurationClasses = {JobConfiguration.class, CommitFailingTransactionManagerConfiguration.class}; + ApplicationContext context = new AnnotationConfigApplicationContext(configurationClasses); + JobLauncher jobLauncher = context.getBean(JobLauncher.class); + Job job = context.getBean(Job.class); + + // when + JobExecution jobExecution = jobLauncher.run(job, new JobParameters()); + + // then + assertNotNull(jobExecution); + assertEquals(BatchStatus.FAILED, jobExecution.getStatus()); + StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next(); + assertEquals(BatchStatus.FAILED, stepExecution.getStatus()); + Throwable e = stepExecution.getFailureExceptions().get(0); + assertEquals("Planned commit exception!", e.getMessage()); + // No assertions on execution context because it is undefined in this case + } + + @Test + public void testMultiThreadedTaskletExecutionWhenRollbackFails() throws Exception { + // given + Class[] configurationClasses = {JobConfiguration.class, RollbackFailingTransactionManagerConfiguration.class}; + ApplicationContext context = new AnnotationConfigApplicationContext(configurationClasses); + JobLauncher jobLauncher = context.getBean(JobLauncher.class); + Job job = context.getBean(Job.class); + + // when + JobExecution jobExecution = jobLauncher.run(job, new JobParameters()); + + // then + assertNotNull(jobExecution); + assertEquals(BatchStatus.UNKNOWN, jobExecution.getStatus()); + StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next(); + assertEquals(BatchStatus.UNKNOWN, stepExecution.getStatus()); + Throwable e = stepExecution.getFailureExceptions().get(0); + assertEquals("Planned rollback exception!", e.getMessage()); + // No assertions on execution context because it is undefined in this case + } + + @Configuration + @EnableBatchProcessing + public static class JobConfiguration { + + @Autowired + private JobBuilderFactory jobBuilderFactory; + @Autowired + private StepBuilderFactory stepBuilderFactory; + + @Bean + public TaskletStep step() { + return stepBuilderFactory.get("step") + .chunk(3) + .reader(itemReader()) + .writer(itemWriter()) + .taskExecutor(taskExecutor()) + .build(); + } + + @Bean + public Job job(ThreadPoolTaskExecutor taskExecutor) { + return jobBuilderFactory.get("job") + .start(step()) + .listener(new JobExecutionListenerSupport() { + @Override + public void afterJob(JobExecution jobExecution) { + taskExecutor.shutdown(); + } + }) + .build(); + } + + @Bean + public ThreadPoolTaskExecutor taskExecutor() { + ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); + taskExecutor.setCorePoolSize(3); + taskExecutor.setMaxPoolSize(3); + taskExecutor.setThreadNamePrefix("spring-batch-worker-thread-"); + return taskExecutor; + } + + @Bean + public ItemReader itemReader() { + return new ItemReader() { + private AtomicInteger atomicInteger = new AtomicInteger(); + + @Override + public synchronized Integer read() { + int value = atomicInteger.incrementAndGet(); + return value <= 9 ? value : null; + } + }; + } + + @Bean + public ItemWriter itemWriter() { + return items -> { + }; + } + } + + @Configuration + public static class DataSourceConfiguration { + + @Bean + public DataSource dataSource() { + return new EmbeddedDatabaseBuilder() + .setType(EmbeddedDatabaseType.HSQL) + .addScript("org/springframework/batch/core/schema-drop-hsqldb.sql") + .addScript("org/springframework/batch/core/schema-hsqldb.sql") + .build(); + } + + } + + @Configuration + @Import(DataSourceConfiguration.class) + public static class TransactionManagerConfiguration { + + @Bean + public PlatformTransactionManager transactionManager(DataSource dataSource) { + return new DataSourceTransactionManager(dataSource); + } + + } + + @Configuration + @Import(DataSourceConfiguration.class) + public static class CommitFailingTransactionManagerConfiguration { + + @Bean + public PlatformTransactionManager transactionManager(DataSource dataSource) { + return new DataSourceTransactionManager(dataSource) { + @Override + protected void doCommit(DefaultTransactionStatus status) { + super.doCommit(status); + if (Thread.currentThread().getName().equals("spring-batch-worker-thread-2")) { + throw new RuntimeException("Planned commit exception!"); + } + } + }; + } + + } + + @Configuration + @Import(DataSourceConfiguration.class) + public static class RollbackFailingTransactionManagerConfiguration { + + @Bean + public PlatformTransactionManager transactionManager(DataSource dataSource) { + return new DataSourceTransactionManager(dataSource) { + @Override + protected void doCommit(DefaultTransactionStatus status) { + super.doCommit(status); + if (Thread.currentThread().getName().equals("spring-batch-worker-thread-2")) { + throw new RuntimeException("Planned commit exception!"); + } + } + + @Override + protected void doRollback(DefaultTransactionStatus status) { + super.doRollback(status); + if (Thread.currentThread().getName().equals("spring-batch-worker-thread-2")) { + throw new RuntimeException("Planned rollback exception!"); + } + } + }; + } + + } + +} diff --git a/spring-batch-docs/src/main/asciidoc/scalability.adoc b/spring-batch-docs/src/main/asciidoc/scalability.adoc index 76997fcb9c..3df42524d0 100644 --- a/spring-batch-docs/src/main/asciidoc/scalability.adoc +++ b/spring-batch-docs/src/main/asciidoc/scalability.adoc @@ -123,7 +123,9 @@ your step, such as a `DataSource`. Be sure to make the pool in those resources as large as the desired number of concurrent threads in the step. There are some practical limitations of using multi-threaded `Step` implementations for -some common batch use cases. Many participants in a `Step` (such as readers and writers) +some common batch use cases: + +* Many participants in a `Step` (such as readers and writers) are stateful. If the state is not segregated by thread, then those components are not usable in a multi-threaded `Step`. In particular, most of the off-the-shelf readers and writers from Spring Batch are not designed for multi-threaded use. It is, however, @@ -132,9 +134,8 @@ possible to work with stateless or thread safe readers and writers, and there is https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples[Spring Batch Samples] that shows the use of a process indicator (see <>) to keep track -of items that have been processed in a database input table. - -Spring Batch provides some implementations of `ItemWriter` and `ItemReader`. Usually, +of items that have been processed in a database input table. Spring Batch provides some + implementations of `ItemWriter` and `ItemReader`. Usually, they say in the Javadoc if they are thread safe or not or what you have to do to avoid problems in a concurrent environment. If there is no information in the Javadoc, you can check the implementation to see if there is any state. If a reader is not thread safe, @@ -143,6 +144,11 @@ synchronizing delegator. You can synchronize the call to `read()` and as long as processing and writing is the most expensive part of the chunk, your step may still complete much faster than it would in a single threaded configuration. +* In a multi-threaded `Step`, each thread runs in its own transaction and the `ChunkContext` +is shared between threads. This shared state might end up in an inconsistent state +if one of the transactions is rolled back. Hence, we recommend avoiding `ExecutionContext` +manipulation in a multi-threaded `Step`. + [[scalabilityParallelSteps]] From 7ba5c10c7cad68896b711faae4564271aad548ae Mon Sep 17 00:00:00 2001 From: Mahmoud Ben Hassine Date: Tue, 23 Mar 2021 17:00:04 +0100 Subject: [PATCH 2/2] Update tests to use the new way of providing a tx manager as in #1289 --- ...tiThreadedTaskletStepIntegrationTests.java | 91 +++++++++++-------- 1 file changed, 52 insertions(+), 39 deletions(-) diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/MultiThreadedTaskletStepIntegrationTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/MultiThreadedTaskletStepIntegrationTests.java index 633b7e9c70..0518e8d957 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/MultiThreadedTaskletStepIntegrationTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/MultiThreadedTaskletStepIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018 the original author or authors. + * Copyright 2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,8 @@ import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.configuration.annotation.BatchConfigurer; +import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; @@ -28,7 +30,6 @@ import org.springframework.batch.core.listener.JobExecutionListenerSupport; import org.springframework.batch.core.step.tasklet.TaskletStep; import org.springframework.batch.item.ItemReader; -import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; @@ -58,13 +59,14 @@ public class MultiThreadedTaskletStepIntegrationTests { @Test public void testMultiThreadedTaskletExecutionWhenNoErrors() throws Exception { // given - Class[] configurationClasses = {JobConfiguration.class, TransactionManagerConfiguration.class}; + Class[] configurationClasses = {JobConfiguration.class, TransactionManagerConfiguration.class}; ApplicationContext context = new AnnotationConfigApplicationContext(configurationClasses); JobLauncher jobLauncher = context.getBean(JobLauncher.class); Job job = context.getBean(Job.class); + JobParameters jobParameters = new JobParameters(); // when - JobExecution jobExecution = jobLauncher.run(job, new JobParameters()); + JobExecution jobExecution = jobLauncher.run(job, jobParameters); // then assertNotNull(jobExecution); @@ -77,13 +79,14 @@ public void testMultiThreadedTaskletExecutionWhenNoErrors() throws Exception { @Test public void testMultiThreadedTaskletExecutionWhenCommitFails() throws Exception { // given - Class[] configurationClasses = {JobConfiguration.class, CommitFailingTransactionManagerConfiguration.class}; + Class[] configurationClasses = {JobConfiguration.class, CommitFailingTransactionManagerConfiguration.class}; ApplicationContext context = new AnnotationConfigApplicationContext(configurationClasses); JobLauncher jobLauncher = context.getBean(JobLauncher.class); Job job = context.getBean(Job.class); + JobParameters jobParameters = new JobParameters(); // when - JobExecution jobExecution = jobLauncher.run(job, new JobParameters()); + JobExecution jobExecution = jobLauncher.run(job, jobParameters); // then assertNotNull(jobExecution); @@ -98,13 +101,14 @@ public void testMultiThreadedTaskletExecutionWhenCommitFails() throws Exception @Test public void testMultiThreadedTaskletExecutionWhenRollbackFails() throws Exception { // given - Class[] configurationClasses = {JobConfiguration.class, RollbackFailingTransactionManagerConfiguration.class}; + Class[] configurationClasses = {JobConfiguration.class, RollbackFailingTransactionManagerConfiguration.class}; ApplicationContext context = new AnnotationConfigApplicationContext(configurationClasses); JobLauncher jobLauncher = context.getBean(JobLauncher.class); Job job = context.getBean(Job.class); + JobParameters jobParameters = new JobParameters(); // when - JobExecution jobExecution = jobLauncher.run(job, new JobParameters()); + JobExecution jobExecution = jobLauncher.run(job, jobParameters); // then assertNotNull(jobExecution); @@ -130,7 +134,7 @@ public TaskletStep step() { return stepBuilderFactory.get("step") .chunk(3) .reader(itemReader()) - .writer(itemWriter()) + .writer(items -> {}) .taskExecutor(taskExecutor()) .build(); } @@ -160,8 +164,7 @@ public ThreadPoolTaskExecutor taskExecutor() { @Bean public ItemReader itemReader() { return new ItemReader() { - private AtomicInteger atomicInteger = new AtomicInteger(); - + private final AtomicInteger atomicInteger = new AtomicInteger(); @Override public synchronized Integer read() { int value = atomicInteger.incrementAndGet(); @@ -170,11 +173,6 @@ public synchronized Integer read() { }; } - @Bean - public ItemWriter itemWriter() { - return items -> { - }; - } } @Configuration @@ -196,8 +194,13 @@ public DataSource dataSource() { public static class TransactionManagerConfiguration { @Bean - public PlatformTransactionManager transactionManager(DataSource dataSource) { - return new DataSourceTransactionManager(dataSource); + public BatchConfigurer batchConfigurer(DataSource dataSource) { + return new DefaultBatchConfigurer(dataSource) { + @Override + public PlatformTransactionManager getTransactionManager() { + return new DataSourceTransactionManager(dataSource); + } + }; } } @@ -207,14 +210,19 @@ public PlatformTransactionManager transactionManager(DataSource dataSource) { public static class CommitFailingTransactionManagerConfiguration { @Bean - public PlatformTransactionManager transactionManager(DataSource dataSource) { - return new DataSourceTransactionManager(dataSource) { + public BatchConfigurer batchConfigurer(DataSource dataSource) { + return new DefaultBatchConfigurer(dataSource) { @Override - protected void doCommit(DefaultTransactionStatus status) { - super.doCommit(status); - if (Thread.currentThread().getName().equals("spring-batch-worker-thread-2")) { - throw new RuntimeException("Planned commit exception!"); - } + public PlatformTransactionManager getTransactionManager() { + return new DataSourceTransactionManager(dataSource) { + @Override + protected void doCommit(DefaultTransactionStatus status) { + super.doCommit(status); + if (Thread.currentThread().getName().equals("spring-batch-worker-thread-2")) { + throw new RuntimeException("Planned commit exception!"); + } + } + }; } }; } @@ -226,22 +234,27 @@ protected void doCommit(DefaultTransactionStatus status) { public static class RollbackFailingTransactionManagerConfiguration { @Bean - public PlatformTransactionManager transactionManager(DataSource dataSource) { - return new DataSourceTransactionManager(dataSource) { + public BatchConfigurer batchConfigurer(DataSource dataSource) { + return new DefaultBatchConfigurer(dataSource) { @Override - protected void doCommit(DefaultTransactionStatus status) { - super.doCommit(status); - if (Thread.currentThread().getName().equals("spring-batch-worker-thread-2")) { - throw new RuntimeException("Planned commit exception!"); - } - } + public PlatformTransactionManager getTransactionManager() { + return new DataSourceTransactionManager(dataSource) { + @Override + protected void doCommit(DefaultTransactionStatus status) { + super.doCommit(status); + if (Thread.currentThread().getName().equals("spring-batch-worker-thread-2")) { + throw new RuntimeException("Planned commit exception!"); + } + } - @Override - protected void doRollback(DefaultTransactionStatus status) { - super.doRollback(status); - if (Thread.currentThread().getName().equals("spring-batch-worker-thread-2")) { - throw new RuntimeException("Planned rollback exception!"); - } + @Override + protected void doRollback(DefaultTransactionStatus status) { + super.doRollback(status); + if (Thread.currentThread().getName().equals("spring-batch-worker-thread-2")) { + throw new RuntimeException("Planned rollback exception!"); + } + } + }; } }; }