Skip to content

Commit 93c2390

Browse files
authored
Merge pull request #3178 from actiontech/support_pprof
feat(pprof): add pprof support for profiling and diagnostics
2 parents 8290f73 + 4202f23 commit 93c2390

File tree

4 files changed

+215
-15
lines changed

4 files changed

+215
-15
lines changed

sqle/config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ type SeviceOpts struct {
5353
PluginPath string `yaml:"plugin_path"`
5454
Database Database `yaml:"database"`
5555
PluginConfig []PluginConfig `yaml:"plugin_config"`
56+
PprofPort int `yaml:"pprof_port"` // pprof 独立服务器端口,0 表示禁用
5657
}
5758

5859
type Database struct {

sqle/pprof/collector.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package pprof
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"path/filepath"
7+
"runtime"
8+
"runtime/pprof"
9+
"time"
10+
11+
"github.com/actiontech/sqle/sqle/log"
12+
)
13+
14+
// pprofDirName is the name of the sub-directory, joined onto the log path,
// in which collected profile files are written.
const pprofDirName = "pprof"
17+
18+
// CollectHeapProfile 采集堆内存 profile 并保存到文件
19+
func CollectHeapProfile(logPath string) error {
20+
return collectProfile("heap", logPath, func(f *os.File) error {
21+
runtime.GC()
22+
return pprof.WriteHeapProfile(f)
23+
})
24+
}
25+
26+
// CollectGoroutineProfile 采集 goroutine profile 并保存到文件
27+
func CollectGoroutineProfile(logPath string) error {
28+
return collectProfile("goroutine", logPath, func(f *os.File) error {
29+
return pprof.Lookup("goroutine").WriteTo(f, 0)
30+
})
31+
}
32+
33+
// CollectAllocsProfile 采集内存分配 profile 并保存到文件
34+
func CollectAllocsProfile(logPath string) error {
35+
return collectProfile("allocs", logPath, func(f *os.File) error {
36+
return pprof.Lookup("allocs").WriteTo(f, 0)
37+
})
38+
}
39+
40+
// CollectBlockProfile 采集阻塞 profile 并保存到文件
41+
func CollectBlockProfile(logPath string) error {
42+
return collectProfile("block", logPath, func(f *os.File) error {
43+
return pprof.Lookup("block").WriteTo(f, 0)
44+
})
45+
}
46+
47+
// CollectMutexProfile 采集互斥锁 profile 并保存到文件
48+
func CollectMutexProfile(logPath string) error {
49+
return collectProfile("mutex", logPath, func(f *os.File) error {
50+
return pprof.Lookup("mutex").WriteTo(f, 0)
51+
})
52+
}
53+
54+
// CollectCPUProfile 采集 CPU profile 并保存到文件(持续指定秒数)
55+
func CollectCPUProfile(logPath string, duration time.Duration) error {
56+
return collectProfile("cpu", logPath, func(f *os.File) error {
57+
if err := pprof.StartCPUProfile(f); err != nil {
58+
return err
59+
}
60+
time.Sleep(duration)
61+
pprof.StopCPUProfile()
62+
return nil
63+
})
64+
}
65+
66+
// collectProfile 通用的 profile 采集函数
67+
func collectProfile(profileType, logPath string, writeFunc func(*os.File) error) error {
68+
pprofDir := filepath.Join(logPath, pprofDirName)
69+
if err := os.MkdirAll(pprofDir, 0755); err != nil {
70+
return fmt.Errorf("failed to create pprof directory: %v", err)
71+
}
72+
73+
timestamp := time.Now().Format("20060102_150405")
74+
filename := fmt.Sprintf("%s_%s.prof", profileType, timestamp)
75+
filePath := filepath.Join(pprofDir, filename)
76+
77+
f, err := os.Create(filePath)
78+
if err != nil {
79+
return fmt.Errorf("failed to create profile file: %v", err)
80+
}
81+
defer f.Close()
82+
83+
if err := writeFunc(f); err != nil {
84+
return fmt.Errorf("failed to write profile: %v", err)
85+
}
86+
87+
log.Logger().Infof("pprof %s profile saved to: %s", profileType, filePath)
88+
return nil
89+
}
90+
91+
// CollectAllProfiles 采集所有类型的 profile(除了 CPU,因为 CPU 需要持续时间)
92+
func CollectAllProfiles(logPath string) error {
93+
profiles := []struct {
94+
name string
95+
fn func(string) error
96+
}{
97+
{"heap", CollectHeapProfile},
98+
{"goroutine", CollectGoroutineProfile},
99+
{"allocs", CollectAllocsProfile},
100+
{"block", CollectBlockProfile},
101+
{"mutex", CollectMutexProfile},
102+
}
103+
104+
var lastErr error
105+
for _, p := range profiles {
106+
if err := p.fn(logPath); err != nil {
107+
log.Logger().Errorf("failed to collect %s profile: %v", p.name, err)
108+
lastErr = err
109+
}
110+
}
111+
112+
return lastErr
113+
}
114+
115+
// StartPeriodicCollection 启动定期自动采集 pprof profile
116+
// interval: 采集间隔时间,如果为 0 则不启用定期采集
117+
func StartPeriodicCollection(logPath string, interval time.Duration) {
118+
if interval <= 0 {
119+
return
120+
}
121+
122+
go func() {
123+
ticker := time.NewTicker(interval)
124+
defer ticker.Stop()
125+
126+
log.Logger().Infof("Starting periodic pprof collection, interval: %v", interval)
127+
128+
for range ticker.C {
129+
log.Logger().Infof("Periodic pprof collection triggered")
130+
if err := CollectAllProfiles(logPath); err != nil {
131+
log.Logger().Errorf("Periodic pprof collection failed: %v", err)
132+
}
133+
}
134+
}()
135+
}

