Subscribe to access all episodes. View plans →

#110: Job Processing with Oban

Published January 27, 2020

Elixir 1.9

Oban

View source on GitHub


Here we have an Elixir application and whenever a user clicks this “do work” button, our application does some work, that we can see takes several seconds to complete, creating a poor user experience.

Let’s take a quick look at the code. When the “do work” button is clicked, the “create” action here is executed. In the function we see it’s adding a flash message and rendering our “done.html” template, but before it gets to this point it calls Processor.process - this is where the work is being done. If we open that module, we see that it’s simulating some long running task by calling :timer.sleep.

In this episode let’s look at how we can use Oban to help process our long running job in the background. Many job processing libraries will use Redis as the data store for jobs and if you’re not already using Redis, you now need to bring in another database to your application. One nice thing about Oban is that it uses Postgres to process jobs. So if you’re application is already using Postgres - which ours is - we don’t need to bring in anything other than the Oban package. Let’s start with that.

We’ll go to Hex and grab the config. Then let’s open our Mixfile and add Oban to our list of dependencies. At the time of this recording the most recent version is 1.0 release candidate 2. The current version of Oban may be different for you.

mix.exs

...
{:oban, "1.0.0-rc.2"}
...

Then we’ll go to the command line and download Oban and its dependencies.

$ mix deps.get
...
New:
  oban 1.0.0-rc.2
* Getting oban (Hex package)

Now we need to create a database migration to add the oban_jobs table that Oban uses. Let’s generate a new Ecto migration named add_oban_jobs_table.

$ mix ecto.gen.migration add_oban_jobs_table
Generated teacher app
* creating priv/repo/migrations/{timestamp}_add_oban_jobs_table.exs

Then let’s open up our newly created migration. We’ll need to call the up and down functions on Oban.Migrations. I’ll paste in the required migration from the Oban README.

priv/repo/migrations/{timestamp}_add_oban_jobs_table.exs

defmodule Teacher.Repo.Migrations.AddObanJobsTable do
  use Ecto.Migration

  def up do
    Oban.Migrations.up()
  end

  def down do
    Oban.Migrations.down(version: 1)
  end

end

With that we can go to the command line and run the migration.

$ mix ecto.migrate
...

Now let’s configure Oban to work in our application. To start let’s open our “config.exs” and add a config for Oban, we’ll specify our Repo, then we’ll add the prune option. Since our jobs are stored in the database this allows us to specify how many jobs we want to keep. We can specify based on the number of rows or the age of the row.

Let’s set ours up to prune by the number of rows. We’ll use a tuple specifying :maxlen and then the number of rows, let’s set this to 5000. Now only the most recent 5000 jobs will be stored. Finally, let’s specify the queues. To configure our queues, we’ll use a keyword list with the name of our queue “default” and then the maximum number of concurrent jobs we’ll allow in the queue. Let’s start with 2.

config/config.exs

...
config :teacher, Oban,
  repo: Teacher.Repo,
  prune: {:maxlen, 5_000},
  queues: [default: 2]

...

Oban is not an application and needs to be started from our supervisor, so let’s open our project’s application.ex and add Oban to our list of children to be supervised. We’ll also load our configuration into the supervisor.

lib/teacher/application.ex

...
children = [
  Teacher.Repo,
  TeacherWeb.Endpoint,
  {Oban, Application.get_env(:teacher, Oban)}
]
...

Now let’s check that we have everything setup correctly. We’ll go to the command line and start and IEx session with our project. Then let’s open the observer.

$ iex -S mix
Generated teacher app
Interactive Elixir (1.9.4)
> :observer.start()

If we click the “Applications” tab - great, we see that Oban was started as part of our applications supervision tree. Now that everything is setup, let’s create a job to handle our long running task.

Let’s create a new directory in “teacher” called “workers”, then we’ll create a new module “processor_worker.ex”. We’ll define our module and use Oban.Worker. Each worker must define a perform function, so let’s define that. It takes a map of arguments, which we’ll ignore for now, and the job struct. Inside the function we can call Processor.process to execute our work in the background. And let’s add some logging so we can see when our job starts, the job struct, and when it’s finished.

lib/teacher/workers/processor_worker.ex

defmodule Teacher.Workers.ProcessorWorker do
use Oban.Worker

  def perform(_args, job) do
    IO.puts("Starting work...")
    Teacher.Processor.process
    IO.inspect(job)
    IO.puts("...Finished work")
  end

end

Then we’ll open our page_controller.ex and enqueue our job. Oban Jobs are Ecto structs that are enqueued when inserted into the database. Oban provides workers with a new function that we can call to return a job changeset that can then be inserted into the database with Oban.insert.

