Pwnagotchi_waveshare_V4适配(海南大学)(中)

简介: Pwnagotchi_waveshare_V4适配(海南大学)(中)
# pwnagotchi/ui/hw/libs/waveshare/v3/epdconfig.py
import os
import logging
import sys
import time
class RaspberryPi:
    """
    Hardware access layer backed by RPi.GPIO and spidev.

    Exposes the GPIO/SPI primitives (digital_write, digital_read, delay_ms,
    spi_writebyte, module_init, module_exit) used to talk to the display.
    """
    # Pin definition (module_init selects the BCM numbering scheme)
    RST_PIN = 17
    DC_PIN = 25
    CS_PIN = 8
    BUSY_PIN = 24

    def __init__(self):
        # Both packages are imported here rather than at module level.
        import spidev
        import RPi.GPIO
        self.GPIO = RPi.GPIO
        # SPI device, bus = 0, device = 0
        self.SPI = spidev.SpiDev(0, 0)

    def digital_write(self, pin, value):
        """Drive `pin` to `value`."""
        self.GPIO.output(pin, value)

    def digital_read(self, pin):
        """Return the current level of `pin`."""
        return self.GPIO.input(pin)

    def delay_ms(self, delaytime):
        """Sleep for `delaytime` milliseconds."""
        time.sleep(delaytime / 1000.0)

    def spi_writebyte(self, data):
        """Send the byte sequence `data` over SPI."""
        self.SPI.writebytes(data)

    def module_init(self):
        """Configure the GPIO directions and the SPI bus; always returns 0."""
        gpio = self.GPIO
        gpio.setmode(gpio.BCM)
        gpio.setwarnings(False)
        # RST, DC and CS are outputs; BUSY is the only input.
        for out_pin in (self.RST_PIN, self.DC_PIN, self.CS_PIN):
            gpio.setup(out_pin, gpio.OUT)
        gpio.setup(self.BUSY_PIN, gpio.IN)
        self.SPI.max_speed_hz = 4000000
        self.SPI.mode = 0b00
        return 0

    def module_exit(self):
        """Close the SPI bus, pull RST and DC low and release the GPIOs."""
        logging.debug("spi end")
        self.SPI.close()
        logging.debug("close 5V, Module enters 0 power consumption ...")
        for low_pin in (self.RST_PIN, self.DC_PIN):
            self.GPIO.output(low_pin, 0)
        self.GPIO.cleanup()
class JetsonNano:
    """
    Hardware access layer backed by Jetson.GPIO and the
    ``sysfs_software_spi.so`` shared library (loaded through ctypes).

    Offers the same primitives as RaspberryPi so either class can back the
    module-level API.
    """
    # Pin definition (module_init selects the BCM numbering scheme)
    RST_PIN = 17
    DC_PIN = 25
    CS_PIN = 8
    BUSY_PIN = 24

    def __init__(self):
        """
        Locate and load sysfs_software_spi.so, then import Jetson.GPIO.

        Raises RuntimeError when the shared library is in none of the
        searched directories.
        """
        import ctypes
        # Searched in order: next to this file first, then the system lib dirs.
        find_dirs = [
            os.path.dirname(os.path.realpath(__file__)),
            '/usr/local/lib',
            '/usr/lib',
        ]
        self.SPI = None
        for find_dir in find_dirs:
            so_filename = os.path.join(find_dir, 'sysfs_software_spi.so')
            if os.path.exists(so_filename):
                self.SPI = ctypes.cdll.LoadLibrary(so_filename)
                break
        if self.SPI is None:
            raise RuntimeError('Cannot find sysfs_software_spi.so')
        import Jetson.GPIO
        self.GPIO = Jetson.GPIO

    def digital_write(self, pin, value):
        """Drive `pin` to `value`."""
        self.GPIO.output(pin, value)

    def digital_read(self, pin):
        """
        Return the current level of `pin`.

        The requested pin is honoured (it used to be ignored in favour of
        BUSY_PIN); callers passing BUSY_PIN get the same result as before.
        """
        return self.GPIO.input(pin)

    def delay_ms(self, delaytime):
        """Sleep for `delaytime` milliseconds."""
        time.sleep(delaytime / 1000.0)

    def spi_writebyte(self, data):
        """Transfer data[0] over the software SPI; further elements are not sent."""
        self.SPI.SYSFS_software_spi_transfer(data[0])

    def module_init(self):
        """Configure the GPIO directions and start the software SPI; always returns 0."""
        self.GPIO.setmode(self.GPIO.BCM)
        self.GPIO.setwarnings(False)
        self.GPIO.setup(self.RST_PIN, self.GPIO.OUT)
        self.GPIO.setup(self.DC_PIN, self.GPIO.OUT)
        self.GPIO.setup(self.CS_PIN, self.GPIO.OUT)
        self.GPIO.setup(self.BUSY_PIN, self.GPIO.IN)
        self.SPI.SYSFS_software_spi_begin()
        return 0

    def module_exit(self):
        """Stop the software SPI, pull RST and DC low and release the GPIOs."""
        logging.debug("spi end")
        self.SPI.SYSFS_software_spi_end()
        logging.debug("close 5V, Module enters 0 power consumption ...")
        self.GPIO.output(self.RST_PIN, 0)
        self.GPIO.output(self.DC_PIN, 0)
        self.GPIO.cleanup()
