本教程向您展示如何使用Spring Boot和Kotlin构建简单的聊天应用程序。您将从语法的角度了解使用Kotlin进行服务器端开发的好处。

我们将从最小的应用程序实现开始,然后逐步进行开发,首先,该应用程序将生成并显示虚假消息,并使用经典的阻塞请求-响应模型将数据获取到UI。在本教程中,我们将通过添加持久性和扩展来发展应用程序,并迁移到非阻塞流样式以从后端向UI提供数据。

本教程包括5个部分:

  • 第1部分:初始设置和项目简介

  • 第2部分:添加持久性和集成测试

  • 第3部分:实现扩展

  • 第4部分:使用Kotlin Coroutines重构到Spring WebFlux

  • 第5部分:使用RSocket进行流传输

本教程是为Java开发人员设计的,他们已经动手使用Spring MVC / WebFlux,并希望了解如何在Spring中使用Kotlin。

第1部分:初始设置和项目简介

要开始研究本教程,我们需要IntelliJ IDEA的最新版本之一-从2018.1开始的任何版本。您可以在此处下载最新的免费社区版本。

该项目基于Spring Boot 2.4.0,需要Kotlin 1.4.10。确保已安装1.4+版本的Kotlin插件。要更新Kotlin插件,请使用Tools | Kotlin | Configure Kotlin Plugin Updates

下载项目

通过选择从IntelliJ IDEA复制存储库File | New | Project from Version Control

从vcs下载
从vcs github下载

克隆项目后,IntelliJ IDEA将自动导入并打开它。或者,您可以使用命令行克隆项目:

$ git clone https://github.com/kotlin-hands-on/kotlin-spring-chat

解决方案分支

请注意,该项目包括本教程每个部分的解决方案分支。您可以通过调用Branches操作来浏览IDE中的所有分支:

IntelliJ git分支

或者,您可以使用命令行:

git branch -a

可以使用Compare with branchIntelliJ IDEA中的命令将您的解决方案与建议的解决方案进行比较。

intellij git与分支比较

例如,这是initial分支和part-2分支之间的列表差异:

intellij git与分支差异比较

通过单击各个文件,您可以在行级别查看更改。

intellij git与分支文件比较

如果您对本教程的任何阶段的说明有任何疑问,这应该会对您有所帮助。

启动应用程序

main应用程序的方法位于ChatKotlinApplication.kt文件中。只需单击主要方法旁边的装订线图标或单击Alt+Enter快捷方式即可调用IntelliJ IDEA中的启动菜单:

intellij从主运行应用程序

或者,您可以./gradlew bootRun在终端中运行命令。

应用程序启动后,打开以下URL:http:// localhost:8080。您将看到带有消息集合的聊天页面。

聊天

在接下来的步骤中,我们将演示如何将我们的应用程序与真实的数据库集成以存储消息。

项目概况

让我们看一下一般的应用程序概述。在本教程中,我们将构建一个具有以下架构的简单聊天应用程序:

应用架构

我们的应用程序是一个普通的3层Web应用程序。面向客户的层由HtmlControllerMessagesResource类实现。该应用程序通过Thymeleaf模板引擎利用服务器端呈现,并由提供服务HtmlController。消息数据API由提供MessagesResource,该API连接到服务层。

服务层由表示MessagesService,它具有两种不同的实现:

  • FakeMessageService –第一个实现,它产生随机消息

  • PersistentMessageService-第二种实现,可用于实际数据存储。我们将在本教程的第2部分中添加此实现。

PersistentMessageService连接到一个数据库来存储信息。我们将使用H2数据库并通过Spring Data Repository API访问它。

下载项目源并在IDE中打开它们之后,您将看到以下结构,其中包括上述类。

项目树

在该main/kotlin文件夹下,有属于该应用程序的包和类。在该文件夹中,我们将添加更多类,并对现有代码进行更改以发展应用程序。

在该main/resources文件夹中,您将找到各种静态资源和配置文件。

test/kotlin文件夹包含测试。我们将根据主应用程序的更改对测试源进行相应的更改。

应用程序的入口点是ChatKotlinApplication.kt文件。这就是main方法所在。

HtmlController

HtmlController是带@Controller注释的端点,它将公开使用Thymeleaf模板引擎生成的HTML页面

import com.example.kotlin.chat.service.MessageService
import com.example.kotlin.chat.service.MessageVM
import org.springframework.stereotype.Controller
import org.springframework.ui.Model
import org.springframework.ui.set
import org.springframework.web.bind.annotation.GetMapping

@Controller
class HtmlController(val messageService: MessageService) {
   @GetMapping("/")
   fun index(model: Model): String {
       val messages = messageService.latest()

       model["messages"] = messages
       model["lastMessageId"] = messages.lastOrNull()?.id ?: ""
       return "chat"
   }
}

类型推断是您可以立即在Kotlin中发现的功能之一。这意味着可以省略代码中的某些类型的信息,以供编译器推断。

在上面的示例中,编译器通过查看函数的返回类型知道messages变量List<MessageVM>的类型messageService.latest()

Spring Web用户可能会注意到,即使它没有扩展此API,它Model在本示例中也被用作Map另一个Kotlin扩展程序使之成为可能,该扩展程序set操作员提供了重载。有关更多信息,请参见操作员重载文档。

???? 空安全是该语言最重要的功能之一。在上面的示例中,您可以看到此功能的应用程序:messages.lastOrNull()?.id ?: "".首先?.安全调用运算符,该运算符将检查lastOrNull()is的结果是否正确null,然后获取id。如果表达式的结果为null,则我们使用Elvis运算符提供默认值,在我们的示例中为默认值("")。

消息资源

我们需要一个API端点来服务轮询请求。该功能由MessageResource类实现,该类以JSON格式公开最新消息。

如果lastMessageId指定了query参数,则端点将在特定的message-id之后提供最新的消息,否则,它将提供所有可用的消息。

