目录
Welcome to Code Block's blog
本篇文章主要介绍了
[小试牛刀-Turbine数据分发]
❤博主广交技术好友,喜欢文章的可以关注一下❤
一、什么是Turbine算法?
Turbine 算法是一种用于区块链网络(尤其是 Solana 区块链)中高效数据传输和共识的算法。它的核心目标是在高吞吐量和低延迟的网络环境中,优化数据的传播和验证过程。Turbine 是 Solana 区块链的关键组件之一,帮助其实现每秒处理数万笔交易的能力。
二、流程图
编辑 2-1 流程图
如图2-1的所示,当客户端发送transaction到RPC节点后,会被分发到Leader节点(领导者节点),然后领导者节点对数据块进行封装后,会分发到验证节点(Validator)。当数据块数据过大时,数据的分发就会占用大量的网络带宽,造成网络短时负载增加。在区块链网络中,这种网络负载的造成的影响是巨大的,所以使用Turbine算法解决网络中的负载问题。如下图(2-2)所示:
编辑 2-2 流程图
当领导者分发包装的区块数据量过大时,将块(Block)拆分为小的Shred并对其进行签名然后分发到验证节点,这样验证节点对接收到得小的Shred验证并进行拼接,最终组成完整的块,然后对块内的交易数据进行验证,并最终完成整个区块的验证。
三、模拟实现
3.1 Leader实现
为了更好的理解Turbine算法的功能实现,这里使用Python代码对其流程进行实现。为更好的模拟领导者节点和验证者节点,这里使用google的grpc依赖创建leader(server)和validator(client)进行演示。
首先准备用户发送到链上的数据信息,这里模拟可直接对其进行定义,其定义如下:
# 接收到的交易数据 transaction = { "message_header": { "version": 1, "account_count": 2, "signature_count": 1 }, "account_keys": ["Alice", "Bob"], "recent_blockhash": "5KQmYg7s8v9wX2y3z4a5b6c7d8e9f0g1h2i3j4k5l6m7n8o9p0q1r2s3t4u5v6w7", "instructions": [], "program_id": "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA" }
当用户发送该数据到链上之前应对其进行签名(模拟私钥签名),使用pynacl依赖使用私钥对数据进行签名并添加到要发送transcation中,其代码如下:
def get_signkey()->None: """ 生成并返回签名密钥。 """ config = load_config("config.yml") # 加载配置文件 private_key = config["private_key"] # 获取私钥 seed = base58.b58decode(private_key)[:32] # 解码私钥并取前32字节作为种子 signing_key = SigningKey(seed) # 使用种子生成签名密钥 return signing_key def sign_transaction(transaction:dict[str,Any]) -> dict[str,Any]: """ 对交易数据进行签名并返回签名后的交易数据。 """ transaction_signature = get_signkey().sign(json.dumps(transaction).encode()).signature # 对交易数据进行签名 transaction.update({"signature": base58.b58encode(transaction_signature).decode()}) # 将签名添加到交易数据中 return transaction
之后对数据进行分片,即将其打包为Shred列表,为发送数据做准备,代码如下:
def create_shreds(transaction_data:str) -> list[Shred]: """ 将交易数据分片并返回分片对象列表。 """ shreds = [] for i in range(0, len(transaction_data), SHRED_LENGTH): payload = transaction_data[i:i+SHRED_LENGTH] # 分片数据 base58_payload = base58.b58encode(payload).decode() # 对分片数据进行 base58 编码 shred = Shred(i // SHRED_LENGTH, len(transaction_data) // SHRED_LENGTH + 1, base58_payload) # 创建分片对象 signKey = get_signkey() # 获取签名密钥 shred.sign_shred(signKey) # 对分片进行签名 shreds.append(shred) return shreds
这里对数据根据SHRED_LENGTH进行分割,同时将分割Json数据转换base58编码,并对Shred进行签名,其内容格式如下(index,total,payload,signature):
Shred(0, 4, 5nZVN1neN8mfwrCcs3VAa1m5TDiaYrKP19fQ495t2ZrQJuvwh3 JHdvna11pcYbDng7B4LzjyTMXXbBps61PCPQGqVj5HJmDPZMv1Ng8k2soWZEAE 9SHxLeaEhfMQLKVMs4Q4dCezp, 2g4436FiWE61hSbNbsn35x3Po2DaYJcC9uBKEY 7NE7ZiPzTpYb4ijPVWJhLna7ZdhLxR3BhUdczDZkwQjobYaAJJ)
最后将打包好的Shred列表通过grpc服务发送到validator节点,其代码如下:
class StreamService(sync_pb2_grpc.StreamServiceServicer): """ 实现 gRPC 服务的类。 """ def BiStream(self, request_iterator, context): """ 双向流方法。 """ while True: signed_transaction = sign_transaction(transaction.copy()) # 对交易数据进行签名 transaction_data = json.dumps(signed_transaction) # 将交易数据转换为 JSON 字符串 shreds = create_shreds(transaction_data) # 创建分片对象列表 for shred in shreds: response_data = json.dumps(shred.__dict__) # 将分片对象转换为 JSON 字符串 yield sync_pb2.SyncResponse(success=True, data=response_data) # 服务器持续返回数据 break def serve(): """ 创建并启动 gRPC 服务器。 """ server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) sync_pb2_grpc.add_StreamServiceServicer_to_server(StreamService(), server) server.add_insecure_port("[::]:50051") server.start() server.wait_for_termination() if __name__ == "__main__": serve()
3.2 Validator实现
当Validator连接到服务器后,会接收到服务器发送的Shred数据,因为领导者节点在发送数据时对每个Shred进行了签名,所以这里可以直接对每个Shred数据进行验证签名(该操作保证了领导者节点发送的Shred数据完整性)。然后对Shred携带的Payload数据进行组合,得到数据(块数据),然后根据用户携带的签名,对用户数据进行验证(即transaction签名)。其代码如下:
def process_responses(stub): """ 处理来自 gRPC 服务器的响应。 参数: stub (sync_pb2_grpc.StreamServiceStub): gRPC 客户端存根。 返回: bytes: 累积的有效负载数据。 """ responses = stub.BiStream(empty_request_iterator()) payload = b"" for response in responses: res_data = json.loads(response.data, object_hook=json_to_shred) verify = res_data.verify_shred(get_verify_key()) print("验证每个shred的签名:",verify) print(f"Received from Server: {response.success}:{res_data}:{verify}") payload += base58.b58decode(res_data.payload) if res_data.index + 1 == res_data.total: break return payload def verify_payload_signature(payload): """ 验证累积的有效负载数据的签名。 参数: payload (bytes): 累积的有效负载数据。 抛出: nacl.exceptions.BadSignatureError: 如果签名验证失败。 """ dict_data = json.loads(payload.decode('utf-8')) signature = dict_data.pop('signature') print("接收到的组合完成的数据:", dict_data) payload_verify = json.dumps(dict_data).encode() signature_bytes = base58.b58decode(signature) print("需验证的整体payload:",payload_verify) print("需验证的签名:",signature_bytes) get_verify_key().verify(payload_verify, signature_bytes) print("signature verify success")
需要完整代码可以去我的GitHub获取.
对区块链内容感兴趣可以查看我的专栏:小试牛刀-区块链
感谢您的关注和收藏!!!!!!
编辑