# Choose the backend: the presence of the gpiomem-bcm2835 platform driver
# selects RaspberryPi, anything else falls back to JetsonNano.
implementation = (
    RaspberryPi()
    if os.path.exists('/sys/bus/platform/drivers/gpiomem-bcm2835')
    else JetsonNano()
)
# Re-export every public attribute of the backend (pins, GPIO/SPI handles and
# bound methods) as attributes of this module.
for func in dir(implementation):
    if func.startswith('_'):
        continue
    setattr(sys.modules[__name__], func, getattr(implementation, func))
### END OF FILE ###


# pwnagotchi/ui/hw/waveshare3.py
import logging
import pwnagotchi.ui.fonts as fonts
from pwnagotchi.ui.hw.base import DisplayImpl
class WaveshareV3(DisplayImpl):
    """
    Display implementation registered as 'waveshare_3'; drawing is delegated
    to the EPD class of the bundled epd2in13_V3 driver module.
    """

    def __init__(self, config):
        super().__init__(config, 'waveshare_3')
        # Created in initialize(); None until then.
        self._display = None

    def layout(self):
        """Set up the fonts, fill in the UI element positions and return the layout."""
        fonts.setup(10, 8, 10, 25, 25, 9)
        status = {
            'pos': (125, 20),
            'font': fonts.status_font(fonts.Medium),
            'max': 20
        }
        entries = (
            ('width', 250),
            ('height', 122),
            ('face', (0, 40)),
            ('name', (5, 20)),
            ('channel', (0, 0)),
            ('aps', (28, 0)),
            ('uptime', (185, 0)),
            ('line1', [0, 14, 250, 14]),
            ('line2', [0, 108, 250, 108]),
            ('friend_face', (0, 92)),
            ('friend_name', (40, 94)),
            ('shakes', (0, 109)),
            ('mode', (225, 109)),
            ('status', status),
        )
        for key, value in entries:
            self._layout[key] = value
        return self._layout

    def initialize(self):
        """Create the EPD driver, initialise it and clear the panel with 0xFF."""
        logging.info("initializing waveshare v3 display")
        # Imported here rather than at module level.
        from pwnagotchi.ui.hw.libs.waveshare.v3.epd2in13_V3 import EPD
        epd = EPD()
        self._display = epd
        epd.init()
        epd.Clear(0xFF)

    def render(self, canvas):
        """Convert `canvas` to a driver buffer and push it with displayPartial."""
        frame = self._display.getbuffer(canvas)
        self._display.displayPartial(frame)

    def clear(self):
        """Clear the panel with 0xFF."""
        self._display.Clear(0xFF)


