首页 专栏 golang 文章详情
关注作者
关注作者
0
聊聊storagetapper的pipe
codecraft 发布于 3 月 2 日
序
本文主要研究一下storagetapper的pipe
Pipe
storagetapper/pipe/pipe.go
type Pipe interface {
NewConsumer(topic string) (Consumer, error)
NewProducer(topic string) (Producer, error)
Type() string
Config() *config.PipeConfig
Close() error
}
Pipe接口定义了NewConsumer、NewProducer、Type、Config、Close方法
Consumer
storagetapper/pipe/pipe.go
type Consumer interface {
Close() error
//CloseOnFailure doesn't save offsets
CloseOnFailure() error
Message() chan interface{}
Error() chan error
FetchNext() (interface{}, error)
//Allows to explicitly persists current consumer position
SaveOffset() error
//SetFormat allow to tell consumer the format of the file when there is no
//header
SetFormat(format string)
}
Consumer接口定义了Close、CloseOnFailure、Message、Error、FetchNext、SaveOffset、SetFormat方法
Producer
storagetapper/pipe/pipe.go
type Producer interface {
Push(data interface{}) error
PushK(key string, data interface{}) error
PushSchema(key string, data []byte) error
//PushBatch queues the messages instead of sending immediately
PushBatch(key string, data interface{}) error
//PushCommit writes out all the messages queued by PushBatch
PushBatchCommit() error
Close() error
CloseOnFailure() error
SetFormat(format string)
PartitionKey(source string, key string) string
}
Producer接口定义了Push、PushK、PushSchema、PushBatch、PushBatchCommit、Close、CloseOnFailure、SetFormat、PartitionKey
Create
storagetapper/pipe/pipe.go
func Create(pipeType string, cfg *config.PipeConfig, db *sql.DB) (Pipe, error) {
init := Pipes[strings.ToLower(pipeType)]
if init == nil {
return nil, fmt.Errorf("unsupported pipe: %s", strings.ToLower(pipeType))
}
pipe, err := init(cfg, db)
if err != nil {
return nil, err
}
return pipe, nil
}
type constructor func(cfg *config.PipeConfig, db *sql.DB) (Pipe, error)
//Pipes is the list of registered pipes
//Plugins insert their constructors into this map
var Pipes map[string]constructor
//registerPlugin should be called from plugin's init
func registerPlugin(name string, init constructor) {
if Pipes == nil {
Pipes = make(map[string]constructor)
}
Pipes[name] = init
}
Create方法根据pipeType、PipeConfig、db来创建pipe
小结
storagetapper的Pipe接口定义了NewConsumer、NewProducer、Type、Config、Close方法,其Create方法根据pipeType、PipeConfig、db来创建pipe。
doc
storagetapper golang
阅读 89 发布于 3 月 2 日
赞
收藏
分享
本作品系原创, 采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议
code-craft
spring boot , docker and so on 欢迎关注微信公众号: geek_luandun
关注专栏
codecraft
当一个代码的工匠回首往事时,不因虚度年华而悔恨,也不因碌碌无为而羞愧,这样,当他老的时候,可以很自豪告诉世人,我曾经将代码注入生命去打造互联网的浪潮之巅,那是个很疯狂的时代,我在一波波的浪潮上留下了或重如泰山或轻如鸿毛的几笔。
11.4k 声望
1.1k 粉丝
0 条评论
得票 时间
提交评论
codecraft
当一个代码的工匠回首往事时,不因虚度年华而悔恨,也不因碌碌无为而羞愧,这样,当他老的时候,可以很自豪告诉世人,我曾经将代码注入生命去打造互联网的浪潮之巅,那是个很疯狂的时代,我在一波波的浪潮上留下了或重如泰山或轻如鸿毛的几笔。
11.4k 声望
1.1k 粉丝
宣传栏
目录
▲
序
本文主要研究一下storagetapper的pipe
Pipe
storagetapper/pipe/pipe.go
type Pipe interface {
NewConsumer(topic string) (Consumer, error)
NewProducer(topic string) (Producer, error)
Type() string
Config() *config.PipeConfig
Close() error
}
Pipe接口定义了NewConsumer、NewProducer、Type、Config、Close方法
Consumer
storagetapper/pipe/pipe.go
type Consumer interface {
Close() error
//CloseOnFailure doesn't save offsets
CloseOnFailure() error
Message() chan interface{}
Error() chan error
FetchNext() (interface{}, error)
//Allows to explicitly persists current consumer position
SaveOffset() error
//SetFormat allow to tell consumer the format of the file when there is no
//header
SetFormat(format string)
}
Consumer接口定义了Close、CloseOnFailure、Message、Error、FetchNext、SaveOffset、SetFormat方法
Producer
storagetapper/pipe/pipe.go
type Producer interface {
Push(data interface{}) error
PushK(key string, data interface{}) error
PushSchema(key string, data []byte) error
//PushBatch queues the messages instead of sending immediately
PushBatch(key string, data interface{}) error
//PushCommit writes out all the messages queued by PushBatch
PushBatchCommit() error
Close() error
CloseOnFailure() error
SetFormat(format string)
PartitionKey(source string, key string) string
}
Producer接口定义了Push、PushK、PushSchema、PushBatch、PushBatchCommit、Close、CloseOnFailure、SetFormat、PartitionKey
Create
storagetapper/pipe/pipe.go
func Create(pipeType string, cfg *config.PipeConfig, db *sql.DB) (Pipe, error) {
init := Pipes[strings.ToLower(pipeType)]
if init == nil {
return nil, fmt.Errorf("unsupported pipe: %s", strings.ToLower(pipeType))
}
pipe, err := init(cfg, db)
if err != nil {
return nil, err
}
return pipe, nil
}
type constructor func(cfg *config.PipeConfig, db *sql.DB) (Pipe, error)
//Pipes is the list of registered pipes
//Plugins insert their constructors into this map
var Pipes map[string]constructor
//registerPlugin should be called from plugin's init
func registerPlugin(name string, init constructor) {
if Pipes == nil {
Pipes = make(map[string]constructor)
}
Pipes[name] = init
}
Create方法根据pipeType、PipeConfig、db来创建pipe
小结
storagetapper的Pipe接口定义了NewConsumer、NewProducer、Type、Config、Close方法,其Create方法根据pipeType、PipeConfig、db来创建pipe。