Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions ssg/rule_yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,112 @@ def get_section_lines(file_path, file_contents, key_name):
return section[0]

return None


def get_line_whitespace(line):
    """
    Return the leading whitespace of ``line`` exactly as it was written.

    The result is the prefix that ``str.lstrip()`` would remove, so spaces
    and tabs are preserved verbatim. A line consisting only of whitespace
    (or an empty line) is returned unchanged.
    """
    remainder = line.lstrip()
    if not remainder:
        # Nothing but whitespace: the whole line is the prefix.
        return line
    return line[:-len(remainder)]


def guess_section_whitespace(file_contents, section_range, default=' '):
    """
    Hack: we need to figure out how much whitespace to add when adding a new key to
    an existing section. Since different files might be parsed differently, take the
    minimum key's whitespace length in this section.

    Args:
        file_contents: list of the file's lines.
        section_range: object exposing ``start`` (index of the section heading)
            and ``end`` (index of the last line of the section).
        default: whitespace returned when the section contains no key line.

    Returns:
        The shortest leading whitespace found on a key line of the section,
        or ``default`` when no key line exists.
    """
    whitespace = None

    # The heading itself (start) is skipped. ``end`` is scanned as part of the
    # section, matching add_or_modify_nested_section_key(), which searches
    # section.start..section.end inclusive. Clamp to the file length so a range
    # that runs to (or past) the end of the file cannot raise IndexError.
    last_line = min(section_range.end, len(file_contents) - 1)
    for line_num in range(section_range.start + 1, last_line + 1):
        line = file_contents[line_num]

        # Assume a line containing ':' is a key. Blank lines and lines without
        # ':' are not keys; neither are comments that happen to contain ':'.
        if not line or ':' not in line:
            continue
        if line.lstrip().startswith('#'):
            continue

        this_whitespace = get_line_whitespace(line)

        # Only take it if we have _less_ whitespace (to avoid dealing with nested
        # sections) or if we have no whitespace yet.
        if whitespace is None or len(this_whitespace) < len(whitespace):
            whitespace = this_whitespace

    # If we don't have any whitespace, use the default to show the YAML parser it
    # is a nested section.
    if whitespace is None:
        whitespace = default

    return whitespace


def add_or_modify_nested_section_key(file_path, file_contents, section_title,
                                     key, value, new_section_after_if_missing=None):
    """
    Either modify an existing nested section key (in ``key: value`` form) or
    add it if missing. Optionally, when the section itself is missing, create
    it right after another, existing section.

    Args:
        file_path: path of the file being edited; passed to get_section_lines()
            and used in error messages.
        file_contents: list of the file's lines. It is not modified; an
            updated copy is returned.
        section_title: name of the section that should hold the key.
        key: key to set inside the section. Must not contain ':'.
        value: string written after ``key: ``.
        new_section_after_if_missing: name of an existing section after which
            ``section_title`` is created when it does not exist yet. When
            unset, a missing section is an error.

    Returns:
        A new list of lines with the key added or updated.

    Raises:
        ValueError: if the section is missing and cannot be created, if the
            key contains ':', or if the key appears more than once in the
            section.
    """
    new_contents = file_contents[:]
    section = get_section_lines(file_path, file_contents, section_title)

    if not section:
        if not new_section_after_if_missing:
            msg = "File {0} lacks all instances of section {1}; refusing to modify file."
            raise ValueError(msg.format(file_path, section_title))

        previous_section = get_section_lines(file_path, file_contents,
                                             new_section_after_if_missing)
        if not previous_section:
            msg = "File {0} lacks all instances of sections {1} and {2}; "
            msg += "refusing to modify file."
            raise ValueError(msg.format(file_path, section_title,
                                        new_section_after_if_missing))

        # Reuse the indentation of the previous section's heading for the new
        # heading, and the indentation of its keys for the new key.
        new_section_header = get_line_whitespace(file_contents[previous_section.start])
        new_section_header += section_title + ':'
        new_section_kv = guess_section_whitespace(file_contents, previous_section)
        new_section_kv += key + ': ' + value

        # Insert heading, key and a trailing blank line right after the last
        # line of the previous section.
        insert_at = previous_section.end + 1
        return (new_contents[:insert_at]
                + [new_section_header, new_section_kv, '']
                + new_contents[insert_at:])

    # Nasty hacky assumption: assume key is 'unique' within the section and we can
    # ignore whitespaces issues with this approach. Also assume (and validate!) that
    # : does not appear in the key. This allows us to split the line by ':' and take
    # the first as the actual key in the file.
    if ':' in key:
        # Raised explicitly rather than asserted so the check survives `python -O`.
        raise ValueError("Key {0} must not contain ':'.".format(key))
    key_match = ' ' + key + ':'

    found_line_num = None
    for line_num in range(section.start, section.end + 1):
        line = file_contents[line_num]
        if key_match not in line:
            continue

        if found_line_num is not None:
            # Report 1-based line numbers, as an editor would show them.
            msg = "Expected to only have key {0} appear once in file, but appeared "
            msg += "twice: once on line {1} and once on line {2}."
            raise ValueError(msg.format(key, found_line_num + 1, line_num + 1))

        # Preserve leading whitespace. :-)
        key_prefix = line.split(':', 1)[0]
        new_contents[line_num] = key_prefix + ': ' + value
        found_line_num = line_num

    if found_line_num is None:
        # Be lazy and add it right after the section heading. Worst case we'll just
        # come back and sort the section at a later time.
        whitespace = guess_section_whitespace(file_contents, section)
        new_line = whitespace + key + ': ' + value
        insert_at = section.start + 1
        new_contents = new_contents[:insert_at] + [new_line] + new_contents[insert_at:]

    return new_contents
