<p>As Alex said, this behaviour appears to be a contract, as per the javadocs of :</p>
<blockquote>
<p>Subclasses just need to provide a method that gets the next result and one that waits for all the results to be returned from concurrent processes or threads</p>
</blockquote>
<p>Look at:</p>
<blockquote>
<p>TaskExecutorRepeatTemplate#waitForResults</p>
</blockquote>
<p>Another option would be to use partitioning:</p>
<ul>
<li>A TaskExecutorPartitionHandler that executes the items from the partitioned ItemReader, see below</li>
<li>A Partitioner implementation that supplies the ranges to be processed by the ItemReader, see ColumnRangePartitioner below</li>
<li>A custom reader that reads data using the values the Partitioner has filled in, see the myItemReader configuration below</li>
</ul>
<p><strong>Michael Minella explains this in Chapter 11 of his book <a href="http://www.apress.com/9781430234524" rel="nofollow">Pro Spring Batch</a></strong>:</p>
<blockquote>
<pre><code>&lt;batch:job id="batchWithPartition"&gt;
  &lt;batch:step id="step1.master"&gt;
    &lt;batch:partition partitioner="myPartitioner" handler="partitionHandler"/&gt;
  &lt;/batch:step&gt;
&lt;/batch:job&gt;

&lt;!-- Creates partitions of roughly (number of lines / grid size) rows each;
     its column, table and dataSource properties are omitted here --&gt;
&lt;bean id="myPartitioner" class="....ColumnRangePartitioner"/&gt;

&lt;!-- Handles every partition in its own thread --&gt;
&lt;bean id="partitionHandler" class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler"&gt;
  &lt;property name="taskExecutor" ref="multiThreadedTaskExecutor"/&gt;
  &lt;property name="step" ref="step1" /&gt;
  &lt;property name="gridSize" value="10" /&gt;
&lt;/bean&gt;

&lt;batch:step id="step1"&gt;
  &lt;batch:tasklet transaction-manager="transactionManager"&gt;
    &lt;batch:chunk reader="myItemReader"
                 writer="manipulatableWriterForTests"
                 commit-interval="1"
                 skip-limit="30000"&gt;
      &lt;batch:skippable-exception-classes&gt;
        &lt;batch:include class="java.lang.Exception" /&gt;
      &lt;/batch:skippable-exception-classes&gt;
    &lt;/batch:chunk&gt;
  &lt;/batch:tasklet&gt;
&lt;/batch:step&gt;

&lt;!-- scope step is critical here --&gt;
&lt;bean id="myItemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step"&gt;
  &lt;property name="dataSource" ref="dataSource"/&gt;
  &lt;property name="sql"&gt;
    &lt;value&gt;
      &lt;![CDATA[
        select * from customers where id &gt;= ? and id &lt;= ?
      ]]&gt;
    &lt;/value&gt;
  &lt;/property&gt;
  &lt;property name="preparedStatementSetter"&gt;
    &lt;bean class="org.springframework.batch.core.resource.ListPreparedStatementSetter"&gt;
      &lt;property name="parameters"&gt;
        &lt;list&gt;
          &lt;!-- minValue and maxValue are filled in by the Partitioner for each partition in an ExecutionContext --&gt;
          &lt;value&gt;#{stepExecutionContext[minValue]}&lt;/value&gt;
          &lt;value&gt;#{stepExecutionContext[maxValue]}&lt;/value&gt;
        &lt;/list&gt;
      &lt;/property&gt;
    &lt;/bean&gt;
  &lt;/property&gt;
  &lt;property name="rowMapper" ref="customerRowMapper"/&gt;
&lt;/bean&gt;
</code></pre>
</blockquote>
<p>Partitioner.java:</p>
<blockquote>
<pre><code>package ...;

import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcTemplate;

public class ColumnRangePartitioner implements Partitioner {

    // Assumption: the MIN/MAX queries run through a JdbcTemplate built from an injected DataSource
    private JdbcTemplate jdbcTemplate;
    private String column;
    private String table;

    public Map&lt;String, ExecutionContext&gt; partition(int gridSize) {
        int min = queryForInt("SELECT MIN(" + column + ") from " + table);
        int max = queryForInt("SELECT MAX(" + column + ") from " + table);
        // "+ 1" keeps targetSize above zero and caps the number of partitions at gridSize
        int targetSize = (max - min) / gridSize + 1;

        System.out.println("Our partition size will be " + targetSize);
        System.out.println("We will have at most " + gridSize + " partitions");

        Map&lt;String, ExecutionContext&gt; result = new HashMap&lt;String, ExecutionContext&gt;();
        int number = 0;
        int start = min;
        int end = start + targetSize - 1;

        while (start &lt;= max) {
            ExecutionContext value = new ExecutionContext();
            result.put("partition" + number, value);

            // The last partition is clamped to the real maximum
            if (end &gt;= max) {
                end = max;
            }
            value.putInt("minValue", start);
            value.putInt("maxValue", end);

            System.out.println("minValue = " + start);
            System.out.println("maxValue = " + end);

            start += targetSize;
            end += targetSize;
            number++;
        }

        System.out.println("We are returning " + result.size() + " partitions");
        return result;
    }

    // Runs a single-value query; throws if the table is empty (MIN/MAX return NULL)
    private int queryForInt(String sql) {
        return jdbcTemplate.queryForObject(sql, Integer.class);
    }

    public void setDataSource(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    public void setColumn(String column) {
        this.column = column;
    }

    public void setTable(String table) {
        this.table = table;
    }
}
</code></pre>
</blockquote>
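<p>To see what ranges a partitioner of this kind hands to each reader, here is a minimal sketch of just the range arithmetic, with no Spring or database involved. The class name <code>RangeSplitDemo</code> and the method <code>split</code> are hypothetical names made up for this illustration. The sketch rounds the partition size up so that it is never zero, which is an assumption on my part rather than something taken from the book.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only: splits an inclusive id range
// [min, max] into at most gridSize consecutive sub-ranges.
public class RangeSplitDemo {

    /** Returns {minValue, maxValue} pairs, one per partition. */
    public static List<int[]> split(int min, int max, int gridSize) {
        if (gridSize <= 0) {
            throw new IllegalArgumentException("gridSize must be positive");
        }
        if (max < min) {
            throw new IllegalArgumentException("max must not be smaller than min");
        }

        // At least 1, so 'start' always advances in the loop below
        int targetSize = (max - min) / gridSize + 1;

        List<int[]> ranges = new ArrayList<>();
        int start = min;
        while (start <= max) {
            // Clamp the last range to the real maximum
            int end = Math.min(start + targetSize - 1, max);
            ranges.add(new int[] {start, end});
            start += targetSize;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Each pair corresponds to the minValue/maxValue one reader would receive
        for (int[] range : split(1, 100, 10)) {
            System.out.println("minValue = " + range[0] + ", maxValue = " + range[1]);
        }
    }
}
```

<p>With ids 1 to 100 and a grid size of 10, this gives ten ranges, from 1..10 up to 91..100. When there are fewer ids than the grid size (for example ids 1 to 5), it simply returns fewer, one-row partitions.</p>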
 
