[雪峰磁针石博客]python工具库介绍-dubbo:通过telnet接口访问dubbo服务

简介: Django - The Easy Way pdf - 2017 [下载地址](https://itbooks.pipipan.com/fs/18113597-305049783) Django is a very powerful Python Web Framework.
## 简介

dubbo服务发布之后,我们可以利用telnet命令进行调试、管理。更多资料参见 [Telnet命令参考手册](http://alibaba.github.io/dubbo-doc-static/Telnet+Command+Reference-zh-showComments=true&showCommentArea=true.htm)

telnet 调用示例:


```python

$ telnet 172.17.103.110 9097
Trying 172.17.103.110...
Connected to 172.17.103.110.
Escape character is '^]'.

dubbo>ls com.oppo.sso.service.onekey.IOnekeyRegister
register
dubbo>invoke com.oppo.sso.service.onekey.IOnekeyRegister.register({"applicationKey":"mac","imei":"","mobile":"13244448888","createIP":"127.0.0.1","createBy":"172.17.0.1"})
{"configCountry":null,"userIdLong":0,"appPackage":null,"appVersion":null,"accountName":null,"romVersion":null,"resultCode":3001,"thirdStatus":null,"registerType":0,"sendChannel":null,"operator":null,"manufacturer":null,"password":null,"osVersion":null,"lock":false,"model":null,"visitorLocked":false,"OK":false,"brand":null,"email":null,"createIP":null,"deny":false,"encryptEmail":null,"sessionKey":null,"thirdId":null,"passwordOriginal":null,"mobile":null,"applicationKey":null,"thirdpartyOk":false,"userAgent":null,"userName":null,"resultDesc":"应用不存在","userId":0,"encryptMobile":null,"emailStatus":null,"createBy":null,"freePwd":false,"changeTimes":0,"createTime":null,"mobileStatus":null,"oldLock":false,"codeTimeout":null,"lastUpdate":null,"imei":null,"sessionTimeout":0,"sdkVersion":null,"networkID":0,"status":null}
elapsed: 98 ms.
dubbo>

```

## Python实现

源码:


```python

"""
Name: dubbo.py

Tesed in python3.5
"""

import json
import telnetlib
import socket


class Dubbo(telnetlib.Telnet):

    prompt = 'dubbo>'
    coding = 'utf-8'

    def __init__(self, host=None, port=0,
                 timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
        super().__init__(host, port)
        self.write(b'\n')

    def command(self, flag, str_=""):
        data = self.read_until(flag.encode())
        self.write(str_.encode() + b"\n")
        return data

    def invoke(self, service_name, method_name, arg):
        command_str = "invoke {0}.{1}({2})".format(
            service_name, method_name, json.dumps(arg))
        self.command(Dubbo.prompt, command_str)
        data = self.command(Dubbo.prompt, "")
        data = json.loads(data.decode(Dubbo.coding,
                                      errors='ignore').split('\n')[0].strip())
        return data

if __name__ == '__main__':

    conn = Dubbo('172.17.103.110', 9097)

    json_data = {
        "applicationKey":"mac",
        "imei":"",
        "mobile":"13244448888",
        "createIP":"127.0.0.1",
        "createBy":"172.17.0.1"
    }

    result = conn.invoke(
        "com.oppo.sso.service.onekey.IOnekeyRegister",
        "register",
        json_data
    )
    print(result)
```

执行结果:


```python

# python3 dubbo.py
{'manufacturer': None, 'applicationKey': None, 'OK': False, 'codeTimeout': None, 'password': None, 'encryptEmail': None, 'passwordOriginal': None, 'thirdId': None, 'emailStatus': None, 'freePwd': False, 'sessionTimeout': 0, 'createTime': None, 'osVersion': None, 'lastUpdate': None, 'email': None, 'sdkVersion': None, 'registerType': 0, 'sendChannel': None, 'visitorLocked': False, 'createIP': None, 'thirdStatus': None, 'encryptMobile': None, 'networkID': 0, 'resultCode': 3001, 'brand': None, 'changeTimes': 0, 'userAgent': None, 'imei': None, 'operator': None, 'romVersion': None, 'model': None, 'lock': False, 'sessionKey': None, 'configCountry': None, 'deny': False, 'userIdLong': 0, 'resultDesc': '应用不存在', 'status': None, 'createBy': None, 'thirdpartyOk': False, 'appPackage': None, 'appVersion': None, 'accountName': None, 'userId': 0, 'oldLock': False, 'userName': None, 'mobile': None, 'mobileStatus': None}
```


更复杂的例子。

源码:


```python

import json
import telnetlib
import socket


class Dubbo(telnetlib.Telnet):

    prompt = 'dubbo>'
    coding = 'gbk'

    def __init__(self, host=None, port=0,
                 timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
        super().__init__(host, port)
        self.write(b'\n')

    def command(self, flag, str_=""):
        data = self.read_until(flag.encode())
        self.write(str_.encode() + b"\n")
        return data

    def invoke(self, service_name, method_name, arg):
        command_str = "invoke {0}.{1}({2})".format(
            service_name, method_name, json.dumps(arg))
        self.command(Dubbo.prompt, command_str)
        data = self.command(Dubbo.prompt, "")
        data = json.loads(data.decode(Dubbo.coding,
                                      errors='ignore').split('\n')[0].strip())
        return data

if __name__ == '__main__':

    conn = Dubbo('183.131.22.99', 21881)

    content = {
        "sign": "FKeKnMEPybHujjBTzz11BrulB5av7pLhJpk=",
        "partnerOrder": "0511e0d38f334319a96920fa02be02a7",
        "productDesc": "hello",
        "paySuccessTime": "2016-08-25 18:33:04",
        "price": "1",
        "count": "1",
        "attach": "from_pay_demo",
        "date": "20160825",
    }
    content_json = json.dumps(content).replace('"', '\\"')

    json_data = {
        "requestId": "0511e0d38f334319a96920fa02be02a7",
        "reqUrl": 'http://www.oppo.com',
        "httpReqType": "POST",
        "headerMap": None,
        "reqContent": content_json,
        "appId": "10001",
        "productName": "test",
        "timeStamp": "1472121184957",
        "partnerId": "2031",
        "signType": "MD5",
        "sign": "23B621FBBF887373C65E553C2222258F",
        "successResult": "OK",
    }

    result = conn.invoke(
        "com.oppo.pay.notify.api.facade.NotifyApplyService",
        "httpNotify",
        json_data
    )
    print(result)
```

执行结果:


```python

$ python3 dubbo.py
{'resMsg': 'ǩ', 'data': None, 'errorDetail': 'sign check fail!', 'resCode': '100003', 'success': False}

```

## 待改进

* 进行多个项目的实验,增加异常处理

## 参考资料

-   [本文最新版本地址](https://china-testing.github.io/python3_lib_dubbo.html)
-   [本文涉及的python测试开发库](https://github.com/china-testing/python-api-tesing) 谢谢点赞!
-   [本文相关海量书籍下载](https://github.com/china-testing/python-api-tesing/blob/master/books.md)
-   [本文源码地址](https://github.com/china-testing/python-api-tesing/tree/master/python3_libraries/_dubbo)


相关文章
|
9天前
|
XML JSON 数据库
Python的标准库
Python的标准库
121 77
|
10天前
|
XML JSON 数据库
Python的标准库
Python的标准库
38 11
|
10天前
|
数据可视化 Python
以下是一些常用的图表类型及其Python代码示例,使用Matplotlib和Seaborn库。
通过这些思维导图和分析说明表,您可以更直观地理解和选择适合的数据可视化图表类型,帮助更有效地展示和分析数据。
49 8
|
16天前
|
人工智能 数据可视化 数据挖掘
探索Python编程:从基础到高级
在这篇文章中,我们将一起深入探索Python编程的世界。无论你是初学者还是有经验的程序员,都可以从中获得新的知识和技能。我们将从Python的基础语法开始,然后逐步过渡到更复杂的主题,如面向对象编程、异常处理和模块使用。最后,我们将通过一些实际的代码示例,来展示如何应用这些知识解决实际问题。让我们一起开启Python编程的旅程吧!
|
14天前
|
存储 数据采集 人工智能
Python编程入门:从零基础到实战应用
本文是一篇面向初学者的Python编程教程,旨在帮助读者从零开始学习Python编程语言。文章首先介绍了Python的基本概念和特点,然后通过一个简单的例子展示了如何编写Python代码。接下来,文章详细介绍了Python的数据类型、变量、运算符、控制结构、函数等基本语法知识。最后,文章通过一个实战项目——制作一个简单的计算器程序,帮助读者巩固所学知识并提高编程技能。
|
3天前
|
Unix Linux 程序员
[oeasy]python053_学编程为什么从hello_world_开始
视频介绍了“Hello World”程序的由来及其在编程中的重要性。从贝尔实验室诞生的Unix系统和C语言说起,讲述了“Hello World”作为经典示例的起源和流传过程。文章还探讨了C语言对其他编程语言的影响,以及它在系统编程中的地位。最后总结了“Hello World”、print、小括号和双引号等编程概念的来源。
97 80
|
21天前
|
存储 索引 Python
Python编程数据结构的深入理解
深入理解 Python 中的数据结构是提高编程能力的重要途径。通过合理选择和使用数据结构,可以提高程序的效率和质量
133 59
|
1天前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
14 2
|
15天前
|
小程序 开发者 Python
探索Python编程:从基础到实战
本文将引导你走进Python编程的世界,从基础语法开始,逐步深入到实战项目。我们将一起探讨如何在编程中发挥创意,解决问题,并分享一些实用的技巧和心得。无论你是编程新手还是有一定经验的开发者,这篇文章都将为你提供有价值的参考。让我们一起开启Python编程的探索之旅吧!
41 10
|
18天前
|
机器学习/深度学习 人工智能 Java
Python 语言:强大、灵活与高效的编程之选
本文全面介绍了 Python 编程语言,涵盖其历史、特点、应用领域及核心概念。从 1989 年由 Guido van Rossum 创立至今,Python 凭借简洁的语法和强大的功能,成为数据科学、AI、Web 开发等领域的首选语言。文章还详细探讨了 Python 的语法基础、数据结构、面向对象编程等内容,旨在帮助读者深入了解并有效利用 Python 进行编程。
下一篇
DataWorks