Spring Reactor 入门

适合阅读的人群:本文适合对 Spring、Netty 等框架,以及 Java 8 的 Lambda、Stream 等特性有基本认识,希望了解 Spring 5 的反应式编程特性的技术人员阅读。

一、前言

最近几年,随着 Node.js、Golang 等新技术、新语言的出现,Java 的服务器端开发语言老大的地位受到了不小的挑战。虽然,Java 的市场份额依旧很大,短时间内也不会改变,但 Java 社区对于挑战也并没有无动于衷。相反,Java 社区积极应对这些挑战,不断提高自身应对高并发服务器端开发场景的能力。

为了应对高并发的服务器端开发,在2009年的时候,微软提出了一个更优雅地实现异步编程的方式 —— Reactive Programming,中文称反应式编程。随后,其它技术也迅速地跟上了脚步,像 ES6 通过 Promise 引入了类似的异步编程方式。Java 社区也没有落后很多,Netflix 和 TypeSafe 公司提供了 RxJava 和 Akka Stream 技术,让 Java 平台也有了能够实现反应式编程的框架。

其实,在更早之前,像 Mina 和 Netty 这样的 NIO 框架其实也能搞定高并发的服务器端开发任务,但这样的技术相对来说只是少数高级开发人员手中的工具。对于更多的普通开发者来说,难度显得大了些,所以不容易普及。

很多年过去了,到了2017年,虽然已经有不少公司在实践反应式编程。但整体来说,应用范围依旧不大。原因在于缺少简单易用的技术将反应式编程推广普及,并同诸如 MVC 框架、HTTP 客户端、数据库技术等整合。

终于,在2017年9月28日,解决上面问题的利器浮出水面 —— Spring 5 正式发布。Spring 5 其最大的意义就是能将反应式编程技术的普及向前推进一大步。而作为在背后支持 Spring 5 反应式编程的框架 Reactor,也相应的发布了 3.1.0 版本。

本文接下来将会向大家介绍 Reactive Programming(反应式编程)、Reactor 的入门以及实践技巧等相关的内容。文章中的实践内容来自作者使用 Spring 5 和 Reactor 等技术改造实际项目的经历。

二、Reactor 简介

先介绍一下 Reactor 技术。Reactor 框架是 Pivotal 公司(开发 Spring 等技术的公司)开发的,实现了 Reactive Programming 思想,符合 Reactive Streams 规范(Reactive Streams 是由 Netflix、TypeSafe、Pivotal 等公司发起的)的一项技术。其名字有反应堆之意,反映了其背后的强大的性能。

Reactive Programming

Reactive Programming,中文称反应式编程,是一种高性能应用的编程方式。其最早是由微软提出并引入到 .NET 平台中,随后 ES6 也引入了类似的技术。在 Java 平台上,较早采用反应式编程技术的是 Netflix 公司开源的 RxJava 框架。现在大家比较熟知的 Hystrix 就是以 RxJava 为基础开发的。

反应式编程其实并不神秘,通过与我们熟悉的迭代器模式对比便可了解其基本思想:

event Iterable (pull) Observable (push)
retrieve data T next() onNext(T)
discover error throws Exception onError(Exception)
complete !hasNext() onCompleted()

上面表格的中的 Observable 那一列便代表反应式编程的 API 使用方式。可见,它就是常见的观察者模式的一种延伸。如果将迭代器看作是拉模式,那观测者模式便是推模式。被订阅者(Publisher)主动的推送数据给订阅者(Subscriber),触发 onNext 方法。异常和完成时触发另外两个方法。如果 Publisher 发布消息太快了,超过了 Subscriber 的处理速度,那怎么办。这就是 Backpressure 的由来,Reactive Programming 框架需要提供机制,使得 Subscriber 能够控制消费消息的速度。

在 Java 平台上,Netflix(开发了 RxJava)、TypeSafe(开发了 Scala、Akka)、Pivatol(开发了 Spring、Reactor)共同制定了一个被称为 Reactive Streams 项目(规范),用于制定反应式编程相关的规范以及接口。其主要的接口有这三个:

  • Publisher
  • Subscriber
  • Subcription

其中,Subcriber 中便包含了上面表格提到的 onNextonErroronCompleted 这三个方法。

对于 Reactive Streams,大家只需要理解其思想就可以,包括基本思想以及 Backpressure 等思想即可。

Imperative vs Reactive

对于上面表格里提到的 Iterable 和 Observale 两种风格,还有另一个称呼,便是 Imperative(指令式编程)和 Reactive(反应式编程)这两种风格。其实就是拉模型和推模型的另一种表述,大家理解其中的思想即可。对于 Imperative,老外写的文章有时会用,直译就是指令式编程,其实就是我们大家平时用 Java、Python 等语言写代码的常见风格,代码执行顺序和编写顺序基本一致(这里不考虑 JVM 指令重排)

Reactor 的主要模块

Reactor 框架主要有两个主要的模块:reactor-core 和 reactor-ipc。前者主要负责 Reactive Programming 相关的核心 API 的实现,后者负责高性能网络通信的实现,目前是基于 Netty 实现的。

Reactor 的主要类

在 Reactor 中,经常使用的类并不是很多,主要有以下两个:

  • Mono 实现了 org.reactivestreams.Publisher 接口,代表0到1个元素的发布者。
  • Flux 同样实现了 org.reactivestreams.Publisher 接口,代表0到N个元素的发表者。

可能会使用到的类

  • Scheduler 表示背后驱动反应式流的调度器,通常由各种线程池实现。

Web Flux

001

Spring 5 引入的一个基于 Netty 而不是 Servlet 的高性能的 Web 框架,但是使用方式并没有同传统的基于 Servlet 的 Spring MVC 有什么大的不同。

▼ Web Flux 中 MVC 接口的示例

