Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
PR comments
  • Loading branch information
jakubzadroga committed Dec 17, 2025
commit 9913dde56ef6fec3db651f6b4c706a551a2b1c9a
5 changes: 3 additions & 2 deletions node/src/main/kotlin/net/corda/node/internal/Node.kt
Original file line number Diff line number Diff line change
Expand Up @@ -353,16 +353,17 @@ open class Node(configuration: NodeConfiguration,
}

private fun startLocalRpcBroker(securityManager: RPCSecurityManager): BrokerAddresses? {
val rateLimitConfig = configuration.security?.authService?.options?.rateLimit
return with(configuration) {
rpcOptions.address.let {
val rpcBrokerDirectory: Path = baseDirectory / "brokers" / "rpc"
with(rpcOptions) {
rpcBroker = if (useSsl) {
ArtemisRpcBroker.withSsl(configuration.p2pSslOptions, this.address, adminAddress, sslConfig!!, securityManager, MAX_RPC_MESSAGE_SIZE,
journalBufferTimeout, jmxMonitoringHttpPort != null, rpcBrokerDirectory, shouldStartLocalShell())
journalBufferTimeout, jmxMonitoringHttpPort != null, rpcBrokerDirectory, shouldStartLocalShell(), rateLimitConfig)
} else {
ArtemisRpcBroker.withoutSsl(configuration.p2pSslOptions, this.address, adminAddress, securityManager, MAX_RPC_MESSAGE_SIZE,
journalBufferTimeout, jmxMonitoringHttpPort != null, rpcBrokerDirectory, shouldStartLocalShell())
journalBufferTimeout, jmxMonitoringHttpPort != null, rpcBrokerDirectory, shouldStartLocalShell(), rateLimitConfig)
}
}
rpcBroker!!.addresses
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,8 @@ import org.apache.shiro.realm.AuthorizingRealm
import org.apache.shiro.realm.jdbc.JdbcRealm
import org.apache.shiro.subject.PrincipalCollection
import org.apache.shiro.subject.SimplePrincipalCollection
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.Base64
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import javax.security.auth.login.FailedLoginException
import kotlin.math.pow

private typealias AuthServiceConfig = SecurityConfiguration.AuthService

Expand All @@ -42,79 +36,26 @@ class RPCSecurityManagerImpl(config: AuthServiceConfig, cacheFactory: NamedCache

override val id = config.id
private val manager: DefaultSecurityManager
private val rateLimitConfig = config.options?.rateLimit
private val baseDelaySeconds = rateLimitConfig?.backoffBaseSeconds ?: 2L
@Suppress("MagicNumber")
private val maxDelaySeconds = rateLimitConfig?.backoffMaxSeconds ?: 60L
@Suppress("MagicNumber")
private val attemptExpireMinutes = rateLimitConfig?.attemptExpireMinutes ?: 15L

init {
manager = buildImpl(config, cacheFactory)
}

private data class LoginAttempt(val count: Int, val nextAllowed: Instant)

@Suppress("MagicNumber")
private val failedLoginCache =
if (rateLimitConfig != null) {
Caffeine.newBuilder()
.expireAfterWrite(attemptExpireMinutes, TimeUnit.MINUTES)
.maximumSize(10_000)
.build<String, LoginAttempt>()
} else {
null
}

@Suppress("ComplexMethod")
@Throws(FailedLoginException::class)
override fun authenticate(principal: String, password: Password): AuthorizingSubject {
val now = Instant.now()
val key = hashUsername(principal)
val attempt = failedLoginCache?.getIfPresent(key)

@Suppress("MagicNumber")
fun recordFailedAttemptAndThrow(attempt: LoginAttempt?, cause: AuthenticationException? = null) {
if (rateLimitConfig == null) throw FailedLoginException("Authentication failed for user '$principal'\n${cause.toString()}")
val newCount = (attempt?.count ?: 0) + 1
val failuresBeforeBackoff = 3
val effectiveFailures = maxOf(0, newCount - failuresBeforeBackoff)
val delaySeconds =
if (effectiveFailures == 0) 0
else (baseDelaySeconds * 2.0.pow(effectiveFailures - 1)).toLong().coerceAtMost(maxDelaySeconds)
val nextAllowed = now.plusSeconds(delaySeconds)
failedLoginCache!!.put(key, LoginAttempt(newCount, nextAllowed))
val messagePrefix = cause?.toString()?.plus("\n") ?: ""
val tryAgainMessage = if (delaySeconds != 0L) " Try again in $delaySeconds seconds." else ""
val message = messagePrefix + "Failed login for user '$principal'.$tryAgainMessage"
throw FailedLoginException(message)
}

// Block immediately if inside backoff window
if (rateLimitConfig != null && attempt != null && now.isBefore(attempt.nextAllowed)) {
val remaining = Duration.between(now, attempt.nextAllowed).seconds
throw FailedLoginException("Login temporarily suspended for user '$principal'. Try again in $remaining seconds.")
}

password.use {
val authToken = UsernamePasswordToken(principal, it.value)
try {
manager.authenticate(authToken)
failedLoginCache?.invalidate(key)
} catch (authcException: AuthenticationException) {
recordFailedAttemptAndThrow(attempt, authcException)
throw FailedLoginException(authcException.toString())
}
return ShiroAuthorizingSubject(
subjectId = SimplePrincipalCollection(principal, id.value),
manager = manager)
}
}

private fun hashUsername(user: String): String =
Base64.getEncoder().encodeToString(
MessageDigest.getInstance("SHA-256").digest(user.toByteArray())
)

override fun buildSubject(principal: String): AuthorizingSubject =
ShiroAuthorizingSubject(
subjectId = SimplePrincipalCollection(principal, id.value),
Expand All @@ -136,7 +77,6 @@ class RPCSecurityManagerImpl(config: AuthServiceConfig, cacheFactory: NamedCache
logger.info("Constructing DB-backed security data source: ${config.dataSource.connection}")
NodeJdbcRealm(config.dataSource)
}

AuthDataSourceType.INMEMORY -> {
logger.info("Constructing realm from list of users in config ${config.dataSource.users!!}")
InMemoryRealm(config.dataSource.users, config.id.value, config.dataSource.passwordEncryption)
Expand Down Expand Up @@ -173,6 +113,7 @@ internal class RPCPermission : DomainPermission {
*/
constructor(methods: Set<String>, target: String? = null) : super(methods, target?.let { setOf(it.replace(".", ":")) })


/**
* Default constructor instantiate an "ALL" permission
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,88 +1,132 @@
package net.corda.node.services.messaging

import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
import java.security.MessageDigest
import java.time.Duration
import java.time.Instant
import java.util.Base64
import java.util.concurrent.TimeUnit
import javax.security.auth.Subject
import javax.security.auth.login.FailedLoginException
import kotlin.math.pow

class RateLimitingActiveMQJAASSecurityManager(
configurationName: String,
configuration: SecurityConfiguration
configuration: SecurityConfiguration,
rateLimitConfig: net.corda.node.services.config.SecurityConfiguration.AuthService.Options.RateLimit?
) : ActiveMQJAASSecurityManager(configurationName, configuration) {

private val ipLimiter = IpRateLimiter()
private data class Attempt(val count: Int, val nextAllowed: Instant)

private val baseDelaySeconds = rateLimitConfig?.backoffBaseSeconds ?: 2L
@Suppress("MagicNumber")
private val maxDelaySeconds = rateLimitConfig?.backoffMaxSeconds ?: 60L
@Suppress("MagicNumber")
private val attemptExpireMinutes = rateLimitConfig?.attemptExpireMinutes ?: 15L

private val userFreeAttempts = 3
private val ipFreeAttempts = 10

private val userAttempts =
Caffeine.newBuilder()
.expireAfterWrite(attemptExpireMinutes, TimeUnit.MINUTES)
.maximumSize(10_000)
.build<String, Attempt>()

private val ipAttempts =
Caffeine.newBuilder()
.expireAfterWrite(15, TimeUnit.MINUTES)
.maximumSize(10_000)
.build<String, Attempt>()

override fun authenticate(user: String?, password: String?, remotingConnection: RemotingConnection?, securityDomain: String?): Subject? {

val now = Instant.now()
val ip = extractIp(remotingConnection)
val userKey = user?.let { hash("$it|$ip") }
val ipKey = hash(ip)

// 1. Block if IP suspended
ipLimiter.checkAllowed(ip)
// 1. If user+IP suspended -> immediately reject
if (userKey != null) {
userAttempts.getIfPresent(userKey)?.let { userAttempt ->
if (now.isBefore(userAttempt.nextAllowed)) {
val remaining = Duration.between(now, userAttempt.nextAllowed).seconds
throw FailedLoginException("Login temporarily suspended for user '$user'. Try again in $remaining seconds.")
}
}
}

// 2. Attempt authentication
return try {
val subject = super.authenticate(user, password, remotingConnection, securityDomain)
// 2. Success - clear IP state
ipLimiter.recordSuccess(ip)
subject
} catch (e: FailedLoginException) {
// 3. Failure - record IP failure
ipLimiter.recordFailure(ip)
throw e
}
}

/**
* Extracts the remote IP address without the port.
*/
private fun extractIp(remotingConnection: RemotingConnection?): String {
val raw = remotingConnection?.remoteAddress ?: "unknown"
return raw.substringAfter('/').substringBefore(':')
}
}
// success - clear user cache only
if (userKey != null) {
userAttempts.invalidate(userKey)
}
subject
} catch (fle: FailedLoginException) {

internal class IpRateLimiter(
private val maxFailuresBeforeBackoff: Int = 100,
private val baseDelaySeconds: Long = 2,
private val maxDelaySeconds: Long = 60
) {
// 3. Record IP failure
recordFailure(ipAttempts, ipKey, ipFreeAttempts, now)

private data class Attempt(val count: Int, val nextAllowed: Instant)
// 4, If IP suspended -> reject
ipAttempts.getIfPresent(ipKey)?.let { ipAttempt ->
if (now.isBefore(ipAttempt.nextAllowed)) {
val remaining = Duration.between(now, ipAttempt.nextAllowed).seconds
throw FailedLoginException("Login temporarily suspended from IP $ip. Try again in $remaining seconds.")
}
}

@Suppress("MagicNumber")
private val attempts =
Caffeine.newBuilder()
.expireAfterWrite(15, TimeUnit.MINUTES)
.maximumSize(10_000)
.build<String, Attempt>()
// 5. Record user+IP failure
if (userKey != null) {
recordFailure(userAttempts, userKey, userFreeAttempts, now)
}

fun checkAllowed(ip: String, now: Instant = Instant.now()) {
val attempt = attempts.getIfPresent(ip) ?: return
if (now.isBefore(attempt.nextAllowed)) {
val remaining = attempt.nextAllowed.epochSecond - now.epochSecond
throw FailedLoginException(
"Login temporarily suspended from IP $ip. Try again in $remaining seconds."
)
// 6. If user+IP suspected -> reject
if (userKey != null) {
userAttempts.getIfPresent(userKey)?.let { userAttempt ->
if (now.isBefore(userAttempt.nextAllowed)) {
val remaining = Duration.between(now, userAttempt.nextAllowed).seconds
throw FailedLoginException("Login temporarily suspended for user '$user'. Try again in $remaining seconds.")
}
}
}
// 7. Plain authentication failure
throw fle
}
}

fun recordFailure(ip: String, now: Instant = Instant.now()) {
val prev = attempts.getIfPresent(ip)
private fun recordFailure(
cache: Cache<String, Attempt>,
key: String,
freeAttempts: Int,
now: Instant
) {
val prev = cache.getIfPresent(key)
val newCount = (prev?.count ?: 0) + 1

val delay =
if (newCount <= maxFailuresBeforeBackoff) 0
if (newCount <= freeAttempts) 0
else {
val exp = newCount - maxFailuresBeforeBackoff - 1
val exp = newCount - freeAttempts - 1
(baseDelaySeconds * 2.0.pow(exp)).toLong().coerceAtMost(maxDelaySeconds)
}
cache.put(key, Attempt(newCount, now.plusSeconds(delay)))
}

attempts.put(ip, Attempt(newCount, now.plusSeconds(delay)))
private fun extractIp(remotingConnection: RemotingConnection?): String {
val raw = remotingConnection?.remoteAddress ?: "unknown"
return raw.substringAfter('/').substringBefore(':')
}

fun recordSuccess(ip: String) {
attempts.invalidate(ip)
private fun hash(value: String): String {
return Base64.getEncoder().encodeToString(
MessageDigest.getInstance("SHA-256").digest(value.toByteArray())
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,23 @@ class ArtemisRpcBroker internal constructor(
private val jmxEnabled: Boolean = false,
private val baseDirectory: Path,
private val nodeConfiguration: MutualSslConfiguration,
private val shouldStartLocalShell: Boolean) : ArtemisBroker {
private val shouldStartLocalShell: Boolean,
private val rateLimitConfig: net.corda.node.services.config.SecurityConfiguration.AuthService.Options.RateLimit?) : ArtemisBroker {

companion object {
private val logger = loggerFor<ArtemisRpcBroker>()

fun withSsl(configuration: MutualSslConfiguration, address: NetworkHostAndPort, adminAddress: NetworkHostAndPort,
sslOptions: BrokerRpcSslOptions, securityManager: RPCSecurityManager, maxMessageSize: Int,
journalBufferTimeout: Int?, jmxEnabled: Boolean, baseDirectory: Path, shouldStartLocalShell: Boolean): ArtemisBroker {
journalBufferTimeout: Int?, jmxEnabled: Boolean, baseDirectory: Path, shouldStartLocalShell: Boolean, rateLimitConfig: net.corda.node.services.config.SecurityConfiguration.AuthService.Options.RateLimit?): ArtemisBroker {
return ArtemisRpcBroker(address, adminAddress, sslOptions, true, securityManager, maxMessageSize, journalBufferTimeout,
jmxEnabled, baseDirectory, configuration, shouldStartLocalShell)
jmxEnabled, baseDirectory, configuration, shouldStartLocalShell, rateLimitConfig)
}

fun withoutSsl(configuration: MutualSslConfiguration, address: NetworkHostAndPort, adminAddress: NetworkHostAndPort,
securityManager: RPCSecurityManager, maxMessageSize: Int, journalBufferTimeout: Int?, jmxEnabled: Boolean,
baseDirectory: Path, shouldStartLocalShell: Boolean): ArtemisBroker {
baseDirectory: Path, shouldStartLocalShell: Boolean, rateLimitConfig: net.corda.node.services.config.SecurityConfiguration.AuthService.Options.RateLimit?): ArtemisBroker {
return ArtemisRpcBroker(address, adminAddress, null, false, securityManager, maxMessageSize, journalBufferTimeout,
jmxEnabled, baseDirectory, configuration, shouldStartLocalShell)
jmxEnabled, baseDirectory, configuration, shouldStartLocalShell, rateLimitConfig)
}
}

Expand Down Expand Up @@ -109,7 +109,7 @@ class ArtemisRpcBroker internal constructor(
return arrayOf(AppConfigurationEntry(name, AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, options))
}
}
return RateLimitingActiveMQJAASSecurityManager(BrokerJaasLoginModule::class.java.name, securityConfig)
return RateLimitingActiveMQJAASSecurityManager(BrokerJaasLoginModule::class.java.name, securityConfig, rateLimitConfig)
}
}

Expand Down
Loading