分享:Resilience4j- 非 Spring 环境集成:纯 Java 项目中的手动配置实现(整理分享)

这两天一直在研究这个话题,踩了几个坑,把遇到的东西整理成文,供有需要的朋友参考。

👋 大家好,欢迎来到我的技术博客! 📚 在这里,我会分享学笔记、实战经验与技术思考,力求用简单的方式讲清楚复杂的问题。 🎯 这篇文章将围绕Resilience4j这个话题展开,希望能为各位带来一些启发或实用的参考。 🌱 无论各位是刚入门的新手,还是正在进阶的开发者,希望各位都能有所收获!

文章目录

4. Retry(重试机制)实现 🔄 5. RateLimiter(限流器)控制请求速率 ⏱️ 6. Bulkhead(舱壁隔离)限制并发数 🚢 7. TimeLimiter(超时控制)防止阻塞 ⏳ 8. 组合多个弹性模式 🧩 9. 事件监听与监控 📊 10. 配置管理:从硬编码到外部化 📁 11. 异常处理与降级策略 🛟 12. 测试弹性逻辑 🧪 13. 性能考量与最佳实践 ⚡ 14. 常见陷阱与解决方案 🕳️ 15. 结语:构建真正弹性的系统 🌟

Resilience4j- 非 Spring 环境集成:纯 Java 项目中的手动配置实现 💪

在现代分布式系统中,服务之间的依赖关系错综复杂,网络延迟、服务宕机、资源耗尽等问题随时可能发生。为了提升系统的健壮性和容错能力,弹性(Resilience) 成为微服务架构中的关键设计原则之一。而 Resilience4j 正是为此而生——一个轻量级、函数式、受 Netflix Hystrix 启发但更现代化的容错库。

虽然 Resilience4j 提供了对 Spring Boot 的无缝集成(通过 resilience4j-spring-boot2),但在许多实际场景中,我们可能正在维护或开发一个不依赖 Spring 的纯 Java 项目,例如:

  • 基于 Jakarta EE / JAX-RS 的 REST 服务
  • 使用 Vert.x、Quarkus 或 Micronaut 的轻量级应用
  • 传统的 Java SE 应用(如批处理、定时任务)
  • 需要最小化依赖的嵌入式系统
在这些场景下,如何手动配置和使用 Resilience4j?这篇文章将深入探讨如何在非 Spring 环境中,通过纯 Java 代码实现 Resilience4j 的核心功能,包括 Circuit Breaker(熔断器)Retry(重试)RateLimiter(限流)Bulkhead(舱壁隔离)TimeLimiter(超时控制),并展示它们的组合使用方式。
📌 提示:Resilience4j 是基于 Vavr(原名 Javaslang)函数式库构建的,于是大量使用了 io.vavr.control.Try、Either 等类型。如果你还不熟悉 Vavr,建议先阅读其 官方文档。

1. 为什么选择 Resilience4j?🚀

在 Resilience4j 冒出来之前,Netflix Hystrix 是 Java 生态中最流行的容错库。然而,Hystrix 已于 2018 年进入维护模式,不再积极开发。相比之下,Resilience4j 具有以下优势:

  • 轻量级:无外部依赖(除了 Vavr),jar 包体积小。
  • 函数式设计:基于装饰器模式,与 Java 8+ 的函数式编程风格天然契合。
  • 模块化:每个功能(熔断、重试等)都是独立模块,按需引入。
  • 高性能:基于内存状态机,无额外线程开销。
  • 丰富的指标支持:可与 Micrometer、Prometheus 等监控系统集成。
更重要的是,Resilience4j 不强制绑定任何框架,这使得它在非 Spring 环境中依然能大放异彩。

2. 项目初始化:Maven 依赖配置 📦

首先,我们需要在 pom.xml 中添加必要的依赖。由于我们不使用 Spring,只需引入核心模块:


        io.github.resilience4j
        resilience4j-circuitbreaker
        2.2.0

        io.github.resilience4j
        resilience4j-retry
        2.2.0

        io.github.resilience4j
        resilience4j-ratelimiter
        2.2.0

        io.github.resilience4j
        resilience4j-bulkhead
        2.2.0

        io.github.resilience4j
        resilience4j-timelimiter
        2.2.0

        io.vavr
        vavr
        0.10.4

        org.slf4j
        slf4j-simple
        2.0.9

✅ 注意:所有模块版本应保持一致,避免兼容性问题。

3. Circuit Breaker(熔断器)详解 🔌

熔断器是 Resilience4j 最核心的功能。它通过监控调用失败率,在达到阈值时自动“熔断”后续请求,避免雪崩效应。

