Micronaut 使用低級 HTTP 客戶端

2023-03-08 14:30 更新

HttpClient 接口構成了低級 API 的基礎。此接口聲明有助于簡化執(zhí)行 HTTP 請求和接收響應的方法。

HttpClient 接口中的大多數(shù)方法都返回 Reactive Streams Publisher 實例,這并不總是最有用的接口。

Micronaut 的 Reactor HTTP Client 依賴項附帶一個名為 ReactorHttpClient 的子接口。它提供了返回 Project Reactor Flux 類型的 HttpClient 接口的變體。

發(fā)送您的第一個 HTTP 請求

獲取 HttpClient

有幾種方法可以獲取對 HttpClient 的引用。最常見的是使用 Client 注釋。例如:

注入 HTTP 客戶端

@Client("https://api.twitter.com/1.1") @Inject HttpClient httpClient;

上面的示例注入了一個以 Twitter API 為目標的客戶端。

@field:Client("\${myapp.api.twitter.url}") @Inject lateinit var httpClient: HttpClient

上面的 Kotlin 示例使用配置路徑注入了一個以 Twitter API 為目標的客戶端。請注意“\${path.to.config}”上所需的轉義(反斜杠),這是由于 Kotlin 字符串插值所必需的。

Client 注釋也是一個自定義范圍,用于管理 HttpClient 實例的創(chuàng)建并確保它們在應用程序關閉時停止。

您傳遞給 Client 注釋的值可以是以下之一:

  • 絕對 URI,例如https://api.twitter.com/1.1

  • 相對 URI,在這種情況下目標服務器將是當前服務器(用于測試)

  • 服務標識符。

另一種創(chuàng)建 HttpClient 的方法是使用 HttpClient 的靜態(tài)創(chuàng)建方法,但是不推薦使用這種方法,因為您必須確保手動關閉客戶端,當然創(chuàng)建的客戶端不會發(fā)生依賴注入。

執(zhí)行 HTTP GET

使用 HttpClient 時,通常有兩種感興趣的方法。第一個是 retrieve,它執(zhí)行一個 HTTP 請求并以您作為 Publisher 請求的任何類型(默認為 String)返回正文。

retrieve 方法接受一個 HttpRequest 或一個字符串 URI 到您希望請求的端點。

以下示例顯示如何使用 retrieve 執(zhí)行 HTTP GET 并將響應主體作為字符串接收:

使用檢索

 Java Groovy  Kotlin 
String uri = UriBuilder.of("/hello/{name}")
                       .expand(Collections.singletonMap("name", "John"))
                       .toString();
assertEquals("/hello/John", uri);

String result = client.toBlocking().retrieve(uri);

assertEquals("Hello John", result);
when:
String uri = UriBuilder.of("/hello/{name}")
                       .expand(name: "John")
then:
"/hello/John" == uri

when:
String result = client.toBlocking().retrieve(uri)

then:
"Hello John" == result
val uri = UriBuilder.of("/hello/{name}")
                    .expand(Collections.singletonMap("name", "John"))
                    .toString()
uri shouldBe "/hello/John"

val result = client.toBlocking().retrieve(uri)

result shouldBe "Hello John"

請注意,在此示例中,出于說明目的,我們調用 toBlocking() 以返回客戶端的阻塞版本。但是,在生產(chǎn)代碼中,您不應該這樣做,而應該依賴 Micronaut HTTP 服務器的非阻塞特性。

例如,以下 @Controller 方法以非阻塞方式調用另一個端點:

不阻塞地使用 HTTP 客戶端

 Java Groovy  Kotlin 
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.Post;
import io.micronaut.http.annotation.Status;
import io.micronaut.http.client.HttpClient;
import io.micronaut.http.client.annotation.Client;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import io.micronaut.core.async.annotation.SingleResult;
import static io.micronaut.http.HttpRequest.GET;
import static io.micronaut.http.HttpStatus.CREATED;
import static io.micronaut.http.MediaType.TEXT_PLAIN;

@Get("/hello/{name}")
@SingleResult
Publisher<String> hello(String name) { // (1)
    return Mono.from(httpClient.retrieve(GET("/hello/" + name))); // (2)
}
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.annotation.Post
import io.micronaut.http.annotation.Status
import io.micronaut.http.client.HttpClient
import io.micronaut.http.client.annotation.Client
import org.reactivestreams.Publisher
import io.micronaut.core.async.annotation.SingleResult
import reactor.core.publisher.Mono
import static io.micronaut.http.HttpRequest.GET
import static io.micronaut.http.HttpStatus.CREATED
import static io.micronaut.http.MediaType.TEXT_PLAIN

@Get("/hello/{name}")
@SingleResult
Publisher<String> hello(String name) { // (1)
    Mono.from(httpClient.retrieve( GET("/hello/" + name))) // (2)
}
import io.micronaut.http.HttpRequest.GET
import io.micronaut.http.HttpStatus.CREATED
import io.micronaut.http.MediaType.TEXT_PLAIN
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.annotation.Post
import io.micronaut.http.annotation.Status
import io.micronaut.http.client.HttpClient
import io.micronaut.http.client.annotation.Client
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import io.micronaut.core.async.annotation.SingleResult

