Using Camel to aggregate messages with the same header
<p>I have multiple clients that send files to a server. For each set of data there are two files that contain information about that data, and both have the same name. When a file is received, the server sends a message to my queue containing the file path, file name, ID of the client, and the "type" of file it is (all have the same file extension, but there are two "types"; call them A and B).</p>
<p>As soon as the server has received both files for one set of data, I need to start a program that combines the two. Currently I have something that looks like this:</p>
<pre><code>from("jms:queue.name")
    .aggregate(header("CamelFileName"))
    .completionSize(2)
    .to("exec://FILEPATH?args=");
</code></pre>
<p>Where I am stuck is the header("CamelFileName"), and more specifically how the aggregator works.</p>
<p>With the completionSize set to 2, does it just suck up all the messages and store them in some data structure until a second message that matches the first comes through? Also, does header() expect a specific value? I have multiple clients, so I was thinking of putting both the client ID and the file name in the header, but I don't know whether I have to give a specific value. I also don't know whether I can use a regex.</p>
<p>Any ideas or tips would be super helpful. Thanks.</p>
<p>EDIT: Here is some code I have now. Based on my description of the problem here and in the comments on the selected answer, does it seem accurate (besides close brackets that I didn't copy over)?</p>
<pre><code>public static void main(String args[]) throws Exception {
    CamelContext c = new DefaultCamelContext();
    c.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
    //ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
    //c.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
    c.addRoutes(new RouteBuilder() {
        public void configure() {
            from("activemq:queue:analytics.camelqueue")
                .aggregate(new MyAggregationStrategy()).header("subject")
                .completionSize(2)
                .to("activemq:queue:analytics.success");
        }
    });
    c.start();
    while (true) {
        System.out.println("Waiting on messages to come through for camel");
        Thread.sleep(2 * 1000);
    }
    //c.stop();
}

private static class MyAggregationStrategy implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // First message of a group: nothing to combine with yet
        if (oldExchange == null)
            return newExchange;

        // and here is where combo stuff goes
        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newExchange.getIn().getBody(String.class);
        boolean oldSet = oldBody.contains("set");
        boolean newSet = newBody.contains("set");
        boolean oldFlow = oldBody.contains("flow");
        boolean newFlow = newBody.contains("flow");

        if ( (oldSet &amp;&amp; newFlow) || (oldFlow &amp;&amp; newSet) ) {
            //they match so return new exchange with info so extractor can be started with exec
            String combined = oldBody + "\n" + newBody + "\n";
            newExchange.getIn().setBody(combined);
            return newExchange;
        } else {
            // no match so do something....
            return null;
        }
    }
}
</code></pre>
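To make the "does it store them in some data structure" part of the question concrete, here is a minimal plain-Java sketch of what size-based aggregation by a correlation key amounts to. It is only a model of the behaviour, not Camel's implementation: the class `AggregatorSketch`, the helper `correlationKey`, the `|` separator, and the sample file name `run42.dat` are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * Plain-Java model of "aggregate by key, complete at N messages".
 * Illustrative only; this is NOT how Camel is implemented internally.
 */
final class AggregatorSketch {
    private final int completionSize;
    // Open groups: correlation key -> message bodies collected so far
    private final Map<String, List<String>> pending = new HashMap<>();

    AggregatorSketch(int completionSize) {
        this.completionSize = completionSize;
    }

    /** Hypothetical helper: builds a composite key from client ID and file name. */
    static String correlationKey(String clientId, String fileName) {
        return clientId + "|" + fileName;
    }

    /**
     * Adds one message. Returns the completed group once completionSize
     * messages with the same key have arrived, otherwise Optional.empty().
     */
    Optional<List<String>> add(String key, String body) {
        List<String> group = pending.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(body);
        if (group.size() < completionSize) {
            return Optional.empty(); // keep waiting for the matching message
        }
        pending.remove(key); // group is complete, release it downstream
        return Optional.of(group);
    }

    /** Number of groups still waiting for more messages. */
    int pendingGroups() {
        return pending.size();
    }

    public static void main(String[] args) {
        AggregatorSketch agg = new AggregatorSketch(2);
        // Same file name from two different clients lands in two separate groups
        System.out.println(agg.add(correlationKey("client1", "run42.dat"), "type A")); // Optional.empty
        System.out.println(agg.add(correlationKey("client2", "run42.dat"), "type A")); // Optional.empty
        System.out.println(agg.add(correlationKey("client1", "run42.dat"), "type B")); // Optional[[type A, type B]]
    }
}
```

The key is computed per message, so nothing has to be known in advance: in a Camel route, header("CamelFileName") names the header to read, and whatever value each message carries in that header decides which group it joins. If the same file name can arrive from several clients, one option may be a combined correlation expression such as simple("${header.clientId}-${header.CamelFileName}"), where clientId is an assumed header name that the sender would have to set.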