Reindex Data with Pipeline in Elasticsearch

Data is not always clean. Depending on how it is produced, a number might be rendered in the JSON body as a true JSON number, e.g. 10, or as a string, e.g. “10”. Some developers use MDC to pass metadata into Elasticsearch. If a numeric field arrives as a String and you want to use it in Kibana visualizations, you need a fix. For data that is already indexed, the only way to fix it is to reindex the data. Running the Reindex API with an ingest pipeline ensures that the field ends up with the correct data type.

Reindex Data

If the destination index has a strict dynamic mapping or coercion turned off (coercion forces a String into an Integer), the index operation will fail and the reindex response reports failures like this one:

"failures": [
  {
    "index": "fo-prod-fix-2017.04.28",
    "type": "json",
    "id": "AVuzeFooEwjYNH5b13lU",
    "cause": {
      "type": "mapper_parsing_exception",
      "reason": "failed to parse [duration]",
      "caused_by": {
        "type": "illegal_argument_exception",
        "reason": "Integer value passed as String"
      }
    },
    "status": 400
  }
]
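
When many documents fail, it helps to group the failures by reason before deciding on a fix. The following is a minimal Python sketch, not part of the original setup: it assumes the reindex response has already been parsed into a dict, and the helper name summarize_failures is hypothetical. The sample data is the failure entry shown above.

```python
from collections import Counter


def summarize_failures(response):
    """Count reindex failures per (index, reason) pair.

    `response` is assumed to be the parsed JSON body of a reindex call.
    """
    summary = Counter()
    for failure in response.get("failures", []):
        cause = failure.get("cause", {})
        # Prefer the innermost reason when a nested caused_by is present.
        inner = cause.get("caused_by", {})
        reason = inner.get("reason") or cause.get("reason", "unknown")
        summary[(failure.get("index"), reason)] += 1
    return summary


# Sample shaped like the failure entry shown above.
sample = {
    "failures": [
        {
            "index": "fo-prod-fix-2017.04.28",
            "type": "json",
            "id": "AVuzeFooEwjYNH5b13lU",
            "cause": {
                "type": "mapper_parsing_exception",
                "reason": "failed to parse [duration]",
                "caused_by": {
                    "type": "illegal_argument_exception",
                    "reason": "Integer value passed as String",
                },
            },
            "status": 400,
        }
    ]
}

for (index, reason), count in summarize_failures(sample).items():
    print(f"{index}: {count} x {reason}")
```

A single dominant reason such as "Integer value passed as String" points to one field that needs converting, which is the case handled in the rest of this post.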

Create Pipeline

Ingest nodes in Elasticsearch can perform the necessary conversion. I create a pipeline named counter-string that uses the convert processor to convert the String into an Integer.

PUT _ingest/pipeline/counter-string
{
  "description": "convert from string into number converter",
  "processors": [
    {
      "convert": {
        "field": "duration",
        "type": "integer",
        "ignore_missing": true
      }
    }
  ]
}

Read Pipeline Settings

You may check pipeline details at any time.

GET _ingest/pipeline/counter-string

The response contains the pipeline object.

{
  "counter-string": {
    "description": "convert from string into number converter",
    "processors": [
      {
        "convert": {
          "field": "duration",
          "type": "integer",
          "ignore_missing": true
        }
      }
    ]
  }
}

The ignore_missing setting ensures that the negative case (the field is missing) won’t impact the index operation.
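
To make the intended behaviour concrete, here is a rough local emulation in Python of what the pipeline is expected to do to a document. It is only an illustration under simplifying assumptions (plain decimal strings, a flat document), not Elasticsearch's implementation, and the function name convert_to_integer is made up for this sketch.

```python
def convert_to_integer(source, field="duration", ignore_missing=True):
    """Return a copy of `source` with `field` converted to an int.

    Loosely mirrors the convert processor configured above; this is an
    illustrative emulation, not the Elasticsearch code.
    """
    result = dict(source)
    value = result.get(field)
    if value is None:
        if ignore_missing:
            # Missing (or null) field: leave the document untouched.
            return result
        raise KeyError(f"field [{field}] not present")
    if isinstance(value, int):
        # Already a number: nothing to do.
        return result
    # Raises ValueError for non-numeric strings, comparable to a
    # processor failure.
    result[field] = int(str(value))
    return result


for doc in ({"duration": "10"}, {"duration": 67}, {"message": "no duration"}):
    print(convert_to_integer(doc))
```

The three inputs cover the cases that matter here: a String that needs converting, a value that is already an Integer, and a document without the field.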

Test the Pipeline

Pipelines can be tested with the simulate operation. Pay attention to the last case: if the data is already an Integer, the index action must not fail.

POST _ingest/pipeline/counter-string/_simulate
{
  "docs": [
    {
      "_source": {
        "duration": "10"
      }
    },
    {
      "_source": {
        "duration": "1"
      }
    },
    {
      "_source": {
        "duration": 67
      }
    }
  ]
}

The pipeline output shows every duration as a JSON number:

{
  "docs": [
    {
      "doc": {
        "_id": "_id",
        "_index": "_index",
        "_type": "_type",
        "_source": {
          "duration": 10
        },
        "_ingest": {
          "timestamp": "2017-05-01T08:00:08.200Z"
        }
      }
    },
    {
      "doc": {
        "_id": "_id",
        "_index": "_index",
        "_type": "_type",
        "_source": {
          "duration": 1
        },
        "_ingest": {
          "timestamp": "2017-05-01T08:00:08.200Z"
        }
      }
    },
    {
      "doc": {
        "_id": "_id",
        "_index": "_index",
        "_type": "_type",
        "_source": {
          "duration": 67
        },
        "_ingest": {
          "timestamp": "2017-05-01T08:00:08.200Z"
        }
      }
    }
  ]
}
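
With more than a handful of test documents, checking the simulate output by eye gets tedious. The sketch below is one possible way to check it in Python. It assumes the response has been parsed into a dict, and that a document whose processing failed appears with an "error" key instead of "doc"; treat that as an assumption to verify against your Elasticsearch version. The function name non_integer_docs is hypothetical.

```python
def non_integer_docs(simulate_response, field="duration"):
    """List (position, problem) for docs whose `field` is not an integer."""
    problems = []
    for position, entry in enumerate(simulate_response.get("docs", [])):
        if "error" in entry:
            # Assumed shape for a document the pipeline could not process.
            problems.append((position, "error"))
            continue
        value = entry["doc"]["_source"].get(field)
        if value is None:
            # Field absent: acceptable, the pipeline ignores missing fields.
            continue
        # bool is a subclass of int in Python, so exclude it explicitly.
        if isinstance(value, bool) or not isinstance(value, int):
            problems.append((position, type(value).__name__))
    return problems


# Trimmed version of the simulate output shown above.
simulated = {
    "docs": [
        {"doc": {"_source": {"duration": 10}}},
        {"doc": {"_source": {"duration": 1}}},
        {"doc": {"_source": {"duration": 67}}},
    ]
}

print(non_integer_docs(simulated))
```

An empty list means every simulated document carries an integer duration (or no duration at all), so the pipeline is safe to use for the reindex.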

Reindex Data

Use the pipeline in the reindex action by naming it in the destination.

POST _reindex
{
  "source": {
    "index": "fo-prod-2017.04.28"
  },
  "dest": {
    "index": "fo-prod-fix-2017.04.28",
    "pipeline": "counter-string"
  }
}
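
The index names above follow a daily pattern, so the same request is likely needed for several days. As a small sketch (assuming the fo-prod-YYYY.MM.DD and fo-prod-fix-YYYY.MM.DD naming holds for the other days, which the example above only shows for one date), the request bodies can be generated in Python. The helper names reindex_body and reindex_bodies are hypothetical, and sending the requests is left out on purpose.

```python
import json
from datetime import date, timedelta


def reindex_body(day, source_prefix="fo-prod-", dest_prefix="fo-prod-fix-",
                 pipeline="counter-string"):
    """Build the _reindex request body for one daily index."""
    suffix = day.strftime("%Y.%m.%d")
    return {
        "source": {"index": source_prefix + suffix},
        "dest": {"index": dest_prefix + suffix, "pipeline": pipeline},
    }


def reindex_bodies(first_day, days):
    """Build request bodies for `days` consecutive daily indices."""
    return [reindex_body(first_day + timedelta(days=offset))
            for offset in range(days)]


# Bodies for 2017-04-28 and the following two days; each one would be
# sent as the payload of POST _reindex.
for body in reindex_bodies(date(2017, 4, 28), 3):
    print(json.dumps(body, indent=2))
```

Generating the bodies separately from sending them makes it easy to review the index names before anything is written to the cluster.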