ADB命令来捕获设备屏幕和发送鼠标事件来实现抓取公众号文章
现在公众号需要登录账号才能看到评论内容。登录需要账号和密码等,token还会过期。
现在的很多小程序进行了加密,那是否有方案可以抓取小程序或公众号的数据呢?
解决方案:
1.通过ADB命令来捕获设备屏幕快照,传递给电脑并且保存在本地文件;
2.通过百度飞桨ocr解析图片获取内容并保存;
3.根据解析的内容和坐标,向手机发送鼠标事件(点击和上下,左右滑动)来控制页面的跳转。
有待解决的问题:
百度飞桨ocr解析普通文字正确率很高,但是经常把图片、图标或logo识别成不同的文字,导致解析出错。
由于公众号文章形式多样,很难统一处理,经常需要调整算法。
当然爬取普通单一样式的文章不在话下。
具体解决方案是:
开一个协程专门进行手机抓屏幕保存到电脑。
另开一个协程进行图片解析数据保存,根据解析的内容发送鼠标事件控制手机中的应用打开和翻到对应页面。
注意:
1.需要一个通过数据线连接电脑的安卓手机(苹果手机不行,不能是充电线,要是数据线,有的线只有充电功能无数据传输功能)。
2.翻页不能太快,因为ocr解析图片很慢,翻页太快会漏掉部分屏幕的解析。
3.因为公众号有记忆功能。如上次看文章翻到底部,下次再点进来看到的文章仍在底部。当下拉三次ocr识别页面相同就认为到顶,当上拉三次ocr识别的页面相同就认为到底部。
是不是很强大,把软件的加密算法变成了花瓶,达到所见即所得。这就是AI的强大应用之一,妥妥的非主流。
来分享部分抓公众号文章及评论的代码:
主程序:
async def main():
    """Entry coroutine: reset the screenshot file, set up pygame/OCR, run the loop.

    Removes any existing ``screenshot.png``, initialises pygame when the
    on-screen preview is enabled, builds the PaddleOCR engine and then hands
    control to ``taskProccess``.

    If ``taskProccess`` raises an ``Exception`` it is printed and the loop is
    started exactly one more time; an exception raised by that second run
    propagates to the caller after ``'finally...'`` has been printed.
    """
    print("main")
    singleton1 = OCSingleton()
    img_path = r"screenshot.png"
    file_to_open = Path(img_path)
    if file_to_open.is_file():
        os.remove(img_path)
    # Hard-wired switch: True means captured frames are also drawn in a pygame window.
    isDawAdbScreenshot = True
    if isDawAdbScreenshot:
        # Initialise pygame
        pygame.init()
    # NOTE(review): indentation was lost in the source; the OCR engine is built
    # unconditionally here because it is used below regardless of the flag.
    ocr = PaddleOCR(use_angle_cls=True, lang="ch", use_gpu=False)
    try:
        await taskProccess(isDawAdbScreenshot, singleton1, ocr)
    except Exception as e:
        print('Exception:', e)
        # One unguarded retry, as in the original code.
        await taskProccess(isDawAdbScreenshot, singleton1, ocr)
    finally:
        print('finally...')
主要的事务处理(发送鼠标事件,起协程,页面跳转控制):
async def taskProccess(isDawAdbScreenshot, singleton1, ocr):
    """Run the capture -> OCR -> act loop forever.

    Each iteration starts ``adbPngGrap`` and ``check_text_show`` as tasks,
    waits for both, and then uses the OCR result together with the state held
    on ``singleton1`` to decide whether to swipe, tap or go back a page on the
    device. When the preview is enabled, pygame window events are forwarded to
    the device as well.

    NOTE(review): the source this was recovered from had lost all indentation.
    The nesting below is the most plausible reading; spots where another
    reading is possible are marked with NOTE(review) and should be confirmed.

    Args:
        isDawAdbScreenshot: truthy when captured frames are drawn in the
            pygame window (and window events must therefore be processed).
        singleton1: shared state object; this function reads ``pageName``,
            ``isArticleTop``, ``down_times`` and the other attributes used
            below, and decrements ``down_times``.
        ocr: OCR engine, passed through to ``check_text_show``.

    This coroutine only ends by an exception or by ``sys.exit()`` when the
    pygame window is closed.
    """
    print("main 点击坐标判断0-0")
    while True:
        # create_task schedules both coroutines immediately so they can
        # interleave at their await points.
        task1 = asyncio.create_task(adbPngGrap(isDawAdbScreenshot))
        task2 = asyncio.create_task(check_text_show(not isDawAdbScreenshot, ocr))
        if checkNotSkip(singleton1, True):
            await asyncio.sleep(4)
        await task1
        result1 = await task2
        print(result1)
        print("main 点击坐标判断0 now:%s, singleton1.sameArticleTime:%d, singleton1.isArticleTop:%d, singleton1.pageName:%s, singleton1.isNotCommonArticle:%d,singleton1.isFinishSingleArticle:%d, singleton1.backPageBox:%s, singleton1.secondBackPageBox:%s" % (
            time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
            singleton1.sameArticleTime,
            singleton1.isArticleTop,
            singleton1.pageName,
            singleton1.isNotCommonArticle,
            singleton1.isFinishSingleArticle,
            singleton1.backPageBox,
            singleton1.secondBackPageBox,
        ))

        if (
            singleton1.pageName == '当前文章'
            and not singleton1.isArticleTop
            and singleton1.down_times != 50
            and singleton1.down_times > 0
        ):
            # Article not yet at its top: use up one of the remaining pull-downs.
            singleton1.down_times = singleton1.down_times - 1
            subprocess.run(_pull_down_swipe_cmd().split())
            print("main 点击坐标判1 向下滑动: int(start_y)= %d swipe_duration=%d, singleton1.down_times:%d" % (int(window_size[1]/0.3), int(window_size[1]/0.3-350), singleton1.down_times))
            await asyncio.sleep(0.1)
        elif singleton1.pageName == '公众号文章列表页' and singleton1.isBackPublicAccountsList:
            backUpPage("公众号文章列表页")
        elif singleton1.pageName == '当前文章' and singleton1.isNotCommonArticle:
            print("main 点击坐标判2 -0")
            backUpPage('当前文章')
        elif isinstance(result1, list) and len(result1) == 5:
            print(result1)
            # result1[0] is compared against tag lists such as ['article'];
            # result1[1] is used as a four-corner box when it is a list.
            boxUnit = result1[1]
            if singleton1.isFinishSingleArticle:
                if (
                    singleton1.isNotCommonArticle
                    and isinstance(singleton1.articleComments, list)
                    and isinstance(singleton1.articleCommentsBoxes, list)
                    and len(singleton1.articleComments) > 0
                    and len(singleton1.oldArticleTitle) > 0
                    and len(singleton1.articleComments) == len(singleton1.articleCommentsBoxes)
                ):
                    if singleton1.oldArticleTitle != singleton1.articleTitle:
                        if singleton1.pageName == '当前文章':
                            # Data was sent to the backend successfully
                            print("main 点击坐标判2 -1")
                            backUpPage("当前文章")
                    else:
                        if singleton1.pageName == '当前文章':
                            # Same article as before: return to the previous page
                            print("main 点击坐标判2 -2")
                            backUpPage("当前文章")
                else:
                    if (
                        not singleton1.isNotCommonArticle
                        and isinstance(singleton1.articleTitle, str)
                        and isinstance(singleton1.articleComments, list)
                        and isinstance(singleton1.articleCommentsBoxes, list)
                        and len(singleton1.articleComments) > 0
                    ):
                        storeFile1 = OCStoreFile()
                        print("main 自动点击对象1-0:storeFile1%d" % (id(storeFile1)))
                        storeFile1.storeArticleData(singleton1)
                    # NOTE(review): placed as a sibling of the store step so a
                    # finished article is left even when nothing was stored;
                    # it may originally have been nested inside the `if` above.
                    if singleton1.pageName == '当前文章':
                        print("main 点击坐标判2 -3")
                        backUpPage("当前文章")
            elif isinstance(boxUnit, list):
                if isinstance(result1[0], list) and (
                    result1[0] == ['article'] or result1[0] == ['articleTop']
                ):
                    if singleton1.pageName == '当前文章':
                        if result1[0] == ['article']:
                            swipe_cmd = "adb shell input swipe {} {} {} {} {}".format(
                                int(224), int(window_size[1]/0.3-400), int(224),
                                (int(window_size[1]/0.3-400)-int(window_size[1]/0.3-550)), int(window_size[1]/0.3-350)
                            )
                            print("main 点击坐标判断 向上滑动2:%d int(start_y)= %d swipe_duration=%d" % (len(result1), int(window_size[1]/0.3), int(window_size[1]/0.3-400)))
                        else:
                            swipe_cmd = _pull_down_swipe_cmd()
                            print("main 点击坐标判断3 向下滑动:%d int(start_y)= %d swipe_duration=%d" % (len(result1), int(window_size[1]/0.3), int(window_size[1]/0.3-350)))
                        subprocess.run(swipe_cmd.split())
                else:
                    # Tap the centre of the box (mean of its four corner points).
                    x = (boxUnit[0][0] + boxUnit[1][0] + boxUnit[2][0] + boxUnit[3][0]) / 4.0
                    y = (boxUnit[0][1] + boxUnit[1][1] + boxUnit[2][1] + boxUnit[3][1]) / 4.0
                    tap_cmd = adb_tap_cmd.format(int(x), int(y))
                    # Send the tap command to the device
                    subprocess.run(tap_cmd.split())
                    print("main 点击坐标判断4:%d x= %d y=%d" % (len(result1), int(x), int(y)))
                    # NOTE(review): the log line and pause below are assumed to
                    # belong to the tap branch; they may instead have followed
                    # every 5-element result.
                    print("main 自动点击对象5:%s" % (result1))
                    if checkNotSkip(singleton1, True):
                        await asyncio.sleep(4)

        if isDawAdbScreenshot:
            # Handle pygame events (window close, mouse clicks and drags)
            _forward_pygame_events()
            # Check once more for a window-close event. Kept under the guard
            # because main() only initialises pygame when the preview is on.
            # Any non-QUIT event fetched here is discarded, as in the original.
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    pygame.quit()
                    sys.exit()


