Subscribe to access all episodes. View plans →
Published January 27, 2020
Elixir 1.9
View source on GitHub
Here we have an Elixir application and whenever a user clicks this “do work” button, our application does some work, that we can see takes several seconds to complete, creating a poor user experience.
Let’s take a quick look at the code. When the “do work” button is clicked, the “create” action here is executed. In the function we see it’s adding a flash message and rendering our “done.html” template, but before it gets to this point it calls Processor.process
- this is where the work is being done. If we open that module, we see that it’s simulating some long running task by calling :timer.sleep
.
In this episode let’s look at how we can use Oban to help process our long running job in the background. Many job processing libraries will use Redis as the data store for jobs and if you’re not already using Redis, you now need to bring in another database to your application. One nice thing about Oban is that it uses Postgres to process jobs. So if you’re application is already using Postgres - which ours is - we don’t need to bring in anything other than the Oban package. Let’s start with that.
We’ll go to Hex and grab the config. Then let’s open our Mixfile and add Oban to our list of dependencies. At the time of this recording the most recent version is 1.0 release candidate 2. The current version of Oban may be different for you.
mix.exs
...
{:oban, "1.0.0-rc.2"}
...
Then we’ll go to the command line and download Oban and its dependencies.
$ mix deps.get
...
New:
oban 1.0.0-rc.2
* Getting oban (Hex package)
Now we need to create a database migration to add the oban_jobs
table that Oban uses. Let’s generate a new Ecto migration named add_oban_jobs_table
.
$ mix ecto.gen.migration add_oban_jobs_table
Generated teacher app
* creating priv/repo/migrations/{timestamp}_add_oban_jobs_table.exs
Then let’s open up our newly created migration. We’ll need to call the up
and down
functions on Oban.Migrations
. I’ll paste in the required migration from the Oban README.
priv/repo/migrations/{timestamp}_add_oban_jobs_table.exs
defmodule Teacher.Repo.Migrations.AddObanJobsTable do
use Ecto.Migration
def up do
Oban.Migrations.up()
end
def down do
Oban.Migrations.down(version: 1)
end
end
With that we can go to the command line and run the migration.
$ mix ecto.migrate
...
Now let’s configure Oban to work in our application. To start let’s open our “config.exs” and add a config for Oban, we’ll specify our Repo, then we’ll add the prune
option. Since our jobs are stored in the database this allows us to specify how many jobs we want to keep. We can specify based on the number of rows or the age of the row.
Let’s set ours up to prune by the number of rows. We’ll use a tuple specifying :maxlen
and then the number of rows, let’s set this to 5000. Now only the most recent 5000 jobs will be stored. Finally, let’s specify the queues. To configure our queues, we’ll use a keyword list with the name of our queue “default” and then the maximum number of concurrent jobs we’ll allow in the queue. Let’s start with 2.
config/config.exs
...
config :teacher, Oban,
repo: Teacher.Repo,
prune: {:maxlen, 5_000},
queues: [default: 2]
...
Oban is not an application and needs to be started from our supervisor, so let’s open our project’s application.ex
and add Oban to our list of children to be supervised. We’ll also load our configuration into the supervisor.
lib/teacher/application.ex
...
children = [
Teacher.Repo,
TeacherWeb.Endpoint,
{Oban, Application.get_env(:teacher, Oban)}
]
...
Now let’s check that we have everything setup correctly. We’ll go to the command line and start and IEx session with our project. Then let’s open the observer.
$ iex -S mix
Generated teacher app
Interactive Elixir (1.9.4)
> :observer.start()
If we click the “Applications” tab - great, we see that Oban was started as part of our applications supervision tree. Now that everything is setup, let’s create a job to handle our long running task.
Let’s create a new directory in “teacher” called “workers”, then we’ll create a new module “processor_worker.ex”. We’ll define our module and use Oban.Worker
. Each worker must define a perform
function, so let’s define that. It takes a map of arguments, which we’ll ignore for now, and the job struct. Inside the function we can call Processor.process
to execute our work in the background. And let’s add some logging so we can see when our job starts, the job struct, and when it’s finished.
lib/teacher/workers/processor_worker.ex
defmodule Teacher.Workers.ProcessorWorker do
use Oban.Worker
def perform(_args, job) do
IO.puts("Starting work...")
Teacher.Processor.process
IO.inspect(job)
IO.puts("...Finished work")
end
end
Then we’ll open our page_controller.ex
and enqueue our job. Oban Jobs are Ecto structs that are enqueued when inserted into the database. Oban provides workers with a new
function that we can call to return a job changeset that can then be inserted into the database with Oban.insert
.
Let’s define an empty map, that we’ll pipe into ProcessorWorker.new
, then we can pipe the changeset that’s returned into Oban.insert
. We’ll also alias our ProcessorWorker
so we can call it without the prefix.
lib/teacher_web/controllers/page_controller.ex
defmodule TeacherWeb.PageController do
use TeacherWeb, :controller
alias Teacher.Workers.ProcessorWorker
...
def create(conn, _params) do
%{}
|> ProcessorWorker.new()
|> Oban.insert()
conn
|> put_flash(:info, "Work completed")
|> render("done.html")
end
...
end
Now let’s go to the command line and start up our server.
$ mix phx.server
...
Then we can go back to the browser and if we click our “do work” button again there’s no delay since the work is now being done in the background.
Let’s look at our development logs. We see our work was started, the job struct was logged and from it we can see the data about our job, like the state “executing”. We also see that the job finished.
Starting work...
%Oban.Job{
__meta__: #Ecto.Schema.Metadata<:loaded, "public", "oban_jobs">,
args: %{},
attempt: 1,
attempted_at: ~U[2020-01-25 03:33:01.567035Z],
attempted_by: ["alekx", "default", "2gsdghng"],
completed_at: nil,
discarded_at: nil,
errors: [],
id: 1,
inserted_at: ~U[2020-01-25 03:33:01.537800Z],
max_attempts: 20,
priority: 0,
queue: "default",
scheduled_at: ~U[2020-01-25 03:33:01.537800Z],
state: "executing",
tags: [],
unique: nil,
worker: "Teacher.Workers.ProcessorWorker"
}
...Finished work
Because Oban uses Postgres, let’s check out the oban_jobs
table and here we see our job with the data we saw from our logs. Since the job was completed successfully you can see the “state” is now “completed”.
Now let’s update our worker a bit. First let’s pass in the length of time our Processor
takes to run. We’ll open our processor.ex
module and let’s update it to take the length
to sleep for.
lib/teacher/processor.ex
defmodule Teacher.Processor do
def process(length) do
:timer.sleep(length)
end
end
Then let’s go back to our page_controller.ex
and we’ll create a random length between 4 and 8 seconds. Then we’ll update our map so that the length is passed into our ProcessorWorker
.
lib/teacher_web/controllers/page_controller.ex
...
def create(conn, _params) do
length = Enum.random(4_000..8_000)
%{length: length}
|> ProcessorWorker.new()
|> Oban.insert()
conn
|> put_flash(:info, "Work completed")
|> render("done.html")
end
...
Now we’ll need to open our processor_worker.ex
and our length will now be included here in our args
. Let’s pattern match on it. Notice here that we’re using a string to pattern match on the “length” key where we used an atom in our page_controller.ex
this is because the args map is deserialized from Postgres, so any keys will be converted to strings.
Now let’s also specify the queue we want these jobs to use. We can specify the queue
here by updating the use
macro. We’ll include the queue option and then the name of the queue we want to use hard_worker
. We can also customize more things here like the maximum number of attempts, priority, tags, and uniqueness.
lib/teacher/workers/processor_worker.ex
defmodule Teacher.Workers.ProcessorWorker do
use Oban.Worker, queue: :hard_worker
def perform(%{"length" => length}, job) do
IO.puts("Starting work...")
Teacher.Processor.process(length)
IO.inspect(job)
IO.puts("Finished work")
end
end
Now let’s start up our server.
$ mix phx.server
...
Then let’s open the browser and let’s start a few jobs.
Now if we look at our logs we don’t see anything coming through. So let’s open our oban_jobs
table and we see our three jobs were inserted and currently have a status of “available” but haven’t been attempted yet.
That’s because when we specified the new queue “hard_worker” for our ProcessorWorker
jobs, we didn’t update our Oban config to include it. Let’s do that now. We’ll open our config.exs
and update our queues option to include “hard_worker”.
config/config.exs
...
config :teacher, Oban,
repo: Teacher.Repo,
prune: {:maxlen, 5_000},
queues: [default: 2, hard_worker: 2]
...
Now if we go restart our server
$ mix phx.server
...
We see or our jobs come through in the logs and in the Oban.Job
struct we can see the length included in the args and the queue we specified.
There’s a lot more customization available in Oban that we didn’t talk about in this episode, but the README has some really good documentation and a nice summary of Oban’s features. Another feature we didn’t look at in this episode but is really cool is the Oban UI, which is currently in available as a private beta. You can sign up for the beta as well as see a really nice demo of it on the Oban website.