
#58: Recurring Work with GenServer

Published July 9, 2018

Elixir 1.6

HTTPoison 1.2

Jason 1.0

Source code on GitHub


Generally, if you needed to do some kind of recurring work, you’d maybe use something like cron or maybe even a separate library. In this episode we’re going to see how we can use GenServer to schedule some recurring work. We’ll create a GenServer process that fetches the current price of Bitcoin at a regular interval.

The first thing we’ll need to do is create a new Elixir project. Let’s call ours teacher and we’ll pass the --sup option to create an OTP application skeleton with a supervision tree.

$ mix new teacher --sup
* creating README.md
* creating .formatter.exs
* creating .gitignore
* creating mix.exs
* creating config
* creating config/config.exs
* creating lib
* creating lib/teacher.ex
* creating lib/teacher/application.ex
* creating test
* creating test/test_helper.exs
* creating test/teacher_test.exs

Your Mix project was created successfully.
You can use "mix" to compile it, test it, and more:

    cd teacher
    mix test

Run "mix help" for more commands.

Then let’s change into our new directory and open our project. Since our project will be fetching the price of a Bitcoin, we need a place to fetch Bitcoin data.

Let’s use the free API provided by CoinCap. We can use the “/page” endpoint to get data about a specific coin.

Alright, now that we know where we’ll want to fetch our data from, we just need a way to fetch the data. Let’s bring in two packages to help us. We’ll use the “HTTPoison” library to get the price data from CoinCap. Let’s copy the package.

Then we’ll go back to our project, open the Mixfile, and add httpoison to our list of dependencies. We’ll also add the jason package to help us parse the JSON that’s returned.

mix.exs

...

defp deps do
  [
    {:httpoison, "~> 1.2"},
    {:jason, "~> 1.0"}
  ]
end
...

Then let’s go to the command line and download our dependencies.

$ mix deps.get
...

Great, now let’s see how we can fetch the price of a Bitcoin.

Let’s start iex with our project. Then let’s get the pricing data. We’ll call HTTPoison.get!, passing in the URL we want to get data from. We’ll use the “/page” endpoint to get data about a specific cryptocurrency and we’ll give it the ID of the coin we want to get - in this case BTC for Bitcoin.

Great, we were able to get the data. Let’s take our response body and pass it into Jason.decode!. And great, we see that the data was parsed and we can see the information that’s provided, like the name of the coin, the market cap, and the current price. This is exactly the data we want.

$ iex -S mix
...
Generated teacher app
> response = HTTPoison.get!("http://coincap.io/page/BTC")
%HTTPoison.Response{
...
}
> response.body |> Jason.decode!()
%{
  "_id" => "179bd7dc-72b3-4eee-b373-e719a9489ed9",
  "altCap" => 151272960760.85364,
  "alt_name" => "bitcoin",
  ...
}

Let’s close our iex session. Then if we go into our “lib/teacher” directory, we see a module named “application”. Let’s open it.

When our Elixir application is started, our start callback is called, which starts our supervision tree. You can read more about applications in the Elixir docs, which I’ve linked to here: https://hexdocs.pm/elixir/Application.html

But for our project all you really need to remember is that if a worker module is included here in our “children” it will be started automatically as part of our supervision tree and will be supervised according to the options specified here: strategy: :one_for_one

What this means is that if something happens while we’re trying to fetch our data and our child process is terminated, only that process will be restarted. Alright, now let’s create a module to do some work. Let’s create a new module in the same directory named coin_data_worker.ex. Then we’ll define our module.

And we’ll make this module a GenServer. If you’re new to GenServers and want to learn more, check out episode #12 where we get an introduction to them. Our GenServer will schedule recurring work by sending a message to itself at a specified interval.

First off, let’s implement the start_link function. We’ll use the current module and pass down any arguments. And let’s make this a named GenServer, using the current module for the name. Then we’ll implement the init callback. We’ll need to return an :ok tuple with our state.

Now when our GenServer is started, let’s schedule the first fetch of our coin data. We’ll do that here, so let’s call a new function we’ll need to implement called schedule_coin_fetch(), then let’s implement it as a private function.

Inside the function we’ll call Process.send_after. This allows us to send a message to a process after a certain interval. Let’s use self() for the destination process, we’ll call our message :coin_fetch, and let’s schedule it to happen in 5 seconds. We’ll use 5 seconds here so we have some nice feedback when we run it. Normally we wouldn’t want this to be as frequent since prices are only updated every minute or so.

Now we need to implement a handle_info callback to handle this message. We’ll pattern match on the :coin_fetch atom and accept the current state of the GenServer.

Now let’s fetch our coin data. We’ll take the URL we’ll use to get our data and pipe it into HTTPoison.get! and then into Map.get to get the response body, then we’ll decode it with Jason.decode!.

Finally let’s get the price with Map.get. Now that we have the price, let’s print what the price was.

Then we’ll need to return a :noreply tuple and let’s update the state of our GenServer to hold the current price of a Bitcoin. This will be triggered when our GenServer is started and the price will be logged. Now if we want this to keep running and print our message again in another 5 seconds, we’ll need to make another call to our schedule_coin_fetch function. So let’s add that.

And let’s clean our function up a bit and move the logic that fetches our coin price into its own function we’ll call coin_price.

lib/teacher/coin_data_worker.ex

defmodule Teacher.CoinDataWorker do
  use GenServer

  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: __MODULE__)
  end

  def init(state) do
    schedule_coin_fetch()
    {:ok, state}
  end

  def handle_info(:coin_fetch, state) do
    price = coin_price()
    IO.inspect("Current Bitcoin price is $#{price}")
    schedule_coin_fetch()
    {:noreply, Map.put(state, :btc, price)}
  end

  defp coin_price do
    "http://coincap.io/page/BTC"
    |> HTTPoison.get!()
    |> Map.get(:body)
    |> Jason.decode!()
    |> Map.get("price_usd")
  end

  defp schedule_coin_fetch do
    Process.send_after(self(), :coin_fetch, 5_000)
  end
end
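
If you want to play with the scheduling pattern on its own, without the HTTP request, here’s a minimal sketch. TickSketch is a hypothetical module that isn’t part of the episode’s project, and the 20 millisecond interval is only there so the result shows up quickly. It just counts how many times the :tick message has arrived.

```elixir
defmodule TickSketch do
  use GenServer

  # Hypothetical module for illustration; not part of the teacher project.
  def start_link(interval_ms) do
    GenServer.start_link(__MODULE__, interval_ms)
  end

  # Ask the process how many ticks it has handled so far.
  def count(pid), do: GenServer.call(pid, :count)

  @impl true
  def init(interval_ms) do
    # Schedule the first tick as soon as the process starts.
    schedule_tick(interval_ms)
    {:ok, %{interval: interval_ms, count: 0}}
  end

  @impl true
  def handle_info(:tick, state) do
    # Do the "work" (here: bump a counter), then schedule the next run.
    schedule_tick(state.interval)
    {:noreply, %{state | count: state.count + 1}}
  end

  @impl true
  def handle_call(:count, _from, state) do
    {:reply, state.count, state}
  end

  defp schedule_tick(interval_ms) do
    Process.send_after(self(), :tick, interval_ms)
  end
end

{:ok, pid} = TickSketch.start_link(20)
Process.sleep(200)
IO.puts("ticks so far: #{TickSketch.count(pid)}")
```

Because the process sends the next message to itself from inside handle_info, the loop keeps going for as long as the process is alive.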

Now that our module is finished, let’s add it to our supervisor. We’ll go back to our application.ex module. And let’s include our new module in our children list. For the initial state we’ll use an empty map.

lib/teacher/application.ex

...
{Teacher.CoinDataWorker, %{}},
...
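
To get a feel for what the :one_for_one strategy does for us, here’s a small sketch that you can run as a script. The RestartSketch modules and the :sketch_a and :sketch_b names are made up for this example, and it uses explicit child specification maps so both children can share one worker module. It kills one child and then checks that only that child was replaced.

```elixir
defmodule RestartSketch.Worker do
  use GenServer

  # Hypothetical worker that registers itself under the given name.
  def start_link(name), do: GenServer.start_link(__MODULE__, name, name: name)

  @impl true
  def init(name), do: {:ok, name}
end

defmodule RestartSketch do
  # Returns {a_was_restarted?, b_was_left_alone?}.
  def demo do
    children = [
      %{id: :sketch_a, start: {RestartSketch.Worker, :start_link, [:sketch_a]}},
      %{id: :sketch_b, start: {RestartSketch.Worker, :start_link, [:sketch_b]}}
    ]

    {:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)
    pid_a = Process.whereis(:sketch_a)
    pid_b = Process.whereis(:sketch_b)

    # Simulate a crash in one worker and give the supervisor time to react.
    Process.exit(pid_a, :kill)
    Process.sleep(100)

    new_a = Process.whereis(:sketch_a)
    result = {is_pid(new_a) and new_a != pid_a, Process.whereis(:sketch_b) == pid_b}

    # Shut the supervisor down so the registered names are freed again.
    Supervisor.stop(sup)
    result
  end
end

IO.inspect(RestartSketch.demo())
```

The first element tells us the crashed worker came back under a new pid, and the second tells us its sibling kept running untouched.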

Now let’s go to the command line and run iex with our project. And we see the price of a Bitcoin is being logged.

Let’s also start our :observer. And if we click on the “applications” tab we see our supervisor and it has a single child process - our CoinDataWorker.

We see it’s named this in the observer since we added the module for the name earlier. And if we go to it and then click the “state” tab we see the state of our GenServer has the current price of a Bitcoin.

$ iex -S mix
"Current Bitcoin price is $6149.728"
"Current Bitcoin price is $6149.728"
"Current Bitcoin price is $6149.728"
"Current Bitcoin price is $6149.728"
> :observer.start()

This is great - we’re fetching data from the API, parsing it, and logging the price. Then our GenServer is scheduling itself to fetch the price again in the future.

This is pretty cool. But now let’s say we wanted to get the prices of other cryptocurrencies - not just Bitcoin. How can we update our program to do that?

Let’s go to our coin_data_worker.ex module. There are a couple different ways we could do this. We could store the price for any cryptocurrency we wanted to track in this single GenServer process. However let’s create a CoinDataWorker process for each cryptocurrency we want to track.

To differentiate what coin we want to track, let’s pass the ID of the coin in the arguments. So for bitcoin it will be :btc as a lowercase atom.

Our args will be a map, so let’s grab the ID from them. And because we can’t start multiple GenServers with the same name, let’s use the ID for the name.

Since we want to make the lookup for the coin we are getting data for dynamic, let’s go to our handle_info callback. And we’ve hardcoded this to be about Bitcoin, so let’s fix that.

First since we’ll have access to the coin’s ID in our state, let’s get that. We’ll take our state and pipe it into Map.get :id to get the id.

Then let’s pipe it into our coin_price function. But we’ll now want this to return more data than just the price. In fact let’s rename it to coin_data and have it return all the coin data we get back from the API. Then we’ll go where we’ve defined our coin_price function, rename it, and update it to take the ID.

Since the ID will be a lowercase atom, we’ll need to convert it to an uppercase string to use in the URL. So let’s take our ID and pipe it into Atom.to_string, then we’ll pipe our string into String.upcase.

Now that we’ll have our coin ID we can use it in our URL. Let’s create another function we’ll add to our pipeline to build our URL. Let’s call it url

We’ll define our new function url and it will take the ID. Then in the function we’ll update our URL and append the ID to it. Then back in our coin_data function we’ll get the response body and decode the JSON just like before, only now let’s return the whole data map.

Alright, now that we are returning all our data, let’s create a way to merge our existing state with the data we care about. Let’s add a new function to our pipeline called update_state that we’ll need to implement. It will take our new data and our existing state.

Then let’s define it as a private function and let’s pattern match on our new data to get the name of the coin and the price.

Then inside the function we’ll call Map.merge to update our existing state with our new values. Great, then let’s go back to our handle_info callback. We’re no longer returning a price, so let’s change the variable name to updated_state, and let’s update our message.

Now this will log the current price of a cryptocurrency regardless of whether the price has changed or not. This is pretty noisy and will be even worse now that we want to track more coins. So let’s update this to only print if our new price is different than our existing price. Finally let’s return our updated_state

lib/teacher/coin_data_worker.ex

...
def start_link(args) do
  id = Map.get(args, :id)
  GenServer.start_link(__MODULE__, args, name: id)
end

def handle_info(:coin_fetch, state) do
  updated_state = state
    |> Map.get(:id)
    |> coin_data()
    |> update_state(state)

  if updated_state[:price] != state[:price] do
    IO.inspect("Current #{updated_state[:name]} price is $#{updated_state[:price]}")
  end

  # Keep the loop going by scheduling the next fetch, just like before
  schedule_coin_fetch()
  {:noreply, updated_state}
end

defp coin_data(id) do
  id
  |> Atom.to_string()
  |> String.upcase()
  |> url()
  |> HTTPoison.get!()
  |> Map.get(:body)
  |> Jason.decode!()
end

defp url(id) do
  "http://coincap.io/page/" <> id
end

defp update_state(%{"display_name" => name, "price" => price}, existing_state) do
  Map.merge(existing_state, %{name: name, price: price})
end
...
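
If you’d like to try the state handling without hitting the API, here’s a rough sketch. StateSketch and price_changed? are hypothetical names for this example, and the data map is made-up sample data standing in for a decoded response. The update_state function has the same shape as the one above, just made public so we can call it directly.

```elixir
defmodule StateSketch do
  # Same shape as update_state/2 in the worker, made public for the demo.
  def update_state(%{"display_name" => name, "price" => price}, existing_state) do
    Map.merge(existing_state, %{name: name, price: price})
  end

  # Hypothetical helper: mirrors the `if` check in handle_info.
  def price_changed?(old_state, new_state) do
    new_state[:price] != old_state[:price]
  end
end

# Made-up sample data; a real response has many more keys.
data = %{"display_name" => "Bitcoin", "price" => 6148.09, "id" => "BTC"}

initial = %{id: :btc}
first = StateSketch.update_state(data, initial)
second = StateSketch.update_state(data, first)

IO.inspect(first)
IO.inspect(StateSketch.price_changed?(initial, first))
IO.inspect(StateSketch.price_changed?(first, second))
```

The initial state has no :price key, so the first comparison is against nil and the price always counts as changed on the first fetch. After that, a message is only printed when the price is different.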

Great, now this will work, but let’s break up our module a bit. Let’s move the logic that fetches the coin data into its own module.

We’ll create a new module named coin_data.ex and give it a public function named fetch. Back in our coin_data_worker.ex, let’s copy our two functions that fetch the coin data - url and coin_data. Then let’s paste them into the coin_data.ex module. And we’ll move our pipeline from the coin_data function into the fetch function and update it to take an ID.

lib/teacher/coin_data.ex

defmodule Teacher.CoinData do

  def fetch(id) do
    id
    |> Atom.to_string()
    |> String.upcase()
    |> url()
    |> HTTPoison.get!()
    |> Map.get(:body)
    |> Jason.decode!()
  end

  defp url(id) do
    "http://coincap.io/page/" <> id
  end
end
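
Here’s a quick way to check the ID-to-URL step in isolation. UrlSketch is a hypothetical module used only for this example, and its url function is public and folds the atom conversion in, which differs slightly from the private url helper above. No request is made; it only builds the strings.

```elixir
defmodule UrlSketch do
  # Turns a lowercase atom like :btc into the URL for that coin.
  def url(id) when is_atom(id) do
    "http://coincap.io/page/" <> (id |> Atom.to_string() |> String.upcase())
  end
end

[:btc, :eth, :ltc]
|> Enum.map(&UrlSketch.url/1)
|> Enum.each(&IO.puts/1)
```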

Then we’ll go back to our coin_data_worker.ex module and where we were calling the coin_data function, we’ll now call CoinData.fetch and let’s add an alias for it so we can call it without the prefix.

lib/teacher/coin_data_worker.ex

...
  alias Teacher.CoinData
...
  def handle_info(:coin_fetch, state) do
    updated_state = state
      |> Map.get(:id)
      |> CoinData.fetch()
      |> update_state(state)

    if updated_state[:price] != state[:price] do
      IO.inspect("Current #{updated_state[:name]} price is $#{updated_state[:price]}")
    end

    # Keep the loop going by scheduling the next fetch, just like before
    schedule_coin_fetch()
    {:noreply, updated_state}
  end
...

Now let’s go to our application.ex module. And let’s keep this simple and just add a new private function named get_children. Then we’ll call Enum.map with a list of the coin IDs we want to track - Bitcoin, Ethereum, and Litecoin.

Let’s return our Teacher.CoinDataWorker and we’ll need to include a map that has the coin ID so our module can use it. Then let’s update our start callback to use our new function.

lib/teacher/application.ex

...
def start(_type, _args) do
  # List all child processes to be supervised
  children = get_children()

  # See https://hexdocs.pm/elixir/Supervisor.html
  # for other strategies and supported options
  opts = [strategy: :one_for_one, name: Teacher.Supervisor]
  Supervisor.start_link(children, opts)
end

defp get_children do
  Enum.map([:btc, :eth, :ltc], fn(coin) ->
    {Teacher.CoinDataWorker, %{id: coin}}
  end)
end
...

Then let’s go to the command line and start iex with our project - and we get an error. Elixir is telling us that multiple child specifications are using the same ID.

But if we read farther down, Elixir tells us that we can customize the ID with the Supervisor.child_spec function, giving it a unique ID. So let’s do that.

$ iex -S mix
Compiling 2 files (.ex)
** (Mix) Could not start application teacher: Teacher.Application.start(:normal, []) returned an error: bad child specification, more than one child specification has the id: Teacher.CoinDataWorker.
If using maps as child specifications, make sure the :id keys are unique.
If using a module or {module, arg} as child, use Supervisor.child_spec/2 to change the :id, for example:

    children = [
      Supervisor.child_spec({MyWorker, arg}, id: :my_worker_1),
      Supervisor.child_spec({MyWorker, arg}, id: :my_worker_2)
    ]

We’ll go back to our application.ex module and update our get_children function to use Supervisor.child_spec and we’ll use the coin ID for the ID.

lib/teacher/application.ex

...
defp get_children do
  Enum.map([:btc, :eth, :ltc], fn(coin) ->
    Supervisor.child_spec({Teacher.CoinDataWorker, %{id: coin}}, id: coin)
  end)
end
...
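
If you’re curious what Supervisor.child_spec actually gives us back, here’s a small sketch. SpecSketch.Worker is a hypothetical stand-in for our worker; it only needs use GenServer, which defines a default child_spec for the module. Nothing is started here, we’re only building and inspecting the specification maps.

```elixir
defmodule SpecSketch.Worker do
  use GenServer

  # Hypothetical stand-in for the episode's worker module.
  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: Map.get(args, :id))
  end

  @impl true
  def init(state), do: {:ok, state}
end

# With no overrides, the ID defaults to the module name.
default_spec = Supervisor.child_spec({SpecSketch.Worker, %{id: :btc}}, [])

# With the :id override, each coin gets its own unique ID.
custom_spec = Supervisor.child_spec({SpecSketch.Worker, %{id: :btc}}, id: :btc)

IO.inspect(default_spec.id)
IO.inspect(custom_spec.id)
IO.inspect(custom_spec.start)
```

The default ID is the same for every coin, which is why the supervisor complained. Overriding it leaves the start instructions alone and only changes the ID the supervisor uses to tell its children apart.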

Now we can start our project again. And perfect, the prices for our three coins are printed.

$ iex -S mix 
> "Current Litecoin price is $81.0156"
"Current Ethereum price is $454.076"
"Current Bitcoin price is $6148.09"
> :observer.start()

And let’s start the observer. Because we are using the coin ID for the GenServer name, we see all three of our processes have the coin ID for their name. Let’s choose one. And click the “state” tab.

Perfect, we see all the information is stored. Our program is working and printing the price for us when it changes.