@Get("/hello/{name}")
@SingleResult
internal fun hello(name: String): Publisher<String> { // (1)
    return Flux.from(httpClient.retrieve(GET<Any>("/hello/$name")))
                     .next() // (2)
}
  1. hello 方法返回一個 Mono,它可能會或可能不會發(fā)出一個項目。如果未發(fā)出某個項目,則返回 404。

  2. 檢索方法被調用,它返回一個 Flux。這有一個 firstElement 方法返回第一個發(fā)出的項目或什么都不返回

使用 Reactor(如果您愿意,也可以使用 RxJava),您可以輕松高效地編寫多個 HTTP 客戶端調用,而不會阻塞(這會限制您的應用程序的吞吐量和可擴展性)。

調試/跟蹤 HTTP 客戶端

要調試從 HTTP 客戶端發(fā)送和接收的請求,您可以通過 logback.xml 文件啟用跟蹤日志記錄:

logback.xml

<logger name="io.micronaut.http.client" level="TRACE"/>

客戶端特定調試/跟蹤

要啟用特定于客戶端的日志記錄,您可以為所有 HTTP 客戶端配置默認記錄器。您還可以使用特定于客戶端的配置為不同的客戶端配置不同的記錄器。例如,在您的配置文件(例如 application.yml)中:

 Properties Yaml  Toml  Groovy  Hocon  JSON 
micronaut.http.client.logger-name=mylogger
micronaut.http.services.otherClient.logger-name=other.client
micronaut:
  http:
    client:
      logger-name: mylogger
    services:
      otherClient:
        logger-name: other.client
[micronaut]
  [micronaut.http]
    [micronaut.http.client]
      logger-name="mylogger"
    [micronaut.http.services]
      [micronaut.http.services.otherClient]
        logger-name="other.client"
micronaut {
  http {
    client {
      loggerName = "mylogger"
    }
    services {
      otherClient {
        loggerName = "other.client"
      }
    }
  }
}
{
  micronaut {
    http {
      client {
        logger-name = "mylogger"
      }
      services {
        otherClient {
          logger-name = "other.client"
        }
      }
    }
  }
}
{
  "micronaut": {
    "http": {
      "client": {
        "logger-name": "mylogger"
      },
      "services": {
        "otherClient": {
          "logger-name": "other.client"
        }
      }
    }
  }
}

然后在 logback.xml 中啟用日志記錄:

logback.xml

<logger name="mylogger" level="DEBUG"/>
<logger name="other.client" level="TRACE"/>

自定義 HTTP 請求

前面的示例演示了使用 HttpRequest 接口的靜態(tài)方法來構造 MutableHttpRequest 實例。顧名思義,MutableHttpRequest 可以改變,包括添加標頭、自定義請求正文等的能力。例如:

傳遞一個 HttpRequest 來檢索

 Java Groovy  Kotlin 
Flux<String> response = Flux.from(client.retrieve(
        GET("/hello/John")
        .header("X-My-Header", "SomeValue")
));
Flux<String> response = Flux.from(client.retrieve(
        GET("/hello/John")
        .header("X-My-Header", "SomeValue")
))
val response = client.retrieve(
        GET<Any>("/hello/John")
                .header("X-My-Header", "SomeValue")
)

上面的示例在發(fā)送之前向響應添加一個標頭(X-My-Header)。 MutableHttpRequest 接口有更多方便的方法,可以很容易地以常用的方式修改請求。

讀取 JSON 響應

微服務通常使用 JSON 等消息編碼格式。 Micronaut 的 HTTP 客戶端利用 Jackson 進行 JSON 解析,因此 Jackson 可以解碼的任何類型都可以作為第二個參數(shù)傳遞給檢索。

例如,考慮以下返回 JSON 響應的 @Controller 方法:

從控制器返回 JSON

 Java Groovy  Kotlin 
@Get("/greet/{name}")
Message greet(String name) {
    return new Message("Hello " + name);
}
@Get("/greet/{name}")
Message greet(String name) {
    new Message("Hello $name")
}
@Get("/greet/{name}")
internal fun greet(name: String): Message {
    return Message("Hello $name")
}

上面的方法返回一個 Message 類型的 POJO,如下所示:

Message POJO

 Java Groovy  Kotlin 
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class Message {

    private final String text;

    @JsonCreator
    public Message(@JsonProperty("text") String text) {
        this.text = text;
    }

    public String getText() {
        return text;
    }
}
import com.fasterxml.jackson.annotation.JsonCreator
import com.fasterxml.jackson.annotation.JsonProperty

class Message {

    final String text

    @JsonCreator
    Message(@JsonProperty("text") String text) {
        this.text = text
    }
}
import com.fasterxml.jackson.annotation.JsonCreator
import com.fasterxml.jackson.annotation.JsonProperty

class Message @JsonCreator
constructor(@param:JsonProperty("text") val text: String)

Jackson注解用于映射構造函數(shù)

在客戶端,您可以調用此端點并使用 retrieve 方法將 JSON 解碼為映射,如下所示:

將響應主體解碼為 Map

 Java Groovy  Kotlin 
response = Flux.from(client.retrieve(
        GET("/greet/John"),
        Argument.of(Map.class, String.class, String.class) // (1)
));
response = Flux.from(client.retrieve(
        GET("/greet/John"),
        Argument.of(Map, String, String) // (1)
))
var response: Flux<Map<*, *>> = Flux.from(client.retrieve(
        GET<Any>("/greet/John"), Map::class.java
))
  1. Argument.of 方法返回一個 Map,其中鍵和值類型為 String

雖然檢索 JSON 作為映射可能是可取的,但通常您希望將對象解碼為 POJO。為此,請改為傳遞類型:

將響應主體解碼為 POJO

 Java Groovy  Kotlin 
Flux<Message> response = Flux.from(client.retrieve(
        GET("/greet/John"), Message.class
));

assertEquals("Hello John", response.blockFirst().getText());
when:
Flux<Message> response = Flux.from(client.retrieve(
        GET("/greet/John"), Message
))

then:
"Hello John" == response.blockFirst().getText()
val response = Flux.from(client.retrieve(
        GET<Any>("/greet/John"), Message::class.java
))

response.blockFirst().text shouldBe "Hello John"

請注意如何在客戶端和服務器上使用相同的 Java 類型。這意味著您通常會定義一個公共 API 項目,在該項目中定義用于定義 API 的接口和類型。

解碼其他內容類型

如果您與之通信的服務器使用非 JSON 的自定義內容類型,默認情況下 Micronaut 的 HTTP 客戶端將不知道如何解碼這種類型。

要解決此問題,請將 MediaTypeCodec 注冊為一個 bean,它會被自動拾取并用于解碼(或編碼)消息。

接收完整的 HTTP 響應

有時僅接收響應主體是不夠的,您還需要響應中的其他信息,例如標頭、cookie 等。在這種情況下,不要使用 retrieve 方法,而是使用 exchange 方法:

接收完整的 HTTP 響應

 Java Groovy  Kotlin 
Flux<HttpResponse<Message>> call = Flux.from(client.exchange(
        GET("/greet/John"), Message.class // (1)
));

HttpResponse<Message> response = call.blockFirst();
Optional<Message> message = response.getBody(Message.class); // (2)
// check the status
assertEquals(HttpStatus.OK, response.getStatus()); // (3)
// check the body
assertTrue(message.isPresent());
assertEquals("Hello John", message.get().getText());
when:
Flux<HttpResponse<Message>> call = Flux.from(client.exchange(
        GET("/greet/John"), Message // (1)
))

HttpResponse<Message> response = call.blockFirst();
Optional<Message> message = response.getBody(Message) // (2)
// check the status
then:
HttpStatus.OK == response.getStatus() // (3)
// check the body
message.isPresent()
"Hello John" == message.get().getText()
val call = client.exchange(
        GET<Any>("/greet/John"), Message::class.java // (1)
)

val response = Flux.from(call).blockFirst()
val message = response.getBody(Message::class.java) // (2)
// check the status
response.status shouldBe HttpStatus.OK // (3)
// check the body
message.isPresent shouldBe true
message.get().text shouldBe "Hello John"
  1. 交換方法接收 HttpResponse

  2. 使用響應的 getBody(..) 方法檢索正文

  3. 可以檢查響應的其他方面,例如 HttpStatus

上面的示例接收完整的 HttpResponse,您可以從中獲取標頭和其他有用信息。

發(fā)布請求正文

到目前為止,所有示例都使用了相同的 HTTP 方法,即 GET。 HttpRequest 接口具有適用于所有不同 HTTP 方法的工廠方法。下表總結了它們:

表 1. HttpRequest 工廠方法
方法 描述 Allows Body

HttpRequest.GET(java.lang.String)

構造一個 HTTP GET 請求

false

HttpRequest.OPTIONS(java.lang.String)

構造一個 HTTP OPTIONS 請求

false

HttpRequest.HEAD(java.lang.String)

構造一個 HTTP HEAD 請求

false

HttpRequest.POST(java.lang.String,T)

構造一個 HTTP POST 請求

true

HttpRequest.PUT(java.lang.String,T)

構造一個 HTTP PUT 請求

true

HttpRequest.PATCH(java.lang.String,T)

構造一個 HTTP PATCH 請求

true

HttpRequest.DELETE(java.lang.String)

構造一個 HTTP DELETE 請求

true

還存在一個創(chuàng)建方法來構造任何 HttpMethod 類型的請求。由于 POST、PUT 和 PATCH 方法需要主體,因此需要第二個參數(shù),即主體對象。

以下示例演示了如何發(fā)送簡單的 String 正文:

發(fā)送字符串正文

 Java Groovy  Kotlin 
