Search

Synchronously Await an Async Workflow Using Observable.guard

The other day, I couldn’t figure out how to Synchronously wait for an Async workflow to complete. I was pretty sure I needed to use Observable.guard, I just couldn’t wrap my head around how to use it. This morning I was on a mission to figure it out.

open System
open FSharp.Control

let event = new Event<_>()

let createObservableByExecutingAsync asyncToExecute =
    event.Publish
    |> Observable.guard(fun _ -> 

        let asyncOperation = async {
            asyncToExecute
            |> Async.RunSynchronously

            event.Trigger(())
        }
        
        printfn "Start Executing Async Operation"

        Async.Start asyncOperation

        printfn "End Executing Async Operation"
    )

let someAsyncOperation = async {
        printfn "Async Operation"
    }

let asyncWorker = createObservableByExecutingAsync someAsyncOperation

printfn "Start Waiting for Async Result"

Async.AwaitObservable asyncWorker
|> Async.RunSynchronously

printfn "End Waiting for Async Result"

Console.ReadLine()
|> ignore

This bit of code produces the most beautiful output of…

Start Waiting for Async Result 
Start Executing Async Operation
End Executing Async Operation
Async Operation
End Waiting for Async Result

Figuring that out makes me feel better, but the bigger realization was that I didn’t need Observable.guard at all. I needed Async.AwaitObservable.

open System
open FSharp.Control
open RateLimiting
 
type Messages =
    | Message of string
 
type Responses =
    | Response of string
 
let operationFunction (message: Messages) =
    match message with
    | Message(input) -> Response(sprintf "Response: %s" input)
 
let agent =
    RateLimitedAgent(operationFunction, 4, 1000)
 
let event = new Event<_>()

let someAsyncOperation (response:Responses) = async {
        printfn "Async Response"
        event.Trigger(())
    }

agent.QueueItem({payload = Message("Awaited Payload") ; response = Some(someAsyncOperation)})
|> Async.Start

printfn "Start Waiting for Async Result"

Async.AwaitObservable event.Publish
|> Async.RunSynchronously

printfn "End Waiting for Async Result"

Console.ReadLine()
|> ignore
Start Waiting for Async Result
Async Response
End Waiting for Async Result

I love watching this stuff work.

Async Wait Operation

Rate Limiting a Function with Async Workflows

So I am trying to deal with an API that says I cannot make more than 4 requests per second. This is a really good use for F# since it has such great support for Asynchronous Workflows. I took a little inspiration from this post by Luca Bolognese.

module RateLimiting

open System
open FSharp.Control

type RateLimitedMessage<'a,'b> = {payload: 'a ; response: ('b -> Async<unit>) option}

type RateLimitedAgent<'a,'b>(
                                operation: 'a -> 'b,
                                workerCount, 
                                ?minExecutionTime: int,
                                ?errorHandler: Exception -> 'a -> Async<unit>
                            ) = class
    let blockingQueue = new BlockingQueueAgent<RateLimitedMessage<'a, 'b>>(workerCount)
    let semaphore = new System.Threading.Semaphore(workerCount, workerCount)

    let errorHandler = defaultArg errorHandler (fun _ _ -> async{()})
    let minExecutionTime = defaultArg minExecutionTime -1

    let createWorker () =
        async {
            while true do
                semaphore.WaitOne()
                |> ignore

                let! message = blockingQueue.AsyncGet()

                let messageProcessor =
                    async {
                            try
                                let response = operation message.payload
                                if message.response.IsSome then
                                    do! message.response.Value response
                            with 
                            | ex ->
                                errorHandler ex message.payload
                                |> Async.Start
                        }

                   
                seq {
                        yield messageProcessor
                        yield async {
                                do! Async.Sleep(minExecutionTime)
                                semaphore.Release()
                                    |> ignore
                            }
                    }
                |> Async.Parallel
                |> Async.Ignore
                |> Async.Start
            }

    do
        (fun _ -> createWorker ())
        |> Seq.init workerCount
        |> Async.Parallel
        |> Async.Ignore
        |> Async.Start

    member x.QueueItem item =
        blockingQueue.AsyncAdd item
