Map Reduce with F# agents
<p>After playing with F# agents, I tried to implement map reduce with them.</p>
<p>The basic structure I use is:</p>
<ul>
<li>a map supervisor, which queues up all the work to do in its state and receives work requests from the map workers;</li>
<li>a reduce supervisor, which does the same thing as the map supervisor, but for reduce work;</li>
<li>a pool of map and reduce workers that do the mapping and reducing; if a worker fails on a piece of work, it sends that piece back to the respective supervisor to be reprocessed.</li>
</ul>
<p>My questions are:</p>
<ul>
<li>Does this make any sense compared to a more traditional (yet very nice) map reduce that uses PSeq, such as the one at http://tomasp.net/blog/fsharp-parallel-aggregate.aspx?</li>
<li>The way I implemented the map and reduce workers seems ugly. Is there a better way?</li>
<li>It seems I can create 1,000,000 map workers and 10,000,000 reduce workers. How should I choose these numbers? Is more always better?</li>
</ul>
<p>Thanks a lot,</p>
<pre><code>open System

type Agent&lt;'T&gt; = MailboxProcessor&lt;'T&gt;

//This is the response the supervisor
//gives to the worker request for work
type 'work SupervisorResponse =
    | Work of 'work //a piece of work
    | NoWork //no work left to do

//This is the message to the supervisor
type 'work WorkMsg =
    | ToDo of 'work //piles up work in the supervisor queue
    | WorkReq of AsyncReplyChannel&lt;SupervisorResponse&lt;'work&gt;&gt;

//The supervisor agent can be interacted with
type AgentOperation =
    | Stop //stop the agent
    | Status //yield the current status of the supervisor

type 'work SupervisorMsg =
    | WorkRel of 'work WorkMsg
    | Operation of AgentOperation

//Supervises map and reduce workers
module AgentSupervisor =
    let getNew (name: string) =
        new Agent&lt;SupervisorMsg&lt;'work&gt;&gt;(fun inbox -&gt;
            let rec loop state =
                async {
                    let! msg = inbox.Receive()
                    match msg with
                    | WorkRel(m) -&gt;
                        match m with
                        | ToDo(work) -&gt;
                            let newState = work :: state
                            return! loop newState
                        | WorkReq(replyChannel) -&gt;
                            match state with
                            | [] -&gt;
                                replyChannel.Reply(NoWork)
                                return! loop []
                            | [item] -&gt;
                                replyChannel.Reply(Work(item))
                                return! loop []
                            | item :: remaining -&gt;
                                replyChannel.Reply(Work(item))
                                return! loop remaining
                    | Operation(op) -&gt;
                        match op with
                        | Status -&gt;
                            Console.WriteLine(name + " current Work Queue " + string state.Length)
                            return! loop state
                        | Stop -&gt;
                            Console.WriteLine("Stopped Supervisor Agent " + name)
                            return ()
                }
            loop [])

    let stop (agent: Agent&lt;SupervisorMsg&lt;'work&gt;&gt;) = agent.Post(Operation(Stop))
    let status (agent: Agent&lt;SupervisorMsg&lt;'work&gt;&gt;) = agent.Post(Operation(Status))

//Code for the workers
type 'success WorkOutcome =
    | Success of 'success
    | Fail

type WorkerMsg =
    | Start
    | Stop
    | Continue

module AgentWorker =
    type WorkerSupervisors&lt;'reduce, 'work&gt; =
        { Map: Agent&lt;SupervisorMsg&lt;'work&gt;&gt;
          Reduce: Agent&lt;SupervisorMsg&lt;'reduce&gt;&gt; }

    let stop (agent: Agent&lt;WorkerMsg&gt;) = agent.Post(Stop)

    let start (agent: Agent&lt;WorkerMsg&gt;) =
        agent.Start()
        agent.Post(Start)

    let getNewMapWorker (map, supervisors: WorkerSupervisors&lt;'reduce, 'work&gt;) =
        new Agent&lt;WorkerMsg&gt;(fun inbox -&gt;
            let rec loop () =
                async {
                    let! msg = inbox.Receive()
                    match msg with
                    | Start -&gt;
                        inbox.Post(Continue)
                        return! loop ()
                    | Continue -&gt;
                        let! supervisorOrder =
                            supervisors.Map.PostAndAsyncReply(fun replyChannel -&gt;
                                WorkRel(WorkReq(replyChannel)))
                        match supervisorOrder with
                        | Work(work) -&gt;
                            let! res = map work
                            match res with
                            | Success(toReduce) -&gt;
                                supervisors.Reduce.Post(WorkRel(ToDo(toReduce)))
                            | Fail -&gt;
                                Console.WriteLine("Map Fail")
                                supervisors.Map.Post(WorkRel(ToDo(work)))
                            inbox.Post(Continue)
                        | NoWork -&gt;
                            inbox.Post(Continue)
                        return! loop ()
                    | Stop -&gt;
                        Console.WriteLine("Map worker stopped")
                        return ()
                }
            loop ())

    let getNewReduceWorker (reduce, reduceSupervisor: Agent&lt;SupervisorMsg&lt;'work&gt;&gt;) =
        new Agent&lt;WorkerMsg&gt;(fun inbox -&gt;
            let rec loop () =
                async {
                    let! msg = inbox.Receive()
                    match msg with
                    | Start -&gt;
                        inbox.Post(Continue)
                        return! loop ()
                    | Continue -&gt;
                        let! supervisorOrder =
                            reduceSupervisor.PostAndAsyncReply(fun replyChannel -&gt;
                                WorkRel(WorkReq(replyChannel)))
                        match supervisorOrder with
                        | Work(work) -&gt;
                            let! res = reduce work
                            match res with
                            | Success(_) -&gt;
                                inbox.Post(Continue)
                            | Fail -&gt;
                                Console.WriteLine("Reduce Fail")
                                reduceSupervisor.Post(WorkRel(ToDo(work)))
                                inbox.Post(Continue)
                        | NoWork -&gt;
                            inbox.Post(Continue)
                        return! loop ()
                    | Stop -&gt;
                        Console.WriteLine("Reduce worker stopped")
                        return ()
                }
            loop ())

open AgentWorker

type MapReduce&lt;'work, 'reduce&gt;(numberMap: int,
                               numberReduce: int,
                               toProcess: 'work list,
                               map: 'work -&gt; Async&lt;'reduce WorkOutcome&gt;,
                               reduce: 'reduce -&gt; Async&lt;unit WorkOutcome&gt;) =
    let mapSupervisor = AgentSupervisor.getNew "MapSupervisor"
    let reduceSupervisor = AgentSupervisor.getNew "ReduceSupervisor"
    let workerSupervisors = { Map = mapSupervisor; Reduce = reduceSupervisor }
    let mapWorkers =
        [ for i in 1 .. numberMap -&gt; AgentWorker.getNewMapWorker (map, workerSupervisors) ]
    let reduceWorkers =
        [ for i in 1 .. numberReduce -&gt; AgentWorker.getNewReduceWorker (reduce, workerSupervisors.Reduce) ]

    member this.Start() =
        //Post work to do
        toProcess |&gt; List.iter (fun elem -&gt; mapSupervisor.Post(WorkRel(ToDo(elem))))
        //Start supervisors
        mapSupervisor.Start()
        reduceSupervisor.Start()
        //Start workers
        List.iter (fun mapper -&gt; mapper |&gt; start) mapWorkers
        List.iter (fun reducer -&gt; reducer |&gt; start) reduceWorkers

    member this.Status() =
        mapSupervisor |&gt; AgentSupervisor.status
        reduceSupervisor |&gt; AgentSupervisor.status

    member this.Stop() =
        //stop every map worker and every reduce worker (the two lists may differ in length)
        List.iter stop mapWorkers
        List.iter stop reduceWorkers

//Run some tests
let map = function (n: int64) -&gt; async { return Success(n) }
let reduce = function (toto: int64) -&gt; async { return Success() }
let mp = MapReduce&lt;int64, int64&gt;(1, 1, [ for i in 1L .. 1000000L -&gt; i ], map, reduce)
mp.Start()
mp.Status()
mp.Stop()
</code></pre>
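One possible way to restructure the workers is sketched below. It makes assumptions of its own and is not taken from the question. Every name in it (`Reply`, `QueueMsg`, `supervisor`, `worker`, `runMapReduce`, `addToTotal`) is hypothetical, and plain `Async` loops replace the `Start`/`Continue`/`Stop` worker agents. A single generic `worker` serves both phases. When the supervisor answers `NoWork`, the worker returns instead of re-posting `Continue` to itself. The map phase also runs to completion before the reduce phase starts, so `NoWork` really means "finished". The worker count uses `Environment.ProcessorCount`, which is a common starting point for CPU-bound jobs, not a rule.

```fsharp
open System

// Hypothetical message types for this sketch (not the question's types).
type Reply<'T> =
    | Work of 'T
    | NoWork

type QueueMsg<'T> =
    | Enqueue of 'T
    | Request of AsyncReplyChannel<Reply<'T>>

/// A supervisor agent that stores pending items and hands one out per request.
let supervisor<'T> () =
    MailboxProcessor<QueueMsg<'T>>.Start(fun inbox ->
        let rec loop (pending: 'T list) =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Enqueue item -> return! loop (item :: pending)
                | Request reply ->
                    match pending with
                    | [] ->
                        reply.Reply(NoWork)
                        return! loop []
                    | item :: rest ->
                        reply.Reply(Work item)
                        return! loop rest
            }
        loop [])

/// One generic pull-based worker: fetch an item, run the job, forward the result
/// (or re-queue the item on failure), and finish once the queue reports NoWork.
let worker (source: MailboxProcessor<QueueMsg<'T>>)
           (job: 'T -> Async<'R option>)
           (onSuccess: 'R -> unit) =
    let rec loop () =
        async {
            let! reply = source.PostAndAsyncReply(fun ch -> Request ch)
            match reply with
            | NoWork -> return ()
            | Work item ->
                let! result = job item
                match result with
                | Some r -> onSuccess r
                | None -> source.Post(Enqueue item) // failed: put it back for a retry
                return! loop ()
        }
    loop ()

/// Runs the whole map phase, then the whole reduce phase, with workerCount workers each.
let runMapReduce (workerCount: int)
                 (inputs: 'T list)
                 (map: 'T -> Async<'R option>)
                 (reduce: 'R -> Async<unit option>) =
    let mapQueue = supervisor<'T> ()
    let reduceQueue = supervisor<'R> ()
    // All inputs are queued before any worker asks for work.
    inputs |> List.iter (fun x -> mapQueue.Post(Enqueue x))
    let mappers =
        List.init workerCount (fun _ -> worker mapQueue map (fun r -> reduceQueue.Post(Enqueue r)))
    let reducers = List.init workerCount (fun _ -> worker reduceQueue reduce ignore)
    async {
        // Mappers queue every successful result before they finish, so by the time
        // the reduce phase starts, all of its input is already in reduceQueue.
        let! _ = Async.Parallel mappers
        let! _ = Async.Parallel reducers
        return ()
    }
    |> Async.RunSynchronously

// Usage: sum of squares of 1..1000.
let total = ref 0L

let addToTotal (sq: int64) =
    async {
        // Reducers run concurrently, so the shared total is updated under a lock.
        lock total (fun () -> total.Value <- total.Value + sq)
        return Some()
    }

runMapReduce Environment.ProcessorCount [ 1L .. 1000L ] (fun n -> async { return Some(n * n) }) addToTotal
printfn "%d" total.Value // prints 333833500
```

In this sketch a failed item is re-queued indefinitely, so a job that always fails never terminates; handling that case would need a retry counter. On the worker-count question: agents waiting in `Receive` do not hold a thread, which is why huge numbers of them can be created. CPU-bound work, however, cannot run on more threads at once than there are cores, so extra workers mostly add queueing overhead. For I/O-bound jobs a larger count may pay off.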