DevTech101

First, let's configure Kafka

Note: In Kafka 0.8.1 (and lower) you cannot delete a topic; make sure to update to a later version, 0.8.2 or above.

Modify the below Kafka settings

message.max.bytes: 5000000
replica.fetch.max.bytes: 5000000 (must be at least as large as message.max.bytes)
max.connections.per.ip: 500
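
If you manage Kafka by hand rather than through Cloudera Manager, the same settings would go into each broker's server.properties, roughly as follows (a sketch using the values above):

# server.properties
message.max.bytes=5000000
replica.fetch.max.bytes=5000000
max.connections.per.ip=500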

To make Kafka listen on all interfaces

Add the following to the Advanced Snippet text field:
listeners=PLAINTEXT://0.0.0.0:9092,SSL://127.0.0.1:9093 

# Note: also add the broker's own IP under each broker
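
For example, the per-broker override might look like the following (a sketch; 10.10.10.11 stands in for that broker's own IP, substitute each broker's actual address):

listeners=PLAINTEXT://10.10.10.11:9092,SSL://127.0.0.1:9093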

Now, let's create a topic

Note: The instructions below are missing the initial ZooKeeper setup, which is already done on the Oracle BDA.

# --partitions should match the number of Logstash consumers
kafka-topics --create --zookeeper n02.domain.com:2181 --partitions 2 --replication-factor 1 --topic web-sql-log

# Verify that the topic was created
kafka-topics --zookeeper n02.domain.com:2181 --list 

To watch the active queue

kafka-console-consumer --zookeeper n02.domain.com:2181 --topic web-sql-log

# Or, to push test messages onto the topic (using the old producer API)
kafka-console-producer  --broker-list 10.10.10.11:9092  --old-producer --topic web-sql-log
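
On newer Kafka releases you can also consume straight from the brokers instead of going through ZooKeeper, roughly like this (a sketch, assuming a broker at 10.10.10.11:9092):

kafka-console-consumer --bootstrap-server 10.10.10.11:9092 --topic web-sql-log --from-beginning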

To delete a topic (with all messages)

Make sure to stop all Logstash instances using this topic.
It is also a good idea to stop all Kafka brokers.

kafka-topics --delete --zookeeper n02.domain.com:2181  --topic web-sql-log
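
Note: brokers only honor the delete request when topic deletion is enabled. If it is off, set the following in server.properties (or the equivalent Cloudera Manager setting) and restart the brokers:

delete.topic.enable=true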

On newer versions, for a full cleanup:

# Manually delete Kafka metadata in Zookeeper 
zookeeper-client 
rmr /config
rmr /brokers
rmr /consumers
rmr /controller_epoch
rmr /isr_change_notification
rmr /admin
rmr /controller
quit

# remove all data from kafka data directory
rm -rf /var/local/kafka/data/*

Note: if the delete does not work, stop Kafka, then

rm -r /var/local/kafka/data

Helpful Kafka hints

To get all message offsets

kafka-run-class kafka.tools.ConsumerOffsetChecker --zookeeper n02.domain.com:2181 --group logstash --topic web-sql-log
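
On newer Kafka releases ConsumerOffsetChecker is deprecated; the rough equivalent is kafka-consumer-groups (a sketch, use --zookeeper or --bootstrap-server depending on whether your consumers commit offsets to ZooKeeper or to Kafka):

kafka-consumer-groups --zookeeper n02.domain.com:2181 --describe --group logstash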

To see the PartitionCount, ReplicationFactor, etc.

kafka-topics --describe --topic web-sql-log --zookeeper n02.domain.com:2181

To modify/add partitions

The command below will change the topic to two partitions

kafka-topics --zookeeper n02.domain.com:2181 --alter --topic web-sql-log --partitions 2

Logstash and Kafka configuration

On the client side, let's create a Logstash configuration with a Kafka output

logstash.conf

#input {
  #file {
    ## Wildcards work, here :)
    #path => [ "/var/log/*.log", "/var/log/messages", "/var/log/syslog" ]
    #type => "syslog-ng"
  #}
#}

input {
  file {
    # Wildcards work, here :)
    path => [ "/zones/*wapp*/root/devtech101logs/application/sql.log" ]
    type => "sql-log"
  }
}

input {
    tcp {
        host => "10.10.19.10"
        port => 3333
        type => "sql-log"
    }
}
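
# To verify the tcp input above is receiving data once Logstash is running,
# you can push a test line at it with netcat (assumes nc is installed), e.g.:
#   echo "test message" | nc 10.10.19.10 3333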

filter {
# Remove empty lines
#  if [type] == "sql-log" {
    if [message] =~ /^\s*$/ {
      drop { }
    }
#  }
}

output {
      kafka {
        ## Unformatted message
        codec => plain {
           format => "%{message}"
        }
           topic_id => "web-sql-log"
           #bootstrap_servers => "n01.domain.com:9092,n02.domain.com:9092"
           bootstrap_servers => "n02.domain.com:9092"
           #compression_type => gzip
      }
      stdout { codec => rubydebug }
}

Start Logstash with SMF or at the command line

logstash -w 8 -f logstash.conf
For SMF details check here
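
Before starting it for real, it can help to syntax-check the config first (the flag is --configtest on Logstash 2.x; on 5.x and later it is --config.test_and_exit):

logstash --configtest -f logstash.conf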

On the server side (Kafka consumer), let's create a Logstash configuration with a Kafka input and an Elasticsearch output

logstash.conf
Note: on Linux it is in /etc/logstash/conf.d/50-logstash.conf

input {
   kafka {
      codec => plain
      #zk_connect => "n01.domain.com:2181,n02.domain.com:2181"
      zk_connect => "n02.domain.com:2181"
      topic_id => "web-sql-log"
      group_id => "logstash"
      #reset_beginning => true
      decorate_events => false
      consumer_threads => 1
      consumer_restart_sleep_ms => 100
      type => "sql-log"
   }
}

filter {
  if [type] == "sql-log" {
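    # The grok pattern below expects multi-line entries shaped roughly like this
    # hypothetical sample (adjust to match your actual sql.log format):
    #   22 Jan 2016 10:15:30,123 - INFO - n01.domain.com :: abc123 :: BHSql :: 250ms :: SELECT * FROM users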
    grok {
      match => { "message" => "(?m)%{MONTHDAY:MONTHDAY}%{SPACE}%{MONTH:MONTH}%{SPACE}%{YEAR:YEAR}%{SPACE}%{TIME:TIME}%{SPACE}-%{SPACE}%{LOGLEVEL:LOGLEVEL}%{SPACE}-%{SPACE}%{HOSTNAME:HOSTNAME}%{SPACE}::%{SPACE}%{DATA:SESSION_ID}%{SPACE}::%{SPACE}BHSql%{SPACE}::%{SPACE}%{DATA:DURATION}%{SPACE}::%{SPACE}%{GREEDYDATA:SQL_STATEMENT}" }
      add_field => { "mytimestamp" => "%{MONTHDAY} %{MONTH} %{YEAR} %{TIME}" }
    }
    date {
      match => [ "mytimestamp", "dd MMM YYYY HH:mm:ss.SSS", "dd MMM YYYY HH:mm:ss,SSS" ]
      #timezone => "UTC"
      target => "@timestamp"
    }
    mutate {
      remove_field => [ "mytimestamp", "MONTHDAY", "MONTH", "YEAR", "TIME" ]
    }
  }
   metrics {
     meter => "events"
     add_tag => "metric"
     flush_interval => 60
  }
}

output {
    ## Debug
    #stdout { codec => rubydebug }

    if [type] == "sql-log" {
      elasticsearch {
        hosts => [ "10.10.3.25:9200", "10.10.3.26:9200", "10.10.3.27:9200" ]
        timeout => 30
        index => "web-%{type}-%{+YYYY.MM.dd}"
        ##flush_size => 2000
        #flush_size => 5
      }
    } else if "metric" in [tags] {
      file {
        codec => line {
          format => "rate: %{[events][rate_1m]}"
          #format => "rate: %{[events][rate_5m]}"
        }
          path => "/var/tmp/logstash-%{+YYYY-MM-dd}.log"
      }
    } else {
      elasticsearch{
        hosts => [ "10.10.3.25:9200", "10.10.3.26:9200", "10.10.3.27:9200" ]
        timeout => 30
        #flush_size => 2000
      }
    }
    # For testing only
    #if [type] == "sql-log" {
      #null{}
    #}
}

Start Logstash with SMF or at the command line

su - logstash
logstash -w 8 -f logstash.conf

# OR
systemctl start logstash
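
Once events are flowing, you can check that the daily index is being created in Elasticsearch (a sketch; the index name follows the web-%{type}-%{+YYYY.MM.dd} pattern used above):

curl 'http://10.10.3.25:9200/_cat/indices/web-sql-log-*?v'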

To increase Logstash memory, add the following to /opt/logstash/bin/logstash:

LS_HEAP_SIZE="4g"
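
Alternatively, you can export it in the environment before launching (this assumes your startup script reads LS_HEAP_SIZE from the environment, as the stock 2.x scripts do):

export LS_HEAP_SIZE="4g"
logstash -w 8 -f logstash.conf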

Log locations

Default logstash logs
/var/log/logstash
Default Kafka logs (on BDA)
/var/log/kafka/server.log
For SMF details check here