Galapagos Tech Blog

株式会社ガラパゴスのメンバーによる技術ブログです。

Phoenix.Channelで接続中のクライアント数を数える

御機嫌よう、ガラパゴスのおとめです。

今日は、PhoenixのChannel機能を触りつつ、いつもわたしたちを苦しめる「ある問題」に挑戦してみようと思います。

Phoenix.Channelのおさらい

さて、PhoenixのChannelについては既にいろいろな方が書かれていますが、簡単に言ってしまえば、WebSocketを使って複数のクライアントでリアルタイムに通信しましょう、ということになります。ちょっと簡易すぎるチャットなど実装してみましょう。公式サンプル通りのおさらいですのでサクッといきます。

まずはmix phoenix.newでプロジェクトを作成します。今回はchannel_sample_appというプロジェクトにしましたが、もちろんこの名前はお好みで。

$ mix phoenix.new channel_sample_app

デフォルトでweb/channels/user_socket.exが生成されますが、Channelはコメントアウトされていますので、コメントアウトを外します。

defmodule ChannelSampleApp.UserSocket do
  use Phoenix.Socket

  ## Channels
  channel "rooms:*", ChannelSampleApp.RoomChannel
  # ...

サーバ側の実装をしてしまいましょう。web/channels/user_socket.exのコメントを外してRoomChannelを使うようにしましたので、web/channels/room_channel.exに実装していきます。

defmodule ChannelSampleApp.RoomChannel do
  use Phoenix.Channel

  def join("room:chat", _message, socket) do
    {:ok, socket}
  end

  def handle_in("chat", %{"message" => message}, socket) do
    broadcast! socket, "chat", %{message: message}
    {:noreply, socket}
  end

  def handle_out("chat", payload, socket) do
    push socket, "chat", payload
    {:noreply, socket}
  end
end

デフォルトで生成されたweb/templates/page/index.html.eexを変更します。テキストボックスとメッセージを表示するブロックがあるだけの簡単なものです。

<div>
  <input class="form-channel" id="message" placeholder="なにしてはりますの?" type="text" />
</div>
<ul id="messages">
</ul>

web/channels/room_channel.exroom:chatというトピック名にしましたので、web/static/js/socket.jsをそれに合わせます。

// ...
let channel = socket.channel("room:chat", {})
channel.join()
  .receive("ok", resp => { console.log("Joined successfully", resp) })
  .receive("error", resp => { console.log("Unable to join", resp) })

export default socket

web/static/js/app.jsを実装します。テキストボックスでエンターキーが押されたら入力内容を送信して、受信したメッセージを追加していくだけの最低限な感じで。

//...
import socket from "./socket"
let channel = socket.channel("room:chat", {})
let message = document.getElementById("message")
let messages = document.getElementById("messages")
message.addEventListener("keypress", event => {
  if (event.keyCode === 13) {
    channel.push("chat", {message: message.value})
    message.value = ""
  }
})
channel.on("chat", payload => {
  let incoming = document.createElement("li");
  incoming.innerText = `[${Date()}] ${payload.message}`
  messages.appendChild(incoming)
})
channel.join()
  .receive("ok", resp => { console.log("Joined successfully", resp) })
  .receive("error", resp => { console.log("Unable to join", resp) })
export default socket

では起動してみます^1

$ mix phoenix.server

デフォルトでは4000番ポートで起動します。二つのブラウザからアクセスして、何か書いてみます。するとこのように別のブラウザにメッセージが表示されましたね?

f:id:glpgsinc:20160930192932p:plain

さて、これでごく簡単なチャットができました。実際にはユーザ管理ですとか、投稿内容の永続化ですとか、まだまだいろいろとやることはあるのですが、この記事ではそれらには触れず、別のことを見ていきます。

いま何人繋がっているの?

さて、プログラミングをされている皆様は、いつもいつもいつもいつも、「数を数える」という呪縛にとらわれているのではないかと思います。

その数、必要?

などと思うことも多々ありますが、今回紹介したようにチャット的な感じでChannelを使った場合、クライアント数のリアルタイム表示なんかは避けて通れないかもしれません^2。また数えるの……。

でも、接続数を数えるくらいならできそうな期待を胸に公式のドキュメントをざっと見ても、数を数える的なことは書いていないですね。

broadcastを追いかけてみる

ここに、Phoenixのソースコードがありんす。早速git cloneしてみましょう。

さて、先ほどのおさらいで、broadcast!/3をコールすると接続しているすべてのブラウザにメッセージが送られることがわかりました。ので、まずはここから見てみましょう。phoenix/lib/phoenix/channel.exに実装があります。

  def broadcast!(socket, event, message) do
    %{pubsub_server: pubsub_server, topic: topic} = assert_joined!(socket)
    Server.broadcast! pubsub_server, topic, event, message
  end