Flux<HttpResponse<String>> call = Flux.from(client.exchange(
        POST("/hello", "Hello John") // (1)
            .contentType(MediaType.TEXT_PLAIN_TYPE)
            .accept(MediaType.TEXT_PLAIN_TYPE), // (2)
        String.class // (3)
));
Flux<HttpResponse<String>> call = Flux.from(client.exchange(
        POST("/hello", "Hello John") // (1)
            .contentType(MediaType.TEXT_PLAIN_TYPE)
            .accept(MediaType.TEXT_PLAIN_TYPE), // (2)
        String // (3)
))
val call = client.exchange(
        POST("/hello", "Hello John") // (1)
                .contentType(MediaType.TEXT_PLAIN_TYPE)
                .accept(MediaType.TEXT_PLAIN_TYPE), String::class.java // (3)
)
  1. 使用POST方法;第一個參數(shù)是 URI,第二個是主體

  2. 內容類型和接受類型設置為text/plain(默認為application/json)

  3. 預期的響應類型是 String

發(fā)送 JSON

前面的示例發(fā)送純文本。要發(fā)送 JSON,將要編碼的對象傳遞給 JSON(無論是 Map 還是 POJO),只要 Jackson 能夠對其進行編碼。

例如,您可以從上一節(jié)創(chuàng)建一個 Message 并將其傳遞給 POST 方法:

Sending a JSON body

 Java Groovy  Kotlin 
Flux<HttpResponse<Message>> call = Flux.from(client.exchange(
        POST("/greet", new Message("Hello John")), // (1)
        Message.class // (2)
));
Flux<HttpResponse<Message>> call = Flux.from(client.exchange(
        POST("/greet", new Message("Hello John")), // (1)
        Message // (2)
))
val call = client.exchange(
        POST("/greet", Message("Hello John")), Message::class.java // (2)
)
  1. 創(chuàng)建 Message 實例并將其傳遞給 POST 方法

  2. 同一個類解碼響應

在上面的示例中,以下 JSON 作為請求的主體發(fā)送:

Resulting JSON

{"text":"Hello John"}

可以使用 Jackson Annotations 自定義 JSON。

使用 URI 模板

如果在 URI 中包含對象的某些屬性,則可以使用 URI 模板。

例如,假設您有一個帶有 title 屬性的 Book 類。您可以在 URI 模板中包含標題,然后從 Book 的實例中填充它。例如:

Sending a JSON body with a URI template

 Java Groovy  Kotlin 
Flux<HttpResponse<Book>> call = Flux.from(client.exchange(
        POST("/amazon/book/{title}", new Book("The Stand")),
        Book.class
));
Flux<HttpResponse<Book>> call = client.exchange(
        POST("/amazon/book/{title}", new Book("The Stand")),
        Book
);
val call = client.exchange(
        POST("/amazon/book/{title}", Book("The Stand")),
        Book::class.java
)

在上述情況下,title 屬性包含在 URI 中。

發(fā)送表單數(shù)據(jù)

您還可以將 POJO 或地圖編碼為表單數(shù)據(jù)而不是 JSON。只需在發(fā)布請求中將內容類型設置為 application/x-www-form-urlencoded:

Sending a Form Data

 Java Groovy  Kotlin 
Flux<HttpResponse<Book>> call = Flux.from(client.exchange(
        POST("/amazon/book/{title}", new Book("The Stand"))
        .contentType(MediaType.APPLICATION_FORM_URLENCODED),
        Book.class
));
Flux<HttpResponse<Book>> call = client.exchange(
        POST("/amazon/book/{title}", new Book("The Stand"))
        .contentType(MediaType.APPLICATION_FORM_URLENCODED),
        Book
)
val call = client.exchange(
        POST("/amazon/book/{title}", Book("The Stand"))
                .contentType(MediaType.APPLICATION_FORM_URLENCODED),
        Book::class.java
)

請注意,Jackson 也可以綁定表單數(shù)據(jù),因此要自定義綁定過程,請使用 Jackson 注釋。

多部分客戶端上傳

Micronaut HTTP 客戶端支持多部分請求。要構建多部分請求,請將內容類型設置為 multipart/form-data 并將正文設置為 MultipartBody 的實例。

例如:

Creating the body

 Java Groovy  Kotlin 
import io.micronaut.http.client.multipart.MultipartBody;

String toWrite = "test file";
File file = File.createTempFile("data", ".txt");
FileWriter writer = new FileWriter(file);
writer.write(toWrite);
writer.close();

MultipartBody requestBody = MultipartBody.builder()     // (1)
        .addPart(                                       // (2)
            "data",
            file.getName(),
            MediaType.TEXT_PLAIN_TYPE,
            file
        ).build();                                      // (3)
import io.micronaut.http.multipart.CompletedFileUpload
import io.micronaut.http.multipart.StreamingFileUpload
import io.micronaut.http.client.multipart.MultipartBody
import org.reactivestreams.Publisher

File file = new File(uploadDir, "data.txt")
file.text = "test file"
file.createNewFile()


MultipartBody requestBody = MultipartBody.builder()     // (1)
        .addPart(                                       // (2)
            "data",
            file.name,
            MediaType.TEXT_PLAIN_TYPE,
            file
        ).build()                                       // (3)
import io.micronaut.http.client.multipart.MultipartBody

