From ea0f5843259a6a93aa2d1e8889870b94aa5411eb Mon Sep 17 00:00:00 2001 From: Nephim Date: Wed, 26 Feb 2025 18:27:02 +0100 Subject: [PATCH 1/6] Added modules for new functionality --- .gitignore | 1 + modules/dns_utils.py | 69 ++++++++++++++++++++++++++++++++++++ modules/logger.py | 34 ++++++++++++++++++ modules/one_com_config.py | 74 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 178 insertions(+) create mode 100644 .gitignore create mode 100644 modules/dns_utils.py create mode 100644 modules/logger.py create mode 100644 modules/one_com_config.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ed8ebf5 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__ \ No newline at end of file diff --git a/modules/dns_utils.py b/modules/dns_utils.py new file mode 100644 index 0000000..3de27f8 --- /dev/null +++ b/modules/dns_utils.py @@ -0,0 +1,69 @@ +# modules/dns_utils.py +import dns.resolver +import dns.exception +import logging + +logger = logging.getLogger("one_com_ddns") + +def get_authoritative_ns(domain): + """ + Recursively finds the authoritative nameservers for a given domain. + Tries parent domains if no NS records are found or in case of timeout or other DNS errors + (except for NXDOMAIN, where it stops and indicates domain does not exist). + + Args: + domain (str): The domain to query. + + Returns: + list: A list of authoritative nameserver hostnames, or None if not found or domain does not exist. + """ + try: + answers = dns.resolver.resolve(domain, 'NS') + return [data.to_text() for data in answers] + except dns.resolver.NoAnswer: + exception_type = "NoAnswer" + except dns.exception.Timeout: + exception_type = "Timeout" + except dns.resolver.NXDOMAIN: + exception_type = "NXDOMAIN" + except dns.exception.DNSException as e: + exception_type = f"DNSException: {e}" + + # Handle exceptions that might indicate trying parent domain + if exception_type in ["NoAnswer", "Timeout", "DNSException", "NXDOMAIN"]: + parts = domain.split('.') + if len(parts) > 2: # Check if there's a parent domain + parent_domain = '.'.join(parts[1:]) + return get_authoritative_ns(parent_domain) + else: + logger.warning(f"No authoritative NS for {domain} and no parent domain to try after exception: {exception_type}") + return None # No parent domain, give up + else: # Should not reach here, but as a fallback. For completeness. + logger.error(f"Failed to get NS for {domain} due to: {exception_type}") + return None + +def get_ip_and_ttl(domain, ns_servers=None): + """ + Gets the IP address and TTL of a domain from the authoritative nameservers. + + Args: + domain (str): The domain to query. + optional ns_servers (list): A list of authoritative nameserver hostnames. + + Returns: + tuple: A tuple containing the IP address and TTL, or None if not found. + """ + if ns_servers is None: + ns_servers = get_authoritative_ns(domain) + if ns_servers is None: + return None + resolver = dns.resolver.Resolver() + tmp = [dns.resolver.resolve(ns, 'A')[0].address for ns in ns_servers] + resolver.nameservers = tmp + try: + answers = resolver.resolve(domain, 'A') + for rdata in answers: + return rdata.address, answers.ttl + except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout, dns.exception.DNSException) as e: + logger.warning(f"Error getting IP and TTL for {domain}: {e}") + return None \ No newline at end of file diff --git a/modules/logger.py b/modules/logger.py new file mode 100644 index 0000000..3747ba1 --- /dev/null +++ b/modules/logger.py @@ -0,0 +1,34 @@ +import logging + +GRAY = '\033[90m' +RED = '\033[91m' +YELLOW = '\033[93m' +RESET = '\033[0m' + +class ColoredFormatter(logging.Formatter): + """A formatter that adds colors based on log level.""" + + def format(self, record): + log_level = record.levelname + if log_level == "DEBUG": + log_level_colored = f"{GRAY}[DEBUG]{RESET}" + elif log_level == "WARNING": + log_level_colored = f"{YELLOW}[WARN]{RESET}" + elif log_level == "ERROR": + log_level_colored = f"{RED}[ERROR]{RESET}" + else: + log_level_colored = f"[{log_level}]" # No color for INFO + + return logging.Formatter(f'{log_level_colored} %(message)s').format(record) + +def setup_logging(level=logging.INFO): + """Sets up centralized logging for the application.""" + formatter = ColoredFormatter() + handler = logging.StreamHandler() + handler.setFormatter(formatter) + + logger = logging.getLogger("one_com_ddns") + logger.setLevel(level) + logger.addHandler(handler) + + return logger \ No newline at end of file diff --git a/modules/one_com_config.py b/modules/one_com_config.py new file mode 100644 index 0000000..b0b434e --- /dev/null +++ b/modules/one_com_config.py @@ -0,0 +1,74 @@ +# modules/one_com_config.py +import os +import argparse +import requests +import logging + +logger = logging.getLogger("one_com_ddns") + +def parse_config(validate_required=True): + """ + Parses configuration for the one.com DDNS script from command-line arguments, + environment variables (ONECOM_*), and returns None as default if no value is set. + + Configuration is prioritized: command-line arguments > environment variables > None (if unset). + + Key configuration parameters: + - username (-u, --username): one.com username (defaults to None if unset) + - password (-p, --password): one.com password (defaults to None if unset) + - domains (-d, --domains): List of domain names (e.g.,-d example.com example2.com) (defaults to None if unset) + - ip (-i, --ip): IP address source ('AUTO', 'ARG', or IP) (defaults to None if unset) + - force-update (-f, --force-update): Force DNS update (defaults to None if unset) + - ttl (-t, --ttl): TTL for DNS records (defaults to None if unset) + - skip-confirmation (--skip-confirmation): Skip confirmation prompts (defaults to False if unset) + + Returns: + argparse.Namespace: Object containing configuration parameters. + + Raises: + ValueError: If validate_required is True and username, password, or domain are None + after parsing. + SystemExit: If automatic IP retrieval fails when 'AUTO' is selected + or if no IP address is provided as a command-line argument + when 'ARG' is selected. + """ + parser = argparse.ArgumentParser(description="one.com DDNS Script Configuration") + + parser.add_argument("-u", "--username", help="one.com username", default=os.environ.get("ONECOM_USERNAME")) + parser.add_argument("-p", "--password", help="one.com password", default=os.environ.get("ONECOM_PASSWORD")) + env_onecome_domains = os.environ.get("ONECOM_DOMAINS") + if env_onecome_domains is not None: + env_onecome_domains = env_onecome_domains.split(',') + + parser.add_argument("-d", "--domains", nargs="+", help="List of domain names (e.g.,-d example.com example2.com)", default=env_onecome_domains) + parser.add_argument("-i", "--ip", help="IP address ('AUTO', or IP)", default=os.environ.get("ONECOM_IP", "AUTO")) + + env_onecom_force = os.environ.get("ONECOM_FORCE_DNS_UPDATE") + if env_onecom_force is not None: + env_onecom_force = env_onecom_force.lower() + + parser.add_argument("-f", "--force-update", action="store_true", help="Force DNS update (skip IP check)", default=env_onecom_force) + parser.add_argument("-t", "--ttl", type=int, help="TTL value for DNS records", default=os.environ.get("ONECOM_TTL")) + parser.add_argument("-y", "--skip-confirmation", action="store_true", help="Skip confirmation prompts", default=os.environ.get("ONECOM_SKIP_CONFIRMATION")) + + args = parser.parse_args() + + # Basic validation (ONLY IF validate_required is True) + if validate_required: + if not args.username: + raise ValueError("Username is required (command-line or ONECOM_USERNAME env var)") + if not args.password: + raise ValueError("Password is required (command-line or ONECOM_PASSWORD env var)") + if not args.domains: + raise ValueError("Domain is required (command-line or ONECOM_DOMAIN env var)") + + # Handle IP address retrieval + if args.ip == "AUTO": + try: + args.ip = requests.get("https://api.ipify.org/").text + except requests.ConnectionError: + logger.error("Failed to get IP Address from ipify") + raise SystemExit("Failed to get IP Address from ipify") + logger.info(f"Detected external IP: {args.ip}") + + return args \ No newline at end of file From e1514e0e292517d6be00412998ae7f9d6e8bc621 Mon Sep 17 00:00:00 2001 From: Nephim Date: Wed, 26 Feb 2025 18:27:25 +0100 Subject: [PATCH 2/6] Refactored script to use new modules functionality --- modules/one_com_api.py | 93 ++++++++++++++ one_com_ddns.py | 266 +++++++++++++++-------------------------- 2 files changed, 190 insertions(+), 169 deletions(-) create mode 100644 modules/one_com_api.py diff --git a/modules/one_com_api.py b/modules/one_com_api.py new file mode 100644 index 0000000..cec53ea --- /dev/null +++ b/modules/one_com_api.py @@ -0,0 +1,93 @@ +# modules/one_com_api.py +import requests +import json +import tldextract +import logging + +logger = logging.getLogger("one_com_ddns") + +def _find_between(haystack, needle1, needle2): + index1 = haystack.find(needle1) + len(needle1) + index2 = haystack.find(needle2, index1 + 1) + return haystack[index1 : index2] + +def login_session(username, password): + logger.info("Logging in...") + session = requests.session() + redirect_url = "https://www.one.com/admin/" + try: + r = session.get(redirect_url) + except requests.ConnectionError: + logger.error("Connection to one.com failed.") + raise SystemExit("Connection to one.com failed.") + + post_url = _find_between(r.text, '' + with patch('requests.sessions.Session.post') as mock_post: + mock_post.return_value.text = "Success" + session = one_com_api.login_session("testuser", "testpass") + self.assertIsNotNone(session) + with patch('requests.sessions.Session.get') as mock_get: + mock_get.side_effect = requests.ConnectionError + with self.assertRaises(SystemExit): + one_com_api.login_session("testuser", "testpass") + + def test_one_com_api_get_custom_records(self): + mock_session = Mock() + mock_response = Mock() + mock_response.raise_for_status.return_value = None + mock_response.text = '{"result": {"data": [{"id": "123", "attributes": {"prefix": "test"}} ]}}' + mock_session.get.return_value = mock_response + + records = one_com_api.get_custom_records(mock_session, "test.com") + self.assertIsNotNone(records) + self.assertEqual(len(records), 1) + self.assertEqual(records[0]['id'], "123") + + mock_session.get.side_effect = requests.exceptions.RequestException("Test Exception") + raised_system_exit = False + try: + one_com_api.get_custom_records(mock_session, "test.com") + except SystemExit: + raised_system_exit = True + self.assertTrue(raised_system_exit, "SystemExit was not raised") + + mock_session.get.side_effect = None + mock_response.text = '{"result": {"data": "invalid json"}}' + mock_response.json.side_effect = json.JSONDecodeError("msg", "doc", 0) + mock_session.get.return_value = mock_response + with self.assertRaises(SystemExit): + one_com_api.get_custom_records(mock_session, "test.com") + + def test_one_com_api_find_id_by_subdomain(self): + records = [{"id": "1", "attributes": {"prefix": "sub"}}, {"id": "2", "attributes": {"prefix": "other"}}] + record_obj = one_com_api.find_id_by_subdomain(records, "sub.test.com") + self.assertEqual(record_obj['id'], "1") + + record_obj_not_found = one_com_api.find_id_by_subdomain(records, "nonexistent.test.com") + self.assertIsNone(record_obj_not_found) + + def test_one_com_api_change_ip(self): + mock_session = Mock() + mock_response = Mock() + mock_response.raise_for_status.return_value = None + mock_session.patch.return_value = mock_response + + record = {"id": "123", "attributes": {"ttl": 300}} + one_com_api.change_ip(mock_session, record, "sub.test.com", "1.2.3.4", 600) + + mock_session.patch.side_effect = requests.exceptions.RequestException("Test Exception") + with self.assertRaises(SystemExit): + one_com_api.change_ip(mock_session, record, "sub.test.com", "1.2.3.4", 600) + + def test_dns_utils_get_authoritative_ns(self): + ns_servers = dns_utils.get_authoritative_ns("google.com") + self.assertIsNotNone(ns_servers) + self.assertIsInstance(ns_servers, list) + self.assertGreater(len(ns_servers), 0) + + ns_servers_invalid = dns_utils.get_authoritative_ns("invalid-domain-for-testing.com") + self.assertIsNone(ns_servers_invalid) + + def test_dns_utils_get_authoritative_ns_parent(self): + ns_servers = dns_utils.get_authoritative_ns("sub.google.com") + self.assertIsNotNone(ns_servers) + self.assertIsInstance(ns_servers, list) + self.assertGreater(len(ns_servers), 0) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file From 1ec0e78584b32ec9d04b5dcbaf8fe96e2809b869 Mon Sep 17 00:00:00 2001 From: Nephim Date: Wed, 26 Feb 2025 19:07:19 +0100 Subject: [PATCH 4/6] Removed v 10 requirement --- .github/workflows/python-tests.yml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/.github/workflows/python-tests.yml b/.github/workflows/python-tests.yml index cfde358..e1a8c47 100644 --- a/.github/workflows/python-tests.yml +++ b/.github/workflows/python-tests.yml @@ -9,10 +9,8 @@ jobs: steps: - uses: actions/checkout@v3 - - name: Set up Python 3.10 + - name: Set up Python uses: actions/setup-python@v4 - with: - python-version: 3.10 - name: Install dependencies run: | From c4e70ba51bc54c3c0b39c0229a40b1c708e293aa Mon Sep 17 00:00:00 2001 From: Nephim Date: Wed, 26 Feb 2025 19:08:44 +0100 Subject: [PATCH 5/6] Fixed run command --- .github/workflows/python-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-tests.yml b/.github/workflows/python-tests.yml index e1a8c47..93f0dd8 100644 --- a/.github/workflows/python-tests.yml +++ b/.github/workflows/python-tests.yml @@ -18,4 +18,4 @@ jobs: pip install -r requirements.txt - name: Run tests - run: python -m unittest discover + run: python -m unittest discover -s test From 754180ae6de503a97f98726e30050ddaade316e7 Mon Sep 17 00:00:00 2001 From: Nephim Date: Wed, 26 Feb 2025 23:12:52 +0100 Subject: [PATCH 6/6] Implemented dockerfile --- README.md | 4 +--- dockerfile | 18 +++++++++++++++++ modules/one_com_config.py | 34 ++++++++++++++++---------------- modules/update_utils.py | 35 +++++++++++++++++++++++++++++++++ one_com_ddns.py | 41 +++++---------------------------------- requirements.txt | 2 +- 6 files changed, 77 insertions(+), 57 deletions(-) create mode 100644 dockerfile create mode 100644 modules/update_utils.py diff --git a/README.md b/README.md index 936c906..ba32399 100644 --- a/README.md +++ b/README.md @@ -3,9 +3,7 @@ The name is pretty self explanatory. It's a Python script for updating type A DNS records at one.com. ## Required Packages -- `requests` -- `json` -- `sys` +Check requirements.txt ## Usage At the very top of the Script there are some customization options and variables for your one.com control panel login credentials. diff --git a/dockerfile b/dockerfile new file mode 100644 index 0000000..1ffaddd --- /dev/null +++ b/dockerfile @@ -0,0 +1,18 @@ +FROM python:3-alpine AS base +FROM base AS builder + +RUN mkdir /install +WORKDIR /install +COPY requirements.txt /requirements.txt +RUN pip install --no-binary=:all: --target=/install -r /requirements.txt + +FROM base +COPY --from=builder /install /usr/local + +WORKDIR /app + +COPY . . +RUN pip3 install -r requirements.txt + +ENTRYPOINT ["python3", "one_com_ddns.py"] +CMD [] \ No newline at end of file diff --git a/modules/one_com_config.py b/modules/one_com_config.py index b0b434e..afaf4f4 100644 --- a/modules/one_com_config.py +++ b/modules/one_com_config.py @@ -9,7 +9,7 @@ def parse_config(validate_required=True): """ Parses configuration for the one.com DDNS script from command-line arguments, - environment variables (ONECOM_*), and returns None as default if no value is set. + environment variables, and returns None as default if no value is set. Configuration is prioritized: command-line arguments > environment variables > None (if unset). @@ -34,33 +34,33 @@ def parse_config(validate_required=True): """ parser = argparse.ArgumentParser(description="one.com DDNS Script Configuration") - parser.add_argument("-u", "--username", help="one.com username", default=os.environ.get("ONECOM_USERNAME")) - parser.add_argument("-p", "--password", help="one.com password", default=os.environ.get("ONECOM_PASSWORD")) - env_onecome_domains = os.environ.get("ONECOM_DOMAINS") - if env_onecome_domains is not None: - env_onecome_domains = env_onecome_domains.split(',') + parser.add_argument("-u", "--username", help="one.com username", default=os.environ.get("USERNAME")) + parser.add_argument("-p", "--password", help="one.com password", default=os.environ.get("PASSWORD")) + env_domains = os.environ.get("DOMAINS") + if env_domains is not None: + env_domains = env_domains.split(',') - parser.add_argument("-d", "--domains", nargs="+", help="List of domain names (e.g.,-d example.com example2.com)", default=env_onecome_domains) - parser.add_argument("-i", "--ip", help="IP address ('AUTO', or IP)", default=os.environ.get("ONECOM_IP", "AUTO")) + parser.add_argument("-d", "--domains", nargs="+", help="List of domain names (e.g.,-d example.com example2.com)", default=env_domains) + parser.add_argument("-i", "--ip", help="IP address ('AUTO', or IP)", default=os.environ.get("IP", "AUTO")) - env_onecom_force = os.environ.get("ONECOM_FORCE_DNS_UPDATE") - if env_onecom_force is not None: - env_onecom_force = env_onecom_force.lower() + env_force = os.environ.get("FORCE_DNS_UPDATE") + if env_force is not None: + env_force = env_force.lower() - parser.add_argument("-f", "--force-update", action="store_true", help="Force DNS update (skip IP check)", default=env_onecom_force) - parser.add_argument("-t", "--ttl", type=int, help="TTL value for DNS records", default=os.environ.get("ONECOM_TTL")) - parser.add_argument("-y", "--skip-confirmation", action="store_true", help="Skip confirmation prompts", default=os.environ.get("ONECOM_SKIP_CONFIRMATION")) + parser.add_argument("-f", "--force-update", action="store_true", help="Force DNS update (skip IP check)", default=env_force) + parser.add_argument("-t", "--ttl", type=int, help="TTL value for DNS records", default=os.environ.get("TTL")) + parser.add_argument("-y", "--skip-confirmation", action="store_true", help="Skip confirmation prompts", default=os.environ.get("SKIP_CONFIRMATION")) args = parser.parse_args() # Basic validation (ONLY IF validate_required is True) if validate_required: if not args.username: - raise ValueError("Username is required (command-line or ONECOM_USERNAME env var)") + raise ValueError("Username is required (command-line or USERNAME env var)") if not args.password: - raise ValueError("Password is required (command-line or ONECOM_PASSWORD env var)") + raise ValueError("Password is required (command-line or PASSWORD env var)") if not args.domains: - raise ValueError("Domain is required (command-line or ONECOM_DOMAIN env var)") + raise ValueError("Domain is required (command-line or DOMAIN env var)") # Handle IP address retrieval if args.ip == "AUTO": diff --git a/modules/update_utils.py b/modules/update_utils.py new file mode 100644 index 0000000..9efb4b4 --- /dev/null +++ b/modules/update_utils.py @@ -0,0 +1,35 @@ +# modules/update_utils.py +import sys +import modules.one_com_api as one_com_api +import logging + +logger = logging.getLogger("one_com_ddns") + +def _confirm_proceed(prompt: str) -> bool: + """ + Prompt for confirmation unless running in a non-interactive shell. + """ + if not sys.stdin.isatty(): + logger.info("Non-interactive shell detected; proceeding without user confirmation.") + return True + response = input(prompt) + return response.strip().lower() == 'y' + +def update_domain(s, domain, records, ip, ttl, current_dns_ip=None, skip_confirmation=False): + """ + Updates the DNS record for a given domain, handling confirmation and logging. + """ + record_obj = one_com_api.find_id_by_subdomain(records, domain) + if record_obj is None: + logger.error(f"Record '{domain}' could not be found.") + return + + if current_dns_ip: + logger.warning(f"Changing IP for {domain} from {current_dns_ip} to {ip} with TTL {ttl}.") + else: + logger.info(f"Changing IP for {domain} to {ip} with TTL {ttl}.") + + if skip_confirmation or _confirm_proceed("Do you want to proceed? (y/n): "): + one_com_api.change_ip(s, record_obj, domain, ip, ttl) + else: + logger.info(f"Update for {domain} cancelled.") diff --git a/one_com_ddns.py b/one_com_ddns.py index ecd7479..8b136b3 100644 --- a/one_com_ddns.py +++ b/one_com_ddns.py @@ -29,7 +29,8 @@ import modules.dns_utils as dns_utils import modules.one_com_config as config import modules.one_com_api as one_com_api -import modules.logger as logger_module # Import the logger module +import modules.logger as logger_module +import modules.update_utils as update_utils logger = logger_module.setup_logging() # Setup logging @@ -67,12 +68,9 @@ for DOMAIN in DOMAINS: print() logger.info(f"Processing domain: {DOMAIN}") - one_com_api.select_admin_domain(s, DOMAIN) # Select domain at the beginning of each domain loop - - # get dns records for the current domain + one_com_api.select_admin_domain(s, DOMAIN) records = one_com_api.get_custom_records(s, DOMAIN) - # Check current IP from DNS logger.info(f"Attempting to get current DNS IP for: {DOMAIN}") current_dns_ip_info = dns_utils.get_ip_and_ttl(DOMAIN) @@ -84,38 +82,9 @@ logger.info(f"IP Address hasn't changed for {DOMAIN}. Aborting update for this domain.") continue - # change ip address - record_obj = one_com_api.find_id_by_subdomain(records, DOMAIN) - if record_obj is None: - logger.error(f"Record '{DOMAIN}' could not be found.") - continue - - # Ask for confirmation before changing - logger.warning(f"Changing IP for {DOMAIN} from {current_dns_ip} to {IP} with TTL {TTL}.") - if settings.skip_confirmation: - one_com_api.change_ip(s, record_obj, DOMAIN, IP, TTL) - else: - confirmation = input("Do you want to proceed? (y/n): ") - if confirmation.lower() == 'y': - one_com_api.change_ip(s, record_obj, DOMAIN, IP, TTL) - else: - logger.info(f"Update for {DOMAIN} cancelled.") - + update_utils.update_domain(s, DOMAIN, records, IP, TTL, current_dns_ip, SKIP_CONFIRMATION) else: logger.warning(f"Could not retrieve current DNS IP for {DOMAIN} after multiple retries. Proceeding with update anyway.") - record_obj = one_com_api.find_id_by_subdomain(records, DOMAIN) - if record_obj is None: - logger.error(f"Record '{DOMAIN}' could not be found.") - continue - - logger.info(f"Changing IP for {DOMAIN} to {IP} with TTL {TTL}.") - if settings.skip_confirmation: - one_com_api.change_ip(s, record_obj, DOMAIN, IP, TTL) - else: - confirmation = input("Do you want to proceed? (y/n): ") - if confirmation.lower() == 'y': - one_com_api.change_ip(s, record_obj, DOMAIN, IP, TTL) - else: - logger.info(f"Update for {DOMAIN} cancelled.") + update_utils.update_domain(s, DOMAIN, records, IP, TTL, None, SKIP_CONFIRMATION) logger.info("DDNS update process completed.") \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 76b81cc..3450811 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ requests tldextract -dnspython +dnspython \ No newline at end of file