mix new batch_job --sup
Add the req library to the mix.exs dependencies:
{:req, git: "https://github.com/wojtekmach/req.git"}
and run mix deps.get
to install.
Set up the rate limiting module in lib/batch_job/rate_limiter.ex
:
defmodule BatchJob.RateLimiter do
  @moduledoc """
  A queue-backed rate limiter implemented as a named `GenServer`.

  Callers enqueue a request handler (`{module, function, args}`) together with
  a response handler (`{module, function}`). A timer fires every
  `floor(timeframe_in_ms / timeframe_max_requests)` milliseconds; on each tick
  at most one queued entry is popped and executed in a task started under
  `BatchJob.TaskSupervisor`, which must already be running.

  The server is registered under the module name, so only one instance can run
  per node.
  """

  use GenServer
  require Logger

  # Start-up options retained in state so the get_* accessors can report them.
  @config_keys [:timeframe_max_requests, :timeframe, :timeframe_units]

  @doc """
  Starts the rate limiter.

  `opts` is a map with the keys `:timeframe_max_requests`, `:timeframe` and
  `:timeframe_units` (`:hours`, `:minutes`, `:seconds` or `:milliseconds`).
  """
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    state = %{
      config: Map.take(opts, @config_keys),
      request_queue: :queue.new(),
      request_queue_size: 0,
      request_queue_poll_rate:
        calculate_refresh_rate(opts.timeframe_max_requests, opts.timeframe, opts.timeframe_units),
      send_after_ref: nil
    }

    # The first timer is armed in handle_continue/2 rather than in init/1.
    {:ok, state, {:continue, :initial_timer}}
  end

  # ---- Client facing functions ----

  @doc """
  Appends a request to the back of the queue.

  `request_handler` is `{module, function, args}`; `response_handler` is
  `{module, function}` and is invoked with the request's return value as its
  only argument. The call is asynchronous (a cast).
  """
  def queue_request(request_handler, response_handler) do
    GenServer.cast(__MODULE__, {:enqueue_request, request_handler, response_handler})
  end

  @doc """
  Puts a request at the front of the queue so it is the next one executed.

  Takes the same arguments as `queue_request/2`.
  """
  def next_request(request_handler, response_handler) do
    GenServer.cast(__MODULE__, {:enqueue_front_request, request_handler, response_handler})
  end

  @doc "Returns the `:timeframe_max_requests` value the server was started with."
  def get_requests_per_timeframe, do: get_rate_limiter_config(:timeframe_max_requests)

  @doc "Returns the `:timeframe_units` value the server was started with."
  def get_timeframe_unit, do: get_rate_limiter_config(:timeframe_units)

  @doc "Returns the `:timeframe` value the server was started with."
  def get_timeframe, do: get_rate_limiter_config(:timeframe)

  # ---- Server Callbacks ----

  @impl true
  def handle_continue(:initial_timer, state) do
    {:noreply, %{state | send_after_ref: schedule_timer(state.request_queue_poll_rate)}}
  end

  @impl true
  def handle_call({:get_config, key}, _from, state) do
    {:reply, Map.fetch!(state.config, key), state}
  end

  @impl true
  def handle_cast({:enqueue_request, request_handler, response_handler}, state) do
    updated_queue = :queue.in({request_handler, response_handler}, state.request_queue)
    new_queue_size = state.request_queue_size + 1
    {:noreply, %{state | request_queue: updated_queue, request_queue_size: new_queue_size}}
  end

  @impl true
  def handle_cast({:enqueue_front_request, request_handler, response_handler}, state) do
    updated_queue = :queue.in_r({request_handler, response_handler}, state.request_queue)
    new_queue_size = state.request_queue_size + 1
    {:noreply, %{state | request_queue: updated_queue, request_queue_size: new_queue_size}}
  end

  @impl true
  def handle_info(:pop_from_request_queue, %{request_queue_size: 0} = state) do
    # No work to do as the queue size is zero...schedule the next timer
    {:noreply, %{state | send_after_ref: schedule_timer(state.request_queue_poll_rate)}}
  end

  @impl true
  def handle_info(:pop_from_request_queue, state) do
    {{:value, {request_handler, response_handler}}, new_request_queue} =
      :queue.out(state.request_queue)

    # Captured before the task starts so the log shows when the request was dispatched.
    start_message = "Request started #{NaiveDateTime.utc_now()}"

    # async_nolink: a crashing task does not take the rate limiter down; its
    # reply and :DOWN messages are consumed by the clauses below.
    Task.Supervisor.async_nolink(BatchJob.TaskSupervisor, fn ->
      {req_module, req_function, req_args} = request_handler
      {resp_module, resp_function} = response_handler
      response = apply(req_module, req_function, req_args)
      apply(resp_module, resp_function, [response])
      Logger.info("#{start_message}\nRequest completed #{NaiveDateTime.utc_now()}")
    end)

    {:noreply,
     %{
       state
       | request_queue: new_request_queue,
         send_after_ref: schedule_timer(state.request_queue_poll_rate),
         request_queue_size: state.request_queue_size - 1
     }}
  end

  @impl true
  def handle_info({ref, _result}, state) when is_reference(ref) do
    # Task finished normally: drop the monitor and flush its pending :DOWN message.
    Process.demonitor(ref, [:flush])
    {:noreply, state}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
    # Task exited without replying; nothing to clean up.
    {:noreply, state}
  end

  # ---- Private helpers ----

  # Reads one start-up option from the running server. This is a synchronous
  # call, so it must not be invoked from inside the server process itself.
  defp get_rate_limiter_config(key) do
    GenServer.call(__MODULE__, {:get_config, key})
  end

  # Milliseconds between queue polls: the timeframe spread evenly over the
  # allowed number of requests, rounded down.
  defp calculate_refresh_rate(num_requests, time, timeframe_units) do
    floor(convert_time_to_milliseconds(timeframe_units, time) / num_requests)
  end

  defp convert_time_to_milliseconds(:hours, time), do: :timer.hours(time)
  defp convert_time_to_milliseconds(:minutes, time), do: :timer.minutes(time)
  defp convert_time_to_milliseconds(:seconds, time), do: :timer.seconds(time)
  defp convert_time_to_milliseconds(:milliseconds, milliseconds), do: milliseconds

  defp schedule_timer(queue_poll_rate) do
    Process.send_after(self(), :pop_from_request_queue, queue_poll_rate)
  end