pubsub_serverをくっつけて、Server.broadcast/4をコールしていますが、pubsub_serverてなあに?

公式のドキュメントがリンク切れ^3なので、ソースコードをつつきまわしてみると、phoenix/lib/phoenix/endpoint.exに次のような魅力的な行が見つかります。

      @pubsub_server var!(config)[:pubsub][:name] ||
        (if var!(config)[:pubsub][:adapter] do
          raise ArgumentError, "an adapter was given to :pubsub but no :name was defined, " <>
                               "please pass the :name option accordingly"
        end)

おや設定に書いてある? それでは先ほど作ったプロジェクトのconfig.exsを見てみましょう。するとこのように書いてありますね(自動生成されたconfigです)。

  pubsub: [name: ChannelSampleApp.PubSub,
           adapter: Phoenix.PubSub.PG2]

pubsub_serverの正体がわかってきたところで、phoenix/lib/phoenix/channel/server.exを見てみましょう。するとこのように、PubSub.boradcast!/3をコールしていることがわかります。

  def broadcast!(pubsub_server, topic, event, payload)
      when is_binary(topic) and is_binary(event) and is_map(payload) do
    PubSub.broadcast! pubsub_server, topic, %Broadcast{
      topic: topic,
      event: event,
      payload: payload
    }
  end

