#30: Moving to Elixir Part 6

Published December 13, 2017 6m 48s

Elixir 1.4.5

Phoenix 1.3

Flow 0.12.0


Our app is working pretty well right now. It displays our inventory and has a working task we can use to update that inventory. Now let’s play around with a few improvements we can make.

For example, wouldn’t it be great if our app could be in charge of running our inventory import instead of relying on some scheduler? Let’s see how we can do just that by leveraging some of the benefits of OTP.

Inside of our inventory_sync directory we’ll create a new file, worker.ex.

And we’ll make it a GenServer. Now let’s set this GenServer up so it’s responsible for updating our inventory. And let’s also configure it to be self-scheduling.

First let’s define a function named start_link that will call GenServer.start_link with our current module.

And we’ll just pass an empty map for its arguments. Now let’s implement the init GenServer callback.

It will take our state. And then return ok and our state. With that let’s create another function that will schedule our import. Let’s call it schedule_import.

And inside it we’ll use the Process.send_after function to send a message after a certain amount of time.

The receiver, or destination, of our message will be the current process, which we’ll get with self(). Then let’s send the message :import. And let’s send it every five seconds to start. Now we’ll need to implement the GenServer handle_info function to handle the message we’re sending.

We’ll use :import to match on the message. And we’ll get the state. Let’s print a message to start. And once our message is printed, let’s reschedule it by calling schedule_import. Then we’ll return {:noreply, state}.

We’ll also need to schedule our initial import. So let’s go back to our init callback and we’ll call schedule_import there as well.

lib/dealership/inventory_sync/worker.ex
defmodule Dealership.InventorySync.Worker do
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, %{})
  end

  def init(state) do
    schedule_import() 
    {:ok, state}
  end

  def handle_info(:import, state) do
    IO.puts("Importing…")
    schedule_import()
    {:noreply, state}
  end

  defp schedule_import do
    Process.send_after(self(), :import, 5_000)
  end
end
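
To see Process.send_after on its own, here’s a minimal sketch that isn’t part of the Dealership app. The module name SendAfterSketch and the 50 ms delay are made up for illustration. It sends :import to the current process and then waits for that message to show up in the mailbox.

```elixir
defmodule SendAfterSketch do
  # Hypothetical helper for illustration; not part of the Dealership app.
  def run(delay_ms) do
    # Returns immediately with a timer reference; nothing is delivered yet.
    timer_ref = Process.send_after(self(), :import, delay_ms)

    # Block until the delayed message lands in this process's mailbox.
    result =
      receive do
        :import -> :received
      after
        delay_ms + 1_000 -> :timeout
      end

    {is_reference(timer_ref), result}
  end
end

IO.inspect(SendAfterSketch.run(50))
```

In a GenServer we don’t write the receive ourselves. The message is handed to handle_info/2, which is why our worker matches on :import there.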

Now let’s open our application.ex module.

And let’s supervise our new worker by adding it to our list of children. Now since our module is not a supervisor, we’ll use the worker/3 function.

defmodule Dealership.Application do
  use Application

  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  def start(_type, _args) do
    import Supervisor.Spec

    # Define workers and child supervisors to be supervised
    children = [
      # Start the Ecto repository
      supervisor(Dealership.Repo, []),
      # Start the endpoint when the application starts
      supervisor(DealershipWeb.Endpoint, []),
      # Start your own worker by calling: Dealership.Worker.start_link(arg1, arg2, arg3)
      # worker(Dealership.Worker, [arg1, arg2, arg3]),
      worker(Dealership.InventorySync.Worker, [])
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Dealership.Supervisor]
    Supervisor.start_link(children, opts)
  end
  …
end

Then we’ll go to the command line and start our server.

$ mix phx.server

And if we watch the logs, we see our message is printed every five seconds. Our process is started and schedules some work to do, in this case printing our message, and then reschedules itself.

Now that we know it’s working - let’s add in some real work. Let’s go back to our worker.ex and we’ll add our import logic. Let’s also increase our scheduling interval to 10 seconds.

And we’ll again include module attributes for our URL and filename. And aliases for our Downloader and Importer modules.

lib/dealership/inventory_sync/worker.ex
defmodule Dealership.InventorySync.Worker do
  use GenServer

  alias Dealership.InventorySync.{Downloader, Importer}

  @url 'https://elixircastsio.github.io/inventory/incoming/new_cars.csv'
  @filename "new_inventory.csv"

  def start_link do
    GenServer.start_link(__MODULE__, %{})
  end

  def init(state) do
    schedule_import() # Schedule the import
    {:ok, state}
  end

  def handle_info(:import, state) do
    @url
    |> Downloader.download!(@filename)
    |> Importer.sync_inventory!()

    schedule_import()
    {:noreply, state}
  end

  defp schedule_import do
    Process.send_after(self(), :import, 10_000)
  end
end

And if we go back and start our server:

$ mix phx.server

We see the queries generated by our import that’s running every 10 seconds.

When we built our importer, we didn’t do much error handling.

So now what happens if our import raises an error? Will our application keep running?

Let’s find out.

We’ll open our downloader.ex module. And let’s simulate an error by raising our CsvRequestError.

lib/dealership/inventory_sync/downloader.ex
…
  defp csv_data!(url) do
    raise CsvRequestError
    # 
    # case fetch_csv_data(url) do
    #   {:ok, {{_, 200, _}, _headers, data}} ->
    #     data
    #   _ ->
    #     raise CsvRequestError
    # end
  end
…

Then let’s go back to the command line and start our server:

$ mix phx.server

And we see our error in the logs.

Now let’s go to the browser and check out our site: localhost:4000. And great, we’re able to access it normally. So what’s going on here?

Let’s look again at our application.ex

Our worker module is being supervised with the one_for_one strategy, which means that if it terminates, that process, and only that process, is restarted.

By default the restart option is permanent, which means the child process is always restarted.
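
Here’s a small standalone sketch of that restart behavior. The CrashySketch and RestartSketch modules are hypothetical and only stand in for our worker. The sketch also writes the child spec as a map, which assumes Elixir 1.5 or later; on the 1.4.5 used in this episode we’d keep using worker/3 from Supervisor.Spec.

```elixir
defmodule CrashySketch do
  use GenServer

  # Registered under its module name so we can find it again after a restart.
  def start_link do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def init(state), do: {:ok, state}

  # Simulates a failed import: raising here terminates the process.
  def handle_info(:import, _state) do
    raise "simulated import failure"
  end
end

defmodule RestartSketch do
  def run do
    children = [
      %{id: CrashySketch, start: {CrashySketch, :start_link, []}, restart: :permanent}
    ]

    {:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)

    old_pid = Process.whereis(CrashySketch)
    ref = Process.monitor(old_pid)
    send(old_pid, :import)

    # Wait until the crashed process is really gone.
    receive do
      {:DOWN, ^ref, :process, ^old_pid, _reason} -> :ok
    end

    # The supervisor should have started a replacement under the same name.
    new_pid = wait_for_restart(old_pid, 100)
    Supervisor.stop(sup)
    {old_pid, new_pid}
  end

  # Polls for a registered pid that differs from the crashed one.
  defp wait_for_restart(old_pid, attempts) when attempts > 0 do
    case Process.whereis(CrashySketch) do
      pid when is_pid(pid) and pid != old_pid ->
        pid

      _ ->
        Process.sleep(10)
        wait_for_restart(old_pid, attempts - 1)
    end
  end
end

{old_pid, new_pid} = RestartSketch.run()
IO.inspect({old_pid, new_pid}, label: "crashed / replacement")
```

The two pids should differ: the first process died with our simulated error, and the supervisor replaced it without touching anything else in the tree.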

Let’s update it to never restart.

lib/dealership/application.ex
…
worker(Dealership.InventorySync.Worker, [], restart: :temporary)
…

We’ll start our server again:

$ mix phx.server

And we see it crashes once and that’s it - the process is never restarted. Let’s go back to our application.ex and remove the restart option.

And let’s play around with one other option, this time on the supervisor itself. By default our supervisor will tolerate a maximum of 3 restarts within 5 seconds. Let’s change it to a maximum of 1 restart in 15 seconds.

lib/dealership/application.ex
…
  def start(_type, _args) do
    import Supervisor.Spec

    # Define workers and child supervisors to be supervised
    children = [
      # Start the Ecto repository
      supervisor(Dealership.Repo, []),
      # Start the endpoint when the application starts
      supervisor(DealershipWeb.Endpoint, []),
      # Start your own worker by calling: Dealership.Worker.start_link(arg1, arg2, arg3)
      # worker(Dealership.Worker, [arg1, arg2, arg3]),
      worker(Dealership.InventorySync.Worker, [])
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, 
            name: Dealership.Supervisor,
            max_restarts: 1,
            max_seconds: 15]
    Supervisor.start_link(children, opts)
  end
…

We’ll start our server again.

$ mix phx.server

I’ve sped up the video here, but we see it fails once, then twice and exits with shutdown.
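
The same thing can be reproduced outside the app. This is only a sketch: FlakySketch and IntensitySketch are hypothetical modules, the child is crashed by hand instead of on a timer, and the map child spec assumes Elixir 1.5 or later. With max_restarts: 1 and max_seconds: 15, the first crash is tolerated and the second one takes the supervisor down.

```elixir
defmodule FlakySketch do
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def init(state), do: {:ok, state}

  # Simulates a failed import: raising here terminates the process.
  def handle_info(:import, _state), do: raise("simulated import failure")
end

defmodule IntensitySketch do
  def run do
    # Trap exits so the supervisor's shutdown reaches us as a message
    # instead of taking this linked process down with it.
    previous = Process.flag(:trap_exit, true)

    children = [%{id: FlakySketch, start: {FlakySketch, :start_link, []}}]
    opts = [strategy: :one_for_one, max_restarts: 1, max_seconds: 15]
    {:ok, sup} = Supervisor.start_link(children, opts)

    # First crash: within the limit, so the child is restarted.
    first_pid = crash_child(nil)
    # Second crash inside the 15 second window: the limit is exceeded.
    crash_child(first_pid)

    reason =
      receive do
        {:EXIT, ^sup, reason} -> reason
      after
        2_000 -> :supervisor_still_running
      end

    Process.flag(:trap_exit, previous)
    reason
  end

  # Crashes the currently registered child and waits until it is down.
  defp crash_child(previous_pid) do
    pid = wait_for_child(previous_pid, 100)
    ref = Process.monitor(pid)
    send(pid, :import)

    receive do
      {:DOWN, ^ref, :process, ^pid, _reason} -> pid
    end
  end

  # Polls for a registered pid that differs from the one that just crashed.
  defp wait_for_child(previous_pid, attempts) when attempts > 0 do
    case Process.whereis(FlakySketch) do
      pid when is_pid(pid) and pid != previous_pid ->
        pid

      _ ->
        Process.sleep(10)
        wait_for_child(previous_pid, attempts - 1)
    end
  end
end

IO.inspect(IntensitySketch.run(), label: "supervisor exit reason")
```

The exit reason should be :shutdown, matching what we saw in the logs. In our real application that supervisor is the top of the tree, so once it gives up the whole application stops.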

Let’s go back to our application.ex and remove our custom restart options. For now let’s remove our import worker from the list of children as well.

lib/dealership/application.ex
  …
  def start(_type, _args) do
    import Supervisor.Spec

    # Define workers and child supervisors to be supervised
    children = [
      # Start the Ecto repository
      supervisor(Dealership.Repo, []),
      # Start the endpoint when the application starts
      supervisor(DealershipWeb.Endpoint, []),
      # Start your own worker by calling: Dealership.Worker.start_link(arg1, arg2, arg3)
      # worker(Dealership.Worker, [arg1, arg2, arg3]),
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Dealership.Supervisor]
    Supervisor.start_link(children, opts)
  end
  …

We’ll also go back to our downloader.ex module and fix our csv_data! function.

lib/dealership/inventory_sync/downloader.ex
…
  defp csv_data!(url) do
    case fetch_csv_data(url) do
      {:ok, {{_, 200, _}, _headers, data}} ->
        data
      _ ->
        raise CsvRequestError
    end
  end
  …

Now let’s shift gears here a bit and imagine that the dealership has grown. We now have lots and lots of cars to import every day.

And not only do we have more cars to import, but because our app and the data along with it has grown, it takes longer to import a single car.

Let’s simulate our issue. We’ll open our listings.ex module.

And in the create_or_update_car! function let’s set a sleep of 2 seconds at the beginning of the function with :timer.sleep(2_000).

lib/dealership/listings/listings.ex
…
  def create_or_update_car!(params) do
    :timer.sleep(2_000)

    params
    |> get_car_struct()
    |> Car.changeset(params)
    |> Repo.insert_or_update!()
  end
…

Then from the command line compile our app:

$ mix compile

And then run our import task:

$ mix dealership.importer

We can see a noticeable delay between each car.

How can we speed this up?

Luckily we’re using Elixir - and while there are a few different ways we could parallelize our imports, we’re going to use the Flow library, which is built on GenStage.

First we’ll open our ‘Mixfile’ and add flow to our list of dependencies.

mix.exs
defp deps do
  ...
  {:flow, "~> 0.12.0"},
  …
end

Then let’s download it.

$ mix deps.get

Now we can open our importer.ex module and get started. Because Flow works on collections, the API has a similar feel to both the Enum and Stream modules, which helps a lot when you’re refactoring to use it.

Let’s update our import_inventory! function to use Flow. To get started, we’ll pipe the stream from CSV.decode! into Flow.from_enumerable() - this will start our flow.

Now let’s set a few options. We’ll set a max_demand of 1. And we’ll set the number of stages to 10.

You’ll want to experiment with these values to optimize the performance for the kind of data you’re working with, but this will be fine for our test case.

We can then pipe our flow into Flow.map, so we’ll change our Enum.map to Flow.map. Now to run a Flow we can call an Enum function, or another option, if we don’t need a return value, is to call Flow.run. Since we aren’t relying on anything being returned, let’s go ahead and use Flow.run.

lib/dealership/inventory_sync/importer.ex
  …
  defp import_inventory!(filename) do
    filename
    |> File.stream!()
    |> CSV.decode!(headers: true)
    |> Flow.from_enumerable(max_demand: 1, stages: 10)
    |> Flow.map(&Listings.create_or_update_car!/1)
    |> Flow.run()
  end
  …

With that let’s go to the command line and compile our app:

$ mix compile

And then run our import task:

$ mix dealership.importer

And you can see from our logs that our import is now running much faster, because our data is being imported concurrently.
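
To get a feel for why concurrency helps so much here, this standalone sketch times the same slow work done sequentially and concurrently. It does not use Flow: so that it runs without extra dependencies, it swaps in Task.async_stream from the standard library, and slow_import is a made-up stand-in for Listings.create_or_update_car!/1 with a shorter 100 ms sleep.

```elixir
defmodule ConcurrencySketch do
  # Hypothetical stand-in for Listings.create_or_update_car!/1:
  # sleeps for 100 ms to mimic one slow import.
  def slow_import(car) do
    Process.sleep(100)
    {:imported, car}
  end

  # One car at a time, like the original Enum.map version.
  def sequential(cars), do: Enum.map(cars, &slow_import/1)

  # Up to 10 cars at once, each in its own task.
  def concurrent(cars) do
    cars
    |> Task.async_stream(&slow_import/1, max_concurrency: 10)
    |> Enum.map(fn {:ok, result} -> result end)
  end

  # Times both versions over the same ten fake cars.
  def compare do
    cars = Enum.to_list(1..10)
    {sequential_us, sequential_result} = :timer.tc(fn -> sequential(cars) end)
    {concurrent_us, concurrent_result} = :timer.tc(fn -> concurrent(cars) end)

    %{
      same_result: sequential_result == concurrent_result,
      sequential_ms: div(sequential_us, 1_000),
      concurrent_ms: div(concurrent_us, 1_000)
    }
  end
end

IO.inspect(ConcurrencySketch.compare())
```

The sequential run pays for every sleep back to back, while the concurrent run overlaps them, which is the same effect we get from running our import across 10 Flow stages.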
