Documentation
¶
Index ¶
- func ParseTo[T any](e *Event) (*T, error)
- type BackoffManager
- type ClientContentConfig
- type Event
- type EventHandler
- type EventObject
- type EventType
- type Interface
- type IsRetryableErrorFunc
- type NoBackoff
- type RESTClient
- type Request
- func (r *Request) Body(obj interface{}) *Request
- func (r *Request) Do(ctx context.Context) Result
- func (r *Request) MaxRetries(maxRetries int) *Request
- func (r *Request) Param(paramName, s string) *Request
- func (r *Request) Prefix(segments ...string) *Request
- func (r *Request) RequestURI(uri string) *Request
- func (r *Request) SSEWatch(ctx context.Context) (SSEWatcher, error)
- func (r *Request) SetHeader(key string, values ...string) *Request
- func (r *Request) SetHost(host string) *Request
- func (r *Request) Stream(ctx context.Context) (*StreamResult, error)
- func (r *Request) Suffix(segments ...string) *Request
- func (r *Request) Timeout(d time.Duration) *Request
- func (r *Request) URL() *url.URL
- func (r *Request) Verb(verb string) *Request
- func (r *Request) WithBufferSize(bufferSize int) *Request
- func (r *Request) WithEventHandler(handler EventHandler) *Request
- func (r *Request) WithLastEventID(lastEventID string) *Request
- func (r *Request) WithRetryInterval(interval time.Duration) *Request
- type Result
- type RetryAfter
- type SSEWatcher
- type StreamResult
- type URLBackoff
- type WarningHandler
- type WarningLogger
- type WithRetry
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BackoffManager ¶
type ClientContentConfig ¶
type ClientContentConfig struct {
// AcceptContentTypes specifies the types the client will accept and is optional.
// If not set, ContentType will be used to define the Accept header
AcceptContentTypes string
// ContentType specifies the wire format used to communicate with the server.
// This value will be set as the Accept header on requests made to the server if
// AcceptContentTypes is not set, and as the default content type on any object
// sent to the server. If not set, "application/json" is used.
ContentType string
// Negotiator is used for obtaining encoders and decoders for multiple
// supported media types.
Negotiator runtime.ClientNegotiator
}
type Event ¶
type Event struct {
// Type is the event type (optional, defaults to "message")
Type EventType `json:"type,omitempty"`
// ID is the event ID (optional)
ID string `json:"id,omitempty"`
// Raw contains the original bytes for efficient parsing (K8s RawExtension pattern)
Raw []byte `json:"raw"`
// Retry is the retry time in milliseconds (optional)
Retry int `json:"retry,omitempty"`
// Timestamp is when the event was received
Timestamp time.Time `json:"timestamp"`
}
Event represents a Server-Sent Event with K8s-inspired design patterns
func (*Event) GetDataAsString ¶
GetDataAsString returns the raw data as string for display purposes
func (*Event) GetEventID ¶
GetEventID implements EventObject interface
func (*Event) GetEventType ¶
GetEventType implements EventObject interface
func (*Event) GetRawData ¶
GetRawData returns the raw bytes for direct processing
func (*Event) GetTimestamp ¶
GetTimestamp implements EventObject interface
func (*Event) ParseEventData ¶
ParseEventData parses event data as JSON into the target object
type EventHandler ¶
type EventHandler interface {
// OnEvent is called when an event is received
OnEvent(event *Event)
// OnError is called when an error occurs
OnError(err error)
// OnConnect is called when the connection is established
OnConnect()
// OnDisconnect is called when the connection is closed
OnDisconnect()
}
EventHandler defines the interface for handling SSE events
type EventObject ¶
type EventObject interface {
// GetEventType returns the event type
GetEventType() EventType
// GetEventID returns the event ID
GetEventID() string
// GetTimestamp returns the event timestamp
GetTimestamp() time.Time
}
EventObject represents a parsed event with typed data (K8s Object pattern)
type EventType ¶
type EventType string
EventType represents the type of SSE event
const ( // EventTypeData represents a data event EventTypeData EventType = "data" // EventTypeError represents an error event EventTypeError EventType = "error" // EventTypeClose represents a connection close event EventTypeClose EventType = "close" // EventTypeOpen represents a connection open event EventTypeOpen EventType = "open" // EventTypeRetry represents a retry event EventTypeRetry EventType = "retry" )
type Interface ¶
type Interface interface {
Verb(verb string) *Request
Get() *Request
Post() *Request
Put() *Request
Delete() *Request
// SSE creates a Server-Sent Events watcher
SSE() *Request
}
Interface captures the set of operations for generically interacting with other services
type IsRetryableErrorFunc ¶
IsRetryableErrorFunc allows the client to provide its own function that determines whether the specified err from the server is retryable.
request: the original request sent to the server err: the server sent this error to us
The function returns true if the error is retryable and the request can be retried, otherwise it returns false. We have four mode of communications - 'Stream', 'Watch', 'Do' and 'DoRaw', this function allows us to customize the retryability aspect of each.
func (IsRetryableErrorFunc) IsErrorRetryable ¶
func (r IsRetryableErrorFunc) IsErrorRetryable(request *http.Request, err error) bool
type NoBackoff ¶
type NoBackoff struct {
}
NoBackoff is a stub implementation, can be used for mocking or else as a default.
func (*NoBackoff) CalculateBackoff ¶
type RESTClient ¶
type RESTClient struct {
// contains filtered or unexported fields
}
func NewRESTClient ¶
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientContentConfig, client *http.Client) *RESTClient
func (*RESTClient) Delete ¶
func (c *RESTClient) Delete() *Request
Delete begins a DELETE request. Short for c.Verb("DELETE").
func (*RESTClient) Get ¶
func (c *RESTClient) Get() *Request
Get begins a GET request. Short for c.Verb("GET").
func (*RESTClient) Post ¶
func (c *RESTClient) Post() *Request
Post begins a POST request. Short for c.Verb("POST").
func (*RESTClient) Put ¶
func (c *RESTClient) Put() *Request
Put begins a PUT request. Short for c.Verb("PUT").
func (*RESTClient) SSE ¶
func (c *RESTClient) SSE() *Request
SSE begins a Server-Sent Events request. Short for c.Verb("GET") with SSE headers.
func (*RESTClient) Verb ¶
func (c *RESTClient) Verb(verb string) *Request
type Request ¶
type Request struct {
// contains filtered or unexported fields
}
func NewRequest ¶
func NewRequest(c *RESTClient) *Request
func (*Request) Body ¶
Body makes the request use obj as the body. Optional. If obj is a string, try to read a file of that name. If obj is a []byte, send it directly. If obj is an io.Reader, use it directly. Otherwise, set an error.
func (*Request) Do ¶
Do formats and executes the request. Returns a Result object for easy response processing.
Error type:
- If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
- http.Client.Do errors are returned directly.
func (*Request) MaxRetries ¶
func (*Request) RequestURI ¶
func (*Request) SSEWatch ¶
func (r *Request) SSEWatch(ctx context.Context) (SSEWatcher, error)
SSEWatch creates an SSE watcher for streaming events
func (*Request) Stream ¶
func (r *Request) Stream(ctx context.Context) (*StreamResult, error)
Stream executes the request and returns a streaming result for SSE
func (*Request) Timeout ¶
Timeout makes the request use the given duration as an overall timeout for the request. Additionally, if set passes the value as "timeout" parameter in URL.
func (*Request) URL ¶
URL returns the current working URL. Check the result of Error() to ensure that the returned URL is valid.
func (*Request) WithBufferSize ¶
WithBufferSize sets the buffer size for SSE event channels
func (*Request) WithEventHandler ¶
func (r *Request) WithEventHandler(handler EventHandler) *Request
WithEventHandler sets the event handler for SSE connections
func (*Request) WithLastEventID ¶
WithLastEventID sets the last event ID for SSE reconnection
type Result ¶
type Result struct {
// contains filtered or unexported fields
}
Result contains the result of calling Request.Do().
type RetryAfter ¶
type RetryAfter struct {
// Wait is the duration the server has asked us to wait before
// the next retry is initiated.
// This is the value of the 'Retry-After' response header in seconds.
Wait time.Duration
// Attempt is the Nth attempt after which we have received a retryable
// error or a 'Retry-After' response header from the server.
Attempt int
// Reason describes why we are retrying the request
Reason string
}
RetryAfter holds information associated with the next retry.
type SSEWatcher ¶
type SSEWatcher interface {
// Start begins watching for SSE events
Start(ctx context.Context) error
// Stop stops watching for SSE events
Stop()
// Events returns a channel to receive events
Events() <-chan *Event
// Errors returns a channel to receive errors
Errors() <-chan error
// IsStopped returns true if the watcher is stopped
IsStopped() bool
}
SSEWatcher defines the interface for SSE watching functionality
type StreamResult ¶
type StreamResult struct {
// contains filtered or unexported fields
}
StreamResult represents a streaming HTTP response
func (*StreamResult) Close ¶
func (sr *StreamResult) Close() error
Close closes the streaming response
func (*StreamResult) ReadEvent ¶
func (sr *StreamResult) ReadEvent() (*Event, error)
ReadEvent reads the next SSE event from the stream
func (*StreamResult) Response ¶
func (sr *StreamResult) Response() *http.Response
Response returns the underlying HTTP response
type URLBackoff ¶
type URLBackoff struct {
// Uses backoff as underlying implementation.
Backoff *flowcontrol.Backoff
}
URLBackoff struct implements the semantics on top of Backoff which we need for URL specific exponential backoff.
func (*URLBackoff) CalculateBackoff ¶
func (b *URLBackoff) CalculateBackoff(actualUrl *url.URL) time.Duration
CalculateBackoff takes a url and back's off exponentially, based on its knowledge of existing failures.
func (*URLBackoff) Disable ¶
func (b *URLBackoff) Disable()
Disable makes the backoff trivial, i.e., sets it to zero. This might be used by tests which want to run 1000s of mock requests without slowing down.
func (*URLBackoff) Sleep ¶
func (b *URLBackoff) Sleep(d time.Duration)
func (*URLBackoff) UpdateBackoff ¶
func (b *URLBackoff) UpdateBackoff(actualUrl *url.URL, err error, responseCode int)
UpdateBackoff updates backoff metadata
type WarningHandler ¶
type WarningHandler interface {
// HandleWarningHeader is called with the warn code, agent, and text when a warning header is countered.
HandleWarningHeader(code int, agent string, text string)
}
WarningHandler is an interface for handling warning headers
type WarningLogger ¶
type WarningLogger struct{}
WarningLogger is an implementation of WarningHandler that logs code 299 warnings
func (WarningLogger) HandleWarningHeader ¶
func (WarningLogger) HandleWarningHeader(code int, agent string, message string)
type WithRetry ¶
type WithRetry interface {
// IsNextRetry advances the retry counter appropriately
// and returns true if the request should be retried,
// otherwise it returns false, if:
// - we have already reached the maximum retry threshold.
// - the error does not fall into the retryable category.
// - the server has not sent us a 429, or 5xx status code and the
// 'Retry-After' response header is not set with a value.
// - we need to seek to the beginning of the request body before we
// initiate the next retry, the function should log an error and
// return false if it fails to do so.
//
// restReq: the associated rest.Request
// httpReq: the HTTP Request sent to the server
// resp: the response sent from the server, it is set if err is nil
// err: the server sent this error to us, if err is set then resp is nil.
// f: a IsRetryableErrorFunc function provided by the client that determines
// if the err sent by the server is retryable.
IsNextRetry(ctx context.Context, restReq *Request, httpReq *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) bool
// Before should be invoked prior to each attempt, including
// the first one. If an error is returned, the request should
// be aborted immediately.
//
// Before may also be additionally responsible for preparing
// the request for the next retry, namely in terms of resetting
// the request body in case it has been read.
Before(ctx context.Context, r *Request) error
// After should be invoked immediately after an attempt is made.
After(ctx context.Context, r *Request, resp *http.Response, err error)
// WrapPreviousError wraps the error from any previous attempt into
// the final error specified in 'finalErr', so the user has more
// context why the request failed.
// For example, if a request times out after multiple retries then
// we see a generic context.Canceled or context.DeadlineExceeded
// error which is not very useful in debugging. This function can
// wrap any error from previous attempt(s) to provide more context to
// the user. The error returned in 'err' must satisfy the
// following conditions:
// a: errors.Unwrap(err) = errors.Unwrap(finalErr) if finalErr
// implements Unwrap
// b: errors.Unwrap(err) = finalErr if finalErr does not
// implements Unwrap
// c: errors.Is(err, otherErr) = errors.Is(finalErr, otherErr)
WrapPreviousError(finalErr error) (err error)
}
WithRetry allows the client to retry a request up to a certain number of times Note that WithRetry is not safe for concurrent use by multiple goroutines without additional locking or coordination.