Prerequisites
1. Apache Kafka
2. Filebeat
3. Java 8+
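You can confirm the installed Java version before proceeding:
java -version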
Install Kafka Connect DataSet Sink
1. Clone the Kafka Connect DataSet repository.
git clone https://github.com/scalyr/kafka-connect-scalyr
2. Build the connector package. This generates a zip file scalyr-kafka-connect-scalyr-sink-<version>.zip (where <version> is the release version) in the target/components/packages directory.
cd $KAFKA_CONNECT_SCALYR_HOME
mvn clean package
3. Unzip the file into the Kafka Connect plugin path directory (e.g., /usr/local/share/kafka/plugins):
unzip scalyr-kafka-connect-scalyr-sink-<version>.zip -d /usr/local/share/kafka/plugins
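To confirm the connector was extracted, you can list the plugin directory (assuming the example path above):
ls /usr/local/share/kafka/plugins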
4. Change the values of the topics, scalyr_server, and api_key parameters in the connector property file (i.e., etc/connector-scalyr-sink.properties). You might also want to change other parameters, such as event_enrichment or tasks.max, based on your own requirements.
Here is a sample configuration:
name=scalyr-sink
connector.class=com.scalyr.integrations.kafka.ScalyrSinkConnector
tasks.max=1
topics=logs
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
scalyr_server=https://app.scalyr.com
api_key=<Scalyr LOG WRITE API TOKEN>
event_enrichment=tag=kafkaConnect,env=dev
...
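If the logs topic does not already exist, you can create it first. This is a minimal sketch assuming a single local broker on localhost:9092 and Kafka 2.2+ (where kafka-topics.sh accepts --bootstrap-server):
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic logs --partitions 1 --replication-factor 1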
5. Add the plugin path to the Kafka Connect standalone properties file (e.g., $KAFKA_HOME/config/connect-standalone.properties). My environment uses /usr/local/share/kafka/plugins, but yours may vary, depending on your configuration:
plugin.path=/usr/local/share/kafka/plugins
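For example, assuming the property is not already present in the file, you can append it from the shell:
echo "plugin.path=/usr/local/share/kafka/plugins" >> $KAFKA_HOME/config/connect-standalone.properties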
6. Run the connector in standalone mode, passing both the worker properties file and the connector properties file:
cd $KAFKA_HOME
bin/connect-standalone.sh config/connect-standalone.properties <path to connector-scalyr-sink.properties>
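Once the worker starts, you can check the sink's status through the Kafka Connect REST API (assuming the default REST port 8083):
curl -s http://localhost:8083/connectors/scalyr-sink/status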
Filebeat Configuration
1. Set up the Filebeat input
Add the Filebeat input path (e.g., /home/kafka/*.log) and the DataSet parser name (e.g., kafka-logs) used to process the Filebeat logs.
#=========================== Filebeat inputs =============================
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /home/kafka/*.log
  fields:
    parser: 'kafka-logs'
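Before starting Filebeat, you can validate the configuration file:
sudo bin/filebeat test config -c /etc/filebeat/filebeat.yml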
2. Set up Kafka as the output
#-------------------------- Kafka output ------------------------------
output.kafka:
  hosts: ["localhost:9092"]
  topic: "logs"
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
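You can also verify that Filebeat can reach the Kafka output with the same configuration file:
sudo bin/filebeat test output -c /etc/filebeat/filebeat.yml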
3. Start Filebeat
sudo bin/filebeat -e -c /etc/filebeat/filebeat.yml
Logs Ingestion (Filebeat -> Kafka -> DataSet)
Append a new line to the input file to produce a Filebeat message on the Kafka topic:
{"app": "filebeat_kafka_connect_demo", "msg": "this is a test message"}
Filebeat wraps this line in a JSON event and sends it to Kafka:
{"@timestamp":"2020-06-19T22:13:21.992Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.7.1"},"message":"{\"app\": \"filebeat_kafka_connect_demo\", \"msg\": \"this is a test message\"}","input":{"type":"log"},"fields":{"parser":"kafka-logs"},"ecs":{"version":"1.5.0"},"host":{"name":"ip-172-31-21-36","ip":["172.31.21.36","fe80::f4:fdff:fecc:c660"],"mac":["02:f4:fd:cc:c6:60"],"hostname":"ip-172-31-21-36","architecture":"x86_64","os":{"platform":"ubuntu","version":"18.04.4 LTS (Bionic Beaver)","family":"debian","name":"Ubuntu","kernel":"4.15.0-1065-aws","codename":"bionic"},"id":"a5d5aa87c541449fa22351448e488767","containerized":false},"agent":{"ephemeral_id":"db6f1a58-531b-46b4-b401-398c73c8c728","hostname":"ip-172-31-21-36","id":"fd2a76eb-6fea-4dfd-8462-188e3ce81f88","version":"7.7.1","type":"filebeat"},"cloud":{"account":{"id":"630972250024"},"image":{"id":"ami-003634241a8fcdec0"},"provider":"aws","instance":{"id":"i-01c38a174d12975ed"},"machine":{"type":"t2.medium"},"region":"us-west-2","availability_zone":"us-west-2b"},"log":{"file":{"path":"/home/kafka/kafka.log"},"offset":144}}
Finally, go to app.scalyr.com to verify that the log was successfully ingested.