挑戦! Elixirによる並行・分散アプリケーションの作り方【第二言語としてのElixir】

「第二言語としてのElixir」、いよいよ後編では、処理を並行に扱う方法を紹介します。Elixirでは、なぜ並行処理が書きやすく、分散アプリケーションをシンプルに記述できるのでしょうか。

挑戦! Elixirによる並行・分散アプリケーションの作り方【第二言語としてのElixir】

プログラミング言語Elixirの大きな特徴は、並行処理が書きやすく、分散アプリケーションをシンプルに記述できることです。

その背景となる「ErlangのVMにおけるプロセスをベースにした考え方」と「Erlang/OTP」については、前回の記事で説明しました。さらに、Elixirのプロジェクト管理ツールであるmixについても解説しました。

いよいよ今回は、こういったElixirの基礎知識をふまえて、Elixirで処理を並行に扱う方法を紹介します。

Elixirで並行処理を書きやすいわけ

そもそも、CやJavaといった言語では、どうして処理を並行に実行するのが大変なのでしょうか?

並行に実行したい処理をこれらの言語で記述する場合、通常はOSのスレッドを複数使って、プログラムを同時に実行します。複数のスレッド間でデータのやりとりが必要になるため、スレッドはすべて同じメモリ空間を共有しています。

このため、プログラムが予想外の動作をしないような排他制御が必要になります。結果的に、プログラムが複雑になってしまい、エラーの原因になり得ます。

一方、Elixirでは、ErlangのVMにおけるプロセス(OSのプロセスではないので注意してください)を使って並行処理を記述します。Erlangのプロセスは、OSのスレッドと違ってメモリ空間を共有していません。プロセスの間でデータのやりとりが必要なときは、前回の記事で見たように、メッセージの受け渡しをするだけです。排他制御が必要ないので、シンプルに処理を記述できるのです。

1

Elixirの並行処理に挑戦

Erlangのプロセスを使った並行処理を、実際にElixirで書いてみましょう。

ここでは例として、次のような処理を、並行にたくさん実行させてみることにします。

1,000以下のランダムな数字を生成し、その数字をミリ秒単位の時間とみなして、その時間だけスリープする

この処理を、並行ではなく逐次処理でたくさん実行すると、実行したぶんだけ長時間にわたって処理がスリープします。

しかし、すべての処理を並行に実行できれば、1つの処理がスリープする時間はたかだか1,000ミリ秒なので、たくさんの処理を実行しても、スリープ時間は1秒程度に収まるはずです。

本当にそうなるか、実際にElixirでプログラミングして確かめてみましょう!

ランダムな時間スリープする処理を定義する

バックグラウンドで実行させるサンプルの関数として、次の2つを定義します。

関数 処理
random 1,000以下のランダムな数字を返す
sleep 引数で与えた時間(ミリ秒)スリープし、文字列を返す

これを、Workerモジュールとして、worker.exファイルに定義します。

defmodule Worker do
  # 1,000以下のランダムな数字を生成
  def random do
    :rand.uniform(1000)
  end

  # nミリ秒スリープして結果を文字列として返す
  def sleep(n) do
    IO.puts "sleep(#{inspect n}) started."
    :timer.sleep(n) # nミリ秒スリープする
    IO.puts "sleep(#{inspect n}) ended."
    "result-sleep(#{inspect n})"
  end
end

randomsleepそれぞれをIExで対話的に実行すると、結果は以下のようになります。

$ iex worker.ex
# ランダムな1,000以下の数字を生成
iex(1)> Worker.random
837
iex(2)> Worker.random
957
iex(3)> Worker.random
987

# 100、1,000、10,000ミリ秒スリープして結果を返す
iex(4)> Worker.sleep(100)
sleep(100) started.
sleep(100) ended.
"result-sleep(100)"

iex(5)> Worker.sleep(1000)
sleep(1000) started.
sleep(1000) ended.
"result-sleep(1000)"

iex(6)> Worker.sleep(10000)
sleep(10000) started.
sleep(10000) ended.
"result-sleep(10000)"
iex(7)>

ランダムな整数のリストを作る

これからやりたいことは、次のような処理です。

Worker.randomでランダムな整数を生成し、それを引数にしてWorker.sleepを実行する

しかも、これを逐次的に何回も実行したり、並行して何個も実行したりする必要があります。

