【python项目推荐】键盘监控--统计打字频率

简介: 【python项目推荐】键盘监控--统计打字频率

项目简介

该项目实现了打字频率统计及可视化功能。

主要使用的库

pynput:允许您控制和监视输入设备。 这里我们用来获取键盘输入。

SQLAlchemy:数据库操作。 这里我们用来保存键盘输入。

streamlit:提供可视化界面

项目组成

agent.py :获得键盘输入
display.py:可视化

补充说明

如果你不想用原文的数据库,也可以替换为本地的数据库,如免安装的sqlite

agent.py

# agent.py
from dotenv import load_dotenv
from pynput import keyboard
from pynput.keyboard import Key

import concurrent.futures
import logging
import os
import queue
import sqlalchemy
import sqlalchemy.exc
import sys
import time


MODIFIERS = {
    Key.shift, Key.shift_l, Key.shift_r,
    Key.alt, Key.alt_l, Key.alt_r, Key.alt_gr,
    Key.ctrl, Key.ctrl_l, Key.ctrl_r,
    Key.cmd, Key.cmd_l, Key.cmd_r,
}

TABLE = sqlalchemy.Table(
    'keyboard_monitor',
    sqlalchemy.MetaData(),
    sqlalchemy.Column('hits', sqlalchemy.String),
    sqlalchemy.Column('ts', sqlalchemy.DateTime),
)


if __name__ == '__main__':
    load_dotenv()

    log = logging.getLogger("agent")
    log.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(funcName)s %(message)s')
    file_handler = logging.FileHandler(f'agent-{time.time_ns()}.log', encoding='utf-8')
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(logging.INFO)
    stdout_handler.setFormatter(formatter)
    log.addHandler(file_handler)
    log.addHandler(stdout_handler)

    #engine = sqlalchemy.create_engine(os.environ['DATABASE_URL'], 
    #                                  echo_pool=True, 
    #                                  isolation_level='AUTOCOMMIT')
    engine = sqlalchemy.create_engine("sqlite:///keyboard.db")
    current_modifiers = set()
    pending_hits = queue.Queue()
    cancel_signal = queue.Queue()

    def on_press(key):
        if key in MODIFIERS:
            current_modifiers.add(key)
        else:
            hits = sorted([ str(key) for key in current_modifiers ]) + [ str(key) ]
            hits = '+'.join(hits)
            pending_hits.put(hits)
        log.debug(f'{key} pressed, current_modifiers: {current_modifiers}')

    def on_release(key):
        if key in MODIFIERS:
            try:
                current_modifiers.remove(key)
            except KeyError:
                log.warning(f'Key {key} not in current_modifiers {current_modifiers}')
        log.debug(f'{key} released, current_modifiers: {current_modifiers}')

    #with engine.connect() as connection:
    #    connection.execute(sqlalchemy.sql.text("""
    #        CREATE TABLE IF NOT EXISTS keyboard_monitor (
    #            hits STRING NULL,
    #            ts TIMESTAMP(3) NOT NULL,
    #            TIME INDEX ("ts")
    #        ) ENGINE=mito WITH( regions = 1, ttl = '3months')
    #    """))
    # ...
    

    from sqlalchemy import create_engine, Table, Column, String, TIMESTAMP, MetaData, Index
    metadata = MetaData()
    keyboard_monitor = Table(
        'keyboard_monitor', metadata,
        Column('hits', String, nullable=True),
        Column('ts', TIMESTAMP, nullable=False),
    )

    metadata.create_all(engine)

   


    def sender_thread():
        retries = 0
        while True:
            hits = pending_hits.get()
            log.debug(f'got: {hits}')
            if hits is None:
                log.info("Exiting...")
                break
            with engine.connect() as connection:
                try:
                    log.debug(f'sending: {hits}')
                    connection.execute(TABLE.insert().values(hits=hits, ts=sqlalchemy.func.now()))
                    connection.commit()# ...
                    log.info(f'sent: {hits}')
                    retries = 0
                except sqlalchemy.exc.OperationalError as e:
                    if retries >= 10:
                        log.error(f'Retry exceeds. Operational error: {e}')
                        pending_hits.put(hits)
                        continue

                    if e.connection_invalidated:
                        log.warning(f'Connection invalidated: {e}')
                        pending_hits.put(hits)
                        continue

                    msg = str(e)
                    if "(1815, 'Internal error: 1000')" in msg:
                        # TODO 1815 - should not handle internal error;
                        # see https://github.com/GreptimeTeam/greptimedb/issues/3447
                        log.warning(f'Known operational error: {e}')
                        pending_hits.put(hits)
                        continue
                    elif '2005' in msg and 'Unknown MySQL server host' in msg:
                        log.warning(f'DNS temporary unresolved: {e}')
                        pending_hits.put(hits)
                        continue

                    raise e
                finally:
                    retries += 1

    def listener_thread():
        with keyboard.Listener(on_press=on_press, on_release=on_release) as listener:
            log.info("Listening...")
            cancel_signal.get()
            pending_hits.put(None)
            log.info("Exiting...")

    with concurrent.futures.ThreadPoolExecutor() as executor:
        sender = executor.submit(sender_thread)
        listener = executor.submit(listener_thread)
        try:
            f = concurrent.futures.wait([sender, listener], return_when=concurrent.futures.FIRST_EXCEPTION)
            for fut in f.done:
                log.error(f'Unhandled exception for futures: {fut.exception(timeout=0)}')
        except KeyboardInterrupt as e:
            log.info("KeyboardInterrupt. Exiting...")
        except Exception as e:
            log.error(f'Unhandled exception: {e}')
        finally:
            cancel_signal.put(True)


display.py

# display.py
import datetime
import os
from dotenv import load_dotenv
import pytz
import streamlit as st
import tzlocal
import pandas

st.title("Keyboard Monitor")

load_dotenv()
#conn = st.connection(
##    type="sql",
#    url="sqlite:///keyboard.db",
#)

conn = st.connection('keyboard', type='sql', url="sqlite:///keyboard.db")

df = conn.query("SELECT COUNT(*) AS total_hits FROM keyboard_monitor")
st.metric("Total hits", df.total_hits[0])

most_frequent_key, most_frequent_combo = st.columns(2)
df = conn.query("""
SELECT hits, COUNT(*) as times
FROM keyboard_monitor
WHERE hits NOT LIKE '%+%'
GROUP BY hits
ORDER BY times DESC limit 1;
""")
most_frequent_key.metric("Most frequent key", df.hits[0])
df = conn.query("""
SELECT hits, COUNT(*) as times
FROM keyboard_monitor
WHERE hits LIKE '%+%'
GROUP BY hits
ORDER BY times DESC limit 1;
""")
most_frequent_combo.metric("Most frequent combo", df.hits[0])

top_frequent_keys, top_frequent_combos = st.columns(2)
df = conn.query("""
SELECT hits, COUNT(*) as times
FROM keyboard_monitor
WHERE hits NOT LIKE '%+%'
GROUP BY hits
ORDER BY times DESC limit 10;
""")
top_frequent_keys.subheader("Top 10 keys")
top_frequent_keys.dataframe(df)
df = conn.query("""
SELECT hits, COUNT(*) as times
FROM keyboard_monitor
WHERE hits LIKE '%+%'
GROUP BY hits
ORDER BY times DESC limit 10;
""")
top_frequent_combos.subheader("Top 10 combos")
top_frequent_combos.dataframe(df)

st.header("Find your inputs frequency of day")
local_tz = tzlocal.get_localzone()
hours = int(local_tz.utcoffset(datetime.datetime.now()).total_seconds() / 3600)
if hours > 0:
    offset = f" + INTERVAL '{hours} hours'"
elif hours < 0:
    offset = f" - INTERVAL '{hours} hours'"
else:
    offset = ''
d = st.date_input("Pick a day:", value=datetime.date.today())
query = f"""
SELECT 
    ts,
    COUNT(1) AS times
FROM keyboard_monitor
WHERE strftime('%Y-%m-%d', ts, 'localtime') = '{d}'
GROUP BY strftime('%Y-%m-%d %H:00:00', ts)
ORDER BY ts ASC
LIMIT 10;
"""