# pwnagotchi/utils.py
import logging
import glob
import os
import time
import subprocess
import json
import shutil
import toml
import sys
import re
from toml.encoder import TomlEncoder, _dump_str
from zipfile import ZipFile
from datetime import datetime
from enum import Enum
class DottedTomlEncoder(TomlEncoder):
    """
    Dumps the toml into the dotted-key format
    """

    def __init__(self, _dict=dict):
        super().__init__(_dict)

    def dump_list(self, v):
        """
        Render list `v`: one " value," line per item, with the opening
        bracket on its own line only when there is more than one item.
        """
        multiline = len(v) > 1
        pieces = ["[\n" if multiline else "["]
        pieces.extend(" " + str(self.dump_value(item)) + ",\n" for item in v)
        rendered = "".join(pieces)
        if not multiline:
            # keep a 0/1 item list on a single line
            rendered = rendered.rstrip("\n")
        return rendered + "]"

    def dump_sections(self, o, sup):
        """
        Render mapping `o` as dotted "a.b.c = value" lines, `sup` being the
        dotted prefix of the enclosing section (empty at the top level).

        Returns (text, empty dict): everything is emitted as text, so there
        are no remaining sub-sections to hand back.
        """
        prefix = sup + "." if sup else ""
        text = ""
        for section, value in o.items():
            section = str(section)
            # keys that are not bare-key safe get quoted
            if re.match(r'^[A-Za-z0-9_-]+$', section):
                key = section
            else:
                key = _dump_str(section)
            if value is None:
                continue
            if isinstance(value, dict):
                nested, _ = self.dump_sections(value, prefix + key)
                text += nested
                # separate sections
                if not text.endswith('\n\n'):
                    text += '\n'
            else:
                text += prefix + key + " = " + str(self.dump_value(value)) + '\n'
        return (text, self._dict())
def parse_version(version):
    """
    Split a dotted version string into a tuple of its components.

    The components stay strings, so tuples compare component by component
    using string ordering (e.g. '10' sorts before '9').
    """
    components = version.split('.')
    return tuple(components)
def remove_whitelisted(list_of_handshakes, list_of_whitelisted_strings, valid_on_error=True):
    """
    Removes a given list of whitelisted handshakes from a path list

    list_of_handshakes: paths of handshake files
    list_of_whitelisted_strings: whitelist entries; a handshake is dropped when
        a normalized entry is a substring of its normalized file name
    valid_on_error: keep a handshake whose name could not be processed

    Returns the list of handshakes that are not whitelisted.
    """
    suffix = '.pcap'

    def normalize(name):
        """
        Only allow alpha/nums
        """
        return ''.join(c for c in name if c.isalnum()).lower()

    filtered = []
    for handshake in list_of_handshakes:
        try:
            basename = os.path.basename(handshake)
            # Drop the extension only. rstrip('.pcap') would strip any trailing
            # run of the characters '.', 'p', 'c', 'a' and so eat into names or
            # MAC addresses ending in e.g. 'ca'.
            if basename.endswith(suffix):
                basename = basename[:-len(suffix)]
            normalized_handshake = normalize(basename)
            whitelisted = any(normalize(whitelist) in normalized_handshake
                              for whitelist in list_of_whitelisted_strings)
            if not whitelisted:
                filtered.append(handshake)
        except Exception:
            # best effort: an unparsable entry is kept or dropped as requested
            if valid_on_error:
                filtered.append(handshake)
    return filtered
def download_file(url, destination, chunk_size=128):
    """
    Download `url` into the file `destination`.

    The body is streamed in `chunk_size` byte pieces instead of being
    buffered in memory first, and the connection is always released.
    An HTTP error status raises (via raise_for_status) before the
    destination file is opened.
    """
    import requests
    resp = requests.get(url, stream=True)
    try:
        resp.raise_for_status()
        with open(destination, 'wb') as fd:
            for chunk in resp.iter_content(chunk_size):
                fd.write(chunk)
    finally:
        resp.close()
def unzip(file, destination, strip_dirs=0):
    """
    Extract the zip archive `file` into `destination` (created if missing).

    strip_dirs: number of leading path components to remove from every
        entry. Entries with too few components to strip (e.g. a file at the
        archive root when strip_dirs=1) are skipped rather than raising
        IndexError.
    """
    os.makedirs(destination, exist_ok=True)
    with ZipFile(file, 'r') as archive:
        if not strip_dirs:
            archive.extractall(destination)
            return
        for info in archive.infolist():
            parts = info.filename.split('/', maxsplit=strip_dirs)
            if len(parts) <= strip_dirs:
                # nothing would be left of the path after stripping
                continue
            new_filename = parts[strip_dirs]
            # an empty remainder is the stripped directory entry itself
            if new_filename:
                info.filename = new_filename
                archive.extract(info, destination)
