Skip to content
Snippets Groups Projects
Verified Commit 69bb6afe authored by Gigadoc 2's avatar Gigadoc 2
Browse files

move type annotations into separate .pyi

parent 0dd67ca2
Branches untyped
No related tags found
No related merge requests found
import configparser
from IPy import IP
from collections.abc import Container
from pyroute2 import IPRoute
from pyroute2.netlink import rtnl
from typing import List, Optional, Tuple, TypedDict
update_url: str
class IfaceDict(TypedDict):
hosts: List[configparser.SectionProxy]
active_prefix: Optional[str]
def get_current_aaaas(host: str) -> list: ...
def post_update(host: str, key: str, ip: str) -> None: ...
def update_dns(host: configparser.SectionProxy, new_ip: IP) -> None: ...
def get_id_from_name(ipr: IPRoute, name: str) -> int: ...
def get_watched_ifaces(ipr: IPRoute, config: configparser.ConfigParser) -> dict: ...
def parse_or_discard(watched_ifaces: Container[int], msg: rtnl.rtmsg.rtmsg) -> Optional[Tuple[int, IP, bool]]: ...
def handle_message(watched_ifaces: dict, msg: rtnl.rtmsg.rtmsg) -> None: ...
def main() -> int: ...
......@@ -3,9 +3,6 @@
# Copyright 2021 Fabian Pack
# SPDX-License-Identifier: AGPL-3.0-only
from typing import Dict, List, Optional, Tuple, TypedDict
from collections.abc import Container
import configparser
import random
import socket
......@@ -24,12 +21,7 @@ from pyroute2.netlink import rtnl
update_url = "https://dyn.dns.he.net/nic/update"
class IfaceDict(TypedDict):
hosts: List[configparser.SectionProxy]
active_prefix: Optional[str]
def get_current_aaaas(host: str) -> list:
def get_current_aaaas(host):
host = dns.name.from_unicode("{}.".format(host))
resolver = dns.resolver.Resolver()
......@@ -68,7 +60,7 @@ def get_current_aaaas(host: str) -> list:
# TODO: nxdomain -> by convention probably not safe to continue
def post_update(host: str, key: str, ip: str) -> None:
def post_update(host, key, ip):
data = urllib.parse.urlencode({ "hostname": host, "password": key, "myip": ip })
# TODO: exceptiooons
response = urllib.request.urlopen(update_url, data.encode('ascii'))
......@@ -79,7 +71,7 @@ def post_update(host: str, key: str, ip: str) -> None:
response.status, response.reason))
def update_dns(host: configparser.SectionProxy, new_ip: IP) -> None:
def update_dns(host, new_ip):
for addr in get_current_aaaas(host.name):
current_ip = IP(addr)
if current_ip == new_ip:
......@@ -99,7 +91,7 @@ def update_dns(host: configparser.SectionProxy, new_ip: IP) -> None:
print("{}: update failure!".format(host.name))
def get_id_from_name(ipr: IPRoute, name: str) -> int:
def get_id_from_name(ipr, name):
ret = ipr.link_lookup(ifname=name)
if len(ret) > 0:
assert len(ret) == 1
......@@ -108,8 +100,8 @@ def get_id_from_name(ipr: IPRoute, name: str) -> int:
raise FileNotFoundError
def get_watched_ifaces(ipr: IPRoute, config: configparser.ConfigParser) -> dict:
watched_ifaces: Dict[int, IfaceDict] = dict()
def get_watched_ifaces(ipr, config):
watched_ifaces= dict()
for section in config.sections():
iface = get_id_from_name(ipr, config[section]['iface'])
......@@ -120,7 +112,7 @@ def get_watched_ifaces(ipr: IPRoute, config: configparser.ConfigParser) -> dict:
return watched_ifaces
def parse_or_discard(watched_ifaces: Container[int], msg: rtnl.rtmsg.rtmsg) -> Optional[Tuple[int, IP, bool]]:
def parse_or_discard(watched_ifaces, msg):
# We only care for added or removed routes
if msg['event'] == 'RTM_NEWROUTE':
deleted = False
......@@ -158,7 +150,7 @@ def parse_or_discard(watched_ifaces: Container[int], msg: rtnl.rtmsg.rtmsg) -> O
return ifid, prefix, deleted
def handle_message(watched_ifaces: dict, msg: rtnl.rtmsg.rtmsg) -> None:
def handle_message(watched_ifaces, msg):
ifid, prefix, deleted = \
parse_or_discard(watched_ifaces.keys(), msg) or (None, None, None)
if ifid is None:
......@@ -182,7 +174,7 @@ def handle_message(watched_ifaces: dict, msg: rtnl.rtmsg.rtmsg) -> None:
update_dns(host, prefix[suffix.int()])
def main() -> int:
def main():
config = configparser.ConfigParser()
config.read('config.ini')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment