For the Dashbit website, we wanted to avoid tracking users as much as possible. This means no cookies, and unfortunately most analytics tools use cookies for tracking and/or fingerprinting. However, we still want to see which pages on our website are frequently accessed. For this purpose, we have decided to roll our own analytics system.
In this article, we will cover how we implemented the analytics system with Ecto upserts and how we have used the Elixir registry and Elixir processes to reduce the pressure on the database.
Tracking with upserts
The idea is very simple: every time someone accesses a page, we will store this information in the database. However, we don’t need to track each access at the instant they happen. For us, tracking how many accesses a page had in a day is completely fine. Therefore, every time a page is accessed on a given date, we will attempt to insert an entry in the database. If an entry already exists, we update its counter instead.
Luckily, this can be done with an upsert in Ecto. Let’s first define the schema for the database resource:
defmodule Dashbit.Metrics.Metric do
  use Ecto.Schema

  @primary_key false
  schema "metrics" do
    field :date, :date, primary_key: true
    field :path, :string, primary_key: true
    field :counter, :integer, default: 0
  end
end
It has three fields: a date, the page path, and the counter (number of accesses). The date and path make a composite primary key. Our migration looks like this:
defmodule Dashbit.Repo.Migrations.CreateMetrics do
  use Ecto.Migration

  def change do
    create table(:metrics, primary_key: false) do
      add :date, :date, primary_key: true
      add :path, :string, primary_key: true
      add :counter, :integer, default: 0
    end
  end
end
Now we call the following function whenever we want to count one page access:
defp upsert!(path, counter) do
  import Ecto.Query

  date = Date.utc_today()
  query = from(m in Dashbit.Metrics.Metric, update: [inc: [counter: ^counter]])

  Dashbit.Repo.insert!(
    %Dashbit.Metrics.Metric{date: date, path: path, counter: counter},
    on_conflict: query,
    conflict_target: [:date, :path]
  )
end
The code above performs an upsert, incrementing the number of accesses for a page by the value of counter, which is typically 1. If an entry does not exist, one is immediately created.
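As a quick sanity check, two upserts for the same path on the same day should leave a single row with the counter set to 2. Here is a hypothetical iex session (upsert!/2 is private, so pretend for a moment it is public, and assume the Repo is started):

iex> upsert!("/blog/welcome", 1)
iex> upsert!("/blog/welcome", 1)
iex> metric = Dashbit.Repo.get_by!(Dashbit.Metrics.Metric,
...>   date: Date.utc_today(), path: "/blog/welcome")
iex> metric.counter
2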
This is the core of our analytics. It is a very straight-forward solution, but it does depend on the database accepting every single write. While most applications rely heavily on a database, the analytics system is the only part of our website that uses one, so we believe it is important to still show an article, such as this blog post, even if there is an error when talking to the storage layer. To address this, we have decided to move the upserts to separate processes.
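For illustration, the most minimal version of "move the write to another process" is a fire-and-forget task, sketched below with Elixir's standard Task module. Our actual solution, described next, adds batching and an explicit lifecycle on top of this idea:

Task.start(fn ->
  # Runs the upsert outside the request process: a database error
  # crashes this task, not the page render.
  upsert!(path, 1)
end)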
Async and batched writes with processes
As laid out in the previous section, we want to move all the database writes done by our analytics code to a separate process. Another concern with our solution so far is how it handles overloads. If there is a huge spike in traffic, could we end up putting too much pressure on the database? In that sense, would it be a good idea to batch our writes?
To be honest, our application will be just fine with spikes. Most of our page loads complete within hundreds of microseconds, thanks to Phoenix, and our database usage is minimal. On the other hand, such a small project is a perfect opportunity to experiment, so we decided to explore what our analytics solution would look like if we performed writes asynchronously and in batches.
Here is what we came up with. Every time a user accesses a page, we will spawn an Elixir process that tracks all accesses to that page. If a process already exists for said page, we will message the existing process instead. The goal of this process is to collect all accesses within a time interval, writing them to the database after X seconds.
We are going to call this the Worker process and it starts like this:
defmodule Dashbit.Metrics.Worker do
  use GenServer, restart: :temporary
We define a module for the process and declare it as a GenServer. We also say that this process is :temporary, i.e. if it dies, we don’t want the supervisor to restart it. That’s because we are assuming that, if the process dies, our logic that dynamically spawns processes for each page will eventually start a new one anyway.
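As a side note, the restart: :temporary option given to use GenServer ends up in the generated child specification, which is what the supervisor consults later. Inspecting it in iex should return roughly this (a sketch; the exact keys may vary across Elixir versions):

iex> Dashbit.Metrics.Worker.child_spec("/blog")
%{
  id: Dashbit.Metrics.Worker,
  restart: :temporary,
  start: {Dashbit.Metrics.Worker, :start_link, ["/blog"]}
}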
Next we define the init callback of the process:
@impl true
def init(path) do
  Process.flag(:trap_exit, true)
  {:ok, {path, _counter = 0}}
end
The init callback traps exits and sets the process state to {path, 0}. The first element is the page path, the second element is the number of page visits.
Our process should be able to receive a :bump message. This message is sent whenever we need to bump the counter and it is handled by the handle_info callback:
@impl true
def handle_info(:bump, {path, 0}) do
  schedule_upsert()
  {:noreply, {path, 1}}
end

@impl true
def handle_info(:bump, {path, counter}) do
  {:noreply, {path, counter + 1}}
end
If we receive the :bump message when the page has had no accesses (i.e. the counter is zero), we bump the counter to 1 and also schedule an upsert event, so we eventually write those accesses to the database. If the counter is more than 0, we simply bump it and return the updated state.
The scheduling and upsert code will look like this:
defp schedule_upsert() do
  Process.send_after(self(), :upsert, Enum.random(10..20) * 1_000)
end

@impl true
def handle_info(:upsert, {path, counter}) do
  upsert!(path, counter)
  {:noreply, {path, 0}}
end

defp upsert!(path, counter) do
  # same function as in the previous section
end
The schedule_upsert() function schedules a message to the current process (self()). The message will be named :upsert and it will be delivered after a random interval between 10 and 20 seconds. We picked a random value to avoid a scenario where multiple processes for different pages are spawned at the same time and then all write to the database at the same time.
Next we define another handle_info clause, this time to handle the scheduled :upsert message. This clause simply invokes the upsert! function, defined in the previous section, and resets the state back to {path, 0}. This makes it so that, once there is a new bump, we will schedule a new upsert.
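To watch the whole cycle in isolation, we can start a worker by hand and feed it bumps. The session below is a hypothetical sketch: it bypasses the registry introduced later by calling GenServer.start_link/2 directly, and it assumes the Repo is running:

iex> {:ok, pid} = GenServer.start_link(Dashbit.Metrics.Worker, "/test")
iex> send(pid, :bump)  # schedules the upsert, counter becomes 1
iex> send(pid, :bump)  # counter becomes 2
iex> # 10 to 20 seconds later the worker upserts {today, "/test", 2}
iex> # and resets its state back to {"/test", 0}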
Finally, we implement the terminate callback, which will be invoked whenever our application is shutting down:
  @impl true
  def terminate(_, {_path, 0}), do: :ok
  def terminate(_, {path, counter}), do: upsert!(path, counter)
end
If our application is shutting down, we may have pending writes in our worker, so we want to send them to the database as part of our termination logic. One important thing to remember is that the terminate callback is not called by default when shutting down, unless the process is trapping exits. That’s why we called Process.flag(:trap_exit, true) in the init function.
The process we just implemented delivers all of the requirements we have so far: writes are now asynchronous, as they happen in a separate process, and they are also batched, using intervals between 10s and 20s. The last step we need to implement is to actually spawn those processes on the fly as users navigate through the website.
Dynamic processes with the Elixir registry
In order to spawn and find processes for each page, we are going to use Elixir’s Registry. We also need a dynamic supervisor which is going to be the parent of all worker processes. Let’s implement this logic in the overarching Metrics module, alongside our bump(page) function.
Let’s get started with the basics:
defmodule Dashbit.Metrics do
  use Supervisor

  @worker Dashbit.Metrics.Worker
  @registry Dashbit.Metrics.Registry
  @supervisor Dashbit.Metrics.WorkerSupervisor
Our Dashbit.Metrics module is a Supervisor, which will have two children: the registry and the supervisor of all workers. Since the workers are started dynamically, as requests come in, we will use a DynamicSupervisor. We store the names of the worker, registry and dynamic supervisor processes in module attributes for convenience.
Next we will define how our supervisor is started and its init callback:
  def start_link(_opts) do
    Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    children = [
      {Registry, keys: :unique, name: @registry},
      {DynamicSupervisor, name: @supervisor, strategy: :one_for_one}
    ]

    Supervisor.init(children, strategy: :one_for_all)
  end
With the registry and dynamic supervisor in place, we can write the bump function:
  def bump(path) when is_binary(path) do
    pid =
      case Registry.lookup(@registry, path) do
        [{pid, _}] ->
          pid

        [] ->
          case DynamicSupervisor.start_child(@supervisor, {@worker, path}) do
            {:ok, pid} -> pid
            {:error, {:already_started, pid}} -> pid
          end
      end

    send(pid, :bump)
  end
end
The bump function checks the registry for a process for the given path and returns its process identifier (pid). If one does not exist, we ask the worker supervisor to start a worker dynamically. We expect two possible outcomes from start_child:
- {:ok, pid} - the worker was started
- {:error, {:already_started, pid}} - a worker for the given path already exists
We need the second branch to address a potential race condition where two users accessing a page for the first time attempt to spawn the same worker at the same time; the sketch after the registration code below shows this in action. Once we find the pid, we send it the :bump message.
We are almost there. There are just two steps left. First, we need to configure the worker to register itself whenever it is started. This is done via the start_link function. Let’s go back to the worker and add this:
@registry Dashbit.Metrics.Registry

def start_link(path) do
  GenServer.start_link(__MODULE__, path, name: {:via, Registry, {@registry, path}})
end
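With the worker registering itself under a unique key, the race condition mentioned above becomes easy to reproduce by hand. This is a hypothetical sketch, assuming the full Dashbit.Metrics tree from this section is running; the second start_child for the same path returns the pid started by the first:

{:ok, pid} =
  DynamicSupervisor.start_child(
    Dashbit.Metrics.WorkerSupervisor,
    {Dashbit.Metrics.Worker, "/race"}
  )

{:error, {:already_started, ^pid}} =
  DynamicSupervisor.start_child(
    Dashbit.Metrics.WorkerSupervisor,
    {Dashbit.Metrics.Worker, "/race"}
  )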
Now we just need to start the Dashbit.Metrics supervision tree. This is typically done in your application supervision tree, located in “lib/my_app/application.ex”:
children = [
  Dashbit.Repo,
  Dashbit.Metrics,
  Dashbit.Endpoint
]
And that’s it. Now whenever a user accesses a page, we just need to call Dashbit.Metrics.bump(path), where path is the current page address. In our case, we store just the path, without the host and without the query string. If you are using Plug, it can be built from the conn.path_info field. We also only perform writes if the page was successfully rendered with a 200 status. Overall, our bumping code looks like this:
plug :bump_metric

defp bump_metric(conn, _opts) do
  register_before_send(conn, fn conn ->
    if conn.status == 200 do
      path = "/" <> Enum.join(conn.path_info, "/")
      Dashbit.Metrics.bump(path)
    end

    conn
  end)
end
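A nice property of building the path from conn.path_info is that it already excludes the host and the query string, and it handles the root page gracefully: path_info is an empty list for the root, so the expression still yields a valid path. A quick check in plain Elixir (hypothetical values):

iex> "/" <> Enum.join([], "/")
"/"
iex> "/" <> Enum.join(["blog", "welcome"], "/")
"/blog/welcome"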
Summary
In this article we have covered a minimal analytics system, using Ecto, GenServer and Elixir’s Registry, that performs writes asynchronously and in batches. The pattern of using the Registry to dynamically spawn processes that map to different resources, each with their own life-cycle, applies to many different scenarios.
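We did not show how to read the collected data back, but a small Ecto query is all it takes. Here is a minimal sketch, where top_pages is a name of our choosing rather than part of the system above:

import Ecto.Query

# Return the most visited paths for a given day, most accessed first.
def top_pages(date, limit \\ 10) do
  Dashbit.Repo.all(
    from m in Dashbit.Metrics.Metric,
      where: m.date == ^date,
      order_by: [desc: m.counter],
      limit: ^limit,
      select: {m.path, m.counter}
  )
end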
One important aspect of our solution is that, after a process for a page is created, it stays alive until there is a new deployment. This works for us because we have less than 100 pages, so we know the maximum number of processes is bounded by a very low value.
Although Elixir processes are lightweight thanks to the Erlang VM, if we had a large number of pages, such as millions of pages, we could potentially end up with hundreds of thousands of unused processes. In that case, we would slightly change our solution to terminate the process after every upsert. Something along these lines:
@impl true
def handle_info(:upsert, {path, counter}) do
  # We first unregister ourselves so we stop receiving new
  # messages. We then schedule to stop after processing all
  # pending messages to finally upsert on terminate.
  Registry.unregister(@registry, path)
  send(self(), :stop)
  {:noreply, {path, counter}}
end

@impl true
def handle_info(:stop, {path, counter}) do
  {:stop, :shutdown, {path, counter}}
end
That’s it, we hope you have enjoyed the article and learned a thing or two that could be useful in your next project!