@@ -163,8 +163,35 @@ func (s *Depot) CreateBuild(
163163 "build_id" , buildResp .ID ,
164164 "unkey_project_id" , unkeyProjectID )
165165
166- //nolint: exhaustruct
167- solverOptions := client.SolveOpt {
166+ buildStatusCh := make (chan * client.SolveStatus , 100 )
167+ go s .processBuildStatus (buildStatusCh , req .Msg .GetWorkspaceId (), unkeyProjectID , deploymentID )
168+
169+ solverOptions := s .buildSolverOptions (platform , contextURL , dockerfilePath , imageName )
170+ _ , buildErr = buildkitClient .Solve (ctx , nil , solverOptions , buildStatusCh )
171+ if buildErr != nil {
172+ s .logger .Error ("Build failed" ,
173+ "error" , buildErr ,
174+ "image_name" , imageName ,
175+ "build_id" , buildResp .ID ,
176+ "depot_project_id" , depotProjectID ,
177+ "unkey_project_id" , unkeyProjectID )
178+ return nil , connect .NewError (connect .CodeInternal ,
179+ fmt .Errorf ("build failed: %w" , buildErr ))
180+ }
181+
182+ s .logger .Info ("Build completed successfully" )
183+
184+ return connect .NewResponse (& ctrlv1.CreateBuildResponse {
185+ ImageName : imageName ,
186+ BuildId : buildResp .ID ,
187+ DepotProjectId : depotProjectID ,
188+ }), nil
189+ }
190+
191+ func (s * Depot ) buildSolverOptions (
192+ platform , contextURL , dockerfilePath , imageName string ,
193+ ) client.SolveOpt {
194+ return client.SolveOpt {
168195 Frontend : "dockerfile.v0" ,
169196 FrontendAttrs : map [string ]string {
170197 "platform" : platform ,
@@ -196,49 +223,6 @@ func (s *Depot) CreateBuild(
196223 },
197224 },
198225 }
199-
200- buildStatusCh := make (chan * client.SolveStatus , 10 )
201- aggregator := NewBuildStepAggregator ()
202-
203- go func () {
204- for status := range buildStatusCh {
205- aggregator .ProcessStatus (
206- status ,
207- req .Msg .GetWorkspaceId (),
208- unkeyProjectID ,
209- deploymentID ,
210- s .clickhouse .BufferBuildStep ,
211- s .clickhouse .BufferBuildStepLog ,
212- )
213- }
214- }()
215-
216- _ , buildErr = buildkitClient .Solve (ctx , nil , solverOptions , buildStatusCh )
217- if buildErr != nil {
218- s .logger .Error ("Build failed" ,
219- "error" , buildErr ,
220- "image_name" , imageName ,
221- "build_id" , buildResp .ID ,
222- "depot_project_id" , depotProjectID ,
223- "unkey_project_id" , unkeyProjectID )
224- return nil , connect .NewError (connect .CodeInternal ,
225- fmt .Errorf ("build failed: %w" , buildErr ))
226- }
227-
228- s .logger .Info ("Build completed successfully" ,
229- "image_name" , imageName ,
230- "build_id" , buildResp .ID ,
231- "build_token" , buildResp .Token ,
232- "depot_project_id" , depotProjectID ,
233- "platform" , platform ,
234- "architecture" , architecture ,
235- "unkey_project_id" , unkeyProjectID )
236-
237- return connect .NewResponse (& ctrlv1.CreateBuildResponse {
238- ImageName : imageName ,
239- BuildId : buildResp .ID ,
240- DepotProjectId : depotProjectID ,
241- }), nil
242226}
243227
244228// getOrCreateDepotProject retrieves or creates a Depot project for the given Unkey project.
@@ -310,15 +294,32 @@ func (s *Depot) getOrCreateDepotProject(ctx context.Context, unkeyProjectID stri
310294 return depotProjectID , nil
311295}
312296
297+ func (s * Depot ) processBuildStatus (
298+ statusCh <- chan * client.SolveStatus ,
299+ workspaceID , projectID , deploymentID string ,
300+ ) {
301+ aggregator := NewBuildStepAggregator ()
302+ for status := range statusCh {
303+ aggregator .ProcessStatus (
304+ status ,
305+ workspaceID ,
306+ projectID ,
307+ deploymentID ,
308+ s .clickhouse .BufferBuildStep ,
309+ s .clickhouse .BufferBuildStepLog ,
310+ )
311+ }
312+ }
313+
313314type BuildStepAggregator struct {
314- completed map [digest.Digest ]bool
315- logsSent map [digest.Digest ]int
315+ completed map [digest.Digest ]bool
316+ verticesWithLogs map [digest.Digest ]bool
316317}
317318
318319func NewBuildStepAggregator () * BuildStepAggregator {
319320 return & BuildStepAggregator {
320- completed : make (map [digest.Digest ]bool ),
321- logsSent : make (map [digest.Digest ]int ),
321+ completed : make (map [digest.Digest ]bool ),
322+ verticesWithLogs : make (map [digest.Digest ]bool ),
322323 }
323324}
324325
@@ -328,7 +329,10 @@ func (a *BuildStepAggregator) ProcessStatus(
328329 cbStep func (schema.BuildStepV1 ),
329330 cbLog func (schema.BuildStepLogV1 ),
330331) {
331- // Process completed steps
332+ for _ , log := range status .Logs {
333+ a .verticesWithLogs [log .Vertex ] = true
334+ }
335+
332336 for _ , vertex := range status .Vertexes {
333337 if vertex .Completed != nil && ! a .completed [vertex .Digest ] {
334338 a .completed [vertex .Digest ] = true
@@ -343,27 +347,19 @@ func (a *BuildStepAggregator) ProcessStatus(
343347 StepID : string (vertex .Digest ),
344348 Name : vertex .Name ,
345349 Cache : vertex .Cached ,
346- HasLogs : len ( status . Logs ) > 0 , // will be true if any logs exist
350+ HasLogs : a . verticesWithLogs [ vertex . Digest ],
347351 })
348352 }
349353 }
350354
351- // Send all new logs
352355 for _ , log := range status .Logs {
353- // Track which logs we've already sent for this vertex
354- sentCount := a .logsSent [log .Vertex ]
355- a .logsSent [log .Vertex ]++
356-
357- // Only send if this is a new log we haven't seen
358- if sentCount < a .logsSent [log .Vertex ] {
359- cbLog (schema.BuildStepLogV1 {
360- WorkspaceID : workspaceID ,
361- ProjectID : projectID ,
362- DeploymentID : deploymentID ,
363- StepID : string (log .Vertex ),
364- Time : log .Timestamp .UnixMilli (),
365- Message : string (log .Data ),
366- })
367- }
356+ cbLog (schema.BuildStepLogV1 {
357+ WorkspaceID : workspaceID ,
358+ ProjectID : projectID ,
359+ DeploymentID : deploymentID ,
360+ StepID : string (log .Vertex ),
361+ Time : log .Timestamp .UnixMilli (),
362+ Message : string (log .Data ),
363+ })
368364 }
369365}
0 commit comments