[ELK] Logstash 사용하기 📌시각화를 위한 설정 📌JDBC 상세 설정 #4
반응형

 

안녕하세요 😉

유유자적한 개발자 유로띠 입니다 😀

 

 

 

 

이전 포스팅에서

[ELK] Logstash 설치하기

 

[ELK] Logstash 설치하기

안녕하세요 😉 유유자적한 개발자 유로띠 입니다 😀 이전 포스팅인 ELK 설치 Elasticsearch 설치 및 외부 허용 설정하기 Elasticsearch 설치 및 외부 허용 설정하기 안녕하세요😁 유유자적한 개발자 유로띠 입니..

msyu1207.tistory.com

에 대해 알아보았습니다 😊

 

하지만

저 정도로는 

Logstash의 기능을 제대로 파악하지 못했습니다 😏

 

 

그래서 준비했습니다 ✌

 

Logstash를 제대로 사용해서 

kibana를 이용한

시각화를 할 수 있도록 하기 위해

 

Logstash 사용하기 ❗❗

Logstash를 잘 사용하기 위해서는

configuration을 잘 작성하고 다뤄야 합니다 👍

 

✅ 기본적인 logstash.conf 구조

input {
	stdin {}
	jdbc {}
	file {}
}

filter {
	mutate {}
	date {}
	grok {}    
}

output {
	stdout {}
	elasticsearch {}
	kafka {}
	csv {}    
}

input - 데이터를 올바르게 수집

filter - 데이터를 필요한 상태로 가공

output - 데이터를 원하는 형식에 전송

하는것이

Logstash의 역할입니다 😁

 

 

 

이번 포스팅에서는

✅ 시각화를 위한 JDBC 설정

✅ 필요한 데이터 수집을 위한 Filter 설정

✅ elasticsearch에 전 송하기 위한 output 설정

에 대해서 알아보겠습니다

 

👏👏👏👏

 

🎉 Logstash 사용하기


📢시각화를 위한 JDBC 설정

 

저번 포스팅에서는 간단하게 jdbc를 연동하여

데이터를 가져오는 것을 해보았습니다 😄

 

하지만 매번 전체 데이터를 가져올 순 없기 때문에

데이터를 가져온 이후 시점으로 가져오는 방법에 대해 알아보도록 하겠습니다

 

 

✅ 선행 조건

데이터를 마지막으로 조회한 특정 컬럼을 확인하여 그 이후 시점으로 가져오려면

데이터를 가져오는 정보 중 기준이 되는 컬럼이 있어야 합니다 👍

 

🔵 일정하게 증가하는(incremental)  numeric Type

 

🔵 생성 날짜나 변경 날짜와 같은 Date Type

 

 

자 그럼

이전 포스팅에 사용했던 Logstash를

기동 하기 위해 사용한

inspector.conf 파일을 확인해 보겠습니다 🔎

 

inspector.conf

 

추가된 부분은 use_column_value, tracking_column, last_run_metadata_path 그리고 쿼리 부분에 :sql_last_value 입니다

input {
   jdbc {
        jdbc_validate_connection => true
        jdbc_driver_library => "/usr/share/logstash/tools/mysql-connector-java-5.1.38.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://test-database-1.rds.amazonaws.com:3306/<DBname>"
        jdbc_user => "username"
        jdbc_password => "password"
        use_column_value => true
        tracking_column => idx
        last_run_metadata_path => "/usr/share/logstash/inspector-index.dat"
        statement => "select * from INSPECTOR where idx > :sql_last_value order by idx ASC"
    }
}

 

use_column_value
true일 경우 :sql_last_value를 사용할 수 있으며, 쿼리가 실행된 마지막 부분을 반영합니다

 

tracking_column
값은 numeric 또는 timestamp이며 기본값은 numeric입니다
어디까지 조회했는지 <idx> 컬럼으로 확인합니다

 

last_run_metadata_path
어디까지 조회했는지에 대한 정보를 해당 경로의 inspector-index.dat파일에 저장합니다

 

 

✅ inspector-index.dat

 

파일 정보를 보면 마지막 idx를 저장합니다

때문에 order by를 ASC(오름차순)로해야 합니다

--- 26321

 

데이터를 순차적으로 가져왔다면 👍

데이터를 정기적으로 가져오는 방법에 대해 알아보겠습니다 🔎

 

 

✅ inspector.conf

 

schedule를 사용하며 cron 문법과 비슷하게 📃스케쥴링이 가능합니다

input {
   jdbc {
        jdbc_validate_connection => true
        jdbc_driver_library => "/usr/share/logstash/tools/mysql-connector-java-5.1.38.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://test-database-1.rds.amazonaws.com:3306/<DBname>"
        jdbc_user => "username"
        jdbc_password => "password"
        use_column_value => true
        tracking_column => idx
        last_run_metadata_path => "/usr/share/logstash/inspector-index.dat"
        statement => "select * from INSPECTOR where idx > :sql_last_value order by idx ASC"
        schedule => "0 */2 * * *"
    }
}

 

 

💡 cron 표현식

 *  *  *  *  *    수행할 명령어
┬ ┬ ┬ ┬ ┬
│ │ │ │ │
│ │ │ │ │
│ │ │ │ └───────── 요일 (0 - 6) (0:일요일, 1:월요일, 2:화요일, …, 6:토요일)
│ │ │ └───────── 월 (1 - 12)
│ │ └───────── 일 (1 - 31)
│ └───────── 시 (0 - 23)
└───────── 분 (0 - 59)

 

 

