Skip to content

Commit 4561731

Browse files
Merge pull request #3152 from actiontech/task_re-execute
Task re execute
2 parents d513df5 + 29e7c64 commit 4561731

File tree

6 files changed

+243
-11
lines changed

6 files changed

+243
-11
lines changed

sqle/api/controller/v1/workflow.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1367,9 +1367,101 @@ func ReExecuteTaskOnWorkflowV1(c echo.Context) error {
13671367
if err := controller.BindAndValidateReq(c, req); err != nil {
13681368
return controller.JSONBaseErrorReq(c, err)
13691369
}
1370+
projectUid, err := dms.GetProjectUIDByName(context.TODO(), c.Param("project_name"), true)
1371+
if err != nil {
1372+
return controller.JSONBaseErrorReq(c, err)
1373+
}
1374+
workflowId := c.Param("workflow_id")
1375+
taskId := c.Param("task_id")
1376+
reExecSqlIds := req.ExecSqlIds
1377+
1378+
s := model.GetStorage()
1379+
workflow, err := dms.GetWorkflowDetailByWorkflowId(projectUid, workflowId, s.GetWorkflowDetailWithoutInstancesByWorkflowID)
1380+
if err != nil {
1381+
return controller.JSONBaseErrorReq(c, err)
1382+
}
1383+
1384+
task, exist, err := s.GetTaskDetailById(taskId)
1385+
if err != nil {
1386+
return controller.JSONBaseErrorReq(c, err)
1387+
}
1388+
if !exist {
1389+
return controller.JSONBaseErrorReq(c, fmt.Errorf("task is not exist"))
1390+
}
1391+
1392+
user, err := controller.GetCurrentUser(c, dms.GetUser)
1393+
if err != nil {
1394+
return controller.JSONBaseErrorReq(c, err)
1395+
}
1396+
1397+
if err := PrepareForTaskReExecution(c, projectUid, workflow, user, task, reExecSqlIds); err != nil {
1398+
return controller.JSONBaseErrorReq(c, err)
1399+
}
1400+
1401+
err = server.ReExecuteTaskSQLs(workflow, task, reExecSqlIds, user)
1402+
if err != nil {
1403+
return controller.JSONBaseErrorReq(c, err)
1404+
}
1405+
13701406
return c.JSON(http.StatusOK, controller.NewBaseReq(nil))
13711407
}
13721408

