Kubernetes is the de-facto deployment mechanism that handles deployments, scaling, and management of containerized applications. However, this is true for stateless services. Although we have stateful sets and volumes that store data in a persistent pod, discover new pods that use the stateful-set, and manage state transfer to the node, we, being fond of Elixir, have built our game servers in Elixir. We will leverage the power of Erlang clusters, distributed systems, and awesome Elixir libraries to handle the heavy lifting.

Things we need for this architecture:

  1. A gameserver that needs to be deployed in Kubernetes
  2. A clustering mechanism that manages different Erlang clusters
  3. A distributed supervisor that manages process restarts across nodes
  4. A distributed agent that manages state transfer across nodes
  5. A distributed registry that registers processes across nodes

1/5 Gameserver in Phoenix

Anatomy of a gameserver

The way we implement our game servers is:

  • A Phoenix application supervisor starts a DynamicSupervisor for a game and Phoenix pubsub system
  • A game connects to the gameserver via a Phoenix client-server connection through a websocket
  • The Phoenix channel is notified about the client connection.
  • The Phoenix channel notifies DynamicSupervisor about the connection
  • DynamicSupervisor then spins off a GenServer that manages states for that game.

Thus, in a long-running gameserver, we will have a Dynamic Supervisor, a bunch of GenServers maintaining game states of the different games running, and Phoenix channels for communicating with game clients.

2/5 Erlang Clustering

Erlang has a built-in clustering mechanism. We can spawn new nodes in the same network with a unique name and connect each other by exchanging names.

iex --name node1@127.0.0.1 --cookie asdf -S mix
iex --name node2@127.0.0.1 --cookie asdf -S mix
iex --name node3@127.0.0.1 --cookie asdf -S mix

In this example each node has a unique name, and they all share the same cookie. Now we can connect these nodes by running the following code:

iex(node3@127.0.0.1)2> Node.connect :"node1@127.0.0.1"
true
iex(node3@127.0.0.1)3> Node.list
[:"node1@127.0.0.1"]

iex(node2@127.0.0.1)1> Node.connect :"node1@127.0.0.1"
true
iex(node2@127.0.0.1)2> Node.list
[:"node1@127.0.0.1", :"node3@127.0.0.1"]

We use libcluster for automatically forming clusters of Erlang nodes, with either static or dynamic node membership.

3/5 Distributed Supervisor

In OTP, supervisor is a process that supervises other processes. Supervisors are used to build a hierarchical process structure called a supervision tree. Supervision trees provide fault-tolerance and encapsulate how our applications start, restarts and shutdown.

In our case, we have gameserver nodes running in different nodes. In a single node, supervision tree can look after process restarts, but not if the process resided in a different node. For this we need a supervisor that exists in all the nodes across cluster, a Distributed Supervisor.

We use Horde’s distributed supervisor mechanism for distributing supervisor across nodes in the cluster. Horde uses CRDT to sync data between nodes. This means the supervisors look at different data but they achieve eventual-consistency.

4/5 Distributed Agent

The Distributed Agent is used to do state handoffs between a node that is going down and a new node that is coming up. K8s does a graceful shutdown on killing a node, where it will send a SIGTERM signal to the node. OTP can catch the termination message and perform an orderly shutdown of its applications. This provides an opportunity to do some cleanup. OTP takes the state of the node going down and stashes them in the Distributed Agent. In our case, the Distributed Agent is a CRDT. Since it is a CRDT, the data gets propagated to other nodes. The distributed supervisor in other nodes revives the processes, and the new processes read data from CRDT and populate the state.

5/5 Distributed Registry

We need the process registered in the cluster. When a process comes up in another node in the cluster, there is no way of knowing which process this refers to using a local registry. For this, we need a registry that is present across nodes and syncs process addresses in the cluster. We use Horde’s Registry module for a distributed registry. This uses delta-CRDT for keeping states and notifies processes when it lost a naming conflict.

Stitching it together

Now we have all the pieces we need to implement a stateful game server and deploy it to Kubernetes and let OTP, libcluster, and Horde do the heavy lifting of managing node discovery, state management, and state transfer for us

Libcluster config

config :libcluster,
  topologies: [
    chef_empire: [
      strategy: Cluster.Strategy.Kubernetes,
      config: [
        mode: :dns,
        kubernetes_node_basename: "gameserver",
        kubernetes_selector: "app=gameserverselector",
        kubernetes_namespace: "gamenamespace",
        polling_interval: 10_000
      ]
    ]
  ]

This clustering strategy works by fetching information about endpoints or pods, which are filtered by the given Kubernetes namespace and label.

