58a4f350a8
This patch refactors gun pooling to use Elixir process registry and simplifies adapter option insertion. Having the pool use process registry instead of a GenServer has a number of advantages: - Simpler code: the initial implementation adds about half the lines of code it deletes - Concurrency: unlike a GenServer, ETS-based registry can handle multiple checkout/checkin requests at the same time - Precise and easy idle connection clousure: current proposal for closing idle connections in the GenServer-based pool needs to filter through all connections once a minute and compare their last active time with closing time. With Elixir process registry this can be done by just using `Process.send_after`/`Process.cancel_timer` in the worker process. - Lower memory footprint: In my tests `gun-memory-leak` branch uses about 290mb on peak load (250 connections) and 235mb on idle (5-10 connections). Registry-based pool uses 210mb on idle and 240mb on peak load
138 lines
3.6 KiB
Elixir
138 lines
3.6 KiB
Elixir
# Pleroma: A lightweight social networking server
|
|
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
|
# SPDX-License-Identifier: AGPL-3.0-only
|
|
|
|
defmodule Pleroma.Gun.Conn do
|
|
alias Pleroma.Gun
|
|
|
|
require Logger
|
|
|
|
def open(%URI{} = uri, opts) do
|
|
pool_opts = Pleroma.Config.get([:connections_pool], [])
|
|
|
|
opts =
|
|
opts
|
|
|> Enum.into(%{})
|
|
|> Map.put_new(:retry, pool_opts[:retry] || 1)
|
|
|> Map.put_new(:retry_timeout, pool_opts[:retry_timeout] || 1000)
|
|
|> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000)
|
|
|> Map.put_new(:supervise, false)
|
|
|> maybe_add_tls_opts(uri)
|
|
|
|
do_open(uri, opts)
|
|
end
|
|
|
|
defp maybe_add_tls_opts(opts, %URI{scheme: "http"}), do: opts
|
|
|
|
defp maybe_add_tls_opts(opts, %URI{scheme: "https", host: host}) do
|
|
tls_opts = [
|
|
verify: :verify_peer,
|
|
cacertfile: CAStore.file_path(),
|
|
depth: 20,
|
|
reuse_sessions: false,
|
|
verify_fun:
|
|
{&:ssl_verify_hostname.verify_fun/3,
|
|
[check_hostname: Pleroma.HTTP.AdapterHelper.format_host(host)]}
|
|
]
|
|
|
|
tls_opts =
|
|
if Keyword.keyword?(opts[:tls_opts]) do
|
|
Keyword.merge(tls_opts, opts[:tls_opts])
|
|
else
|
|
tls_opts
|
|
end
|
|
|
|
Map.put(opts, :tls_opts, tls_opts)
|
|
end
|
|
|
|
defp do_open(uri, %{proxy: {proxy_host, proxy_port}} = opts) do
|
|
connect_opts =
|
|
uri
|
|
|> destination_opts()
|
|
|> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
|
|
|
|
with open_opts <- Map.delete(opts, :tls_opts),
|
|
{:ok, conn} <- Gun.open(proxy_host, proxy_port, open_opts),
|
|
{:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]),
|
|
stream <- Gun.connect(conn, connect_opts),
|
|
{:response, :fin, 200, _} <- Gun.await(conn, stream) do
|
|
{:ok, conn}
|
|
else
|
|
error ->
|
|
Logger.warn(
|
|
"Opening proxied connection to #{compose_uri_log(uri)} failed with error #{
|
|
inspect(error)
|
|
}"
|
|
)
|
|
|
|
error
|
|
end
|
|
end
|
|
|
|
defp do_open(uri, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do
|
|
version =
|
|
proxy_type
|
|
|> to_string()
|
|
|> String.last()
|
|
|> case do
|
|
"4" -> 4
|
|
_ -> 5
|
|
end
|
|
|
|
socks_opts =
|
|
uri
|
|
|> destination_opts()
|
|
|> add_http2_opts(uri.scheme, Map.get(opts, :tls_opts, []))
|
|
|> Map.put(:version, version)
|
|
|
|
opts =
|
|
opts
|
|
|> Map.put(:protocols, [:socks])
|
|
|> Map.put(:socks_opts, socks_opts)
|
|
|
|
with {:ok, conn} <- Gun.open(proxy_host, proxy_port, opts),
|
|
{:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do
|
|
{:ok, conn}
|
|
else
|
|
error ->
|
|
Logger.warn(
|
|
"Opening socks proxied connection to #{compose_uri_log(uri)} failed with error #{
|
|
inspect(error)
|
|
}"
|
|
)
|
|
|
|
error
|
|
end
|
|
end
|
|
|
|
defp do_open(%URI{host: host, port: port} = uri, opts) do
|
|
host = Pleroma.HTTP.AdapterHelper.parse_host(host)
|
|
|
|
with {:ok, conn} <- Gun.open(host, port, opts),
|
|
{:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do
|
|
{:ok, conn}
|
|
else
|
|
error ->
|
|
Logger.warn(
|
|
"Opening connection to #{compose_uri_log(uri)} failed with error #{inspect(error)}"
|
|
)
|
|
|
|
error
|
|
end
|
|
end
|
|
|
|
defp destination_opts(%URI{host: host, port: port}) do
|
|
host = Pleroma.HTTP.AdapterHelper.parse_host(host)
|
|
%{host: host, port: port}
|
|
end
|
|
|
|
defp add_http2_opts(opts, "https", tls_opts) do
|
|
Map.merge(opts, %{protocols: [:http2], transport: :tls, tls_opts: tls_opts})
|
|
end
|
|
|
|
defp add_http2_opts(opts, _, _), do: opts
|
|
|
|
def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do
|
|
"#{scheme}://#{host}#{path}"
|
|
end
|
|
end
|