Adding Parallel Extensions to F# for VS2010 Beta 2

Luke Hoban has a great talk last month at PDC09 on F# for Parallel and Asynchronous Programming. In that talk he references some examples of starting with imperative programming, then recursive programming, and finally the functional way using  He then uses the reference to the ability to parallelize it using PSeq.  Unfortunatly, there is no PSeq in F# yet.  Matthew Podwysocki comes to the rescue with his blog post Adding Parallel Extensions to F#Unfortunately his code example doesn’t work in VS 2010 Beta 2 (his post was from February.)
So I have, with the help of Rick Minerich, updated Matthew’s PSeq implementation partially using the latest API (Beta 2).  Basically the System.Linq.IParallelEnumerable interface is gone in Beta 2, so I am using the ParallelQuery class in its place. Also, many of the ParallelQuery methods that Matthew used are now in ParallelEnumerable. So here is the updated implemenation of PSeq. First a couple of disclaimers/comments. I have not tested all of the code, only and only lightly. Also, I started working on a genaric implentation of PSeq.sum that will accept both int and int64. If anyone has any feedback, please provide via comments below or via twitter @talbott.



namespace PLinq.Wrapper

    /// from

    /// Type wrapper over the ParallelQuery

    type pseq<‘a> = System.Linq.ParallelQuery<‘a>


    module PSeq = 

      open System

      open System.Linq


      /// Append two parallel collections together

      let append (ie1: pseq<‘a>) (ie2: pseq<‘a>) : pseq<‘a> =

        ParallelEnumerable.Concat(ie1, ie2)


      /// This is the method to opt into Parallel LINQ.

      let adapt : seq<‘a> -> pseq<‘a> =



      /// This is the method to opt into Parallel LINQ with deg of parallelism

//      let adaptn (n:int) (seq:seq<‘a>) : pseq<‘a> = 

//        if n < 1  then adapt seq 

