<p>One option is to use a <code>collection-aggregator</code> as an accumulator that blocks the final flow action until all the messages have been processed. The catch is that each <code>collection-splitter</code> sets a correlation group size that only covers either the number of lines in the file or the number of columns in a line, whereas we want to accumulate until all columns of all lines have been processed. The solution is to first compute this value (i.e. the total number of expected messages) and then override whatever correlation group size was calculated by the <code>collection-splitter</code>s with that total.</p>
<p>Here is how I've done this (you'll note that I replaced all Groovy snippets with more Mule-3-esque MEL expressions):</p>
<pre><code>&lt;file:connector name="inputFileConnector" autoDelete="true"
    streaming="false" validateConnections="true" fileAge="60000"
    readFromDirectory="#{systemProperties['user.home']}" /&gt;

&lt;flow name="flow1" processingStrategy="synchronous"&gt;
    &lt;file:inbound-endpoint path="#{systemProperties['user.home']}"
        responseTimeout="10000" fileAge="100" connector-ref="inputFileConnector"&gt;
        &lt;file:filename-regex-filter pattern="input.csv" caseSensitive="false" /&gt;
    &lt;/file:inbound-endpoint&gt;
    &lt;byte-array-to-string-transformer /&gt;
    &lt;!-- Total number of messages expected across all lines and columns --&gt;
    &lt;set-session-variable variableName="expectedMessageCount"
        value="#[org.mule.util.StringUtils.countMatches(message.payload, '\n') + org.mule.util.StringUtils.countMatches(message.payload, ',') - 1]" /&gt;
    &lt;!-- Split the file into lines --&gt;
    &lt;expression-transformer expression="#[message.payload.split('\n')]" /&gt;
    &lt;collection-splitter enableCorrelation="IF_NOT_SET" /&gt;
    &lt;!-- Override the group size set by the splitter with the total --&gt;
    &lt;set-property propertyName="MULE_CORRELATION_GROUP_SIZE"
        value="#[sessionVars.expectedMessageCount]" /&gt;
    &lt;choice&gt;
        &lt;when expression="#[message.payload != 'end']"&gt;
            &lt;processor-chain&gt;
                &lt;logger message="." level="INFO" /&gt;
                &lt;vm:outbound-endpoint path="toFlow2" /&gt;
            &lt;/processor-chain&gt;
        &lt;/when&gt;
        &lt;otherwise&gt;
            &lt;processor-chain&gt;
                &lt;logger message="|||| END" level="INFO" /&gt;
            &lt;/processor-chain&gt;
        &lt;/otherwise&gt;
    &lt;/choice&gt;
&lt;/flow&gt;

&lt;flow name="flow2"&gt;
    &lt;vm:inbound-endpoint path="toFlow2"/&gt;
    &lt;!-- Split each line into columns --&gt;
    &lt;expression-transformer expression="#[message.payload.split(',')]" /&gt;
    &lt;collection-splitter /&gt;
    &lt;!-- Override the group size again after the second split --&gt;
    &lt;set-property propertyName="MULE_CORRELATION_GROUP_SIZE"
        value="#[sessionVars.expectedMessageCount]" /&gt;
    &lt;logger message="|||||| #[message.payload]" level="INFO"/&gt;
    &lt;vm:outbound-endpoint path="toFinalizer" /&gt;
    &lt;vm:outbound-endpoint path="toFlow3" /&gt;
&lt;/flow&gt;

&lt;flow name="finalizer"&gt;
    &lt;vm:inbound-endpoint path="toFinalizer" /&gt;
    &lt;!-- Blocks until expectedMessageCount messages have arrived --&gt;
    &lt;collection-aggregator /&gt;
    &lt;logger message="|||| DONE" level="INFO" /&gt;
&lt;/flow&gt;
</code></pre>
<p>NB. Alternatively, if using a <code>collection-aggregator</code> is an issue because it uses too much memory, you could use an expression component to decrement <code>sessionVars.expectedMessageCount</code> and a filter that lets a message reach the final message processor only when the counter is back to 0.</p>
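<p>To see why the <code>expectedMessageCount</code> expression adds the newline and comma counts and subtracts 1, here is a small standalone sketch in Python (not Mule code). It assumes the input file ends with an <code>end</code> sentinel line followed by a trailing newline and that every data line is non-empty; the function names and the sample payload are hypothetical and only serve the illustration.</p>

```python
def expected_message_count(payload: str) -> int:
    """Mirror the MEL expression: count of '\n' plus count of ',' minus 1."""
    return payload.count("\n") + payload.count(",") - 1


def split_cells(payload: str) -> list:
    """Mimic the two splitters: lines first, then columns.

    The 'end' sentinel line is skipped, as in the choice router of flow1.
    """
    cells = []
    for line in payload.split("\n"):
        if line and line != "end":
            cells.extend(line.split(","))
    return cells


# Hypothetical sample: two data lines of three columns, then the sentinel.
sample = "a,b,c\nd,e,f\nend\n"
print(expected_message_count(sample), len(split_cells(sample)))
```

<p>Each data line contributes one message per comma plus one, and the sentinel line contributes a newline but no message, which is where the <code>- 1</code> comes from. If the file has no trailing newline after <code>end</code>, the same expression would come out one short, so the formula is worth checking against the real file layout.</p>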