Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions domaintools/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1187,3 +1187,34 @@ def noh(self, **kwargs) -> FeedsResults:
cls=FeedsResults,
**kwargs,
)

def realtime_domain_risk(self, **kwargs) -> FeedsResults:
"""Returns back list of the realtime domain risk feed.
Contains realtime domain risk information for apex-level domains, regardless of observed traffic.

domain: str: Filter for an exact domain or a substring contained within a domain by prefixing or suffixing your substring with "*". Check the documentation for examples

before: str: Filter for records before the given time value inclusive or time offset relative to now

after: str: Filter for records after the given time value inclusive or time offset relative to now

headers: bool: Use in combination with Accept: text/csv headers to control if headers are sent or not

sessionID: str: A custom string to distinguish between different sessions

top: int: Limit the number of results to the top N, where N is the value of this parameter.
"""
validate_feeds_parameters(kwargs)
endpoint = kwargs.pop("endpoint", Endpoint.FEED.value)
source = ENDPOINT_TO_SOURCE_MAP.get(endpoint).value
if endpoint == Endpoint.DOWNLOAD.value or kwargs.get("output_format", OutputFormat.JSONL.value) != OutputFormat.CSV.value:
# headers param is allowed only in Feed API and CSV format
kwargs.pop("headers", None)

return self._results(
f"domain-risk-({source})",
f"v1/{endpoint}/domainrisk/",
response_path=(),
cls=FeedsResults,
**kwargs,
)
1 change: 1 addition & 0 deletions domaintools/cli/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ def run(cls, name: str, params: Optional[Dict] = {}, **kwargs):
verify_ssl=verify_ssl,
rate_limit=rate_limit,
always_sign_api_key=always_sign_api_key,
api_url="https://api.domaintools.test",
)
dt_api_func = getattr(dt_api, name)

Expand Down
80 changes: 80 additions & 0 deletions domaintools/cli/commands/feeds.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,3 +394,83 @@ def feeds_noh(
),
):
DTCLICommand.run(name=c.FEEDS_NOH, params=ctx.params)


@dt_cli.command(
name=c.FEEDS_REALTIME_DOMAIN_RISK,
help=get_cli_helptext_by_name(command_name=c.FEEDS_REALTIME_DOMAIN_RISK),
)
def feeds_realtime_domain_risk(
ctx: typer.Context,
user: str = typer.Option(None, "-u", "--user", help="Domaintools API Username."),
key: str = typer.Option(None, "-k", "--key", help="DomainTools API key"),
creds_file: str = typer.Option(
"~/.dtapi",
"-c",
"--credfile",
help="Optional file with API username and API key, one per line.",
),
no_verify_ssl: bool = typer.Option(
False,
"--no-verify-ssl",
help="Skip verification of SSL certificate when making HTTPs API calls",
),
no_sign_api_key: bool = typer.Option(
False,
"--no-sign-api-key",
help="Skip signing of api key",
),
header_authentication: bool = typer.Option(
True,
"--no-header-auth",
help="Don't use header authentication",
),
output_format: str = typer.Option(
"jsonl",
"-f",
"--format",
help=f"Output format in [{OutputFormat.JSONL.value}, {OutputFormat.CSV.value}]",
callback=DTCLICommand.validate_feeds_format_input,
),
endpoint: str = typer.Option(
Endpoint.FEED.value,
"-e",
"--endpoint",
help=f"Valid endpoints: [{Endpoint.FEED.value}, {Endpoint.DOWNLOAD.value}]",
callback=DTCLICommand.validate_endpoint_input,
),
sessionID: str = typer.Option(
None,
"--session-id",
help="Unique identifier for the session",
),
after: str = typer.Option(
None,
"--after",
help="Start of the time window, relative to the current time in seconds, for which data will be provided",
callback=DTCLICommand.validate_after_or_before_input,
),
before: str = typer.Option(
None,
"--before",
help="The end of the query window in seconds, relative to the current time, inclusive",
callback=DTCLICommand.validate_after_or_before_input,
),
domain: str = typer.Option(
None,
"-d",
"--domain",
help="A string value used to filter feed results",
),
headers: bool = typer.Option(
False,
"--headers",
help="Adds a header to the first line of response when text/csv is set in header parameters",
),
top: str = typer.Option(
None,
"--top",
help="Number of results to return in the response payload. This is ignored in download endpoint",
),
):
DTCLICommand.run(name=c.FEEDS_REALTIME_DOMAIN_RISK, params=ctx.params)
1 change: 1 addition & 0 deletions domaintools/cli/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@
FEEDS_NOH = "noh"
FEEDS_DOMAINRDAP = "domainrdap"
FEEDS_DOMAINDISCOVERY = "domaindiscovery"
FEEDS_REALTIME_DOMAIN_RISK = "realtime_domain_risk"
1 change: 1 addition & 0 deletions domaintools/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def _iris_investigate_helptext():
c.FEEDS_NOH: "Returns back newly observed hosts feed.",
c.FEEDS_DOMAINRDAP: "Returns changes to global domain registration information, populated by the Registration Data Access Protocol (RDAP).",
c.FEEDS_DOMAINDISCOVERY: "Returns new domains as they are either discovered in domain registration information, observed by our global sensor network, or reported by trusted third parties.",
c.FEEDS_REALTIME_DOMAIN_RISK: "Returns realtime domain risk information for apex-level domains, regardless of observed traffic.",
}


Expand Down
2 changes: 2 additions & 0 deletions domaintools/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ class OutputFormat(Enum):
"newly-observed-hosts-feed-(s3)",
"domain-registration-data-access-protocol-feed-(api)",
"domain-registration-data-access-protocol-feed-(s3)",
"domain-risk-feed-(api)",
"domain-risk-feed-(s3)",
"real-time-domain-discovery-feed-(api)",
"real-time-domain-discovery-feed-(s3)",
]
Loading
Loading