# https://stackoverflow.com/questions/823196/yaml-merge-in-python
def merge_config(user, default):
    """
    Recursively fill `user` with the entries of `default` it lacks.

    Values already present in `user` win; nested dicts are merged key by key.
    `user` is modified in place and returned. When either argument is not a
    dict, `user` is returned untouched.
    """
    if not (isinstance(user, dict) and isinstance(default, dict)):
        return user
    for key, default_value in default.items():
        if key in user:
            user[key] = merge_config(user[key], default_value)
        else:
            user[key] = default_value
    return user
def keys_to_str(data):
    """
    Return a copy of `data` (a list or a dict) in which every dict key, at any
    nesting depth, has been converted with str(). Non-container values are
    carried over unchanged.
    """
    def convert(value):
        # only lists and dicts are descended into
        if isinstance(value, (list, dict)):
            return keys_to_str(value)
        return value

    if isinstance(data, list):
        return [convert(item) for item in data]
    return {str(key): convert(value) for key, value in data.items()}
def save_config(config, target):
    """
    Write `config` to the file `target` in dotted-key toml format.

    The config is serialized before the file is opened: opening with 'wt'
    truncates the file, so serializing first keeps the existing file intact
    if encoding fails. Always returns True.
    """
    serialized = toml.dumps(config, encoder=DottedTomlEncoder())
    with open(target, 'wt') as fp:
        fp.write(serialized)
    return True
def load_config(args):
    """
    Assemble and return the runtime configuration dict.

    Steps, in order:
      1. install a config file or a whole pwnagotchi folder found on /boot/
      2. (re)install the packaged defaults.toml at args.config
      3. load the defaults and merge the user config over them, converting a
         legacy yaml user config to toml when no toml exists yet
      4. merge every *.toml drop-in found in config['main']['confd']
      5. normalize config['ui']['display']['type'] to its canonical name

    args: object providing `config` (path of the defaults file) and
        `user_config` (path of the user overrides)

    Exits the process with status 1 when the user config cannot be processed
    or the display type is not supported.
    """
    default_config_path = os.path.dirname(args.config)
    # dirname is '' for a bare file name; there is nothing to create then
    if default_config_path and not os.path.exists(default_config_path):
        os.makedirs(default_config_path)
    import pwnagotchi
    ref_defaults_file = os.path.join(os.path.dirname(pwnagotchi.__file__), 'defaults.toml')
    ref_defaults_data = None
    # check for a config file on /boot/
    for boot_conf in ['/boot/config.yml', '/boot/config.toml']:
        if os.path.exists(boot_conf):
            # logging not configured here yet
            print("installing %s to %s ..." % (boot_conf, args.user_config))
            # https://stackoverflow.com/questions/42392600/oserror-errno-18-invalid-cross-device-link
            shutil.move(boot_conf, args.user_config)
            break
    # check for an entire pwnagotchi folder on /boot/
    if os.path.isdir('/boot/pwnagotchi'):
        print("installing /boot/pwnagotchi to /etc/pwnagotchi ...")
        shutil.rmtree('/etc/pwnagotchi', ignore_errors=True)
        shutil.move('/boot/pwnagotchi', '/etc/')
    # if no config is found, copy the defaults
    if not os.path.exists(args.config):
        print("copying %s to %s ..." % (ref_defaults_file, args.config))
        shutil.copy(ref_defaults_file, args.config)
    else:
        # check if the user messed with the defaults
        with open(ref_defaults_file) as fp:
            ref_defaults_data = fp.read()
        with open(args.config) as fp:
            defaults_data = fp.read()
        if ref_defaults_data != defaults_data:
            print("!!! file in %s is different than release defaults, overwriting !!!" % args.config)
            shutil.copy(ref_defaults_file, args.config)
    # load the defaults
    with open(args.config) as fp:
        config = toml.load(fp)
    # load the user config
    try:
        user_config = None
        # migrate
        yaml_name = args.user_config.replace('.toml', '.yml')
        if not os.path.exists(args.user_config) and os.path.exists(yaml_name):
            # no toml found; convert yaml
            logging.info('Old yaml-config found. Converting to toml...')
            with open(args.user_config, 'w') as toml_file, open(yaml_name) as yaml_file:
                import yaml
                user_config = yaml.safe_load(yaml_file)
                # convert int/float keys to str
                user_config = keys_to_str(user_config)
                # convert to toml but use loaded yaml
                toml.dump(user_config, toml_file)
        elif os.path.exists(args.user_config):
            with open(args.user_config) as toml_file:
                user_config = toml.load(toml_file)
        if user_config:
            # user values take precedence over the defaults
            config = merge_config(user_config, config)
    except Exception as ex:
        logging.error("There was an error processing the configuration file:\n%s ",ex)
        sys.exit(1)
    # dropins
    dropin = config['main']['confd']
    if dropin and os.path.isdir(dropin):
        dropin += '*.toml' if dropin.endswith('/') else '/*.toml' # only toml here; yaml is no more
        for conf in glob.glob(dropin):
            with open(conf) as toml_file:
                additional_config = toml.load(toml_file)
                # drop-in values take precedence over what was loaded so far
                config = merge_config(additional_config, config)
    # the very first step is to normalize the display name so we don't need dozens of if/elif around
    # (canonical name, accepted spellings); every alias list is a real tuple so
    # membership is an exact match, never a substring test
    display_aliases = (
        ('inky', ('inky', 'inkyphat')),
        ('papirus', ('papirus', 'papi')),
        ('oledhat', ('oledhat',)),
        ('waveshare_1', ('ws_1', 'ws1', 'waveshare_1', 'waveshare1')),
        ('waveshare_2', ('ws_2', 'ws2', 'waveshare_2', 'waveshare2')),
        ('waveshare_3', ('ws_3', 'ws3', 'waveshare_3', 'waveshare3')),
        ('waveshare27inch', ('ws_27inch', 'ws27inch', 'waveshare_27inch', 'waveshare27inch')),
        ('waveshare29inch', ('ws_29inch', 'ws29inch', 'waveshare_29inch', 'waveshare29inch')),
        ('lcdhat', ('lcdhat',)),
        ('dfrobot_1', ('dfrobot_1', 'df1')),
        ('dfrobot_2', ('dfrobot_2', 'df2')),
        ('waveshare154inch', ('ws_154inch', 'ws154inch', 'waveshare_154inch', 'waveshare154inch')),
        ('waveshare144lcd', ('waveshare144lcd', 'ws_144inch', 'ws144inch', 'waveshare_144inch', 'waveshare144inch')),
        ('waveshare213d', ('ws_213d', 'ws213d', 'waveshare_213d', 'waveshare213d')),
        ('waveshare213bc', ('ws_213bc', 'ws213bc', 'waveshare_213bc', 'waveshare213bc')),
        ('spotpear24inch', ('spotpear24inch',)),
    )
    display = config['ui']['display']
    for canonical, aliases in display_aliases:
        if display['type'] in aliases:
            display['type'] = canonical
            break
    else:
        print("unsupported display type %s" % display['type'])
        sys.exit(1)
    return config
