안녕하세요 😉
유유자적한 개발자 유로띠 입니다 😀
이전 포스팅에서
에 대해 알아보았습니다 😊
하지만
저 정도로는
Logstash의 기능을 제대로 파악하지 못했습니다 😏
그래서 준비했습니다 ✌
Logstash를 제대로 사용해서
kibana를 이용한
시각화를 할 수 있도록 하기 위해
Logstash 사용하기 ❗❗
Logstash를 잘 사용하기 위해서는
configuration을 잘 작성하고 다뤄야 합니다 👍
✅ 기본적인 logstash.conf 구조
input {
stdin {}
jdbc {}
file {}
}
filter {
mutate {}
date {}
grok {}
}
output {
stdout {}
elasticsearch {}
kafka {}
csv {}
}
input - 데이터를 올바르게 수집
filter - 데이터를 필요한 상태로 가공
output - 데이터를 원하는 형식에 전송
하는것이
Logstash의 역할입니다 😁
이번 포스팅에서는
✅ 시각화를 위한 JDBC 설정
✅ 필요한 데이터 수집을 위한 Filter 설정
✅ elasticsearch에 전 송하기 위한 output 설정
에 대해서 알아보겠습니다
👏👏👏👏
🎉 Logstash 사용하기
📢시각화를 위한 JDBC 설정
저번 포스팅에서는 간단하게 jdbc를 연동하여
데이터를 가져오는 것을 해보았습니다 😄
하지만 매번 전체 데이터를 가져올 순 없기 때문에
데이터를 가져온 이후 시점으로 가져오는 방법에 대해 알아보도록 하겠습니다 ✌
✅ 선행 조건
데이터를 마지막으로 조회한 특정 컬럼을 확인하여 그 이후 시점으로 가져오려면
데이터를 가져오는 정보 중 기준이 되는 컬럼이 있어야 합니다 👍
🔵 일정하게 증가하는(incremental) numeric Type
🔵 생성 날짜나 변경 날짜와 같은 Date Type
자 그럼
이전 포스팅에 사용했던 Logstash를
기동 하기 위해 사용한
inspector.conf 파일을 확인해 보겠습니다 🔎
✅ inspector.conf
추가된 부분은 use_column_value, tracking_column, last_run_metadata_path 그리고 쿼리 부분에 :sql_last_value 입니다
input {
jdbc {
jdbc_validate_connection => true
jdbc_driver_library => "/usr/share/logstash/tools/mysql-connector-java-5.1.38.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://test-database-1.rds.amazonaws.com:3306/<DBname>"
jdbc_user => "username"
jdbc_password => "password"
use_column_value => true
tracking_column => idx
last_run_metadata_path => "/usr/share/logstash/inspector-index.dat"
statement => "select * from INSPECTOR where idx > :sql_last_value order by idx ASC"
}
}
use_column_value
true일 경우 :sql_last_value를 사용할 수 있으며, 쿼리가 실행된 마지막 부분을 반영합니다
tracking_column
값은 numeric 또는 timestamp이며 기본값은 numeric입니다
어디까지 조회했는지 <idx> 컬럼으로 확인합니다
last_run_metadata_path
어디까지 조회했는지에 대한 정보를 해당 경로의 inspector-index.dat파일에 저장합니다
✅ inspector-index.dat
파일 정보를 보면 마지막 idx를 저장합니다
때문에 order by를 ASC(오름차순)로해야 합니다
--- 26321
데이터를 순차적으로 가져왔다면 👍
데이터를 정기적으로 가져오는 방법에 대해 알아보겠습니다 🔎
✅ inspector.conf
schedule를 사용하며 cron 문법과 비슷하게 📃스케쥴링이 가능합니다
input {
jdbc {
jdbc_validate_connection => true
jdbc_driver_library => "/usr/share/logstash/tools/mysql-connector-java-5.1.38.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://test-database-1.rds.amazonaws.com:3306/<DBname>"
jdbc_user => "username"
jdbc_password => "password"
use_column_value => true
tracking_column => idx
last_run_metadata_path => "/usr/share/logstash/inspector-index.dat"
statement => "select * from INSPECTOR where idx > :sql_last_value order by idx ASC"
schedule => "0 */2 * * *"
}
}
💡 cron 표현식
* * * * * 수행할 명령어
┬ ┬ ┬ ┬ ┬
│ │ │ │ │
│ │ │ │ │
│ │ │ │ └───────── 요일 (0 - 6) (0:일요일, 1:월요일, 2:화요일, …, 6:토요일)
│ │ │ └───────── 월 (1 - 12)
│ │ └───────── 일 (1 - 31)
│ └───────── 시 (0 - 23)
└───────── 분 (0 - 59)
💡 logstash의 jdbc 설정은 공식문서에서 확인할 수 있습니다
📢필요한 데이터 수집을 위한 Filter 설정
시각화에 필요한 데이터를 수집하기 위해선
Filter 설정을 통해 데이터를 가공해야 합니다 😁
📍 mutate
field data의 값을 변경할 때 사용됩니다
mutate 기능 중 remove_field 기능만 사용하였습니다
🟢 remove_field
불필요한 정보를 제거할 때 사용됩니다
위에처럼 시각화에 필요하지 않은 데이터에 대해서 remove_field를 사용하여 제거할 수 있습니다 😊
filter {
mutate {
remove_field => ["@timestamp","@version","host","message"]
}
}
📍 date
날짜의 형식을 변경하거나 timezone를 변경할 때 사용됩니다
DB에 날짜 형식이 있으면 편하지만 그렇지 않은 경우 kibana 시각화를 하기 위해서 date type의 컬럼이 필요하기 때문에 String형식의 날짜를 date로 변경해 주어야 합니다
또한 기준 시간이 utc로 되어있기 때문에 timezone도 변경해주여야 합니다 😁
filter {
date {
match => ["setdt","yyyyMMdd'T'HHmmss"]
timezone => "Asia/Seoul"
target => "setdt"
}
}
여기까지 설정한 conf 파일입니다
👏👏👏
✅ inspector.conf
input {
jdbc {
jdbc_validate_connection => true
jdbc_driver_library => "/usr/share/logstash/tools/mysql-connector-java-5.1.38.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://test-database-1.rds.amazonaws.com:3306/<DBname>"
jdbc_user => "username"
jdbc_password => "password"
use_column_value => true
tracking_column => idx
last_run_metadata_path => "/usr/share/logstash/inspector-index.dat"
statement => "select * from INSPECTOR where idx > :sql_last_value order by idx ASC"
schedule => "0 */2 * * *"
}
}
filter {
date {
match => ["setdt","yyyyMMdd'T'HHmmss"]
timezone => "Asia/Seoul"
target => "setdt"
}
mutate {
remove_field => ["@timestamp","@version","host","message"]
}
}
📢elasticsearch에 전 송하기 위한 output 설정
앞서 input과 Filter를 이용하여
데이터를 수집하고 가공하였습니다
이번엔 kibana와 연동하기 위해서
데이터를 elasticsearch에
전송하기 위한
output을 설정해보도록 하겠습니다
📍 output
elasticsearch와 stdout 두 개 동시해 설정하는 multiple output 설정(debug를 위해 stdout를 추가하였습니다)도
가능합니다
생성할 index의 명칭을 등록합니다 (나중에 kibana에서 시각화하기 위해 필요합니다)
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "inspector"
}
stdout {}
}
hosts
유형은 URL 입니다
원격 인스턴스의 호스트를 설정합니다
index
유형은 String입니다
인덱스를 생성하며 대문자를 포함할 수 없습니다
💡 이밖에 output의 정보도 공식문서를 참조하면 좋을 거 같습니다
📢마무리
오늘은
✅ 시각화를 위한 JDBC 설정
✅ 필요한 데이터 수집을 위한 Filter 설정
✅ elasticsearch에 전송하기 위한 output 설정
대해서 알아보았습니다
드디어 오늘 알아본 최종 파일입니다
👏👏👏👏
✅ inspector.conf
input {
jdbc {
jdbc_validate_connection => true
jdbc_driver_library => "/usr/share/logstash/tools/mysql-connector-java-5.1.38.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://test-database-1.rds.amazonaws.com:3306/<DBname>"
jdbc_user => "username"
jdbc_password => "password"
use_column_value => true
tracking_column => idx
last_run_metadata_path => "/usr/share/logstash/inspector-index.dat"
statement => "select * from INSPECTOR where idx > :sql_last_value order by idx ASC"
schedule => "0 */2 * * *"
}
}
filter {
date {
match => ["setdt","yyyyMMdd'T'HHmmss"]
timezone => "Asia/Seoul"
target => "setdt"
}
mutate {
remove_field => ["@timestamp","@version","host","message"]
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "inspector"
}
stdout {}
}
'Programming > ELK' 카테고리의 다른 글
[ELK] Kibana 보안 연결을 위한 SSL를 적용 해보자 #6 (2) | 2020.04.30 |
---|---|
[ELK] KIBANA를 이용하여 데이터를 👁🗨시각화 해보자 #5 (2) | 2020.04.23 |
[ELK] Logstash 설치하기 #3 (4) | 2020.04.17 |
[ELK] KIBANA 설치하기 #2 (0) | 2020.04.16 |
Elasticsearch 설치 및 외부 허용 설정하기 #1 (2) | 2020.04.14 |