@RestController
@RequestMapping("/api/v1/messages")
class MessageResource(val messageService: MessageService) {

   @GetMapping
   fun latest(@RequestParam(value = "lastMessageId", defaultValue = "") lastMessageId: String): ResponseEntity<List<MessageVM>> {
       val messages = if (lastMessageId.isNotEmpty()) {
           messageService.after(lastMessageId)
       } else {
           messageService.latest()
       }

       return if (messages.isEmpty()) {
           with(ResponseEntity.noContent()) {
               header("lastMessageId", lastMessageId)
               build<List<MessageVM>>()
           }
       } else {
           with(ResponseEntity.ok()) {
               header("lastMessageId", messages.last().id)
               body(messages)
           }
       }
   }

   @PostMapping
   fun post(@RequestBody message: MessageVM) {
       messageService.post(message)
   }
}

在Kotlin中,if 是一个expression,它返回一个值。这就是为什么我们可以将if表达式的结果分配给变量的原因:val messages = if (lastMessageId.isNotEmpty()) { … }

???? Kotlin标准库包含范围函数,其唯一目的是在对象上下文内执行代码块。在上面的示例中,我们使用该with()函数来构建响应对象。

FakeMessageService

FakeMessageService是该MessageService接口的初始实现。它为我们的聊天提供虚假数据。我们使用Java Faker库来生成假数据。该服务使用莎士比亚,尤达和里克和莫蒂的著名报价生成随机消息:

@Service
class FakeMessageService : MessageService {

    val users: Map<String, UserVM> = mapOf(
        "Shakespeare" to UserVM("Shakespeare", URL("https://blog.12min.com/wp-content/uploads/2018/05/27d-William-Shakespeare.jpg")),
        "RickAndMorty" to UserVM("RickAndMorty", URL("http://thecircular.org/wp-content/uploads/2015/04/rick-and-morty-fb-pic1.jpg")),
        "Yoda" to UserVM("Yoda", URL("https://news.toyark.com/wp-content/uploads/sites/4/2019/03/SH-Figuarts-Yoda-001.jpg"))
    )

    val usersQuotes: Map<String, () -> String> = mapOf(
       "Shakespeare" to { Faker.instance().shakespeare().asYouLikeItQuote() },
       "RickAndMorty" to { Faker.instance().rickAndMorty().quote() },
       "Yoda" to { Faker.instance().yoda().quote() }
    )

    override fun latest(): List<MessageVM> {
        val count = Random.nextInt(1, 15)
        return (0..count).map {
            val user = users.values.random()
            val userQuote = usersQuotes.getValue(user.name).invoke()

            MessageVM(userQuote, user, Instant.now(),
                      Random.nextBytes(10).toString())
        }.toList()
    }

    override fun after(lastMessageId: String): List<MessageVM> {
        return latest()
    }

    override fun post(message: MessageVM) {
        TODO("Not yet implemented")
    }
}

???? Kotlin具有功能类型,我们经常以lambda表达式的形式使用它们。在上面的示例中,userQuotes是一个映射对象,其中的键是字符串,值是lambda表达式。类型签名() → String表示,lambda表达式不带参数,并产生String结果。因此,的类型userQuotes指定为Map<String, () → String>

???? 该mapOf函数可让您创建的地图。Pair`s, where the pair’s definition is provided with an extension method `<A, B> A.to(that: B): Pair<A, B>

???? 该TODO()函数扮演两个角色:提醒角色和stab角色,因为它总是会引发NotImplementedError异常。

FakeMessageService该类的主要任务是生成随机数量的虚假消息,以发送到聊天的UI。该latest()方法是实现此逻辑的地方。

val count = Random.nextInt(1, 15)
return (0..count).map {
    val user = users.values.random()
    val userQuote = usersQuotes.getValue(user.name).invoke()

    MessageVM(userQuote, user, Instant.now(), Random.nextBytes(10).toString())
  }.toList()

在Kotlin中,要生成整数范围,我们要做的就是say (0..count)。然后,我们应用一个map()函数将每个数字转换为一条消息。

值得注意的是,从任何集合中选择随机元素也非常简单。Kotlin提供了用于集合的扩展方法,称为random()。我们使用此扩展方法从列表中选择并返回用户:users.values.random()

选择用户后,我们需要从userQuotes地图上获取用户的报价。从中选择的值userQuotes实际上是我们必须调用以获取真实报价的lambda表达式:usersQuotes.getValue(user.name).invoke()

接下来,我们创建MessageVM该类的实例。这是用于将数据传递到客户端的视图模型:

data class MessageVM(val content: String, val user: UserVM, val sent: Instant, val id: String? = null)

????对于数据类时,编译器自动生成toStringequalshashCode功能,最小化的实用程序的代码量,你必须写。

第2部分:添加持久性和集成测试

在这一部分中,我们将MessageService使用Spring Data JDBC和H2作为数据库来实现接口的持久版本。我们将介绍以下类:

  • PersistentMessageServiceMessageService接口的实现,该接口将通过Spring Data Repository API与实际数据存储进行交互。

  • MessageRepository –由以下人员使用的存储库实现 MessageService.

添加新的依赖项

首先,我们必须将所需的依赖项添加到项目中。为此,我们需要dependencies在build.gradle.kts文件中的块中添加以下行:

implementation("org.springframework.boot:spring-boot-starter-data-jdbc")
runtimeOnly("com.h2database:h2")

Note️注意,在此示例中,我们将其spring-data-jdbc作为在Spring Framework中使用JDBC的轻巧直接方法。如果您希望看到使用JPA的示例,请参见以下博客文章

To️要刷新项目依赖项列表,请单击出现在编辑器右上角的小象图标。

Intellij Gradle重新加载

创建数据库架构和配置

添加并解决依赖关系后,我们就可以开始对数据库架构进行建模。由于这是一个演示项目,因此我们不会设计任何复杂的东西,我们将坚持以下结构:

