diff --git a/.github/workflows/python-tests.yml b/.github/workflows/python-tests.yml new file mode 100644 index 0000000..93f0dd8 --- /dev/null +++ b/.github/workflows/python-tests.yml @@ -0,0 +1,21 @@ +name: Python Tests + +on: [push, pull_request] + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v4 + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Run tests + run: python -m unittest discover -s test diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ed8ebf5 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__ \ No newline at end of file diff --git a/README.md b/README.md index 936c906..ba32399 100644 --- a/README.md +++ b/README.md @@ -3,9 +3,7 @@ The name is pretty self explanatory. It's a Python script for updating type A DNS records at one.com. ## Required Packages -- `requests` -- `json` -- `sys` +Check requirements.txt ## Usage At the very top of the Script there are some customization options and variables for your one.com control panel login credentials. diff --git a/dockerfile b/dockerfile new file mode 100644 index 0000000..1ffaddd --- /dev/null +++ b/dockerfile @@ -0,0 +1,18 @@ +FROM python:3-alpine AS base +FROM base AS builder + +RUN mkdir /install +WORKDIR /install +COPY requirements.txt /requirements.txt +RUN pip install --no-binary=:all: --target=/install -r /requirements.txt + +FROM base +COPY --from=builder /install /usr/local + +WORKDIR /app + +COPY . . +RUN pip3 install -r requirements.txt + +ENTRYPOINT ["python3", "one_com_ddns.py"] +CMD [] \ No newline at end of file diff --git a/modules/dns_utils.py b/modules/dns_utils.py new file mode 100644 index 0000000..3de27f8 --- /dev/null +++ b/modules/dns_utils.py @@ -0,0 +1,69 @@ +# modules/dns_utils.py +import dns.resolver +import dns.exception +import logging + +logger = logging.getLogger("one_com_ddns") + +def get_authoritative_ns(domain): + """ + Recursively finds the authoritative nameservers for a given domain. + Tries parent domains if no NS records are found or in case of timeout or other DNS errors + (except for NXDOMAIN, where it stops and indicates domain does not exist). + + Args: + domain (str): The domain to query. + + Returns: + list: A list of authoritative nameserver hostnames, or None if not found or domain does not exist. + """ + try: + answers = dns.resolver.resolve(domain, 'NS') + return [data.to_text() for data in answers] + except dns.resolver.NoAnswer: + exception_type = "NoAnswer" + except dns.exception.Timeout: + exception_type = "Timeout" + except dns.resolver.NXDOMAIN: + exception_type = "NXDOMAIN" + except dns.exception.DNSException as e: + exception_type = f"DNSException: {e}" + + # Handle exceptions that might indicate trying parent domain + if exception_type in ["NoAnswer", "Timeout", "DNSException", "NXDOMAIN"]: + parts = domain.split('.') + if len(parts) > 2: # Check if there's a parent domain + parent_domain = '.'.join(parts[1:]) + return get_authoritative_ns(parent_domain) + else: + logger.warning(f"No authoritative NS for {domain} and no parent domain to try after exception: {exception_type}") + return None # No parent domain, give up + else: # Should not reach here, but as a fallback. For completeness. + logger.error(f"Failed to get NS for {domain} due to: {exception_type}") + return None + +def get_ip_and_ttl(domain, ns_servers=None): + """ + Gets the IP address and TTL of a domain from the authoritative nameservers. + + Args: + domain (str): The domain to query. + optional ns_servers (list): A list of authoritative nameserver hostnames. + + Returns: + tuple: A tuple containing the IP address and TTL, or None if not found. + """ + if ns_servers is None: + ns_servers = get_authoritative_ns(domain) + if ns_servers is None: + return None + resolver = dns.resolver.Resolver() + tmp = [dns.resolver.resolve(ns, 'A')[0].address for ns in ns_servers] + resolver.nameservers = tmp + try: + answers = resolver.resolve(domain, 'A') + for rdata in answers: + return rdata.address, answers.ttl + except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout, dns.exception.DNSException) as e: + logger.warning(f"Error getting IP and TTL for {domain}: {e}") + return None \ No newline at end of file diff --git a/modules/logger.py b/modules/logger.py new file mode 100644 index 0000000..1c3a21c --- /dev/null +++ b/modules/logger.py @@ -0,0 +1,35 @@ +# modules/logger.py +import logging + +GRAY = '\033[90m' +RED = '\033[91m' +YELLOW = '\033[93m' +RESET = '\033[0m' + +class ColoredFormatter(logging.Formatter): + """A formatter that adds colors based on log level.""" + + def format(self, record): + log_level = record.levelname + if log_level == "DEBUG": + log_level_colored = f"{GRAY}[DEBUG]{RESET}" + elif log_level == "WARNING": + log_level_colored = f"{YELLOW}[WARN]{RESET}" + elif log_level == "ERROR": + log_level_colored = f"{RED}[ERROR]{RESET}" + else: + log_level_colored = f"[{log_level}]" # No color for INFO + + return logging.Formatter(f'{log_level_colored} %(message)s').format(record) + +def setup_logging(level=logging.INFO): + """Sets up centralized logging for the application.""" + formatter = ColoredFormatter() + handler = logging.StreamHandler() + handler.setFormatter(formatter) + + logger = logging.getLogger("one_com_ddns") + logger.setLevel(level) + logger.addHandler(handler) + + return logger \ No newline at end of file diff --git a/modules/one_com_api.py b/modules/one_com_api.py new file mode 100644 index 0000000..0e29674 --- /dev/null +++ b/modules/one_com_api.py @@ -0,0 +1,99 @@ +# modules/one_com_api.py +import requests +import json +import tldextract +import logging + +logger = logging.getLogger("one_com_ddns") + +def _find_between(haystack, needle1, needle2): + index1 = haystack.find(needle1) + len(needle1) + index2 = haystack.find(needle2, index1 + 1) + return haystack[index1 : index2] + +def login_session(username, password): + logger.info("Logging in...") + session = requests.session() + redirect_url = "https://www.one.com/admin/" + try: + r = session.get(redirect_url) + except requests.ConnectionError: + logger.error("Connection to one.com failed.") + raise SystemExit("Connection to one.com failed.") + + post_url = _find_between(r.text, '
environment variables > None (if unset). + + Key configuration parameters: + - username (-u, --username): one.com username (defaults to None if unset) + - password (-p, --password): one.com password (defaults to None if unset) + - domains (-d, --domains): List of domain names (e.g.,-d example.com example2.com) (defaults to None if unset) + - ip (-i, --ip): IP address source ('AUTO', 'ARG', or IP) (defaults to None if unset) + - force-update (-f, --force-update): Force DNS update (defaults to None if unset) + - ttl (-t, --ttl): TTL for DNS records (defaults to None if unset) + - skip-confirmation (--skip-confirmation): Skip confirmation prompts (defaults to False if unset) + + Returns: + argparse.Namespace: Object containing configuration parameters. + + Raises: + ValueError: If validate_required is True and username, password, or domain are None + after parsing. + SystemExit: If automatic IP retrieval fails when 'AUTO' is selected + or if no IP address is provided as a command-line argument + when 'ARG' is selected. + """ + parser = argparse.ArgumentParser(description="one.com DDNS Script Configuration") + + parser.add_argument("-u", "--username", help="one.com username", default=os.environ.get("USERNAME")) + parser.add_argument("-p", "--password", help="one.com password", default=os.environ.get("PASSWORD")) + env_domains = os.environ.get("DOMAINS") + if env_domains is not None: + env_domains = env_domains.split(',') + + parser.add_argument("-d", "--domains", nargs="+", help="List of domain names (e.g.,-d example.com example2.com)", default=env_domains) + parser.add_argument("-i", "--ip", help="IP address ('AUTO', or IP)", default=os.environ.get("IP", "AUTO")) + + env_force = os.environ.get("FORCE_DNS_UPDATE") + if env_force is not None: + env_force = env_force.lower() + + parser.add_argument("-f", "--force-update", action="store_true", help="Force DNS update (skip IP check)", default=env_force) + parser.add_argument("-t", "--ttl", type=int, help="TTL value for DNS records", default=os.environ.get("TTL")) + parser.add_argument("-y", "--skip-confirmation", action="store_true", help="Skip confirmation prompts", default=os.environ.get("SKIP_CONFIRMATION")) + + args = parser.parse_args() + + # Basic validation (ONLY IF validate_required is True) + if validate_required: + if not args.username: + raise ValueError("Username is required (command-line or USERNAME env var)") + if not args.password: + raise ValueError("Password is required (command-line or PASSWORD env var)") + if not args.domains: + raise ValueError("Domain is required (command-line or DOMAIN env var)") + + # Handle IP address retrieval + if args.ip == "AUTO": + try: + args.ip = requests.get("https://api.ipify.org/").text + except requests.ConnectionError: + logger.error("Failed to get IP Address from ipify") + raise SystemExit("Failed to get IP Address from ipify") + logger.info(f"Detected external IP: {args.ip}") + + return args \ No newline at end of file diff --git a/modules/update_utils.py b/modules/update_utils.py new file mode 100644 index 0000000..9efb4b4 --- /dev/null +++ b/modules/update_utils.py @@ -0,0 +1,35 @@ +# modules/update_utils.py +import sys +import modules.one_com_api as one_com_api +import logging + +logger = logging.getLogger("one_com_ddns") + +def _confirm_proceed(prompt: str) -> bool: + """ + Prompt for confirmation unless running in a non-interactive shell. + """ + if not sys.stdin.isatty(): + logger.info("Non-interactive shell detected; proceeding without user confirmation.") + return True + response = input(prompt) + return response.strip().lower() == 'y' + +def update_domain(s, domain, records, ip, ttl, current_dns_ip=None, skip_confirmation=False): + """ + Updates the DNS record for a given domain, handling confirmation and logging. + """ + record_obj = one_com_api.find_id_by_subdomain(records, domain) + if record_obj is None: + logger.error(f"Record '{domain}' could not be found.") + return + + if current_dns_ip: + logger.warning(f"Changing IP for {domain} from {current_dns_ip} to {ip} with TTL {ttl}.") + else: + logger.info(f"Changing IP for {domain} to {ip} with TTL {ttl}.") + + if skip_confirmation or _confirm_proceed("Do you want to proceed? (y/n): "): + one_com_api.change_ip(s, record_obj, domain, ip, ttl) + else: + logger.info(f"Update for {domain} cancelled.") diff --git a/one_com_ddns.py b/one_com_ddns.py index e273c92..8b136b3 100644 --- a/one_com_ddns.py +++ b/one_com_ddns.py @@ -1,13 +1,12 @@ ''' - ################################# - ## ## - ## one.com DDNS Script ## - ## ## + ## ## + ## one.com DDNS Script ## + ## ## ################################# - | Version | 2.4 | + | Version | 2.4 | +--------------+----------------+ - | Last Updated | 2023-10-05 | + | Last Updated | 2023-10-05 | +--------------+----------------+ +----------------+-------------------------+ @@ -17,8 +16,6 @@ +----------------+-------------------------+ - - Note: This script is not very fail proof. Very few possible exceptions are handled, something as simple @@ -27,168 +24,67 @@ If you have any problems or suggestions, please open an issue on github or send me an email (main@lugico.de) - ''' +import modules.dns_utils as dns_utils +import modules.one_com_config as config +import modules.one_com_api as one_com_api +import modules.logger as logger_module +import modules.update_utils as update_utils +logger = logger_module.setup_logging() # Setup logging -# YOUR ONE.COM LOGIN -USERNAME="email.address@example.com" -PASSWORD="Your Beautiful Password" - -# YOUR DOMAIN ( NOT www.example.com, only example.com )" -DOMAIN="example.com" - -# LIST OF SUBDOMAINS YOU WANT POINTING TO YOUR IP -SUBDOMAINS = ["myddns"] -# SUBDOMAINS = ["mutiple", "subdomains"] - - -# YOUR IP ADDRESS. -IP='AUTO' -# '127.0.0.1' -> IP Address -# 'AUTO' -> Automatically detect using ipify.org -# 'ARG' -> Read from commandline argument ($ python3 ddns.py 127.0.0.1) - - -# CHECK IF IP ADDRESS HAS CHANGED SINCE LAST SCRIPT EXECUTION? -CHECK_IP_CHANGE = True -# True = only continue when IP has changed -# False = always continue - -# PATH WHERE THE LAST IP SHOULD BE SAVED INBETWEEN SCRIPT EXECUTIONS -# not needed CHECK_IP_CHANGE is false -LAST_IP_FILE = "lastip.txt" - - -import requests -import json -import sys - -if IP == 'AUTO': - print("Fetching IP Address...") - try: - IP = requests.get("https://api.ipify.org/").text - except requests.ConnectionError: - raise SystemExit("Failed to get IP Address from ipify") - print(f"Detected IP: {IP}") -elif IP == 'ARG': - if (len(sys.argv) < 2): - raise SystemExit('No IP Address provided in commandline arguments') - else: - IP = sys.argv[1] - -if CHECK_IP_CHANGE: - try: - # try to read file - with open(LAST_IP_FILE,"r") as f: - if (IP == f.read()): - # abort if ip in file is same as current - print("IP Address hasn't changed. Aborting") - exit() - except IOError: - pass - - # write current ip to file - with open(LAST_IP_FILE,"w") as f: - f.write(IP) - - -def findBetween(haystack, needle1, needle2): - index1 = haystack.find(needle1) + len(needle1) - index2 = haystack.find(needle2, index1 + 1) - return haystack[index1 : index2] - - -# will create a requests session and log you into your one.com account in that session -def loginSession(USERNAME, PASSWORD, TARGET_DOMAIN=''): - print("Logging in...") - - # create requests session - session = requests.session() - - # get admin panel to be redirected to login page - redirectmeurl = "https://www.one.com/admin/" - try: - r = session.get(redirectmeurl) - except requests.ConnectionError: - raise SystemExit("Connection to one.com failed.") - - # find url to post login credentials to from form action attribute - substrstart = '