icanal

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2025 License: Apache-2.0 Imports: 24 Imported by: 0

README

icanal

A canal client for golang;

说明

icanal是一个alibaba canal的golang client,灵感来源于go-canal;但是比canal-go要更稳定和面像地道的golang设计,支持更灵活的配置化

安装

go get github.com/kalvinzhang/icanal

使用

Cluster Connector

参照 example/cluster/cluster.go

	ctx := context.TODO()
	// 创建一个通道来接收信号
	sigChan := make(chan os.Signal, 1)
	// 使用signal.Notify注册要接收的信号,这里注册了SIGINT(Ctrl+C)
	signal.Notify(sigChan, syscall.SIGINT)

	connector := icanal.NewClusterConnector(
		"example",
		[]string{"127.0.0.1:2181"},
		time.Second*10,
		icanal.WithUsername("canal"),
		icanal.WithPassword("canal"),
	)

	if err := connector.Connect(ctx); err != nil {
		slog.ErrorContext(ctx, "connect error", slog.Any("error", err))
		return
	}

	if err := connector.Subscribe(ctx, ".*\\..*"); err != nil {
		slog.ErrorContext(ctx, "subscribe error", slog.Any("error", err))
		return
	}

outer:
	for {
		select {
		case <-sigChan:
			fmt.Println("Received an interrupt, stopping example...")
			break outer
		default:
			message, err := connector.Get(ctx, 10, time.Second)
			if err != nil {
				slog.ErrorContext(ctx, "get error", slog.Any("error", err))
				return
			}
			batchId := message.Id
			if batchId == -1 || len(message.Entries) <= 0 {
				time.Sleep(300 * time.Millisecond)
				slog.DebugContext(ctx, "no data")
				continue
			}
			util.PrintEntry(ctx, message.Entries)
		}
	}

	if err := connector.Unsubscribe(ctx); err != nil {
		return
	}

	fmt.Println("unsubscribed")

	if err := connector.Disconnect(ctx); err != nil {
		return
	}

	fmt.Println("disconnected")

	fmt.Println("exited")
Simple Connector

参照 example/simple.go

    ctx := context.TODO()
	connector := icanal.NewSimpleConnector(
		"127.0.0.1:11111", "example",
		icanal.WithUsername("canal"),
		icanal.WithPassword("canal"),
	)

	if err := connector.Connect(ctx); err != nil {
		slog.ErrorContext(ctx, "connect error", slog.Any("error", err))
		return
	}

	if err := connector.Subscribe(ctx, ".*\\..*"); err != nil {
		slog.ErrorContext(ctx, "subscribe error", slog.Any("error", err))
		return
	}
	for {
		message, err := connector.Get(ctx, 10, time.Second)
		if err != nil {
			slog.ErrorContext(ctx, "get error", slog.Any("error", err))
			return
		}
		batchId := message.Id
		if batchId == -1 || len(message.Entries) <= 0 {
			time.Sleep(300 * time.Millisecond)
			slog.DebugContext(ctx, "no data")
			continue
		}

		util.PrintEntry(ctx, message.Entries)
	}

Documentation

Index

Constants

View Source
const (
	BatchSizeDefault     = 1000                  // 默认batch size
	ClientIdDefault      = 1001                  // 默认client id
	SoTimeoutDefault     = 60 * time.Second      // 默认socket超时;60秒
	IdleTimeoutDefault   = 60 * 60 * time.Second // 默认空闲超时;60分钟
	RetryTimesDefault    = 3                     // 默认重试次数
	RetryIntervalDefault = 5 * time.Second       // 默认重试间隔时间
)
View Source
const (
	TimeoutDefault = -1 // 立马返回,不阻塞
	TimeoutNever   = 0  // 一直阻塞,直到有数据才返回;主要soTimeout配置
)
View Source
const (
	CanalVersion1 = 1
)

Variables