for文のような繰り返しの仕組みを使いたくなるかもしれませんが、ここでは次のような手順で、この処理を何回も実行させることにします。

  1. スリープさせる時間を表す「ランダムな整数のリスト」を作成
  2. そのリストの各要素を引数にして、Worker.sleepを実行

逐次的に実行する場合と、並行に実行する場合とでは、2のやり方が変わります。そこで、まず両者に共通する1のほうから考えていきましょう。

Enum.map、無名関数、Range.new

Elixirには、リストのようなコレクションを扱うために、Enumという汎用モジュールが用意されています。このモジュールのEnum.mapという関数を使うと、「コレクションの要素を関数に適用した結果のコレクション」が得られます。

単純な例をIExで実行して、確認してみましょう。

iex> Enum.map([1, 2, 3], fn(x) -> x * 2 end)
[2, 4, 6]

上記の例では、第一引数である[1,2,3]に対し、各要素を2倍にする無名関数を適用しています。Elixirの無名関数は次のようにして定義できます。

fn(<引数>) -> <実行したい処理> end

なお、引数がない無名関数は、fn -> <実行したい処理> endのように引数を省略して定義できます。

定義した無名関数を実行するには、<関数>.(<引数>)とします。IEx上で、無名関数の定義と実行を試してみましょう。

# 無名関数をfに代入して呼び出し
iex(1)> f = fn(a) -> "arg is #{a}." end
#Function<6.118419387/1 in :erl_eval.expr/5>
iex(2)> f.("abc")
"arg is abc."
iex(3)> f.(123)
"arg is 123."

# 直接無名関数を呼び出し
iex(4)> (fn -> "result" end).()
"result"
iex(5)> (fn(a,b,c) -> "result is #{a},#{b},#{c}." end).("x","y","z")
"result is x,y,z."

無名関数の使い方がわかったところで、これをEnum.mapと組み合わせて「ランダムな整数のリスト」を作る話に戻りましょう。

ランダムな整数は、先ほど定義したWorker.randomを使えば1つ生成できます。ということは、Worker.randomを呼び出すだけの無名関数を使って、次のようにすれば「ランダムな整数が100個含まれたリスト」が作れそうです。

Enum.map(【長さが100のリスト】, fn(_a) -> Worker.random end)

Worker.random自体は引数をとりませんが、無名関数はEnum.mapと組み合わせるため、引数を1つとるものとして定義する必要があります。この引数には、_aというように、先頭に_が付く名前をつけました。これは、使用されない引数であることを示すための慣習です。

先頭が_でなくても動作上問題はありませんが、コンパイラが警告を出します。可読性をあげるためにも、Elixirのプログラムを書くときは、使用しない引数名の先頭を_としましょう。

さて、残るは【長さが100のリスト】を用意する方法だけです。これには、Range.new(<最初の要素>,<最後の要素>)というElixirの関数が使えます。

例えば、Range.new(1, 100)とすれば、1から100までの整数のリスト[1,2,3,4,5,...,100]が取得できますRange.new(<最初の要素>,<最後の要素>)には、<最初の要素> .. <最後の要素>という省略表記もあるので、1 .. 100としても同じです)

以上により、次のようにして「ランダムな整数が100個含まれたリスト」を作成できるようになりました!

iex(1)> Enum.map(Range.new(1, 100), fn(_a) -> Worker.random end)
[34, 747, 725, 197, 113, 262, 756, 104, 503, 606, 97, 44, 919, 664, 973, 997,
 479, 793, 410, 767, 682, 140, 357, 198, 40, 824, 594, 281, 459, 833, 333, 865,
 810, 331, 344, 686, 128, 358, 882, 56, 448, 968, 779, 867, 607, 25, 16, 440,
 161, 310, ...]
パイプ演算子で連鎖する処理をすっきり表現

先ほどの例で、Enum.mapRange.newとが入れ子になっていることに気がついたでしょうか?

Elixirには、このような関数の入れ子の呼び出しを簡潔に書くための記法が用意されています。それはパイプ演算子です。パイプ演算子|>を使うと、A(B(x))x |> B |> Aと書けます。

この例のように入れ子の深さが浅い場合、あまりパイプ演算子のメリットは感じられないかもしれません。しかし、たとえばA(B(C(D(E(F(G(x)))))))のように深い入れ子になると、パイプ演算子なしでは構造が把握しづらくなります。この例は、パイプ演算子を使って書くと、x |> G |> F |> E |> D |> C |> B |> Aという具合に処理の連鎖を直感的に理解しやすい構造で記述でき、非常に見通しがよくなります。