CREATE TABLE IF NOT EXISTS messages (
  id                     VARCHAR(60)  DEFAULT RANDOM_UUID() PRIMARY KEY,
  content                VARCHAR      NOT NULL,
  content_type           VARCHAR(128) NOT NULL,
  sent                   TIMESTAMP    NOT NULL,
  username               VARCHAR(60)  NOT NULL,
  user_avatar_image_link VARCHAR(256) NOT NULL
);

Create️sqlsrc/main/resources目录中创建一个新文件夹。然后将上面的SQL代码放入src/main/resources/sql/schema.sql文件中。

模式sql位置

另外,您应该进行修改application.properties,使其包含以下属性:

spring.datasource.schema=classpath:sql/schema.sql
spring.datasource.url=jdbc:h2:file:./build/data/testdb
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=password
spring.datasource.initialization-mode=always

处理数据

使用Spring Data,可以使用以下域类来表示上述表格,这些域类应放在src / main / kotlin / com / example / kotlin / chat / repository / DomainModel.kt文件中:

import org.springframework.data.annotation.Id
import org.springframework.data.relational.core.mapping.Table
import java.time.Instant

@Table("MESSAGES")
data class Message(
    val content: String,
    val contentType: ContentType,
    val sent: Instant,
    val username: String,
    val userAvatarImageLink: String,
    @Id var id: String? = null)

enum class ContentType {
    PLAIN
}

这里有些事情需要解释。领域,如contentsentid镜像MessageVM类。但是,为了减少表的数量并简化最终的关系结构,我们将User对象展平,并将其字段作为Message类的一部分。除此之外,还有一个名为的新额外字段contentType,用于指示所存储消息的内容类型。由于大多数现代聊天都支持不同的标记语言,因此通常支持不同的消息内容编码。最初,我们仅支持PLAIN文本,但后来,我们也将扩展ContentType以支持该MARKDOWN类型。

将表格表示形式归为一类后,我们可以通过引入对数据的便捷访问Repository

⌨️将MessageRepository.ktsrc/main/kotlin/com/example/kotlin/chat/repository的文件夹。

import org.springframework.data.jdbc.repository.query.Query
import org.springframework.data.repository.CrudRepository
import org.springframework.data.repository.query.Param

interface MessageRepository : CrudRepository<Message, String> {

    // language=SQL
    @Query("""
        SELECT * FROM (
            SELECT * FROM MESSAGES
            ORDER BY "SENT" DESC
            LIMIT 10
        ) ORDER BY "SENT"
    """)
    fun findLatest(): List<Message>

    // language=SQL
    @Query("""
        SELECT * FROM (
            SELECT * FROM MESSAGES
            WHERE SENT > (SELECT SENT FROM MESSAGES WHERE ID = :id)
            ORDER BY "SENT" DESC
        ) ORDER BY "SENT"
    """)
    fun findLatest(@Param("id") id: String): List<Message>
}

我们的方法MessageRepository扩展了普通CrudRepository方法,并提供了两种带有自定义查询的方法来检索最新消息和与特定消息ID关联的消息。

???? 您是否注意到用于以可读格式表示SQL查询的多行字符串?Kotlin为String提供了一组有用的添加。您可以在Kotlin语言文档中了解有关这些附加功能的更多信息

我们的下一步是实现与MessageService该类集成的MessageRepository类。

Put️将PersistentMessageService类放入src/main/kotlin/com/example/kotlin/chat/service文件夹,替换之前的FakeMessageService实现。

package com.example.kotlin.chat.service

import com.example.kotlin.chat.repository.ContentType
import com.example.kotlin.chat.repository.Message
import com.example.kotlin.chat.repository.MessageRepository
import org.springframework.context.annotation.Primary
import org.springframework.stereotype.Service
import java.net.URL

@Service
@Primary
class PersistentMessageService(val messageRepository: MessageRepository) : MessageService {

    override fun latest(): List<MessageVM> =
        messageRepository.findLatest()
            .map { with(it) { MessageVM(content, UserVM(username,
                              URL(userAvatarImageLink)), sent, id) } }

    override fun after(lastMessageId: String): List<MessageVM> =
        messageRepository.findLatest(lastMessageId)
            .map { with(it) { MessageVM(content, UserVM(username,
                              URL(userAvatarImageLink)), sent, id) } }

    override fun post(message: MessageVM) {
        messageRepository.save(
            with(message) { Message(content, ContentType.PLAIN, sent,
                         user.name, user.avatarImageLink.toString()) }
        )
    }
}

PersistentMessageService是的薄层MessageRepository,因为这里我们只是在做一些简单的对象映射。所有业务查询都在该Repository级别上进行。另一方面,此实现方式的简单性是Kotlin语言的优点,该语言提供了诸如map和的扩展功能with

如果现在启动该应用程序,我们将再次看到一个空白的聊天页面。但是,如果我们在文本输入中键入一条消息并发送,则稍后会看到它出现在屏幕上。如果我们打开一个新的浏览器页面,我们将在消息历史记录中再次看到此消息。

最后,我们可以编写一些集成测试以确保我们的代码将随着时间的推移继续正常工作。

添加集成测试

首先,我们必须修改其中的ChatKotlinApplicationTests文件/src/test并添加我们需要在测试中使用的字段:

import com.example.kotlin.chat.repository.ContentType
import com.example.kotlin.chat.repository.Message
import com.example.kotlin.chat.repository.MessageRepository
import com.example.kotlin.chat.service.MessageVM
import com.example.kotlin.chat.service.UserVM
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.web.client.TestRestTemplate
import org.springframework.boot.test.web.client.postForEntity
import org.springframework.core.ParameterizedTypeReference
import org.springframework.http.HttpMethod
import org.springframework.http.RequestEntity
import java.net.URI
import java.net.URL
import java.time.Instant
import java.time.temporal.ChronoUnit.MILLIS

@SpringBootTest(
        webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
        properties = [
            "spring.datasource.url=jdbc:h2:mem:testdb"
        ]
)
class ChatKotlinApplicationTests {

