Skip to content

Commit 477530a

Browse files
committed
Verify connection to Elasticsearch
1 parent b2c05d4 commit 477530a

File tree

6 files changed

+597
-4
lines changed

6 files changed

+597
-4
lines changed

elasticsearch/_async/transport.py

+73-1
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,19 @@
1818
import asyncio
1919
import logging
2020
import sys
21+
import warnings
2122
from itertools import chain
2223

2324
from ..exceptions import (
25+
AuthenticationException,
26+
AuthorizationException,
2427
ConnectionError,
2528
ConnectionTimeout,
29+
ElasticsearchWarning,
2630
SerializationError,
2731
TransportError,
2832
)
29-
from ..transport import Transport
33+
from ..transport import Transport, _verify_elasticsearch
3034
from .compat import get_running_loop
3135
from .http_aiohttp import AIOHttpConnection
3236

@@ -327,6 +331,10 @@ async def perform_request(self, method, url, headers=None, params=None, body=Non
327331
method, headers, params, body
328332
)
329333

334+
# Before we make the actual API call we verify the Elasticsearch instance.
335+
if not self._verified_elasticsearch:
336+
await self._do_verify_elasticsearch(headers=headers, timeout=timeout)
337+
330338
for attempt in range(self.max_retries + 1):
331339
connection = self.get_connection()
332340

@@ -398,3 +406,67 @@ async def close(self):
398406

399407
for connection in self.connection_pool.connections:
400408
await connection.close()
409+
410+
async def _do_verify_elasticsearch(self, headers, timeout):
411+
"""Verifies that we're connected to an Elasticsearch cluster.
412+
This is done at least once before the first actual API call
413+
and makes a single request to the 'GET /' API endpoint and
414+
check version along with other details of the response.
415+
416+
If we're unable to verify we're talking to Elasticsearch
417+
but we're also unable to rule it out due to a permission
418+
error we instead emit an 'ElasticsearchWarning'.
419+
"""
420+
# Product check has already been done, no need to do again.
421+
if self._verified_elasticsearch:
422+
return
423+
424+
headers = {header.lower(): value for header, value in (headers or {}).items()}
425+
# We know we definitely want JSON so request it via 'accept'
426+
headers.setdefault("accept", "application/json")
427+
428+
info_headers = {}
429+
info_response = {}
430+
info_error = None
431+
432+
for conn in chain(self.connection_pool.connections, self.seed_connections):
433+
try:
434+
_, info_headers, info_response = await conn.perform_request(
435+
"GET", "/", headers=headers, timeout=timeout
436+
)
437+
438+
# Lowercase all the header names for consistency in accessing them.
439+
info_headers = {
440+
header.lower(): value for header, value in info_headers.items()
441+
}
442+
443+
info_response = self.deserializer.loads(
444+
info_response, mimetype="application/json"
445+
)
446+
break
447+
448+
# Previous versions of 7.x Elasticsearch required a specific
449+
# permission so if we receive HTTP 401/403 we should warn
450+
# instead of erroring out.
451+
except (AuthenticationException, AuthorizationException):
452+
warnings.warn(
453+
(
454+
"The client is unable to verify that the server is "
455+
"Elasticsearch due security privileges on the server side"
456+
),
457+
ElasticsearchWarning,
458+
stacklevel=3,
459+
)
460+
self._verified_elasticsearch = True
461+
return
462+
463+
# This connection didn't work, we'll try another.
464+
except (ConnectionError, SerializationError):
465+
if info_error is None:
466+
info_error = info_error
467+
468+
# Check the information we got back from the index request.
469+
_verify_elasticsearch(info_headers, info_response)
470+
471+
# If we made it through the above call this config is verified.
472+
self._verified_elasticsearch = True

elasticsearch/exceptions.py

+6
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ class SerializationError(ElasticsearchException):
5151
"""
5252

5353

54+
class NotElasticsearchError(ElasticsearchException):
55+
"""Error which is raised when the client detects
56+
it's not connected to an Elasticsearch cluster.
57+
"""
58+
59+
5460
class TransportError(ElasticsearchException):
5561
"""
5662
Exception raised when ES returns a non-OK (>=400) HTTP status code. Or when

elasticsearch/exceptions.pyi

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ from typing import Any, Dict, Union
2020
class ImproperlyConfigured(Exception): ...
2121
class ElasticsearchException(Exception): ...
2222
class SerializationError(ElasticsearchException): ...
23+
class NotElasticsearchError(ElasticsearchException): ...
2324

2425
class TransportError(ElasticsearchException):
2526
@property

elasticsearch/transport.py

+125
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,22 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18+
import re
1819
import time
20+
import warnings
1921
from itertools import chain
2022
from platform import python_version
2123