Application Processes

Processes that needs to be started by application supervisor.

children = [
      ChefEmpire.Repo,
      ChefEmpireWeb.Telemetry,
      {Phoenix.PubSub, name: ChefEmpire.PubSub},
      ChefEmpireWeb.Endpoint,

      # cluster supervisor for libcluster
      {Cluster.Supervisor, [topologies, [name: ChefEmpire.ClusterSupervisor]]},
      
      # horde registry
      {Horde.Registry, keys: :unique, name: NodeRegistry},
      
      # horde dynamic supervisor
      {Horde.DynamicSupervisor,
       name: NodeSupervisor, strategy: :one_for_one, shutdown: 5_000},
      
      # genserver for observer nodes in cluster
      {NodeListener, []},
      
      # genserver for state handoffs
      {StateHandoff, []},
    ]

We want the Horde.Registry to start before Horde.DynamicSupervisor so that the dynamic supervisor can keep track of processes that are spread across the cluster using Horde.Registry.

Horde.Registry

We use a module-based registries to enable dynamic runtime configuration of Horde.Registry.

defmodule NodeRegistry do
  @moduledoc """
  Horde registry for registering processes across cluster
  """
  use Horde.Registry
  require Logger

  def start_link do
    Horde.Registry.start_link(__MODULE__, keys: :unique, name: __MODULE__)
  end

  def init(opts) do
    members()
    |> Keyword.merge(opts)
    |> Horde.Registry.init()
  end

  def via_tuple(name) do
    {:via, Horde.Registry, {NodeRegistry, name}}
  end

  defp members do
    [Node.self() | Node.list()]
    |> Enum.map(&{__MODULE__, &1})
  end
end

Here, on Horde.Registry init, we pass the current members of the cluster. Horde.Registry implements a distributed Registry backed by a δ-CRDT (provided by DeltaCrdt). This CRDT is used for both tracking membership of the cluster and implementing the registry functionality itself. Local changes to the registry will automatically be synced to other nodes in the cluster.

Horde.DynamicSupervisor

Similar to Registry, we use a module based dynamic supervisor for Horde.DynamicSupervisor too.

defmodule NodeSupervisor do
  @moduledoc """
  A horde dynamic supervisor
  """
  use Horde.DynamicSupervisor
  require Logger

  def start_link(_) do
    Horde.DynamicSupervisor.start_link(__MODULE__, strategy: :one_for_one, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    members()
    |> Keyword.merge(opts)
    |> Horde.DynamicSupervisor.init()
  end

  def start_child(child_spec) do
    Horde.DynamicSupervisor.start_child(__MODULE__, child_spec)
  end

  defp members do
    [Node.self() | Node.list()]
    |> tap(&Logger.info(inspect(&1)))
    |> Enum.map(&{__MODULE__, &1})
  end
end

State Handoff

The state handoff agent using DeltaCRDT and OTP. This process is the data layer that sends data across nodes that syncs up eventually. Also, whenever a new node comes up, it will fetch data from this distributed agent (GenServer).

defmodule StateHandoff do
  @moduledoc """
  Module that transfers state amoung the cluster processes.
  """
  use GenServer

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def join(node) do
    GenServer.call(__MODULE__, {:add_node, {__MODULE__, node}})
  end

  def handoff(state_id, state_details) do
    GenServer.call(__MODULE__, {:handleoff, state_id, state_details})
  end

  def get_state_details(state_id) do
    GenServer.call(__MODULE__, {:get_state_details, state_id})
  end

  @impl true
  def init(_opts) do
    ## State handoff initiated
    {:ok, crdt_pid} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, sync_interval: 5)

    {:ok, crdt_pid}
  end

  @impl true
  def handle_call({:add_node, node_module}, _from, crdt_pid) do
    other_crdt_pid = GenServer.call(node_module, {:ack_add_node, crdt_pid})
    DeltaCrdt.set_neighbours(crdt_pid, [other_crdt_pid])
    {:reply, :ok, crdt_pid}
  end

  @impl true
  def handle_call({:ack_add_node, other_crdt_pid}, _from, crdt_pid) do
    DeltaCrdt.set_neighbours(crdt_pid, [other_crdt_pid])
    {:reply, crdt_pid, crdt_pid}
  end

  @impl true
  def handle_call({:handleoff, state_id, state_details}, _from, crdt_pid) do
    DeltaCrdt.put(crdt_pid, state_id, state_details)
    {:reply, :ok, crdt_pid}
  end

  @impl true
  def handle_call({:get_state_details, state_id}, _from, crdt_pid) do
    details = DeltaCrdt.to_map(crdt_pid) |> Map.get(state_id)
    {:reply, details, crdt_pid}
  end