1409+
func PrepareForTaskReExecution(c echo.Context, projectID string, workflow *model.Workflow, user *model.User, task *model.Task, reExecSqlIds []uint) error {
1410+
// 只有上线失败的工单可以重新上线sql
1411+
if workflow.Record.Status != model.WorkflowStatusExecFailed {
1412+
return errors.New(errors.DataInvalid, e.New("workflow status is not exec failed"))
1413+
}
1414+
1415+
if task.Status != model.TaskStatusExecuteFailed {
1416+
return errors.New(errors.DataInvalid, e.New("task status is not execute failed"))
1417+
}
1418+
1419+
err := CheckCurrentUserCanOperateTasks(c, projectID, workflow, []dmsV1.OpPermissionType{dmsV1.OpPermissionTypeExecuteWorkflow}, []uint{task.ID})
1420+
if err != nil {
1421+
return err
1422+
}
1423+
1424+
for _, record := range workflow.Record.InstanceRecords {
1425+
if record.TaskId != task.ID {
1426+
continue
1427+
}
1428+
1429+
for _, u := range strings.Split(record.ExecutionAssignees, ",") {
1430+
if u == user.GetIDStr() {
1431+
goto CheckReExecSqlIds
1432+
}
1433+
}
1434+
}
1435+
1436+
return e.New("you are not allow to execute the task")
1437+
1438+
CheckReExecSqlIds:
1439+
// 校验reExecSqlIds对应的SQL状态是否都为SQLExecuteStatusFailed
1440+
if len(reExecSqlIds) == 0 {
1441+
return errors.New(errors.DataInvalid, e.New("re-execute sql ids cannot be empty"))
1442+
}
1443+
1444+
// 创建一个map用于快速查找ExecuteSQLs中的SQL
1445+
execSqlMap := make(map[uint]*model.ExecuteSQL)
1446+
for _, execSql := range task.ExecuteSQLs {
1447+
execSqlMap[execSql.ID] = execSql
1448+
}
1449+
1450+
// 检查每个reExecSqlId
1451+
for _, sqlId := range reExecSqlIds {
1452+
execSql, exists := execSqlMap[sqlId]
1453+
if !exists {
1454+
return errors.New(errors.DataInvalid, fmt.Errorf("execute sql id %d not found in task", sqlId))
1455+
}
1456+
1457+
if execSql.ExecStatus != model.SQLExecuteStatusFailed && execSql.ExecStatus != model.SQLExecuteStatusInitialized {
1458+
return errors.New(errors.DataInvalid, fmt.Errorf("execute sql id %d status is %s, only failed or initialized sql can be re-executed", sqlId, execSql.ExecStatus))
1459+
}
1460+
}
1461+
1462+
return nil
1463+
}
1464+
13731465
type GetWorkflowResV1 struct {
13741466
controller.BaseRes
13751467
Data *WorkflowResV1 `json:"data"`

sqle/model/task.go

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ func (t *Task) HasDoingAudit() bool {
411411
func (t *Task) HasDoingExecute() bool {
412412
if t.ExecuteSQLs != nil {
413413
for _, commitSQL := range t.ExecuteSQLs {
414-
if commitSQL.ExecStatus != SQLExecuteStatusInitialized {
414+
if commitSQL.ExecStatus != SQLExecuteStatusInitialized && commitSQL.ExecStatus != SQLExecuteStatusFailed {
415415
return true
416416
}
417417
}
@@ -490,6 +490,38 @@ func (s *Storage) GetTaskDetailById(taskId string) (*Task, bool, error) {
490490
return task, true, errors.New(errors.ConnectStorageError, err)
491491
}
492492

493+
func (s *Storage) GetTaskDetailByIdWithExecSqlIds(taskId string, execSqlIds []uint) (*Task, bool, error) {
494+
task := &Task{}
495+
496+
db := s.db.Where("id = ?", taskId).
497+
Preload("RuleTemplate").
498+
Preload("RollbackSQLs")
499+
500+
if len(execSqlIds) > 0 {
501+
// 重新执行上线,获取指定需要执行的sql
502+
db = db.Preload("ExecuteSQLs", "id IN (?)", execSqlIds)
503+
} else {
504+
// 未指定则加载所有待执行sql
505+
db = db.Preload("ExecuteSQLs")
506+
}
507+
508+
err := db.First(task).Error
509+
510+
if err == gorm.ErrRecordNotFound {
511+
return nil, false, nil
512+
}
513+
return task, true, errors.New(errors.ConnectStorageError, err)
514+
}
515+
516+
func (s *Storage) GetExecSqlsByTaskIdAndStatus(taskId uint, status []string) ([]*ExecuteSQL, error) {
517+
executeSQLs := []*ExecuteSQL{}
518+
err := s.db.Where("task_id = ? and exec_status IN (?)", taskId, status).Find(&executeSQLs).Error
519+
if err != nil {
520+
return nil, errors.New(errors.ConnectStorageError, err)
521+
}
522+
return executeSQLs, nil
523+
}
524+
493525
func (s *Storage) GetTaskExecuteSQLContent(taskId string) ([]byte, error) {
494526
rows, err := s.db.Model(&ExecuteSQL{}).Select("content").
495527
Where("task_id = ?", taskId).Rows()

sqle/model/workflow.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -773,6 +773,16 @@ func (s *Storage) UpdateWorkflowExecInstanceRecord(w *Workflow, operateStep *Wor
773773
})
774774
}
775775

776+
// UpdateWorkflowExecInstanceRecordForReExecute, 用于重新执行SQL时更新上线状态和执行人
777+
func (s *Storage) UpdateWorkflowExecInstanceRecordForReExecute(w *Workflow, needExecInstanceRecords []*WorkflowInstanceRecord) error {
778+
return s.Tx(func(tx *gorm.DB) error {
779+
if err := updateWorkflowStatus(tx, w); err != nil {
780+
return err
781+
}
782+
return updateWorkflowInstanceRecordForReExecute(tx, needExecInstanceRecords)
783+
})
784+
}
785+
776786
func updateWorkflowStatus(tx *gorm.DB, w *Workflow) error {
777787
db := tx.Exec("UPDATE workflow_records SET status = ?, current_workflow_step_id = ? WHERE id = ?",
778788
w.Record.Status, w.Record.CurrentWorkflowStepId, w.Record.ID)
@@ -810,6 +820,17 @@ func updateWorkflowInstanceRecord(tx *gorm.DB, needExecInstanceRecords []*Workfl
810820
return nil
811821
}
812822

823+
func updateWorkflowInstanceRecordForReExecute(tx *gorm.DB, needExecInstanceRecords []*WorkflowInstanceRecord) error {
824+
for _, inst := range needExecInstanceRecords {
825+
db := tx.Exec("UPDATE workflow_instance_records SET is_sql_executed = ?, execution_user_id = ? WHERE id = ? AND is_sql_executed = 0 AND execution_user_id = 0",
826+
inst.IsSQLExecuted, inst.ExecutionUserId, inst.ID)
827+
if db.Error != nil {
828+
return db.Error
829+
}
830+
}
831+
return nil
832+
}
833+
813834
func updateWorkflowInstanceRecordById(tx *gorm.DB, needExecInstanceRecords []*WorkflowInstanceRecord) error {
814835
for _, inst := range needExecInstanceRecords {
815836
db := tx.Exec("UPDATE workflow_instance_records SET is_sql_executed = ?, execution_user_id = ? WHERE id = ?",

sqle/server/sqled.go

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func (s *Sqled) HasTask(taskId string) bool {
6262

6363
// addTask receive taskId and action type, using taskId and typ to create an action;
6464
// action will be validated, and sent to Sqled.queue.
65-
func (s *Sqled) addTask(projectId string, taskId string, typ int) (*action, error) {
65+
func (s *Sqled) addTask(projectId string, taskId string, typ int, execSqlIds []uint) (*action, error) {
6666
var err error
6767
var p driver.Plugin
6868
var rules []*model.Rule
@@ -87,7 +87,7 @@ func (s *Sqled) addTask(projectId string, taskId string, typ int) (*action, erro
8787
return action, errors.New(errors.TaskRunning, fmt.Errorf("task is running"))
8888
}
8989

90-
task, exist, err := st.GetTaskDetailById(taskId)
90+
task, exist, err := st.GetTaskDetailByIdWithExecSqlIds(taskId, execSqlIds)
9191
if err != nil {
9292
goto Error
9393
}
@@ -140,12 +140,21 @@ Error:
140140
}
141141

142142
func (s *Sqled) AddTask(projectId string, taskId string, typ int) error {
143-
_, err := s.addTask(projectId, taskId, typ)
143+
_, err := s.addTask(projectId, taskId, typ, nil)
144144
return err
145145
}
146146

147147
func (s *Sqled) AddTaskWaitResult(projectId string, taskId string, typ int) (*model.Task, error) {
148-
action, err := s.addTask(projectId, taskId, typ)
148+
action, err := s.addTask(projectId, taskId, typ, nil)
149+
if err != nil {
150+
return nil, err
151+
}
152+
<-action.done
153+
return action.task, action.err
154+
}
155+
156+
func (s *Sqled) AddTaskWaitResultWithSQLIds(projectId string, taskId string, execSqlIds []uint, typ int) (*model.Task, error) {
157+
action, err := s.addTask(projectId, taskId, typ, execSqlIds)
149158
if err != nil {
150159
return nil, err
151160
}
@@ -390,12 +399,13 @@ func (a *action) execute() (err error) {
390399
taskStatus = model.TaskStatusExecuteSucceeded
391400
}
392401
// update task status by sql
393-
for _, sql := range task.ExecuteSQLs {
394-
if sql.ExecStatus == model.SQLExecuteStatusFailed ||
395-
sql.ExecStatus == model.SQLExecuteStatusTerminateSucc {
396-
taskStatus = model.TaskStatusExecuteFailed
397-
break
398-
}
402+
// 验证task下所有的sql是否全部成功(工单中允许重新上线部分sql,所以需要验证全部sql是否成功)
403+
failedSqls, queryErr := st.GetExecSqlsByTaskIdAndStatus(task.ID, []string{model.SQLExecuteStatusFailed, model.SQLExecuteStatusTerminateSucc})
404+
if queryErr != nil {
405+
return queryErr
406+
}
407+
if len(failedSqls) > 0 {
408+
taskStatus = model.TaskStatusExecuteFailed
399409
}
400410

401411
case terminationErr := <-terminateErrChan:

sqle/server/sqled_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,8 @@ func Test_action_execute(t *testing.T) {
331331
mockDB, mock, err := sqlmock.New()
332332
assert.NoError(t, err)
333333
mock.ExpectQuery("SELECT VERSION()").WillReturnRows(sqlmock.NewRows([]string{"VERSION()"}).AddRow("5.7"))
334+
mock.ExpectQuery("SELECT \\* FROM `execute_sql_detail`").
335+
WillReturnRows(sqlmock.NewRows([]string{"id", "task_id", "exec_status"}))
334336
model.InitMockStorage(mockDB)
335337
a := getAction(tt.sqls, ActionTypeExecute, d)
336338
if err := a.execute(); (err != nil) != tt.wantErr {

sqle/server/workflow.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package server
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strconv"
7+
"sync"
8+
9+
"github.com/actiontech/sqle/sqle/common"
10+
"github.com/actiontech/sqle/sqle/dms"
11+
"github.com/actiontech/sqle/sqle/errors"
12+
"github.com/actiontech/sqle/sqle/log"
13+
"github.com/actiontech/sqle/sqle/model"
14+
"github.com/actiontech/sqle/sqle/notification"
15+
)
16+
17+
func ReExecuteTaskSQLs(workflow *model.Workflow, task *model.Task, execSqlIds []uint, user *model.User) error {
18+
s := model.GetStorage()
19+
l := log.NewEntry()
20+
21+
instance, exist, err := dms.GetInstancesById(context.Background(), fmt.Sprintf("%d", task.InstanceId))
22+
if err != nil {
23+
return err
24+
}
25+
if !exist {
26+
return errors.New(errors.DataNotExist, fmt.Errorf("instance is not exist. instanceId=%v", task.InstanceId))
27+
}
28+
task.Instance = instance
29+
if task.Instance == nil {
30+
return errors.New(errors.DataNotExist, fmt.Errorf("instance is not exist"))
31+
}
32+
33+
// if instance is not connectable, exec sql must be failed;
34+
// commit action unable to retry, so don't to exec it.
35+
if err = common.CheckInstanceIsConnectable(task.Instance); err != nil {
36+
return errors.New(errors.ConnectRemoteDatabaseError, err)
37+
}
38+
39+
needExecTaskRecords := make([]*model.WorkflowInstanceRecord, 0, len(workflow.Record.InstanceRecords))
40+
// update workflow
41+
for _, inst := range workflow.Record.InstanceRecords {
42+
if inst.TaskId != task.ID {
43+
continue
44+
}
45+
inst.IsSQLExecuted = true
46+
inst.ExecutionUserId = user.GetIDStr()
47+
needExecTaskRecords = append(needExecTaskRecords, inst)
48+
}
49+
50+
workflow.Record.Status = model.WorkflowStatusExecuting
51+
workflow.Record.CurrentWorkflowStepId = 0
52+
53+
err = s.UpdateWorkflowExecInstanceRecordForReExecute(workflow, needExecTaskRecords)
54+
if err != nil {
55+
return err
56+
}
57+
var lock sync.Mutex
58+
go func() {
59+
sqledServer := GetSqled()
60+
task, err := sqledServer.AddTaskWaitResultWithSQLIds(string(workflow.ProjectId), strconv.Itoa(int(task.ID)), execSqlIds, ActionTypeExecute)
61+
{
62+
lock.Lock()
63+
updateStatus(s, workflow, l, nil)
64+
lock.Unlock()
65+
}
66+
if err != nil || task.Status == model.TaskStatusExecuteFailed {
67+
go notification.NotifyWorkflow(string(workflow.ProjectId), workflow.WorkflowId, notification.WorkflowNotifyTypeExecuteFail)
68+
} else {
69+
go notification.NotifyWorkflow(string(workflow.ProjectId), workflow.WorkflowId, notification.WorkflowNotifyTypeExecuteSuccess)
70+
}
71+
72+
}()
73+
74+
return nil
75+
}

0 commit comments

Comments
 (0)