end

RateLimitedAgent

Messages are passed in through a BlockingQueueAgent from fsharpx’s Agent Library. This allows users to choose whether or not they wait until messages are picked up for processing.

Here is an example of where we don’t wait at all. Although, we throw in some random exceptions for some fun.

open System
open System.Diagnostics
open System.Threading
open FSharp.Control
open RateLimiting

let counterCreationData name ctype = new CounterCreationData(name, "", ctype)

let counterCategoryName = "Test Rate Limiter"
let requestsProcessedPerSecondCounterItem = "Requests Processed per second"
let requestsQueuedPerSecondCounterItem = "Requests Queued per second"
let requestsQueuedCounterItem = "Number of Items Queued"

let performanceCounters = [|
        counterCreationData requestsProcessedPerSecondCounterItem PerformanceCounterType.RateOfCountsPerSecond32 
        counterCreationData requestsQueuedPerSecondCounterItem PerformanceCounterType.RateOfCountsPerSecond32 ;
        counterCreationData requestsQueuedCounterItem PerformanceCounterType.NumberOfItems32
    |]

let removeCounters () = PerformanceCounterCategory.Delete(counterCategoryName)

let installCounters () =
    if PerformanceCounterCategory.Exists(counterCategoryName)
    then
        removeCounters ()

    PerformanceCounterCategory.Create(counterCategoryName, "", PerformanceCounterCategoryType.SingleInstance, new CounterCreationDataCollection(performanceCounters)) 
    |> ignore 

installCounters()
    
let createCounter name =
    let curProc = Process.GetCurrentProcess()
    new PerformanceCounter(counterCategoryName, name, false)

let requestProcessedSpeedCounter = createCounter requestsProcessedPerSecondCounterItem
let requestQueuedSpeedCounter = createCounter requestsQueuedPerSecondCounterItem

let queueCounter = createCounter requestsQueuedCounterItem
queueCounter.RawValue <- 0L

let rndSeed = new Random()

type Messages =
    | Message of string

type Responses =
    | Response of string

let operationFunction (message: Messages) =
    match rndSeed.Next(10) with
    | x when x = 1 -> failwith "Error"
    | _ ->
        match message with
        | Message(input) -> Response(sprintf "Response: %s" input)

let errorHandler (ex:Exception) message = async {
                                    requestProcessedSpeedCounter.Increment()
                                    |> ignore

                                    queueCounter.Decrement()
                                    |> ignore

                                    printfn "Error"
                                }

let agent =
    RateLimitedAgent(operationFunction, 4, 1000, errorHandler)

let responseFunction (someArg:Responses) = async{
    match someArg with
    | Response(response) ->
        requestProcessedSpeedCounter.Increment()
        |> ignore

        queueCounter.Decrement()
        |> ignore

        printfn "Responded: %s" response
    }

let createDisposable name =
    let x = ref 1
    let rnd = new Random(rndSeed.Next())

    async {
        while true do               
            agent.QueueItem({payload = Message(sprintf "%s Request %i" name (!x)) ; response = Some(responseFunction)})
            |> Async.Start
            
            requestQueuedSpeedCounter.Increment()
            |> ignore

            queueCounter.Increment()
            |> ignore

            x := !x + 1

            do! Async.Sleep(550)
        }
    |> Async.StartDisposable

let disposable1 = createDisposable "Automatic 1"
let disposable2 = createDisposable "Automatic 2"
let disposable3 = createDisposable "Automatic 3"
let disposable4 = createDisposable "Automatic 4"

Console.ReadLine()
|> ignore

disposable1.Dispose()
disposable2.Dispose()
disposable3.Dispose()
disposable4.Dispose()

Console.ReadLine()
|> ignore

So this is just about what I was looking for. I say just about because apparently I can get spikes where I go over limit. I have a strong belief this is a calculation error within Performance Monitor, but if we were really concerned about it, we could increase the worker cooldown time to 1.1s.

Because it’s fun, I’ve toyed some with the operation function to include a random delay, just to see the results.