View Source
var (
	EntryType_name = map[int32]string{
		0: "ENTRYTYPECOMPATIBLEPROTO2",
		1: "TRANSACTIONBEGIN",
		2: "ROWDATA",
		3: "TRANSACTIONEND",
		4: "HEARTBEAT",
		5: "GTIDLOG",
	}
	EntryType_value = map[string]int32{
		"ENTRYTYPECOMPATIBLEPROTO2": 0,
		"TRANSACTIONBEGIN":          1,
		"ROWDATA":                   2,
		"TRANSACTIONEND":            3,
		"HEARTBEAT":                 4,
		"GTIDLOG":                   5,
	}
)

Enum value maps for EntryType.

View Source
var (
	EventType_name = map[int32]string{
		0:  "EVENTTYPECOMPATIBLEPROTO2",
		1:  "INSERT",
		2:  "UPDATE",
		3:  "DELETE",
		4:  "CREATE",
		5:  "ALTER",
		6:  "ERASE",
		7:  "QUERY",
		8:  "TRUNCATE",
		9:  "RENAME",
		10: "CINDEX",
		11: "DINDEX",
		12: "GTID",
		13: "XACOMMIT",
		14: "XAROLLBACK",
		15: "MHEARTBEAT",
	}
	EventType_value = map[string]int32{
		"EVENTTYPECOMPATIBLEPROTO2": 0,
		"INSERT":                    1,
		"UPDATE":                    2,
		"DELETE":                    3,
		"CREATE":                    4,
		"ALTER":                     5,
		"ERASE":                     6,
		"QUERY":                     7,
		"TRUNCATE":                  8,
		"RENAME":                    9,
		"CINDEX":                    10,
		"DINDEX":                    11,
		"GTID":                      12,
		"XACOMMIT":                  13,
		"XAROLLBACK":                14,
		"MHEARTBEAT":                15,
	}
)

Enum value maps for EventType.

View Source
var (
	Type_name = map[int32]string{
		0: "TYPECOMPATIBLEPROTO2",
		1: "ORACLE",
		2: "MYSQL",
		3: "PGSQL",
	}
	Type_value = map[string]int32{
		"TYPECOMPATIBLEPROTO2": 0,
		"ORACLE":               1,
		"MYSQL":                2,
		"PGSQL":                3,
	}
)

Enum value maps for Type.

View Source
var (
	ErrUnsupportedVersion    = errors.New("unsupported version at this client")
	ErrHandshake             = errors.New("expect handshake but found other type")
	ErrExpectedPacketType    = errors.New("expect packet type but found other type")
	ErrAuth                  = errors.New("auth error")
	ErrUnmarshal             = errors.New("unmarshal error")
	ErrCompressionNotSupport = errors.New("compression is not supported in this connector")
	ErrNetwork               = errors.New("network error")
	ErrOverRetryTimes        = errors.New("over retry times")
	ErrSubscribe             = errors.New("subscribe error")
	ErrUnsubscribe           = errors.New("unsubscribe error")
)
View Source
var File_proto_entry_protocol_proto protoreflect.FileDescriptor

Functions

func NewCanalError added in v1.0.0

func NewCanalError(code int32, msg string) error

Types

type CanalError added in v1.0.0

type CanalError struct {
	Code int32
	Msg  string
}

func (*CanalError) Error added in v1.0.0

func (e *CanalError) Error() string

type ClientIdentity

type ClientIdentity struct {
	Destination string
	ClientId    int
	Filter      string
}

ClientIdentity 客户端标识

type ClusterManager added in v1.0.0

type ClusterManager interface {
	Init(ctx context.Context) error
	GetNode(ctx context.Context) (string, error)
	GetLock(ctx context.Context) error // 获取锁
}

ClusterManager 集群经理;处理集群节点选取和分布式锁

func NewClusterNodeManager added in v1.0.0

func NewClusterNodeManager(destination string, zkServer []string, sessionTimeout time.Duration) ClusterManager

NewClusterNodeManager 新建集群节点经理

type Column

