python版本使用椭圆曲线执行密钥交换

水一篇,

BirdTalk服务端基本快写完了,开始写一个完整的客户端测试;

决定从python入手,因为与其他功能对接时候或者写机器人客服,脚本用的比较多;

直接上代码,原理参考之前的文档。

from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
import os
import struct
import base64
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

class ECDHKeyExchange:
    def __init__(self):
        self.private_key = None
        self.public_key = None
        self.shared_key = None
        self.key_print = 0

    def generate_key_pair(self):
        """Generate an ECDH key pair."""
        self.private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())
        self.public_key = self.private_key.public_key()
        print("Key pair generated.")

    def export_public_key(self, filename):
        """Export the generated public key to a file."""
        if self.public_key is None:
            raise ValueError("Public key not generated yet.")
        pem = self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
        if filename != "":
            with open(filename, 'wb') as f:
                f.write(pem)
            print(f"Public key saved to {filename}.")
        return pem.decode('utf-8')
    
    def get_public_key(self):
        if self.public_key is None:
            raise ValueError("Public key not generated yet.")
        pem = self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
        return pem


    def exchange_keys_from_file(self, peer_public_key_filename):
        """Exchange keys and generate the shared key using the peer's public key."""
        if self.private_key is None:
            raise ValueError("Private key not generated yet.")
        with open(peer_public_key_filename, 'rb') as f:
            peer_public_key_pem = f.read()
        peer_public_key = serialization.load_pem_public_key(peer_public_key_pem, backend=default_backend())
        self.shared_key = self.private_key.exchange(ec.ECDH(), peer_public_key)
        print("Shared key generated.")
    
    def exchange_keys(self, peer_public_key_pem):
        """Exchange keys and generate the shared key using the peer's public key PEM string."""
        if self.private_key is None:
            raise ValueError("Private key not generated yet.")
        peer_public_key = serialization.load_pem_public_key(peer_public_key_pem.encode('utf-8'), backend=default_backend())
        self.shared_key = self.private_key.exchange(ec.ECDH(), peer_public_key)
        print("Shared key generated.")

    def get_shared_key(self):
        if self.shared_key is None:
            raise ValueError("Shared key not generated yet.")
        
        return self.shared_key

    def save_shared_key(self, filename):
        """Save the shared key to a file."""
        if self.shared_key is None:
            raise ValueError("Shared key not generated yet.")
        with open(filename, 'wb') as f:
            f.write(self.shared_key)
        print(f"Shared key saved to {filename}.")
    
    def export_shared_key_base64(self):
        """Export the shared key as a base64 encoded string."""
        if self.shared_key is None:
            raise ValueError("Shared key not generated yet.")
        return base64.b64encode(self.shared_key).decode('utf-8')

    def load_shared_key(self, filename):
        """Load the shared key from a file."""
        with open(filename, 'rb') as f:
            self.shared_key = f.read()
        print(f"Shared key loaded from {filename}.")

    def save_key_print(self, filename):
        """Save the shared key to a file."""
        if self.key_print is None:
            raise ValueError("Shared key not generated yet.")
        with open(filename, 'wb') as f:
            data = str(self.key_print).encode('utf-8')
            f.write(data)
        print(f"key print saved to {filename}.")

    def load_key_print(self, filename):
        """Load the shared key from a file."""
        with open(filename, 'rb') as f:
            data = f.read()
            str = data.decode('utf-8')
            self.key_print = int(str)
        print(f"key print loaded from {filename}.")
        return self.key_print

    def delete_key_file(self, filename):
        """Delete a key file."""
        if os.path.exists(filename):
            os.remove(filename)
            print(f"File {filename} deleted.")
        else:
            print(f"File {filename} does not exist.")
    
    def get_int64_print(self):
        if self.shared_key is None:
            raise ValueError("Shared key not generated yet.")
         
        """Convert a byte array to a 64-bit integer."""
        # 检查字节数组长度是否足够
        if len(self.shared_key) < 8:
            raise ValueError("Insufficient bytes to convert to int64")

        # 将字节数组转换为 int64
        self.key_print = struct.unpack('<q',  self.shared_key[:8])[0]  # 使用 little-endian 格式

        return self.key_print
    
    def encrypt_aes_ctr(self, plaintext):
        if self.shared_key is None:
            raise ValueError("Shared key not generated yet.")
        
        # 生成随机的初始化向量(IV)
        iv = os.urandom(16)  # 初始化向量长度为 16 字节

        # 创建 AES-CTR 算法对象
        algorithm = algorithms.AES(self.shared_key)
        mode = modes.CTR(iv)
        cipher = Cipher(algorithm, mode, backend=default_backend())

        # 使用加密器加密数据
        encryptor = cipher.encryptor()
        encrypted_data = encryptor.update(plaintext) + encryptor.finalize()

        # 将随机初始化向量和加密后的数据拼接在一起
        ciphertext = iv + encrypted_data

        return ciphertext
    
    def decrypt_aes_ctr(self, ciphertext):
        """Decrypt ciphertext using AES-CTR with the shared key."""
        if self.shared_key is None:
            raise ValueError("Shared key not generated yet.")
        
        # 从密文中提取初始化向量(IV)
        iv = ciphertext[:16]  # 初始化向量长度为 16 字节
        encrypted_data = ciphertext[16:]

        # 创建 AES-CTR 算法对象
        algorithm = algorithms.AES(self.shared_key)
        mode = modes.CTR(iv)
        cipher = Cipher(algorithm, mode, backend=default_backend())

        # 使用解密器解密数据
        decryptor = cipher.decryptor()
        plaintext = decryptor.update(encrypted_data) + decryptor.finalize()

        return plaintext
