Note that there are some explanatory texts on larger screens.

plurals
  1. PO
    text
    copied!<p>I like to use MailboxProcessor for the reduce part of the algorithm, and an async block that's invoked with Async.Parallel for the map part. It makes things more explicit, giving you finer control over exception handling, timeouts, and cancellation.</p> <p>The following code was designed with Brian's help, and with the help of his excellent F# block highlighting "F# Depth Colorizer" plug-in for VS2010.</p> <p>This code is meant to pull RSS feeds from the Yahoo weather server in a map-reduce pattern. It demonstrates how we can control execution flow from outside the actual algorithm.</p> <p>fetchWeather is the map part, and mailboxLoop is the reduce part of the algorithm.</p>
<pre><code>#r "System.Xml.Linq.dll"
#r "FSharp.PowerPack.dll"

open System
open System.Diagnostics
open System.IO
open System.Linq
open System.Net
open System.Xml.Linq
open Microsoft.FSharp.Control.WebExtensions

/// One weather reading: a city, its region, and an integer temperature.
/// ToString renders it as "city, region: temperature F".
type Weather (city, region, temperature) = class
    member x.City = city
    member x.Region = region
    member x.Temperature : int = temperature

    override this.ToString() =
        sprintf "%s, %s: %d F" this.City this.Region this.Temperature
end

/// Messages understood by the actor created in mailboxLoop.
type MessageForActor =
    /// A successfully parsed reading to fold into the running results.
    | ProcessWeather of Weather
    /// The woeid whose feed carried no condition element.
    | ProcessError of int
    /// Request for (coldest, hottest, sorted list); answered on the reply channel.
    | GetResults of (Weather * Weather * Weather list) AsyncReplyChannel

/// Parses one RSS document read from rssStream.
/// Returns ProcessWeather with the parsed reading, or ProcessError woeid when
/// the item has no yweather "condition" element.
let parseRss woeid (rssStream : Stream) =
    let xn str = XName.Get str
    let yweather elementName = XName.Get(elementName, "http://xml.weather.yahoo.com/ns/rss/1.0")

    let channel = (XDocument.Load rssStream).Descendants(xn "channel").First()
    let location = channel.Element(yweather "location")
    let condition = channel.Element(xn "item").Element(yweather "condition")

    // If the RSS server returns an error, the condition XML element won't be available.
    if not(condition = null) then
        let temperature = Int32.Parse(condition.Attribute(xn "temp").Value)
        let city = location.Attribute(xn "city").Value
        let region = location.Attribute(xn "region").Value
        ProcessWeather(new Weather(city, region, temperature))
    else
        ProcessError(woeid)

/// Map step: downloads the feed for woeid, parses it with parseRss and posts
/// the resulting message to actor. Returns an async workflow; nothing runs
/// until the caller starts it.
let fetchWeather (actor : MessageForActor MailboxProcessor) woeid =
    async {
        let rssAddress = sprintf "http://weather.yahooapis.com/forecastrss?w=%d&amp;u=f" woeid
        let webRequest = WebRequest.Create rssAddress
        use! response = webRequest.AsyncGetResponse()
        use responseStream = response.GetResponseStream()
        let weather = parseRss woeid responseStream
        //do! Async.Sleep 1000 // enable this line to see amplified timing that proves concurrent flow
        actor.Post(weather)
    }

