Besides fantastic libraries like Ecto and Phoenix, Oban is one of my favourite libraries in the Elixir ecosystem.
Oban is an "enterprise-grade" "job processing library" for Elixir. One of the main things that makes Oban great is its flexibility and ease of use.
Not only is it easy to get started with, its use is intuitive and it can be applied to a huge range of problems. I'd go as far as to say that Oban
can
be a "simpler" alternative to using OTP abstractions like
GenServer
or
Task
for many problems.
However, with great power comes great responsibility. Oban is a powerful tool, and like many powerful tools, some of its use cases can be a bit... cursed.
ELI5: Oban
Before jumping into some fun cursed use cases I've seen on various Elixir projects, let's take a step back and talk about what Oban is and how it works.
Thankfully, getting started with Oban is pretty easy. You can add it to your
mix.exs
file like so:
defp deps do
[
{:oban, "~> 2.7"}
]
end
Then, you can run
mix deps.get
to install it. Once you've done that, you'll need to configure Oban to start in your
application.ex
file:
def start(_type, _args) do
children = [
...,
{Oban, Application.get_env(:my_app, MyApp.Oban)}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
Your Oban configuration is primarily responsible for configuring what
queues
you want Oban to process jobs in. It might look a little something like this:
config :my_app, MyApp.Oban,
queues: [
default: [limit: 10],
registration_emails: [limit: 1],
...
]
In this example, we've defined two queues:
default
and
registration_emails
. The
default
queue has a
limit
of
10
, meaning that Oban will process up to
10
jobs concurrently in this queue on each node.
The
registration_emails
queue has a
limit
of
1
, meaning that Oban will process only
1
job at a time in this queue on each node.
If you're familiar with using a library like Sidekiq, Oban is conceptually similar. Otherwise, it's not too dissimilar from tools such as RabbitMQ or Kafka... but much simpler.
You can think of Oban vaguely like a database-backed, self-hosted RabbitMQ!
Oban uses the database to store jobs, and to keep track of the state of jobs, retries, etc. It currently supports
PostgreSQL
and
SQLite
as backends.
As a result of this, one final step you'll need to do is to run the Oban migrations. Please refer to the Oban documentation for more information on how to do this as it might change between releases.
Once all of this is done, running your application via
iex -S mix
should start Oban and you should be able to start worrying about writing and running jobs!!
Oban Jobs
Once you're able to boot your app up, you can start defining Oban jobs. Oban jobs are simple modules that define a
perform
function that Oban knows how to run.
A minimal job might look something like the following:
defmodule MyApp.Jobs.SendRegistrationEmail do
use Oban.Worker, queue: :registration_emails
@impl Oban.Worker
# Job args are stored as JSON, so keys arrive as strings
def perform(%Oban.Job{args: %{"email" => email}}) do
{:ok, _resp} = MyApp.Users.send_registration_email(email)
:ok
end
end
In this example, we've defined a job that sends a registration email to a user. We've told Oban to run these jobs in the
registration_emails
queue.
Anywhere else in your application, you can enqueue an Oban job by calling:
iex> Oban.insert(MyApp.Jobs.SendRegistrationEmail.new(%{email: "test@example.com"}))
{:ok, %Oban.Job{...}}
Once this is done, Oban will magically process the jobs for you based on your defined concurrency limits, handle retries, etc.
Whilst Oban is a totally free and open source library, it's worth noting that there is a paid version of Oban called Oban Pro.
Subscribing to Oban Pro is totally worth it in my opinion, as it gives you a bunch of plug-and-play features that make Oban even more powerful and easy to use. Some of the features I admittedly couldn't live without now are:
- Oban Web: A LiveView powered UI for monitoring the state of your jobs and queues, as well as giving you a convenient UI to scale queues, retry jobs, etc.
- Recorded Jobs: A simple feature that allows you to store the result of a job in the database, making it easy to query and introspect the results of jobs.
- Workflows: A feature that allows you to define job DAGs, making it easy to define complex job dependencies and workflows.
- Batch and Chunk workers: A feature that allows you to process jobs in batches or chunks, making it easy to process large amounts of data in a controlled manner.
A lot of the things we talk about in this post will assume you're using Oban Pro, but most of the concepts are applicable to the free version of Oban as well.
If you really wanted to, you could probably build your own versions of a lot of these features. The time and effort you'd spend doing so would probably be better spent on other things, though.
Blessed Oban Use Cases
At Vetspire, we provide a SaaS that has a huge surface area of features and functionality.
We've got features that require soft-realtime data processing, features that involve tonnes of expensive data munging, features that involve interacting with lots of external APIs, etc.
Oban has proven itself to be a fantastic, drop-in and forget solution across basically all of our use cases.
Cron Jobs
Cron jobs are a great, free feature of Oban that we use a lot.
A lot of the third party integrations we support are simple polling APIs. This is especially true for a lot of older lab integrations; think fetching patient X-ray results, or blood test results so we can display them in our UI.
Because of how ubiquitous Cron is, it's a pattern that a lot of developers are familiar with, and Oban makes it stupid easy to implement!
Alongside your queue configuration, you can define a
crontab
configuration that looks something like this:
config :my_app, MyApp.Oban,
queues: [
...
],
plugins: [
{Oban.Plugins.Cron,
crontab: [
{"*/5 * * * *", MyApp.Integrations.Xrays},
{"*/5 * * * *", MyApp.Integrations.Bloodwork},
{"*/1 * * * *", MyApp.Integrations.ActivePhoneCalls},
{"@daily", MyApp.Integrations.PurgeOrphanedData}
]}
]
In this example, we've defined a few cron jobs that run every 5 minutes, every minute, and once a day. Oban will automatically schedule these jobs to run at the specified intervals.
The first element of each tuple is a cron expression that defines when the job should run, and the second is a reference to an Oban job that you write.
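To make the step syntax concrete, here is a minimal sketch of how a minute field such as `*/5` can be read. `CronStepSketch` is a hypothetical helper written for illustration only; it is not part of Oban and not how Oban parses expressions internally.

```elixir
defmodule CronStepSketch do
  # Returns true when `minute` (0..59) matches a "*/step" minute field,
  # i.e. when the minute is evenly divisible by the step.
  def step_match?(minute, step)
      when minute in 0..59 and is_integer(step) and step > 0 do
    rem(minute, step) == 0
  end
end

# Minutes of the hour on which a "*/5 * * * *" job would be scheduled
matching_minutes = Enum.filter(0..59, &CronStepSketch.step_match?(&1, 5))
IO.inspect(matching_minutes)
```

The `@daily` nickname is shorthand for `0 0 * * *`, that is, once a day at midnight.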
There are, of course, alternatives to using Oban for cron jobs. You could use a library like Quantum which is a pure Elixir library, but there are always trade-offs:
- Cron jobs are a single feature of Oban, and you get a lot of other features for free.
- You need to worry more about tracking the state of your cron jobs, retries, etc. You can persist this information somewhere but it's something you have to think about.
- In Quantum, jobs are ephemeral and aren't backed by persistent storage, which means they'll run on every Elixir node in a distributed system. This can be a good or bad thing depending on your use case.
- There is, however, a cost to over-relying on Oban or the database for all of your job processing needs. We'll talk about this more later.
Depending on your needs, your Oban cron jobs can be as simple or as complex as you like. The actual cron functionality is only responsible for scheduling your jobs.
You can easily configure Jobs to be unique per queue, to have retries, to have timeouts, etc. You can also easily define dependencies between jobs, and even define complex workflows using Oban Pro's Workflows feature.
For our lab integrations, depending on the external API, we might have a job with the following unique constraints:
use Oban.Pro.Worker,
queue: :lab_results,
max_attempts: 5,
# `period` is expressed in seconds; `:timer.minutes/1` returns milliseconds
unique: [fields: [:args], period: div(:timer.minutes(30), 1000)]
In this example, Oban will prevent duplicate jobs from being enqueued and processed based on the Job's arguments within a 30 minute interval. If the job fails, Oban will retry it up to 5 times.
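Oban's unique `period` is measured in seconds, whereas Erlang's `:timer` helpers return milliseconds, so the units are easy to mix up. A quick check of the arithmetic for a 30 minute window (the variable names are purely illustrative):

```elixir
# :timer.minutes/1 returns a duration in milliseconds
period_ms = :timer.minutes(30)

# Convert to whole seconds before using it as a unique period
period_seconds = div(period_ms, 1000)

IO.inspect({period_ms, period_seconds})
```

Writing the period as plain `30 * 60` works just as well if you'd rather avoid the conversion altogether.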
For our integrations, not all of our customers use the same labs, so a common pattern we have is to have cron jobs be responsible for kicking off child jobs that are unique to a particular customer.
defmodule MyApp.Integrations.Bloodwork do
use Oban.Pro.Worker,
queue: :lab_results,
max_attempts: 5,
# `period` is expressed in seconds; `:timer.minutes/1` returns milliseconds
unique: [fields: [:args], period: div(:timer.minutes(30), 1000)]
# Oban pro uses the `process/1` callback, but the same pattern applies to `perform/1` in the free version of Oban
@impl Oban.Pro.Worker
def process(%Oban.Job{args: %{org_id: org_id}}) do
%Org{} = org = MyApp.Orgs.get_org(id: org_id)
org
|> MyApp.Integrations.fetch_latest_bloodwork()
|> MyApp.Integrations.persist_bloodwork!()
:ok
end
def process(%Oban.Job{}) do
[with_integration: Bloodwork]
|> MyApp.Integrations.list_orgs()
|> Enum.map(fn org -> __MODULE__.new(%{org_id: org.id}) end)
|> Oban.insert_all()
:ok
end
end
Oban's cron scheduler will enqueue the job initially with no arguments. This job will then fetch a list of all the organizations that have the bloodwork integration enabled, and enqueue a child job for each organization.
Thanks to our unique constraints, we can be confident that we're not going to be processing the same job multiple times for the same customer, and we're only going to be processing results for customers that have the integration enabled.
Additionally, if anything fails talking to the external API (via
fetch_latest_bloodwork/1
), or persisting results into the database (via
persist_bloodwork!/1
), Oban will retry the job up to 5 times.
This is a pattern we use a lot at Vetspire, and it's a pattern that Oban makes stupid easy to implement.
The above example is just a simple example. I'd recommend reading this guide on Reliable Scheduled Jobs in the Oban documentation for more information on how to implement more complex cron jobs.
One use case we have for this is to only enqueue child jobs on the first attempt of the parent job. This is because we don't want to be re-enqueuing child jobs if the parent job fails for some reason.
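A minimal sketch of that decision, using a plain map in place of an `%Oban.Job{}` (only the `attempt` and `args` fields matter here). `FanOutSketch` and its return values are hypothetical and exist only to illustrate the branching; a real worker would insert child jobs instead of returning a tuple.

```elixir
defmodule FanOutSketch do
  # Parent job (no args) on its first attempt: fan out one child per org.
  def plan(%{attempt: 1, args: args}, org_ids) when map_size(args) == 0 do
    {:enqueue_children, Enum.map(org_ids, &%{"org_id" => &1})}
  end

  # Parent job on a retry: children were already enqueued, so do nothing.
  def plan(%{args: args}, _org_ids) when map_size(args) == 0 do
    :skip
  end

  # Child job: process a single organization.
  def plan(%{args: %{"org_id" => org_id}}, _org_ids) do
    {:process_org, org_id}
  end
end

first_attempt = FanOutSketch.plan(%{attempt: 1, args: %{}}, [1, 2])
retry = FanOutSketch.plan(%{attempt: 2, args: %{}}, [1, 2])
child = FanOutSketch.plan(%{attempt: 3, args: %{"org_id" => 7}}, [])
```

Because the job's `attempt` counter is 1 the first time it executes, matching on it is enough to tell a fresh run from a retry.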
Resilient API Integrations
A great vanilla use-case for Oban is making resilient API integrations.
One large feature we have at Vetspire is our ability to automatically send emails, SMS messages, and postcards to our customers' clients reminding them about vaccine renewals, appointment reminders, etc.
These are the sorts of features where you really, really want to make sure that messages are actually sent. If a message isn't sent, it can have real-world consequences for our customers.
At the very least, we want to make sure that we retry failed jobs to cover any transient failures, and we want to make sure that we can report on the state of any failed jobs.
This is free in Oban:
defmodule MyApp.Workers.ExtNotification do
use Oban.Pro.Worker,
queue: :notifications,
max_attempts: 5,
unique: [
fields: [:args],
states: [:available, :scheduled, :executing, :retryable],
period: :infinity
]
@impl Oban.Pro.Worker
def process(%Oban.Job{args: args}) do
%Org{} = org = MyApp.Orgs.get_org!(id: args.org_id)
case args.contact_method do
:sms ->
MyApp.Integrations.Twilio.send_sms!(org, args |> Keyword.new() |> Keyword.delete(:org_id))
:email ->
MyApp.Integrations.SendGrid.send_email!(org, args |> Keyword.new() |> Keyword.delete(:org_id))
:postcard ->
...
end
rescue
_e in [ArgumentError, RuntimeError] ->
{:discard, "Invalid Args Provided"}
end
end
In this example, we've defined a job that sends an external notification to a client. We've told Oban to run these jobs in the
notifications
queue.
If the job fails, Oban will automatically retry it until it has been attempted 5 times. Once those attempts are exhausted, it will be marked as
discarded
and you can easily see the reason why in Oban Web.
We also have various reports that our clinics can pull to see how many reminders were sent, how many failed, why they failed, etc.
This is implemented directly by querying the
"oban_jobs"
table which Oban uses to store the state of jobs, and returning results as a CSV / as a nice UI:
def sms_report(%Org{} = org, filters) do
base_query = from x in Oban.Job,
where: x.worker == ^inspect(MyApp.Workers.ExtNotification),
where: x.args["contact_method"] == "sms",
where: x.args["org_id"] == ^org.id
query =
Enum.reduce(filters, base_query, fn
{:status, "sent"}, query ->
from x in query, where: x.state == "completed"
{:status, "failed"}, query ->
from x in query, where: x.state in ["discarded", "cancelled"]
{:status, "enqueued"}, query ->
from x in query, where: x.state in ["available", "scheduled", "retryable"]
{:start_datetime, datetime}, query ->
from x in query, where: x.inserted_at >= ^datetime
{:end_datetime, datetime}, query ->
from x in query, where: x.inserted_at <= ^datetime
...
end)
MyApp.Repo.transaction(fn ->
query
|> MyApp.Repo.stream()
|> Stream.map(&build_csv_row!/1)
|> MyApp.Utils.Stream.to_csv!()
end)
end
We can then take the generated CSV and either return the results as JSON for rendering in the UI, or email the results to the clinic.
Of course, you'll probably need more than just retries to make your API integrations resilient.
Oban comes with configurable backoff strategies and timeouts, but you'll need to worry about rate-limiting on your own. We'll talk about rate limiting with Oban later.
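As a rough illustration of a custom backoff strategy, here is a sketch of a capped exponential delay. Oban workers can define a `backoff/1` callback that receives the job and returns a delay in seconds; the module name, the 15 second base, and the one hour cap below are assumptions made for this example, not Oban's built-in formula.

```elixir
defmodule BackoffSketch do
  # Upper bound on the delay between attempts, in seconds (assumed value)
  @max_seconds 3_600

  # Doubles the delay on every attempt, starting at 30 seconds for attempt 1,
  # and never exceeds @max_seconds.
  def backoff(attempt) when is_integer(attempt) and attempt > 0 do
    min(Integer.pow(2, attempt) * 15, @max_seconds)
  end
end

delays = Enum.map(1..10, &BackoffSketch.backoff/1)
IO.inspect(delays)
```

In a worker you would call something like this from `backoff(%Oban.Job{attempt: attempt})`, so that each retry waits longer than the last without ever waiting indefinitely.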
Long Running Background Tasks
Another great use case for Oban is long running background tasks.
In Vetspire's early days, we built a simple feature that allowed our customers to upload a CSV of patient data, and we'd import that data into our system.
Because we knew up-front that this would take a long time, we opted to build it using OTP abstractions so that they'd run in the background. This was a pretty simple feature to implement, but...
- It could fail for a variety of runtime reasons.
- It could be extremely difficult to debug as failures tended to be coupled to the specific CSV files being uploaded.
- It was difficult to tune how many jobs we could run concurrently, and how many retries we should have, etc.
We eventually migrated this feature to Oban. It was stupid easy to do, and it left us with much less code to maintain:
defmodule MyApp.DataSync do
...
defmodule Importer do
use Oban.Pro.Worker,
queue: :datasync,
max_attempts: 3,
unique: [
fields: [:args],
states: [:available, :scheduled, :executing, :retryable],
# `period` is expressed in seconds; `:timer.hours/1` returns milliseconds
period: div(:timer.hours(1), 1000)
]
@impl Oban.Pro.Worker
def process(%Oban.Job{args: %{org_id: org_id, signed_url: signed_url}}) do
%Org{} = org = MyApp.Orgs.get_org!(id: org_id)
csv_filepath = MyApp.Integrations.GCP.download!(signed_url)
:ok = MyApp.DataSync.do_import_csv!(org, csv_filepath)
end
end
...
def import_csv!(%Org{} = org, csv_filepath) do
{url, signed_url} =
org
|> MyApp.Integrations.GCP.upload!(csv_filepath)
|> MyApp.Integrations.GCP.signed_url!(expires_in: :timer.hours(1))
job_changeset =
MyApp.DataSync.Importer.new(%{org_id: org.id, signed_url: signed_url, url: url})
MyApp.Repo.transaction(fn ->
{:ok, %Oban.Job{} = job} = Oban.insert(job_changeset)
:ok =
org
|> create_datasync_log!(job.id)
|> broadcast!()
{:ok, job}
end)
end
def do_import_csv!(%Org{} = org, csv_filepath) do
# ... complex data processing stuff ...
end
...
end
This worked just as well as our OTP implementation, but the caveats listed above were no longer a problem:
- Any failures were automatically retried up to 3 times, and we could see stacktraces for why jobs failed in Oban Web, retry them, etc.
- The URLs for the CSV files were part of the job itself so developers could easily fetch the CSVs and introspect them.
- We could easily tune how many jobs we could run concurrently, how many retries we should have, etc.
Additionally, since Vetspire is deployed across multiple K8S pods with multiple different deployments, we ended up being able to offload the processing of these jobs to a dedicated pod with more resources.
This helped us avoid wasting resources on our API serving pods -- very possible with OTP, but definitely more work and maintenance needed for that.
Oban Curses and Hexes!
Having covered our more conventional use cases for Oban, hopefully you can now see how powerful, flexible and, most of all, simple Oban is to use.
However, with great power comes great responsibility, and there are definitely some use cases where Oban can be a bit... cursed.
Now, I'm not saying that you shouldn't use Oban for these use cases, but you should be aware of the trade-offs and potential pitfalls.
Replacing Task Async/Await
In the early days of Vetspire, we had a lot of code that looked like this:
defmodule MyApp.SomeModule do
def some_function() do
result =
do()
|> some()
|> important()
|> work!()
spawn(fn -> do_some_other_less_important_work() end)
{:ok, result}
end
end
The problem with this code is that the
spawn
call is
fire-and-forget
. If the process that was spawned failed, there was no way to know about it, and no way to retry it. If you're lucky you'll see
a stacktrace in your logs or on
Sentry
, but that's about it.
Because of how ubiquitous this was in our codebase, we ended up with a lot of "lost" work that was never retried, and a lot of "lost" work that was never reported on.
We wanted to replace this with a drop-in solution that would at the very least give us slightly more resilience for these jobs, so we reached for Oban.
Oban Pro has a feature called Relay that essentially allows you to replace
Task.async/1
and
Task.await/2
calls with Oban jobs, which you could use for this.
However,
Relay
requires you, like most Oban jobs, to implement dedicated workers for each job you want to run. This can be a bit of a pain if you have a lot of these ad-hoc fire-and-forget jobs.
So instead, we wrote the following module called
MyApp.Task
which provides various macros with a similar API to
Task.async/1
and
Task.await/2
:
defmodule MyApp.Task do
defmacro async(lambda, opts \\ []) do
module = __CALLER__.module
{function, arity} = __CALLER__.function
line = __CALLER__.line
source = Macro.to_string(lambda)
# Mainly for debugging...
caller = "#{inspect(module)}.#{function}/#{arity}:#{line}"
quote bind_quoted: [
lambda: lambda,
caller: caller,
source: source,
opts: opts,
parent: __MODULE__
] do
opts =
opts
|> Keyword.pop(:node_type, :worker_node)
|> then(fn {node_type, opts} -> Keyword.put(opts, :queue, parent.queue_of(node_type)) end)
lambda
|> :erlang.term_to_binary()
|> Base.encode64()
|> then(fn lambda -> %{source: source, caller: caller, lambda: lambda} end)
|> MyApp.Task.AdHoc.new(opts)
|> Oban.insert()
end
end
# `worker` and `state` are stored as strings on `%Oban.Job{}`
def await(%Oban.Job{worker: "MyApp.Task.AdHoc"} = job, timeout \\ 5_000) do
deadline = System.monotonic_time(:millisecond) + timeout
do_await(job, deadline)
end
defp do_await(%Oban.Job{state: "completed"} = job, _deadline) do
MyApp.Task.AdHoc.fetch_recorded(job)
end
defp do_await(%Oban.Job{} = job, deadline) do
if System.monotonic_time(:millisecond) > deadline do
raise RuntimeError, "Timeout waiting for job to complete"
end
# Poll the database for the job's latest state instead of re-checking a stale struct
Process.sleep(100)
job |> MyApp.Repo.reload!() |> do_await(deadline)
end
@doc false
def queue_of(:worker_node), do: :ad_hoc_worker
def queue_of(:api_node), do: :ad_hoc_api
def queue_of(:datasync_node), do: :ad_hoc_datasync
def queue_of(_other), do: raise(ArgumentError, message: "Unsupported node type.")
end
This module allows us to replace the
spawn
call in the previous example with:
defmodule MyApp.SomeModule do
require MyApp.Task
def some_function() do
result =
do()
|> some()
|> important()
|> work!()
MyApp.Task.async(fn -> do_some_other_less_important_work() end)
{:ok, result}
end
end
This code will now enqueue the work to be processed by Oban, and if it fails, we can see the reason why in Oban Web, retry it, etc.
Additionally, you can use
MyApp.Task.await/1
to block the parent process until the job has completed, or until a timeout has been reached.
We found this to be a great way to replace our
spawn
calls with something that was more resilient, and gave us more visibility into what was happening.
It's ergonomic enough that developers can also use this macro as a very quick form of prototyping, deferring the need to worry about building out more complex workers until we better understand workflows and failure modes.
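The other half of this trick is the worker that decodes and runs the serialized function. That worker isn't shown here, so the following is only a sketch of the encode/decode round trip it would need; `AdHocSketch` and its function names are hypothetical.

```elixir
defmodule AdHocSketch do
  # Serialize a zero-arity function into a string that fits in JSON job args.
  def encode(fun) when is_function(fun, 0) do
    fun |> :erlang.term_to_binary() |> Base.encode64()
  end

  # Decode the string back into a function and call it.
  # Only do this with binaries your own application produced:
  # :erlang.binary_to_term/1 should never be fed untrusted input.
  def run(encoded) when is_binary(encoded) do
    fun = encoded |> Base.decode64!() |> :erlang.binary_to_term()
    fun.()
  end
end

encoded = AdHocSketch.encode(fn -> Enum.sum([1, 2, 3]) end)
result = AdHocSketch.run(encoded)
IO.inspect(result)
```

This is a large part of what makes the approach cursed: a serialized function refers to the exact version of the module that defined it, so a job enqueued before a deploy may no longer be runnable once that module has been recompiled.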
Simple Caches
Another cursed use case for Oban is using it to implement simple caches.
One simple feature Vetspire provides is the ability for our customers' clients to email us directly. When they do so, we take their email and render it in our UI for our customers' staff to respond to.
However, if said clients don't exist in our system, we usually want to reply with some canned response.
This was done very easily, using Oban only to help improve the reliability of our emails. However, before long, our support team noticed that Vetspire was sending out stupid amounts of canned responses in a seemingly infinite loop.
It turns out that when we replied to an email with our canned response, if that email belonged to a client who was on vacation, they had an auto-responder set up that would reply to our canned response, which would then trigger another canned response, etc.
What we really wanted to do was to cache the fact that we'd already sent a canned response to a particular email, and not send another one.
With our existing Oban setup, and our
MyApp.Task
module, we were able to implement a simple cache like so:
def send_unknown_sender_email(%Org{} = org, sender_email) do
subject = "Error Unknown Sender"
body =
org.id
|> MyApp.Orgs.preferences("org.unknown_sender_reply_text")
|> case do
[%OrgPreference{} = pref | _rest] when pref.value not in [nil, ""] ->
pref.value
_otherwise ->
"""
<p>We do not have this email in our records, please call your veterinary practice directly. Thank you.</p>
"""
end
MyApp.Task.async(
fn -> send_email(org, sender_email, subject, body) end,
max_attempts: 1,
meta: %{sender_email: sender_email, org_id: org.id},
unique: [fields: [:meta], period: 24 |> :timer.hours() |> div(1000)]
)
end
In this example, we've defined a job that sends an email to an unknown sender. Because it is enqueued through
MyApp.Task.async
, it runs in one of our ad-hoc queues.
Additionally, we're setting a custom unique constraint specifically for this single job instance, unique to the
sender_email
and
org_id
fields, and only allowing one job to be enqueued every 24 hours.
This way, we'll send our canned response the first time we run into this case for a given user, but we won't send it again for another 24 hours.
Backing Certain Reports
We also have a feature where we want to track which patients have visited a clinic on a given day. This is a feature that we want to be able to report on, and we want it to be accurate.
The problem is that we have no single point of truth for this information. We have a lot of different systems that track patient visits, and they all have different ways of tracking this information.
We already built a library called EctoMiddleware that we use for hooking into various Ecto operations, and we wanted to use this to track patient visits.
Essentially, we implement a middleware that hooks onto any
insert
,
update
, or
delete
operations that have a
patient_id
field. This way, we know that the patient has been in clinic.
Of course, we'll also exclude entities which can be created/updated/deleted without the patient being in clinic, but the vast majority of the time, this is a good enough heuristic.
Once we've got our middleware, we've hooked into all the operations we care about, and we've got a good idea of which patients have visited a clinic on a given day, we want to be able to report on this.
We enqueue Oban jobs to write this information into a dedicated table in our database, and we can then query this table to generate reports.
The Oban job, much like the cache example, is configured to be unique per patient within a fixed time window (six hours here):
defmodule MyApp.Middlewares.StalePatients.Worker do
use Oban.Pro.Worker,
queue: :stale_patients,
max_attempts: 5,
unique: [
period: floor(:timer.hours(6) / 1000),
keys: [:patient_id, :client_id],
states: [:available, :scheduled, :executing, :completed]
]
import Ecto.Query
alias MyApp.Clinical.Patient
alias MyApp.Repo
require Logger
@impl Oban.Pro.Worker
def process(%{args: %{"client_id" => client_id, "org_id" => org_id}})
when is_integer(client_id) and is_integer(org_id) do
patient_ids =
[client_id: client_id]
|> Patient.query()
|> Repo.replica().all()
|> Enum.map(& &1.id)
# TODO: use an `insert_all` here now that we use `Oban Pro`.
for patient_id <- patient_ids do
Oban.insert(new(%{patient_id: patient_id, org_id: org_id}))
end
:ok
end
def process(%{args: %{"patient_id" => patient_id, "org_id" => org_id}})
when is_integer(patient_id) and is_integer(org_id) do
now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)
primary_location_id =
Repo.replica().one(
from x in Patient,
join: y in assoc(x, :client),
select: coalesce(y.location_id, -1),
where: x.id == ^patient_id
)
Repo.insert_all(
"stale_patients",
[
%{
patient_id: patient_id,
org_id: org_id,
location_id: primary_location_id,
inserted_at: now,
updated_at: now
}
],
on_conflict: {:replace, [:updated_at]},
conflict_target: [:patient_id]
)
:ok
end
def process(_job) do
:ok
end
end
Our middleware is also stupid-simple to minimize the potential runtime impact of hooking into all these operations. We simply:
1) Store any patient IDs for which the middleware has already run in the
Process Dictionary
, defending against bulk operations.
2) For any patient IDs we've not already run the middleware for, we simply try inserting the above Oban job.
3) The Oban job will be picked up soon enough on a dedicated pod, and will write the information into the
stale_patients
table.
4) Reporting is done in 24h increments so by the time the reports are run, the data is already there.
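Steps 1 and 2 boil down to a "run at most once per process" guard built on the process dictionary. A minimal sketch of that idea, with `OncePerProcessSketch` being a hypothetical stand-in rather than our actual middleware:

```elixir
defmodule OncePerProcessSketch do
  # Runs `fun` the first time `key` is seen in the current process and
  # returns :noop on every later call with the same key.
  def run_once(key, fun) when is_function(fun, 0) do
    dict_key = {__MODULE__, key}

    if Process.get(dict_key) do
      :noop
    else
      Process.put(dict_key, true)
      fun.()
    end
  end
end

first = OncePerProcessSketch.run_once({:patient, 42}, fn -> :enqueued end)
second = OncePerProcessSketch.run_once({:patient, 42}, fn -> :enqueued end)
other = OncePerProcessSketch.run_once({:patient, 43}, fn -> :enqueued end)
```

Because the process dictionary is local to the calling process, this only de-duplicates work within a single request or bulk operation. De-duplication across processes and nodes is left to the job's unique constraint.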
Our middleware implementation ends up looking something like:
defmodule MyApp.Middlewares.StalePatients do
@behaviour EctoMiddleware
alias MyApp.Billing.Order
alias MyApp.Clinical.Immunization
alias MyApp.Clinical.Medication
...
alias MyApp.Middlewares.StalePatients.Worker
# We don't want to bother marking any records as needing materialization
# if we're simply reading data.
@write_actions [
:insert,
:insert!,
:update,
:update!,
:delete,
:delete!,
:insert_all,
:update_all,
:delete_all
]
# But for any creates, updates, or deletes for these following schemas and their underlying tables,
# we want to mark the associated patient as needing materialization.
@dependencies [
Order, Immunization, Medication, Treatment, Estimate, Examination
]
@impl EctoMiddleware
def middleware(resource, resolution)
when resolution.action in @write_actions and resource.__struct__ in @dependencies and
is_integer(resource.org_id) do
mark_stale!(resource)
end
def middleware(resource, _resolution) do
resource
end
@doc "Skips the middleware for the current process"
def disable!(resource) when is_integer(resource.patient_id) do
Process.put({__MODULE__, :disabled, {:patient, resource.patient_id}}, true)
:ok
end
def disable!(resource) when is_integer(resource.client_id) do
Process.put({__MODULE__, :disabled, {:client, resource.client_id}}, true)
:ok
end
@doc "Enables the middleware for the current process"
def enable!(resource) when is_integer(resource.patient_id) do
Process.put({__MODULE__, :disabled, {:patient, resource.patient_id}}, false)
:ok
end
def enable!(resource) when is_integer(resource.client_id) do
Process.put({__MODULE__, :disabled, {:client, resource.client_id}}, false)
:ok
end
@doc "Returns true if the middleware is enabled for the current process"
def enabled?(resource) when is_integer(resource.patient_id) do
Process.get({__MODULE__, :disabled, {:patient, resource.patient_id}}) != true
end
def enabled?(resource) when is_integer(resource.client_id) do
Process.get({__MODULE__, :disabled, {:client, resource.client_id}}) != true
end
def mark_stale!(resource) do
cond do
not enabled?(resource) ->
:noop
not FunWithFlags.enabled?(:stale_tracking, for: %{org_id: resource.org_id}) ->
:noop
is_map_key(resource, :patient_id) and is_integer(resource.patient_id) ->
disable!(resource)
Worker.schedule!(%{patient_id: resource.patient_id, org_id: resource.org_id})
is_map_key(resource, :client_id) and is_integer(resource.client_id) ->
disable!(resource)
Worker.schedule!(%{client_id: resource.client_id, org_id: resource.org_id})
true ->
:noop
end
resource
end
end
Rate Limiting
One final cursed use case for Oban is rate limiting.
Some of our external integrations have strict rate limits, so we need to be careful to tune our workers and queues to respect these limits.
Even so, retries or other bugs can sometimes cause us to hit those limits.
This can be a real problem, especially when an API's limit only resets once you stop making requests altogether, rather than self-correcting after some time.
However, if you recall from the above examples, all of our API integrations belong to their own dedicated queues. This means that once we hit a rate limit, we can simply pause the queue and cancel all jobs in that queue:
defmodule MyApp.Integrations.ActivePhoneCalls do
use Oban.Pro.Worker,
queue: :phone_calls,
max_attempts: 5,
unique: [
fields: [:args],
states: [:available, :scheduled, :executing, :retryable],
period: :infinity
]
@impl Oban.Pro.Worker
def process(%Oban.Job{args: args}) do
# ... poll external API for phone call information ...
rescue
# `rescue` clauses match on the exception module, not on a struct pattern
e in HTTPoison.Error ->
case e.reason do
{:econnrefused, _rate_limit_hit} ->
MyApp.Integrations.RateLimiter.run(:phone_calls, hours: 1, minutes: 30)
_error ->
reraise e, __STACKTRACE__
end
end
end
This, in tandem with a simple Oban job that unpauses the queue after a certain amount of time, can be a simple way to handle rate limiting:
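defmodule MyApp.Integrations.RateLimiter do
use Oban.Pro.Worker,
...
import Ecto.Query
@impl Oban.Pro.Worker
def process(%Oban.Job{args: %{"queue" => queue}}) do
# Resume (unpause) the queue once the cool-off period has elapsed
Oban.resume_queue(queue: queue)
:ok
end
def run(queue, duration) do
# `queue` is stored as a string in the `oban_jobs` table
Oban.cancel_all_jobs(from x in Oban.Job, where: x.queue == ^to_string(queue))
Oban.pause_queue(queue: queue)
# `scheduled_at` is an option of `new/2`, not of `Oban.insert/2`
Oban.insert(new(%{queue: queue}, scheduled_at: Timex.shift(Timex.now(), duration)))
end
end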
Note, we generally avoid using
Timex
in favour of the newer built-in functions for date/time manipulation and processing, but
Timex
's fluent API is very convenient for this use case. I highly recommend checking it out!
Conclusion
Oban is a fantastic library that has a lot of great features out of the box, and is a great way to add resilience and reliability to your Elixir applications.
In this post, we've covered a lot of the conventional use cases for Oban, and some of the more cursed use cases that we've encountered at Vetspire.
I hope that this post has given you some ideas on how you can use Oban in your own applications, and some of the trade-offs and pitfalls you might encounter.