2224
from ._version import __versionstr__
2325
from .connection import Urllib3HttpConnection
2426
from .connection_pool import ConnectionPool, DummyConnectionPool, EmptyConnectionPool
2527
from .exceptions import (
28+
AuthenticationException,
29+
AuthorizationException,
2630
ConnectionError,
2731
ConnectionTimeout,
32+
ElasticsearchWarning,
33+
NotElasticsearchError,
2834
SerializationError,
2935
TransportError,
3036
)
@@ -198,6 +204,10 @@ def __init__(
198204
if http_client_meta:
199205
self._client_meta += (http_client_meta,)
200206

207+
# Flag which is set after verifying that we're
208+
# connected to Elasticsearch.
209+
self._verified_elasticsearch = False
210+
201211
def add_connection(self, host):
202212
"""
203213
Create a new :class:`~elasticsearch.Connection` instance and add it to the pool.
@@ -380,6 +390,9 @@ def perform_request(self, method, url, headers=None, params=None, body=None):
380390
method, headers, params, body
381391
)
382392

393+
# Before we make the actual API call we verify the Elasticsearch instance.
394+
self._do_verify_elasticsearch(headers=headers, timeout=timeout)
395+
383396
for attempt in range(self.max_retries + 1):
384397
connection = self.get_connection()
385398

@@ -488,3 +501,115 @@ def _resolve_request_args(self, method, headers, params, body):
488501
)
489502

490503
return method, headers, params, body, ignore, timeout
504+
505+
def _do_verify_elasticsearch(self, headers, timeout):
506+
"""Verifies that we're connected to an Elasticsearch cluster.
507+
This is done at least once before the first actual API call
508+
and makes a single request to the 'GET /' API endpoint to
509+
check the version along with other details of the response.
510+
511+
If we're unable to verify we're talking to Elasticsearch
512+
but we're also unable to rule it out due to a permission
513+
error we instead emit an 'ElasticsearchWarning'.
514+
"""
515+
# Product check has already been done, no need to do again.
516+
if self._verified_elasticsearch:
517+
return
518+
519+
headers = {header.lower(): value for header, value in (headers or {}).items()}
520+
# We know we definitely want JSON so request it via 'accept'
521+
headers.setdefault("accept", "application/json")
522+
523+
info_headers = {}
524+
info_response = {}
525+
526+
for conn in chain(self.connection_pool.connections, self.seed_connections):
527+
try:
528+
_, info_headers, info_response = conn.perform_request(
529+
"GET", "/", headers=headers, timeout=timeout
530+
)
531+
532+
# Lowercase all the header names for consistency in accessing them.
533+
info_headers = {
534+
header.lower(): value for header, value in info_headers.items()
535+
}
536+
537+
info_response = self.deserializer.loads(
538+
info_response, mimetype="application/json"
539+
)
540+
break
541+
542+
# Previous versions of 7.x Elasticsearch required a specific
543+
# permission so if we receive HTTP 401/403 we should warn
544+
# instead of erroring out.
545+
except (AuthenticationException, AuthorizationException):
546+
warnings.warn(
547+
(
548+
"The client is unable to verify that the server is "
549+
"Elasticsearch due security privileges on the server side"
550+
),
551+
ElasticsearchWarning,
552+
stacklevel=3,
553+
)
554+
self._verified_elasticsearch = True
555+
return
556+
557+
# This connection didn't work, we'll try another.
558+
except (ConnectionError, SerializationError):
559+
pass
560+
561+
# Check the information we got back from the index request.
562+
_verify_elasticsearch(info_headers, info_response)
563+
564+
# If we made it through the above call this config is verified.
565+
self._verified_elasticsearch = True
566+
567+
568+
def _verify_elasticsearch(headers, response):
569+
"""Verifies that the server we're talking to is Elasticsearch.
570+
Does this by checking HTTP headers and the deserialized
571+
response to the 'info' API.
572+
573+
If there's a problem this function raises 'NotElasticsearchError'
574+
otherwise doesn't do anything.
575+
"""
576+
try:
577+
version = response.get("version", {})
578+
version_number = tuple(
579+
int(x) if x is not None else 999
580+
for x in re.search(
581+
r"^([0-9]+)\.([0-9]+)(?:\.([0-9]+))?", version["number"]
582+
).groups()
583+
)
584+
except (KeyError, TypeError, ValueError, AttributeError):
585+
# No valid 'version.number' field, effectively 0.0.0
586+
version = {}
587+
version_number = (0, 0, 0)
588+
589+
# Check all of the fields and headers for missing/valid values.
590+
try:
591+
bad_tagline = response.get("tagline", None) != "You Know, for Search"
592+
bad_build_flavor = version.get("build_flavor", None) != "default"
593+
bad_product_header = headers.get("x-elastic-product", None) != "Elasticsearch"
594+
except (AttributeError, TypeError):
595+
bad_tagline = True
596+
bad_build_flavor = True
597+
bad_product_header = True
598+
599+
if (
600+
# No version or version less than 6.x
601+
version_number < (6, 0, 0)
602+
# 6.x and there's a bad 'tagline'
603+
or ((6, 0, 0) <= version_number < (7, 0, 0) and bad_tagline)
604+
# 7.0-7.13 and there's a bad 'tagline' or 'build_flavor'
605+
or (
606+
(7, 0, 0) <= version_number < (7, 14, 0)
607+
and (bad_tagline or bad_build_flavor)
608+
)
609+
# 7.14+ and there's a bad 'X-Elastic-Product' HTTP header
610+
or ((7, 14, 0) <= version_number and bad_product_header)
611+
):
612+
raise NotElasticsearchError(
613+
"The client noticed that the server is not Elasticsearch "
614+
"and we do not support this unknown product"
615+
)

0 commit comments

Comments
 (0)