sqle/pprof/server.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package pprof
2+
3+
import (
4+
"fmt"
5+
"net/http"
6+
_ "net/http/pprof"
7+
8+
"github.com/actiontech/sqle/sqle/log"
9+
)
10+
11+
// StartServer 启动独立的 pprof HTTP 服务器
12+
// port: pprof 服务器监听端口,如果为 0 则不启动
13+
func StartServer(port int) error {
14+
if port <= 0 {
15+
log.Logger().Infof("pprof server disabled (port: %d)", port)
16+
return nil
17+
}
18+
19+
address := fmt.Sprintf("0.0.0.0:%d", port)
20+
log.Logger().Infof("starting pprof server on %s", address)
21+
22+
// pprof 包在导入时会自动注册路由到 http.DefaultServeMux
23+
// 只需要启动一个 HTTP 服务器即可
24+
if err := http.ListenAndServe(address, nil); err != nil {
25+
return fmt.Errorf("pprof server failed: %v", err)
26+
}
27+
28+
return nil
29+
}
30+
31+
// StartServerAsync 异步启动独立的 pprof HTTP 服务器
32+
func StartServerAsync(port int) {
33+
if port <= 0 {
34+
log.Logger().Infof("pprof server disabled (port: %d)", port)
35+
return
36+
}
37+
38+
go func() {
39+
if err := StartServer(port); err != nil {
40+
log.Logger().Errorf("pprof server error: %v", err)
41+
}
42+
}()
43+
}

sqle/sqled.go

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/actiontech/dms/pkg/dms-common/pkg/http"
1212
"github.com/actiontech/sqle/sqle/api"
1313
"github.com/actiontech/sqle/sqle/dms"
14+
"github.com/actiontech/sqle/sqle/pprof"
1415
knowledge_base "github.com/actiontech/sqle/sqle/server/knowledge_base"
1516
optimization "github.com/actiontech/sqle/sqle/server/optimization"
1617

@@ -58,7 +59,7 @@ func Run(options *config.SqleOptions) error {
5859
// notify signal
5960
exitChan := make(chan struct{})
6061
net := &gracenet.Net{}
61-
go NotifySignal(exitChan, net)
62+
go NotifySignal(exitChan, net, sqleCnf.LogPath)
6263

6364
// init plugins
6465
{
@@ -156,6 +157,9 @@ func Run(options *config.SqleOptions) error {
156157

157158
go api.StartApi(net, exitChan, options, sqleSwaggerYaml)
158159

160+
// start independent pprof server on separate port
161+
pprof.StartServerAsync(sqleCnf.PprofPort)
162+
159163
// Wait for exit signal from NotifySignal goroutine
160164
<-exitChan
161165
log.Logger().Infoln("sqled server will exit")
@@ -175,22 +179,39 @@ func validateConfig(options *config.SqleOptions) error {
175179
return nil
176180
}
177181

178-
func NotifySignal(exitChan chan struct{}, net *gracenet.Net) {
182+
func NotifySignal(exitChan chan struct{}, net *gracenet.Net, logPath string) {
179183
killChan := make(chan os.Signal, 1)
180184
// os.Kill is like kill -9 which kills a process immediately, can't be caught
181-
signal.Notify(killChan, os.Interrupt, syscall.SIGTERM, syscall.SIGUSR2 /*graceful-shutdown*/)
182-
sig := <-killChan
183-
switch sig {
184-
case syscall.SIGUSR2:
185-
if pid, err := net.StartProcess(); nil != err {
186-
log.Logger().Infof("Graceful restarted by signal SIGUSR2, but failed: %v", err)
187-
} else {
188-
log.Logger().Infof("Graceful restarted, new pid is %v", pid)
185+
// SIGUSR1: trigger pprof collection
186+
// SIGUSR2: graceful restart
187+
signal.Notify(killChan, os.Interrupt, syscall.SIGTERM, syscall.SIGUSR1, syscall.SIGUSR2)
188+
189+
for {
190+
sig := <-killChan
191+
switch sig {
192+
case syscall.SIGUSR1:
193+
// Trigger pprof collection
194+
log.Logger().Infof("Received SIGUSR1, collecting pprof profiles...")
195+
if err := pprof.CollectAllProfiles(logPath); err != nil {
196+
log.Logger().Errorf("Failed to collect pprof profiles: %v", err)
197+
} else {
198+
log.Logger().Infof("pprof profiles collected successfully")
199+
}
200+
// Continue running after collecting profiles
201+
continue
202+
case syscall.SIGUSR2:
203+
if pid, err := net.StartProcess(); nil != err {
204+
log.Logger().Infof("Graceful restarted by signal SIGUSR2, but failed: %v", err)
205+
} else {
206+
log.Logger().Infof("Graceful restarted, new pid is %v", pid)
207+
}
208+
log.Logger().Infof("old sqled exit")
209+
close(exitChan)
210+
return
211+
default:
212+
log.Logger().Infof("Exit by signal %v", sig)
213+
close(exitChan)
214+
return
189215
}
190-
log.Logger().Infof("old sqled exit")
191-
default:
192-
log.Logger().Infof("Exit by signal %v", sig)
193216
}
194-
195-
close(exitChan)
196217
}

0 commit comments

Comments
 (0)