akkoma/lib/pleroma/mfa.ex

156 lines
3.9 KiB
Elixir
Raw Permalink Normal View History

# Pleroma: A lightweight social networking server
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.MFA do
@moduledoc """
The MFA context.
"""
alias Pleroma.User
alias Pleroma.MFA.BackupCodes
alias Pleroma.MFA.Changeset
alias Pleroma.MFA.Settings
alias Pleroma.MFA.TOTP
@doc """
Returns MFA methods the user has enabled.
## Examples
iex> Pleroma.MFA.supported_method(User)
"totp, u2f"
"""
@spec supported_methods(User.t()) :: String.t()
def supported_methods(user) do
settings = fetch_settings(user)
Settings.mfa_methods()
|> Enum.reduce([], fn m, acc ->
if method_enabled?(m, settings) do
acc ++ [m]
else
acc
end
end)
|> Enum.join(",")
end
@doc "Checks that user enabled MFA"
def require?(user) do
fetch_settings(user).enabled
end
@doc """
Display MFA settings of user
"""
def mfa_settings(user) do
settings = fetch_settings(user)
Settings.mfa_methods()
|> Enum.map(fn m -> [m, method_enabled?(m, settings)] end)
|> Enum.into(%{enabled: settings.enabled}, fn [a, b] -> {a, b} end)
end
@doc false
def fetch_settings(%User{} = user) do
user.multi_factor_authentication_settings || %Settings{}
end
@doc "clears backup codes"
def invalidate_backup_code(%User{} = user, hash_code) do
%{backup_codes: codes} = fetch_settings(user)
user
|> Changeset.cast_backup_codes(codes -- [hash_code])
|> User.update_and_set_cache()
end
@doc "generates backup codes"
@spec generate_backup_codes(User.t()) :: {:ok, list(binary)} | {:error, String.t()}
def generate_backup_codes(%User{} = user) do
with codes <- BackupCodes.generate(),
2021-01-14 07:06:16 -07:00
hashed_codes <- Enum.map(codes, &Pleroma.Password.Pbkdf2.hash_pwd_salt/1),
changeset <- Changeset.cast_backup_codes(user, hashed_codes),
{:ok, _} <- User.update_and_set_cache(changeset) do
{:ok, codes}
else
{:error, msg} ->
%{error: msg}
end
end
@doc """
Generates secret key and set delivery_type to 'app' for TOTP method.
"""
@spec setup_totp(User.t()) :: {:ok, User.t()} | {:error, Ecto.Changeset.t()}
def setup_totp(user) do
user
|> Changeset.setup_totp(%{secret: TOTP.generate_secret(), delivery_type: "app"})
|> User.update_and_set_cache()
end
@doc """
Confirms the TOTP method for user.
`attrs`:
`password` - current user password
`code` - TOTP token
"""
@spec confirm_totp(User.t(), map()) :: {:ok, User.t()} | {:error, Ecto.Changeset.t() | atom()}
def confirm_totp(%User{} = user, attrs) do
with settings <- user.multi_factor_authentication_settings.totp,
{:ok, :pass} <- TOTP.validate_token(settings.secret, attrs["code"]) do
user
|> Changeset.confirm_totp()
|> User.update_and_set_cache()
end
end
@doc """
Disables the TOTP method for user.
`attrs`:
`password` - current user password
"""
@spec disable_totp(User.t()) :: {:ok, User.t()} | {:error, Ecto.Changeset.t()}
def disable_totp(%User{} = user) do
user
|> Changeset.disable_totp()
|> Changeset.disable()
|> User.update_and_set_cache()
end
@doc """
Force disables all MFA methods for user.
"""
@spec disable(User.t()) :: {:ok, User.t()} | {:error, Ecto.Changeset.t()}
def disable(%User{} = user) do
user
|> Changeset.disable_totp()
|> Changeset.disable(true)
|> User.update_and_set_cache()
end
@doc """
Checks if the user has MFA method enabled.
"""
def method_enabled?(method, settings) do
with {:ok, %{confirmed: true} = _} <- Map.fetch(settings, method) do
true
else
_ -> false
end
end
@doc """
Checks if the user has enabled at least one MFA method.
"""
def enabled?(settings) do
Settings.mfa_methods()
|> Enum.map(fn m -> method_enabled?(m, settings) end)
|> Enum.any?()
end
end