先ほどの例をパイプ演算子を使って記述し、さらにRange.newの省略表記を使えば、以下のように書けます。

iex(2)> 1 .. 100 |> Enum.map(fn(_a) -> Worker.random end)
[956, 316, 558, 208, 346, 433, 320, 974, 344, 899, 322, 129, 660, 873, 20, 5,
 201, 4, 56, 462, 603, 306, 63, 168, 568, 299, 12, 582, 788, 189, 527, 938, 295,
 108, 681, 502, 594, 377, 994, 390, 183, 860, 712, 57, 685, 516, 669, 392, 50,
 428, ...]
2

まずは普通に逐次実行

ランダムな1,000以下の整数100個からなるリストが取得できるようになったので、それぞれの要素にWorker.sleepを適用させて逐次実行してみましょう。

やはりEnum.mapを使い、Worker.sleepを引数にして、以下のように実行します。

iex(1)> 1 .. 100 |> Enum.map(fn(_a) -> Worker.random end) |> Enum.map(fn(t) -> Worker.sleep(t) end)
sleep(409) started.
sleep(409) ended.
sleep(803) started.
sleep(803) ended.
~略~
sleep(780) started.
sleep(780) ended.
["result-sleep(409)", "result-sleep(803)", "result-sleep(755)",
~略~
 "result-sleep(552)", "result-sleep(603)", "result-sleep(84)",
 "result-sleep(193)", "result-sleep(638)", ...]
iex(2)>

パイプ演算子を使ったことで、生成したリストを次のEnum.mapに渡していることが直感的に理解できるでしょう。

この逐次実行の処理を、Workerモジュール内の関数として、exex_seqという名前で定義しておきましょう。

defmodule Worker do
  def sleep(n) do
    # 略
  end

  def random do
    # 略
  end

  def exec_seq do
    IO.puts "===== 逐次実行開始 ====="
    result = 1 .. 100
             |> Enum.map(fn(_) -> random() end)
             |> Enum.map(fn(t) -> Worker.sleep(t) end)

    IO.puts "===== 逐次実行結果 ====="
    result
  end
end

