Export data from Elasticsearch – search_after

1. Introduction

In the previous article, Export Data from Elasticsearch – Logstash, I covered exporting data from Elasticsearch with Logstash. This time I want to show you another method, one that uses the search_after parameter of the search API. It requires some coding, because no one wants to run queries by hand to export 1 000 000 documents. Even if you increase the size of the hits array to get more documents per request, doing this manually is not convenient. So it is time to explain how it works and how to do it with a script. Let’s go.

2. Start Elasticsearch

To start a single-node Elasticsearch cluster, use the Docker commands below. Notice that I create volumes for config and data (lines 7 and 8), so that if you need to shut down the container you will not lose the 1M docs that were loaded. You can load them once, keep them in the volume, and then practice multiple export methods. To load documents from a JSON file larger than 100 MB, the maximum content length has to be increased (line 9). The run command attaches the container to a Docker network named elkcluster, which has to exist already (docker network create elkcluster).

				
					docker volume create elkconfig
docker volume create elkdata

docker run --rm \
--name elk \
--net elkcluster \
-v elkconfig:/usr/share/elasticsearch/config/ \
-v elkdata:/usr/share/elasticsearch/data \
-e "http.max_content_length=1000mb" \
-e "node.name=elk" \
-d \
-p 9200:9200 \
docker.elastic.co/elasticsearch/elasticsearch:8.10.4
				
			

For convenience, set a simple password right away. Here is a one-liner for you.

				
					docker exec -it elk bash -c "(mkfifo pipe1); ( (elasticsearch-reset-password -u elastic -i < pipe1) & ( echo $'y\n123456\n123456' > pipe1) );sleep 5;rm pipe1"
				
			

3. Load test data

To load sample data, use the generator script from the article Load 1M sample records to Elasticsearch with python and curl to create the file. Then create the mapping and run the load command.
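
If you do not have that generator at hand, the sketch below shows the shape of file the _bulk API expects. It is not the script from the referenced article: the function names, the connection-N naming, and the 2023 date range are assumptions made for illustration. Only the two field names and the date format come from the mapping used in this section.

```python
import json
import random
from datetime import datetime, timedelta


def bulk_lines(count, seed=0):
    """Yield NDJSON lines for the _bulk API: an action line, then a document line."""
    rng = random.Random(seed)
    base = datetime(2023, 1, 1)  # assumed start of the sample date range
    for i in range(count):
        # Empty action metadata: the target index is taken from the request URL.
        yield json.dumps({"index": {}})
        started = base + timedelta(seconds=rng.randrange(365 * 24 * 3600))
        yield json.dumps({
            "connection_name": f"connection-{i}",  # hypothetical naming scheme
            "start_connection": started.strftime("%Y-%m-%d %H:%M:%S"),
        })


def write_bulk_file(path, count):
    """Write the bulk payload to disk; every line, including the last, ends with a newline."""
    with open(path, "w") as fh:
        for line in bulk_lines(count):
            fh.write(line + "\n")
```

Calling write_bulk_file("filePY2.json", 1_000_000) would produce a file with the name used by the load command.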

				
					curl -k -u elastic:123456 -XPUT "https://localhost:9200/connections" -H 'content-type: application/json' -d'
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
  "mappings": {
      "properties": {
        "connection_name": {
          "type": "keyword"
        },
        "start_connection": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss"
        }
      }
  }
}'
				
			
				
					curl -s -k -u elastic:123456 -XPOST "https://localhost:9200/connections/_bulk" -H 'Content-Type: application/x-ndjson' --data-binary @filePY2.json > /dev/null
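
Because the load command sends its output to /dev/null, rejected documents go unnoticed. A minimal sketch for inspecting a bulk response, assuming you redirect the output to a file instead (the file name bulk_response.json and the helper name bulk_failures are hypothetical):

```python
import json


def bulk_failures(response_text):
    """Return (action, status, error) for every item the bulk request rejected."""
    body = json.loads(response_text)
    # The top-level "errors" flag is true when at least one item failed.
    if not body.get("errors"):
        return []
    failures = []
    for item in body.get("items", []):
        # Each item has a single key naming the action, e.g. "index".
        for action, result in item.items():
            if "error" in result:
                failures.append((action, result.get("status"), result["error"]))
    return failures
```

With the response saved, bulk_failures(open("bulk_response.json").read()) returns an empty list when every document was accepted.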
				
			

Check that the data was loaded correctly.

				
					curl -k -u elastic:123456 -XGET "https://localhost:9200/_cat/shards?v&index=connections"
				
			

4. search_after

I want to show you the manual steps first; they will be scripted later on.
First, to keep the data consistent across multiple API requests, you have to create a Point in Time (PIT).

				
					curl -k -u elastic:123456 -XPOST "https://localhost:9200/connections/_pit?keep_alive=10m"

# it will return ID like below
# {"id":"3eaGBAELY29ubmVjdGlvbnMWREtPcHVnOUpSNGlmQTZzd3JXM0pWUQAWcTkxUnZKZDZTVFcxRUxrLTk4YnREZwAAAAAAAAAAARZFaThQelNmNFJuQ3R6Q1FaWUFraS1BAAEWREtPcHVnOUpSNGlmQTZzd3JXM0pWUQAA"}
				
			

Once you have the PIT id, you can run a search query within that data snapshot.

				
					curl -k -u elastic:123456 -XPOST "https://localhost:9200/_search"  -H 'content-type: application/json' -d'
{
    "query": {
        "match_all": {}
    },
    "pit": {
        "id": "3eaGBAELY29ubmVjdGlvbnMWREtPcHVnOUpSNGlmQTZzd3JXM0pWUQAWcTkxUnZKZDZTVFcxRUxrLTk4YnREZwAAAAAAAAAAARZFaThQelNmNFJuQ3R6Q1FaWUFraS1BAAEWREtPcHVnOUpSNGlmQTZzd3JXM0pWUQAA",
        "keep_alive": "1m"
    },
    "sort": [
        {
            "_shard_doc": "desc"
        }
    ],
    "track_total_hits": false
}
'
				
			

This gives you an array of hits. Scroll down to the last element and save its sort value.

				
					                "sort": [
                    999990
                ]
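
Instead of scrolling, the same value can be picked out of the parsed response. A small sketch, assuming the response has already been loaded into a Python dict (the helper name last_sort_value is made up for this example):

```python
def last_sort_value(response):
    """Return the sort array of the last hit, or None when the page is empty."""
    hits = response.get("hits", {}).get("hits", [])
    return hits[-1]["sort"] if hits else None
```

An empty page returns None, which is the signal that there is nothing left to fetch.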
				
			

From now on, every request includes that sort value in the search_after field. The value is replaced on each request, because each response gives a new sort value.

				
					curl -k -u elastic:123456 -XPOST "https://localhost:9200/_search"  -H 'content-type: application/json' -d'
{
    "query": {
        "match_all": {}
    },
    "pit": {
        "id": "3eaGBAELY29ubmVjdGlvbnMWREtPcHVnOUpSNGlmQTZzd3JXM0pWUQAWcTkxUnZKZDZTVFcxRUxrLTk4YnREZwAAAAAAAAAAARZFaThQelNmNFJuQ3R6Q1FaWUFraS1BAAEWREtPcHVnOUpSNGlmQTZzd3JXM0pWUQAA",
        "keep_alive": "1m"
    },
    "sort": [
        {
            "_shard_doc": "desc"
        }
    ],
    "track_total_hits": false,
    "search_after": [
        999990
    ]
}
'
				
			

The response to that API call includes another hits array, so you can take the sort value from its last element. Repeat this a few times to get a feel for it. Once done, try to automate it.
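
The manual steps above boil down to a loop. The sketch below shows that loop in isolation. The HTTP call is passed in as a function, so the paging logic can be followed, and tried against a fake, without a running cluster. The names iter_hits and search are assumptions for this example, not part of any Elasticsearch client.

```python
def iter_hits(search, pit_id, size=1000):
    """Yield every hit by following search_after until a page comes back empty.

    `search` is any callable that takes a request body (dict) and returns
    the parsed JSON response (dict).
    """
    search_after = None
    while True:
        body = {
            "size": size,
            "query": {"match_all": {}},
            "pit": {"id": pit_id, "keep_alive": "1m"},
            "sort": [{"_shard_doc": "desc"}],
            "track_total_hits": False,
        }
        # The first request has no search_after; later ones carry the last sort value.
        if search_after is not None:
            body["search_after"] = search_after
        response = search(body)
        hits = response.get("hits", {}).get("hits", [])
        if not hits:
            return
        yield from hits
        # The sort values of the last hit mark where the next page starts.
        search_after = hits[-1]["sort"]
        # The response carries a pit_id field; keep using the most recent one.
        pit_id = response.get("pit_id", pit_id)
```

With requests, the search callable could be as small as lambda body: requests.post(url, json=body, auth=auth, verify=False).json(), where url and auth are your own values.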

5. Automate with Python

All the steps can be scripted in any language, but I decided to present them here in Python. Copy and paste the script to run it on your local machine.

				
import requests
import json
import time
import getpass  # Library to hide password input

# Prompt for username and password
username = input("Enter your username: ")
password = getpass.getpass("Enter your password: ")

# Initialize variables
pit_id = None
last_sort = None
output_file = "output.json"
enable_logging = False  # Set to True to enable general logging
enable_logging_time = True  # Set to True to log script execution time

# Suppress the warnings urllib3 emits because requests are sent with verify=False
requests.packages.urllib3.disable_warnings()

# Function to log messages
def log_message(message):
    if enable_logging:
        print(message)

# Function to log script execution time
def log_execution_time(start_time):
    if enable_logging_time:
        end_time = time.time()
        print(f"Script execution time: {end_time - start_time:.2f} seconds")

# Measure start time
start_time = time.time()

# Step 1: Create a PIT
pit_url = "https://localhost:9200/connections/_pit?keep_alive=10m"
pit_response = requests.post(pit_url, auth=(username, password), verify=False)

# Log the Step 1 request and response
log_message("Step 1 Request: " + pit_url)
log_message("Step 1 Response Status Code: " + str(pit_response.status_code))
log_message("Step 1 Response Body: " + pit_response.text)

if pit_response.status_code != 200:
    # Print errors unconditionally so a failure is visible even with logging disabled
    print(f"Step 1: Failed to create PIT. Status Code: {pit_response.status_code}")
    if pit_response.status_code == 401:
        print("Authentication failed. Check your credentials.")
    exit(1)

# Parse PIT response JSON and save pit_id
pit_data = json.loads(pit_response.text)
pit_id = pit_data.get("id")

# Step 3: Create a search query with pit_id
search_url = "https://localhost:9200/_search"
search_body = {
    "size": 10000,  # Increase the size parameter to fetch more documents at once
    "query": {"match_all": {}},
    "pit": {"id": pit_id, "keep_alive": "10m"},
    "track_total_hits": False,
    "sort": [{"_shard_doc": "asc"}],
}

# Print the JSON query for Step 3
log_message("Step 3 Query: " + json.dumps(search_body, indent=2))

# Perform the search with SSL certificate verification disabled
search_response = requests.post(search_url, json=search_body, auth=(username, password), verify=False)

if search_response.status_code != 200:
    # Print errors unconditionally so a failure is visible even with logging disabled
    print(f"Step 3: Search failed. Status Code: {search_response.status_code}")
    if search_response.status_code == 401:
        print("Authentication failed. Check your credentials.")
    exit(1)

while True:
    # Step 4: Parse the search response
    search_data = json.loads(search_response.text)
    hits = search_data.get("hits", {}).get("hits", [])

    if not hits:
        break

    # Extract and save data to output_file
    with open(output_file, "a") as file:
        for hit in hits:
            connection_name = hit["_source"]["connection_name"]
            start_connection = hit["_source"]["start_connection"]
            file.write(json.dumps({"connection_name": connection_name, "start_connection": start_connection}) + "\n")

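    # Step 5: Get last_sort for the next iteration
    last_sort = hits[-1]["sort"]
    # The response carries a pit_id field; always use the most recent PIT id
    pit_id = search_data.get("pit_id", pit_id)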

    # Step 6: Create a search query with pit_id and last_sort for the next iteration
    search_body = {
        "size": 10000,
        "query": {"match_all": {}},
        "pit": {"id": pit_id, "keep_alive": "1m"},
        "sort": [{"_shard_doc": "asc"}],
        "track_total_hits": False,
        "search_after": last_sort,
    }

    # Print the JSON query for Step 6
    log_message("Step 6 Query: " + json.dumps(search_body, indent=2))

    # Perform the search with SSL certificate verification disabled
    search_response = requests.post(search_url, json=search_body, auth=(username, password), verify=False)

    if search_response.status_code != 200:
        # Print errors unconditionally so a partial export does not pass silently
        print(f"Step 6: Search failed. Status Code: {search_response.status_code}")
        if search_response.status_code == 401:
            print("Authentication failed. Check your credentials.")
        break

# Log script execution time
log_execution_time(start_time)

# Reached when there are no more hits or a request failed
log_message("Loop completed")
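
The script leaves the PIT open until its keep_alive expires. Elasticsearch also accepts a DELETE request on /_pit, with the PIT id in the body, to close it explicitly once the export is done. A minimal sketch using only the standard library; it builds the request without sending it, and the helper name build_close_pit_request is an assumption for this example:

```python
import json
import urllib.request


def build_close_pit_request(base_url, pit_id):
    """Build (but do not send) the DELETE /_pit request for the given PIT id."""
    return urllib.request.Request(
        url=f"{base_url}/_pit",
        data=json.dumps({"id": pit_id}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="DELETE",
    )
```

Inside the script itself, the equivalent call with requests would be requests.delete("https://localhost:9200/_pit", json={"id": pit_id}, auth=(username, password), verify=False), placed after the loop.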

				
			

6. Final thoughts

You have learned how to export data from Elasticsearch using the search_after API. It helps in situations where you cannot install Logstash and want to export a huge number of documents.
