
Aggregate data in Elasticsearch, Part 1

Elasticsearch and its Query DSL offer powerful aggregations that can condense data and thereby save documents and disk space. After a certain period of time, a fine level of detail is no longer needed. For instance, I collect statistical data about fraud prevention services on a daily basis, one index per day:

GET _cat/indices/fraud-*?v&s=index:asc

health status index            uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   fraud-2017.08.17 OlwgIRenQ_2dyiKm-Aapkw   2   1       1680            0      1.2mb          634kb
green  open   fraud-2017.08.18 0aJcUMbFQSa3DpGtg1l5iw   2   1      20160            0     12.8mb          6.4mb
green  open   fraud-2017.08.19 pQCsW7NpSZe5UuJT5vcIvQ   2   1      20160            0     12.6mb          6.3mb
green  open   fraud-2017.08.20 G4qG8L5HRKGHddx9jyYbvQ   2   1      26160            0       15mb          7.5mb
green  open   fraud-2017.08.21 UNkfZaXISj-p2fOBUxor0Q   2   1      92789            0     45.3mb         22.7mb
green  open   fraud-2017.08.22 bz8vqtC2RUW1YjnN2L2oQw   2   1      88361            0     44.2mb           22mb
green  open   fraud-2017.08.23 8AtPnSy-TVu0fMEQ3lbcsw   2   1      80999            0     40.6mb         20.3mb
green  open   fraud-2017.08.24 3GIiEC7aRCOvFFzM0_Fi2w   2   1     194570            0     70.4mb         35.1mb
green  open   fraud-2017.08.25 og1Xx9XITJa1gKBxdQMYLQ   2   1     234934            0     83.8mb         41.8mb
green  open   fraud-2017.08.26 AUwvDE0JR0aKIHkKGTseSg   2   1     235553            0     84.4mb         42.2mb
green  open   fraud-2017.08.27 MdLO0ULoQn6al0MSdPu8IA   2   1     275991            0     93.1mb         46.5mb
green  open   fraud-2017.08.28 93nZmpkOSWOcUbm1CxHtCQ   2   1     324153            0      106mb           53mb
green  open   fraud-2017.08.29 Nm021E6sTFi-9DmlGqQEkA   2   1     315797            0    103.4mb         51.7mb
green  open   fraud-2017.08.30 NCJbV-uLSzGbjjRQMvRqug   2   1     283340            0     96.1mb           48mb
green  open   fraud-2017.08.31 AQAWdCqDT3ehvjyn4FbLwg   2   1     332613            0    115.4mb         57.6mb
green  open   fraud-2017.09.01 fNdnrAzIRbOhwuHMsBH-KA   2   1     305892            0    109.7mb           55mb
green  open   fraud-2017.09.02 Z9ynOZfhQgeIi8EH9VUzTg   2   1     276176            0    103.6mb           52mb
green  open   fraud-2017.09.03 IUE7xIf0RFyqfxqeTmDtTQ   2   1     231013            0       91mb         45.6mb
green  open   fraud-2017.09.04 bwyZp5eMTa-9tZa2dGIw6g   2   1     268054            0    100.9mb         50.2mb
green  open   fraud-2017.09.05 tuGZS68IT6aQV5fQbwkL1A   2   1     235889            0     92.9mb         46.4mb
green  open   fraud-2017.09.06 DS2syWlHSSKzzwCqm2ImAA   2   1     227299            0     89.8mb           45mb
green  open   fraud-2017.09.07 PwZ39BHVRDekpe3Eklapgg   2   1     251881            0     92.9mb         46.4mb
green  open   fraud-2017.09.08 tkLcydSoT9KIBTjjEFXH1A   2   1     175374            0     68.5mb         34.3mb
green  open   fraud-2017.09.09 IaRhV8MHTaO8WHCDYqyf7g   2   1     184333            0     79.3mb         39.6mb
green  open   fraud-2017.09.10 5Kc-F3omQHiA1YzU4U5Y4Q   2   1     161799            0     70.6mb         35.4mb
green  open   fraud-2017.09.11 Ajbw9XnNTga66bN-U7IgTA   2   1     205447            0     83.1mb         41.5mb
green  open   fraud-2017.09.12 8DrE-dZ_TQmr1Boor09BKw   2   1     187816            0     70.2mb           35mb
green  open   fraud-2017.09.13 hkQ3WQ49SM-rggxqQonfiw   2   1     234633            0     88.5mb         44.3mb
green  open   fraud-2017.09.14 Q39tR7sKSHqgbEJpJTQiZQ   2   1     230865            0     87.9mb         43.9mb
green  open   fraud-2017.09.15 ebpcLvWnSkSLen7OP7w14Q   2   1     188488            0     78.3mb         39.2mb
green  open   fraud-2017.09.16 FBFbgbadQg-oTMDfNoob0g   2   1     224340            0       96mb           48mb
green  open   fraud-2017.09.17 6z6TpLq7TXuM64K1hYlaDQ   2   1     239607            0     98.6mb         49.1mb
green  open   fraud-2017.09.18 Zq-KBD2sTd2Wnj00Eqv2qQ   2   1     207967            0     90.4mb         45.2mb
green  open   fraud-2017.09.19 VaZmTvtqRY6UmF779jb3TA   2   1     209122            0     77.5mb         38.8mb
green  open   fraud-2017.09.20 avVtOhkqSZuccPkYuOmU5g   2   1     203056            0     74.6mb         37.3mb
green  open   fraud-2017.09.21 gakY_3jHSUq1maHR-4wLXA   2   1     127662            0       55mb         27.4mb
green  open   fraud-2017.09.22 zMDUizWVREq9590wUsJdqQ   2   1     127546            0     55.5mb         27.7mb
green  open   fraud-2017.09.23 ptPaU1WZSKa57jflLpdvNA   2   1      91948            0     44.9mb         22.4mb
green  open   fraud-2017.09.24 uOi464xxTBeoUGDGMQAckQ   2   1     104120            0     47.3mb         23.6mb
green  open   fraud-2017.09.25 tHMnRTu9R3W_woxsKS2qAA   2   1      98119            0     46.3mb           23mb
green  open   fraud-2017.09.26 XgHv3j9ASwq0q_U4otsSFw   2   1     118299            0       52mb           26mb
green  open   fraud-2017.09.27 CeY_Qw1eQ1yEi7WalM_Zlg   2   1     135067            0     61.1mb         30.5mb
green  open   fraud-2017.09.28 5SRhQvB1RdeLPy8WiDQjGA   2   1     121341            0     56.8mb         28.7mb
green  open   fraud-2017.09.29 L2zfdZZCR9e-pQnQ9e5I1A   2   1     136221            0     63.1mb         31.2mb
green  open   fraud-2017.09.30 oncWQAIcSzmBPRt2wlHvTA   2   1     165502            0     80.9mb         40.5mb
green  open   fraud-2017.10.01 OOg1SZH1Qjmo85NSxbDfjg   2   1     162648            0       77mb         38.6mb
green  open   fraud-2017.10.02 wc6l_5WDRVCHMPiUx9BBRQ   2   1     177023            0     82.5mb         41.4mb
green  open   fraud-2017.10.03 6GYS6z8hSqynFFI9GxyWTA   2   1     186684            0       72mb           36mb
green  open   fraud-2017.10.04 _ZkXUpbbRO-euZv_8Vatlw   2   1     177498            0     69.5mb         34.7mb
green  open   fraud-2017.10.05 6G1OZobKTbKQ9MdncVhtPQ   2   1     180769            0     70.3mb         35.1mb
green  open   fraud-2017.10.06 leXb6SkhQASzcZ164hSksg   2   1     194112            0     74.3mb         37.2mb
green  open   fraud-2017.10.07 4rvy0nWWRZGf42eyPBECPg   2   1     181823            0     70.3mb         35.1mb
green  open   fraud-2017.10.08 9rTk6wO_ThWAOntg98qIYg   2   1     125629            0     54.4mb         27.2mb
green  open   fraud-2017.10.09 opTUqawdTzqeFDIs5ouAfw   2   1     144947            0     59.3mb         29.6mb
green  open   fraud-2017.10.10 xvQlqnhlSSiwmJU345_tNA   2   1     141745            0     58.4mb         29.1mb
green  open   fraud-2017.10.11 NODK8l5WS06iYsZ94Ui-AA   2   1     132986            0     56.5mb         28.2mb
green  open   fraud-2017.10.12 aEwb4ihqQ7iKeLRm_ALB6w   2   1     135184            0     57.2mb         28.6mb
green  open   fraud-2017.10.13 WJHzV1RzR2SazRfN5P_8Bw   2   1     143217            0     59.5mb         29.7mb
green  open   fraud-2017.10.14 qQmNX0sySxG7ow21vn3Bnw   2   1     133659            0       57mb         28.5mb
green  open   fraud-2017.10.15 xch4F_E_Rhi8eX1CQlLyCQ   2   1     121647            0     53.1mb         26.5mb
green  open   fraud-2017.10.16 5bk-GdujRyOpK2JbSvjdOA   2   1     141811            0     58.5mb         29.2mb
green  open   fraud-2017.10.17 wU1uLfkETgaTTyuAGQjxhw   2   1     173206            0     66.1mb           33mb
green  open   fraud-2017.10.18 1wwFZwfjSLmVNxhzo3-3Rw   2   1     142172            0     59.5mb         29.7mb
green  open   fraud-2017.10.19 ZzITeyNaSfWAvv9AiKNr9A   2   1     126300            0     55.8mb         27.8mb
green  open   fraud-2017.10.20 BbwIASphRXe3jbE-hgammg   2   1      46324            0     20.8mb         10.4mb

Statistical values are logged every minute. Counting the documents of a single logger (channel Issuing, logger STA9101) over a whole day:

Get count

GET fraud-2017.08.19/_search
{
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "channel.keyword": "Issuing"
          }
        },
        {
          "match": {
            "logger.keyword": "STA9101"
          }
        }
      ]
    }
  }
}
{
  "took": 14,
  "timed_out": false,
  "_shards": {
    "total": 2,
    "successful": 2,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1440,
    "max_score": 0,
    "hits": []
  }
}

1 document per minute * 60 minutes * 24 hours = 1440 metric documents

Aggregate

These 1440 metric documents can be reduced to 24 documents on an hourly basis.
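The reduction is plain arithmetic; a small Java sketch makes the numbers explicit for one logger that writes one value per minute. The yearly figures are simply the daily counts multiplied by 365, not measured data.

```java
// Document counts for one logger (one value per minute), before and after
// hourly aggregation. Pure arithmetic, no Elasticsearch involved.
class ReductionMath {
    static final int MINUTES_PER_HOUR = 60;
    static final int HOURS_PER_DAY = 24;

    /** Raw documents per day: one document per minute. */
    static int rawDocsPerDay() {
        return MINUTES_PER_HOUR * HOURS_PER_DAY;
    }

    /** Aggregated documents per day: one document per hour. */
    static int hourlyDocsPerDay() {
        return HOURS_PER_DAY;
    }

    /** How many raw documents collapse into one aggregated document. */
    static int reductionFactor() {
        return rawDocsPerDay() / hourlyDocsPerDay();
    }

    public static void main(String[] args) {
        // prints "per day: 1440 raw -> 24 hourly (factor 60)"
        System.out.printf("per day: %d raw -> %d hourly (factor %d)%n",
                rawDocsPerDay(), hourlyDocsPerDay(), reductionFactor());
        // prints "per year: 525600 raw -> 8760 hourly"
        System.out.printf("per year: %d raw -> %d hourly%n",
                rawDocsPerDay() * 365, hourlyDocsPerDay() * 365);
    }
}
```

Per logger, hourly aggregation keeps one document out of every 60.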

Chain aggregations: a date histogram with a nested sum metric aggregation

GET fraud-2017.08.19/_search
{
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "channel.keyword": "Issuing"
          }
        },
        {
          "match": {
            "logger.keyword": "STA9101"
          }
        }
      ]
    }
  },
  "aggs": {
    "trx_over_time": {
      "date_histogram": {
        "field": "@timestamp",
        "interval": "1h"
      },
      "aggs": {
        "sum_trx": {
          "sum": {
            "field": "value"
          }
        }
      }
    }
  }
}

Get 24 hourly buckets

{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 2,
    "successful": 2,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1440,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "trx_over_time": {
      "buckets": [
        {
          "key_as_string": "2017-08-19T00:00:00.000Z",
          "key": 1503100800000,
          "doc_count": 60,
          "sum_trx": {
            "value": 6742
          }
        },
        {
          "key_as_string": "2017-08-19T01:00:00.000Z",
          "key": 1503104400000,
          "doc_count": 60,
          "sum_trx": {
            "value": 4734
          }
        },
        {
          "key_as_string": "2017-08-19T02:00:00.000Z",
          "key": 1503108000000,
          "doc_count": 60,
          "sum_trx": {
            "value": 3752
          }
        },
        {
          "key_as_string": "2017-08-19T03:00:00.000Z",
          "key": 1503111600000,
          "doc_count": 60,
          "sum_trx": {
            "value": 5408
          }
        },
        {
          "key_as_string": "2017-08-19T04:00:00.000Z",
          "key": 1503115200000,
          "doc_count": 60,
          "sum_trx": {
            "value": 13376
          }
        },
        {
          "key_as_string": "2017-08-19T05:00:00.000Z",
          "key": 1503118800000,
          "doc_count": 60,
          "sum_trx": {
            "value": 34932
          }
        },
        {
          "key_as_string": "2017-08-19T06:00:00.000Z",
          "key": 1503122400000,
          "doc_count": 60,
          "sum_trx": {
            "value": 93086
          }
        },
        {
          "key_as_string": "2017-08-19T07:00:00.000Z",
          "key": 1503126000000,
          "doc_count": 60,
          "sum_trx": {
            "value": 163467
          }
        },
        {
          "key_as_string": "2017-08-19T08:00:00.000Z",
          "key": 1503129600000,
          "doc_count": 60,
          "sum_trx": {
            "value": 230601
          }
        },
        {
          "key_as_string": "2017-08-19T09:00:00.000Z",
          "key": 1503133200000,
          "doc_count": 60,
          "sum_trx": {
            "value": 264623
          }
        },
        {
          "key_as_string": "2017-08-19T10:00:00.000Z",
          "key": 1503136800000,
          "doc_count": 60,
          "sum_trx": {
            "value": 248176
          }
        },
        {
          "key_as_string": "2017-08-19T11:00:00.000Z",
          "key": 1503140400000,
          "doc_count": 60,
          "sum_trx": {
            "value": 238703
          }
        },
        {
          "key_as_string": "2017-08-19T12:00:00.000Z",
          "key": 1503144000000,
          "doc_count": 60,
          "sum_trx": {
            "value": 248056
          }
        },
        {
          "key_as_string": "2017-08-19T13:00:00.000Z",
          "key": 1503147600000,
          "doc_count": 60,
          "sum_trx": {
            "value": 247916
          }
        },
        {
          "key_as_string": "2017-08-19T14:00:00.000Z",
          "key": 1503151200000,
          "doc_count": 60,
          "sum_trx": {
            "value": 216478
          }
        },
        {
          "key_as_string": "2017-08-19T15:00:00.000Z",
          "key": 1503154800000,
          "doc_count": 60,
          "sum_trx": {
            "value": 160784
          }
        },
        {
          "key_as_string": "2017-08-19T16:00:00.000Z",
          "key": 1503158400000,
          "doc_count": 60,
          "sum_trx": {
            "value": 107450
          }
        },
        {
          "key_as_string": "2017-08-19T17:00:00.000Z",
          "key": 1503162000000,
          "doc_count": 60,
          "sum_trx": {
            "value": 86520
          }
        },
        {
          "key_as_string": "2017-08-19T18:00:00.000Z",
          "key": 1503165600000,
          "doc_count": 60,
          "sum_trx": {
            "value": 68501
          }
        },
        {
          "key_as_string": "2017-08-19T19:00:00.000Z",
          "key": 1503169200000,
          "doc_count": 60,
          "sum_trx": {
            "value": 55975
          }
        },
        {
          "key_as_string": "2017-08-19T20:00:00.000Z",
          "key": 1503172800000,
          "doc_count": 60,
          "sum_trx": {
            "value": 40971
          }
        },
        {
          "key_as_string": "2017-08-19T21:00:00.000Z",
          "key": 1503176400000,
          "doc_count": 60,
          "sum_trx": {
            "value": 27974
          }
        },
        {
          "key_as_string": "2017-08-19T22:00:00.000Z",
          "key": 1503180000000,
          "doc_count": 60,
          "sum_trx": {
            "value": 18237
          }
        },
        {
          "key_as_string": "2017-08-19T23:00:00.000Z",
          "key": 1503183600000,
          "doc_count": 60,
          "sum_trx": {
            "value": 13241
          }
        }
      ]
    }
  }
}
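To make the chained aggregation concrete, here is an in-memory Java sketch of what the date histogram with "interval": "1h" and the nested sum compute: each timestamp is floored to the start of its UTC hour (the bucket key), then doc_count and the sum of "value" are accumulated per key. The Sample and Bucket types are hypothetical stand-ins, not Elasticsearch classes, and unlike Elasticsearch (which by default also returns empty buckets for gaps) this sketch only emits hours that contain documents.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration of date_histogram (1h) + sum; not Elasticsearch code.
class HourlyHistogram {
    static final long HOUR_MS = 3_600_000L;

    /** A per-minute metric document reduced to the two fields the aggregation uses. */
    record Sample(long timestampMillis, long value) {}

    /** One bucket: hour start (epoch millis, UTC), doc_count and sum of "value". */
    record Bucket(long key, long docCount, long sum) {}

    static List<Bucket> aggregate(List<Sample> samples) {
        // TreeMap keeps buckets ordered by key, like the histogram response.
        Map<Long, long[]> acc = new TreeMap<>();
        for (Sample s : samples) {
            // Floor the timestamp to the start of its UTC hour: the bucket key.
            long key = Math.floorDiv(s.timestampMillis(), HOUR_MS) * HOUR_MS;
            long[] countAndSum = acc.computeIfAbsent(key, k -> new long[2]);
            countAndSum[0]++;            // doc_count
            countAndSum[1] += s.value(); // sum_trx
        }
        List<Bucket> buckets = new ArrayList<>();
        for (Map.Entry<Long, long[]> e : acc.entrySet()) {
            buckets.add(new Bucket(e.getKey(), e.getValue()[0], e.getValue()[1]));
        }
        return buckets;
    }

    public static void main(String[] args) {
        // 2017-08-19T00:00:00Z, the first bucket key in the response above.
        long dayStart = 1503100800000L;
        List<Sample> samples = new ArrayList<>();
        for (int minute = 0; minute < 1440; minute++) {
            samples.add(new Sample(dayStart + minute * 60_000L, 1));
        }
        List<Bucket> buckets = aggregate(samples);
        System.out.println(buckets.size() + " buckets, first: " + buckets.get(0));
    }
}
```

Feeding it one synthetic value per minute for a whole day yields 24 buckets of 60 documents each, matching the doc_count values in the response.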

Source document

Looking at a source document, which is parsed by a pipeline, reveals a lot of information that isn't related to the statistics. Aggregating therefore not only saves documents, it also lets us reduce the number of fields.

{
  "_index": "fraud-2017.10.20",
  "_type": "stats",
  "_id": "AV84tDR8IIOyJsb0pJ3m",
  "_score": 1,
  "_source": {
    "instance": 0,
    "offset": 1050043,
    "level": "I",
    "logger": "STA9101",
    "channel": "Issuing",
    "input_type": "log",
    "logmessage": "2290 transactions since 2017-10-20 09:33:10, next statistical log at: 2017-10-20 09:35:10",
    "index": "fraud",
    "source": "/var/log/RiskShield/iss/prd/cur/2017-10-20_DecisionServer_Stats.log",
    "type": "stats",
    "tags": [
      "beats_input_codec_plain_applied"
    ],
    "environment": "prd",
    "@timestamp": "2017-10-20T07:34:10.000Z",
    "application": "RiskShield",
    "@version": "1",
    "beat": {
      "hostname": "fraud-detect",
      "name": "fraud-detect",
      "version": "5.5.2"
    },
    "host": "fraud-detect",
    "value": 2290
  },
  "fields": {
    "@timestamp": [
      1508484850000
    ]
  }
}
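As a minimal sketch of that clean-up, the Java snippet below keeps only the fields the statistics need (@timestamp, channel, logger, value, the same ones the slim mapping in the next section declares besides the ingest metadata) and drops everything else, such as beat, offset or tags. The class and method names are hypothetical; in practice the reduced documents are built directly from the aggregation results, as the watch below does.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Reduce a parsed _source map to the statistical fields; everything else is dropped.
class FieldCleanup {
    static final List<String> KEEP = List.of("@timestamp", "channel", "logger", "value");

    static Map<String, Object> slim(Map<String, Object> source) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (String field : KEEP) {
            if (source.containsKey(field)) {
                out.put(field, source.get(field));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // A subset of the _source shown above.
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("offset", 1050043);
        source.put("logger", "STA9101");
        source.put("channel", "Issuing");
        source.put("@timestamp", "2017-10-20T07:34:10.000Z");
        source.put("host", "fraud-detect");
        source.put("value", 2290);
        // prints "{@timestamp=2017-10-20T07:34:10.000Z, channel=Issuing, logger=STA9101, value=2290}"
        System.out.println(slim(source));
    }
}
```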

Using Watcher

One way to automate this is Elasticsearch Watcher. If you don't have a commercial license, you can accomplish the same task with Spring Batch and a custom Tasklet implementation using the official Elasticsearch Java REST client.

For demonstration purposes, the following index and mapping are used:

DELETE test
PUT test
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
  "mappings": {
    "stats": {
      "_all": {
        "enabled": false
      },
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "channel": {
          "type": "keyword"
        },
        "logger": {
          "type": "keyword"
        },
        "value": {
          "type": "integer"
        },
        "ingest.agent": {
          "type": "keyword"
        },
        "ingest.time": {
          "type": "date"
        }
      }
    }
  }
}

Now the tricky part. As the input, I chose a specific index for testing; in production, use an alias instead. The action part is more interesting: a transform script turns the aggregation results of the search into the payload of an index action, which writes one new document per hourly bucket.

Define the watch for testing purposes on a specific index

PUT /_xpack/watcher/watch/fraud-issuing-aggregations
{
  "input": {
    "search": {
      "request": {
        "indices": [
          "fraud-2017.08.20"
        ],
        "types": [
          "stats"
        ],
        "body": {
          "size": 0,
          "query": {
            "bool": {
              "must": [
                {
                  "match": {
                    "channel.keyword": "Issuing"
                  }
                },
                {
                  "match": {
                    "logger.keyword": "STA9101"
                  }
                }
              ]
            }
          },
          "aggs": {
            "trx_over_time": {
              "date_histogram": {
                "field": "@timestamp",
                "interval": "1h"
              },
              "aggs": {
                "sum_trx": {
                  "sum": {
                    "field": "value"
                  }
                }
              }
            }
          }
        }
      }
    }
  },
  "trigger": {
    "schedule": {
      "interval": "1d"
    }
  },
  "actions": {
    "index_payload": {
      "transform": {
        "script": {
          "lang": "painless",
          "source": """
            def docs = [];
            for (item in ctx.payload.aggregations.trx_over_time.buckets) {
              // one document per hourly bucket
              def document = [
                '_id': item.key,
                // bucket keys are already epoch milliseconds in UTC
                '@timestamp': item.key,
                'value': item.sum_trx.value,
                'logger': 'STA9101',
                'channel': 'Issuing',
                'ingest.time': ctx.execution_time,
                'ingest.agent': 'watcher'
              ];
              docs.add(document);
            }
            // an array under "_doc" makes the index action write one document per entry
            return ['_doc': docs];
          """
        }
      },
      "index": {
        "index": "test",
        "doc_type": "stats"
      }
    }
  }
}
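The transform's logic can be sketched in plain Java to make the payload shape visible: every bucket of trx_over_time becomes one map, and the list is returned under "_doc". This sketch stores the bucket key unchanged as @timestamp: date_histogram keys are already epoch milliseconds in UTC, and re-interpreting them as Europe/Zurich local time would shift each document by the zone's UTC offset. The Bucket record and the String execution time are hypothetical simplifications of what Watcher hands to the script.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java mirror of the watch transform; not executed by Watcher.
class WatchTransformSketch {
    /** Minimal stand-in for one date_histogram bucket (hypothetical type). */
    record Bucket(long key, double sumTrx) {}

    static Map<String, Object> toIndexPayload(List<Bucket> buckets, String executionTime) {
        List<Map<String, Object>> docs = new ArrayList<>();
        for (Bucket b : buckets) {
            Map<String, Object> doc = new LinkedHashMap<>();
            doc.put("_id", String.valueOf(b.key())); // hour start as document id (ids are strings)
            doc.put("@timestamp", b.key());          // UTC epoch millis, stored unchanged
            doc.put("value", b.sumTrx());
            doc.put("logger", "STA9101");
            doc.put("channel", "Issuing");
            doc.put("ingest.time", executionTime);
            doc.put("ingest.agent", "watcher");
            docs.add(doc);
        }
        // One entry per bucket; the index action writes each entry as its own document.
        return Map.of("_doc", docs);
    }

    public static void main(String[] args) {
        // Keys and sums of the first two buckets in the hourly response above.
        List<Bucket> buckets = List.of(
                new Bucket(1503100800000L, 6742),
                new Bucket(1503104400000L, 4734));
        System.out.println(toIndexPayload(buckets, "2017-08-21T00:00:00Z"));
    }
}
```

Using the hour alone as _id makes re-runs idempotent, since executing the watch again overwrites the same documents, but it would collide if other loggers or channels were written to the same index; prefixing the id with logger and channel avoids that.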

Execute watch manually

POST _xpack/watcher/watch/fraud-issuing-aggregations/_execute

Query aggregated documents

GET test/_search
{
  "query": {"match_all": {}}
}

Once everything works, we can adjust the watch to process indices older than two weeks and then remove those old indices. Elasticsearch Curator comes in handy here: it can assign indices older than two weeks to a dedicated alias, which the watch then uses as its input.
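The selection rule behind "older than two weeks" can be sketched in Java for daily index names of the form fraud-yyyy.MM.dd: parse the date suffix and keep indices dated before today minus the retention period. OldIndexSelector is a hypothetical helper that only mirrors the idea of an age-by-name filter; Curator itself is configured through its own action files, and this is not its API.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: pick daily indices older than a retention period.
class OldIndexSelector {
    static final String PREFIX = "fraud-";
    static final DateTimeFormatter SUFFIX = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    static List<String> olderThan(List<String> indices, LocalDate today, int retentionDays) {
        LocalDate cutoff = today.minusDays(retentionDays);
        List<String> result = new ArrayList<>();
        for (String index : indices) {
            if (!index.startsWith(PREFIX)) {
                continue; // not a fraud index
            }
            try {
                LocalDate day = LocalDate.parse(index.substring(PREFIX.length()), SUFFIX);
                if (day.isBefore(cutoff)) {
                    result.add(index);
                }
            } catch (DateTimeParseException e) {
                // Name does not end in a date; leave the index alone.
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> indices = List.of("fraud-2017.08.17", "fraud-2017.10.05", "fraud-2017.10.20");
        // prints "[fraud-2017.08.17, fraud-2017.10.05]"
        System.out.println(olderThan(indices, LocalDate.of(2017, 10, 20), 14));
    }
}
```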