258 changes: 258 additions & 0 deletions utils/autorefer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
#!/usr/bin/env python

import sys
import os
import argparse
import json

import ssg.build_yaml
import ssg.products
import ssg.rules
import ssg.yaml
import ssg.utils
import ssg.rule_yaml

from refchecker import load_for_product

# Absolute path of the directory one level above the directory containing this
# script (presumably the repository root; products are looked up beneath it).
SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
# Short alias for the rule.yml editing helper called by reference_add().
SECTION_KEY_FUNC = ssg.rule_yaml.add_or_modify_nested_section_key


"""
Nicely laid out profiles have a structure matching their corresponding
benchmarks. I'm thinking the CIS and STIG profiles here.

Let's formalize that structure a bit.

Given a profile with format:

```yaml
selection:
# <identifier> description
- rule_choice
```

Here one or more comments precede one or more rules; for each rule, the
closest preceding comment that matches the reference-identifier format wins.

Let's take this as an example:

```yaml
## 5.3 Configure PAM ##
### 5.3.1 Ensure password creation requirements are configured (Automated)
- var_password_pam_minlen=14
- accounts_password_pam_minlen
- var_password_pam_minclass=4
- accounts_password_pam_minclass
```

Here both accounts_password_pam_minlen and
accounts_password_pam_minclass should get CIS reference value 5.3.1. The
other two entries are vars, and since 5.3 is further away than 5.3.1,
5.3.1 should win.

Some ground rules:

- We should avoid guessing when possible.
- We should create minimal diffs.
- Some rules lack a references section; we should add them in that
case.
- If we're not sure, ignore the rule and print info telling the caller
about it.
- Rules that don't belong to a section shouldn't be in the profile!
- If we're adding a reference, don't clutter other products! Only do
our current product.
"""


def parse_args(argv=None):
    """
    Parse the command-line arguments of this utility.

    Args:
        argv: optional list of argument strings. When None (the default),
            argparse reads the arguments from ``sys.argv``.

    Returns:
        The argparse namespace with ``json``, ``build_config_yaml``,
        ``profiles_root``, ``product``, ``profile`` and ``reference``.
    """
    parser = argparse.ArgumentParser(description="Utility to parse a given profile and "
                                     "automatically add or update a given reference "
                                     "in all included rules")
    parser.add_argument("-j", "--json", type=str, action="store",
                        default="build/rule_dirs.json", help="File to read "
                        "json output of rule_dir_json from (defaults to "
                        "build/rule_dirs.json)")
    parser.add_argument("-c", "--build-config-yaml", default="build/build_config.yml",
                        help="YAML file with information about the build configuration. "
                        "Defaults to build/build_config.yml")
    parser.add_argument("-p", "--profiles-root",
                        help="Override where to look for profile files.")
    parser.add_argument("product", type=str, help="Product to check has required references")
    parser.add_argument("profile", type=str, help="Profile to iterate over")
    parser.add_argument("reference", type=str,
                        help="Required reference system to automatically add")

    return parser.parse_args(argv)


def find_value_line(lines, value):
    """
    Return the index of the single line in ``lines`` whose content ends with
    ``value``.

    Hack: we assume a "nice" file. Anything after the first '#' on a line is
    ignored, and the value may be followed by a closing single or double
    quote. The value must start at an identifier boundary, so ``foo_bar``
    does not match a line ending in ``xyz_foo_bar``.

    Args:
        lines: list of the file's lines.
        value: the value to search for.

    Returns:
        The 0-based index of the matching line.

    Raises:
        ValueError: if no line matches or more than one line matches.
    """
    suffixes = (value, value + '"', value + "'")

    matches = []
    for index, line in enumerate(lines):
        no_trailing_comment = line.split('#', 1)[0].strip()
        for suffix in suffixes:
            if not no_trailing_comment.endswith(suffix):
                continue

            # Reject suffix matches inside a longer identifier: the character
            # just before the value must not be a letter, digit or underscore.
            before = no_trailing_comment[:len(no_trailing_comment) - len(suffix)]
            if before and (before[-1].isalnum() or before[-1] == '_'):
                continue

            matches.append(index)
            # One entry per line is enough, whichever suffix form matched.
            break

    if len(matches) != 1:
        msg = "While searching for pattern `{0}` in file lines, got no or "
        msg += "several matches: {1}"
        msg = msg.format(value, matches)
        raise ValueError(msg)

    return matches[0]


def is_reference_identifier_comment(line, reference):
    """
    Decide whether ``line`` is a comment that starts with a reference
    identifier for the ``reference`` system.

    Returns a ``(valid, identifier)`` pair:

    - ``(False, None)`` when the line is not a comment, or when it fails
      validation for a known reference system;
    - ``(True, identifier)`` when the first token validates for a known
      reference system (currently only 'cis');
    - ``(False, token)`` for an unknown reference system, where ``token`` is
      the first token of the comment and cannot be confirmed.
    """
    body = line.strip()
    if not body.startswith('#'):
        return False, None

    # Nested sections are sometimes marked with several comment symbols
    # ("### 5.3.1 ..."); peel them off one by one, trimming whitespace as we go.
    while body.startswith('#'):
        body = body[1:].strip()

    # The first space-delimited token is taken to be the identifier.
    token = body.partition(' ')[0].strip()

    # Validation is only known for CIS; anything else is returned unconfirmed.
    if reference != 'cis':
        return False, token

    # A CIS identifier contains a period and no letters. This is judged on the
    # token as written, before any trailing period is trimmed.
    has_period = '.' in token
    has_letters = token.lower().islower()

    if token.endswith('.'):
        # We might've copied an extra period after the identifier; trim it.
        token = token[:-1]

    if has_period and not has_letters:
        return True, token
    return False, None


def _find_reference_identifier(profile_lines, rule_line_num, reference, max_delta=20):
    """
    Search upwards from a rule's line for the closest reference identifier comment.

    At most ``max_delta`` lines (inclusive) above ``rule_line_num`` are
    inspected. The default comes from experimental evidence (see the
    accounts_password_pam_retry rule). Returns the identifier, or None when
    none is found or when the closest candidate cannot be confirmed.
    """
    for delta in range(1, max_delta + 1):
        abs_line_num = rule_line_num - delta
        if abs_line_num < 0:
            # Reached the top of the file. Without this guard a negative index
            # would wrap around and read lines from the END of the profile.
            break

        line = profile_lines[abs_line_num]

        # Only use this reference if we're absolutely sure.
        valid, ref_id = is_reference_identifier_comment(line, reference)
        if not ref_id:
            continue
        if valid:
            return ref_id

        # A candidate exists but cannot be validated; report it with a
        # 1-based line number and give up on this rule.
        msg = "Got suspected reference identifier {0} on line {1}, but due to "
        msg += "unknown reference system {2}, cannot confirm. Refusing to add."
        msg = msg.format(ref_id, abs_line_num + 1, reference)
        print(msg, file=sys.stderr)
        return None

    return None


def reference_add(env_yaml, rule_dirs, profile_path, product, reference):
    """
    Add or update ``reference`` in the rule.yml of every rule named in a profile.

    For each selected or unselected rule of the profile, the closest preceding
    comment in the profile file that carries a reference identifier decides
    the value. Rules for which no identifier can be determined are reported
    on stderr and skipped.

    Args:
        env_yaml: environment passed to the profile and rule loaders.
        rule_dirs: mapping of rule id to rule description, as loaded from
            rule_dirs.json.
        profile_path: path of the profile file to read.
        product: product the rules are loaded for; also used to qualify
            product-specific reference keys as ``reference@product``.
        reference: name of the reference system (e.g. 'cis').

    Returns:
        True if at least one rule file was rewritten, False otherwise.

    Raises:
        ValueError: if a rule of the profile is not present in ``rule_dirs``
            (find_value_line() may also raise it when a rule cannot be
            located unambiguously in the profile file).
    """
    profile = ssg.build_yaml.ProfileWithInlinePolicies.from_yaml(profile_path, env_yaml)
    profile_lines = ssg.utils.read_file_list(profile_path)

    updated = False
    for rule_id in profile.selected + profile.unselected:
        if rule_id not in rule_dirs:
            msg = "Unable to find rule in rule_dirs.json: {0}"
            msg = msg.format(rule_id)
            raise ValueError(msg)

        rule_obj = rule_dirs[rule_id]
        rule = load_for_product(rule_obj, product, env_yaml=env_yaml)

        # Now we're attempting to parse the profile file and see if we can't
        # determine the correct reference identifier to add.
        rule_line_num = find_value_line(profile_lines, rule_id)
        ref_id = _find_reference_identifier(profile_lines, rule_line_num, reference)

        if not ref_id:
            msg = "Unknown reference identifier for rule {0}; ignoring."
            msg = msg.format(rule_id)
            print(msg, file=sys.stderr)
            continue

        # Now we definitely have a reference identifier. We have three cases:
        #
        # 1. Our reference identifier is correct; don't need to do anything.
        # 2. We need to update our reference identifier; it was wrong in the rule.yml.
        # 3. We don't have a reference identifier in the rule.yml and we need to add one.

        if reference in rule.references and rule.references[reference] == ref_id:
            print("ok", rule_id, ref_id)
            continue

        # Load the 'raw' rule.yml file and get the lines corresponding with the references
        # section.
        rule_path, rule_lines = ssg.rule_yaml.get_yaml_contents(rule_obj)

        # Here, we make a judgement call. If we're modifying a product reference,
        # only add a product-qualified value.
        reference_key = reference
        if reference in ssg.build_yaml.Rule.PRODUCT_REFERENCES:
            reference_key += "@" + product

        # Lastly, some post-processing magic. When we have a CIS identifier with only a
        # single period, it is going to get picked up as a float, so quote it.
        if reference == 'cis' and ref_id.count('.') == 1:
            ref_id = "'" + ref_id + "'"

        print("Updating " + rule_id + " to include " + reference_key + ": " + ref_id)

        new_lines = SECTION_KEY_FUNC(rule_path, rule_lines, 'references', reference_key,
                                     ref_id, new_section_after_if_missing='identifiers')

        # Only touch the file on disk when something actually changed.
        if new_lines != rule_lines:
            ssg.utils.write_list_file(rule_path, new_lines)
            updated = True

    return updated


def main():
    """
    Entry point: validate the command-line arguments and update references.

    Loads the rule directory JSON, checks that the product and the profile
    exist, builds the environment for the product and then calls
    reference_add() on the profile.

    Raises:
        ValueError: if the product is unknown or the profile file does not exist.
    """
    args = parse_args()

    # Use a context manager so the file handle is closed even if parsing fails.
    with open(args.json, 'r') as json_file:
        all_rules = json.load(json_file)

    linux_products, other_products = ssg.products.get_all(SSG_ROOT)
    all_products = linux_products.union(other_products)
    if args.product not in all_products:
        msg = "Unknown product {0}: check SSG_ROOT and try again"
        msg = msg.format(args.product)
        raise ValueError(msg)

    product_base = os.path.join(SSG_ROOT, args.product)
    product_yaml = os.path.join(product_base, "product.yml")
    env_yaml = ssg.yaml.open_environment(args.build_config_yaml, product_yaml)

    # Profiles live under the product directory unless --profiles-root overrides it.
    profiles_root = os.path.join(product_base, "profiles")
    if args.profiles_root:
        profiles_root = args.profiles_root

    profile_filename = args.profile + ".profile"
    profile_path = os.path.join(profiles_root, profile_filename)
    if not os.path.exists(profile_path):
        msg = "Unknown profile {0}: check profile, --profiles-root, and try again"
        msg = msg.format(args.profile)
        raise ValueError(msg)

    updated = reference_add(env_yaml, all_rules, profile_path, args.product, args.reference)
    if updated:
        print("One or more rules were modified to add missing references.", file=sys.stderr)


# Run the tool only when executed as a script, not when imported.
if __name__ == "__main__":
    main()