Introduction
Since the DataSet Kafka connector was introduced, we've received a lot of questions about configuring Fluentd to process messages in a pipeline before storing them in a Kafka topic. I previously outlined the configuration steps for ingesting custom application logs into DataSet. A similar procedure can be used to configure Fluentd to send messages through Kafka to DataSet.
Install fluent-plugin-kafka
Run the following command to install fluent-plugin-kafka.
td-agent-gem install fluent-plugin-kafka
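To confirm the plugin was installed, you can list it with td-agent-gem (this assumes the td-agent packaging; a plain Fluentd install would use fluent-gem instead):
td-agent-gem list fluent-plugin-kafka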
Set up Fluentd Config
It's worth mentioning that DataSet also has a Fluentd plugin and there's a separate blog post for configuring Fluentd message ingestion to DataSet. I am going to skip the prerequisites and only focus on the Fluentd configuration (for sending messages from Fluentd to Kafka).
Here is an example Fluentd configuration (ex. /etc/td-agent/td-agent.conf) that forwards messages from HTTP port 8888 on a local machine to a remote Kafka cluster.
<source>
  @type http
  port 8888
</source>
<filter scalyr.myapp.access>
  @type record_transformer
  <record>
    hostname "#{Socket.gethostname}"
    tag "myapp"
  </record>
</filter>
<match scalyr.myapp.access>
  @type kafka2
  # list of seed brokers
  brokers <BROKER1_IP>:9092,<BROKER2_IP>:9092
  use_event_time true
  # buffer settings
  <buffer topic>
    @type file
    path /var/log/td-agent/buffer/td
    flush_interval 3s
  </buffer>
  # data type settings
  <format>
    @type json
  </format>
  # topic settings
  topic_key topic
  default_topic log-messages
  # producer settings
  required_acks -1
  compression_codec gzip
</match>
A couple of key configurations in this file:
- The record_transformer filter assigns the tag "myapp", which matches the Kafka Connect custom_app_event_mapping covered in a later section.
- Messages are forwarded to the remote Kafka cluster (i.e., <BROKER1_IP>:9092,<BROKER2_IP>:9092).
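Depending on your broker settings, the "log-messages" topic may need to exist before messages arrive. If automatic topic creation is disabled (auto.create.topics.enable=false), a minimal sketch for creating it on a recent Kafka version, run from the Kafka installation directory, looks like this (the partition and replication values are placeholders; adjust them for your cluster):
bin/kafka-topics.sh --create --bootstrap-server <BROKER1_IP>:9092 --topic log-messages --partitions 1 --replication-factor 1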
Restart Fluentd to apply the new configuration.
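For example, on a systemd-based td-agent installation, the restart would typically be:
sudo systemctl restart td-agent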
Configure Kafka Connect DataSet Sink Mapping
Create a new connect-scalyr-sink-custom-app.json file and specify the custom app event mapping based on the structure of the Fluentd messages.
{
  "name": "scalyr-sink-connector",
  "config": {
    "connector.class": "com.scalyr.integrations.kafka.ScalyrSinkConnector",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "1",
    "topics": "log-messages",
    "api_key": "<SCALYR_API_KEY>",
    "event_enrichment": "env=dev,parser=kafka",
    "custom_app_event_mapping": "[{\"matcher\": {\"attribute\": \"tag\", \"value\": \"myapp\"}, \"eventMapping\": {\"message\": \"message\", \"logfile\": \"path\", \"serverHost\": \"hostname\"}}]"
  }
}
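For readability, the escaped custom_app_event_mapping string above decodes to the following JSON; it maps the Kafka message fields produced by Fluentd (message, path, hostname) to DataSet event attributes:
[
  {
    "matcher": { "attribute": "tag", "value": "myapp" },
    "eventMapping": {
      "message": "message",
      "logfile": "path",
      "serverHost": "hostname"
    }
  }
]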
Then, execute the following command to install the DataSet Sink Connector:
curl localhost:8083/connectors -X POST -H "Content-Type: application/json" -d @connect-scalyr-sink-custom-app.json
We are now ready to consume messages from Fluentd; all of the messages will be stored in the "log-messages" Kafka topic.
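To verify that the connector was created and its task is running, you can query the Kafka Connect REST API (the connector name matches the one in the JSON file above):
curl localhost:8083/connectors/scalyr-sink-connector/status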
Send an Event From Fluentd to Kafka
Submit an HTTP request to send a JSON message to Fluentd.
curl -XPOST \
  -d 'json={"tag": "myapp", "message":"{\"msg\":\"test message from fluentd to kafka\"}", "path": "kafka.log"}' \
  http://127.0.0.1:8888/scalyr.myapp.access
This message will be forwarded to the "log-messages" Kafka topic. You can confirm the message is available on the topic by running the kafka-console-consumer.sh script.
cd $KAFKA_HOME
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic log-messages --from-beginning
....
....
{"tag":"myapp","hostname":"ip-172-31-26-145.us-west-2.compute.internal","message":"{\"msg\":\"test message from fluentd to kafka\"}","path":"kafka.log"}
This message will eventually be forwarded to DataSet because it matches our custom app event mapping (tag=myapp).
View Logs at app.scalyr.com
Finally, go to app.scalyr.com to verify that the message was successfully ingested into your account.
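One way to locate the event is to filter on an attribute added by the connector's event_enrichment setting; for example, a search such as the following (assuming the enrichment values used in the connector configuration above):
parser == 'kafka'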