内容简介:在PSPDFKit,我们使用sbroker这个Erlang库提供了用于创建池和(或)负载调节器的构建模块。 它使用让我们看一个如何在Elixir应用程序中使用sbroker库的简单示例。
在PSPDFKit,我们使用 Elixir 构建可靠且高性能的分布式系统。 在这样的系统中,我们经常需要调用外部服务,同时异步消息传递允许客户端进行这些调用而不必等待响应。 无法立即处理的消息将排队并稍后处理,但是当队列过载时会发生什么呢? 由于我们不希望系统崩溃,因此我们必须使用反压机制,以防止队列无限增长。 此文介绍了如何使用 sbroker 库将反压机制应用于Elixir应用程序。
在Elixir应用中使用sbroker的Erlang库
sbroker这个Erlang库提供了用于创建池和(或)负载调节器的构建模块。 它使用 broker模式 ,其中与服务worker的通信由负责worker与其调用之间协调的broker处理。
一个简单例子
让我们看一个如何在Elixir应用程序中使用sbroker库的简单示例。
首先我们在命令行运行如下命令:
mix new example
它将在当前目录下创建一个样例工程,名字就叫做 “example” 。在该示例中,我们将模拟在一个worker中对外部服务的调用并由broker处理相关通信。为此,我们编辑example/mix.exs文件,以便将sbroker库添加到我们的应用程序中:
defmodule Example.Mixfile do
use Mix.Project
def project do
[
app: :example,
version: "0.1.0",
elixir: "~> 1.5",
start_permanent: Mix.env == :prod,
deps: deps()
]
end
def application do
[
applications: [:sbroker],
extra_applications: [:logger],
mod: {Example, []}
]
end
defp deps do
[{:sbroker, "~> 1.0-beta"}]
end
end
我们将sbroker库添加到上面的依赖项和应用程序中。我们也在18行指定了我们应用的模块 mod: {Example, []},这个模块我们稍后创建。现在我们准备添加一个broker,所以我们在example/lib/example/broker.ex中创建我们对broker模块:
defmodule Example.Broker do
@behaviour :sbroker
def start_link() do
start_link(timeout: 10000)
end
def start_link(opts) do
:sbroker.start_link({:local, __MODULE__}, __MODULE__, opts, [])
end
def init(opts) do
# See `DBConnection.Sojourn.Broker`.
# 使broker的“左”侧成为一个FIFO队列,在超时后丢弃请求。
client_queue =
{:sbroker_timeout_queue,
%{
out: :out,
timeout: opts[:timeout],
drop: :drop,
min: 0,
max: 128
}}
# 使broker对“右”侧成为一个FIFO队列,这个队列没有超时。
worker_queue =
{:sbroker_drop_queue,
%{
out: :out_r,
drop: :drop,
timeout: :infinity
}}
{:ok, {client_queue, worker_queue, []}}
end
end
上面的模块实现了sbroker行为。在第9行我们启动sbroker,并且在选项里设置超时为10秒。这个超时的意思是,当这些调用在队列里等候worker的时间超过10秒,它们将被丢弃。在init/1函数里,我们给broker定义了客户端和worker的队列。我们定义了broker模块后,我们需要定义worker模块,它负责定义worker,并且向broker请求任务。我们在 example/lib/example/worker.ex里定义worker模块:
defmodule Example.Worker do
use GenServer
alias Example.{Broker}
def start_link() do
GenServer.start_link(__MODULE__, [])
end
#
# GenServer 的回调函数
#
def init([]) do
state =
ask(%{
tag: make_ref()
})
{:ok, state}
end
def handle_info({tag, {:go, ref, {pid, {:fetch, [params]}}, _, _}}, %{tag: tag} = s) do
send(pid, {ref, fetch_from_external_resource(params)})
{:noreply, ask(s)}
end
# 当sborker发现有匹配的任务,它将给我们发送 {tag, {:go, ref, req, _, _}} 消息
defp ask(%{tag: tag} = s) do
{:await, ^tag, _} = :sbroker.async_ask_r(Broker, self(), {self(), tag})
s
end
defp fetch_from_external_resource(params) do
# 模拟处理工作
Process.sleep(1000)
{:ok, "External service called with#{inspect(params)}"}
end
end
fetchfrom_external_resource/1
函数是一个简单的模拟函数,它将使得进程等待一秒,然后返回 {:ok, “External service called with #{inspect(params)}”} 。当worker这个GenServer进程收到 {tag, {:go, ref, {pid, {:fetch, [params]}}, ,
}} 这样的消息时,这个函数将被调用。这个元组中的tag变量是一个唯一标识符,它被用来标识worker并且被存储在GenServer进程的状态中。
在worker获得所需数据后,它会向broker请求新的任务。我们已经定义了broker和worker模块,因此我们现在可以定义一个监督者,它将启动broker和一个worker池。监督者在 example/lib/example/supervisor.ex 里定义:
defmodule Example.Supervisor do
use Supervisor
alias Example.{Broker, Worker}
def start_link() do
Supervisor.start_link(__MODULE__, [])
end
def init(_args) do
pool_size = 5
broker = worker(Broker, [], id: :broker)
workers =
for id <- 1..pool_size do
worker(Worker, [], id: id)
end
worker_sup_opts = [strategy: :one_for_one, max_restarts: pool_size]
worker_sup = supervisor(Supervisor, [workers, worker_sup_opts], id: :workers)
supervise([broker, worker_sup], strategy: :one_for_one)
end
end
在这个例子中,我们的worker池有5个worker。我们基本完成工作了,还剩下要创建一个application模块,这个模块定义在 example/lib/example.ex 中:
defmodule Example do
use Application
alias Example.{Broker}
def start(_type, _args) do
Example.Supervisor.start_link()
end
def fetch_from_external_resource(params) do
perform({:fetch, [params]})
|> inspect()
|> IO.puts()
end
defp perform({action, args} = params) do
case :sbroker.ask(Broker, {self(), params}) do
{:go, ref, worker, _, _queue_time} ->
monitor = Process.monitor(worker)
receive do
{^ref, result} ->
Process.demonitor(monitor, [:flush])
result
{:DOWN, ^monitor, _, _, reason} ->
exit({reason, {__MODULE__, action, args}})
end
{:drop, _time} ->
{:error, :overload}
end
end
end
这个模块启动监督者,并且它有一个函数:fetch_from_external_resource/1,这个函数向broker请求一个worker,当broker能够为我们的请求分配一个worker的时候,会向这个worker发送了消息 {:fetch, [params]} 。当broker不能分配给我们worker的时候,返回的是 {:drop, time} 消息,这样的话,我们的私有函数 perform/1,将返回 {:error, :overload} 。函数 fetch_from_external_resource/1 将打印worker的返回值,或者因为broker丢弃了我们的请求而打印 {:error, :overload} 。
我们现在可以在iex里测试这个例子:
iex -S mix run
然后我们可以运行如下语句来从外部资源获取数据:
Example.fetch_from_external_resource("test")
一秒后在iex里会输出如下信息:
{:ok, "External service called with \"test\""}
:ok
为了模拟和测试更多的调用,我们可以通过运行以下语句多次并行地调用 Example.fetch_from_external_resource(“test”) :
Enum.each(1..500, fn _ ->
Task.start(fn ->
Example.fetch_from_external_resource("test")
end)
end)
这将打印相同的行并一次打印五行,因为我们的示例worker池包含五个worker。我们也将得到 {:error, :overload} 响应,因为broker不能分配worker并且任务在队列等待太长时间。 {:error, :overload} 响应是用于防止外部服务过载的反压力示例。例如,我们的系统现在可以通过HTTP / 1.1 429 Too Many Requests回复请求服务的客户端,并且它不会因为过载而崩溃。
以上所述就是小编给大家介绍的《如何用Sbroker在Elixir中构建一个反压队列系统》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 基于 Swoole 构建高性能 Laravel 应用系列 —— 基于 Swoole 在 Laravel 中实现异步任务队列
- rabbitmq实现延时队列(死信队列)
- 消息队列(三)常见消息队列介绍
- 消息队列探秘 – RabbitMQ 消息队列介绍
- 消息队列和任务队列有什么区别?
- 数据结构之——队列与循环队列
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Design for Hackers
David Kadavy / Wiley / 2011-10-18 / USD 39.99
Discover the techniques behind beautiful design?by deconstructing designs to understand them The term ?hacker? has been redefined to consist of anyone who has an insatiable curiosity as to how thin......一起来看看 《Design for Hackers》 这本书的介绍吧!