diff --git a/ipinfo/__init__.py b/ipinfo/__init__.py index 781cb95..64049b6 100644 --- a/ipinfo/__init__.py +++ b/ipinfo/__init__.py @@ -2,6 +2,8 @@ from .handler_lite_async import AsyncHandlerLite from .handler import Handler from .handler_async import AsyncHandler +from .handler_core import HandlerCore +from .handler_core_async import AsyncHandlerCore def getHandler(access_token=None, **kwargs): @@ -14,6 +16,11 @@ def getHandlerLite(access_token=None, **kwargs): return HandlerLite(access_token, **kwargs) +def getHandlerCore(access_token=None, **kwargs): + """Create and return HandlerCore object.""" + return HandlerCore(access_token, **kwargs) + + def getHandlerAsync(access_token=None, **kwargs): """Create an return an asynchronous Handler object.""" return AsyncHandler(access_token, **kwargs) @@ -22,3 +29,8 @@ def getHandlerAsync(access_token=None, **kwargs): def getHandlerAsyncLite(access_token=None, **kwargs): """Create and return asynchronous HandlerLite object.""" return AsyncHandlerLite(access_token, **kwargs) + + +def getHandlerAsyncCore(access_token=None, **kwargs): + """Create and return asynchronous HandlerCore object.""" + return AsyncHandlerCore(access_token, **kwargs) diff --git a/ipinfo/handler_core.py b/ipinfo/handler_core.py new file mode 100644 index 0000000..196b2e3 --- /dev/null +++ b/ipinfo/handler_core.py @@ -0,0 +1,300 @@ +""" +Core API client handler for fetching data from the IPinfo Core service. +""" + +import time +from ipaddress import IPv4Address, IPv6Address + +import requests + +from . 
import handler_utils +from .bogon import is_bogon +from .cache.default import DefaultCache +from .data import ( + continents, + countries, + countries_currencies, + countries_flags, + eu_countries, +) +from .details import Details +from .error import APIError +from .exceptions import RequestQuotaExceededError, TimeoutExceededError +from .handler_utils import ( + BATCH_MAX_SIZE, + BATCH_REQ_TIMEOUT_DEFAULT, + CACHE_MAXSIZE, + CACHE_TTL, + CORE_API_URL, + REQUEST_TIMEOUT_DEFAULT, + cache_key, +) + + +class HandlerCore: + """ + Allows client to request data for specified IP address using the Core API. + Core API provides city-level geolocation with nested geo and AS objects. + Instantiates and maintains access to cache. + """ + + def __init__(self, access_token=None, **kwargs): + """ + Initialize the HandlerCore object with country name list and the + cache initialized. + """ + self.access_token = access_token + + # load countries file + self.countries = kwargs.get("countries") or countries + + # load eu countries file + self.eu_countries = kwargs.get("eu_countries") or eu_countries + + # load countries flags file + self.countries_flags = kwargs.get("countries_flags") or countries_flags + + # load countries currency file + self.countries_currencies = ( + kwargs.get("countries_currencies") or countries_currencies + ) + + # load continent file + self.continents = kwargs.get("continent") or continents + + # setup req opts + self.request_options = kwargs.get("request_options", {}) + if "timeout" not in self.request_options: + self.request_options["timeout"] = REQUEST_TIMEOUT_DEFAULT + + # setup cache + if "cache" in kwargs: + self.cache = kwargs["cache"] + else: + cache_options = kwargs.get("cache_options", {}) + if "maxsize" not in cache_options: + cache_options["maxsize"] = CACHE_MAXSIZE + if "ttl" not in cache_options: + cache_options["ttl"] = CACHE_TTL + self.cache = DefaultCache(**cache_options) + + # setup custom headers + self.headers = kwargs.get("headers", None) 
+ + def getDetails(self, ip_address=None, timeout=None): + """ + Get Core details for specified IP address as a Details object. + + If `timeout` is not `None`, it will override the client-level timeout + just for this operation. + """ + # If the supplied IP address uses the objects defined in the built-in + # module ipaddress extract the appropriate string notation before + # formatting the URL. + if isinstance(ip_address, IPv4Address) or isinstance( + ip_address, IPv6Address + ): + ip_address = ip_address.exploded + + # check if bogon. + if ip_address and is_bogon(ip_address): + details = {} + details["ip"] = ip_address + details["bogon"] = True + return Details(details) + + # check cache first. + try: + cached_data = self.cache[cache_key(ip_address)] + return Details(cached_data) + except KeyError: + pass + + # prepare req http opts + req_opts = {**self.request_options} + if timeout is not None: + req_opts["timeout"] = timeout + + # Build URL + url = CORE_API_URL + if ip_address: + url += "/" + ip_address + + headers = handler_utils.get_headers(self.access_token, self.headers) + response = requests.get(url, headers=headers, **req_opts) + + if response.status_code == 429: + raise RequestQuotaExceededError() + if response.status_code >= 400: + error_code = response.status_code + content_type = response.headers.get("Content-Type") + if content_type == "application/json": + error_response = response.json() + else: + error_response = {"error": response.text} + raise APIError(error_code, error_response) + + details = response.json() + + # Format and cache + self._format_core_details(details) + self.cache[cache_key(ip_address)] = details + + return Details(details) + + def _format_core_details(self, details): + """ + Format Core response details. + Core has nested geo and as objects that need special formatting. 
+ """ + # Format geo object if present + if "geo" in details and details["geo"]: + geo = details["geo"] + if "country_code" in geo: + country_code = geo["country_code"] + geo["country_name"] = self.countries.get(country_code) + geo["isEU"] = country_code in self.eu_countries + geo["country_flag"] = self.countries_flags.get(country_code) + geo["country_currency"] = self.countries_currencies.get( + country_code + ) + geo["continent"] = self.continents.get(country_code) + geo["country_flag_url"] = ( + f"{handler_utils.COUNTRY_FLAGS_URL}{country_code}.svg" + ) + + # Top-level country_code might also exist in some responses + if "country_code" in details: + country_code = details["country_code"] + details["country_name"] = self.countries.get(country_code) + details["isEU"] = country_code in self.eu_countries + details["country_flag"] = self.countries_flags.get(country_code) + details["country_currency"] = self.countries_currencies.get( + country_code + ) + details["continent"] = self.continents.get(country_code) + details["country_flag_url"] = ( + f"{handler_utils.COUNTRY_FLAGS_URL}{country_code}.svg" + ) + + def getBatchDetails( + self, + ip_addresses, + batch_size=None, + timeout_per_batch=BATCH_REQ_TIMEOUT_DEFAULT, + timeout_total=None, + raise_on_fail=True, + ): + """ + Get Core details for a batch of IP addresses at once. + + There is no specified limit to the number of IPs this function can + accept; it can handle as much as the user can fit in RAM (along with + all of the response data, which is at least a magnitude larger than the + input list). + + The input list is broken up into batches to abide by API requirements. + The batch size can be adjusted with `batch_size` but is clipped to + `BATCH_MAX_SIZE`. + Defaults to `BATCH_MAX_SIZE`. + + For each batch, `timeout_per_batch` indicates the maximum seconds to + spend waiting for the HTTP request to complete. If any batch fails with + this timeout, the whole operation fails. 
+ Defaults to `BATCH_REQ_TIMEOUT_DEFAULT` seconds. + + `timeout_total` is a seconds-denominated hard-timeout for the time + spent in HTTP operations; regardless of whether all batches have + succeeded so far, if `timeout_total` is reached, the whole operation + will fail by raising `TimeoutExceededError`. + Defaults to being turned off. + + `raise_on_fail`, if turned off, will return any result retrieved so far + rather than raise an exception when errors occur, including timeout and + quota errors. + Defaults to on. + """ + if batch_size is None: + batch_size = BATCH_MAX_SIZE + + result = {} + lookup_addresses = [] + + # pre-populate with anything we've got in the cache, and keep around + # the IPs not in the cache. + for ip_address in ip_addresses: + # if the supplied IP address uses the objects defined in the + # built-in module ipaddress extract the appropriate string notation + # before formatting the URL. + if isinstance(ip_address, IPv4Address) or isinstance( + ip_address, IPv6Address + ): + ip_address = ip_address.exploded + + if ip_address and is_bogon(ip_address): + details = {} + details["ip"] = ip_address + details["bogon"] = True + result[ip_address] = Details(details) + else: + try: + cached_data = self.cache[cache_key(ip_address)] + result[ip_address] = Details(cached_data) + except KeyError: + lookup_addresses.append(ip_address) + + # all in cache - return early. + if len(lookup_addresses) == 0: + return result + + # do start timer if necessary + if timeout_total is not None: + start_time = time.time() + + # prepare req http options + req_opts = {**self.request_options, "timeout": timeout_per_batch} + + # loop over batch chunks and do lookup for each. + url = "https://api.ipinfo.io/batch" + headers = handler_utils.get_headers(self.access_token, self.headers) + headers["content-type"] = "application/json" + + for i in range(0, len(lookup_addresses), batch_size): + # quit if total timeout is reached. 
+ if ( + timeout_total is not None + and time.time() - start_time > timeout_total + ): + return handler_utils.return_or_fail( + raise_on_fail, TimeoutExceededError(), result + ) + + chunk = lookup_addresses[i : i + batch_size] + + # lookup + try: + response = requests.post( + url, json=chunk, headers=headers, **req_opts + ) + except Exception as e: + return handler_utils.return_or_fail(raise_on_fail, e, result) + + # fail on bad status codes + try: + if response.status_code == 429: + raise RequestQuotaExceededError() + response.raise_for_status() + except Exception as e: + return handler_utils.return_or_fail(raise_on_fail, e, result) + + # Process batch response + json_response = response.json() + + for ip_address, data in json_response.items(): + # Cache and format the data + if isinstance(data, dict) and not data.get("bogon"): + self._format_core_details(data) + self.cache[cache_key(ip_address)] = data + result[ip_address] = Details(data) + + return result diff --git a/ipinfo/handler_core_async.py b/ipinfo/handler_core_async.py new file mode 100644 index 0000000..b1c90af --- /dev/null +++ b/ipinfo/handler_core_async.py @@ -0,0 +1,360 @@ +""" +Core API client asynchronous handler for fetching data from the IPinfo Core service. +""" + +import asyncio +import json +import time +from ipaddress import IPv4Address, IPv6Address + +import aiohttp + +from . import handler_utils +from .bogon import is_bogon +from .cache.default import DefaultCache +from .data import ( + continents, + countries, + countries_currencies, + countries_flags, + eu_countries, +) +from .details import Details +from .error import APIError +from .exceptions import RequestQuotaExceededError, TimeoutExceededError +from .handler_utils import ( + BATCH_MAX_SIZE, + BATCH_REQ_TIMEOUT_DEFAULT, + CACHE_MAXSIZE, + CACHE_TTL, + CORE_API_URL, + REQUEST_TIMEOUT_DEFAULT, + cache_key, +) + + +class AsyncHandlerCore: + """ + Allows client to request data for specified IP address asynchronously using the Core API. 
+ Core API provides city-level geolocation with nested geo and AS objects. + Instantiates and maintains access to cache. + """ + + def __init__(self, access_token=None, **kwargs): + """ + Initialize the AsyncHandlerCore object with country name list and the + cache initialized. + """ + self.access_token = access_token + + # load countries file + self.countries = kwargs.get("countries") or countries + + # load eu countries file + self.eu_countries = kwargs.get("eu_countries") or eu_countries + + # load countries flags file + self.countries_flags = kwargs.get("countries_flags") or countries_flags + + # load countries currency file + self.countries_currencies = ( + kwargs.get("countries_currencies") or countries_currencies + ) + + # load continent file + self.continents = kwargs.get("continent") or continents + + # setup req opts + self.request_options = kwargs.get("request_options", {}) + if "timeout" not in self.request_options: + self.request_options["timeout"] = REQUEST_TIMEOUT_DEFAULT + + # setup aiohttp + self.httpsess = None + + # setup cache + if "cache" in kwargs: + self.cache = kwargs["cache"] + else: + cache_options = kwargs.get("cache_options", {}) + if "maxsize" not in cache_options: + cache_options["maxsize"] = CACHE_MAXSIZE + if "ttl" not in cache_options: + cache_options["ttl"] = CACHE_TTL + self.cache = DefaultCache(**cache_options) + + # setup custom headers + self.headers = kwargs.get("headers", None) + + async def init(self): + """ + Initializes internal aiohttp connection pool. + + This isn't _required_, as the pool is initialized lazily when needed. + But in case you require non-lazy initialization, you may await this. + + This is idempotent. + """ + await self._ensure_aiohttp_ready() + + async def deinit(self): + """ + Deinitialize the async handler. + + This is required in case you need to let go of the memory/state + associated with the async handler in a long-running process. + + This is idempotent. 
+ """ + if self.httpsess: + await self.httpsess.close() + self.httpsess = None + + async def getDetails(self, ip_address=None, timeout=None): + """ + Get Core details for specified IP address as a Details object. + + If `timeout` is not `None`, it will override the client-level timeout + just for this operation. + """ + self._ensure_aiohttp_ready() + + # If the supplied IP address uses the objects defined in the built-in + # module ipaddress, extract the appropriate string notation before + # formatting the URL. + if isinstance(ip_address, IPv4Address) or isinstance(ip_address, IPv6Address): + ip_address = ip_address.exploded + + # check if bogon. + if ip_address and is_bogon(ip_address): + details = {"ip": ip_address, "bogon": True} + return Details(details) + + # check cache first. + try: + cached_data = self.cache[cache_key(ip_address)] + return Details(cached_data) + except KeyError: + pass + + # not in cache; do http req + url = CORE_API_URL + if ip_address: + url += "/" + ip_address + headers = handler_utils.get_headers(self.access_token, self.headers) + req_opts = {} + if timeout is not None: + req_opts["timeout"] = timeout + async with self.httpsess.get(url, headers=headers, **req_opts) as resp: + if resp.status == 429: + raise RequestQuotaExceededError() + if resp.status >= 400: + error_code = resp.status + content_type = resp.headers.get("Content-Type") + if content_type == "application/json": + error_response = await resp.json() + else: + error_response = {"error": resp.text()} + raise APIError(error_code, error_response) + details = await resp.json() + + # format & cache + self._format_core_details(details) + self.cache[cache_key(ip_address)] = details + + return Details(details) + + def _format_core_details(self, details): + """ + Format Core response details. + Core has nested geo and as objects that need special formatting. 
+ """ + # Format geo object if present + if "geo" in details and details["geo"]: + geo = details["geo"] + if "country_code" in geo: + country_code = geo["country_code"] + geo["country_name"] = self.countries.get(country_code) + geo["isEU"] = country_code in self.eu_countries + geo["country_flag"] = self.countries_flags.get(country_code) + geo["country_currency"] = self.countries_currencies.get(country_code) + geo["continent"] = self.continents.get(country_code) + geo["country_flag_url"] = ( + f"{handler_utils.COUNTRY_FLAGS_URL}{country_code}.svg" + ) + + # Top-level country_code might also exist in some responses + if "country_code" in details: + country_code = details["country_code"] + details["country_name"] = self.countries.get(country_code) + details["isEU"] = country_code in self.eu_countries + details["country_flag"] = self.countries_flags.get(country_code) + details["country_currency"] = self.countries_currencies.get(country_code) + details["continent"] = self.continents.get(country_code) + details["country_flag_url"] = ( + f"{handler_utils.COUNTRY_FLAGS_URL}{country_code}.svg" + ) + + async def getBatchDetails( + self, + ip_addresses, + batch_size=None, + timeout_per_batch=BATCH_REQ_TIMEOUT_DEFAULT, + timeout_total=None, + raise_on_fail=True, + ): + """ + Get Core details for a batch of IP addresses at once. + + There is no specified limit to the number of IPs this function can + accept; it can handle as much as the user can fit in RAM (along with + all of the response data, which is at least a magnitude larger than the + input list). + + The input list is broken up into batches to abide by API requirements. + The batch size can be adjusted with `batch_size` but is clipped to + `BATCH_MAX_SIZE`. + Defaults to `BATCH_MAX_SIZE`. + + For each batch, `timeout_per_batch` indicates the maximum seconds to + spend waiting for the HTTP request to complete. If any batch fails with + this timeout, the whole operation fails. 
+ Defaults to `BATCH_REQ_TIMEOUT_DEFAULT` seconds. + + `timeout_total` is a seconds-denominated hard-timeout for the time + spent in HTTP operations; regardless of whether all batches have + succeeded so far, if `timeout_total` is reached, the whole operation + will fail by raising `TimeoutExceededError`. + Defaults to being turned off. + + `raise_on_fail`, if turned off, will return any result retrieved so far + rather than raise an exception when errors occur, including timeout and + quota errors. + Defaults to on. + + The concurrency level is currently unadjustable; coroutines will be + created and consumed for all batches at once. + """ + self._ensure_aiohttp_ready() + + if batch_size is None: + batch_size = BATCH_MAX_SIZE + + result = {} + + # Pre-populate with anything we've got in the cache, and keep around + # the IPs not in the cache. + lookup_addresses = [] + for ip_address in ip_addresses: + # If the supplied IP address uses the objects defined in the + # built-in module ipaddress extract the appropriate string notation + # before formatting the URL. + if isinstance(ip_address, IPv4Address) or isinstance( + ip_address, IPv6Address + ): + ip_address = ip_address.exploded + + if ip_address and is_bogon(ip_address): + details = {} + details["ip"] = ip_address + details["bogon"] = True + result[ip_address] = Details(details) + else: + try: + cached_data = self.cache[cache_key(ip_address)] + result[ip_address] = Details(cached_data) + except KeyError: + lookup_addresses.append(ip_address) + + # all in cache - return early. + if not lookup_addresses: + return result + + # loop over batch chunks and prepare coroutines for each. + url = "https://api.ipinfo.io/batch" + headers = handler_utils.get_headers(self.access_token, self.headers) + headers["content-type"] = "application/json" + + # prepare tasks that will make reqs and update results. 
+ tasks = [ + asyncio.create_task( + self._do_batch_req( + lookup_addresses[i : i + batch_size], + url, + headers, + timeout_per_batch, + raise_on_fail, + result, + ) + ) + for i in range(0, len(lookup_addresses), batch_size) + ] + + try: + _, pending = await asyncio.wait( + tasks, + timeout=timeout_total, + return_when=asyncio.FIRST_EXCEPTION, + ) + + # if all done, return result. + if not pending: + return result + + # if some had a timeout, first cancel timed out stuff and wait for + # cleanup. then exit with return_or_fail. + for co in pending: + try: + co.cancel() + await co + except asyncio.CancelledError: + pass + + return handler_utils.return_or_fail( + raise_on_fail, TimeoutExceededError(), result + ) + except Exception as e: + return handler_utils.return_or_fail(raise_on_fail, e, result) + + return result + + async def _do_batch_req( + self, chunk, url, headers, timeout_per_batch, raise_on_fail, result + ): + """ + Coroutine which will do the actual POST request for getBatchDetails. 
+ """ + try: + resp = await self.httpsess.post( + url, + data=json.dumps(chunk), + headers=headers, + timeout=timeout_per_batch, + ) + except Exception as e: + return handler_utils.return_or_fail(raise_on_fail, e, None) + + # gather data + try: + if resp.status == 429: + raise RequestQuotaExceededError() + resp.raise_for_status() + except Exception as e: + return handler_utils.return_or_fail(raise_on_fail, e, None) + + json_resp = await resp.json() + + # format & fill up cache + for ip_address, data in json_resp.items(): + if isinstance(data, dict) and not data.get("bogon"): + self._format_core_details(data) + self.cache[cache_key(ip_address)] = data + result[ip_address] = Details(data) + + def _ensure_aiohttp_ready(self): + """Ensures aiohttp internal state is initialized.""" + if self.httpsess: + return + + timeout = aiohttp.ClientTimeout(total=self.request_options["timeout"]) + self.httpsess = aiohttp.ClientSession(timeout=timeout) diff --git a/ipinfo/handler_utils.py b/ipinfo/handler_utils.py index 971003a..7a34bbf 100644 --- a/ipinfo/handler_utils.py +++ b/ipinfo/handler_utils.py @@ -15,6 +15,9 @@ # Base URL for the IPinfo Lite API LITE_API_URL = "https://api.ipinfo.io/lite" +# Base URL for the IPinfo Core API +CORE_API_URL = "https://api.ipinfo.io/lookup" + # Base URL to get country flag image link. 
# "PK" -> "https://cdn.ipinfo.io/static/images/countries-flags/PK.svg" COUNTRY_FLAGS_URL = "https://cdn.ipinfo.io/static/images/countries-flags/" diff --git a/tests/handler_core_async_test.py b/tests/handler_core_async_test.py new file mode 100644 index 0000000..53090c6 --- /dev/null +++ b/tests/handler_core_async_test.py @@ -0,0 +1,206 @@ +import os + +import pytest + +from ipinfo import handler_utils +from ipinfo.cache.default import DefaultCache +from ipinfo.details import Details +from ipinfo.handler_core_async import AsyncHandlerCore + + +@pytest.mark.asyncio +async def test_init(): + token = "mytesttoken" + handler = AsyncHandlerCore(token) + assert handler.access_token == token + assert isinstance(handler.cache, DefaultCache) + assert "US" in handler.countries + await handler.deinit() + + +@pytest.mark.asyncio +async def test_headers(): + token = "mytesttoken" + handler = AsyncHandlerCore(token, headers={"custom_field": "yes"}) + headers = handler_utils.get_headers(token, handler.headers) + await handler.deinit() + + assert "user-agent" in headers + assert "accept" in headers + assert "authorization" in headers + assert "custom_field" in headers + + +@pytest.mark.skipif( + "IPINFO_TOKEN" not in os.environ, + reason="Can't call Core API without token", +) +@pytest.mark.asyncio +async def test_get_details(): + """Test basic Core API lookup""" + token = os.environ.get("IPINFO_TOKEN", "") + handler = AsyncHandlerCore(token) + details = await handler.getDetails("8.8.8.8") + + # Should return Details object + assert isinstance(details, Details) + assert details.ip == "8.8.8.8" + + # Check nested geo object exists + assert hasattr(details, "geo") + assert isinstance(details.geo, dict) + assert "city" in details.geo + assert "country_code" in details.geo + assert "latitude" in details.geo + assert "longitude" in details.geo + + # Check nested as object exists (use .all to access since 'as' is a keyword) + assert "as" in details.all + as_obj = details.all["as"] + 
assert isinstance(as_obj, dict) + assert "asn" in as_obj + assert "name" in as_obj + + # Check network flags + assert hasattr(details, "is_hosting") + assert hasattr(details, "is_anycast") + + # Check geo formatting was applied + assert "country_name" in details.geo + assert "isEU" in details.geo + assert "country_flag_url" in details.geo + + await handler.deinit() + + +############# +# BOGON TESTS +############# + + +@pytest.mark.skipif( + "IPINFO_TOKEN" not in os.environ, + reason="Can't call Core API without token", +) +@pytest.mark.asyncio +async def test_bogon_details(): + token = os.environ.get("IPINFO_TOKEN", "") + handler = AsyncHandlerCore(token) + details = await handler.getDetails("127.0.0.1") + assert isinstance(details, Details) + assert details.all == {"bogon": True, "ip": "127.0.0.1"} + await handler.deinit() + + +##################### +# BATCH TESTS +##################### + + +@pytest.mark.skipif( + "IPINFO_TOKEN" not in os.environ, + reason="Can't call Core API without token", +) +@pytest.mark.asyncio +async def test_batch_ips(): + """Test batch request with IPs""" + token = os.environ.get("IPINFO_TOKEN", "") + handler = AsyncHandlerCore(token) + results = await handler.getBatchDetails(["8.8.8.8", "1.1.1.1"]) + + assert len(results) == 2 + assert "8.8.8.8" in results + assert "1.1.1.1" in results + + # Both should be Details objects + assert isinstance(results["8.8.8.8"], Details) + assert isinstance(results["1.1.1.1"], Details) + + # Check structure - Core API returns nested geo and as objects + assert hasattr(results["8.8.8.8"], "geo") + assert "as" in results["8.8.8.8"].all + + await handler.deinit() + + +@pytest.mark.skipif( + "IPINFO_TOKEN" not in os.environ, + reason="Can't call Core API without token", +) +@pytest.mark.asyncio +async def test_batch_with_bogon(): + """Test batch including bogon IPs""" + token = os.environ.get("IPINFO_TOKEN", "") + handler = AsyncHandlerCore(token) + results = await handler.getBatchDetails( + [ + "8.8.8.8", + 
"127.0.0.1", # Bogon + "1.1.1.1", + ] + ) + + assert len(results) == 3 + + # Normal IPs should be Details + assert isinstance(results["8.8.8.8"], Details) + assert isinstance(results["1.1.1.1"], Details) + + # Bogon should also be Details with bogon flag + assert isinstance(results["127.0.0.1"], Details) + assert results["127.0.0.1"].bogon == True + + await handler.deinit() + + +##################### +# CACHING TESTS +##################### + + +@pytest.mark.skipif( + "IPINFO_TOKEN" not in os.environ, + reason="Can't call Core API without token", +) +@pytest.mark.asyncio +async def test_caching(): + """Test that results are properly cached""" + token = os.environ.get("IPINFO_TOKEN", "") + handler = AsyncHandlerCore(token) + + # First request - should hit API + details1 = await handler.getDetails("8.8.8.8") + assert isinstance(details1, Details) + + # Second request - should come from cache + details2 = await handler.getDetails("8.8.8.8") + assert isinstance(details2, Details) + assert details2.ip == details1.ip + + # Verify cache key exists + cache_key_val = handler_utils.cache_key("8.8.8.8") + assert cache_key_val in handler.cache + + await handler.deinit() + + +@pytest.mark.skipif( + "IPINFO_TOKEN" not in os.environ, + reason="Can't call Core API without token", +) +@pytest.mark.asyncio +async def test_batch_caching(): + """Test that batch results are properly cached""" + token = os.environ.get("IPINFO_TOKEN", "") + handler = AsyncHandlerCore(token) + + # First batch request + results1 = await handler.getBatchDetails(["8.8.8.8", "1.1.1.1"]) + assert len(results1) == 2 + + # Second batch with same IPs (should come from cache) + results2 = await handler.getBatchDetails(["8.8.8.8", "1.1.1.1"]) + assert len(results2) == 2 + assert results2["8.8.8.8"].ip == results1["8.8.8.8"].ip + + await handler.deinit() diff --git a/tests/handler_core_test.py b/tests/handler_core_test.py new file mode 100644 index 0000000..4a90ff1 --- /dev/null +++ b/tests/handler_core_test.py @@ 
-0,0 +1,185 @@ +import os + +import pytest + +from ipinfo import handler_utils +from ipinfo.cache.default import DefaultCache +from ipinfo.details import Details +from ipinfo.handler_core import HandlerCore + + +def test_init(): + token = "mytesttoken" + handler = HandlerCore(token) + assert handler.access_token == token + assert isinstance(handler.cache, DefaultCache) + assert "US" in handler.countries + + +def test_headers(): + token = "mytesttoken" + handler = HandlerCore(token, headers={"custom_field": "yes"}) + headers = handler_utils.get_headers(token, handler.headers) + + assert "user-agent" in headers + assert "accept" in headers + assert "authorization" in headers + assert "custom_field" in headers + + +@pytest.mark.skipif( + "IPINFO_TOKEN" not in os.environ, + reason="Can't call Core API without token", +) +def test_get_details(): + """Test basic Core API lookup""" + token = os.environ.get("IPINFO_TOKEN", "") + handler = HandlerCore(token) + details = handler.getDetails("8.8.8.8") + + # Should return Details object + assert isinstance(details, Details) + assert details.ip == "8.8.8.8" + + # Check nested geo object exists + assert hasattr(details, "geo") + assert isinstance(details.geo, dict) + assert "city" in details.geo + assert "country_code" in details.geo + assert "latitude" in details.geo + assert "longitude" in details.geo + + # Check nested as object exists (use .all to access since 'as' is a keyword) + assert "as" in details.all + as_obj = details.all["as"] + assert isinstance(as_obj, dict) + assert "asn" in as_obj + assert "name" in as_obj + + # Check network flags + assert hasattr(details, "is_hosting") + assert hasattr(details, "is_anycast") + + # Check geo formatting was applied + assert "country_name" in details.geo + assert "isEU" in details.geo + assert "country_flag_url" in details.geo + + +############# +# BOGON TESTS +############# + + +@pytest.mark.skipif( + "IPINFO_TOKEN" not in os.environ, + reason="Can't call Core API without 
token", +) +def test_bogon_details(): + token = os.environ.get("IPINFO_TOKEN", "") + handler = HandlerCore(token) + details = handler.getDetails("127.0.0.1") + assert isinstance(details, Details) + assert details.all == {"bogon": True, "ip": "127.0.0.1"} + + +##################### +# BATCH TESTS +##################### + + +@pytest.mark.skipif( + "IPINFO_TOKEN" not in os.environ, + reason="Can't call Core API without token", +) +def test_batch_ips(): + """Test batch request with IPs""" + token = os.environ.get("IPINFO_TOKEN", "") + handler = HandlerCore(token) + results = handler.getBatchDetails(["8.8.8.8", "1.1.1.1"]) + + assert len(results) == 2 + assert "8.8.8.8" in results + assert "1.1.1.1" in results + + # Both should be Details objects + assert isinstance(results["8.8.8.8"], Details) + assert isinstance(results["1.1.1.1"], Details) + + # Check structure - Core API returns nested geo and as objects + assert hasattr(results["8.8.8.8"], "geo") + assert "as" in results["8.8.8.8"].all + + +@pytest.mark.skipif( + "IPINFO_TOKEN" not in os.environ, + reason="Can't call Core API without token", +) +def test_batch_with_bogon(): + """Test batch including bogon IPs""" + token = os.environ.get("IPINFO_TOKEN", "") + handler = HandlerCore(token) + results = handler.getBatchDetails( + [ + "8.8.8.8", + "127.0.0.1", # Bogon + "1.1.1.1", + ] + ) + + assert len(results) == 3 + + # Normal IPs should be Details + assert isinstance(results["8.8.8.8"], Details) + assert isinstance(results["1.1.1.1"], Details) + + # Bogon should also be Details with bogon flag + assert isinstance(results["127.0.0.1"], Details) + assert results["127.0.0.1"].bogon == True + + +##################### +# CACHING TESTS +##################### + + +@pytest.mark.skipif( + "IPINFO_TOKEN" not in os.environ, + reason="Can't call Core API without token", +) +def test_caching(): + """Test that results are properly cached""" + token = os.environ.get("IPINFO_TOKEN", "") + handler = HandlerCore(token) + + # First 
request - should hit API + details1 = handler.getDetails("8.8.8.8") + assert isinstance(details1, Details) + + # Second request - should come from cache + details2 = handler.getDetails("8.8.8.8") + assert isinstance(details2, Details) + assert details2.ip == details1.ip + + # Verify cache key exists + cache_key_val = handler_utils.cache_key("8.8.8.8") + assert cache_key_val in handler.cache + + +@pytest.mark.skipif( + "IPINFO_TOKEN" not in os.environ, + reason="Can't call Core API without token", +) +def test_batch_caching(): + """Test that batch results are properly cached""" + token = os.environ.get("IPINFO_TOKEN", "") + handler = HandlerCore(token) + + # First batch request + results1 = handler.getBatchDetails(["8.8.8.8", "1.1.1.1"]) + assert len(results1) == 2 + + # Second batch with same IPs (should come from cache) + results2 = handler.getBatchDetails(["8.8.8.8", "1.1.1.1"]) + assert len(results2) == 2 + assert results2["8.8.8.8"].ip == results1["8.8.8.8"].ip