val toWrite = "test file"
val file = File.createTempFile("data", ".txt")
val writer = FileWriter(file)
writer.write(toWrite)
writer.close()

val requestBody = MultipartBody.builder()     // (1)
        .addPart(                             // (2)
                "data",
                file.name,
                MediaType.TEXT_PLAIN_TYPE,
                file
        ).build()                             // (3)
  1. 創(chuàng)建一個 MultipartBody 構建器,用于向主體添加部件。

  2. 將一個部分添加到正文中,在本例中是一個文件。此方法在 MultipartBody.Builder 中有不同的變體。

  3. build 方法將構建器中的所有部件組裝成一個 MultipartBody。至少需要一個部分。

創(chuàng)建請求

 Java Groovy  Kotlin 
HttpRequest.POST("/multipart/upload", requestBody)    // (1)
           .contentType(MediaType.MULTIPART_FORM_DATA_TYPE) // (2)
HttpRequest.POST("/multipart/upload", requestBody)      // (1)
           .contentType(MediaType.MULTIPART_FORM_DATA_TYPE) // (2)
HttpRequest.POST("/multipart/upload", requestBody)    // (1)
           .contentType(MediaType.MULTIPART_FORM_DATA_TYPE) // (2)
  1. 具有不同類型數(shù)據(jù)的多部分請求正文。

  2. 將請求的內容類型標頭設置為 multipart/form-data。

通過 HTTP 流式傳輸 JSON

Micronaut 的 HTTP 客戶端支持通過 ReactorStreamingHttpClient 接口通過 HTTP 流式傳輸數(shù)據(jù),該接口包括特定于流式傳輸?shù)姆椒?,包括?/p>

表 1. HTTP 流媒體方法
方法 描述

dataStream(HttpRequest<I> request)

將數(shù)據(jù)流作為 ByteBuffer 的 Flux 返回

exchangeStream(HttpRequest<I> request)

返回包裝 ByteBuffer 的 Flux 的 HttpResponse

jsonStream(HttpRequest<I> request)

返回一個非阻塞的 JSON 對象流

要使用 JSON 流,請在服務器上聲明一個控制器方法,該方法返回 JSON 對象的 application/x-json-stream。例如:

Streaming JSON on the Server

 Java Groovy  Kotlin 
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

@Get(value = "/headlines", processes = MediaType.APPLICATION_JSON_STREAM) // (1)
Publisher<Headline> streamHeadlines() {
    return Mono.fromCallable(() -> {  // (2)
        Headline headline = new Headline();
        headline.setText("Latest Headline at " + ZonedDateTime.now());
        return headline;
    }).repeat(100) // (3)
      .delayElements(Duration.of(1, ChronoUnit.SECONDS)); // (4)
}
import io.micronaut.http.MediaType
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

import java.time.Duration
import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit

@Get(value = "/headlines", processes = MediaType.APPLICATION_JSON_STREAM) // (1)
Flux<Headline> streamHeadlines() {
    Mono.fromCallable({ // (2)
        new Headline(text: "Latest Headline at ${ZonedDateTime.now()}")
    }).repeat(100) // (3)
            .delayElements(Duration.of(1, ChronoUnit.SECONDS)) // (4)
}
import io.micronaut.http.MediaType.APPLICATION_JSON_STREAM
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit.SECONDS

@Get(value = "/headlines", processes = [APPLICATION_JSON_STREAM]) // (1)
internal fun streamHeadlines(): Flux<Headline> {
    return Mono.fromCallable { // (2)
        val headline = Headline()
        headline.text = "Latest Headline at ${ZonedDateTime.now()}"
        headline
    }.repeat(100) // (3)
     .delayElements(Duration.of(1, ChronoUnit.SECONDS)) // (4)
}
  1. streamHeadlines 方法產(chǎn)生 application/x-json-stream

  2. Flux 是從 Callable 函數(shù)創(chuàng)建的(注意函數(shù)內不會發(fā)生阻塞,所以這沒問題,否則你應該訂閱 I/O 線程池)。

  3. Flux 重復 100 次

  4. Flux 發(fā)射物品,每個物品之間有 1 秒的延遲

服務器不必用 Micronaut 編寫,任何支持 JSON 流的服務器都可以。

然后在客戶端上,使用 jsonStream 訂閱流,每次服務器發(fā)出 JSON 對象時,客戶端都會解碼并使用它:

在客戶端流式傳輸 JSON

 Java Groovy  Kotlin 
Flux<Headline> headlineStream = Flux.from(client.jsonStream(
        GET("/streaming/headlines"), Headline.class)); // (1)
CompletableFuture<Headline> future = new CompletableFuture<>(); // (2)
headlineStream.subscribe(new Subscriber<Headline>() {
    @Override
    public void onSubscribe(Subscription s) {
        s.request(1); // (3)
    }

    @Override
    public void onNext(Headline headline) {
        System.out.println("Received Headline = " + headline.getText());
        future.complete(headline); // (4)
    }

    @Override
    public void onError(Throwable t) {
        future.completeExceptionally(t); // (5)
    }

    @Override
    public void onComplete() {
        // no-op // (6)
    }
});
Flux<Headline> headlineStream = Flux.from(client.jsonStream(
        GET("/streaming/headlines"), Headline)) // (1)
