Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/wise-garlics-kneel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@core/electric-telemetry': patch
---

Restore the inclusion of stack_id in CallHomeReporter's static_info.
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ defmodule ElectricTelemetry.CallHomeReporter do

def report_home(telemetry_url, results) do
# Isolate the request in a separate task to avoid blocking and
# to not receive any messages from the HTTP pool internals
Task.start(fn -> Req.post!(telemetry_url, json: results, retry: :transient) end)
# to not receive any messages from the HTTP pool internals.
# The task process must be linked to CallHomeReporter to avoid orphaned processes when the
# CallHomeReporter is shut down deliberately by its supervisor.
Task.async(fn -> Req.post!(telemetry_url, json: results, retry: :transient) end)
:ok
end

Expand All @@ -52,6 +54,7 @@ defmodule ElectricTelemetry.CallHomeReporter do

defp cast_time_to_ms({time, :minute}), do: time * 60 * 1000
defp cast_time_to_ms({time, :second}), do: time * 1000
defp cast_time_to_ms({time, :millisecond}), do: time

@impl GenServer
def init(opts) do
Expand Down Expand Up @@ -173,6 +176,19 @@ defmodule ElectricTelemetry.CallHomeReporter do
{:noreply, state}
end

# Catch-all clauses to handle the result, EXIT and DOWN messages from the async task started in `report_home()`.
def handle_info({task_mon, %Req.Response{}}, state) when is_reference(task_mon) do
{:noreply, state}
end

def handle_info({:EXIT, _, _}, state) do
{:noreply, state}
end

def handle_info({:DOWN, _, :process, _, _}, state) do
{:noreply, state}
end

defp build_report(state) do
%{
last_reported: state.last_reported,
Expand Down
14 changes: 14 additions & 0 deletions packages/electric-telemetry/lib/electric/telemetry/opts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,20 @@ defmodule ElectricTelemetry.Opts do
resource: [type: :map, default: %{}]
],
default: []
],
call_home_reporter_opts: [
type: :keyword_list,
keys: [
first_report_in: [
type: {:tuple, [:pos_integer, {:in, [:millisecond, :second, :minute]}]},
default: {2, :minute}
],
reporting_period: [
type: {:tuple, [:pos_integer, {:in, [:millisecond, :second, :minute]}]},
default: {30, :minute}
]
],
default: []
]
]
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,14 @@ defmodule ElectricTelemetry.Reporters.CallHomeReporter do
import Telemetry.Metrics

def child_spec(telemetry_opts, reporter_opts) do
if call_home_url = get_in(telemetry_opts, [:reporters, :call_home_url]) do
if call_home_url = telemetry_opts.reporters.call_home_url do
start_opts =
Keyword.merge(
[
static_info: static_info(telemetry_opts),
call_home_url: call_home_url,
first_report_in: {2, :minute},
reporting_period: {30, :minute}
],
reporter_opts
)
[
static_info: static_info(telemetry_opts),
call_home_url: call_home_url
]
|> Keyword.merge(telemetry_opts.call_home_reporter_opts)
|> Keyword.merge(reporter_opts)

{ElectricTelemetry.CallHomeReporter, start_opts}
end
Expand Down Expand Up @@ -103,14 +100,24 @@ defmodule ElectricTelemetry.Reporters.CallHomeReporter do

%{
electric_version: telemetry_opts.version,
environment: %{
os: %{family: os_family, name: os_name},
arch: to_string(arch),
cores: processors,
ram: total_mem,
electric_instance_id: Map.fetch!(telemetry_opts, :instance_id),
electric_installation_id: Map.get(telemetry_opts, :installation_id, "electric_default")
}
environment:
%{
os: %{family: os_family, name: os_name},
arch: to_string(arch),
cores: processors,
ram: total_mem,
electric_instance_id: Map.fetch!(telemetry_opts, :instance_id),
electric_installation_id: Map.get(telemetry_opts, :installation_id, "electric_default")
}
|> maybe_put(telemetry_opts, :stack_id)
}
end