@RequestMapping("/demo")
@RestController
public class DemoController {
    @RequestMapping(value = "/foobar")
    public Mono<Foobar> foobar() {
        return Mono.just(new Foobar());
    }
}

最大的变化就是返回值从 Foobar 所表示的一个对象变为 Mono<Foobar> (或 Flux<T>

当然,实际的程序并不会像示例那样就一行代码。关于如何开发实际的应用,这些正是后面介绍 Reactor 的部分所要详细叙述的。

Reactive Streams、Reactor 和 Web Flux

上面介绍了反应式编程的一些概念,以及 Reactor 和 Web Flux。可能读者看到这里有些乱。这里介绍一下三者的关系。其实很简单:

Reactive Streams 是规范,Reactor 实现了 Reactive Streams。Web Flux 以 Reactor 为基础,实现 Web 领域的反应式编程框架。

其实,对于大部分业务开发人员来说,当编写反应式代码时,我们通常只会接触到 Publisher 这个接口,对应到 Reactor 便是 MonoFlux。对于 SubscriberSubcription 这两个接口,Reactor 必然也有相应的实现。但是,这些都是 Web Flux 和 Spring Data Reactive 这样的框架用到的。如果不开发中间件,通常开发人员是不会接触到的。

比如,在 Web Flux,你的方法只需返回 MonoFlux 即可。你的代码基本也只和 MonoFlux 打交道。而 Web Flux 则会实现 SubscriberonNext 时将业务开发人员编写的 MonoFlux 转换为 HTTP Response 返回给客户端。

三、Reactor 入门

接下来介绍一下 Reactor 中 Mono 和 Flux 这两个类中的主要方法的使用。

如同 Java 8 所引入的 Stream 一样,Reactor 的使用方式基本上也是分三步:开始阶段的创建、中间阶段的处理和最终阶段的消费。只不过创建和消费可能是通过像 Spring 5 这样框架完成的(比如通过 Web Flux 中的 WebClient 调用 HTTP 接口,返回值便是一个 Mono)。但我们还是需要基本了解这些阶段的开发方式。

1. 创建 Mono 和 Flux(开始阶段)

使用 Reactor 编程的开始必然是先创建出 Mono 或 Flux。有些时候不需要我们自己创建,而是实现例如 WebFlux 中的 WebClient 或 Spring Data Reactive 得到一个 Mono 或 Flux。

▼ 使用 WebFlux WebClient 调用 HTTP 接口

WebClient webClient = WebClient.create("http://localhost:8080");

public Mono<User> findById(Long userId) {
    return webClient
            .get()
            .uri("/users/" + userId)
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .flatMap(cr -> cr.bodyToMono(User.class));
}

▼ 使用 ReactiveMongoRepository 查询 User

public interface UserRepository extends ReactiveMongoRepository<User, Long> {
    Mono<User> findByUsername(String username);
}

但有些时候,我们也需要主动地创建一个 Mono 或 Flux。

“普通”的创建方式

简单的创建方式是主要是使用像 just 这样的方法创建

Mono<String> helloWorld = Mono.just("Hello World");
Flux<String> fewWords = Flux.just("Hello", "World");
Flux<String> manyWords = Flux.fromIterable(words);

这样的创建方式在什么时候用呢?一般是用在当你在经过一系列非 IO 型的操作之后,得到了一个对象。接下来要基于这个对象运用 Reactor 进行高性能的 IO 操作时,可以用这种方式将你之前得到的对象转换为 Mono 或 Flux。

“文艺”的创建方式

上述是我们通过一个同步调用得到的结果创建出 MonoFlux,但有时我们需要从一个非 Reactive 的异步调用的结果创建出 Mono 或 Flux。那如何实现呢。

如果这个异步方法返回一个 CompletableFuture,那我们可以基于这个 CompletableFuture 创建一个 Mono:

Mono.fromFuture(aCompletableFuture);

如果这个异步调用不会返回 CompletableFuture,是有自己的回调方法,那怎么创建 Mono 呢?我们可以使用 static <T> Mono<T> create(Consumer<MonoSink<T>> callback) 方法:

Mono.create(sink -> {
    ListenableFuture<ResponseEntity<String>> entity = asyncRestTemplate.getForEntity(url, String.class);
    entity.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
        @Override
        public void onFailure(Throwable ex) {
            sink.error(ex);
        }

        @Override
        public void onSuccess(ResponseEntity<String> result) {
            sink.success(result.getBody());
        }
    });
});

在使用 WebFlux 之后,AsyncRestTemplate 已经不推荐使用,这里只是做演示。

2. 处理 Mono 和 Flux(中间阶段)

中间阶段的 Mono 和 Flux 的方法主要有 filtermapflatMapthenzipreduce 等。这些方法使用方法和 Stream 中的方法类似。对于这些方法的介绍,将会放在下一节“Reactor 进阶”中,主要介绍这些方法不容易理解和使用容易出问题的点。

3. 消费 Mono 和 Flux(结束阶段)

直接消费的 Mono 或 Flux 的方式就是调用 subscribe 方法。如果在 Web Flux 接口中开发,直接返回 Mono 或 Flux 即可。Web Flux 框架会为我们完成最后的 Response 输出工作。

四、Reactor 进阶

接下来我将介绍一下我在使用 Reactor 开发实际项目时遇到的一些稍显复杂的问题,以及解决方法。

问题一:mapflatMapthen 分别在什么时候使用?

本段内容将涉及到如下类和方法:

  • 方法:Mono.map
  • 方法:Mono.flatMap
  • 方法:Mono.then
  • 类:Function

MonoFlux 中间环节处理的处理过程中,有三个有些类似的方法:mapflatMapthen。这三个方法可以说是 Reactor 中使用频率很高的方法。

▼ 传统的命令式编程

Object result1 = doStep1(params);
Object result2 = doStep2(result1);
Object result3 = doStep3(result2);

▼ 对应的反应式编程

Mono.just(params)
    .flatMap(v -> doStep1(v))
    .flatMap(v -> doStep2(v))
    .flatMap(v -> doStep3(v));

从上面两段代码的对比就很容易看出来 flatMap 方法在其中起到的作用,mapthen 方法也有类似的作用。但这些方法之间的区别是什么呢?我们先来看看这三个方法的签名(以 Mono 为例):

  • flatMap(Function<? super T, ? extends Mono<? extends R>> transformer)
  • map(Function<? super T, ? extends R> mapper)
  • then(Mono<V> other)

可见,最复杂的是 flatMap 方法,map 次之,then 最简单。从方法名字上看,flatMapmap 都是做映射之用。而 then 则是下一步的意思,最适合用于链式调用,但为什么上面的例子使用的是 flatMap 而不是 then

then 表面看上去是下一步的意思,但它只表示执行顺序的下一步,不表示下一步依赖于上一步。这个语义同 ES6 Promise 中的 then 方法是不同的。从 then 方法的参数只是一个 Mono,无从接受上一步的执行结果。flatMapmap 的参数都是一个 Function,入参是上一步的执行结果。

flatMapmap 的区别在于,flatMap 中的入参 Function 的返回值要求是一个 Mono(不明白的复习一下 Function 接口的定义),而 map 的入参 Function 只要求返回一个普通对象。因为我们在业务处理中常需要调用 WebClientReactiveXxxRepository 中的方法,这些方法的返回值都是 Mono(或 Flux)。所以要将这些调用串联为一个整体链式调用,就必须使用 flatMap,而不是 map

所以,我们要正确理解 flatMapmapthen 这三个方法的用法和背后的含义,这样才能正确实践反应式编程。

问题二:如何实现并发执行

本段内容将涉及到如下类和方法:

  • 方法:Mono.zip
  • 类:Tuple2
  • 类:BiFunction

并发执行是常见的一个需求。Reactive Programming 虽然是一种异步编程方式,但是异步不代表就是并发并行的。

在传统的命令式开发方式中,并发执行是通过线程池加 Future 的方式实现的。

Future<Result1> result1Future = doStep1(params);
Future<Result2> result2Future = doStep2(params);
Result1 result1 = result1Future.get();
Result2 result2 = result2Future.get();
// Do merge;
return mergeResult;

因为上面的代码虽然有一些异步效果在里面,但 Future.get() 方法是阻塞的。所以,当我们使用 Reactor 开发有并发执行场景的反应式代码时,肯定不能用上面的方式。这时,需要使用到 MonoFlux 中的 zip 方法。这里我们以 Mono 为例演示。代码如下:

Mono<CustomType1> item1Mono = ...;
Mono<CustomType2> item2Mono = ...;
Mono.zip(items -> {
    CustomType1 item1 = CustomType1.class.cast(items[0]);
    CustomType2 item2 = CustomType2.class.cast(items[1]);
    // Do merge
    return mergeResult;
}, item1Mono, item2Mono);

上述代码中,产生 item1Monoitem2Mono 的过程是并行的。比如,调用一个 HTTP 接口的同时,执行一个数据库查询操作。这样就可以加快程序的执行。

但上述代码存在一个问题,就是 zip 方法需要做强制类型转换。而强制类型转换是不安全的。所以我们需要更优雅的方式。

好在 zip 方法存在多种重载形式。除了最基本的形式以外,还有多种类型安全的形式:

static <T1, T2> Mono<Tuple2<T1, T2>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2);
static <T1, T2, O> Mono<O> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, BiFunction<? super T1, ? super T2, ? extends O> combinator); 
static <T1, T2, T3> Mono<Tuple3<T1, T2, T3>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3);

对于不超过7个元素的合并操作,都有类型安全的 zip 方法可选。

以两个元素的合并为例,介绍一下使用方法:

Mono.zip(item1Mono, item2Mono).map(tuple -> {
    CustomType1 item1 = tuple.getT1();
    CustomType2 item2 = tuple.getT2();
    // Do merge
    return mergeResult;
});

上述代码中,map 方法的参数是一个 Tuple2,表示一个二元数组,相应的还有 Tuple3Tuple4 等。

另外,对于两个元素的并发执行,也可以通过 zip(Mono<? extends T1> p1, Mono<? extends T2> p2, BiFunction<? super T1, ? super T2, ? extends O> combinator) 方法直接将结果合并。方法是传递 BiFunction 实现合并算法。

问题三:集合循环之后的汇聚

本段内容将涉及到如下类和方法:

  • 方法:Flux.fromIterable
  • 方法:Flux.reduce
  • 类:BiFunction

另外一个稍微复杂的场景是对一个对象中的一个类型为集合类的(List、Set)进行处理之后,再对原本的对象进行处理。使用 Imperative 风格的代码很容易编写:

List<SubData> subDataList = data.getSubDataList();
for (SubData item : subDataList) {
    // Do something on data and item
}
// Do something on data

是不是简单到无以复加的地步了。但当我们要用 Reactive 风格的代码实现上述逻辑时,就不是那么简单了。

要在 Reactive 风格的代码中实现上述逻辑,我们主要是要用到 Fluxreduce 方法。我们先来看 reduce 方法的签名:

<A> Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator);

从方法签名我们可以看出 reduce 方法的功能就是讲一个 Flux 聚合成一个 Mono。参数中第一个参数是返回值 Mono 中元素的初始值。

第二个参数是一个 BiFunction,用来实现聚合操作的逻辑。泛型参数 <A, ? super T, A> 中,第一个 A 表示每次聚合操作(因为需要对集合中每个元素进行操作)之后的结果的类型,它作为 BiFunction.apply 方法的第一个入参 ;? super T 表示集合中的每个元素,它作为 BiFunction.apply 方法的第二个入参;最后一个 A 表示聚合操作的结果,它作为 BiFunction.apply 方法的返回值。