    @Autowired
    lateinit var client: TestRestTemplate

    @Autowired
    lateinit var messageRepository: MessageRepository

    lateinit var lastMessageId: String

    val now: Instant = Instant.now()
}

我们使用lateinit关键字,该关键字在必须推迟非空字段的初始化的情况下非常有效。在我们的例子中,我们用它来@AutowireMessageRepository领域和决心TestRestTemplate

为简单起见,我们将测试三种常规情况:

  • 解决消息的时间lastMessageId不可用。

  • lastMessageId存在时解决消息。

  • 并发送消息。

为了测试消息的解决方案,我们必须准备一些测试消息,并在完成每种情况后清理存储。将以下内容添加到ChatKotlinApplicationTests

@BeforeEach
fun setUp() {
    val secondBeforeNow = now.minusSeconds(1)
    val twoSecondBeforeNow = now.minusSeconds(2)
    val savedMessages = messageRepository.saveAll(listOf(
            Message(
                    "*testMessage*",
                    ContentType.PLAIN,
                    twoSecondBeforeNow,
                    "test",
                    "http://test.com"
            ),
            Message(
                    "**testMessage2**",
                    ContentType.PLAIN,
                    secondBeforeNow,
                    "test1",
                    "http://test.com"
            ),
            Message(
                    "`testMessage3`",
                    ContentType.PLAIN,
                    now,
                    "test2",
                    "http://test.com"
            )
    ))
    lastMessageId = savedMessages.first().id ?: ""
}

@AfterEach
fun tearDown() {
    messageRepository.deleteAll()
}

准备工作完成后,我们可以创建我们的第一个测试用例以进行消息检索:

@ParameterizedTest
@ValueSource(booleans = [true, false])
fun `test that messages API returns latest messages`(withLastMessageId: Boolean) {
    val messages: List<MessageVM>? = client.exchange(
        RequestEntity<Any>(
            HttpMethod.GET,
            URI("/api/v1/messages?lastMessageId=${if (withLastMessageId) lastMessageId else ""}")
            ),
            object : ParameterizedTypeReference<List<MessageVM>>() {}).body

    if (!withLastMessageId) {
        assertThat(messages?.map { with(it) { copy(id = null, sent = sent.truncatedTo(MILLIS))}})
                .first()
                .isEqualTo(MessageVM(
                        "*testMessage*",
                        UserVM("test", URL("http://test.com")),
                        now.minusSeconds(2).truncatedTo(MILLIS)
                ))
    }

    assertThat(messages?.map { with(it) { copy(id = null, sent = sent.truncatedTo(MILLIS))}})
            .containsSubsequence(
                    MessageVM(
                            "**testMessage2**",
                            UserVM("test1", URL("http://test.com")),
                            now.minusSeconds(1).truncatedTo(MILLIS)
                    ),
                    MessageVM(
                            "`testMessage3`",
                            UserVM("test2", URL("http://test.com")),
                            now.truncatedTo(MILLIS)
                    )
            )
}

???? 所有数据类都有一个copy方法,该方法可让您在完全自定义实例的同时,在必要时自定义某些字段。在我们的案例中这非常有用,因为我们希望将消息发送时间截断为相同的时间单位,以便我们可以比较时间戳。

???? Kotlin对String模板的支持是测试的绝佳补充。

一旦实现了此测试,我们最后要实现的就是消息发布测试。将以下代码添加到ChatKotlinApplicationTests

@Test
fun `test that messages posted to the API is stored`() {
    client.postForEntity<Any>(
            URI("/api/v1/messages"),
            MessageVM(
                    "`HelloWorld`",
                    UserVM("test", URL("http://test.com")),
                    now.plusSeconds(1)
            )
    )

    messageRepository.findAll()
            .first { it.content.contains("HelloWorld") }
            .apply {
                assertThat(this.copy(id = null, sent = sent.truncatedTo(MILLIS)))
                        .isEqualTo(Message(
                                "`HelloWorld`",
                                ContentType.PLAIN,
                                now.plusSeconds(1).truncatedTo(MILLIS),
                                "test",
                                "http://test.com"
                        ))
            }
}

???? 在测试中,使用在反引号包含空格的函数名称是可以接受的。请参阅相关文档

除了我们检查发布的消息是否存储在数据库中之外,上面的测试与上一个测试相似。在此示例中,我们可以看到run范围函数,该函数可以在调用范围内使用目标对象this

一旦实现了所有这些测试,就可以运行它们并查看它们是否通过。

intellij运行测试

在此阶段,我们向聊天应用程序添加了消息持久性。现在可以将消息传递给连接到应用程序的所有活动客户端。此外,我们现在可以访问历史数据,因此每个人都可以阅读以前的消息(如果需要)。

该实现可能看起来很完整,但是我们编写的代码仍有一些改进的空间。因此,我们将在下一步中看到如何使用Kotlin扩展来改进我们的代码。

第3部分:实现扩展

在这一部分中,我们将实现扩展功能,以减少一些地方的代码重复次数。

例如,您可能会注意到Message<-→MessageVM转换当前明确地在中进行PersistableMessageService。我们可能还想通过添加对Markdown的支持来扩展对其他内容类型的支持。

首先,我们为Message和创建扩展方法MessageVM。新方法实现了从Messageto到MessageVM反之亦然的转换逻辑:

import com.example.kotlin.chat.repository.ContentType
import com.example.kotlin.chat.repository.Message
import com.example.kotlin.chat.service.MessageVM
import com.example.kotlin.chat.service.UserVM
import java.net.URL

fun MessageVM.asDomainObject(contentType: ContentType = ContentType.PLAIN): Message = Message(
        content,
        contentType,
        sent,
        user.name,
        user.avatarImageLink.toString(),
        id
)

fun Message.asViewModel(): MessageVM = MessageVM(
        content,
        UserVM(username, URL(userAvatarImageLink)),
        sent,
        id
)