###############################################################
def test_create():
    # 实例化两个 ECDHKeyExchange 对象,模拟两个参与方
    alice = ECDHKeyExchange()
    bob = ECDHKeyExchange()

    # Alice 生成密钥对并导出公钥
    alice.generate_key_pair()
    alice_pub_key = alice.export_public_key("alice_public_key.pem")
    print(alice_pub_key)

    # Bob 生成密钥对并导出公钥
    bob.generate_key_pair()
    bob_pub_key = bob.export_public_key("bob_public_key.pem")
    print(bob_pub_key)

    # Alice 和 Bob 交换公钥并生成共享密钥
    alice.exchange_keys(bob_pub_key)
    bob.exchange_keys(alice_pub_key)

    # 保存共享密钥
    alice.save_shared_key("alice_shared_key.bin")
    bob.save_shared_key("bob_shared_key.bin")

    # 加载共享密钥
    alice.load_shared_key("alice_shared_key.bin")
    bob.load_shared_key("bob_shared_key.bin")
    print(alice.get_int64_print())
    print(bob.get_int64_print())

    alice.save_key_print("alice_key_print.txt")
    keyprint = alice.load_key_print("alice_key_print.txt")
    print(keyprint)

    # 删除密钥文件
    # alice.delete_key_file("alice_public_key.pem")
    # alice.delete_key_file("alice_shared_key.bin")
    # bob.delete_key_file("bob_public_key.pem")
    # bob.delete_key_file("bob_shared_key.bin")

def test_load():
    alice = ECDHKeyExchange()
    alice.load_shared_key("alice_shared_key.bin")
    #print(alice.get_shared_key())
    key_print = alice.load_key_print("alice_key_print.txt")
    print(key_print)

    tm = '123456789412'
    ciper = alice.encrypt_aes_ctr(tm.encode('utf-8'))
    plain = alice.decrypt_aes_ctr(ciper)
    print(plain)

# 示例使用
if __name__ == "__main__":
    test_load()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/712765.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Java SSTI服务端模版注入漏洞原理与利用