接下来看一下示例:

Data initData = ...;
List<SubData> aList = ...;
Flux.fromIterable(aList)
    .reduce(initData, (data, itemInList) -> {
        // Do something on data and itemInList
        return data;
    });

上面的示例代码中,initDatadata 的类型相同,我们,但是命名不能重复。执行完上述代码之后, reduce 方法会返回 Mono<Data>

Java 20中有哪些新功能

Java 20将于2023年3月发布,预计将引入一系列变化和新功能。我们准备了一个先睹为快,看看哪些JEP最有可能被JDK 20接受,哪些JEP我们很有希望下一个被接受!
Java 20是计划作为一个非LTS版本,而随后的版本21将被设置为一个具有长期支持(LTS)的版本。

Java开发工具包(JDK)的开发基于JDK增强建议(JEP)的概念。实际上,这些增强建议是JDK发布项目和所有相关开发活动的路线图。
在编写本报告时,JEP索引列出了437项改进建议,以及一些JEP草案和已提交的JEP。

JDK 20中需要哪些JEP?
1、记录模式
作为JDK 19中的第一个预览引入,并预期作为JDK 20中的第二个预览。为了支持数据导航和处理,记录模式简化了与记录组件的使用。在不更改类型模式的语法或语义的情况下,这将扩展更复杂数据查询的模式匹配。

2、switch语句的模式匹配
作为JDK 17中的第一个预览版引入,并预期作为JDK 20中的第四个预览版。

switch (s) {
  case null ->
    { break; }
  case Triangle t
  when t.calculateArea() > 100 ->
    System.out.println("Large triangle");
  default ->
    System.out.println("A shape, possibly a small triangle");
}

3、外部函数&内存(FFM)API
FFM API的根源在于广泛的JEP组合,这导致了它在JDK 19中的第一个预览版,并有望在JDK 20中成为第二个预览版。它允许调用Java程序的外部函数,以与外部代码和数据一起操作,例如在Java运行时之外,而没有JNI的缺点和风险。FFM API的主要目标是拥有一个上级的纯Java开发模型,并支持更广泛的外部内存模型。

4、虚拟线程
作为JDK 19中的第一个预览引入,并预期作为JDK 20中的第二个预览。虚拟线程可提高编写、维护和监视并发应用程序时的效率。它们是轻量级的,支持每个请求的线程可伸缩性,并且可以在对现有代码进行最小更改的情况下应用。传统的Java线程精确地映射到一个操作系统线程,而虚拟线程并不绑定到特定的操作系统线程。因此,创建它们的成本很低,并且可以根据需要创建任意多个。

JDK20发布后,哪些JEP令我们兴奋
1、基元类primitive

JEP 401引入了对一种新的特殊类型的值类的支持,以定义基元类型。由于更好的存储器访问以及因为在CPU内更有效地执行基元操作,基元的使用使得能够改进性能。通过此更新,开发人员可以获得基元的性能优势,同时保留类声明的抽象和其他优势。
为了能够将基元的优点与典型的面向对象结合起来,基元类需要遵守以下两条规则:

  • 原始类中的所有字段都被隐式定义为final,即它们只能在构造函数或初始化函数中设置。
  • 此外,基元类不能具有隐式依赖于声明类的字段。
primitive class Point implements Shape {
   ...
}

2、值对象
传统的Java对象提供标识,即它们的内存位置用于区分一个对象和另一个对象。提供标识在运行时代价很高,而且通常不会在实现中使用。通过这次提交,Java的对象模型扩展了值类和值对象。值类是无标识的,即它们可以提供基元类型的性能优势,同时利用面向对象的概念。当在值对象上使用==时,它们的字段值用于确定对象是否相等,而不是它们的内存位置。请注意,值类的所有字段都是隐式final的,需要在初始化器或构造函数中设置!
实值类使用value,例如:

value class Point implements Shape {
  ...
}

3、Universal generics通用泛型
此提交通过还允许原始类作为类型参数,消除了类型参数必须是引用类型的要求。许多现有的泛型实现将开箱即用地使用这个新增功能,即它们可以用原始类作为其类型参数来实例化。但是,需要特别注意,以防代码对 null 进行赋值,因为原始类将因空指针异常而失败。

4、字符串模板
为了简化常规字符串组合,字符串模板(又名字符串文字)包含在运行时解释的嵌入式表达式。通过向 Java 添加一种新的表达式(字符串模板表达式),JEP 430使得使用包含在运行时计算的值的字符串编写代码变得更加简单。这使得需要用户输入值的程序具有更好的可读性、定义格式语法的灵活性以及更高的安全性。

多线程代码的补充
如果您正在使用多线程代码,请确保检查这些添加项,它们提供了使多线程代码更易于阅读或提供性能提升的选项。
1、Scoped values
使用 extent-local 变量,可以更容易地在 Java 中的线程内和子线程之间共享不可变数据。此更新的目标是简化关于数据流的推理以提高可用性;提高健壮性,以便只有合法的被调用者才能检索调用者共享的数据;并通过启用运行时优化和将共享数据视为不可变来提高性能 虽然此更改不需要从线程局部变量迁移,但它们更适合用于大量虚拟线程。

2、结构化并发
通过引入用于结构化并发的 API, JEP 428可以将在不同线程中运行的不同任务视为一个工作单元。这应该大大简化多线程编程,并通过更容易确保代码的可靠性、可维护性和可观察性来鼓励开发人员应用并发编程。

