
Conversation

@jt55401 commented Jul 31, 2024

For some Spark jobs, we want to process an entire file at a time.
I copied and simplified sparkcc.py to do this.
This is used in the upcoming integrity process.

@jt55401 requested a review from sebastian-nagel on July 31, 2024, 15:57
@sebastian-nagel (Contributor) commented:

Thanks for the contribution, @jt55401!

First, I understand the use case: being able to process any kind of file, not only WARC files and derivatives (WAT, WET).
Yes, it is "more generic" and covers more use cases, including those where you want to

  • process a WARC file without using a WARC parser
  • use a custom output format.

Of course, this is already possible using CCSparkJob as the base class by overriding the method run_job(...). For example, HostLinksToGraph reads from and writes to Parquet; from CCSparkJob it uses only command-line parsing / option processing, logging definitions and optional profiling.
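
For illustration, such a job might look roughly like this (a minimal sketch, not code from the PR: it assumes the run_job(self, session) hook and the positional input/output arguments of CCSparkJob; the class name and the Parquet column are made up):

    from pyspark.sql import functions as F

    from sparkcc import CCSparkJob


    class MyParquetJob(CCSparkJob):
        """Hypothetical job that reuses only option parsing, logging and
        profiling from CCSparkJob and overrides run_job(...) to read and
        write Parquet directly (cf. HostLinksToGraph)."""

        name = "MyParquetJob"

        def run_job(self, session):
            # read arbitrary Parquet input instead of a manifest of WARC files
            df = session.read.parquet(self.args.input)
            # toy aggregation over a made-up column
            counts = df.groupBy("url_host_name").agg(F.count("*").alias("pages"))
            counts.write.parquet(self.args.output)


    if __name__ == '__main__':
        job = MyParquetJob()
        job.run()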

A Spark job definition to process individual files from a manifest.

Good catch! And very well expressed! This needs to be put into the README because it's one of the central design decisions. I never really thought about it, I just ported it from cc-mrjob. I do not know why @Smerity decided to use a manifest while most big data tools read the input list from command-line arguments. I see one advantage of the manifest: it's easy to select a (larger) random sample. With command-line arguments you may quickly hit the system limit on the maximum argument length when passing, say, 1k paths to WARC files.
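
As a side note, drawing such a random sample from a manifest is straightforward, e.g. (a small sketch; the file names are hypothetical):

    import random

    # pick a random sample of 1000 WARC paths from a full manifest
    with open('warc.paths') as f:
        paths = [line.strip() for line in f if line.strip()]

    with open('warc-sample.paths', 'w') as f:
        f.write('\n'.join(random.sample(paths, k=1000)) + '\n')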

One question: what's the rationale for using a NamedTemporaryFile? Being able to share the content as a file with other processes?

... and one remark which should be addressed: 90% of the code lines in sparkccfile.py are copied unmodified from sparkcc.py. This complicates maintenance because contributors may forget to implement a bug fix or improvement in both files.

Two suggestions for how to reduce the code duplication:

  1. CCSparkJob inherits from CCFileProcessorSparkJob

    • (preferred variant, although more work; expected to remove more duplicated code)
    • move the definition of CCFileProcessorSparkJob into sparkcc.py
      • makes the deployment easier
      • avoids breaking the deployment of existing setups through the changes below
    • remove duplicated code from CCSparkJob
      • keep only code / methods specific for WARC file processing
    • (difficult) have fetch_warc call fetch_file where applicable
      • the method fetch_warc is complex (120+ lines of code)
      • fetch_file mostly duplicates 70 of those lines
      • ideally
  2. CCFileProcessorSparkJob inherits from CCSparkJob

    • (easier to implement; a rough sketch follows after this list)
    • cf. above and HostLinksToGraph
    • basically, only fetch_file(...) and run_job(...) are then implemented by CCFileProcessorSparkJob
    • could move the definition also into sparkcc.py for easier deployment
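
A rough sketch of variant 2 for orientation (hedged: it is not the PR's implementation; it assumes the existing CCSparkJob command-line options and output-table handling, method names other than run_job(...) and fetch_file(...) are illustrative, and only the local-file case is spelled out):

    import shutil
    from tempfile import NamedTemporaryFile

    from sparkcc import CCSparkJob


    class CCFileProcessorSparkJob(CCSparkJob):
        """Sketch: reuse option parsing, logging and output handling from
        CCSparkJob; replace only the WARC-specific record iteration by
        whole-file processing."""

        name = "CCFileProcessor"

        def fetch_file(self, uri):
            """Copy the file behind `uri` into a NamedTemporaryFile and
            return the open handle (only local/file: URIs sketched here;
            s3:// input would use boto3 download_fileobj analogously)."""
            tempfd = NamedTemporaryFile(mode='w+b', prefix='ccfile-')
            with open(uri.removeprefix('file:'), 'rb') as src:
                shutil.copyfileobj(src, tempfd)
            tempfd.seek(0)
            return tempfd

        def process_file(self, uri, tempfd):
            """Implemented by derived jobs: yield output tuples."""
            raise NotImplementedError

        def process_files(self, _id, iterator):
            for uri in iterator:
                with self.fetch_file(uri) as tempfd:
                    yield from self.process_file(uri, tempfd)

        def run_job(self, session):
            # the input manifest lists one file path/URI per line, as for WARC jobs
            input_data = session.sparkContext.textFile(self.args.input)
            output = input_data.mapPartitionsWithIndex(self.process_files)
            session.createDataFrame(output, schema=self.output_schema) \
                   .write \
                   .format(self.args.output_format) \
                   .saveAsTable(self.args.output)

Everything around these two methods (option parsing, logging, the output table) would still come from CCSparkJob, which is the point of the suggestion.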

In any case: the job / class should be listed in the README, maybe together with a simple example.

@jt55401 (Author) commented Aug 2, 2024

@sebastian-nagel - thank you for the review, it's greatly appreciated.

NamedTemporaryFile

Yes - that is exactly right. Some parts of the jobs we run use external tools, so we need a file in the outside world.
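
For readers, the pattern is roughly the following (a hedged sketch; `file` is just a stand-in for whatever external tool is actually run):

    import subprocess
    from tempfile import NamedTemporaryFile

    payload = b"...content fetched from S3 or elsewhere..."

    # NamedTemporaryFile provides a real path that external tools can open;
    # the file is removed automatically when the handle is closed.
    with NamedTemporaryFile(mode='w+b', prefix='ccfile-', suffix='.bin') as tempfd:
        tempfd.write(payload)
        tempfd.flush()
        result = subprocess.run(['file', '--brief', tempfd.name],
                                capture_output=True, text=True, check=True)
        print(result.stdout.strip())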

Manifests vs. command line args

Yes - we are commonly processing 250,000 to 1,000,000+ files in a run (for example: all the WET/WAT files for an entire year of crawls).

Two suggestions how to reduce the code duplication:

Ah, very astute. I will review these options and update the PR with a refactor.

In any case: the job / class should be listed in the README, maybe together with a simple example.

Yes, no problem.

@jt55401 changed the title from "Add sparkccfile.py to support file-wise processing" to "Add CCFileProcessorSparkJob to support file-wise processing" on Aug 3, 2024
@jt55401 (Author) commented Aug 3, 2024

OK @sebastian-nagel - I'm reasonably happy with this version.

The only slight downside is that, due to the way it's packaged, we now have to depend on warcio even when it's not really needed. I'm not a deep expert in Python modularization, so if there is a clever way to fix this while preserving the cleanliness of this refactor, please let me know - otherwise, I'm fine leaving it.
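
One way to avoid the hard dependency, if it ever becomes an issue, is a deferred import, so that warcio is only required on the WARC-specific code path (a sketch of the general technique, not of how sparkcc.py is actually organized; the class and method names are illustrative):

    class WarcRecordReader:
        """Sketch: only code that actually parses WARC files touches warcio."""

        def iterate_records(self, stream):
            # deferred import: warcio is only needed once a WARC job runs,
            # not when the module containing file-only jobs is imported
            from warcio.archiveiterator import ArchiveIterator
            for record in ArchiveIterator(stream):
                yield record

The trade-off is that a missing warcio only shows up when a WARC job actually runs, instead of at import time.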

Let me know if you have any other feedback.

@jt55401 (Author) commented Sep 10, 2024

I've since enhanced this further with 3 more functions:

  • validate_s3_bucket_from_uri
  • check_for_output_file
  • write_output_file

These are mostly convenience functions which do what their names say, for both local file paths and S3 paths.
They make it easier to write jobs which process entire files and output new files to locations other than the default Spark result table (which I've been using more as an audit log for such file-wise processing jobs).
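
For readers of this thread, a hedged sketch of what two of these helpers might look like (the function names match the PR, the bodies are purely illustrative assumptions, using boto3 for the S3 case):

    import os
    from urllib.parse import urlparse

    import boto3


    def check_for_output_file(uri):
        """Return True if an output file already exists at `uri`
        (local path or s3:// URI). Illustrative sketch only."""
        parsed = urlparse(uri)
        if parsed.scheme == 's3':
            s3 = boto3.client('s3')
            resp = s3.list_objects_v2(Bucket=parsed.netloc,
                                      Prefix=parsed.path.lstrip('/'),
                                      MaxKeys=1)
            return resp.get('KeyCount', 0) > 0
        return os.path.exists(uri)


    def write_output_file(uri, data):
        """Write `data` (bytes) to a local path or an s3:// URI.
        Illustrative sketch only."""
        parsed = urlparse(uri)
        if parsed.scheme == 's3':
            boto3.client('s3').put_object(Bucket=parsed.netloc,
                                          Key=parsed.path.lstrip('/'),
                                          Body=data)
        else:
            os.makedirs(os.path.dirname(uri) or '.', exist_ok=True)
            with open(uri, 'wb') as f:
                f.write(data)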

Commits added to the PR branch:

  • … properly count successfully or failed processed files
  • Rename command-line option `--output_base_uri` to `--output_base_url`, following the equivalent option `--input_base_url`.
  • Improve logging of counters / aggregators.
    • remove unused method `validate_s3_bucket_from_uri`
    • remove code checking for an output location on http(s), because writing output over HTTP is not implemented
  • … CCSparkJob and CCFileProcessorSparkJob to reduce duplicated code
  • … location uses an unsupported scheme
    • fail for hdfs:// input and output, not implemented yet
    • fail for http:// or https:// output
  • Add example tool relying on CCFileProcessorSparkJob extracting text content from WARC HTML records and storing the texts in WET files. For every WARC file one WET file is created. The WET record locations are written to the output table.
@sebastian-nagel (Contributor) commented:

  • Rebased the PR branch on top of the current main branch.
  • Refactored the new CCFileProcessorSparkJob to reduce the amount of duplicated code: pulled out common code blocks into utility methods shared by CCSparkJob and CCFileProcessorSparkJob and used for both reading and writing files.
  • Added an example tool which makes use of CCFileProcessorSparkJob to write output to per-map files while still writing to the output table. It's a WET extractor tool based on Resiliparse:
    • writes WET files, one per WARC input
    • output table is an index of URL and WET record locations
    • the text extraction is good:
      • some structure preserved (lists, enumerations: helps especially for nested lists)
      • adds image alt texts
      • overall, good text extraction; at a first glance slightly better than the WETExtractor in ia-web-commons (see the sketch at the end of this comment)
  • Improved documentation.

The changes were successfully tested using Spark 3.5.6 in local mode for all implemented examples and tools.
Testing in distributed mode needs to be done.
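
For reference, the Resiliparse call at the core of such an extractor looks roughly like this (hedged; the exact options used by the tool may differ):

    from resiliparse.extract.html2text import extract_plain_text

    html = ("<html><body><h1>Title</h1>"
            "<ul><li>first</li><li>second</li></ul>"
            "<img src='x.png' alt='an image'></body></html>")

    # keep list structure and include image alt texts
    text = extract_plain_text(html, list_bullets=True, alt_texts=True)
    print(text)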

@malteos commented Sep 22, 2025

Not sure if I am qualified to review this, but it generally looks good at first glance. However, I did not run all the modified scripts myself. Can you share example commands that are expected to work?

At least, it seems to break the example from the README (or is this warning expected?):

$SPARK_HOME/bin/spark-submit \
    --packages org.apache.hadoop:hadoop-aws:3.3.2 \
    ./cc_index_word_count.py \
    --input_base_url s3://commoncrawl/ \
    --query "SELECT url, warc_filename, warc_record_offset, warc_record_length, content_charset FROM ccindex WHERE crawl = 'CC-MAIN-2020-24' AND subset = 'warc' AND url_host_tld = 'is' LIMIT 10" \
    s3a://commoncrawl/cc-index/table/cc-main/warc/ \
    myccindexwordcountoutput \
    --num_output_partitions 1 \
    --output_format json
25/09/22 13:45:58 WARN FileSystem: Failed to initialize filesystem s3a://commoncrawl/cc-index/table/cc-main/warc: java.lang.NumberFormatException: For input string: "60s"
25/09/22 13:45:58 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: s3a://commoncrawl/cc-index/table/cc-main/warc/.
java.lang.NumberFormatException: For input string: "60s"
        at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:67)
        at java.base/java.lang.Long.parseLong(Long.java:709)
        at java.base/java.lang.Long.parseLong(Long.java:832)
        at org.apache.hadoop.conf.Configuration.getLong(Configuration.java:1607)
        at org.apache.hadoop.fs.s3a.S3AUtils.longOption(S3AUtils.java:1024)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.initThreadPools(S3AFileSystem.java:719)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:498)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3615)
        at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:172)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3716)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3667)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:557)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:366)
        at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:55)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:381)
        at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
        at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.$anonfun$applyOrElse$2(ResolveDataSource.scala:61)
        at scala.Option.getOrElse(Option.scala:201)

What about adding some unit tests to ensure all examples work (especially on all the different platforms)?
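
To make the suggestion concrete, a minimal integration test could shell out to spark-submit in local mode against the bundled sample data (a hedged sketch; it assumes the sample data from get-data.sh is present and follows the server_count example from the README):

    import os
    import subprocess


    def test_server_count_example():
        """Run one README example end-to-end in Spark local mode."""
        spark_submit = os.path.join(os.environ['SPARK_HOME'], 'bin', 'spark-submit')
        subprocess.run(
            [spark_submit, './server_count.py',
             '--num_output_partitions', '1',
             '--log_level', 'WARN',
             './input/test_warc.txt', 'servernames'],
            check=True)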

Commit added: Add schema merging option to example command-line call.
@sebastian-nagel (Contributor) commented Sep 24, 2025

Hi @malteos, thanks for testing.

Well, the error occurs because the S3A configuration requests a long value but gets "60s".

In Hadoop 3.3.2 there is the suspicious line

    long keepAliveTime = longOption(conf, KEEPALIVE_TIME,
        DEFAULT_KEEPALIVE_TIME, 0);

which is changed in more recent Hadoop versions to

    // keepalive time takes a time suffix; default unit is seconds
    long keepAliveTime = ConfigurationHelper.getDuration(conf,
            KEEPALIVE_TIME,
            Duration.ofSeconds(DEFAULT_KEEPALIVE_TIME),
            TimeUnit.SECONDS,
            Duration.ZERO).getSeconds();

The version of the hadoop-aws package must match the version of Hadoop used to run the Spark job.

I've tried the job with Hadoop 3.3.6 and Spark 3.5.6 with success. I've updated the README to use a more recent Hadoop version and also added the option to use schema merging (description in the README, next section).

@malteos commented Sep 25, 2025

@sebastian-nagel I managed to run the example with pyspark==3.5.6.

Should we add this to requirements.txt to make the version requirements more explicit?

pyspark>=3.2.3,<=3.5.6

To run the command locally, I also needed to add this Spark config to the command:

 --conf spark.driver.host=localhost  --conf spark.driver.bindAddress=localhost

Also, I needed to explicitly export the AWS secrets as ENV vars. Maybe we should add something like this to the README (even though it's already explained in the Hadoop docs):

  export AWS_ACCESS_KEY_ID=$(aws configure get aws_access_key_id)
  export AWS_SECRET_ACCESS_KEY=$(aws configure get aws_secret_access_key)
  export AWS_SESSION_TOKEN=$(aws configure get aws_session_token)  # if using temporary credentials

@sebastian-nagel (Contributor) commented:

I managed to run the example with pyspark==3.5.6.

Excellent! Thanks for trying again!

Should we add this to requirements.txt to make the version requirements more explicit?

No. Spark is usually already installed as part of a Hadoop distribution, e.g., EMR. The pyspark module is also not a replacement for installing Spark.

For the other two points: there is no way to exhaustively explain the setup of Spark and the parameters for submitting Spark jobs; the README includes links to the Spark documentation. The same holds for AWS authentication: it's complex both for boto3 and hadoop-aws - I think both are also linked in the README. Passing credentials as environment variables is one of the less preferable ways; IAM roles are better. But in the end it's about the user's security, and they should read the boto3 and hadoop-aws documentation carefully. If the README does not get to the point here, feel free to improve it. Thanks!