Phoenixソースコードを見渡してもPubSubが見つかりませんか? その答えはmix.exsにあります。

  defp deps do
    [#...
     {:phoenix_pubsub, "~> 1.0"},
     #...]
  end

外部に切り出されていることがわかりました。こちらもgit cloneしてphoenix_pubsub/lib/phoenix/pubsub.exを追いかけていくと、ついにこのような関数に到達すると思います。

  def broadcast(server, topic, message) when is_atom(server) or is_tuple(server),
    do: call(server, :broadcast, [:none, topic, message])

これで、pubsub_serverbroadcastが目的地だとわかりました。先ほどpubsub_serverPhoenix.PubSub.PG2だということも突き止めましたので、おもむろにphoenix_pubsub/lib/phoenix/pubsub/pg2_server.exを見てみます。

  def broadcast(fastlane, server_name, pool_size, from_pid, topic, msg) do
    server_name
    |> get_members()
    |> do_broadcast(fastlane, server_name, pool_size, from_pid, topic, msg)
  end
  #...
  defp do_broadcast(pids, fastlane, server_name, pool_size, from_pid, topic, msg)
    when is_list(pids) do
    local_node = Phoenix.PubSub.node_name(server_name)

    Enum.each(pids, fn
      pid when is_pid(pid) and node(pid) == node() ->
        Local.broadcast(fastlane, server_name, pool_size, from_pid, topic, msg)
      {^server_name, node_name} when node_name == local_node ->
        Local.broadcast(fastlane, server_name, pool_size, from_pid, topic, msg)
      pid_or_tuple ->
        send(pid_or_tuple, {:forward_to_local, fastlane, from_pid, topic, msg})
    end)
    :ok
  end

ついに問題の核心部分に到達しつつあるように思えますね? ええと、なぜChannelのソースコードを追いかけていたのかというと、Channelに接続しているユーザの数を知りたいからでした。:pg2.get_members/1の結果をLocal.broadcast/6に渡しています。でも、ここに出てくるfastlaneとかserver_nameてなあに? どこから来るの?

ぐるりとUターンして出どころと正体を突き止めましょう。すると、先ほどあえて触れなかったcall(server, :broadcast, [:none, topic, message])に行き着きます。引数が幾つかなくなってそうな気がしますね? 実はこの引数の3番目のリストは、broadcast/6の4〜6番目の引数です。最初の三つはどこからくるのかと言いますと、phoenix_pubsub/lib/phoenix/pubsub/pg2.exにヒントがあります。見てみましょう。

    dispatch_rules = [{:broadcast, Phoenix.PubSub.PG2Server, [opts[:fastlane], server, pool_size]},
                      {:direct_broadcast, Phoenix.PubSub.PG2Server, [opts[:fastlane], server, pool_size]},
                      {:node_name, __MODULE__, [node_name]}]

:broadcastがきた時にどうするかが書いてありますね。そしてここにある三番目のリストがstart_link/2の最初の引数に当たります。じゃあstart_link/2はどこから来るのかというと、PubSubが起動した時になります。ちょっとphoenix_pubsub/lib/phoenix/pubsub/supervisor.exを見てみましょう。

  def start(_type, _args) do
    children = if pubsub = Application.get_env(:phoenix_pubsub, :pubsub) do
      [supervisor(Phoenix.PubSub.PG2, pubsub)]
    else
      []
    end
    opts = [strategy: :one_for_one]
    Supervisor.start_link(children, opts)
  end

出所がわかったところで、broadcast/6に戻ります。Local.broadcast/6を追いかけていくと……

  defp do_broadcast(fastlane, pubsub_server, shard, from, topic, msg) do
    pubsub_server
    |> subscribers_with_fastlanes(topic, shard)
    |> fastlane.fastlane(from, msg) # TODO: Test this contract
  end

  def subscribers_with_fastlanes(pubsub_server, topic, shard) when is_atom(pubsub_server) do
    try do
      shard
      |> local_for_shard(pubsub_server)
      |> :ets.lookup_element(topic, 2)
    catch
      :error, :badarg -> []
    end
  end

:ets.lookup_element/3が鍵を握っていそうです。このパラメタが何になるかというと、最初の引数はlocal_for_shared/2の戻りで、ここではpubsub_severをキーにしてローカルサーバーを取得しています。pubsub_serverは結局設定から読んでいることが先ほど分かりましたので、もう一度アプリケーションのconfig.exsを見てみましょう。

  pubsub: [name: ChannelSampleApp.PubSub,
           adapter: Phoenix.PubSub.PG2]

なあんだ、ていう感じですね。ChannelSampleApp.PubSubと書いてあります。二番目はトピック名ですので、web/channels/room_channel.exに書いたroom:chatになります。

確かめてみる

Elixirではモジュールでreqire IEx;してIEx.pryすることでデバッグコンソールに落とせますので、まずweb/channels/room_channel.exに書きます。

require IEx;
defmodule ChannelSampleApp.RoomChannel do
  # ...
  def handle_in("chat", %{"message" => message}, socket) do
    IEx.pry
    # ...

また、iexで起動する必要があります。

$ iex -S mix phoenix.server

handle_in/3にIExを仕込んだので、ブラウザから何か書いてみましょう。するとiexが起動します。:etsを触ってみましょう。

pry(1)> :ets.lookup_element(ChannelSampleApp.PubSub, 0, 2)
{ChannelSampleApp.PubSub.Local0, ChannelSampleApp.PubSub.GC0}

ローカルサーバはChannelSampleApp.PubSub.Local0だということが分かりました。これをもとにもう一度lookup_elementしてみると……

pry(2)> :ets.lookup_element(ChannelSampleApp.PubSub.Local0, "room:chat", 2)
[{#PID<0.360.0>, {#PID<0.357.0>, Phoenix.Transports.WebSocketSerializer, []}},
 {#PID<0.406.0>, {#PID<0.403.0>, Phoenix.Transports.WebSocketSerializer, []}}]

何やらリストが得られました。respawnしてデバッグコンソールから抜けて、ブラウザをもう一つ増やしてみると……

Interactive Elixir (1.3.3) - press Ctrl+C to exit (type h() ENTER for help)
pry(1)> :ets.lookup_element(ChannelSampleApp.PubSub.Local0, "room:chat", 2)
[{#PID<0.360.0>, {#PID<0.357.0>, Phoenix.Transports.WebSocketSerializer, []}},
 {#PID<0.406.0>, {#PID<0.403.0>, Phoenix.Transports.WebSocketSerializer, []}},
 {#PID<0.425.0>, {#PID<0.422.0>, Phoenix.Transports.WebSocketSerializer, []}}]

リストが増えましたね。では、この数を数えればいいのじゃないかしらん?

実装してみる

web/channels/room_channel.exに実装してみましょう。今回は簡単に、メッセージの末尾に数えた数を入れてみます。

  def handle_in("chat", %{"message" => message}, socket) do
    broadcast! socket, "chat", %{message: message <> "(#{length :ets.lookup_element(ChannelSampleApp.PubSub.Local0, "room:chat", 2)})"}
    {:noreply, socket}
  end

ブラウザからアクセスして何か書いてみましょう。

f:id:glpgsinc:20160930193228p:plain

見栄えは良くありませんが、無事に接続中クライアント数(らしきもの)が出てきました。

さいごに

今回は、Phoenix.Channelのソースコードを読んでみました。一緒にコードを読まれた皆様は、そのシンプルにすっきりしたコードに驚かれたことかと思います。

でもでも、今回の数を数える処理は、フレームワークの実装が変わるとアウトですね。また、PubSubアダプタにPG2以外を指定した時も、もちろん動きません。実際にはRedisか何かにsocket.channel_pidか何かを入れて数を数える方が良いでしょう。

さて、ガラパゴスでは、一緒にいろいろなコードを読む仲間を大絶賛超募集しています。皆様の応募をお待ちしています。

RECRUIT | 株式会社ガラパゴス iPhone/iPad/Androidのスマートフォンアプリ開発

では、御機嫌よう。