let operationFunction (message: Messages) =
    Thread.Sleep(rndSeed.Next(50,2500))
    match rndSeed.Next(10) with
    | x when x = 1 -> failwith "Error"
    | _ ->
        match message with
        | Message(input) -> Response(sprintf "Response: %s" input)

Not too shabby..

Messages are bundled with an optional Async response. This allows the caller to provide an Async function to handle the response. If a user were interested, I’m pretty sure this is a more likely and useful target to block your Async Workflow for.

I’m sure I have to use Observable.guard like Thomas Petricek reffered to here and here.

So, create an Async Workflow, make it throw an event, add the message to the agent, then wait until the event is observed. Sounds simple enough, but I just can’t wrap my head around the mechanics of it at this point. I think I’m going to ask Thomas to help me out.

Edited 10/17/2011
As per comments below

Project Euler in F# – Problems 4 – 6

Problem 4

Find the largest palindrome that results from the multiplication of two numbers under 1000

I feel like I have a strange obsession with the Seq.unfold function. That function does most of the heavy lifting here.

Although the wise Euler pointed out to me that I can avoid duplicate multiplication factors, and keep track of the largest palindrome so far to avoid unnecessary `isPalindrome` checks.

let problem4 () =
    let isPalindrome (value: string) =
        match value.Length with
        | x when x % 2 = 1 -> (x-1)/2, (x-1)/2 + 1
        | x -> (x/2), (x/2)
        |> fun (pos1, pos2) ->
                    value.Substring(0, pos1), value.Substring(pos2)
        |> (fun (firstHalf, secondHalf) ->

            Array.rev (secondHalf.ToCharArray())
            |> (fun reversedcharArray -> new string(reversedcharArray))
            |> (fun reversedSecondHalf ->
                    firstHalf.Equals(reversedSecondHalf)))

    let productSeq (maximumInt:int) =
        Seq.unfold (fun state ->
            let mult1, mult2 = state
            let result = mult1 * mult2

            match state with
            | 100, _ -> None
            | _, 100 -> Some((result), ( (mult1 - 1) , (mult1 - 1)))
            | _ -> Some((result), ( mult1, (mult2 - 1)))
        ) (maximumInt, maximumInt)

    let largestPalindromeSoFar = ref None

    productSeq 999
    |> Seq.filter(fun (result) ->

        match !largestPalindromeSoFar with
        | Some(largestResult) when largestResult > result -> false
        | _ -> match isPalindrome (result.ToString()) with
                  | true ->
                        largestPalindromeSoFar := Some(result)
                        true
                  | _ -> false)
    |> Seq.max

Problem 5

What is the smallest positive number that is evenly divisible by all of the numbers from 1 to 20?

I was so happy with my brute force solution, it looks so pretty, unfortunately it takes 3 1/2 minutes on my machine.

let problem5 () =
    let evenlyDivisibleByAllUpTo limit number =
        {limit .. -1 .. 1}
        |> Seq.forall(fun testNumber -> number % testNumber = 0)

    Seq.initInfinite(fun i -> (i + 1) * 20)
    |> Seq.find(fun testIndex ->evenlyDivisibleByAllUpTo 20 testIndex)

Project Euler’s math explains the problem a bit differently. I cant explain the formula in ten words or less, but after understanding it and applying a bit of functional style, results in this, which runs in under 1ms.

let problem5_correct () =
    
    let primesWithLimit limit =
        let testPrime (possiblePrime:float) =
            let sqrRootOfPrime = sqrt(possiblePrime)
            
            {2.0 .. sqrRootOfPrime}
            |> Seq.forall(fun divisor -> 
                match divisor with
                | 1.0 -> true
                | x when divisor = possiblePrime -> true
                | _ -> possiblePrime % divisor > 0.0)

        {2.0 .. limit}
        |> Seq.filter(fun index -> testPrime index)

    let computeLimit = sqrt(20.0)

    primesWithLimit 20.0
    |> Seq.fold(fun state prime ->
        
        let exponent =
            match prime < computeLimit with
            | true -> floor( (log(20.0) / log(prime)) )
            | _ -> 1.0

        state * (float(prime) ** exponent)) 1.0
    |> int64

Problem 6