type Column struct {

	// *字段下标*
	Index int32 `protobuf:"varint,1,opt,name=index,proto3" json:"index,omitempty"`
	// *字段java中类型*
	SqlType int32 `protobuf:"varint,2,opt,name=sqlType,proto3" json:"sqlType,omitempty"`
	// *字段名称(忽略大小写),在mysql中是没有的*
	Name string `protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty"`
	// *是否是主键*
	IsKey bool `protobuf:"varint,4,opt,name=isKey,proto3" json:"isKey,omitempty"`
	// *如果EventType=UPDATE,用于标识这个字段值是否有修改*
	Updated bool `protobuf:"varint,5,opt,name=updated,proto3" json:"updated,omitempty"`
	// [default = false]
	//
	// Types that are valid to be assigned to IsNullPresent:
	//
	//	*Column_IsNull
	IsNullPresent isColumn_IsNullPresent `protobuf_oneof:"isNull_present"`
	// *预留扩展*
	Props []*Pair `protobuf:"bytes,7,rep,name=props,proto3" json:"props,omitempty"`
	// * 字段值,timestamp,Datetime是一个时间格式的文本 *
	Value string `protobuf:"bytes,8,opt,name=value,proto3" json:"value,omitempty"`
	// * 对应数据对象原始长度 *
	Length int32 `protobuf:"varint,9,opt,name=length,proto3" json:"length,omitempty"`
	// *字段mysql类型*
	MysqlType string `protobuf:"bytes,10,opt,name=mysqlType,proto3" json:"mysqlType,omitempty"`
	// contains filtered or unexported fields
}

*每个字段的数据结构*

func (*Column) Descriptor deprecated

func (*Column) Descriptor() ([]byte, []int)

Deprecated: Use Column.ProtoReflect.Descriptor instead.

func (*Column) GetIndex

func (x *Column) GetIndex() int32

func (*Column) GetIsKey

func (x *Column) GetIsKey() bool

func (*Column) GetIsNull

func (x *Column) GetIsNull() bool

func (*Column) GetIsNullPresent

func (x *Column) GetIsNullPresent() isColumn_IsNullPresent

func (*Column) GetLength

func (x *Column) GetLength() int32

func (*Column) GetMysqlType

func (x *Column) GetMysqlType() string

func (*Column) GetName

func (x *Column) GetName() string

func (*Column) GetProps

func (x *Column) GetProps() []*Pair

func (*Column) GetSqlType

func (x *Column) GetSqlType() int32

func (*Column) GetUpdated

func (x *Column) GetUpdated() bool

func (*Column) GetValue

func (x *Column) GetValue() string

func (*Column) ProtoMessage

func (*Column) ProtoMessage()

func (*Column) ProtoReflect

func (x *Column) ProtoReflect() protoreflect.Message

func (*Column) Reset

func (x *Column) Reset()

func (*Column) String

func (x *Column) String() string

type Column_IsNull

type Column_IsNull struct {
	IsNull bool `protobuf:"varint,6,opt,name=isNull,proto3,oneof"`
}

type Connector

type Connector interface {
	Connect(ctx context.Context) error
	Disconnect(ctx context.Context) error
	Subscribe(ctx context.Context, filter string) error
	Unsubscribe(ctx context.Context) error
	Get(ctx context.Context, batchSize int32, timeout time.Duration) (*Message, error)
	GetWithoutAck(ctx context.Context, batchSize int32, timeout time.Duration) (*Message, error)
	Ack(ctx context.Context, batchId int64) error
	Rollback(ctx context.Context, batchId int64) error
}

Connector 连接器

func NewClusterConnector

func NewClusterConnector(destination string, zkServer []string, zkSessionTimeout time.Duration, opts ...Option) Connector

NewClusterConnector 新建集群连接器

func NewSimpleConnector

func NewSimpleConnector(address string, destination string, opts ...Option) Connector

NewSimpleConnector 新建简单连接器

type ConnectorConfig

