How to Write Output Plugin
require 'fluent/plugin/output'

module Fluent::Plugin
  class SomeOutput < Output
    # First, register the plugin. 'NAME' is the name of this plugin
    # and identifies the plugin in the configuration file.
    Fluent::Plugin.register_output('NAME', self)

    # Enable threads if you are writing an async buffered plugin.
    helpers :thread

    # Interval (in seconds) between delivery checks in `try_write()`.
    SENDDATA_CHECK_INTERVAL = 5

    # Define parameters for your plugin.
    config_param :path, :string
    # `@host` is used by the async buffered example below.
    config_param :host, :string

    #### Non-Buffered Output #############################
    # Implement `process()` if your plugin is non-buffered.
    # Read "Non-Buffered Mode" for details.
    ######################################################
    def process(tag, es)
      es.each do |time, record|
        # output events to ...
      end
    end

    #### Sync Buffered Output ##############################
    # Implement `write()` if your plugin uses a normal buffer.
    # Read "Sync-Buffered Mode" for details.
    ########################################################
    def write(chunk)
      real_path = extract_placeholders(@path, chunk)

      log.debug 'writing data to file', chunk_id: dump_unique_id_hex(chunk.unique_id)

      # For the standard chunk format (without a `#format()` method)
      chunk.each do |time, record|
        # output events to ...
      end

      # For a custom format (when `#format()` is implemented)
      # File.open(real_path, 'w+') { |file| file.write(chunk.read) }

      # or use `#write_to(io)`
      # File.open(real_path, 'w+') do |file|
      #   chunk.write_to(file)
      # end
    end

    #### Async Buffered Output #############################
    # Implement `try_write()` if you want to defer committing
    # chunks. Read "Async-Buffered Mode" for details.
    ########################################################
    def try_write(chunk)
      real_path = extract_placeholders(@path, chunk)

      log.debug 'sending data to server', chunk_id: dump_unique_id_hex(chunk.unique_id)

      # `send_data_to_server` and `check_data_on_server` are placeholders
      # for your own transport code.
      send_data_to_server(@host, real_path, chunk.read)

      chunk_id = chunk.unique_id

      # Create a thread for the deferred commit.
      thread_create(:check_send_result) do
        while thread_current_running?
          sleep SENDDATA_CHECK_INTERVAL

          if check_data_on_server(real_path, chunk_id)
            # Commit the chunk. After this call the chunk is deleted
            # and will not be retried.
            commit_write(chunk_id)
            break
          end
        end
      end
    end

    # Override `#format` if you want to customize how Fluentd stores
    # events. Read the section "How to Customize the Serialization
    # Format for Chunks" for details.
    def format(tag, time, record)
      [tag, time, record].to_json
    end
  end
end

Three Modes of Output Plugins
Non-Buffered Mode
Sync-Buffered Mode
Async-Buffered Mode
How Fluentd Chooses Modes
How Buffers Work
Understanding Chunking and Metadata
How To Control Buffering
How to Change the Default Values for Parameters
Development Guide
How To Use Asynchronous Buffered Mode and Delayed Commit
How to Customize the Serialization Format for Chunks
List of Interface Methods
#process(tag, es)
#write(chunk)
#try_write(chunk)
#format(tag, time, record)
#prefer_buffered_processing
#prefer_delayed_commit
#extract_placeholders(str, chunk)
#commit_write(chunk_id)
#rollback_write(chunk_id)
#dump_unique_id_hex(chunk_id)
es.size
es.each(&block)
chunk.unique_id
chunk.metadata
chunk.size
chunk.read
chunk.open(&block)
chunk.write_to(io)
chunk.each(&block)
How to Write Tests
Plugin Testing Overview
How to Use Test Drivers
Example Test Code
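
The `#format` override in the skeleton above serializes each event with `[tag, time, record].to_json`. The following is a minimal sketch, in plain Ruby without Fluentd, of what such a serialization produces and how a consumer might read it back. The helper names (`format_event`, `build_payload`, `parse_payload`), the trailing newline, and the integer timestamps are assumptions made for illustration; they are not part of the Fluentd API, and `build_payload` is only a rough stand-in for what `chunk.read` would return.

```ruby
require 'json'

# Same serialization as the skeleton's #format, plus a trailing newline.
# The newline is an assumption of this sketch: it lets a reader split the
# concatenated records apart again.
def format_event(tag, time, record)
  [tag, time, record].to_json + "\n"
end

# Hypothetical stand-in for chunk.read: all formatted events joined together.
def build_payload(events)
  events.map { |tag, time, record| format_event(tag, time, record) }.join
end

# Parse a payload back into [tag, time, record] triples, one per line.
def parse_payload(payload)
  payload.each_line.map { |line| JSON.parse(line) }
end

# Sample events; integer timestamps are used here for simplicity.
events = [
  ['app.access', 1_700_000_000, { 'path' => '/index.html', 'status' => 200 }],
  ['app.access', 1_700_000_001, { 'path' => '/missing', 'status' => 404 }]
]

payload = build_payload(events)
puts payload
```

Without a delimiter, consecutive JSON arrays would run together in the stored data, so whatever reads the chunk needs some way to find record boundaries. A newline is one simple option; the section "How to Customize the Serialization Format for Chunks" is the place to check what the plugin API itself expects.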