2022.04.08

Developing Fluent Bit Golang Output Plugins

Hello, this is N.M. from the Next Generation Systems Research Lab.

Fluent Bit is developed by the same people who made Fluentd, and it serves the same purpose, which is log ingestion.

The difference is that it is designed for resource-restricted environments such as sidecar containers in Kubernetes. It is written in C and has a much lighter memory footprint than Fluentd while performing as well or better, but Fluentd is more mature and has more plugins available.

See Fluent Bit plugins here: https://docs.fluentbit.io/manual/pipeline/outputs

See Fluentd plugins here: https://www.fluentd.org/plugins/all

If you need a plugin for Fluentd, chances are someone has written it already. If you need a plugin for Fluent Bit, it is quite possible that it doesn’t exist. In that case, you may choose the easy path and just go with Fluentd. Alternatively, you may decide to write a new plugin yourself.

Here I cover writing Golang output plugins for Fluent Bit.

As shown in the document linked above, Fluent Bit provides hooks where the output plugin’s functionality can be defined. Let’s explore these, one by one.

 

FLBPluginRegister

//export FLBPluginRegister
func FLBPluginRegister(ctx unsafe.Pointer) int {
	return output.FLBPluginRegister(ctx, "myoutputplugin", "example output plugin")
}
The above function registers the plugin under the name myoutputplugin.

Recall that Fluent Bit is written in C. It communicates with Go plugins using cgo. The //export FLBPluginRegister comment tells cgo that this function must be callable from C code.

The plugin may now be referred to within an [OUTPUT] element in a Fluent Bit configuration file, such as shown below:
[OUTPUT]
    Name myoutputplugin
Let’s say we save this in a file called fluent-bit.conf.

FLBPluginRegister is called only once.

This by itself is not very interesting: we can refer to a plugin by name, but we can’t configure it. Let’s add some configuration.
To do this we need to implement FLBPluginInit.

FLBPluginInit

var id string

//export FLBPluginInit
func FLBPluginInit(ctx unsafe.Pointer) int {
	id = output.FLBPluginConfigKey(ctx, "Id")
	fmt.Printf("id: %v\n", id)

	return output.FLB_OK
}
FLBPluginInit gets called once per [OUTPUT] config that refers to the plugin name, in our case myoutputplugin.

We initialize the global id variable to whatever was passed into the config:
[OUTPUT]
    Name myoutputplugin
    Id   a
So the global variable id will be initialized to the string "a".

But since id is a global variable, subsequent plugin configs will overwrite its value.
# Initialize the id variable with 'a' 
[OUTPUT]
    Name myoutputplugin
    Id   a
# Then overwrite the id variable with 'b'
[OUTPUT]
    Name myoutputplugin
    Id   b
Obviously, we need a way to configure multiple plugins. To do this we will create a map in our Go code, where the key is the Id passed in the plugin config and the values are any other parameters that we may wish to configure. Let’s use a struct to hold these other parameters so that we can have various types.
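
The difference between the two approaches can be reproduced without Fluent Bit. The following is a minimal sketch in plain Go: initGlobal and initKeyed are hypothetical stand-ins for the two styles of init hook, and the hard-coded ids stand in for values read from the config.

```go
package main

import "fmt"

// lastID mimics the single global variable used by the first FLBPluginInit.
var lastID string

// byID mimics a keyed store with one entry per [OUTPUT] section.
var byID = map[string]bool{}

// initGlobal is a hypothetical stand-in for an init hook that keeps
// the configured Id in one global variable.
func initGlobal(id string) { lastID = id }

// initKeyed is a hypothetical stand-in for an init hook that keeps
// each configured Id under its own key.
func initKeyed(id string) { byID[id] = true }

func main() {
	// Simulate the init hook being called once per [OUTPUT] section.
	for _, id := range []string{"a", "b"} {
		initGlobal(id)
		initKeyed(id)
	}
	fmt.Println("global keeps only:", lastID)
	fmt.Println("keyed store size:", len(byID))
}
```

With the global variable only the last Id survives, while the keyed store retains one entry per section.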

Now our FLBPluginInit function and plugin variables will look like this:
type PluginConf struct {
	foo string
	bar int
}

var (
	plugin map[string]PluginConf
)

//export FLBPluginInit
func FLBPluginInit(ctx unsafe.Pointer) int {
	if plugin == nil {
		plugin = make(map[string]PluginConf)
	}

	var id = output.FLBPluginConfigKey(ctx, "Id")
	fmt.Printf("id: %v\n", id)

	var foo = output.FLBPluginConfigKey(ctx, "Foo")

	var bar int
	var err error
	bar, err = strconv.Atoi(output.FLBPluginConfigKey(ctx, "Bar"))
	if err != nil {
		fmt.Printf("failed to initialize bar, %v\n", err)
		return output.FLB_ERROR
	}
	var pluginConf = PluginConf{foo: foo, bar: bar}
	fmt.Printf("initialized pluginConf: %+v\n", pluginConf)
	plugin[id] = pluginConf

	return output.FLB_OK
}
We can configure our updated plugin using multiple [OUTPUT] configurations.
[OUTPUT]
    Name myoutputplugin
    Id   a
    Foo  foo1
    Bar  1

[OUTPUT]
    Name myoutputplugin
    Id   b
    Foo  foo2
    Bar  2
If we run our code we get something like this:
...
[2022/04/07 13:12:20] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2022/04/07 13:12:20] [ info] [cmetrics] version=0.2.2
id: a
initialized pluginConf: {foo:foo1 bar:1}
id: b
initialized pluginConf: {foo:foo2 bar:2}
[2022/04/07 13:12:20] [ info] [sp] stream processor started
...
We can see that our plugin map holds the values that we configured.
So now we can configure multiple output plugins, but they still don’t actually do anything.
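
Note that strconv.Atoi rejects an empty string, so an [OUTPUT] section that omits Bar makes the FLBPluginInit above return FLB_ERROR. If a default value is preferable, a small helper along the following lines could be used. This is only a sketch: parseIntOrDefault is a hypothetical name, and it assumes that an unset config key is returned as an empty string.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseIntOrDefault converts a raw config value to an int.
// An empty (or whitespace-only) value yields def; any other
// non-numeric value is reported as an error.
func parseIntOrDefault(raw string, def int) (int, error) {
	raw = strings.TrimSpace(raw)
	if raw == "" {
		// Assumption: an unset key arrives as an empty string.
		return def, nil
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return 0, fmt.Errorf("invalid integer %q: %w", raw, err)
	}
	return n, nil
}

func main() {
	for _, raw := range []string{"2", "", "two"} {
		n, err := parseIntOrDefault(raw, 1)
		fmt.Printf("raw=%q value=%d err=%v\n", raw, n, err)
	}
}
```

Inside FLBPluginInit the raw argument would be the string returned by output.FLBPluginConfigKey(ctx, "Bar").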

 

FLBPluginFlush and FLBPluginFlushCtx

To define output behavior we need to implement either the FLBPluginFlush or the FLBPluginFlushCtx function. The difference between the two is that FLBPluginFlushCtx receives a context parameter, from which you can extract whatever context was set in FLBPluginInit.

We want to tell FLBPluginFlushCtx the key to use to get the correct PluginConf instance.

Remember that the key to the plugin map is the Id that was configured in fluent-bit.conf. We will set this into the Context inside the FLBPluginInit function and then extract it from the Context in the FLBPluginFlushCtx function.

To set the id into the Context:
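//export FLBPluginInit
func FLBPluginInit(ctx unsafe.Pointer) int {
	if plugin == nil {
		plugin = make(map[string]PluginConf)
	}

	var id = output.FLBPluginConfigKey(ctx, "Id")
	...
	// Store the id in the plugin context so that FLBPluginFlushCtx can retrieve it
	output.FLBPluginSetContext(ctx, id)

	return output.FLB_OK
}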
To retrieve the id from the Context inside FLBPluginFlushCtx:
//export FLBPluginFlushCtx
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
	// Type assert context back into the original type for the Go variable
	id := output.FLBPluginGetContext(ctx).(string)
	fmt.Printf("using id: %v, and PluginConf: %+v\n", id, plugin[id])

	return output.FLB_OK
}
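
The .(string) assertion panics if the context holds any other type, and indexing the plugin map with an unknown id silently returns a zero-value PluginConf. A more defensive lookup might look like the sketch below. confFor is a hypothetical helper, and PluginConf and the plugin map are redeclared here only so that the example compiles on its own.

```go
package main

import "fmt"

// PluginConf mirrors the struct used in the plugin.
type PluginConf struct {
	foo string
	bar int
}

// plugin mirrors the per-Id configuration map, pre-filled for the demo.
var plugin = map[string]PluginConf{
	"a": {foo: "foo1", bar: 1},
}

// confFor resolves a context value to its PluginConf.
// It reports false when the value is not a string or the id is unknown.
func confFor(ctxValue interface{}) (PluginConf, bool) {
	id, ok := ctxValue.(string) // comma-ok form: no panic on a type mismatch
	if !ok {
		return PluginConf{}, false
	}
	conf, ok := plugin[id]
	return conf, ok
}

func main() {
	fmt.Println(confFor("a"))
	fmt.Println(confFor("missing"))
	fmt.Println(confFor(42))
}
```

In a flush function, the value passed to confFor would be the result of output.FLBPluginGetContext(ctx), and a failed lookup could be answered with output.FLB_ERROR.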
Ok, so what happens if we run this now?

Nothing, if we use the current configuration. This is because we did not define an input that will feed into our two outputs. Even if we do define an input, we are still not actually doing anything with the input data inside our FLBPluginFlushCtx code.
Firstly, here is the configuration with input defined:
[INPUT]
    Name            tail
    Path            logs/in.log
    Tag             intail
    Read_from_Head  True

[OUTPUT]
    Name  myoutputplugin
    Match intail
    Id    a
    Foo   foo1
    Bar   1

[OUTPUT]
    Name myoutputplugin
    Match intail
    Id   b
    Foo  foo2
    Bar  2
We use the tail input plugin to tail a file (logs/in.log) inside the plugin working directory, and we tag this input as intail. Both our outputs, a and b, receive their input from intail, but we could just as easily have defined separate inputs for each output plugin.

The Go code to process our input is shown below:
//export FLBPluginFlushCtx
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
	// Type assert context back into the original type for the Go variable
	id := output.FLBPluginGetContext(ctx).(string)
	fmt.Printf("using id: %v, and PluginConf: %+v\n", id, plugin[id])

	dec := output.NewDecoder(data, int(length))
	for {
		// Extract Record
		ret, _, record := output.GetRecord(dec)

		if ret != 0 {
			break
		}
		var logKey interface{} = "log"
		fmt.Printf("log: %v\n", record[logKey])
	}

	return output.FLB_OK
}
If we run this code and append some text into our log file:
echo "Hello World!" >> logs/in.log
we get the output:
[2022/04/07 13:54:11] [ info] [cmetrics] version=0.2.2
id: a
initialized pluginConf: {foo:foo1 bar:1}
id: b
initialized pluginConf: {foo:foo2 bar:2}
[2022/04/07 13:54:11] [ info] [sp] stream processor started
using id: a, and PluginConf: {foo:foo1 bar:1}
log: [72 101 108 108 111 32 87 111 114 108 100 33]
using id: b, and PluginConf: {foo:foo2 bar:2}
log: [72 101 108 108 111 32 87 111 114 108 100 33]
...
So, our plugin configs are correctly initialized and we are passing the correct Id value to the FLBPluginFlushCtx function, via the ctx parameter.
But our log data is printed as raw bytes, as shown in the two lines beginning with log: in the output above.
These are the ASCII codes for “Hello World!”. To convert this to a string in Go, we need to take a closer look at the output.GetRecord(dec) signature, as shown below:
func GetRecord(dec *FLBDecoder) (ret int, ts interface{}, rec map[interface{}]interface{})
We see that it returns three values:
  • ret, an int that represents the return status and is non-zero for errors
  • ts, a timestamp taken from an underlying msgpack object inside GetRecord
  • rec, the payload data, which is of type map[interface{}]interface{} and is also taken from the underlying msgpack object inside GetRecord

Now it should be more obvious why we declare logKey as an interface{} before using it as a key. The keys are of type interface{}, as are the values. So in order to convert a value from an interface{} to a byte slice, we can check its type and then use the appropriate conversion, as below:
func interfaceToBytes(v interface{}) []byte {
	switch d := v.(type) {
	case []byte:
		fmt.Printf("[]byte: %v\n", d)
		return d
	case string:
		fmt.Printf("string: %v\n", d)
		return []byte(d)
	case int, int32, int64, uint, uint32, uint64:
		fmt.Printf("int, int32, int64, uint, uint32, uint64: %v\n", d)
		return []byte(fmt.Sprintf("%d", d))
	case float32, float64:
		fmt.Printf("float32, float64: %v\n", d)
		return []byte(fmt.Sprintf("%f", d))
	case bool:
		fmt.Printf("bool: %v\n", d)
		return []byte(strconv.FormatBool(d))
	case time.Time:
		fmt.Printf("time: %v\n", d)
		return []byte(d.Format(time.RFC3339))
	default:
		fmt.Printf("default: %v\n", d)
		return []byte(fmt.Sprintf("%v", d))
	}
}
We also print out the type of the interface matched.
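
The same kind of type switch extends naturally to whole records. As a sketch, the hypothetical helpers below flatten a decoded record into a map[string]string. The names toString and flatten are illustrative only, and the sample record is made up for the demo.

```go
package main

import (
	"fmt"
	"sort"
)

// toString renders a decoded value as text.
// Byte slices are treated as text; everything else is formatted with %v.
func toString(v interface{}) string {
	switch d := v.(type) {
	case []byte:
		return string(d)
	case string:
		return d
	default:
		return fmt.Sprintf("%v", d)
	}
}

// flatten converts a decoded record into a map with string keys and values.
func flatten(record map[interface{}]interface{}) map[string]string {
	out := make(map[string]string, len(record))
	for k, v := range record {
		out[toString(k)] = toString(v)
	}
	return out
}

func main() {
	record := map[interface{}]interface{}{
		"log":   []byte("Hello World!"),
		"count": 3,
	}
	flat := flatten(record)
	keys := make([]string, 0, len(flat))
	for k := range flat {
		keys = append(keys, k)
	}
	sort.Strings(keys) // map iteration order is random, so sort for stable output
	for _, k := range keys {
		fmt.Printf("%s=%s\n", k, flat[k])
	}
}
```

Nested values such as maps or arrays are simply formatted with %v here. A real plugin would probably want to handle them explicitly.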
Then inside the FLBPluginFlushCtx function we can call interfaceToBytes before converting the byte array to a string and printing as shown below:
//export FLBPluginFlushCtx
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
	// Type assert context back into the original type for the Go variable
	id := output.FLBPluginGetContext(ctx).(string)
	fmt.Printf("using id: %v, and PluginConf: %+v\n", id, plugin[id])

	dec := output.NewDecoder(data, int(length))
	for {
		// Extract Record
		ret, ts, record := output.GetRecord(dec)

		if ret != 0 {
			break
		}
		var logKey interface{} = "log"
		fmt.Printf("timestamp: %v, log: %v\n", ts, string(interfaceToBytes(record[logKey])))
	}

	return output.FLB_OK
}
Putting all this together we get the following output:
...
[2022/04/07 15:34:27] [ info] [sp] stream processor started
using id: a, and PluginConf: {foo:foo1 bar:1}
[]byte: [72 101 108 108 111 32 87 111 114 108 100 33]
timestamp: 2022-04-07 15:34:27.589327 +0900 JST, log: Hello World!
using id: b, and PluginConf: {foo:foo2 bar:2}
[]byte: [72 101 108 108 111 32 87 111 114 108 100 33]
timestamp: 2022-04-07 15:34:27.589327 +0900 JST, log: Hello World!
...
The output from interfaceToBytes shows us that our payload data was a byte slice ([]byte). After converting it to a string, we see our original log: “Hello World!”

 
