Skip to content
68 changes: 33 additions & 35 deletions go/apps/ctrl/services/build/backend/depot/create_build.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ const (
// Cache policy constants for Depot projects
defaultCacheKeepGB = 50
defaultCacheKeepDays = 14

// Buffer size for BuildKit status updates channel.
// If set too low, it will drop some of the buildStepLogs in big dockerfiles.
buildStatusChannelBuffer = 1000
)

// CreateBuild orchestrates the container image build process using Depot.
Expand Down Expand Up @@ -101,8 +105,7 @@ func (s *Depot) CreateBuild(
"error", err,
"depot_project_id", depotProjectID,
"unkey_project_id", unkeyProjectID)
return nil, connect.NewError(connect.CodeInternal,
fmt.Errorf("failed to create build: %w", err))
return nil, wrapBuildError(err, connect.CodeInternal, "failed to create build")
}

s.logger.Info("Depot build created",
Expand All @@ -126,8 +129,7 @@ func (s *Depot) CreateBuild(
"build_id", buildResp.ID,
"depot_project_id", depotProjectID,
"unkey_project_id", unkeyProjectID)
return nil, connect.NewError(connect.CodeInternal,
fmt.Errorf("failed to acquire machine: %w", buildErr))
return nil, wrapBuildError(buildErr, connect.CodeInternal, "failed to acquire machine")
}
//nolint: all
defer buildkit.Release()
Expand All @@ -144,8 +146,7 @@ func (s *Depot) CreateBuild(
"build_id", buildResp.ID,
"depot_project_id", depotProjectID,
"unkey_project_id", unkeyProjectID)
return nil, connect.NewError(connect.CodeInternal,
fmt.Errorf("failed to connect to buildkit: %w", buildErr))
return nil, wrapBuildError(buildErr, connect.CodeInternal, "failed to connect to buildkit")
}
defer buildkitClient.Close()

Expand All @@ -164,8 +165,8 @@ func (s *Depot) CreateBuild(
"build_id", buildResp.ID,
"unkey_project_id", unkeyProjectID)

buildStatusCh := make(chan *client.SolveStatus, 100)
go s.processBuildStatus(buildStatusCh, req.Msg.GetWorkspaceId(), unkeyProjectID, deploymentID)
buildStatusCh := make(chan *client.SolveStatus, buildStatusChannelBuffer)
go s.processBuildStatusLogs(buildStatusCh, req.Msg.GetWorkspaceId(), unkeyProjectID, deploymentID)

solverOptions := s.buildSolverOptions(platform, contextURL, dockerfilePath, imageName)
_, buildErr = buildkitClient.Solve(ctx, nil, solverOptions, buildStatusCh)
Expand All @@ -176,8 +177,7 @@ func (s *Depot) CreateBuild(
"build_id", buildResp.ID,
"depot_project_id", depotProjectID,
"unkey_project_id", unkeyProjectID)
return nil, connect.NewError(connect.CodeInternal,
fmt.Errorf("build failed: %w", buildErr))
return nil, wrapBuildError(buildErr, connect.CodeInternal, "build failed")
}

