Prerequisites
Java 8+
Install Kafka Connect DataSet Sink
Modify the Kafka Connect distributed worker config file (e.g. $KAFKA_HOME/config/connect-distributed.properties) to set the following:
bootstrap.servers=<BOOTSTRAP_SERVER1,BOOTSTRAP_SERVER2,BOOTSTRAP_SERVER3>
plugin.path=<PLUGIN_PATH>
The plugin path is defined near the bottom of the distributed properties file. This environment uses "/usr/local/share/kafka/plugins", but yours may differ depending on your setup.
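With the plugin directory used in this guide and a local single-broker setup (the broker address here is an assumption; substitute your own brokers), the two properties would read:

```properties
# connect-distributed.properties (excerpt)
bootstrap.servers=localhost:9092
plugin.path=/usr/local/share/kafka/plugins
```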
Clone the Kafka Connect DataSet repository.
git clone https://github.com/scalyr/kafka-connect-scalyr
Build the connector package. This generates scalyr-kafka-connect-scalyr-sink-<VERSION>.zip (e.g. scalyr-kafka-connect-scalyr-sink-0.1.zip) in the "target/components/packages" directory.
cd $KAFKA_CONNECT_SCALYR_HOME
mvn clean package
Unzip the file to the Kafka connect plugin path.
unzip scalyr-kafka-connect-scalyr-sink-<VERSION>.zip -d /usr/local/share/kafka/plugins
Start Kafka Connect in distributed mode:
cd $KAFKA_HOME
bin/connect-distributed.sh config/connect-distributed.properties
Modify connect-scalyr-sink-custom-app.json, located in the etc directory of the unzipped plugin, substituting the values of topics, scalyr_server, api_key, and event_enrichment for your own environment. Here is an example config file that accepts a JSON message from "myapp":
{
  "name": "scalyr-sink-connector",
  "config": {
    "connector.class": "com.scalyr.integrations.kafka.ScalyrSinkConnector",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "1",
    "topics": "logs",
    "api_key": "<SCALYR LOG WRITE API TOKEN>",
    "event_enrichment": "tag=kafka",
    "custom_app_event_mapping": "[{\"matcher\": {\"attribute\": \"app.name\", \"value\": \"myapp\"}, \"eventMapping\": {\"message\": \"message\", \"logfile\": \"log.path\", \"source\": \"host.hostname\", \"parser\": \"fields.parser\", \"version\": \"app.version\", \"appField1\":\"appField1\", \"appField2\":\"nested.appField2\"}}]"
  }
}
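The custom_app_event_mapping value is a JSON array embedded as an escaped string, which is easy to misread. Unescaped, the mapping from the example above is:

```json
[
  {
    "matcher": { "attribute": "app.name", "value": "myapp" },
    "eventMapping": {
      "message": "message",
      "logfile": "log.path",
      "source": "host.hostname",
      "parser": "fields.parser",
      "version": "app.version",
      "appField1": "appField1",
      "appField2": "nested.appField2"
    }
  }
]
```

The matcher selects records whose app.name attribute equals "myapp"; each eventMapping entry maps a DataSet event field (left) to an attribute path in the incoming record (right, with dots denoting nesting).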
Run the following command from the plugin etc directory to install the DataSet Sink Connector:
curl localhost:8083/connectors -X POST -H "Content-Type: application/json" -d @connect-scalyr-sink-custom-app.json
Run this command to check the status of the config:
curl http://localhost:8083/connectors/scalyr-sink-connector/status
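A healthy connector returns a status document shaped roughly like the following (field names per the Kafka Connect REST API; worker IDs and task counts depend on your deployment):

```json
{
  "name": "scalyr-sink-connector",
  "connector": { "state": "RUNNING", "worker_id": "127.0.0.1:8083" },
  "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083" } ],
  "type": "sink"
}
```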
If you need to replace the configuration, delete the existing connector first before recreating it:
curl -X DELETE http://localhost:8083/connectors/scalyr-sink-connector
Send a Custom Message from Kafka to DataSet
1. Send a JSON message, in the format previously defined in connect-scalyr-sink-custom-app.json, using the Kafka console producer script.
cd $KAFKA_HOME
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic logs
2. Send a raw JSON message {"msg": "this is a kafka connect dataset test message"} at the producer prompt:
>{"app": {"name": "myapp", "version": "0.1"}, "message": "{\"msg\": \"this is a kafka connect dataset test message\"}", "log": {"path": "kafka.log"}, "host": {"hostname": "kafka producer"}, "fields": {"parser": "kafka-logs"}}
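Escaping the nested message field by hand is error-prone, so it can help to validate the record locally before pasting it into the producer prompt. A quick sketch (assumes python3 is available on the host):

```shell
# Sample record from above; the "message" value is itself a JSON string,
# so its inner quotes must stay escaped.
MSG='{"app": {"name": "myapp", "version": "0.1"}, "message": "{\"msg\": \"this is a kafka connect dataset test message\"}", "log": {"path": "kafka.log"}, "host": {"hostname": "kafka producer"}, "fields": {"parser": "kafka-logs"}}'

# Parse the record and print the attribute the connector matches on.
echo "$MSG" | python3 -c 'import json, sys; rec = json.load(sys.stdin); print(rec["app"]["name"])'
# prints: myapp
```

If the record fails to parse here, it will also fail the connector's matcher, so fixing it locally saves a round trip through Kafka.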
3. Go to app.scalyr.com to verify that the log was successfully ingested.