Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
c606598
Add priority flag to Dispatch CLI and API
tehut Apr 8, 2025
bf98caf
added TestJobDispatchCommand_Priority
tehut Apr 8, 2025
94306a1
add priority bounds check to validateDispatchRequest
tehut Apr 11, 2025
9bc62fa
make priority flag Int again
tehut Apr 11, 2025
438de41
add DispatchOptions struct and DispatchOpts endpoint to HTTP api
tehut Apr 11, 2025
a308566
clean up dispatch test
tehut Apr 11, 2025
a97eea2
Refactor HTTP_JobDispatch test to use shoenig
tehut Apr 14, 2025
90d1962
add priority based test cases to TestJobEndpoint_Dispatch
tehut Apr 16, 2025
68384e9
update dispatch cli docs
tehut Apr 16, 2025
153873c
use title in attempt to link
tehut Apr 16, 2025
3578ae3
add invalid case to command test
tehut Apr 16, 2025
4f7f663
add changelog entry
tehut Apr 17, 2025
475ef8a
remove redundant second args.Priority check
tehut Apr 17, 2025
b7d82de
add priority check to TestJobEndpoint_Dispatch
tehut Apr 17, 2025
81638c1
refactor TestJobEndpoint_Dispatch to shoenig and split 50 line if/els…
tehut Apr 17, 2025
f3c6564
fixups: typo in slug and add errString for invalid priority test
tehut Apr 17, 2025
3282f58
docs: priority value 1-job_max_priority
tehut Apr 17, 2025
9613820
fix command dir tests
tehut Apr 18, 2025
dfd0145
add expectedPriority to all cases and rename err to expectError
tehut Apr 18, 2025
c1fff15
remove crufty Logfs and refactor noEval and EvalID assertions
tehut Apr 18, 2025
320ef2a
set expectedPriority on all test cases and use dispatchRequestStruct …
tehut Apr 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/25622.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
command: added priority flag to job dispatch command
```
32 changes: 28 additions & 4 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,16 +529,39 @@ func (j *Jobs) Summary(jobID string, q *QueryOptions) (*JobSummary, *QueryMeta,
return &resp, qm, nil
}

// DispatchOptions is used to pass through job dispatch parameters
type DispatchOptions struct {
JobID string
Meta map[string]string
Payload []byte
IdPrefixTemplate string
Priority int
}

func (j *Jobs) Dispatch(jobID string, meta map[string]string,
payload []byte, idPrefixTemplate string, q *WriteOptions) (*JobDispatchResponse, *WriteMeta, error) {
var resp JobDispatchResponse
req := &JobDispatchRequest{

return j.DispatchOpts(&DispatchOptions{
JobID: jobID,
Meta: meta,
Payload: payload,
IdPrefixTemplate: idPrefixTemplate,
IdPrefixTemplate: idPrefixTemplate},
q,
)
}

// DispatchOpts is used to dispatch a new job with the passed DispatchOpts. It
// returns the ID of the evaluation, along with any errors encountered.
func (j *Jobs) DispatchOpts(opts *DispatchOptions, q *WriteOptions) (*JobDispatchResponse, *WriteMeta, error) {
var resp JobDispatchResponse
req := &JobDispatchRequest{
JobID: opts.JobID,
Meta: opts.Meta,
Payload: opts.Payload,
IdPrefixTemplate: opts.IdPrefixTemplate,
Priority: opts.Priority,
}
wm, err := j.client.put("/v1/job/"+url.PathEscape(jobID)+"/dispatch", req, &resp, q)
wm, err := j.client.put("/v1/job/"+url.PathEscape(opts.JobID)+"/dispatch", req, &resp, q)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -1571,6 +1594,7 @@ type JobDispatchRequest struct {
Payload []byte
Meta map[string]string
IdPrefixTemplate string
Priority int
}

type JobDispatchResponse struct {
Expand Down
138 changes: 122 additions & 16 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1964,9 +1964,8 @@ func TestHTTP_JobDispatch(t *testing.T) {
},
}
var resp structs.JobRegisterResponse
if err := s.Agent.RPC("Job.Register", &args, &resp); err != nil {
t.Fatalf("err: %v", err)
}
err := s.Agent.RPC("Job.Register", &args, &resp)
must.NoError(t, err)

// Make the request
respW := httptest.NewRecorder()
Expand All @@ -1981,29 +1980,136 @@ func TestHTTP_JobDispatch(t *testing.T) {

// Make the HTTP request
req2, err := http.NewRequest(http.MethodPut, "/v1/job/"+job.ID+"/dispatch", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
must.NoError(t, err)
respW.Flush()

// Make the request
obj, err := s.Server.JobSpecificRequest(respW, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
must.NoError(t, err)

// Check the response
dispatch := obj.(structs.JobDispatchResponse)
if dispatch.EvalID == "" {
t.Fatalf("bad: %v", dispatch)
}

if dispatch.DispatchedJobID == "" {
t.Fatalf("bad: %v", dispatch)
}
must.NotEq(t, dispatch.EvalID, "")
must.NotEq(t, dispatch.DispatchedJobID, "")
})
}

func TestHTTP_JobDispatchPriority(t *testing.T) {
ci.Parallel(t)
defaultPriority := 50
validPriority := 90
invalidPriority := -1

testCases := []struct {
name string
dispatchReq *structs.JobDispatchRequest
expectedPriority int
expectedErr bool
}{
{
name: "no priority",
dispatchReq: &structs.JobDispatchRequest{
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: structs.DefaultNamespace,
IdempotencyToken: "foo",
},
},
expectedPriority: defaultPriority,
},
{
name: "set invalid priority",
dispatchReq: &structs.JobDispatchRequest{
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: structs.DefaultNamespace,
IdempotencyToken: "foo",
},
Priority: invalidPriority,
},
expectedPriority: invalidPriority,
expectedErr: true,
},
{
name: "set valid priority",
dispatchReq: &structs.JobDispatchRequest{
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: structs.DefaultNamespace,
IdempotencyToken: "foo",
},
Priority: validPriority,
},
expectedPriority: validPriority,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {

// Create the parameterized job
job := mock.BatchJob()
job.ParameterizedJob = &structs.ParameterizedJobConfig{}

args := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}

var resp structs.JobRegisterResponse
err := s.Agent.RPC("Job.Register", &args, &resp)
must.NoError(t, err)

// Make the request
respW := httptest.NewRecorder()
args2 := tc.dispatchReq
buf := encodeReq(args2)

// Make the HTTP request
req2, err := http.NewRequest(http.MethodPut, "/v1/job/"+job.ID+"/dispatch", buf)
must.NoError(t, err)
respW.Flush()

// Make the request
obj, err := s.Server.JobSpecificRequest(respW, req2)
if tc.expectedErr {
must.ErrorContains(t, err, "job priority must be between")
return
} else {
must.NoError(t, err)
}

// Check the response
dispatch := obj.(structs.JobDispatchResponse)
must.NotEq(t, dispatch.EvalID, "")
must.NotEq(t, dispatch.DispatchedJobID, "")
must.NotEq(t, job.ID, dispatch.DispatchedJobID)

//Check job priority

// Make the HTTP request
bufInfo := encodeReq(structs.Job{ParentID: job.ID})
reqInfo, err := http.NewRequest(http.MethodGet, "/v1/job/"+dispatch.DispatchedJobID, bufInfo)
must.NoError(t, err)

// Make the request
respInfo := httptest.NewRecorder()
objInfo, err := s.Server.JobSpecificRequest(respInfo, reqInfo)
must.NoError(t, err)
respInfo.Flush()

// Check the response
dispatchJob := objInfo.(*structs.Job)
must.Eq(t, tc.expectedPriority, dispatchJob.Priority)
})
})
}
}

func TestHTTP_JobDispatchPayload(t *testing.T) {
ci.Parallel(t)
httpTest(t, nil, func(s *TestAgent) {
Expand Down
12 changes: 10 additions & 2 deletions command/job_dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func (c *JobDispatchCommand) Run(args []string) int {
var idempotencyToken string
var meta []string
var idPrefixTemplate string
var priority int

flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
Expand All @@ -131,7 +132,7 @@ func (c *JobDispatchCommand) Run(args []string) int {
flags.Var((*flaghelper.StringFlag)(&meta), "meta", "")
flags.StringVar(&idPrefixTemplate, "id-prefix-template", "", "")
flags.BoolVar(&openURL, "ui", false, "")

flags.IntVar(&priority, "priority", 0, "")
if err := flags.Parse(args); err != nil {
return 1
}
Expand Down Expand Up @@ -201,7 +202,14 @@ func (c *JobDispatchCommand) Run(args []string) int {
IdempotencyToken: idempotencyToken,
Namespace: namespace,
}
resp, _, err := client.Jobs().Dispatch(jobID, metaMap, payload, idPrefixTemplate, w)
opts := &api.DispatchOptions{
JobID: jobID,
Meta: metaMap,
Payload: payload,
IdPrefixTemplate: idPrefixTemplate,
Priority: priority,
}
resp, _, err := client.Jobs().DispatchOpts(opts, w)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to dispatch job: %s", err))
return 1
Expand Down
100 changes: 100 additions & 0 deletions command/job_dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package command

import (
"strconv"
"strings"
"testing"

Expand Down Expand Up @@ -104,6 +105,7 @@ func TestJobDispatchCommand_ACL(t *testing.T) {
job := mock.MinJob()
job.Type = "batch"
job.ParameterizedJob = &structs.ParameterizedJobConfig{}
job.Priority = 20 //set priority on parent job
state := srv.Agent.Server().State()
err := state.UpsertJob(structs.MsgTypeTestSetup, 100, nil, job)
must.NoError(t, err)
Expand Down Expand Up @@ -210,3 +212,101 @@ namespace "default" {
})
}
}

func TestJobDispatchCommand_Priority(t *testing.T) {
ci.Parallel(t)
defaultJobPriority := 50
// Start server
srv, client, url := testServer(t, true, nil)
t.Cleanup(srv.Shutdown)

waitForNodes(t, client)

// Create a parameterized job.
job := mock.MinJob()
job.Type = "batch"
job.ParameterizedJob = &structs.ParameterizedJobConfig{}
job.Priority = defaultJobPriority // set default priority on parent job
state := srv.Agent.Server().State()
err := state.UpsertJob(structs.MsgTypeTestSetup, 100, nil, job)
must.NoError(t, err)

testCases := []struct {
name string
priority string
expectedErr bool
additionalFlags []string
payload map[string]string
}{
{
name: "no priority",
},
{
name: "valid priority",
priority: "80",
},
{
name: "invalid priority",
priority: "-1",
expectedErr: true,
},
{
name: "priority + flag",
priority: "90",
additionalFlags: []string{"-verbose"},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ui := cli.NewMockUi()
cmd := &JobDispatchCommand{Meta: Meta{Ui: ui}}
args := []string{
"-address", url,
}
// Add priority, if present
if len(tc.priority) >= 1 {
args = append(args, []string{"-priority", tc.priority}...)
}

// Add additional flags, if present
if len(tc.additionalFlags) >= 1 {
args = append(args, tc.additionalFlags...)
}

// Add job ID to the command.
args = append(args, job.ID)

// Run command.
code := cmd.Run(args)
if !tc.expectedErr {
must.Zero(t, code)
} else {
// Confirm expected error case
must.NonZero(t, code)
out := ui.ErrorWriter.String()
must.StrContains(t, out, "dispatch job priority must be between [1, 100]")
return
}

// Confirm successful dispatch and parse job ID
out := ui.OutputWriter.String()
must.StrContains(t, out, "Dispatched Job ID =")
parts := strings.Fields(out)
id := strings.TrimSpace(parts[4])

// Confirm dispatched job priority set correctly
job, _, err := client.Jobs().Info(id, nil)
must.NoError(t, err)
must.NotNil(t, job)

if len(tc.priority) >= 1 {
priority, err := strconv.Atoi(tc.priority)
must.NoError(t, err)
must.Eq(t, job.Priority, &priority)
} else {
must.Eq(t, defaultJobPriority, *job.Priority)
}
})
}
}
Loading