更多令人兴奋的 JEP

  • Sequenced collections:将 SequencedCollection 接口添加到标准库,提供第一个定义集合中元素顺序的集合。
  • Vector API:与标量计算相比,为了提高性能,这个新的 API 支持在运行时编译矢量计算。
  • 异步堆栈跟踪 API:引入了一个用于异步收集堆栈跟踪的新 API。
  • 类文件 API:添加一个新的 API 来替换 ASM(或 cglib,或其他字节码库),它将提供一种读取、写入和转换 Java 类文件的方法。

 

Java 17 新特性

Java 17 在 2021 年 9 月 14 日正式发布,Java 17 是一个长期支持(LTS)版本,这次更新共带来 14 个新功能。

OpenJDK Java 17 下载:https://jdk.java.net/archive/

OpenJDK Java 17 文档:https://openjdk.java.net/projects/jdk/17/

1. JEP 306: 恢复始终严格的浮点语义

既然是恢复严格的浮点语义,那么说明在某个时间点之前,是始终严格的浮点语义的。其实在 Java SE 1.2 之前,所有的浮点计算都是严格的,但是以当初的情况来看,过于严格的浮点计算在当初流行的 x86 架构和 x87 浮点协议处理器上运行,需要大量的额外的指令开销,所以在 Java SE 1.2 开始,需要手动使用关键字 strictfp(strict float point) 才能启用严格的浮点计算。

但是在 2021 年的今天,硬件早已发生巨变,当初的问题已经不存在了,所以从 Java 17 开始,恢复了始终严格的浮点语义这一特性。

扩展strictfp 是 Java 中的一个关键字,大多数人可能没有注意过它,它可以用在类、接口或者方法上,被 strictfp 修饰的部分中的 float 和 double 表达式会进行严格浮点计算。

下面是一个示例,其中的 testStrictfp() 被 strictfp 修饰。

package com.wdbyte;

public class Main {
    public static void main(String[] args) {
        testStrictfp();
    }

    public strictfp static void testStrictfp() {
        float aFloat = 0.6666666666666666666f;
        double aDouble = 0.88888888888888888d;
        double sum = aFloat + aDouble;
        System.out.println("sum: " + sum);
    }
}

2. JEP 356:增强的伪随机数生成器

为伪随机数生成器 RPNG(pseudorandom number generator)增加了新的接口类型和实现,让在代码中使用各种 PRNG 算法变得容易许多。

这次增加了 RandomGenerator 接口,为所有的 PRNG 算法提供统一的 API,并且可以获取不同类型的 PRNG 对象流。同时也提供了一个新类 RandomGeneratorFactory 用于构造各种 RandomGenerator 实例,在 RandomGeneratorFactory 中使用 ServiceLoader.provider 来加载各种 PRNG 实现。

下面是一个使用示例:随便选择一个 PRNG 算法生成 5 个 10 以内的随机数。

package com.wdbyte.java17;

import java.util.Date;
import java.util.random.RandomGenerator;
import java.util.random.RandomGeneratorFactory;
import java.util.stream.Stream;

/**
 * @author niulang
 */
public class JEP356 {

    public static void main(String[] args) {
        RandomGeneratorFactory<RandomGenerator> l128X256MixRandom = RandomGeneratorFactory.of("L128X256MixRandom");
        // 使用时间戳作为随机数种子
        RandomGenerator randomGenerator = l128X256MixRandom.create(System.currentTimeMillis());
        for (int i = 0; i < 5; i++) {
            System.out.println(randomGenerator.nextInt(10));
        }
    }
}

得到输出:

7
3
4
4
6

你也可以遍历出所有的 PRNG 算法。

RandomGeneratorFactory.all().forEach(factory -> {
    System.out.println(factory.group() + ":" + factory.name());
});

得到输出:

LXM:L32X64MixRandom
LXM:L128X128MixRandom
LXM:L64X128MixRandom
Legacy:SecureRandom
LXM:L128X1024MixRandom
LXM:L64X128StarStarRandom
Xoshiro:Xoshiro256PlusPlus
LXM:L64X256MixRandom
Legacy:Random
Xoroshiro:Xoroshiro128PlusPlus
LXM:L128X256MixRandom
Legacy:SplittableRandom
LXM:L64X1024MixRandom

可以看到 Legacy:Random 也在其中,新的 API 兼容了老的 Random 方式,所以你也可以使用新的 API 调用 Random 类生成随机数。

// 使用 Random
RandomGeneratorFactory<RandomGenerator> l128X256MixRandom = RandomGeneratorFactory.of("Random");
// 使用时间戳作为随机数种子
RandomGenerator randomGenerator = l128X256MixRandom.create(System.currentTimeMillis());
for (int i = 0; i < 5; i++) {
    System.out.println(randomGenerator.nextInt(10));
}

扩展阅读:增强的伪随机数生成器

3. JEP 382:使用新的 macOS 渲染库

macOS 为了提高图形的渲染性能,在 2018 年 9 月抛弃了之前的 OpenGL 渲染库 ,而使用了 Apple Metal 进行代替。Java 17 这次更新开始支持 Apple Metal,不过对于 API 没有任何改变,这一些都是内部修改。

扩展阅读:macOS Mojave 10.14 Release NotesApple Metal

4. JEP 391:支持 macOS/AArch64 架构

起因是 Apple 在 2020 年 6 月的 WWDC 演讲中宣布,将开启一项长期的将 Macintosh 计算机系列从 x64 过度到 AArch64 的长期计划,因此需要尽快的让 JDK 支持 macOS/AArch64 。

Linux 上的 AArch64 支持以及在 Java 16 时已经支持,可以查看之前的文章了解。

扩展:Java 16 新功能介绍 – JEP 386

5. JEP 398:删除已弃用的 Applet API

Applet 是使用 Java 编写的可以嵌入到 HTML 中的小应用程序,嵌入方式是通过普通的 HTML 标记语法,由于早已过时,几乎没有场景在使用了。

示例:嵌入 Hello.class

<applet code="Hello.class" height=200 width=200></applet>