文章目录 前言Velocity基础语法基础示例命令执行 靶场实践漏洞代码漏洞验证检测工具 FreeMarker基础示例漏洞示例CMS案例 Thymeleaf基础示例漏洞示例安全方案 总结 前言 SSTI&#xff08;Server Side Template Injection&#xff09;全称服务端模板注入漏洞&#xff0c;在 Jav…

开放式耳机值得入手买吗?可以对比这几款开放式耳机看看

居家办公时&#xff0c;选择一款合适的耳机能够有效地提高工作效率。入耳式耳机虽然能够有效地隔绝外界噪音&#xff0c;但长时间佩戴会对耳朵造成负担&#xff0c;甚至引发耳道感染。而头戴式耳机虽然能够提供更好的音质&#xff0c;但体积较大&#xff0c;佩戴起来不够灵活。…

PyTorch -- Batch Normalization(BN) 快速实践

Batch Normalization 可以 改善梯度消失/爆炸问题&#xff1a;前面层的梯度经过多次传递后会变得非常小(大)&#xff0c;从而导致网络收敛速度慢(不收敛)&#xff0c;应用 BN 可缓解加速网络收敛&#xff1a;BN 使得每个神经元的输入分布更加稳定减少过拟合&#xff1a;BN 可减…

求导,积分

求导公式&#xff1a; 复合函数求导法则&#xff1a;两个函数导函数的乘积. 例如&#xff1a;f(x)2x1,f(x)2,g(x)x^24x4,g(x)2x4 那么复合函数&#xff1a; g(f(x))(2x1)^24(2x1)4 把&#xff08;2x1&#xff09;看做整体,则g2(2x1)4 然后再求&#xff08;2x1&#xff09;的导函…

LeetCode | 2879.显示前三行

在 pandas 中&#xff0c;可以使用 head() 方法来读取 DataFrame 的前几行数据。如果想读取指定数量的行&#xff0c;可以在 head() 方法中传入一个参数 n&#xff0c;读取前 n 行 import pandas as pddef selectFirstRows(employees: pd.DataFrame) -> pd.DataFrame:retur…

Dictionary 字典

文章目录 一、什么是字典1.1 字典的创建方式 一、什么是字典 字典&#xff1a; 用来存储数据&#xff0c;与列表和元组不一样的是&#xff0c;字典以键值对的形式对数据进行存储&#xff0c;也就是 key 和 value。相当于 Java 中的 Map。 注意&#xff1a; 1、 key 的值不可重…

C++进阶(一)

个人主页&#xff1a;PingdiGuo_guo 收录专栏&#xff1a;C干货专栏 前言 本篇博客是讲解函数的重载以及引用的知识点的。 文章目录 前言 1.函数重载 1.1何为函数重载 1.2函数重载的作用 1.3函数重载的实现 2.引用 2.1何为引用 2.2定义引用 2.3引用特性 2.4常引用 2…

认识一些分布函数-Frechet分布及其应用

