Skip to content

Commit ac3b1d9

Browse files
authored
feat: jira epic collection / extraction support incremental mode for better performance (#8414)
1 parent 8bc1f1a commit ac3b1d9

File tree

5 files changed

+45
-20
lines changed

5 files changed

+45
-20
lines changed

backend/core/config/config_viper.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ func setDefaultValue(v *viper.Viper) {
106106
v.SetDefault("SWAGGER_DOCS_DIR", "resources/swagger")
107107
v.SetDefault("RESUME_PIPELINES", true)
108108
v.SetDefault("CORS_ALLOW_ORIGIN", "*")
109+
v.SetDefault("CONSUME_PIPELINES", true)
109110
}
110111

111112
func init() {

backend/plugins/jira/tasks/epic_collector.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ func CollectEpics(taskCtx plugin.SubTaskContext) errors.Error {
7878
logger.Info("got user's timezone: %v", loc.String())
7979
}
8080
jql := "ORDER BY created ASC"
81+
if apiCollector.GetSince() != nil {
82+
jql = buildJQL(*apiCollector.GetSince(), loc)
83+
}
8184

8285
err = apiCollector.InitCollector(api.ApiCollectorArgs{
8386
ApiClient: data.ApiClient,
@@ -90,7 +93,7 @@ func CollectEpics(taskCtx plugin.SubTaskContext) errors.Error {
9093
for _, e := range reqData.Input.([]interface{}) {
9194
epicKeys = append(epicKeys, *e.(*string))
9295
}
93-
localJQL := fmt.Sprintf("issue in (%s) %s", strings.Join(epicKeys, ","), jql)
96+
localJQL := fmt.Sprintf("issue in (%s) and %s", strings.Join(epicKeys, ","), jql)
9497
query.Set("jql", localJQL)
9598
query.Set("startAt", fmt.Sprintf("%v", reqData.Pager.Skip))
9699
query.Set("maxResults", fmt.Sprintf("%v", reqData.Pager.Size))
@@ -130,12 +133,12 @@ func GetEpicKeysIterator(db dal.Dal, data *JiraTaskData, batchSize int) (api.Ite
130133
dal.Join(`
131134
LEFT JOIN _tool_jira_board_issues bi ON (
132135
i.connection_id = bi.connection_id
133-
AND
136+
AND
134137
i.issue_id = bi.issue_id
135138
)`),
136139
dal.Where(`
137140
i.connection_id = ?
138-
AND
141+
AND
139142
bi.board_id = ?
140143
AND
141144
i.epic_key != ''

backend/plugins/jira/tasks/epic_extractor.go

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ limitations under the License.
1818
package tasks
1919

2020
import (
21-
"encoding/json"
22-
2321
"github.com/apache/incubator-devlake/core/dal"
2422
"github.com/apache/incubator-devlake/core/errors"
2523
"github.com/apache/incubator-devlake/core/log"
@@ -39,12 +37,12 @@ var ExtractEpicsMeta = plugin.SubTaskMeta{
3937
DomainTypes: []string{plugin.DOMAIN_TYPE_TICKET, plugin.DOMAIN_TYPE_CROSS},
4038
}
4139

42-
func ExtractEpics(taskCtx plugin.SubTaskContext) errors.Error {
43-
data := taskCtx.GetData().(*JiraTaskData)
44-
db := taskCtx.GetDal()
40+
func ExtractEpics(subtaskCtx plugin.SubTaskContext) errors.Error {
41+
data := subtaskCtx.GetData().(*JiraTaskData)
42+
db := subtaskCtx.GetDal()
4543
connectionId := data.Options.ConnectionId
4644
boardId := data.Options.BoardId
47-
logger := taskCtx.GetLogger()
45+
logger := subtaskCtx.GetLogger()
4846
logger.Info("extract external epic Issues, connection_id=%d, board_id=%d", connectionId, boardId)
4947
mappings, err := getTypeMappings(data, db)
5048
if err != nil {
@@ -54,21 +52,40 @@ func ExtractEpics(taskCtx plugin.SubTaskContext) errors.Error {
5452
if err != nil {
5553
return err
5654
}
57-
extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{
58-
RawDataSubTaskArgs: api.RawDataSubTaskArgs{
59-
Ctx: taskCtx,
55+
56+
extractor, err := api.NewStatefulApiExtractor(&api.StatefulApiExtractorArgs[apiv2models.Issue]{
57+
SubtaskCommonArgs: &api.SubtaskCommonArgs{
58+
SubTaskContext: subtaskCtx,
59+
Table: RAW_EPIC_TABLE,
6060
Params: JiraApiParams{
6161
ConnectionId: data.Options.ConnectionId,
6262
BoardId: data.Options.BoardId,
6363
},
64-
Table: RAW_EPIC_TABLE,
64+
SubtaskConfig: map[string]any{
65+
"typeMappings": mappings,
66+
"storyPointField": data.Options.ScopeConfig.StoryPointField,
67+
},
6568
},
66-
Extract: func(row *api.RawData) ([]interface{}, errors.Error) {
67-
apiIssue := &apiv2models.Issue{}
68-
err = errors.Convert(json.Unmarshal(row.Data, apiIssue))
69-
if err != nil {
70-
return nil, err
69+
BeforeExtract: func(apiIssue *apiv2models.Issue, stateManager *api.SubtaskStateManager) errors.Error {
70+
if stateManager.IsIncremental() {
71+
err := db.Delete(
72+
&models.JiraIssueLabel{},
73+
dal.Where("connection_id = ? AND issue_id = ?", data.Options.ConnectionId, apiIssue.ID),
74+
)
75+
if err != nil {
76+
return err
77+
}
78+
err = db.Delete(
79+
&models.JiraIssueRelationship{},
80+
dal.Where("connection_id = ? AND issue_id = ?", data.Options.ConnectionId, apiIssue.ID),
81+
)
82+
if err != nil {
83+
return err
84+
}
7185
}
86+
return nil
87+
},
88+
Extract: func(apiIssue *apiv2models.Issue, row *api.RawData) ([]interface{}, errors.Error) {
7289
return extractIssues(data, mappings, apiIssue, row, userFieldMap)
7390
},
7491
})

backend/server/api/api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ func SetupApiServer(router *gin.Engine) {
118118
// Required for `/projects/hello%20%2F%20world` to be parsed properly with `/projects/:projectName`
119119
// end up with `name = "hello / world"`
120120
router.UseRawPath = true
121+
// router.UnescapePathValues = false
121122

122123
// Endpoint to proceed database migration
123124
router.GET("/proceed-db-migration", func(ctx *gin.Context) {

backend/server/services/pipeline.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@ package services
2020
import (
2121
"context"
2222
"fmt"
23-
"golang.org/x/sync/errgroup"
2423
"net/url"
2524
"os"
2625
"path/filepath"
2726
"strings"
2827
"sync"
2928
"time"
3029

30+
"golang.org/x/sync/errgroup"
31+
3132
"github.com/spf13/cast"
3233

3334
"github.com/apache/incubator-devlake/core/dal"
@@ -107,7 +108,9 @@ func pipelineServiceInit() {
107108
pipelineMaxParallel = 10000
108109
}
109110
// run pipeline with independent goroutine
110-
go RunPipelineInQueue(pipelineMaxParallel)
111+
if cfg.GetBool("CONSUME_PIPELINES") {
112+
go RunPipelineInQueue(pipelineMaxParallel)
113+
}
111114
}
112115

113116
func markInterruptedPipelineAs(status string) {

0 commit comments

Comments
 (0)