Applet API 在 Java 9 时已经标记了废弃,现在 Java 17 中将彻底删除。

6. JEP 403:更强的 JDK 内部封装

如 Java 16 的 JEP 396 中描述的一样,为了提高 JDK 的安全性,使 --illegal-access 选项的默认模式从允许更改为拒绝。通过此更改,JDK 的内部包和 API(关键内部 API 除外)将不再默认打开。

但是在 Java 17 中,除了 sun.misc.Unsafe ,使用 --illegal-access 命令也不能打开 JDK 内部的强封装模式了,除了 sun.misc.Unsafe API .

在 Java 17 中使用 --illegal-access 选项将会得到一个命令已经移除的警告。

➜  bin ./java -version
openjdk version "17" 2021-09-14
OpenJDK Runtime Environment (build 17+35-2724)
OpenJDK 64-Bit Server VM (build 17+35-2724, mixed mode, sharing)
➜  bin ./java --illegal-access=warn
OpenJDK 64-Bit Server VM warning: Ignoring option --illegal-access=warn; support was removed in 17.0

扩展阅读:JEP 403:更强的 JDK 内部封装Java 16 新功能介绍

7. JEP 406:switch 的类型匹配(预览)

如 instanceof 一样,为 switch 也增加了类型匹配自动转换功能。

在之前,使用 instanceof 需要如下操作:

if (obj instanceof String) {
    String s = (String) obj;    // grr...
    ...
}

多余的类型强制转换,而现在:

if (obj instanceof String s) {
    // Let pattern matching do the work!
    ...
}

switch 也可以使用类似的方式了。

static String formatterPatternSwitch(Object o) {
    return switch (o) {
        case Integer i -> String.format("int %d", i);
        case Long l    -> String.format("long %d", l);
        case Double d  -> String.format("double %f", d);
        case String s  -> String.format("String %s", s);
        default        -> o.toString();
    };
}

对于 null 值的判断也有了新的方式。

// Java 17 之前
static void testFooBar(String s) {
    if (s == null) {
        System.out.println("oops!");
        return;
    }
    switch (s) {
        case "Foo", "Bar" -> System.out.println("Great");
        default           -> System.out.println("Ok");
    }
}
// Java 17
static void testFooBar(String s) {
    switch (s) {
        case null         -> System.out.println("Oops");
        case "Foo", "Bar" -> System.out.println("Great");
        default           -> System.out.println("Ok");
    }
}

扩展阅读: JEP 406:switch 的类型匹配(预览)

8. JEP 407:移除 RMI Activation

移除了在 JEP 385 中被标记废除的 RMI(Remote Method Invocation)Activation,但是 RMI 其他部分不会受影响。

RMI Activation 在 Java 15 中的 JEP 385 已经被标记为过时废弃,至今没有收到不良反馈,因此决定在 Java 17 中正式移除。

扩展阅读: JEP 407:移除 RMI Activation

9. JEP 409:密封类(Sealed Classes)

Sealed Classes 在 Java 15 中的 JEP 360 中提出,在 Java 16 中的 JEP 397 再次预览,现在 Java 17 中成为正式的功能,相比 Java 16 并没有功能变化,这里不再重复介绍,想了解的可以参考之前文章。

扩展阅读:Java 16 新功能介绍JEP 409: Sealed Classes

10. JEP 401:移除实验性的 AOT 和 JIT 编译器

在 Java 9 的 JEP 295 中,引入了实验性的提前编译 jaotc 工具,但是这个特性自从引入依赖用处都不太大,而且需要大量的维护工作,所以在 Java 17 中决定删除这个特性。

主要移除了三个 JDK 模块:

  1. jdk.aot – jaotc 工具。
  2. Jdk.internal.vm.compiler – Graal 编译器。
  3. jdk.internal.vm.compiler.management

同时也移除了部分与 AOT 编译相关的 HotSpot 代码:

  1. src/hotspot/share/aot — dumps and loads AOT code
  2. Additional code guarded by #if INCLUDE_AOT

11. JEP 411:弃用 Security Manager

Security Manager 在 JDK 1.0 时就已经引入,但是它一直都不是保护服务端以及客户端 Java 代码的主要手段,为了 Java 的继续发展,决定弃用 Security Manager,在不久的未来进行删除。

@Deprecated(since="17", forRemoval=true)
public class SecurityManager {
	// ...
}

12. JEP 412:外部函数和内存 API (孵化)

新的 API 允许 Java 开发者与 JVM 之外的代码和数据进行交互,通过调用外部函数,可以在不使用 JNI 的情况下调用本地库。

这是一个孵化功能;需要添加--add-modules jdk.incubator.foreign来编译和运行 Java 代码。

历史

  • Java 14 JEP 370引入了外部内存访问 API(孵化器)。
  • Java 15 JEP 383引入了外部内存访问 API(第二孵化器)。
  • Java 16 JEP 389引入了外部链接器 API(孵化器)。
  • Java 16 JEP 393引入了外部内存访问 API(第三孵化器)。
  • Java 17 JEP 412引入了外部函数和内存 API(孵化器)。

扩展阅读:JEP 412:外部函数和内存 API (孵化)

13. JEP 414:Vector API(二次孵化)

在 Java 16 中引入一个新的 API 来进行向量计算,它可以在运行时可靠的编译为支持的 CPU 架构,从而实现更优的计算能力。

现在 Java 17 中改进了 Vector API 性能,增强了例如对字符的操作、字节向量与布尔数组之间的相互转换等功能。

14. JEP 415:指定上下文的反序列化过滤器

Java 中的序列化一直都是非常重要的功能,如果没有序列化功能,Java 可能都不会占据开发语言的主导地位,序列化让远程处理变得容易和透明,同时也促进了 Java EE 的成功。