def secs_to_hhmmss(secs):
    """
    Format a duration given in seconds as 'HH:MM:SS'.

    Hours are not wrapped at 24, so long durations give e.g. '48:00:00'.
    """
    total_minutes, seconds = divmod(secs, 60)
    hours, minutes = divmod(total_minutes, 60)
    return '%02d:%02d:%02d' % (hours, minutes, seconds)
def total_unique_handshakes(path):
    """Return the number of *.pcap files located directly inside `path`."""
    pattern = os.path.join(path, "*.pcap")
    matches = glob.glob(pattern)
    return len(matches)
def iface_channels(ifname):
    """
    Return the channel numbers reported by `/sbin/iwlist <ifname> freq`.

    Only output lines that start with "Channel " (after stripping) are used;
    the second whitespace separated token of such a line is the number.
    """
    raw = subprocess.getoutput("/sbin/iwlist %s freq" % ifname)
    stripped_lines = (entry.strip() for entry in raw.split("\n"))
    return [int(entry.split()[1])
            for entry in stripped_lines
            if entry.startswith("Channel ")]
def led(on=True):
    """
    Switch led0 through its sysfs brightness file.

    Writes 0 when `on` is exactly True and 1 for anything else.
    NOTE(review): the inverted values suggest an active-low LED - confirm
    against the target board.
    """
    value = 0 if on is True else 1
    with open('/sys/class/leds/led0/brightness', 'w+t') as fp:
        fp.write("%d" % value)