⌨️我们将上述功能存储在src/main/kotlin/com/example/kotlin/chat/Extensions.kt文件中。

现在我们有了用于MessageVMMessage转换的扩展方法,我们可以在中使用它们PersistentMessageService

@Service
class PersistentMessageService(val messageRepository: MessageRepository) : MessageService {

    override fun latest(): List<MessageVM> =
            messageRepository.findLatest()
                    .map { it.asViewModel() }

    override fun after(lastMessageId: String): List<MessageVM> =
            messageRepository.findLatest(lastMessageId)
                    .map { it.asViewModel() }

    override fun post(message: MessageVM) {
        messageRepository.save(message.asDomainObject())
    }
}

上面的代码比以前更好。它更简洁,读起来更好。但是,我们可以进一步提高。如我们所见,map()`operators with the same function mapper twice. In fact, we can improve that by adding a custom `map对于List具有特定泛型类型的,我们使用相同的函数。将以下行添加到Extensions.kt文件中:

fun List<Message>.mapToViewModel(): List<MessageVM> = map { it.asViewModel() }

包括这一行,Kotlin将为List泛型类型与指定类型相对应的任何对象提供上述扩展方法:

@Service
class PersistentMessageService(val messageRepository: MessageRepository) : MessageService {

    override fun latest(): List<MessageVM> =
        messageRepository.findLatest()
            .mapToViewModel() // now we can use the mentioned extension on List<Message>

    override fun after(lastMessageId: String): List<MessageVM> =
        messageRepository.findLatest(lastMessageId)
            .mapToViewModel()
    //...
}

Note️请注意,对于具有不同泛型类型的同一类,您不能使用相同的扩展名。造成这种情况的原因是类型擦除,这意味着在运行时,两个类都将使用相同的方法,并且无法猜测应该调用哪个类。

一旦应用了所有扩展,我们就可以做一个类似的技巧,并声明支持的扩展以供在测试类中使用。将以下内容放入src/test/kotlin/com/example/kotlin/chat/TestExtensions.kt文件中

import com.example.kotlin.chat.repository.Message
import com.example.kotlin.chat.service.MessageVM
import java.time.temporal.ChronoUnit.MILLIS

fun MessageVM.prepareForTesting() = copy(id = null, sent = sent.truncatedTo(MILLIS))

fun Message.prepareForTesting() = copy(id = null, sent = sent.truncatedTo(MILLIS))

现在,我们可以继续前进并实现对MARKDOWN内容类型的支持。首先,我们需要添加用于Markdown内容呈现的实用程序。为此,我们可以将JetBrains的官方Markdown库添加到build.gradle.kts文件中:

dependencies {
   ...
   implementation("org.jetbrains:markdown:0.2.2")
   ...
}

由于我们已经学习了如何使用扩展名,因此让我们在Extensions.kt文件中为该ContentType枚举创建另一个扩展名,以便每个枚举值都将知道如何呈现特定的内容。

fun ContentType.render(content: String): String = when (this) {
    ContentType.PLAIN -> content
}

在上面的示例中,我们使用一个when表达式,该表达式在Kotlin中提供模式匹配。如果when将其用作表达式,则该else分支是必需的。但是,如果将when表达式与穷举值一起使用(例如enum,具有恒定数量的结果或sealed classes具有定义的子类数量),则else不需要分支。上面的示例正是其中一种情况,在这种情况下,我们在编译时就知道所有可能的结果(并且所有结果都已处理),因此我们不必指定else分支。

现在我们知道了when表达式的工作原理,最后让我们向ContentType枚举添加第二个选项:

enum class ContentType {
    PLAIN, MARKDOWN
}

when表达的力量带有穷举的强烈要求。只要有新的价值添加到enum,我们就必须先解决编译问题,然后才能将软件投入生产:

fun ContentType.render(content: String): String = when (this) {
    ContentType.PLAIN -> content
    ContentType.MARKDOWN -> {
        val flavour = CommonMarkFlavourDescriptor()
        HtmlGenerator(content, MarkdownParser(flavour).buildMarkdownTreeFromString(content),
           flavour).generateHtml()
    }
}

一旦修复了render方法以支持new ContentType,就可以修改MessageMessageVM扩展方法以启用MARKDOWN类型的使用并相应地呈现其内容:

fun MessageVM.asDomainObject(contentType: ContentType = ContentType.MARKDOWN): Message = Message(
        content,
        contentType,
        sent,
        user.name,
        user.avatarImageLink.toString(),
        id
)

fun Message.asViewModel(): MessageVM = MessageVM(
        contentType.render(content),
        UserVM(username, URL(userAvatarImageLink)),
        sent,
        id
)

我们还需要修改测试以确保MARKDOWN正确呈现内容类型。为此,我们必须更改ChatKotlinApplicationTests.kt并更改以下内容:

@BeforeEach
fun setUp() {
    //...
            Message(
                    "*testMessage*",
                    ContentType.PLAIN,
                    twoSecondBeforeNow,
                    "test",
                    "http://test.com"
            ),
            Message(
                    "**testMessage2**",
                    ContentType.MARKDOWN,
                    secondBeforeNow,
                    "test1",
                    "http://test.com"
            ),
            Message(
                    "`testMessage3`",
                    ContentType.MARKDOWN,
                    now,
                    "test2",
                    "http://test.com"
            )
   //...
}

@ParameterizedTest
@ValueSource(booleans = [true, false])
fun `test that messages API returns latest messages`(withLastMessageId: Boolean) {
    //...

    assertThat(messages?.map { it.prepareForTesting() })
            .containsSubsequence(
                    MessageVM(
                            "<body><p><strong>testMessage2</strong></p></body>",
                            UserVM("test1", URL("http://test.com")),
                            now.minusSeconds(1).truncatedTo(MILLIS)
                    ),
                    MessageVM(
                            "<body><p><code>testMessage3</code></p></body>",
                            UserVM("test2", URL("http://test.com")),
                            now.truncatedTo(MILLIS)
                    )
            )
}

