Skip to content

Commit 7b533e3

Browse files
authored
[SILO-452] feat: Queue multiple job runs in importers (#3945)
* modify: queue multiple import jobs and bug fixes * moved queue job logic inside migrator + code refactoring * fix: code improvement
1 parent 00c523e commit 7b533e3

30 files changed

+153
-57
lines changed

apps/api/plane/ee/serializers/api/issue_property.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
class IssuePropertyAPISerializer(BaseSerializer):
1414
relation_type = serializers.ChoiceField(
15-
choices=RelationTypeEnum.choices, required=False
15+
choices=RelationTypeEnum.choices, required=False, allow_null=True
1616
)
1717

1818
class Meta:

apps/api/plane/ee/views/api/job/base.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,19 +54,28 @@ def post(self, request):
5454
def get(self, request, pk=None):
5555
if not pk:
5656
filters = {}
57+
order_by = "-created_at"
5758
# Handle source parameter
5859
if "source" in request.query_params:
5960
filters["source"] = request.query_params["source"]
6061

62+
# Handle status parameter
63+
if "statuses" in request.query_params:
64+
filters["status__in"] = request.query_params["statuses"].split(",")
65+
6166
# Handle workspace_id/workspaceId parameter
6267
if "workspace_id" in request.query_params:
6368
filters["workspace_id"] = request.query_params["workspace_id"]
6469
elif "workspaceId" in request.query_params:
6570
filters["workspace_id"] = request.query_params["workspaceId"]
6671

72+
# Handle order_by parameter
73+
if "order_by" in request.query_params:
74+
order_by = request.query_params["order_by"]
75+
6776
import_jobs = (
6877
ImportJob.objects.filter(**filters)
69-
.order_by("-created_at")
78+
.order_by(order_by)
7079
.select_related("workspace", "report")
7180
)
7281
serializer = ImportJobAPISerializer(import_jobs, many=True)

apps/silo/src/apps/engine/controllers/job.controller.ts

Lines changed: 12 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,16 @@
88

99
import { Request, Response } from "express";
1010

11-
import { TImporterKeys, TIntegrationKeys } from "@plane/etl/core";
12-
import { resetJobIfStarted } from "@/helpers/job";
11+
import { E_JOB_STATUS, TImporterKeys, TIntegrationKeys } from "@plane/etl/core";
1312
import { responseHandler } from "@/helpers/response-handler";
14-
import { Controller, Get, Post, Put, useValidateUserAuthentication } from "@/lib";
13+
import { APIError, Controller, Get, Post, Put, useValidateUserAuthentication } from "@/lib";
1514
import { logger } from "@/logger";
1615
import { getAPIClient } from "@/services/client";
1716
import { importTaskManger } from "@/worker";
17+
import { JobService } from "../services/job.service";
1818

1919
const client = getAPIClient();
20+
const jobService = new JobService();
2021

2122
@Controller("/api/jobs")
2223
export class JobController {
@@ -120,13 +121,13 @@ export class JobController {
120121

121122
// Get the job from the given job id
122123
const job = await client.importJob.getImportJob(body.jobId);
123-
if ((job.status && job.status === "FINISHED") || job.status === "ERROR") {
124+
if ((job.status && job.status === E_JOB_STATUS.FINISHED) || job.status === E_JOB_STATUS.ERROR) {
124125
res.status(400).json({ message: "Job already finished or errored out, can't cancel" });
125126
return;
126127
}
127128

128129
await client.importJob.updateImportJob(body.jobId, {
129-
status: "CANCELLED",
130+
status: E_JOB_STATUS.CANCELLED,
130131
cancelled_at: new Date().toISOString(),
131132
});
132133
res.status(200).json({ message: "Job cancelled successfully" });
@@ -157,6 +158,7 @@ export class JobController {
157158
},
158159
{ phase, isLastBatch }
159160
);
161+
160162
res.status(200).json({ message: "Job updated successfully" });
161163
} catch (error: any) {
162164
responseHandler(res, 500, error);
@@ -182,48 +184,14 @@ export class JobController {
182184
return;
183185
}
184186

185-
// Get the job from the given job id
186-
const job = await client.importJob.getImportJob(body.jobId);
187-
// If the job is not finished or error, just send 400 OK, and don't do
188-
// anything
189-
if (
190-
job.status &&
191-
job.status != "CREATED" &&
192-
job.status != "FINISHED" &&
193-
job.status != "ERROR" &&
194-
job.status != "CANCELLED"
195-
) {
196-
res.status(400).json({ message: "Job already in progress, can't instantiate again" });
197-
return;
198-
}
199-
// Check if the config is already present, for the particular job or not
200-
if (!job.config || job.source == null) {
201-
res.status(400).json({
202-
message: "Config for the requested job is not found, make sure to create a config before initiating a job",
203-
});
204-
return;
205-
}
206-
logger.info(`[${job.id}] Initiating job ${job.source}}`);
207-
208-
await client.importJob.updateImportJob(job.id, {
209-
status: "CREATED",
210-
cancelled_at: null,
211-
error_metadata: {},
212-
});
213-
214-
await resetJobIfStarted(job);
215-
216-
await importTaskManger.registerTask(
217-
{
218-
route: job.source.toLowerCase(),
219-
jobId: job.id,
220-
type: "initiate",
221-
},
222-
{}
223-
);
187+
await jobService.runJob(body.jobId);
224188

225189
res.status(200).json({ message: "Job initiated successfully" });
226190
} catch (error: any) {
191+
if (error instanceof APIError) {
192+
res.status(error.statusCode).json({ message: error.message });
193+
return;
194+
}
227195
responseHandler(res, 500, error);
228196
}
229197
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import { E_JOB_STATUS } from "@plane/etl/core";
2+
import { resetJobIfStarted } from "@/helpers/job";
3+
import { APIError } from "@/lib";
4+
import { logger } from "@/logger";
5+
import { getAPIClient } from "@/services/client";
6+
import { importTaskManger } from "@/worker";
7+
8+
const JOB_IN_PROGRESS_STATUSES = [
9+
E_JOB_STATUS.INITIATED,
10+
E_JOB_STATUS.PULLED,
11+
E_JOB_STATUS.PULLING,
12+
E_JOB_STATUS.PROGRESSING,
13+
E_JOB_STATUS.TRANSFORMING,
14+
E_JOB_STATUS.TRANSFORMED,
15+
E_JOB_STATUS.PUSHING,
16+
];
17+
18+
const JOB_ALLOWED_FOR_RE_RUN_STATUSES = [
19+
E_JOB_STATUS.CREATED,
20+
E_JOB_STATUS.QUEUED,
21+
E_JOB_STATUS.CANCELLED,
22+
E_JOB_STATUS.ERROR,
23+
E_JOB_STATUS.FINISHED,
24+
];
25+
26+
export class JobService {
27+
private readonly client = getAPIClient();
28+
29+
async runJob(jobId: string) {
30+
// Get the job from the given job id
31+
const job = await this.client.importJob.getImportJob(jobId);
32+
const jobsInProgress = await this.client.importJob.listImportJobs({
33+
workspace_id: job.workspace_id,
34+
source: job.source,
35+
statuses: JOB_IN_PROGRESS_STATUSES.join(","),
36+
});
37+
const isAnyJobsInProgress = jobsInProgress.length > 0;
38+
// If the job is not finished or error, just send 400 OK, and don't do anything
39+
if (job.status && !JOB_ALLOWED_FOR_RE_RUN_STATUSES.includes(job.status as E_JOB_STATUS)) {
40+
throw new APIError("Job is not in a valid status to run, can't instantiate again", 400);
41+
}
42+
// Check if the config is already present, for the particular job or not
43+
if (!job.config || job.source == null) {
44+
throw new APIError(
45+
"Config for the requested job is not found, make sure to create a config before initiating a job",
46+
400
47+
);
48+
}
49+
50+
// Update the job status to initiated or queued if there are any jobs started
51+
await this.client.importJob.updateImportJob(job.id, {
52+
status: !isAnyJobsInProgress ? E_JOB_STATUS.INITIATED : E_JOB_STATUS.QUEUED,
53+
cancelled_at: null,
54+
error_metadata: {},
55+
});
56+
57+
// If there are no jobs created, then we need to register the task to initiate the job
58+
if (!isAnyJobsInProgress) {
59+
logger.info(`Initiating import job`, { jobId: job.id, source: job.source });
60+
// Reset the job if it has already started
61+
await resetJobIfStarted(job);
62+
await importTaskManger.registerTask(
63+
{
64+
route: job.source.toLowerCase(),
65+
jobId: job.id,
66+
type: "initiate",
67+
},
68+
{}
69+
);
70+
}
71+
72+
return job;
73+
}
74+
}

apps/silo/src/etl/base-import-worker.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
import { TJobStatus, PlaneEntities } from "@plane/etl/core";
1+
import { TJobStatus, PlaneEntities, E_JOB_STATUS } from "@plane/etl/core";
22
import { TImportJob } from "@plane/types";
3+
import { JobService } from "@/apps/engine/services/job.service";
34
import { IMPORT_JOB_FIRST_PAGE_PUSHED_CACHE_KEY } from "@/helpers/cache-keys";
45
import { wait } from "@/helpers/delay";
56
import { updateJobWithReport } from "@/helpers/job";
@@ -67,6 +68,22 @@ export abstract class BaseDataMigrator<TJobConfig, TSourceEntity> implements Tas
6768
completed_batch_count: report.total_batch_count,
6869
}),
6970
]);
71+
72+
// start the queued jobs
73+
const queuedJobs = await client.importJob.listImportJobs({
74+
workspace_id: job.workspace_id,
75+
source: job.source,
76+
statuses: E_JOB_STATUS.QUEUED,
77+
order_by: "created_at",
78+
});
79+
const nextJob = queuedJobs[0];
80+
const isAnyJobsQueued = queuedJobs.length > 0;
81+
82+
if (isAnyJobsQueued) {
83+
const jobService = new JobService();
84+
logger.info(`Starting next job for ${job.source}`, { jobId: job.id, nextJobId: nextJob.id });
85+
await jobService.runJob(nextJob.id);
86+
}
7087
}
7188

