#30: Moving to Elixir Part 6
Elixir 1.4.5
Phoenix 1.3
Flow 0.12.0
Our app is working pretty well right now. It displays our inventory and has a working task we can use to update that inventory. Now let’s play around with a few improvements we can make.
For example, wouldn’t it be great if our app could be in charge of running our inventory import instead of relying on some scheduler? Let’s see how we can do just that by leveraging some of the benefits of OTP.
Inside our inventory_sync directory we’ll create a new file, worker.ex.
And we’ll make it a GenServer. Now let’s set this GenServer up so it’s responsible for updating our inventory, and let’s also configure it to be self-scheduling.
First let’s define a function named start_link that will call GenServer.start_link with our current module, passing an empty map as its argument. Now let’s implement the init GenServer callback. It will take our state and return :ok along with that state. With that, let’s create another function that will schedule our import. Let’s call it schedule_import.
Inside it we’ll use the Process.send_after function to send a message after a certain amount of time. The receiver, or destination, of our message will be the current process, which we’ll get with self(). Then let’s send the message :import, and let’s send it every five seconds to start. Now we’ll need to implement the GenServer handle_info callback to handle the message we’re sending.
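As a standalone sketch of how Process.send_after pairs with a receiving process (the delay here is shortened to 100 ms for illustration, and this runs outside any GenServer):

```elixir
# Process.send_after/3 delivers a message to a pid after the given delay (in ms).
# Here the current process messages itself, then blocks in receive until it arrives.
Process.send_after(self(), :import, 100)

result =
  receive do
    :import -> :got_import
  after
    1_000 -> :timed_out
  end

IO.inspect(result) # → :got_import
```

Inside a GenServer we never write the receive block ourselves: the GenServer’s own loop receives the message and dispatches it to our handle_info callback.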
We’ll use :import to match on the message, and we’ll take the state. Let’s print a message to start. Once our message is printed, let’s reschedule the import by calling schedule_import, and then return {:noreply, state}. We’ll also need to schedule our initial import, so let’s go back to our init callback and call schedule_import there as well.
lib/dealership/inventory_sync/worker.ex

```elixir
defmodule Dealership.InventorySync.Worker do
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, %{})
  end

  def init(state) do
    schedule_import()
    {:ok, state}
  end

  def handle_info(:import, state) do
    IO.puts("Importing…")
    schedule_import()
    {:noreply, state}
  end

  defp schedule_import do
    Process.send_after(self(), :import, 5_000)
  end
end
```
Now let’s open our application.ex module and supervise our new worker by adding it to our list of children. Since our module is not a supervisor, we’ll use the worker/3 function.
lib/dealership/application.ex

```elixir
defmodule Dealership.Application do
  use Application

  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  def start(_type, _args) do
    import Supervisor.Spec

    # Define workers and child supervisors to be supervised
    children = [
      # Start the Ecto repository
      supervisor(Dealership.Repo, []),
      # Start the endpoint when the application starts
      supervisor(DealershipWeb.Endpoint, []),
      # Start your own worker by calling: Dealership.Worker.start_link(arg1, arg2, arg3)
      # worker(Dealership.Worker, [arg1, arg2, arg3]),
      worker(Dealership.InventorySync.Worker, [])
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Dealership.Supervisor]
    Supervisor.start_link(children, opts)
  end

  …
end
```
Then we’ll go to the command line and start our server.
$ mix phx.server
And if we watch the logs, we see our message printed every five seconds. Our process starts, schedules some work to do (in this case printing our message), and then reschedules itself.
Now that we know it’s working, let’s add in some real work. Let’s go back to our worker.ex and add our import logic. Let’s also increase our schedule interval to 10 seconds. And we’ll again include module attributes for our URL and filename, along with aliases for our Downloader and Importer modules.
lib/dealership/inventory_sync/worker.ex

```elixir
defmodule Dealership.InventorySync.Worker do
  use GenServer

  alias Dealership.InventorySync.{Downloader, Importer}

  @url 'https://elixircastsio.github.io/inventory/incoming/new_cars.csv'
  @filename "new_inventory.csv"

  def start_link do
    GenServer.start_link(__MODULE__, %{})
  end

  def init(state) do
    schedule_import() # Schedule the import
    {:ok, state}
  end

  def handle_info(:import, state) do
    @url
    |> Downloader.download!(@filename)
    |> Importer.sync_inventory!()

    schedule_import()
    {:noreply, state}
  end

  defp schedule_import do
    Process.send_after(self(), :import, 10_000)
  end
end
```
And if we go back and start our server:
$ mix phx.server
We see the queries generated by our import that’s running every 10 seconds.
When we built our importer, we didn’t do much error handling. So what happens if our import raises an error? Will our application keep running? Let’s find out.
We’ll open our downloader.ex module, and let’s simulate an error by raising our CsvRequestError.
lib/dealership/inventory_sync/downloader.ex

```elixir
…
  defp csv_data!(url) do
    raise CsvRequestError
    #
    # case fetch_csv_data(url) do
    #   {:ok, {{_, 200, _}, _headers, data}} ->
    #     data
    #   _ ->
    #     raise CsvRequestError
    # end
  end
…
```
Then let’s go back to the command line and start our server:
$ mix phx.server
And we see our error in the logs.
Now let’s go to the browser and check out our site at localhost:4000. And great, we’re able to access it normally. So what’s going on here?
Let’s look again at our application.ex. Our worker module is being supervised with the one_for_one strategy, which means that if it terminates, that process, and only that process, is restarted.
By default the restart option is :permanent, which means the child process is always restarted.
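For reference, here is a sketch of the three restart values a Supervisor.Spec child spec accepts, written against our worker (only :temporary is actually used in this episode):

```elixir
# A sketch of the three restart values:
worker(Dealership.InventorySync.Worker, [])                      # :permanent (the default) — always restarted
worker(Dealership.InventorySync.Worker, [], restart: :transient) # restarted only after an abnormal exit
worker(Dealership.InventorySync.Worker, [], restart: :temporary) # never restarted
```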
Let’s update it to never restart.
lib/dealership/application.ex

```elixir
…
      worker(Dealership.InventorySync.Worker, [], restart: :temporary)
…
```
We’ll start our server again:
$ mix phx.server
And we see it crashes once and that’s it: the process is never restarted. Let’s go back to our application.ex and remove the restart option.
And let’s play around with one other option, this time for the strategy. By default our supervisor will tolerate a maximum of 3 restarts within 5 seconds. Let’s change it to a maximum of 1 restart in 15 seconds.
lib/dealership/application.ex

```elixir
…
  def start(_type, _args) do
    import Supervisor.Spec

    # Define workers and child supervisors to be supervised
    children = [
      # Start the Ecto repository
      supervisor(Dealership.Repo, []),
      # Start the endpoint when the application starts
      supervisor(DealershipWeb.Endpoint, []),
      # Start your own worker by calling: Dealership.Worker.start_link(arg1, arg2, arg3)
      # worker(Dealership.Worker, [arg1, arg2, arg3]),
      worker(Dealership.InventorySync.Worker, [])
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one,
            name: Dealership.Supervisor,
            max_restarts: 1,
            max_seconds: 15]
    Supervisor.start_link(children, opts)
  end
…
```
We’ll start our server again.
$ mix phx.server
I’ve sped up the video here, but we see it fails once, then twice, and exits with shutdown.
Let’s go back to our application.ex and remove our custom restart options. For now let’s remove our import worker from the list of children as well.
lib/dealership/application.ex

```elixir
…
  def start(_type, _args) do
    import Supervisor.Spec

    # Define workers and child supervisors to be supervised
    children = [
      # Start the Ecto repository
      supervisor(Dealership.Repo, []),
      # Start the endpoint when the application starts
      supervisor(DealershipWeb.Endpoint, []),
      # Start your own worker by calling: Dealership.Worker.start_link(arg1, arg2, arg3)
      # worker(Dealership.Worker, [arg1, arg2, arg3]),
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Dealership.Supervisor]
    Supervisor.start_link(children, opts)
  end
…
```
We’ll also go back to our downloader.ex module and fix our csv_data! function.
lib/dealership/inventory_sync/downloader.ex

```elixir
…
  defp csv_data!(url) do
    case fetch_csv_data(url) do
      {:ok, {{_, 200, _}, _headers, data}} ->
        data
      _ ->
        raise CsvRequestError
    end
  end
…
```
Now let’s shift gears a bit and imagine that the dealership has grown. We now have lots and lots of cars to import every day. And not only do we have more cars to import, but because our app, and the data along with it, has grown, it takes longer to import a single car.
Let’s simulate our issue. We’ll open our listings.ex module, and in the create_or_update_car! function let’s add a sleep of 2 seconds at the beginning of the function with :timer.sleep(2_000).
lib/dealership/listings/listings.ex

```elixir
…
  def create_or_update_car!(params) do
    :timer.sleep(2_000)

    params
    |> get_car_struct()
    |> Car.changeset(params)
    |> Repo.insert_or_update!()
  end
…
```
Then from the command line compile our app:
$ mix compile
And then run our import task:
$ mix dealership.importer
We can see a noticeable delay between each car.
How can we speed this up?
Luckily we’re using Elixir, and while there are a few different ways we could parallelize our imports, we’re going to use the Flow library, which is built on GenStage.
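Before reaching for a library, it’s worth sizing the potential win. These numbers are hypothetical, purely for illustration:

```elixir
# With a 2-second delay per car, a sequential import of 50 cars takes
# about 100 seconds; spread across 10 concurrent stages, it's roughly
# a tenth of that (ignoring scheduling overhead).
cars = 50
seconds_per_car = 2

sequential_seconds = cars * seconds_per_car
parallel_seconds = div(sequential_seconds, 10)

IO.inspect({sequential_seconds, parallel_seconds}) # → {100, 10}
```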
First we’ll open our Mixfile and add flow to our list of dependencies.
mix.exs

```elixir
defp deps do
  ...
  {:flow, "~> 0.12.0"},
  ...
end
```
Then let’s download it.
$ mix deps.get
Now we can open our importer.ex module and get started. Because Flow works on collections, the API has a similar feel to both the Enum and Stream modules, which helps a lot when you’re refactoring to use it.
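To see that similar feel, here’s a toy pipeline written both ways. This is a sketch that assumes the :flow dependency is available; the range and arithmetic are arbitrary:

```elixir
# The same doubling-and-summing pipeline with Enum, then with Flow.
# Only the entry point and the map call change.
doubled_sum =
  1..100
  |> Enum.map(&(&1 * 2))
  |> Enum.sum()

flow_doubled_sum =
  1..100
  |> Flow.from_enumerable()
  |> Flow.map(&(&1 * 2))
  |> Enum.sum()

# Both pipelines produce the same total; Flow just computes it across stages.
```

Note that calling an Enum function on a flow (like Enum.sum here) is what actually runs it.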
Let’s update our import_inventory! function to use Flow. To get started, we’ll pipe the stream from CSV.decode! into Flow.from_enumerable(): this will start our flow.
Now let’s set a few options. We’ll set a max_demand of 1, and we’ll set the number of stages to 10.
You’ll want to experiment with these values to optimize performance for the kind of data you’re working with, but this will be fine for our test case.
We can then pipe our flow into Flow.map, changing our Enum.map to Flow.map. Now, to run a Flow we can call an Enum function; another option, if we don’t need a return value, is to call Flow.run. Since we aren’t relying on anything being returned, let’s go ahead and use Flow.run.
lib/dealership/inventory_sync/importer.ex

```elixir
…
  defp import_inventory!(filename) do
    filename
    |> File.stream!()
    |> CSV.decode!(headers: true)
    |> Flow.from_enumerable(max_demand: 1, stages: 10)
    |> Flow.map(&Listings.create_or_update_car!/1)
    |> Flow.run()
  end
…
```
With that let’s go to the command line and compile our app:
$ mix compile
And then run our import task:
$ mix dealership.importer
And you can see from our logs that our import is now running much faster, because our data is being imported concurrently.