본문 바로가기
DataBase

[Elastic Search] Processor / Changing Data

by HANdeveloper 2024. 1. 5.

✅ processors ➡️ beats, logstash, ingest pipeline 마다 각자의 processor 가 존재

 - data 저장하기 전 정제하는 단계

 

✅ 3개의 툴 안에서 흔하게 사용하는 processor 

  1. Manipulate fields : 필드를 추가/삭제와 같이 조작하는 것 (e.g set / remove / rename / dot_expander ... )
  2. Manipulate values : 필드 안에 있는 값을 조작하는 것 (e.g split / join / grok / dissect / gsub ... )
  3. Special operations : 기타 특별한 용도로 사용 (e.g csv / json / geoip / user_agent / script / pipeline ... )

▶️ dissect processor 

시험e.g) 어디어디를 잘라 필드 4개를 만드시오

%{} syntax를 사용하여 비정형 -> 정형 데이터로 만드는 것

1.2.3.4 [30/Apr/1998:22:00:52 +0000] \"GET /english/images/montpellier/18.gif\"

⬇️ 공백 똑같이 만들어줌

%{clientip} [%{@timestamp}] \"%{verb} %{request}\"

⬇️ 비정형 -> 정형         

"_source": {
    "request": "/english/images/montpellier/18.gif\", 
    "verb": "GET",
    "@timestamp": "30/Apr/1998:22:00:52 +0000", 
    "clientip": "1.2.3.4"
}

 

▶️ pipeline processor 

 - 파이프라인을 굳이 새로 만들지않고 기존 파이프라인 재사용

 - inner_pipeline -> set 순으로 돌게 됨 (작성한 순으로)

 - set 은 새로운 필드를 생성하는 것

PUT _ingest/pipeline/blogs_pipeline 
{
  "processors" : [
    {
      "pipeline" : { 
      	"name": "inner_pipeline" 
       }
    },
    {
      "set" : {
        "field": "outer_pipeline_set",
        "value": "outer_value",
     }
    } 
  ]
}

 

Use the pipeline 3가지

  1. default_pipeline : 처음 인덱스 생성 시 기본 파이프라인을 지정해 해당 파이프라인을 무조건 타고 들어오게 함
  2. reindex : dest에 들어오는 인덱스에 내가 만든 pipeline이 수용되는 것
  3. update_by_query : pipeline 지정해주면 그 필드를 업데이트하면서 해당 파이프라인을 다 타도록 하는 것

▶️ default_pipeline

PUT blogs_fixed
{
	"settings" : {
    	"default_pipeline" : "blogs_pipeline" // 처음 인덱스 생성 시 해당 파이프라인 타도록 함
    }
}

 

▶️ reindex API

 - max_docs 와 query 를 적용해 원하는 부분만 reindex(복사) 되도록 함

POST _reindex
{
    "max_docs" : 100,
    "source" : {
    	"index" : "blogs",
        "query" : {
             "match" : {
            	"category" : "Engineering" //카테고리 필드에서 엔지니어링만 복사하기
            }
        }
    },
    "dest" : {
    	"index" : "blogs_fixed"
    }
}

 

▶️ _update_by_query

  - 인덱스 전체를 같게 다 바꾸는 것

 [ 번외 _delete_by_query : 지정된 쿼리만 삭제하는 것 ]

 

 

✅ lab 4.1 EXAM PREP 정리

Q. ingest node pipeline UI를 사용하여 다음 요구 사항을 충족하는 ingest pipeline 생성

요구사항

PUT _ingest/pipeline/web_traffic_pipeline
{
  "processors": [
    {
      "remove": {
        "field": "is_https",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "request",
        "target_field": "url.original",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "verb",
        "target_field": "http.request.method",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "response",
        "target_field": "http.response.status_code",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "geoip_location_lat",
        "target_field": "geo.location.lat",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "geoip_location_lon",
        "target_field": "geo.location.lon",
        "ignore_missing": true
      }
    },
    {
      "user_agent": {
        "field": "user_Agent",
        "ignore_missing": true
      }
    },
    {
      "remove": {
        "field": "user_Agent",
        "ignore_missing": true
      }
    }
  ]
}

 

Q. 위 pipeline running 하기

GET _ingest/pipeline/web_traffic_pipeline/_simulate //_simulate를 endpoint로 작성
{
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "@timestamp": "2021-03-21T19:25:05.000-06:00",
        "bytes_sent": 26774,
        "content_type": "text/html; charset=utf-8",
        "geoip_location_lat": 39.1029,
        "geoip_location_lon": -94.5713,
        "is_https": true,
        "request": "/blog/introducing-elastic-endpoint-security",
        "response": 200,
        "runtime_ms": 191,
        "user_Agent": "Mozilla/5.0 (compatible; MJ12bot/v1.4.8; http://mj12bot.com/)",
        "verb": "GET"
      }
    }
  ]
}

 

❌ [나의 틀린 부분]

 - processor 하위는 배열에 담도록 함!

 - "ignore_missing" 알아두기

 - _simulate 통해 pipeline running 시키기

 

댓글