def blink(times=1, delay=0.3):
    """
    Blink the led `times` times, holding each state for `delay` seconds,
    then leave it switched on.
    """
    for _ in range(times):
        for state in (True, False):
            led(state)
            time.sleep(delay)
    led(True)
class WifiInfo(Enum):
    """
    Fields you can extract from a pcap file

    Members are passed in the `fields` argument of extract_from_pcap and are
    used as the keys of the dict it returns.
    """
    # addr3 of the Dot11 layer of the first beacon frame
    BSSID = 0
    # `info` of the first Dot11Elt found in a beacon/assoc-req/reassoc-req frame, decoded as utf-8
    ESSID = 1
    # 'crypto' entry of the first beacon's network_stats()
    ENCRYPTION = 2
    # RadioTap ChannelFrequency of the first packet, converted with freq_to_channel
    CHANNEL = 3
    # RadioTap dBm_AntSignal of the first packet
    RSSI = 4
class FieldNotFoundError(Exception):
    """Raised by extract_from_pcap when a requested field is not in the capture."""
def md5(fname):
    """
    Return the hexadecimal MD5 digest of the file `fname`, reading it in
    4096 byte chunks.

    https://stackoverflow.com/questions/3431825/generating-an-md5-checksum-of-a-file
    """
    import hashlib
    digest = hashlib.md5()
    with open(fname, "rb") as stream:
        while True:
            block = stream.read(4096)
            if block == b"":
                break
            digest.update(block)
    return digest.hexdigest()
def extract_from_pcap(path, fields):
    """
    Search in pcap-file for specified information

    path: Path to pcap file
    fields: Array of fields (WifiInfo members) that should be extracted

    Returns a dict mapping each requested field to the value found.

    Raises TypeError when an element of `fields` is not a WifiInfo member.
    If a field is not found, FieldNotFoundError is raised (chained to the
    underlying error when there is one). Errors raised by sniff() itself are
    not converted and propagate unchanged.
    """
    def mgt_filter(*subtypes):
        # capture filter matching management frames of the given subtypes
        return " or ".join(f"wlan type mgt subtype {subtype}" for subtype in subtypes)

    results = dict()
    for field in fields:
        if not isinstance(field, WifiInfo):
            raise TypeError("Invalid field")
        if field == WifiInfo.BSSID:
            from scapy.all import Dot11Beacon, Dot11, sniff
            packets = sniff(offline=path, filter=mgt_filter('beacon'))
            try:
                for packet in packets:
                    if packet.haslayer(Dot11Beacon) and hasattr(packet[Dot11], 'addr3'):
                        results[field] = packet[Dot11].addr3
                        break
                else:  # loop finished without a break: nothing matched
                    raise FieldNotFoundError("Could not find field [BSSID]")
            except FieldNotFoundError:
                raise
            except Exception as ex:
                raise FieldNotFoundError("Could not find field [BSSID]") from ex
        elif field == WifiInfo.ESSID:
            from scapy.all import sniff, Dot11Elt
            packets = sniff(offline=path, filter=mgt_filter('beacon', 'assoc-req', 'reassoc-req'))
            try:
                for packet in packets:
                    if packet.haslayer(Dot11Elt) and hasattr(packet[Dot11Elt], 'info'):
                        results[field] = packet[Dot11Elt].info.decode('utf-8')
                        break
                else:  # loop finished without a break: nothing matched
                    raise FieldNotFoundError("Could not find field [ESSID]")
            except FieldNotFoundError:
                raise
            except Exception as ex:
                raise FieldNotFoundError("Could not find field [ESSID]") from ex
        elif field == WifiInfo.ENCRYPTION:
            from scapy.all import Dot11Beacon, sniff
            packets = sniff(offline=path, filter=mgt_filter('beacon'))
            try:
                for packet in packets:
                    if packet.haslayer(Dot11Beacon) and hasattr(packet[Dot11Beacon], 'network_stats'):
                        stats = packet[Dot11Beacon].network_stats()
                        if 'crypto' in stats:
                            results[field] = stats['crypto']  # set with encryption types
                            break
                else:  # loop finished without a break: nothing matched
                    raise FieldNotFoundError("Could not find field [ENCRYPTION]")
            except FieldNotFoundError:
                raise
            except Exception as ex:
                raise FieldNotFoundError("Could not find field [ENCRYPTION]") from ex
        elif field == WifiInfo.CHANNEL:
            from scapy.all import sniff, RadioTap
            from pwnagotchi.mesh.wifi import freq_to_channel
            # only the first packet of the capture is inspected
            packets = sniff(offline=path, count=1)
            try:
                results[field] = freq_to_channel(packets[0][RadioTap].ChannelFrequency)
            except Exception as ex:
                raise FieldNotFoundError("Could not find field [CHANNEL]") from ex
        elif field == WifiInfo.RSSI:
            from scapy.all import sniff, RadioTap
            # only the first packet of the capture is inspected
            packets = sniff(offline=path, count=1)
            try:
                results[field] = packets[0][RadioTap].dBm_AntSignal
            except Exception as ex:
                raise FieldNotFoundError("Could not find field [RSSI]") from ex
    return results
