聊聊storagetapper的pipe
首页 专栏 golang 文章详情
0

聊聊storagetapper的pipe

codecraft 发布于 3 月 2 日

本文主要研究一下storagetapper的pipe

Pipe

storagetapper/pipe/pipe.go

type Pipe interface {
    NewConsumer(topic string) (Consumer, error)
    NewProducer(topic string) (Producer, error)
    Type() string
    Config() *config.PipeConfig
    Close() error
}
Pipe接口定义了NewConsumer、NewProducer、Type、Config、Close方法

Consumer

storagetapper/pipe/pipe.go

type Consumer interface {
    Close() error
    //CloseOnFailure doesn't save offsets
    CloseOnFailure() error
    Message() chan interface{}
    Error() chan error
    FetchNext() (interface{}, error)
    //Allows to explicitly persists current consumer position
    SaveOffset() error

    //SetFormat allow to tell consumer the format of the file when there is no
    //header
    SetFormat(format string)
}
Consumer接口定义了Close、CloseOnFailure、Message、Error、FetchNext、SaveOffset、SetFormat方法

Producer

storagetapper/pipe/pipe.go

type Producer interface {
    Push(data interface{}) error
    PushK(key string, data interface{}) error
    PushSchema(key string, data []byte) error
    //PushBatch queues the messages instead of sending immediately
    PushBatch(key string, data interface{}) error
    //PushCommit writes out all the messages queued by PushBatch
    PushBatchCommit() error
    Close() error
    CloseOnFailure() error

    SetFormat(format string)

    PartitionKey(source string, key string) string
}
Producer接口定义了Push、PushK、PushSchema、PushBatch、PushBatchCommit、Close、CloseOnFailure、SetFormat、PartitionKey

Create

storagetapper/pipe/pipe.go

func Create(pipeType string, cfg *config.PipeConfig, db *sql.DB) (Pipe, error) {

    init := Pipes[strings.ToLower(pipeType)]
    if init == nil {
        return nil, fmt.Errorf("unsupported pipe: %s", strings.ToLower(pipeType))
    }

    pipe, err := init(cfg, db)
    if err != nil {
        return nil, err
    }

    return pipe, nil
}

type constructor func(cfg *config.PipeConfig, db *sql.DB) (Pipe, error)

//Pipes is the list of registered pipes
//Plugins insert their constructors into this map
var Pipes map[string]constructor

//registerPlugin should be called from plugin's init
func registerPlugin(name string, init constructor) {
    if Pipes == nil {
        Pipes = make(map[string]constructor)
    }
    Pipes[name] = init
}
Create方法根据pipeType、PipeConfig、db来创建pipe

小结

storagetapper的Pipe接口定义了NewConsumer、NewProducer、Type、Config、Close方法,其Create方法根据pipeType、PipeConfig、db来创建pipe。

doc

storagetapper
golang
阅读 89 发布于 3 月 2 日
收藏
分享
本作品系原创, 采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议
code-craft
spring boot , docker and so on 欢迎关注微信公众号: geek_luandun
关注专栏
avatar
codecraft

当一个代码的工匠回首往事时,不因虚度年华而悔恨,也不因碌碌无为而羞愧,这样,当他老的时候,可以很自豪告诉世人,我曾经将代码注入生命去打造互联网的浪潮之巅,那是个很疯狂的时代,我在一波波的浪潮上留下了或重如泰山或轻如鸿毛的几笔。

11.4k 声望
1.1k 粉丝
关注作者
0 条评论
得票 时间
提交评论
avatar
codecraft

当一个代码的工匠回首往事时,不因虚度年华而悔恨,也不因碌碌无为而羞愧,这样,当他老的时候,可以很自豪告诉世人,我曾经将代码注入生命去打造互联网的浪潮之巅,那是个很疯狂的时代,我在一波波的浪潮上留下了或重如泰山或轻如鸿毛的几笔。

11.4k 声望
1.1k 粉丝
关注作者
宣传栏
目录

本文主要研究一下storagetapper的pipe

Pipe

storagetapper/pipe/pipe.go

type Pipe interface {
    NewConsumer(topic string) (Consumer, error)
    NewProducer(topic string) (Producer, error)
    Type() string
    Config() *config.PipeConfig
    Close() error
}
Pipe接口定义了NewConsumer、NewProducer、Type、Config、Close方法

Consumer

storagetapper/pipe/pipe.go

type Consumer interface {
    Close() error
    //CloseOnFailure doesn't save offsets
    CloseOnFailure() error
    Message() chan interface{}
    Error() chan error
    FetchNext() (interface{}, error)
    //Allows to explicitly persists current consumer position
    SaveOffset() error

    //SetFormat allow to tell consumer the format of the file when there is no
    //header
    SetFormat(format string)
}
Consumer接口定义了Close、CloseOnFailure、Message、Error、FetchNext、SaveOffset、SetFormat方法

Producer

storagetapper/pipe/pipe.go

type Producer interface {
    Push(data interface{}) error
    PushK(key string, data interface{}) error
    PushSchema(key string, data []byte) error
    //PushBatch queues the messages instead of sending immediately
    PushBatch(key string, data interface{}) error
    //PushCommit writes out all the messages queued by PushBatch
    PushBatchCommit() error
    Close() error
    CloseOnFailure() error

    SetFormat(format string)

    PartitionKey(source string, key string) string
}
Producer接口定义了Push、PushK、PushSchema、PushBatch、PushBatchCommit、Close、CloseOnFailure、SetFormat、PartitionKey

Create

storagetapper/pipe/pipe.go

func Create(pipeType string, cfg *config.PipeConfig, db *sql.DB) (Pipe, error) {

    init := Pipes[strings.ToLower(pipeType)]
    if init == nil {
        return nil, fmt.Errorf("unsupported pipe: %s", strings.ToLower(pipeType))
    }

    pipe, err := init(cfg, db)
    if err != nil {
        return nil, err
    }

    return pipe, nil
}

type constructor func(cfg *config.PipeConfig, db *sql.DB) (Pipe, error)

//Pipes is the list of registered pipes
//Plugins insert their constructors into this map
var Pipes map[string]constructor

//registerPlugin should be called from plugin's init
func registerPlugin(name string, init constructor) {
    if Pipes == nil {
        Pipes = make(map[string]constructor)
    }
    Pipes[name] = init
}
Create方法根据pipeType、PipeConfig、db来创建pipe

小结

storagetapper的Pipe接口定义了NewConsumer、NewProducer、Type、Config、Close方法,其Create方法根据pipeType、PipeConfig、db来创建pipe。

doc

storagetapper