【python项目推荐】键盘监控--统计打字频率

简介: 【python项目推荐】键盘监控--统计打字频率

项目简介

该项目实现了打字频率统计及可视化功能。

主要使用的库

pynput:允许您控制和监视输入设备。 这里我们用来获取键盘输入。

SQLAlchemy:数据库操作。 这里我们用来保存键盘输入。

streamlit:提供可视化界面

项目组成

agent.py :获得键盘输入
display.py:可视化

补充说明

如果你不想用原文的数据库,也可以替换为本地的数据库,如免安装的sqlite

agent.py

# agent.py
from dotenv import load_dotenv
from pynput import keyboard
from pynput.keyboard import Key

import concurrent.futures
import logging
import os
import queue
import sqlalchemy
import sqlalchemy.exc
import sys
import time


MODIFIERS = {
    Key.shift, Key.shift_l, Key.shift_r,
    Key.alt, Key.alt_l, Key.alt_r, Key.alt_gr,
    Key.ctrl, Key.ctrl_l, Key.ctrl_r,
    Key.cmd, Key.cmd_l, Key.cmd_r,
}

TABLE = sqlalchemy.Table(
    'keyboard_monitor',
    sqlalchemy.MetaData(),
    sqlalchemy.Column('hits', sqlalchemy.String),
    sqlalchemy.Column('ts', sqlalchemy.DateTime),
)


if __name__ == '__main__':
    load_dotenv()

    log = logging.getLogger("agent")
    log.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(funcName)s %(message)s')
    file_handler = logging.FileHandler(f'agent-{time.time_ns()}.log', encoding='utf-8')
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(logging.INFO)
    stdout_handler.setFormatter(formatter)
    log.addHandler(file_handler)
    log.addHandler(stdout_handler)

    #engine = sqlalchemy.create_engine(os.environ['DATABASE_URL'], 
    #                                  echo_pool=True, 
    #                                  isolation_level='AUTOCOMMIT')
    engine = sqlalchemy.create_engine("sqlite:///keyboard.db")
    current_modifiers = set()
    pending_hits = queue.Queue()
    cancel_signal = queue.Queue()

    def on_press(key):
        if key in MODIFIERS:
            current_modifiers.add(key)
        else:
            hits = sorted([ str(key) for key in current_modifiers ]) + [ str(key) ]
            hits = '+'.join(hits)
            pending_hits.put(hits)
        log.debug(f'{key} pressed, current_modifiers: {current_modifiers}')

    def on_release(key):
        if key in MODIFIERS:
            try:
                current_modifiers.remove(key)
            except KeyError:
                log.warning(f'Key {key} not in current_modifiers {current_modifiers}')
        log.debug(f'{key} released, current_modifiers: {current_modifiers}')

    #with engine.connect() as connection:
    #    connection.execute(sqlalchemy.sql.text("""
    #        CREATE TABLE IF NOT EXISTS keyboard_monitor (
    #            hits STRING NULL,
    #            ts TIMESTAMP(3) NOT NULL,
    #            TIME INDEX ("ts")
    #        ) ENGINE=mito WITH( regions = 1, ttl = '3months')
    #    """))
    # ...
    

    from sqlalchemy import create_engine, Table, Column, String, TIMESTAMP, MetaData, Index
    metadata = MetaData()
    keyboard_monitor = Table(
        'keyboard_monitor', metadata,
        Column('hits', String, nullable=True),
        Column('ts', TIMESTAMP, nullable=False),
    )

    metadata.create_all(engine)

   


    def sender_thread():
        retries = 0
        while True:
            hits = pending_hits.get()
            log.debug(f'got: {hits}')
            if hits is None:
                log.info("Exiting...")
                break
            with engine.connect() as connection:
                try:
                    log.debug(f'sending: {hits}')
                    connection.execute(TABLE.insert().values(hits=hits, ts=sqlalchemy.func.now()))
                    connection.commit()# ...
                    log.info(f'sent: {hits}')
                    retries = 0
                except sqlalchemy.exc.OperationalError as e:
                    if retries >= 10:
                        log.error(f'Retry exceeds. Operational error: {e}')
                        pending_hits.put(hits)
                        continue

                    if e.connection_invalidated:
                        log.warning(f'Connection invalidated: {e}')
                        pending_hits.put(hits)
                        continue

                    msg = str(e)
                    if "(1815, 'Internal error: 1000')" in msg:
                        # TODO 1815 - should not handle internal error;
                        # see https://github.com/GreptimeTeam/greptimedb/issues/3447
                        log.warning(f'Known operational error: {e}')
                        pending_hits.put(hits)
                        continue
                    elif '2005' in msg and 'Unknown MySQL server host' in msg:
                        log.warning(f'DNS temporary unresolved: {e}')
                        pending_hits.put(hits)
                        continue

                    raise e
                finally:
                    retries += 1

    def listener_thread():
        with keyboard.Listener(on_press=on_press, on_release=on_release) as listener:
            log.info("Listening...")
            cancel_signal.get()
            pending_hits.put(None)
            log.info("Exiting...")

    with concurrent.futures.ThreadPoolExecutor() as executor:
        sender = executor.submit(sender_thread)
        listener = executor.submit(listener_thread)
        try:
            f = concurrent.futures.wait([sender, listener], return_when=concurrent.futures.FIRST_EXCEPTION)
            for fut in f.done:
                log.error(f'Unhandled exception for futures: {fut.exception(timeout=0)}')
        except KeyboardInterrupt as e:
            log.info("KeyboardInterrupt. Exiting...")
        except Exception as e:
            log.error(f'Unhandled exception: {e}')
        finally:
            cancel_signal.put(True)


display.py

# display.py
import datetime
import os
from dotenv import load_dotenv
import pytz
import streamlit as st
import tzlocal
import pandas

st.title("Keyboard Monitor")

load_dotenv()
#conn = st.connection(
##    type="sql",
#    url="sqlite:///keyboard.db",
#)

conn = st.connection('keyboard', type='sql', url="sqlite:///keyboard.db")

df = conn.query("SELECT COUNT(*) AS total_hits FROM keyboard_monitor")
st.metric("Total hits", df.total_hits[0])

most_frequent_key, most_frequent_combo = st.columns(2)
df = conn.query("""
SELECT hits, COUNT(*) as times
FROM keyboard_monitor
WHERE hits NOT LIKE '%+%'
GROUP BY hits
ORDER BY times DESC limit 1;
""")
most_frequent_key.metric("Most frequent key", df.hits[0])
df = conn.query("""
SELECT hits, COUNT(*) as times
FROM keyboard_monitor
WHERE hits LIKE '%+%'
GROUP BY hits
ORDER BY times DESC limit 1;
""")
most_frequent_combo.metric("Most frequent combo", df.hits[0])

top_frequent_keys, top_frequent_combos = st.columns(2)
df = conn.query("""
SELECT hits, COUNT(*) as times
FROM keyboard_monitor
WHERE hits NOT LIKE '%+%'
GROUP BY hits
ORDER BY times DESC limit 10;
""")
top_frequent_keys.subheader("Top 10 keys")
top_frequent_keys.dataframe(df)
df = conn.query("""
SELECT hits, COUNT(*) as times
FROM keyboard_monitor
WHERE hits LIKE '%+%'
GROUP BY hits
ORDER BY times DESC limit 10;
""")
top_frequent_combos.subheader("Top 10 combos")
top_frequent_combos.dataframe(df)

st.header("Find your inputs frequency of day")
local_tz = tzlocal.get_localzone()
hours = int(local_tz.utcoffset(datetime.datetime.now()).total_seconds() / 3600)
if hours > 0:
    offset = f" + INTERVAL '{hours} hours'"
elif hours < 0:
    offset = f" - INTERVAL '{hours} hours'"
else:
    offset = ''
d = st.date_input("Pick a day:", value=datetime.date.today())
query = f"""
SELECT 
    ts,
    COUNT(1) AS times
FROM keyboard_monitor
WHERE strftime('%Y-%m-%d', ts, 'localtime') = '{d}'
GROUP BY strftime('%Y-%m-%d %H:00:00', ts)
ORDER BY ts ASC
LIMIT 10;
"""

df = conn.query(query)
#print(df.keys())
df['ts'] = pandas.to_datetime(df['ts'])
df['ts'] = df['ts'].dt.tz_localize(pytz.utc).dt.tz_convert(local_tz)
st.dataframe(df)
相关文章
|
5月前
|
监控 算法 安全
深度洞察内网监控电脑:基于Python的流量分析算法
在当今数字化环境中,内网监控电脑作为“守城卫士”,通过流量分析算法确保内网安全、稳定运行。基于Python的流量分析算法,利用`scapy`等工具捕获和解析数据包,提取关键信息,区分正常与异常流量。结合机器学习和可视化技术,进一步提升内网监控的精准性和效率,助力企业防范潜在威胁,保障业务顺畅。本文深入探讨了Python在内网监控中的应用,展示了其实战代码及未来发展方向。
|
23天前
|
存储 监控 算法
企业数据泄露风险防控视域下 Python 布隆过滤器算法的应用研究 —— 怎样防止员工私下接单,监控为例
本文探讨了布隆过滤器在企业员工行为监控中的应用。布隆过滤器是一种高效概率数据结构,具有空间复杂度低、查询速度快的特点,适用于大规模数据过滤场景。文章分析了其在网络访问监控和通讯内容筛查中的实践价值,并通过Python实现示例展示其技术优势。同时,文中指出布隆过滤器存在误判风险,需在准确性和资源消耗间权衡。最后强调构建多维度监控体系的重要性,结合技术与管理手段保障企业运营安全。
48 10
|
2月前
|
程序员 测试技术 开发工具
怎么开发Python第三方库?手把手教你参与开源项目!
大家好,我是程序员晚枫。本文将分享如何开发Python第三方库,并以我维护的开源项目 **popdf** 为例,指导参与开源贡献。Popdf是一个PDF操作库,支持PDF转Word、转图片、合并与加密等功能。文章涵盖从fork项目、本地开发、单元测试到提交PR的全流程,适合想了解开源贡献的开发者。欢迎访问[popdf](https://gitcode.com/python4office/popdf),一起交流学习!
129 21
怎么开发Python第三方库?手把手教你参与开源项目!
|
8月前
|
机器学习/深度学习 数据采集 数据可视化
Python 数据分析:从零开始构建你的数据科学项目
【10月更文挑战第9天】Python 数据分析:从零开始构建你的数据科学项目
165 2
|
5月前
|
测试技术 Python
【03】做一个精美的打飞机小游戏,规划游戏项目目录-分门别类所有的资源-库-类-逻辑-打包为可玩的exe-练习python打包为可执行exe-优雅草卓伊凡-持续更新-分享源代码和游戏包供游玩-1.0.2版本
【03】做一个精美的打飞机小游戏,规划游戏项目目录-分门别类所有的资源-库-类-逻辑-打包为可玩的exe-练习python打包为可执行exe-优雅草卓伊凡-持续更新-分享源代码和游戏包供游玩-1.0.2版本
240 31
【03】做一个精美的打飞机小游戏,规划游戏项目目录-分门别类所有的资源-库-类-逻辑-打包为可玩的exe-练习python打包为可执行exe-优雅草卓伊凡-持续更新-分享源代码和游戏包供游玩-1.0.2版本
|
3月前
|
Docker Python 容器
Docker——阿里云服务器使用Docker部署python项目全程小记
本文记录了我在阿里云服务器上使用Docker部署python项目(flask为例)的全过程,在这里记录和分享一下,希望可以给大家提供一些参考。
337 0
|
3月前
|
监控 算法 安全
基于 Python 广度优先搜索算法的监控局域网电脑研究
随着局域网规模扩大,企业对高效监控计算机的需求增加。广度优先搜索(BFS)算法凭借其层次化遍历特性,在Python中可用于实现局域网内的计算机设备信息收集、网络连接状态监测及安全漏洞扫描,确保网络安全与稳定运行。通过合理选择数据结构与算法,BFS显著提升了监控效能,助力企业实现智能化的网络管理。
58 7
|
6月前
|
Python
课程设计项目之基于Python实现围棋游戏代码
游戏进去默认为九路玩法,当然也可以选择十三路或是十九路玩法 使用pycharam打开项目,pip安装模块并引用,然后运行即可, 代码每行都有详细的注释,可以做课程设计或者毕业设计项目参考
140 33
|
7月前
|
监控 安全 测试技术
如何在实际项目中应用Python Web开发的安全测试知识?
如何在实际项目中应用Python Web开发的安全测试知识?
150 61
|
6月前
|
存储 运维 监控
探索局域网电脑监控软件:Python算法与数据结构的巧妙结合
在数字化时代,局域网电脑监控软件成为企业管理和IT运维的重要工具,确保数据安全和网络稳定。本文探讨其背后的关键技术——Python中的算法与数据结构,如字典用于高效存储设备信息,以及数据收集、异常检测和聚合算法提升监控效率。通过Python代码示例,展示了如何实现基本监控功能,帮助读者理解其工作原理并激发技术兴趣。
142 20

热门文章

最新文章

推荐镜像

更多
下一篇
oss创建bucket