type ConnectorConfig struct {
	Username             string        // 用户名
	Password             string        // 密码
	SoTimeout            time.Duration // 网络超时
	IdleTimeout          time.Duration // 空闲超时
	RollbackOnConnect    bool          // 是否在connect链接成功后,自动执行rollback操作
	RollbackOnDisconnect bool          // 是否在connect链接断开后,自动执行rollback操作
	LazyParseEntry       bool          // 是否自动化解析Entry对象,如果考虑最大化性能可以延后解析
	Filter               string        // 记录上一次的filter提交值,便于自动重试时提交
	RetryTimes           int
	RetryInterval        time.Duration
}

ConnectorConfig 简单客户端配置

type Entry

type Entry struct {

	// *协议头部信息*
	Header *Header `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"`
	// /**打散后的事件类型**/ [default = ROWDATA]
	//
	// Types that are valid to be assigned to EntryTypePresent:
	//
	//	*Entry_EntryType
	EntryTypePresent isEntry_EntryTypePresent `protobuf_oneof:"entryType_present"`
	// *传输的二进制数组*
	StoreValue []byte `protobuf:"bytes,3,opt,name=storeValue,proto3" json:"storeValue,omitempty"`
	// contains filtered or unexported fields
}

*************************************************************** message model 如果要在Enum中新增类型,确保以前的类型的下标值不变. **************************************************************

func (*Entry) Descriptor deprecated

func (*Entry) Descriptor() ([]byte, []int)

Deprecated: Use Entry.ProtoReflect.Descriptor instead.

func (*Entry) GetEntryType

func (x *Entry) GetEntryType() EntryType

func (*Entry) GetEntryTypePresent

func (x *Entry) GetEntryTypePresent() isEntry_EntryTypePresent

func (*Entry) GetHeader

func (x *Entry) GetHeader() *Header

func (*Entry) GetStoreValue

func (x *Entry) GetStoreValue() []byte

func (*Entry) ProtoMessage

func (*Entry) ProtoMessage()

func (*Entry) ProtoReflect

func (x *Entry) ProtoReflect() protoreflect.Message

func (*Entry) Reset

func (x *Entry) Reset()

func (*Entry) String

func (x *Entry) String() string

type EntryType

type EntryType int32

*打散后的事件类型,主要用于标识事务的开始,变更数据,结束*

const (
	EntryType_ENTRYTYPECOMPATIBLEPROTO2 EntryType = 0
	EntryType_TRANSACTIONBEGIN          EntryType = 1
	EntryType_ROWDATA                   EntryType = 2
	EntryType_TRANSACTIONEND            EntryType = 3
	// * 心跳类型,内部使用,外部暂不可见,可忽略 *
	EntryType_HEARTBEAT EntryType = 4
	EntryType_GTIDLOG   EntryType = 5
)

func (EntryType) Descriptor

func (EntryType) Descriptor() protoreflect.EnumDescriptor

func (EntryType) Enum

func (x EntryType) Enum() *EntryType

func (EntryType) EnumDescriptor deprecated

func (EntryType) EnumDescriptor() ([]byte, []int)

Deprecated: Use EntryType.Descriptor instead.

func (EntryType) Number

func (x EntryType) Number() protoreflect.EnumNumber

func (EntryType) String

func (x EntryType) String() string

func (EntryType) Type

type Entry_EntryType

type Entry_EntryType struct {
	EntryType EntryType `protobuf:"varint,2,opt,name=entryType,proto3,enum=com.alibaba.otter.canal.protocol.entry.EntryType,oneof"`
}

type EventType

type EventType int32

* 事件类型 *

const (
	EventType_EVENTTYPECOMPATIBLEPROTO2 EventType = 0
	EventType_INSERT                    EventType = 1
	EventType_UPDATE                    EventType = 2
	EventType_DELETE                    EventType = 3
	EventType_CREATE                    EventType = 4
	EventType_ALTER                     EventType = 5
	EventType_ERASE                     EventType = 6
	EventType_QUERY                     EventType = 7
	EventType_TRUNCATE                  EventType = 8
	EventType_RENAME                    EventType = 9
	// *CREATE INDEX*
	EventType_CINDEX EventType = 10
	EventType_DINDEX EventType = 11
	EventType_GTID   EventType = 12
	// * XA *
	EventType_XACOMMIT   EventType = 13
	EventType_XAROLLBACK EventType = 14
	// * MASTER HEARTBEAT *
	EventType_MHEARTBEAT EventType = 15
)

