Add CCFileProcessorSparkJob to support file-wise processing #45
Conversation
Thanks for the contribution, @jt55401! First, I understand the use case: being able to process any kind of file, not only WARC files and their derivatives (WAT, WET).
Of course, this is already possible using CCSparkJob as the base class by overriding the method run_job(...). For example, HostLinksToGraph reads from and writes to Parquet. From CCSparkJob it uses only the command-line parsing / option processing, the logging definitions and the optional profiling.
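For illustration, here is a minimal sketch (not the actual HostLinksToGraph code) of a job that skips WARC processing entirely by overriding run_job(...). It assumes the CCSparkJob conventions in sparkcc.py: positional input/output arguments available via self.args and a run_job(self, session) hook receiving the SparkSession.

```python
# Sketch only: a job that bypasses WARC processing by overriding run_job(...).
# Assumes CCSparkJob (sparkcc.py) provides self.args.input / self.args.output
# and calls run_job(self, session) with the active SparkSession.
from sparkcc import CCSparkJob


class ParquetTransformJob(CCSparkJob):
    """Read a Parquet table, transform it, write Parquet again."""

    name = "ParquetTransform"

    def run_job(self, session):
        df = session.read.parquet(self.args.input)
        # any DataFrame transformation instead of per-WARC-record processing
        result = df.dropDuplicates()
        result.write.parquet(self.args.output)


if __name__ == '__main__':
    job = ParquetTransformJob()
    job.run()
```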
Good catch, and very well expressed! This needs to be put into the README because it's one of the central design decisions. I never really thought about it, I just ported it from cc-mrjob. I do not know why @Smerity decided to use a manifest while most big data tools read the input list from command-line arguments. I see one advantage of the manifest: it's easy to select a (larger) random sample. With command-line arguments you may quickly hit the system limit on the maximum argument length when passing, say, 1k paths to WARC files as arguments.

One question: what's the rationale for using a NamedTemporaryFile? Being able to share the content as a file with other processes?

... and one remark which should be addressed: 90% of the code lines in sparkccfile.py are copied unmodified from sparkcc.py. This complicates maintenance because contributors may forget to implement a bug fix or improvement in both files. I have two suggestions for how to reduce the code duplication.
In any case: the job / class should be listed in the README, maybe together with a simple example.
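To illustrate the manifest advantage mentioned above: drawing a random sample of input paths is a one-liner over the manifest file, independent of any argument-length limits. The file names and the sample size below are made up for the example.

```python
# Example only: sample 1000 paths from a manifest file of WARC paths.
# 'warc.paths' and the sample size are placeholders.
import random

with open('warc.paths') as manifest:
    paths = [line.strip() for line in manifest if line.strip()]

sample = random.sample(paths, k=1000)

with open('warc.paths.sample', 'w') as out:
    out.write('\n'.join(sample) + '\n')
```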
@sebastian-nagel - thank you for the review, it's greatly appreciated.
Yes - that is exactly right. Some parts of the jobs we run use external tools, so we need a file in the outside world.
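To make that constraint concrete, here is a hedged sketch of the pattern described: the payload is written to a NamedTemporaryFile so that an external tool can open it by path. The external command (file --brief) is only a stand-in for the real tools.

```python
# Sketch: why a NamedTemporaryFile helps when shelling out to external tools
# that expect a path on the local filesystem. 'file --brief' is a placeholder.
import subprocess
from tempfile import NamedTemporaryFile


def process_with_external_tool(payload: bytes) -> str:
    with NamedTemporaryFile(suffix='.warc.gz') as tmp:
        tmp.write(payload)
        tmp.flush()
        # the external process can open the content by name (tmp.name)
        result = subprocess.run(['file', '--brief', tmp.name],
                                capture_output=True, text=True, check=True)
        return result.stdout.strip()
```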
Yes - we are commonly processing 250,000 to 1,000,000+ files in a run (for example: all the WET/WAT files for an entire year of crawls).
Ah, very astute. I will review these options and update the PR with a refactor.
Yes, no problem.
OK @sebastian-nagel - I'm reasonably happy with this version. The only slight downside is that, due to the way it's packaged, we now have to depend on warcio even though it's not really needed. I'm not a deep expert in Python modularization, so if there is a clever way to fix this while preserving the cleanliness of this refactor, please let me know; otherwise, I'm fine leaving it. Let me know if you have any other feedback.
I've since enhanced this further with three more functions.
These are mostly convenience functions which do what their names say for local file paths or S3 paths.
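Since the three functions themselves are not shown here, the following is only a hypothetical sketch of what such a local-or-S3 convenience helper might look like; the name, signature and use of boto3 are assumptions.

```python
# Hypothetical helper (not code from the PR): treat a path uniformly whether
# it is a local file or an s3:// object.
import os
from urllib.parse import urlparse

import boto3


def exists(path: str) -> bool:
    """Return True if a local file or an s3:// object exists."""
    parsed = urlparse(path)
    if parsed.scheme == 's3':
        s3 = boto3.client('s3')
        resp = s3.list_objects_v2(Bucket=parsed.netloc,
                                  Prefix=parsed.path.lstrip('/'),
                                  MaxKeys=1)
        return resp.get('KeyCount', 0) > 0
    return os.path.exists(path)
```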
Force-pushed from 8aeca80 to 33b4416 (…d in integrity job).
- Properly count successfully processed and failed files.
- Rename the command-line option `--output_base_uri` to `--output_base_url`, following the equivalent option `--input_base_url`. Improve logging of counters / aggregators.
- Remove the unused method `validate_s3_bucket_from_uri`; remove the code checking for an output location on http(s), because writing output over HTTP is not implemented.
- Share code between CCSparkJob and CCFileProcessorSparkJob to reduce duplicated code.
- Fail if the input or output location uses an unsupported scheme (sketched below): fail for hdfs:// input and output (not implemented yet), and for http:// or https:// output.
- Add an example tool relying on CCFileProcessorSparkJob which extracts text content from WARC HTML records and stores the texts in WET files. For every WARC file one WET file is created. The WET record locations are written to the output table.

The changes were successfully tested using Spark 3.5.6 in local mode for all implemented examples and tools.
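A hedged sketch of the scheme check listed above (not the exact code from the PR): hdfs:// is rejected for input and output, http(s):// only as output.

```python
# Sketch of the unsupported-scheme check described in the change list;
# the function name and signature are illustrative only.
from urllib.parse import urlparse


def validate_location(uri, is_output=False):
    scheme = urlparse(uri).scheme or 'file'
    if scheme == 'hdfs':
        raise NotImplementedError('hdfs:// locations are not supported yet')
    if is_output and scheme in ('http', 'https'):
        raise ValueError('writing output over HTTP(S) is not implemented')
```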
Not sure if I am qualified to review this, but it generally looks good at first glance. However, I did not run all the modified scripts myself. Can you share example commands that are expected to work? At least, it seems to break the example from the README (or is this warning expected?).
What about adding some unit tests to ensure all examples work (especially on all the different platforms)?
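To make the suggestion concrete, here is a hypothetical pytest sketch (not existing test code) that runs one example job end-to-end in Spark local mode; the job class name WordCountJob, the sample input path and the output table name are assumptions.

```python
# Hypothetical pytest sketch: run one example job in local mode. Assumes
# word_count.py exposes a WordCountJob class; the sample WET path is a
# placeholder, not an existing fixture.
import sys

from word_count import WordCountJob


def test_word_count_runs(tmp_path, monkeypatch):
    # let pyspark start a local master when the job is not launched via spark-submit
    monkeypatch.setenv('PYSPARK_SUBMIT_ARGS', '--master local[2] pyspark-shell')

    manifest = tmp_path / 'input.paths'
    manifest.write_text('test/data/sample.warc.wet.gz\n')  # placeholder path

    # positional arguments of CCSparkJob: input manifest and output table
    monkeypatch.setattr(sys, 'argv',
                        ['word_count.py', str(manifest), 'word_count_test_out'])
    WordCountJob().run()
```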
Add schema merging option to example command-line call.
Hi @malteos, thanks for testing. The error occurs because the S3A configuration requests a long value but gets something else. In Hadoop 3.3.2 there is a suspicious line in the S3A code which was changed in more recent Hadoop versions.
It's required that the version of the hadoop-aws package matches the version of Hadoop used to run the Spark job. I've tried the job with Hadoop 3.3.6 and Spark 3.5.6 with success. I've updated the README to use a more recent Hadoop version and also added the option to use schema merging (described in the README, next section).
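For readers unfamiliar with schema merging: it is Spark's standard Parquet mergeSchema option, which reconciles outputs written with slightly different schemas at read time. The sketch below is an illustration only, not the README command, and the output path is just an example.

```python
# Illustration of Parquet schema merging in Spark; the output path
# 'spark-warehouse/servernames' is an example, not necessarily the README's.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master('local[*]')
         .appName('read-merged-output')
         .getOrCreate())

df = (spark.read
      .option('mergeSchema', 'true')           # per-read Parquet option
      .parquet('spark-warehouse/servernames'))
df.printSchema()
```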
@sebastian-nagel I managed to run the example with pyspark installed via pip. Should we add this to the requirements?
To run the command locally, I also needed to add an extra Spark configuration option to the command.
Also, I needed to explicitly export the AWS secrets as ENV vars. Maybe we should add something like this to the README (even though it's already explained in the Hadoop docs):

export AWS_ACCESS_KEY_ID=$(aws configure get aws_access_key_id)
export AWS_SECRET_ACCESS_KEY=$(aws configure get aws_secret_access_key)
export AWS_SESSION_TOKEN=$(aws configure get aws_session_token) # if using temporary credentials
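As an alternative sketch (not project code, and IAM roles are generally preferable, as noted below): the same credentials can be forwarded from the environment into the S3A configuration when the session is built. The fs.s3a.* keys are standard hadoop-aws property names.

```python
# Sketch, not project code: forward AWS credentials from the environment into
# the S3A configuration of the Spark session. Prefer IAM roles where possible.
import os

from pyspark.sql import SparkSession

builder = (SparkSession.builder
           .appName('cc-pyspark-job')
           .config('spark.hadoop.fs.s3a.access.key', os.environ['AWS_ACCESS_KEY_ID'])
           .config('spark.hadoop.fs.s3a.secret.key', os.environ['AWS_SECRET_ACCESS_KEY']))
if 'AWS_SESSION_TOKEN' in os.environ:
    # only needed for temporary credentials
    builder = builder.config('spark.hadoop.fs.s3a.session.token',
                             os.environ['AWS_SESSION_TOKEN'])
spark = builder.getOrCreate()
```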
Excellent! Thanks for trying again!
No. Spark is usually already installed as part of a Hadoop distribution, e.g., EMR. The pyspark module is also no replacement for installing Spark.

For the other two points: there is no way to exhaustively explain the setup of Spark and the parameters to submit Spark jobs. The README includes links to the Spark documentation. Same for AWS authentication: it's complex both for boto3 and hadoop-aws, and I think both are also linked in the README. Passing credentials as environment variables is one of the less preferable ways; IAM roles are better. But in the end it's about the security of the user, who should read the boto3 and hadoop-aws documentation carefully. If the README does not get to the point here, feel free to improve it. Thanks!
For some Spark jobs, we want to process an entire file at a time.
I copied and simplified sparkcc.py to do this.
This is used in the upcoming integrity process.
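A minimal usage sketch, assuming the new base class follows the CCSparkJob conventions; the module name sparkccfile, the per-file hook process_file(...) and its signature/return contract are assumptions for illustration, not taken from the diff.

```python
# Hypothetical usage of CCFileProcessorSparkJob. The hook name
# process_file(uri, local_file) and what it should yield are assumptions.
import hashlib

from sparkccfile import CCFileProcessorSparkJob


class FileChecksumJob(CCFileProcessorSparkJob):
    """Example: compute a checksum over every input file as a whole."""

    name = "FileChecksum"

    def process_file(self, uri, local_file):
        digest = hashlib.sha256()
        for chunk in iter(lambda: local_file.read(1024 * 1024), b''):
            digest.update(chunk)
        yield uri, digest.hexdigest()


if __name__ == '__main__':
    job = FileChecksumJob()
    job.run()
```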