@Test
fun `test that messages posted to the API are stored`() {
    //...
    messageRepository.findAll()
            .first { it.content.contains("HelloWorld") }
            .apply {
                assertThat(this.prepareForTesting())
                        .isEqualTo(Message(
                                "`HelloWorld`",
                                ContentType.MARKDOWN,
                                now.plusSeconds(1).truncatedTo(MILLIS),
                                "test",
                                "http://test.com"
                        ))
            }
}

完成此操作后,我们将看到所有测试仍在通过,并且MARKDOWN内容类型的消息将按预期呈现。

在这一步中,我们学习了如何使用扩展来提高代码质量。我们还了解了这种when表达方式以及在添加新的业务功能时如何减少人为错误。

第4部分:使用Kotlin Coroutines重构到Spring WebFlux

在本教程的这一部分中,我们将修改代码库以添加对协程的支持。

从本质上讲,协程是轻量级线程,可以以命令式方式表达异步代码。这解决了与上面用来实现相同效果的回调(观察者)模式相关的各种问题

⚠️在本教程中,我们不会对协程和标准的kotlinx.coroutines库太过仔细地研究。要了解有关协程及其功能的更多信息,请查看以下教程

添加协程

要开始使用Kotlin协程,我们必须向中添加三个附加库build.gradle.kts

dependencies {
    ...
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactive")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
    ...
}

添加依赖项后,就可以开始使用与协程相关的主要关键字:suspend。该suspend关键字指示被调用的函数是一个异步的。与其他语言中通过asyncorawait关键字公开相似概念的语言不同,该suspend函数必须在协程上下文中进行处理,协程上下文可以是另一个suspend函数,也可以是Job使用CoroutineScope.launchorrunBlocking函数创建的显式协程。

因此,这是将协程引入项目的第一步,我们会将suspend关键字添加到项目的所有控制器和服务方法中。例如,修改后,MessageService界面应如下所示:

interface MessageService {

    suspend fun latest(): List<MessageVM>

    suspend fun after(lastMessageId: String): List<MessageVM>

    suspend fun post(message: MessageVM)
}

上面的更改还将影响我们代码中MessageService使用的位置。PersistentMessageService必须通过添加suspend关键字来相应地更新其中的所有功能。

@Service
class PersistentMessageService(val messageRepository: MessageRepository) : MessageService {

   override suspend fun latest(): List<MessageVM> =
       messageRepository.findLatest()
           .mapToViewModel()

   override suspend fun after(messageId: String): List<MessageVM> =
       messageRepository.findLatest(messageId)
           .mapToViewModel()

   override suspend fun post(message: MessageVM) {
       messageRepository.save(message.asDomainObject())
   }
}

两个请求处理程序HtmlControllerMessageResource都必须进行调整:

// src/main/kotlin/com/example/kotlin/chat/controller/HtmlController.kt

@Controller
class HtmlController(val messageService: MessageService) {

   @GetMapping("/")
   suspend fun index(model: Model): String {
       //...
   }
}
// src/main/kotlin/com/example/kotlin/chat/controller/MessageResource.kt

@RestController
@RequestMapping("/api/v1/messages")
class MessageResource(val messageService: MessageService) {

   @GetMapping
   suspend fun latest(@RequestParam(value = "lastMessageId", defaultValue = "") lastMessageId: String): ResponseEntity<List<MessageVM>> {
       //...
   }

   @PostMapping
   suspend fun post(@RequestBody message: MessageVM) {
       //...
   }
}

我们已经准备好代码,以移植到响应式Spring堆栈Spring WebFlux。继续阅读!

添加WebFlux和R2DBC

尽管在大多数情况下添加org.jetbrains.kotlinx:kotlinx-coroutines-core依赖就足够了,但是要与Spring Framework进行适当的集成,我们需要替换Web和数据库模块:

dependencies {
    ...
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.boot:spring-boot-starter-data-jdbc")
    ...
}

具有以下内容:

dependencies {
    ...
    implementation("org.springframework.boot:spring-boot-starter-webflux")
    implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
    implementation("io.r2dbc:r2dbc-h2")
    ...
}

通过添加上述依赖关系,我们将标准阻塞式Web MVC替换为完全反应性和非阻塞式WebFlux。另外,JDBC被完全反应性和非阻塞性的R2DBC代替

由于所有Spring Framework工程师的辛勤工作,从Spring Web MVC到Spring WebFlux的迁移是无缝的,而且我们根本不必重写任何内容!但是,对于R2DBC,我们还有一些额外的步骤。首先,我们需要添加一个配置类。

We️我们将此类放在应用程序方法com/example/kotlin/chat/ChatKotlinApplication.kt所在的文件main()中。

@Configuration
class Config {

    @Bean
    fun initializer(connectionFactory: ConnectionFactory): ConnectionFactoryInitializer {
        val initializer = ConnectionFactoryInitializer()
        initializer.setConnectionFactory(connectionFactory)
        val populator = CompositeDatabasePopulator()
        populator.addPopulators(ResourceDatabasePopulator(ClassPathResource("./sql/schema.sql")))
        initializer.setDatabasePopulator(populator)
        return initializer
    }
}

上面的配置可确保在应用程序启动时初始化表的架构。

接下来,我们需要修改其中的属性application.properties以仅包含一个属性:

spring.r2dbc.url=r2dbc:h2:file:///./build/data/testdb;USER=sa;PASSWORD=password

进行了一些与配置有关的基本更改之后,我们将执行从Spring Data JDBC到Spring Data R2DBC的迁移。为此,我们需要更新MessageRepository接口,以从关键字派生CoroutineCrudRepository并标记其方法suspend。我们这样做如下:

interface MessageRepository : CoroutineCrudRepository<Message, String> {

