diff --git a/.github/workflows/python-tests.yml b/.github/workflows/python-tests.yml new file mode 100644 index 0000000..93f0dd8 --- /dev/null +++ b/.github/workflows/python-tests.yml @@ -0,0 +1,21 @@ +name: Python Tests + +on: [push, pull_request] + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v4 + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Run tests + run: python -m unittest discover -s test diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ed8ebf5 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__ \ No newline at end of file diff --git a/README.md b/README.md index 936c906..ba32399 100644 --- a/README.md +++ b/README.md @@ -3,9 +3,7 @@ The name is pretty self explanatory. It's a Python script for updating type A DNS records at one.com. ## Required Packages -- `requests` -- `json` -- `sys` +Check requirements.txt ## Usage At the very top of the Script there are some customization options and variables for your one.com control panel login credentials. diff --git a/dockerfile b/dockerfile new file mode 100644 index 0000000..1ffaddd --- /dev/null +++ b/dockerfile @@ -0,0 +1,18 @@ +FROM python:3-alpine AS base +FROM base AS builder + +RUN mkdir /install +WORKDIR /install +COPY requirements.txt /requirements.txt +RUN pip install --no-binary=:all: --target=/install -r /requirements.txt + +FROM base +COPY --from=builder /install /usr/local + +WORKDIR /app + +COPY . . +RUN pip3 install -r requirements.txt + +ENTRYPOINT ["python3", "one_com_ddns.py"] +CMD [] \ No newline at end of file diff --git a/modules/dns_utils.py b/modules/dns_utils.py new file mode 100644 index 0000000..3de27f8 --- /dev/null +++ b/modules/dns_utils.py @@ -0,0 +1,69 @@ +# modules/dns_utils.py +import dns.resolver +import dns.exception +import logging + +logger = logging.getLogger("one_com_ddns") + +def get_authoritative_ns(domain): + """ + Recursively finds the authoritative nameservers for a given domain. + Tries parent domains if no NS records are found or in case of timeout or other DNS errors + (except for NXDOMAIN, where it stops and indicates domain does not exist). + + Args: + domain (str): The domain to query. + + Returns: + list: A list of authoritative nameserver hostnames, or None if not found or domain does not exist. + """ + try: + answers = dns.resolver.resolve(domain, 'NS') + return [data.to_text() for data in answers] + except dns.resolver.NoAnswer: + exception_type = "NoAnswer" + except dns.exception.Timeout: + exception_type = "Timeout" + except dns.resolver.NXDOMAIN: + exception_type = "NXDOMAIN" + except dns.exception.DNSException as e: + exception_type = f"DNSException: {e}" + + # Handle exceptions that might indicate trying parent domain + if exception_type in ["NoAnswer", "Timeout", "DNSException", "NXDOMAIN"]: + parts = domain.split('.') + if len(parts) > 2: # Check if there's a parent domain + parent_domain = '.'.join(parts[1:]) + return get_authoritative_ns(parent_domain) + else: + logger.warning(f"No authoritative NS for {domain} and no parent domain to try after exception: {exception_type}") + return None # No parent domain, give up + else: # Should not reach here, but as a fallback. For completeness. + logger.error(f"Failed to get NS for {domain} due to: {exception_type}") + return None + +def get_ip_and_ttl(domain, ns_servers=None): + """ + Gets the IP address and TTL of a domain from the authoritative nameservers. + + Args: + domain (str): The domain to query. + optional ns_servers (list): A list of authoritative nameserver hostnames. + + Returns: + tuple: A tuple containing the IP address and TTL, or None if not found. + """ + if ns_servers is None: + ns_servers = get_authoritative_ns(domain) + if ns_servers is None: + return None + resolver = dns.resolver.Resolver() + tmp = [dns.resolver.resolve(ns, 'A')[0].address for ns in ns_servers] + resolver.nameservers = tmp + try: + answers = resolver.resolve(domain, 'A') + for rdata in answers: + return rdata.address, answers.ttl + except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout, dns.exception.DNSException) as e: + logger.warning(f"Error getting IP and TTL for {domain}: {e}") + return None \ No newline at end of file diff --git a/modules/logger.py b/modules/logger.py new file mode 100644 index 0000000..1c3a21c --- /dev/null +++ b/modules/logger.py @@ -0,0 +1,35 @@ +# modules/logger.py +import logging + +GRAY = '\033[90m' +RED = '\033[91m' +YELLOW = '\033[93m' +RESET = '\033[0m' + +class ColoredFormatter(logging.Formatter): + """A formatter that adds colors based on log level.""" + + def format(self, record): + log_level = record.levelname + if log_level == "DEBUG": + log_level_colored = f"{GRAY}[DEBUG]{RESET}" + elif log_level == "WARNING": + log_level_colored = f"{YELLOW}[WARN]{RESET}" + elif log_level == "ERROR": + log_level_colored = f"{RED}[ERROR]{RESET}" + else: + log_level_colored = f"[{log_level}]" # No color for INFO + + return logging.Formatter(f'{log_level_colored} %(message)s').format(record) + +def setup_logging(level=logging.INFO): + """Sets up centralized logging for the application.""" + formatter = ColoredFormatter() + handler = logging.StreamHandler() + handler.setFormatter(formatter) + + logger = logging.getLogger("one_com_ddns") + logger.setLevel(level) + logger.addHandler(handler) + + return logger \ No newline at end of file diff --git a/modules/one_com_api.py b/modules/one_com_api.py new file mode 100644 index 0000000..0e29674 --- /dev/null +++ b/modules/one_com_api.py @@ -0,0 +1,99 @@ +# modules/one_com_api.py +import requests +import json +import tldextract +import logging + +logger = logging.getLogger("one_com_ddns") + +def _find_between(haystack, needle1, needle2): + index1 = haystack.find(needle1) + len(needle1) + index2 = haystack.find(needle2, index1 + 1) + return haystack[index1 : index2] + +def login_session(username, password): + logger.info("Logging in...") + session = requests.session() + redirect_url = "https://www.one.com/admin/" + try: + r = session.get(redirect_url) + except requests.ConnectionError: + logger.error("Connection to one.com failed.") + raise SystemExit("Connection to one.com failed.") + + post_url = _find_between(r.text, '