| #!/usr/bin/env python3 |
| import argparse |
| import logging |
| import os |
| import re |
| import typing |
| |
| |
def main():
    """Parse the command line, configure logging and process the log file.

    Takes one positional argument (the path of the log file to ingest) and
    an optional ``-v``/``--verbose`` flag that switches logging from INFO to
    DEBUG.  The whole log is read into memory and handed to
    ``_fix_expectation_files``.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("logfile", help="The log file from CQ to ingest")
    arg_parser.add_argument(
        "-v", "--verbose", action="store_true", help="Add debug logging.")
    options = arg_parser.parse_args()

    log_level = logging.INFO
    if options.verbose:
        log_level = logging.DEBUG
    logging.basicConfig(level=log_level)

    with open(options.logfile, "r") as log_file:
        log_text = log_file.read()
        _fix_expectation_files(log_text)
| |
| |
# Matches ANSI (ECMA-48) escape sequences in both their 7-bit (ESC-prefixed)
# and 8-bit (C1 control character) forms.  The three alternatives are:
#   * a two-character escape: ESC followed by one byte in @-Z or \-_
#     (i.e. anything in @-_ except "[", which introduces a CSI sequence);
#   * a lone C1 control character other than CSI (0x9B);
#   * a CSI sequence: "ESC [" or 0x9B, then parameter bytes, intermediate
#     bytes and exactly one final byte -- terminal colour codes look like this.
# Only CSI sequences carry a final byte; requiring one after every introducer
# would swallow the first character of the text following a short escape.
_ANSI_ESCAPE_RE = re.compile(
    r'(?:\x1B[@-Z\\-_]|[\x80-\x9A\x9C-\x9F]|(?:\x1B\[|\x9B)[0-?]*[ -/]*[@-~])')


def _remove_ansi(line: str) -> str:
    """Return ``line`` with all ANSI escape sequences removed.

    Args:
        line: Text that may contain terminal escape sequences.

    Returns:
        The same text with every escape sequence deleted; all other
        characters are left untouched.
    """
    return _ANSI_ESCAPE_RE.sub('', line)
| |
# Log line that introduces the corrected content of one expectation file.
_CORRECTED_FILE_HEADER_RE = re.compile(
    "^Correct expectation file (?P<filename>.*):")


def _record_expectation(
        filename_to_content: typing.Dict[typing.Text, typing.Text],
        filename: typing.Text,
        content: typing.Text) -> None:
    """Store one corrected file, rejecting conflicting duplicates.

    A repeated ``filename`` with identical content is accepted silently.  If
    the content differs, both versions are written out with ``.1`` and ``.2``
    suffixes so they can be compared, and an exception is raised.
    """
    if filename not in filename_to_content:
        filename_to_content[filename] = content
        return
    previous = filename_to_content[filename]
    if previous != content:
        _write_expectation(filename + ".1", previous)
        _write_expectation(filename + ".2", content)
        raise Exception("Whoa, got two conflicting contents.")


def _fix_expectation_files(content: typing.Text) -> None:
    """Extract corrected expectation files from log text and write them out.

    The log is scanned line by line, with ANSI escape sequences stripped
    first.  A line of the form ``Correct expectation file <name>:`` starts a
    file; the following lines are its content.  The file ends at the first
    blank line, or at a line consisting solely of ``}`` (which is kept as the
    last content line).  Collected content always ends with a newline.  Once
    the whole log has been scanned, every collected file is written with
    ``_write_expectation``.

    Args:
        content: Full text of the log to scan.

    Raises:
        Exception: If the log contains two different corrected versions of
            the same file.  Both versions are written with ``.1`` / ``.2``
            suffixes before raising.
    """
    filename = None
    lines = []
    filename_to_content = {}
    for i, raw_line in enumerate(content.split("\n")):
        # Colour codes would otherwise hide the header line, the closing
        # brace and the blank terminator line.
        line = _remove_ansi(raw_line)

        if not filename:
            # Outside a file: only a header line is of interest.
            match = _CORRECTED_FILE_HEADER_RE.match(line)
            if match:
                filename = match.group("filename")
                logging.debug("Found corrected version of '%s'", filename)
            continue

        if line and line != "}":
            lines.append(line)
            continue

        # End of the current file.  Lines come from split("\n") and therefore
        # never carry a trailing newline, so the closing brace has to be
        # compared without one.
        if line:
            lines.append(line)
        # A final empty element makes the joined text end with a newline.
        lines.append("")
        logging.debug(
            "Finished reading corrected version of %s "
            "at %d lines at line %d",
            filename, len(lines), i)
        _record_expectation(filename_to_content, filename, "\n".join(lines))
        filename = None
        lines = []

    if filename:
        # The log stopped before a terminator was seen; the partial content
        # is not written.
        logging.warning(
            "Log ended while still reading %s; discarding %d incomplete lines",
            filename, len(lines))

    for name, file_content in filename_to_content.items():
        _write_expectation(name, file_content)
| |
def _write_expectation(filename: typing.Text, content: str) -> None:
    """Write ``content`` to the named file in the expectations directory.

    The destination is ``../slave/recipe_expectations/<filename>`` resolved
    relative to the directory containing this script.  The file is opened in
    write mode, so any existing content is replaced.  The absolute path and
    the number of lines are logged at INFO level before writing.

    Args:
        filename: Name of the file, relative to the expectations directory.
        content: Text to write.
    """
    path_components = (
        os.path.dirname(__file__),
        "..",
        "slave",
        "recipe_expectations",
        filename,
    )
    fullpath = os.path.abspath(os.path.join(*path_components))
    # Same value as len(content.split("\n")): one more than the newline count.
    line_count = content.count("\n") + 1
    logging.info("Writing %s with %d lines", fullpath, line_count)
    with open(fullpath, "w") as out_file:
        out_file.write(content)
| |
# Script entry point: run main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()