    // language=SQL
    @Query("""
        SELECT * FROM (
            SELECT * FROM MESSAGES
            ORDER BY "SENT" DESC
            LIMIT 10
        ) ORDER BY "SENT"
    """)
    suspend fun findLatest(): List<Message>

    // language=SQL
    @Query("""
        SELECT * FROM (
            SELECT * FROM MESSAGES
            WHERE SENT > (SELECT SENT FROM MESSAGES WHERE ID = :id)
            ORDER BY "SENT" DESC
        ) ORDER BY "SENT"
    """)
    suspend fun findLatest(@Param("id") id: String): List<Message>
}

设计所有方法时CoroutineCrudRepository都考虑到Kotlin协程。

⚠️请注意,@Query注释现在位于其他程序包中,因此应按以下方式导入:

import org.springframework.data.r2dbc.repository.Query

在此阶段,这些更改应足以使您的应用程序异步且无阻塞。重新运行应用程序后,从功能角度来看,什么都应该更改,但是现在执行将是异步的且是非阻塞的。

最后,我们还需要对我们的测试应用更多修复程序。由于我们MessageRepository现在是异步的,因此我们需要更改数据源URL并在协程上下文中运行所有相关操作,该协程上下文runBlocking如下所示(在ChatKotlinApplicationTests.kt文件中):

// ...
// new imports
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

@SpringBootTest(
        webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
        properties = [
            "spring.r2dbc.url=r2dbc:h2:mem:///testdb;USER=sa;PASSWORD=password"
        ]
)
class ChatKotlinApplicationTests {
    //...

    @BeforeEach
    fun setUp() {
       runBlocking {
        //...
       }
    }

    @AfterEach
    fun tearDown() {
       runBlocking {
        //...
       }
    }

    //...

    @Test
    fun `test that messages posted to the API is stored`() {
       runBlocking {
        //...
       }
    }
}

我们的应用程序现在是异步且非阻塞的。但是它仍然使用轮询将消息从后端传递到UI。在下一部分中,我们将修改应用程序以使用RSocket将消息流传输到所有连接的客户端。

第5部分:使用RSocket进行流传输

我们将使用RSocket将消息传递转换为类似流的方法。

RSocket是一种二进制协议,可用于字节流传输,例如TCP和WebSockets。为包括Kotlin在内的各种编程语言提供了该API 。但是,在我们的示例中,我们不需要直接使用API​​。相反,我们将使用Spring Messaging,它与RSocket集成并提供了一种方便的基于注释的配置方法。

要开始将RSocket与Spring一起使用,我们需要添加新的依赖项并将其导入build.gradle.kts

dependencies {
    ....
     implementation("org.springframework.boot:spring-boot-starter-rsocket")
    ....
}

接下来,我们将更新MessageRepository以返回通过Flow<Messages>而不是List公开的异步消息流。

interface MessageRepository : CoroutineCrudRepository<Message, String> {

    //...
    fun findLatest(): Flow<Message>

    //...
    fun findLatest(@Param("id") id: String): Flow<Message>
}

我们需要对MessageService接口进行类似的更改,以准备将其用于流式传输。我们不再需要suspend关键字。相反,我们将使用Flow表示异步数据流的接口。List结果产生a的任何函数现在都会产生a Flow。post方法也将接收Flow类型作为参数。

import kotlinx.coroutines.flow.Flow

interface MessageService {

   fun latest(): Flow<MessageVM>

   fun after(messageId: String): Flow<MessageVM>

   fun stream(): Flow<MessageVM>

   suspend fun post(messages: Flow<MessageVM>)
}

现在,我们可以将点连接起来并更新PersistentMessageService类,以整合上述更改。

import com.example.kotlin.chat.asDomainObject
import com.example.kotlin.chat.asRendered
import com.example.kotlin.chat.mapToViewModel
import com.example.kotlin.chat.repository.MessageRepository
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.collect
import org.springframework.stereotype.Service

@Service
class PersistentMessageService(val messageRepository: MessageRepository) : MessageService {

   val sender: MutableSharedFlow<MessageVM> = MutableSharedFlow()

   override fun latest(): Flow<MessageVM> =
       messageRepository.findLatest()
           .mapToViewModel()

   override fun after(messageId: String): Flow<MessageVM> =
       messageRepository.findLatest(messageId)
           .mapToViewModel()

   override fun stream(): Flow<MessageVM> = sender

   override suspend fun post(messages: Flow<MessageVM>) =
       messages
           .onEach { sender.emit(it.asRendered()) }
           .map {  it.asDomainObject() }
           .let { messageRepository.saveAll(it) }
           .collect()
}

首先,由于MessageService接口已更改,因此我们需要在相应的实现中更新方法签名。因此,现在需要该类型的mapToViewModel `extension method that we defined previously in the `Extension.kt文件。ListFlow

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map

fun Flow<Message>.mapToViewModel(): Flow<MessageVM> = map { it.asViewModel() }

为了提高可读性,我们还asRendered为MessageVM类添加了扩展功能。在Extensions.kt文件中:

fun MessageVM.asRendered(contentType: ContentType = ContentType.MARKDOWN): MessageVM =
   this.copy(content = contentType.render(this.content))

接下来,我们将使用MutableSharedFlowCoroutines API中的,将消息广播到连接的客户端。

通过更改,我们将更接近所需的UI。接下来,我们将更新MessageResourceHtmlController

MessageResource得到了一个全新的实现。首先,我们将通过应用@MessageMapping注释而不是来使用此类来支持消息传递@RequestMapping。新方法将send()receive(),映射到同一端点,以@MessageMapping("stream")进行双工通信。

@Controller
@MessageMapping("api.v1.messages")
class MessageResource(val messageService: MessageService) {

   @MessageMapping("stream")
   suspend fun receive(@Payload inboundMessages: Flow<MessageVM>) =
       messageService.post(inboundMessages)

   @MessageMapping("stream")
   fun send(): Flow<MessageVM> = messageService
       .stream()
       .onStart {
           emitAll(messageService.latest())
       }
}

