Synchronously Await an Async Workflow Using Observable.guard

The other day, I couldn’t figure out how to synchronously wait for an async workflow to complete. I was pretty sure I needed to use Observable.guard; I just couldn’t wrap my head around how to use it. This morning I was on a mission to figure it out.
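Before the code, a quick note on what Observable.guard is for: as far as I understand it, it wraps an observable so that a callback runs each time a subscriber attaches, which lets you kick the work off only once somebody is actually listening. A rough paraphrase of the idea (the name and shape here are mine, not the actual library source):

// Roughly what Observable.guard gives you: wrap an observable so that a
// callback runs every time a subscriber attaches
let guard f (source: System.IObservable<'T>) =
    { new System.IObservable<'T> with
        member x.Subscribe(observer) =
            let subscription = source.Subscribe(observer)
            f ()
            subscription }

Here is what I eventually came up with: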

open System
open FSharp.Control

let event = new Event<_>()

let createObservableByExecutingAsync asyncToExecute =
    event.Publish
    |> Observable.guard(fun _ ->
        // Observable.guard runs this function each time a subscriber attaches,
        // so the async work only starts once someone is actually waiting
        let asyncOperation = async {
            asyncToExecute
            |> Async.RunSynchronously

            // Signal the waiting subscriber that the work has completed
            event.Trigger(())
        }
        
        printfn "Start Executing Async Operation"

        Async.Start asyncOperation

        printfn "End Executing Async Operation"
    )

let someAsyncOperation = async {
        printfn "Async Operation"
    }

let asyncWorker = createObservableByExecutingAsync someAsyncOperation

printfn "Start Waiting for Async Result"

Async.AwaitObservable asyncWorker
|> Async.RunSynchronously

printfn "End Waiting for Async Result"

Console.ReadLine()
|> ignore

This bit of code produces the most beautiful output of…

Start Waiting for Async Result 
Start Executing Async Operation
End Executing Async Operation
Async Operation
End Waiting for Async Result

Figuring that out makes me feel better, but the bigger realization was that I didn’t need Observable.guard at all. I needed Async.AwaitObservable.
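To get my head around what Async.AwaitObservable is doing for me, I think of it roughly like the sketch below. This is only my own approximation with a made-up name (awaitFirstValue), not the actual FSharpx implementation: subscribe, hand back the first value the observable produces, then detach.

// Rough approximation of Async.AwaitObservable: subscribe, complete with the
// first value that arrives, then unsubscribe
let awaitFirstValue (source: System.IObservable<'T>) : Async<'T> =
    async {
        let tcs = System.Threading.Tasks.TaskCompletionSource<'T>()
        // Observable.subscribe hands back an IDisposable; 'use' detaches the
        // handler once this async block completes
        use _subscription =
            source |> Observable.subscribe (fun value -> tcs.TrySetResult value |> ignore)
        return! Async.AwaitTask tcs.Task
    }

Blocking on that with Async.RunSynchronously is exactly the synchronous wait I was after. Here it is wired up to the rate-limited agent from the post below: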

open System
open FSharp.Control
open RateLimiting // the module from the rate-limiting post below
 
type Messages =
    | Message of string
 
type Responses =
    | Response of string
 
let operationFunction (message: Messages) =
    match message with
    | Message(input) -> Response(sprintf "Response: %s" input)
 
let agent =
    RateLimitedAgent(operationFunction, 4, 1000)
 
let event = new Event<_>()

let someAsyncOperation (response:Responses) = async {
        printfn "Async Response"
        event.Trigger(())
    }

agent.QueueItem({payload = Message("Awaited Payload") ; response = Some(someAsyncOperation)})
|> Async.Start

printfn "Start Waiting for Async Result"

Async.AwaitObservable event.Publish
|> Async.RunSynchronously

printfn "End Waiting for Async Result"

Console.ReadLine()
|> ignore

Which produces:

Start Waiting for Async Result
Async Response
End Waiting for Async Result

I love watching this stuff work.

Async Wait Operation

Rate Limiting a Function with Async Workflows

So I am trying to deal with an API that says I cannot make more than 4 requests per second. This is a really good use case for F#, since it has such great support for Asynchronous Workflows. I took a little inspiration from this post by Luca Bolognese.

module RateLimiting

open System
open FSharp.Control

type RateLimitedMessage<'a,'b> = {payload: 'a ; response: ('b -> Async<unit>) option}

type RateLimitedAgent<'a,'b>(
                                operation: 'a -> 'b,
                                workerCount, 
                                ?minExecutionTime: int,
                                ?errorHandler: Exception -> 'a -> Async<unit>
                            ) = class
    let blockingQueue = new BlockingQueueAgent<RateLimitedMessage<'a, 'b>>(workerCount)
    let semaphore = new System.Threading.Semaphore(workerCount, workerCount)

    let errorHandler = defaultArg errorHandler (fun _ _ -> async { () })
    let minExecutionTime = defaultArg minExecutionTime 0 // default: no minimum execution time

    let createWorker () =
        async {
            while true do
                // Wait for a free worker slot before picking up the next message
                semaphore.WaitOne()
                |> ignore

                let! message = blockingQueue.AsyncGet()

                let messageProcessor =
                    async {
                        try
                            let response = operation message.payload
                            if message.response.IsSome then
                                do! message.response.Value response
                        with
                        | ex ->
                            errorHandler ex message.payload
                            |> Async.Start
                    }

                // Run the processor in parallel with a timer that releases the
                // worker slot only after the minimum execution time has elapsed;
                // this is what enforces the rate limit
                seq {
                    yield messageProcessor
                    yield async {
                            do! Async.Sleep(minExecutionTime)
                            semaphore.Release()
                            |> ignore
                        }
                }
                |> Async.Parallel
                |> Async.Ignore
                |> Async.Start
            }

    do
        (fun _ -> createWorker ())
        |> Seq.init workerCount
        |> Async.Parallel
        |> Async.Ignore
        |> Async.Start

    member x.QueueItem item =
        blockingQueue.AsyncAdd item
end

RateLimitedAgent

Messages are passed in through a BlockingQueueAgent from FSharpx’s agent library. This allows callers to choose whether or not they wait until messages are picked up for processing.
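Just to illustrate that choice (a hypothetical, standalone snippet with made-up payloads, mirroring the example below):

open FSharp.Control
open RateLimiting

type Messages = | Message of string
type Responses = | Response of string

let agent = RateLimitedAgent((fun (Message input) -> Response input), 4, 1000)

// Fire and forget: start the AsyncAdd and move on immediately
agent.QueueItem({payload = Message("don't wait") ; response = None})
|> Async.Start

// Or block until the bounded queue (capacity = workerCount) accepts the message
agent.QueueItem({payload = Message("wait until accepted") ; response = None})
|> Async.RunSynchronously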

Here is an example where we don’t wait at all, though we throw in some random exceptions for fun.

open System
open System.Diagnostics
open System.Threading
open FSharp.Control
open RateLimiting

let counterCreationData name ctype = new CounterCreationData(name, "", ctype)

let counterCategoryName = "Test Rate Limiter"
let requestsProcessedPerSecondCounterItem = "Requests Processed per second"
let requestsQueuedPerSecondCounterItem = "Requests Queued per second"
let requestsQueuedCounterItem = "Number of Items Queued"

let performanceCounters = [|
        counterCreationData requestsProcessedPerSecondCounterItem PerformanceCounterType.RateOfCountsPerSecond32 
        counterCreationData requestsQueuedPerSecondCounterItem PerformanceCounterType.RateOfCountsPerSecond32 ;
        counterCreationData requestsQueuedCounterItem PerformanceCounterType.NumberOfItems32
    |]

let removeCounters () = PerformanceCounterCategory.Delete(counterCategoryName)

let installCounters () =
    if PerformanceCounterCategory.Exists(counterCategoryName)
    then
        removeCounters ()

    PerformanceCounterCategory.Create(counterCategoryName, "", PerformanceCounterCategoryType.SingleInstance, new CounterCreationDataCollection(performanceCounters)) 
    |> ignore 

installCounters()
    
let createCounter name =
    new PerformanceCounter(counterCategoryName, name, false)

let requestProcessedSpeedCounter = createCounter requestsProcessedPerSecondCounterItem
let requestQueuedSpeedCounter = createCounter requestsQueuedPerSecondCounterItem

let queueCounter = createCounter requestsQueuedCounterItem
queueCounter.RawValue <- 0L

let rndSeed = new Random()

type Messages =
    | Message of string

type Responses =
    | Response of string

let operationFunction (message: Messages) =
    match rndSeed.Next(10) with
    | x when x = 1 -> failwith "Error"
    | _ ->
        match message with
        | Message(input) -> Response(sprintf "Response: %s" input)

let errorHandler (ex:Exception) message = async {
        requestProcessedSpeedCounter.Increment()
        |> ignore

        queueCounter.Decrement()
        |> ignore

        printfn "Error"
    }

let agent =
    RateLimitedAgent(operationFunction, 4, 1000, errorHandler)

let responseFunction (someArg:Responses) = async{
    match someArg with
    | Response(response) ->
        requestProcessedSpeedCounter.Increment()
        |> ignore

        queueCounter.Decrement()
        |> ignore

        printfn "Responded: %s" response
    }

let createDisposable name =
    let x = ref 1
    let rnd = new Random(rndSeed.Next())

    async {
        while true do               
            agent.QueueItem({payload = Message(sprintf "%s Request %i" name (!x)) ; response = Some(responseFunction)})
            |> Async.Start
            
            requestQueuedSpeedCounter.Increment()
            |> ignore

            queueCounter.Increment()
            |> ignore

            x := !x + 1

            do! Async.Sleep(550)
        }
    |> Async.StartDisposable

let disposable1 = createDisposable "Automatic 1"
let disposable2 = createDisposable "Automatic 2"
let disposable3 = createDisposable "Automatic 3"
let disposable4 = createDisposable "Automatic 4"

Console.ReadLine()
|> ignore

disposable1.Dispose()
disposable2.Dispose()
disposable3.Dispose()
disposable4.Dispose()

Console.ReadLine()
|> ignore

So this is just about what I was looking for. I say just about because apparently I can get spikes where I go over the limit. I have a strong belief this is a calculation error within Performance Monitor, but if we were really concerned about it, we could increase the worker cooldown time to 1.1s.
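If we did want that margin, the cooldown is just the minimum execution time argument to the agent (in milliseconds), so something like this (stricterAgent is only an illustrative name):

// Hold each worker slot for at least 1.1 seconds instead of 1 second,
// giving a little headroom under the 4-requests-per-second limit
let stricterAgent =
    RateLimitedAgent(operationFunction, 4, 1100, errorHandler)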

Because it’s fun, I’ve also toyed with the operation function, adding a random delay just to see the results.

let operationFunction (message: Messages) =
    Thread.Sleep(rndSeed.Next(50,2500))
    match rndSeed.Next(10) with
    | x when x = 1 -> failwith "Error"
    | _ ->
        match message with
        | Message(input) -> Response(sprintf "Response: %s" input)

Not too shabby.

Messages are bundled with an optional async response. This allows the caller to provide an async function to handle the response. If a user were interested, I’m pretty sure this is the more likely and useful place to block your Async Workflow.

I’m sure I have to use Observable.guard like Tomas Petricek referred to here and here.

So: create an Async Workflow, make it raise an event, add the message to the agent, then wait until the event is observed. Sounds simple enough, but I just can’t wrap my head around the mechanics of it at this point. I think I’m going to ask Tomas to help me out.

Edited 10/17/2011
As per comments below