func (EventType) Descriptor

func (EventType) Descriptor() protoreflect.EnumDescriptor

func (EventType) Enum

func (x EventType) Enum() *EventType

func (EventType) EnumDescriptor deprecated

func (EventType) EnumDescriptor() ([]byte, []int)

Deprecated: Use EventType.Descriptor instead.

func (EventType) Number

func (x EventType) Number() protoreflect.EnumNumber

func (EventType) String

func (x EventType) String() string

func (EventType) Type

type Header struct {

	// [default = 1]
	//
	// Types that are valid to be assigned to VersionPresent:
	//
	//	*Header_Version
	VersionPresent isHeader_VersionPresent `protobuf_oneof:"version_present"`
	// *binlog/redolog 文件名*
	LogfileName string `protobuf:"bytes,2,opt,name=logfileName,proto3" json:"logfileName,omitempty"`
	// *binlog/redolog 文件的偏移位置*
	LogfileOffset int64 `protobuf:"varint,3,opt,name=logfileOffset,proto3" json:"logfileOffset,omitempty"`
	// *服务端serverId*
	ServerId int64 `protobuf:"varint,4,opt,name=serverId,proto3" json:"serverId,omitempty"`
	// * 变更数据的编码 *
	ServerenCode string `protobuf:"bytes,5,opt,name=serverenCode,proto3" json:"serverenCode,omitempty"`
	// *变更数据的执行时间 *
	ExecuteTime int64 `protobuf:"varint,6,opt,name=executeTime,proto3" json:"executeTime,omitempty"`
	// [default = MYSQL]
	//
	// Types that are valid to be assigned to SourceTypePresent:
	//
	//	*Header_SourceType
	SourceTypePresent isHeader_SourceTypePresent `protobuf_oneof:"sourceType_present"`
	// * 变更数据的schemaname*
	SchemaName string `protobuf:"bytes,8,opt,name=schemaName,proto3" json:"schemaName,omitempty"`
	// *变更数据的tablename*
	TableName string `protobuf:"bytes,9,opt,name=tableName,proto3" json:"tableName,omitempty"`
	// *每个event的长度*
	EventLength int64 `protobuf:"varint,10,opt,name=eventLength,proto3" json:"eventLength,omitempty"`
	// [default = UPDATE]
	//
	// Types that are valid to be assigned to EventTypePresent:
	//
	//	*Header_EventType
	EventTypePresent isHeader_EventTypePresent `protobuf_oneof:"eventType_present"`
	// *预留扩展*
	Props []*Pair `protobuf:"bytes,12,rep,name=props,proto3" json:"props,omitempty"`
	// *当前事务的gitd*
	Gtid string `protobuf:"bytes,13,opt,name=gtid,proto3" json:"gtid,omitempty"`
	// contains filtered or unexported fields
}

*message Header*

func (*Header) Descriptor deprecated

func (*Header) Descriptor() ([]byte, []int)

Deprecated: Use Header.ProtoReflect.Descriptor instead.

func (*Header) GetEventLength

func (x *Header) GetEventLength() int64

func (*Header) GetEventType

func (x *Header) GetEventType() EventType

func (*Header) GetEventTypePresent

func (x *Header) GetEventTypePresent() isHeader_EventTypePresent

func (*Header) GetExecuteTime

func (x *Header) GetExecuteTime() int64

func (*Header) GetGtid

func (x *Header) GetGtid() string

func (*Header) GetLogfileName

func (x *Header) GetLogfileName() string

func (*Header) GetLogfileOffset

func (x *Header) GetLogfileOffset() int64

func (*Header) GetProps

func (x *Header) GetProps() []*Pair

func (*Header) GetSchemaName

func (x *Header) GetSchemaName() string

func (*Header) GetServerId

