Skip to content
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go/ai/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ type ModelSupports struct {
SystemRole bool `json:"systemRole,omitempty"`
ToolChoice bool `json:"toolChoice,omitempty"`
Tools bool `json:"tools,omitempty"`
LongRunning bool `json:"longRunning,omitempty"`
}

type ConstrainedSupport string
Expand Down Expand Up @@ -259,6 +260,8 @@ type ModelResponse struct {
// LatencyMs is the time the request took in milliseconds.
LatencyMs float64 `json:"latencyMs,omitempty"`
Message *Message `json:"message,omitempty"`
// Operation holds the background operation details for long-running operations.
Operation map[string]any `json:"operation,omitempty"`
// Request is the [ModelRequest] struct used to trigger this response.
Request *ModelRequest `json:"request,omitempty"`
// Usage describes how many resources were used by this generation request.
Expand Down
224 changes: 223 additions & 1 deletion go/ai/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,29 @@ type ModelOptions struct {
Versions []string `json:"versions,omitempty"`
}

type BackgroundModel = core.BackgroundAction[*ModelRequest, *ModelResponse] // Background action for model operations

// StartOperationFunc starts a background operation
type StartOperationFunc[In, Out any] = func(ctx context.Context, input In) (*core.Operation[Out], error)

// CheckOperationFunc checks the status of a background operation
type CheckOperationFunc[Out any] = func(ctx context.Context, operation *core.Operation[Out]) (*core.Operation[Out], error)

// CancelOperationFunc cancels a background operation
type CancelOperationFunc[Out any] = func(ctx context.Context, operation *core.Operation[Out]) (*core.Operation[Out], error)

// BackgroundModelOptions holds configuration for defining a background model
type BackgroundModelOptions struct {
Versions []string `json:"versions,omitempty"`
Supports *ModelSupports `json:"supports,omitempty"`
ConfigSchema map[string]any `json:"configSchema,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
Label string `json:"label,omitempty"`
Start StartOperationFunc[*ModelRequest, *ModelResponse]
Check CheckOperationFunc[*ModelResponse]
Cancel CancelOperationFunc[*ModelResponse]
}

// DefineGenerateAction defines a utility generate action.
func DefineGenerateAction(ctx context.Context, r *registry.Registry) *generateAction {
return (*generateAction)(core.DefineStreamingAction(r, "generate", core.ActionTypeUtil, nil,
Expand Down Expand Up @@ -154,6 +177,7 @@ func DefineModel(r *registry.Registry, name string, opts *ModelOptions, fn Model
"constrained": opts.Supports.Constrained,
"output": opts.Supports.Output,
"contentType": opts.Supports.ContentType,
"longRunning": opts.Supports.LongRunning,
},
"versions": opts.Versions,
"stage": opts.Stage,
Expand Down Expand Up @@ -189,6 +213,12 @@ func LookupModel(r *registry.Registry, name string) Model {
return (*model)(action)
}

// LookupBackgroundModel looks up a BackgroundAction registered by [DefineBackgroundModel].
// It returns nil if the background model was not found.
func LookupBackgroundModel(r *registry.Registry, name string) BackgroundModel {
return core.LookupBackgroundAction[*ModelRequest, *ModelResponse](r, name)
}

// GenerateWithRequest is the central generation implementation for ai.Generate(), prompt.Execute(), and the GenerateAction direct call.
func GenerateWithRequest(ctx context.Context, r *registry.Registry, opts *GenerateActionOptions, mw []ModelMiddleware, cb ModelStreamCallback) (*ModelResponse, error) {
if opts.Model == "" {
Expand All @@ -201,10 +231,12 @@ func GenerateWithRequest(ctx context.Context, r *registry.Registry, opts *Genera
}

m := LookupModel(r, opts.Model)

if m == nil {
return nil, core.NewError(core.NOT_FOUND, "ai.GenerateWithRequest: model %q not found", opts.Model)
}

isLongRunning := SupportsLongRunning(r, opts.Model)
resumeOutput, err := handleResumeOption(ctx, r, opts)
if err != nil {
return nil, err
Expand Down Expand Up @@ -300,7 +332,50 @@ func GenerateWithRequest(ctx context.Context, r *registry.Registry, opts *Genera
Output: &outputCfg,
}

fn := core.ChainMiddleware(mw...)(m.Generate)
var fn ModelFunc
if isLongRunning {
_, name, _ := strings.Cut(opts.Model, "/")

bgAction := LookupBackgroundModel(r, name)
if bgAction == nil {
return nil, core.NewError(core.NOT_FOUND, "background model %q not found", opts.Model)
}

// Create a wrapper function that calls the background model but returns a ModelResponse with operation
fn = func(ctx context.Context, req *ModelRequest, cb ModelStreamCallback) (*ModelResponse, error) {
op, err := bgAction.Start(ctx, req)
if err != nil {
return nil, err
}

// Return response with operation
operationMap := map[string]any{
"action": op.Action,
"id": op.ID,
"done": op.Done,
}
if op.Output != nil {
operationMap["output"] = op.Output
}
if op.Error != nil {
operationMap["error"] = map[string]any{
"message": op.Error.Error(),
}
}
if op.Metadata != nil {
operationMap["metadata"] = op.Metadata
}

return &ModelResponse{
Operation: operationMap,
Request: req,
}, nil
}
} else {
fn = m.Generate
}

fn = core.ChainMiddleware(mw...)(fn)

currentTurn := 0
for {
Expand All @@ -309,6 +384,11 @@ func GenerateWithRequest(ctx context.Context, r *registry.Registry, opts *Genera
return nil, err
}

// If this is a long-running operation response, return it immediately without further processing
if resp.Operation != nil {
return resp, nil
}

if formatHandler != nil {
resp.Message, err = formatHandler.ParseMessage(resp.Message)
if err != nil {
Expand Down Expand Up @@ -1148,3 +1228,145 @@ func executeResourcePart(ctx context.Context, r *registry.Registry, resourceURI

return output.Content, nil
}

// DefineBackgroundModel defines a new model that runs in the background
func DefineBackgroundModel(
r *registry.Registry,
name string,
opts *BackgroundModelOptions,
) BackgroundModel {
label := opts.Label
if label == "" {
label = name
}
metadata := map[string]any{
"model": map[string]any{
"label": label,
"versions": opts.Versions,
"supports": opts.Supports,
},
}

if opts.ConfigSchema != nil {
metadata["customOptions"] = opts.ConfigSchema
if metadata["model"] == nil {
metadata["model"] = make(map[string]any)
}
modelMeta := metadata["model"].(map[string]any)
modelMeta["customOptions"] = opts.ConfigSchema
}

bgAction := core.DefineBackgroundAction[*ModelRequest, *ModelResponse](r, name, opts.Metadata,
opts.Start, opts.Check, opts.Cancel)

return bgAction
}

// SupportsLongRunning checks if a model supports long-running operations by model name.
func SupportsLongRunning(r *registry.Registry, modelName string) bool {
if modelName == "" {
return false
}

modelInstance := LookupModel(r, modelName)
if modelInstance == nil {
return false
}

modelImpl, ok := modelInstance.(*model)
if !ok {
return false
}

action := (*core.ActionDef[*ModelRequest, *ModelResponse, *ModelResponseChunk])(modelImpl)
if action == nil {
return false
}

metadata := action.Desc().Metadata
if metadata == nil {
return false
}

modelMeta, ok := metadata["model"].(map[string]any)
if !ok {
return false
}

supportsMeta, ok := modelMeta["supports"].(map[string]any)
if !ok {
return false
}

longRunning, ok := supportsMeta["longRunning"].(bool)
if !ok {
return false
}

return longRunning
}

// GenerateOperation generates a model response as a long-running operation based on the provided options. s.
func GenerateOperation(ctx context.Context, r *registry.Registry, opts ...GenerateOption) (*core.Operation[*ModelResponse], error) {

resp, err := Generate(ctx, r, opts...)
if err != nil {
return nil, err
}

if resp.Operation == nil {
return nil, core.NewError(core.FAILED_PRECONDITION, "model did not return an operation")
}

var action string
if v, ok := resp.Operation["action"].(string); ok {
action = v
} else {
return nil, core.NewError(core.INTERNAL, "operation missing or invalid 'action' field")
}
var id string
if v, ok := resp.Operation["id"].(string); ok {
id = v
} else {
return nil, core.NewError(core.INTERNAL, "operation missing or invalid 'id' field")
}
var done bool
if v, ok := resp.Operation["done"].(bool); ok {
done = v
} else {
return nil, core.NewError(core.INTERNAL, "operation missing or invalid 'done' field")
}
var metadata map[string]any
if v, ok := resp.Operation["metadata"].(map[string]any); ok {
metadata = v
}

op := &core.Operation[*ModelResponse]{
Action: action,
ID: id,
Done: done,
Metadata: metadata,
}

if op.Done {
if output, ok := resp.Operation["output"]; ok {
if modelResp, ok := output.(*ModelResponse); ok {
op.Output = modelResp
} else {
op.Output = resp
}
} else {
op.Output = resp
}
}

if errorData, ok := resp.Operation["error"]; ok {
if errorMap, ok := errorData.(map[string]any); ok {
if message, ok := errorMap["message"].(string); ok {
op.Error = errors.New(message)
}
}
}

return op, nil
}
3 changes: 3 additions & 0 deletions go/core/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ const (
ActionTypeTool ActionType = "tool"
ActionTypeUtil ActionType = "util"
ActionTypeCustom ActionType = "custom"
ActionTypeBackgroundModel ActionType = "background-model"
ActionTypeCheckOperation ActionType = "check-operation"
ActionTypeCancelOperation ActionType = "cancel-operation"
)

// An ActionDef is a named, observable operation that underlies all Genkit primitives.
Expand Down
Loading
Loading