First, let's configure Kafka
Note: In Kafka 0.8.1 and lower you cannot delete a topic; make sure to upgrade to 0.8.2 or later.
Modify the following Kafka settings:
message.max.bytes: 5000000
replica.fetch.max.bytes: 5
max.connections.per.ip: 500
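Replicas fetch messages using replica.fetch.max.bytes, so that value needs to be at least as large as message.max.bytes, otherwise the largest messages cannot be replicated. The sketch below is one way to sanity-check the two values. It assumes the settings have been exported to a properties-style file with key=value lines; the function name check_fetch_size and that file layout are illustrative and not part of Kafka.

```shell
# Hypothetical helper: report whether replica.fetch.max.bytes is at least
# message.max.bytes in a key=value properties file passed as $1.
check_fetch_size() {
    props_file="$1"
    # Use the last occurrence of each key, since later lines override earlier ones.
    msg_max=$(sed -n 's/^message\.max\.bytes=//p' "$props_file" | tail -n 1)
    fetch_max=$(sed -n 's/^replica\.fetch\.max\.bytes=//p' "$props_file" | tail -n 1)
    if [ -z "$msg_max" ] || [ -z "$fetch_max" ]; then
        echo "missing"
        return 2
    fi
    if [ "$fetch_max" -lt "$msg_max" ]; then
        echo "too-small"
        return 1
    fi
    echo "ok"
}
```

Calling check_fetch_size with the path of such a file prints ok, too-small, or missing, and the exit status can be used in a deployment script.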
To make Kafka listen on all interfaces
Add the following to the Advanced Snippet text field:
listeners=PLAINTEXT://,SSL://
# Note: Also add each broker's own IP under that broker
Next, let's create a topic
Note: The instructions below skip the initial ZooKeeper setup, which is already done on the Oracle BDA.
# partitions is the number of logstash consumers
kafka-topics --create --zookeeper --partitions 2 --replication-factor 1 --topic web-sql-log
# Verify that the topic got created
kafka-topics --zookeeper --list
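The verification step can be scripted instead of read by eye. This is a small sketch that assumes the topic list arrives on stdin with one topic name per line; topic_exists is a hypothetical helper name and ZK_CONNECT is a placeholder for your ZooKeeper address.

```shell
# Hypothetical helper: succeed if the topic name given as $1 appears as a
# whole line on stdin (one topic name per line).
topic_exists() {
    grep -qxF -- "$1"
}

# Example use (needs a reachable ZooKeeper; ZK_CONNECT is a placeholder):
#   kafka-topics --zookeeper "$ZK_CONNECT" --list | topic_exists web-sql-log \
#       && echo "topic exists"
```

Matching the whole line keeps a topic such as web-sql-log-old from being mistaken for web-sql-log.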
To watch the active queue
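kafka-console-consumer --zookeeper --topic web-sql-log
# Note: kafka-console-producer writes to the topic rather than reading it;
# use it to type a test message and watch it arrive in the consumer above
kafka-console-producer --broker-list --old-producer --topic web-sql-log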
To delete a topic (with all messages)
Make sure to stop all Logstash instances using this topic.
It is also a good idea to stop all Kafka brokers.
kafka-topics --delete --zookeeper --topic web-sql-log
Newer versions: full cleanup
# Manually delete Kafka metadata in ZooKeeper
zookeeper-client
rmr /config
rmr /brokers
rmr /consumers
rmr /controller_epoch
rmr /isr_change_notification
rmr /admin
rmr /controller
quit
# Remove all data from the Kafka data directory
rm -rf /var/local/kafka/data/*
Note: If the delete is not working, stop Kafka, then run:
rm -r /var/local/kafka/data
Kafka helpful hints
To get the offsets of all messages
kafka-run-class --zookeeper --group logstash --topic web-sql-log
To see the PartitionCount, ReplicationFactor, etc.
kafka-topics --describe --topic web-sql-log --zookeeper
To increase the PartitionCount (Kafka can add partitions to a topic but not remove them)
The following changes it to two partitions:
kafka-topics --zookeeper --alter --topic web-sql-log --partitions 2
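The comment in the topic-creation step ties the partition count to the number of Logstash consumers. Within one consumer group each partition is read by at most one consumer thread, so any threads beyond the partition count sit idle. The arithmetic can be sketched as follows; idle_consumers is an illustrative name and both arguments are plain integers.

```shell
# Illustrative arithmetic: how many consumer threads in one group would sit
# idle for a given partition count.
#   $1 = number of partitions, $2 = total consumer threads in the group
idle_consumers() {
    partitions="$1"
    consumers="$2"
    if [ "$consumers" -gt "$partitions" ]; then
        echo $((consumers - partitions))
    else
        echo 0
    fi
}
```

For example, idle_consumers 2 3 reports one idle thread, which is a hint to either add a partition or drop a consumer.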
Logstash and Kafka configuration
On the client side, let's create a Logstash configuration with a Kafka output
#input {
  #file {
    ## Wildcards work here :)
    #path => [ "/var/log/*.log", "/var/log/messages", "/var/log/syslog" ]
    #type => "syslog-ng"
  #}
#}

input {
  file {
    # Wildcards work here :)
    path => [ "/zones/*wapp*/root/devtech101logs/application/sql.log" ]
    type => "sql-log"
  }
}

input {
  tcp {
    host => ""
    port => 3333
    type => "sql-log"
  }
}

filter {
  # Remove empty lines (lines that are empty or contain only whitespace)
  # if [type] == "sql-log" {
  if [message] =~ /^\s*$/ {
    drop { }
  }
  # }
}

output {
  kafka {
    ## Unformatted message
    codec => plain {
      format => "%{message}"
    }
    topic_id => "web-sql-log"
    #bootstrap_servers => ","
    bootstrap_servers => ""
    #compression_type => gzip
  }
  stdout {
    codec => rubydebug
  }
}
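To preview which lines the "Remove empty lines" filter would discard from a log file, the same test can be approximated in the shell. This is only a sketch: it uses the POSIX class [[:space:]] to match whitespace, and drop_blank_lines is an illustrative name, not part of Logstash.

```shell
# Illustrative helper: print stdin without lines that are empty or contain
# only whitespace, mirroring the intent of the Logstash drop filter.
drop_blank_lines() {
    grep -v '^[[:space:]]*$'
}

# Example use against a log file (the path is a placeholder):
#   drop_blank_lines < /path/to/sql.log | head
```

Lines that merely start with whitespace but carry content are kept, which matches what the filter condition is meant to do.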
Start Logstash with SMF or at the command line
logstash -w 8 -f logstash.conf
For SMF details check here
On the server side (Kafka consumer), let's create a Logstash configuration with a Kafka input and an Elasticsearch output
Note: On Linux, this file is /etc/logstash/conf.d/50-logstash.conf
input {
  kafka {
    codec => plain
    #zk_connect => ","
    zk_connect => ""
    topic_id => "web-sql-log"
    group_id => "logstash"
    #reset_beginning => true
    decorate_events => false
    consumer_threads => 1
    consumer_restart_sleep_ms => 100
    type => "sql-log"
  }
}

filter {
  if [type] == "sql-log" {
    grok {
      match => { "message" => "(?m)%{MONTHDAY:MONTHDAY}%{SPACE}%{MONTH:MONTH}%{SPACE}%{YEAR:YEAR}%{SPACE}%{TIME:TIME}%{SPACE}-%{SPACE}%{LOGLEVEL:LOGLEVEL}%{SPACE}-%{SPACE}%{HOSTNAME:HOSTNAME}%{SPACE}::%{SPACE}%{DATA:SESSION_ID}%{SPACE}::%{SPACE}BHSql%{SPACE}::%{SPACE}%{DATA:DURATION}%{SPACE}::%{SPACE}%{GREEDYDATA:SQL_STATEMENT}" }
      add_field => { "mytimestamp" => "%{MONTHDAY} %{MONTH} %{YEAR} %{TIME}" }
    }
    date {
      match => [ "mytimestamp", "dd MMM YYYY HH:mm:ss.SSS", "dd MMM YYYY HH:mm:ss,SSS" ]
      #timezone => "UTC"
      target => "@timestamp"
    }
    mutate {
      remove_field => [ "mytimestamp", "%{MONTHDAY} %{MONTH} %{YEAR} %{TIME}" ]
    }
  }
  metrics {
    meter => "events"
    add_tag => "metric"
    flush_interval => 60
  }
}

output {
  ## Debug
  #stdout { codec => rubydebug }
  if [type] == "sql-log" {
    elasticsearch {
      hosts => [ "", "", "" ]
      timeout => 30
      index => "web-%{type}-%{+YYYY.MM.dd}"
      ##flush_size => 2000
      #flush_size => 5
    }
  } else if "metric" in [tags] {
    file {
      codec => line {
        format => "rate: %{[events][rate_1m]}"
        #format => "rate: %{[events][rate_5m]}"
      }
      path => "/var/tmp/logstash-%{+YYYY-MM-dd}.log"
    }
  } else {
    elasticsearch {
      hosts => [ "", "", "" ]
      timeout => 30
      #flush_size => 2000
    }
  }
  # For testing only
  #if [type] == "sql-log" {
    #null{}
  #}
}
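The grok pattern expects lines shaped like "day month year time - level - host :: session :: BHSql :: duration :: statement". To make that layout concrete, here is a rough shell approximation that splits such a line into the same named parts. It is a sketch only: it requires exactly one space around each separator, whereas grok's %{SPACE} tolerates any amount of whitespace, and split_sql_log_line and the sample line in the usage note are made up for illustration.

```shell
# Illustrative only: split a line shaped like the grok pattern expects into
# its " :: "-separated parts. Output names follow the grok captures, except
# TIMESTAMP, which combines MONTHDAY, MONTH, YEAR and TIME.
split_sql_log_line() {
    awk -F ' :: ' '{
        # $1 = "<day> <month> <year> <time> - <level> - <host>"
        split($1, head, " - ")
        print "TIMESTAMP=" head[1]
        print "LOGLEVEL=" head[2]
        print "HOSTNAME=" head[3]
        print "SESSION_ID=" $2
        # $3 is the literal marker "BHSql"
        print "DURATION=" $4
        # Everything after the fourth separator is the SQL statement,
        # including any " :: " that occurs inside the statement itself.
        stmt = $5
        for (i = 6; i <= NF; i++) stmt = stmt " :: " $i
        print "SQL_STATEMENT=" stmt
    }'
}
```

Feeding it a made-up line such as "07 Mar 2017 14:05:33,123 - INFO - web01 :: A1B2C3 :: BHSql :: 15 :: SELECT 1 FROM dual" prints one NAME=value pair per field, which is a quick way to check that a real log line has the shape the grok pattern will accept.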
Start Logstash with SMF or at the command line
su - logstash
logstash -w 8 -f logstash.conf
# OR
systemctl start logstash