CompletableFuture<Headline> future = new CompletableFuture<>() // (2)
headlineStream.subscribe(new Subscriber<Headline>() {
    @Override
    void onSubscribe(Subscription s) {
        s.request(1) // (3)
    }

    @Override
    void onNext(Headline headline) {
        println "Received Headline = $headline.text"
        future.complete(headline) // (4)
    }

    @Override
    void onError(Throwable t) {
        future.completeExceptionally(t) // (5)
    }

    @Override
    void onComplete() {
        // no-op // (6)
    }
})
val headlineStream = client.jsonStream(
    GET<Any>("/streaming/headlines"), Headline::class.java) // (1)
val future = CompletableFuture<Headline>() // (2)
headlineStream.subscribe(object : Subscriber<Headline> {
    override fun onSubscribe(s: Subscription) {
        s.request(1) // (3)
    }

    override fun onNext(headline: Headline) {
        println("Received Headline = ${headline.text!!}")
        future.complete(headline) // (4)
    }

    override fun onError(t: Throwable) {
        future.completeExceptionally(t) // (5)
    }

    override fun onComplete() {
        // no-op // (6)
    }
})
  1. jsonStream 方法返回一個 Flux

  2. CompletableFuture 用于接收值,但是您對每個發(fā)出的項目執(zhí)行的操作是特定于應用程序的

  3. 訂閱請求單個項目。您可以使用訂閱來調節(jié)背壓和需求。

  4. onNext 方法在一個項目被發(fā)出時被調用

  5. 發(fā)生錯誤時調用 onError 方法

  6. onComplete 方法在所有 Headline 實例發(fā)出后被調用

請注意,上例中的服務器和客戶端都不執(zhí)行任何阻塞 I/O。

配置 HTTP 客戶端

所有客戶端的全局配置

默認的 HTTP 客戶端配置是一個名為 DefaultHttpClientConfiguration 的配置屬性,它允許為所有 HTTP 客戶端配置默認行為。例如,在您的配置文件中(例如 application.yml):

更改默認 HTTP 客戶端配置

 Properties Yaml  Toml  Groovy  Hocon  JSON 
micronaut.http.client.read-timeout=5s
micronaut:
  http:
    client:
      read-timeout: 5s
[micronaut]
  [micronaut.http]
    [micronaut.http.client]
      read-timeout="5s"
micronaut {
  http {
    client {
      readTimeout = "5s"
    }
  }
}
{
  micronaut {
    http {
      client {
        read-timeout = "5s"
      }
    }
  }
}
{
  "micronaut": {
    "http": {
      "client": {
        "read-timeout": "5s"
      }
    }
  }
}

上面的示例設置了 HttpClientConfiguration 類的 readTimeout 屬性。

客戶端特定配置

要為每個客戶端單獨配置,有幾個選項。您可以在配置文件(例如 application.yml)中手動配置服務發(fā)現(xiàn)并應用每個客戶端配置:

手動配置 HTTP 服務

 Properties Yaml  Toml  Groovy  Hocon  JSON 
micronaut.http.services.foo.urls[0]=http://foo1
micronaut.http.services.foo.urls[1]=http://foo2
micronaut.http.services.foo.read-timeout=5s
micronaut:
  http:
    services:
      foo:
        urls:
          - http://foo1
          - http://foo2
        read-timeout: 5s
[micronaut]
  [micronaut.http]
    [micronaut.http.services]
      [micronaut.http.services.foo]
        urls=[
          "http://foo1",
          "http://foo2"
        ]
        read-timeout="5s"
micronaut {
  http {
    services {
      foo {
        urls = ["http://foo1", "http://foo2"]
        readTimeout = "5s"
      }
    }
  }
}
{
  micronaut {
    http {
      services {
        foo {
          urls = ["http://foo1", "http://foo2"]
          read-timeout = "5s"
        }
      }
    }
  }
}
{
  "micronaut": {
    "http": {
      "services": {
        "foo": {
          "urls": ["http://foo1", "http://foo2"],
          "read-timeout": "5s"
        }
      }
    }
  }
}
  • 讀取超時應用于 foo 客戶端。

警告:此客戶端配置可以與 @Client 注釋結合使用,通過直接注入 HttpClient 或在客戶端界面上使用。在任何情況下,注釋上的所有其他屬性都將被忽略,除了服務 id。

然后,注入指定的客戶端配置:

注入 HTTP 客戶端

@Client("foo") @Inject ReactorHttpClient httpClient;

您還可以定義一個從 HttpClientConfiguration 擴展的 bean,并確保 javax.inject.Named 注釋適當?shù)孛?

定義 HTTP 客戶端配置 bean

@Named("twitter")
@Singleton
class TwitterHttpClientConfiguration extends HttpClientConfiguration {
   public TwitterHttpClientConfiguration(ApplicationConfiguration configuration) {
        super(configuration);
    }
}

如果您使用服務發(fā)現(xiàn)使用 @Client 注入名為 twitter 的服務,則將選擇此配置:

注入 HTTP 客戶端

@Client("twitter") @Inject ReactorHttpClient httpClient;

或者,如果您不使用服務發(fā)現(xiàn),則可以使用 @Client 的配置成員來引用特定類型:

注入 HTTP 客戶端

@Client(value = "https://api.twitter.com/1.1",
        configuration = TwitterHttpClientConfiguration.class)
@Inject
ReactorHttpClient httpClient;

使用 HTTP 客戶端連接池

處理大量請求的客戶端將受益于啟用 HTTP 客戶端連接池。以下配置為 foo 客戶端啟用池化:

手動配置 HTTP 服務

 Properties Yaml  Toml  Groovy  Hocon  JSON 
micronaut.http.services.foo.urls[0]=http://foo1
micronaut.http.services.foo.urls[1]=http://foo2
micronaut.http.services.foo.pool.enabled=true
micronaut.http.services.foo.pool.max-connections=50
micronaut:
  http:
    services:
      foo:
        urls:
          - http://foo1
          - http://foo2
        pool:
          enabled: true
          max-connections: 50
[micronaut]
  [micronaut.http]
    [micronaut.http.services]
      [micronaut.http.services.foo]
        urls=[
          "http://foo1",
          "http://foo2"
        ]
        [micronaut.http.services.foo.pool]
          enabled=true
          max-connections=50
micronaut {
  http {
    services {
      foo {
        urls = ["http://foo1", "http://foo2"]
        pool {
          enabled = true
          maxConnections = 50
        }
      }
    }
  }
}
{
  micronaut {
    http {
      services {
        foo {
          urls = ["http://foo1", "http://foo2"]
          pool {
            enabled = true
            max-connections = 50
          }
        }
      }
    }
  }
}
{
  "micronaut": {
    "http": {
      "services": {
        "foo": {
          "urls": ["http://foo1", "http://foo2"],
          "pool": {
            "enabled": true,
            "max-connections": 50
          }
        }
      }
    }
  }
}
  • pool 啟用池并為其設置最大連接數(shù)

有關可用池配置選項的詳細信息,請參閱 ConnectionPoolConfiguration 的 API。

配置事件循環(huán)組

默認情況下,Micronaut 為工作線程和所有 HTTP 客戶端線程共享一個通用的 Netty EventLoopGroup。

這個 EventLoopGroup 可以通過 micronaut.netty.event-loops.default 屬性進行配置:

配置默認事件循環(huán)

 Properties Yaml  Toml  Groovy  Hocon  JSON 
micronaut.netty.event-loops.default.num-threads=10
micronaut.netty.event-loops.default.prefer-native-transport=true
micronaut:
  netty:
    event-loops:
      default:
        num-threads: 10
        prefer-native-transport: true
[micronaut]
  [micronaut.netty]
    [micronaut.netty.event-loops]
      [micronaut.netty.event-loops.default]
        num-threads=10
        prefer-native-transport=true
micronaut {
  netty {
    eventLoops {
      'default' {
        numThreads = 10
        preferNativeTransport = true
      }
    }
  }
}
{
  micronaut {
    netty {
      event-loops {
        default {
          num-threads = 10
          prefer-native-transport = true
        }
      }
    }
  }
}
{
  "micronaut": {
    "netty": {
      "event-loops": {
        "default": {
          "num-threads": 10,
          "prefer-native-transport": true
        }
      }
    }
  }
}

您還可以使用 micronaut.netty.event-loops 設置來配置一個或多個額外的事件循環(huán)。下表總結了屬性:

表 1. DefaultEventLoopGroupConfiguration 的配置屬性
屬性 類型 描述

micronaut.netty.event-loops.*.num-threads

int

micronaut.netty.event-loops.*.io-ratio

java.lang.Integer

micronaut.netty.event-loops.*.prefer-native-transport

boolean

micronaut.netty.event-loops.*.executor

java.lang.String

micronaut.netty.event-loops.*.shutdown-quiet-period

java.time.Duration

micronaut.netty.event-loops.*.shutdown-timeout

java.time.Duration

例如,如果您與 HTTP 客戶端的交互涉及 CPU 密集型工作,則可能值得為一個或所有客戶端配置一個單獨的 EventLoopGroup。

以下示例配置了一個名為“other”的附加事件循環(huán)組,其中包含 10 個線程:

配置額外的事件循環(huán)

 Properties Yaml  Toml  Groovy  Hocon  JSON 
micronaut.netty.event-loops.other.num-threads=10
micronaut.netty.event-loops.other.prefer-native-transport=true
micronaut:
  netty:
    event-loops:
      other:
        num-threads: 10
        prefer-native-transport: true
[micronaut]
  [micronaut.netty]
    [micronaut.netty.event-loops]
      [micronaut.netty.event-loops.other]
        num-threads=10
        prefer-native-transport=true