要将消息发送到UI,我们stream从方法messageService实现的中打开,PersistentMessageService `class, and call the `onStart以开始流式传输事件。当新的客户端连接到该服务时,由于将onStart作为参数提供给该方法的代码块,它将首先从历史记录中接收消息emitAll(messageService.latest())。然后,频道保持打开状态以流式传输新消息。

HtmlController级不再需要办理任何流逻辑。现在,其目的是为静态页面提供服务,因此实现变得微不足道:

@Controller
class HtmlController() {

   @GetMapping("/")
   fun index(): String {
       // implemented in src/main/resources/templates/chatrs.html
       return "chatrs"
   }
}

请注意,UI模板现在chatrs.htmlchat.html。新模板包括JavaScript代码,该代码配置WebSocket连接并直接与该类api.v1.messages.stream实现的端点进行交互MessageResource

我们需要对application.properties文件进行最后更改,以使RSocket正常工作。将以下属性添加到配置中:

spring.rsocket.server.transport=websocket
spring.rsocket.server.mapping-path=/rsocket

该应用程序已准备好启动!现在,借助RSocket,消息无需进行轮询即可传递到聊天UI。此外,由于使用了Spring WebFlux和Kotlin Coroutines,该应用程序的后端是完全异步且无阻塞的。

本教程对我们而言的最后一步是更新测试。

我们将添加一个专门用于测试的依赖项。涡轮机是一个小型测试库。通过为Flowkotlinx.coroutines的接口提供一些有用的扩展,它简化了测试。

dependencies {
    ...
    testImplementation("app.cash.turbine:turbine:0.4.1")
    ...
}

库的入口点是的test()扩展Flow<T>,它接受实现验证逻辑的代码块。该test()扩展程序是一个挂起函数,在流程完成或取消之前不会返回。我们将在稍后讨论它的应用。

接下来,更新测试依赖项。代替通过字段自动装配,我们将使用构造函数来注入依赖项。

class ChatKotlinApplicationTests {

   @Autowired
   lateinit var client: TestRestTemplate

   @Autowired
   lateinit var messageRepository: MessageRepository

class ChatKotlinApplicationTests(
   @Autowired val rsocketBuilder: RSocketRequester.Builder,
   @Autowired val messageRepository: MessageRepository,
   @LocalServerPort val serverPort: Int
) {

我们使用RSocketRequest.Builder而不是TestRestTemplate因为MessageResource通过RSocket协议的对话实现的端点。在测试中,我们需要构造一个实例RSocketRequester并使用它发出请求。将旧测试替换为下面的新代码:

@ExperimentalTime
@ExperimentalCoroutinesApi
@Test
fun `test that messages API streams latest messages`() {
   runBlocking {
       val rSocketRequester =
            rsocketBuilder.websocket(URI("ws://localhost:${serverPort}/rsocket"))

       rSocketRequester
           .route("api.v1.messages.stream")
           .retrieveFlow<MessageVM>()
           .test {
               assertThat(expectItem().prepareForTesting())
                   .isEqualTo(
                       MessageVM(
                           "*testMessage*",
                           UserVM("test", URL("http://test.com")),
                           now.minusSeconds(2).truncatedTo(MILLIS)
                       )
                   )

               assertThat(expectItem().prepareForTesting())
                   .isEqualTo(
                       MessageVM(
                           "<body><p><strong>testMessage2</strong></p></body>",
                           UserVM("test1", URL("http://test.com")),
                           now.minusSeconds(1).truncatedTo(MILLIS)
                       )
                   )
               assertThat(expectItem().prepareForTesting())
                   .isEqualTo(
                       MessageVM(
                           "<body><p><code>testMessage3</code></p></body>",
                           UserVM("test2", URL("http://test.com")),
                           now.truncatedTo(MILLIS)
                       )
                   )

               expectNoEvents()

               launch {
                   rSocketRequester.route("api.v1.messages.stream")
                       .dataWithType(flow {
                           emit(
                               MessageVM(
                                   "`HelloWorld`",
                                   UserVM("test", URL("http://test.com")),
                                   now.plusSeconds(1)
                               )
                           )
                       })
                       .retrieveFlow<Void>()
                       .collect()
               }

               assertThat(expectItem().prepareForTesting())
                   .isEqualTo(
                       MessageVM(
                           "<body><p><code>HelloWorld</code></p></body>",
                           UserVM("test", URL("http://test.com")),
                           now.plusSeconds(1).truncatedTo(MILLIS)
                       )
                   )

               cancelAndIgnoreRemainingEvents()
           }
   }
}

@ExperimentalTime
@Test
fun `test that messages streamed to the API is stored`() {
   runBlocking {
       launch {
           val rSocketRequester =
                rsocketBuilder.websocket(URI("ws://localhost:${serverPort}/rsocket"))

           rSocketRequester.route("api.v1.messages.stream")
               .dataWithType(flow {
                   emit(
                       MessageVM(
                           "`HelloWorld`",
                           UserVM("test", URL("http://test.com")),
                           now.plusSeconds(1)
                       )
                   )
               })
               .retrieveFlow<Void>()
               .collect()
       }

       delay(2.seconds)

       messageRepository.findAll()
           .first { it.content.contains("HelloWorld") }
           .apply {
               assertThat(this.prepareForTesting())
                   .isEqualTo(
                       Message(
                           "`HelloWorld`",
                           ContentType.MARKDOWN,
                           now.plusSeconds(1).truncatedTo(MILLIS),
                           "test",
                           "http://test.com"
                       )
                   )
           }
   }
}

概括

这是本教程的最后一部分。我们从一个简单的聊天应用程序开始,该应用程序在运行数据库查询时,在后端阻塞时,UI轮询新消息。我们逐渐向应用程序中添加了功能,并将其迁移到了响应式Spring堆栈。后端现在是完全异步的,利用了Spring WebFlux和Kotlin协程。