class StatusFile(object):
    """
    A small file whose modification time and content record when something
    last happened.

    path: location of the file
    data_format: 'json' to parse/serialize the content as JSON, anything else
        to treat it as plain text
    """

    def __init__(self, path, data_format='raw'):
        self._path = path
        # datetime of the last update, None while the file does not exist
        self._updated = None
        self._format = data_format
        self.data = None
        if os.path.exists(path):
            self._updated = datetime.fromtimestamp(os.path.getmtime(path))
            with open(path) as fp:
                if data_format == 'json':
                    self.data = json.load(fp)
                else:
                    self.data = fp.read()

    def data_field_or(self, name, default=""):
        """Return self.data[name] when data is loaded and contains `name`, else `default`."""
        if self.data is not None and name in self.data:
            return self.data[name]
        return default

    def _age_seconds(self):
        # Full elapsed time since the last update. timedelta.seconds would only
        # give the remainder within the current day and drop whole days.
        return (datetime.now() - self._updated).total_seconds()

    def newer_then_minutes(self, minutes):
        """True when the file was updated less than `minutes` minutes ago."""
        return self._updated is not None and (self._age_seconds() / 60) < minutes

    def newer_then_hours(self, hours):
        """True when the file was updated less than `hours` hours ago."""
        return self._updated is not None and (self._age_seconds() / (60 * 60)) < hours

    def newer_then_days(self, days):
        """True when the file was updated less than `days` whole days ago."""
        return self._updated is not None and (datetime.now() - self._updated).days < days

    def update(self, data=None):
        """
        Record "now" as the update time, store `data` and write the file:
        the timestamp when `data` is None, otherwise `data` as JSON or as-is
        depending on the configured format.
        """
        from pwnagotchi.fs import ensure_write
        self._updated = datetime.now()
        self.data = data
        with ensure_write(self._path, 'w') as fp:
            if data is None:
                fp.write(str(self._updated))
            elif self._format == 'json':
                json.dump(self.data, fp)
            else:
                fp.write(data)
相关文章
|
4月前
|
小程序 存储 UED
如何实现一次搭建 多平台适配的小程序
【6月更文挑战第3天】如何实现一次搭建 多平台适配的小程序
|
4月前
|
容器
代码适配性问题怎么解决,如何快速创建左右布局
代码适配性问题怎么解决,如何快速创建左右布局
|
5月前
|
iOS开发
iPad适配
iPad适配
51 0
|
5月前
|
JSON iOS开发 数据格式
iPhone X适配
iPhone X适配
33 0
|
5月前
|
编解码 前端开发
什么是适配?
什么是适配?
|
12月前
|
设计模式 Java
JAVA设计模式7:适配者模式,彻底解决两不兼容接口之间的问题
JAVA设计模式7:适配者模式,彻底解决两不兼容接口之间的问题
138 0
|
SQL 消息中间件 缓存
12种接口优化的通用方案
12种接口优化的通用方案
221 0
|
前端开发 开发者 容器
总结适配方案|学习笔记
快速学习 总结适配方案
|
存储 测试技术 数据安全/隐私保护
RobotFrameWork接口项目分层及通用控制方式
RobotFrameWork接口项目分层及通用控制方式
1000 0
RobotFrameWork接口项目分层及通用控制方式
Pwnagotchi_waveshare_V3适配(海南大学)(上)
Pwnagotchi_waveshare_V3适配(海南大学)(上)
255 0
Pwnagotchi_waveshare_V3适配(海南大学)(上)