Let’s define an empty map, that we’ll pipe into ProcessorWorker.new, then we can pipe the changeset that’s returned into Oban.insert. We’ll also alias our ProcessorWorker so we can call it without the prefix.

lib/teacher_web/controllers/page_controller.ex

defmodule TeacherWeb.PageController do
  use TeacherWeb, :controller

  alias Teacher.Workers.ProcessorWorker
  
  ...
  
  def create(conn, _params) do
    %{}
    |> ProcessorWorker.new()
    |> Oban.insert()

    conn
    |> put_flash(:info, "Work completed")
    |> render("done.html")
  end
  ...
end

Now let’s go to the command line and start up our server.

$ mix phx.server
...

Then we can go back to the browser and if we click our “do work” button again there’s no delay since the work is now being done in the background.

Let’s look at our development logs. We see our work was started, the job struct was logged and from it we can see the data about our job, like the state “executing”. We also see that the job finished.

Starting work...
%Oban.Job{
  __meta__: #Ecto.Schema.Metadata<:loaded, "public", "oban_jobs">,
  args: %{},
  attempt: 1,
  attempted_at: ~U[2020-01-25 03:33:01.567035Z],
  attempted_by: ["alekx", "default", "2gsdghng"],
  completed_at: nil,
  discarded_at: nil,
  errors: [],
  id: 1,
  inserted_at: ~U[2020-01-25 03:33:01.537800Z],
  max_attempts: 20,
  priority: 0,
  queue: "default",
  scheduled_at: ~U[2020-01-25 03:33:01.537800Z],
  state: "executing",
  tags: [],
  unique: nil,
  worker: "Teacher.Workers.ProcessorWorker"
}
...Finished work

Because Oban uses Postgres, let’s check out the oban_jobs table and here we see our job with the data we saw from our logs. Since the job was completed successfully you can see the “state” is now “completed”.

Now let’s update our worker a bit. First let’s pass in the length of time our Processor takes to run. We’ll open our processor.ex module and let’s update it to take the length to sleep for.

lib/teacher/processor.ex

defmodule Teacher.Processor do

  def process(length) do
    :timer.sleep(length)
  end

end

Then let’s go back to our page_controller.ex and we’ll create a random length between 4 and 8 seconds. Then we’ll update our map so that the length is passed into our ProcessorWorker.

lib/teacher_web/controllers/page_controller.ex

...
def create(conn, _params) do
  length = Enum.random(4_000..8_000)

  %{length: length}
  |> ProcessorWorker.new()
  |> Oban.insert()

  conn
  |> put_flash(:info, "Work completed")
  |> render("done.html")

end
...

Now we’ll need to open our processor_worker.ex and our length will now be included here in our args. Let’s pattern match on it. Notice here that we’re using a string to pattern match on the “length” key where we used an atom in our page_controller.ex this is because the args map is deserialized from Postgres, so any keys will be converted to strings.

Now let’s also specify the queue we want these jobs to use. We can specify the queue here by updating the use macro. We’ll include the queue option and then the name of the queue we want to use hard_worker. We can also customize more things here like the maximum number of attempts, priority, tags, and uniqueness.

lib/teacher/workers/processor_worker.ex

defmodule Teacher.Workers.ProcessorWorker do
  use Oban.Worker, queue: :hard_worker

  def perform(%{"length" => length}, job) do
    IO.puts("Starting work...")
    Teacher.Processor.process(length)
    IO.inspect(job)
    IO.puts("Finished work")
  end

end

Now let’s start up our server.

$ mix phx.server
...

Then let’s open the browser and let’s start a few jobs.

Now if we look at our logs we don’t see anything coming through. So let’s open our oban_jobs table and we see our three jobs were inserted and currently have a status of “available” but haven’t been attempted yet.

That’s because when we specified the new queue “hard_worker” for our ProcessorWorker jobs, we didn’t update our Oban config to include it. Let’s do that now. We’ll open our config.exs and update our queues option to include “hard_worker”.

config/config.exs

...
config :teacher, Oban,
  repo: Teacher.Repo,
  prune: {:maxlen, 5_000},
  queues: [default: 2, hard_worker: 2]

...

Now if we go restart our server

$ mix phx.server
...

We see or our jobs come through in the logs and in the Oban.Job struct we can see the length included in the args and the queue we specified.

There’s a lot more customization available in Oban that we didn’t talk about in this episode, but the README has some really good documentation and a nice summary of Oban’s features. Another feature we didn’t look at in this episode but is really cool is the Oban UI, which is currently in available as a private beta. You can sign up for the beta as well as see a really nice demo of it on the Oban website.

© 2024 HEXMONSTER LLC