Subscribe to access all episodes. View plans →

#9: Elixir Job Processing with Exq

Published March 17, 2017

Elixir 1.3.4

Phoenix 1.2.1

Exq 0.8.1

ExqUI 0.8.2

Source Code on GitHub


Phoenix is a great framework for building fast applications in Elixir. Unfortunately there are things that can slow down your application. For example a long running task or integrating with a third party API.

Here we have a demo app, but it has an issue. When the user fills out the contact form there’s a delay while the email is being sent. We’re going to solve this by offloading the work into an async or background job. And to do that we’ll use the Exq job processing library.

Exq uses Redis as a store for background jobs and handles things like concurrency, job persistence, and retries for you with a format that is rescue and Sidekiq compatible.

One sidenote: before adding any Redis backing queuing library to your Elixir application ensure that the needs of your application can’t be handled by OTP.

So let’s get started. Exq uses Redis so we’ll have to have that running. I’m on a Mac so I’ll install it using Homebrew.

$ brew install redis

And the start it:

$ brew services start redis

Now from our application’s Mixfile we’ll include exq as a dependency. We’ll also need to include Exq in the application list.

mix.exs

defmodule Teacher.Mixfile do

  def application do
    [mod: {Teacher, []},
     applications: [... :exq]
  end

  defp deps do
    ... 
    {:exq, "~> 0.8.1"}
    ... 
  end
end

The let’s download our dependencies:

$ mix deps.get

Now we can configure our application to use Exq. Here we’re doing some basic setup. The ‘host’ and ‘port’ variables tell Exq how to access Redis. ‘Name’ let’s us customize Exq’s registered name, we’ll leave that as the default: ‘Exq’. ‘Concurrency’ defines the number of concurrent workers allowed. And ‘queues’, here we’re creating a single queue named ‘email’, which we’ll use to listen for our email jobs. You can configure this and add additional queues.

config/config.exs

config :exq,
  name: Exq,
  host: "127.0.0.1",
  port: 6379,
  namespace: "exq",
  concurrency: 500,
  queues: ["email"]

Our ‘MessageController’ is where we’re handling the post from the form and sending the email. Mailer.send_contact_email(message_params) is where the bottleneck is - this is what we’ll want to pull into a worker. We’ll create our worker in ‘lib’ with a module called ‘send_email_worker’.

Inside our module we’ll create the required ‘perform’ function and inside it add our long-running Mailer.send_contact_email(message_params). On the next line we’re going to print a message so we see when the work is completed in our logs.

lib/send_email_worker.ex

defmodule Teacher.SendEmailWorker do
  alias Teacher.Mailer

  def perform(message_params) do
    Mailer.send_contact_email(message_params)
    IO.puts("Finished -- #{message_params["body"]} - #{message_params["subject"]}")
  end
end

Now that Mailer.send_contact_email(message_params) has been added to our worker, we can remove it from our controller. And then to enqueue our job we’ll call Exq.enqueue/4 and pass in four arguments: ‘Exq’, the name of the queue we want to handle this job (‘email’), the name of the worker (‘SendEmailWorker’), and the arguments that the ‘perform’ function in the worker requires.

web/controllers/message_controller.ex

defmodule Teacher.MessageController do
... 
    def create(conn, %{"message" => message_params}) do
      ... 
        Exq.enqueue(Exq, "email", Teacher.SendEmailWorker, [message_params])
      ... 
    end
 ... 
end

Now we’ll start up our server

$ mix phoenix.server

Going back to the contact form, if we fill it out and submit it again, the delay is gone. From our development logs we can see the message that was printed from our worker.

Another great feature of using Exq is that we can bring in the optional ‘exq_ui’ library, which provides an interface where we can easily view stats on our jobs.

To add it we’ll go back to our ‘Mixfile’ and add ‘exq_ui’ as a dependency.

mix.exs

defmodule Teacher.Mixfile do

  defp deps do
  ...
     {:exq, "~> 0.8.1"},
     {:exq_ui, "~> 0.8.2"}
  ...
  end
end

Then we’ll download our new package:

$ mix deps.get

In our ‘config’ file the options we have access to are: ‘webport’, ‘web_namespace’, and ‘sever’.

Since we’re using Phoenix we can remove ‘webport’ and ‘web_namespace’.

config/config.exs

config :exq_ui,
  server: true

Going to our ‘router’ let’s add a new pipeline. At the end of the pipeline we are using the ‘ExqUi.RouterPlug’ with the namespace set to the default of ‘exq’.

Then we’ll create a scope at ‘/exq’ and tell it to use the new ‘exq’ pipeline.

web/router.ex

defmodule Teacher.Router do
...
  pipeline :exq do
    plug :accepts, ["html"]
    plug :fetch_session
    plug :fetch_flash
    plug :put_secure_browser_headers
    plug ExqUi.RouterPlug, namespace: "exq"
  end
  
  scope "/exq", ExqUi do
    pipe_through :exq
    
    forward "/", RouterPlug.Router, :index
  end
...
end

Now if we restart our web server

$ mix phoenix.server

And go to ‘localhost:4000/exq’ we’ll see our new jobs UI. Here we can see how many jobs are being processed, what the size of each queue is, the jobs being retried. We can also remove jobs from a queue.

An important aspect of job processing is the ability to schedule jobs. Let’s say we wanted to wait one hour after our contact form was submitted before we actually delivered that email, how would we do that?

The ‘Exq’ API provides the ability to schedule jobs to be processed at some point in the future. So let’s update our code to do just that.

First we’ll go back to our ‘config’ and add scheduler_enable: true to our ‘Exq’ config.

config/config.exs

... 
config :exq,
  name: Exq,
  host: "127.0.0.1",
  port: 6379,
  namespace: "exq",
  concurrency: 500,
  queues: ["email"],
  scheduler_enable: true
...

Then in our ‘MessageController’ we’ll change our function from ‘enqueue’ to ‘enqueue_in’ and add 3600 seconds to get our hour delay.

web/controllers/message_controller.ex

defmodule Teacher.MessageController do
...
    def create(conn, %{"message" => message_params}) do
      …
      Exq.enqueue_in(Exq, "email", 3600, Teacher.SendEmailWorker, [message_params])
      …
    end
...
end

Now if we go back to our app we see an error. This is because we made changes to our config file and didn’t restart the server. So let’s go ahead and restart it. Now with the page loading, let’s submit another contact form. And if go to our jobs interface page, we’ll see that we have one scheduled job set to be processed in one hour.

© 2024 HEXMONSTER LLC