Operationalizing Quartz Jobs

2023 Feb 13 by Dru

How do you keep tabs on background processes? Specifically, ones that are orchestrated through a job scheduling system like Quartz.NET. As dotnet developers we have a lot of options available to us, but we still have to glue them all together into a cohesive system. The following is how I keep the background jobs of Message Aid humming along without any hiccups.

Jobs and Triggers

When I first started building out the various jobs in Message Aid, I was very centered on the IJob interface. This put a sense of awkwardness into my job creation. It wasn't until I really dug into Why Jobs and Triggers? that I started to appreciate what Quartz was giving me.

For the remainder of this post I'm going to focus on a generic job called FetchMessagesJob. This job connects to a broker (like RabbitMQ) and pulls messages out of a queue to be stored in Message Aid.

When I first started out with Quartz, I would add jobs to the scheduler any time I needed to watch a new queue. Simplified, it looked something like this.

//a hypothetical MVC Action
[Route("~/broker/{id}/queue/{queueName}")]
[HttpPost]
public async Task<IActionResult> Post(
    Guid id,
    string queueName,
    [FromServices] ISchedulerFactory schedulerFactory)
{
    var scheduler = await schedulerFactory.GetScheduler();

    var job = JobBuilder.Create<FetchMessagesJob>()
        .WithIdentity(
            $"fetch-messages-for-{queueName}",
            $"broker-jobs-{id}"
        )
        // stored as a string so it round-trips through any job store
        .UsingJobData("BrokerId", id.ToString())
        .UsingJobData("QueueName", queueName)
        .Build();

    var trigger = TriggerBuilder.Create()
        .WithIdentity(
            $"fetch-messages-trigger-{queueName}",
            $"broker-jobs-{id}"
        )
        .StartNow()
        .WithSimpleSchedule(x => x.WithInterval(
            TimeSpan.FromMinutes(5)).RepeatForever()
        )
        .Build();

    await scheduler.ScheduleJob(job, trigger);

    return Ok();
}

After talking with the Quartz team, I learned that I was adding jobs in an "odd" way, at least compared to how the devs think about jobs: I was effectively duplicating jobs. The idea is that you should define a job once, set common data on the job's data map, and then pass instance-specific data using the trigger. Inside the job you can then use the merged data map to access data from either the trigger or the job.
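To make that concrete, here's a minimal sketch of reading the merged map from inside the job. The BrokerId and QueueName keys come from the registration code above; the rest of the body is a placeholder, not the final shape of the job.

public class FetchMessagesJob : IJob
{
    public Task Execute(IJobExecutionContext context)
    {
        // values from the job's data map and the firing trigger's data map,
        // merged together; trigger values win when a key exists in both
        var map = context.MergedJobDataMap;

        var brokerId = Guid.Parse(map.GetString("BrokerId")!);
        var queueName = map.GetString("QueueName");

        // connect to the broker and pull messages for queueName...
        return Task.CompletedTask;
    }
}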

Clean Up

After learning a bit more, I have changed the way I register things. First, I have an IHostedService that adds all of my jobs at startup.

public async Task StartAsync(CancellationToken ct)
{
    // setup: schedulerFactory is the ISchedulerFactory injected into this hosted service
    var scheduler = await schedulerFactory.GetScheduler(ct);

    var job = JobBuilder.Create<FetchMessagesJob>()
        .WithIdentity("fetch-messages-job", "message-jobs")
        .StoreDurably()
        .Build();

    await scheduler.AddJob(job, true, true, ct);

    // etc...
}

Then I add a trigger for those jobs, as needed.

//a hypothetical MVC Action
[Route("~/broker/{id}/queue/{queueName}")]
[HttpPost]
public async Task<IActionResult> Post(
    Guid id,
    string queueName,
    [FromServices] ISchedulerFactory schedulerFactory)
{
    var scheduler = await schedulerFactory.GetScheduler();

    var trigger = TriggerBuilder.Create()
        .WithIdentity($"fetch-messages-{queueName}", $"broker-{id}")
        .ForJob("fetch-messages-job", "message-jobs")
        .UsingJobData("BrokerId", id.ToString())
        .UsingJobData("QueueName", queueName)
        .StartNow()
        .WithSimpleSchedule(x => x.WithInterval(
            TimeSpan.FromMinutes(5)).RepeatForever()
        )
        .Build();

    await scheduler.ScheduleJob(trigger);

    return Ok();
}

This small change means that I can now track metrics and health for the right things. Quartz tracks state on triggers (TriggerState), not on jobs. With this change it makes a lot more sense to report that I have N triggers in a given state, and then group them by job name.
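A rough sketch of what that reporting loop can look like, assuming a schedulerFactory and ct are in scope; GroupMatcher lives in Quartz.Impl.Matchers, and the metric call at the end is a placeholder:

var scheduler = await schedulerFactory.GetScheduler(ct);

foreach (var jobKey in await scheduler.GetJobKeys(GroupMatcher<JobKey>.AnyGroup(), ct))
{
    foreach (var trigger in await scheduler.GetTriggersOfJob(jobKey, ct))
    {
        var state = await scheduler.GetTriggerState(trigger.Key, ct);

        // e.g. increment a gauge tagged with jobKey.Name and state
    }
}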

Simplifying Job and Trigger Identity

Another trick that I picked up working with the Quartz team is the practice of putting the JobKey as a static property on the job, like so:

public class FetchMessagesJob
{
    public static JobKey Key => new("fetch-messages-job", "message-jobs");
}

This allows me to easily grab the job's key throughout the code base with FetchMessagesJob.Key. I'll lean on this later when we start to build out an ObservableJob.
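For example, anywhere a scheduler and cancellation token are in scope (both assumed here), the key can be used directly instead of re-typing magic strings:

// pause or fire the job by its well-known key
await scheduler.PauseJob(FetchMessagesJob.Key, ct);
await scheduler.TriggerJob(FetchMessagesJob.Key, ct);

// or point a new trigger at it
var trigger = TriggerBuilder.Create()
    .ForJob(FetchMessagesJob.Key)
    .StartNow()
    .Build();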

Durable Registration

A quick comment: since I'm now storing my jobs BEFORE there are any triggers, it's important to call StoreDurably() when adding the job. This way the job won't be immediately cleaned up while it has no triggers.

using var scope = _provider.CreateScope();
var schedulerFactory = scope.ServiceProvider.GetRequiredService<ISchedulerFactory>();
var scheduler = await schedulerFactory.GetScheduler(ct);

await scheduler.AddJob(JobBuilder.Create<ImportQueuesJob>()
    .WithIdentity(ImportQueuesJob.Key)
    .StoreDurably()
    .Build(), true, true, ct);

Building a more robust job

Like most developers, when I start using a new framework I inevitably develop a common set of usage patterns. It's no different with Quartz.NET; the following are some tricks that I've picked up as I've been building out and unit testing more and more jobs.

public interface IBetterJob<TJobData>
    where TJobData : class
{
    static abstract JobKey Key { get; }

    Task Execute(TJobData data, IJobExecutionContext context);
}

// concrete jobs derive from this base class and supply the static Key and the typed Execute
public abstract class BetterJob<TJobData> : IJob
    where TJobData : class
{
    readonly ILogger _logger;

    protected BetterJob(ILogger logger)
    {
        _logger = logger;
    }

    public async Task Execute(IJobExecutionContext context)
    {
        // materialize the strongly typed TJobData from the merged data map
        var data = GetJobData(context.MergedJobDataMap);

        // Operation comes from SerilogTimings
        using (var op = Operation.Begin("Running Job"))
        {
            try
            {
                await Execute(data, context);
                op.Complete();
                // metrics increment (success)
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "{Message}", ex.Message);
                op.Abandon();

                // metrics increment (error)
                // capture in Sentry
            }
        }
    }

    protected abstract TJobData GetJobData(JobDataMap dataMap);

    protected abstract Task Execute(TJobData data, IJobExecutionContext context);
}

// a hypothetical data bag; BrokerId and QueueName mirror the trigger's job data
public record FetchMessageData(Guid BrokerId, string QueueName);

public class FetchMessagesJob : BetterJob<FetchMessageData>
{
    public static JobKey Key => new("fetch-messages", "brokers");

    public FetchMessagesJob(ILogger<FetchMessagesJob> logger) : base(logger) { }

    protected override FetchMessageData GetJobData(JobDataMap dataMap) =>
        new(Guid.Parse(dataMap.GetString("BrokerId")!), dataMap.GetString("QueueName")!);

    protected override Task Execute(FetchMessageData data, IJobExecutionContext context)
    {
        // connect to the broker and pull messages from data.QueueName...
        return Task.CompletedTask;
    }
}

Instrumentation Details

At a minimum, I want to keep an eye on:

  • Log Error Volume
  • Job Completion Time / Duration
  • Trigger State by Job
  • Run Counts

These hang off of a static Instrumentation class:

public static class Instrumentation
{
    // counters
    // observable gauges
}
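The stub above only names the pieces, so here is one way it could be filled in using System.Diagnostics.Metrics. The meter and instrument names are my own invention; Message Aid may well use a different metrics library.

using System.Diagnostics.Metrics;

public static class Instrumentation
{
    static readonly Meter Meter = new("MessageAid.Jobs");

    // counters incremented from the success / error branches of BetterJob
    public static readonly Counter<long> JobsCompleted =
        Meter.CreateCounter<long>("jobs.completed");

    public static readonly Counter<long> JobsFailed =
        Meter.CreateCounter<long>("jobs.failed");

    // duration recorded alongside the SerilogTimings operation
    public static readonly Histogram<double> JobDuration =
        Meter.CreateHistogram<double>("jobs.duration", unit: "ms");
}

With something like this in place, the success branch of BetterJob can bump JobsCompleted (tagged with the job name), the catch branch can bump JobsFailed, and the timing can feed JobDuration.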

A Task Api

Now that we have a standardized and monitored approach to Quartz jobs, it's time to lay the groundwork for an API that we can use to manage our jobs. The overall idea for this API is inspired by the Elasticsearch _tasks API. I'm going to paint a fairly high-level picture and leave the rest to the reader's imagination; a rough sketch of a single task entry follows the list below.

GET /_tasks
GET /_tasks/{id}

For each task, the response includes:

  • Job Key
  • Job and Trigger Split
  • Merged Job Data Map
  • Metrics
    • Count
    • Duration
    • Trigger Health
    • Sentry Error Pipeline
    • Serilog Timings
    • Data bag validation
    • /_tasks API
  • Key Name vs Job Name
  • Group Name
  • Trigger With T
    • Uses JSON anyways
See also: Serilog Best Practices.

Job Names and Group Names, Trigger Names and Group Names

How you can match jobs and triggers in Quartz should influence your naming.
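For example, because the triggers above encode the broker id in their group name, a whole broker's triggers can be matched at once. A small sketch, assuming the group names from the earlier examples and a scheduler and ct in scope (GroupMatcher lives in Quartz.Impl.Matchers):

// pause every trigger that belongs to one broker
await scheduler.PauseTriggers(
    GroupMatcher<TriggerKey>.GroupEquals($"broker-{id}"), ct);

// or sweep across all brokers at once
await scheduler.PauseTriggers(
    GroupMatcher<TriggerKey>.GroupStartsWith("broker-"), ct);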