func (x *Header) GetServerId() int64

func (*Header) GetServerenCode

func (x *Header) GetServerenCode() string

func (*Header) GetSourceType

func (x *Header) GetSourceType() Type

func (*Header) GetSourceTypePresent

func (x *Header) GetSourceTypePresent() isHeader_SourceTypePresent

func (*Header) GetTableName

func (x *Header) GetTableName() string

func (*Header) GetVersion

func (x *Header) GetVersion() int32

func (*Header) GetVersionPresent

func (x *Header) GetVersionPresent() isHeader_VersionPresent

func (*Header) ProtoMessage

func (*Header) ProtoMessage()

func (*Header) ProtoReflect

func (x *Header) ProtoReflect() protoreflect.Message

func (*Header) Reset

func (x *Header) Reset()

func (*Header) String

func (x *Header) String() string

type Header_EventType

type Header_EventType struct {
	EventType EventType `protobuf:"varint,11,opt,name=eventType,proto3,enum=com.alibaba.otter.canal.protocol.entry.EventType,oneof"`
}

type Header_SourceType

type Header_SourceType struct {
	SourceType Type `protobuf:"varint,7,opt,name=sourceType,proto3,enum=com.alibaba.otter.canal.protocol.entry.Type,oneof"`
}

type Header_Version

type Header_Version struct {
	Version int32 `protobuf:"varint,1,opt,name=version,proto3,oneof"`
}

type Message

type Message struct {
	Id         int64
	Entries    []*Entry
	Raw        bool
	RawEntries any
}

type Option

type Option func(*ConnectorConfig)

func WithFilter

func WithFilter(filter string) Option

func WithIdleTimeout

func WithIdleTimeout(idleTimeout time.Duration) Option

func WithLazyParseEntry

func WithLazyParseEntry(lazyParseEntry bool) Option

func WithPassword

func WithPassword(password string) Option

func WithRetryInterval added in v1.0.0

func WithRetryInterval(retryInterval time.Duration) Option

func WithRetryTimes

func WithRetryTimes(retryTimes int) Option

func WithRollbackOnConnect

func WithRollbackOnConnect(rollbackOnConnect bool) Option

func WithRollbackOnDisconnect

func WithRollbackOnDisconnect(rollbackOnDisconnect bool) Option

func WithSoTimeout

func WithSoTimeout(soTimeout time.Duration) Option

func WithUsername

func WithUsername(username string) Option

type Pair

type Pair struct {
	Key   string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
	Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
	// contains filtered or unexported fields
}

*预留扩展*

func (*Pair) Descriptor deprecated

func (*Pair) Descriptor() ([]byte, []int)

Deprecated: Use Pair.ProtoReflect.Descriptor instead.

func (*Pair) GetKey

func (x *Pair) GetKey() string

func (*Pair) GetValue

func (x *Pair) GetValue() string

func (*Pair) ProtoMessage

func (*Pair) ProtoMessage()

func (*Pair) ProtoReflect

func (x *Pair) ProtoReflect() protoreflect.Message

func (*Pair) Reset

func (x *Pair) Reset()

func (*Pair) String

func (x *Pair) String() string

type RowChange

type RowChange struct {

	// *tableId,由数据库产生*
	TableId int64 `protobuf:"varint,1,opt,name=tableId,proto3" json:"tableId,omitempty"`
	// [default = UPDATE]
	//
	// Types that are valid to be assigned to EventTypePresent:
	//
	//	*RowChange_EventType
	EventTypePresent isRowChange_EventTypePresent `protobuf_oneof:"eventType_present"`
	// [default = false]
	//
	// Types that are valid to be assigned to IsDdlPresent:
	//
	//	*RowChange_IsDdl
	IsDdlPresent isRowChange_IsDdlPresent `protobuf_oneof:"isDdl_present"`
	// * ddl/query的sql语句  *
	Sql string `protobuf:"bytes,11,opt,name=sql,proto3" json:"sql,omitempty"`
	// * 一次数据库变更可能存在多行  *
	RowDatas []*RowData `protobuf:"bytes,12,rep,name=rowDatas,proto3" json:"rowDatas,omitempty"`
	// *预留扩展*
	Props []*Pair `protobuf:"bytes,13,rep,name=props,proto3" json:"props,omitempty"`
	// * ddl/query的schemaName,会存在跨库ddl,需要保留执行ddl的当前schemaName  *
	DdlSchemaName string `protobuf:"bytes,14,opt,name=ddlSchemaName,proto3" json:"ddlSchemaName,omitempty"`
	// contains filtered or unexported fields
}