/// Reduce step: starts a MailboxProcessor that accumulates the coldest reading,
/// the hottest reading and the list of everything received, and replies with
/// them when it gets GetResults (after which the loop ends).
/// initialCount seeds the "remaining" counter; the counter is decremented on
/// every message but is not consulted anywhere in this script.
let mailboxLoop initialCount =
    // Returns x when (op x.Temperature y.Temperature) holds, otherwise y.
    let chooseCityByTemperature op (x : Weather) (y : Weather) =
        if op x.Temperature y.Temperature then x else y

    // Two passes: first by city, then by region.
    let sortWeatherByCityAndState (weatherList : Weather list) =
        weatherList
        |&gt; List.sortWith (fun x y -&gt; x.City.CompareTo(y.City))
        |&gt; List.sortWith (fun x y -&gt; x.Region.CompareTo(y.Region))

    MailboxProcessor.Start(fun inbox -&gt;
        let rec loop minAcc maxAcc weatherList remaining =
            async {
                let! message = inbox.Receive()
                let remaining = remaining - 1

                match message with
                | ProcessWeather weather -&gt;
                    let colderCity = chooseCityByTemperature (&lt;) minAcc weather
                    let warmerCity = chooseCityByTemperature (&gt;) maxAcc weather
                    return! loop colderCity warmerCity (weather :: weatherList) remaining
                | ProcessError woeid -&gt;
                    // Failed lookups are kept in the list as a placeholder entry
                    // but do not take part in the min/max comparison.
                    let errorWeather = new Weather(sprintf "Error with woeid=%d" woeid, "ZZ", 99999)
                    return! loop minAcc maxAcc (errorWeather :: weatherList) remaining
                | GetResults replyChannel -&gt;
                    replyChannel.Reply(minAcc, maxAcc, sortWeatherByCityAndState weatherList)
            }

        // Sentinels so that the first real reading replaces both accumulators.
        let minValueInitial = new Weather("", "", Int32.MaxValue)
        let maxValueInitial = new Weather("", "", Int32.MinValue)
        loop minValueInitial maxValueInitial [] initialCount
        )

/// Runs computation synchronously with a 30000 ms timeout and discards its result.
/// An exception raised inside the computation is printed and the process exits
/// with code -4; a timeout prints a message and exits with code -5.
let RunSynchronouslyWithExceptionAndTimeoutHandlers computation =
    let timeout = 30000
    try
        match Async.RunSynchronously(Async.Catch(computation), timeout) with
        | Choice1Of2 answer -&gt; answer |&gt; ignore
        | Choice2Of2 (except : Exception) -&gt;
            printfn "%s" except.Message
            printfn "%s" except.StackTrace
            exit -4
    with
    | :? System.TimeoutException -&gt;
        printfn "Timed out waiting for results for %d seconds!" &lt;| timeout / 1000
        exit -5

/// Script entry point. Arguments: run mode ("C" concurrent or "S" synchronous,
/// case-insensitive) followed by one or more integer woeids.
/// Exit codes: -1 too few arguments, -2 non-integer woeid, -3 unknown run mode.
let main =
    // Should have script name, sync/async select, and at least one woeid
    if fsi.CommandLineArgs.Length &lt; 3 then
        printfn "Expecting at least two arguments!"
        printfn "There were %d arguments" (fsi.CommandLineArgs.Length - 1)
        exit -1

    let woeids =
        try
            fsi.CommandLineArgs
            |&gt; Seq.skip 2 // skip the script name and sync/async select
            |&gt; Seq.map Int32.Parse
            |&gt; Seq.toList
        with
        | except -&gt;
            printfn "One of supplied arguments was not an integer: %s" except.Message
            exit -2

    let actor = mailboxLoop woeids.Length

    // All fetches are combined into one parallel workflow and awaited together.
    let processWeatherItemsConcurrently woeids =
        woeids
        |&gt; Seq.map (fetchWeather actor)
        |&gt; Async.Parallel
        |&gt; RunSynchronouslyWithExceptionAndTimeoutHandlers

    // One fetch is run to completion before the next one starts.
    let processOneWeatherItem woeid =
        woeid
        |&gt; fetchWeather actor
        |&gt; RunSynchronouslyWithExceptionAndTimeoutHandlers

    let stopWatch = new Stopwatch()
    stopWatch.Start()
    match fsi.CommandLineArgs.[1].ToUpper() with
    | "C" -&gt; printfn "Concurrent execution: "; processWeatherItemsConcurrently woeids
    | "S" -&gt; printfn "Synchronous execution: "; woeids |&gt; Seq.iter processOneWeatherItem
    | _ -&gt; printfn "Unexpected run options!"; exit -3

    // Every fetch has already posted its message by now, so the actor handles
    // GetResults after all of them.
    let (min, max, weatherList) = actor.PostAndReply GetResults
    stopWatch.Stop()

    assert (weatherList.Length = woeids.Length)

    printfn "{"
    weatherList |&gt; List.iter (printfn " %O")
    printfn "}"
    printfn "Coldest place: %O" min
    printfn "Hottest place: %O" max
    printfn "Completed in %d millisec" stopWatch.ElapsedMilliseconds

main
</code></pre>
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious, you may find further information in the browser console, which is accessible through the devtools (F12).

Reload