御機嫌よう、ガラパゴスのおとめです。
今日は、PhoenixのChannel機能を触りつつ、いつもわたしたちを苦しめる「ある問題」に挑戦してみようと思います。
Phoenix.Channelのおさらい
さて、PhoenixのChannelについては既にいろいろな方が書かれていますが、簡単に言ってしまえば、WebSocketを使って複数のクライアントでリアルタイムに通信しましょう、ということになります。ちょっと簡易すぎるチャットなど実装してみましょう。公式サンプル通りのおさらいですのでサクッといきます。
まずはmix phoenix.new
でプロジェクトを作成します。今回はchannel_sample_app
というプロジェクトにしましたが、もちろんこの名前はお好みで。
$ mix phoenix.new channel_sample_app
デフォルトでweb/channels/user_socket.ex
が生成されますが、Channelはコメントアウトされていますので、コメントアウトを外します。
defmodule ChannelSampleApp.UserSocket do use Phoenix.Socket ## Channels channel "rooms:*", ChannelSampleApp.RoomChannel # ...
サーバ側の実装をしてしまいましょう。web/channels/user_socket.ex
のコメントを外してRoomChannel
を使うようにしましたので、web/channels/room_channel.ex
に実装していきます。
defmodule ChannelSampleApp.RoomChannel do use Phoenix.Channel def join("room:chat", _message, socket) do {:ok, socket} end def handle_in("chat", %{"message" => message}, socket) do broadcast! socket, "chat", %{message: message} {:noreply, socket} end def handle_out("chat", payload, socket) do push socket, "chat", payload {:noreply, socket} end end
デフォルトで生成されたweb/templates/page/index.html.eex
を変更します。テキストボックスとメッセージを表示するブロックがあるだけの簡単なものです。
<div> <input class="form-channel" id="message" placeholder="なにしてはりますの?" type="text" /> </div> <ul id="messages"> </ul>
web/channels/room_channel.ex
でroom:chat
というトピック名にしましたので、web/static/js/socket.js
をそれに合わせます。
// ... let channel = socket.channel("room:chat", {}) channel.join() .receive("ok", resp => { console.log("Joined successfully", resp) }) .receive("error", resp => { console.log("Unable to join", resp) }) export default socket
web/static/js/app.js
を実装します。テキストボックスでエンターキーが押されたら入力内容を送信して、受信したメッセージを追加していくだけの最低限な感じで。
//... import socket from "./socket" let channel = socket.channel("room:chat", {}) let message = document.getElementById("message") let messages = document.getElementById("messages") message.addEventListener("keypress", event => { if (event.keyCode === 13) { channel.push("chat", {message: message.value}) message.value = "" } }) channel.on("chat", payload => { let incoming = document.createElement("li"); incoming.innerText = `[${Date()}] ${payload.message}` messages.appendChild(incoming) }) channel.join() .receive("ok", resp => { console.log("Joined successfully", resp) }) .receive("error", resp => { console.log("Unable to join", resp) }) export default socket
では起動してみます^1。
$ mix phoenix.server
デフォルトでは4000番ポートで起動します。二つのブラウザからアクセスして、何か書いてみます。するとこのように別のブラウザにメッセージが表示されましたね?
さて、これでごく簡単なチャットができました。実際にはユーザ管理ですとか、投稿内容の永続化ですとか、まだまだいろいろとやることはあるのですが、この記事ではそれらには触れず、別のことを見ていきます。
いま何人繋がっているの?
さて、プログラミングをされている皆様は、いつもいつもいつもいつも、「数を数える」という呪縛にとらわれているのではないかと思います。
その数、必要?
などと思うことも多々ありますが、今回紹介したようにチャット的な感じでChannelを使った場合、クライアント数のリアルタイム表示なんかは避けて通れないかもしれません^2。また数えるの……。
でも、接続数を数えるくらいならできそうな期待を胸に公式のドキュメントをざっと見ても、数を数える的なことは書いていないですね。
broadcastを追いかけてみる
ここに、Phoenixのソースコードがありんす。早速git clone
してみましょう。
さて、先ほどのおさらいで、broadcast!/3
をコールすると接続しているすべてのブラウザにメッセージが送られることがわかりました。ので、まずはここから見てみましょう。phoenix/lib/phoenix/channel.ex
に実装があります。
def broadcast!(socket, event, message) do %{pubsub_server: pubsub_server, topic: topic} = assert_joined!(socket) Server.broadcast! pubsub_server, topic, event, message end
pubsub_server
をくっつけて、Server.broadcast/4
をコールしていますが、pubsub_server
てなあに?
公式のドキュメントがリンク切れ^3なので、ソースコードをつつきまわしてみると、phoenix/lib/phoenix/endpoint.ex
に次のような魅力的な行が見つかります。
@pubsub_server var!(config)[:pubsub][:name] || (if var!(config)[:pubsub][:adapter] do raise ArgumentError, "an adapter was given to :pubsub but no :name was defined, " <> "please pass the :name option accordingly" end)
おや設定に書いてある? それでは先ほど作ったプロジェクトのconfig.exs
を見てみましょう。するとこのように書いてありますね(自動生成されたconfigです)。
pubsub: [name: ChannelSampleApp.PubSub, adapter: Phoenix.PubSub.PG2]
pubsub_server
の正体がわかってきたところで、phoenix/lib/phoenix/channel/server.ex
を見てみましょう。するとこのように、PubSub.boradcast!/3
をコールしていることがわかります。
def broadcast!(pubsub_server, topic, event, payload) when is_binary(topic) and is_binary(event) and is_map(payload) do PubSub.broadcast! pubsub_server, topic, %Broadcast{ topic: topic, event: event, payload: payload } end
Phoenixのソースコードを見渡してもPubSub
が見つかりませんか? その答えはmix.exs
にあります。
defp deps do [#... {:phoenix_pubsub, "~> 1.0"}, #...] end
外部に切り出されていることがわかりました。こちらもgit clone
して、phoenix_pubsub/lib/phoenix/pubsub.ex
を追いかけていくと、ついにこのような関数に到達すると思います。
def broadcast(server, topic, message) when is_atom(server) or is_tuple(server), do: call(server, :broadcast, [:none, topic, message])
これで、pubsub_server
のbroadcast
が目的地だとわかりました。先ほどpubsub_server
はPhoenix.PubSub.PG2
だということも突き止めましたので、おもむろにphoenix_pubsub/lib/phoenix/pubsub/pg2_server.ex
を見てみます。
def broadcast(fastlane, server_name, pool_size, from_pid, topic, msg) do server_name |> get_members() |> do_broadcast(fastlane, server_name, pool_size, from_pid, topic, msg) end #... defp do_broadcast(pids, fastlane, server_name, pool_size, from_pid, topic, msg) when is_list(pids) do local_node = Phoenix.PubSub.node_name(server_name) Enum.each(pids, fn pid when is_pid(pid) and node(pid) == node() -> Local.broadcast(fastlane, server_name, pool_size, from_pid, topic, msg) {^server_name, node_name} when node_name == local_node -> Local.broadcast(fastlane, server_name, pool_size, from_pid, topic, msg) pid_or_tuple -> send(pid_or_tuple, {:forward_to_local, fastlane, from_pid, topic, msg}) end) :ok end
ついに問題の核心部分に到達しつつあるように思えますね? ええと、なぜChannelのソースコードを追いかけていたのかというと、Channelに接続しているユーザの数を知りたいからでした。:pg2.get_members/1
の結果をLocal.broadcast/6
に渡しています。でも、ここに出てくるfastlane
とかserver_name
てなあに? どこから来るの?
ぐるりとUターンして出どころと正体を突き止めましょう。すると、先ほどあえて触れなかったcall(server, :broadcast, [:none, topic, message])
に行き着きます。引数が幾つかなくなってそうな気がしますね? 実はこの引数の3番目のリストは、broadcast/6
の4〜6番目の引数です。最初の三つはどこからくるのかと言いますと、phoenix_pubsub/lib/phoenix/pubsub/pg2.ex
にヒントがあります。見てみましょう。
dispatch_rules = [{:broadcast, Phoenix.PubSub.PG2Server, [opts[:fastlane], server, pool_size]}, {:direct_broadcast, Phoenix.PubSub.PG2Server, [opts[:fastlane], server, pool_size]}, {:node_name, __MODULE__, [node_name]}]
:broadcast
がきた時にどうするかが書いてありますね。そしてここにある三番目のリストがstart_link/2
の最初の引数に当たります。じゃあstart_link/2
はどこから来るのかというと、PubSubが起動した時になります。ちょっとphoenix_pubsub/lib/phoenix/pubsub/supervisor.ex
を見てみましょう。
def start(_type, _args) do children = if pubsub = Application.get_env(:phoenix_pubsub, :pubsub) do [supervisor(Phoenix.PubSub.PG2, pubsub)] else [] end opts = [strategy: :one_for_one] Supervisor.start_link(children, opts) end
出所がわかったところで、broadcast/6
に戻ります。Local.broadcast/6
を追いかけていくと……
defp do_broadcast(fastlane, pubsub_server, shard, from, topic, msg) do pubsub_server |> subscribers_with_fastlanes(topic, shard) |> fastlane.fastlane(from, msg) # TODO: Test this contract end def subscribers_with_fastlanes(pubsub_server, topic, shard) when is_atom(pubsub_server) do try do shard |> local_for_shard(pubsub_server) |> :ets.lookup_element(topic, 2) catch :error, :badarg -> [] end end
:ets.lookup_element/3
が鍵を握っていそうです。このパラメタが何になるかというと、最初の引数はlocal_for_shared/2
の戻りで、ここではpubsub_sever
をキーにしてローカルサーバーを取得しています。pubsub_server
は結局設定から読んでいることが先ほど分かりましたので、もう一度アプリケーションのconfig.exs
を見てみましょう。
pubsub: [name: ChannelSampleApp.PubSub, adapter: Phoenix.PubSub.PG2]
なあんだ、ていう感じですね。ChannelSampleApp.PubSub
と書いてあります。二番目はトピック名ですので、web/channels/room_channel.ex
に書いたroom:chat
になります。
確かめてみる
Elixirではモジュールでreqire IEx;
してIEx.pry
することでデバッグコンソールに落とせますので、まずweb/channels/room_channel.ex
に書きます。
require IEx; defmodule ChannelSampleApp.RoomChannel do # ... def handle_in("chat", %{"message" => message}, socket) do IEx.pry # ...
また、iex
で起動する必要があります。
$ iex -S mix phoenix.server
handle_in/3
にIExを仕込んだので、ブラウザから何か書いてみましょう。するとiex
が起動します。:ets
を触ってみましょう。
pry(1)> :ets.lookup_element(ChannelSampleApp.PubSub, 0, 2) {ChannelSampleApp.PubSub.Local0, ChannelSampleApp.PubSub.GC0}
ローカルサーバはChannelSampleApp.PubSub.Local0
だということが分かりました。これをもとにもう一度lookup_element
してみると……
pry(2)> :ets.lookup_element(ChannelSampleApp.PubSub.Local0, "room:chat", 2) [{#PID<0.360.0>, {#PID<0.357.0>, Phoenix.Transports.WebSocketSerializer, []}}, {#PID<0.406.0>, {#PID<0.403.0>, Phoenix.Transports.WebSocketSerializer, []}}]
何やらリストが得られました。respawn
してデバッグコンソールから抜けて、ブラウザをもう一つ増やしてみると……
Interactive Elixir (1.3.3) - press Ctrl+C to exit (type h() ENTER for help) pry(1)> :ets.lookup_element(ChannelSampleApp.PubSub.Local0, "room:chat", 2) [{#PID<0.360.0>, {#PID<0.357.0>, Phoenix.Transports.WebSocketSerializer, []}}, {#PID<0.406.0>, {#PID<0.403.0>, Phoenix.Transports.WebSocketSerializer, []}}, {#PID<0.425.0>, {#PID<0.422.0>, Phoenix.Transports.WebSocketSerializer, []}}]
リストが増えましたね。では、この数を数えればいいのじゃないかしらん?
実装してみる
web/channels/room_channel.ex
に実装してみましょう。今回は簡単に、メッセージの末尾に数えた数を入れてみます。
def handle_in("chat", %{"message" => message}, socket) do broadcast! socket, "chat", %{message: message <> "(#{length :ets.lookup_element(ChannelSampleApp.PubSub.Local0, "room:chat", 2)})"} {:noreply, socket} end
ブラウザからアクセスして何か書いてみましょう。
見栄えは良くありませんが、無事に接続中クライアント数(らしきもの)が出てきました。
さいごに
今回は、Phoenix.Channelのソースコードを読んでみました。一緒にコードを読まれた皆様は、そのシンプルにすっきりしたコードに驚かれたことかと思います。
でもでも、今回の数を数える処理は、フレームワークの実装が変わるとアウトですね。また、PubSub
アダプタにPG2
以外を指定した時も、もちろん動きません。実際にはRedisか何かにsocket.channel_pid
か何かを入れて数を数える方が良いでしょう。
さて、ガラパゴスでは、一緒にいろいろなコードを読む仲間を大絶賛超募集しています。皆様の応募をお待ちしています。
RECRUIT | 株式会社ガラパゴス iPhone/iPad/Androidのスマートフォンアプリ開発
では、御機嫌よう。