逐次処理は、実行する処理(ここではWorker.sleepの回数が多ければ多いほど処理時間が長くなります。この例では実行回数が100回ですが、これを1,000回、10,000回、100,000回と増やせば、処理時間も10倍、100倍、1,000倍と線形に増加していきます。

Taskを使って並行実行

逐次実行では、回数に比例して処理時間が増加していくことを見ました。同じ数の処理でも、同時に処理を走らせることで、処理時間の短縮が見込めます。

バックグラウンドで処理を走らせたいとき、ElixirではTaskを利用します。

Task.async(<実行させたい関数>)とすれば、引数の関数を実行する独立したプロセスが起動し、そのプロセスのIDと参照(タスクディスクリプタといいます)が返されます。Task.asyncの結果は、Task.await(<タスクディスクリプタ>)とすれば取得できます。

まとめると、Elixirのプログラムで新たなプロセスを生成して、バックグラウンドで処理を行わせる定型のパターンは、以下のようになります。

  • Task.asyncで処理を行うプロセスを生成
  • Task.awaitで結果を取得
4

IEx上で試してみましょう。

# 1.Task.asyncで引数の関数を処理するプロセスを生成
iex(1)> task = Task.async(fn -> :timer.sleep(4000); "result" end) # 4秒sleepして"result"を返却
%Task{owner: #PID<0.80.0>, pid: #PID<0.82.0>, ref: #Reference<0.0.4.210>} # タスクディスクリプタ

# 2.Task.awaitで結果を取得する
iex(2)> Task.await(task)
"result"

いまの例では、複数個のプロセスでWorker.sleepを実行させたいので、Enum.mapを使ってタスクディスクリプタのリストを生成し、それをさらにEnum.mapを使って結果のリストに変換します。

iex(1)> 1 .. 100 |> Enum.map(fn(_a) -> Worker.random() end) |> # sleepさせる整数のリスト
...(1)> Enum.map(fn(t) -> Task.async(Worker, :sleep, [t]) end) |> # タスクディスクリプタのリスト
...(1)> Enum.map(fn(t) -> Task.await(t) end) # Worker.sleepの結果のリスト
sleep(466) started.
sleep(453) started.
sleep(761) started.
sleep(856) started.
~略~
sleep(968) ended.
sleep(994) ended.
["result-sleep(234)", "result-sleep(894)", "result-sleep(38)",
~略~
 "result-sleep(307)", "result-sleep(912)", ...]
iex(2)>

exex_seqと同様に、Workerモジュールにexec_conとして並行実行の処理を定義しましょう。

defmodule Worker do
  def sleep(n) do
    # 略
  end

  def random do
    # 略
  end

  def exec_seq do
    # 略
  end

  def exec_con do
  IO.puts "===== 並行実行開始 ====="
  result = 1 .. 100
           |> Enum.map(fn(_) -> random() end)
           |> Enum.map(fn(t) -> Task.async(Worker, :sleep, [t]) end)
           |> Enum.map(fn(d) -> Task.await(d) end)

  IO.puts "===== 並行実行結果 ====="
  result
  end
end
4

コンパイルして逐次実行と並行実行にかかる時間を比べてみる

逐次実行のサンプルであるWorker.exec_seqと、並行実行のサンプルであるWorker.exec_conは、処理の内容は同じですが、処理時間が大きく異なります。実際に両者の実行時間を計測して確かめてみましょう。

実行時間を計測するために、ソースコードをスクリプトとしてIExで実行するのではなく、ErlangのVM上で実行できるバイナリファイルにコンパイルしてから実行しましょう。Workerモジュールは、すべてworker.exというファイルに実装していたので、これをソースコードとしてコンパイルする手順を説明します。

(Elixirのソースコードを保存するファイルの拡張子には.ex.exsの2種類があり、どちらも実行可能なソースコード(スクリプト)なのですが、慣習としてコンパイルするソースコードには.ex、スクリプトとして実行するソースコードには.exsという拡張子を使います。)

Elixirコードのコンパイルは、elixircというコマンドで行います。

$ ls
worker.ex
$ elixirc worker.ex # worker.exをコンパイル
$ ls
Elixir.Worker.beam worker.ex
$

コンパイルに成功すると、上記のように、Elixir.Worker.beamというバイナリファイルが生成されているはずです.beamはErlangのVM上で動作するバイナリファイルを表す拡張子です)

生成されたバイナリファイルを実行するには、elixirコマンドを使います。elixirコマンドは、実行時、カレントディレクトリ(現在のディレクトリ)にある.beamファイルを自動で読み込みます。

その際に実行するモジュールと関数を、-eオプションを使ってelixir -e <モジュール>.<関数>、または文字列としてelixir -e "モジュール.関数(引数)"のように指定します(ソースコードであるworker.exには、Workerモジュール内の関数として、exec_seqexec_palを定義していたことを思い出してください)

$ elixir -e "Worker.sleep(3)"
sleep(3) started.
sleep(3) ended.

$ elixir -e Worker.exec_seq

 逐次実行開始 

sleep(261) started.
sleep(261) ended.
~略~
sleep(944) ended.

 逐次実行結果 


$ elixir -e Worker.exec_con

 並行実行開始 

sleep(93) started.
sleep(85) started.
~略~
sleep(996) ended.

 並行実行結果 

$

実際にexec_seqexec_conを実行してみればわかりますが、両者の実行時間は大きく異なります。筆者の手元の環境でtimeコマンドを使って実行時間を計測した結果を下記に示します。

$ time elixir -e Worker.exec_seq

 逐次実行開始 

sleep(834) started.
~略~
sleep(970) ended.

 逐次実行結果 

elixir -e Worker.exec_seq  0.29s user 0.16s system 0% cpu 50.947 total

$ time elixir -e Worker.exec_con

 並行実行開始 

sleep(823) started.
~略~

 並行実行結果 

elixir -e Worker.exec_con  0.28s user 0.16s system 34% cpu 1.305 total
$

上記の例では、ランダムな整数を100個生成してそれぞれの時間だけスリープする処理をすべて終えるまでに、逐次実行では50.9秒、並行実行では1.3秒と大きく処理時間が異なっています。

この差は、同時に実行させる処理の数を増やせば増やすほど大きくなります。ぜひ、exec_seqexec_palの定義を自分で書き換えて確かめてみてください。

5

チュートリアルが終わったら次に何をすべきか

エンジニアHubに会員登録すると
続きをお読みいただけます(無料)。
登録のメリット
  • すべての過去記事を読める
  • 過去のウェビナー動画を
    視聴できる
  • 企業やエージェントから
    スカウトが届く