Changes from all commits (18 commits)
173c165 Add sparkccfile.py to support file-wise processing in spark jobs (use… (jt55401, Jul 31, 2024)
8405c52 Merge CCFileProcessorSparkJob into sparkcc.py (jt55401, Aug 3, 2024)
463f5a2 Add CCFileProcessorSparkJob example and link in readme for it. (jt55401, Aug 3, 2024)
850b9d6 fix s3 functions so they work in spark environment (jt55401, Sep 10, 2024)
555ae79 fix bug when the file was not downloaded from s3 (jt55401, Sep 11, 2024)
6b3c0fc fix: fix scheme to recognize s3a and s3n (jt55401, Nov 27, 2024)
33b4416 fix: don't log 404 errors when checking for existence (jt55401, Nov 27, 2024)
ee36b34 docs: update description of MD5Sum job (sebastian-nagel, May 27, 2025)
c3e3a29 refactor: Python code style (sebastian-nagel, May 27, 2025)
a5bd4a2 fix(CCFileProcessor): catch exceptions while opening local files, (sebastian-nagel, Aug 19, 2025)
d302b30 refactor(CCFileProcessor): rename command-line option (sebastian-nagel, Aug 19, 2025)
6ed36a3 refactor(CCFileProcessor): remove unused methods and code (sebastian-nagel, Aug 19, 2025)
c444cbe Bundle functionality shared between methods of CCSparkJob (sebastian-nagel, Sep 17, 2025)
15481a3 CCFileProcessor: Raise an exception if input or output (sebastian-nagel, Sep 18, 2025)
5e21315 Experimental WARC-to-WET extractor (sebastian-nagel, Sep 18, 2025)
cf3e6c0 docs: improve descriptions of CCSparkJob and derived core classes (sebastian-nagel, Sep 19, 2025)
4a48c9e Add more FastWARC examples and complete README (sebastian-nagel, Sep 19, 2025)
7c60145 fix(README): update Hadoop version and instructions about S3/S3A support (sebastian-nagel, Sep 24, 2025)
README.md (29 changes: 24 additions & 5 deletions)
@@ -6,14 +6,18 @@ This project provides examples how to process the Common Crawl dataset with [Apa

+ [count HTML tags](./html_tag_count.py) in Common Crawl's raw response data (WARC files)

+ [count web server names](./server_count.py) in Common Crawl's metadata (WAT files or WARC files)
+ [count web server names](./server_count.py) in Common Crawl's metadata (HTTP headers in WAT or WARC files)

+ list host names and corresponding [IP addresses](./server_ip_address.py) (WAT files or WARC files)

+ [word count](./word_count.py) (term and document frequency) in Common Crawl's extracted text (WET files)

+ [md5sum](./md5sum.py): run an external command (`md5sum`) on a list of files from a manifest – WARC, WET, WAT, or any other type of file.

+ [extract links](./wat_extract_links.py) from WAT files and [construct the (host-level) web graph](./hostlinks_to_graph.py) – for further details about the web graphs see the project [cc-webgraph](https://github.com/commoncrawl/cc-webgraph)

+ [WET extractor](./wet_extractor.py), using FastWARC and Resiliparse. See also [Using FastWARC](#using-fastwarc-to-read-warc-files).

+ work with the [columnar URL index](https://commoncrawl.org/2018/03/index-to-warc-files-and-urls-in-columnar-format/) (see also [cc-index-table](https://github.com/commoncrawl/cc-index-table) and the notes about [querying the columnar index](#querying-the-columnar-index)):

- run a SQL query and [export the result as a table](./cc_index_export.py)
@@ -39,7 +43,7 @@ pip install -r requirements.txt

## Compatibility and Requirements

Tested with Spark 3.2.3, 3.3.2, 3.4.1, 3.5.5 in combination with Python 3.8, 3.9, 3.10, 3.12 and 3.13. See the branch [python-2.7](/commoncrawl/cc-pyspark/tree/python-2.7) if you want to run the job on Python 2.7 and older Spark versions.
Tested with Spark 3.2.3, 3.3.2, 3.4.1, 3.5.5, 3.5.6 in combination with Python 3.8, 3.9, 3.10, 3.12 and 3.13. See the branch [python-2.7](/commoncrawl/cc-pyspark/tree/python-2.7) if you want to run the job on Python 2.7 and older Spark versions.


## Get Sample Data
@@ -160,7 +164,7 @@ Querying the columnar index using cc-pyspark requires authenticated S3 access. T

#### Installation of S3 Support Libraries

While WARC/WAT/WET files are read using boto3, accessing the [columnar URL index](https://commoncrawl.org/2018/03/index-to-warc-files-and-urls-in-columnar-format/) (see option `--query` of CCIndexSparkJob) is done directly by the SparkSQL engine and requires that S3 support libraries are available. These libs are usually provided when the Spark job is run on a Hadoop cluster running on AWS (eg. EMR). However, they may not be provided for any Spark distribution and are usually absent when running Spark locally (not in a Hadoop cluster). In these situations, the easiest way is to add the libs as required packages by adding `--packages org.apache.hadoop:hadoop-aws:3.2.1` to the arguments of `spark-submit`. This will make [Spark manage the dependencies](https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management) - the hadoop-aws package and transitive dependencies are downloaded as Maven dependencies. Note that the required version of hadoop-aws package depends on the Hadoop version bundled with your Spark installation, e.g., Spark 3.2.1 bundled with Hadoop 3.2 ([spark-3.2.1-bin-hadoop3.2.tgz](https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz)).
While WARC/WAT/WET files are read using boto3, accessing the [columnar URL index](https://commoncrawl.org/2018/03/index-to-warc-files-and-urls-in-columnar-format/) (see option `--query` of CCIndexSparkJob) is done directly by the SparkSQL engine and requires that S3 support libraries are available. These libraries are usually provided when the Spark job runs on a Hadoop cluster on AWS (e.g., EMR). However, not every Spark distribution ships them, and they are usually absent when running Spark locally (not in a Hadoop cluster). In these situations, the easiest way is to add them as required packages by passing `--packages org.apache.hadoop:hadoop-aws:3.3.4` to `spark-submit`. This lets [Spark manage the dependencies](https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management): the hadoop-aws package and its transitive dependencies are downloaded as Maven dependencies. Note that the required version of the hadoop-aws package depends on the Hadoop version bundled with your Spark installation, e.g., Spark 3.5.6 is bundled with Hadoop 3.3.4 ([spark-3.5.6-bin-hadoop3.tgz](https://archive.apache.org/dist/spark/spark-3.5.6/spark-3.5.6-bin-hadoop3.tgz)). Please check your Spark package and the underlying Hadoop installation for the correct version.
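If you are unsure which Hadoop version your Spark installation bundles, one way to check it from PySpark is sketched below (not part of cc-pyspark; it relies on PySpark's internal Py4J gateway `_jvm` and Hadoop's `VersionInfo` utility class, so treat it as a quick diagnostic rather than a stable API):

```
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session and ask the bundled Hadoop libraries
# for their version string, e.g. '3.3.4'.
spark = SparkSession.builder.appName("hadoop-version-check").getOrCreate()
print(spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion())
spark.stop()
```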

Please also note that:
- the schema of the URL referencing the columnar index depends on the actual S3 file system implementation: it's `s3://` on EMR but `s3a://` when using [s3a](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#Introducing_the_Hadoop_S3A_client.).
@@ -171,7 +175,8 @@ Please also note that:
Below is an example call to count words in 10 WARC records hosted under the `.is` top-level domain, using the `--packages` option:
```
$SPARK_HOME/bin/spark-submit \
--packages org.apache.hadoop:hadoop-aws:3.3.2 \
--packages org.apache.hadoop:hadoop-aws:3.3.4 \
--conf spark.sql.parquet.mergeSchema=true \
./cc_index_word_count.py \
--input_base_url s3://commoncrawl/ \
--query "SELECT url, warc_filename, warc_record_offset, warc_record_length, content_charset FROM ccindex WHERE crawl = 'CC-MAIN-2020-24' AND subset = 'warc' AND url_host_tld = 'is' LIMIT 10" \
@@ -195,7 +200,7 @@ Alternatively, it's possible to configure the table schema explicitly:
- and use it by adding the command-line argument `--table_schema cc-index-schema-flat.json`.


### Using FastWARC to parse WARC files
### Using FastWARC to read WARC files

> [FastWARC](https://resiliparse.chatnoir.eu/en/latest/man/fastwarc.html) is a high-performance WARC parsing library for Python written in C++/Cython. The API is inspired in large parts by WARCIO, but does not aim at being a drop-in replacement.

@@ -209,6 +214,20 @@ Some differences between the warcio and FastWARC APIs are hidden from the user i

However, it's recommended that you carefully verify that your custom job implementation works in combination with FastWARC. There are subtle differences between the warcio and FastWARC APIs, including the underlying classes (WARC/HTTP headers and stream implementations). In addition, FastWARC does not support legacy ARC files and does not automatically decode HTTP content and transfer encodings (see [Resiliparse HTTP Tools](https://resiliparse.chatnoir.eu/en/latest/man/parse/http.html#read-chunked-http-payloads)). While content and transfer encodings are already decoded in Common Crawl WARC files, this may not be the case for WARC files from other sources. See also the [WARC 1.1 specification, http/https response records](https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/#http-and-https-schemes).

FastWARC allows you to filter unwanted WARC record types at parse time, e.g., to skip request records immediately without even passing them on to the caller. To get the maximum performance from FastWARC, it's recommended to make use of these filters by setting the static class variable `fastwarc_record_filter`, as shown in the sketch below.
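For illustration, here is a minimal sketch of a job that restricts parsing to response and metadata records. The class, its name and the trivial `process_record` body are hypothetical; only the `fastwarc_record_filter` mechanism is taken from the examples in this repository:

```
from fastwarc.warc import WarcRecordType

from sparkcc_fastwarc import CCFastWarcSparkJob


class RecordTypeCountJob(CCFastWarcSparkJob):
    """Hypothetical example: count WARC response and metadata records"""

    name = "RecordTypeCount"

    # all other record types (request, warcinfo, ...) are skipped inside
    # FastWARC and are never passed on to this job
    fastwarc_record_filter = WarcRecordType.response | WarcRecordType.metadata

    def process_record(self, record):
        # emit one count per record; counts are aggregated by key,
        # following the (key, count) pattern of the other examples
        yield 'records_seen', 1


if __name__ == '__main__':
    job = RecordTypeCountJob()
    job.run()
```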

The following examples are ported to use FastWARC:
+ [count HTML tags](./html_tag_count_fastwarc.py)
+ [count web server names](./server_count_fastwarc.py)
+ list host names and corresponding [IP addresses](./server_ip_address_fastwarc.py)
+ [word count](./word_count_fastwarc.py)

In addition, the following tools are implemented using FastWARC:
+ [extract host-level links](./hostlinks_extract_fastwarc.py)
+ [WET extractor](./wet_extractor.py)

Please refer to the above [description of examples](#common-crawl-pyspark-examples) for additional details.


## Credits

html_tag_count_fastwarc.py (18 changes: 18 additions & 0 deletions)
@@ -0,0 +1,18 @@
from fastwarc.warc import WarcRecordType

from sparkcc_fastwarc import CCFastWarcSparkJob
from html_tag_count import TagCountJob


class TagCountFastWarcJob(TagCountJob, CCFastWarcSparkJob):
    """ Count HTML tag names in Common Crawl WARC files
    using FastWARC to read WARC files"""

    name = "TagCount"

    # process only WARC response records
    fastwarc_record_filter = WarcRecordType.response


if __name__ == '__main__':
    job = TagCountFastWarcJob()
    job.run()
md5sum.py (24 changes: 24 additions & 0 deletions)
@@ -0,0 +1,24 @@
import subprocess

from sparkcc import CCFileProcessorSparkJob
from pyspark.sql.types import StructType, StructField, StringType


class MD5Sum(CCFileProcessorSparkJob):
    """MD5 sum of each file, calling the command-line utility 'md5sum'"""

    name = "MD5Sum"

    output_schema = StructType([
        StructField("uri", StringType(), True),
        StructField("md5", StringType(), True),
    ])

    def process_file(self, uri, tempfd):
        # run md5sum on the local temporary copy of the file and keep only
        # the hex digest (the first whitespace-separated token of the output)
        proc = subprocess.run(['md5sum', tempfd.name], capture_output=True, check=True, encoding='utf8')
        digest = proc.stdout.rstrip().split()[0]
        yield uri, digest


if __name__ == '__main__':
    job = MD5Sum()
    job.run()
server_count_fastwarc.py (2 changes: 1 addition & 1 deletion)
@@ -7,7 +7,7 @@
class ServerCountFastWarcJob(ServerCountJob, CCFastWarcSparkJob):
    """ Count server names sent in HTTP response header
    (WARC and WAT is allowed as input) using FastWARC
    to parse WARC files"""
    to read WARC files"""

    name = "CountServers"

server_ip_address_fastwarc.py (22 changes: 22 additions & 0 deletions)
@@ -0,0 +1,22 @@
from fastwarc.warc import WarcRecordType

from sparkcc_fastwarc import CCFastWarcSparkJob
from server_ip_address import ServerIPAddressJob


class ServerIPAddressFastWarcJob(ServerIPAddressJob, CCFastWarcSparkJob):
    """ Collect server IP addresses from WARC response records
    (WARC and WAT is allowed as input) using FastWARC
    to parse WARC files"""

    name = "ServerIPAddresses"

    # process only WARC request or metadata (including WAT) records
    # Note: restrict the filter accordingly, depending on whether
    # WARC or WAT files are used
    fastwarc_record_filter = WarcRecordType.request | WarcRecordType.metadata


if __name__ == "__main__":
    job = ServerIPAddressFastWarcJob()
    job.run()