但是 Java 序列化的问题也很多,它几乎会犯下所有的可以想象的错误,为开发者带来持续的维护工作。但是要说明的是序列化的概念是没有错的,把对象转换为可以在 JVM 之间自由传输,并且可以在另一端重新构建的能力是完全合理的想法,问题在于 Java 中的序列化设计存在风险,以至于爆出过很多和序列化相关的漏洞。

反序列化危险的一个原因是,有时候我们不好验证将要进行反序列化的内容是否存在风险,而传入的数据流可以自由引用对象,很有可能这个数据流就是攻击者精心构造的恶意代码。

所以,JEP 415 允许在反序列化时,通过一个过滤配置,来告知本次反序列化允许或者禁止操作的类,反序列化时碰到被禁止的类,则会反序列化失败。

14.1. 反序列化示例

假设 Dog 类中的 Poc 是恶意构造的类,但是正常反序列化是可以成功的。

package com.wdbyte.java17;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/**
 * @author niulang
 */
public class JEP415 {
    public static void main(String[] args) throws IOException, ClassNotFoundException {
        Dog dog = new Dog("哈士奇");
        dog.setPoc(new Poc());
        // 序列化 - 对象转字节数组
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);) {
            objectOutputStream.writeObject(dog);
        }
        byte[] bytes = byteArrayOutputStream.toByteArray();
        // 反序列化 - 字节数组转对象
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
        ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
        Object object = objectInputStream.readObject();
        System.out.println(object.toString());
    }
}

class Dog implements Serializable {
    private String name;
    private Poc poc;

    public Dog(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Dog{" + "name='" + name + '\'' + '}';
    }
		// get...set...
}

class Poc implements Serializable{

}

输出结果:

Dog{name='哈士奇'}

14.2. 反序列化过滤器

在 Java 17 中可以自定义反序列化过滤器,拦截不允许的类。

package com.wdbyte.java17;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputFilter;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/**
 * @author niulang
 */
public class JEP415 {
    public static void main(String[] args) throws IOException, ClassNotFoundException {
        Dog dog = new Dog("哈士奇");
        dog.setPoc(new Poc());
        // 序列化 - 对象转字节数组
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);) {
            objectOutputStream.writeObject(dog);
        }
        byte[] bytes = byteArrayOutputStream.toByteArray();
        // 反序列化 - 字节数组转对象
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
        ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
        // 允许 com.wdbyte.java17.Dog 类,允许 java.base 中的所有类,拒绝其他任何类
        ObjectInputFilter filter = ObjectInputFilter.Config.createFilter(
                        "com.wdbyte.java17.Dog;java.base/*;!*");
        objectInputStream.setObjectInputFilter(filter);
        Object object = objectInputStream.readObject();
        System.out.println(object.toString());
    }
}

class Dog implements Serializable {
    private String name;
    private Poc poc;

    public Dog(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Dog{" + "name='" + name + '\'' + '}';
    }
		// get...set...
}

class Poc implements Serializable{
}

这时反序列化会得到异常。

Exception in thread "main" java.io.InvalidClassException: filter status: REJECTED
	at java.base/java.io.ObjectInputStream.filterCheck(ObjectInputStream.java:1412)
	at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2053)
	at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1907)
	....

扩展阅读:JEP 415:指定上下文的反序列化过滤器

 

文章转自:https://www.cnblogs.com/niumoo/p/15522730.html

kafka-configs.sh命令行使用方法

kafka的一个动态配置工具,可以通过命令行对系统进行动态配置,但并不是所有的配置项都支持的。

$ ./kafka-configs.bat
        This tool helps to manipulate and describe entity config for a topic, client, user or broker
        Option                                 Description
        ------                                 -----------
        --add-config <String>                  Key Value pairs of configs to add.
        Square brackets can be used to group
        values which contain commas: 'k1=v1,
        k2=[v1,v2,v2],k3=v3'. The following
        is a list of valid configurations:
        For entity-type 'topics':
        cleanup.policy
        compression.type
        delete.retention.ms
        file.delete.delay.ms
        flush.messages
        flush.ms
        follower.replication.throttled.
        replicas
        index.interval.bytes
        leader.replication.throttled.replicas
        max.compaction.lag.ms
        max.message.bytes
        message.downconversion.enable
        message.format.version
        message.timestamp.difference.max.ms
        message.timestamp.type
        min.cleanable.dirty.ratio
        min.compaction.lag.ms
        min.insync.replicas
        preallocate
        retention.bytes
        retention.ms
        segment.bytes
        segment.index.bytes
        segment.jitter.ms
        segment.ms
        unclean.leader.election.enable
        For entity-type 'brokers':
        log.message.timestamp.type
        ssl.client.auth
        log.retention.ms
        sasl.login.refresh.window.jitter
        sasl.kerberos.ticket.renew.window.
        factor
        log.preallocate
        log.index.size.max.bytes
        sasl.login.refresh.window.factor
        ssl.truststore.type
        ssl.keymanager.algorithm
        log.cleaner.io.buffer.load.factor
        sasl.login.refresh.min.period.seconds
        ssl.key.password
        background.threads
        log.retention.bytes
        ssl.trustmanager.algorithm
        log.segment.bytes
        max.connections.per.ip.overrides
        log.cleaner.delete.retention.ms
        log.segment.delete.delay.ms
        min.insync.replicas
        ssl.keystore.location
        ssl.cipher.suites
        log.roll.jitter.ms
        log.cleaner.backoff.ms
        sasl.jaas.config
        principal.builder.class
