<p>Your example uses many nested DISTINCT operators inside FOREACH. These run in the reducer and rely on RAM to compute the unique values, and the whole query compiles to just one job. If a group contains too many unique elements, you can also get memory-related exceptions.</p>
<p>Luckily, Pig Latin is a dataflow language, so what you write is effectively an execution plan. To use more CPUs, you can restructure the script so that it produces more MapReduce jobs that can run in parallel. To do that, rewrite the query without nested DISTINCT: for each column, project it together with the group key and apply DISTINCT, then GROUP BY the key and count, as if you had just one column, and finally join the results. It is very SQL-like, but it works. Here it is:</p>
<pre><code>records = LOAD '....' USING PigStorage(',') AS (g, a, b, c, d, fd, s, w);
selected = FOREACH records GENERATE g, a, b, c, d;

-- distinct (g, a) pairs, then count them per g
grouped_a = FOREACH selected GENERATE g, a;
grouped_a = DISTINCT grouped_a;
grouped_a_count = GROUP grouped_a BY g;
grouped_a_count = FOREACH grouped_a_count GENERATE FLATTEN(group) as g, COUNT(grouped_a) as a_count;

-- same pattern for column b
grouped_b = FOREACH selected GENERATE g, b;
grouped_b = DISTINCT grouped_b;
grouped_b_count = GROUP grouped_b BY g;
grouped_b_count = FOREACH grouped_b_count GENERATE FLATTEN(group) as g, COUNT(grouped_b) as b_count;

-- same pattern for column c
grouped_c = FOREACH selected GENERATE g, c;
grouped_c = DISTINCT grouped_c;
grouped_c_count = GROUP grouped_c BY g;
grouped_c_count = FOREACH grouped_c_count GENERATE FLATTEN(group) as g, COUNT(grouped_c) as c_count;

-- same pattern for column d
grouped_d = FOREACH selected GENERATE g, d;
grouped_d = DISTINCT grouped_d;
grouped_d_count = GROUP grouped_d BY g;
grouped_d_count = FOREACH grouped_d_count GENERATE FLATTEN(group) as g, COUNT(grouped_d) as d_count;

-- merge the per-column counts back into one row per g
mrg = JOIN grouped_a_count BY g, grouped_b_count BY g, grouped_c_count BY g, grouped_d_count BY g;
out = FOREACH mrg GENERATE grouped_a_count::g, grouped_a_count::a_count, grouped_b_count::b_count, grouped_c_count::c_count, grouped_d_count::d_count;
STORE out into '....' USING PigStorage(',');
</code></pre>
<p>After execution I got the following summary, which shows that the DISTINCT operations were all processed by the first job and did not suffer from the skew in the data:</p>
<pre><code>Job Stats (time in seconds):
JobId                  Maps  Reduces  MaxMapTime  MinMapTIme  AvgMapTime  MaxReduceTime  MinReduceTime  AvgReduceTime  Alias                                                     Feature               Outputs
job_201206061712_0244  669   45       75          8           13          376            18             202            grouped_a,grouped_b,grouped_c,grouped_d,records,selected  DISTINCT,MULTI_QUERY
job_201206061712_0245  1     1        3           3           3           12             12             12             grouped_c_count                                           GROUP_BY,COMBINER
job_201206061712_0246  1     1        3           3           3           12             12             12             grouped_b_count                                           GROUP_BY,COMBINER
job_201206061712_0247  5     1        48          27          33          30             30             30             grouped_a_count                                           GROUP_BY,COMBINER
job_201206061712_0248  1     1        3           3           3           12             12             12             grouped_d_count                                           GROUP_BY,COMBINER
job_201206061712_0249  4     1        3           3           3           12             12             12             mrg,out                                                   HASH_JOIN             ...,

Input(s):
Successfully read 52215768 records (44863559501 bytes) from: "...."

Output(s):
Successfully stored 9 records (181 bytes) in: "..."
</code></pre>
<p>From the Job DAG we can see that the GROUP BY operations were executed in parallel:</p>
<pre><code>Job DAG:
job_201206061712_0244 -&gt; job_201206061712_0248,job_201206061712_0246,job_201206061712_0247,job_201206061712_0245,
job_201206061712_0248 -&gt; job_201206061712_0249,
job_201206061712_0246 -&gt; job_201206061712_0249,
job_201206061712_0247 -&gt; job_201206061712_0249,
job_201206061712_0245 -&gt; job_201206061712_0249,
job_201206061712_0249
</code></pre>
<p>It works fine on my datasets, where one of the group key values (in column g) accounts for 95% of the data. It also gets rid of the memory-related exceptions.</p>
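<p>For contrast, the nested-DISTINCT form that this rewrite replaces might look roughly like the sketch below. The question's actual script is not reproduced in this answer, so this is an assumption: it reuses the LOAD statement and field names from the script above, while the aliases by_g, nested_counts, a_vals, b_vals, c_vals and d_vals are hypothetical. Here every DISTINCT is evaluated inside the reducer, once per group, which is why a single dominant key can slow the job down or exhaust memory.</p>
<pre><code>-- Hypothetical sketch of the nested-DISTINCT version (not the asker's actual script)
records = LOAD '....' USING PigStorage(',') AS (g, a, b, c, d, fd, s, w);
selected = FOREACH records GENERATE g, a, b, c, d;

-- one GROUP BY, so all values for a key land in the same reducer call
by_g = GROUP selected BY g;

-- each nested DISTINCT has to hold that group's unique values
nested_counts = FOREACH by_g {
    a_vals = DISTINCT selected.a;
    b_vals = DISTINCT selected.b;
    c_vals = DISTINCT selected.c;
    d_vals = DISTINCT selected.d;
    GENERATE group AS g,
             COUNT(a_vals) AS a_count,
             COUNT(b_vals) AS b_count,
             COUNT(c_vals) AS c_count,
             COUNT(d_vals) AS d_count;
};
</code></pre>
<p>Comparing the two shapes makes the design choice clearer: the top-level DISTINCT in the rewritten script is distributed over the (g, column) pairs rather than over g alone, so the work for a heavily skewed key is spread across reducers.</p>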
 
