dubbo接口测试四: Python调用Dubbo接口

DubboClient:是一个基于telnetlib类库封装的,用于测试dubbo接口的客户端工具类

说明:

• telnetlib 是python内置的模块,用来创建Telnet协议的连接

• telnetlib 模块提供一个实现telnet协议的类 Telnet,Dubboclient,封装了 telnetlib 库。

安装dubboclient

pip install dubboclient

检查是否已经安装:pip list 或 pip show dubboclient

实用案例

案例一

根据手机号,查询会员信息(传递普通参数)

图片[1] - dubbo接口测试四: Python调用Dubbo接口 - 正则时光

实现代码

# 1. 导包 from dubboclient import DubboClient 
from dubboclient import DubboClient 
# 2. 创建 DubboClient类实例,指定 IP 和 port 
dubboclt = DubboClient("211.103.136.244", 6502) 
# 3. 使用 实例调用 invoke() 方法。 传入 :服务名、方法名、实参(方法使用)。获取响应结果 
resp = dubboclt.invoke("MemberService", "findByTelephone", "13020210001") 
# 4. 打印响应结果 
print("响应结果 =", resp) 
print("type(resp) =", type(resp))

案例二

添加会员(传递 对象参数)

参数类型为自定义类型时,传送参数时要增加键值对:class:完整包名和类名

图片[2] - dubbo接口测试四: Python调用Dubbo接口 - 正则时光

实现代码:

# 1. 导包 
from dubboclient import DubboClient 
# 2. 创建 dubboclient 实例 
dubboclt = DubboClient("211.103.136.244", 6502) 
# 准备 add 方法,所需要的数据 
info = {"id": 911, "name": "杜甫", "phoneNumber": "13048379884"} 
# 如果 class 已经存在,覆盖原有class值; 如果不存在 class,新增一组 元素到 字典中。 
info["class"] = "com.itheima.pojo.Member" 
# 3. 调用 invoke 传 服务名、方法名、实参。得响应结果 
resp = dubboclt.invoke("MemberService", "add", info) 
# 4. 打印 
print("响应结果 =", resp) 
print("type(resp) =", type(resp))

案例三

添加预约设置

图片[3] - dubbo接口测试四: Python调用Dubbo接口 - 正则时光

实现代码:

# 1. 导包 from dubboclient import DubboClient 
# 2. 创建 dubboclient 实例 
dubboclt = DubboClient("211.103.136.244", 6502) 
# 准备 add 方法,所需要的数据 
info = [{"orderDate": "2021-05-18 18:89:02", "number": 346}]
# 3. 调用 invoke 传 服务名、方法名、实参。得响应结果 
resp = dubboclt.invoke("OrderSettingService", "add", info) 
# 4. 打印 
print("响应结果 =", resp) 
print("type(resp) =", type(resp))

Dubbo框架封装

看了上面的代码,大家可能都发觉远程调用的这几个dubbo接口 存在的问题:

1. 代码有 大量冗余

2. 测试接口时,除了要给 测试数据之外, 还需要 指定 服务名、方法名

3. 传参时,除了要考虑测试数据外,还要分析是否要添加 class 字段 及 对应数据。

4. 返回的数据类型统一为 string(不具体)

图片[4] - dubbo接口测试四: Python调用Dubbo接口 - 正则时光

基础服务对象

# api/base_service.py 
from dubboclient import DubboClient 
class BaseService: 
   """基础服务""" 
   def __init__(self): 
       self.dubbo_client = DubboClient("211.103.136.244", 6502)

服务对象

class MemberService(BaseService): 
    """会员服务接口""" 
    def __init__(self): 
        super().__init__() 
        self.service_name = "MemberService" 
        self.find_by_telephone_method = "findByTelephone" 

    def find_by_telephone(self, telephone): 
        return self.dubbo_client.invoke(self.service_name, self.find_by_telephone_method, telephone)

测试用例对象

class TestMemberService(unittest.TestCase): 
    @classmethod 
    def setUpClass(cls) -> None: 
        cls.member_service = MemberService() 
    def test01_find_by_telephone_success(self): 
        """查询成功""" 
        telephone = "13520196139" 
        response = self.member_service.find_by_telephone(telephone) 
        print("response===", response) 

        json_data = json.loads(response) 
        self.assertEqual(telephone, json_data.get("phoneNumber"))
图片[5] - dubbo接口测试四: Python调用Dubbo接口 - 正则时光
© 版权声明
THE END
喜欢就支持一下吧
点赞10 分享