df = conn.query(query)
#print(df.keys())
df['ts'] = pandas.to_datetime(df['ts'])
df['ts'] = df['ts'].dt.tz_localize(pytz.utc).dt.tz_convert(local_tz)
st.dataframe(df)
相关文章
|
14小时前
|
JSON 中间件 数据处理
实践出真知:通过项目学习Python Web框架的路由与中间件设计
【7月更文挑战第19天】探索Python Web开发,掌握Flask或Django的关键在于理解路由和中间件。路由连接URL与功能,如Flask中@app.route()定义请求响应路径。中间件在请求处理前后执行,提供扩展功能,如日志、认证。通过实践项目,不仅学习理论,还能提升构建高效Web应用的能力。示例代码展示路由定义及模拟中间件行为,强调动手实践的重要性。
|
1天前
|
分布式计算 大数据 Java
如何使用Python的pyodps库来进行跨项目空间重命名表名?
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
26 12
|
1天前
|
前端开发 Python
前后端分离的进化:Python Web项目中的WebSocket实时通信解决方案
【7月更文挑战第18天】在Python的Flask框架中,结合Flask-SocketIO库可轻松实现WebSocket实时通信,促进前后端分离项目中的高效交互。示例展示了一个简单的聊天应用:Flask路由渲染HTML,客户端通过Socket.IO库连接服务器,发送消息并监听广播。此方法支持多种实时通信协议,适应不同环境,提供流畅的实时体验。
11 3
|
2天前
|
数据可视化 数据挖掘 定位技术
Seaborn统计图表指南
【7月更文挑战第12天】Seaborn是Python的数据可视化库,基于Matplotlib,提供美观的统计图形。要开始使用,需通过`pip install seaborn`安装。它支持多种图表,如分布图、热图、聚类图、箱线图、小提琴图、联合分布图、点图、多变量分布图、线性关系图、树地图、时间序列图、分面绘图、分类数据图、分布对比图、多变量图和气泡图等,适用于复杂数据分析和展示。Seaborn简化了创建这些高级图表的过程,使数据可视化更直观和高效。
17 5
|
4天前
|
前端开发 JavaScript API
惊呆了!学会AJAX与Fetch API,你的Python Web项目瞬间高大上!
【7月更文挑战第15天】AJAX和Fetch API是Web开发中的关键工具,用于异步前后端交互。AJAX借助XMLHttpRequest实现页面局部更新,而Fetch API是现代的、基于Promise的HTTP请求接口,提供更强大功能和简洁语法。结合Python Web框架如Django或Flask,利用这两者能创造无缝体验,提升项目性能和用户体验。学习并应用这些技术,将使你的Web应用焕发新生。**
14 5
|
6天前
|
数据可视化 Python
时间序列分析是一种统计方法,用于分析随时间变化的数据序列。在金融、经济学、气象学等领域,时间序列分析被广泛用于预测未来趋势、检测异常值、理解周期性模式等。在Python中,`statsmodels`模块是一个强大的工具,用于执行各种时间序列分析任务。
时间序列分析是一种统计方法,用于分析随时间变化的数据序列。在金融、经济学、气象学等领域,时间序列分析被广泛用于预测未来趋势、检测异常值、理解周期性模式等。在Python中,`statsmodels`模块是一个强大的工具,用于执行各种时间序列分析任务。
|
6天前
|
数据可视化 Linux 数据格式
`seaborn`是一个基于`matplotlib`的Python数据可视化库,它提供了更高级别的接口来绘制有吸引力的和信息丰富的统计图形。`seaborn`的设计目标是使默认图形具有吸引力,同时允许用户通过调整绘图参数来定制图形。
`seaborn`是一个基于`matplotlib`的Python数据可视化库,它提供了更高级别的接口来绘制有吸引力的和信息丰富的统计图形。`seaborn`的设计目标是使默认图形具有吸引力,同时允许用户通过调整绘图参数来定制图形。
|
10天前
|
运维 数据安全/隐私保护 Python
Python基于telnetlib模块实现交换机全面巡检自动化运维项目
Python基于telnetlib模块实现交换机全面巡检自动化运维项目
46 14
|
11天前
|
Python
【Python】已完美解决:(Python键盘中断报错问题) KeyboardInterrupt
【Python】已完美解决:(Python键盘中断报错问题) KeyboardInterrupt
20 3
|
11天前
|
机器学习/深度学习 Prometheus 监控
使用Python实现深度学习模型:模型监控与性能优化
【7月更文挑战第8天】 使用Python实现深度学习模型:模型监控与性能优化
24 4