7289
async handleTask(headers: TaskHeaders, data: any): Promise<boolean> {

apps/silo/src/lib/errors.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { AxiosError, RawAxiosResponseHeaders } from "axios";
22
import { wait } from "@/helpers/delay";
3-
import { logger } from "@/logger";
3+
import { captureException, logger } from "@/logger";
44

55
export type APIRatelimitResponse = {
66
error_code: number;
@@ -26,11 +26,6 @@ interface RateLimitHeaders {
2626
"x-ratelimit-reset": string;
2727
}
2828

29-
interface APIRateLimitError extends Error {
30-
response?: {
31-
headers?: Partial<RateLimitHeaders>;
32-
};
33-
}
3429

3530
function isRateLimitHeaders(headers: any): headers is RateLimitHeaders {
3631
return (
@@ -65,6 +60,7 @@ export async function protect<T>(fn: (...args: any[]) => Promise<T>, ...args: an
6560
try {
6661
return await fn(...args);
6762
} catch (error) {
63+
captureException(error as Error);
6864
// Check if the error is an Axios error with status 429
6965
if (error instanceof AxiosError && error.response?.status === 429) {
7066
logger.info("Rate limit exceeded ====== in protect");

apps/silo/src/services/job/import-job.service.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,12 @@ import { logger } from "@/logger";
44
import { APIService } from "@/services/api.service";
55
// types
66
import { ClientOptions } from "@/types";
7+
8+
export type TImportJobListQueryParams = Partial<Record<keyof TImportJob, string | boolean | number>> & {
9+
statuses?: string;
10+
order_by?: "created_at" | "updated_at" | "status";
11+
};
12+
713
export class ImportJobAPIService<TJobConfig = object> extends APIService {
814
constructor(options: ClientOptions) {
915
super(options);
@@ -38,7 +44,7 @@ export class ImportJobAPIService<TJobConfig = object> extends APIService {
3844
});
3945
}
4046

41-
async listImportJobs(params?: Partial<Record<keyof TImportJob, string | boolean | number>>): Promise<TImportJob[]> {
47+
async listImportJobs(params?: TImportJobListQueryParams): Promise<TImportJob[]> {
4248
params = removeUndefinedFromObject(params);
4349
return this.get(`/api/v1/import-jobs/`, { params: params })
4450
.then((response) => response.data)

apps/web/ee/components/importers/common/dashboard/base-dashboard.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ export const BaseDashboard = observer(<T,>(props: IBaseDashboardProps<T>) => {
9797
const isReRunDisabled = (job: any) => {
9898
if (!job || !job?.status) return true;
9999

100-
return ![E_JOB_STATUS.CREATED, E_JOB_STATUS.FINISHED, E_JOB_STATUS.ERROR, E_JOB_STATUS.CANCELLED].includes(
100+
return ![E_JOB_STATUS.CREATED, E_JOB_STATUS.FINISHED, E_JOB_STATUS.ERROR, E_JOB_STATUS.CANCELLED, E_JOB_STATUS.QUEUED].includes(
101101
job?.status as E_JOB_STATUS
102102
);
103103
};

apps/web/ee/components/importers/common/dashboard/status.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ type TSyncJobStatusProps = {
1313

1414
const STATUS_CLASSNAMES: { [key in TJobStatus]: string } = {
1515
[E_JOB_STATUS.CREATED]: "text-gray-500 border border-gray-500 bg-gray-500/10",
16+
[E_JOB_STATUS.QUEUED]: "text-gray-500 border border-gray-500 bg-gray-500/10",
1617
[E_JOB_STATUS.INITIATED]: "text-gray-500 border border-gray-500 bg-gray-500/10",
1718
[E_JOB_STATUS.PULLING]: "text-yellow-500 border border-yellow-500 bg-yellow-500/10",
1819
[E_JOB_STATUS.PULLED]: "text-yellow-500 border border-yellow-500 bg-yellow-500/10",

apps/web/ee/store/importers/clickup/root.store.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ const defaultImporterData: TImporterClickUpDataPayload = {
4141
},
4242
};
4343

44+
const defaultConfigData: Partial<TClickUpConfig> = {
45+
skipAdditionalDataImport: true,
46+
skipUserImport: false,
47+
};
48+
4449
export interface IClickUpStore extends IImporterBaseStore {
4550
// observables
4651
dashboardView: boolean;
@@ -71,7 +76,7 @@ export class ClickUpStore extends ImporterBaseStore implements IClickUpStore {
7176
dashboardView: boolean = true;
7277
stepper: TClickUpImporterStepKeys = E_CLICKUP_IMPORTER_STEPS.CONFIGURE_CLICKUP;
7378
importerData: TImporterClickUpDataPayload = defaultImporterData;
74-
configData: Partial<TClickUpConfig> = {};
79+
configData: Partial<TClickUpConfig> = defaultConfigData;
7580
// store instances
7681
job: IImporterJobStore<TClickUpConfig>;
7782
auth: IClickUpAuthStore;
@@ -182,6 +187,6 @@ export class ClickUpStore extends ImporterBaseStore implements IClickUpStore {
182187
this.dashboardView = true;
183188
this.stepper = E_CLICKUP_IMPORTER_STEPS.CONFIGURE_CLICKUP;
184189
this.importerData = defaultImporterData;
185-
this.configData = {};
190+
this.configData = defaultConfigData;
186191
};
187192
}

0 commit comments

Comments
 (0)