//        else ParallelEnumerable.AsParallel( seq, n )


      /// AsOrdered is a method that tells PLINQ to treat a data source as if it was ordered

      let ordered : pseq<‘a> -> pseq<‘a> = 



      /// AsUnordered tells PLINQ that it should treat a particular intermediate result as if no

      /// order was implied

      let unordered : pseq<‘a> -> pseq<‘a> = 



      /// This method is to opt out of Parallel LINQ.

      let as_seq : pseq<‘a> -> seq<‘a> =



      /// Parallel implementation of System.Linq.Enumerable.Average().

      let average_float : pseq<float> -> float = 



      /// Parallel implementation of System.Linq.Enumerable.Average().

      let average_float_by (f:’a -> float) (pe:pseq<‘a>) : float =

        ParallelEnumerable.Average(pe, Func<_,_>(f))


      /// Parallel implementation of System.Linq.ParallelEnumerable.Cast

      let cast : ParallelQuery -> pseq<‘a> =



      /// Parallel implementation of System.Linq.ParallelEnumerable.Distinct

      let distinct : pseq<‘a> -> pseq<‘a> =



      /// Parallel implementation of System.Linq.ParallelEnumerable.Any

      let exists (f:’a -> bool) (pe:pseq<‘a>) : bool =

        ParallelEnumerable.Any(pe, Func<_,_>(f))


      /// Parallel implementation of System.Linq.Enumerable.Where().

      let filter (f:’a -> bool) (pe:pseq<‘a>) : pseq<‘a> = 

        ParallelEnumerable.Where(pe, Func<_,_>(f))


      /// Parallel implementation of System.Linq.Enumerable.First

      let find (f:’a -> bool) (pe:pseq<‘a>) : ‘a =

        ParallelEnumerable.First(pe, Func<_,_>(f)) 


      /// Parallel implementation of System.Linq.Enumerable.Aggregate().

      let fold (f:’a -> ‘b -> ‘a) (seed:’a) (pe:pseq<‘b>) : ‘a = 

        ParallelEnumerable.Aggregate(pe, seed, Func<_,_,_>(f))


      /// Parallel implementation of System.Linq.Enumerable.All

      let forAll (f:’a -> bool) (pe:pseq<‘a>) : bool =

        ParallelEnumerable.All(pe, Func<_,_>(f))


      /// Empty ParallelEnumerable

      let empty<‘a> : pseq<‘a> = ParallelEnumerable.Empty<‘a>()


      /// Parallel implementation of System.Linq.Enumerable.First()

      let hd : pseq<‘a> -> ‘a =



      /// Parallel implementation of System.Linq.ParallelEnumerable.Count

      let length : pseq<‘a> -> int = 



      /// Parallel implementation of System.Linq.Enumerable.Select().

      let map (f:’a -> ‘b) (pe:pseq<‘a>) : pseq<‘b> = 

        ParallelEnumerable.Select(pe, Func<_,_>(f))           


      /// Parallel implementation of System.Linq.Enumerable.Select().

      let mapi (f:int -> ‘a -> ‘b) (pe:pseq<‘a>) : pseq<‘b> =

        let f’ x i = f i x

        ParallelEnumerable.Select(pe, Func<_,_,_>(f’))


      /// Parallel implementation of System.Linq.Enumerable.Zip

      let map2 (f:’a -> ‘b -> ‘c) (pe1:pseq<‘a>) (pe2:pseq<‘b>) : pseq<‘c> =

        ParallelEnumerable.Zip(pe1, pe2, Func<_,_,_>(f))


      /// Parallel implementation of System.Linq.Enumerable.Reverse

      let rev : pseq<‘a> -> pseq<‘a> = ParallelEnumerable.Reverse


      /// Parallel implementation of Seq.concat

      let concat (pe:pseq<pseq<‘a>>) : pseq<‘a> = 


            pe, Func<_,_>(fun x -> x :> seq<‘a>))


      /// Parallel implementation of System.Linq.Enumerable.ElementAt

      let nth (n:int) (pe:pseq<‘a>) : ‘a =

        ParallelEnumerable.ElementAt(pe, n)


      /// Parallel implementation of System.Linq.Enumerable.OrderBy

      let orderBy (f:’a -> ‘b) (pe:pseq<‘a>) =

        ParallelEnumerable.OrderBy(pe, Func<_,_>(f))


      /// Parallel implementation of System.Linq.Enumerable.Range

      let range (start:int) (count:int) : pseq<int> =

        ParallelEnumerable.Range(start, count)


      /// Parallel implementation of System.Linq.Enumerable.Skip

      let skip (n:int) (pe:pseq<‘a>) : pseq<‘a> =

        ParallelEnumerable.Skip(pe, n)


      /// Parallel implementation of System.Linq.Enumerable.SkipWhile

      let skipWhile (f:’a -> bool) (pe:pseq<‘a>) : pseq<‘a> =

        ParallelEnumerable.SkipWhile(pe, Func<_,_>(f))


      /// Parallel implementation of System.Linq.Enumerable.Sum().

      let sum_by_int : pseq<int> -> int = 



      // Trying to make sum genaric

//      let sum<‘a> : pseq<‘a> -> ‘a =

//        Seq.toList >> ParallelEnumerable.Sum


      /// Parallel implementation of System.Linq.Enumerable.Take

      let take (n:int) (pe:pseq<‘a>) : pseq<‘a> =

        ParallelEnumerable.Take(pe, n)


      /// Parallel implementation of System.Linq.Enumerable.TakeWhile

      let takeWhile (f:’a -> bool) (pe:pseq<‘a>) : pseq<‘a> =

        ParallelEnumerable.TakeWhile(pe, Func<_,_>(f))


      /// Parallel implementation of System.Linq.Enumerable.ToArray().

      let toArray : pseq<‘a> -> ‘a array = 



      /// Parallel implementation of to list  

      let toList<‘a> : pseq<‘a> -> ‘a list =

        toArray >> Array.toList    


      /// Parallel implementation of System.Linq.Enumerable.Zip

      let zip (pe1:pseq<‘a>) (pe2:pseq<‘b>) : pseq<‘a * ‘b> =

        let f a b = (a, b)

        ParallelEnumerable.Zip(pe1, pe2, Func<_,_,_>(f))             



    module Operators =


      /// Used for sequence expressions

      /// Example : let f = pseq [for x in [1..10] -> x * x] 

      let pseq (ie:seq<‘a>) : pseq<‘a> = PSeq.adapt ie


The way I was using this updated PSeq librarary was through the following expansion of Luke Hoban’s PDC09 talk:

