diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index f60f54c..bbbb073 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -7,7 +7,7 @@ jobs: runs-on: ubuntu-20.04 strategy: matrix: - python-version: ["3.6", "3.7", "3.8", "3.9", "3.10"] + python-version: ["3.9", "3.10", "3.11"] steps: - uses: actions/checkout@v2 diff --git a/domaintools/api.py b/domaintools/api.py index 8987542..939dbee 100644 --- a/domaintools/api.py +++ b/domaintools/api.py @@ -4,7 +4,13 @@ import re from domaintools._version import current as version -from domaintools.results import GroupedIterable, ParsedWhois, Reputation, Results +from domaintools.results import ( + GroupedIterable, + ParsedWhois, + ParsedDomainRdap, + Reputation, + Results, +) from domaintools.filters import ( filter_by_riskscore, filter_by_expire_date, @@ -77,16 +83,12 @@ def __init__( self._build_api_url(api_url, api_port) if not https: - raise Exception( - "The DomainTools API endpoints no longer support http traffic. Please make sure https=True." - ) + raise Exception("The DomainTools API endpoints no longer support http traffic. Please make sure https=True.") if proxy_url: if isinstance(proxy_url, str): self.proxy_url = {"http://": proxy_url, "https://": proxy_url} else: - raise Exception( - "Proxy URL must be a string. For example: '127.0.0.1:8888'" - ) + raise Exception("Proxy URL must be a string. For example: '127.0.0.1:8888'") def _build_api_url(self, api_url=None, api_port=None): """Build the API url based on the given url and port. Defaults to `https://api.domaintools.com`""" @@ -110,31 +112,18 @@ def _rate_limit(self): hours = limit_hours and 3600 / float(limit_hours) minutes = limit_minutes and 60 / float(limit_minutes) - self.limits[product["id"]] = { - "interval": timedelta(seconds=minutes or hours or default) - } + self.limits[product["id"]] = {"interval": timedelta(seconds=minutes or hours or default)} def _results(self, product, path, cls=Results, **kwargs): """Returns _results for the specified API path with the specified **kwargs parameters""" - if ( - product != "account-information" - and self.rate_limit - and not self.limits_set - and not self.limits - ): + if product != "account-information" and self.rate_limit and not self.limits_set and not self.limits: self._rate_limit() uri = "/".join((self._rest_api_url, path.lstrip("/"))) parameters = self.default_parameters.copy() parameters["api_username"] = self.username self.handle_api_key(path, parameters) - parameters.update( - { - key: str(value).lower() if value in (True, False) else value - for key, value in kwargs.items() - if value is not None - } - ) + parameters.update({key: str(value).lower() if value in (True, False) else value for key, value in kwargs.items() if value is not None}) return cls(self, product, uri, **parameters) @@ -147,14 +136,10 @@ def handle_api_key(self, path, parameters): else: raise ValueError( "Invalid value '{0}' for 'key_sign_hash'. " - "Values available are {1}".format( - self.key_sign_hash, ",".join(AVAILABLE_KEY_SIGN_HASHES) - ) + "Values available are {1}".format(self.key_sign_hash, ",".join(AVAILABLE_KEY_SIGN_HASHES)) ) - parameters["timestamp"] = datetime.now(timezone.utc).strftime( - "%Y-%m-%dT%H:%M:%SZ" - ) + parameters["timestamp"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") parameters["signature"] = hmac( self.key.encode("utf8"), "".join([self.username, parameters["timestamp"], path]).encode("utf8"), @@ -163,9 +148,7 @@ def handle_api_key(self, path, parameters): def account_information(self, **kwargs): """Provides a snapshot of your accounts current API usage""" - return self._results( - "account-information", "/v1/account", items_path=("products",), **kwargs - ) + return self._results("account-information", "/v1/account", items_path=("products",), **kwargs) def available_api_calls(self): """Provides a list of api calls that you can use based on your account information.""" @@ -180,25 +163,10 @@ def snakecase(string): string[1:], ) - api_calls = tuple( - ( - api_call - for api_call in dir(API) - if not api_call.startswith("_") - and callable(getattr(API, api_call, None)) - ) - ) - return sorted( - [ - snakecase(p["id"]) - for p in self.account_information()["products"] - if snakecase(p["id"]) in api_calls - ] - ) + api_calls = tuple((api_call for api_call in dir(API) if not api_call.startswith("_") and callable(getattr(API, api_call, None)))) + return sorted([snakecase(p["id"]) for p in self.account_information()["products"] if snakecase(p["id"]) in api_calls]) - def brand_monitor( - self, query, exclude=None, domain_status=None, days_back=None, **kwargs - ): + def brand_monitor(self, query, exclude=None, domain_status=None, days_back=None, **kwargs): """Pass in one or more terms as a list or separated by the pipe character ( | )""" if exclude is None: exclude = [] @@ -324,9 +292,16 @@ def parsed_whois(self, query, **kwargs): **kwargs, ) - def registrant_monitor( - self, query, exclude=None, days_back=0, limit=None, **kwargs - ): + def parsed_domain_rdap(self, query, **kwargs): + """Pass in a domain name to see the most recent Domain-RDAP registration record""" + return self._results( + "parsed-domain-rdap", + "/v1/{0}/rdap/parsed/".format(query), + cls=ParsedDomainRdap, + **kwargs, + ) + + def registrant_monitor(self, query, exclude=None, days_back=0, limit=None, **kwargs): """One or more terms as a Python list or separated by the pipe character ( | ).""" if exclude is None: exclude = [] @@ -354,15 +329,11 @@ def reputation(self, query, include_reasons=False, **kwargs): def reverse_ip(self, domain=None, limit=None, **kwargs): """Pass in a domain name.""" - return self._results( - "reverse-ip", "/v1/{0}/reverse-ip".format(domain), limit=limit, **kwargs - ) + return self._results("reverse-ip", "/v1/{0}/reverse-ip".format(domain), limit=limit, **kwargs) def host_domains(self, ip=None, limit=None, **kwargs): """Pass in an IP address.""" - return self._results( - "reverse-ip", "/v1/{0}/host-domains".format(ip), limit=limit, **kwargs - ) + return self._results("reverse-ip", "/v1/{0}/host-domains".format(ip), limit=limit, **kwargs) def reverse_ip_whois( self, @@ -401,9 +372,7 @@ def reverse_name_server(self, query, limit=None, **kwargs): **kwargs, ) - def reverse_whois( - self, query, exclude=None, scope="current", mode="purchase", **kwargs - ): + def reverse_whois(self, query, exclude=None, scope="current", mode="purchase", **kwargs): """List of one or more terms to search for in the Whois record, as a Python list or separated with the pipe character ( | ). """ @@ -423,9 +392,7 @@ def whois(self, query, **kwargs): """Pass in a domain name or an IP address to perform a whois lookup.""" return self._results("whois", "/v1/{0}/whois".format(query), **kwargs) - def whois_history( - self, query, mode=None, sort=None, offset=None, limit=None, **kwargs - ): + def whois_history(self, query, mode=None, sort=None, offset=None, limit=None, **kwargs): """Pass in a domain name.""" return self._results( "whois-history", @@ -484,16 +451,7 @@ def iris( """Performs a search for the provided search terms ANDed together, returning the pivot engine row data for the resulting domains. """ - if ( - not domain - and not ip - and not email - and not nameserver - and not registrar - and not registrant - and not registrant_org - and not kwargs - ): + if not domain and not ip and not email and not nameserver and not registrar and not registrant and not registrant_org and not kwargs: raise ValueError("At least one search term must be specified") return self._results( @@ -568,12 +526,8 @@ def iris_enrich(self, *domains, **kwargs): younger_than_date = kwargs.pop("younger_than_date", {}) or None older_than_date = kwargs.pop("older_than_date", {}) or None updated_after = kwargs.pop("updated_after", {}) or None - include_domains_with_missing_field = ( - kwargs.pop("include_domains_with_missing_field", {}) or None - ) - exclude_domains_with_missing_field = ( - kwargs.pop("exclude_domains_with_missing_field", {}) or None - ) + include_domains_with_missing_field = kwargs.pop("include_domains_with_missing_field", {}) or None + exclude_domains_with_missing_field = kwargs.pop("exclude_domains_with_missing_field", {}) or None filtered_results = DTResultFilter(result_set=results).by( [ @@ -581,12 +535,8 @@ def iris_enrich(self, *domains, **kwargs): filter_by_expire_date(date=younger_than_date, lookup_type="before"), filter_by_expire_date(date=older_than_date, lookup_type="after"), filter_by_date_updated_after(date=updated_after), - filter_by_field( - field=include_domains_with_missing_field, filter_type="include" - ), - filter_by_field( - field=exclude_domains_with_missing_field, filter_type="exclude" - ), + filter_by_field(field=include_domains_with_missing_field, filter_type="include"), + filter_by_field(field=exclude_domains_with_missing_field, filter_type="exclude"), ] ) @@ -691,9 +641,7 @@ def iris_investigate( kwargs["search_hash"] = search_hash if not (kwargs or domains): - raise ValueError( - "Need to define investigation using kwarg filters or domains" - ) + raise ValueError("Need to define investigation using kwarg filters or domains") if isinstance(domains, (list, tuple)): domains = ",".join(domains) @@ -723,12 +671,8 @@ def iris_investigate( filter_by_expire_date(date=younger_than_date, lookup_type="before"), filter_by_expire_date(date=older_than_date, lookup_type="after"), filter_by_date_updated_after(date=updated_after), - filter_by_field( - field=include_domains_with_missing_field, filter_type="include" - ), - filter_by_field( - field=exclude_domains_with_missing_field, filter_type="exclude" - ), + filter_by_field(field=include_domains_with_missing_field, filter_type="include"), + filter_by_field(field=exclude_domains_with_missing_field, filter_type="exclude"), ] ) @@ -768,9 +712,7 @@ def iris_detect_monitors( if include_counts: if not datetime_counts_since: - raise ValueError( - "Need to define datetime_counts_since when include_counts is True" - ) + raise ValueError("Need to define datetime_counts_since when include_counts is True") if isinstance(datetime_counts_since, datetime): datetime_counts_since = str(datetime_counts_since.astimezone()) elif isinstance(datetime_counts_since, str): @@ -978,9 +920,7 @@ def iris_detect_watched_domains( **kwargs, ) - def iris_detect_manage_watchlist_domains( - self, watchlist_domain_ids, state, **kwargs - ): + def iris_detect_manage_watchlist_domains(self, watchlist_domain_ids, state, **kwargs): """Changes the watch state of a list of domains by their Iris Detect domain ID. watchlist_domain_ids: List[str]: required. List of Iris Detect domain IDs to manage. @@ -999,9 +939,7 @@ def iris_detect_manage_watchlist_domains( **kwargs, ) - def iris_detect_escalate_domains( - self, watchlist_domain_ids, escalation_type, **kwargs - ): + def iris_detect_escalate_domains(self, watchlist_domain_ids, escalation_type, **kwargs): """Changes the escalation type of a list of domains by their Iris Detect domain ID. watchlist_domain_ids: List[str]: required. List of Iris Detect domain IDs to escalate. diff --git a/domaintools/base_results.py b/domaintools/base_results.py index 7674f69..82ad533 100644 --- a/domaintools/base_results.py +++ b/domaintools/base_results.py @@ -68,9 +68,7 @@ def _wait_time(self): wait_for = 0 if now < safe_after: wait_for = safe_after - now - wait_for = float(wait_for.seconds) + ( - float(wait_for.microseconds) / 1000000.0 - ) + wait_for = float(wait_for.seconds) + (float(wait_for.microseconds) / 1000000.0) limit["last_scheduled"] = safe_after else: limit["last_scheduled"] = now @@ -79,9 +77,7 @@ def _wait_time(self): def _make_request(self): - with Client( - verify=self.api.verify_ssl, proxies=self.api.proxy_url, timeout=None - ) as session: + with Client(verify=self.api.verify_ssl, proxy=self.api.proxy_url, timeout=None) as session: if self.product in [ "iris-investigate", "iris-enrich", @@ -95,15 +91,11 @@ def _make_request(self): patch_data.update(self.api.extra_request_params) return session.patch(url=self.url, json=patch_data) else: - return session.get( - url=self.url, params=self.kwargs, **self.api.extra_request_params - ) + return session.get(url=self.url, params=self.kwargs, **self.api.extra_request_params) def _get_results(self): wait_for = self._wait_time() - if self.api.rate_limit and ( - wait_for is None or self.product == "account-information" - ): + if self.api.rate_limit and (wait_for is None or self.product == "account-information"): data = self._make_request() if data.status_code == 503: # pragma: no cover sleeptime = 60 @@ -118,9 +110,7 @@ def _get_results(self): return data if wait_for > 0: - log.info( - "Sleeping for [%s] prior to requesting [%s].", wait_for, self.product - ) + log.info("Sleeping for [%s] prior to requesting [%s].", wait_for, self.product) time.sleep(wait_for) return self._make_request() @@ -143,19 +133,13 @@ def data(self): self._limit_exceeded_message = message if self._limit_exceeded is True: - raise ServiceException( - 503, "Limit Exceeded{}".format(self._limit_exceeded_message) - ) + raise ServiceException(503, "Limit Exceeded{}".format(self._limit_exceeded_message)) else: return self._data def check_limit_exceeded(self): if self.kwargs.get("format", "json") == "json": - if ( - "response" in self._data - and "limit_exceeded" in self._data["response"] - and self._data["response"]["limit_exceeded"] is True - ): + if "response" in self._data and "limit_exceeded" in self._data["response"] and self._data["response"]["limit_exceeded"] is True: return True, self._data["response"]["message"] # TODO: handle html, xml response errors better. elif "response" in self._data and "limit_exceeded" in self._data: @@ -302,16 +286,7 @@ def html(self): ) def as_list(self): - return "\n".join( - [ - json.dumps(item, indent=4, separators=(",", ": ")) - for item in self._items() - ] - ) + return "\n".join([json.dumps(item, indent=4, separators=(",", ": ")) for item in self._items()]) def __str__(self): - return str( - json.dumps(self.data(), indent=4, separators=(",", ": ")) - if self.kwargs.get("format", "json") == "json" - else self.data() - ) + return str(json.dumps(self.data(), indent=4, separators=(",", ": ")) if self.kwargs.get("format", "json") == "json" else self.data()) diff --git a/domaintools/cli/commands/domains.py b/domaintools/cli/commands/domains.py index 0add785..6ba401a 100644 --- a/domaintools/cli/commands/domains.py +++ b/domaintools/cli/commands/domains.py @@ -16,9 +16,7 @@ def brand_monitor( ctx: typer.Context, query: str = typer.Option(..., "-q", "--query", help="The query to use."), exclude: str = typer.Option(None, "--exclude", help="The exclude condition."), - domain_status: str = typer.Option( - None, "--domain-status", help="The domain status." - ), + domain_status: str = typer.Option(None, "--domain-status", help="The domain status."), days_back: int = typer.Option(None, "--days-back", help="The days back to check."), user: str = typer.Option(None, "-u", "--user", help="Domaintools API Username."), key: str = typer.Option(None, "-k", "--key", help="DomainTools API key"), @@ -41,9 +39,7 @@ def brand_monitor( help="Output format in {'list', 'json', 'xml', 'html'}", callback=DTCLICommand.validate_format_input, ), - out_file: typer.FileTextWrite = typer.Option( - sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)" - ), + out_file: typer.FileTextWrite = typer.Option(sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)"), no_verify_ssl: bool = typer.Option( False, "--no-verify-ssl", @@ -81,9 +77,7 @@ def domain_profile( help="Output format in {'list', 'json', 'xml', 'html'}", callback=DTCLICommand.validate_format_input, ), - out_file: typer.FileTextWrite = typer.Option( - sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)" - ), + out_file: typer.FileTextWrite = typer.Option(sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)"), no_verify_ssl: bool = typer.Option( False, "--no-verify-ssl", @@ -100,19 +94,13 @@ def domain_profile( def domain_search( ctx: typer.Context, query: str = typer.Option(..., "-q", "--query", help="The domain name to query."), - exclude_query: str = typer.Option( - None, "--exclude-query", help="The exclusion filter to query." - ), + exclude_query: str = typer.Option(None, "--exclude-query", help="The exclusion filter to query."), max_length: int = typer.Option(25, "--max-length", help="The max length"), min_length: int = typer.Option(2, "--min-length", help="The min length"), has_hyphen: bool = typer.Option(True, "--has-hyphen", help=""), has_number: bool = typer.Option(True, "--has-number", help=""), - active_only: bool = typer.Option( - False, "--active-only", help="Search for active only domains." - ), - deleted_only: bool = typer.Option( - False, "--deleted-only", help="Search for deleted only domains." - ), + active_only: bool = typer.Option(False, "--active-only", help="Search for active only domains."), + deleted_only: bool = typer.Option(False, "--deleted-only", help="Search for deleted only domains."), anchor_left: bool = typer.Option(False, "--achor-left", help=""), anchor_right: bool = typer.Option(False, "--achor-right", help=""), page: int = typer.Option(1, "--page", help="Number of pages to return."), @@ -137,9 +125,7 @@ def domain_search( help="Output format in {'list', 'json', 'xml', 'html'}", callback=DTCLICommand.validate_format_input, ), - out_file: typer.FileTextWrite = typer.Option( - sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)" - ), + out_file: typer.FileTextWrite = typer.Option(sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)"), no_verify_ssl: bool = typer.Option( False, "--no-verify-ssl", @@ -177,9 +163,7 @@ def hosting_history( help="Output format in {'list', 'json', 'xml', 'html'}", callback=DTCLICommand.validate_format_input, ), - out_file: typer.FileTextWrite = typer.Option( - sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)" - ), + out_file: typer.FileTextWrite = typer.Option(sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)"), no_verify_ssl: bool = typer.Option( True, "--no-verify-ssl", @@ -219,9 +203,7 @@ def name_server_monitor( help="Output format in {'list', 'json', 'xml', 'html'}", callback=DTCLICommand.validate_format_input, ), - out_file: typer.FileTextWrite = typer.Option( - sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)" - ), + out_file: typer.FileTextWrite = typer.Option(sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)"), no_verify_ssl: bool = typer.Option( False, "--no-verify-ssl", @@ -259,9 +241,7 @@ def parsed_whois( help="Output format in {'list', 'json', 'xml', 'html'}", callback=DTCLICommand.validate_format_input, ), - out_file: typer.FileTextWrite = typer.Option( - sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)" - ), + out_file: typer.FileTextWrite = typer.Option(sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)"), no_verify_ssl: bool = typer.Option( False, "--no-verify-ssl", @@ -271,6 +251,44 @@ def parsed_whois( DTCLICommand.run(name=c.PARSED_WHOIS, params=ctx.params) +@dt_cli.command( + name=c.PARSED_DOMAIN_RDAP, + help=get_cli_helptext_by_name(command_name=c.PARSED_DOMAIN_RDAP), +) +def parsed_domain_rdap( + ctx: typer.Context, + query: str = typer.Option(..., "-q", "--query", help="The domain to query."), + user: str = typer.Option(None, "-u", "--user", help="Domaintools API Username."), + key: str = typer.Option(None, "-k", "--key", help="DomainTools API key"), + creds_file: str = typer.Option( + "~/.dtapi", + "-c", + "--credfile", + help="Optional file with API username and API key, one per line.", + ), + rate_limit: bool = typer.Option( + False, + "-l", + "--rate-limit", + help="Rate limit API calls against the API based on per minute limits.", + ), + format: str = typer.Option( + "json", + "-f", + "--format", + help="Output format in {'list', 'json', 'xml', 'html'}", + callback=DTCLICommand.validate_format_input, + ), + out_file: typer.FileTextWrite = typer.Option(sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)"), + no_verify_ssl: bool = typer.Option( + False, + "--no-verify-ssl", + help="Skip verification of SSL certificate when making HTTPs API calls", + ), +): + DTCLICommand.run(name=c.PARSED_DOMAIN_RDAP, params=ctx.params) + + @dt_cli.command( name=c.REGISTRANT_MONITOR, help=get_cli_helptext_by_name(command_name=c.REGISTRANT_MONITOR), @@ -306,9 +324,7 @@ def registrant_monitor( help="Output format in {'list', 'json', 'xml', 'html'}", callback=DTCLICommand.validate_format_input, ), - out_file: typer.FileTextWrite = typer.Option( - sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)" - ), + out_file: typer.FileTextWrite = typer.Option(sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)"), no_verify_ssl: bool = typer.Option( False, "--no-verify-ssl", @@ -347,9 +363,7 @@ def reputation( help="Output format in {'list', 'json', 'xml', 'html'}", callback=DTCLICommand.validate_format_input, ), - out_file: typer.FileTextWrite = typer.Option( - sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)" - ), + out_file: typer.FileTextWrite = typer.Option(sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)"), no_verify_ssl: bool = typer.Option( False, "--no-verify-ssl", @@ -388,9 +402,7 @@ def reverse_ip( help="Output format in {'list', 'json', 'xml', 'html'}", callback=DTCLICommand.validate_format_input, ), - out_file: typer.FileTextWrite = typer.Option( - sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)" - ), + out_file: typer.FileTextWrite = typer.Option(sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)"), no_verify_ssl: bool = typer.Option( False, "--no-verify-ssl", @@ -429,9 +441,7 @@ def reverse_nameserver( help="Output format in {'list', 'json', 'xml', 'html'}", callback=DTCLICommand.validate_format_input, ), - out_file: typer.FileTextWrite = typer.Option( - sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)" - ), + out_file: typer.FileTextWrite = typer.Option(sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)"), no_verify_ssl: bool = typer.Option( False, "--no-verify-ssl", @@ -454,9 +464,7 @@ def reverse_whois( "--scope", help="Sets the scope of the report to include only current Whois records, or to include both current and historic records.", ), - mode: str = typer.Option( - "purchase", "--mode", help="Values must be purchase (the default) or quote" - ), + mode: str = typer.Option("purchase", "--mode", help="Values must be purchase (the default) or quote"), user: str = typer.Option(None, "-u", "--user", help="Domaintools API Username."), key: str = typer.Option(None, "-k", "--key", help="DomainTools API key"), creds_file: str = typer.Option( @@ -478,9 +486,7 @@ def reverse_whois( help="Output format in {'list', 'json', 'xml', 'html'}", callback=DTCLICommand.validate_format_input, ), - out_file: typer.FileTextWrite = typer.Option( - sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)" - ), + out_file: typer.FileTextWrite = typer.Option(sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)"), no_verify_ssl: bool = typer.Option( False, "--no-verify-ssl", @@ -518,9 +524,7 @@ def whois( help="Output format in {'list', 'json', 'xml', 'html'}", callback=DTCLICommand.validate_format_input, ), - out_file: typer.FileTextWrite = typer.Option( - sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)" - ), + out_file: typer.FileTextWrite = typer.Option(sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)"), no_verify_ssl: bool = typer.Option( False, "--no-verify-ssl", @@ -552,9 +556,7 @@ def whois_history( "--limit", help="Specify the maximum number of records to retrieve in an API query.", ), - offset: int = typer.Option( - None, "--offset", help="For paginating requests beyond the limit." - ), + offset: int = typer.Option(None, "--offset", help="For paginating requests beyond the limit."), user: str = typer.Option(None, "-u", "--user", help="Domaintools API Username."), key: str = typer.Option(None, "-k", "--key", help="DomainTools API key"), creds_file: str = typer.Option( @@ -576,9 +578,7 @@ def whois_history( help="Output format in {'list', 'json', 'xml', 'html'}", callback=DTCLICommand.validate_format_input, ), - out_file: typer.FileTextWrite = typer.Option( - sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)" - ), + out_file: typer.FileTextWrite = typer.Option(sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)"), no_verify_ssl: bool = typer.Option( False, "--no-verify-ssl", @@ -616,9 +616,7 @@ def risk( help="Output format in {'list', 'json', 'xml', 'html'}", callback=DTCLICommand.validate_format_input, ), - out_file: typer.FileTextWrite = typer.Option( - sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)" - ), + out_file: typer.FileTextWrite = typer.Option(sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)"), no_verify_ssl: bool = typer.Option( False, "--no-verify-ssl", @@ -656,9 +654,7 @@ def risk_evidence( help="Output format in {'list', 'json', 'xml', 'html'}", callback=DTCLICommand.validate_format_input, ), - out_file: typer.FileTextWrite = typer.Option( - sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)" - ), + out_file: typer.FileTextWrite = typer.Option(sys.stdout, "-o", "--out-file", help="Output file (defaults to stdout)"), no_verify_ssl: bool = typer.Option( False, "--no-verify-ssl", @@ -674,6 +670,7 @@ def risk_evidence( "domain_search", "name_server_monitor", "parsed_whois", + "parsed_domain_rdap", "registrant_monitor", "reputation", "reverse_ip", diff --git a/domaintools/cli/constants.py b/domaintools/cli/constants.py index c8f7de7..e6d25fe 100644 --- a/domaintools/cli/constants.py +++ b/domaintools/cli/constants.py @@ -8,6 +8,7 @@ DOMAIN_SEARCH = "domain_search" HOSTING_HISTORY = "hosting_history" NAME_SERVER_MONITOR = "name_server_monitor" +PARSED_DOMAIN_RDAP = "parsed_domain_rdap" PARSED_WHOIS = "parsed_whois" REGISTRANT_MONITOR = "registrant_monitor" REPUTATION = "reputation" diff --git a/domaintools/cli/utils.py b/domaintools/cli/utils.py index a2b71e5..44af477 100644 --- a/domaintools/cli/utils.py +++ b/domaintools/cli/utils.py @@ -60,6 +60,7 @@ def _phisheye_termlist(): c.IP_MONITOR: "Pass in the IP Address you wish to query ( i.e. 199.30.228.112 ).", c.IP_REGISTRANT_MONITOR: "Query based on free text query terms", c.NAME_SERVER_MONITOR: "Pass in the hostname of the Name Server you wish to query ( i.e. dynect.net ).", + c.PARSED_DOMAIN_RDAP: "Pass in a domain name to see the most recent Domain-RDAP registration record.", c.PARSED_WHOIS: "Pass in a domain name.", c.REGISTRANT_MONITOR: "One or more terms as a Python list or separated by the pipe character ( | ).", c.REPUTATION: "Pass in a domain name to see its reputation score.", diff --git a/domaintools/results.py b/domaintools/results.py index a8e9a07..88bca47 100644 --- a/domaintools/results.py +++ b/domaintools/results.py @@ -2,24 +2,26 @@ Additionally, defines any custom result objects that may be used to enable more Pythonic interaction with endpoints. """ + from itertools import chain -try: # pragma: no cover +try: # pragma: no cover from collections import OrderedDict -except ImportError: # pragma: no cover +except ImportError: # pragma: no cover from ordereddict import OrderedDict from itertools import zip_longest from domaintools_async import AsyncResults as Results + class Reputation(Results): """Returns the reputation results in a format that can quickly be converted into floats / ints""" def __float__(self): - return float(self['risk_score']) + return float(self["risk_score"]) def __int__(self): - return int(self['risk_score']) + return int(self["risk_score"]) class GroupedIterable(Results): @@ -27,8 +29,9 @@ class GroupedIterable(Results): def _items(self): if self._items_list is None: - self._items_list = chain(*[zip_longest([], value, fillvalue=key) for key, value in self.response().items() - if type(value) in (list, tuple)]) + self._items_list = chain( + *[zip_longest([], value, fillvalue=key) for key, value in self.response().items() if type(value) in (list, tuple)] + ) return self._items_list @@ -38,26 +41,26 @@ class ParsedWhois(Results): def flattened(self): """Returns a flattened version of the parsed whois data""" - parsed = self['parsed_whois'] + parsed = self["parsed_whois"] flat = OrderedDict() - for key in ('domain', 'created_date', 'updated_date', 'expired_date', 'statuses', 'name_servers'): + for key in ("domain", "created_date", "updated_date", "expired_date", "statuses", "name_servers"): if key in parsed: value = parsed[key] - flat[key] = ' | '.join(value) if type(value) in (list, tuple) else value + flat[key] = " | ".join(value) if type(value) in (list, tuple) else value - registrar = parsed.get('registrar', {}) - for key in ('name', 'abuse_contact_phone', 'abuse_contact_email', 'iana_id', 'url', 'whois_server'): + registrar = parsed.get("registrar", {}) + for key in ("name", "abuse_contact_phone", "abuse_contact_email", "iana_id", "url", "whois_server"): if key in registrar: - flat['registrar_{0}'.format(key)] = registrar[key] + flat["registrar_{0}".format(key)] = registrar[key] if "networks" in parsed: networks = parsed.get("networks") for network in networks: id = network.get("id") - for key in ('range', 'asn', 'org', 'parent', 'customer', 'country', 'phone', 'status', 'source', 'updated_date', 'created_date'): + for key in ("range", "asn", "org", "parent", "customer", "country", "phone", "status", "source", "updated_date", "created_date"): if key in network: value = network[key] - flat['network_{0}'.format(id)] = ' '.join(value) if type(value) in (list, tuple) else value + flat["network_{0}".format(id)] = " ".join(value) if type(value) in (list, tuple) else value if "contacts" in parsed: contacts = parsed.get("contacts") @@ -65,17 +68,76 @@ def flattened(self): # handle IP-style contacts, which show up as a list for contact in contacts: contact_type = contact.get("type") - for key in ('name', 'email', 'org', 'abuse_mailbos', 'address', 'street', 'city', 'state', 'postal', 'country', 'phone', 'fax'): + for key in ( + "name", + "email", + "org", + "abuse_mailbos", + "address", + "street", + "city", + "state", + "postal", + "country", + "phone", + "fax", + ): if key in contact: value = contact[key] - flat['{0}_{1}'.format(contact_type, key)] = ' '.join(value) if type(value) in (list, tuple) else value + flat["{0}_{1}".format(contact_type, key)] = " ".join(value) if type(value) in (list, tuple) else value elif type(contacts) is dict: - for contact_type in ('registrant', 'admin', 'tech', 'billing'): + for contact_type in ("registrant", "admin", "tech", "billing"): contact = contacts.get(contact_type, {}) - for key in ('name', 'email', 'org', 'street', 'city', 'state', 'postal', 'country', 'phone', 'fax'): + for key in ("name", "email", "org", "street", "city", "state", "postal", "country", "phone", "fax"): if key in contact: value = contact[key] - flat['{0}_{1}'.format(contact_type, key)] = ' '.join(value) if type(value) in (list, tuple) else value + flat["{0}_{1}".format(contact_type, key)] = " ".join(value) if type(value) in (list, tuple) else value + + return flat + + +class ParsedDomainRdap(Results): + """Returns the parsed domain rdap results in a format that can quickly be flattened""" + + def flattened(self): + """Returns a flattened version of the parsed domain rdap data""" + parsed = self["parsed_domain_rdap"] + flat = OrderedDict() + for key in ( + "domain", + "handle", + "domain_statuses", + "creation_date", + "last_changed_date", + "expiration_date", + "dnssec", + "nameservers", + "conformance", + "emails", + "email_domains", + "unclassified_emails", + ): + if key in parsed: + value = parsed[key] + flat[key] = " | ".join(value) if type(value) in (list, tuple) else value + + registrar = parsed.get("registrar", {}) + for registrar_key, registrar_value in registrar.items(): + if registrar_key == "contacts": + for i, contact in enumerate(registrar_value, start=1): + for contact_key, contact_value in contact.items(): + flat[f"registrar_contacts_{contact_key}"] = ( + " | ".join(contact_value) if type(contact_value) in (list, tuple) else contact_value + ) + + continue + flat[f"registrar_{registrar_key}"] = registrar_value + + contacts = parsed.get("contacts") + if contacts: + for i, contact in enumerate(contacts, start=1): + for contact_key, contact_value in contact.items(): + flat[f"contact_{contact_key}_{i}"] = " | ".join(contact_value) if type(contact_value) in (list, tuple) else contact_value return flat diff --git a/domaintools_async/__init__.py b/domaintools_async/__init__.py index e28eec4..d852457 100644 --- a/domaintools_async/__init__.py +++ b/domaintools_async/__init__.py @@ -1,4 +1,5 @@ """Adds async capabilities to the base product object""" + import asyncio from httpx import AsyncClient @@ -6,9 +7,14 @@ from domaintools.exceptions import ServiceUnavailableException, ServiceException + class _AIter(object): """A wrapper to wrap an AsyncResults as an async iterable""" - __slots__ = ('results', 'iterator', ) + + __slots__ = ( + "results", + "iterator", + ) def __init__(self, results): self.results = results @@ -35,11 +41,11 @@ def __await__(self): return self.__awaitable__().__await__() async def _make_async_request(self, session): - if self.product in ['iris-investigate', 'iris-enrich', 'iris-detect-escalate-domains']: + if self.product in ["iris-investigate", "iris-enrich", "iris-detect-escalate-domains"]: post_data = self.kwargs.copy() post_data.update(self.api.extra_request_params) results = await session.post(url=self.url, data=post_data) - elif self.product in ['iris-detect-manage-watchlist-domains']: + elif self.product in ["iris-detect-manage-watchlist-domains"]: patch_data = self.kwargs.copy() patch_data.update(self.api.extra_request_params) results = await session.patch(url=self.url, json=patch_data) @@ -47,7 +53,7 @@ async def _make_async_request(self, session): results = await session.get(url=self.url, params=self.kwargs, **self.api.extra_request_params) if results: self.setStatus(results.status_code, results) - if self.kwargs.get('format', 'json') == 'json': + if self.kwargs.get("format", "json") == "json": self._data = results.json() else: self._data = results.text() @@ -60,7 +66,7 @@ async def _make_async_request(self, session): async def __awaitable__(self): if self._data is None: - async with AsyncClient(verify=self.api.verify_ssl, proxies=self.api.proxy_url, timeout=None) as session: + async with AsyncClient(verify=self.api.verify_ssl, proxy=self.api.proxy_url, timeout=None) as session: wait_time = self._wait_time() if wait_time is None and self.api: try: diff --git a/pyproject.toml b/pyproject.toml index 05af593..4436d00 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta" [project] name = "domaintools_api" dependencies = ["httpx", "rich", "typer"] -requires-python = ">= 3.6" +requires-python = ">= 3.9" authors = [{ name = "DomainTools", email = "integrations@domaintools.com" }] description = "DomainTools Official Python API" readme = "README.md" @@ -18,11 +18,6 @@ classifiers = [ "Environment :: Console", "License :: OSI Approved :: MIT License", "Programming Language :: Python", - "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3 :: Only", - "Programming Language :: Python :: 3.6", - "Programming Language :: Python :: 3.7", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Topic :: Software Development :: Libraries", diff --git a/tests/fixtures/vcr/test_parsed_domain_rdap.yaml b/tests/fixtures/vcr/test_parsed_domain_rdap.yaml new file mode 100644 index 0000000..65fb0d4 --- /dev/null +++ b/tests/fixtures/vcr/test_parsed_domain_rdap.yaml @@ -0,0 +1,90 @@ +interactions: +- request: + body: '' + headers: + accept: + - '*/*' + accept-encoding: + - gzip, deflate + connection: + - keep-alive + host: + - api.domaintools.com + user-agent: + - python-httpx/0.28.1 + method: GET + uri: https://api.domaintools.com/v1/google.com/rdap/parsed/?app_name=python_wrapper&app_version=2.1.0 + response: + body: + string: !!binary | + H4sIAAAAAAAAA+0baW/bOvKvEPrUoj4k27m8X9br9BUBchQ5CuxrAoORaJuvEqklKbt+Qf77DqnD + pmzFduJk20WAXJaGc3MODvPgCCJjziRxug9OwCNM2UAEOHa63x8cyRPhwxsAGlGpxMypOYpGRCoc + AYTTcluduufV3f1r76jr7ne9zp8AElL2A96OlYplt9nU6BoTIqikI9bwedTU3xOvmZJrjjgfhUS/ + gLWC+FwEsPrhNnHdVovf/0V81Q+xlOc4IunDbvorXZ/+XUt/jTELQhuq5bUP97zO4PjirHdyPuhf + nNW/XV6dW8vCYLyE/cvFxZfTzw2At0FBOJkBfs+YnOAwsRenst/eNuFrSX54lv6cePAjk+K2WUFQ + kNBCLUk4tGUWZPhaxNUstgXDcRxSHyvKWYb901+SZ0Z4rG2ukAiLHxFnVHGRsaUfP0Mf8BkrEmyp + kmeT31Ijd9ky2DIqKfwm/e2HlDCFAhISRVAs+Jje07IoGZASmMkhEWvAkjjA1bgkEeAKawhmQOsI + ZmAVBHOxgSeqKClvmKd2tVkyW7urj1q2V/CQlNSbBS0sbJbi5B7sdRKUeVoy7EnvvIcucyTo5Ngi + SAPN6JASsZqvwvITH4ugJwSe2eyZ5xnK7/kzIiTNfaf28Jj7HPmpLOKdhpsLla0crl90Bi5/lro8 + OmF+I8Nwt1tbrTAEvk8ksY3wyyiltELlwaVW5RUTTn2Sh7v0dyKoHSJI2P3kNVru4T5knoO9MlsE + Iky4ljOjNQhNcQjxSMl/liNWbr3Hu8LVyAQMUrafedjzVaHCIm6mnj1/sYjlGHa1Be4dHR3U3aO6 + t3ftdrquC19/lqJ+FSXyM6Yb04Ga4tDQ6WxNB1xUIR+CxagUq6ooeUeaknt07e1121DAdLailIU+ + PkSXx72vCD7ge5w7+joZ7bppTrnIF8RPBDk+v8oWZgzp0D0yqryCfJ6L2R3iUJLckxjs0jQ6b7OV + 56ssAVZVRudXXqOcHQuV7YxG6w1otN+ARmeZRm5kXSz0ORtyEWGWR5UifcHLQQgOFA5ciwwUGywt + 0geK+GMGn8MBhThBInA24xyDUQL5qXpdXvQPIG8PaVhA5owxriDKLWVIqkpZ+JqISOodcFPy+4BI + X9B4YeNkUl2B2gA1konRL1IcLSJp2HysKrafrO2m02m52k0LurqxmS7QFsJeXe/ZOvZBVFkHVSju + 8xBglOaoLlNW4TNlAfnZ+DlWUWhJuZQcTBC/bc4BITjXKrV3ZWpC1OdBrup12vsDUnfEBUE0dRoN + geArlRGlRSbyNcIaAoeAcIQmoAyFFpVkHKHBxQj+JnH8QpWvxLYbBV0eH19BqQIGSgT2Z6CpLBsi + UES0mcpuLk+1b6kxQSf93vk5ehppt1JTU+oPd6iqObptdHX36IC+lhpzLNZ25vtVnXmprjAP89Z8 + HrlWtOarwt28l7dke5sufqvy53lFSfvaPUiLkobrup/g2y3ljVcot55JcaFI2UzETt0FKVvXbqvr + HXTb7RcQfGlVdNhtg9T7ZQ426Kaf7oC366Y3apQ3ark37KYrCBZp+bUrOya9RsUWXqVyDD4wKXV2 + W23CrSt27aD74Ka6N+hAVDtc8pBdF3BMtt5VUlZJ+10lZZV0fi+VHC2rZAet526OkLbXxBZZzjuo + ey1IM7rn77hdbynLFc3ZioOsIKIsy+Jl8wmiC6n1LdPl5+Ne//rzMfrj4hJ9vTz51uv/22J8qQ68 + yDolkTBfn3SjICG6b8KJGnNB/14uKaq7Lx6RvBrWmRn6CPibQveV0hhjie4JYQik4RMS5O3YGx9m + 4kA8sSoDsnBs+KHfq353k/n73fxE1OfQT/tqcHN5Ylkkr5/T0lUuldDTMaeymfnTJpHmd9oEeSHL + 1PsGeK0NoFvEbFWFTqZc/MgdrALtF5OP0Olp/317/T7bqzjSe99d7+nl9fx/g5Hqs6ZJAm17sOKW + pj3PSEWlCe9uqtC38UVrFlixQzeI9ZsNC1805VyiuWrEefdU9bwwBi6s+v85jd9J/q6Y1+8oirX3 + Oi76jE45G00pG6FTe7ueEUEDiu2HJaUftvc7+5VBrlD56w2T/jXTc6SIKqUlwCw9cvxPQsSshmY8 + QXgkCCSzMVbm45SGIQInTJOSyVCchTM05KUOH0+HSYjiRMRcEgmYA4OkhhIWEIEYRz4VfhJBb89A + tBSxpmAjV7xr4f3gfUQ4DPm0hgjD9yGpIbAth3QpplTqoVgcc6FM/jQnhhGV2gPR/QyZQFFDsHVJ + POaM1CzMgGaIfUkjGpoMHEGQ09xKDrtLHyTWEISBiAif4hDhAFxbUQlaMxzIGEf/QCUlfGh9zLhE + YzoaowkPkwg4hroA8pJBCUuBHV8J8FJfn1rqQRqowyhcEtCatgVEYptZrZt0sZZJ1xqLrv4BvqmS + SM6kIpH8qAFNQZHOuPShDMoyp9TAdhxYWNio3E0iPdaRBq8A4cwcMuIBHc70M21E44wgBmbwiEbE + xmZ7nrH4sttBDXUPgUlbz0DE2hizaraOUwnPMMMjM8v9cH320YL+KjjUh4amDvaY0cw7QSESrCOk + LrkwCuiIKjA0RJkwsAl+M+PARbI4nw42zfh0ZerIC5xU7+DmehUkgEPXPejsHUGgtQMFQ58TwWPt + LgDXgQjZctstd19nfDtZb3OfciWDTX0sFeZj3qhQXj2d4oJ26qCdgOrwYY9ZX3CTcMecrJz9/bpj + 5OZ8kqlHvhsM7astamHanXmq0b5I1//DiXTqcnO59AD5RZpfRrfb3fEE7g1s8DyZ1k60Kw7pn3vJ + +vkUn7pHXHmL+Je9MKTLlk7bksd0qauzyaMDEjkxFpIEA+ufDvL/QXC6jvVvAWm7qi82rL4HACAZ + njR4Eel0vztVI2GArh4Ez18uTWPhVdXQd/5qNcqqAa8DivAFSY2gcYKMK2b/n8xPfYMDSzXIBj85 + /IqLjAX8vBufQy/dZSig59dJwBK60gL4chsCYNAX4AHVV0GgwdL6ysox808jpqiDV2taUlhm6liA + tBpJzYXuHbX9DApH33op0chYW3XYpWXOOKh4DQISouC95hy6/vQvqPShYkr/1mrg2gf7PSNdwvT/ + vnSdmyuLO2sa5Oi0sYYxiIXwcn4u+3a8zg/tN+HzjZgqgpE2MWxgBs2Drz1PmgGjk88WnYV7D3qd + fUsBMNozevOgXX6wOJ81m24eRo2CFoKn9vDNQ6YNvRwoNTGj0LlLP7krBAnAz0mgu1IIFHQCJUGB + I4uWBlV5KcAkzNfHW/ogJBgURPX2SS9vDYqLYwvKeHz8L4F3FIQGNgAA + headers: + Cache-Control: + - no-store, no-cache, must-revalidate + Content-Encoding: + - gzip + Content-Security-Policy: + - 'default-src * data: blob: ''unsafe-eval'' ''unsafe-inline''' + Content-Type: + - application/json + Date: + - Tue, 17 Dec 2024 16:48:10 GMT + Expires: + - Thu, 19 Nov 1981 08:52:00 GMT + Pragma: + - no-cache + Set-Cookie: + - dtsession=0o9kns7olf1sqr97off59och3sob50o6tm225h1dsjlom13o34bpkjog3l0ukoc2honq742n2ur3skuf26b2e2gd761p44dtjn6krr4; + expires=Thu, 16-Jan-2025 16:48:10 GMT; Max-Age=2592000; path=/; domain=.domaintools.com; + secure; HttpOnly + Strict-Transport-Security: + - max-age=31536000; includeSubDomains + Transfer-Encoding: + - chunked + status: + code: 200 + message: OK +version: 1 diff --git a/tests/test_api.py b/tests/test_api.py index cb6906e..89b8481 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -153,6 +153,32 @@ def test_parsed_whois(): assert isinstance(result.flattened(), dict) +@vcr.use_cassette +def test_parsed_domain_rdap(): + api_call = api.parsed_domain_rdap("google.com") + with api_call as result: + for key in ( + "handle", + "domain_statuses", + "creation_date", + "last_changed_date", + "expiration_date", + "nameservers", + "conformance", + "emails", + "email_domains", + "unclassified_emails", + "registrar", + "contacts", + ): + assert key in result.get("parsed_domain_rdap") + + for key, value in result.items(): + assert key + + assert isinstance(result.flattened(), dict) + + @vcr.use_cassette def test_registrant_monitor(): api_call = api.registrant_monitor("google") @@ -319,18 +345,14 @@ def test_exception_handling(): assert "not understand" in exception.reason["error"]["message"] with pytest.raises(exceptions.NotFoundException): - api._results( - "i_made_this_product_up", "/v1/steianrstierstnrsiatiarstnsto.com/whois" - ).data() + api._results("i_made_this_product_up", "/v1/steianrstierstnrsiatiarstnsto.com/whois").data() with pytest.raises(exceptions.NotAuthorizedException): API("notauser", "notakey").domain_search("amazon").data() with pytest.raises( ValueError, match=r"Invalid value 'notahash' for 'key_sign_hash'. Values available are sha1,sha256,md5", ): - API( - "notauser", "notakey", always_sign_api_key=True, key_sign_hash="notahash" - ).domain_search("amazon") + API("notauser", "notakey", always_sign_api_key=True, key_sign_hash="notahash").domain_search("amazon") @vcr.use_cassette @@ -457,9 +479,7 @@ def test_iris_detect_monitors(): @vcr.use_cassette def test_iris_detect_new_domains(): - detect_results = api.iris_detect_new_domains( - monitor_id="nAwmQg2pqg", sort=["risk_score"], order="desc" - ) + detect_results = api.iris_detect_new_domains(monitor_id="nAwmQg2pqg", sort=["risk_score"], order="desc") assert detect_results["watchlist_domains"][0]["risk_score"] == 100 @@ -468,9 +488,7 @@ def test_iris_detect_watched_domains(): detect_results = api.iris_detect_watched_domains() assert detect_results["count"] >= 0 - detect_results = api.iris_detect_watched_domains( - monitor_id="nAwmQg2pqg", sort=["risk_score"], order="desc" - ) + detect_results = api.iris_detect_watched_domains(monitor_id="nAwmQg2pqg", sort=["risk_score"], order="desc") assert len(detect_results["watchlist_domains"]) == 0 detect_results = api.iris_detect_watched_domains(escalation_types="blocked") @@ -479,23 +497,17 @@ def test_iris_detect_watched_domains(): @vcr.use_cassette def test_iris_detect_manage_watchlist_domains(): - detect_results = api.iris_detect_manage_watchlist_domains( - watchlist_domain_ids=["gae08rdVWG"], state="watched" - ) + detect_results = api.iris_detect_manage_watchlist_domains(watchlist_domain_ids=["gae08rdVWG"], state="watched") assert detect_results["watchlist_domains"][0]["state"] == "watched" @vcr.use_cassette def test_iris_detect_escalate_domains(): # If you rerun this test without VCR, it will fail because the domain is already escalated - detect_results = api.iris_detect_escalate_domains( - watchlist_domain_ids=["OWxzqKqQEY"], escalation_type="blocked" - ) + detect_results = api.iris_detect_escalate_domains(watchlist_domain_ids=["OWxzqKqQEY"], escalation_type="blocked") assert detect_results["escalations"][0]["escalation_type"] == "blocked" - detect_results = api.iris_detect_escalate_domains( - watchlist_domain_ids=["OWxzqKqQEY"], escalation_type="google_safe" - ) + detect_results = api.iris_detect_escalate_domains(watchlist_domain_ids=["OWxzqKqQEY"], escalation_type="google_safe") assert detect_results["escalations"][0]["escalation_type"] == "google_safe" diff --git a/tests/test_cli.py b/tests/test_cli.py index 2f734aa..eb3a8b3 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -22,7 +22,7 @@ def test_valid_command(): user = os.environ.get("TEST_USER", "test") key = os.environ.get("TEST_KEY", "key") result = runner.invoke(dt_cli, ["account_information", "--help"]) - assert "Usage: main account_information" in result.stdout + assert "Provides a snapshot of your accounts current API usage." in result.stdout def test_invalid_command(): diff --git a/tox.ini b/tox.ini index 7f99908..e7c06b0 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist=py36, py37, py38, py39, py310 +envlist=py39, py310, py311 skip_missing_interpreters=true [testenv]