vereis

Cursed Oban Uses

Published 2024-11-04 @ 15:57:29
Approx. 26 minutes

Besides fantastic libraries like Ecto and Phoenix, Oban is one of my favourite libraries in the Elixir ecosystem.

Oban is an "enterprise-grade" "job processing library" for Elixir. One of the main things that makes Oban great is its flexibility and ease of use.

Not only is it easy to get started with, its use is intuitive and it can be applied to a huge range of problems. I'd go as far as to say that Oban can be a "simpler" alternative to using OTP abstractions like GenServer or Task for many problems.

However, with great power comes great responsibility. Oban is a powerful tool, and like many powerful tools, some of its use cases can be a bit... cursed.

ELI5: Oban

Before jumping into some fun cursed use cases I've seen on various Elixir projects, let's take a step back and talk about what Oban is and how it works.

Thankfully, getting started with Oban is pretty easy. You can add it to your mix.exs file like so:

defp deps do
  [
    {:oban, "~> 2.7"}
  ]
end

Then, you can run mix deps.get to install it. Once you've done that, you'll need to configure Oban to start in your application.ex file:

def start(_type, _args) do
  children = [
    ...,
    {Oban, Application.get_env(:my_app, Oban)}
  ]

  opts = [strategy: :one_for_one, name: MyApp.Supervisor]
  Supervisor.start_link(children, opts)
end

Your Oban configuration is primarily responsible for telling Oban which repo to use and which queues to process jobs in. It might look a little something like this:

config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [
    default: [limit: 10],
    registration_emails: [limit: 1],
    ...
  ]

In this example, we've defined two queues: default and registration_emails. The default queue has a limit of 10, meaning that each node running Oban will process up to 10 jobs concurrently in this queue.

The registration_emails queue has a limit of 1, meaning that each node will process only 1 job at a time in this queue.
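If the idea of a per-queue concurrency limit feels abstract, here is a rough analogy in plain Elixir. This is not how Oban implements queues; QueueLimitSketch is a made-up module that uses Task.async_stream/3 to run fake jobs with a cap on how many run at once, and reports the highest concurrency it observed.

```elixir
defmodule QueueLimitSketch do
  # Runs `job_count` fake jobs with at most `limit` running at once and
  # returns the highest number of jobs observed running simultaneously.
  def peak_concurrency(job_count, limit) do
    {:ok, agent} = Agent.start_link(fn -> %{running: 0, peak: 0} end)

    1..job_count
    |> Task.async_stream(
      fn _job ->
        # Record that a job started, tracking the peak as we go.
        Agent.update(agent, fn %{running: running, peak: peak} ->
          %{running: running + 1, peak: max(peak, running + 1)}
        end)

        # Pretend to do some work.
        Process.sleep(20)

        Agent.update(agent, fn state -> %{state | running: state.running - 1} end)
      end,
      max_concurrency: limit
    )
    |> Stream.run()

    peak = Agent.get(agent, & &1.peak)
    Agent.stop(agent)
    peak
  end
end
```

With a cap of 1, jobs run strictly one after another, which is the behaviour you'd want for something like registration emails.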

If you're familiar with using a library like Sidekiq, Oban is conceptually similar. Otherwise, it's not too dissimilar from tools such as RabbitMQ or Kafka... but much simpler.

You can think of Oban vaguely like a database-backed, self-hosted RabbitMQ!!

Oban uses the database to store jobs, and to keep track of the state of jobs, retries, etc. It currently supports PostgreSQL and SQLite as backends.

As a result of this, one final step you'll need to do is to run the Oban migrations. Please refer to the Oban documentation for more information on how to do this as it might change between releases.

Once all of this is done, running your application via iex -S mix should start Oban and you should be able to start worrying about writing and running jobs!!

Oban Jobs

Once you're able to boot your app up, you can start defining Oban jobs. Oban jobs are simple modules that define a perform function that Oban knows how to run.

A minimal job might look something like the following:

defmodule MyApp.Jobs.SendRegistrationEmail do
  use Oban.Worker, queue: :registration_emails

  # Args are stored as JSON, so they arrive with string keys.
  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"email" => email}}) do
    {:ok, _resp} = MyApp.Users.send_registration_email(email)
    :ok
  end
end

In this example, we've defined a job that sends a registration email to a user. We've told Oban to run these jobs in the registration_emails queue.

Anywhere else in your application, you can enqueue an Oban job by calling:

iex> Oban.insert(MyApp.Jobs.SendRegistrationEmail.new(%{email: "test@example.com"}))
{:ok, %Oban.Job{...}}

Once this is done, Oban will magically process the jobs for you based on your defined concurrency limits, handle retries, etc.

Whilst Oban is a totally free and open source library, it's worth noting that there is a paid version of Oban called Oban Pro.

Subscribing to Oban Pro is totally worth it in my opinion, as it gives you a bunch of plug-and-play features that make Oban even more powerful and easy to use. Some of the features I admittedly couldn't live without now are:

  1. Oban Web: A LiveView powered UI for monitoring the state of your jobs and queues, as well as giving you a convenient UI to scale queues, retry jobs, etc.
  2. Recorded Jobs: A simple feature that allows you to store the result of a job in the database, making it easy to query and introspect the results of jobs.
  3. Workflows: A feature that allows you to define job DAGs, making it easy to define complex job dependencies and workflows.
  4. Batch and Chunk workers: A feature that allows you to process jobs in batches or chunks, making it easy to process large amounts of data in a controlled manner.

A lot of the things we talk about in this post will assume you're using Oban Pro, but most of the concepts are applicable to the free version of Oban as well.

If you really wanted to, you could probably build your own versions of a lot of these features. The time and effort you'd spend doing so would probably be better spent on other things, though.

Blessed Oban Use Cases

At Vetspire, we provide a SaaS that has a huge surface area of features and functionality.

We've got features that require soft-realtime data processing, features that involve tonnes of expensive data munging, features that involve interacting with lots of external APIs, etc.

Oban has proven itself to be a fantastic, drop-in and forget solution across basically all of our use cases.

Cron Jobs

Cron jobs are one great, free feature of Oban that we use a lot.

A lot of the third party integrations we support are simple polling APIs. This is especially true for a lot of older lab integrations; think fetching patient X-ray results, or blood test results so we can display them in our UI.

Because of how ubiquitous Cron is, it's a pattern that a lot of developers are familiar with, and Oban makes it stupid easy to implement!

Alongside your queue configuration, you can give Oban's Cron plugin a crontab that looks something like this:

config :my_app, Oban,
  queues: [
    ...
  ],
  plugins: [
    {Oban.Plugins.Cron,
     crontab: [
       {"*/5 * * * *", MyApp.Integrations.Xrays},
       {"*/5 * * * *", MyApp.Integrations.Bloodwork},
       {"*/1 * * * *", MyApp.Integrations.ActivePhoneCalls},
       {"@daily", MyApp.Integrations.PurgeOrphanedData}
     ]}
  ]

In this example, we've defined a few cron jobs that run every 5 minutes, every minute, and once a day. Oban will automatically schedule these jobs to run at the specified intervals.

The first element of the tuple is a cron expression that defines when the job should run, and the second is a reference to an Oban job that you write.
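If cron syntax isn't second nature, the step form used above is simple to reason about: a minute field of "*/5" fires on every minute of the hour that is divisible by 5, and "@daily" is shorthand for "0 0 * * *". The made-up CronStepSketch module below only illustrates that rule; it is not Oban's parser.

```elixir
defmodule CronStepSketch do
  # Minutes of the hour on which a "*/step" minute field fires.
  def minutes(step) when step in 1..59 do
    for minute <- 0..59, rem(minute, step) == 0, do: minute
  end
end
```

So CronStepSketch.minutes(5) lists twelve firing minutes per hour, starting at minute 0.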

There are, of course, alternatives to using Oban for cron jobs. You could use a library like Quantum, which is a pure Elixir library, but there are always trade-offs:

  1. Cron jobs are a single feature of Oban, and you get a lot of other features for free.
  2. You need to worry more about tracking the state of your cron jobs, retries, etc. You can persist this information somewhere but it's something you have to think about.
  3. In Quantum, ephemeral jobs that aren't backed by persistent storage means they'll run on every Elixir node in a distributed system. This can be a good or bad thing depending on your use case.
  4. There is, however, a cost to over-relying on Oban or the database for all of your job processing needs. We'll talk about this more later.

Depending on your needs, your Oban cron jobs can be as simple or as complex as you like. The actual cron functionality is only responsible for scheduling your jobs.

You can easily configure jobs to be unique per queue, to have retries, to have timeouts, etc. You can also easily define dependencies between jobs, and even define complex workflows using Oban Pro's Workflows feature.

For our lab integrations, depending on the external API, we might have a job with the following unique constraints:

use Oban.Pro.Worker,
  queue: :lab_results,
  max_attempts: 5,
  # `period` is in seconds; `:timer.minutes/1` returns milliseconds.
  unique: [fields: [:args], period: div(:timer.minutes(30), 1000)]

In this example, Oban will prevent duplicate jobs from being enqueued and processed based on the Job's arguments within a 30 minute interval. If the job fails, Oban will retry it up to 5 times.
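One thing that's easy to trip over: Oban's unique period is expressed in seconds, while Erlang's :timer helpers return milliseconds. A tiny sketch of the conversion (PeriodSketch is a made-up helper name, not part of Oban):

```elixir
defmodule PeriodSketch do
  # Converts a millisecond duration (as returned by :timer.minutes/1 or
  # :timer.hours/1) into whole seconds.
  def seconds(milliseconds) when is_integer(milliseconds) do
    div(milliseconds, 1000)
  end
end
```

For a 30 minute window, PeriodSketch.seconds(:timer.minutes(30)) gives 1800.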

For our integrations, not all of our customers use the same labs, so a common pattern we have is to have cron jobs be responsible for kicking off children jobs that are unique to a particular customer.

defmodule MyApp.Integrations.Bloodwork do
  use Oban.Pro.Worker,
    queue: :lab_results,
    max_attempts: 5,
    # `period` is in seconds; `:timer.minutes/1` returns milliseconds.
    unique: [fields: [:args], period: div(:timer.minutes(30), 1000)]

  # Oban Pro uses the `process/1` callback, but the same pattern applies to `perform/1` in the free version of Oban.
  # Args are stored as JSON, so they arrive with string keys.
  @impl Oban.Pro.Worker
  def process(%Oban.Job{args: %{"org_id" => org_id}}) do
    %Org{} = org = MyApp.Orgs.get_org(id: org_id)

    org
    |> MyApp.Integrations.fetch_latest_bloodwork()
    |> MyApp.Integrations.persist_bloodwork!()

    :ok
  end

  def process(%Oban.Job{}) do
    [with_integration: Bloodwork]
    |> MyApp.Integrations.list_orgs()
    |> Enum.map(fn org -> __MODULE__.new(%{org_id: org.id}) end)
    |> Oban.insert_all()

    :ok
  end
end

Oban's cron scheduler will enqueue the job initially with no arguments. This job will then fetch a list of all the organizations that have the bloodwork integration enabled, and enqueue a child job for each organization.

Thanks to our unique constraints, we can be confident that we're not going to be processing the same job multiple times for the same customer, and we're only going to be processing results for customers that have the integration enabled.

Additionally, if anything fails talking to the external API (via fetch_latest_bloodwork/1), or persisting results into the database (via persist_bloodwork!/1), Oban will retry the job up to 5 times.

This is a pattern we use a lot at Vetspire, and it's a pattern that Oban makes stupid easy to implement.

The above example is just a simple example. I'd recommend reading this guide on Reliable Scheduled Jobs in the Oban documentation for more information on how to implement more complex cron jobs.

One use case we have for this is to only enqueue child jobs on the first attempt of the parent job. This is because we don't want to be re-enqueuing child jobs if the parent job fails for some reason.
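A minimal sketch of that "first attempt only" rule, assuming the parent job is the one with empty args. FanOutSketch is a made-up module, and the plain map stands in for an %Oban.Job{} (which does carry an attempt counter); a real worker would pattern match on the struct and insert the child jobs rather than return their args.

```elixir
defmodule FanOutSketch do
  # `job` stands in for an %Oban.Job{}; only :attempt and :args are used.
  # Returns the args for each child job that should be enqueued.
  def children(%{attempt: 1, args: args}, org_ids) when map_size(args) == 0 do
    Enum.map(org_ids, fn org_id -> %{"org_id" => org_id} end)
  end

  # Retries of the parent, and child jobs themselves, enqueue nothing.
  def children(_job, _org_ids), do: []
end
```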

Resilient API Integrations

A great vanilla use-case for Oban is making resilient API integrations.

One large feature we have at Vetspire is our ability to automatically send emails, SMS messages, and postcards to our customers' clients reminding them about vaccine renewals, appointment reminders, etc.

These are the sorts of features where you really, really want to make sure that messages are actually sent. If a message isn't sent, it can have real-world consequences for our customers.

At the very least, we want to make sure that we retry failed jobs to cover any transient failures, and we want to make sure that we can report on the state of any failed jobs.

This is free in Oban:

defmodule MyApp.Workers.ExtNotification do
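  use Oban.Pro.Worker,
    queue: :notifications,
    unique: [
      fields: [:args],
      states: [:available, :scheduled, :executing, :retryable],
      period: :infinity
    ]

  # NOTE: simplified for readability; job args are stored as JSON, so in
  # practice they arrive with string keys.
  @impl Oban.Pro.Worker
  def process(%Oban.Job{args: args}) do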
    %Org{} = org = MyApp.Orgs.get_org!(id: args.org_id)

    case args.contact_method do
      :sms ->
        MyApp.Integrations.Twilio.send_sms!(org, args |> Keyword.new() |> Keyword.delete(:org_id))

      :email ->
        MyApp.Integrations.SendGrid.send_email!(org, args |> Keyword.new() |> Keyword.delete(:org_id))

      :postcard ->
        ...
    end
  rescue
    _e in [ArgumentError, RuntimeError] ->
      {:discard, "Invalid Args Provided"}
  end
end

In this example, we've defined a job that sends an external notification to a client. We've told Oban to run these jobs in the notifications queue.

If the job fails, Oban will automatically retry it. Since this worker doesn't set max_attempts, it gets Oban's default of 20 attempts. Once those are exhausted, the job is marked as discarded and you can easily see the reason why in Oban Web.

We also have various reports that our clinics can pull to see how many reminders were sent, how many failed, why they failed, etc.

This is implemented directly by querying the "oban_jobs" table which Oban uses to store the state of jobs, and returning results as a CSV / as a nice UI:

def sms_report(%Org{} = org, filters) do
  base_query = from x in Oban.Job,
    where: x.worker == ^inspect(MyApp.Workers.ExtNotification),
    where: x.args["contact_method"] == "sms",
    where: x.args["org_id"] == ^org.id

  query =
    Enum.reduce(filters, base_query, fn
      {:status, "sent"}, query ->
        from x in query, where: x.state == "completed"

      {:status, "failed"}, query ->
        from x in query, where: x.state in ["discarded", "cancelled"]

      {:status, "enqueued"}, query ->
        from x in query, where: x.state in ["available", "scheduled", "retryable"]

      {:start_datetime, datetime}, query ->
        from x in query, where: x.inserted_at >= ^datetime

      {:end_datetime, datetime}, query ->
        from x in query, where: x.inserted_at <= ^datetime

      ...
    end)

  MyApp.Repo.transaction(fn ->
    query
    |> MyApp.Repo.stream()
    |> Stream.map(&build_csv_row!/1)
    |> MyApp.Utils.Stream.to_csv!()
  end)
end

We can then take the generated CSV and either return the results as JSON for rendering in the UI, or email the results to the clinic.

Of course, you'll probably need more than just retries to make your API integrations resilient.

Oban comes with configurable backoff strategies and timeouts, but you'll need to worry about rate-limiting on your own. We'll talk about rate limiting with Oban later.
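To make "configurable backoff" a little more concrete: an Oban worker can implement a backoff/1 callback that receives the job and returns the number of seconds to wait before the next attempt. The function below is the kind of thing you might call from that callback. BackoffSketch, the 15 second base, and the one hour cap are all made up for illustration; this is not Oban's default formula.

```elixir
defmodule BackoffSketch do
  # Never wait longer than an hour between attempts (arbitrary cap).
  @max_seconds 3_600

  # Delay before the next retry: doubles with each attempt, up to the cap.
  def seconds(attempt) when is_integer(attempt) and attempt >= 1 do
    min(Integer.pow(2, attempt) * 15, @max_seconds)
  end
end
```

With these numbers the first retry waits 30 seconds, the third waits two minutes, and anything from the eighth attempt onwards waits the full hour.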

Long Running Background Tasks

Another great use case for Oban is long running background tasks.

In Vetspire's early days, we built a simple feature that allowed our customers to upload a CSV of patient data, and we'd import that data into our system.

Because we knew up-front that this would take a long time, we opted to build it using OTP abstractions so that they'd run in the background. This was a pretty simple feature to implement, but...

  • It could fail for a variety of runtime reasons.
  • It could be extremely difficult to debug as failures tended to be coupled to the specific CSV files being uploaded.
  • It was difficult to tune how many jobs we could run concurrently, and how many retries we should have, etc.

We eventually migrated this feature to Oban. It was stupid easy to do, and left us with much less code to maintain:

defmodule MyApp.DataSync do
  ...

  defmodule Importer do
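    use Oban.Pro.Worker,
      queue: :datasync,
      max_attempts: 3,
      unique: [
        fields: [:args],
        states: [:available, :scheduled, :executing, :retryable],
        # `period` is in seconds; `:timer.hours/1` returns milliseconds.
        period: div(:timer.hours(1), 1000)
      ]

    # Args are stored as JSON, so they arrive with string keys.
    @impl Oban.Pro.Worker
    def process(%Oban.Job{args: %{"org_id" => org_id, "signed_url" => signed_url}}) do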
      %Org{} = org = MyApp.Orgs.get_org!(id: org_id)

      csv_filepath = MyApp.Integrations.GCP.download!(signed_url)
      :ok = MyApp.DataSync.do_import_csv!(org, csv_filepath)
    end
  end

  ...

  def import_csv!(%Org{} = org, csv_filepath) do
    {url, signed_url} =
      org
      |> MyApp.Integrations.GCP.upload!(csv_filepath)
      |> MyApp.Integrations.GCP.signed_url!(expires_in: :timer.hours(1))

    job_changeset =
      MyApp.DataSync.Importer.new(%{org_id: org.id, signed_url: signed_url, url: url})

    MyApp.Repo.transaction(fn ->
      {:ok, %Oban.Job{} = job} = Oban.insert(job_changeset)

      :ok =
        org
        |> create_datasync_log!(job.id)
        |> broadcast!()

      {:ok, job}
    end)
  end

  def do_import_csv!(%Org{} = org, csv_filepath) do
    # ... complex data processing stuff ...
  end

  ...
end

This worked just as well as our OTP implementation, but the caveats listed above were no longer a problem:

  • Any failures were automatically retried up to 3 times, and we could see stacktraces for why jobs failed in Oban Web, retry them, etc.
  • The URLs for the CSV files were part of the job itself so developers could easily fetch the CSVs and introspect them.
  • We could easily tune how many jobs we could run concurrently, how many retries we should have, etc.

Additionally, since Vetspire is deployed across multiple K8s pods with multiple different deployments, we ended up being able to offload the processing of these jobs to a dedicated pod with more resources.

This helped us avoid wasting resources on our API serving pods -- very possible with OTP, but definitely more work and maintenance needed for that.

Oban Curses and Hexes!

Having covered our more conventional use cases for Oban, hopefully you can now see how powerful, flexible, and most of all simple Oban is to use.

However, with great power comes great responsibility, and there are definitely some use cases where Oban can be a bit... cursed.

Now, I'm not saying that you shouldn't use Oban for these use cases, but you should be aware of the trade-offs and potential pitfalls.

Replacing Task Async/Await

In the early days of Vetspire, we had a lot of code that looked like this:

defmodule MyApp.SomeModule do
  def some_function() do
    result =
      do()
      |> some()
      |> important()
      |> work!()

    spawn(fn -> do_some_other_less_important_work() end)

    {:ok, result}
  end
end

The problem with this code is that the spawn call is fire-and-forget. If the process that was spawned failed, there was no way to know about it, and no way to retry it. If you're lucky you'll see a stacktrace in your logs or on Sentry, but that's about it.

Because of how ubiquitous this was in our codebase, we ended up with a lot of "lost" work that was never retried, and a lot of "lost" work that was never reported on.
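To see just how little spawn/1 gives you: the caller gets a pid back and nothing else. The smallest step up in plain OTP is to monitor the process, as in this sketch (SpawnSketch is a made-up module). Even then you only learn that the work died; you still get no retries and no persistence, which is the gap we wanted filled.

```elixir
defmodule SpawnSketch do
  # Runs `fun` in a separate, monitored process and returns its exit reason,
  # or :timeout if it hasn't exited within a second.
  def run_and_report(fun) when is_function(fun, 0) do
    {pid, ref} = spawn_monitor(fun)

    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> reason
    after
      1_000 -> :timeout
    end
  end
end
```

A function that returns normally reports :normal, while one that calls exit(:boom) reports :boom.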

We wanted to replace this with a drop-in solution that would at the very least give us slightly more resilience for these jobs, so we reached for Oban.

Oban Pro has a feature called Relay that essentially allows you to replace Task.async/1 and Task.await/2 calls with Oban jobs, which you could use for this.

However, Relay requires you, like most Oban jobs, to implement dedicated workers for each job you want to run. This can be a bit of a pain if you have a lot of these ad-hoc fire-and-forget jobs.

So instead, we wrote the following module called MyApp.Task which provides various macros with a similar API to Task.async/1 and Task.await/2:

defmodule MyApp.Task do
  defmacro async(lambda, opts \\ []) do
    module = __CALLER__.module
    {function, arity} = __CALLER__.function
    line = __CALLER__.line

    source = Macro.to_string(lambda)

    # Mainly for debugging...
    caller = "#{inspect(module)}.#{function}/#{arity}:#{line}"

    quote bind_quoted: [
            lambda: lambda,
            caller: caller,
            source: source,
            opts: opts,
            parent: __MODULE__
          ] do
      opts =
        opts
        |> Keyword.pop(:node_type, :worker_node)
        |> then(fn {node_type, opts} -> Keyword.put(opts, :queue, parent.queue_of(node_type)) end)

      lambda
      |> :erlang.term_to_binary()
      |> Base.encode64()
      |> then(fn lambda -> %{source: source, caller: caller, lambda: lambda} end)
      |> MyApp.Task.AdHoc.new(opts)
      |> Oban.insert()
    end
  end

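  # `timeout` is in milliseconds, mirroring `Task.await/2`.
  # Note that `Oban.Job` stores both `worker` and `state` as strings.
  def await(%Oban.Job{worker: "MyApp.Task.AdHoc", state: "completed"} = job, _timeout) do
    Process.delete({__MODULE__, :await_start})
    MyApp.Task.AdHoc.fetch_recorded(job)
  end

  def await(%Oban.Job{worker: "MyApp.Task.AdHoc"} = job, timeout) do
    if is_nil(Process.get({__MODULE__, :await_start})) do
      Process.put({__MODULE__, :await_start}, DateTime.utc_now())
    end

    started_at = Process.get({__MODULE__, :await_start})

    if DateTime.diff(DateTime.utc_now(), started_at, :millisecond) > timeout do
      raise RuntimeError, "Timeout waiting for job to complete"
    end

    # Reload the job so that state changes made by the worker become visible.
    Process.sleep(100)
    Oban.Job |> MyApp.Repo.get!(job.id) |> await(timeout)
  end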

  @doc false
  def queue_of(:worker_node), do: :ad_hoc_worker
  def queue_of(:api_node), do: :ad_hoc_api
  def queue_of(:datasync_node), do: :ad_hoc_datasync
  def queue_of(_other), do: raise(ArgumentError, message: "Unsupported node type.")
end

This module allows us to replace the spawn call in the previous example with:

defmodule MyApp.SomeModule do
  require MyApp.Task

  def some_function() do
    result =
      do()
      |> some()
      |> important()
      |> work!()

    MyApp.Task.async(fn -> do_some_other_less_important_work() end)

    {:ok, result}
  end
end

This code will now enqueue the work to be processed by Oban, and if it fails, we can see the reason why in Oban Web, retry it, etc.

Additionally, you can pass the job returned by MyApp.Task.async/2 to MyApp.Task.await/2 to block the parent process until the job has completed, or until a timeout has been reached.

We found this to be a great way to replace our spawn calls with something that was more resilient, and gave us more visibility into what was happening.

It's ergonomic enough that developers can also use this macro as a very quick form of prototyping, deferring the need to worry about building out more complex workers until we better understand workflows and failure modes.
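The part of MyApp.Task that does the heavy lifting is the round trip of an anonymous function through :erlang.term_to_binary/1 and Base64. The worker side isn't shown in this post, so the sketch below is only my guess at the general shape: AdHocSketch is a made-up module, and a real worker would pull the encoded lambda out of the job args.

```elixir
defmodule AdHocSketch do
  # Serializes a zero-arity function into a string that is safe to store
  # in JSON job args.
  def encode(fun) when is_function(fun, 0) do
    fun |> :erlang.term_to_binary() |> Base.encode64()
  end

  # Decodes a string produced by encode/1 and invokes the function.
  # Only do this with data you wrote yourself: :erlang.binary_to_term/1
  # should not be fed untrusted input.
  def run(encoded) when is_binary(encoded) do
    fun = encoded |> Base.decode64!() |> :erlang.binary_to_term()
    fun.()
  end
end
```

One caveat that adds to the curse: a serialized function refers to the exact version of the module that defined it, so a job enqueued before a deploy may fail to run if that module has since been recompiled.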

Simple Caches

Another cursed use case for Oban is using it to implement simple caches.

One simple feature Vetspire provides is the ability for our customers' clients to email us directly. When they do so, we take their email and render it in our UI for our customers' staff to respond to.

However, if said clients don't exist in our system, we usually want to reply with some canned response.

This was easy to build, using Oban only to improve the reliability of our emails. Before long, however, our support team noticed that Vetspire was sending out stupid amounts of canned responses in a seemingly infinite loop.

It turns out that when we replied to an email with our canned response, if that email belonged to a client who was on vacation, they had an auto-responder set up that would reply to our canned response, which would then trigger another canned response, etc.

What we really wanted to do was to cache the fact that we'd already sent a canned response to a particular email, and not send another one.

With our existing Oban setup, and our MyApp.Task module, we were able to implement a simple cache like so:

def send_unknown_sender_email(%Org{} = org, sender_email) do
  subject = "Error Unknown Sender"

  body =
    org.id
    |> MyApp.Orgs.preferences("org.unknown_sender_reply_text")
    |> case do
      [%OrgPreference{} = pref | _rest] when pref.value not in [nil, ""] ->
        pref.value

      _otherwise ->
        """
        <p>We do not have this email in our records, please call your veterinary practice directly. Thank you.</p>
        """
    end

  MyApp.Task.async(
    fn -> send_email(org, sender_email, subject, body) end,
    max_attempts: 1,
    meta: %{sender_email: sender_email, org_id: org.id},
    unique: [fields: [:meta], period: 24 |> :timer.hours() |> div(1000)]
  )
end

In this example, we've defined a job that sends an email to an unknown sender. Because we don't pass a node_type, MyApp.Task.async/2 places it on its default ad-hoc queue.

Additionally, we're setting a custom unique constraint specifically for this single job instance, unique to the sender_email and org_id fields, and only allowing one job to be enqueued every 24 hours.

This way, we'll send our canned response the first time we run into this case for a given user, but we won't send it again for another 24 hours.
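Stripped of Oban, the behaviour we're leaning on is "at most one send per key per window". Here is a plain Elixir model of that rule, purely for illustration: UniqueWindowSketch is a made-up module, and the map plays the role that the oban_jobs table plays in the real thing.

```elixir
defmodule UniqueWindowSketch do
  # `sent` maps a key such as {org_id, sender_email} to the unix time
  # (in seconds) at which we last replied to it.
  def maybe_send(sent, key, now, period_seconds) do
    case Map.fetch(sent, key) do
      # We replied within the window: do nothing.
      {:ok, last} when now - last < period_seconds -> {:skip, sent}
      # Never replied, or the window has elapsed: reply and remember when.
      _otherwise -> {:send, Map.put(sent, key, now)}
    end
  end
end
```

The auto-responder loop dies on the second email: the first call returns :send, and every call for the same key within the next 24 hours returns :skip.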

Backing Certain Reports

We also have a feature where we want to track which patients have visited a clinic on a given day. This is a feature that we want to be able to report on, and we want it to be accurate.

The problem is that we have no single point of truth for this information. We have a lot of different systems that track patient visits, and they all have different ways of tracking this information.

We already built a library called EctoMiddleware that we use for hooking into various Ecto operations, and we wanted to use this to track patient visits.

Essentially, we implement a middleware that hooks onto any insert, update, or delete operations that have a patient_id field. This way, we know that the patient has been in clinic.

Of course, we'll also exclude entities which can be created/updated/deleted without the patient being in clinic, but the vast majority of the time, this is a good enough heuristic.

Once we've got our middleware, we've hooked into all the operations we care about, and we've got a good idea of which patients have visited a clinic on a given day, we want to be able to report on this.

We enqueue Oban jobs to write this information into a dedicated table in our database, and we can then query this table to generate reports.

The Oban job, much like the cache example, is configured to be unique per patient (or client) within a six-hour window:

defmodule MyApp.Middlewares.StalePatients.Worker do
  use Oban.Pro.Worker,
    queue: :stale_patients,
    max_attempts: 5,
    unique: [
      period: floor(:timer.hours(6) / 1000),
      keys: [:patient_id, :client_id],
      states: [:available, :scheduled, :executing, :completed]
    ]

  import Ecto.Query

  alias MyApp.Clinical.Patient
  alias MyApp.Repo

  require Logger

  @impl Oban.Pro.Worker
  def process(%{args: %{"client_id" => client_id, "org_id" => org_id}})
      when is_integer(client_id) and is_integer(org_id) do
    patient_ids =
      [client_id: client_id]
      |> Patient.query()
      |> Repo.replica().all()
      |> Enum.map(& &1.id)

    # TODO: use an `insert_all` here now that we use `Oban Pro`.
    for patient_id <- patient_ids do
      Oban.insert(new(%{patient_id: patient_id, org_id: org_id}))
    end

    :ok
  end

  def process(%{args: %{"patient_id" => patient_id, "org_id" => org_id}})
      when is_integer(patient_id) and is_integer(org_id) do
    now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)

    primary_location_id =
      Repo.replica().one(
        from x in Patient,
          join: y in assoc(x, :client),
          select: coalesce(y.location_id, -1),
          where: x.id == ^patient_id
      )

    Repo.insert_all(
      "stale_patients",
      [
        %{
          patient_id: patient_id,
          org_id: org_id,
          location_id: primary_location_id,
          inserted_at: now,
          updated_at: now
        }
      ],
      on_conflict: {:replace, [:updated_at]},
      conflict_target: [:patient_id]
    )

    :ok
  end

  def process(_job) do
    :ok
  end
end

Our middleware is also stupid-simple to minimize the potential runtime impact of hooking into all these operations. We simply:

  1. Store any patient IDs for which the middleware has already run in the Process Dictionary, defending against bulk operations.
  2. For any patient IDs we've not already run the middleware for, we simply try inserting the above Oban job.
  3. The Oban job will be picked up soon enough on a dedicated pod, and will write the information into the stale_patients table.
  4. Reporting is done in 24h increments so by the time the reports are run, the data is already there.

Our middleware implementation ends up looking something like:

defmodule MyApp.Middlewares.StalePatients do
  @behaviour EctoMiddleware

  alias MyApp.Billing.Order
  alias MyApp.Clinical.Immunization
  alias MyApp.Clinical.Medication
  ...
  alias MyApp.Middlewares.StalePatients.Worker

  # We don't want to bother marking any records as needing materialization
  # if we're simply reading data.
  @write_actions [
    :insert,
    :insert!,
    :update,
    :update!,
    :delete,
    :delete!,
    :insert_all,
    :update_all,
    :delete_all
  ]

  # But for any creates, updates, or deletes for these following schemas and their underlying tables,
  # we want to mark the associated patient as needing materialization.
  @dependencies [
    Order, Immunization, Medication, Treatment, Estimate, Examination
  ]

  @impl EctoMiddleware
  def middleware(resource, resolution)
      when resolution.action in @write_actions and resource.__struct__ in @dependencies and
             is_integer(resource.org_id) do
    mark_stale!(resource)
  end

  def middleware(resource, _resolution) do
    resource
  end

  @doc "Skips the middleware for the current process"
  def disable!(resource) when is_integer(resource.patient_id) do
    Process.put({__MODULE__, :disabled, {:patient, resource.patient_id}}, true)
    :ok
  end

  def disable!(resource) when is_integer(resource.client_id) do
    Process.put({__MODULE__, :disabled, {:client, resource.client_id}}, true)
    :ok
  end

  @doc "Enables the middleware for the current process"
  def enable!(resource) when is_integer(resource.patient_id) do
    Process.put({__MODULE__, :disabled, {:patient, resource.patient_id}}, false)
    :ok
  end

  def enable!(resource) when is_integer(resource.client_id) do
    Process.put({__MODULE__, :disabled, {:client, resource.client_id}}, false)
    :ok
  end

  @doc "Returns true if the middleware is enabled for the current process"
  def enabled?(resource) when is_integer(resource.patient_id) do
    Process.get({__MODULE__, :disabled, {:patient, resource.patient_id}}) != true
  end

  def enabled?(resource) when is_integer(resource.client_id) do
    Process.get({__MODULE__, :disabled, {:client, resource.client_id}}) != true
  end

  # Resources with neither ID fall through to the final `:noop` branch of `mark_stale!/1`.
  def enabled?(_resource), do: true

  def mark_stale!(resource) do
    cond do
      not enabled?(resource) ->
        :noop

      not FunWithFlags.enabled?(:stale_tracking, for: %{org_id: resource.org_id}) ->
        :noop

      is_map_key(resource, :patient_id) and is_integer(resource.patient_id) ->
        disable!(resource)
        Worker.schedule!(%{patient_id: resource.patient_id, org_id: resource.org_id})

      is_map_key(resource, :client_id) and is_integer(resource.client_id) ->
        disable!(resource)
        Worker.schedule!(%{client_id: resource.client_id, org_id: resource.org_id})

      true ->
        :noop
    end

    resource
  end
end
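The process dictionary trick from the first step is worth isolating, since it's what keeps bulk operations from hammering Oban with inserts. A minimal sketch, using a made-up OncePerProcessSketch module rather than our actual middleware:

```elixir
defmodule OncePerProcessSketch do
  # Runs `fun` the first time `key` is seen in the current process and
  # returns :noop on every later call with the same key.
  def once(key, fun) when is_function(fun, 0) do
    # Process.put/2 returns the previous value, or nil if there was none.
    case Process.put({__MODULE__, key}, true) do
      nil -> fun.()
      _already_seen -> :noop
    end
  end
end
```

Because the process dictionary is scoped to a single process, the guard resets naturally with every new request or job, which is good enough when the Oban unique constraint is the real source of truth.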

Rate Limiting

One final cursed use case for Oban is rate limiting.

Some of our external integrations have strict rate limits, so we need to be careful to tune our workers and queues to respect these limits.

Even doing so however, sometimes, retries or other bugs can cause us to hit limits.

This can be a real problem, especially if the API limits refuse to refresh unless you hold off making requests rather than self-correcting after some time.

However, if you recall from the above examples, all of our API integrations belong to their own dedicated queues. This means that once we hit a rate limit, we can simply pause the queue and cancel all jobs in that queue:

defmodule MyApp.Integrations.ActivePhoneCalls do
  use Oban.Pro.Worker,
    queue: :phone_calls,
    max_attempts: 5,
    unique: [
      fields: [:args],
      states: [:available, :scheduled, :executing, :retryable],
      period: :infinity
    ]

  @impl Oban.Pro.Worker
  def process(%Oban.Job{args: args}) do
    # ... poll external API for phone call information ...
  rescue
    e in HTTPoison.Error ->
      case e.reason do
        {:econnrefused, _rate_limit_hit} ->
          MyApp.Integrations.RateLimiter.run(:phone_calls, hours: 1, minutes: 30)

        _error ->
          reraise e, __STACKTRACE__
      end
  end
end

This, in tandem with a simple Oban job that unpauses the queue after a certain amount of time, can be a simple way to handle rate limiting:

defmodule MyApp.Integrations.RateLimiter do
  use Oban.Pro.Worker,
    ...

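  import Ecto.Query

  # Args are stored as JSON, so the queue name arrives as a string.
  @impl Oban.Pro.Worker
  def process(%Oban.Job{args: %{"queue" => queue}}) do
    Oban.resume_queue(queue: queue)
    :ok
  end

  def run(queue, duration) do
    # The `queue` column on `oban_jobs` is a string.
    queue_name = to_string(queue)

    Oban.cancel_all_jobs(from x in Oban.Job, where: x.queue == ^queue_name)
    Oban.pause_queue(queue: queue)

    %{queue: queue}
    |> new(scheduled_at: Timex.shift(Timex.now(), duration))
    |> Oban.insert()
  end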
end

Note, we generally avoid Timex in favour of the newer built-in functions for date/time manipulation and processing, but Timex's fluent API is very convenient for this use case.

I highly recommend checking it out!
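If you'd rather not pull in Timex just for this, the same shift can be done with the standard library. A sketch, assuming the duration is a keyword list like the one passed to RateLimiter.run/2 above (ResumeAtSketch is a made-up module):

```elixir
defmodule ResumeAtSketch do
  # Adds a keyword duration such as [hours: 1, minutes: 30] to a DateTime.
  def shift(%DateTime{} = from, duration) when is_list(duration) do
    seconds =
      Keyword.get(duration, :hours, 0) * 3_600 +
        Keyword.get(duration, :minutes, 0) * 60 +
        Keyword.get(duration, :seconds, 0)

    DateTime.add(from, seconds, :second)
  end
end
```

Calling ResumeAtSketch.shift(DateTime.utc_now(), hours: 1, minutes: 30) gives a value you could pass as scheduled_at.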

Conclusion

Oban is a fantastic library that has a lot of great features out of the box, and is a great way to add resilience and reliability to your Elixir applications.

In this post, we've covered a lot of the conventional use cases for Oban, and some of the more cursed use cases that we've encountered at Vetspire.

I hope that this post has given you some ideas on how you can use Oban in your own applications, and some of the trade-offs and pitfalls you might encounter.
