How do you keep tabs on background processes? Specifically, ones that are orchestrated through a job scheduling system like Quartz.NET. As .NET developers we have a lot of options available to us, but we still have to glue them all together into a cohesive system. The following is how I keep the background jobs of Message Aid humming along without any hiccups.
Jobs and Triggers
When I first started building out the various jobs in Message Aid, I was very centered on the IJob interface. This made my job creation feel awkward. It wasn't until I really dug into Why Jobs and Triggers? that I started to appreciate what Quartz was giving me.
For the remainder of this post I'm going to focus on a generic job called FetchMessagesJob. This job connects to a broker (like RabbitMQ) and pulls messages out of a queue to be stored in Message Aid.
When I first started out with Quartz I would add jobs to the scheduler any time I needed to watch a new queue. We could simplistically say that it looked like this.
    // a hypothetical MVC action
    [Route("~/broker/{id}/queue/{queueName}")]
    [HttpPost]
    public async Task<IActionResult> Post(
        Guid id,
        string queueName,
        [FromServices] ISchedulerFactory schedulerFactory)
    {
        var scheduler = await schedulerFactory.GetScheduler();

        // one job per queue: this is the duplication described below
        var job = JobBuilder.Create<FetchMessagesJob>()
            .WithIdentity($"fetch-messages-for-{queueName}", $"broker-jobs-{id}")
            .UsingJobData("BrokerId", id)
            .UsingJobData("QueueName", queueName)
            .Build();

        var trigger = TriggerBuilder.Create()
            .WithIdentity($"fetch-messages-trigger-{queueName}", $"broker-jobs-{id}")
            .StartNow()
            .WithSimpleSchedule(x => x
                .WithInterval(TimeSpan.FromMinutes(5))
                .RepeatForever())
            .Build();

        await scheduler.ScheduleJob(job, trigger);

        return Ok();
    }
After talking with the Quartz team, I learned that I was adding jobs in an "odd" way. Based on the way the devs thought of jobs, I was effectively duplicating them: every queue got its own copy of the same job. The idea is that you should define a job once, set common data on the job's data map, and then pass instance-specific data using the trigger. Then you can use the merged data map to access data from either the trigger or the job.
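To make the merged data map concrete, here is a minimal sketch of a job reading its inputs from `context.MergedJobDataMap`. The `Require` helper is hypothetical (it is not part of Quartz), and the key names simply mirror the ones used in this post. It goes through `ToString()` so that it works whether the broker id was stored as a `Guid` or as a string.

```csharp
using System;
using System.Threading.Tasks;
using Quartz;

public class FetchMessagesJob : IJob
{
    public Task Execute(IJobExecutionContext context)
    {
        // Job-level data overlaid with trigger-level data; when both
        // define the same key, the trigger's value wins.
        JobDataMap data = context.MergedJobDataMap;

        Guid brokerId = Guid.Parse(Require(data, "BrokerId"));
        string queueName = Require(data, "QueueName");

        // ... connect to the broker and pull messages (elided) ...
        Console.WriteLine($"Fetching '{queueName}' from broker {brokerId}");
        return Task.CompletedTask;
    }

    // Hypothetical helper: fail loudly when a trigger forgot to supply a value.
    public static string Require(JobDataMap data, string key) =>
        data.TryGetValue(key, out var raw) && raw is not null
            ? raw.ToString()!
            : throw new JobExecutionException($"Missing job data '{key}'");
}
```

Because the job only ever looks at the merged map, it does not care whether a value was set once on the job or per trigger.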
Clean Up
After learning a bit more, I have changed the way I register things. First, I have an IHostedService that adds all of my jobs at start up.
    public async Task StartAsync(CancellationToken ct)
    {
        // setup
        // _schedulerFactory: an ISchedulerFactory, assumed to be constructor-injected
        var scheduler = await _schedulerFactory.GetScheduler(ct);

        var job = JobBuilder.Create<FetchMessagesJob>()
            .WithIdentity("fetch-messages-job", "message-jobs")
            .StoreDurably()
            .Build();

        // replace: true, storeNonDurableWhileAwaitingScheduling: true
        await scheduler.AddJob(job, true, true, ct);

        // etc...
    }
Then I add a trigger for those jobs, as needed.
    // a hypothetical MVC action
    [Route("~/broker/{id}/queue/{queueName}")]
    [HttpPost]
    public async Task<IActionResult> Post(
        Guid id,
        string queueName,
        [FromServices] ISchedulerFactory schedulerFactory)
    {
        var scheduler = await schedulerFactory.GetScheduler();

        // only a trigger is created; it points at the job registered at start up
        var trigger = TriggerBuilder.Create()
            .WithIdentity($"fetch-messages-{queueName}", $"broker-{id}")
            .ForJob("fetch-messages-job", "message-jobs")
            .UsingJobData("BrokerId", id)
            .UsingJobData("QueueName", queueName)
            .StartNow()
            .WithSimpleSchedule(x => x
                .WithInterval(TimeSpan.FromMinutes(5))
                .RepeatForever())
            .Build();

        await scheduler.ScheduleJob(trigger);

        return Ok();
    }
This small change means that I can now track metrics and health of the correct items. Quartz tracks a TriggerState for each trigger rather than a state for each job. With this change it makes a lot more sense to report that I have N triggers in a given state, and then I can group them by the job name.
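As a rough sketch of that report, the helper below walks every trigger, asks the scheduler for its state, and counts triggers per (job, state) pair. `TriggerReport` and `CountByJobAndState` are hypothetical names of my own; the scheduler calls (`GetTriggerKeys`, `GetTrigger`, `GetTriggerState`) are the Quartz.NET 3.x ones.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Quartz;
using Quartz.Impl.Matchers;

public static class TriggerReport
{
    // Counts triggers per (job, trigger state), e.g. "fetch-messages-job has 12 Normal, 1 Error".
    public static async Task<Dictionary<(JobKey Job, TriggerState State), int>> CountByJobAndState(
        IScheduler scheduler, CancellationToken ct = default)
    {
        var counts = new Dictionary<(JobKey Job, TriggerState State), int>();

        var keys = await scheduler.GetTriggerKeys(GroupMatcher<TriggerKey>.AnyGroup(), ct);
        foreach (var key in keys)
        {
            var trigger = await scheduler.GetTrigger(key, ct);
            if (trigger is null) continue; // removed between the two calls

            var state = await scheduler.GetTriggerState(key, ct);
            var bucket = (trigger.JobKey, state);
            counts[bucket] = counts.GetValueOrDefault(bucket) + 1;
        }

        return counts;
    }
}
```

This makes one round trip per trigger, which is fine for an in-memory store but worth caching or sampling on a timer if the job store is a database.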
Simplifying Job and Trigger Identity
Another trick that I picked up working with the Quartz team is the practice of putting the job key as a static property on the job, like so:
    public class FetchMessagesJob
    {
        public static JobKey Key => new("fetch-messages-job", "message-jobs");
    }
This allows me to easily grab the job's key throughout the code base with FetchMessagesJob.Key. I'll lean on this later when we start to build out an ObservableJob.
Durable Registration
A quick comment. Since I'm now storing my jobs BEFORE there are any triggers, it's important to call StoreDurably() when adding the job. This way the job won't be cleaned up when it has no triggers pointing at it.
    using var scope = _provider.CreateScope();
    var scheduleFactory = scope.ServiceProvider.GetRequiredService<ISchedulerFactory>();
    var scheduler = await scheduleFactory.GetScheduler(ct);

    await scheduler.AddJob(JobBuilder.Create<ImportQueuesJob>()
        .WithIdentity(ImportQueuesJob.Key)
        .StoreDurably()
        .Build(), true, true, ct);
Building a more robust job
Like most developers, when I start using a new framework I inevitably develop a common set of usage patterns. It's no different with Quartz.NET. The following are some tricks that I've picked up as I've been building out and unit testing more and more jobs.
    public interface IBetterJob<TJobData> where TJobData : class
    {
        static abstract JobKey Key { get; }
        Task Execute(TJobData data, IJobExecutionContext context);
    }

    // An abstract class cannot defer a static abstract interface member,
    // so each concrete job implements IBetterJob<TJobData> itself and supplies Key.
    public abstract class BetterJob<TJobData> : IJob where TJobData : class
    {
        readonly ILogger _logger;

        protected BetterJob(ILogger logger)
        {
            _logger = logger;
        }

        public async Task Execute(IJobExecutionContext context)
        {
            // get TJobData
            // (placeholder: how the data is bound from the context is left open here)
            TJobData data = default!;

            // Operation comes from SerilogTimings
            using (var op = Operation.Begin("Running Job"))
            {
                try
                {
                    await Execute(data, context);
                    op.Complete();
                    // metrics increment (success)
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "{Message}", ex.Message);
                    op.Abandon();
                    // metrics increment (error)
                    // op
                    // logging
                    // sentry
                }
            }
        }

        public abstract Task Execute(TJobData data, IJobExecutionContext context);
    }
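    public class FetchMessagesJob : BetterJob<FetchMessageData>, IBetterJob<FetchMessageData>
    {
        public static JobKey Key => new("fetch-messages", "brokers");

        // the base class needs a logger
        public FetchMessagesJob(ILogger<FetchMessagesJob> logger) : base(logger) { }

        public override Task Execute(FetchMessageData data, IJobExecutionContext context)
        {
            // job body goes here
            return Task.CompletedTask;
        }
    }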
Instrumentation Details
- Log Error Volume
- Job Completion Time / Duration
- Trigger State by Job
- Run Counts
    public static class Instrumentation
    {
        // counters
        // observable
    }
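One way to fill in that stub is with the built-in System.Diagnostics.Metrics API. This is only a sketch: the meter name, the instrument names, and the `RecordRun` and `ObserveTriggerStates` helpers are assumptions for illustration, not what Message Aid actually ships.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

public static class Instrumentation
{
    // Meter and instrument names are illustrative assumptions.
    public static readonly Meter Meter = new("MessageAid.Jobs");

    // counters
    public static readonly Counter<long> JobRuns = Meter.CreateCounter<long>("jobs.runs");
    public static readonly Counter<long> JobErrors = Meter.CreateCounter<long>("jobs.errors");
    public static readonly Histogram<double> JobDuration =
        Meter.CreateHistogram<double>("jobs.duration", unit: "ms");

    // observable: the callback is polled by whatever exporter is listening,
    // so it should return a cheap snapshot (e.g. cached trigger-state counts).
    public static void ObserveTriggerStates(Func<IEnumerable<Measurement<int>>> snapshot) =>
        Meter.CreateObservableGauge<int>("jobs.triggers", snapshot);

    // Records one job run: run count, error count, and duration, tagged by job name.
    public static void RecordRun(string jobName, TimeSpan elapsed, bool failed)
    {
        var tag = new KeyValuePair<string, object?>("job", jobName);
        JobRuns.Add(1, tag);
        if (failed) JobErrors.Add(1, tag);
        JobDuration.Record(elapsed.TotalMilliseconds, tag);
    }
}
```

A call such as `Instrumentation.RecordRun(FetchMessagesJob.Key.Name, elapsed, failed: false)` would slot into the success and error branches of the base job, which covers run counts, error volume, and duration from the list above.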
A Task Api
Now that we have a standardized and monitored approach to Quartz jobs, it's time to lay the groundwork for an API that we can use to manage our jobs. The overall idea for this API is inspired by the Elasticsearch _tasks API. I'm going to paint a fairly high level picture and leave the rest to the reader's imagination.
    GET /_tasks
    GET /_task/{id}
Job Key
Job and Trigger Split
Merged Job Data Map
Metrics
- Count
- Duration
- Trigger Health
- Sentry Error Pipeline
- Serilog Timings
- databag validation
/_tasks API
Key Name vs Job Name
Group Name
Trigger With T
- Uses JSON anyways
link to Serilog Best Practices
Job Names and Group Names
Trigger Names and Group Names
How you can match in Quartz should influence your naming.
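As an example of naming for matchability: the triggers earlier in this post live in a group named `broker-{id}`, so Quartz's `GroupMatcher` can select everything for one broker, or for all brokers, without a lookup table. The `BrokerTriggers` class and its method names below are hypothetical; `GroupEquals`, `GroupStartsWith`, `GetTriggerKeys`, and `PauseTriggers` are existing Quartz.NET calls.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Quartz;
using Quartz.Impl.Matchers;

public static class BrokerTriggers
{
    // Same convention as the trigger registration: one group per broker.
    public static string GroupFor(Guid brokerId) => $"broker-{brokerId}";

    // Every trigger that belongs to a single broker.
    public static async Task<List<TriggerKey>> ForBroker(
        IScheduler scheduler, Guid brokerId, CancellationToken ct = default)
    {
        var matcher = GroupMatcher<TriggerKey>.GroupEquals(GroupFor(brokerId));
        return new List<TriggerKey>(await scheduler.GetTriggerKeys(matcher, ct));
    }

    // Every broker trigger, relying on the shared "broker-" prefix.
    public static async Task<List<TriggerKey>> ForAllBrokers(
        IScheduler scheduler, CancellationToken ct = default)
    {
        var matcher = GroupMatcher<TriggerKey>.GroupStartsWith("broker-");
        return new List<TriggerKey>(await scheduler.GetTriggerKeys(matcher, ct));
    }

    // Pause polling for one broker, e.g. while it is down for maintenance.
    public static async Task PauseBroker(
        IScheduler scheduler, Guid brokerId, CancellationToken ct = default)
    {
        await scheduler.PauseTriggers(
            GroupMatcher<TriggerKey>.GroupEquals(GroupFor(brokerId)), ct);
    }
}
```

Matchers work on the group, not the name, which is why the broker id goes in the group and the queue name goes in the trigger name.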