#light
namespace PLinq.Wrapper
// Adapted from http://weblogs.asp.net/podwysocki/archive/2009/02/23/adding-parallel-extensions-to-f.aspx

/// Short alias for System.Linq.ParallelQuery of 'a, so that signatures can be
/// written as pseq in the same way seq abbreviates IEnumerable.
type pseq<'a> = System.Linq.ParallelQuery<'a>
/// F#-style, curried wrappers over System.Linq.ParallelEnumerable so that
/// parallel queries can be composed with the pipeline operator.
module PSeq =
    open System
    open System.Linq

    /// Appends ie2 to ie1 (ParallelEnumerable.Concat).
    let append (ie1: pseq<'a>) (ie2: pseq<'a>) : pseq<'a> =
        ParallelEnumerable.Concat(ie1, ie2)

    /// Opts an ordinary sequence into Parallel LINQ (ParallelEnumerable.AsParallel).
    let adapt (source: seq<'a>) : pseq<'a> =
        ParallelEnumerable.AsParallel(source)

    // Opt into Parallel LINQ with an explicit degree of parallelism.
    // Disabled in the original source; kept for reference.
    // let adaptn (n:int) (seq:seq<'a>) : pseq<'a> =
    //     if n < 1 then adapt seq
    //     else ParallelEnumerable.AsParallel( seq, n )

    /// Tells PLINQ to treat the data source as if it was ordered
    /// (ParallelEnumerable.AsOrdered).
    let ordered (pe: pseq<'a>) : pseq<'a> =
        ParallelEnumerable.AsOrdered(pe)

    /// Tells PLINQ to treat an intermediate result as if no order was implied
    /// (ParallelEnumerable.AsUnordered).
    let unordered (pe: pseq<'a>) : pseq<'a> =
        ParallelEnumerable.AsUnordered(pe)

    /// Opts out of Parallel LINQ, giving back an ordinary sequence
    /// (ParallelEnumerable.AsSequential).
    let as_seq (pe: pseq<'a>) : seq<'a> =
        ParallelEnumerable.AsSequential(pe)

    /// Average of a parallel query of floats (ParallelEnumerable.Average).
    let average_float (pe: pseq<float>) : float =
        ParallelEnumerable.Average(pe)

    /// Average of the float projection f over the query
    /// (ParallelEnumerable.Average with a selector).
    let average_float_by (f: 'a -> float) (pe: pseq<'a>) : float =
        ParallelEnumerable.Average(pe, Func<_,_>(f))

    /// Converts an untyped ParallelQuery to a typed one (ParallelEnumerable.Cast).
    let cast (source: ParallelQuery) : pseq<'a> =
        ParallelEnumerable.Cast<'a>(source)

    /// Removes duplicate elements (ParallelEnumerable.Distinct).
    let distinct (pe: pseq<'a>) : pseq<'a> =
        ParallelEnumerable.Distinct(pe)

    /// True when some element satisfies f (ParallelEnumerable.Any).
    let exists (f: 'a -> bool) (pe: pseq<'a>) : bool =
        ParallelEnumerable.Any(pe, Func<_,_>(f))

    /// Keeps the elements satisfying f (ParallelEnumerable.Where).
    let filter (f: 'a -> bool) (pe: pseq<'a>) : pseq<'a> =
        ParallelEnumerable.Where(pe, Func<_,_>(f))

    /// First element satisfying f (ParallelEnumerable.First with a predicate).
    let find (f: 'a -> bool) (pe: pseq<'a>) : 'a =
        ParallelEnumerable.First(pe, Func<_,_>(f))

    /// Folds f over the query starting from seed (the seeded overload of
    /// ParallelEnumerable.Aggregate).
    let fold (f: 'a -> 'b -> 'a) (seed: 'a) (pe: pseq<'b>) : 'a =
        ParallelEnumerable.Aggregate(pe, seed, Func<_,_,_>(f))

    /// True when every element satisfies f (ParallelEnumerable.All).
    let forAll (f: 'a -> bool) (pe: pseq<'a>) : bool =
        ParallelEnumerable.All(pe, Func<_,_>(f))

    /// The empty parallel query (ParallelEnumerable.Empty).
    let empty<'a> : pseq<'a> = ParallelEnumerable.Empty<'a>()

    /// First element of the query (ParallelEnumerable.First).
    let hd (pe: pseq<'a>) : 'a =
        ParallelEnumerable.First(pe)

    /// Number of elements in the query (ParallelEnumerable.Count).
    let length (pe: pseq<'a>) : int =
        ParallelEnumerable.Count(pe)

    /// Projects every element through f (ParallelEnumerable.Select).
    let map (f: 'a -> 'b) (pe: pseq<'a>) : pseq<'b> =
        ParallelEnumerable.Select(pe, Func<_,_>(f))

    /// Projects every element together with its index
    /// (the indexed overload of ParallelEnumerable.Select).
    let mapi (f: int -> 'a -> 'b) (pe: pseq<'a>) : pseq<'b> =
        // Select hands the delegate (element, index); f expects (index, element).
        ParallelEnumerable.Select(pe, Func<_,_,_>(fun x i -> f i x))

    /// Combines two queries pairwise through f (ParallelEnumerable.Zip).
    let map2 (f: 'a -> 'b -> 'c) (pe1: pseq<'a>) (pe2: pseq<'b>) : pseq<'c> =
        ParallelEnumerable.Zip(pe1, pe2, Func<_,_,_>(f))

    /// Reverses the query (ParallelEnumerable.Reverse).
    let rev (pe: pseq<'a>) : pseq<'a> =
        ParallelEnumerable.Reverse(pe)

    /// Flattens a query of queries, the parallel counterpart of Seq.concat
    /// (ParallelEnumerable.SelectMany).
    let concat (pe: pseq<pseq<'a>>) : pseq<'a> =
        ParallelEnumerable.SelectMany(
            pe, Func<_,_>(fun (inner: pseq<'a>) -> inner :> seq<'a>))

    /// Element at position n (ParallelEnumerable.ElementAt).
    let nth (n: int) (pe: pseq<'a>) : 'a =
        ParallelEnumerable.ElementAt(pe, n)

    /// Orders the query by the key computed by f (ParallelEnumerable.OrderBy).
    let orderBy (f: 'a -> 'b) (pe: pseq<'a>) =
        ParallelEnumerable.OrderBy(pe, Func<_,_>(f))

    /// Parallel query over count integers beginning at start
    /// (ParallelEnumerable.Range).
    let range (start: int) (count: int) : pseq<int> =
        ParallelEnumerable.Range(start, count)

    /// Skips the first n elements (ParallelEnumerable.Skip).
    let skip (n: int) (pe: pseq<'a>) : pseq<'a> =
        ParallelEnumerable.Skip(pe, n)

    /// Skips elements while f holds (ParallelEnumerable.SkipWhile).
    let skipWhile (f: 'a -> bool) (pe: pseq<'a>) : pseq<'a> =
        ParallelEnumerable.SkipWhile(pe, Func<_,_>(f))

    /// Sum of a parallel query of ints (ParallelEnumerable.Sum).
    let sum_by_int (pe: pseq<int>) : int =
        ParallelEnumerable.Sum(pe)

    // Attempt at a generic sum, disabled in the original source:
    // let sum<'a> : pseq<'a> -> 'a =
    //     Seq.toList >> ParallelEnumerable.Sum

    /// Takes the first n elements (ParallelEnumerable.Take).
    let take (n: int) (pe: pseq<'a>) : pseq<'a> =
        ParallelEnumerable.Take(pe, n)

    /// Takes elements while f holds (ParallelEnumerable.TakeWhile).
    let takeWhile (f: 'a -> bool) (pe: pseq<'a>) : pseq<'a> =
        ParallelEnumerable.TakeWhile(pe, Func<_,_>(f))

    /// Materialises the query into an array (ParallelEnumerable.ToArray).
    let toArray (pe: pseq<'a>) : 'a array =
        ParallelEnumerable.ToArray(pe)

    /// Materialises the query into an F# list, going through toArray.
    let toList<'a> (pe: pseq<'a>) : 'a list =
        pe |> toArray |> Array.toList

    /// Pairs up the elements of two queries as tuples (ParallelEnumerable.Zip).
    let zip (pe1: pseq<'a>) (pe2: pseq<'b>) : pseq<'a * 'b> =
        ParallelEnumerable.Zip(pe1, pe2, Func<_,_,_>(fun a b -> (a, b)))
/// Automatically opened helpers for building parallel queries.
[<AutoOpen>]
module Operators =
    /// Wraps a sequence as a parallel query by delegating to PSeq.adapt.
    /// Intended for use with sequence expressions.
    /// Example : let f = pseq [for x in [1..10] -> x * x]
    let pseq (ie: seq<'a>) : pseq<'a> = PSeq.adapt ie
#r "System.Core.dll"
open System.Linq
open System
#load "PSeqLib.fs"
open PLinq.Wrapper
/// Benchmark input: the int64 values 99 through 999999 as an F# list.
let nums = [ 99L .. 999999L ]
//let nums = List.ofArray <| [|1..100|]
/// Squares an int64. The operand type is pinned explicitly so the function has
/// the same type whether the script is compiled as a whole or evaluated one
/// binding at a time (an unannotated x * x would otherwise default to int).
let sqr (x: int64) : int64 = x * x
/// Sum of squares using a mutable running total and a for loop.
let sumOfSquaresI nums =
    let mutable total = 0L
    for value in nums do
        total <- total + sqr value
    total
/// Sum of squares using recursion over the list structure.
/// Returns 0L for the empty list.
let sumOfSquaresR nums =
    // Accumulator-passing helper: the recursive call is in tail position, so
    // long inputs (the script's nums has close to a million elements) do not
    // grow the call stack.
    let rec loop acc remaining =
        match remaining with
        | [] -> acc
        | n :: rest -> loop (acc + sqr n) rest
    loop 0L nums
/// Sum of squares using the Seq combinators: square and add in one pass.
let sumOfSquaresF nums =
    Seq.sumBy sqr nums
/// Sum of squares where the squaring step goes through PSeq.map on a parallel
/// query; the squared values are then added up with Seq.sum.
let sumOfSquaresP nums =
    let squares = PSeq.map sqr (pseq nums)
    Seq.sum squares
// Parallel-query view of the benchmark input.
let pnums = nums |> pseq
// Run each variant over the same input so the results can be compared.
let si = nums |> sumOfSquaresI
//let sr = sumOfSquaresR nums
let sf = nums |> sumOfSquaresF
let sp = nums |> sumOfSquaresP
I thought I was crazy. Last week I was searching all over for PSeq. Luke just added it like it was no big deal. Is it expected that PSeq will be in the final version of VS2010?
Me too. I have no idea if they plan to include it in the final version of VS2010. But at this late stage in the game (Beta 2), with 4 months until release, I wouldn’t hold your breath. Usually it is just bug fixes at this stage. So unless they have the implementation but didn’t include it with Beta 2 for stability or completeness reasons, my guess is that we won’t see it. Remember that this is a version 1.0 product from the Visual Studio product division’s standpoint, even though it is at version 1.9.x as a Microsoft Research project. So there may be different levels of robustness and testing required, since Microsoft will need to support the PSeq module if it is included. Mind you, this is all speculation on my part, and I have not had any discussions on the topic with the Microsoft product team.
// NOTE(review): these helpers call `fold` unqualified, so they are presumably
// meant to sit inside the PSeq module after `fold` — confirm placement.

/// Sums the elements of the query, starting from the generic zero of 'a.
let inline sum (pe:ParallelQuery<'a>) =
    let zero = LanguagePrimitives.GenericZero<'a>
    pe |> fold (+) zero

/// Sums the projection f over the query, starting from the generic zero of 'b.
let inline sumBy (f:'a -> 'b) (pe:ParallelQuery<'a>) =
    let zero = LanguagePrimitives.GenericZero<'b>
    pe |> fold (fun a b -> a + f b) zero

/// Largest element of the query. An empty query yields the generic zero of 'a.
let inline max (pe:ParallelQuery<'a>) =
    // Track the running maximum as an option so the first element, not zero,
    // seeds the comparison; a zero seed would win over all-negative input.
    let pick best x =
        match best with
        | Some m -> Some (Microsoft.FSharp.Core.Operators.max m x)
        | None -> Some x
    match pe |> fold pick None with
    | Some m -> m
    | None -> LanguagePrimitives.GenericZero<'a>

/// Element whose key f is largest; on equal keys the later element wins.
/// Raises System.ArgumentException when the query is empty.
let inline maxBy (f:'a -> 'b) (pe:ParallelQuery<'a>) =
    // The accumulator holds (element, key) of the best candidate so far; it
    // starts as None so that no artificial zero key takes part in comparisons.
    let pick best (x:'a) =
        let key = f x
        match best with
        | Some (_, bestKey) when bestKey > key -> best
        | _ -> Some (x, key)
    match pe |> fold pick None with
    | Some (x, _) -> x
    | None -> invalidArg "pe" "The input sequence was empty."

/// Smallest element of the query. An empty query yields the generic zero of 'a.
let inline min (pe:ParallelQuery<'a>) =
    // Track the running minimum as an option so the first element, not zero,
    // seeds the comparison; a zero seed would win over all-positive input.
    let pick best x =
        match best with
        | Some m -> Some (Microsoft.FSharp.Core.Operators.min m x)
        | None -> Some x
    match pe |> fold pick None with
    | Some m -> m
    | None -> LanguagePrimitives.GenericZero<'a>

/// Element whose key f is smallest; on equal keys the later element wins.
/// Raises System.ArgumentException when the query is empty.
let inline minBy (f:'a -> 'b) (pe:ParallelQuery<'a>) =
    // Same accumulator shape as maxBy, with the comparison reversed.
    let pick best (x:'a) =
        let key = f x
        match best with
        | Some (_, bestKey) when bestKey < key -> best
        | _ -> Some (x, key)
    match pe |> fold pick None with
    | Some (x, _) -> x
    | None -> invalidArg "pe" "The input sequence was empty."

/// Removes elements whose key f duplicates that of another element, using
/// ParallelEnumerable.Distinct with a comparer that compares keys.
let inline distinctBy (f:'a -> 'b) (pe:ParallelQuery<'a>) =
    let cmp = LanguagePrimitives.FastGenericEqualityComparer<'b>
    // Equality and hashing on elements are both delegated to their keys.
    let comparer =
        { new System.Collections.Generic.IEqualityComparer<'a> with
            member x.Equals(a, b) = cmp.Equals(f(a), f(b))
            member x.GetHashCode a = cmp.GetHashCode(f(a)) }
    ParallelEnumerable.Distinct(pe, comparer)