micronaut {
  netty {
    eventLoops {
      other {
        numThreads = 10
        preferNativeTransport = true
      }
    }
  }
}
{
  micronaut {
    netty {
      event-loops {
        other {
          num-threads = 10
          prefer-native-transport = true
        }
      }
    }
  }
}
{
  "micronaut": {
    "netty": {
      "event-loops": {
        "other": {
          "num-threads": 10,
          "prefer-native-transport": true
        }
      }
    }
  }
}

配置附加事件循環(huán)后,您可以更改 HTTP 客戶端配置以使用它:

改變客戶端使用的事件循環(huán)組

 Properties Yaml  Toml  Groovy  Hocon  JSON 
micronaut.http.client.event-loop-group=other
micronaut:
  http:
    client:
      event-loop-group: other
[micronaut]
  [micronaut.http]
    [micronaut.http.client]
      event-loop-group="other"
micronaut {
  http {
    client {
      eventLoopGroup = "other"
    }
  }
}
{
  micronaut {
    http {
      client {
        event-loop-group = "other"
      }
    }
  }
}
{
  "micronaut": {
    "http": {
      "client": {
        "event-loop-group": "other"
      }
    }
  }
}

錯誤響應

如果返回代碼為 400 或更高的 HTTP 響應,則會創(chuàng)建 HttpClientResponseException。異常包含原始響應。如何拋出異常取決于方法的返回類型。

對于阻塞客戶端,拋出異常并應由調用者捕獲和處理。對于反應式客戶端,異常作為錯誤通過發(fā)布者傳遞。

綁定錯誤

如果請求成功,您通常希望使用端點并綁定到 POJO,如果發(fā)生錯誤則綁定到不同的 POJO。以下示例顯示如何調用具有成功和錯誤類型的交換。

 Java Groovy  Kotlin 
@Controller("/books")
public class BooksController {

    @Get("/{isbn}")
    public HttpResponse find(String isbn) {
        if (isbn.equals("1680502395")) {
            Map<String, Object> m = new HashMap<>();
            m.put("status", 401);
            m.put("error", "Unauthorized");
            m.put("message", "No message available");
            m.put("path", "/books/" + isbn);
            return HttpResponse.status(HttpStatus.UNAUTHORIZED).body(m);
        }

        return HttpResponse.ok(new Book("1491950358", "Building Microservices"));
    }
}
@Controller("/books")
class BooksController {

    @Get("/{isbn}")
    HttpResponse find(String isbn) {
        if (isbn == "1680502395") {
            Map<String, Object> m = [
                    status : 401,
                    error  : "Unauthorized",
                    message: "No message available",
                    path   : "/books/" + isbn]
            return HttpResponse.status(HttpStatus.UNAUTHORIZED).body(m)
        }

        return HttpResponse.ok(new Book("1491950358", "Building Microservices"))
    }
}
@Controller("/books")
class BooksController {

    @Get("/{isbn}")
    fun find(isbn: String): HttpResponse<*> {
        if (isbn == "1680502395") {
            val m = mapOf(
                "status" to 401,
                "error" to "Unauthorized",
                "message" to "No message available",
                "path" to "/books/$isbn"
            )
            return HttpResponse.status<Any>(HttpStatus.UNAUTHORIZED).body(m)
        }

        return HttpResponse.ok(Book("1491950358", "Building Microservices"))
    }
}
 Java Groovy  Kotlin 
@Test
public void afterAnHttpClientExceptionTheResponseBodyCanBeBoundToAPOJO() {
    try {
        client.toBlocking().exchange(HttpRequest.GET("/books/1680502395"),
                Argument.of(Book.class), // (1)
                Argument.of(CustomError.class)); // (2)
    } catch (HttpClientResponseException e) {
        assertEquals(HttpStatus.UNAUTHORIZED, e.getResponse().getStatus());
        Optional<CustomError> jsonError = e.getResponse().getBody(CustomError.class);
        assertTrue(jsonError.isPresent());
        assertEquals(401, jsonError.get().status);
        assertEquals("Unauthorized", jsonError.get().error);
        assertEquals("No message available", jsonError.get().message);
        assertEquals("/books/1680502395", jsonError.get().path);
    }
}
def "after an HttpClientException the response body can be bound to a POJO"() {
    when:
    client.toBlocking().exchange(HttpRequest.GET("/books/1680502395"),
            Argument.of(Book), // (1)
            Argument.of(CustomError)) // (2)

    then:
    def e = thrown(HttpClientResponseException)
    e.response.status == HttpStatus.UNAUTHORIZED

    when:
    Optional<CustomError> jsonError = e.response.getBody(CustomError)

    then:
    jsonError.isPresent()
    jsonError.get().status == 401
    jsonError.get().error == 'Unauthorized'
    jsonError.get().message == 'No message available'
    jsonError.get().path == '/books/1680502395'
}
"after an httpclient exception the response body can be bound to a POJO" {
    try {
        client.toBlocking().exchange(HttpRequest.GET<Any>("/books/1680502395"),
                Argument.of(Book::class.java), // (1)
                Argument.of(CustomError::class.java)) // (2)
    } catch (e: HttpClientResponseException) {
        e.response.status shouldBe HttpStatus.UNAUTHORIZED
    }
}
  1. 成功類型
  2. 錯誤類型


以上內容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號