reF#actoring: rewriting an actor in F#
This article is an instalment in the F# Advent Calendar.
Last time I checked, the Akka.NET NuGet package had almost 100K downloads, and I am pretty sure it will have passed that number by the time you read this article. The package for the Akka.NET F# API sees much more modest use: fewer than 10K downloads at the time of writing. Some of these downloads are from our team – a small development group writing a media distribution engine at NRK (Norwegian Broadcasting Corporation).
Akka.NET is an actor model framework, and actors are the building blocks of our system. All of them – with the exception of system actors – are written by us. Recently, however, we found an open source actor that provided the functionality we were looking for. We started using it but soon decided that, for our needs, it would be better to write a similar one in F#. This article summarizes our experience of rewriting it.
Quartz.NET and Akka.Quartz.Actor
While Akka.NET provides an in-memory scheduler, there is no support for persistent message scheduling out of the box. It’s not hard to roll your own, but unless you have very special needs, you should be fine with one of the third-party libraries. One of the popular choices is Quartz.NET (a port of a Java project), and it even has an Akka.NET actor! We didn’t have to look further.
Here's how standard communication between actors looks:
Decorating Quartz.NET job trigger with F# discriminated union
Akka.Quartz.Actor requires a client to reference Quartz.NET because the client itself builds an instance of a Quartz.NET trigger object (one that implements the ITrigger interface). So a client typically contains code like this:
F#
    TriggerBuilder.Create().WithCronSchedule("*0/10 * * * * ?").Build();

F#
    TriggerBuilder.Create().StartAt(startTime).Build()

F#
    open System

    type JobSchedule =
        | Once of DateTimeOffset
        | RepeatForever of TimeSpan
        | RepeatForeverAfter of DateTimeOffset * TimeSpan
        | RepeatWithCount of TimeSpan * int
        | RepeatWithCountAfter of DateTimeOffset * TimeSpan * int
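
To make the intent of this union concrete, here is a minimal, dependency-free sketch of how a client might describe schedules without touching Quartz.NET types. The `describe` helper is hypothetical and exists only for illustration; the type is repeated so the snippet runs on its own.

```fsharp
open System

// Repeated from the article so the snippet is self-contained.
type JobSchedule =
    | Once of DateTimeOffset
    | RepeatForever of TimeSpan
    | RepeatForeverAfter of DateTimeOffset * TimeSpan
    | RepeatWithCount of TimeSpan * int
    | RepeatWithCountAfter of DateTimeOffset * TimeSpan * int

// Hypothetical helper (not part of the article's code): renders a schedule as text.
let describe (schedule : JobSchedule) =
    match schedule with
    | Once startTime ->
        sprintf "once at %O" startTime
    | RepeatForever interval ->
        sprintf "every %O, starting now" interval
    | RepeatForeverAfter (startTime, interval) ->
        sprintf "every %O, starting at %O" interval startTime
    | RepeatWithCount (interval, count) ->
        sprintf "every %O with repeat count %d, starting now" interval count
    | RepeatWithCountAfter (startTime, interval, count) ->
        sprintf "every %O with repeat count %d, starting at %O" interval count startTime

// A client only needs System types to express "every ten seconds".
let everyTenSeconds = RepeatForever (TimeSpan.FromSeconds 10.)
printfn "%s" (describe everyTenSeconds)
```

Because the match is exhaustive, the compiler will warn about every function like this one if a new schedule case is added later.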

F#
    open Quartz

    // Translates a JobSchedule into a Quartz.NET TriggerBuilder.
    let createTriggerBuilder (jobSchedule : JobSchedule) =
        let builder = TriggerBuilder.Create()
        match jobSchedule with
        | Once startTime ->
            builder.StartAt(startTime)
        | RepeatForever interval ->
            builder.StartNow().WithSimpleSchedule(
                fun x -> x.WithInterval(interval).RepeatForever() |> ignore)
        | RepeatForeverAfter (startTime, interval) ->
            builder.StartAt(startTime).WithSimpleSchedule(
                fun x -> x.WithInterval(interval).RepeatForever() |> ignore)
        | RepeatWithCount (interval, count) ->
            builder.StartNow().WithSimpleSchedule(
                fun x -> x.WithInterval(interval).WithRepeatCount(count) |> ignore)
        | RepeatWithCountAfter (startTime, interval, count) ->
            builder.StartAt(startTime).WithSimpleSchedule(
                fun x -> x.WithInterval(interval).WithRepeatCount(count) |> ignore)
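
It may help to see what each case means in terms of actual fire times. The sketch below is a rough model, not Quartz.NET's implementation: `fireTimes` is a hypothetical helper that lists the first few firings for a schedule. It assumes the Quartz convention that a repeat count of n means one initial firing plus n repeats, and it ignores misfire handling entirely.

```fsharp
open System

// Repeated from the article so the snippet is self-contained.
type JobSchedule =
    | Once of DateTimeOffset
    | RepeatForever of TimeSpan
    | RepeatForeverAfter of DateTimeOffset * TimeSpan
    | RepeatWithCount of TimeSpan * int
    | RepeatWithCountAfter of DateTimeOffset * TimeSpan * int

// Hypothetical helper: returns at most `limit` fire times, treating `now` as "StartNow".
let fireTimes (now : DateTimeOffset) (limit : int) (schedule : JobSchedule) =
    // Start time, interval, and total number of firings (None means unbounded).
    let start, interval, total =
        match schedule with
        | Once startTime -> startTime, TimeSpan.Zero, Some 1
        | RepeatForever interval -> now, interval, None
        | RepeatForeverAfter (startTime, interval) -> startTime, interval, None
        // Assumption: repeat count n gives n + 1 firings in total.
        | RepeatWithCount (interval, count) -> now, interval, Some (count + 1)
        | RepeatWithCountAfter (startTime, interval, count) -> startTime, interval, Some (count + 1)
    let n =
        match total with
        | Some t -> min t limit
        | None -> limit
    [ for i in 0 .. n - 1 -> start + TimeSpan.FromTicks(interval.Ticks * int64 i) ]

let now = DateTimeOffset(2016, 12, 1, 12, 0, 0, TimeSpan.Zero)
let times = fireTimes now 10 (RepeatWithCount (TimeSpan.FromMinutes 1., 2))
times |> List.iter (printfn "%O")
```

With a repeat count of 2 the list holds three entries, one minute apart, which is worth keeping in mind when choosing the `count` argument.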
Porting commands and events
The Akka.Quartz.Actor project is under active development, so I will refer to the version built for Akka.NET 1.1.1 (committed on August 5th, 2016). The original project contained 10 source files in C#: 3 command definitions (one of them an interface), 5 event definitions (again, one of them an interface), 1 file with the implementation of QuartzJob (which implements the IJob interface), and the actual actor implementation, QuartzActor. In total: 10 files (2 interfaces + 8 classes), 321 lines of code (267 lines without comments). F# code is typically more compact (much more compact) than C#, so it was interesting to see what the metrics for the ported code would be.
We started with commands and events, placing them in a single file (instead of the original 8). Here's what we ended up with:
F#
    open System
    open Akka.Actor

    type JobMessage = obj
    type JobId = obj

    type JobCommand =
        | CreateJob of IActorRef * JobMessage * JobSchedule
        | RemoveJob of JobId

    type JobCommandResult =
        | Success of JobId
        | Error of JobId * Exception

C#
    using Akka.Actor;
    using Quartz;

    namespace Akka.Quartz.Actor.Commands
    {
        public class CreateJob : IJobCommand
        {
            public CreateJob(IActorRef to, object message, ITrigger trigger)
            {
                To = to;
                Message = message;
                Trigger = trigger;
            }

            public IActorRef To { get; private set; }
            public object Message { get; private set; }
            public ITrigger Trigger { get; private set; }
        }
    }

C#
    namespace Akka.Quartz.Actor.Commands
    {
        internal interface IJobCommand
        {
        }
    }
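
One practical consequence of folding the result messages into a single union is that a caller can handle every outcome with one exhaustive match. The following is a small sketch under that assumption; `describeResult` is a hypothetical helper, and the types are repeated so the snippet is self-contained.

```fsharp
open System

// Repeated from the article so the snippet is self-contained.
type JobId = obj

type JobCommandResult =
    | Success of JobId
    | Error of JobId * Exception

// Hypothetical helper: turns a reply from the scheduling actor into a log line.
let describeResult (result : JobCommandResult) =
    match result with
    | Success jobId -> sprintf "job %O succeeded" jobId
    | Error (jobId, ex) -> sprintf "job %O failed: %s" jobId ex.Message

let ok = Success (box "job-1")
let failed = Error (box "job-2", (InvalidOperationException "Job not found" :> Exception))

printfn "%s" (describeResult ok)
printfn "%s" (describeResult failed)
```

The explicit `box` and upcast are there only to keep the sketch unambiguous; job identifiers are plain strings here purely for illustration.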

F#
    let scheduleActor props (mailbox: Actor<_>) = ...

F#
    open System
    open Akka.FSharp
    open Quartz
    open Quartz.Impl

    let scheduleActor props (mailbox: Actor<_>) =
        // Create the Quartz.NET scheduler, with custom properties if supplied.
        let scheduler =
            match props with
            | Some props -> StdSchedulerFactory(props).GetScheduler()
            | None -> StdSchedulerFactory().GetScheduler()
        scheduler.Start()
        // Shut the scheduler down when the actor stops.
        mailbox.Defer (fun _ -> scheduler.Shutdown())
        let rec loop () =
            actor {
                let! message = mailbox.Receive ()
                match message with
                | CreateJob (actor, message, jobSchedule) ->
                    match (actor, jobSchedule) with
                    | (null,_) -> mailbox.Sender() <! Error (null, new ArgumentNullException("CreateJob actor is null"))
                    | _ ->
                        let builder = createTriggerBuilder jobSchedule
                        let trigger = builder.Build()
                        try
                            let job = QuartzJob.CreateBuilderWithData(actor, message)
                                        .WithIdentity(trigger.JobKey)
                                        .Build()
                            scheduler.ScheduleJob(job, trigger) |> ignore
                            // Reply with the job key so the sender can remove the job later.
                            mailbox.Sender() <! Success trigger.JobKey
                        with ex ->
                            mailbox.Sender() <! Error (trigger.JobKey, ex)
                | RemoveJob (jobKey) ->
                    try
                        match scheduler.DeleteJob(jobKey :?> JobKey) with
                        | true ->
                            mailbox.Sender() <! Success jobKey
                        | false ->
                            mailbox.Sender() <! Error (jobKey, new InvalidOperationException("Job not found"))
                    with ex ->
                        mailbox.Sender() <! Error (jobKey, ex)
                return! loop ()
            }
        loop ()

F#
    type private QuartzJob () =
        static let MessageKey = "message"
        static let ActorKey = "actor"

        interface IJob with
            member this.Execute (context : IJobExecutionContext) =
                let jdm = context.JobDetail.JobDataMap
                if jdm.ContainsKey(MessageKey) && jdm.ContainsKey(ActorKey) then
                    match jdm.[ActorKey] with
                    | :? IActorRef as actor -> actor <! jdm.[MessageKey]
                    | _ -> ()

        static member CreateBuilderWithData (actorRef : IActorRef, message : obj) =
            let jdm = new JobDataMap()
            jdm.AddAndReturn(MessageKey, message).Add(ActorKey, actorRef)
            JobBuilder.Create<QuartzJob>().UsingJobData(jdm)

F#
    [<Fact>]
    let ``Should deliver message with short delivery time`` () : unit = testDefault <| fun tck ->
        let scheduler = spawn tck "scheduler" <| scheduleActor None
        let schedule = Once <| DateTimeOffset.Now.AddSeconds(1.)
        scheduler <! CreateJob (tck.TestActor, 1, schedule)
        expectMsgFilter tck <| validateResult |> ignore
        expectMsg tck 1 |> ignore

    [<Fact>]
    let ``Should not deliver message with long delivery time`` () : unit = testDefault <| fun tck ->
        let scheduler = spawn tck "scheduler" <| scheduleActor None
        let schedule1 = Once <| DateTimeOffset.Now.AddSeconds(10.)
        scheduler <! CreateJob (tck.TestActor, 1, schedule1)
        let schedule2 = Once <| DateTimeOffset.Now.AddSeconds(1.)
        scheduler <! CreateJob (tck.TestActor, 2, schedule2)
        expectMsgFilter tck <| validateResult |> ignore
        expectMsgFilter tck <| validateResult |> ignore
        expectMsg tck 2 |> ignore
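
The tests above pass a `validateResult` function to `expectMsgFilter`, but its definition is not shown here. One plausible shape, offered purely as an assumption, is a predicate that accepts `Success` replies and rejects `Error` replies. The types are repeated so the snippet runs without Akka.NET or Quartz.NET.

```fsharp
open System

// Repeated from the article so the snippet is self-contained.
type JobId = obj

type JobCommandResult =
    | Success of JobId
    | Error of JobId * Exception

// Assumed definition (not taken from the article): true only for successful replies.
let validateResult (result : JobCommandResult) =
    match result with
    | Success _ -> true
    | Error _ -> false

printfn "%b" (validateResult (Success (box "job-1")))
printfn "%b" (validateResult (Error (box "job-2", (Exception "boom"))))
```

With a predicate like this, each `expectMsgFilter` call in the tests would consume one scheduling confirmation before the test checks for the scheduled message itself.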