Find the difference between the sum of the squares of the first one hundred natural numbers and the square of the sum.

The brute force of this is rather innocent looking, and runs in < 1ms. [fsharp] let problem6 () = let sumOfSquares maximum = {1L .. maximum} |> Seq.map(fun number -> number * number) |> Seq.sum let squareOfSums maximum = {1L .. maximum} |> Seq.sum |> fun result -> result * result (squareOfSums 100L) - (sumOfSquares 100L) [/fsharp] Following the algorithm from Euler, there isn't much to it at all. [fsharp] let problem6_correct () = let limit = 100L let sum = limit * (limit + 1L) / 2L let sum_sq = ((2L * limit) + 1L) * (limit + 1L) * limit / 6L int64(pown sum 2) - sum_sq [/fsharp]

Project Euler in F# – Problems 1 – 3

Problem 1

Find the sum of all the multiples of 3 or 5 below 1000.

Brute force is the first thing that comes to mind

let problem1 () =
    {1 .. 999}
    |> Seq.filter (fun x -> (x % 3 = 0) || (x % 5 = 0))
    |> Seq.sum

But the wise Euler shows Math to be a better tactic, as his answer is unquestionably faster.

let problem1_correct () =
    let target = 999

    let sumDivisibleBy value =
        let p = target / value
        value * (p * (p + 1)) / 2

    (sumDivisibleBy 3) + (sumDivisibleBy 5) - (sumDivisibleBy 15)

Problem 2

Sum of all even Fibonacci numbers under 4 million

No real curveball here, unfold with a limit was the first Fibonacci method that came to mind

let problem2 () =
    let fibonacciSeq withLimit =
        Seq.unfold (fun previousState ->
            let nMinusTwo, nMinusOne = previousState
            let nextValue = nMinusTwo + nMinusOne
            let nextState = (nMinusOne, nextValue)

            match nextValue < withLimit with
            | true -> Some(nextValue, nextState)
            | false -> None
        ) (0, 1)

    fibonacciSeq 4000000
    |> Seq.filter(fun x -> x % 2 = 0)
    |> Seq.sum

Problem 3

What is the largest prime factor of the number 600851475143?

This one might take a few tries to get correct. I actually went and completed a few other Euler problems before coming back to this one.

The sample number, 13195, is the red herring here because it is so easy to get the answer if you aren’t thinking.

let problem3 () =
    let testPrime (possiblePrime:int64) =
        let sqrRootOfPrime = int64(System.Math.Sqrt(float(possiblePrime)))
            
        {1L .. sqrRootOfPrime}
        |> Seq.forall(fun divisor -> 
            match divisor with
            | 1L -> true
            | x when divisor = possiblePrime -> true
            | _ -> possiblePrime % divisor > 0L)

    let factorSeq ofNumber =
        {2L .. (ofNumber/2L)}
        |> Seq.filter(fun testNumber -> ofNumber % testNumber = 0L)
        |> Seq.map(fun factor1 -> (factor1, (ofNumber / factor1)))
        |> Seq.takeWhile(fun (factor1, factor2) -> factor1 <= factor2)
        |> Seq.fold (fun acc (factor1, factor2) -> factor1 :: factor2 :: acc) []
        |> Seq.sort
        |> Seq.toList
        |> List.rev

    factorSeq 600851475143L
    |> Seq.find (fun factor -> testPrime factor)

That was unofficially my fourth attempt.

Project Euler in F#

While job hunting this time around, a recruiter pointed me to a website called Project Euler, as a great place to practice.

Project Euler is a series of challenging mathematical/computer programming problems that will require more than just mathematical insights to solve. Although mathematics will help you arrive at elegant and efficient methods, the use of a computer and programming skills will be required to solve most problems.

The problems always seem overly complicated, and somehow ask you to find the sum of this operation or said sequence. Most of the time, you are even provided with a sample input and output. An input I’m sure which is carefully chosen to tempt you to write a brute force algorithm.

I immediately went to do them in F#, because I find it so fun. 25 problems later… Why stop there…
After talking to Justin about it, I decided to blog my progress.

So this is a test to see how far I can get.
This might take a while…

Current Level: