Skip to content

Commit

Permalink
Fix code style
Browse files Browse the repository at this point in the history
  • Loading branch information
nickspaargaren committed Aug 14, 2024
1 parent 1a3f738 commit 957e89a
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 87 deletions.
8 changes: 7 additions & 1 deletion convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,13 @@ def run(action: str):
if __name__ == "__main__":

# Read subcommand from command line, with error handling.
action_candidates = ["pihole", "unbound", "adguard", "adguard_important", "categories"]
action_candidates = [
"pihole",
"unbound",
"adguard",
"adguard_important",
"categories",
]
special_candidates = ["all", "duplicates", "json"]
subcommand = None
try:
Expand Down
162 changes: 95 additions & 67 deletions install.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,39 @@ def fetch_url(url):
if not url:
return

headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:71.0) Gecko/20100101 Firefox/71.0'}
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:71.0) Gecko/20100101 Firefox/71.0"
}

print('[i] Fetching:', url)
print("[i] Fetching:", url)

try:
response = urlopen(Request(url, headers=headers))
except HTTPError as e:
print('[E] HTTP Error:', e.code, 'whilst fetching', url)
print("[E] HTTP Error:", e.code, "whilst fetching", url)
return
except URLError as e:
print('[E] URL Error:', e.reason, 'whilst fetching', url)
print("[E] URL Error:", e.reason, "whilst fetching", url)
return

# Read and decode
response = response.read().decode('UTF-8').replace('\r\n', '\n')
response = response.read().decode("UTF-8").replace("\r\n", "\n")

# If there is data
if response:
# Strip leading and trailing whitespace
response = '\n'.join(x for x in map(str.strip, response.splitlines()))
response = "\n".join(x for x in map(str.strip, response.splitlines()))

# Return the hosts
return response


url_regexps_remote = 'https://raw.githubusercontent.com/nickspaargaren/no-google/master/regex.list'
install_comment = 'github.com/nickspaargaren/no-google'
url_regexps_remote = (
"https://raw.githubusercontent.com/nickspaargaren/no-google/master/regex.list"
)
install_comment = "github.com/nickspaargaren/no-google"

cmd_restart = ['pihole', 'restartdns', 'reload']
cmd_restart = ["pihole", "restartdns", "reload"]

db_exists = False
conn = None
Expand All @@ -61,73 +65,83 @@ def fetch_url(url):

# Check to see whether the default "pihole" docker container is active
try:
docker_id = subprocess.run(['docker', 'ps', '--filter', 'name=pihole', '-q'],
stdout=subprocess.PIPE, universal_newlines=True).stdout.strip()
docker_id = subprocess.run(
["docker", "ps", "--filter", "name=pihole", "-q"],
stdout=subprocess.PIPE,
universal_newlines=True,
).stdout.strip()
# Exception for if docker is not installed
except FileNotFoundError:
pass

# If a pihole docker container was found, locate the first mount
if docker_id:
docker_mnt = subprocess.run(['docker', 'inspect', '--format', '{{ (json .Mounts) }}', docker_id],
stdout=subprocess.PIPE, universal_newlines=True).stdout.strip()
docker_mnt = subprocess.run(
["docker", "inspect", "--format", "{{ (json .Mounts) }}", docker_id],
stdout=subprocess.PIPE,
universal_newlines=True,
).stdout.strip()
# Convert output to JSON and iterate through each dict
for json_dict in json.loads(docker_mnt):
# If this mount's destination is /etc/pihole
if json_dict['Destination'] == r'/etc/pihole':
if json_dict["Destination"] == r"/etc/pihole":
# Use the source path as our target
docker_mnt_src = json_dict['Source']
docker_mnt_src = json_dict["Source"]
break

# If we successfully found the mount
if docker_mnt_src:
print('[i] Running in docker installation mode')
print("[i] Running in docker installation mode")
# Prepend restart commands
cmd_restart[0:0] = ['docker', 'exec', '-i', 'pihole']
cmd_restart[0:0] = ["docker", "exec", "-i", "pihole"]
else:
print('[i] Running in physical installation mode ')
print("[i] Running in physical installation mode ")

# Set paths
path_pihole = docker_mnt_src if docker_mnt_src else r'/etc/pihole'
path_legacy_regex = os.path.join(path_pihole, 'regex.list')
path_legacy_mmotti_regex = os.path.join(path_pihole, 'mmotti-regex.list')
path_pihole_db = os.path.join(path_pihole, 'gravity.db')
path_pihole = docker_mnt_src if docker_mnt_src else r"/etc/pihole"
path_legacy_regex = os.path.join(path_pihole, "regex.list")
path_legacy_mmotti_regex = os.path.join(path_pihole, "mmotti-regex.list")
path_pihole_db = os.path.join(path_pihole, "gravity.db")

# Check that pi-hole path exists
if os.path.exists(path_pihole):
print('[i] Pi-hole path exists')
print("[i] Pi-hole path exists")
else:
print(f'[e] {path_pihole} was not found')
print(f"[e] {path_pihole} was not found")
exit(1)

# Check for write access to /etc/pihole
if os.access(path_pihole, os.X_OK | os.W_OK):
print(f'[i] Write access to {path_pihole} verified')
print(f"[i] Write access to {path_pihole} verified")
else:
print(f'[e] Write access is not available for {path_pihole}. Please run as root or other privileged user')
print(
f"[e] Write access is not available for {path_pihole}. Please run as root or other privileged user"
)
exit(1)

# Determine whether we are using DB or not
if os.path.isfile(path_pihole_db) and os.path.getsize(path_pihole_db) > 0:
db_exists = True
print('[i] DB detected')
print("[i] DB detected")
else:
print('[i] Legacy regex.list detected')
print("[i] Legacy regex.list detected")

# Fetch the remote regexps
str_regexps_remote = fetch_url(url_regexps_remote)

# If regexps were fetched, remove any comments and add to set
if str_regexps_remote:
regexps_remote.update(x for x in map(str.strip, str_regexps_remote.splitlines()) if x and x[:1] != '#')
print(f'[i] {len(regexps_remote)} regexps collected from {url_regexps_remote}')
regexps_remote.update(
x for x in map(str.strip, str_regexps_remote.splitlines()) if x and x[:1] != "#"
)
print(f"[i] {len(regexps_remote)} regexps collected from {url_regexps_remote}")
else:
print('[i] No remote regexps were found.')
print("[i] No remote regexps were found.")
exit(1)

if db_exists:
# Create a DB connection
print(f'[i] Connecting to {path_pihole_db}')
print(f"[i] Connecting to {path_pihole_db}")

try:
conn = sqlite3.connect(path_pihole_db)
Expand All @@ -139,93 +153,107 @@ def fetch_url(url):
c = conn.cursor()

# Add / update remote regexps
print('[i] Adding / updating regexps in the DB')

c.executemany('INSERT OR IGNORE INTO domainlist (type, domain, enabled, comment) '
'VALUES (3, ?, 1, ?)',
[(x, install_comment) for x in sorted(regexps_remote)])
c.executemany('UPDATE domainlist '
'SET comment = ? WHERE domain in (?) AND comment != ?',
[(install_comment, x, install_comment) for x in sorted(regexps_remote)])
print("[i] Adding / updating regexps in the DB")

c.executemany(
"INSERT OR IGNORE INTO domainlist (type, domain, enabled, comment) "
"VALUES (3, ?, 1, ?)",
[(x, install_comment) for x in sorted(regexps_remote)],
)
c.executemany(
"UPDATE domainlist " "SET comment = ? WHERE domain in (?) AND comment != ?",
[(install_comment, x, install_comment) for x in sorted(regexps_remote)],
)

conn.commit()

# Fetch all current mmotti regexps in the local db
c.execute('SELECT domain FROM domainlist WHERE type = 3 AND comment = ?', (install_comment,))
c.execute(
"SELECT domain FROM domainlist WHERE type = 3 AND comment = ?",
(install_comment,),
)
regexps_mmotti_local_results = c.fetchall()
regexps_mmotti_local.update([x[0] for x in regexps_mmotti_local_results])

# Remove any local entries that do not exist in the remote list
# (will only work for previous installs where we've set the comment field)
print('[i] Identifying obsolete regexps')
print("[i] Identifying obsolete regexps")
regexps_remove = regexps_mmotti_local.difference(regexps_remote)

if regexps_remove:
print('[i] Removing obsolete regexps')
c.executemany('DELETE FROM domainlist WHERE type = 3 AND domain in (?)', [(x,) for x in regexps_remove])
print("[i] Removing obsolete regexps")
c.executemany(
"DELETE FROM domainlist WHERE type = 3 AND domain in (?)",
[(x,) for x in regexps_remove],
)
conn.commit()

# Delete mmotti-regex.list as if we've migrated to the db, it's no longer needed
if os.path.exists(path_legacy_mmotti_regex):
os.remove(path_legacy_mmotti_regex)

print('[i] Restarting Pi-hole')
print("[i] Restarting Pi-hole")
subprocess.run(cmd_restart, stdout=subprocess.DEVNULL)

# Prepare final result
print('[i] Done - Please see your installed regexps below\n')
print("[i] Done - Please see your installed regexps below\n")

c.execute('Select domain FROM domainlist WHERE type = 3')
c.execute("Select domain FROM domainlist WHERE type = 3")
final_results = c.fetchall()
regexps_local.update(x[0] for x in final_results)

print(*sorted(regexps_local), sep='\n')
print(*sorted(regexps_local), sep="\n")

conn.close()

else:
# If regex.list exists and is not empty
# Read it and add to a set
if os.path.isfile(path_legacy_regex) and os.path.getsize(path_legacy_regex) > 0:
print('[i] Collecting existing entries from regex.list')
with open(path_legacy_regex, 'r') as fRead:
regexps_local.update(x for x in map(str.strip, fRead) if x and x[:1] != '#')
print("[i] Collecting existing entries from regex.list")
with open(path_legacy_regex, "r") as fRead:
regexps_local.update(x for x in map(str.strip, fRead) if x and x[:1] != "#")

# If the local regexp set is not empty
if regexps_local:
print(f'[i] {len(regexps_local)} existing regexps identified')
print(f"[i] {len(regexps_local)} existing regexps identified")
# If we have a record of a previous legacy install
if os.path.isfile(path_legacy_mmotti_regex) and os.path.getsize(path_legacy_mmotti_regex) > 0:
print('[i] Existing mmotti-regex install identified')
if (
os.path.isfile(path_legacy_mmotti_regex)
and os.path.getsize(path_legacy_mmotti_regex) > 0
):
print("[i] Existing mmotti-regex install identified")
# Read the previously installed regexps to a set
with open(path_legacy_mmotti_regex, 'r') as fOpen:
regexps_legacy_mmotti.update(x for x in map(str.strip, fOpen) if x and x[:1] != '#')
with open(path_legacy_mmotti_regex, "r") as fOpen:
regexps_legacy_mmotti.update(
x for x in map(str.strip, fOpen) if x and x[:1] != "#"
)

if regexps_legacy_mmotti:
print('[i] Removing previously installed regexps')
print("[i] Removing previously installed regexps")
regexps_local.difference_update(regexps_legacy_mmotti)

# Add remote regexps to local regexps
print(f'[i] Syncing with {url_regexps_remote}')
print(f"[i] Syncing with {url_regexps_remote}")
regexps_local.update(regexps_remote)

# Output to regex.list
print(f'[i] Outputting {len(regexps_local)} regexps to {path_legacy_regex}')
with open(path_legacy_regex, 'w') as fWrite:
print(f"[i] Outputting {len(regexps_local)} regexps to {path_legacy_regex}")
with open(path_legacy_regex, "w") as fWrite:
for line in sorted(regexps_local):
fWrite.write(f'{line}\n')
fWrite.write(f"{line}\n")

# Output mmotti remote regexps to mmotti-regex.list
# for future install / uninstall
with open(path_legacy_mmotti_regex, 'w') as fWrite:
with open(path_legacy_mmotti_regex, "w") as fWrite:
for line in sorted(regexps_remote):
fWrite.write(f'{line}\n')
fWrite.write(f"{line}\n")

print('[i] Restarting Pi-hole')
print("[i] Restarting Pi-hole")
subprocess.run(cmd_restart, stdout=subprocess.DEVNULL)

# Prepare final result
print('[i] Done - Please see your installed regexps below\n')
with open(path_legacy_regex, 'r') as fOpen:
print("[i] Done - Please see your installed regexps below\n")
with open(path_legacy_regex, "r") as fOpen:
for line in fOpen:
print(line, end='')
print(line, end="")
5 changes: 4 additions & 1 deletion scripts/dnscheck.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import dns.resolver


def check_domain(domain):
try:
# Query for 'NS' records
Expand All @@ -12,10 +13,11 @@ def check_domain(domain):

return not has_ns_record


def main():
found_domains = 0
domains_with_ns_records = []

with open("../pihole-google.txt") as f:
for line in f:
domain = line.strip() # strip whitespace
Expand All @@ -32,5 +34,6 @@ def main():
with open("../pihole-google.txt", "w") as f:
f.writelines(domains_with_ns_records)


if __name__ == "__main__":
main()
8 changes: 4 additions & 4 deletions scripts/domain-check-api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import whois # pip install python-whois
import whois
import time
import re

Expand All @@ -19,11 +19,11 @@ def remove_duplicates(mylist: list[str]):


def get_domains():
with open('../pihole-google.txt', 'r') as main:
with open("../pihole-google.txt", "r") as main:
for line in main:
if not '#' in line and not ':' in line:
if not "#" in line and not ":" in line:
line = get_domain(line)
if line != '\n' and line != []:
if line != "\n" and line != []:
line = re.sub(r"\n", "", line)
yield line

Expand Down
Loading

0 comments on commit 957e89a

Please sign in to comment.