2022.04.08
Developing Fluent Bit Golang Output Plugins
Hello, this is N.M. from the Next-Generation Systems Research Lab.
Fluent Bit is developed by the same people who made Fluentd, and it serves the same purpose, which is log ingestion.
The difference is that it is designed for resource-restricted environments such as sidecar containers in Kubernetes. It is written in C and has a much lighter memory footprint than Fluentd while performing as well or better, but Fluentd is more mature and has more plugins available.
See Fluent Bit plugins here: https://docs.fluentbit.io/manual/pipeline/outputs
See Fluentd plugins here: https://www.fluentd.org/plugins/all
If you need a plugin for Fluentd, chances are someone has already written it. If you need a plugin for Fluent Bit, it is quite possible that it doesn’t exist. In that case, you may take the easy path and just go with Fluentd, or you may decide to write a new plugin yourself.
Here I cover writing Golang output plugins for Fluent Bit.
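As described in the Fluent Bit documentation on Golang output plugins, Fluent Bit provides hooks: exported Go functions that Fluent Bit calls at specific points in the plugin’s lifecycle, in which the output plugin’s functionality is defined. Let’s explore these one by one.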
FLBPluginRegister
//export FLBPluginRegister
func FLBPluginRegister(ctx unsafe.Pointer) int {
	return output.FLBPluginRegister(ctx, "myoutputplugin", "example output plugin")
}
The function above registers the plugin under the name myoutputplugin.
Remember that Fluent Bit is written in C; it communicates with Go plugins using cgo. The //export FLBPluginRegister comment (note: no space after the //) tells cgo that this function must be exported so that it can be called from C code.
The plugin can now be referred to by name within an [OUTPUT] section of a Fluent Bit configuration file, as shown below:
[OUTPUT]
    Name myoutputplugin
Let’s say we save this in a file called fluent-bit.conf.
FLBPluginRegister is called only once.
This by itself is not very interesting: we can refer to the plugin by name, but we can’t configure it. Let’s add some configuration.
To do this, we need to implement FLBPluginInit.
FLBPluginInit
var id string

//export FLBPluginInit
func FLBPluginInit(ctx unsafe.Pointer) int {
	id = output.FLBPluginConfigKey(ctx, "Id")
	fmt.Printf("id: %v\n", id)
	return output.FLB_OK
}
FLBPluginInit is called once for each [OUTPUT] section that refers to the plugin by name, in our case myoutputplugin.
We initialize the global id variable to whatever value is passed in the configuration:
[OUTPUT]
    Name myoutputplugin
    Id   a
So the global variable id will be initialized to the string "a".
But since id is a global variable, each subsequent [OUTPUT] section that uses the plugin will overwrite id’s value:
# Initialize the id variable with 'a'
[OUTPUT]
    Name myoutputplugin
    Id   a

# Then overwrite the id variable with 'b'
[OUTPUT]
    Name myoutputplugin
    Id   b
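To see the problem without running Fluent Bit, here is a small stdlib-only simulation. initOutput is a hypothetical stand-in for FLBPluginInit, and each map plays the role of one [OUTPUT] section; the sketch contrasts a single global variable with a map keyed by Id.

```go
package main

import "fmt"

// globalID mimics the single package-level id variable from the
// FLBPluginInit example above.
var globalID string

// configsByID mimics a map-based approach: one entry per Id.
var configsByID = map[string]map[string]string{}

// initOutput is a hypothetical stand-in for FLBPluginInit. Fluent Bit would
// call the real hook once per [OUTPUT] section; here we pass each section's
// key/value pairs in directly.
func initOutput(section map[string]string) {
	// Overwritten on every call: only the last section survives.
	globalID = section["Id"]
	// One entry kept per Id: every section survives.
	configsByID[section["Id"]] = section
}

func main() {
	sections := []map[string]string{
		{"Name": "myoutputplugin", "Id": "a"},
		{"Name": "myoutputplugin", "Id": "b"},
	}
	for _, s := range sections {
		initOutput(s)
	}
	fmt.Println("global id:", globalID)
	fmt.Println("ids in map:", len(configsByID))
}
```

After both calls the global holds only "b", while the map still holds an entry for each Id.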
Obviously, we need a way to configure multiple plugin instances. To do this, we will create a map in our Go code where the key is the Id passed in the plugin config and the value holds any other parameters that we may wish to configure. Let’s use a struct to hold these other parameters so that each of them can have its own type.
Now our FLBPluginInit function and plugin variables look like this:
type PluginConf struct {
	foo string
	bar int
}

var (
	plugin map[string]PluginConf
)

//export FLBPluginInit
func FLBPluginInit(ctx unsafe.Pointer) int {
	if plugin == nil {
		plugin = make(map[string]PluginConf)
	}
	var id = output.FLBPluginConfigKey(ctx, "Id")
	fmt.Printf("id: %v\n", id)
	var foo = output.FLBPluginConfigKey(ctx, "Foo")
	var bar int
	var err error
	bar, err = strconv.Atoi(output.FLBPluginConfigKey(ctx, "Bar"))
	if err != nil {
		fmt.Printf("failed to initialize bar, %v\n", err)
		return output.FLB_ERROR
	}
	var pluginConf = PluginConf{foo: foo, bar: bar}
	fmt.Printf("initialized pluginConf: %+v\n", pluginConf)
	plugin[id] = pluginConf
	return output.FLB_OK
}
We can now configure our updated plugin using multiple [OUTPUT] sections:
[OUTPUT]
    Name myoutputplugin
    Id   a
    Foo  foo1
    Bar  1

[OUTPUT]
    Name myoutputplugin
    Id   b
    Foo  foo2
    Bar  2
If we run our code, we get something like this:
...
[2022/04/07 13:12:20] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2022/04/07 13:12:20] [ info] [cmetrics] version=0.2.2
id: a
initialized pluginConf: {foo:foo1 bar:1}
id: b
initialized pluginConf: {foo:foo2 bar:2}
[2022/04/07 13:12:20] [ info] [sp] stream processor started
...
We can see that our plugin map holds the values that we configured.
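One way to keep the parsing in FLBPluginInit testable is to pass the config lookup in as a function. The sketch below is a hypothetical helper, parsePluginConf, that is not part of fluent-bit-go; it reuses the PluginConf struct from above, and adds two checks the original does not have (a missing Id and a duplicate Id are rejected).

```go
package main

import (
	"fmt"
	"strconv"
)

// PluginConf mirrors the struct from the FLBPluginInit example above.
type PluginConf struct {
	foo string
	bar int
}

// parsePluginConf is a hypothetical helper. It takes a lookup function
// instead of calling output.FLBPluginConfigKey directly, so the parsing
// logic can be exercised without Fluent Bit.
func parsePluginConf(get func(key string) string) (string, PluginConf, error) {
	id := get("Id")
	if id == "" {
		return "", PluginConf{}, fmt.Errorf("missing Id")
	}
	bar, err := strconv.Atoi(get("Bar"))
	if err != nil {
		return "", PluginConf{}, fmt.Errorf("invalid Bar for Id %q: %w", id, err)
	}
	return id, PluginConf{foo: get("Foo"), bar: bar}, nil
}

func main() {
	plugin := make(map[string]PluginConf)
	// Each map simulates the key/value pairs of one [OUTPUT] section.
	sections := []map[string]string{
		{"Id": "a", "Foo": "foo1", "Bar": "1"},
		{"Id": "b", "Foo": "foo2", "Bar": "2"},
	}
	for _, s := range sections {
		id, conf, err := parsePluginConf(func(k string) string { return s[k] })
		if err != nil {
			fmt.Println("init failed:", err)
			continue
		}
		if _, dup := plugin[id]; dup {
			fmt.Println("duplicate Id:", id)
			continue
		}
		plugin[id] = conf
	}
	fmt.Printf("%+v\n", plugin)
}
```

Inside a real FLBPluginInit the lookup would be func(k string) string { return output.FLBPluginConfigKey(ctx, k) }, and an error would be reported by returning output.FLB_ERROR.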
So now we can configure multiple output plugins, but they still don’t actually do anything.
FLBPluginFlush and FLBPluginFlushCtx
To define output behavior, we need to implement either FLBPluginFlush or FLBPluginFlushCtx. The difference between the two is that FLBPluginFlushCtx takes an extra context parameter, from which you can retrieve the context that was set in FLBPluginInit.
We want to tell FLBPluginFlushCtx which key to use to look up the correct PluginConf instance.
Remember that the key to the plugin map is the Id that was configured in fluent-bit.conf. We will store the Id in the context inside the FLBPluginInit function and then retrieve it from the context in the FLBPluginFlushCtx function.
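To store the id in the context, we call output.FLBPluginSetContext inside FLBPluginInit:
//export FLBPluginInit
func FLBPluginInit(ctx unsafe.Pointer) int {
	if plugin == nil {
		plugin = make(map[string]PluginConf)
	}
	var id = output.FLBPluginConfigKey(ctx, "Id")
	// Store the id as this instance's context; FLBPluginFlushCtx reads it back
	output.FLBPluginSetContext(ctx, id)
	...
}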
To retrieve the id from the context inside FLBPluginFlushCtx:
//export FLBPluginFlushCtx
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
	// Type assert context back into the original type for the Go variable
	id := output.FLBPluginGetContext(ctx).(string)
	fmt.Printf("using id: %v, and PluginConf: %+v\n", id, plugin[id])
	return output.FLB_OK
}
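The bare .(string) assertion panics if the context is missing or has another type, and an unrecovered panic in the plugin takes down the whole Fluent Bit process. A more defensive variant uses the comma-ok form. In this stdlib-only sketch, lookupConf is a hypothetical helper, and its first argument stands in for the value returned by output.FLBPluginGetContext(ctx).

```go
package main

import "fmt"

// PluginConf mirrors the struct from the FLBPluginInit example.
type PluginConf struct {
	foo string
	bar int
}

// lookupConf is a hypothetical helper. ctxValue stands in for the value
// returned by output.FLBPluginGetContext(ctx). Unlike a bare .(string)
// assertion, the comma-ok form returns an error instead of panicking.
func lookupConf(ctxValue interface{}, plugins map[string]PluginConf) (string, PluginConf, error) {
	id, ok := ctxValue.(string)
	if !ok {
		return "", PluginConf{}, fmt.Errorf("context has type %T, want string", ctxValue)
	}
	conf, ok := plugins[id]
	if !ok {
		return "", PluginConf{}, fmt.Errorf("no PluginConf for id %q", id)
	}
	return id, conf, nil
}

func main() {
	plugins := map[string]PluginConf{"a": {foo: "foo1", bar: 1}}
	// A valid id, an unknown id, a nil context and a context of the wrong type.
	for _, v := range []interface{}{"a", "z", nil, 42} {
		id, conf, err := lookupConf(v, plugins)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("using id: %v, and PluginConf: %+v\n", id, conf)
	}
}
```

In FLBPluginFlushCtx, the error branch would typically log the problem and return output.FLB_ERROR rather than crash.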
OK, so what happens if we run this now?
Nothing, with the current configuration. This is because we have not defined an input to feed our two outputs. And even with an input defined, our FLBPluginFlushCtx code does not yet do anything with the input data.
First, here is the configuration with an input defined:
[INPUT]
    Name           tail
    Path           logs/in.log
    Tag            intail
    Read_from_Head True

[OUTPUT]
    Name  myoutputplugin
    Match intail
    Id    a
    Foo   foo1
    Bar   1

[OUTPUT]
    Name  myoutputplugin
    Match intail
    Id    b
    Foo   foo2
    Bar   2
We use the tail input plugin to tail a file (logs/in.log) relative to the working directory, and we tag this input as intail. Both of our outputs, a and b, receive their input from intail, but we could just as easily have defined a separate input for each output.
The Go code to process our input is shown below:
//export FLBPluginFlushCtx
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
	// Type assert context back into the original type for the Go variable
	id := output.FLBPluginGetContext(ctx).(string)
	fmt.Printf("using id: %v, and PluginConf: %+v\n", id, plugin[id])
	dec := output.NewDecoder(data, int(length))
	for {
		// Extract Record
		ret, _, record := output.GetRecord(dec)
		if ret != 0 {
			break
		}
		var logKey interface{} = "log"
		fmt.Printf("log: %v\n", record[logKey])
	}
	return output.FLB_OK
}
If we run this code and append some text to our log file:
echo "Hello World!" >> logs/in.log
we get the output:
[2022/04/07 13:54:11] [ info] [cmetrics] version=0.2.2
id: a
initialized pluginConf: {foo:foo1 bar:1}
id: b
initialized pluginConf: {foo:foo2 bar:2}
[2022/04/07 13:54:11] [ info] [sp] stream processor started
using id: a, and PluginConf: {foo:foo1 bar:1}
log: [72 101 108 108 111 32 87 111 114 108 100 33]
using id: b, and PluginConf: {foo:foo2 bar:2}
log: [72 101 108 108 111 32 87 111 114 108 100 33]
...
So our plugin configs are correctly initialized, and we are passing the correct Id value to the FLBPluginFlushCtx function via the ctx parameter.
But our log data is printed as bytes, as shown in the two log: lines of the output.
These are the ASCII codes for “Hello World!”. To convert them to a string in Go, we need to take a closer look at the signature of output.GetRecord(dec), shown below:
func GetRecord(dec *FLBDecoder) (ret int, ts interface{}, rec map[interface{}]interface{})
We see that it returns three values:
– ret, an int representing the return status; it is non-zero when decoding fails, which also happens once every record in data has been read, so this is what ends the loop above
– ts, a timestamp, taken from the underlying msgpack object inside GetRecord
– rec, the payload data, of type map[interface{}]interface{}, also taken from the underlying msgpack object inside GetRecord
Now it should be clearer why we look up the value with an interface{} key such as logKey: both the keys and the values of the record are interface{} values. To convert a value from an interface{} to a byte slice, we can switch on its dynamic type and apply the appropriate conversion, as shown below:
func interfaceToBytes(v interface{}) []byte {
	switch d := v.(type) {
	case []byte:
		fmt.Printf("[]byte: %v\n", d)
		return d
	case string:
		fmt.Printf("string: %v\n", d)
		return []byte(d)
	case int, int32, int64, uint, uint32, uint64:
		fmt.Printf("int, int32, int64, uint, uint32, uint64: %v\n", d)
		return []byte(fmt.Sprintf("%d", d))
	case float32, float64:
		fmt.Printf("float32, float64: %v\n", d)
		return []byte(fmt.Sprintf("%f", d))
	case bool:
		fmt.Printf("bool: %v\n", d)
		return []byte(strconv.FormatBool(d))
	case time.Time:
		fmt.Printf("time: %v\n", d)
		return []byte(d.Format(time.RFC3339))
	default:
		fmt.Printf("default: %v\n", d)
		return []byte(fmt.Sprintf("%v", d))
	}
}
We also print out which type was matched.
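interfaceToBytes handles one value at a time. If a record contains nested maps or lists, the default branch would print them with their inner values still as byte slices. A minimal sketch of a recursive alternative follows; normalize is a hypothetical helper, and the record (including the kubernetes and pod_name keys) is hand-built to mimic the assumed shape of what GetRecord returns. It also shows that a string literal works as the lookup key, since Go converts the constant "log" to an interface{} holding a string, which is the same value logKey holds.

```go
package main

import "fmt"

// normalize is a hypothetical helper that walks a decoded record value and
// converts every []byte to a string, recursing into nested maps and slices.
// It assumes values shaped like those returned by output.GetRecord.
func normalize(v interface{}) interface{} {
	switch d := v.(type) {
	case []byte:
		return string(d)
	case map[interface{}]interface{}:
		out := make(map[string]interface{}, len(d))
		for k, val := range d {
			// Keys are hashable, so they cannot be []byte; Sprint turns them into strings.
			out[fmt.Sprint(k)] = normalize(val)
		}
		return out
	case []interface{}:
		out := make([]interface{}, len(d))
		for i, val := range d {
			out[i] = normalize(val)
		}
		return out
	default:
		// Numbers, bools and other types are returned unchanged.
		return d
	}
}

func main() {
	// A hand-built record; the nested keys are purely illustrative.
	record := map[interface{}]interface{}{
		"log": []byte("Hello World!"),
		"kubernetes": map[interface{}]interface{}{
			"pod_name": []byte("my-pod"),
		},
	}
	// The untyped constant "log" is converted to an interface{} key.
	fmt.Println(string(record["log"].([]byte)))
	fmt.Println(normalize(record))
}
```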
Then, inside the FLBPluginFlushCtx function, we can call interfaceToBytes and convert the resulting byte slice to a string before printing it, as shown below:
//export FLBPluginFlushCtx
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
	// Type assert context back into the original type for the Go variable
	id := output.FLBPluginGetContext(ctx).(string)
	fmt.Printf("using id: %v, and PluginConf: %+v\n", id, plugin[id])
	dec := output.NewDecoder(data, int(length))
	for {
		// Extract Record; ts is the record's timestamp
		ret, ts, record := output.GetRecord(dec)
		if ret != 0 {
			break
		}
		var logKey interface{} = "log"
		fmt.Printf("timestamp: %v, log: %v\n", ts, string(interfaceToBytes(record[logKey])))
	}
	return output.FLB_OK
}
Putting all this together, we get the following output:
...
[2022/04/07 15:34:27] [ info] [sp] stream processor started
using id: a, and PluginConf: {foo:foo1 bar:1}
[]byte: [72 101 108 108 111 32 87 111 114 108 100 33]
timestamp: 2022-04-07 15:34:27.589327 +0900 JST, log: Hello World!
using id: b, and PluginConf: {foo:foo2 bar:2}
[]byte: [72 101 108 108 111 32 87 111 114 108 100 33]
timestamp: 2022-04-07 15:34:27.589327 +0900 JST, log: Hello World!
...
The output from interfaceToBytes shows that the type of our payload data was a byte slice ([]byte), and after converting it to a string we see our original log: “Hello World!”
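At the Next-Generation Systems Research Lab, we are looking for architects to support integration across the entire group. If you have experience in infrastructure design and construction, or are interested in the Next-Generation Systems Research Lab, please apply via our list of open positions.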
We post the latest news from the Group Research and Development Division on Twitter (@GMO_RD). Please follow us.