end

NodeListener

We also need a separate process that will listen for {:nodeup, node} and {:nodedown, node} events and adjust the members of the Horde cluster accordingly.

defmodule NodeListener do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, [])

  def init(_) do
    :net_kernel.monitor_nodes(true, node_type: :visible)
    {:ok, nil}
  end

  def handle_info({:nodeup, _node, _node_type}, state) do
    set_members(MyHordeRegistry)
    set_members(MyHordeSupervisor)
    join_state_handoff()
    {:noreply, state}
  end

  def handle_info({:nodedown, _node, _node_type}, state) do
    set_members(MyHordeRegistry)
    set_members(MyHordeSupervisor)
    {:noreply, state}
  end

  defp set_members(name) do
    members =
      [Node.self() | Node.list()]
      |> Enum.map(fn node -> {name, node} end)

    :ok = Horde.Cluster.set_members(name, members)
  end

  defp join_state_handoff() do
    Node.list()
    |> Enum.map(&StateHandoff.join/1)
  end
end

The end game

Now with all the mechanisms in place, we have a distributed dynamic supervisor that keeps track of processes across the cluster that can be found via a distributed registry. We can know when a node comes up and when a node goes down, and we have a distributed agent to store the data.

All we need is a process to start under the supervisor, register itself with the registry, and read and write data to the agent.

Starting a process

The gameserver we made is a distributed restaurant. We have a bunch of generator processes that generate products when they have a certain raw material. For example, a coffee machine genserver that needs coffee beans.

Here we are starting a coffee machine genserver under the NodeSupervisor.

defmodule ChefEmpire do

  def install_coffee_machine(machine_id) do
    child_spec = %{
      id: ChefEmpire.Generator.CoffeeMachine,
      start:
        {ChefEmpire.Generator.CoffeeMachine, :start_link,
         [[name: machine_id, details: %{beans: 0}]]}
    }

    NodeSupervisor.start_child(child_spec)
  end

We will name the process using the NodeRegistry module we wrote earlier that uses HordeRegistry. On init, we use handle continue so that our process starts up in a non-blocking asynchronous manner and fetch the data from CRDT in the handle_continue block.

defmodule ChefEmpire.Generator do

  use GenServer

  require Logger

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: NodeRegistry.via_tuple(opts[:name]))
  end

  @impl true
  def init(opts) do
    ## Process traps the exit, so that it gets to know when supervisor shuts it down.
    Process.flag(:trap_exit, true)

    state =
      ChefEmpire.generator_module(opts[:type])
      |> apply(:init_state, [opts[:type], opts[:name], opts[:serial]])

    {:ok, state, {:continue, :handle_state}}
  end

  @impl true
  def handle_continue(:handle_state, %{name: name} = state) do
    state =
      ## Get state data from handoff agent
      NodeManager.StateHandoff.get_state_details(name)
      |> get_state_from_store(state)
      ## Set GenServer data
      |> tap(fn state -> state |> generate() end)

    {:noreply, state}
  end

  @impl true
  def handle_call("stock", _from, state) do
    {:reply, state, state}
  end

  @impl true
  def handle_info("generate", state) do
    state =
      state
      ## In game logic to update GenServer state data
      |> ChefEmpire.GeneratorProtocol.generate()
      |> tap(fn state -> state |> generate() end)

    ## Store new data in handoff agent
    NodeManager.StateHandoff.handoff(state.name, state)
    {:noreply, state}
  end

  @impl true
  def terminate(reason, state) do
    ## When node goes down, Supervisor does a graceful shutdown of processes and calls the terminate function.
    ## Child process in turn saves the data in the distributed state handoff agent.
    NodeManager.StateHandoff.handoff(state.name, state)
    :ok
  end

  defp generate(state) do
    generation_interval = ChefEmpire.GeneratorProtocol.generation_interval(state)
    Process.send_after(self(), "generate", generation_interval)
  end

  defp get_state_from_store(nil, old_state), do: old_state
  defp get_state_from_store(state, _old_state), do: state
end

And now, we are all set to deploy the application to Kubernetes and let OTP, CRDT, and Horde work their magic. The steps to address the shortcomings of stateful deployment might seem complex at first, but understanding the underlying concepts makes it easier and allows us to make modifications to better suit our requirements. For instance, instead of Delta CRDT, we can opt for Redis, Postgres, or distributed ETS. Alternatives to Horde’s Registry include Swarm and registry modules from lasp-lang.