An extensible framework for crawling things on the web.
Crawl the things was initially created to ingest and process news data from Common Crawl, but can be used to directly crawl and process pretty much anything.
There are three components to a crawler:
- Ingestor: a component that implements the `Ingestor` interface. An `Ingestor` implements a `next() -> Record` function
- Processor: a component that implements the `Processor` interface. A `Processor` implements a `process(record: Record)` function
- Storage: provides a `StorageObject` abstraction. A `StorageObject` exposes an `append(payload: string)` function
At a high-level, a crawl run will initialize an ingestor that emits records, which are processed by a processor whose results are periodically flushed to a storage object.
Here is a simplified, single-threaded illustration (see `src/main.py` for the actual multi-threaded implementation):

```python
# Init in-memory results
results = []

# Init my ingestor with ingestor-specific params (e.g. WARC index for a WARC ingestor)
ingestor = MyIngestor('type', params)

# Init a storage object (e.g. local file or S3 object)
storage_object = StorageObject(params)

for record in ingestor:
    # Process a record generated by the ingestor and append the result to results
    result = MyProcessor.process(record)
    results.append(result)

    # Periodically flush the results to stable storage
    if len(results) > 0 and len(results) % 100 == 0:
        flush_results(storage_object, results)
        del results[:]
```

Run `main.py` with `-h` to see the available options:

```
pipenv run python3 src/main.py -h
usage: main.py [-h] -i INPUT [-o OUTPUT] -p PROCESSOR -I INGESTOR [-t THREADS]

optional arguments:
  -h, --help            show this help message and exit
  -i INPUT, --input INPUT
                        Input file containing ingest-specific configuration
  -o OUTPUT, --output OUTPUT
                        Output path (e.g. s3://<region>.<bucket>/<path> or file://<path>)
  -p PROCESSOR, --processor PROCESSOR
                        Processor to use (e.g. news)
  -I INGESTOR, --ingestor INGESTOR
                        Ingestor to use (e.g. warc-index)
  -t THREADS, --threads THREADS
                        Number of threads (default=16)
```
Note that the number of threads refers to the processor (e.g. news) thread pool. The ingestor is single-threaded. To parallelize the ingestion, you should partition the ingestion index and invoke main.py multiple times, as sketched below.
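For example, here is a hypothetical sketch of partitioning an index file (one WARC index location per line) and launching one `main.py` run per partition. The file names are illustrative; the flags match the usage output above.

```python
# Hypothetical sketch: split an index file into N partitions and launch one
# main.py process per partition. Paths and file names are illustrative.
import subprocess

N = 4
with open("warc-index-locations.files") as f:
    locations = f.read().splitlines()

procs = []
for i in range(N):
    part_path = f"/tmp/index-part-{i}.files"
    with open(part_path, "w") as out:
        out.write("\n".join(locations[i::N]) + "\n")
    procs.append(subprocess.Popen(
        ["pipenv", "run", "python3", "src/main.py",
         "-i", part_path, "-I", "warc-index", "-p", "news",
         "-o", f"file:///tmp/results-{i}.json"]))

for p in procs:
    p.wait()
```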
Wrapper scripts for crawling and processing news articles are provided in projects/news.
Here we will crawl a subset of news articles for March 2021.
- Create virtual env and install packages: `pipenv install`
- Set AWS credentials: `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` (authenticated access required for WARC as of 04/2022)
- Run: `projects/news/ingest_month.sh 2021 03 0 1`
When running the script you should see output like:
```
2022-04-24 09:03:43,863 Fetching index entry: crawl-data/CC-NEWS/2021/03/CC-NEWS-20210301012354-00478.warc.gz
2022-04-24 09:03:43,864 Fetched index entry: crawl-data/CC-NEWS/2021/03/CC-NEWS-20210301012354-00478.warc.gz
2022-04-24 09:03:43,865 Downloading crawl-data/CC-NEWS/2021/03/CC-NEWS-20210301012354-00478.warc.gz to /tmp/23791229-921f-4877-b059-a652e9413c78
```
Downloading can take a while depending on your network connection and location. For example, with a home Internet connection on the West coast, downloading each WARC takes over 10 minutes, while a 10G interface in AWS takes seconds.
You will see tracebacks (usually from lxml, because not everything can be parsed). As long as you see occasional `Appending 100 results` logs, the script is making progress.
Once the script completes, you should have a few artifacts:
- An index file will be created at `./news-03-2021-0-1.files`, which contains the WARC index locations. In this case, there should only be one. Each month of news can comprise anywhere from 300-500 WARC index files; to process more, increase the `length` argument passed to `ingest_month.sh`. Since the indexes for a given month can consume 300-500 GB, in practice you would likely invoke `ingest_month.sh` many times to process different WARC files in parallel; otherwise, you'll be waiting around a long time.
- The results will be in `/tmp/crawlthenews-2021-03-0-1.json`. This is a big JSON array containing one article per entry. Each article will have the following data:
```
{
    "uri": <source URL>,
    "ts": <article timestamp>,
    "title": <article title>,
    "text": <article body>
}
```

That's it! Now you can download a bunch of news articles and run analysis (e.g. NLP) on the extracted text.
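As a quick example, here is a minimal sketch that loads the results file above and computes word frequencies over the article bodies (the field names match the schema shown above):

```python
# Minimal sketch: load the extracted articles and run a toy analysis
# (word frequencies over the article bodies).
import json
from collections import Counter

with open("/tmp/crawlthenews-2021-03-0-1.json") as f:
    articles = json.load(f)  # one big JSON array, one article per entry

counts = Counter()
for article in articles:
    counts.update(article["text"].lower().split())

print(f"{len(articles)} articles")
print(counts.most_common(20))
```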
For scale, you'll likely need to run this in AWS or GCP. For that, we use Pulumi to create the cloud resources for running a crawl.
Currently, there are two Ingestor implementations: CSV and WARC ingestors. The CSV ingestor simply ingests a CSV file
and is mostly used for local testing. The WARC ingestor is initialized with a list of WARC index locations on S3. It
parses and iterates the indexes and emits the raw data for each WARC record.
For example, if you initialize the WARC ingestor with:
```
crawl-data/CC-NEWS/2021/03/CC-NEWS-20210301012354-00478.warc.gz
crawl-data/CC-NEWS/2021/03/CC-NEWS-20210301040226-00479.warc.gz
crawl-data/CC-NEWS/2021/03/CC-NEWS-20210301060337-00480.warc.gz
crawl-data/CC-NEWS/2021/03/CC-NEWS-20210301074929-00481.warc.gz
crawl-data/CC-NEWS/2021/03/CC-NEWS-20210301091853-00482.warc.gz
```

it will process each index file sequentially, downloading it, iterating through its records, and emitting the raw data (e.g. raw HTML) for each one.
Note that each WARC index is roughly 1 GB, so you'll want to put processing close to the S3 bucket (e.g. us-east). In addition, if you are processing a lot of WARC files, you should also spawn many crawl instances.
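To make the record-emitting step concrete, here is a hypothetical sketch of the core WARC iteration loop using the warcio library (the project's actual ingestor in `src/ingestion` may be implemented differently):

```python
# Hypothetical sketch of iterating a downloaded WARC file with warcio and
# yielding the raw HTML payload of each HTTP response record. The actual
# ingestor in src/ingestion may differ.
from warcio.archiveiterator import ArchiveIterator

def iter_raw_html(warc_path: str):
    with open(warc_path, "rb") as stream:
        for record in ArchiveIterator(stream):
            if record.rec_type == "response":
                yield record.content_stream().read()

for html in iter_raw_html("/tmp/CC-NEWS-20210301012354-00478.warc.gz"):
    print(len(html))
```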
This framework can be used to ingest just about anything, provided you specify the locations to crawl:
- Web pages: create a `WebIngestor` that is initialized with a list of URLs and emits the raw HTML for each URL
- Files: create a `FileIngestor` that is initialized with a list of files and emits the contents of each file
- You get the idea... It is pretty simple.
As of now, there is no reason to create an ingestor other than the WARC one, because, well, Common Crawl does the actual crawling for us. We just need to convert the indexes and records into a `Record` for processing.
To create a new ingestor, implement this interface, put the implementation in `src/ingestion` and add an initializer for the ingestor in `src/main.py:main()`:

```python
class Ingestor(Iterable):
    def __next__(self) -> Record:
        return self.next()

    def __iter__(self):
        return self.iter()

    def next(self) -> Record:
        pass

    def iter(self):
        return self
```

Note: ingestors are assumed to not be thread-safe.
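For illustration, here is a hypothetical `WebIngestor` (like the one described earlier) implementing this interface. The `Record` constructor shown is an assumption, since the real `Record` type lives in the source tree:

```python
# Hypothetical WebIngestor: initialized with a list of URLs, emits the raw
# HTML for each one. The Record(...) constructor and its fields are assumed
# for illustration; see src/ingestion for real examples.
from typing import List
import requests

class WebIngestor(Ingestor):
    def __init__(self, urls: List[str]):
        self._urls = iter(urls)

    def next(self) -> Record:
        url = next(self._urls)  # raises StopIteration when the list is exhausted
        html = requests.get(url, timeout=30).text
        return Record(uri=url, payload=html)  # assumed Record shape
```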
We currently have three processor implementations:
- News: a news processor built on a fork of newspaper. Newspaper is a Python library that parses news websites. The maintainer has been inactive for years, so I created a fork to fix a few bugs, mostly around newspaper's text detection algorithm. This processor detects and extracts the title, author and body from news articles, which can then be used for analysis of historical news articles. (A short sketch of the underlying extraction follows this list.)
- Copy: a simple processor that copies the content from a `Record`. This is mostly helpful in testing.
- Rotten Tomatoes: a processor that extracts audience and critic scores from Rotten Tomatoes pages.
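For a rough idea of what the news processor does under the hood, here is a minimal sketch using the stock newspaper3k API (the fork's behavior may differ slightly):

```python
# Rough sketch of newspaper-style extraction (stock newspaper3k shown; the
# project's fork may differ). Given a URL and raw HTML, extract the title,
# authors and body text.
from newspaper import Article

def extract(url: str, raw_html: str) -> dict:
    article = Article(url)
    article.download(input_html=raw_html)  # use the HTML we already have
    article.parse()
    return {
        "uri": url,
        "title": article.title,
        "authors": article.authors,
        "text": article.text,
    }
```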
To create a new processor, implement this interface, put the implementation in `src/processor` and add a call to the processor in `src/main.py:do_process()`:

```python
class Processor:
    def __init__(self, results: List[Dict], mutex: threading.Lock):
        self.mutex = mutex
        self.results = results

    def process(self, record: Record):
        pass
```

Note: processors must serialize access to the shared results using the provided mutex.
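For example, a minimal, hypothetical copy-style processor that appends each record's payload under the mutex (the `Record` field names are assumptions for illustration):

```python
# Hypothetical copy-style processor. Record field names (uri, payload) are
# assumed for illustration.
class CopyProcessor(Processor):
    def process(self, record: Record):
        result = {"uri": record.uri, "text": record.payload}
        with self.mutex:  # serialize access to the shared results list
            self.results.append(result)
```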
Two StorageObject implementations are provided:
- S3: storage object backed by an S3 object
- file: storage object backed by a local file
See the current implementations to add support for other types of backing stores.
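As a sketch, a file-backed storage object exposing the `append(payload)` function described above might look like this (the class and constructor names are assumptions; see the existing implementations for the real code):

```python
# Hypothetical file-backed StorageObject sketch. The class name and
# constructor are assumptions for illustration.
class FileStorageObject:
    def __init__(self, path: str):
        self.path = path

    def append(self, payload: str):
        # Append the payload to the backing file on local disk
        with open(self.path, "a") as f:
            f.write(payload)
```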
If you need to process a lot of data (e.g. many months or years of news), running the crawler locally will likely be too slow. We have provided the infra config needed to run multiple crawlers as AWS Batch jobs. You will need both Docker and Pulumi to deploy to AWS.
Be sure to install Pulumi
Warning: Running in the cloud costs money. Be sure to monitor your usage and potentially set limits on what you are willing to spend.
Before creating a new image, be sure to set your S3 bucket if you want results to go to S3: set `BUCKET_NAME=<your bucket>` in `projects/news/ingest_month.sh`.
Your `ECR_ENDPOINT` should be something like `<id>.dkr.ecr.<region>.amazonaws.com`.
From the root of this repo, build and push the Docker image by running:

```
# aws ecr get-login-password --region us-west-2 | docker login --username AWS --password-stdin ${ECR_ENDPOINT}
# docker build . -f Dockerfile.processor -t ${ECR_ENDPOINT}/processor:latest
# docker push ${ECR_ENDPOINT}/processor:latest
```
First, change to the `infra/jobs` directory.
Next, run `pulumi stack init` (or `pulumi stack init --secrets-provider <kms uri>` if you want to use a KMS). This will
create a file called `Pulumi.<name-you-provided>.yaml`. Open the file and edit as you see fit. Of course, you'll have to
provide your own public key (optional) and VPC id:
```
secretsprovider: <this should be your KMS, if you chose to go that route>
aws:region: us-west-2
jobs:instance_type: c6g.4xlarge
jobs:public_key: <your SSH public key>
jobs:ssm_arn: arn:aws:iam::884270592710:policy/SessionManagerPermissions
jobs:subnet_cidr: 10.1.1.0/24
jobs:vpc_id: <your VPC id>
```

Next, ensure you have a valid stack by previewing: `pulumi preview -s <name-you-provided>`:
```
Previewing update (<name-you-provided>):

     Type                                 Name                                     Plan
 +   pulumi:pulumi:Stack                  jobs-test                                create
 +   ├─ aws:iam:Role                      awsBatchServiceRoleRole                  create
 +   ├─ aws:iam:Role                      ecsInstanceRoleRole                      create
 +   ├─ aws:ec2:Subnet                    crawlTheThingsSubnet                     create
 +   ├─ aws:ec2:SecurityGroup             crawlTheThingsVpcSecurityGroup           create
 +   ├─ aws:ec2:KeyPair                   key_pair                                 create
 +   ├─ aws:ec2:InternetGateway           igw                                      create
 +   ├─ aws:iam:RolePolicyAttachment      awsBatchServiceRoleRolePolicyAttachment  create
 +   ├─ aws:iam:RolePolicyAttachment      ecsInstanceRoleRolePolicyAttachment      create
 +   ├─ aws:iam:RolePolicyAttachment      ssmRolePolicyAttachment                  create
 +   ├─ aws:iam:InstanceProfile           ecsInstanceRoleInstanceProfile           create
 +   ├─ aws:ec2:RouteTable                egressRoutes                             create
 +   ├─ aws:ec2:RouteTableAssociation     egressRouteAssociation                   create
 +   ├─ aws:batch:ComputeEnvironment      crawlTheThings                           create
 +   └─ aws:batch:JobQueue                crawlTheThings                           create

Resources:
    + 15 to create
```
If you do not get errors, run `pulumi up -s <name-you-provided>`. Creation should take a few minutes.
Next, navigate to https://us-west-2.console.aws.amazon.com/batch. You should see a Job Queue and Compute Environment.
Now, create a Job definition:
- Under job definitions, hit `Create`
- Choose an appropriate `Name` and set a decent timeout (maybe an hour)
- Platform compatibility should be `EC2`
- Job configuration:
  - The `Image` should be `ECR_ENDPOINT/processor:latest` (you need to specify your ECR endpoint)
  - The `Command` should be `/main/projects/news/ingest_month.sh 2021 06 0 5`
  - I'd choose `16` vCPUs and `16GB` of memory (this could change depending on your instance type)
Finally, let's run the job by selecting the new Job Definition and clicking Submit new job:
- Provide a name
- Select the newly created job definition
- Select the `Job queue` with the prefix `crawlTheThings`
- Set the following environment variables in the `Job configuration` section: `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`
- You will also need to set `AWS_S3_HOST` to `s3-<s3-bucket-region>.amazonaws.com`
You should be able to monitor progress in the AWS Batch Dashboard. Note that this will spin up ECS and EC2 resources on demand, so it may be a few minutes before the job starts.