chain

package module
v1.2.2
Published: Sep 15, 2025 License: MIT Imports: 8 Imported by: 0

README

Chain: A Flexible Action-Workflow Library

The chain package provides a flexible framework for building and executing sequential workflows of actions. It allows you to define a series of steps, each represented by an Action, which processes inputs and produces outputs. The package supports conditional branching, customizable execution paths, and error handling, enabling complex workflows with minimal boilerplate.

Features

  • Action and Workflow Composition
    Create modular, reusable units of work (Action) and orchestrate them into robust workflows using Workflow.

  • DAG-based Execution Plans
    Ensure predictable execution and robust validation with built-in cycle detection to enforce acyclic workflows.

  • Nested Workflows
    Use Workflows as actions within other Workflows, enabling modular and hierarchical workflow designs.

  • Conditional Branching
    Support for Success, Failure, Abort and custom direction-based branching within your execution flows.

  • AggregateAction Support
    Simplify the orchestration of complex workflows by combining Actions or Workflows of different types into a unified control flow using AggregateAction.

Getting Started

Installation

To install the chain package, use the following command:

go get github.com/JSYoo5B/chain

Key Concepts

Action

An Action represents a single task in the Workflow. Each action can process input data and return output or an error.

type Action[T any] interface {
    Name() string
    Run(ctx context.Context, input T) (output T, err error)
}

BranchAction

A BranchAction extends Action and supports conditional branching. It can change the execution flow based on the results of the action, allowing for multiple execution paths.

type BranchAction[T any] interface {
    Name() string
    Run(ctx context.Context, input T) (output T, err error)
    Directions() []string
    NextDirection(ctx context.Context, output T) (direction string, err error)
}

Workflow

A Workflow is a sequence of Actions executed in order. It orchestrates the flow of data between actions and handles branching, success, failure, and abort conditions.

ActionPlan

An ActionPlan is a map that associates a direction (e.g., success, failure, abort) with the next Action to execute, defining the flow of a Workflow. As of v1.2.2, ActionPlan is deprecated in favor of RunPlan, which has the same shape.

type ActionPlan[T any] map[string]Action[T]

Examples

Practical examples for using the chain package will be added in future updates. Stay tuned!

Documentation

Constants

const (
	// Success represents the direction indicating that the action completed successfully
	// and the Workflow should continue.
	Success = "success"
	// Failure represents the direction indicating that an error occurred,
	// and the Workflow should handle it accordingly.
	Failure = "failure"
	// Abort represents the direction indicating that
	// the Workflow execution should be aborted immediately.
	// This can occur due to a specific Abort condition or
	// in cases of unexpected errors or panics that cause the Workflow to halt.
	Abort = "abort"
)
const Error = Failure

Deprecated. Use Failure

Variables

This section is empty.

Functions

This section is empty.

Types

type Action

type Action[T any] interface {
	// Name provides the identifier of this Action.
	Name() string

	// Run executes the Action, processing the input and returning output or an error.
	Run(ctx context.Context, input T) (output T, err error)
}

Action is the basic unit of execution in this package. It represents a single task that processes input and produces output.

func AdaptAction added in v1.2.0

func AdaptAction[T any, U any](
	action Action[U],
	getter InternalTypeGetter[T, U],
	setter ExternalTypeSetter[T, U],
) Action[T]

AdaptAction creates an Action that works with a composite data structure (T), where T is a complex type (e.g., a struct with multiple fields) and U is the data type that the Action operates on. The InternalTypeGetter and ExternalTypeSetter functions are used to extract U from T and re-integrate the processed result back into T.

func AsParallelMapAction added in v1.2.0

func AsParallelMapAction[K comparable, T any](name string, action Action[T]) Action[map[K]T]

AsParallelMapAction creates an Action that processes a map's values in parallel. Each value is transformed by the given action concurrently, maintaining the original keys.

The action handles panics gracefully, continuing execution of other goroutines when one fails. If any error or panic occurs, the action returns an error but still provides the processed output for successful operations.

func AsParallelSliceAction added in v1.2.0

func AsParallelSliceAction[T any](name string, action Action[T]) Action[[]T]

AsParallelSliceAction creates an Action that processes a slice's elements in parallel. Each element is transformed by the given action concurrently, maintaining the original order.

The action handles panics gracefully, continuing execution of other goroutines when one fails. If any error or panic occurs, the action returns an error but still provides the processed output for successful operations.

func AsRetryableAction added in v1.2.0

func AsRetryableAction[T any](name string, mainAction, rollbackAction Action[T], maxRetry int) Action[T]

AsRetryableAction creates an Action that retries the mainAction up to maxRetry times.

If the mainAction fails, the rollbackAction runs before the next retry attempt. When the final attempt fails, the rollbackAction is not executed. Rollback is skipped entirely when rollbackAction is nil.

func AsSequenceMapAction added in v1.2.0

func AsSequenceMapAction[K comparable, T any](name string, action Action[T]) Action[map[K]T]

AsSequenceMapAction creates an Action that processes a map's values sequentially. Each value is transformed by the given action one at a time, maintaining the original keys.

Unlike parallel processing, sequential execution stops immediately when a panic occurs, leaving unprocessed values unchanged in the output.

func AsSequenceSliceAction added in v1.2.0

func AsSequenceSliceAction[T any](name string, action Action[T], stopOnError bool) Action[[]T]

AsSequenceSliceAction creates an Action that processes a slice's elements sequentially. Each element is transformed by the given action one at a time, maintaining the original order.

The stopOnError parameter controls error handling behavior:

  • When true: stops processing immediately on the first error, leaving remaining elements unchanged.
  • When false: continues processing all elements even if errors occur.

Panics always stop execution regardless of the stopOnError setting.

func NewAggregateAction

func NewAggregateAction[T any, U any](
	action Action[U],
	getter AggregateGetter[T, U],
	setter AggregateSetter[T, U],
) Action[T]

Deprecated. Use AdaptAction

func NewParallelMapAction added in v1.1.1

func NewParallelMapAction[K comparable, T any](name string, action Action[T]) Action[map[K]T]

Deprecated. Use AsParallelMapAction

func NewParallelMapPipeline added in v1.1.0

func NewParallelMapPipeline[K comparable, T any](name string, action Action[T]) Action[map[K]T]

Deprecated. Use AsParallelMapAction

func NewParallelSliceAction added in v1.1.1

func NewParallelSliceAction[T any](name string, action Action[T]) Action[[]T]

Deprecated. Use AsParallelSliceAction

func NewParallelSlicePipeline added in v1.1.0

func NewParallelSlicePipeline[T any](name string, action Action[T]) Action[[]T]

Deprecated. Use AsParallelSliceAction

func NewSequenceMapAction added in v1.1.1

func NewSequenceMapAction[K comparable, T any](name string, action Action[T]) Action[map[K]T]

Deprecated. Use AsSequenceMapAction

func NewSequenceMapPipeline added in v1.1.0

func NewSequenceMapPipeline[K comparable, T any](name string, action Action[T]) Action[map[K]T]

Deprecated. Use AsSequenceMapAction

func NewSequenceSliceAction added in v1.1.1

func NewSequenceSliceAction[T any](name string, action Action[T], stopOnError bool) Action[[]T]

Deprecated. Use AsSequenceSliceAction

func NewSequenceSlicePipeline added in v1.1.0

func NewSequenceSlicePipeline[T any](name string, action Action[T], stopOnError bool) Action[[]T]

Deprecated. Use AsSequenceSliceAction

func NewSimpleAction

func NewSimpleAction[T any](name string, runFunc RunFunc[T]) Action[T]

NewSimpleAction creates a new Action with a custom Run function, which can be a pure function or closure. The provided runFunc must match the RunFunc signature, where T is a generic type representing the input and output types for the Action's execution.

This allows for the creation of simple Actions without manually defining a separate struct that implements the Action interface.

func NewTypeAdapterAction added in v1.1.1

func NewTypeAdapterAction[T any, U any](
	action Action[U],
	getter InternalValueGetter[T, U],
	setter InternalValueSetter[T, U],
) Action[T]

Deprecated. Use AdaptAction

func SkipRollback added in v1.2.1

func SkipRollback[T any]() Action[T]

SkipRollback provides an Action that explicitly skips the rollback process for a RetryableAction.

When creating a RetryableAction with AsRetryableAction, use SkipRollback() as the rollback action to make the intent clear, rather than passing raw nil.

func Terminate

func Terminate[T any]() Action[T]

Terminate provides an Action that explicitly stops the execution of a Workflow.

When returned from a RunPlan, it signals that the Workflow should halt and no further actions should be executed. This serves as a clear, intentional way to stop a Workflow, as opposed to returning raw nil.

type ActionPlan

type ActionPlan[T any] RunPlan[T]

Deprecated. Use RunPlan

type AggregateGetter

type AggregateGetter[T any, U any] InternalTypeGetter[T, U]

Deprecated. Use InternalTypeGetter

type AggregateSetter

type AggregateSetter[T any, U any] ExternalTypeSetter[T, U]

Deprecated. Use ExternalTypeSetter

type BranchAction

type BranchAction[T any] interface {
	// Name returns the name of the BranchAction.
	Name() string

	// Run executes the branch action, optionally modifying the input and returning an output.
	// If the input doesn't need changes, it can be passed through as output. The method also
	// returns an error if the action cannot be executed successfully.
	Run(ctx context.Context, input T) (output T, err error)

	// Directions returns a list of possible directions that the Workflow can take.
	// These directions are used for validation and must include all possible values that
	// NextDirection can return.
	Directions() []string

	// NextDirection determines the next execution path based on the result of Run.
	// It is called only if Run succeeds (err == nil).
	// The method returns a direction from the list defined by Directions.
	NextDirection(ctx context.Context, output T) (direction string, err error)
}

BranchAction is an interface for actions that control branching in the execution flow of a Workflow. It extends the Action interface and adds methods for handling conditional branching based on the execution results.

func NewSimpleBranchAction

func NewSimpleBranchAction[T any](name string, runFunc RunFunc[T], directions []string, branchFunc BranchFunc[T]) BranchAction[T]

NewSimpleBranchAction creates a new BranchAction with customizable directions. It accepts a name for the action, a slice of directions that define the possible control flow, and a BranchFunc that contains the branching logic, which dictates the next direction based on the action's output.

Additionally, a custom runFunc can be provided to define the execution logic of the action. This function must match the RunFunc signature, where T is the generic type representing the input and output types for the action. If no specific execution logic is needed, the runFunc can be provided as `nil`. In this case, the action will simply pass the input through to the output without modification.

This allows for the creation of simple BranchActions without manually defining a separate struct that implements the BranchAction interface.

type BranchFunc

type BranchFunc[T any] func(ctx context.Context, output T) (direction string, err error)

BranchFunc represents the signature for the function that defines the branching logic for a BranchAction in the package. It takes the running context and output as input and returns the direction for the next step in the process along with any potential error.

type ExternalTypeSetter added in v1.2.0

type ExternalTypeSetter[T any, U any] func(T, U) T

ExternalTypeSetter updates a composite data structure (T) with a new subpart (U). It reintegrates the modified part back into the structure and returns the updated structure.

type InternalTypeGetter added in v1.2.0

type InternalTypeGetter[T any, U any] func(T) U

InternalTypeGetter extracts a subpart (U) from a composite data structure (T). It allows access to a part of the structure without modifying the entire one.

type InternalValueGetter added in v1.1.1

type InternalValueGetter[T any, U any] InternalTypeGetter[T, U]

Deprecated. Use InternalTypeGetter

type InternalValueSetter added in v1.1.1

type InternalValueSetter[T any, U any] ExternalTypeSetter[T, U]

Deprecated. Use ExternalTypeSetter

type Pipeline

type Pipeline[T any] struct {
	*Workflow[T]
}

Deprecated. Use Workflow

func NewPipeline

func NewPipeline[T any](name string, memberActions ...Action[T]) *Pipeline[T]

Deprecated. Use NewWorkflow

type RunFunc

type RunFunc[T any] func(ctx context.Context, input T) (output T, err error)

RunFunc defines the signature of a function used to implement an Action's execution logic. It is a function that takes an input of type T (a generic type) and returns an output of type T along with any error encountered during execution.

type RunPlan added in v1.2.2

type RunPlan[T any] map[string]Action[T]

RunPlan represents a map that associates a direction (Success, Failure, Abort, and other custom branching directions) with the next Action to execute. It is used to define the flow of actions in a Workflow based on the direction of execution.

func DefaultPlan

func DefaultPlan[T any](success, error Action[T]) RunPlan[T]

DefaultPlan returns a standard RunPlan with valid next actions for Success and Failure, and Termination for Abort.

func DefaultPlanWithAbort

func DefaultPlanWithAbort[T any](success, error, abort Action[T]) RunPlan[T]

DefaultPlanWithAbort returns a RunPlan with valid next actions for Success, Failure, and Abort.

func SuccessOnlyPlan

func SuccessOnlyPlan[T any](success Action[T]) RunPlan[T]

SuccessOnlyPlan returns a RunPlan where only a success direction has a valid next action, and Failure and Abort both lead to termination.

func TerminationPlan

func TerminationPlan[T any]() RunPlan[T]

TerminationPlan returns a RunPlan with all directions leading to termination immediately, providing a clear sign of termination rather than using nil.

type Workflow added in v1.1.1

type Workflow[T any] struct {
	// contains filtered or unexported fields
}

Workflow represents a sequence of Actions executed in a structured flow. It invokes the Run method of each member Action in turn, and the flow proceeds according to the structure defined for the Workflow, so workflows can be as simple or as complex as needed.

Workflow implements the Action interface, meaning it can be treated as an Action itself. This allows Workflows to be composed hierarchically, enabling more complex workflows by nesting Workflows within other Workflows.

func NewWorkflow added in v1.1.1

func NewWorkflow[T any](name string, memberActions ...Action[T]) *Workflow[T]

NewWorkflow creates a new Workflow by taking a series of Actions as its members. These Actions will be executed sequentially in the order they are provided, with the output of one Action being passed as input to the next, forming a unidirectional flow of execution.

func (*Workflow[T]) Name added in v1.1.1

func (w *Workflow[T]) Name() string

Name provides the identifier of this Workflow.

func (*Workflow[T]) Run added in v1.1.1

func (w *Workflow[T]) Run(ctx context.Context, input T) (output T, err error)

Run executes the Workflow by running Actions in the order they were configured, starting from the initAction, which is the first one of the memberActions provided by the constructor such as NewWorkflow. The actions are executed in order, passing the output of one action as input to the next.

func (*Workflow[T]) RunAt added in v1.1.1

func (w *Workflow[T]) RunAt(initAction Action[T], ctx context.Context, input T) (output T, lastErr error)

RunAt starts the execution of the Workflow from a given Action (initAction). It follows the action plan, executing actions sequentially based on the specified directions. If an action returns an error, the Workflow will proceed to the next action according to the defined plan, potentially directing the flow to an action mapped for the Failure direction. The Abort direction, when encountered, will immediately halt the Workflow execution unless the plan specifies otherwise. If no action plan is found for a given direction, the Workflow will terminate with the appropriate error.
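
The direction-following loop described above can be pictured with the sketch below. It is a simplified model, not the package's code: it handles only Success and Failure, keys plans by action name, treats a missing entry as termination, and keeps the previous output when an action fails. The walk, step, and demo names are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Direction constants and types copied from the documentation.
const (
	Success = "success"
	Failure = "failure"
)

type Action[T any] interface {
	Name() string
	Run(ctx context.Context, input T) (output T, err error)
}
type RunPlan[T any] map[string]Action[T]

// walk is a hypothetical executor. It assumes the plans contain no cycles.
func walk[T any](ctx context.Context, start Action[T], plans map[string]RunPlan[T], input T) (T, []string, error) {
	var lastErr error
	var visited []string
	out := input
	for cur := start; cur != nil; {
		visited = append(visited, cur.Name())
		res, err := cur.Run(ctx, out)
		direction := Success
		if err != nil {
			direction, lastErr = Failure, err // assumption: output is kept as-is
		} else {
			out = res
		}
		// A missing plan or direction yields nil, which ends the loop.
		cur = plans[cur.Name()][direction]
	}
	return out, visited, lastErr
}

// step is a hypothetical Action[int] backed by a plain function.
type step struct {
	name string
	fn   func(int) (int, error)
}

func (s step) Name() string                              { return s.name }
func (s step) Run(_ context.Context, n int) (int, error) { return s.fn(n) }

func demo(input int) (int, []string, error) {
	validate := step{name: "validate", fn: func(n int) (int, error) {
		if n < 0 {
			return n, errors.New("negative")
		}
		return n, nil
	}}
	double := step{name: "double", fn: func(n int) (int, error) { return n * 2, nil }}
	reset := step{name: "reset", fn: func(int) (int, error) { return 0, nil }}
	plans := map[string]RunPlan[int]{
		"validate": {Success: double, Failure: reset},
	}
	return walk[int](context.Background(), validate, plans, input)
}

func main() {
	fmt.Println(demo(4))  // prints: 8 [validate double] <nil>
	fmt.Println(demo(-1)) // prints: 0 [validate reset] negative
}
```

Like RunAt, the sketch returns the last error it saw even when a Failure branch went on to complete normally.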

func (*Workflow[T]) SetRunPlan added in v1.1.1

func (w *Workflow[T]) SetRunPlan(currentAction Action[T], plan RunPlan[T])

SetRunPlan updates the execution flow for the given currentAction in the Workflow by associating it with a specified RunPlan. The currentAction will be validated to ensure it is a member of the Workflow. The RunPlan defines the directions (such as Success, Failure, Abort) and their corresponding next actions in the execution flow.

SetRunPlan panics if the currentAction is nil or not part of the Workflow. The plan may be nil; any direction not explicitly specified in the plan leads to termination. SetRunPlan also panics if the plan contains a direction that is not valid for the currentAction, or if a direction leads to an invalid action.

Additionally, self-loops are not allowed in the plan. If the next action for a direction is the current action itself, a panic will be triggered.

func (*Workflow[T]) ValidateGraph added in v1.1.1

func (w *Workflow[T]) ValidateGraph() error

ValidateGraph ensures the workflow's graph is connected and acyclic. It checks for cycles first, then verifies that all nodes are connected as a single graph.
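
The two checks can be illustrated on a plain adjacency map. The sketch below uses a three-color depth-first search for cycle detection and a breadth-first search over undirected edges for connectivity. The validate function and the graph representation are hypothetical, and reading "connected" as weak connectivity is an assumption; the package's internal graph type is not shown in this documentation.

```go
package main

import "fmt"

// validate is a hypothetical checker: it reports a cycle first, then any node
// that cannot be reached from the first node when edge direction is ignored.
func validate(nodes []string, edges map[string][]string) error {
	// 1. Cycle check: a gray node seen again is on the current DFS path.
	const white, gray, black = 0, 1, 2
	color := map[string]int{}
	var visit func(n string) error
	visit = func(n string) error {
		color[n] = gray
		for _, m := range edges[n] {
			switch color[m] {
			case gray:
				return fmt.Errorf("cycle detected: %s -> %s", n, m)
			case white:
				if err := visit(m); err != nil {
					return err
				}
			}
		}
		color[n] = black
		return nil
	}
	for _, n := range nodes {
		if color[n] == white {
			if err := visit(n); err != nil {
				return err
			}
		}
	}

	// 2. Connectivity check over undirected edges.
	if len(nodes) == 0 {
		return nil
	}
	undirected := map[string][]string{}
	for from, tos := range edges {
		for _, to := range tos {
			undirected[from] = append(undirected[from], to)
			undirected[to] = append(undirected[to], from)
		}
	}
	seen := map[string]bool{nodes[0]: true}
	queue := []string{nodes[0]}
	for len(queue) > 0 {
		n := queue[0]
		queue = queue[1:]
		for _, m := range undirected[n] {
			if !seen[m] {
				seen[m] = true
				queue = append(queue, m)
			}
		}
	}
	for _, n := range nodes {
		if !seen[n] {
			return fmt.Errorf("node %s is disconnected", n)
		}
	}
	return nil
}

func main() {
	nodes := []string{"a", "b", "c"}
	fmt.Println(validate(nodes, map[string][]string{"a": {"b"}, "b": {"c"}}))             // prints: <nil>
	fmt.Println(validate(nodes, map[string][]string{"a": {"b"}, "b": {"c"}, "c": {"a"}})) // prints: cycle detected: c -> a
	fmt.Println(validate(nodes, map[string][]string{"a": {"b"}}))                         // prints: node c is disconnected
}
```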

Directories

Path Synopsis
internal
