diff --git a/.changeset/wise-garlics-kneel.md b/.changeset/wise-garlics-kneel.md new file mode 100644 index 0000000000..d00c16cf5b --- /dev/null +++ b/.changeset/wise-garlics-kneel.md @@ -0,0 +1,5 @@ +--- +'@core/electric-telemetry': patch +--- + +Restore the inclusion of stack_id in CallHomeReporter's static_info. diff --git a/packages/electric-telemetry/lib/electric/telemetry/call_home_reporter.ex b/packages/electric-telemetry/lib/electric/telemetry/call_home_reporter.ex index e55434d331..e632d557b9 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/call_home_reporter.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/call_home_reporter.ex @@ -41,8 +41,10 @@ defmodule ElectricTelemetry.CallHomeReporter do def report_home(telemetry_url, results) do # Isolate the request in a separate task to avoid blocking and - # to not receive any messages from the HTTP pool internals - Task.start(fn -> Req.post!(telemetry_url, json: results, retry: :transient) end) + # to not receive any messages from the HTTP pool internals. + # The task process must be linked to CallHomeReporter to avoid orphaned processes when the + # CallHomeReporter is shut down deliberately by its supervisor. + Task.async(fn -> Req.post!(telemetry_url, json: results, retry: :transient) end) :ok end @@ -52,6 +54,7 @@ defmodule ElectricTelemetry.CallHomeReporter do defp cast_time_to_ms({time, :minute}), do: time * 60 * 1000 defp cast_time_to_ms({time, :second}), do: time * 1000 + defp cast_time_to_ms({time, :millisecond}), do: time @impl GenServer def init(opts) do @@ -173,6 +176,19 @@ defmodule ElectricTelemetry.CallHomeReporter do {:noreply, state} end + # Catch-all clauses to handle the result, EXIT and DOWN messages from the async task started in `report_home()`. 
+ def handle_info({task_mon, %Req.Response{}}, state) when is_reference(task_mon) do + {:noreply, state} + end + + def handle_info({:EXIT, _, _}, state) do + {:noreply, state} + end + + def handle_info({:DOWN, _, :process, _, _}, state) do + {:noreply, state} + end + defp build_report(state) do %{ last_reported: state.last_reported, diff --git a/packages/electric-telemetry/lib/electric/telemetry/opts.ex b/packages/electric-telemetry/lib/electric/telemetry/opts.ex index 3ffc16a7db..93672278cf 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/opts.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/opts.ex @@ -73,6 +73,20 @@ defmodule ElectricTelemetry.Opts do resource: [type: :map, default: %{}] ], default: [] + ], + call_home_reporter_opts: [ + type: :keyword_list, + keys: [ + first_report_in: [ + type: {:tuple, [:pos_integer, {:in, [:millisecond, :second, :minute]}]}, + default: {2, :minute} + ], + reporting_period: [ + type: {:tuple, [:pos_integer, {:in, [:millisecond, :second, :minute]}]}, + default: {30, :minute} + ] + ], + default: [] ] ] end diff --git a/packages/electric-telemetry/lib/electric/telemetry/reporters/call_home_reporter.ex b/packages/electric-telemetry/lib/electric/telemetry/reporters/call_home_reporter.ex index 0ac7d19199..89b51b5849 100644 --- a/packages/electric-telemetry/lib/electric/telemetry/reporters/call_home_reporter.ex +++ b/packages/electric-telemetry/lib/electric/telemetry/reporters/call_home_reporter.ex @@ -2,17 +2,14 @@ defmodule ElectricTelemetry.Reporters.CallHomeReporter do import Telemetry.Metrics def child_spec(telemetry_opts, reporter_opts) do - if call_home_url = get_in(telemetry_opts, [:reporters, :call_home_url]) do + if call_home_url = telemetry_opts.reporters.call_home_url do start_opts = - Keyword.merge( - [ - static_info: static_info(telemetry_opts), - call_home_url: call_home_url, - first_report_in: {2, :minute}, - reporting_period: {30, :minute} - ], - reporter_opts - ) + [ + static_info: 
static_info(telemetry_opts), + call_home_url: call_home_url + ] + |> Keyword.merge(telemetry_opts.call_home_reporter_opts) + |> Keyword.merge(reporter_opts) {ElectricTelemetry.CallHomeReporter, start_opts} end @@ -103,14 +100,24 @@ defmodule ElectricTelemetry.Reporters.CallHomeReporter do %{ electric_version: telemetry_opts.version, - environment: %{ - os: %{family: os_family, name: os_name}, - arch: to_string(arch), - cores: processors, - ram: total_mem, - electric_instance_id: Map.fetch!(telemetry_opts, :instance_id), - electric_installation_id: Map.get(telemetry_opts, :installation_id, "electric_default") - } + environment: + %{ + os: %{family: os_family, name: os_name}, + arch: to_string(arch), + cores: processors, + ram: total_mem, + electric_instance_id: Map.fetch!(telemetry_opts, :instance_id), + electric_installation_id: Map.get(telemetry_opts, :installation_id, "electric_default") + } + |> maybe_put(telemetry_opts, :stack_id) } end + + defp maybe_put(map, telemetry_opts, key) do + if val = telemetry_opts[key] do + Map.put(map, key, val) + else + map + end + end end diff --git a/packages/electric-telemetry/mix.exs b/packages/electric-telemetry/mix.exs index 6d5c337c83..34ffc4f187 100644 --- a/packages/electric-telemetry/mix.exs +++ b/packages/electric-telemetry/mix.exs @@ -33,10 +33,11 @@ defmodule ElectricTelemetry.MixProject do defp dev_and_test_deps do [ + {:bypass, "~> 2.1", only: [:test]}, {:dialyxir, "~> 1.4", only: [:test], runtime: false}, + {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, {:excoveralls, "~> 0.18", only: [:test], runtime: false}, - {:junit_formatter, "~> 3.4", only: [:test], runtime: false}, - {:ex_doc, ">= 0.0.0", only: :dev, runtime: false} + {:junit_formatter, "~> 3.4", only: [:test], runtime: false} ] end diff --git a/packages/electric-telemetry/mix.lock b/packages/electric-telemetry/mix.lock index 25b23fe15d..7e5dd738e9 100644 --- a/packages/electric-telemetry/mix.lock +++ b/packages/electric-telemetry/mix.lock @@ -1,4 +1,8 
@@ %{ + "bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"}, + "cowboy": {:hex, :cowboy, "2.14.2", "4008be1df6ade45e4f2a4e9e2d22b36d0b5aba4e20b0a0d7049e28d124e34847", [:make, :rebar3], [{:cowlib, ">= 2.16.0 and < 3.0.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, ">= 1.8.0 and < 3.0.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "569081da046e7b41b5df36aa359be71a0c8874e5b9cff6f747073fc57baf1ab9"}, + "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"}, + "cowlib": {:hex, :cowlib, "2.16.0", "54592074ebbbb92ee4746c8a8846e5605052f29309d3a873468d76cdf932076f", [:make, :rebar3], [], "hexpm", "7f478d80d66b747344f0ea7708c187645cfcc08b11aa424632f78e25bf05db51"}, "dialyxir": {:hex, :dialyxir, "1.4.7", "dda948fcee52962e4b6c5b4b16b2d8fa7d50d8645bbae8b8685c3f9ecb7f5f4d", [:mix], [{:erlex, ">= 0.2.8", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b34527202e6eb8cee198efec110996c25c5898f43a4094df157f8d28f27d9efe"}, "earmark_parser": {:hex, :earmark_parser, "1.4.44", "f20830dd6b5c77afe2b063777ddbbff09f9759396500cdbe7523efd58d7a339c", [:mix], [], "hexpm", "4778ac752b4701a5599215f7030989c989ffdc4f6df457c5f36938cc2d2a2750"}, "erlex": {:hex, :erlex, "0.2.8", "cd8116f20f3c0afe376d1e8d1f0ae2452337729f68be016ea544a72f767d9c12", [:mix], [], "hexpm", 
"9d66ff9fedf69e49dc3fd12831e12a8a37b76f8651dd21cd45fcf5561a8a7590"}, @@ -17,7 +21,11 @@ "nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"}, "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, "otel_metric_exporter": {:hex, :otel_metric_exporter, "0.4.2", "2cf96ac9879eb06ebde26fa0856e2cd4d5b5f6127eb9ca587532b89a5f981bfc", [:mix], [{:finch, "~> 0.19", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:protobuf, "~> 0.15", [hex: :protobuf, repo: "hexpm", optional: false]}, {:retry, "~> 0.19", [hex: :retry, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "e7d7ca69a3863a2b84badeb9dd275a369754d951c7c5a4cd5f321be868c6d613"}, + "plug": {:hex, :plug, "1.19.1", "09bac17ae7a001a68ae393658aa23c7e38782be5c5c00c80be82901262c394c0", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "560a0017a8f6d5d30146916862aaf9300b7280063651dd7e532b8be168511e62"}, + "plug_cowboy": {:hex, :plug_cowboy, "2.7.5", "261f21b67aea8162239b2d6d3b4c31efde4daa22a20d80b19c2c0f21b34b270e", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", 
[hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "20884bf58a90ff5a5663420f5d2c368e9e15ed1ad5e911daf0916ea3c57f77ac"}, + "plug_crypto": {:hex, :plug_crypto, "2.1.1", "19bda8184399cb24afa10be734f84a16ea0a2bc65054e23a62bb10f06bc89491", [:mix], [], "hexpm", "6470bce6ffe41c8bd497612ffde1a7e4af67f36a15eea5f921af71cf3e11247c"}, "protobuf": {:hex, :protobuf, "0.15.0", "c9fc1e9fc1682b05c601df536d5ff21877b55e2023e0466a3855cc1273b74dcb", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "5d7bb325319db1d668838d2691c31c7b793c34111aec87d5ee467a39dac6e051"}, + "ranch": {:hex, :ranch, "1.8.1", "208169e65292ac5d333d6cdbad49388c1ae198136e4697ae2f474697140f201c", [:make, :rebar3], [], "hexpm", "aed58910f4e21deea992a67bf51632b6d60114895eb03bb392bb733064594dd0"}, "req": {:hex, :req, "0.5.16", "99ba6a36b014458e52a8b9a0543bfa752cb0344b2a9d756651db1281d4ba4450", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "974a7a27982b9b791df84e8f6687d21483795882a7840e8309abdbe08bb06f09"}, "retry": {:hex, :retry, "0.19.0", "aeb326d87f62295d950f41e1255fe6f43280a1b390d36e280b7c9b00601ccbc2", [:mix], [], "hexpm", "85ef376aa60007e7bff565c366310966ec1bd38078765a0e7f20ec8a220d02ca"}, "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, diff --git a/packages/electric-telemetry/test/electric/telemetry/call_home_reporter_test.exs 
b/packages/electric-telemetry/test/electric/telemetry/call_home_reporter_test.exs new file mode 100644 index 0000000000..dbe21c8289 --- /dev/null +++ b/packages/electric-telemetry/test/electric/telemetry/call_home_reporter_test.exs @@ -0,0 +1,188 @@ +defmodule ElectricTelemetry.CallHomeReporterTest do + use ExUnit.Case, async: true + + @telemetry_opts [ + instance_id: "test-instance_id", + stack_id: "test-stack", + version: "test-version", + reporters: [ + call_home_url: "...fill this in inside the test case when bypass.port is known..." + ], + call_home_reporter_opts: [first_report_in: {1, :millisecond}] + ] + + setup do + bypass = Bypass.open() + %{bypass: bypass, telemetry_opts: telemetry_opts(bypass)} + end + + test "reports all expected info when started under ApplicationTelemetry", ctx do + add_bypass_expectation(ctx, fn report -> + # We assert the shape of the entire report here but values aren't valid since not enough + # time has passed during the test execution to gather the data. 
+ assert %{ + "data" => %{ + "electric_version" => "test-version", + "environment" => %{ + "arch" => _, + "cores" => _, + "electric_installation_id" => "electric_default", + "electric_instance_id" => "test-instance_id", + "os" => _, + "ram" => _ + }, + "resources" => %{ + "run_queue_cpu" => %{"max" => _, "mean" => _, "min" => _}, + "run_queue_io" => %{"max" => _, "mean" => _, "min" => _}, + "run_queue_total" => %{"max" => _, "mean" => _, "min" => _}, + "uptime" => _, + "used_memory" => %{"max" => _, "mean" => _, "min" => _} + }, + "system" => %{ + "load_avg1" => _, + "load_avg15" => _, + "load_avg5" => _, + "memory_free" => _, + "memory_free_percent" => _, + "memory_used" => _, + "memory_used_percent" => _, + "swap_free" => _, + "swap_free_percent" => _, + "swap_used" => _, + "swap_used_percent" => _ + } + }, + "last_reported" => _, + "report_version" => _, + "timestamp" => _ + } = report + end) + + start_supervised!({ElectricTelemetry.ApplicationTelemetry, ctx.telemetry_opts}) + + assert_receive :bypass_done, 500 + end + + test "reports all expected info when started under StackTelemetry", ctx do + add_bypass_expectation(ctx, fn report -> + # We assert the shape of the entire report here but values aren't valid since not enough + # time has passed during the test execution to gather the data. 
+ assert %{ + "data" => %{ + "electric_version" => "test-version", + "environment" => %{ + "arch" => _, + "cores" => _, + "electric_installation_id" => "electric_default", + "electric_instance_id" => "test-instance_id", + "os" => _, + "pg_version" => _, + "ram" => _, + "stack_id" => "test-stack" + }, + "usage" => %{ + "active_shapes" => _, + "inbound_bytes" => _, + "inbound_operations" => _, + "inbound_transactions" => _, + "live_requests" => _, + "served_bytes" => _, + "stored_bytes" => _, + "stored_operations" => _, + "stored_transactions" => _, + "sync_requests" => _, + "total_shapes" => _, + "total_used_storage_kb" => _, + "unique_clients" => _, + "wal_size" => %{"max" => _, "mean" => _, "min" => _} + } + }, + "last_reported" => _, + "report_version" => _, + "timestamp" => _ + } = report + end) + + start_supervised!({ElectricTelemetry.StackTelemetry, ctx.telemetry_opts}) + + assert_receive :bypass_done, 500 + end + + defp add_bypass_expectation(%{bypass: bypass, telemetry_opts: telemetry_opts}, test_specific_fn) do + test_pid = self() + + Bypass.expect(bypass, "POST", "/checkpoint", fn conn -> + assert {"content-type", "application/json"} in conn.req_headers + assert {:ok, body, conn} = Plug.Conn.read_body(conn) + + report = :json.decode(body) + + assert_call_home_report_common_fields(report, telemetry_opts) + test_specific_fn.(report) + + send(test_pid, :bypass_done) + + Plug.Conn.send_resp(conn, 200, "") + end) + end + + defp telemetry_opts(bypass) do + # CallHomeReporter can work with both a string and a URI struct + url = "http://localhost:#{bypass.port}/checkpoint" |> maybe_parse_url() + put_in(@telemetry_opts, [:reporters, :call_home_url], url) + end + + defp maybe_parse_url(url) do + if :rand.uniform(2) == 1 do + URI.parse(url) + else + url + end + end + + def assert_call_home_report_common_fields(report, telemetry_opts) do + # Extracting only those fields from the report that will be validated in this function. 
+ %{ + "data" => %{ + "environment" => %{ + "arch" => arch, + "cores" => cores, + "os" => %{"family" => "unix", "name" => os_name}, + "ram" => ram + } + }, + "last_reported" => last_reported, + "report_version" => 2, + "timestamp" => timestamp + } = report + + # Execute CallHomeReporter.static_info() here to make the assertions resilient to platform + # variations between different environments in which this test will run. + static_info = + telemetry_opts + |> ElectricTelemetry.validate_options() + |> then(fn {:ok, opts} -> + ElectricTelemetry.Reporters.CallHomeReporter.static_info(opts) + end) + + assert arch == static_info.environment.arch + # If you get an assertion failure here when running the test suite on your dev + # machine, please add your arch to the list. + assert arch in ["x86_64-pc-linux-gnu"] + + assert cores == static_info.environment.cores + assert is_integer(cores) + assert cores >= 4 + + assert os_name == to_string(static_info.environment.os.name) + # If you get an assertion failure here, please add your OS name to the list. + assert os_name in ["linux"] + + assert ram == static_info.environment.ram + assert is_integer(ram) + assert ram >= 4 * 1024 * 1024 * 1024 + + assert {:ok, %DateTime{}, 0} = DateTime.from_iso8601(last_reported) + assert {:ok, %DateTime{}, 0} = DateTime.from_iso8601(timestamp) + end +end diff --git a/packages/electric-telemetry/test/test_helper.exs b/packages/electric-telemetry/test/test_helper.exs index 3ddc879f1a..4ae7774e68 100644 --- a/packages/electric-telemetry/test/test_helper.exs +++ b/packages/electric-telemetry/test/test_helper.exs @@ -1,8 +1,2 @@ -# The process registry is implicitly used by processes in the dev, prod and test environments alike. -# -# Explicitly start the process registry here since the OTP application does not start a -# supervision tree in the test environment. 
-# Registry.start_link(name: Electric.Application.process_registry(), keys: :unique) - ExUnit.configure(formatters: [JUnitFormatter, ExUnit.CLIFormatter]) -ExUnit.start(assert_receive_timeout: 400, exclude: [:slow], capture_log: true) +ExUnit.start(capture_log: true)