3.1 手动创建 CircuitBreaker

在非 Spring 环境中,我们通过 CircuitBreakerRegistry 创建和管理熔断器实例:

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;

import java.time.Duration;

public class CircuitBreakerExample {
    public static void main(String[] args) {
        // 1. 定义配置
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
            .failureRateThreshold(50) // 失败率阈值 50%
            .waitDurationInOpenState(Duration.ofSeconds(10)) // 熔断后等待 10 秒再半开
            .permittedNumberOfCallsInHalfOpenState(3) // 半开状态下允许 3 次尝试
            .slidingWindowSize(10) // 滑动窗口大小(最近 10 次调用)
            .build();

        // 2. 创建注册表(可复用)
        CircuitBreakerRegistry registry = CircuitBreakerRegistry.of(config);

        // 3. 获取熔断器实例(按名称)
        CircuitBreaker circuitBreaker = registry.circuitBreaker("externalService");

        // 4. 装饰你的业务逻辑
        var decoratedSupplier = CircuitBreaker
            .decorateSupplier(circuitBreaker, () -> callExternalService());

        // 5. 执行(可能抛出 CallNotPermittedException)
        try {
            String result = decoratedSupplier.get();
            System.out.println("Result: " + result);
        } catch (Exception e) {
            System.err.println("Call failed or circuit is open: " + e.getMessage());
        }
    }

    private static String callExternalService() {
        // 模拟 60% 失败率
        if (Math.random() > 0.4) {
            throw new RuntimeException("External service error");
        }
        return "Success";
    }
}

3.2 熔断器状态机 🔄

熔断器内部维护一个状态机,包含三种状态:

  • CLOSED:正常调用,记录成功/失败次数。
  • OPEN:熔断开启,直接拒绝所有请求。
  • HALF_OPEN:尝试放行少量请求,若成功则恢复 CLOSED,否则重回 OPEN。
failureRate >= threshold

after waitDuration

permitted calls succeed

any call fails

CLOSED

OPEN

HALF_OPEN

💡 最佳实践:为每个外部依赖(如不同微服务)创建独立的熔断器实例,避免相互影响。

4. Retry(重试机制)实现 🔄

当调用失败时,自动重试是提高成功率的有效手段。Resilience4j 的重试机制支持指数退避固定间隔等多种策略。

4.1 基础重试配置

import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;

import java.time.Duration;
import java.util.function.Supplier;

public class RetryExample {
    public static void main(String[] args) {
        // 1. 配置重试策略
        RetryConfig config = RetryConfig.custom()
            .maxAttempts(3) // 最多重试 3 次
            .waitDuration(Duration.ofMillis(500)) // 每次间隔 500ms
            .retryOnException(e -> e instanceof RuntimeException) // 仅对 RuntimeException 重试
            .build();

        // 2. 创建重试实例
        Retry retry = RetryRegistry.of(config).retry("databaseQuery");

        // 3. 装饰业务逻辑
        Supplier decorated = Retry.decorateSupplier(retry, () -> queryDatabase());

        // 4. 执行
        try {
            String result = decorated.get();
            System.out.println("Query result: " + result);
        } catch (Exception e) {
            System.err.println("All retries failed: " + e.getMessage());
        }
    }

    private static String queryDatabase() {
        if (Math.random() > 0.7) {
            return "Data from DB";
        }
        throw new RuntimeException("DB connection timeout");
    }
}

4.2 高级重试:指数退避

对于网络抖动等场景,指数退避(Exponential Backoff)更有效:

RetryConfig config = RetryConfig.custom()
    .maxAttempts(4)
    .intervalFunction(IntervalFunction.ofExponentialBackoff(
        Duration.ofMillis(100), // 初始间隔 100ms
        2.0 // 每次乘以 2
    ))
    .retryOnException(e -> e instanceof IOException)
    .build();

这样,重试间隔将依次为:100ms → 200ms → 400ms → 800ms。


5. RateLimiter(限流器)控制请求速率 ⏱️

限流器用于防止系统被突发流量压垮,确保请求以可控速率处理。

5.1 固定窗口限流

import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.ratelimiter.RateLimiterRegistry;

import java.time.Duration;

public class RateLimiterExample {
    public static void main(String[] args) {
        // 1. 配置:每秒最多 2 个请求
        RateLimiterConfig config = RateLimiterConfig.custom()
            .limitForPeriod(2)
            .limitRefreshPeriod(Duration.ofSeconds(1))
            .timeoutDuration(Duration.ofMillis(500)) // 获取许可超时
            .build();

        // 2. 创建限流器
        RateLimiter rateLimiter = RateLimiterRegistry.of(config).rateLimiter("api");

        // 3. 装饰业务逻辑
        var decorated = RateLimiter.decorateRunnable(rateLimiter, () -> {
            System.out.println("Processing request at " + System.currentTimeMillis());
        });

        // 4. 模拟高频调用
        for (int i = 0; i < 5; i++) {
            try {
                decorated.run();
                Thread.sleep(100);
            } catch (Exception e) {
                System.err.println("Rate limit exceeded: " + e.getMessage());
            }
        }
    }
}

输出示例:

Processing request at 1710000000000
Processing request at 1710000000100
Rate limit exceeded: ...
Rate limit exceeded: ...
Rate limit exceeded: ...

⚠️ 注意:timeoutDuration 是获取许可的最大等待时间,超时会抛出 RequestNotPermitted 异常。

6. Bulkhead(舱壁隔离)限制并发数 🚢

舱壁模式源自船舶设计,将船体分隔成多个水密舱,即使一个舱进水,整艘船也不会沉没。在软件中,它用于限制并发执行的线程数,防止单个服务耗尽所有资源。

Resilience4j 提供两种实现:

  • SemaphoreBulkhead:基于信号量,不创建新线程(推荐)。
  • ThreadPoolBulkhead:基于线程池,适用于阻塞 I/O。

6.1 SemaphoreBulkhead 示例

import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.bulkhead.BulkheadRegistry;

public class BulkheadExample {
    public static void main(String[] args) {
        // 1. 配置:最多 3 个并发
        BulkheadConfig config = BulkheadConfig.custom()
            .maxConcurrentCalls(3)
            .maxWaitDuration(Duration.ofMillis(100)) // 等待许可超时
            .build();

        // 2. 创建舱壁
        Bulkhead bulkhead = BulkheadRegistry.of(config).bulkhead("imageProcessing");

        // 3. 装饰业务逻辑
        var decorated = Bulkhead.decorateRunnable(bulkhead, () -> {
            System.out.println("Start processing by " + Thread.currentThread().getName());
            try { Thread.sleep(2000); } catch (InterruptedException e) {}
            System.out.println("End processing");
        });

        // 4. 启动 5 个线程模拟并发
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    decorated.run();
                } catch (Exception e) {
                    System.err.println("Bulkhead rejected: " + e.getMessage());
                }
            }).start();
        }

        // 等待完成
        try { Thread.sleep(5000); } catch (InterruptedException e) {}
    }
}

输出中,前 3 个任务立即着手,第 4、5 个因超时被拒绝。


7. TimeLimiter(超时控制)防止阻塞 ⏳

虽然 Java 的 FutureCompletableFuture 支持超时,但 Resilience4j 的 TimeLimiter 提供了更统一的装饰器接口。

🔔 重要:TimeLimiter 仅适用于 CompletableFuture,不能用于同步方法。

7.1 异步超时控制

