Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
[project]
name = "floresta-functional-tests"
version = "0.0.2"
description = "Collection of tools to help with functional testing of Floresta"
authors = [{name = "The Floresta Project Developers"}]
license = {text = "MIT"}
version = "0.0.3"
description = "Collection of tools to help with the functional tests of Floresta"
authors = [{ name = "The Floresta Project Developers" }]
license = { text = "MIT" }
requires-python = ">=3.12"
dependencies = [
"jsonrpclib>=0.2.1",
"requests>=2.32.3",
"black>=24.10.0",
"pylint>=3.3.2",
"jsonrpclib>=0.2.1",
"requests>=2.32.3",
"black>=24.10.0",
"pylint>=3.3.2",
"cryptography>=44.0.2",
]

[tool.hatch.build.targets.wheel]
Expand Down
52 changes: 52 additions & 0 deletions tests/florestad/ssl-fail-test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""
florestad/ssl-fail-test.py

This functional test checks the failure on connect to florestad's TLS port.
"""

import errno

from test_framework.electrum_client import ElectrumClient
from test_framework.floresta_rpc import REGTEST_RPC_SERVER
from test_framework.test_framework import FlorestaTestFramework


class TestSslFailInitialization(FlorestaTestFramework):
"""
Test the initialization of florestad without --ssl-key-path and --ssl-cert-path
(and without proper keys), a request from Electrum client to TLS port and its failure.
"""

nodes = [-1]
electrum = None

def set_test_params(self):
"""
Setup a single node without SSL
"""
TestSslFailInitialization.nodes[0] = self.add_node_settings(
chain="regtest", extra_args=[], rpcserver=REGTEST_RPC_SERVER, ssl=False
)

def run_test(self):
"""
Run the no-ssl node, create an electrum client that will try to connect to port 50002,
and assert a connection refused failure.
"""
self.run_node(TestSslFailInitialization.nodes[0])
self.wait_for_rpc_connection(TestSslFailInitialization.nodes[0])

# now try create a connection with an electrum client at default port
# it must fail, since the TLS port isnt opened
try:
TestSslFailInitialization.electrum = ElectrumClient("0.0.0.0", 50002)

except ConnectionRefusedError as exc:
assert exc.errno == errno.ECONNREFUSED

# Shutdown node
self.stop_node(TestSslFailInitialization.nodes[0])


if __name__ == "__main__":
TestSslFailInitialization().main()
51 changes: 51 additions & 0 deletions tests/florestad/ssl-test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""
florestad/ssl-test.py

This functional test tests the proper creatiion of a TLS port on florestad.
"""

from test_framework.electrum_client import ElectrumClient
from test_framework.floresta_rpc import REGTEST_RPC_SERVER
from test_framework.test_framework import FlorestaTestFramework


class TestSslInitialization(FlorestaTestFramework):
"""
Test the initialization of florestad with --ssl-key-path and --ssl-cert-path and
a request from Electrum client to TLS port and its success.
"""

nodes = [-1]
electrum = None

def set_test_params(self):
"""
Setup a single node and a electrum client at port 50002
"""
TestSslInitialization.nodes[0] = self.add_node_settings(
chain="regtest", extra_args=[], rpcserver=REGTEST_RPC_SERVER, ssl=True
)

def run_test(self):
"""
Run the ssl node, create a electrum client that will try to connect to port 50002.
Send a ping to make sure everything is working.
"""
self.run_node(TestSslInitialization.nodes[0])
self.wait_for_rpc_connection(TestSslInitialization.nodes[0])

# now create a connection with an electrum client at default port
TestSslInitialization.electrum = ElectrumClient("0.0.0.0", 50002)

# request something to TLS port
result = TestSslInitialization.electrum.ping()
self.log(result)

assert result is not None

# Shutdown node
self.stop_node(TestSslInitialization.nodes[0])


if __name__ == "__main__":
TestSslInitialization().main()
79 changes: 79 additions & 0 deletions tests/test_framework/crypto/pkcs8.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""
tests/test_framework/crypto/pkcs8.py

