#!/usr/local/bin/python3
# coding=gbk
# http://www.cnblogs.com/txw1958/
#
import os, io, sys, re, time, base64, json
import webbrowser, urllib.request
import unittest
from weibopy.auth import OAuthHandler, BasicAuthHandler
from weibopy.api import API
from weibopy.api import WeibopError
class Test(unittest.TestCase):
consumer_key = ""
consumer_secret = ""
def __init__(self, app_key, app_secret):
""" constructor """
self.consumer_key = app_key
self.consumer_secret = app_secret
def getAtt(self, key):
try:
return self.obj.__getattribute__(key)
except Exception as e:
print(e)
return ''
def getAttValue(self, obj, key):
try:
return obj.__getattribute__(key)
except Exception as e:
print(e)
return ''
#1.最老的认证方式,现在已不支持
def basicAuth(self, source, username, password):
self.authType = 'basicauth'
self.auth = BasicAuthHandler(username, password)
self.api = API(self.auth,source=source)
#2.新的认证方式,目前新浪所支持的
# 新浪源代码中需要手动添加,现在改为代码自动解析verifier
def auth(self, username, password):
if len(self.consumer_key) == 0:
print("Please set consumer_key")
return
if len(self.consumer_key) == 0:
print("Please set consumer_secret")
return
oauthFormat = "json" if True else "xml"
self.auth = OAuthHandler(self.consumer_key, self.consumer_secret)
auth_url = self.auth.get_authorization_url()
auth_url = auth_url + '&oauth_callback=' + oauthFormat + '&userId=' + username +'&passwd=' + password
#print('URL: ' + auth_url)
if (oauthFormat == "xml"):
xmlcontent = urllib.request.urlopen(auth_url)
xmldoc = minidom.parse(xmlcontent)
root = xmldoc.documentElement
oauth_verifier = root.getElementsByTagName("oauth_verifier")[0]
verifier = oauth_verifier.childNodes[0].data
else:
jsoncontent = urllib.request.urlopen(auth_url)
stdout = jsoncontent.read().decode('utf-8')
jsondatas = json.loads(stdout)
verifier = jsondatas["oauth_verifier"]
#print("Verifier: " + verifier)
#verifier = input('PIN: ').strip()
access_token = self.auth.get_access_token(verifier)
self.api = API(self.auth)
return access_token
#3. 直接使用新的认证方式生成的已授权密码和密钥通讯,不需要重新认证
# 将已授权密码和密钥保存下来,直接使用
def setAccessToken(self, key, secret):
self.auth = OAuthHandler(self.consumer_key, self.consumer_secret)
self.auth.setAccessToken(key, secret)
self.api = API(self.auth)
#发布一条微博信息
def update(self, message):
status = self.api.update_status(lat='39', long='110', status=message)
self.obj = status
id = self.getAtt("id")
text = self.getAtt("text")
print("update---"+ str(id) +":"+ text)
return int(id)
#上传图片并发布一条微博信息
#仅支持 'image/gif', 'image/jpeg', 'image/png'
def upload(self, filename, message):
status = self.api.upload(filename, status=message)
self.obj = status
id = self.getAtt("id")
text = self.getAtt("text")
self.obj = self.getAtt("user")
profile_image_url = self.getAtt("profile_image_url")
print("upload,id="+ str(id) +",text="+ text +",image_name="+ filename)
def uploadExt(self, filename, message, latPosition, longPosition):
status = self.api.upload(filename, status=message, lat=latPosition, long=longPosition)
self.obj = status
id = self.getAtt("id")
text = self.getAtt("text")
self.obj = self.getAtt("user")
profile_image_url = self.getAtt("profile_image_url")
print("upload,id="+ str(id) +",text="+ text +",profile_image_url="+ profile_image_url)
#删除一条微博信息
def destroy_status(self, id):
status = self.api.destroy_status(id)
self.obj = status
id = self.getAtt("id")
text = self.getAtt("text")
print("destroy---"+ str(id) +":"+ text)
#回复微博评论信息
#个人理解:对某条消息id的评论cid的评论
def reply(self, id, cid, message):
status = self.api.reply(id, cid, message)
self.obj = status
id = self.getAtt("id")
text = self.getAtt("text")
print("reply---"+ str(id) +":"+ text)
#转发一条微博信息(可加评论)id:微博id, message:评论
def repost(self, id, message):
status = self.api.repost(id, message)
self.obj = status
id = self.getAtt("id")
text = self.getAtt("text")
print("repost---"+ str(id) +":"+ text)
#获取指定微博的评论列表: id:微博消息id
def comments(self, id):
comments = self.api.comments(id=id)
for comment in comments:
self.obj = comment
mid = self.getAtt("id")
text = self.getAtt("text")
print("comments---"+ str(mid) +":"+ text)
#获取当前用户发送及收到的评论列表: 5条
def comments_timeline(self):
comments = self.api.comments_timeline()
for comment in comments:
self.obj = comment
mid = self.getAtt("id")
text = self.getAtt("text")
print("comments_timeline---"+ str(mid) +":"+ text)
#获取当前用户发出的评论: number 评论条数, 返回最后一条评论的id
def comments_by_me(self, number):
comments = self.api.comments_by_me(count=number)
mid = ''
for comment in comments:
self.obj = comment
mid = self.getAtt("id")
text = self.getAtt("text")
created_at = self.getAtt("created_at")
print('comments_by_me,id='+ str(mid) +',text='+ text+',created_at='+ str(created_at))
return mid
#评论指定微博: mid:微博id content: 要评论的内容
def comment(self, mid, comment_content):
comment = self.api.comment(id=mid, comment=comment_content)
self.obj = comment
mid2 = self.getAtt("id")
text = self.getAtt("text")
print("mid: "+ str(mid2) +" test:"+ text)
#删除当前用户的微博评论信息: mid 消息id
def comment_destroy (self, mid):
comment = self.api.comment_destroy(mid)
self.obj = comment
mid = self.getAtt("id")
text = self.getAtt("text")
print("comment_destroy---"+ str(mid) +":"+ text)
#批量获取一组微博的评论数及转发数
def counts(self):
counts = self.api.counts(ids='3363780346175114,3366928917792687,1877120192')
for count in counts:
self.obj = count
mid = self.getAtt("id")
comments = self.getAtt("comments")
rt = self.getAtt("rt")
print("mentions---"+ str(mid) +":"+ str(comments) +":"+ str(rt))
#私信,通过高级认证的应用key可以使用
def direct_message(self):
messages = self.api.direct_messages()
mid = ''
for msg in messages:
self.obj = msg
mid = self.getAtt("id")
text = self.getAtt("text")
print("direct_message---"+ str(mid) +":"+ text)
return mid
#我发送的私信列表
def sent_direct_messages(self):
messages = self.api.sent_direct_messages()
for msg in messages:
self.obj = msg
mid = self.getAtt("id")
text = self.getAtt("text")
print("sent_direct_messages---"+ str(mid) +":"+ text)
#发送私信
def new_direct_message(self):
msg = self.api.new_direct_message(id=1114365581,text='directMessages--test-测试-'+ str(time.time()))
self.obj = msg
mid = self.getAtt("id")
text = self.getAtt("text")
print("new_direct_message---"+ str(mid) +":"+ text)
#删除一条私信
def destroy_direct_message(self, id):
msg = self.api.destroy_direct_message(id)
self.obj = msg
mid = self.getAtt("id")
text = self.getAtt("text")
print("destroy_direct_message---"+ str(mid) +":"+ text)
#获取当前用户的收藏列表
def favorites(self):
statuses = self.api.favorites(id=2)
for status in statuses:
self.obj = status
sid = self.getAtt("id")
text = self.getAtt("text")
print("favorites---"+ str(sid) +":"+ text)
#添加收藏: id:消息id
def create_favorite(self, id):
status = self.api.create_favorite(id)
self.obj = status
sid = self.getAtt("id")
text = self.getAtt("text")
print("create_favorite---"+ str(sid) +":"+ text)
#删除当前用户收藏的微博信息 : id:消息id
def destroy_favorite(self, id):
msg = self.api.destroy_favorite(id)
self.obj = msg
mid = self.getAtt("id")
text = self.getAtt("text")
print("destroy_favorite---"+ str(mid) +":"+ text)
#关注某用户: fid 用户id
def create_friendship(self, fid):
user = self.api.create_friendship(id=fid)
self.obj = user
uid = self.getAtt("id")
screen_name = self.getAtt("screen_name")
print("create_friendship---"+ str(uid) +":"+ screen_name)
#取消关注: fid 用户id
def destroy_friendship(self, fid):
user = self.api.destroy_friendship(id=fid)
self.obj = user
uid = self.getAtt("id")
screen_name = self.getAtt("screen_name")
print("destroy_friendship---"+ str(uid) +":"+ screen_name)
#是否关注某用户:用户a是否关注用户b
def exists_friendship(self, user_a_id, user_b_id):
self.obj = self.api.exists_friendship(user_a=user_a_id, user_b=user_b_id)
friends = self.getAtt("friends")
print("exists_friendship--- "+ str(friends))
#是否互相关注: uid: 用户id : 似乎没有用
def show_friendship(self, uid):
showList = self.api.show_friendship(target_id=uid)
for obj in showList:
self.obj = obj
uid = self.getAtt("id")
screen_name = self.getAtt("screen_name")
print("show_friendship---"+ str(uid) +":"+ screen_name)
#是否为我的关注用户
def is_my_attention(self, uid):
self.obj = self.api.exists_friendship(user_a=myuserid, user_b=uid)
friends = self.getAtt("friends")
print(str(friends))
return friends
#是否为我的粉丝用户
def is_my_follower(self, uid):
self.obj = self.api.exists_friendship(user_a=uid, user_b=myuserid)
friends = self.getAtt("friends")
print(str(friends))
return friends
#显示当前登录用户id
def show_self_id(self):
global myuserid
showList = self.api.show_friendship(target_id=opposite_id) #存在的id号
self.obj = showList[0]
myuserid= self.getAtt("id")
screen_name = self.getAtt("screen_name")
print("\nuid: " + str(myuserid) + " name: " + str(screen_name))
return myuserid
#根据id显示用户信息
def show_opposite_id(self, opposite_id):
showList = self.api.show_friendship(target_id=opposite_id) #存在的id号
self.obj = showList[1]
#opposite_id= self.getAtt("id")
screen_name = self.getAtt("screen_name")
print("uid: " + str(opposite_id) + " name: " + str(screen_name))
return opposite_id
#根据id显示用户昵称_新
def get_user_nickname(self, opposite_id):
showList = self.api.get_user(user_id=opposite_id) #存在的id号
self.obj = showList
opposite_id= self.getAtt("id")
screen_name = self.getAtt("screen_name")
#print("uid: " + str(opposite_id) + " name: " + str(screen_name))
return screen_name
#根据id获取用户信息:关注的人数
def get_friends_count(self, uid):
showList = self.api.get_user(user_id = uid)
self.obj = showList
screen_name = self.getAtt("screen_name")
description = self.getAtt("description")
followers_count = self.getAtt("followers_count") #int
friends_count = self.getAtt("friends_count")
statuses_count = self.getAtt("statuses_count")
return friends_count
#根据id获取用户信息:
def get_user_info_list(self, uid):
showList = self.api.get_user(user_id = uid)
user_info_list = showList
return user_info_list
#关注列表:我关注的用户: 20个
def friends(self):
timeline = self.api.friends()
for line in timeline:
self.obj = line
fid = self.getAtt("id")
name = self.getAtt("screen_name")
print("friends---"+ str(fid) +":"+ name)
#获取当前用户所关注用户的最新微博信息 2条
def friends_timeline(self):
twitter_dict = []
timeline = self.api.friends_timeline(count=2, page=1)
for line in timeline:
self.obj = line
mid = self.getAtt("id")
text = self.getAtt("text")
twitter_dict.append({'mid': mid, 'txt': text})
print("friends_timeline---"+ str(mid) +":"+ text)
return twitter_dict
#获取当前用户发送及收到的评论列表 2条
def comments_timeline(self):
timeline = self.api.comments_timeline(count=2, page=1)
for line in timeline:
self.obj = line
mid = self.getAtt("id")
text = self.getAtt("text")
print("comments_timeline---"+ str(mid) +":"+ text)
#获取指定用户发布的微博信息列表:
def user_timeline(self, userid, amount):
timeline = self.api.user_timeline(uid=userid, count=amount)
for line in timeline:
self.obj = line
mid = self.getAtt("id")
text = self.getAtt("text")
created_at = self.getAtt("created_at")
print("user_timeline---"+ str(mid) +":"+ text)
#获取用户发布的微博信息列表 5条
def user_timeline_dict(self, amount):
user_msg_dict = []
timeline = self.api.user_timeline(count=amount, page=1)
for line in timeline:
self.obj = line
mid = self.getAtt("id")
text = self.getAtt("text")
user_msg_dict.append({'id': mid, 'text': text})
print("user_timeline---"+ str(mid) +":"+ text)
return user_msg_dict
#获取用户发布的微博信息列表 5条
def user_timeline_pages(self, amount, pagenumber):
user_msg_dict = []
timeline = self.api.user_timeline(count=amount, page=pagenumber)
for line in timeline:
self.obj = line
mid = self.getAtt("id")
text = self.getAtt("text")
user_msg_dict.append({'id': mid, 'text': text})
print("user_timeline---"+ str(mid) +":"+ text)
return user_msg_dict
#获取最新更新的公共微博消息 2条
def public_timeline(self):
timeline = self.api.public_timeline(count=2, page=1)
for line in timeline:
self.obj = line
mid = self.getAtt("id")
text = self.getAtt("text")
print("public_timeline---"+ str(mid) +":"+ text)
#获取最新更新的公共微博消息 1 条返回 mid
def public_timeline_mid(self):
timeline = self.api.public_timeline(count=2, page=1)
for line in timeline:
self.obj = line
mid = self.getAtt("id")
return mid
#关注列表
def friends_ids(self):
fids = self.api.friends_ids()
self.obj = fids
ids = self.getAtt("ids")
next_cursor = self.getAtt("next_cursor")
previous_cursor = self.getAtt("previous_cursor")
print("friends_ids---"+ str(ids) +":next_cursor"+ str(next_cursor) +":previous_cursor"+ str(previous_cursor))
return ids
#关注列表 全部 (用户名)
def friends_ids_ex(self):
fids = self.api.friends_ids()
self.obj = fids
ids = self.getAtt("ids")
for line in ids:
self.obj = line
fid = self.getAtt("id")
name = self.getAtt("screen_name")
print("friends---"+ str(fid) +":"+ name)
#粉丝列表 全部
def followers_ids(self):
fids = self.api.followers_ids()
self.obj = fids
ids = self.getAtt("ids")
next_cursor = self.getAtt("next_cursor")
previous_cursor = self.getAtt("previous_cursor")
print("followers_ids---"+ str(ids) +":next_cursor"+ str(next_cursor) +":previous_cursor"+ str(previous_cursor))
return ids
#获取@当前用户的微博列表 全部
def mentions(self):
comments = self.api.mentions()
for comment in comments:
self.obj = comment
mid = self.getAtt("id")
text = self.getAtt("text")
print("mentions---"+ str(mid) +":"+ text)
#获取隐私信息
##comment : 谁可以评论此账号的微薄。0:所有人,1:我关注的人。
##dm : 谁可以给此账号发私信。0:所有人,1:我关注的人。
##real_name: 是否允许别人通过真实姓名搜索到我, 0允许,1不允许。
##geo : 发布微博,是否允许微博保存并显示所处的地理位置信息。 0允许,1不允许。
##badge : 勋章展现状态,值—1私密状态,0公开状态。
def get_privacy(self):
privacy = self.api.get_privacy()
self.obj = privacy
ct = self.getAtt("comment")
dm = self.getAtt("dm")
real_name = self.getAtt("real_name")
geo = self.getAtt("geo")
badge = self.getAtt("badge")
print ("privacy---"+ str(ct) + str(dm) + str(real_name) + str(geo) + str(badge))
#设置隐私信息
def update_privacy(self):
update_privacy = self.api.update_privacy(comment=0)
#获取请求次数限制信息
#-针对一个用户在使用一个应用的请求次数限制
#-普通授权:
#-总限制:单用户每应用 150次/小时
#-发微博:单用户每应用 30次/小时
#-发评论:单用户每应用 60次/小时
#-加关注:单用户每应用 60次/小时 200次/天
#-{
#- "hourly_limit":150, #本小时内剩余数量 150 次,
#- "reset_time_in_seconds":1264994122, #计数器重置剩余时间1264994122,秒,
#- "reset_time":"Mon Feb 01 03:15:22 +0800 2010", #下次重置时间03:15:22点。
#- "remaining_hits":150 #剩余访问数量 150 次,
#-}
def rate_limit_status(self):
limit_status = self.api.rate_limit_status()
self.obj = limit_status
hourly_limit = self.getAtt("hourly_limit")
reset_time_in_seconds = self.getAtt("reset_time_in_seconds")
reset_time = self.getAtt("reset_time")
remaining_hits = self.getAtt("remaining_hits")
print ("hourly_limit---" + str(hourly_limit))
print ("reset_time_in_seconds---" + str(reset_time_in_seconds))
print ("reset_time---" + str(reset_time))
print ("remaining_hits---" + str(remaining_hits))
#添加用户标签
def tag_create(self, message):
message = message.encode("utf-8")
tag_create = self.api.tag_create(tags=message)
for line in tag_create:
self.obj = line
tagid = self.getAtt("tagid")
print ("tag_create---"+tagid)
#返回用户感兴趣的标签 10条
def tag_suggestions(self):
tag_suggestions=self.api.tag_suggestions()
for line in tag_suggestions:
self.obj = line
id = self.getAtt("id")
value = self.getAtt("value")
print ("tag_suggestions---"+ id +":"+ value)
#删除标签 tag_id 标签id
def tag_destroy(self,tag_id):
tag_destroy=self.api.tag_destroy(tag_id)
self.obj=tag_destroy
result=self.getAtt("result")
print ("tag_destroy---"+ result)
#批量删除标签
def tag_destroy_batch(self,tag_ids):
tag_destroy_batch=self.api.tag_destroy_batch(tag_ids)
for line in tag_destroy_batch:
self.obj = line
tagid=self.getAtt("tagid")
print ("tag_destroy_batch---"+ tagid)
#获取某人的话题 uid 用户id
def trends(self, uid):
trends = self.api.trends(user_id=uid, count=20, page=1)
for line in trends:
self.obj=line
num = self.getAtt("num")
trend_id = self.getAtt("trend_id")
hotword = self.getAtt("hotword")
print ("trends---"+ num +":"+ hotword +":"+trend_id)
#新浪未提供测试代码
def trends_statuses(self , message):
message = message.encode("utf-8")
trends_statuses = self.api.trends_statuses(message)
for line in trends_statuses:
self.obj=line
id = self.getAtt("id")
text = self.getAtt("text")
print ("ToReader---"+ str(id) + ":" +text)
#print text
#关注某一个话题
def trends_follow(self , message):
message = message.encode("utf-8")
trends_follow = self.api.trends_follow(message)
#取消关注的某一个话题
def trends_destroy(self , id):
trends_destroy=self.api.trends_destroy(id)
#按小时返回热门话题
def trends_hourly(self):
trends_hourly = self.api.trends_hourly(base_app=0)
self.obj=trends_hourly
query = self.getAtt("trends")
as_of = self.getAtt("as_of")
for key in query:
key = time.strftime('%Y-%m-%d',time.localtime(as_of))
for line in query[key]:
query = line["query"]
name = line["name"]
print ("trends_hourly---"+" Query:" + query + ", Name:"+ name)
#返回最近一天内的热门话题
def trends_daily(self):
trends_daily=self.api.trends_daily(base_app=0)
self.obj=trends_daily
query=self.getAtt("trends")
as_of=self.getAtt("as_of")
for key in query:
key=time.strftime('%Y-%m-%d',time.localtime(as_of))
for line in query[key]:
query=line["query"]
name=line["name"]
print ("trends_daily---"+"Query:" + query+",Name:"+ name)
#返回当周热门话题
def trends_weekly(self):
trends_weekly=self.api.trends_weekly(base_app=0)
self.obj=trends_weekly
query=self.getAtt("trends")
as_of=self.getAtt("as_of")
for key in query:
key = time.strftime('%Y-%m-%d',time.localtime(as_of))
for line in query[key]:
query=line["query"]
name=line["name"]
print ("trends_weekly---"+"Query:" + query+",Name:"+ name)
#按小时返回热门话题
def trends_hourly(self, number):
index = 0
trends_hourly = self.api.trends_hourly(base_app=0)
self.obj = trends_hourly
query = self.getAtt("trends")
as_of = self.getAtt("as_of")
for key in query:
key = time.strftime('%Y-%m-%d',time.localtime(as_of))
for line in query[key]:
if index == 10:
return
index +=1
query = line["query"]
name = line["name"]
print ("trends_hourly---"+" Query:" + query + ", Name:"+ name)
#获取当前用户未读消息数
def unread(self):
count = self.api.unread()
self.obj = count
mentions = self.getAtt("mentions")
comments = self.getAtt("comments")
followers = self.getAtt("followers")
dm = self.getAtt("dm")
print("mentions---"+ str(mentions) +":"+ str(comments) +":"+ str(followers) +":"+ str(dm))
#更改头像
def update_profile_image (self, filename):
user = self.api.update_profile_image(filename)
self.obj = user
id = self.getAtt("id")
profile_image_url = self.getAtt("profile_image_url")
print("update,uid="+ str(id) +",profile_image_url="+ profile_image_url)
def main():
"main function"
print(base64.b64decode(b'Q29weXJpZ2h0IChjKSAyMDEyIERvdWN1YmUgSW5jLiBBbGwgcmlnaHRzIHJlc2VydmVkLg==').decode())
for user_index in userInfo.user_info_temp:
try:
user_title = user_index['user']['title']
user_uid = user_index['user']['uid']
user_nickname = user_index['user']['nickname']
login_account = user_index['login']['account']
login_password = user_index['login']['password']
app_key = user_index['app']['key']
app_secret = user_index['app']['secret']
access_token_key = user_index['token']['key']
access_token_secret = user_index['token']['secret']
test = Test(app_key, app_secret)
test.auth(login_account, login_password)
test.show_self_id()
test.direct_message()
test.user_timeline(20)
test.comments(3386439134314734)
print()
except WeibopError as err:
print(">>>>>> WeibopError: " + err.reason)
except Exception as error:
print(">>>>>> PythonError " + str(error))
else:
None
finally:
None
if __name__ == '__main__':
main()