*message row 每行变更数据的数据结构*

func (*RowChange) Descriptor deprecated

func (*RowChange) Descriptor() ([]byte, []int)

Deprecated: Use RowChange.ProtoReflect.Descriptor instead.

func (*RowChange) GetDdlSchemaName

func (x *RowChange) GetDdlSchemaName() string

func (*RowChange) GetEventType

func (x *RowChange) GetEventType() EventType

func (*RowChange) GetEventTypePresent

func (x *RowChange) GetEventTypePresent() isRowChange_EventTypePresent

func (*RowChange) GetIsDdl

func (x *RowChange) GetIsDdl() bool

func (*RowChange) GetIsDdlPresent

func (x *RowChange) GetIsDdlPresent() isRowChange_IsDdlPresent

func (*RowChange) GetProps

func (x *RowChange) GetProps() []*Pair

func (*RowChange) GetRowDatas

func (x *RowChange) GetRowDatas() []*RowData

func (*RowChange) GetSql

func (x *RowChange) GetSql() string

func (*RowChange) GetTableId

func (x *RowChange) GetTableId() int64

func (*RowChange) ProtoMessage

func (*RowChange) ProtoMessage()

func (*RowChange) ProtoReflect

func (x *RowChange) ProtoReflect() protoreflect.Message

func (*RowChange) Reset

func (x *RowChange) Reset()

func (*RowChange) String

func (x *RowChange) String() string

type RowChange_EventType

type RowChange_EventType struct {
	EventType EventType `protobuf:"varint,2,opt,name=eventType,proto3,enum=com.alibaba.otter.canal.protocol.entry.EventType,oneof"`
}

type RowChange_IsDdl

type RowChange_IsDdl struct {
	IsDdl bool `protobuf:"varint,10,opt,name=isDdl,proto3,oneof"`
}

type RowData

type RowData struct {

	// * 字段信息,增量数据(修改前,删除前) *
	BeforeColumns []*Column `protobuf:"bytes,1,rep,name=beforeColumns,proto3" json:"beforeColumns,omitempty"`
	// * 字段信息,增量数据(修改后,新增后)  *
	AfterColumns []*Column `protobuf:"bytes,2,rep,name=afterColumns,proto3" json:"afterColumns,omitempty"`
	// *预留扩展*
	Props []*Pair `protobuf:"bytes,3,rep,name=props,proto3" json:"props,omitempty"`
	// contains filtered or unexported fields
}

func (*RowData) Descriptor deprecated

func (*RowData) Descriptor() ([]byte, []int)

Deprecated: Use RowData.ProtoReflect.Descriptor instead.

func (*RowData) GetAfterColumns

func (x *RowData) GetAfterColumns() []*Column

func (*RowData) GetBeforeColumns

func (x *RowData) GetBeforeColumns() []*Column

func (*RowData) GetProps

func (x *RowData) GetProps() []*Pair

func (*RowData) ProtoMessage

func (*RowData) ProtoMessage()

func (*RowData) ProtoReflect

func (x *RowData) ProtoReflect() protoreflect.Message

func (*RowData) Reset

func (x *RowData) Reset()

func (*RowData) String

func (x *RowData) String() string

type TimeUint

type TimeUint int32
const (
	TimeUintNanoseconds TimeUint = iota
	TimeUnitMicroseconds
	TimeUnitMilliseconds
	TimeUnitSeconds
	TimeUnitMinutes
	TimeUnitHours
	TimeUnitDays
)

type TransactionBegin