end
Set up the request module that will handle making the request to the external API:
defmodule BatchJob.Request do
  @moduledoc """
  Builds and runs the HTTP request sent to the external API for one batch input.
  """

  require Logger

  @base_url "https://external-api-mock.com"
  @request_headers [{"X-ApiKey", "api-key"}, {"Accept", "application/json"}]

  @doc """
  Sends a GET request to `/api/endpoint/<arg>` on the external API.

  `arg` is interpolated into the URL path as-is. The request goes through the
  `BatchJob.Finch` pool with Req's default steps and `retry: true`. Returns
  whatever `Req.run!/1` returns and raises if it raises.
  """
  def make_api_request(arg) do
    endpoint = "#{@base_url}/api/endpoint/#{arg}"
    request = Req.build(:get, endpoint, headers: @request_headers, finch: BatchJob.Finch)

    request
    |> Req.add_default_steps(retry: true)
    |> Req.run!()
  end
end
The configuration for both the rate limiter and the Finch request pools is supplied when launching the processes in application.ex
:
# Application start callback: boots the supervision tree.
#
# Children are started in this order under a :one_for_one supervisor named
# BatchJob.Supervisor:
#   1. the Task.Supervisor (BatchJob.TaskSupervisor),
#   2. the rate limiter, configured for 600 requests per 60 seconds,
#   3. the Finch pool (BatchJob.Finch) with one default pool of size 30.
def start(_type, _args) do
  rate_limiter_config = %{
    timeframe_max_requests: 600,
    timeframe_units: :seconds,
    timeframe: 60
  }

  finch_config = [
    name: BatchJob.Finch,
    pools: %{default: [size: 30, count: 1]}
  ]

  Supervisor.start_link(
    [
      {Task.Supervisor, name: BatchJob.TaskSupervisor},
      {BatchJob.RateLimiter, rate_limiter_config},
      {Finch, finch_config}
    ],
    strategy: :one_for_one,
    name: BatchJob.Supervisor
  )
end
Finally the client facing API is created to allow queueing all the requests for the job, in lib/batch_job.ex
:
defmodule BatchJob do
  @moduledoc """
  Client-facing API for the batch job: reads the task inputs from disk, queues
  one rate-limited API request per input, and appends each response to a log
  file.
  """

  require Logger
  alias BatchJob.{RateLimiter, Request}

  @doc """
  Queues one request for every input returned by `read_task_inputs/0`.

  Returns `:ok`.
  """
  def enqueue_batch_tasks() do
    Enum.each(read_task_inputs(), &enqueue_task/1)
  end

  @doc """
  Queues a single API request for `input` on the rate limiter.

  The response is handed to `log_result/1`.
  """
  def enqueue_task(input) do
    RateLimiter.queue_request({Request, :make_api_request, [input]}, {BatchJob, :log_result})
  end

  @doc """
  Appends a `status - json_body` line to `results.log` in the current working
  directory.

  Expects a map (or struct) with `:status` and `:body` keys. Raises if the body
  cannot be JSON-encoded or the file cannot be opened.
  """
  def log_result(%{status: status, body: body}) do
    # Encode before opening the file so an encoding failure never leaves an
    # open file handle behind.
    line = "#{status} - #{Jason.encode!(body)}"

    {:ok, file} = File.open("results.log", [:append, {:delayed_write, 100, 20}])
    IO.puts(file, line)
    File.close(file)
  end

  @doc """
  Reads `batch_inputs.txt` from the current working directory and returns its
  non-empty lines as a list of strings.

  Both `\\n` and `\\r\\n` line endings are accepted. Blank lines are skipped, so
  an empty file yields `[]` instead of a single empty input. Raises if the file
  cannot be read.
  """
  def read_task_inputs() do
    "batch_inputs.txt"
    |> File.read!()
    |> String.trim()
    |> String.split(["\r\n", "\n"], trim: true)
  end
end
With this setup, the app can be started with iex (iex -S mix
) and then all the requests can be queued up with BatchJob.enqueue_batch_tasks()
.