log.flush.interval.ms
        log.cleaner.max.compaction.lag.ms
        max.connections
        log.cleaner.dedupe.buffer.size
        log.flush.interval.messages
        advertised.listeners
        num.io.threads
        listener.security.protocol.map
        log.message.downconversion.enable
        sasl.enabled.mechanisms
        sasl.login.refresh.buffer.seconds
        ssl.truststore.password
        listeners
        metric.reporters
        ssl.protocol
        sasl.kerberos.ticket.renew.jitter
        ssl.keystore.password
        sasl.mechanism.inter.broker.protocol
        log.cleanup.policy
        sasl.kerberos.principal.to.local.rules
        sasl.kerberos.min.time.before.relogin
        num.recovery.threads.per.data.dir
        log.cleaner.io.max.bytes.per.second
        log.roll.ms
        ssl.endpoint.identification.algorithm
        unclean.leader.election.enable
        message.max.bytes
        log.cleaner.threads
        log.cleaner.io.buffer.size
        max.connections.per.ip
        sasl.kerberos.service.name
        ssl.provider
        follower.replication.throttled.rate
        log.index.interval.bytes
        log.cleaner.min.compaction.lag.ms
        log.message.timestamp.difference.max.
        ms
        ssl.enabled.protocols
        log.cleaner.min.cleanable.ratio
        replica.alter.log.dirs.io.max.bytes.
        per.second
        ssl.keystore.type
        ssl.secure.random.implementation
        ssl.truststore.location
        sasl.kerberos.kinit.cmd
        leader.replication.throttled.rate
        num.network.threads
        compression.type
        num.replica.fetchers
        For entity-type 'users':
        request_percentage
        producer_byte_rate
        SCRAM-SHA-256
        SCRAM-SHA-512
        consumer_byte_rate
        For entity-type 'clients':
        request_percentage
        producer_byte_rate
        consumer_byte_rate
        Entity types 'users' and 'clients' may
        be specified together to update
        config for clients of a specific
        user.
        --alter                                Alter the configuration for the entity.
        --bootstrap-server <String: server to  The Kafka server to connect to. This
        connect to>                            is required for describing and
        altering broker configs.
        --command-config <String: command      Property file containing configs to be
        config property file>                  passed to Admin Client. This is used
        only with --bootstrap-server option
        for describing and altering broker
        configs.
        --delete-config <String>               config keys to remove 'k1,k2'
        --describe                             List configs for the given entity.
        --entity-default                       Default entity name for
        clients/users/brokers (applies to
        corresponding entity type in command
        line)
        --entity-name <String>                 Name of entity (topic name/client
        id/user principal name/broker id)
        --entity-type <String>                 Type of entity
        (topics/clients/users/brokers)
        --force                                Suppress console prompts
        --help                                 Print usage information.
        --version                              Display Kafka version.
        --zookeeper <String: urls>             REQUIRED: The connection string for
        the zookeeper connection in the form
        host:port. Multiple URLS can be
        given to allow fail-over.

网上找了一张图,翻译解释说明:

001

使用方法(例子):

  • 增加/编辑配置

./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type topics –entity-name topicName –add-config ‘k1=v1, k2=v2, k3=v3′
./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type brokers –entity-name 1 –add-config ‘retention.bytes=1024072′
./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type brokers –entity-default –add-config ‘retention.bytes=1024072′
./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type clients –entity-default –add-config ‘retention.bytes=1024072′

  • 删除配置

./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type topics –entity-name topicName –delete-config ‘k1,k2,k3’
./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –alter –entity-type clients –entity-name clientId –delete-config ‘k1,k2,k3’
./kafka-configs.sh –bootstrap-server localhost:9092 –alter –entity-type brokers –entity-name $brokerId –delete-config ‘k1,k2,k3’
./kafka-configs.sh –bootstrap-server localhost:9092 –alter –entity-type brokers –entity-default –delete-config ‘k1,k2,k3’

  • 列出配置项的描述信息

./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –entity-type topics –entity-name topicName –describe
./kafka-configs.sh–bootstrap-server localhost:9092 –entity-type brokers –entity-name $brokerId –describe
./kafka-configs.sh –bootstrap-server localhost:9092 –entity-type brokers –entity-default –describe
./kafka-configs.sh –zookeeper localhost:2181/kafkacluster –entity-type users –entity-name user1 –entity-type clients –entity-name clientA –describe

 

 

WRONGTYPE Operation against a key holding the wrong kind of value

redis报错:

2022/07/20 18:12:32.277 ERROR [0a2cd65e1bef3f0c][task-304318095] o.s.a.i.SimpleAsyncUncaughtExceptionHandler : Unexpected exception occurred invoking async method: public void com.test.app.system.bus.listener.DeviceUpwardListener.onApplicationEvent(com.test.app.system.bus.e.DeviceEvent)
redis.clients.jedis.exceptions.JedisDataException: WRONGTYPE Operation against a key holding the wrong kind of value
at redis.clients.jedis.Protocol.processError(Protocol.java:132) ~[jedis-3.1.0.jar!/:na]
at redis.clients.jedis.Protocol.process(Protocol.java:166) ~[jedis-3.1.0.jar!/:na]
at redis.clients.jedis.Protocol.read(Protocol.java:220) ~[jedis-3.1.0.jar!/:na]
at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:318) ~[jedis-3.1.0.jar!/:na]
at redis.clients.jedis.Connection.getIntegerReply(Connection.java:260) ~[jedis-3.1.0.jar!/:na]
at redis.clients.jedis.BinaryJedis.hexists(BinaryJedis.java:1034) ~[jedis-3.1.0.jar!/:na]

 

原因是写入redis的key存在两个(多个),但是类型不一样;

如:

jedis.set(“test”,1);

jedis.hset(“test”,”key”1);

此时testkey就存在两种类型且两个,则有可能会报这个错误。

解决方法就是删除重复的键值,建议每一种set方法都加入一个redis方法名作为前缀,这样子就不可能会出现重复的键。

如:

jedis.set(“set:test”,1);

jedis.hset(“hset:test”,”key”1);

12345632
 
Copyright © 2008-2021 lanxinbase.com Rights Reserved. | 粤ICP备14086738号-3 |