This module generate proper PKCS#8 private keys and certificate
"""

import os
from datetime import datetime, timedelta
from typing import Tuple

from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
from cryptography.x509.oid import NameOID

DEFAULT_PUBLIC_EXPONENT = 65537
DEFAULT_KEY_SIZE = 2048
DEFAULT_CN = "vinteumorg"
DEFAULT_DAYS = 1


def create_pkcs8_private_key(
path: str,
public_exponent: int = DEFAULT_PUBLIC_EXPONENT,
key_size: int = DEFAULT_KEY_SIZE,
) -> Tuple[str, RSAPrivateKey]:
"""
Generate private key in a proper format PKCS#8
"""
pk = rsa.generate_private_key(public_exponent=public_exponent, key_size=key_size)

# Serialize and save key
pem = pk.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
pk_path = os.path.join(path, "key.pem")
with open(pk_path, "wb") as f:
f.write(pem)

return (pk_path, pk)


def create_pkcs8_self_signed_certificate(
path: str,
pk: RSAPrivateKey,
common_name: str = DEFAULT_CN,
validity_days: int = DEFAULT_DAYS,
) -> str:
"""
Generate a self signed certificate in a proper format PKCS#8
"""
# Create subject/issuer name
subject = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, common_name)])

# Certificate validity period
now = datetime.utcnow()
validity = timedelta(days=validity_days)

# Build and sign certificate
cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(subject)
.public_key(pk.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(now)
.not_valid_after(now + validity)
.sign(pk, hashes.SHA256())
)

# Save certificate
cert_path = os.path.join(path, "cert.pem")
with open(cert_path, "wb") as f:
f.write(cert.public_bytes(serialization.Encoding.PEM))

return cert_path
69 changes: 62 additions & 7 deletions tests/test_framework/test_framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
import tempfile
import subprocess
from test_framework.floresta_rpc import FlorestaRPC
from test_framework.crypto.pkcs8 import (
create_pkcs8_private_key,
create_pkcs8_self_signed_certificate,
)

VALID_FLORESTAD_EXTRA_ARGS = [
"-c",
Expand All @@ -39,6 +43,9 @@
"--filters-start-height",
"--assume-utreexo",
"--pid-file",
"--ssl-electrum-address",
"--ssl-cert-path",
"--ssl-key-path",
]


Expand Down Expand Up @@ -89,6 +96,10 @@ def __init__(self):
self._tests = []
self._nodes = []

def log(self, msg: str) -> str:
"""Log a message with the class caller"""
print(f"[{self.__class__.__name__} INFO] {msg}")

def main(self):
"""
Main function.
Expand Down Expand Up @@ -151,9 +162,42 @@ def get_target_release_dir():
dirname = os.path.dirname(__file__)
return os.path.normpath(os.path.join(dirname, "..", "..", "target", "release"))

def create_ssl_keys(self) -> tuple[str, str]:
"""
Create a PKCS#8 formatted private key and a self-signed certificate.
These keys are intended to be used with florestad's --ssl-key-path and --ssl-cert-path
options.
"""
# Check if we're in CI or not
if "/tmp/floresta-integration-tests" in os.getenv("PATH"):
ssl_path = os.path.normpath(
os.path.abspath(
os.path.join(self.get_integration_test_dir(), "..", "..", "ssl")
)
)
else:
home = os.path.expanduser("~") # Fixed: '~user' -> '~' for current user
ssl_path = os.path.normpath(
os.path.abspath(os.path.join(home, ".floresta", "ssl"))
)

# Create the folder if not exists
os.makedirs(ssl_path, exist_ok=True)

# Create certificates
pk_path, private_key = create_pkcs8_private_key(ssl_path)
self.log(f"Created PKCS#8 key at {pk_path}")

cert_path = create_pkcs8_self_signed_certificate(
ssl_path, private_key, common_name="florestad", validity_days=365
)
self.log(f"Created self-signed certificate at {cert_path}")

return (pk_path, cert_path)

# Framework
def add_node_settings(
self, chain: str, extra_args: list[str], rpcserver: dict
self, chain: str, extra_args: list[str], rpcserver: dict, ssl: bool = False
) -> int:
"""
Add a node settings to be run. Use this on set_test_params method many times you want.
Expand Down Expand Up @@ -184,12 +228,7 @@ def add_node_settings(
print(f"Using {florestad}")
setting = {
"chain": chain,
"config": [
florestad,
"--network",
chain,
"--no-ssl",
],
"config": [florestad, "--network", chain],
"rpcserver": rpcserver,
}

Expand All @@ -206,6 +245,20 @@ def add_node_settings(
else:
raise ValueError(f"Invalid extra_arg '{extra}'")

# If ssl isnt enabled, add --no-ssl
# if ssl is enabled, user can add:
# --ssl-cert-path
# --ssl-key-path
# Either way, we need to create PKCS#8 key and certificate
if not ssl:
setting["config"].append("--no-ssl")
else:
(key, cert) = self.create_ssl_keys()
setting["config"].append("--ssl-key-path")
setting["config"].append(key)
setting["config"].append("--ssl-cert-path")
setting["config"].append(cert)

self._tests.append(setting)
return len(self._tests) - 1

Expand All @@ -227,6 +280,8 @@ def run_node(self, index: int):
if node["chain"] == "regtest":
# pylint: disable=consider-using-with
# add text=True to treat all outputs as texts (jsons or python stack traces)
cmd = " ".join(node["config"])
self.log(f"Running '{cmd}'")
process_node = subprocess.Popen(node["config"], text=True)
json_rpc = FlorestaRPC(process=process_node, rpcserver=node["rpcserver"])
self._nodes.append(json_rpc)
Expand Down
Loading