💡 logstash의 jdbc 설정은 공식문서에서 확인할 수 있습니다

 

Jdbc input plugin | Logstash Reference [7.6] | Elastic

If not provided, Plugin will look for the driver class in the Logstash Java classpath. Additionally, if the library does not appear to be being loaded correctly via this setting, placing the relevant jar(s) in the Logstash Java classpath rather than via th

www.elastic.co

 


 

📢필요한 데이터 수집을 위한 Filter 설정

 

시각화에 필요한 데이터를 수집하기 위해선

Filter 설정을 통해 데이터를 가공해야 합니다 😁

 

📍 mutate

field data의 값을 변경할 때 사용됩니다

mutate 기능 중 remove_field 기능만 사용하였습니다

 

    🟢 remove_field

       불필요한 정보를 제거할 때 사용됩니다

 

위에처럼 시각화에 필요하지 않은 데이터에 대해서 remove_field를 사용하여 제거할 수 있습니다 😊

filter {
    mutate {
      remove_field => ["@timestamp","@version","host","message"]
    }
}

 

📍 date

날짜의 형식을 변경하거나 timezone를 변경할 때 사용됩니다

 

DB에 날짜 형식이 있으면 편하지만 그렇지 않은 경우 kibana 시각화를 하기 위해서 date type의 컬럼이 필요하기 때문에 String형식의 날짜를 date로 변경해 주어야 합니다

또한 기준 시간이 utc로 되어있기 때문에 timezone도 변경해주여야 합니다 😁

 

filter {
  date {
    match => ["setdt","yyyyMMdd'T'HHmmss"]
    timezone => "Asia/Seoul"
    target => "setdt"
  }
}

여기까지 설정한 conf 파일입니다

👏👏👏

 

✅ inspector.conf

input {
   jdbc {
        jdbc_validate_connection => true
        jdbc_driver_library => "/usr/share/logstash/tools/mysql-connector-java-5.1.38.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://test-database-1.rds.amazonaws.com:3306/<DBname>"
        jdbc_user => "username"
        jdbc_password => "password"
        use_column_value => true
        tracking_column => idx
        last_run_metadata_path => "/usr/share/logstash/inspector-index.dat"
        statement => "select * from INSPECTOR where idx > :sql_last_value order by idx ASC"
        schedule => "0 */2 * * *"
    }
}

filter {
  date {
    match => ["setdt","yyyyMMdd'T'HHmmss"]
    timezone => "Asia/Seoul"
    target => "setdt"
  }
 mutate {
  remove_field => ["@timestamp","@version","host","message"]
 }
}

 

 


📢elasticsearch에 전 송하기 위한 output 설정

 

앞서 input과 Filter를 이용하여

데이터를 수집하고 가공하였습니다

 

이번엔 kibana와 연동하기 위해서

데이터를 elasticsearch에

전송하기 위한

output을 설정해보도록 하겠습니다

 

 

📍 output

 

elasticsearch와 stdout 두 개 동시해 설정하는 multiple output 설정(debug를 위해 stdout를 추가하였습니다)

가능합니다

생성할 index의 명칭을 등록합니다 (나중에 kibana에서 시각화하기 위해 필요합니다)

output {
  elasticsearch {
        hosts => ["localhost:9200"]
        index => "inspector"
  }
 stdout {}
}

 

hosts
유형은 URL 입니다
원격 인스턴스의 호스트를 설정합니다

 

 

index
유형은 String입니다
인덱스를 생성하며 대문자를 포함할 수 없습니다

 

 

 

💡 이밖에 output의 정보도 공식문서를 참조하면 좋을 거 같습니다

 

Elasticsearch output plugin | Logstash Reference [7.6] | Elastic

Compatibility Note Starting with Elasticsearch 5.3, there’s an HTTP setting called http.content_type.required. If this option is set to true, and you are using Logstash 2.4 through 5.2, you need to update the Elasticsearch output plugin to version 6.2.5 or

www.elastic.co

 


📢마무리

 

오늘은

✅ 시각화를 위한 JDBC 설정

✅ 필요한 데이터 수집을 위한 Filter 설정

✅ elasticsearch에 전송하기 위한 output 설정

대해서 알아보았습니다

 

드디어 오늘 알아본 최종 파일입니다

👏👏👏👏

✅ inspector.conf

input {
   jdbc {
        jdbc_validate_connection => true
        jdbc_driver_library => "/usr/share/logstash/tools/mysql-connector-java-5.1.38.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://test-database-1.rds.amazonaws.com:3306/<DBname>"
        jdbc_user => "username"
        jdbc_password => "password"
        use_column_value => true
        tracking_column => idx
        last_run_metadata_path => "/usr/share/logstash/inspector-index.dat"
        statement => "select * from INSPECTOR where idx > :sql_last_value order by idx ASC"
        schedule => "0 */2 * * *"
    }
}

filter {
  date {
    match => ["setdt","yyyyMMdd'T'HHmmss"]
    timezone => "Asia/Seoul"
    target => "setdt"
  }
 mutate {
  remove_field => ["@timestamp","@version","host","message"]
 }
}

output {
  elasticsearch {
        hosts => ["localhost:9200"]
        index => "inspector"
  }
 stdout {}
}

 

 

반응형