s.logger.Info("Build completed successfully")
Expand Down Expand Up @@ -234,7 +234,9 @@ func (s *Depot) buildSolverOptions(
// Create new Depot project if not found
// Store project mapping in database
func (s *Depot) getOrCreateDepotProject(ctx context.Context, unkeyProjectID string) (string, error) {
project, err := db.Query.FindProjectById(ctx, s.db.RO(), unkeyProjectID)
project, err := db.WithRetryContext(ctx, func() (db.FindProjectByIdRow, error) {
return db.Query.FindProjectById(ctx, s.db.RO(), unkeyProjectID)
})
if err != nil {
return "", fmt.Errorf("failed to query project: %w", err)
}
Expand Down Expand Up @@ -275,13 +277,16 @@ func (s *Depot) getOrCreateDepotProject(ctx context.Context, unkeyProjectID stri
depotProjectID := createResp.Msg.GetProject().GetProjectId()

now := time.Now().UnixMilli()
err = db.Query.UpdateProjectDepotID(ctx, s.db.RW(), db.UpdateProjectDepotIDParams{
DepotProjectID: sql.NullString{
String: depotProjectID,
Valid: true,
},
UpdatedAt: sql.NullInt64{Int64: now, Valid: true},
ID: unkeyProjectID,
_, err = db.WithRetryContext(ctx, func() (struct{}, error) {
err := db.Query.UpdateProjectDepotID(ctx, s.db.RW(), db.UpdateProjectDepotIDParams{
DepotProjectID: sql.NullString{
String: depotProjectID,
Valid: true,
},
UpdatedAt: sql.NullInt64{Int64: now, Valid: true},
ID: unkeyProjectID,
})
return struct{}{}, err
})
if err != nil {
return "", fmt.Errorf("failed to update depot_project_id: %w", err)
Expand All @@ -295,28 +300,33 @@ func (s *Depot) getOrCreateDepotProject(ctx context.Context, unkeyProjectID stri
return depotProjectID, nil
}

func (s *Depot) processBuildStatus(
func (s *Depot) processBuildStatusLogs(
statusCh <-chan *client.SolveStatus,
workspaceID, projectID, deploymentID string,
) {
completed := map[digest.Digest]bool{}
verticesWithLogs := map[digest.Digest]bool{}
completed := map[digest.Digest]bool{}

for status := range statusCh {
// Mark vertices that have logs
for _, log := range status.Logs {
verticesWithLogs[log.Vertex] = true
s.clickhouse.BufferBuildStepLog(schema.BuildStepLogV1{
WorkspaceID: workspaceID,
ProjectID: projectID,
DeploymentID: deploymentID,
StepID: log.Vertex.String(),
Time: log.Timestamp.UnixMilli(),
Message: string(log.Data),
})
}

// Process completed vertices
for _, vertex := range status.Vertexes {
if vertex == nil {
s.logger.Warn("vertex is nil")
continue
}
if vertex.Completed != nil && !completed[vertex.Digest] {
completed[vertex.Digest] = true

s.clickhouse.BufferBuildStep(schema.BuildStepV1{
Error: vertex.Error,
StartedAt: ptr.SafeDeref(vertex.Started).UnixMilli(),
Expand All @@ -331,17 +341,5 @@ func (s *Depot) processBuildStatus(
})
}
}

// Process logs
for _, log := range status.Logs {
s.clickhouse.BufferBuildStepLog(schema.BuildStepLogV1{
WorkspaceID: workspaceID,
ProjectID: projectID,
DeploymentID: deploymentID,
StepID: log.Vertex.String(),
Time: log.Timestamp.UnixMilli(),
Message: string(log.Data),
})
}
}
}
51 changes: 51 additions & 0 deletions go/apps/ctrl/services/build/backend/depot/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package depot

import (
"fmt"
"strings"

"connectrpc.com/connect"
restate "github.com/restatedev/sdk-go"
)

// terminalErrorPatterns lists substrings that, when found in a build error
// message (matched case-insensitively by isTerminalBuildError), mark the
// error as terminal so Restate should NOT retry the build.
var terminalErrorPatterns = []string{
	"exit code:",                  // terminal errors thrown by upstream providers usually contain this
	"failed to solve",             // failed to fetch an image or build from source
	"failed to compute cache key", // broken Dockerfile
	"no such file or directory",   // user passed a wrong Dockerfile path
	"internal error",              // mostly thrown by Depot; something is wrong on their end
	"permission denied",           // on us: wrong Depot token or organization/project mismatch
	"unauthenticated",             // wrong Depot token
	"unknown key: platforms",      // Depot only allows "us-east-1" and "eu-central-1"
	"dockerfile parse error on line", // broken Dockerfile syntax
}

// isTerminalBuildError returns true if the error is terminal and Restate should NOT retry.
// In some cases such as broken dockerfile, internal error, wrong dockerfile path, there is no reason to keep retrying.
// We could retry, but those would pollute the CH logs
// isTerminalBuildError reports whether err matches one of the known terminal
// build-failure patterns, meaning Restate should NOT retry. Broken
// Dockerfiles, wrong paths, or auth problems will not fix themselves on
// retry, and repeated attempts would only pollute the ClickHouse logs.
func isTerminalBuildError(err error) bool {
	if err == nil {
		return false
	}

	msg := strings.ToLower(err.Error())
	for i := range terminalErrorPatterns {
		if strings.Contains(msg, terminalErrorPatterns[i]) {
			return true
		}
	}

	return false
}

// wrapBuildError wraps err with msg and decides how the failure should
// surface: terminal build failures become Restate terminal errors (so the
// workflow does not retry), while everything else becomes a retriable
// connect error carrying the given code.
func wrapBuildError(err error, code connect.Code, msg string) error {
	wrapped := fmt.Errorf("%s: %w", msg, err)

	if !isTerminalBuildError(err) {
		return connect.NewError(code, wrapped)
	}

	return restate.TerminalError(wrapped)
}
11 changes: 11 additions & 0 deletions go/apps/ctrl/workflows/deploy/deploy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,17 @@ func (w *Workflow) Deploy(ctx restate.ObjectContext, req *hydrav1.DeployRequest)
return imageName, nil
}, restate.WithName("building docker image"))
if err != nil {
if restate.IsTerminalError(err) {
w.logger.Error("docker build failed with terminal error - not retrying",
"deployment_id", deployment.ID,
"error", err,
"reason", "error indicates a permanent failure (e.g. invalid Dockerfile, missing files, permission issues)")
return nil, fmt.Errorf("docker build failed permanently: %w", err)
}

w.logger.Error("docker build failed - will be retried by restate",
"deployment_id", deployment.ID,
"error", err)
return nil, fmt.Errorf("failed to build docker image: %w", err)
}

Expand Down
Loading