import io.github.resilience4j.timelimiter.TimeLimiter;
import io.github.resilience4j.timelimiter.TimeLimiterConfig;
import io.github.resilience4j.timelimiter.TimeLimiterRegistry;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TimeLimiterExample {
    public static void main(String[] args) {
        // 1. 配置:超时 1 秒
        TimeLimiterConfig config = TimeLimiterConfig.custom()
            .timeoutDuration(Duration.ofSeconds(1))
            .cancelRunningFuture(true) // 超时时取消任务
            .build();

        TimeLimiter timeLimiter = TimeLimiterRegistry.of(config).timeLimiter("slowService");

        // 2. 创建线程池
        ExecutorService executor = Executors.newCachedThreadPool();

        // 3. 装饰异步任务
        Supplier decorated =
            TimeLimiter.decorateFutureSupplier(timeLimiter,
                () -> CompletableFuture.supplyAsync(() -> slowOperation(), executor));

        // 4. 执行并处理结果
        try {
            String result = decorated.get().get(); // 可能抛出 TimeoutException
            System.out.println("Result: " + result);
        } catch (Exception e) {
            System.err.println("Operation timed out or failed: " + e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    private static String slowOperation() {
        try { Thread.sleep(1500); } catch (InterruptedException e) {}
        return "Done";
    }
}


8. 组合多个弹性模式 🧩

Resilience4j 的强大之处在于能够任意组合多个模式。例如,一个典型的外部调用可能需要:

1. 限流(RateLimiter)防止自身被压垮
2. 舱壁(Bulkhead)隔离资源
3. 熔断(CircuitBreaker)应对持续失败
4. 重试(Retry)处理瞬时错误
5. 超时(TimeLimiter)避免永久阻塞

8.1 装饰器链式调用

// 假设已创建所有组件实例
Supplier supplier = () -> callExternalService();

// 从内到外装饰(顺序很重要!)
Supplier decorated = Decorators.ofSupplier(supplier)
    .withTimeLimiter(timeLimiter)
    .withBulkhead(bulkhead)
    .withCircuitBreaker(circuitBreaker)
    .withRetry(retry)
    .withRateLimiter(rateLimiter)
    .decorate();

// 执行
Try result = Try.of(decorated::get);
result.onSuccess(System.out::println)
      .onFailure(e -> System.err.println("Final failure: " + e));

8.2 推荐的装饰顺序

原始调用

RateLimiter

Bulkhead

CircuitBreaker

Retry

TimeLimiter

📖 解释:
RateLimiter 最外层:最先拒绝超额请求,节省资源。Bulkhead 第二层:限制并发,防止资源耗尽。CircuitBreaker 第三层:在允许的请求中检测失败模式。Retry 第四层:对熔断器放行的失败请求进行重试。TimeLimiter 最内层:确保单次调用不会无限等待。

9. 事件监听与监控 📊

Resilience4j 允许你监听各种事件(如熔断器状态变更、重试发生等),用于日志记录或指标收集。

9.1 熔断器事件监听

circuitBreaker.getEventPublisher()
    .onStateTransition(event ->
        System.out.println("State changed: " + event.getStateTransition()))
    .onError(event ->
        System.out.println("Call failed: " + event.getThrowable().getMessage()));

9.2 与 Micrometer 集成(非 Spring)

即使没有 Spring,你也能够手动将指标暴露给 Prometheus:

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import io.github.resilience4j.micrometer.tagged.TaggedCircuitBreakerMetrics;

public class MetricsExample {
    public static void main(String[] args) {
        // 1. 创建 Prometheus 注册表
        MeterRegistry meterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);

        // 2. 创建熔断器
        CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("service");

        // 3. 绑定指标
        TaggedCircuitBreakerMetrics.ofCircuitBreakerRegistry(
            CircuitBreakerRegistry.of(CircuitBreakerConfig.ofDefaults())
        ).bindTo(meterRegistry);

        // 4. 暴露 /metrics 端点(需自行实现 HTTP 服务)
        // 例如使用 SparkJava:
        // get("/metrics", (req, res) -> meterRegistry.scrape());
    }
}

🔗 你能够参考 Micrometer 官方文档 了解如何集成其他监控系统。

10. 配置管理:从硬编码到外部化 📁

在生产环境中,弹性策略的参数(如超时时间、重试次数)应外部化配置,便于调整。

10.1 使用 Typesafe Config(HOCON)

首先添加依赖:


    com.typesafe
    config
    1.4.3

创建 application.conf

resilience {
  circuit-breaker {
    external-service {
      failure-rate-threshold = 50
      wait-duration-in-open-state = 10s
      sliding-window-size = 10
    }
  }
  retry {
    database {
      max-attempts = 3
      wait-duration = 500ms
    }
  }
}

加载配置:

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class ConfigExample {
    public static void main(String[] args) {
        Config config = ConfigFactory.load();

        // 读取熔断器配置
        CircuitBreakerConfig cbConfig = CircuitBreakerConfig.custom()
            .failureRateThreshold(config.getDouble("resilience.circuit-breaker.external-service.failure-rate-threshold"))
            .waitDurationInOpenState(config.getDuration("resilience.circuit-breaker.external-service.wait-duration-in-open-state"))
            .slidingWindowSize(config.getInt("resilience.circuit-breaker.external-service.sliding-window-size"))
            .build();

        CircuitBreaker cb = CircuitBreaker.of("externalService", cbConfig);
    }
}

💡 提示:你也可以使用 JSON、YAML 或自定义属性文件,只要能解析为键值对即可。

11. 异常处理与降级策略 🛟

当所有弹性措施都失败时,提供降级(Fallback) 是保证用户体验的关键。

11.1 使用 Vavr 的 Try 进行优雅降级

import io.vavr.control.Try;

public class FallbackExample {
    public static void main(String[] args) {
        Supplier decorated = decorateWithResilience(); // 假设已装饰

        String result = Try.of(decorated::get)
            .recover(throwable -> {
                if (throwable instanceof CallNotPermittedException) {
                    return "Service unavailable, please try later";
                } else if (throwable instanceof TimeoutException) {
                    return "Request timed out, using cached data";
                }
                return "General fallback response";
            })
            .get();

        System.out.println("Final result: " + result);
    }
}

11.2 缓存降级数据

结合 Caffeine 缓存,可返回最近成功的结果:

LoadingCache cache = Caffeine.newBuilder()
    .maximumSize(100)
    .expireAfterWrite(5, TimeUnit.MINUTES)
    .build(key -> callExternalService()); // 实际调用

Supplier fallback = () -> {
    try {
        return cache.get("data");
    } catch (Exception e) {
        return "Cached data expired, service down";
    }
};


12. 测试弹性逻辑 🧪

编写单元测试验证熔断、重试等行为至关重要。

12.1 使用 Mockito 模拟失败

@Test
public void testCircuitBreakerOpensAfterFailures() {
    // 1. 创建熔断器(短等待时间便于测试)
    CircuitBreakerConfig config = CircuitBreakerConfig.custom()
        .failureRateThreshold(50)
        .waitDurationInOpenState(Duration.ofMillis(100))
        .slidingWindowSize(4)
        .build();
    CircuitBreaker cb = CircuitBreaker.of("test", config);

    // 2. 模拟总是失败的服务
    Supplier failingService = () -> { throw new RuntimeException("Oops"); };
    Supplier decorated = CircuitBreaker.decorateSupplier(cb, failingService);

    // 3. 触发 3 次失败(超过 50% 阈值)
    assertThrows(RuntimeException.class, decorated::get);
    assertThrows(RuntimeException.class, decorated::get);
    assertThrows(RuntimeException.class, decorated::get);

    // 4. 第 4 次应被熔断器拒绝(CallNotPermittedException)
    assertThrows(CallNotPermittedException.class, decorated::get);
}

12.2 验证重试次数

@Test
public void testRetryAttempts() {
    AtomicInteger counter = new AtomicInteger(0);
    Supplier service = () -> {
        counter.incrementAndGet();
        throw new IOException("Network error");
    };

    Retry retry = Retry.ofDefaults("test");
    Supplier decorated = Retry.decorateSupplier(retry, service);

    assertThrows(IOException.class, decorated::get);
    assertEquals(3, counter.get()); // 默认重试 3 次
}


13. 性能考量与最佳实践 ⚡

13.1 性能开销

Resilience4j 的设计目标是极低开销

  • 熔断器:基于原子整数和环形缓冲区,无锁。
  • 重试/限流:仅增加少量方法调用。
  • 舱壁:信号量操作挺轻量。
基准测试显示,装饰后的调用比原始调用慢约 100-200 纳秒,可忽略不计。

13.2 最佳实践清单

为每个外部依赖创建独立实例
避免不同服务的故障相互影响。

合理设置参数

  • 熔断器滑动窗口不宜过大(10-100 次调用)
  • 重试次数不宜过多(2-3 次足够)
  • 超时时间应小于上游调用超时
优先使用异步非阻塞 结合 CompletableFutureTimeLimiter 避免线程阻塞。

监控与告警
通过事件监听或 Micrometer 暴露指标,及时发现异常。

降级策略必不可少
永远不要让最终用户看到技术错误。


14. 常见陷阱与解决方案 🕳️

14.1 陷阱:熔断器未生效

原因:默认只对 Exception 及其子类触发熔断,Error(如 OutOfMemoryError)不会。

解决:显式配置记录哪些异常:

CircuitBreakerConfig.custom()
    .recordExceptions(Exception.class, Error.class)
    .ignoreExceptions(BusinessException.class)
    .build();

14.2 陷阱:重试导致副作用

问题:对非幂等操作(如创建订单)重试会导致重复提交。

解决:仅对幂等操作启用重试,或在业务层实现幂等性。

14.3 陷阱:限流器与线程池冲突

问题:在 Tomcat 等容器中,限流器可能无法限制总并发(因为请求已由线程池接收)。

解决:在入口处(如 Filter)应用限流器,或使用响应式框架(如 WebFlux)。


15. 结语:构建真正弹性的系统 🌟

在非 Spring 的纯 Java 项目中集成 Resilience4j,不仅可行,而且挺高效。通过手动配置各个弹性组件,并合理组合使用,我们可以构建出能够优雅应对故障、自动恢复、保护自身稳定性的分布式系统。

记住,弹性不是银弹,而是防御性编程思维的体现。Resilience4j 提供了强大的工具,但如何使用它们,仍需结合业务场景深思熟虑。

🌐 延伸阅读:
Resilience4j 官方文档Vavr 函数式编程指南Designing Distributed Systems by Brendan Burns

希望这篇文章能帮助你在纯 Java 项目中成功应用 Resilience4j,打造坚如磐石的服务!💪


🙌

暂时整理到这里。以上都是个人理解,可能有疏漏,欢迎指正。

评论 (0)

暂无评论