Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions .github/workflows/python-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
name: Python Tests

on: [push, pull_request]

jobs:
test:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3

- name: Set up Python
uses: actions/setup-python@v4

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt

- name: Run tests
run: python -m unittest discover -s test
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__pycache__
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
The name is pretty self explanatory. It's a Python script for updating type A DNS records at one.com.

## Required Packages
- `requests`
- `json`
- `sys`
Check requirements.txt

## Usage
At the very top of the Script there are some customization options and variables for your one.com control panel login credentials.
Expand Down
18 changes: 18 additions & 0 deletions dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
FROM python:3-alpine AS base
FROM base AS builder

RUN mkdir /install
WORKDIR /install
COPY requirements.txt /requirements.txt
RUN pip install --no-binary=:all: --target=/install -r /requirements.txt

FROM base
COPY --from=builder /install /usr/local

WORKDIR /app

COPY . .
RUN pip3 install -r requirements.txt

ENTRYPOINT ["python3", "one_com_ddns.py"]
CMD []
69 changes: 69 additions & 0 deletions modules/dns_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# modules/dns_utils.py
import dns.resolver
import dns.exception
import logging

logger = logging.getLogger("one_com_ddns")

def get_authoritative_ns(domain):
"""
Recursively finds the authoritative nameservers for a given domain.
Tries parent domains if no NS records are found or in case of timeout or other DNS errors
(except for NXDOMAIN, where it stops and indicates domain does not exist).

Args:
domain (str): The domain to query.

Returns:
list: A list of authoritative nameserver hostnames, or None if not found or domain does not exist.
"""
try:
answers = dns.resolver.resolve(domain, 'NS')
return [data.to_text() for data in answers]
except dns.resolver.NoAnswer:
exception_type = "NoAnswer"
except dns.exception.Timeout:
exception_type = "Timeout"
except dns.resolver.NXDOMAIN:
exception_type = "NXDOMAIN"
except dns.exception.DNSException as e:
exception_type = f"DNSException: {e}"

# Handle exceptions that might indicate trying parent domain
if exception_type in ["NoAnswer", "Timeout", "DNSException", "NXDOMAIN"]:
parts = domain.split('.')
if len(parts) > 2: # Check if there's a parent domain
parent_domain = '.'.join(parts[1:])
return get_authoritative_ns(parent_domain)
else:
logger.warning(f"No authoritative NS for {domain} and no parent domain to try after exception: {exception_type}")
return None # No parent domain, give up
else: # Should not reach here, but as a fallback. For completeness.
logger.error(f"Failed to get NS for {domain} due to: {exception_type}")
return None

def get_ip_and_ttl(domain, ns_servers=None):
"""
Gets the IP address and TTL of a domain from the authoritative nameservers.

Args:
domain (str): The domain to query.
optional ns_servers (list): A list of authoritative nameserver hostnames.

Returns:
tuple: A tuple containing the IP address and TTL, or None if not found.
"""
if ns_servers is None:
ns_servers = get_authoritative_ns(domain)
if ns_servers is None:
return None
resolver = dns.resolver.Resolver()
tmp = [dns.resolver.resolve(ns, 'A')[0].address for ns in ns_servers]
resolver.nameservers = tmp
try:
answers = resolver.resolve(domain, 'A')
for rdata in answers:
return rdata.address, answers.ttl
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.exception.Timeout, dns.exception.DNSException) as e:
logger.warning(f"Error getting IP and TTL for {domain}: {e}")
return None
35 changes: 35 additions & 0 deletions modules/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# modules/logger.py
import logging

GRAY = '\033[90m'
RED = '\033[91m'
YELLOW = '\033[93m'
RESET = '\033[0m'

class ColoredFormatter(logging.Formatter):
"""A formatter that adds colors based on log level."""

def format(self, record):
log_level = record.levelname
if log_level == "DEBUG":
log_level_colored = f"{GRAY}[DEBUG]{RESET}"
elif log_level == "WARNING":
log_level_colored = f"{YELLOW}[WARN]{RESET}"
elif log_level == "ERROR":
log_level_colored = f"{RED}[ERROR]{RESET}"
else:
log_level_colored = f"[{log_level}]" # No color for INFO

return logging.Formatter(f'{log_level_colored} %(message)s').format(record)

def setup_logging(level=logging.INFO):
"""Sets up centralized logging for the application."""
formatter = ColoredFormatter()
handler = logging.StreamHandler()
handler.setFormatter(formatter)

logger = logging.getLogger("one_com_ddns")
logger.setLevel(level)
logger.addHandler(handler)

return logger
99 changes: 99 additions & 0 deletions modules/one_com_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# modules/one_com_api.py
import requests
import json
import tldextract
import logging

logger = logging.getLogger("one_com_ddns")

def _find_between(haystack, needle1, needle2):
index1 = haystack.find(needle1) + len(needle1)
index2 = haystack.find(needle2, index1 + 1)
return haystack[index1 : index2]

def login_session(username, password):
logger.info("Logging in...")
session = requests.session()
redirect_url = "https://www.one.com/admin/"
try:
r = session.get(redirect_url)
except requests.ConnectionError:
logger.error("Connection to one.com failed.")
raise SystemExit("Connection to one.com failed.")

post_url = _find_between(r.text, '<form id="kc-form-login" class="Login-form login autofill" onsubmit="login.disabled = true; return true;" action="', '"').replace('&amp;', '&')
login_data = {'username': username, 'password': password, 'credentialId': ''}
response = session.post(post_url, data=login_data)
if "Invalid username or password." in response.text:
logger.error("Invalid credentials. Exiting")
exit(1)
logger.info("Login successful.")
return session

def select_admin_domain(session, domain):
request_str = f"https://www.one.com/admin/select-admin-domain.do?domain={domain}"
session.get(request_str)

def get_custom_records(session, domain):
extracted = tldextract.extract(domain)
primary_domain = f"{extracted.domain}.{extracted.suffix}"
logger.info(f"Getting Records for primary domain: {primary_domain} (Requesting for: {domain})")
dns_url = f"https://www.one.com/admin/api/domains/{primary_domain}/dns/custom_records"
try:
response = session.get(dns_url)
response.raise_for_status()
get_res = response.text
if not get_res:
raise ValueError("Empty response from API")
data = json.loads(get_res)
records = data["result"]["data"]
# Validate that the records are in the expected list format.
if not isinstance(records, list):
raise TypeError("DNS records are not in the expected list format")
return records
except requests.exceptions.RequestException as e:
logger.error(f"Error fetching DNS records for domain: {domain}. HTTP Status Code: {e.response.status_code if e.response else 'N/A'}")
raise SystemExit(f"Failed to get DNS records for domain {domain}: {e}") from e
except (json.JSONDecodeError, KeyError, TypeError, ValueError) as e:
logger.error(f"Error parsing JSON response for domain: {domain}. Response text was: {get_res}")
raise SystemExit(f"Failed to parse DNS records JSON for domain {domain}: {e}") from e


def find_id_by_subdomain(records, subdomain):
extracted_subdomain = tldextract.extract(subdomain).subdomain
logger.info(f"searching domain prefix for: '{extracted_subdomain}'")
for obj in records:
if obj["attributes"]["prefix"] == extracted_subdomain:
logger.info(f"Found Domain Prefix '{extracted_subdomain}': {obj['id']}")
return obj
return None

def change_ip(session, record, domain, ip, ttl):
record_id = record["id"]
current_ttl = record["attributes"]["ttl"]
actual_ttl = ttl if ttl is not None else current_ttl
extracted = tldextract.extract(domain)
primary_domain = f"{extracted.domain}.{extracted.suffix}"
subdomain = extracted.subdomain
logger.info(f"Changing IP on record for subdomain '{subdomain}' - ID '{record_id}' TO NEW IP '{ip}' with TTL '{actual_ttl}' on primary domain '{primary_domain}'")
to_send = {
"type": "dns_service_records",
"id": record_id,
"attributes": {
"type": "A",
"prefix": subdomain,
"content": ip,
"ttl": actual_ttl
}
}
dns_url = f"https://www.one.com/admin/api/domains/{primary_domain}/dns/custom_records/{record_id}"
send_headers = {'Content-Type': 'application/json'}
try:
response = session.patch(dns_url, data=json.dumps(to_send), headers=send_headers)
response.raise_for_status()
logger.info("Sent Change IP Request")
except requests.exceptions.RequestException as e:
logger.error(f"Error updating IP for record with subdomain '{subdomain}': {e}")
status_code = e.response.status_code if e.response else 'N/A'
logger.error(f"HTTP Status Code: {status_code}")
raise SystemExit(f"Failed to update IP for record with subdomain {subdomain}: {e}") from e
74 changes: 74 additions & 0 deletions modules/one_com_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# modules/one_com_config.py
import os
import argparse
import requests
import logging

logger = logging.getLogger("one_com_ddns")

def parse_config(validate_required=True):
"""
Parses configuration for the one.com DDNS script from command-line arguments,
environment variables, and returns None as default if no value is set.

Configuration is prioritized: command-line arguments > environment variables > None (if unset).

Key configuration parameters:
- username (-u, --username): one.com username (defaults to None if unset)
- password (-p, --password): one.com password (defaults to None if unset)
- domains (-d, --domains): List of domain names (e.g.,-d example.com example2.com) (defaults to None if unset)
- ip (-i, --ip): IP address source ('AUTO', 'ARG', or IP) (defaults to None if unset)
- force-update (-f, --force-update): Force DNS update (defaults to None if unset)
- ttl (-t, --ttl): TTL for DNS records (defaults to None if unset)
- skip-confirmation (--skip-confirmation): Skip confirmation prompts (defaults to False if unset)

Returns:
argparse.Namespace: Object containing configuration parameters.

Raises:
ValueError: If validate_required is True and username, password, or domain are None
after parsing.
SystemExit: If automatic IP retrieval fails when 'AUTO' is selected
or if no IP address is provided as a command-line argument
when 'ARG' is selected.
"""
parser = argparse.ArgumentParser(description="one.com DDNS Script Configuration")

parser.add_argument("-u", "--username", help="one.com username", default=os.environ.get("USERNAME"))
parser.add_argument("-p", "--password", help="one.com password", default=os.environ.get("PASSWORD"))
env_domains = os.environ.get("DOMAINS")
if env_domains is not None:
env_domains = env_domains.split(',')

parser.add_argument("-d", "--domains", nargs="+", help="List of domain names (e.g.,-d example.com example2.com)", default=env_domains)
parser.add_argument("-i", "--ip", help="IP address ('AUTO', or IP)", default=os.environ.get("IP", "AUTO"))

env_force = os.environ.get("FORCE_DNS_UPDATE")
if env_force is not None:
env_force = env_force.lower()

parser.add_argument("-f", "--force-update", action="store_true", help="Force DNS update (skip IP check)", default=env_force)
parser.add_argument("-t", "--ttl", type=int, help="TTL value for DNS records", default=os.environ.get("TTL"))
parser.add_argument("-y", "--skip-confirmation", action="store_true", help="Skip confirmation prompts", default=os.environ.get("SKIP_CONFIRMATION"))

args = parser.parse_args()

# Basic validation (ONLY IF validate_required is True)
if validate_required:
if not args.username:
raise ValueError("Username is required (command-line or USERNAME env var)")
if not args.password:
raise ValueError("Password is required (command-line or PASSWORD env var)")
if not args.domains:
raise ValueError("Domain is required (command-line or DOMAIN env var)")

# Handle IP address retrieval
if args.ip == "AUTO":
try:
args.ip = requests.get("https://api.ipify.org/").text
except requests.ConnectionError:
logger.error("Failed to get IP Address from ipify")
raise SystemExit("Failed to get IP Address from ipify")
logger.info(f"Detected external IP: {args.ip}")

return args
35 changes: 35 additions & 0 deletions modules/update_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# modules/update_utils.py
import sys
import modules.one_com_api as one_com_api
import logging

logger = logging.getLogger("one_com_ddns")

def _confirm_proceed(prompt: str) -> bool:
"""
Prompt for confirmation unless running in a non-interactive shell.
"""
if not sys.stdin.isatty():
logger.info("Non-interactive shell detected; proceeding without user confirmation.")
return True
response = input(prompt)
return response.strip().lower() == 'y'

def update_domain(s, domain, records, ip, ttl, current_dns_ip=None, skip_confirmation=False):
"""
Updates the DNS record for a given domain, handling confirmation and logging.
"""
record_obj = one_com_api.find_id_by_subdomain(records, domain)
if record_obj is None:
logger.error(f"Record '{domain}' could not be found.")
return

if current_dns_ip:
logger.warning(f"Changing IP for {domain} from {current_dns_ip} to {ip} with TTL {ttl}.")
else:
logger.info(f"Changing IP for {domain} to {ip} with TTL {ttl}.")

if skip_confirmation or _confirm_proceed("Do you want to proceed? (y/n): "):
one_com_api.change_ip(s, record_obj, domain, ip, ttl)
else:
logger.info(f"Update for {domain} cancelled.")
Loading