def _pull_down_swipe_cmd():
    """Build the adb swipe command used for the two "向下滑动" (pull-down) moves."""
    # window_size[1] / 0.3 presumably converts the preview height back to
    # device pixels -- TODO confirm against where window_size is defined.
    return "adb shell input swipe {} {} {} {} {}".format(
        int(224), int(500), int(224),
        int(window_size[1]/0.3-300), int(window_size[1]/0.3-500)
    )


def _forward_pygame_events():
    """Turn pending pygame window events into adb tap/swipe commands."""
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            pygame.quit()
            sys.exit()
        elif event.type == pygame.MOUSEBUTTONDOWN:
            # Scale the click from window coordinates and send it as a tap.
            x, y = event.pos
            x *= width / window_size[0]
            y *= height / window_size[1]
            print("main event x= %d y=%d" % (int(x), int(y)))
            tap_cmd = adb_tap_cmd.format(int(x), int(y))
            # Send the tap command to the device
            subprocess.run(tap_cmd.split())
        elif event.type == pygame.MOUSEMOTION:
            # Scale the pointer position from window coordinates.
            x, y = event.pos
            x *= width / window_size[0]
            y *= height / window_size[1]
            # With the left button held, a mouse move becomes a swipe.
            if event.buttons[0]:
                dx, dy = event.rel
                distance = (dx ** 2 + dy ** 2) ** 0.5
                if distance == 0:
                    # No relative movement: there is no direction to swipe in,
                    # and normalising would divide by zero.
                    continue
                direction = (dx / distance, dy / distance)
                # Swipe length; the same number is reused as the duration argument.
                swipe_distance = min(width, height) * distance / 200
                swipe_duration = int(swipe_distance)
                # Start and end points lie symmetrically around the pointer.
                start_x, start_y = x - direction[0] * swipe_distance / 2, y - direction[1] * swipe_distance / 2
                end_x, end_y = x + direction[0] * swipe_distance / 2, y + direction[1] * swipe_distance / 2
                # Simulate the swipe through adb
                swipe_cmd = "adb shell input swipe {} {} {} {} {}".format(
                    int(start_x), int(start_y), int(end_x),
                    int(end_y), swipe_duration
                )
                print("main event start_x= %d start_y=%d; end_x=%d end_y=%d; swipe_duration=%d" % (int(start_x), int(start_y), int(end_x), int(end_y), int(swipe_duration)))
                subprocess.run(swipe_cmd.split())
ADB命令来捕获设备屏幕快照并保存到电脑的代码:
async def adbPngGrap(isDraw):
    """Capture the device screen to ``screenshot.png`` and optionally show it.

    When ``OCSingleton().isConvertImage`` is set, no capture is made; the
    coroutine only pauses for two seconds if ``checkNotSkip`` says so and
    then returns. Otherwise the screen is captured with
    ``adb exec-out screencap -p`` and, when ``isDraw`` is truthy, the image is
    scaled to ``window_size`` and drawn on the pygame ``screen``.

    The adb call blocks the event loop until the capture has been written.

    Args:
        isDraw: truthy to draw the captured frame in the pygame window.

    Returns:
        Always ``None``.
    """
    # Function-scope import so the block does not depend on the file header.
    import subprocess

    print("adbPngGrap 需要绘制图形:%d " %(isDraw))
    singleton1 = OCSingleton()
    if singleton1.isConvertImage:
        if checkNotSkip(singleton1, True):
            await asyncio.sleep(2)
        # NOTE(review): indentation was lost in the source; the return is
        # assumed to apply whenever isConvertImage is set, not only after
        # the pause.
        return None

    img_path = r"screenshot.png"
    # Capture the screen. The PNG bytes are written straight to the file so
    # no shell redirection is needed.
    try:
        with open(img_path, "wb") as png_file:
            subprocess.run(["adb", "exec-out", "screencap", "-p"], stdout=png_file)
    except OSError as err:
        # adb could not be started, or the file could not be opened.
        print("adbPngGrap screencap failed:", err)
        return None

    if isDraw:
        if not Path(img_path).is_file():
            return None
        # cv2.imread returns None for a file it cannot decode; used here only
        # as a validity check before handing the file to pygame.
        if cv2.imread(img_path) is None:
            return None
        image = pygame.image.load(img_path)
        # Scale the image to the window size
        image = pygame.transform.scale(image, window_size)
        # Show the image
        screen.blit(image, (0, 0))
        pygame.display.update()
    print("adbPngGrap is finished!")
    return None
至于具体的业务逻辑代码(协程async def check_text_show(isDraw, ocr):)因为公司保密性和代码太长就不写了。抓小程序的内容也是同样的道理,识别出按钮的文字和坐标发送鼠标事件进行解析。有这个程序爬别人的小说何需打字员,爬虫和ocr帮你搞定一切,并且是免费的。