2019年2月8日金曜日

elixir map & timeout on async async_stream amd scraping

 
1..100 |> Enum.map(&Task.async(IO, :inspect, [&1])) |> Enum.map(&Task.await/1)
1..100 |> Task.async_stream(IO, :inspect, [], max_concurrency: System.schedulers_online) |> Enum.to_list


ーーーーーーーーーーーーーーーーーーーーーーーーーーー
defmodule Example do

  def file_read_of(n) do
    if File.exists?("result") do :ok = File.rm("result") end
    1..n |> Enum.map(fn n -> "file#{n}" end) |>
  Enum.map(fn name -> load_file(name) end)
String.split(File.read!("result"), "\r\n", trim: true) # ! is needed
  end

  defp load_file(name) do
     {:ok, c} = File.read(name)
  File.write("result", c, [:append])
  end

  def foo(n) do
    if File.exists?("result") do :ok = File.rm("result") end
1..n |> Enum.map(fn n -> "file#{n}" end) |>
    Enum.map( fn name -> (Task.async(fn -> load_file(name) end)) end )
    |> Enum.map(&Task.await/1)
String.split(File.read!("result"), "\r\n", trim: true) # ! is needed
  end

   def goo(n) do
    if File.exists?("result") do :ok = File.rm("result") end
1..n |> Enum.map(fn n -> "file#{n}" end) |>
     Task.async_stream(fn name -> load_file(name) end, [max_concurrency: 10]  ) |>
  Enum.map(fn {:ok, val} -> val end) # asyncでは,この冗長関数が不要!
  String.split(File.read!("result"), "\r\n", trim: true) # ! is needed
  end

 def sum(n) do
 
   goo(n) |> Enum.map(fn item -> String.split(item, ",") end)
   |> Enum.map(fn ( [word, num] ) -> %{"word" => word, "num" => String.to_integer(num)} end)
    |> Enum.reduce(%{}, fn (%{"word" => key, "num" => num} , acc) ->
      acc=Map.update(acc, key, num, fn v ->  v + num end) end)
   # variable acc is unused -- warning occurs but do not mind
   # IO.inspect Map.keys(m)
   # IO.inspect Map.values(m)
  # keyがないときは、初期値は最初値をとる! [%{"word" => "a", "num" => 1},....] に対して
 end
end

# fileN aaa,1\r\nという風になっておりtrim:trueしないと””がふくまれてしまう
# Map.update(m, key, 0, fn v -> v + 1 end)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
いずれも、タイムアウトは5秒でデフォルトだ つまり、非同期はかなり軽い処理に限定

defmodule FIO do
 def fw(n) do
   :timer.sleep(4990) #5000近くなるとタイムアウト頻発,
   #ファイルだけできて書き込まれない事態もおこってしまう
   File.write "file#{n}", "1"
 end
end

defmodule Parallel do
  def pmap(collection, func) do
    collection
    |> Enum.map(&(Task.async(fn -> func.(&1) end)))
    |> Enum.map(&Task.await/1)
  end
end

# Parallel.pmap(1..10,fn(n) -> FIO.fw "file#{n}" end)


defmodule Parallel2 do
  def pmap(collection, m,f) do
    collection  |>
# Task.async_stream(m, f, [], [max_concurrency: 10, timeout: :infinity])
Task.async_stream(m, f, [], [max_concurrency: 10])
   # []に1..nが順にはいっていく
|> Enum.map(fn {:ok, :ok} -> 1 end) # 適当な関数!
  end
end

# Parallel2.pmap(1..10,FIO,:fw)
# timeout: :infinityをいれたらOKだが

--------------------------------------------------------------------------------

 https://www.souya.biz/blog2/pinevillage/2017/02/14/elixir%E3%81%A7%E3%82%AF%E3%83%AD%E3%83%BC%E3%83%A9%E3%83%BC%E4%BD%9C%E6%88%90-%E3%82%B9%E3%82%AF%E3%83%AC%E3%82%A4%E3%83%94%E3%83%B3%E3%82%B0%E7%B7%A8/  => scraing example site

0 件のコメント:

コメントを投稿