type TransactionBegin struct {

	// *已废弃,请使用header里的executeTime*
	ExecuteTime int64 `protobuf:"varint,1,opt,name=executeTime,proto3" json:"executeTime,omitempty"`
	// *已废弃,Begin里不提供事务id*
	TransactionId string `protobuf:"bytes,2,opt,name=transactionId,proto3" json:"transactionId,omitempty"`
	// *预留扩展*
	Props []*Pair `protobuf:"bytes,3,rep,name=props,proto3" json:"props,omitempty"`
	// *执行的thread Id*
	ThreadId int64 `protobuf:"varint,4,opt,name=threadId,proto3" json:"threadId,omitempty"`
	// contains filtered or unexported fields
}

*开始事务的一些信息*

func (*TransactionBegin) Descriptor deprecated

func (*TransactionBegin) Descriptor() ([]byte, []int)

Deprecated: Use TransactionBegin.ProtoReflect.Descriptor instead.

func (*TransactionBegin) GetExecuteTime

func (x *TransactionBegin) GetExecuteTime() int64

func (*TransactionBegin) GetProps

func (x *TransactionBegin) GetProps() []*Pair

func (*TransactionBegin) GetThreadId

func (x *TransactionBegin) GetThreadId() int64

func (*TransactionBegin) GetTransactionId

func (x *TransactionBegin) GetTransactionId() string

func (*TransactionBegin) ProtoMessage

func (*TransactionBegin) ProtoMessage()

func (*TransactionBegin) ProtoReflect

func (x *TransactionBegin) ProtoReflect() protoreflect.Message

func (*TransactionBegin) Reset

func (x *TransactionBegin) Reset()

func (*TransactionBegin) String

func (x *TransactionBegin) String() string

type TransactionEnd

type TransactionEnd struct {

	// *已废弃,请使用header里的executeTime*
	ExecuteTime int64 `protobuf:"varint,1,opt,name=executeTime,proto3" json:"executeTime,omitempty"`
	// *事务号*
	TransactionId string `protobuf:"bytes,2,opt,name=transactionId,proto3" json:"transactionId,omitempty"`
	// *预留扩展*
	Props []*Pair `protobuf:"bytes,3,rep,name=props,proto3" json:"props,omitempty"`
	// contains filtered or unexported fields
}

*结束事务的一些信息*

func (*TransactionEnd) Descriptor deprecated

func (*TransactionEnd) Descriptor() ([]byte, []int)

Deprecated: Use TransactionEnd.ProtoReflect.Descriptor instead.

func (*TransactionEnd) GetExecuteTime

func (x *TransactionEnd) GetExecuteTime() int64

func (*TransactionEnd) GetProps

func (x *TransactionEnd) GetProps() []*Pair

func (*TransactionEnd) GetTransactionId

func (x *TransactionEnd) GetTransactionId() string

func (*TransactionEnd) ProtoMessage

func (*TransactionEnd) ProtoMessage()

func (*TransactionEnd) ProtoReflect

func (x *TransactionEnd) ProtoReflect() protoreflect.Message

func (*TransactionEnd) Reset

func (x *TransactionEnd) Reset()

func (*TransactionEnd) String

func (x *TransactionEnd) String() string

type Type

type Type int32

*数据库类型*

const (
	Type_TYPECOMPATIBLEPROTO2 Type = 0
	Type_ORACLE               Type = 1
	Type_MYSQL                Type = 2
	Type_PGSQL                Type = 3
)

func (Type) Descriptor

func (Type) Descriptor() protoreflect.EnumDescriptor

func (Type) Enum

func (x Type) Enum() *Type

func (Type) EnumDescriptor deprecated

func (Type) EnumDescriptor() ([]byte, []int)

Deprecated: Use Type.Descriptor instead.

func (Type) Number

func (x Type) Number() protoreflect.EnumNumber

func (Type) String

func (x Type) String() string

func (Type) Type

func (Type) Type() protoreflect.EnumType

Directories

Path Synopsis
cluster command
protocol

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL