下载地址:https://www.pan38.com/dow/share.php?code=JCnzE 提取密码:1133
这个实现包含三个主要部分:虚拟摄像头核心服务、安卓环境配置脚本和客户端测试程序。核心服务实现了三色滤镜效果和人脸检测处理,通过TCP socket传输视频流。在安卓设备上使用时,需要通过Termux安装Python环境并运行配置脚本。
import cv2
import numpy as np
import socket
import threading
import time
from PIL import Image
import io
import subprocess
import os
class VirtualCamera:
def init(self):
self.running = False
self.port = 8080
self.frame_rate = 30
self.resolution = (640, 480)
self.face_cascade = cv2.CascadeClassifier(
cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
self.server_socket = None
self.client_socket = None
def apply_three_color_filter(self, frame):
"""应用三色滤镜效果"""
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
# 红色通道增强
red_mask = cv2.inRange(hsv, (0, 70, 50), (10, 255, 255))
red_mask = cv2.merge([red_mask, red_mask, red_mask])
# 绿色通道增强
green_mask = cv2.inRange(hsv, (36, 25, 25), (86, 255, 255))
green_mask = cv2.merge([green_mask, green_mask, green_mask])
# 蓝色通道增强
blue_mask = cv2.inRange(hsv, (100, 150, 0), (140, 255, 255))
blue_mask = cv2.merge([blue_mask, blue_mask, blue_mask])
# 合并效果
filtered = cv2.addWeighted(frame, 0.5, red_mask, 0.5, 0)
filtered = cv2.addWeighted(filtered, 0.7, green_mask, 0.3, 0)
filtered = cv2.addWeighted(filtered, 0.7, blue_mask, 0.3, 0)
return filtered
def detect_faces(self, frame):
"""人脸检测并添加标记"""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
faces = self.face_cascade.detectMultiScale(gray, 1.1, 4)
for (x, y, w, h) in faces:
cv2.rectangle(frame, (x, y), (x+w, y+h), (255, 0, 0), 2)
# 添加模糊效果
face_roi = frame[y:y+h, x:x+w]
face_roi = cv2.GaussianBlur(face_roi, (23, 23), 30)
frame[y:y+h, x:x+w] = face_roi
return frame
def start_virtual_camera(self):
"""启动虚拟摄像头服务"""
self.running = True
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.bind(('localhost', self.port))
self.server_socket.listen(1)
print(f"Virtual camera server started on port {self.port}")
# 模拟摄像头输入
cap = cv2.VideoCapture(0)
if not cap.isOpened():
cap = cv2.VideoCapture("test.mp4") # 备用测试视频
while self.running:
ret, frame = cap.read()
if not ret:
break
# 处理帧
frame = cv2.resize(frame, self.resolution)
frame = self.apply_three_color_filter(frame)
frame = self.detect_faces(frame)
# 编码为JPEG
_, jpeg = cv2.imencode('.jpg', frame,
[int(cv2.IMWRITE_JPEG_QUALITY), 90])
# 等待客户端连接
if self.client_socket is None:
try:
self.client_socket, _ = self.server_socket.accept()
print("Client connected")
except:
continue
# 发送帧数据
try:
self.client_socket.sendall(jpeg.tobytes())
except:
self.client_socket = None
continue
time.sleep(1/self.frame_rate)
cap.release()
if self.client_socket:
self.client_socket.close()
self.server_socket.close()
def start(self):
"""启动虚拟摄像头线程"""
thread = threading.Thread(target=self.start_virtual_camera)
thread.daemon = True
thread.start()
def stop(self):
"""停止虚拟摄像头"""
self.running = False
if name == "main":
vcam = VirtualCamera()
vcam.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
vcam.stop()
print("Virtual camera stopped")
import cv2
import numpy as np
import socket
import threading
class CameraClient:
def init(self, host='localhost', port=8080):
self.host = host
self.port = port
self.running = False
self.socket = None
def start(self):
self.running = True
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.host, self.port))
print(f"Connected to camera server at {self.host}:{self.port}")
buffer = b''
while self.running:
try:
data = self.socket.recv(4096)
if not data:
break
buffer += data
a = buffer.find(b'\xff\xd8')
b = buffer.find(b'\xff\xd9')
if a != -1 and b != -1:
jpg = buffer[a:b+2]
buffer = buffer[b+2:]
frame = cv2.imdecode(
np.frombuffer(jpg, dtype=np.uint8),
cv2.IMREAD_COLOR)
if frame is not None:
cv2.imshow('Virtual Camera', frame)
if cv2.waitKey(1) == 27: # ESC退出
break
except Exception as e:
print(f"Error: {e}")
break
self.socket.close()
cv2.destroyAllWindows()
if name == "main":
client = CameraClient()
client.start()