Documentation ¶
Index ¶
- Constants
- type Action
- func AdaptAction[T any, U any](action Action[U], getter InternalTypeGetter[T, U], ...) Action[T]
- func AsParallelMapAction[K comparable, T any](name string, action Action[T]) Action[map[K]T]
- func AsParallelSliceAction[T any](name string, action Action[T]) Action[[]T]
- func AsRetryableAction[T any](name string, mainAction, rollbackAction Action[T], maxRetry int) Action[T]
- func AsSequenceMapAction[K comparable, T any](name string, action Action[T]) Action[map[K]T]
- func AsSequenceSliceAction[T any](name string, action Action[T], stopOnError bool) Action[[]T]
- func NewAggregateAction[T any, U any](action Action[U], getter AggregateGetter[T, U], setter AggregateSetter[T, U]) Action[T]
- func NewParallelMapAction[K comparable, T any](name string, action Action[T]) Action[map[K]T]
- func NewParallelMapPipeline[K comparable, T any](name string, action Action[T]) Action[map[K]T]
- func NewParallelSliceAction[T any](name string, action Action[T]) Action[[]T]
- func NewParallelSlicePipeline[T any](name string, action Action[T]) Action[[]T]
- func NewSequenceMapAction[K comparable, T any](name string, action Action[T]) Action[map[K]T]
- func NewSequenceMapPipeline[K comparable, T any](name string, action Action[T]) Action[map[K]T]
- func NewSequenceSliceAction[T any](name string, action Action[T], stopOnError bool) Action[[]T]
- func NewSequenceSlicePipeline[T any](name string, action Action[T], stopOnError bool) Action[[]T]
- func NewSimpleAction[T any](name string, runFunc RunFunc[T]) Action[T]
- func NewTypeAdapterAction[T any, U any](action Action[U], getter InternalValueGetter[T, U], ...) Action[T]
- func SkipRollback[T any]() Action[T]
- func Terminate[T any]() Action[T]
- type ActionPlan
- type AggregateGetter
- type AggregateSetter
- type BranchAction
- type BranchFunc
- type ExternalTypeSetter
- type InternalTypeGetter
- type InternalValueGetter
- type InternalValueSetter
- type Pipeline
- type RunFunc
- type RunPlan
- type Workflow
- func (w *Workflow[T]) Name() string
- func (w *Workflow[T]) Run(ctx context.Context, input T) (output T, err error)
- func (w *Workflow[T]) RunAt(initAction Action[T], ctx context.Context, input T) (output T, lastErr error)
- func (w *Workflow[T]) SetRunPlan(currentAction Action[T], plan RunPlan[T])
- func (w *Workflow[T]) ValidateGraph() error
Constants ¶
const (
	// Success represents the direction indicating that the action completed successfully
	// and the Workflow should continue.
	Success = "success"
	// Failure represents the direction indicating that an error occurred,
	// and the Workflow should handle it accordingly.
	Failure = "failure"
	// Abort represents the direction indicating that
	// the Workflow execution should be aborted immediately.
	// This can occur due to a specific Abort condition or
	// in cases of unexpected errors or panics that cause the Workflow to halt.
	Abort = "abort"
)
const Error = Failure
Deprecated. Use Failure
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Action ¶
type Action[T any] interface {
	// Name provides the identifier of this Action.
	Name() string
	// Run executes the Action, processing the input and returning output or an error.
	Run(ctx context.Context, input T) (output T, err error)
}
Action is the basic unit of execution in this package. It represents a single task that processes an input and produces an output.
func AdaptAction ¶ added in v1.2.0
func AdaptAction[T any, U any](
	action Action[U],
	getter InternalTypeGetter[T, U],
	setter ExternalTypeSetter[T, U],
) Action[T]
AdaptAction creates an Action that works with a composite data structure (T), where T is a complex type (e.g., a struct with multiple fields) and U is the data type that the Action operates on. The InternalTypeGetter and ExternalTypeSetter functions are used to extract U from T and re-integrate the processed result back into T.
func AsParallelMapAction ¶ added in v1.2.0
func AsParallelMapAction[K comparable, T any](name string, action Action[T]) Action[map[K]T]
AsParallelMapAction creates an Action that processes a map's values in parallel. Each value is transformed by the given action concurrently, maintaining the original keys.
The action handles panics gracefully, continuing execution of other goroutines when one fails. If any error or panic occurs, the action returns an error but still provides the processed output for successful operations.
func AsParallelSliceAction ¶ added in v1.2.0
AsParallelSliceAction creates an Action that processes a slice's elements in parallel. Each element is transformed by the given action concurrently, maintaining the original order.
The action handles panics gracefully, continuing execution of other goroutines when one fails. If any error or panic occurs, the action returns an error but still provides the processed output for successful operations.
func AsRetryableAction ¶ added in v1.2.0
func AsRetryableAction[T any](name string, mainAction, rollbackAction Action[T], maxRetry int) Action[T]
AsRetryableAction creates an Action that retries the mainAction up to maxRetry times.
If the mainAction fails, the rollbackAction is executed before the next retry attempt. If the final attempt fails, the rollbackAction is not executed. When rollbackAction is nil, the rollback step is skipped.
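The retry and rollback ordering can be sketched as a plain loop. The names retry and runFlaky are hypothetical, and two points are assumptions because the text does not settle them: maxAttempts is treated as the total number of attempts, and every attempt as well as every rollback receives the original input.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// retry runs mainStep up to maxAttempts times and calls rollback between
// failed attempts.
// ASSUMPTION: maxAttempts counts total attempts, and each attempt and each
// rollback receives the original input.
func retry[T any](
	ctx context.Context,
	in T,
	mainStep, rollback func(context.Context, T) (T, error),
	maxAttempts int,
) (T, error) {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		out, err := mainStep(ctx, in)
		if err == nil {
			return out, nil
		}
		lastErr = fmt.Errorf("attempt %d: %w", attempt, err)
		// No rollback after the final attempt, and none when rollback is nil.
		if attempt == maxAttempts || rollback == nil {
			continue
		}
		if _, rbErr := rollback(ctx, in); rbErr != nil {
			return in, errors.Join(lastErr, rbErr)
		}
	}
	return in, lastErr
}

// runFlaky retries a step that fails failures times before succeeding and
// reports the result together with the number of calls and rollbacks.
func runFlaky(failures, maxAttempts int) (out int, err error, calls, rollbacks int) {
	flaky := func(_ context.Context, n int) (int, error) {
		calls++
		if calls <= failures {
			return n, errors.New("transient failure")
		}
		return n + 1, nil
	}
	undo := func(_ context.Context, n int) (int, error) {
		rollbacks++
		return n, nil
	}
	out, err = retry(context.Background(), 41, flaky, undo, maxAttempts)
	return out, err, calls, rollbacks
}

func main() {
	fmt.Println(runFlaky(2, 3)) // succeeds on the third attempt
	fmt.Println(runFlaky(5, 3)) // exhausts all attempts
}
```

In the exhausted case the rollback runs one time fewer than the main step, which matches the rule that the final failed attempt is not rolled back.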
func AsSequenceMapAction ¶ added in v1.2.0
func AsSequenceMapAction[K comparable, T any](name string, action Action[T]) Action[map[K]T]
AsSequenceMapAction creates an Action that processes a map's values sequentially. Each value is transformed by the given action one at a time, maintaining the original keys.
Unlike parallel processing, sequential execution stops immediately when a panic occurs, leaving unprocessed values unchanged in the output.
func AsSequenceSliceAction ¶ added in v1.2.0
AsSequenceSliceAction creates an Action that processes a slice's elements sequentially. Each element is transformed by the given action one at a time, maintaining the original order.
The stopOnError parameter controls error handling behavior:
- When true: stops processing immediately on the first error, leaving remaining elements unchanged
- When false: continues processing all elements even if errors occur
Panics always stop execution regardless of the stopOnError setting.
func NewAggregateAction ¶
func NewAggregateAction[T any, U any](
	action Action[U],
	getter AggregateGetter[T, U],
	setter AggregateSetter[T, U],
) Action[T]
Deprecated. Use AdaptAction
func NewParallelMapAction ¶ added in v1.1.1
func NewParallelMapAction[K comparable, T any](name string, action Action[T]) Action[map[K]T]
Deprecated. Use AsParallelMapAction
func NewParallelMapPipeline ¶ added in v1.1.0
func NewParallelMapPipeline[K comparable, T any](name string, action Action[T]) Action[map[K]T]
Deprecated. Use AsParallelMapAction
func NewParallelSliceAction ¶ added in v1.1.1
Deprecated. Use AsParallelSliceAction
func NewParallelSlicePipeline ¶ added in v1.1.0
Deprecated. Use AsParallelSliceAction
func NewSequenceMapAction ¶ added in v1.1.1
func NewSequenceMapAction[K comparable, T any](name string, action Action[T]) Action[map[K]T]
Deprecated. Use AsSequenceMapAction
func NewSequenceMapPipeline ¶ added in v1.1.0
func NewSequenceMapPipeline[K comparable, T any](name string, action Action[T]) Action[map[K]T]
Deprecated. Use AsSequenceMapAction
func NewSequenceSliceAction ¶ added in v1.1.1
Deprecated. Use AsSequenceSliceAction
func NewSequenceSlicePipeline ¶ added in v1.1.0
Deprecated. Use AsSequenceSliceAction
func NewSimpleAction ¶
NewSimpleAction creates a new Action with a custom Run function, which can be a pure function or closure. The provided runFunc must match the RunFunc signature, where T is a generic type representing the input and output types for the Action's execution.
This allows for the creation of simple Actions without manually defining a separate struct that implements the Action interface.
func NewTypeAdapterAction ¶ added in v1.1.1
func NewTypeAdapterAction[T any, U any](
	action Action[U],
	getter InternalValueGetter[T, U],
	setter InternalValueSetter[T, U],
) Action[T]
Deprecated. Use AdaptAction
func SkipRollback ¶ added in v1.2.1
SkipRollback provides an Action that explicitly skips the rollback process for a RetryableAction.
When creating a RetryableAction with AsRetryableAction, use SkipRollback() as the rollback action to make the intent clear, rather than passing raw nil.
func Terminate ¶
Terminate provides an Action that explicitly stops the execution of a Workflow.
When returned from a RunPlan, it signals that the Workflow should halt and no further actions should be executed. This serves as a clear, intentional way to stop a Workflow, as opposed to returning raw nil.
type AggregateGetter ¶
type AggregateGetter[T any, U any] InternalTypeGetter[T, U]
Deprecated. Use InternalTypeGetter
type AggregateSetter ¶
type AggregateSetter[T any, U any] ExternalTypeSetter[T, U]
Deprecated. Use ExternalTypeSetter
type BranchAction ¶
type BranchAction[T any] interface {
	// Name returns the name of the BranchAction.
	Name() string
	// Run executes the branch action, optionally modifying the input and returning an output.
	// If the input doesn't need changes, it can be passed through as output. The method also
	// returns an error if the action cannot be executed successfully.
	Run(ctx context.Context, input T) (output T, err error)
	// Directions returns a list of possible directions that the Workflow can take.
	// These directions are used for validation and must include all possible values that
	// NextDirection can return.
	Directions() []string
	// NextDirection determines the next execution path based on the result of Run.
	// It is called only if Run succeeds (err == nil).
	// The method returns a direction from the list defined by Directions.
	NextDirection(ctx context.Context, output T) (direction string, err error)
}
BranchAction is an interface for actions that control branching in the execution flow of a Workflow. It extends the Action interface and adds methods for handling conditional branching based on the execution results.
func NewSimpleBranchAction ¶
func NewSimpleBranchAction[T any](name string, runFunc RunFunc[T], directions []string, branchFunc BranchFunc[T]) BranchAction[T]
NewSimpleBranchAction creates a new BranchAction with customizable directions. It accepts a name for the action, a slice of directions that define the possible control flow, and a BranchFunc that contains the branching logic, which dictates the next direction based on the action's output.
Additionally, a custom runFunc can be provided to define the execution logic of the action. This function must match the RunFunc signature, where T is the generic type representing the input and output types for the action. If no specific execution logic is needed, the runFunc can be provided as `nil`. In this case, the action will simply pass the input through to the output without modification.
This allows for the creation of simple BranchActions without manually defining a separate struct that implements the BranchAction interface.
type BranchFunc ¶
BranchFunc represents the signature for the function that defines the branching logic for a BranchAction in the package. It takes the running context and output as input and returns the direction for the next step in the process along with any potential error.
type ExternalTypeSetter ¶ added in v1.2.0
ExternalTypeSetter updates a composite data structure (T) with a new subpart (U). It reintegrates the modified part back into the structure and returns the updated structure.
type InternalTypeGetter ¶ added in v1.2.0
InternalTypeGetter extracts a subpart (U) from a composite data structure (T). It allows access to a part of the structure without modifying the entire one.
type InternalValueGetter ¶ added in v1.1.1
type InternalValueGetter[T any, U any] InternalTypeGetter[T, U]
Deprecated. Use InternalTypeGetter
type InternalValueSetter ¶ added in v1.1.1
type InternalValueSetter[T any, U any] ExternalTypeSetter[T, U]
Deprecated. Use ExternalTypeSetter
type RunFunc ¶
RunFunc defines the signature of a function used to implement an Action's execution logic. It is a function that takes an input of type T (a generic type) and returns an output of type T along with any error encountered during execution.
type RunPlan ¶ added in v1.2.2
RunPlan represents a map that associates a direction (Success, Failure, Abort, and other custom branching directions) with the next Action to execute. It is used to define the flow of actions in a Workflow based on the direction of execution.
func DefaultPlan ¶
DefaultPlan returns a standard RunPlan with valid next actions for Success and Failure, and Termination for Abort.
func DefaultPlanWithAbort ¶
DefaultPlanWithAbort returns a RunPlan with valid next actions for Success, Failure, and Abort.
func SuccessOnlyPlan ¶
SuccessOnlyPlan returns a RunPlan where only a success direction has a valid next action, and Failure and Abort both lead to termination.
func TerminationPlan ¶
TerminationPlan returns a RunPlan with all directions leading to termination immediately, providing a clear sign of termination rather than using nil.
type Workflow ¶ added in v1.1.1
type Workflow[T any] struct {
	// contains filtered or unexported fields
}
Workflow represents a sequence of Actions that are executed in a structured flow. It runs its member Actions one after another by calling each Action's Run method, and the defined structure of the Workflow determines which Action runs next. This makes it possible to build workflows that are as simple or as complex as needed.
Workflow implements the Action interface, meaning it can be treated as an Action itself. This allows Workflows to be composed hierarchically, enabling more complex workflows by nesting Workflows within other Workflows.
func NewWorkflow ¶ added in v1.1.1
NewWorkflow creates a new Workflow by taking a series of Actions as its members. These Actions will be executed sequentially in the order they are provided, with the output of one Action being passed as input to the next, forming a unidirectional flow of execution.
func (*Workflow[T]) Run ¶ added in v1.1.1
Run executes the Workflow by running Actions in the order they were configured, starting from the initAction, which is the first one of the memberActions provided by the constructor such as NewWorkflow. The actions are executed in order, passing the output of one action as input to the next.
func (*Workflow[T]) RunAt ¶ added in v1.1.1
func (w *Workflow[T]) RunAt(initAction Action[T], ctx context.Context, input T) (output T, lastErr error)
RunAt starts the execution of the Workflow from a given Action (initAction). It follows the action plan, executing actions sequentially based on the specified directions. If an action returns an error, the Workflow will proceed to the next action according to the defined plan, potentially directing the flow to an action mapped for the Failure direction. The Abort direction, when encountered, will immediately halt the Workflow execution unless the plan specifies otherwise. If no action plan is found for a given direction, the Workflow will terminate with the appropriate error.
func (*Workflow[T]) SetRunPlan ¶ added in v1.1.1
SetRunPlan updates the execution flow for the given currentAction in the Workflow by associating it with a specified RunPlan. The currentAction will be validated to ensure it is a member of the Workflow. The RunPlan defines the directions (such as Success, Failure, Abort) and their corresponding next actions in the execution flow.
If the currentAction is nil or not part of the Workflow, a panic will occur. The plan may be nil; any direction that the plan does not explicitly specify is set to terminate the Workflow. If the plan contains a direction that is not valid for the currentAction, or a direction that leads to an invalid action, a panic will also occur.
Additionally, self-loops are not allowed in the plan. If the next action for a direction is the current action itself, a panic will be triggered.
func (*Workflow[T]) ValidateGraph ¶ added in v1.1.1
ValidateGraph ensures the workflow's graph is connected and acyclic. It checks for cycles first, then verifies that all nodes are connected as a single graph.
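The two checks can be sketched on a plain adjacency map. This is not the package's implementation: graph, hasCycle, isConnected, and validate are hypothetical names, cycles are found with a three-color depth-first search, and "connected" is assumed to mean weakly connected, that is, connected when edge direction is ignored.

```go
package main

import (
	"errors"
	"fmt"
)

// graph maps a node name to the names of its successors.
type graph map[string][]string

// hasCycle reports whether the directed graph contains a cycle.
func hasCycle(g graph) bool {
	const (
		white = iota // not visited yet
		gray         // on the current DFS path
		black        // fully explored
	)
	color := make(map[string]int, len(g))
	var visit func(n string) bool
	visit = func(n string) bool {
		color[n] = gray
		for _, next := range g[n] {
			switch color[next] {
			case gray:
				return true // back edge to the current path
			case white:
				if visit(next) {
					return true
				}
			}
		}
		color[n] = black
		return false
	}
	for n := range g {
		if color[n] == white && visit(n) {
			return true
		}
	}
	return false
}

// isConnected reports whether all nodes form one component.
// ASSUMPTION: edge direction is ignored (weak connectivity).
func isConnected(g graph) bool {
	adj := map[string][]string{}
	for n, nexts := range g {
		if _, ok := adj[n]; !ok {
			adj[n] = nil
		}
		for _, m := range nexts {
			adj[n] = append(adj[n], m)
			adj[m] = append(adj[m], n)
		}
	}
	if len(adj) == 0 {
		return true
	}
	seen := map[string]bool{}
	var stack []string
	for n := range adj { // any node works as the starting point
		stack = append(stack, n)
		break
	}
	for len(stack) > 0 {
		n := stack[len(stack)-1]
		stack = stack[:len(stack)-1]
		if seen[n] {
			continue
		}
		seen[n] = true
		stack = append(stack, adj[n]...)
	}
	return len(seen) == len(adj)
}

// validate checks for cycles first, then for connectivity.
func validate(g graph) error {
	if hasCycle(g) {
		return errors.New("graph contains a cycle")
	}
	if !isConnected(g) {
		return errors.New("graph is not connected")
	}
	return nil
}

func main() {
	fmt.Println(validate(graph{"a": {"b"}, "b": {"c"}, "c": nil}))
	fmt.Println(validate(graph{"a": {"b"}, "b": {"a"}}))
	fmt.Println(validate(graph{"a": {"b"}, "b": nil, "x": nil}))
}
```

Checking for cycles first mirrors the order stated above; an isolated node such as "x" in the last call is what the connectivity check is meant to catch.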