{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "intro",
   "metadata": {},
   "source": [
    "# Encrypted WebSocket listener\n",
    "\n",
    "Connects to a WebSocket endpoint, reads each frame as a JSON array of base64-encoded messages, decrypts every message with AES-CBC and prints the plaintext.\n",
    "\n",
    "- **Message layout:** `base64(IV + ciphertext)` with a 16-byte IV and PKCS#7-padded UTF-8 plaintext.\n",
    "- **Running:** the last cell uses top-level `await`, which Jupyter supports because the kernel already runs an asyncio event loop. Interrupt the kernel to stop listening."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "286b8920-1e0b-4b9b-b524-a2ac1122104c",
   "metadata": {},
   "outputs": [],
   "source": [
    "import asyncio\n",
    "import base64\n",
    "import json\n",
    "\n",
    "import websockets\n",
    "from cryptography.hazmat.backends import default_backend\n",
    "from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "config",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Encryption configuration.\n",
    "# NOTE(review): the key is hard-coded in the notebook; prefer loading it from an\n",
    "# environment variable or a secrets store before sharing this file.\n",
    "KEY = b'8888888888888888'\n",
    "IV_LENGTH = 16  # AES block size in bytes; the CBC IV has the same length\n",
    "\n",
    "# WebSocket endpoint to listen on.\n",
    "WS_URI = \"ws://45.67.56.214:8088/users\""
   ]
  },
  {
   "cell_type": "markdown",
   "id": "decrypt-md",
   "metadata": {},
   "source": [
    "## Decryption"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "decrypt",
   "metadata": {},
   "outputs": [],
   "source": [
    "def _strip_pkcs7_padding(padded: bytes, block_size: int = IV_LENGTH) -> bytes:\n",
    "    \"\"\"Return ``padded`` without its PKCS#7 padding.\n",
    "\n",
    "    Raises:\n",
    "        ValueError: if ``padded`` is empty, not a multiple of ``block_size``, or its\n",
    "            trailing pad bytes are malformed.\n",
    "    \"\"\"\n",
    "    if not padded or len(padded) % block_size:\n",
    "        raise ValueError(\"decrypted data is empty or not block-aligned\")\n",
    "    pad_len = padded[-1]\n",
    "    # A valid pad is 1..block_size bytes long and every pad byte equals that length.\n",
    "    # Without this check a pad byte of 0 would slice as data[:-0] and drop everything.\n",
    "    if not 1 <= pad_len <= block_size or padded[-pad_len:] != bytes([pad_len]) * pad_len:\n",
    "        raise ValueError(\"invalid PKCS#7 padding\")\n",
    "    return padded[:-pad_len]\n",
    "\n",
    "\n",
    "async def decrypt_message(encrypted_message):\n",
    "    \"\"\"Decrypt one base64-encoded AES-CBC message and return it as text.\n",
    "\n",
    "    The decoded payload is split into an ``IV_LENGTH``-byte IV followed by the\n",
    "    ciphertext, decrypted with ``KEY``, stripped of its padding and decoded as UTF-8.\n",
    "\n",
    "    Declared ``async`` so that existing ``await decrypt_message(...)`` call sites keep\n",
    "    working; the function itself never suspends.\n",
    "\n",
    "    Raises:\n",
    "        ValueError: if the payload is not valid base64, is too short to hold an IV\n",
    "            plus ciphertext, is not block-aligned, carries invalid padding, or does\n",
    "            not decode as UTF-8.\n",
    "    \"\"\"\n",
    "    # Decode base64.\n",
    "    encrypted_data = base64.b64decode(encrypted_message)\n",
    "    if len(encrypted_data) <= IV_LENGTH:\n",
    "        raise ValueError(\"encrypted message is too short to contain an IV and ciphertext\")\n",
    "\n",
    "    # Split the IV from the ciphertext.\n",
    "    iv, cipher_text = encrypted_data[:IV_LENGTH], encrypted_data[IV_LENGTH:]\n",
    "\n",
    "    # Build the cipher and decrypt.\n",
    "    cipher = Cipher(algorithms.AES(KEY), modes.CBC(iv), backend=default_backend())\n",
    "    decryptor = cipher.decryptor()\n",
    "    decrypted_data = decryptor.update(cipher_text) + decryptor.finalize()\n",
    "\n",
    "    # Remove the PKCS5/PKCS7 padding (validated) and decode the plaintext.\n",
    "    return _strip_pkcs7_padding(decrypted_data).decode('utf-8')"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "listen-md",
   "metadata": {},
   "source": [
    "## Listener"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "listen",
   "metadata": {},
   "outputs": [],
   "source": [
    "async def listen(uri=WS_URI):\n",
    "    \"\"\"Print every decrypted message received from the WebSocket at ``uri``.\n",
    "\n",
    "    Each incoming frame is parsed as JSON and iterated as a sequence of\n",
    "    base64-encoded encrypted messages. Messages that fail to decrypt are reported\n",
    "    and skipped so that one bad payload does not stop the listener. Returns when\n",
    "    the server closes the connection normally.\n",
    "    \"\"\"\n",
    "    async with websockets.connect(uri) as websocket:\n",
    "        # Iterating the connection ends cleanly on a normal close instead of\n",
    "        # raising from recv() inside an endless loop.\n",
    "        async for message in websocket:\n",
    "            for encrypted_message in json.loads(message):\n",
    "                try:\n",
    "                    print(await decrypt_message(encrypted_message))\n",
    "                except ValueError as exc:\n",
    "                    print(f\"Skipping undecryptable message: {exc}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "run",
   "metadata": {},
   "outputs": [],
   "source": [
    "# The Jupyter kernel already runs an asyncio event loop, so calling\n",
    "# run_until_complete() here fails with \"This event loop is already running\".\n",
    "# Await the coroutine directly instead.\n",
    "await listen()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}