defp maybe_put(map, telemetry_opts, key) do
if val = telemetry_opts[key] do
Map.put(map, key, val)
else
map
end
end
end
5 changes: 3 additions & 2 deletions packages/electric-telemetry/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ defmodule ElectricTelemetry.MixProject do

defp dev_and_test_deps do
[
{:bypass, "~> 2.1", only: [:test]},
{:dialyxir, "~> 1.4", only: [:test], runtime: false},
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
{:excoveralls, "~> 0.18", only: [:test], runtime: false},
{:junit_formatter, "~> 3.4", only: [:test], runtime: false},
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false}
{:junit_formatter, "~> 3.4", only: [:test], runtime: false}
]
end

Expand Down
8 changes: 8 additions & 0 deletions packages/electric-telemetry/mix.lock
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
%{
"bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"},
"cowboy": {:hex, :cowboy, "2.14.2", "4008be1df6ade45e4f2a4e9e2d22b36d0b5aba4e20b0a0d7049e28d124e34847", [:make, :rebar3], [{:cowlib, ">= 2.16.0 and < 3.0.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, ">= 1.8.0 and < 3.0.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "569081da046e7b41b5df36aa359be71a0c8874e5b9cff6f747073fc57baf1ab9"},
"cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"},
"cowlib": {:hex, :cowlib, "2.16.0", "54592074ebbbb92ee4746c8a8846e5605052f29309d3a873468d76cdf932076f", [:make, :rebar3], [], "hexpm", "7f478d80d66b747344f0ea7708c187645cfcc08b11aa424632f78e25bf05db51"},
"dialyxir": {:hex, :dialyxir, "1.4.7", "dda948fcee52962e4b6c5b4b16b2d8fa7d50d8645bbae8b8685c3f9ecb7f5f4d", [:mix], [{:erlex, ">= 0.2.8", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b34527202e6eb8cee198efec110996c25c5898f43a4094df157f8d28f27d9efe"},
"earmark_parser": {:hex, :earmark_parser, "1.4.44", "f20830dd6b5c77afe2b063777ddbbff09f9759396500cdbe7523efd58d7a339c", [:mix], [], "hexpm", "4778ac752b4701a5599215f7030989c989ffdc4f6df457c5f36938cc2d2a2750"},
"erlex": {:hex, :erlex, "0.2.8", "cd8116f20f3c0afe376d1e8d1f0ae2452337729f68be016ea544a72f767d9c12", [:mix], [], "hexpm", "9d66ff9fedf69e49dc3fd12831e12a8a37b76f8651dd21cd45fcf5561a8a7590"},
Expand All @@ -17,7 +21,11 @@
"nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"},
"nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},
"otel_metric_exporter": {:hex, :otel_metric_exporter, "0.4.2", "2cf96ac9879eb06ebde26fa0856e2cd4d5b5f6127eb9ca587532b89a5f981bfc", [:mix], [{:finch, "~> 0.19", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:protobuf, "~> 0.15", [hex: :protobuf, repo: "hexpm", optional: false]}, {:retry, "~> 0.19", [hex: :retry, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "e7d7ca69a3863a2b84badeb9dd275a369754d951c7c5a4cd5f321be868c6d613"},
"plug": {:hex, :plug, "1.19.1", "09bac17ae7a001a68ae393658aa23c7e38782be5c5c00c80be82901262c394c0", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "560a0017a8f6d5d30146916862aaf9300b7280063651dd7e532b8be168511e62"},
"plug_cowboy": {:hex, :plug_cowboy, "2.7.5", "261f21b67aea8162239b2d6d3b4c31efde4daa22a20d80b19c2c0f21b34b270e", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "20884bf58a90ff5a5663420f5d2c368e9e15ed1ad5e911daf0916ea3c57f77ac"},
"plug_crypto": {:hex, :plug_crypto, "2.1.1", "19bda8184399cb24afa10be734f84a16ea0a2bc65054e23a62bb10f06bc89491", [:mix], [], "hexpm", "6470bce6ffe41c8bd497612ffde1a7e4af67f36a15eea5f921af71cf3e11247c"},
"protobuf": {:hex, :protobuf, "0.15.0", "c9fc1e9fc1682b05c601df536d5ff21877b55e2023e0466a3855cc1273b74dcb", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "5d7bb325319db1d668838d2691c31c7b793c34111aec87d5ee467a39dac6e051"},
"ranch": {:hex, :ranch, "1.8.1", "208169e65292ac5d333d6cdbad49388c1ae198136e4697ae2f474697140f201c", [:make, :rebar3], [], "hexpm", "aed58910f4e21deea992a67bf51632b6d60114895eb03bb392bb733064594dd0"},
"req": {:hex, :req, "0.5.16", "99ba6a36b014458e52a8b9a0543bfa752cb0344b2a9d756651db1281d4ba4450", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "974a7a27982b9b791df84e8f6687d21483795882a7840e8309abdbe08bb06f09"},
"retry": {:hex, :retry, "0.19.0", "aeb326d87f62295d950f41e1255fe6f43280a1b390d36e280b7c9b00601ccbc2", [:mix], [], "hexpm", "85ef376aa60007e7bff565c366310966ec1bd38078765a0e7f20ec8a220d02ca"},
"telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
defmodule ElectricTelemetry.CallHomeReporterTest do
use ExUnit.Case, async: true

@telemetry_opts [
instance_id: "test-instance_id",
stack_id: "test-stack",
version: "test-version",
reporters: [
call_home_url: "...fill this in inside the test case when bypass.port is known..."
],
call_home_reporter_opts: [first_report_in: {1, :millisecond}]
]

setup do
bypass = Bypass.open()
%{bypass: bypass, telemetry_opts: telemetry_opts(bypass)}
end

test "reports all expected info when started under ApplicationTelemetry", ctx do
add_bypass_expectation(ctx, fn report ->
# We assert the shape of the entire report here but values aren't valid since not enough
# time has passed during the test execution to gather the data.
assert %{
"data" => %{
"electric_version" => "test-version",
"environment" => %{
"arch" => _,
"cores" => _,
"electric_installation_id" => "electric_default",
"electric_instance_id" => "test-instance_id",
"os" => _,
"ram" => _
},
"resources" => %{
"run_queue_cpu" => %{"max" => _, "mean" => _, "min" => _},
"run_queue_io" => %{"max" => _, "mean" => _, "min" => _},
"run_queue_total" => %{"max" => _, "mean" => _, "min" => _},
"uptime" => _,
"used_memory" => %{"max" => _, "mean" => _, "min" => _}
},
"system" => %{
"load_avg1" => _,
"load_avg15" => _,
"load_avg5" => _,
"memory_free" => _,
"memory_free_percent" => _,
"memory_used" => _,
"memory_used_percent" => _,
"swap_free" => _,
"swap_free_percent" => _,
"swap_used" => _,
"swap_used_percent" => _
}
},
"last_reported" => _,
"report_version" => _,
"timestamp" => _
} = report
end)

start_supervised!({ElectricTelemetry.ApplicationTelemetry, ctx.telemetry_opts})

assert_receive :bypass_done, 500
end

test "reports all expected info when started under StackTelemetry", ctx do
add_bypass_expectation(ctx, fn report ->
# We assert the shape of the entire report here but values aren't valid since not enough
# time has passed during the test execution to gather the data.
assert %{
"data" => %{
"electric_version" => "test-version",
"environment" => %{
"arch" => _,
"cores" => _,
"electric_installation_id" => "electric_default",
"electric_instance_id" => "test-instance_id",
"os" => _,
"pg_version" => _,
"ram" => _,
"stack_id" => "test-stack"
},
"usage" => %{
"active_shapes" => _,
"inbound_bytes" => _,
"inbound_operations" => _,
"inbound_transactions" => _,
"live_requests" => _,
"served_bytes" => _,
"stored_bytes" => _,
"stored_operations" => _,
"stored_transactions" => _,
"sync_requests" => _,
"total_shapes" => _,
"total_used_storage_kb" => _,
"unique_clients" => _,
"wal_size" => %{"max" => _, "mean" => _, "min" => _}
}
},
"last_reported" => _,
"report_version" => _,
"timestamp" => _
} = report
end)

start_supervised!({ElectricTelemetry.StackTelemetry, ctx.telemetry_opts})

assert_receive :bypass_done, 500
end

defp add_bypass_expectation(%{bypass: bypass, telemetry_opts: telemetry_opts}, test_specific_fn) do
test_pid = self()

Bypass.expect(bypass, "POST", "/checkpoint", fn conn ->
assert {"content-type", "application/json"} in conn.req_headers
assert {:ok, body, conn} = Plug.Conn.read_body(conn)

report = :json.decode(body)

assert_call_home_report_common_fields(report, telemetry_opts)
test_specific_fn.(report)

send(test_pid, :bypass_done)

Plug.Conn.send_resp(conn, 200, "")
end)
end

defp telemetry_opts(bypass) do
# CallHomeReporter can work with both a string and a URI struct
url = "http://localhost:#{bypass.port}/checkpoint" |> maybe_parse_url()
put_in(@telemetry_opts, [:reporters, :call_home_url], url)
end

defp maybe_parse_url(url) do
if :rand.uniform(2) == 1 do
URI.parse(url)
else
url
end
end

def assert_call_home_report_common_fields(report, telemetry_opts) do
# Extracting only those fields from the report that will be validated in this function.
%{
"data" => %{
"environment" => %{
"arch" => arch,
"cores" => cores,
"os" => %{"family" => "unix", "name" => os_name},
"ram" => ram
}
},
"last_reported" => last_reported,
"report_version" => 2,
"timestamp" => timestamp
} = report

# Execute CallHomeReporter.static_info() here to make the assertions resilient to platform
# variations between different envinronments in which this test will run.
static_info =
telemetry_opts
|> ElectricTelemetry.validate_options()
|> then(fn {:ok, opts} ->
ElectricTelemetry.Reporters.CallHomeReporter.static_info(opts)
end)

assert arch == static_info.environment.arch
# If you get an assertion failure here when running the test suite on your dev
# machine, please add your arch to the list.
assert arch in ["x86_64-pc-linux-gnu"]

assert cores == static_info.environment.cores
assert is_integer(cores)
assert cores >= 4

assert os_name == to_string(static_info.environment.os.name)
# If you get an assertion failure here, please add your OS name to the list.
assert os_name in ["linux"]

assert ram == static_info.environment.ram
assert is_integer(ram)
assert ram >= 4 * 1024 * 1024 * 1024

assert {:ok, %DateTime{}, 0} = DateTime.from_iso8601(last_reported)
assert {:ok, %DateTime{}, 0} = DateTime.from_iso8601(timestamp)
end
end
8 changes: 1 addition & 7 deletions packages/electric-telemetry/test/test_helper.exs
Original file line number Diff line number Diff line change
@@ -1,8 +1,2 @@
# The process registry is implicitly used by processes in the dev, prod and test environments alike.
#
# Explicitly start the process registry here since the OTP application does not start a
# supervision tree in the test environment.
# Registry.start_link(name: Electric.Application.process_registry(), keys: :unique)

ExUnit.configure(formatters: [JUnitFormatter, ExUnit.CLIFormatter])
ExUnit.start(assert_receive_timeout: 400, exclude: [:slow], capture_log: true)
ExUnit.start(capture_log: true)
Loading