项目简介
该项目实现了打字频率统计及可视化功能。
主要使用的库
pynput:允许您控制和监视输入设备,这里我们用来获取键盘输入。
SQLAlchemy:数据库操作库,这里我们用来保存键盘输入。
streamlit:提供可视化界面。
项目组成
agent.py:获取键盘输入
display.py:可视化
补充说明
如果你不想用原文的数据库,也可以替换为本地的数据库,例如免安装的 SQLite。
agent.py
# agent.py
"""Keyboard monitoring agent.

Listens for key presses with pynput and inserts every hit (a single key, or
the currently held modifiers joined to the key with ``+``) into the
``keyboard_monitor`` table of a local SQLite database.
"""
from dotenv import load_dotenv
from pynput import keyboard
from pynput.keyboard import Key

import concurrent.futures
import logging
import os
import queue
import sqlalchemy
import sqlalchemy.exc
import sys
import time

# Keys that are tracked as "held" state instead of being recorded as hits.
MODIFIERS = {
    Key.shift, Key.shift_l, Key.shift_r,
    Key.alt, Key.alt_l, Key.alt_r, Key.alt_gr,
    Key.ctrl, Key.ctrl_l, Key.ctrl_r,
    Key.cmd, Key.cmd_l, Key.cmd_r,
}

# Consecutive failed inserts after which failures are logged as errors.
MAX_RETRIES = 10

# Single definition used both to create the table and to build the inserts
# (previously two separate definitions with diverging column types).
TABLE = sqlalchemy.Table(
    'keyboard_monitor',
    sqlalchemy.MetaData(),
    sqlalchemy.Column('hits', sqlalchemy.String, nullable=True),
    sqlalchemy.Column('ts', sqlalchemy.TIMESTAMP, nullable=False),
)

if __name__ == '__main__':
    load_dotenv()

    # Everything goes to a per-run log file; INFO and above is echoed to stdout.
    log = logging.getLogger("agent")
    log.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(funcName)s %(message)s')
    file_handler = logging.FileHandler(f'agent-{time.time_ns()}.log', encoding='utf-8')
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(logging.INFO)
    stdout_handler.setFormatter(formatter)
    log.addHandler(file_handler)
    log.addHandler(stdout_handler)

    # Alternative kept from the original setup: connect to a remote database
    # whose URL is supplied through the environment.
    # engine = sqlalchemy.create_engine(os.environ['DATABASE_URL'],
    #                                   echo_pool=True,
    #                                   isolation_level='AUTOCOMMIT')
    engine = sqlalchemy.create_engine("sqlite:///keyboard.db")

    current_modifiers = set()      # modifier keys currently held down
    pending_hits = queue.Queue()   # hits waiting to be stored; None = stop
    cancel_signal = queue.Queue()  # any item tells the listener to stop

    def on_press(key):
        """Track a pressed modifier, or queue a hit for any other key."""
        if key in MODIFIERS:
            current_modifiers.add(key)
        else:
            held = sorted(str(modifier) for modifier in current_modifiers)
            pending_hits.put('+'.join(held + [str(key)]))
        log.debug(f'{key} pressed, current_modifiers: {current_modifiers}')

    def on_release(key):
        """Forget a released modifier; other keys are only logged."""
        if key in MODIFIERS:
            try:
                current_modifiers.remove(key)
            except KeyError:
                log.warning(f'Key {key} not in current_modifiers {current_modifiers}')
        log.debug(f'{key} released, current_modifiers: {current_modifiers}')

    # The original remote setup created the table with raw SQL:
    #   CREATE TABLE IF NOT EXISTS keyboard_monitor (
    #       hits STRING NULL,
    #       ts TIMESTAMP(3) NOT NULL,
    #       TIME INDEX ("ts")
    #   ) ENGINE=mito WITH( regions = 1, ttl = '3months')
    # For SQLite the table is created from the SQLAlchemy definition instead.
    TABLE.metadata.create_all(engine)

    def sender_thread():
        """Store queued hits until the ``None`` sentinel is received.

        Known transient ``OperationalError``s put the hit back on the queue;
        any other ``OperationalError`` is re-raised and ends the thread.
        """
        retries = 0  # consecutive failed inserts
        while True:
            hits = pending_hits.get()
            log.debug(f'got: {hits}')
            if hits is None:
                log.info("Exiting...")
                break
            try:
                log.debug(f'sending: {hits}')
                # Connecting happens inside the try block so that connect-time
                # failures (e.g. an unresolved host) reach the handler below.
                with engine.connect() as connection:
                    connection.execute(
                        TABLE.insert().values(hits=hits, ts=sqlalchemy.func.now()))
                    connection.commit()
            except sqlalchemy.exc.OperationalError as e:
                exceeded = retries >= MAX_RETRIES
                retries += 1  # counted on failures only
                msg = str(e)
                if exceeded:
                    log.error(f'Retry exceeds. Operational error: {e}')
                elif e.connection_invalidated:
                    log.warning(f'Connection invalidated: {e}')
                elif "(1815, 'Internal error: 1000')" in msg:
                    # TODO 1815 - should not handle internal error;
                    # see https://github.com/GreptimeTeam/greptimedb/issues/3447
                    log.warning(f'Known operational error: {e}')
                elif '2005' in msg and 'Unknown MySQL server host' in msg:
                    log.warning(f'DNS temporary unresolved: {e}')
                else:
                    raise
                # Every handled case retries by re-queueing the hit.
                pending_hits.put(hits)
            else:
                log.info(f'sent: {hits}')
                retries = 0

    def listener_thread():
        """Listen for keyboard events until cancelled, then stop the sender."""
        with keyboard.Listener(on_press=on_press, on_release=on_release) as listener:
            log.info("Listening...")
            cancel_signal.get()
            pending_hits.put(None)
            log.info("Exiting...")

    with concurrent.futures.ThreadPoolExecutor() as executor:
        sender = executor.submit(sender_thread)
        listener = executor.submit(listener_thread)
        try:
            finished = concurrent.futures.wait(
                [sender, listener],
                return_when=concurrent.futures.FIRST_EXCEPTION)
            for fut in finished.done:
                error = fut.exception(timeout=0)
                # Only report futures that actually failed.
                if error is not None:
                    log.error(f'Unhandled exception for futures: {error}')
        except KeyboardInterrupt:
            log.info("KeyboardInterrupt. Exiting...")
        except Exception as e:
            log.error(f'Unhandled exception: {e}')
        finally:
            # A listener that already died cannot forward the stop sentinel;
            # send it directly so the sender (and the executor) can exit.
            if listener.done():
                pending_hits.put(None)
            cancel_signal.put(True)
display.py
# display.py
"""Streamlit dashboard for the hits stored in the ``keyboard_monitor`` table."""
import datetime
import os

from dotenv import load_dotenv
import pytz
import streamlit as st
import tzlocal
import pandas

# Number of rows shown in each "Top N" table.
TOP_N = 10

# ``{operator}`` is only ever filled with the constants LIKE / NOT LIKE below;
# the row limit is passed as a bound parameter.
# NOTE(review): a hit counts as a combo when it contains '+', so a recorded
# literal '+' key would presumably be classified as a combo too — confirm
# against the data written by agent.py.
TOP_HITS_SQL = """
SELECT hits, COUNT(*) as times
FROM keyboard_monitor
WHERE hits {operator} '%+%'
GROUP BY hits
ORDER BY times DESC limit :limit;
"""

# One row per hour bucket for the selected local day. The bucket start is
# selected explicitly (instead of a bare ``ts`` from an arbitrary row of the
# group) and no row limit is applied, so every active hour is returned.
HOURLY_SQL = """
SELECT strftime('%Y-%m-%d %H:00:00', ts) AS hour_start, COUNT(1) AS times
FROM keyboard_monitor
WHERE strftime('%Y-%m-%d', ts, 'localtime') = :day
GROUP BY hour_start
ORDER BY hour_start ASC;
"""


def top_hits(conn, *, combos: bool, limit: int = TOP_N) -> pandas.DataFrame:
    """Return the ``limit`` most frequent hits, ordered by count descending.

    ``combos`` selects hits containing '+' (key combinations) when true and
    hits without '+' (single keys) when false.
    """
    operator = "LIKE" if combos else "NOT LIKE"
    return conn.query(TOP_HITS_SQL.format(operator=operator),
                      params={"limit": limit})


def most_frequent(df: pandas.DataFrame) -> str:
    """Return the first ``hits`` value of *df*, or "N/A" when it has no rows."""
    if df.empty:
        return "N/A"
    return df.hits.iloc[0]


st.title("Keyboard Monitor")

load_dotenv()

conn = st.connection('keyboard', type='sql', url="sqlite:///keyboard.db")

total = conn.query("SELECT COUNT(*) AS total_hits FROM keyboard_monitor")
st.metric("Total hits", int(total.total_hits.iloc[0]))

# The top-N results already contain the most frequent entry as their first
# row, so they feed both the metrics and the tables.
top_keys = top_hits(conn, combos=False)
top_combos = top_hits(conn, combos=True)

most_frequent_key, most_frequent_combo = st.columns(2)
most_frequent_key.metric("Most frequent key", most_frequent(top_keys))
most_frequent_combo.metric("Most frequent combo", most_frequent(top_combos))

top_frequent_keys, top_frequent_combos = st.columns(2)
top_frequent_keys.subheader(f"Top {TOP_N} keys")
top_frequent_keys.dataframe(top_keys)
top_frequent_combos.subheader(f"Top {TOP_N} combos")
top_frequent_combos.dataframe(top_combos)

st.header("Find your inputs frequency of day")

local_tz = tzlocal.get_localzone()
d = st.date_input("Pick a day:", value=datetime.date.today())

# The picked day is bound as a parameter instead of being formatted into SQL.
hourly = conn.query(HOURLY_SQL, params={"day": d.isoformat()})
hourly = hourly.rename(columns={"hour_start": "ts"})
# As in the original query handling, stored timestamps are treated as UTC
# and converted to the local timezone for display.
hourly['ts'] = pandas.to_datetime(hourly['ts'])
hourly['ts'] = hourly['ts'].dt.tz_localize(pytz.utc).dt.tz_convert(local_tz)
st.dataframe(hourly)