1. 何为Frechet分布 Frechet分布也称为极值分布(EVD)类型II,用于对数据集中的最大值进行建模。它是四种常用极值分布之一。另外三种是古贝尔分布、威布尔分布和广义极值分布(Gumbel Distribution, the Weibull Distribution and the Generalized Extreme Value Distributi…

34 Debian如何配置ELK群集

作者:网络傅老师 特别提示:未经作者允许,不得转载任何内容。违者必究! Debian如何配置ELK群集 《傅老师Debian知识库系列之34》——原创 ==前言== 傅老师Debian知识库特点: 1、拆解Debian实用技能; 2、所有操作在VMware虚拟机实测完成; 3、致力于最终形成Debian知识手…

LVS-DR模式详解:提升网站性能的最佳解决方案

LVS-DR模式原理 用户请求到达Director Server&#xff1a; 用户请求到达Director Server&#xff08;负载均衡服务器&#xff09;&#xff0c;数据包首先到达内核空间的PREROUTING链。数据包源IP&#xff1a;CIP&#xff0c;目标IP&#xff1a;VIP&#xff0c;源MAC&#xff1a…

【内存管理之C语言数组】

1.栈空间上的C数组 糟糕的可用性&#xff0c;但是你将在遗留代码中见到它们 相同类型的对象的内存块 大小必须是常量表达式 第一个元素索引为0 2.指针和C数组 更奇怪的是&#xff1a;数组标识符退化为指向第一个元素的指针 3.访问数组 4.堆空间上的C数组 相同类型的对象的内…

数据库开发——并发控制(第十一章)

文章目录 前言并发执行例题一、封锁二、封锁协议三、可串行调度四、总结 学习目标&#xff1a;重点为并发控制的基本概念及几个基本协议 前言 数据库管理系统必须提供并发控制机制&#xff0c;保证事务的隔离性和一致性 并发执行例题 一、封锁 排他锁称为写锁&#xff0c;共…

智能化状态管理:自动状态流转处理模块

目录 基本背景介绍 具体实现 基本数据准备 基本数据表 状态转换常量 状态转换注解 任务处理模版 各任务实现逻辑 开启比对任务进行处理 降噪字段处理任务处理 开启业务数据比对处理 业务数据比对处理 开始核对数据生成最终报告处理 核对数据生成最终报告处理 状…

小红书教程简化版,从0开始走向专业,小红书-主理人培养计划 (13节)

课程目录 1-小红书分析与拆解.mp4 2-小红书电商玩法.mp4 3-小红书基础信息设置10_1.mp4 4-小红书如何开店&#xff1f;.mp4 5-小红书店铺设置&#xff08;1&#xff09;.mp4 5-小红书店铺设置.mp4 6-小红书笔记制作与产品发布.mp4 7-小红书运营的文案与标题.mp4 8-小红…

Spring Boot 自定义Starter

自定义starter 创建pom项目 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.ap…

MySQL的三种重要的日志

日志 Mysql有三大日志系统 Undo Log&#xff08;回滚日志&#xff09;&#xff1a;记录修改前的数据&#xff0c;用于事务回滚和 MVCC&#xff08;多版本并发控制&#xff09;。 Redo Log&#xff08;重做日志&#xff09;&#xff1a;记录数据变更&#xff0c;用于崩溃恢复&…

XAMPP PHP-CGI 远程代码执行漏洞(CVE-2024-4577)

漏洞概述&#xff1a; PHP 是一种被广泛应用的开放源代码的多用途脚本语言&#xff0c;PHP-CGI 是 PHP 自带的 FastCGI 管理器。是一个实现了 CGI 协议的程序&#xff0c;用来解释 PHP 脚本的程序&#xff0c;2024 年 6 月 7 日&#xff0c;推特安全上 orange 公开了其漏洞细节…

基于Wireshark实现对FTP的抓包分析

基于Wireshark实现对FTP的抓包分析 前言一、虚拟机Win10环境配置二、FileZilla客户端的安装配置下载FileZilla客户端安装FileZilla 三、FileZilla Server安装下载FileZilla Server安装 四、实现对FTP的抓包前置工作实现抓包完成抓包 前言 推荐一个网站给想要了解或者学习人工智…

MySQL学习笔记-进阶篇-SQL优化

SQL优化 插入数据 insert优化 1&#xff09;批量插入 insert into tb_user values(1,Tom),(2,Cat),(3,Jerry); 2&#xff09;手动提交事务 mysql 默认是自动提交事务&#xff0c;这样会导致频繁的开启和提交事务&#xff0c;影响性能 start transaction insert into tb_us…

【面经总结】Java基础 - SPI

SPI 什么是 SPI&#xff1f; 提供给服务提供者去使用的一个接口 SPI 的优点 低耦合配置灵活多态性 SPI 的应用场景 JDBCSLF4J 日志