能力说明:
精通JVM运行机制,包括类生命、内存模型、垃圾回收及JVM常见参数;能够熟练使用Runnable接口创建线程和使用ExecutorService并发执行任务、识别潜在的死锁线程问题;能够使用Synchronized关键字和atomic包控制线程的执行顺序,使用并行Fork/Join框架;能过开发使用原始版本函数式接口的代码。
能力说明:
通过课程学习与实战项目,熟练掌握Python的语法知识与编程技能,具备Python语言的函数、面向对象、异常处理等能力,常用开发框架的实际应用和开发能力,具备使用,掌握Python数据分析三剑客Matplotlib、Numpy、Pandas的概念与应用场景,掌握利用Python语言从数据采集到分析的全流程相关知识。
暂时未有相关云产品技术能力~
在之前的文章中,FastAPI 学习之路(二十九)使用(哈希)密码和 JWT Bearer 令牌的 OAuth2,FastAPI 学习之路(二十八)使用密码和 Bearer 的简单 OAuth2,FastAPI 学习之路(三十四)数据库多表操作,我们分享了基于jwt认证token和基于数据库创建用户,那么我们今天把这些代码整理下,形成基于数据库用户名密码,登陆验证token存储到redis中。 首先我们看下之前基于jwt认证token的代码from fastapi import Depends,status,HTTPException from pydantic import BaseModel from typing import Optional from jose import JWTError, jwt from datetime import datetime, timedelta from passlib.context import CryptContext SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 # oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") fake_users = { "leizi": { "username": "leizi", "full_name": "leizishuoceshikaifa", "email": "leizi@leizi.com", "hashed_password": "$2b$12$4grMcfV9UMijFC0CEeJOTuTHE21msQOmkUWuowUewRSXt8cimW/76", "disabled": False } } def fake_hash_password(password: str): return password class Token(BaseModel): access_token: str token_type: str class TokenData(BaseModel): username: Optional[str] = None password:Optional[str]=None class User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = None class UserInDB(User): hashed_password: str pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password): return pwd_context.hash(password) def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) print(get_password_hash(password)) if not user: return False if not verify_password(password, user.hashed_password): return False return user def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict) def fake_decode_token(token): user = get_user(fake_users, token) return user def get_current_user(token: str = Depends()): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="验证失败", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) except JWTError: raise credentials_exception user = get_user(fake_users, username=token_data.username) if user is None: raise credentials_exception return user def get_current_active_user(current_user: User = Depends(get_current_user)): if current_user.disabled: raise HTTPException(status_code=400, detail="已经删除") return current_user @app.post("/login", response_model=Token) async def login_for_access_token( tokendata:TokenData): user = authenticate_user(fake_users,tokendata.username,tokendata.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"} 我们需要把这部分代码进行调整,我们调整到routers中的user.py。其实就是把之前的方法去柔和到新的方法中,需要调整下之前的用户创建,把登陆给实现了。 我们看下新修改后的代码。from fastapi import APIRouter,status from fastapi import Depends,HTTPException from models.crud import * from get_db import get_db from datetime import timedelta,datetime from jose import JWTError, jwt usersRouter=APIRouter() SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 from passlib.context import CryptContext pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password): return pwd_context.hash(password) # 新建用户 @usersRouter.post("/users/", tags=["users"], response_model=Users) def create_user(user: UserCreate, db: Session = Depends(get_db)): """ - **email**: 用户的邮箱 - **password**: 用户密码 """ db_crest = get_user_emai(db, user.email) user.password=get_password_hash(user.password) if not db_crest: return db_create_user(db=db, user=user) raise HTTPException(status_code=200, detail="账号不能重复") def create_access_token(data: dict): to_encode = data.copy() encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def get_cure(token): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="验证失败", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception return username except JWTError: raise credentials_exception @usersRouter.post("/login",response_model=UsersToken) def login(user:UserCreate,db: Session = Depends(get_db)): db_crest = get_user_emai(db, user.email) if not db_crest: raise HTTPException(status_code=200, detail="账号不存在") pass 现在登陆还未完全实现,我们去实现下这块。 这里的UsersToken在schemas中实现。class UsersToken(UserBase): token: str登陆的实现我们实现如下@usersRouter.post("/login",response_model=UsersToken) async def login(request: Request,user:UserCreate,db: Session = Depends(get_db)): #查看用户是否存在 db_crest = get_user_emai(db, user.email) if not db_crest: raise HTTPException(status_code=200, detail="账号不存在") #校验密码 verifypassowrd=verify_password(user.password,db_crest.password) if verifypassowrd: #产生token token=create_access_token(data={"sub": user.email}) useris=await request.app.state.redis.get(user.email) if not useris: request.app.state.redis.set(user.email,token,expire=ACCESS_TOKEN_EXPIRE_MINUTES*60) usertoken=UsersToken(token=token,email=user.email) return usertoken raise HTTPException(status_code=200, detail="请勿重复登陆") else: raise HTTPException(status_code=200, detail="密码错误") redis相关的还是在我们上次分享的时候的FastAPI 学习之路(五十四)操作Redis。 我们去启动下去测试下,看我们实现的是否正确。 由于我们更新了我们的创建用户的时候的密码的hash呢,我们先去创建用户 接下来,我们调用我们的登录发现登陆报错了。这里我们在设计数据库的时候用的是hashed_password存储的密码,我们这里需要修改下 verifypassowrd=verify_password(user.password,db_crest.hashed_password) 然后我们在测试下 这样我们的token就产生了,我们也在redis有了存储那么接下来会分享如何校验token? 通过本次的分享,我们讲登陆的用户存储到了数据库中,讲登陆后的产生的token我们存储到了redis上了。这样我们的存储持久化,接下来,我会分享如何校验token做判断是否登陆。 所有的代码,都会放在gitee上,大家可以后续看到完整的代码。后续将开发几个接口,和结合我们的接口测试来分享。欢迎持续关注。https://gitee.com/liwanlei/fastapistuday
之前我们分享了操作关系型数据库,具体文章, FastAPI 学习之路(三十二)创建数据库 FastAPI 学习之路(三十三)操作数据库 FastAPI 学习之路(三十四)数据库多表操作 这次我们分享的是非关系型数据库--Redis。 首先,我们安装对应的依赖pip intsall aioredis==1.3.1 接下来,我们去导入创建对应的连接。from aioredis import create_redis_pool, Redis from fastapi import FastAPI app = FastAPI() async def get_redis_pool() -> Redis: redis = await create_redis_pool(f"redis://:@127.0.0.1:6379/0?encoding=utf-8") return redis @app.on_event("startup") async def startup_event(): app.state.redis = await get_redis_pool() @app.on_event("shutdown") async def shutdown_event(): app.state.redis.close() await app.state.redis.wait_closed() 这里我们也利用了上次分享的事件,FastAPI 学习之路(五十三)startup 和 shutdown。接下来,我们去创建一个api去操作对应的一个api,进行调试。@app.get("/test", summary="测试redis") async def test_redis(request: Request, num: int=Query(123, title="参数num")): # 等待redis写入 await异步变同步 # 如果不关心结果可以不用await,但是这里下一步要取值, # 必须得先等存完值 后再取值 await request.app.state.redis.set("test", num) # 等待 redis读取 v = await request.app.state.redis.get("test") print(v, type(v)) return {"msg": v} 我们可以用postman请求下。 我们看下。redis是否存储 我们可以看到redis存储了我们的数据,我们的接口也正常返回了。这只是一个简单的demo。后续我们可以存储缓存,也可以来存储我们的token。
相对于之前的text的消息来说,我们之前发送的消息都是text的居多,那么对于text格式的消息来说,我们处理起来,尤其是后端要麻烦的多,那么我们可不可以传递json格式的,对于前后端来说都好处理的一种格式的消息,那么答案来说是可以的。我们需要做下处理。 我们在之前的websockets管理中的的消息的处理全部改造成json格式的。我们看下如何处理呢class ConnectionManager: def __init__(self): # 存放**的链接 self.active_connections: List[Dict[str, WebSocket]] = [] async def connect(self, user: str, ws: WebSocket): # 链接 await ws.accept() self.active_connections.append({"user": user, "ws": ws}) def disconnect(self, user: str, ws: WebSocket): # 关闭时 移除ws对象 self.active_connections.remove({"user": user, "ws": ws}) async def send_other_message_json(self, message: dict, user: str): # 发送个人消息 for connection in self.active_connections: if connection["user"] == user: await connection['ws'].send_json(message) async def broadcast_json(self, data: dict): # 广播消息 for connection in self.active_connections: await connection['ws'].send_json(data) 我们现在所有的消息,我们只支持的json格式呢,我们可以对的处理消息,manager = ConnectionManager() @app.websocket("/ws/{user}/") async def websocket_many_point( websocket: WebSocket, user:str, cookie_or_token: str = Depends(get_cookie_or_token), ): print(user) await manager.connect(user, websocket) try: while True: data = await websocket.receive_json() senduser=data['username'] if senduser: await manager.send_other_message_json(data,senduser) else: await manager.broadcast_json(data) except WebSocketDisconnect as e: manager.disconnect(user, websocket) 修改完之后,我们针对前段也需要修改,修改成支持我们的json格式数据<!DOCTYPE html> <html> <head> <title>Chat</title> </head> <body> <h1>WebSocket 聊天</h1> <form action="" onsubmit="sendMessage(event)"> <input type="text" id="messageText" autocomplete="off"/> <input type="text" id="username" autocomplete="off"/> <button>Send</button> </form> <button onclick="logout()">退出</button> <ul id='messages'> </ul> <script> var token=window.localStorage.getItem("token") if (token==null ){ window.location.href="/login" } var ws = new WebSocket("ws://localhost:8000/ws/"+token+"/?token="+token); ws.onmessage = function (event) { var messages = document.getElementById('messages') var message = document.createElement('li') let receiveJson = JSON.parse(event.data); console.log(receiveJson) var content = document.createTextNode(receiveJson.username+"说:"+receiveJson.messageText) message.appendChild(content) messages.appendChild(message) }; function sendMessage(event) { var input = document.getElementById("messageText") var username = document.getElementById("username") let message = {messageText: input.value, username:username.value}; let messageJson = JSON.stringify(message); ws.send(messageJson); input.value = '' event.preventDefault() } function logout() { window.localStorage.removeItem("token") window.location.href='/login' } </script> </body> </html> 我们部署下,重启就可以完成我们的界面了。 所有的都接受到了,当我只对一个人说是,只有这个人接收到了这条消息。
我们要实现的就是,密码连续输入错误5次,就限制用户十分钟不能进行登录。 大致的流程图 数据库设计如下 DROP TABLE IF EXISTS `user`; CREATE TABLE `user` ( `id` int(11) NOT NULL AUTO_INCREMENT, `username` varchar(64) NOT NULL COMMENT '用户名', `password` varchar(255) NOT NULL COMMENT '用户密码', `email` varchar(64) DEFAULT NULL COMMENT '用户邮箱', `status` int(11) NOT NULL DEFAULT '0' COMMENT '状态,1代表删除', `admin` int(11) NOT NULL DEFAULT '0' COMMENT '是否是管理员,1代表是管理员', `iphone` varchar(20) DEFAULT NULL COMMENT '用户手机号', `workid` int(11) NOT NULL DEFAULT '0', `token` varchar(255) DEFAULT NULL, `errornum` int(2) NOT NULL DEFAULT '0', `freeze` int(2) NOT NULL DEFAULT '0', `freezetime` datetime DEFAULT NULL, PRIMARY KEY (`id`,`username`), KEY `username` (`username`) ) ENGINE=InnoDB AUTO_INCREMENT=49 DEFAULT CHARSET=utf8 COMMENT='用户表'; 那么我们来实现dao层package pan.DaoObject; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.Data; import org.hibernate.annotations.DynamicUpdate; import pan.enmus.FreezeEmus; import pan.enmus.UserEmus; import javax.persistence.Entity; import javax.persistence.GeneratedValue; import javax.persistence.GenerationType; import javax.persistence.Id; import java.io.Serializable; import java.util.Date; @Data @DynamicUpdate @Entity @JsonIgnoreProperties(value = {"hibernateLazyInitializer", "handler"}) public class User implements Serializable { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Integer id; private String username; private String password; private String email; private String iphone; private Integer status = UserEmus.UNDELETE.getCode(); private Integer admin = UserEmus.NOTADMIN.getCode(); private String token; private Date freezetime; private Integer errornum; private Integer freeze= FreezeEmus.UNDELETE.getCode(); } 对应的UserEmus package pan.enmus; import lombok.Getter; @Getter public enum UserEmus { ADMIN(1, "管理员"), NOTADMIN(0, "非管理员"), DELETE(1, "删除"), UNDELETE(0, "正常"); private Integer code; private String message; UserEmus(Integer code, String message) { this.code = code; this.message = message; } }FreezeEmus 为:package pan.enmus; import lombok.Getter; @Getter public enum FreezeEmus { DELETE(1,"冻结"), UNDELETE(0,"正常"); private Integer code; private String message; FreezeEmus(Integer code, String message){ this.code=code; this.message=message; } }那么接下来,我们就是UserRepository实现public interface UserRepository extends JpaRepository<User, Integer> { User findByUsername(String username); }里面就用到了一个通过username查找用户那么我们接下来去实现service@Service public class UserSericeImpl { @Autowired private UserRepository userRepository; @Override public User login(String username, String password) { User user = userRepository.findByUsername(username); if (user != null) { if (user.getStatus().equals( UserEmus.DELETE.getCode())){ throw new PanExection(ResultEmus.USER_DELETE); } SimpleDateFormat format=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); try{ try { if(user.getFreeze().equals(FreezeEmus.DELETE.getCode())&& (new Date().getTime()-format.parse(user.getFreezetime().toString()).getTime()<6*60*1000)){ user.setErrornum(0); userRepository.saveAndFlush(user); throw new PanExection(ResultEmus.USER_FREE); } } catch (ParseException e) { e.printStackTrace(); } }catch (NullPointerException e){ userRepository.saveAndFlush(user); } Boolean b = null; try { b = MD5Until.checkoutpassword(password, user.getPassword()); } catch (Exception e) { throw new PanExection(ResultEmus.EXCEPTIONS); } if (b) { String key = "Plan_" + user.getUsername(); String tokned = (String) userredis(redisTemplate).opsForValue().get(key); user.setErrornum(0); user.setFreezetime(null); if (tokned == null) { Date date = new Date(); String tokne = null; try { tokne = MD5Until.md5(key + date.toString()); } catch (Exception e) { throw new PanExection(ResultEmus.EXCEPTIONS); } String token = user.getUsername() + "_" + tokne; user.setToken(token); userRepository.saveAndFlush(user); userredis(redisTemplate).opsForValue().set(key, token, 1, TimeUnit.DAYS); return user; } else { userRepository.saveAndFlush(user); return user; } }else { if(user.getErrornum()>4){ user.setErrornum(user.getErrornum()+1); user.setFreeze(FreezeEmus.DELETE.getCode()); user.setFreezetime(new Date()); userRepository.saveAndFlush(user); throw new PanExection(ResultEmus.USER_FREE); }else { Integer err=user.getErrornum()+1; user.setErrornum(err); userRepository.saveAndFlush(user); throw new PanExection(ResultEmus.USER_ERROR_PASSWORD); } } } throw new PanExection(ResultEmus.USER_NOT_EXIT); } } 我们最后去实现一个contorller类import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.validation.BindingResult; import org.springframework.web.bind.annotation.*; import pan.DaoObject.User; import pan.Form.UserForm; import pan.config.RedisConfig; import pan.converter.UserForm2User; import pan.enmus.ResultEmus; import pan.exection.PanExection; import pan.service.FileService; import pan.service.UserSerice; import pan.untils.RedisDbInit; import pan.untils.ResultVOUntils; import pan.vo.ResultVO; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.validation.Valid; import java.util.HashMap; import java.util.Map; @RestController @Api(tags = "2.4", description = "登录", value = "实现登录") @RequestMapping("/plan") public class LoginContorl { @Autowired private UserSerice userSerice; @Autowired private RedisTemplate redisTemplate; @Autowired private FileService fileService; private RedisTemplate setusertoken(RedisTemplate redisTemplate) { redisTemplate = RedisDbInit.initRedis(RedisConfig.userreidport, redisTemplate); return redisTemplate; } @ApiOperation(value = "登录", notes = "用户登录") @PostMapping(value = "/login", produces = "application/json") public ResultVO login(@Valid UserForm userForm, BindingResult bindingResult) { if (bindingResult.hasErrors()) { throw new PanExection(ResultEmus.PARM_ERROR.getCode(), bindingResult.getFieldError().getDefaultMessage()); } User user = UserForm2User.convert(userForm); User login = userSerice.login(user.getUsername(), user.getPassword()); Map<String, String> map = new HashMap<>(); String redis = (String) setusertoken(redisTemplate).opsForValue().get("Plan_" + user.getUsername()); Boolean superadmin=userSerice.usersuperadmin(login.getId()); if(superadmin){ map.put("is_super","1"); }else{ map.put("is_super","0"); } if (login != null) { map.put("userid", login.getId().toString()); map.put("token", redis); return ResultVOUntils.success(map); } return ResultVOUntils.error(1, "密码或者用户名错误"); } }到此我们的代码部分已经实现
周末无聊 在家封装一个pyselenium。可能这些封装大家都会使用,但是我还是根据我自己的习惯去选择性的去封装一些在我工作中用的,这样的话,我就不用去看selenium的api的,我可以根据我自己的封装去写脚本,这样的,我还是可以更加灵活快捷的去完成我的代码,其实我相信很多的网上都会有大神对这个的封装,那么我也是根据我业余学习的只知识,在加上我自己的摸索去封装的几个方法,其实我也没怎么去大规模的去试下这个是否能够更快的去使用,但是我封装的函数都是我自己能看懂的,可以直接去使用的。经过简单的封装,让我们在使用的时候更加方便快捷,更加用起来顺手,虽然笔者不曾大规模的使用,但是也是根据我的经验去试着去对这些进行封装from selenium import webdriver from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.common.by import By from selenium.webdriver.common.action_chains import ActionChains class pyselenium(): def __init__(self,brower):#初始化浏览器 if brower =='firefox' or brower =='Firefox' or brower =='f' or brower =='F': deriver=webdriver.Firefox() elif brower =='Ie' or brower =='ie' or brower =='i' or brower=='I': deriver=webdriver.Ie() elif brower =='Chrome' or brower =='chrome' or brower =='Ch' or brower=='ch': deriver=webdriver.Chrome() elif brower =='PhantomJS' or brower =='phantomjs' or brower =='ph' or brower=='phjs': deriver=webdriver.PhantomJS() elif brower =='Edge' or brower =='edge' or brower =='Ed' or brower=='ed': deriver=webdriver.Edge() elif brower =='Opera' or brower =='opera' or brower =='op' or brower=='OP': deriver=webdriver.Opera() elif brower =='Safari' or brower =='safari' or brower =='sa' or brower=='saf': deriver=webdriver.Safari() else: raise NameError('只能输入firefox,Ie,Chrome,PhantomJS,Edge,Opera,Safari') self.driver=deriver def element(self,fangfa,dingwei):#定位 if fangfa=='id': element=self.deriver.find_element_by_id(dingwei) elif fangfa == "name": element = self.driver.find_element_by_name(dingwei) elif fangfa == "class": element = self.driver.find_element_by_class_name(dingwei) elif fangfa == "link_text": element = self.driver.find_element_by_link_text(dingwei) elif fangfa == "xpath": element = self.driver.find_element_by_xpath(dingwei) elif fangfa == "tag": element = self.driver.find_element_by_tag_name(dingwei) elif fangfa == "css": element = self.driver.find_element_by_css_selector(dingwei) else: raise NameError("Please enter the elements,'id','name','class','link_text','xpath','css','tag'.") return element def elements(self,fangfa,dingwei):#组定位 if fangfa=='id': element=self.driver.find_elements_by_id(dingwei) elif fangfa == "name": element = self.driver.find_elements_by_name(dingwei) elif fangfa == "class": element = self.driver.find_elements_by_class_name(dingwei) elif fangfa == "link_text": element = self.driver.find_elements_by_link_text(dingwei) elif fangfa == "xpath": element = self.driver.find_elements_by_xpath(dingwei) elif fangfa == "tag": element = self.driver.find_elements_by_tag_name(dingwei) elif fangfa == "css": element = self.driver.find_elements_by_css_selector(dingwei) else: raise NameError("Please enter the elements,'id','name','class','link_text','xpath','css','tag'.") return elements def element_wait(self,fangfa,dingwei,wati=6):#等待 if fangfa == "id": WebDriverWait(self.driver,wati,1).until(EC.presence_of_element_located((By.ID, dingwei))) elif fangfa == "name": WebDriverWait(self.driver,wati,1).until(EC.presence_of_element_located((By.NAME, dingwei))) elif fangfa == "class": WebDriverWait(self.driver,wati,1).until(EC.presence_of_element_located((By.CLASS_NAME, dingwei))) elif fangfa == "link_text": WebDriverWait(self.driver,wati,1).until(EC.presence_of_element_located((By.LINK_TEXT, dingwei))) elif fangfa == "xpath": WebDriverWait(self.driver,wati,1).until(EC.presence_of_element_located((By.XPATH, dingwei))) elif fangfa == "css": WebDriverWait(self.driver,wati,1).until(EC.presence_of_element_located((By.CSS_SELECTOR, dingwei))) else: raise NameError("Please enter the elements,'id','name','class','link_text','xpath','css'.") def open(self,url):#打开网页 self.driver.get(url) def make_maxwindow(self):#最大化浏览器 self.driver.maximize_window() def set_winsize(self,wide,hight):#设置窗口 self.driver.set_window_size(wide,hight) def send_ke(self,fangfa,dingwei,text):#发送内容 self.element(fangfa,dingwei) e1=self.element(fangfa,dingwei) e1.clear() e1.send_keys(text) def clea(self,fangfa,dingwei):#清空 self.element_wait(fangfa,dingwei) e1=self.element(fangfa,dingwei) e1.clear() def click(self,fangfa,dingwei):#单击 self.element_wait(fangfa,dingwei) e1=self.element(fangfa,dingwei) e1.click() def right_click(self,fangfa,dingwei):#右击 self.element_wait(fangfa,dingwei) e1=self.element(fangfa,dingwei) ActionChains(self.deriver).context_click(e1).perform() def move_element(self,fangfa,dingwei):#移动到 self.element_wait(fangfa,dingwei) e1=self.element(fangfa,dingwei) ActionChains(self.deriver).move_element(e1).perform() def double_click(self,dingwei,fangfa):#双击 self.element_wait(fangfa,dingwei) e1=self.element(fangfa,dingwei) ActionChains(self.driver).double_click(e1).perform() def drag_and_drop(self,fangfa1,e1,fangfa2,e2):#从e1到e2 self.element_wait(fangfa1,e1) eme1=self.element(fangfa1,e1) self.element_wait(fangfa2,e2) eme2=self.element(fangfa2,e2) ActionChains(self.deriver).drag_and_drop(eme1,eme2).perform() def click_text(self,text):#点击文字 self.driver.find_element_by_link_text(text).click() def close(self):#关闭 self.driver.close() def kill(self):#退出 self.driver.quit() def sublimit(self,fangfa,dingwei):#提交 self.element_wait(fangfa,dingwei) e1=self.element(fangfa,dingwei) e1.sublimit() def f5(self):#刷新 self.driver.refresh() def js(self,sprit):#执行js self.driver.execute_script(sprit) def get_attribute(self, fangfa,dingwei, attribute): e1=self.element(fangfa,dingwei) return e1.get_attribute(attribute) def get_text(self,fangfa,dingwei): self.element_wait(fangfa,dingwei) e1=self.element(fangfa,dingwei) return e1.text def get_is_dis(self,fangfa,dingwei): self.element_wait(fangfa,dingwei) e1=self.element(fangfa,dingwei) return e1.is_displayed() def get_title(self,fangfa,dingwei):#获取title return self.driver.title() def get_screen(self,file_path):#截屏 self.driver.get_screenshot_as_file(file_path) def wait(self,fangfa,dingwei):#等待 self.driver.element_wait(fangfa,dingwei) def accpet(self):#允许 self.driver.switch_to.alert.accept() def dismiss_alert(self): self.driver.switch_to.alert.dismiss() def switch_to_frame(self, fangfa,dingwei):#切换 self.element_wait(fangfa,dingwei) if1=self.element(fangfa,dingwei) self.driver._switch_to.frame(if1) if __name__ == '__main__': driver=pyselenium(brower="firefox") driver.open('http://www.baidu.com')
2022年04月