Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions domaintools/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1218,3 +1218,34 @@ def realtime_domain_risk(self, **kwargs) -> FeedsResults:
cls=FeedsResults,
**kwargs,
)

def domainhotlist(self, **kwargs) -> FeedsResults:
"""Returns back list of domain hotlist feed.
Contains high-risk, apex-level domains that are observed by DomainTools' global sensor network to be active within 24 hours.

domain: str: Filter for an exact domain or a substring contained within a domain by prefixing or suffixing your substring with "*". Check the documentation for examples

before: str: Filter for records before the given time value inclusive or time offset relative to now

after: str: Filter for records after the given time value inclusive or time offset relative to now

headers: bool: Use in combination with Accept: text/csv headers to control if headers are sent or not

sessionID: str: A custom string to distinguish between different sessions

top: int: Limit the number of results to the top N, where N is the value of this parameter.
"""
validate_feeds_parameters(kwargs)
endpoint = kwargs.pop("endpoint", Endpoint.FEED.value)
source = ENDPOINT_TO_SOURCE_MAP.get(endpoint).value
if endpoint == Endpoint.DOWNLOAD.value or kwargs.get("output_format", OutputFormat.JSONL.value) != OutputFormat.CSV.value:
# headers param is allowed only in Feed API and CSV format
kwargs.pop("headers", None)

return self._results(
f"domain-hotlist-feed-({source})",
f"v1/{endpoint}/domainhotlist/",
response_path=(),
cls=FeedsResults,
**kwargs,
)
80 changes: 80 additions & 0 deletions domaintools/cli/commands/feeds.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,86 @@ def feeds_noh(
DTCLICommand.run(name=c.FEEDS_NOH, params=ctx.params)


@dt_cli.command(
name=c.FEEDS_DOMAINHOTLIST,
help=get_cli_helptext_by_name(command_name=c.FEEDS_DOMAINHOTLIST),
)
def feeds_domainhotlist(
ctx: typer.Context,
user: str = typer.Option(None, "-u", "--user", help="Domaintools API Username."),
key: str = typer.Option(None, "-k", "--key", help="DomainTools API key"),
creds_file: str = typer.Option(
"~/.dtapi",
"-c",
"--credfile",
help="Optional file with API username and API key, one per line.",
),
no_verify_ssl: bool = typer.Option(
False,
"--no-verify-ssl",
help="Skip verification of SSL certificate when making HTTPs API calls",
),
no_sign_api_key: bool = typer.Option(
False,
"--no-sign-api-key",
help="Skip signing of api key",
),
header_authentication: bool = typer.Option(
True,
"--no-header-auth",
help="Don't use header authentication",
),
output_format: str = typer.Option(
"jsonl",
"-f",
"--format",
help=f"Output format in [{OutputFormat.JSONL.value}, {OutputFormat.CSV.value}]",
callback=DTCLICommand.validate_feeds_format_input,
),
endpoint: str = typer.Option(
Endpoint.FEED.value,
"-e",
"--endpoint",
help=f"Valid endpoints: [{Endpoint.FEED.value}, {Endpoint.DOWNLOAD.value}]",
callback=DTCLICommand.validate_endpoint_input,
),
sessionID: str = typer.Option(
None,
"--session-id",
help="Unique identifier for the session",
),
after: str = typer.Option(
None,
"--after",
help="Start of the time window, relative to the current time in seconds, for which data will be provided",
callback=DTCLICommand.validate_after_or_before_input,
),
before: str = typer.Option(
None,
"--before",
help="The end of the query window in seconds, relative to the current time, inclusive",
callback=DTCLICommand.validate_after_or_before_input,
),
domain: str = typer.Option(
None,
"-d",
"--domain",
help="A string value used to filter feed results",
),
headers: bool = typer.Option(
False,
"--headers",
help="Adds a header to the first line of response when text/csv is set in header parameters",
),
top: str = typer.Option(
None,
"--top",
help="Number of results to return in the response payload. This is ignored in download endpoint",
),
):
DTCLICommand.run(name=c.FEEDS_DOMAINHOTLIST, params=ctx.params)


@dt_cli.command(
name=c.FEEDS_REALTIME_DOMAIN_RISK,
help=get_cli_helptext_by_name(command_name=c.FEEDS_REALTIME_DOMAIN_RISK),
Expand Down
1 change: 1 addition & 0 deletions domaintools/cli/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
FEEDS_NAD = "nad"
FEEDS_NOD = "nod"
FEEDS_NOH = "noh"
FEEDS_DOMAINHOTLIST = "domainhotlist"
FEEDS_DOMAINRDAP = "domainrdap"
FEEDS_DOMAINDISCOVERY = "domaindiscovery"
FEEDS_REALTIME_DOMAIN_RISK = "realtime_domain_risk"
1 change: 1 addition & 0 deletions domaintools/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def _iris_investigate_helptext():
c.FEEDS_NAD: "Returns back newly active domains feed.",
c.FEEDS_NOD: "Returns back newly observed domains feed.",
c.FEEDS_NOH: "Returns back newly observed hosts feed.",
c.FEEDS_DOMAINHOTLIST: "Returns domaint hotlist feed.",
c.FEEDS_DOMAINRDAP: "Returns changes to global domain registration information, populated by the Registration Data Access Protocol (RDAP).",
c.FEEDS_DOMAINDISCOVERY: "Returns new domains as they are either discovered in domain registration information, observed by our global sensor network, or reported by trusted third parties.",
c.FEEDS_REALTIME_DOMAIN_RISK: "Returns realtime domain risk information for apex-level domains, regardless of observed traffic.",
Expand Down
2 changes: 2 additions & 0 deletions domaintools/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class OutputFormat(Enum):
"newly-observed-domains-feed-(s3)",
"newly-observed-hosts-feed-(api)",
"newly-observed-hosts-feed-(s3)",
"domain-hotlist-feed-(api)",
"domain-hotlist-feed-(s3)",
"domain-registration-data-access-protocol-feed-(api)",
"domain-registration-data-access-protocol-feed-(s3)",
"domain-risk-feed-(api)",
Expand Down
Loading
Loading