Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion pkg/dyninst/module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func makeRealDependencies(
return ret, fmt.Errorf("error getting monotonic time: %w", err)
}
ret.dispatcher = dispatcher.NewDispatcher(ret.loader.OutputReader())
ret.procSubscriber = procsubscribe.NewRemoteConfigProcessSubscriber(
ret.procSubscriber = procsubscribe.NewSubscriber(
remoteConfigSubscriber,
)

Expand Down Expand Up @@ -295,6 +295,19 @@ func (m *Module) Register(router *module.Router) error {
},
),
)
// Handler for printing debug information about the known Go processes with
// the Datadog tracer. These processes are watched for Remote Config updates
// related to Dynamic Instrumentation.
router.HandleFunc(
"/debug/goprocs",
utils.WithConcurrencyLimit(
utils.DefaultMaxConcurrentRequests,
func(w http.ResponseWriter, _ *http.Request) {
report := m.shutdown.realDependencies.procSubscriber.GetReport()
utils.WriteAsJSON(w, report, utils.PrettyPrint)
},
),
)
return nil
}

Expand Down
32 changes: 25 additions & 7 deletions pkg/dyninst/procsubscribe/procscan/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"fmt"
"io/fs"
"iter"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -44,8 +45,11 @@ type Scanner struct {
// nowTicks returns the current time in ticks since boot.
nowTicks func() (ticks, error)

// live tracks discovered processes that have been reported as live.
live *btree.BTreeG[uint32]
mu struct {
sync.Mutex
// live tracks discovered processes that have been reported as live.
live *btree.BTreeG[uint32]
}

// listPids returns an iterator over all PIDs in the system.
listPids func() iter.Seq2[uint32, error]
Expand Down Expand Up @@ -102,15 +106,16 @@ func newScanner(
tracerMetadataReader func(pid int32) (tracermetadata.TracerMetadata, error),
resolveExecutable func(pid int32) (process.Executable, error),
) *Scanner {
return &Scanner{
s := &Scanner{
startDelay: startDelay,
nowTicks: nowTicks,
listPids: listPids,
readStartTime: readStartTime,
tracerMetadataReader: tracerMetadataReader,
resolveExecutable: resolveExecutable,
live: btree.NewG(16, cmp.Less[uint32]),
}
s.mu.live = btree.NewG(16, cmp.Less[uint32])
return s
}

// DiscoveredProcess represents a newly discovered process that should be
Expand Down Expand Up @@ -138,6 +143,8 @@ func (p *Scanner) Scan() (
removed []ProcessID,
err error,
) {
p.mu.Lock()
defer p.mu.Unlock()
now, err := p.nowTicks()
if err != nil {
return nil, nil, fmt.Errorf("get timestamp: %w", err)
Expand Down Expand Up @@ -169,7 +176,7 @@ func (p *Scanner) Scan() (

// Clone the live set. Processes still alive will be removed from this
// clone. Whatever remains has exited.
noLongerLive := p.live.Clone()
noLongerLive := p.mu.live.Clone()
var ret []DiscoveredProcess

for pid, err := range p.listPids() {
Expand Down Expand Up @@ -207,7 +214,7 @@ func (p *Scanner) Scan() (
continue
}

p.live.ReplaceOrInsert(pid)
p.mu.live.ReplaceOrInsert(pid)
ret = append(ret, DiscoveredProcess{
PID: pid,
StartTimeTicks: uint64(startTime),
Expand All @@ -219,11 +226,22 @@ func (p *Scanner) Scan() (
removed = make([]ProcessID, 0, noLongerLive.Len())
noLongerLive.Ascend(func(pid uint32) bool {
removed = append(removed, ProcessID(pid))
p.live.Delete(pid)
p.mu.live.Delete(pid)
return true
})
noLongerLive.Clear(true)

p.lastWatermark = nextWatermark
return ret, removed, nil
}

// LiveProcesses returns the PIDs currently held in the scanner's live set,
// collected by walking the set with Ascend. The scanner mutex is held for the
// duration of the walk, and the returned slice is freshly allocated, so the
// caller owns it.
func (p *Scanner) LiveProcesses() []ProcessID {
	p.mu.Lock()
	defer p.mu.Unlock()

	pids := make([]ProcessID, 0, p.mu.live.Len())
	collect := func(pid uint32) bool {
		pids = append(pids, ProcessID(pid))
		// Returning true keeps the walk going until every entry is visited.
		return true
	}
	p.mu.live.Ascend(collect)
	return pids
}
4 changes: 3 additions & 1 deletion pkg/dyninst/procsubscribe/procscan/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,10 @@ type scanOutput struct {
func (ts *scannerTestState) cloneState(
includeStartDelay bool,
) *scannerStateSnapshot {
ts.scanner.mu.Lock()
defer ts.scanner.mu.Unlock()
live := make([]int32, 0)
ts.scanner.live.Ascend(func(pid uint32) bool {
ts.scanner.mu.live.Ascend(func(pid uint32) bool {
live = append(live, int32(pid))
return true
})
Expand Down
90 changes: 85 additions & 5 deletions pkg/dyninst/procsubscribe/remote_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type Subscriber struct {

mu struct {
sync.Mutex
state subscriberState
state *subscriberState
started bool
pendingRequests []*pbgo.ConfigSubscriptionRequest
callback func(process.ProcessesUpdate)
Expand Down Expand Up @@ -116,9 +116,9 @@ type optionFunc func(*config)

func (f optionFunc) apply(c *config) { f(c) }

// NewRemoteConfigProcessSubscriber creates a ProcessSubscriber that sources
// updates directly from Remote Config.
func NewRemoteConfigProcessSubscriber(
// NewSubscriber creates a Subscriber that sources updates directly from Remote
// Config.
func NewSubscriber(
client RemoteConfigSubscriber,
opts ...Option,
) *Subscriber {
Expand All @@ -139,7 +139,7 @@ func NewRemoteConfigProcessSubscriber(
scanInterval: cfg.scanInterval,
wait: cfg.wait,
}
s.mu.state = makeSubscriberState()
s.mu.state = newSubscriberState()
return s
}

Expand Down Expand Up @@ -398,6 +398,86 @@ func (s *Subscriber) runConnectedStream(
return <-errCh
}

// ProcessReport contains information about a Go process that has been detected
// and is being monitored for Dynamic Instrumentation updates. GetReport emits
// one ProcessReport per entry in the subscriber's tracked set.
type ProcessReport struct {
	// RuntimeID is the runtime ID under which the process is tracked.
	RuntimeID string
	// ProcessID is the PID recorded for the tracked process.
	ProcessID int32
	// Executable is the executable recorded for the tracked process.
	Executable process.Executable
	// SymDBEnabled mirrors the symdbEnabled flag of the tracked entry.
	SymDBEnabled bool
	// Probes lists the ID and version of each probe currently recorded for the
	// process. It is left nil when the process has no probes.
	Probes []ProbeInfo
	// ProcessAlive is set if the procscan.Scanner reports the process as alive.
	// This should be true, except for races between the report and the Scanner
	// recently figuring out that a process is dead.
	ProcessAlive bool
}

// ProbeInfo identifies a single probe listed in a ProcessReport.
type ProbeInfo struct {
	// ID is the probe's identifier, as returned by the probe's GetID.
	ID string
	// Version is the probe's version, as returned by the probe's GetVersion.
	Version int
}

// Report is a snapshot of the current state of the subscriber.
type Report struct {
	// Processes contains the state for all the currently tracked processes.
	Processes []ProcessReport
	// ProcessesNotTracked contains the PIDs of processes known to the scanner
	// that are not tracked. This should be empty, except for race conditions
	// between producing this report and the scanner discovering new processes
	// that have not been added to the tracked set yet.
	ProcessesNotTracked []int32
}

// GetReport builds a point-in-time Report of the processes the subscriber is
// tracking, cross-referenced against the scanner's live set.
//
// The scanner's live PIDs are read before the subscriber locks are taken, so
// the two views are not captured atomically. A process may briefly show up as
// tracked-but-not-alive, or as live-but-not-tracked. The order of both
// returned slices follows map iteration and is therefore unspecified.
func (s *Subscriber) GetReport() Report {
	// Only a concrete *procscan.Scanner can list its live PIDs; for any other
	// scanner implementation the live set simply stays empty.
	alivePIDs := make(map[int32]struct{})
	if sc, isScanner := s.scanner.(*procscan.Scanner); isScanner {
		for _, p := range sc.LiveProcesses() {
			alivePIDs[int32(p)] = struct{}{}
		}
	}

	// Lock order: the subscriber mutex first, then the state mutex.
	s.mu.Lock()
	defer s.mu.Unlock()
	s.mu.state.mu.Lock()
	defer s.mu.state.mu.Unlock()

	var report Report
	for _, tracked := range s.mu.state.mu.tracked {
		pid := tracked.Info.ProcessID.PID

		// Left nil when the entry has no probes, matching the zero value.
		var probes []ProbeInfo
		for _, def := range tracked.probesByPath {
			probes = append(probes, ProbeInfo{
				ID:      def.GetID(),
				Version: def.GetVersion(),
			})
		}

		_, isAlive := alivePIDs[pid]
		report.Processes = append(report.Processes, ProcessReport{
			RuntimeID:    tracked.runtimeID,
			ProcessID:    pid,
			Executable:   tracked.Executable,
			SymDBEnabled: tracked.symdbEnabled,
			Probes:       probes,
			ProcessAlive: isAlive,
		})
	}

	// Any live PID without a runtime mapping is known to the scanner but not
	// tracked here. That is expected only when the scanner has discovered a
	// process that has not been added to the tracked set yet.
	for pid := range alivePIDs {
		if _, known := s.mu.state.mu.pidToRuntime[pid]; !known {
			report.ProcessesNotTracked = append(report.ProcessesNotTracked, pid)
		}
	}
	return report
}

type parsedRemoteConfigUpdate struct {
probes map[string]ir.ProbeDefinition
haveSymdbFile bool
Expand Down
51 changes: 31 additions & 20 deletions pkg/dyninst/procsubscribe/remote_config_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package procsubscribe
import (
"maps"
"slices"
"sync"

"github.com/DataDog/datadog-agent/pkg/config/remote/data"
"github.com/DataDog/datadog-agent/pkg/dyninst/ir"
Expand All @@ -27,9 +28,12 @@ type effects interface {
}

type subscriberState struct {
streamEstablished bool
tracked map[string]*runtimeEntry
pidToRuntime map[int32]string
mu struct {
sync.Mutex
streamEstablished bool
tracked map[string]*runtimeEntry
pidToRuntime map[int32]string
}
}

type runtimeEntry struct {
Expand All @@ -39,17 +43,20 @@ type runtimeEntry struct {
symdbEnabled bool
}

func makeSubscriberState() subscriberState {
return subscriberState{
tracked: make(map[string]*runtimeEntry),
pidToRuntime: make(map[int32]string),
}
// newSubscriberState allocates a subscriberState with empty, non-nil tracked
// and pidToRuntime maps. streamEstablished starts out false, which is the
// zero value of the freshly allocated struct.
func newSubscriberState() *subscriberState {
	state := new(subscriberState)
	state.mu.tracked = map[string]*runtimeEntry{}
	state.mu.pidToRuntime = map[int32]string{}
	return state
}

func (s *subscriberState) onStreamEstablished(effects effects) {
s.streamEstablished = true
toTrack := make([]string, 0, len(s.tracked))
for _, entry := range s.tracked {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.streamEstablished = true
toTrack := make([]string, 0, len(s.mu.tracked))
for _, entry := range s.mu.tracked {
toTrack = append(toTrack, entry.runtimeID)
}
slices.Sort(toTrack)
Expand All @@ -63,6 +70,8 @@ func (s *subscriberState) onScanUpdate(
removed []procscan.ProcessID,
effects effects,
) {
s.mu.Lock()
defer s.mu.Unlock()
if log.ShouldLog(log.TraceLvl) {
added := added
removed := removed
Expand All @@ -78,8 +87,8 @@ func (s *subscriberState) onScanUpdate(
continue
}
pid := process.ID{PID: int32(proc.PID)}
if _, ok := s.tracked[runtimeID]; !ok {
s.tracked[runtimeID] = &runtimeEntry{
if _, ok := s.mu.tracked[runtimeID]; !ok {
s.mu.tracked[runtimeID] = &runtimeEntry{
Info: process.Info{
ProcessID: pid,
Executable: proc.Executable,
Expand All @@ -94,25 +103,25 @@ func (s *subscriberState) onScanUpdate(
"process subscriber: discovered new runtime %s (pid=%d)",
runtimeID, pid.PID,
)
if s.streamEstablished {
if s.mu.streamEstablished {
effects.track(runtimeID)
}
}

s.pidToRuntime[pid.PID] = runtimeID
s.mu.pidToRuntime[pid.PID] = runtimeID
}

var removals []process.ID
for _, removedPID := range removed {
pid := int32(removedPID)
runtimeID, ok := s.pidToRuntime[pid]
runtimeID, ok := s.mu.pidToRuntime[pid]
if !ok {
continue
}
delete(s.pidToRuntime, pid)
delete(s.tracked, runtimeID)
delete(s.mu.pidToRuntime, pid)
delete(s.mu.tracked, runtimeID)
removals = append(removals, process.ID{PID: pid})
if s.streamEstablished {
if s.mu.streamEstablished {
effects.untrack(runtimeID)
}
if log.ShouldLog(log.TraceLvl) {
Expand All @@ -135,13 +144,15 @@ func (s *subscriberState) onStreamConfig(
if resp == nil || resp.Client == nil {
return
}
s.mu.Lock()
defer s.mu.Unlock()

tracer := resp.Client.GetClientTracer()
if tracer == nil {
return
}
runtimeID := tracer.GetRuntimeId()
entry, ok := s.tracked[runtimeID]
entry, ok := s.mu.tracked[runtimeID]
if !ok {
return
}
Expand Down
Loading