#r "System.Core.dll"

open System.Linq

open System

#load "PSeqLib.fs"

open PLinq.Wrapper


let nums = List.ofArray [|99L..999999L|]

//let nums = List.ofArray <| [|1..100|] 


let sqr x = x * x


let sumOfSquaresI nums =

    let mutable acc = 0L

    for n in nums do

        acc <- acc + sqr n



let rec sumOfSquaresR nums =

    match nums with

    | n :: rest -> (sqr n) + (sumOfSquaresR rest)

    | [] -> 0L


let sumOfSquaresF nums =


    |> sqr

    |> Seq.sum


let sumOfSquaresP nums =


    |> pseq

    |> sqr

    |> Seq.sum



let pnums = pseq nums


let si = sumOfSquaresI nums

//let sr = sumOfSquaresR nums

let sf = sumOfSquaresF nums

let sp = sumOfSquaresP nums



Notice that the example above uses int64 instead of Luke’s int.  Also, I found that for large sets, the recursive version of the sumOfSquares function (sumOfSquaresR) runs into a StackOverflowException.  That is why it is commented out.  If you use the int list of 1..100 you can uncomment the call to sumOfSquaresR, but be sure to change the 0L (int64 literal for zero) to 0 (int literal for zero in the sumOfSquaresI and sumOfSquaresR functions.

3 thoughts on “Adding Parallel Extensions to F# for VS2010 Beta 2

  1. I thought I was crazy. Last week I was searching all over for PSeq. Luke just added it like it was no big deal. Is it expected that PSeq will be in the final version of VS2010?

  2. Me too. I have no idea if they plan to include it in the final version of VS2010. But at this late stage in the game (Beta 2), with 4 months until release, I wouldn’t hold your breath. Usually it is just bug fixes at this stage. So unless they have the implementation but didn’t include it with Beta 2 due to stability or completeness reasons, my guess is that we won’t see it. Remember that this is a version 1.0 product from the Visual Studio product division standpoint even though it is in version 1.9.x from a Microsoft Research project. So there may be different levels of robustness and testing that will be required since Microsoft will need to support the PSeq module if it is included. Mind you, this is all speculation from my end and I have not had any discussions on the topic with the Microsoft product team.

  3. let inline sum (pe:ParallelQuery<‘a>) = let zero = LanguagePrimitives.GenericZero<‘a> pe |> fold (+) zero let inline sumBy (f:’a -> ‘b) (pe:ParallelQuery<‘a>) = let zero = LanguagePrimitives.GenericZero<‘b> pe |> fold (fun a b -> a + f b) zero let inline max (pe:ParallelQuery<‘a>) = let zero = LanguagePrimitives.GenericZero<‘a> pe |> fold Operators.max zero let inline maxBy (f:’a -> ‘b) (pe:ParallelQuery<‘a>) = let zeroB = LanguagePrimitives.GenericZero<‘b> let max’ a (b:’a) = let a’ = snd a let b’ = f b if a’ > b’ then a else (Some(b), b’) match (pe |> fold max’ (None, zeroB) |> fst) with | Some(x) -> x | None -> Seq.empty<‘a> |> Seq.max let inline min (pe:ParallelQuery<‘a>) = let zero = LanguagePrimitives.GenericZero<‘a> pe |> fold Operators.min zero let inline minBy (f:’a -> ‘b) (pe:ParallelQuery<‘a>) = let zeroB = LanguagePrimitives.GenericZero<‘b> let min’ a (b:’a) = let a’ = snd a let b’ = f b if a’ < b’ then a else (Some(b), b’) match (pe |> fold min’ (None, zeroB) |> fst) with | Some(x) -> x | None -> Seq.empty<‘a> |> Seq.min let inline distinctBy (f:’a -> ‘b) (pe:ParallelQuery<‘a>) = let cmp = LanguagePrimitives.FastGenericEqualityComparer<‘b> let comparer = { new System.Collections.Generic.IEqualityComparer<‘a> with member x.Equals(a, b) = cmp.Equals(f(a), f(b)) member x.GetHashCode a = cmp.GetHashCode(f(a)) } ParallelEnumerable.Distinct(pe, comparer)

Leave a Reply

Fill in your details below or click an icon to log in: Logo

You are commenting using your account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )


Connecting to %s