嘘~ 正在从服务器偷取页面 . . .

个人项目开发日志


1. MinIO

对于 OSS 存储,最常用的就是阿里云存储,但是我们也可以使用免费的 MinIO 在服务器上搭建自己的 OSS 存储服务。

这里是 Java SpringBoot 中所需要的 MinIO 的依赖:

<dependency>
    <groupId>io.minio</groupId>
    <artifactId>minio</artifactId>
    <version>8.2.1</version>
</dependency>

1.1 配置单个容器

因为中文文档过老的缘故,建议查看英文最新文档。这里我们使用 Docker 搭建单个 MinIO 的容器:

docker run \
  -p 9000:9000 \
  -p 9001:9001 \
  --name minio1 \
  -e "MINIO_ROOT_USER=username" \
  -e "MINIO_ROOT_PASSWORD=password" \
  -v ~/minio/data:/data \
  minio/minio server /data --console-address ":9001"

1.2 SpringBoot 整合 MinIO

SpringBoot yaml 配置如下:

minio:
  endpoint: http://localhost:9000
  accessKey: root
  secretKey: sast_forever_minio
  bucketName: exam

将 yaml 配置的属性绑定到实体类中,将各个名称对应:

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
//绑定 yaml 配置文档的属性
@ConfigurationProperties(prefix = "minio")
public class MinioYamlConfig {
    private String endpoint;

    private String accessKey;

    private String secretKey;

    private String bucketName;
}

配置好之后,我们就可以自动创建 MinIO 客户端的连接:

import io.minio.MinioClient;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

@Configuration
// 指定组件生效,并表明所在类
@EnableConfigurationProperties(MinioYamlConfig.class)
public class MinioConfig {
    
    // 使用组件扫描注入,不需要再配置
    @Resource
    private MinioYamlConfig minioYamlConfig;

    // 创建 minio 客户端连接
    @Bean
    public MinioClient minioClient() {
        return MinioClient.builder()
                .endpoint(minioYamlConfig.getEndpoint())
                .credentials(minioYamlConfig.getAccessKey(), minioYamlConfig.getSecretKey())
                .build();
    }
    
}

之后我们确认 MinIO 的工具类:

import com.baomidou.mybatisplus.core.toolkit.Constants;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.sast.jwt.exception.LocalRuntimeException;
import io.minio.*;
import io.minio.errors.*;
import io.minio.http.Method;
import lombok.extern.slf4j.Slf4j;
import org.apache.tomcat.util.http.fileupload.IOUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@Slf4j
@Component
public class MinioUtil {

    private final MinioClient minioClient;

    public MinioUtil(MinioClient minioClient) {
        this.minioClient = minioClient;
    }

    //判断是否存在对应的桶,如果没有就创建一个
    public void existBucket(String name) {
        try {
            boolean exists = minioClient.bucketExists(BucketExistsArgs.builder().bucket(name).build());
            if (!exists) {
                minioClient.makeBucket(MakeBucketArgs.builder().bucket(name).build());
            }
        } catch (Exception e) {
            throw new LocalRuntimeException("创建桶失败");
        }
    }

    //上传文件,返回文件名(文件为数组的形式)
    public List<String> upload(MultipartFile[] multipartFile, String bucketName) {
        List<String> names = new ArrayList<>(multipartFile.length);
        for (MultipartFile file : multipartFile) {
            String fileName = file.getOriginalFilename();
            //使用 hutools 的判断,文件名为空或者 null 都会抛出异常
            if (StringUtils.isEmpty(fileName)) {
                throw new LocalRuntimeException("文件名为空");
            }
            String[] split = fileName.split("\\.");
            if (split.length > 1) {
                fileName = System.currentTimeMillis() + "_" + split[0] + "." + split[split.length - 1];
            } else {
                fileName = System.currentTimeMillis() + fileName;
            }
            InputStream in = null;
            try {
                in = file.getInputStream();
                minioClient.putObject(PutObjectArgs.builder()
                        .bucket(bucketName)
                        .object(fileName)
                        //在 partSize 填入-1时表示大小不确定
                        .stream(in, in.available(), -1)
                        .contentType(file.getContentType())
                        .build()
                );
            } catch (Exception e) {
                log.error(e.getMessage());
            } finally {
                if (in != null) {
                    try {
                        in.close();
                    } catch (IOException e) {
                        log.error(e.getMessage());
                    }
                }
            }
            names.add(fileName);
        }
        return names;
    }


    public ResponseEntity<byte[]> download(String fileName, String bucketName) {
        ResponseEntity<byte[]> responseEntity = null;
        InputStream in = null;
        ByteArrayOutputStream out = null;
        try {
            in = minioClient.getObject(GetObjectArgs.builder()
                    .bucket(bucketName).object(fileName).build());
            out = new ByteArrayOutputStream();
            IOUtils.copy(in, out);
            //封装返回值
            byte[] bytes = out.toByteArray();
            HttpHeaders headers = new HttpHeaders();
            try {
                headers.add("Content-Disposition",
                        "attachment;filename=" + URLEncoder.encode(fileName, Constants.UTF_8));
            } catch (UnsupportedEncodingException e) {
                log.error(e.getMessage());
            }
            headers.setContentLength(bytes.length);
            headers.setContentType(MediaType.APPLICATION_OCTET_STREAM);
            headers.setAccessControlExposeHeaders(Collections.singletonList("*"));
            responseEntity = new ResponseEntity<>(bytes, headers, HttpStatus.OK);
        } catch (Exception e) {
            log.error(e.getMessage());
        } finally {
            try {
                if (in != null) {
                    try {
                        in.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (out != null) {
                    out.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return responseEntity;
    }

    //删除对应桶的对应文件
    public void removeObject(String bucketName, String objectName) {
        try {
            RemoveObjectArgs objectArgs = RemoveObjectArgs.builder().object(objectName)
                    .bucket(bucketName).build();
            minioClient.removeObject(objectArgs);
        } catch (Exception e) {
            throw new LocalRuntimeException("文件删除失败");
        }
    }

    //获取文件外链
    public String getObjectURL(String bucketName, String objectName, Integer expires) {
        try {
            GetPresignedObjectUrlArgs objectArgs = GetPresignedObjectUrlArgs.builder().object(objectName)
                    .bucket(bucketName)
                    .expiry(expires).build();
            String url = minioClient.getPresignedObjectUrl(objectArgs);
            return URLDecoder.decode(url, "UTF-8");
        } catch (Exception e) {
            log.error("文件路径获取失败" + e.getMessage());
        }
        return null;
    }

    //获取文件流
    public InputStream getMinioFile(String bucketName,String objectName){
        InputStream inputStream = null;
        try {
            GetObjectArgs objectArgs = GetObjectArgs.builder().object(objectName)
                    .bucket(bucketName).build();
            inputStream = minioClient.getObject(objectArgs);
        } catch (Exception e) {
            log.error("文件获取失败" + e.getMessage());
        }
        return inputStream;
    }

    /**
     * 获取某一个存储对象的下载链接
     *
     * @param bucketName 桶名
     * @param method     方法类型
     * @param objectName 对象名
     * @return url 下载链接
     * @throws ServerException 服务异常
     */
    public String getObjectUrl(String bucketName, Method method, String objectName)
            throws ServerException, InsufficientDataException,
            ErrorResponseException, IOException, NoSuchAlgorithmException,
            InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {
        return minioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder()
                .method(method)
                .bucket(bucketName)
                .object(objectName).build());
    }

}

1.3 使用 Compose 配置分布式 MinIO

我们也可以使用官方自带的 docker-compose.yaml 和 nginx.conf 搭建 MinIO 环境:

version: '3.7'

# Settings and configurations that are common for all containers
x-minio-common: &minio-common
  image: quay.io/minio/minio:RELEASE.2022-04-01T03-41-39Z
  command: server --console-address ":9001" http://minio{1...4}/data{1...2}
  expose:
    - "9000"
    - "9001"
  environment:
    MINIO_ROOT_USER: root
    MINIO_ROOT_PASSWORD: sast_forever
    #如果不指定,用户名和密码都是 minioadmin
    #需要注意的是,这里的用户名至少为3位,密码至少为8位
  healthcheck:
    test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
    interval: 30s
    timeout: 20s
    retries: 3

# starts 4 docker containers running minio server instances.
# using nginx reverse proxy, load balancing, you can access
# it through port 9000.
services:
  minio1:
    <<: *minio-common
    hostname: minio1
    volumes:
      - data1-1:/data1
      - data1-2:/data2

  minio2:
    <<: *minio-common
    hostname: minio2
    volumes:
      - data2-1:/data1
      - data2-2:/data2

  minio3:
    <<: *minio-common
    hostname: minio3
    volumes:
      - data3-1:/data1
      - data3-2:/data2

  minio4:
    <<: *minio-common
    hostname: minio4
    volumes:
      - data4-1:/data1
      - data4-2:/data2

  nginx:
    image: nginx:1.19.2-alpine
    hostname: nginx
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
    ports:
      - "9000:9000"
      - "9001:9001"
    depends_on:
      - minio1
      - minio2
      - minio3
      - minio4

## By default this config uses default local driver,
## For custom volumes replace with volume driver configuration.
volumes:
  data1-1:
  data1-2:
  data2-1:
  data2-2:
  data3-1:
  data3-2:
  data4-1:
  data4-2:

nginx.conf 如下:

user  nginx;
worker_processes  auto;

error_log  /var/log/nginx/error.log warn;
pid        /var/run/nginx.pid;

events {
    worker_connections  4096;
}

http {
    include       /etc/nginx/mime.types;
    default_type  application/octet-stream;

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';

    access_log  /var/log/nginx/access.log  main;
    sendfile        on;
    keepalive_timeout  65;

    # include /etc/nginx/conf.d/*.conf;

    upstream minio {
        server minio1:9000;
        server minio2:9000;
        server minio3:9000;
        server minio4:9000;
    }

    upstream console {
        ip_hash;
        server minio1:9001;
        server minio2:9001;
        server minio3:9001;
        server minio4:9001;
    }

    server {
        listen       9000;
        listen  [::]:9000;
        server_name  localhost;

        # To allow special characters in headers
        ignore_invalid_headers off;
        # Allow any size file to be uploaded.
        # Set to a value such as 1000m; to restrict file size to a specific value
        client_max_body_size 0;
        # To disable buffering
        proxy_buffering off;
        proxy_request_buffering off;

        location / {
            proxy_set_header Host $http_host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;

            proxy_connect_timeout 300;
            # Default is HTTP/1, keepalive is only enabled in HTTP/1.1
            proxy_http_version 1.1;
            proxy_set_header Connection "";
            chunked_transfer_encoding off;

            proxy_pass http://minio;
        }
    }

    server {
        listen       9001;
        listen  [::]:9001;
        server_name  localhost;

        # To allow special characters in headers
        ignore_invalid_headers off;
        # Allow any size file to be uploaded.
        # Set to a value such as 1000m; to restrict file size to a specific value
        client_max_body_size 0;
        # To disable buffering
        proxy_buffering off;
        proxy_request_buffering off;

        location / {
            proxy_set_header Host $http_host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
            proxy_set_header X-NginX-Proxy true;

            # This is necessary to pass the correct IP to be hashed
            real_ip_header X-Real-IP;

            proxy_connect_timeout 300;
            
            # To support websocket
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
            
            chunked_transfer_encoding off;

            proxy_pass http://console;
        }
    }
}

1.4 问题

1.4.1 MinIO 无法启动

在 docker-compose up -d 的时候,MinIO 一直无法正常在后台启动。

在不使用后台命令的情况下发现,原来是密码的设置没有达到至少8个字符(后台启动没法看到进入容器内部的报错)。

1.4.2 新版本 MinIO 冲突问题

点开 MinIO 的pom.xml 查看 okhttp3 声明版本是 4.8.1,所以是 SpringBoot 自带的 okhttp 与 MinIO 中不同产生冲突,我们需要手动覆盖。

在父工程 pom.xml 里修改 properties 的版本号。

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.5</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
    <youlai.version>2.0.0</youlai.version>
    <!-- 覆盖SpringBoot中okhttp3的旧版本声明,解决MinIO 8.4.x的依赖冲突 -->
    <okhttp3.version>4.8.1 </okhttp3.version>
</properties>

2. RabbitMQ

对于 RabbitMQ 通过 Docker 进行安装,采取最新的镜像。

docker run \
-d --name mrabbitmq \
-p 5672:5672 -p 15672:15672 \
-v rabbitmqData:/var/lib/rabbitmq \
-e RABBITMQ_DEFAULT_USER=username \
-e RABBITMQ_DEFAULT_PASS=pas \
rabbitmq:latest
#左边为 api 端口,右边为 gui 端口

docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
rabbitmq:3.10.7-management-alpine
# 带有管理插件

2.1 解决页面问题

直接启动 RabbitMQ 容器的时候,页面会无法打开。本质在于拉取镜像时没有选择 management tag 的镜像,在访问管理页面时缺少对应内容。除此之外,还会存在无法创建死信队列的问题。

如果查阅 RabbitMQ 官网,会发现我们缺少一个后台管理的插件,需要在 Docker 容器中执行 rabbitmq-plugins enable rabbitmq_management 命令执行这个插件。这样就能够使用 ip + 15672 访问后端管理页面。

在搭建 rabbitmq 时,最好选择带 management 的 docker 镜像。

2.2 简单示例

简单来说,rabbitmq 的执行流程:

publisher 发布信息 -> rabbitmq 接受 -> consumer 拿到信息。

publisher 具体操作:

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("cxy621");
        factory.setPassword("123456");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();
    }
}

consumer 具体流程:

public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("cxy621");
        factory.setPassword("123456");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

基本消息队列的消息发送流程:

  1. 建立 connection;

  2. 创建 channel;

  3. 利用 channel 声明队列;

  4. 利用 channel 向队列发送消息。

基本消息队列的消息接收流程:

  1. 建立 connection;

  2. 创建 channel;

  3. 利用 channel 声明队列;

  4. 定义 consumer 的消费行为 handleDelivery();

  5. 利用 channel 将消费者与队列绑定。

2.3 SpringAMQP

SpringAMQP 是基于 RabbitMQ 封装的一套模板,并且还利用 SpringBoot 对其实现了自动装配。

SpringAMQP 提供了三个功能:

  • 自动声明队列、交换机及其绑定关系;
  • 基于注解的监听器模式,异步接收消息;
  • 封装了 RabbitTemplate 工具,用于发送消息。

引入依赖:

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置接受/发送端 rabbitmq 连接信息:

spring:
  rabbitmq:
    host: localhost # rabbitMQ的ip地址
    port: 5672 # 端口
    username: cxy621
    password: 123456
    virtual-host: /

2.3.1 Basic Queue 简单队列模型

@RunWith(SpringRunner.class)
// 通过 @RunWith 指定运行测试环境
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue() {
        String queueName = "simple.queue";
        String message = "hello, spring amqp!";
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

在 IDEA 中,会自动识别出对应的 JUnit 环境(相当于就是一个自识别的RUNWITH环境配置),而在其他 IDE 中无法不能,会出现 NPE 问题。

基于 JUnit 4 的测试类

在 Consumer 对应包中创建一个用于接受的类:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
}

只配置 @RabbitListener 是没法自动创建队列的,之后会需要自己配置 Config。

2.3.2 WorkQueue

Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息,防止消息堆积。

一个 queue 绑定多个 consumer

通过 for 循环,模拟高并发导致的消息堆积情况:

@Test
public void testWorkQueue() throws InterruptedException {
    // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) {
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

在 consumer 服务中添加两个 Listener 完成多消息接受:

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}

启动 ConsumerApplication 后,在执行 publisher 服务中刚刚编写的发送测试方法 testWorkQueue。可以看到消费者1很快完成了自己的 25条消息。消费者2却在缓慢的处理自己的 25条消息。

也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这样显然是有问题的。

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
# 这样,处理快的 Consumer 就能获得更多的 message

2.3.3 发布/订阅

发布/订阅 模型

可以看到,在订阅模型中,多了一个 exchange 角色,而且过程略有变化:

  • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机);
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列;
    • Direct:定向,把消息交给符合指定routing key 的队列;
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列。
  • Consumer:消费者,与以前一样,订阅队列,没有变化;
  • Queue:消息队列也与以前一样,接收消息、缓存消息。

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

Fanout

Fanout 工作模式

在广播模式下,消息发送流程是这样的:

  • 可以有多个队列;
  • 每个队列都要绑定到 Exchange(交换机);
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定;
  • 交换机把消息发送给绑定过的所有队列;
  • 订阅队列的消费者都能拿到消息。

配置 exchange:

package cn.itcast.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    // itcast.fanout
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }

    // fanout.queue1
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    // 绑定队列1到交换机
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder
                .bind(fanoutQueue1)
                .to(fanoutExchange);
    }

    // fanout.queue2
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    // 绑定队列2到交换机
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder
                .bind(fanoutQueue2)
                .to(fanoutExchange);
    }

    @Bean
    public Queue objectQueue(){
        return new Queue("object.queue");
    }
}

发送消息,对于所有绑定的 queue 都能接收到:

@Test
public void testFanoutExchange() {
    // 队列名称
    String exchangeName = "itcast.fanout";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

消息接受,配置两个 queue:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

Direct

在 Fanout 模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到 Direct 类型的 Exchange。

通过对应 key 发送对应消息到 queue 中

在 Direct 模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key);
  • 消息的发送方在 向 Exchange 发送消息时,也必须指定消息的 RoutingKey
  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息。

除了常见的 @Bean 方式声明,Spring 提供了注解方式:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
// 在声明 key 之后,可以不用指定 DIRECT type
public void listenDirectQueue1(String msg){
    System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}

在发送消息中的方法中指定参数 routingKey 就能将对应消息发送到 queue 中。

Topic

Topic 类型的 ExchangeDirect 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以“.”分割,例如:item.insert

#:匹配一个/多个词;*:匹配1个词。

举例:

item.#:能够匹配 item.spu.insert 或者 item.spu

item.*:只能匹配 item.spu

发送对应内容消息:

@Test
public void testSendTopicExchange() {
    // 交换机名称
    String exchangeName = "itcast.topic";
    // 消息
    String message = "今天天气不错,我的心情好极了!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
}

在consumer服务的SpringRabbitListener中添加方法:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
// china. 开头的 key 都能接收
public void listenTopicQueue1(String msg){
    System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
// .news 结尾的 key 都能接收
public void listenTopicQueue2(String msg){
    System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}

3. SpringCloud

针对多模块开发,各种功能已经不会在一个项目中进行管理。此时,需要一个完整可行的 RPC 方式,来对各种模块的功能/健康进行统一管理。

  • 单体架构:简单方便,高度耦合,扩展性差,适合小型项目。例如:学生管理系统;

  • 分布式架构:松耦合,扩展性好,但架构复杂,难度大。适合大型互联网项目,例如:京东、淘宝;

  • 微服务:一种良好的分布式架构方案;

    • 优点:拆分粒度更小、服务更独立、耦合度更低;

    • 缺点:架构非常复杂,运维、监控、部署难度提高。

  • SpringCloud是微服务架构的一站式解决方案,集成了各种优秀微服务功能组件。

3.1 RestTemplate

针对外面暴露的接口 api,SpringBoot Web 已经整合 RestTemplate 来调用外部的接口。

// 可以在配置文件中自定义 config
@Configuration
public class RestTemplateConfig {
 
    @Bean
    public RestTemplate restTemplate(ClientHttpRequestFactory factory){
        return new RestTemplate(factory);
    }
 
    @Bean
    public ClientHttpRequestFactory simpleClientHttpRequestFactory(){
        SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
        // 自定义超时时间
        factory.setConnectTimeout(15000);
        factory.setReadTimeout(5000);
        return factory;
    }
}

// 最简单可以在启动类中直接添加 Bean
@Bean
public RestTemplate restTemplate() {
    return new RestTemplate();
}

RestTemplate 的大部分方法主要针对于 HTTP 请求。

  • delete() 在特定的 URL 上对资源执行 HTTP DELETE 操作;
  • exchange() 在 URL 上执行特定的 HTTP 方法,返回包含对象的 ResponseEntity,这个对象是从响应体中映射得到的;
  • execute() 在 URL 上执行特定的 HTTP 方法,返回一个从响应体映射得到的对象;
  • getForEntity() 发送一个 HTTP GET 请求,返回的 ResponseEntity 包含了响应体所映射成的对象;
  • getForObject() 发送一个 HTTP GET 请求,返回的请求体将映射为一个对象;
  • postForEntity() POST 数据到一个 URL,返回包含一个对象的 ResponseEntity,这个对象是从响应体中映射得到的;
  • postForObject() POST 数据到一个 URL,返回根据响应体匹配形成的对象;
  • headForHeaders() 发送 HTTP HEAD 请求,返回包含特定资源 URL 的 HTTP 头;
  • optionsForAllow() 发送 HTTP OPTIONS 请求,返回对特定 URL 的 Allow 头信息;
  • postForLocation() POST 数据到一个 URL,返回新创建资源的 URL;
  • put() PUT 资源到特定的 URL。
@Autowired
private RestTemplate restTemplate;

public Order queryOrderById(Long orderId) {
    // 1. 查询订单
    Order order = orderMapper.findById(orderId);
    // 2. 利用 RestTemplate 发起 http 请求,查询用户
    // 2.1. url 路径
    String url = "http://localhost:8081/user/" + order.getUserId();
    // 2.2. 发送 http 请求,实现远程调用
    User user = restTemplate.getForObject(url, User.class);
    // 3. 封装 user 到 Order
    order.setUser(user);
    // 4. 返回
    return order;
}

orderService 调用了 userService 的方法

3.2 Eureka

  • order-service 在发起远程调用的时候,该如何得知 user-service 实例的 ip 地址和端口?
  • 有多个 user-service 实例地址,order-service 调用时该如何选择?
  • order-service 如何得知某个 user-service 实例是否依然健康,是不是已经宕机?、

针对这些问题要利用 SpringCloud 中的注册中心来解决,这里讲解 Eureka。

image-20220729230223772

3.2.1 Server

引入以下依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>

在启动类中添加 @EnableEurekaServer 注解,开启 eureka 的注册中心功能。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer
public class EurekaApplication {
    public static void main(String[] args) {
        SpringApplication.run(EurekaApplication.class, args);
    }
}

编写一个 application.yml 文件,内容如下:

server:
  port: 10086
spring:
  application:
    name: eureka-server
eureka:
  client:
    service-url: 
      defaultZone: http://127.0.0.1:10086/eureka
      # 启动时服务端地址,后台管理

3.2.2 Consumer

在 user-service 中引入 eureka-Client 的依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

在 user-service 中,修改 application.yml 文件,添加服务名称、eureka 地址:

spring:
  application:
    name: userservice
eureka:
  client:
    service-url:
      defaultZone: http://127.0.0.1:10086/eureka

通过 idea 配置,可以启动相同配置的多个实例,实现模拟集群的效果。

在服务中复制一份相同的配置启动项,除此之外,还需要更改运行端口,防止冲突。

复制配置

在 JVM 的配置中,可以对 yaml 文件的参数进行手动配置。

新版 idea 需要提前开启 VM 选项设置,在其中配置新的端口

在 orderService 总采取同样的配置,修改 url 内容:

@Autowired
private RestTemplate restTemplate;

public Order queryOrderById(Long orderId) {
    // 1. 查询订单
    Order order = orderMapper.findById(orderId);
    // 2. 利用 RestTemplate 发起 http 请求,查询用户
    // 2.1. url 路径
    // String url = "http://localhost:8081/user/" + order.getUserId();
    // 在 userService 中封装了 ip + port
    String url = "http://userservice/user/" + order.getUserId();
    // 2.2. 发送 http 请求,实现远程调用
    User user = restTemplate.getForObject(url, User.class);
    // 3. 封装 user 到 Order
    order.setUser(user);
    // 4. 返回
    return order;
}

在 RestTemplate 中添加 @LoadBalanced 开启自动负载均衡模式(默认为轮询调用)。

成功注册结果显示

3.3 Ribbon

在 SpringCloud 中,添加 @LoadBalanced 注解,就能够自动帮助实现负载均衡。原因是 SpringCould 底层内置了 Ribbon。除此,Ribbon 将配置中的服务名自动转发到对应 IP + port。

负载均衡底层

3.3.1 源码分析

在 LoadBalancerInterceptor 中,Ribbon 帮我们拦截发起的请求,并在其中进行解析。

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

	private LoadBalancerClient loadBalancer;

	private LoadBalancerRequestFactory requestFactory;

	public LoadBalancerInterceptor(LoadBalancerClient loadBalancer,
			LoadBalancerRequestFactory requestFactory) {
		this.loadBalancer = loadBalancer;
		this.requestFactory = requestFactory;
	}

	public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
		// for backwards compatibility
		this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
	}

	@Override
	public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
			final ClientHttpRequestExecution execution) throws IOException {
		final URI originalUri = request.getURI();
		String serviceName = originalUri.getHost();
		Assert.state(serviceName != null,
				"Request URI does not contain a valid hostname: " + originalUri);
		return this.loadBalancer.execute(serviceName,
				this.requestFactory.createRequest(request, body, execution));
	}
}
  • request.getURI():获取请求 uri;
  • originalUri.getHost():获取 uri 路径的主机名,其实就是服务 id;
  • this.loadBalancer.execute():处理服务 id,和用户请求。

获取到服务对应的 uri

在 execute 调用 RibbonLoadBalancerClient 中的重载方法。

  • getLoadBalancer(serviceId):根据服务 id 获取 ILoadBalancer,而 ILoadBalancer 会拿着服务 id 去 eureka 中获取服务列表并保存;
  • getServer(loadBalancer):利用内置的负载均衡算法,从服务列表中选择一个。

image-20220801131355311

在 Ribbon 中,默认的负载均衡的规则是轮询模式。

RoudRobinRule

3.3.2 修改负载均衡规则

Ribbon 提供了内置负载均衡的规则。

各种规则的继承链

内置负载均衡规则类 规则描述
RoundRobinRule 简单轮询服务列表来选择服务器。它是Ribbon默认的负载均衡规则。
AvailabilityFilteringRule 对以下两种服务器进行忽略:
(1)在默认情况下,这台服务器如果3次连接失败,这台服务器就会被设置为“短路”状态。短路状态将持续30秒,如果再次连接失败,短路的持续时间就会几何级地增加。
(2)并发数过高的服务器。如果一个服务器的并发连接数过高,配置了AvailabilityFilteringRule规则的客户端也会将其忽略。并发连接数的上限,可以由客户端的 <clientName>.<clientConfigNameSpace>.ActiveConnectionsLimit 属性进行配置。
WeightedResponseTimeRule 为每一个服务器赋予一个权重值。服务器响应时间越长,这个服务器的权重就越小。这个规则会随机选择服务器,这个权重值会影响服务器的选择
ZoneAvoidanceRule 以区域可用的服务器为基础进行服务器的选择。使用Zone对服务器进行分类,这个Zone可以理解为一个机房、一个机架等。而后再对Zone内的多个服务做轮询
BestAvailableRule 忽略那些短路的服务器,并选择并发数较低的服务器
RandomRule 随机选择一个可用的服务器
RetryRule 重试机制的选择逻辑
  1. 在配置类中,重新定义 Rule 的 Bean;

    @Bean
    public IRule myRule(){
        return new RandomRule();
    }
  2. 在 yaml 配置文件中修改定义。

    userservice: # 给某个微服务配置负载均衡规则,这里是 userservice 服务
      ribbon:
        NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule # 负载均衡规则 

总结 Ribbon 拦截解析

3.3.3 饥饿加载

Ribbon 默认是采用懒加载,即第一次访问时才会去创建 LoadBalanceClient,请求时间会很长。

而饥饿加载则会在项目启动时创建,降低第一次访问的耗时,通过下面配置开启饥饿加载:

ribbon:
  eager-load:
    enabled: true
    clients: userservice

3.4 Nacos

SpringCloudAlibaba 推出了 Nacos 的注册中心

在父工程的 pom 文件中的 <dependencyManagement> 中引入 SpringCloudAlibaba 的依赖:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-dependencies</artifactId>
    <version>2.2.6.RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

在子模块中引入 nacos-discovery 依赖:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

在 yaml 文件中配置 nacos 服务地址,就可以正常使用:

spring:
  cloud:
    nacos:
      server-addr: localhost:8848

oderservice 1个 实例,userservice 2个 实例

3.4.1 服务分级存储模型

Nacos 就将同一机房内的实例 划分为一个集群

image-20220802132152943

微服务互相访问时,应该尽可能访问同集群实例,因为本地访问速度更快。当本集群内不可用时,才访问其它集群。

image-20220802132158696

在 yaml 文件中配置实例所属集群:

spring:
  cloud:
    nacos:
      server-addr: localhost:8848
      discovery:
        cluster-name: HZ # 集群名称

重新复制一份配置,更改集群参数:

cluster-name 现在是 SH

两个不同集群

3.4.2 集群负载均衡&权重比

默认的 ZoneAvoidanceRule 并不能实现根据同集群优先来实现负载均衡。

在 orderservice 中还是需要通过 Ribbon 更改负载均衡规则:

userservice:
  ribbon:
    NFLoadBalancerRuleClassName: com.alibaba.cloud.nacos.ribbon.NacosRule # 负载均衡规则 

而对于服务访问的权重,可以直接在控制台的进行修改(0 表示不会被访问)。

对于多环境管理,nacos 使用 namespace 将服务放置在不同的环境中。

  • nacos 中可以有多个 namespace;
  • namespace下可以有 group、service 等;
  • 不同 namespace 之间相互隔离,例如不同 namespace 的服务互相不可见。

关系图

通过对应配置完成 namespace 归属:

spring:
  cloud:
    nacos:
      server-addr: localhost:8848
      discovery:
        cluster-name: HZ
        namespace: 492a7d5d-237b-46a1-a99a-fa8e98e4b0f9 # 命名空间,填ID

创建时分配唯一的 id

3.4.3 eureka 和 nacos

Nacos的服务实例分为两种l类型:

  • 临时实例:如果实例宕机超过一定时间,会从服务列表剔除,默认的类型。

  • 非临时实例:如果实例宕机,不会从服务列表剔除,也可以叫永久实例。

配置一个服务实例为永久实例:

spring:
  cloud:
    nacos:
      discovery:
        ephemeral: false # 设置为非临时实例
  • Nacos与eureka的共同点

    • 都支持服务注册和服务拉取
    • 都支持服务提供者心跳方式做健康检测
  • Nacos与Eureka的区别

    • Nacos支持服务端主动检测提供者状态:临时实例采用心跳模式,非临时实例采用主动检测模式
    • 临时实例心跳不正常会被剔除,非临时实例则不会被剔除
    • Nacos支持服务列表变更的消息推送模式,服务列表更新更及时
    • Nacos 集群默认采用 AP 方式,当集群中存在非临时实例时,采用 CP 模式;Eureka 采用 AP 方式

nacos 处理请求方式

CP:我们服务可以不能用,但必须要保证数据的一致性。

AP: 数据可以短暂不一致,但最终是需要一致的,无论如何都要保证服务的可用。

只取舍:有在 CP 和 AP 选择一个平衡点,大多数都是选择 AP 模式。

3.4.4 nacos 配置管理

在 nacos 后台中配置对应文件,应用至对应环境中,可以参考 maven 的多环境配置。

ID 跟 Spring 中的命名一致

在 SpringBoot 中需要引入 bootstrap.yaml 文件,读取 nacos 中的配置,会在 application.yaml 之前被读取。

加载流程

引入 nacos-config 依赖:

<!--nacos配置管理依赖-->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>

添加 bootstrap.yaml:

spring:
  application:
    name: orderservice # 服务名称
  profiles:
    active: dev #开发环境,这里是dev 
  cloud:
    nacos:
      server-addr: localhost:8848 # Nacos地址
      config:
        file-extension: yaml # 文件后缀名

会根据 spring.cloud.nacos.server-addr 获取 nacos 地址,再根据${spring.application.name}-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}作为文件id,来读取配置。

在 SpringBoot 中,可以像操控本地配置文件一样,使用 @Value 进行读取。

@Value("${pattern.dateformat}")
private String dateformat;

@GetMapping("{orderId}")
public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) {
    // 根据id查询订单并返回
    return orderService.queryOrderById(orderId);
}

配置热更新

  • 在对应 Controller 中添加 @RefreshScope

  • 通过 Spring 中的 @ConfigurationProperties 将配置内容注入 Bean 类中;

    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.stereotype.Component;
    
    @Component
    @Data
    @ConfigurationProperties(prefix = "pattern")
    public class PatternProperties {
        private String dateformat;
    }

    两种方式选择一种即可。

配置共享

微服务启动时,nacos 会读取多个配置文件,例如:

  • [spring.application.name]-[spring.profiles.active].yaml,例如:userservice-dev.yaml;

  • [spring.application.name].yaml,例如:userservice.yaml。

[spring.application.name].yaml 不包含环境,因此可以被多个环境共享。

跟 Spring 一致,如果不加 dev 等 profile,代表共享配置。

共享配置优先级

3.4.5 搭建 nacos

Windows

阿里的仓库找到 nacos,下载对应的 nacos 文件。

对于单节点启动,不需要进行配置,在 bin 目录中使用 .\startup.cmd -m standalone (默认是集群模式)即可启动。

对于集群模式,需要我们提前配置集群的 ip + 端口

127.0.0.1:8841
127.0.0.1:8844
127.0.0.1:8847

对于集群的端口,需要提前预留,在 nacos 连续占用会出现问题。

配置文件中打开 mysql 的连接

复制三份 nacos 文件,需要修改 properties 中的 port

# 配置对应负载均衡
upstream nacos-cluster {
    server 127.0.0.1:8845;
    server 127.0.0.1:8846;
    server 127.0.0.1:8847;
}

#gzip  on;
server {
    listen       10000;
    server_name  localhost;

    location /nacos {
        proxy_pass http://nacos-cluster;
    }
}

Docker

官方 Docker Hub 中有对 Docker 启动的样例文件,但可惜有部分错误。

version: "3"
services:
  nacos1:
    hostname: nacos1
    container_name: nacos1
    image: nacos/nacos-server:${NACOS_VERSION}
    volumes:
      - ./cluster-logs/nacos1:/home/nacos/logs
      - ./init.d/custom.properties:/home/nacos/init.d/custom.properties
    ports:
      - "8848:8848"
      - "9848:9848"
      - "9555:9555"
    env_file:
      - ../env/nacos-hostname.env
    restart: always
    depends_on:
      - mysql

  nacos2:
    hostname: nacos2
    image: nacos/nacos-server:${NACOS_VERSION}
    container_name: nacos2
    volumes:
      - ./cluster-logs/nacos2:/home/nacos/logs
      - ./init.d/custom.properties:/home/nacos/init.d/custom.properties
    ports:
      - "8849:8848"
      - "9849:9848"
    env_file:
      - ../env/nacos-hostname.env
    restart: always
    depends_on:
      - mysql

  nacos3:
    hostname: nacos3
    image: nacos/nacos-server:${NACOS_VERSION}
    container_name: nacos3
    volumes:
      - ./cluster-logs/nacos3:/home/nacos/logs
      - ./init.d/custom.properties:/home/nacos/init.d/custom.properties
    ports:
      - "8850:8848"
      - "9850:9848"
    env_file:
      - ../env/nacos-hostname.env
    restart: always
    depends_on:
      - mysql
      
  mysql:
    container_name: mysql
    # 这里官方默认是 5 版本,但不兼容 nacos 2.1
    image: nacos/nacos-mysql:8.0.16
    env_file:
      - ../env/mysql.env
    volumes:
      - ./mysql:/var/lib/mysql
    ports:
      - "10002:3306"

MySQL 的 env 如下,可以对参数进行修改:

MYSQL_ROOT_PASSWORD=root
MYSQL_DATABASE=nacos_devtest
MYSQL_USER=nacos
MYSQL_PASSWORD=nacos

nacos 的 env 如下:

#nacos dev env
PREFER_HOST_MODE=hostname
NACOS_SERVERS=nacos1:8848 nacos2:8848 nacos3:8848
MYSQL_SERVICE_HOST=mysql
MYSQL_SERVICE_DB_NAME=nacos_devtest
MYSQL_SERVICE_PORT=3306
MYSQL_SERVICE_USER=nacos
MYSQL_SERVICE_PASSWORD=nacos

在 docker-compose 启动的过程中,需要先保证创建出 MySQL 容器。如果第一次失败可以重启,保证数据库成功绑定。

而对于这种 cluster 集群来说,会出现无法配置参数的问题,这是因为两个表缺失了两个字段,补齐之后就可以正常使用。

分别在 config_info 和 history_config_info 这两个表里添加 encrypted_data_key 字段,类型为 text。

3.5 Feign

Feign 是一个声明式的 http 客户端,用于替代 RestTemplate 直接使用 URL 的形式。

这个客户端主要是基于 SpringMVC 的注解来声明远程调用的信息,比如:

  • 服务名称:userservice
  • 请求方式:GET
  • 请求路径:/user/{id}
  • 请求参数:Long id
  • 返回值类型:User

这样,Feign 就可以帮助我们发送 http 请求,无需自己使用 RestTemplate 来发送了。

在 orderservice 服务的 pom 文件中引入 feign 的依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

编写 RPC 接口程序:

import cn.itcast.order.pojo.User;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

@FeignClient("userservice")
public interface UserClient {
    @GetMapping("/user/{id}")
    User findById(@PathVariable("id") Long id);
}

在 orderservice 中,用 feign 替代 RestTemplate:

@Autowired
private UserClient userClient;

public Order queryOrderById(Long orderId) {
    // 1. 查询订单
    Order order = orderMapper.findById(orderId);
    // 2. 用 Feign 远程调用
    User user = userClient.findById(order.getUserId());
    // 3. 封装 user 到 Order
    order.setUser(user);
    // 4. 返回
    return order;
}

3.5.1 自定义配置

Feign可以支持很多的自定义配置,如下表所示:

类型 作用 说明
feign.Logger.Level 修改日志级别 包含四种不同的级别:NONE、BASIC、HEADERS、FULL
feign.codec.Decoder 响应结果的解析器 http 远程调用的结果做解析,例如解析 json 字符串为 Java 对象
feign.codec.Encoder 请求参数编码 将请求参数编码,便于通过http请求发送
feign.Contract 支持的注解格式 默认是 SpringMVC 的注解
feign.Retryer 失败重试机制 请求失败的重试机制,默认是没有,不过会使用 Ribbon 的重试

这些配置一般采用默认,自定义可以像之前,采用 yaml 或者 Java 代码 两种配置形式。

yaml

基于配置文件修改 feign 的日志级别可以针对单个服务:

feign:  
  client:
    config: 
      userservice: # 针对某个微服务的配置
        loggerLevel: FULL #  日志级别 

也可以针对所有服务:

feign:  
  client:
    config: 
      default: # 这里用default就是全局配置,如果是写服务名称,则是针对某个微服务的配置
        loggerLevel: FULL #  日志级别 

而日志的级别分为四种:

  • NONE:不记录任何日志信息,这是默认值;
  • BASIC:仅记录请求的方法,URL 以及响应状态码和执行时间;
  • HEADERS:在 BASIC 的基础上,额外记录了请求和响应的头信息;
  • FULL:记录所有请求和响应的明细,包括头信息、请求体、元数据。

Java 代码

先声明一个类,然后声明一个Logger.Level的对象:

public class DefaultFeignConfiguration  {
    @Bean
    public Logger.Level feignLogLevel(){
        return Logger.Level.BASIC; // 日志级别为 BASIC
    }
}

如果要全局生效,将其放到启动类的 @EnableFeignClients 这个注解中:

@EnableFeignClients(defaultConfiguration = DefaultFeignConfiguration.class) 

如果是局部生效,则把它放到对应的 @FeignClient 这个注解中(如果没有配置 configuration 会导致 Bean 无法被加载装配):

@FeignClient(value = "userservice", configuration = DefaultFeignConfiguration.class) 

3.5.2 使用优化

Feign 底层发起 http 请求,依赖于其它的框架。其底层客户端实现包括:

  • URLConnection:默认实现,不支持连接池;
  • Apache HttpClient :支持连接池;
  • OKHttp:支持连接池。

因此提高 Feign 的性能主要手段就是使用连接池代替默认的 URLConnection。

在 orderservice 的 pom 文件中引入 Apache 的 HttpClient 依赖:

<!--httpClient的依赖 -->
<dependency>
    <groupId>io.github.openfeign</groupId>
    <artifactId>feign-httpclient</artifactId>
</dependency>

2)配置连接池

在 orderservice 的 application.yml 中添加配置:

feign:
  client:
    config:
      default: # default全局的配置
        loggerLevel: BASIC # 日志级别,BASIC就是基本的请求和响应信息
  httpclient:
    enabled: true # 开启feign对HttpClient的支持
    max-connections: 200 # 最大的连接数
    max-connections-per-route: 50 # 每个路径的最大连接数

3.5.3 抽取 api

针对模块中重复的接口调用,可以将共同的 api 提取出额外的接口,之后在进行导入。

继承解决法,无法继承注解映射,不推荐使用

将 UserClient、User、Feign 的默认配置都抽取到一个 feign-api 包中,所有微服务引用该依赖包,即可直接使用。

抽取 api 示例图

在 feign-api 中然后引入 feign 的 starter 依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

将公用的类抽取放入 feign-api 中。

在 orderservice 中引入对应依赖。UserClient 现在在cn.itcast.feign.clients 包下,而 orderservice 的 @EnableFeignClients 注解是在 cn.itcast.order 包下,不在同一个包,无法扫描到 UserClient。

方式一:

指定 Feign 应该扫描的包:

@EnableFeignClients(basePackages = "cn.itcast.feign.clients")

方式二:

指定需要加载的 Client 接口:

@EnableFeignClients(clients = {UserClient.class})

3.6 Gateway

网关的核心功能特性

  • 请求路由;
  • 权限控制;
  • 限流。

Gateway 架构图

在创建 Gateway 模块后,引入以下依赖:

<!--网关-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<!--nacos服务发现依赖-->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
# 配置对应 yaml,将网关匹配到对应 service
server:
  port: 10010
logging:
  level:
    cn.itcast: debug
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  application:
    name: gateway
  cloud:
    nacos:
      server-addr: localhost:8848 # nacos地址
    gateway:
      routes:
        - id: userservice # 路由标示,必须唯一
          uri: lb://userservice # 路由的目标地址
          predicates: # 路由断言,判断请求是否符合规则
            - Path=/user/** # 路径断言,判断路径是否是以 /user 开头,如果是则符合
        - id: orderservice
          uri: lb://orderservice
          predicates:
            - Path=/order/**

整个网关流程图

路由配置包括:

  1. 路由 id:路由的唯一标示

  2. 路由目标(uri):路由的目标地址,http 代表固定地址,lb 代表根据服务名负载均衡

  3. 路由断言(predicates):判断路由的规则,

  4. 路由过滤器(filters):对请求或响应做处理

3.6.1 断言工厂

我们在配置文件中写的断言规则只是字符串,这些字符串会被Predicate Factory读取并处理,转变为路由判断的条件

例如Path=/user/**是按照路径匹配,这个规则是由

org.springframework.cloud.gateway.handler.predicate.PathRoutePredicateFactory类来

处理的,像这样的断言工厂在SpringCloudGateway还有十几个:

名称 说明 示例
After 是某个时间点后的请求 - After=2037-01-20T17:42:47.789-07:00[America/Denver]
Before 是某个时间点之前的请求 - Before=2031-04-13T15:14:47.433+08:00[Asia/Shanghai]
Between 是某两个时间点之前的请求 - Between=2037-01-20T17:42:47.789-07:00[America/Denver], 2037-01-21T17:42:47.789-07:00[America/Denver]
Cookie 请求必须包含某些cookie - Cookie=chocolate, ch.p
Header 请求必须包含某些header - Header=X-Request-Id, \d+
Host 请求必须是访问某个host(域名) - Host=.somehost.org,.anotherhost.org
Method 请求方式必须是指定方式 - Method=GET,POST
Path 请求路径必须符合指定规则 - Path=/red/{segment}, /blue/**
Query 请求参数必须包含指定参数 - Query=name, Jack 或者 - Query=name
RemoteAddr 请求者的ip必须是指定范围 - RemoteAddr=192.168.1.1/24
Weight 权重处理

3.6.2 过滤工厂

Spring 提供了 31种不同的路由过滤器工厂。例如:

名称 说明
AddRequestHeader 给当前请求添加一个请求头
RemoveRequestHeader 移除请求中的一个请求头
AddResponseHeader 给响应结果中添加一个响应头
RemoveResponseHeader 从响应结果中移除有一个响应头
RequestRateLimiter 限制请求的流量

只需要修改 Gateway 服务的 application.yaml 文件,添加路由过滤即可:

spring:
  cloud:
    gateway:
      routes:
      - id: userservice 
        uri: lb://userservice 
        predicates: 
        - Path=/user/** 
        filters: # 过滤器
        - AddRequestHeader=Truth, Itcast is freaking awesome! # 添加请求头

当前过滤器写在 userservice 路由下,因此仅仅对访问 userservice 的请求有效。

如果要对所有的路由都生效,则可以将过滤器工厂写到 default 下。格式如下:

spring:
  cloud:
    gateway:
      routes:
      - id: userservice 
        uri: lb://userservice 
        predicates: 
        - Path=/user/**
      default-filters: # 默认过滤项
      - AddRequestHeader=Truth, Itcast is freaking awesome! 

全局过滤器

全局过滤器的作用也是处理一切进入网关的请求和微服务响应,与 GatewayFilter 的作用一样。区别在于 GatewayFilter 通过配置定义,处理逻辑是固定的;而 GlobalFilter的逻辑需要自己写代码实现。

定义方式是实现 GlobalFilter 接口:

public interface GlobalFilter {
    /**
     *  处理当前请求,有必要的话通过{@link GatewayFilterChain}将请求交给下一个过滤器处理
     *
     * @param exchange 请求上下文,里面可以获取Request、Response等信息
     * @param chain 用来把请求委托给下一个过滤器 
     * @return {@code Mono<Void>} 返回标示当前过滤器业务结束
     */
    Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
}

在 filter 中编写自定义逻辑,可以实现下列功能:

  • 登录状态判断;
  • 权限校验;
  • 请求限流等。
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

// @Order(-1)
@Component
public class AuthorizeFilter implements GlobalFilter, Ordered {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // 1. 获取请求参数
        ServerHttpRequest request = exchange.getRequest();
        MultiValueMap<String, String> params = request.getQueryParams();
        // 2. 获取参数中的 authorization 参数
        String auth = params.getFirst("authorization");
        // 3. 判断参数值是否等于 admin
        if ("admin".equals(auth)) {
            // 4.是,放行
            return chain.filter(exchange);
        }
        // 5. 否,拦截
        // 5.1.设 置状态码
        exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
        // 5.2. 拦截请求
        return exchange.getResponse().setComplete();
    }

    @Override
    public int getOrder() {
        return -1;
    }
}

添加对应 parameter 之后才可访问

执行顺序

请求进入网关会碰到三类过滤器:当前路由的过滤器、DefaultFilter、GlobalFilter

请求路由后,会将当前路由过滤器和 DefaultFilter、GlobalFilter,合并到一个过滤器链(集合)中,排序后依次执行每个过滤器:

过滤器执行顺序

  • 每一个过滤器都必须指定一个 int 类型的 order 值,order 值越小,优先级越高,执行顺序越靠前(可以为负数)。
  • GlobalFilter 通过实现 Ordered 接口,或者添加 @Order 注解来指定 order 值,由我们自己指定
  • 路由过滤器和 defaultFilter 的 order 由 Spring 指定,默认是按照声明顺序从 1 递增。
  • 当过滤器的 order 值一样时,会按照 defaultFilter > 路由过滤器 > GlobalFilter 的顺序执行。

3.6.3 跨域问题

在 Gateway 服务的 application.yml 文件中,添加下面的配置:

spring:
  cloud:
    gateway:
      globalcors: # 全局的跨域处理
        add-to-simple-url-handler-mapping: true # 解决 options 请求被拦截问题
        corsConfigurations:
          '[/**]':
            allowedOrigins: # 允许哪些网站的跨域请求 
              - "http://localhost:8090"
            allowedMethods: # 允许的跨域ajax 的请求方式
              - "GET"
              - "POST"
              - "DELETE"
              - "PUT"
              - "OPTIONS"
            allowedHeaders: "*" # 允许在请求中携带的头信息
            allowCredentials: true # 是否允许携带cookie
            maxAge: 360000 # 这次跨域检测的有效期

3.7 Sentinel

为了保护微服务中可能出现的大量 QPS 情况或者因为单个微服务失效而导致的雪崩问题,阿里提供了 Sentinel 对微服务进行保护和管理。

在SpringCloud当中支持多种服务保护技术:

早期比较流行的是 Hystrix 框架,但目前国内实用最广泛的还是阿里巴巴的 Sentinel 框架:

Sentinel Hystrix
隔离策略 信号量隔离 线程池隔离/信号量隔离
熔断降级策略 基于慢调用比例或异常比例 基于失败比率
实时指标实现 滑动窗口 滑动窗口(基于 RxJava)
规则配置 支持多种数据源 支持多种数据源
扩展性 多个扩展点 插件的形式
基于注解的支持 支持 支持
限流 基于 QPS,支持基于调用关系的限流 有限的支持
流量整形 支持慢启动、匀速排队模式 不支持
系统自适应保护 支持 不支持
控制台 开箱即用,可配置规则、查看秒级监控、机器发现等 不完善
常见框架的适配 Servlet、Spring Cloud、Dubbo、gRPC 等 Servlet、Spring Cloud Netflix

3.7.1 安装 Sentinel

在 Sentinel Github 官网下载对应 jar,运行 java -jar sentinel-dashboard-1.8.5.jar --server.port=9000

如果要修改Sentinel的默认端口、账户、密码,可以通过下列配置:

配置项 默认值 说明
server.port 8080 服务端口
sentinel.dashboard.auth.username sentinel 默认用户名
sentinel.dashboard.auth.password sentinel 默认密码

3.7.2 微服务整合 Sentinel

引入 Sentinel 依赖:

<!--sentinel-->
<dependency>
    <groupId>com.alibaba.cloud</groupId> 
    <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>

配置 application.yaml 文件:

spring:
  cloud: 
    sentinel:
      transport:
        dashboard: localhost:9000

访问绑定 Sentinel 的业务,可以查看监控情况

3.7.3 流量控制

通过簇点链路,可以实现对某一请求进行限制。

流控、熔断、热点、授权等等

流控模式

限制 /order/{orderId} QPS 为 5(单机每秒允许 5次请求)

可以通过 JMeter 测试,发现确实有限流作用。

在添加限流规则时,点击高级选项,可以选择三种流控模式

  • 直接:统计当前资源的请求,触发阈值时对当前资源直接限流,也是默认的模式;

默认直接模式

  • 关联:统计与当前资源相关的另一个资源,触发阈值时,对当前资源限流;

在关联模式中,在 /write 访问量触发阈值时,也会对 /read 资源限流。

image-20220824142854431

  • 链路:统计从指定链路访问到本资源的请求,触发阈值时,对指定链路限流。

配置示例

例如有两条请求链路:

  • /test1 –> /common

  • /test2 –> /common

如果只希望统计从 /test2 进入到 /common 的请求,则可以这样配置:

这里的 /common 可以表示 Service 层的 API

默认情况下,OrderService 中的方法是不被 Sentinel 监控的,需要我们自己通过注解来标记要监控的方法。

给 OrderService 的 queryGoods 方法添加 @SentinelResource 注解:

@SentinelResource("goods")
public void queryGoods(){
    System.err.println("查询商品");
}

链路模式中,是对不同来源的两个链路做监控。但是 Sentinel 默认会给进入 SpringMVC 的所有请求设置同一个 root 资源,会导致链路模式失效。我们需要关闭这种对 SpringMVC 的资源聚合,修改服务的 application.yml 文件:

spring:
  cloud:
    sentinel:
      web-context-unify: false # 关闭 context 整合

实现两个请求方法调用 goods

只对 /order/query 的访问进行限制

流控效果

在流控的高级选项中,还有一个流控效果选项。

流控效果是指请求达到流控阈值时应该采取的措施,包括三种:

  • 快速失败:达到阈值后,新的请求会被立即拒绝并抛出 FlowException 异常。是默认的处理方式;
  • warm up:预热模式,对超出阈值的请求同样是拒绝并抛出异常。但这种模式阈值会动态变化,从一个较小值逐渐增加到最大阈值。应对服务冷启动的一种方案。请求阈值初始值是 maxThreshold / coldFactor,持续指定时长后,逐渐提高到 maxThreshold 值。而 coldFactor 的默认值是3。
    例如,设置 QPS 的 maxThreshold 为 10,预热时间为 5秒,那么初始阈值就是 10 / 3 ,也就是 3,然后在 5秒后逐渐增长到 10;
  • 排队等待:让所有的请求按照先后次序排队执行,两个请求的间隔不能小于指定时长。
    如果使用队列模式做流控,所有进入的请求都要排队,以固定的200ms的间隔执行。

热点参数限流

@SentinelResource 中监控特定资源:

@SentinelResource("hot")
public Order queryOrderById(Long orderId) {
    // 1. 查询订单
    Order order = orderMapper.findById(orderId);
    // 2. 用 Feign 远程调用
    User user = userClient.findById(order.getUserId());
    // 3. 封装 user 到 Order
    order.setUser(user);
    // 4. 返回
    return order;
}

对 hot 的 0号参数(第一个参数)做统计,每秒相同参数值的请求数不能超过 5

上述的配置针对于所有参数的请求,在真实业务中可能会出现部分 QPS 需求更大的情况。

修改对 101 和 102 参数的 QPS

3.7.4 隔离和降级

线程隔离:调用者在调用服务提供者时,给每个调用的请求分配独立线程池,出现故障时,最多消耗这个线程池内资源,避免把调用者的所有资源耗尽。

每个业务分配独立线程池

熔断降级:在调用方这边加入断路器,统计对服务提供者的调用,如果调用的失败比例过高,则熔断该业务,不允许访问该服务的提供者了。

熔断失效服务

因为 SpringCloud 中使用 Feign 代替 RestTemplate 来实现 RPC,所以需要同时整合 Sentinel 和 Feign。

修改 application.yml 文件,开启 Feign 的 Sentinel 功能:

feign:
  sentinel:
    enabled: true # 开启 Feign 对 Sentinel 的支持

业务失败后,不能直接报错,而应该返回用户一个友好提示或者默认结果,这个就是失败降级逻辑。

给 FeignClient 编写失败后的降级逻辑

  • 方式一:FallbackClass,无法对远程调用的异常做处理;

  • 方式二:FallbackFactory,可以对远程调用的异常做处理。一般推荐 FallbackFactory。

在 feing-api 项目中定义类,实现 FallbackFactory:

import cn.itcast.feign.clients.UserClient;
import cn.itcast.feign.pojo.User;
import feign.hystrix.FallbackFactory;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class UserClientFallbackFactory implements FallbackFactory<UserClient> {
    // 定义失败之后返回的结果
    @Override
    public UserClient create(Throwable throwable) {
        return new UserClient() {
            @Override
            public User findById(Long id) {
                log.error("查询用户异常", throwable);
                return new User();
            }
        };
    }
}

将 UserClientFallbackFactory 注册为一个 Bean:

@Bean
public UserClientFallbackFactory userClientFallbackFactory(){
    return new UserClientFallbackFactory();
}

在 feing-api 项目中的 UserClient 接口中使用 UserClientFallbackFactory:

import cn.itcast.feign.clients.fallback.UserClientFallbackFactory;
import cn.itcast.feign.pojo.User;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

@FeignClient(value = "userservice", fallbackFactory = UserClientFallbackFactory.class)
public interface UserClient {

    @GetMapping("/user/{id}")
    User findById(@PathVariable("id") Long id);
}

重启之后能看到对应 Feign 方法

线程隔离

线程隔离有两种方式实现:

  • 线程池隔离;

  • 信号量隔离(Sentinel默认采用)。

线程池隔离 & 信号量隔离

线程池隔离:给每个服务调用业务分配一个线程池,利用线程池本身实现隔离效果;

信号量隔离:不创建线程池,而是计数器模式,记录业务使用的线程数量,达到信号量上限时,禁止新的请求。

两者优缺点

在添加限流规则时,可以选择两种阈值类型:

线程数:该资源能使用的 tomcat 线程数的最大值

通过限制线程数量,实现线程隔离(舱壁模式)。

熔断降级

熔断降级是解决雪崩问题的重要手段。其思路是由断路器统计服务调用的异常比例、慢请求比例,如果超出阈值则会熔断该服务。即拦截访问该服务的一切请求;而当服务恢复时,断路器会放行访问该服务的请求。

断路器控制熔断和放行是通过状态机来完成的:

Closed 表示断路器状态

状态机包括三个状态:

  • closed:关闭状态,断路器放行所有请求,并开始统计异常比例、慢请求比例。超过阈值则切换到 Open 状态;
  • open:打开状态,服务调用被熔断,访问被熔断服务的请求会被拒绝,快速失败,直接走降级逻辑。Open 状态5秒后会进入 half-open 状态;
  • half-open:半开状态,放行一次请求,根据执行结果来判断接下来的操作:
    • 请求成功:则切换到 closed 状态;
    • 请求失败:则切换到 open 状态。

断路器熔断策略有三种:慢调用、异常比例、异常数。

慢调用

慢调用:业务的响应时长(RT)大于指定时长的请求认定为慢调用请求。在指定时间内,如果请求数量超过设定的最小数量,慢调用比例大于设定的阈值,则触发熔断。

设置慢调用规则

RT 超过 500ms 的调用是慢调用,统计最近 10000ms 内的请求,如果请求量超过 10次,并且慢调用比例不低于 0.5,则触发熔断,熔断时长为 5秒。然后进入 half-open 状态,放行一次请求做测试。

异常比例、异常数

异常比例或异常数:统计指定时间内的调用,如果调用次数超过指定请求数,并且出现异常的比例达到设定的比例阈值(或超过指定异常数),则触发熔断。

异常比例设置

统计最近 1000ms 内的请求,如果请求量超过 10次,并且异常比例不低于 0.4,则触发熔断。

异常数设置

统计最近 1000ms 内的请求,如果请求量超过 10次,并且异常比例不低于 2次,则触发熔断。

3.6.5 授权规则

授权规则可以对请求方来源做判断和控制。

基本规则

授权规则可以对调用方的来源做控制,有白名单和黑名单两种方式。

  • 白名单:来源(origin)在白名单内的调用者允许访问;

  • 黑名单:来源(origin)在黑名单内的调用者不允许访问。

点击左侧菜单的授权,可以看到授权规则:

保护资源访问来源

  • 资源名:就是受保护的资源,例如 /order/{orderId};

  • 流控应用:是来源者的名单:

    • 如果是勾选白名单,则名单中的来源被许可访问;
    • 如果是勾选黑名单,则名单中的来源被禁止访问。

允许 Gateway 访问,不允许浏览器访问

获取 origin

Sentinel 通过 RequestOriginParser 这个接口的 parseOrigin 来获取请求的来源。

public interface RequestOriginParser {
    /**
     * 从请求request对象中获取origin,获取方式自定义
     */
    String parseOrigin(HttpServletRequest request);
}

默认情况下,sentinel不管请求者从哪里来,返回值永远是default,也就是说一切请求的来源都被认为是一样的值default。因此,需要自定义这个接口的实现,让不同的请求,返回不同的origin

import com.alibaba.cloud.commons.lang.StringUtils;
import com.alibaba.csp.sentinel.adapter.spring.webmvc.callback.RequestOriginParser;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletRequest;

@Component
public class HeaderOriginParser implements RequestOriginParser {
    @Override
    public String parseOrigin(HttpServletRequest request) {
        // 1. 获取请求头
        String origin = request.getHeader("origin");
        // 2. 非空判断
        if (StringUtils.isEmpty(origin)) {
            origin = "blank";
        }
        return origin;
    }
}

添加请求头

既然获取请求 origin 的方式是从 request-header 中获取 origin 值,我们必须让所有从 Gateway 路由到微服务的请求都带上 origin 头

修改 gateway 服务中的 application.yml,添加一个 defaultFilter:

spring:
  cloud:
    gateway:
      default-filters:
        - AddRequestHeader=origin,gateway
      routes:
       # ...略

这样,从 Gateway 路由的所有请求都会带上 origin 头,值为 gateway。而从其它地方到达微服务的请求则没有这个头。

自定义异常

默认情况下,发生限流、降级、授权拦截时,都会抛出异常到调用方。异常结果都是 flow limmiting(限流)。

如果要自定义异常时的返回结果,需要实现 BlockExceptionHandler 接口:

public interface BlockExceptionHandler {
    /**
     * 处理请求被限流、降级、授权拦截时抛出的异常:BlockException
     */
    void handle(HttpServletRequest request, 
                HttpServletResponse response, 
                BlockException e) throws Exception;
}

这个方法有三个参数:

  • HttpServletRequest request:request 对象
  • HttpServletResponse response:response 对象
  • BlockException e:被 Sentinel 拦截时抛出的异常

这里的 BlockException 包含多个不同的子类:

异常 说明
FlowException 限流异常
ParamFlowException 热点参数限流的异常
DegradeException 降级异常
AuthorityException 授权规则异常
SystemBlockException 系统规则异常

通过调用 BlockExceptionHandler 的方式,进行自定义异常。

import com.alibaba.csp.sentinel.adapter.spring.webmvc.callback.BlockExceptionHandler;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.authority.AuthorityException;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeException;
import com.alibaba.csp.sentinel.slots.block.flow.FlowException;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowException;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@Component
public class SentinelExceptionHandler implements BlockExceptionHandler {
    @Override
    public void handle(HttpServletRequest httpServletRequest,
                       HttpServletResponse httpServletResponse,
                       BlockException e) throws Exception {
        String msg = "未知异常";
        int status = 429;

        if (e instanceof FlowException) {
            msg = "请求被限流了";
        } else if (e instanceof ParamFlowException) {
            msg = "请求被热点参数限流";
        } else if (e instanceof DegradeException) {
            msg = "请求被降级了";
        } else if (e instanceof AuthorityException) {
            msg = "没有权限访问";
            status = 401;
        }

        httpServletResponse.setContentType("application/json;charset=utf-8");
        httpServletResponse.setStatus(status);
        httpServletResponse.getWriter().println("{\"msg\": " + msg + ", \"status\": " + status + "}");
    }
}

返回自定义结果

3.6.6 规则持久化

Sentinel 的所有规则都是内存存储,重启后所有规则都会丢失。在生产环境下,我们必须确保这些规则的持久化,避免丢失。

规则是否能持久化,取决于规则管理模式,Sentinel 支持三种规则管理模式:

  • 原始模式:Sentinel 的默认模式,将规则保存在内存,重启服务会丢失;
  • pull 模式;
  • push 模式。

pull 模式

pull 模式:控制台将配置的规则推送到 Sentinel 客户端,而客户端会将配置规则保存在本地文件或数据库中。以后会定时去本地文件或数据库中查询,更新本地规则。

按照本地文件或者数据库查询

对于 pull 模式,核心思路是在本地创建 json 文件实现规则的读取写入。

push 模式

push 模式:控制台将配置规则推送到远程配置中心,例如 Nacos。Sentinel 客户端监听 Nacos,获取配置变更的推送消息,完成本地配置更新。

通过远程配置中调用 Sentinel 配置

push 模式中,需要针对 Sentinel 的源码进行部分修改。

4. I/O 模型

Java 支持 3种网络编程模型(使用什么样的通道进行数据的发送和接收):BIO、NIO、AIO。

4.1 BIO

Java BIO同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不作任何事情会造成不必要的线程开销。

服务器对于每一个客户端的连接都会分配一个线程处理,为了避免资源消耗过多,采用了线程池模型(创建一个固定大小的线程池,如果有客户端请求,就从线程池中取一个空闲线程来处理,当客户端处理完操作之后,就会释放对线程的占用)。但如果有过多长时间处理的线程,依然会导致没有空余线程。

BIO 底层实现

4.2 NIO

Java NIO同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求会被注册到多路复用器上,多路复用器轮询到有 I/O 请求就会进行处理。

Selector 注册处理各种 I/O 事件,分配一个线程轮询每个客户端是否有事件发生,有事件发生时,顺序处理每个事件,当所有事件处理
完之后,便再转去继续轮询。

NIO 底层实现

在 NIO 中,当一个连接创建后,不需要对应一个线程,连接会被注册到多路复用器上面,一个选择器线程可以同时处理成千上万个连接,系统不必创建大量的线程,也不必维护这些线程,从而大大减小了系统的开销。

4.2.1 Reactor 模式

传统处理模式:

传统 I/O 服务模型

  • 采用阻塞I/O模式获取输入数据;

  • 每个连接都需要独立的线程完成数据的输入,业务处理,数据返回。

React 处理模式:

img

Reactor 模式称为反应器模式或应答者模式,拥有一个或多个并发输入源,有一个服务处理器和多个请求处理器,服务处理器会同步的将输入的请求事件以多路复用的方式分发给相应的请求处理器。当客户端请求抵达后,服务处理程序使用多路分配策略,由一个非阻塞的线程来接收所有请求,然后将请求派发到相关的工作线程并进行处理的过程。

Reactor 包含三种模式:

  • 单 Reactor 单线程;
  • 单 Reactor 多线程;
  • 主从 Reactor 多线程。

4.2.2 结构

NIO 通过 Channels、Buffers、Selectors 组成。

Channel 和 IO 中的 Stream 近似一个等级的。但 Stream 是单向的,譬如:InputStream, OutputStream。而 Channel 是双向的,既可以用来进行读操作,又可以用来进行写操作。

Channel:FileChannel、DatagramChannel、SocketChannel 和 ServerSocketChannel(对应 IO、UDP、TCP(Server 和 Client))。

Buffer:ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer。

Selector 运行单线程处理多个 Channel。要使用 Selector, 得向 Selector 注册 Channel,然后调用 select() 方法。select() 会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件。

Channel

Channel 中,可以进行异步的读写,数据需要读入进一个 Buffer 或者写入进一个 Buffer。

Channel 和 Buffer 关系图

FileChannel

read() 方法返回的int 值表示了有多少字节被读到了Buffer 中。如果返回-1,表示到了文件末尾。

通过调用 position() 方法获取 FileChannel 的当前位置。也可以通过调用 position(long pos) 方法设置 FileChannel 的当前位置。

FileChannel 实例的 size() 方法将返回该实例所关联文件的大小。

可以使用 FileChannel.truncate() 方法截取一个文件,文件将中指定长度后面的部分将被删除。channel.truncate(1024);,截取文件的前1024 个字节。

// 读取数据的例子
public class Test {
    public static void main(String[] args) throws IOException {
        // 获取对应文件
        RandomAccessFile aFile = new RandomAccessFile("D:\\txt\\1.txt", "rw");
        // 文件开启 Channel
        FileChannel inChannel = aFile.getChannel();
        // 创建 buffer 并分配空间
        ByteBuffer buf = ByteBuffer.allocate(48);
        int bytesRead = inChannel.read(buf);
        // 读取到文件末尾,返回-1
        while (bytesRead != -1) {
            System.out.println("Read " + bytesRead);
            // 反转读写模式
            buf.flip();
            while (buf.hasRemaining()) {
                // 读取数据,并输出
                System.out.print((char) buf.get());
            }
            // 清除缓冲区的内容
            buf.clear();
            bytesRead = inChannel.read(buf);
        }
        inChannel.close();
        aFile.close();
        System.out.println();
        System.out.println("End of file");
    }
}

/*==========================================================================*/
// 写入数据例子
public class Test {
    public static void main(String[] args) throws IOException {
        RandomAccessFile aFile = new RandomAccessFile("D:\\txt\\2.txt", "rw");
        FileChannel inChannel = aFile.getChannel();

        String data = "New String to write to file..." + LocalDateTime.now();
        ByteBuffer buf1 = ByteBuffer.allocate(1024);
        buf1.clear();
        buf1.put(data.getBytes());
        buf1.flip();
        // 判断是否有剩余数据
        while (buf1.hasRemaining()) {
            inChannel.write(buf1);
        }
        inChannel.close();
        aFile.close();
        System.out.println();
        System.out.println("End of file");
    }
}

分散(scatter)从 Channel 中读取是指在读操作时将读取的数据写入多个 Buffer 中。因此,Channel 将从 Channel 中读取的数据“分散(scatter)”到多个 Buffer 中。
聚集(gather)写入 Channel 是指在写操作时将多个 Buffer 的数据写入同一个 Channel,因此,Channel 将多个 Buffer 中的数据“聚集(gather)”后发送到 Channel。

scatter 在读取的时候,需要将 buffer 的空间填满,才能读取对应数据,gather 则没有这个限制。

SocketChannel
  • SocketChannel 用来连接 Socket 套接字;
  • SocketChannel 用途用来处理网络 I/O 的通道;
  • SocketChannel 基于 TCP 连接传输;
  • SocketChannel 实现了可选择通道,可以被多路复用。

SocketChannel 是支持非阻塞 socket,改进传统单向流 API,同时支持读写。可以设置 Channel 的(非)阻塞模式,调用 configureBlocking(),通过 isBlocking() 方法判断模式。

public class Test {
    public static final String GREETING = "Hello, World!\n";

    public static void main(String[] args) throws Exception {
        int port = 1234;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        ByteBuffer buffer = ByteBuffer.wrap(GREETING.getBytes());
        // 使用 open 打开 ServerSocketChannel
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.socket().bind(new InetSocketAddress(port));
        // false 表示非阻塞,true 表示阻塞
        ssc.configureBlocking(false);
        while (true) {
            System.out.println("Waiting for connection...");
            // 使用 accept 接受连接,如果是阻塞模式会在这里进行阻塞
            SocketChannel sc = ssc.accept();
            if (sc == null) {
                System.out.println("No connection");
                Thread.sleep(2000);
            } else {
                System.out.println("Connection established: " + sc.socket().getRemoteSocketAddress());
                // 重置 buffer 的状态
                buffer.rewind();
                sc.write(buffer);
                sc.close();
            }
        }
    }
}
DatagramChannel

DatagramChannel 模拟包导向的无连接协议(如 UDP/IP)。DatagramChannel 是无连接的,每个数据报(datagram)都是一个自包含的实体,拥有它自己的目的地址及不依赖其他数据报的数据负载。与面向流的的 socket 不同,DatagramChannel 可以发送单独的数据报给不同的目的地址。同样,DatagramChannel 对象也可以接收来自任意地址的数据包。每个到达的数据报都含有关于它来自何处的信息(源地址)

@DisplayName("发包的 DatagramChannel")
@Test
public void sendDatagram() throws IOException, InterruptedException {
    // 使用 try-catch-resource 的方式,在结束自动执行 close() 方法
    try (DatagramChannel sendChannel = DatagramChannel.open()) {
        InetSocketAddress sendAddress = new InetSocketAddress("127.0.0.1", 9999);
        while (true) {
            sendChannel.send(ByteBuffer.wrap(
                "测试包发送".getBytes(StandardCharsets.UTF_8)),
                             sendAddress
                            );
            log.info("发包端发包");
            Thread.sleep(2000);
        }
    }
}

@DisplayName("收包端 DatagramChannel")
@Test
public void receive() throws IOException {
    try (DatagramChannel receiveChannel = DatagramChannel.open()) {
        InetSocketAddress receiveAddress = new InetSocketAddress(9999);
        // 绑定端口为9999
        receiveChannel.bind(receiveAddress);
        ByteBuffer receiveBuffer = ByteBuffer.allocate(512);
        while (true) {
            receiveBuffer.clear();
            // 可以获取发包端的 ip + 端口等信息
            // 格式:/127.0.0.1:58857
            SocketAddress sendAddress = receiveChannel.receive(receiveBuffer);
            receiveBuffer.flip();
            log.info(sendAddress.toString());
            log.info("收包端收包:{}", StandardCharsets.UTF_8.decode(receiveBuffer));
        }
    }
}

@Test
public void testConnect1() throws IOException {
    try (DatagramChannel connChannel = DatagramChannel.open()) {
        connChannel.bind(new InetSocketAddress(9998));
        connChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
        // 向9999端口发送信息
        connChannel.write(ByteBuffer.wrap("发包".getBytes(StandardCharsets.UTF_8)));
        ByteBuffer readBuffer = ByteBuffer.allocate(512);
        while (true) {
            try {
                readBuffer.clear();
                connChannel.read(readBuffer);
                readBuffer.flip();
                log.info(StandardCharsets.UTF_8.decode(readBuffer).toString());
            } catch (Exception e) {
            }
        }
    }
}

Buffer

在 NIO 库中,所有数据都是用缓冲区处理。

Buffer 继承链

当向 Buffer 写入数据时,Buffer 会记录下写了多少数据。一旦要读取数据,需要通过 flip() 方法将 Buffer 从写模式切换到读模式。在读模式下,可以读取之前写入到 Buffer 的所有数据。一旦读完了所有的数据,就需要清空缓冲区,让它可以再次被写入。

@Test
@DisplayName("IntBuffer 测试")
public void IntBufferTest() {
    // 分配新的int 缓冲区,参数为缓冲区容量
	// 新缓冲区的当前位置将为零,其界限(限制位置)将为其容量。
	// 它将具有一个底层实现数组,其数组偏移量将为零。
    IntBuffer intBuffer = IntBuffer.allocate(10);
    for (int i = 0; i < intBuffer.capacity(); i++) {
        // 将给定整数写入此缓冲区的当前位置,当前位置递增
        intBuffer.put(i * 2 + 1);
    }
    // 重设此缓冲区,将限制设置为当前位置,然后将当前位置设置为0
    intBuffer.flip();
    while (intBuffer.hasRemaining()) {
        log.info("intBuffer.get() = {}", intBuffer.get());
    }
}
基本属性/方法

Buffer 具有三个属性:

  • capacity:Buffer 有一个固定的大小值,有一个容量;

  • position:

    • 写入数据:

      position 表示写入数据的当前位置,position 的初始值为0,position 最大可为capacity – 1。

    • 读取数据:

      position 表示读入数据的当前位置,如 position=2 时表示已开始读入了3个 byte,或从第3个 byte 开始读取。通过ByteBuffer.flip() 切换到读模式时 position 会被重置为0。

  • limit:

    • 写入数据:

      limit 表示可对 Buffer 最多写入多少个数据。写模式下,limit 等于 Buffer 的 capacity。

    • 读取数据:

      limit 表示 Buffer 里有多少可读数据(not null 的数据),因此能读到之前写入的所有数据(limit 被设置成已写数据的数量,这个值在写模式下就是 position,通俗来说,就是你能读取到对应的数据的位置)。

Buffer 方法:

  • rewind():将 position 重置为0,limit 保持不变;
  • clear() & compact():
    • clear():position 重置为0,limit 被设置为 capacity 的值,如果存在未读数据,数据将“被遗忘”,不会被标记出来。Buffer 中的数据并未清除;
    • compact():所有未读的数据拷贝到 Buffer 起始处。将 position 设为最后一个未读元素之后。limit 属性依然设置 capacity。Buffer 写入数据时不会覆盖未读数据;
  • mark() & reset():mark() 可以标记 Buffer 中的一个特定 position,调用Buffer.reset()方法恢复到这个position。
@Test
public void testConnect3() throws IOException {
    ByteBuffer buffer = ByteBuffer.allocate(10);
    // 缓冲区中的数据0-9
    for (int i = 0; i < buffer.capacity(); ++i) {
        buffer.put((byte) i);
    }
    // 创建子缓冲区
    buffer.position(3);
    // 设置最多操作到第七位的数据
    buffer.limit(7);
    // slice 与原来的 buffer 共享数据
    ByteBuffer slice = buffer.slice();
    // 改变子缓冲区的内容
    for (int i = 0; i < slice.capacity(); ++i) {
        byte b = slice.get(i);
        b *= 10;
        slice.put(i, b);
    }

    buffer.position(0);
    buffer.limit(buffer.capacity());
    while (buffer.remaining() > 0) {
        System.out.println(buffer.get());
    }
    /*
    0
    1
    2
    30
    40
    50
    60
    7
    8
    9
    */
}
缓冲区操作

调用buffer.asReadOnlyBuffer();,创建一个只读缓冲区,原缓冲区内容变化时,只读缓冲区也随之变化。

allocateDirect()生成一个直接缓冲区,尝试避免将缓冲区的内容拷贝到一个中间缓冲区中或者从一个中间缓冲区中拷贝数据。

Selector

更少的线程处理通道,相比使用多个线程,避免了线程上下文切换带来的开销。

Selector 功能图示

SelectableChannel 类提供了实现通道的可选择性所需要的公共方法,socket 通道都继承自这个父类,而 FileChannel 类没有。

使用 Channel.register(Selector sel, int ops) 方法,将一个通道注册到一个选择器。

第二个参数表示查询的通道操作,包括以下四种(多种模式可以通过|连接):

  • 可读: SelectionKey.OP_READ;
  • 可写: SelectionKey.OP_WRITE;
  • 连接: SelectionKey.OP_CONNECT;
  • 接收: SelectionKey.OP_ACCEPT。

使用 select() 可以查询出已经就绪的通道操作,就绪的状态集合存放在Set<SelectionKey>集合中。

select() 有几个重载的方法:

  • select():阻塞到至少有一个通道在你注册的事件上就绪了;
  • select(long timeout):和 select() 一样,但最长阻塞事件为 timeout 毫秒;
  • selectNow():非阻塞,只要有通道就绪就立刻返回。

select() 方法返回的 int 值,表示有多少通道已经就绪,更准确的说,是自前一次 select方法以来到这一次 select 方法之间的时间段上,有多少通道变成就绪状态。

@Test
public void ServerDemo() {
    try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
        ssc.socket().bind(new InetSocketAddress("127.0.0.1", 9000));
        // 设置非阻塞,因为 Selector 只能接受非阻塞通道
        // 如果是阻塞通道,那么这里就会抛出 IOException
        // 所以 FileChannel 不能注册到 Selector 上
        ssc.configureBlocking(false);

        Selector selector = Selector.open();
        // 通道不是支持所有的模式,ServerSocketChannel 支持 Accept
        // 查看类内部的 validOps() 方法,获取通道支持的模式
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
        writeBuffer.put("received".getBytes());
        writeBuffer.flip();

        while (true) {
            int nReady = selector.select();
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> it = keys.iterator();

            while (it.hasNext()) {
                SelectionKey key = it.next();
                if (key.isAcceptable()) {
                    // 创建新的连接,并且把连接注册到 selector
                    // 声明 channel 只对读操作感兴趣
                    SocketChannel socketChannel = ssc.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // 获取 key 对应的 channel
                    try (SocketChannel socketChannel = (SocketChannel) key.channel()) {
                        readBuffer.clear();
                        socketChannel.read(readBuffer);

                        readBuffer.flip();
                        log.info("received: " + StandardCharsets.UTF_8.decode(readBuffer));
                        key.interestOps(SelectionKey.OP_WRITE);
                    }
                } else if (key.isWritable()) {
                    writeBuffer.rewind();
                    try (SocketChannel socketChannel = (SocketChannel) key.channel()) {
                        socketChannel.write(writeBuffer);
                        key.interestOps(SelectionKey.OP_READ);
                    }
                }
                it.remove();
            }
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

@Test
public void ClientDemo() {
    try (SocketChannel socketChannel = SocketChannel.open()) {
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 9000));

        ByteBuffer writeBuffer = ByteBuffer.allocate(32);
        ByteBuffer readBuffer = ByteBuffer.allocate(32);

        writeBuffer.put("hello server".getBytes());
        writeBuffer.flip();

        while (true) {
            writeBuffer.rewind();
            socketChannel.write(writeBuffer);
            readBuffer.clear();
            socketChannel.read(readBuffer);
        }
    } catch (IOException e) {
    }
}

4.2.3 Pipe

Pipe 用于实现两个线程之间数据的单向流通。

@Test
public void testPipe() throws IOException {
    // 1、获取通道
    Pipe pipe = Pipe.open();
    // 2、获取 sink 管道,用来传送数据
    Pipe.SinkChannel sinkChannel = pipe.sink();
    // 3、申请一定大小的缓冲区
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    byteBuffer.put("hello, world".getBytes());
    byteBuffer.flip();
    // 4、sink 发送数据
    sinkChannel.write(byteBuffer);
    // 5、创建接收 pipe 数据的 source 管道
    Pipe.SourceChannel sourceChannel = pipe.source();
    // 6、接收数据,并保存到缓冲区中
    ByteBuffer byteBuffer2 = ByteBuffer.allocate(1024);
    int length = sourceChannel.read(byteBuffer2);
    System.out.println(new String(byteBuffer2.array(), 0, length));
    sourceChannel.close();
    sinkChannel.close();
}

4.3 AIO(NIO 2)

Java AIO异步非阻塞,AIO 引入了异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用。

异步 IO 是基于事件和回调机制实现的,AIO 模式不需要 selector 操作,而是事件驱动形式。当客户端发送数据之后,会主动通知服务器,接着服务器再进行读写操作。

5. Shiro

想要使用 Shiro 中,需要添加以下依赖:

<dependency>
    <groupId>org.apache.shiro</groupId>
    <artifactId>shiro-spring-boot-web-starter</artifactId>
    <version>1.9.0</version>
</dependency>
  1. UsernamePasswordToken,Shiro 用来封装用户登录信息,使用用户的登录信息创建令牌 Token,登录的过程即 Shiro 验证令牌是否具有合法身份以及相关权限;
  2. SecurityManager,Shiro 的核心部分,负责安全认证与授权;
  3. Subject,Shiro 的一个抽象概念,包含了用户信息;
  4. Realm,开发者自定义的模块,根据项目的需求,验证和授权的逻辑在 Realm 中实现;
  5. AuthenticationInfo,用户的角色信息集合,认证时使用;
  6. AuthorizationInfo,角色的权限信息集合,授权时使用;
  7. DefaultWebSecurityManager,安全管理器,开发者自定义的 Realm 需要注入到 DefaultWebSecurityManager 进行管理才能生效;
  8. ShiroFilterFactoryBean,过滤器工厂,Shiro 的基本运行机制是开发者定制规则,Shiro 去执行,具体的执行操作就是由 ShiroFilterFactoryBean 创建一个个 Filter 对象来完成。

Shiro 运行流程

5.1 SpringBoot 整合 JWT

首先配置自己的 Realm,最简单使用账号密码。AuthenticationInfo 作为认证部分,AuthorizationInfo 作为授权部分(不要添加 @Component,会在之后的 config 冲突)。

import fun.sast.shirotest.entity.Account;
import fun.sast.shirotest.exception.LocalRuntimeException;
import fun.sast.shirotest.service.AccountService;
import lombok.extern.slf4j.Slf4j;
import org.apache.shiro.SecurityUtils;
import org.apache.shiro.authc.*;
import org.apache.shiro.authz.AuthorizationInfo;
import org.apache.shiro.authz.SimpleAuthorizationInfo;
import org.apache.shiro.realm.AuthorizingRealm;
import org.apache.shiro.subject.PrincipalCollection;
import org.apache.shiro.subject.Subject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.HashSet;
import java.util.Set;

@Slf4j
public class UserRealm extends AuthorizingRealm {
    // 因为时候要配置 config,所以这里使用字段注入
    @Autowired
    private AccountService accountService;

    // 授权
    @Override
    protected AuthorizationInfo doGetAuthorizationInfo(
            PrincipalCollection principalCollection) {
        // 获取当前登录对象
        Subject subject = SecurityUtils.getSubject();
        Account account = (Account) subject.getPrincipal();

        // 设置角色
        Set<String> roles = new HashSet<>();
        roles.add(account.getRole().toString());
        SimpleAuthorizationInfo info = new SimpleAuthorizationInfo(roles);

        // 设置权限
        // 因为表中并没有权限字段,这里忽略
        // info.addStringPermission(account.getPerms());
        info.addRoles(roles);
        return null;
    }

    // 认证
    @Override
    protected AuthenticationInfo doGetAuthenticationInfo(
            AuthenticationToken authenticationToken) throws AuthenticationException {
        // 客户端传来的 username 和 password 会自动封装到 token
        UsernamePasswordToken token = (UsernamePasswordToken) authenticationToken;
        Account account = accountService.getAccountByUsername(token.getUsername());
        if (account == null) {
            log.error("用户不存在");
            throw new LocalRuntimeException("用户不存在");
        }
        return new SimpleAuthenticationInfo(account, account.getPassword(), getName());
    }
}

认证过滤器:

  • anon:无需认证即可访问,游客身份;
  • authc:必须认证(登录)才能访问;
  • authcBasic:需要通过 httpBasic 认证;
  • user:不一定已通过认证,只要是曾经被 Shiro 记住过登录状态的用户就可以正常发起请求,比如 rememberMe。

授权过滤器:

  • perms:必须拥有对某个资源的访问权限(授权)才能访问;
  • role:必须拥有某个角色权限才能访问;
  • port:请求的端口必须为指定值才可以访问;
  • rest:请求必须是 RESTful,method 为 post、get、delete、put;
  • ssl:必须是安全的 URL 请求,协议为 HTTPS。
import fun.sast.shirotest.common.content.UserRealm;
import org.apache.shiro.spring.web.ShiroFilterFactoryBean;
import org.apache.shiro.web.mgt.DefaultWebSecurityManager;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class ShiroConfig {
    // 通过 @Qualifier 注入创建好的 Bean
    @Bean("shiroFilterFactoryBean")
    public ShiroFilterFactoryBean filterFactoryBean(
        @Qualifier("manager") DefaultWebSecurityManager manager){
        ShiroFilterFactoryBean factoryBean = new ShiroFilterFactoryBean();
        factoryBean.setSecurityManager(manager);
        Map<String,String> map = new HashMap<>();
        map.put("/main","authc");
        map.put("/manage","perms[manage]");
        map.put("/administrator","roles[administrator]");
        factoryBean.setFilterChainDefinitionMap(map);
        //设置登录页面
        factoryBean.setLoginUrl("/login");
        //未授权页面
        factoryBean.setUnauthorizedUrl("/unauth");
        return factoryBean;
    }

    @Bean("manager")
    public DefaultWebSecurityManager manager(@Qualifier("userRealm") UserRealm userRealm){
        DefaultWebSecurityManager manager = new DefaultWebSecurityManager();
        manager.setRealm(userRealm);
        return manager;
    }

    @Bean("userRealm")
    public UserRealm userRealm(){
        return new UserRealm();
    }
}

Shiro 的问题是不能返回 json,只能跳转对应 url,在前后端分离式开发中比较麻烦。

import lombok.extern.slf4j.Slf4j;
import org.apache.shiro.SecurityUtils;
import org.apache.shiro.authc.IncorrectCredentialsException;
import org.apache.shiro.authc.UnknownAccountException;
import org.apache.shiro.authc.UsernamePasswordToken;
import org.apache.shiro.subject.Subject;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.*;

// 因为返回的是 url,所以没有必要添加 @ResponseBody 的注解
@Controller
@Slf4j
@RequestMapping("account")
public class LoginController {

    @GetMapping("/{url}")
    public String redirect(@PathVariable("url") String url) {
        return url;
    }

    @PostMapping("/login")
    public String login(String username, String password, Model model) {
        Subject subject = SecurityUtils.getSubject();
        UsernamePasswordToken token = new UsernamePasswordToken(username, password);
        try {
            subject.login(token);
            return "index";
        } catch (UnknownAccountException e) {
            model.addAttribute("msg", "用户名错误");
            return "login";
        } catch (IncorrectCredentialsException e) {
            model.addAttribute("msg", "密码错误");
            return "login";
        }
    }

    @RequestMapping("/unauth")
    @ResponseBody
    public String unauth() {
        return "未授权没有访问权限";
    }
}

5.2 结合 JWT

  1. POST 用户名与密码到 /login 进行登入,如果成功返回一个加密 token,失败的话直接返回 404 错误;
  2. 之后用户访问每一个需要权限的网址请求需在 header 中添加 Authorization 字段,例如 Authorization: tokentoken 为密钥;
  3. 后台会进行 token 的校验,如果有误会直接返回 404。

5.2.1 实现 JWTToken

JWTTokenShiro 用户名密码的载体,在前后端分离开发中,服务器无需保存用户状态,不需要 RememberMe 这类功能,简单的实现 AuthenticationToken 接口即可。token 中存放用户 uid。

import org.apache.shiro.authc.AuthenticationToken;

public class JWTToken implements AuthenticationToken {
    // 密钥
    private String token;

    public JWTToken(String token) {
        this.token = token;
    }

    @Override
    public Object getPrincipal() {
        return token;
    }

    @Override
    public Object getCredentials() {
        return token;
    }
}

5.2.2 实现 Realm

realm 的用于处理用户是否合法,自定义鉴权中,需要自定义。

import com.sast.jwt.common.enums.CustomError;
import com.sast.jwt.entity.Account;
import com.sast.jwt.exception.LocalRuntimeException;
import com.sast.jwt.service.AccountService;
import com.sast.jwt.utils.JWTUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.shiro.authc.AuthenticationException;
import org.apache.shiro.authc.AuthenticationInfo;
import org.apache.shiro.authc.AuthenticationToken;
import org.apache.shiro.authc.SimpleAuthenticationInfo;
import org.apache.shiro.authz.AuthorizationInfo;
import org.apache.shiro.authz.SimpleAuthorizationInfo;
import org.apache.shiro.realm.AuthorizingRealm;
import org.apache.shiro.subject.PrincipalCollection;
import org.springframework.stereotype.Component;

@Slf4j
public class MyRealm extends AuthorizingRealm {
    private final JWTUtil jwtUtil;

    private final AccountService accountService;

    public MyRealm(AccountService accountService, JWTUtil jwtUtil) {
        this.accountService = accountService;
        this.jwtUtil = jwtUtil;
    }
    
    /*
     * 大坑!,必须重写此方法,不然 Shiro 会报错
     * 这里重写 token 的验证
     */
    @Override
    public boolean supports(AuthenticationToken token) {
        return token instanceof JWTToken;
    }

    // 默认使用此方法进行用户名正确与否验证,错误抛出异常即可。
    @Override
    protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken auth)
            throws AuthenticationException {
        String token = (String) auth.getCredentials();
        Long userId = jwtUtil.getId(token);
        Account account = accountService.getAccountById(userId);
        if (account == null) {
            throw new LocalRuntimeException(CustomError.NO_USER);
        }
        return new SimpleAuthenticationInfo(account.getUsername(), token, "my_realm");
    }
    
    /*
     * 只有当需要检测用户权限的时候才会调用此方法
     * 例如 checkRole,checkPermission 之类的
     */
    @Override
    protected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principals) {
        Account user = accountService.getAccountByUsername(principals.toString());
        SimpleAuthorizationInfo info = new SimpleAuthorizationInfo();
        info.addRole(user.getRole().toString());
        return info;
    }
}

5.2.3 重写 Filter

所有的请求都会先经过 Filter,需要继承官方的 BasicHttpAuthenticationFilter ,并且重写鉴权的方法。

代码的执行流程 preHandle -> isAccessAllowed -> isLoginAttempt -> executeLogin

import com.sast.jwt.common.contents.JWTToken;
import lombok.extern.slf4j.Slf4j;
import org.apache.shiro.web.filter.authc.BasicHttpAuthenticationFilter;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestMethod;

import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

@Component
@Slf4j
// 代码的执行流程 preHandle -> isAccessAllowed -> isLoginAttempt -> executeLogin
public class JWTFilter extends BasicHttpAuthenticationFilter {
    @Override
    protected boolean preHandle(ServletRequest request, ServletResponse response) throws Exception {
        HttpServletRequest httpServletRequest = (HttpServletRequest) request;
        HttpServletResponse httpServletResponse = (HttpServletResponse) response;
        httpServletResponse.setHeader("Access-control-Allow-Origin",
                httpServletRequest.getHeader("Origin"));
        httpServletResponse.setHeader("Access-Control-Allow-Methods",
                "GET,POST,OPTIONS,PUT,DELETE");
        httpServletResponse.setHeader("Access-Control-Allow-Headers",
                httpServletRequest.getHeader("Access-Control-Request-Headers"));
        // 跨域时会首先发送一个 option 请求,这里我们给 option 请求直接返回正常状态
        if (httpServletRequest.getMethod().equals(RequestMethod.OPTIONS.name())) {
            httpServletResponse.setStatus(HttpStatus.OK.value());
            return false;
        }
        return super.preHandle(request, response);
    }

    /*
     * 将非法请求跳转到 /404
     */
    private void response404(ServletRequest req, ServletResponse resp) {
        try {
            HttpServletResponse httpServletResponse = (HttpServletResponse) resp;
            httpServletResponse.sendRedirect("/404");
        } catch (IOException e) {
            log.error(e.getMessage());
        }
    }

    /*
     * 这里我们详细说明下为什么最终返回的都是 true,即允许访问
     * 例如我们提供一个地址 GET /article
     * 登入用户和游客看到的内容是不同的
     * 如果在这里返回了 false,请求会被直接拦截,用户看不到任何东西
     * 所以我们在这里返回 true,Controller 中可以通过 subject.isAuthenticated() 来判断用户是否登入
     * 如果有些资源只有登入用户才能访问,我们只需要在方法上面加上 @RequiresAuthentication 注解即可
     * 但是这样做有一个缺点,就是不能够对 GET,POST 等请求进行分别过滤鉴权
     * (因为我们重写了官方的方法),但实际上对应用影响不大
     */
    @Override
    protected boolean isAccessAllowed(ServletRequest request,
                                      ServletResponse response,
                                      Object mappedValue) {
        if (isLoginAttempt(request, response)) {
            try {
                executeLogin(request, response);
            } catch (Exception e) {
                response404(request, response);
            }
        }
        return true;
    }

    /*
     * 判断用户是否想要登入。
     * 检测 header 里面是否包含 Authorization 字段即可
     */
    @Override
    protected boolean isLoginAttempt(ServletRequest request, ServletResponse response) {
        HttpServletRequest req = (HttpServletRequest) request;
        String authorization = req.getHeader("Authorization");
        return authorization != null;
    }

    @Override
    protected boolean executeLogin(ServletRequest request, ServletResponse response) throws Exception {
        HttpServletRequest httpServletRequest = (HttpServletRequest) request;
        String authorization = httpServletRequest.getHeader("Authorization");

        JWTToken token = new JWTToken(authorization);
        // 提交给 realm 进行登入,如果错误他会抛出异常并被捕获
        // 这一步就是提交给了 realm 进行处理。
        getSubject(request, response).login(token);
        // 如果没有抛出异常则代表登入成功,返回true
        return true;
    }
}

5.2.4 配置 Shiro

import com.sast.jwt.common.contents.MyRealm;
import com.sast.jwt.filter.JWTFilter;
import org.apache.shiro.mgt.DefaultSessionStorageEvaluator;
import org.apache.shiro.mgt.DefaultSubjectDAO;
import org.apache.shiro.spring.LifecycleBeanPostProcessor;
import org.apache.shiro.spring.security.interceptor.AuthorizationAttributeSourceAdvisor;
import org.apache.shiro.spring.web.ShiroFilterFactoryBean;
import org.apache.shiro.web.mgt.DefaultWebSecurityManager;
import org.springframework.aop.framework.autoproxy.DefaultAdvisorAutoProxyCreator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;

import javax.servlet.Filter;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class ShiroConfig {
    @Bean("securityManager")
    public DefaultWebSecurityManager getManager(MyRealm realm) {
        DefaultWebSecurityManager manager = new DefaultWebSecurityManager();
        // 使用自己的 realm
        manager.setRealm(realm);

        /*
         * 关闭 Shiro 自带的 session,详情见文档
         * http://shiro.apache.org/session-management.html#SessionManagement-StatelessApplications%28Sessionless%29
         */
        DefaultSubjectDAO subjectDAO = new DefaultSubjectDAO();
        DefaultSessionStorageEvaluator defaultSessionStorageEvaluator =
                new DefaultSessionStorageEvaluator();
        defaultSessionStorageEvaluator.setSessionStorageEnabled(false);
        subjectDAO.setSessionStorageEvaluator(defaultSessionStorageEvaluator);

        manager.setSubjectDAO(subjectDAO);
        return manager;
    }

    // 如果没有规定 ShiroFilterFactoryBean 会bao
    @Bean("ShiroFilterFactoryBean")
    public ShiroFilterFactoryBean factory(DefaultWebSecurityManager securityManager) {
        ShiroFilterFactoryBean factoryBean = new ShiroFilterFactoryBean();

        // 添加自己的过滤器并且取名为 jwt
        Map<String, Filter> filterMap = new HashMap<>();
        filterMap.put("jwt", new JWTFilter());
        factoryBean.setFilters(filterMap);

        factoryBean.setSecurityManager(securityManager);
        factoryBean.setUnauthorizedUrl("/401");

        /*
         * 自定义url规则
         * http://shiro.apache.org/web.html#urls-
         * 定义对应页面的规则
         */
        Map<String, String> filterChain = new HashMap<>();
        // 所有请求通过我们自己的 JWT Filter
        filterChain.put("/**", "jwt");
        // 登录/注册相关的请求不通过我们的JWT Filter
        filterChain.put("/user/login", "anon");
        filterChain.put("/user/register", "anon");
        // 访问 401和 404 页面不通过我们的 Filter
        // 可惜 Shiro 只能跳转页面,不能返回 json 404 页面
        filterChain.put("/404", "anon");
        filterChain.put("/401", "anon");
        factoryBean.setFilterChainDefinitionMap(filterChain);
        return factoryBean;
    }

    /**
     * 下面的代码是添加注解支持
     */
    @Bean
    @DependsOn("lifecycleBeanPostProcessor")
    public DefaultAdvisorAutoProxyCreator defaultAdvisorAutoProxyCreator() {
        DefaultAdvisorAutoProxyCreator defaultAdvisorAutoProxyCreator = new DefaultAdvisorAutoProxyCreator();
        // 强制使用 cglib,防止重复代理和可能引起代理出错的问题
        // https://zhuanlan.zhihu.com/p/29161098
        defaultAdvisorAutoProxyCreator.setProxyTargetClass(true);
        return defaultAdvisorAutoProxyCreator;
    }

    @Bean
    public LifecycleBeanPostProcessor lifecycleBeanPostProcessor() {
        return new LifecycleBeanPostProcessor();
    }

    @Bean
    public AuthorizationAttributeSourceAdvisor
    authorizationAttributeSourceAdvisor(DefaultWebSecurityManager securityManager) {
        AuthorizationAttributeSourceAdvisor advisor = new AuthorizationAttributeSourceAdvisor();
        advisor.setSecurityManager(securityManager);
        return advisor;
    }
}

6. ElasticSearch

ElasticSearch 是一款非常强大的、基于 Lucene 的开源搜索及分析引擎,是一个实时的分布式搜索分析引擎。基于 Restful WebApi,使用 Java 语言开发。

ES 对于传统 RDBM

6.1 配置安装

配置主要针对于 ver 8,而 ES 在这里添加了新的配置,导致一些原始的配置无法使用。

6.1.1 Windows

ES

在官网下载对应 ES 安装包,解压之后打开 bin 目录下的 ES 的脚本文件,启动完成就可以在 localhost:9200 端口查看。

启动成功示例

如果启动成功却无法访问网页,是因为配置文件中的安全配置默认开启,需设置 xpack.security.enabled: false,此配置会默认开启 SSL 认证。

在修改这个配置之后会导致 ES 的安全性检测问题,使得一些命令被禁止使用,并且会在使用 Kibana 时遇到一些问题,显示证书验证失效。

显示集群安全问题,无法执行重置操作

在启动无问题之后,可以用 elasticsearch-service.bat install 将 ES 添加到 Windows 服务中。

在 ES 第一次启动时,会提供 Password、token 等配置,在后续配置其他管理工具使用,也可以针对集群进行配置。

token 的有效时间为 30 分钟,后续使用需要内部运行程序重新生成

head 插件

对于 ES 节点的管理,可以使用官方的 head 插件。在这之前需要 node 环境,运行 npm install -g grunt-cli 安装 grunt。

在下载好 head 文件之后,进入文件夹 npm install 进行安装。在安装过程中可能会出现 JS core 的版本问题,使用 npm install npm@6.14.13 -g 进行回退。

# 在 ES 配置文件中添加以下配置将 head 添加
http.cors.enabled: true 
http.cors.allow-origin: "*"

安装完成之后 npm run startgrunt server,启动 head 插件(如果显示端口占用,可以在 Gruntfile.js 文件中修改端口)。

Kibana

Kibana 是开源的可视化管理工具,需要和 ES 版本一致。

在 ver 8 之后,官方禁止直接使用 elastic 用户进行连接,推荐 Service Account token(目前还不知道怎么实现)。在本地测试中,可以直接使用 localhost:9200 进行手动配置连接。

显示 elastic 用户权限过高,无法用于连接

# 默认使用语言是 EN
i18n.locale: "zh-CN"

# 如果配置 IP 访问需要配置,默认是本地的 9200 端口
elasticsearch.hosts: ["http://localhost:9200"]

6.2 ES 操作

针对 ES 可以使用之前的 head 插件,也能用更方便的 Kinbana 直接进行可视化管理。因为本身是个 No SQL,可以通过 Postman 调用 URL 进行操作。

使用 Postman 连接本地 ES 操作

在 Kibana 的开发者工具中进行测试

一个索引可以存储超出单个节点硬件限制的大量数据。或者单个节点处理搜索请求,响应太慢。为了解决这个问题,Elasticsearch 提供了将索引划分成多份的能力,每一份就称之为分片(Shards)。当你创建一个索引的时候,你可以指定你想要的分片的数量。每个分片本身也是一个功能完善并且独立的“索引”,这个“索引”可以被放置到集群中的任何节点上。

6.2.1 索引管理

在 ES 中,能搜索的数据必须索引,这样可以加快速度。

PUT /bank
# 创建索引,之后可以在索引里添加数据
# 不能使用 POST,因为 POST 没有幂等性

DELETE /bank
# 删除索引

GET /_cat/indices?v
# 查询目前所有索引

在索引中,可以使用 type 存储不同类型的数据,但是在 ver 7 之后只能使用 _doc 存储数据。

版本 Type
5.x 支持多种 type
6.x 只能有一种 type
7.x 默认不再支持自定义索引类型(默认类型为:_doc)

6.2.2 数据存储

POST /bank/_doc 
{
  "name": "cxy621",
  "age": 18
}
# 每个数据会生成一个唯一 id
# 文档中插入数据只能使用 POST,非幂等性每次会生成不同的 id
# 如果 id 重复,会自动变成修改操作

GET /bank/_doc/fgHXFoIBONKzPt9m8wAN
# 获取对应 id 数据

GET /_cat/indices?v
# 查询所有索引情况

查询结果

# 获取索引下的所有内容
GET /shopping/_search

查询所有数据返回数据

  • took:Elasticsearch运行查询所花费的时间(以毫秒为单位);

  • timed_out:搜索请求是否超时;

  • _shards:搜索了多少个碎片,以及成功,失败或跳过了多少个碎片的细目分类;

  • max_score:找到的最相关文档的分数;

  • hits.total.value:找到了多少个匹配的文档;

  • hits.sort:文档的排序位置(不按相关性得分排序时);

  • hits._score:文档的相关性得分(使用 match_all 时不适用,在条件查询中会根据 score 来判断相关性的大小)。

6.2.3 条件查询

GET /shopping/_search
{
  "query": {"match_all": {}},
  "sort": [
    {
      "account_number": "desc"
    }
  ],
  "from": 3,
  "size": 2
}
# query 表示通过条件查询,match_all 表示全部
# sort 表示通过字段排序
# form, size 用于分页查询

# match 会匹配包含字段的内容
GET /shopping/_search
{
  "query":{
    "match": {
      "city": "南京"
    }
  }
}

# match_phrase 强制匹配字段
GET /shopping/_search
{
  "query": {
    "match_phrase": {
      "city": "南京"
    }
  }
}

ES 的分词器会让 match 匹配出包含字的数据

GET /shopping/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "city": "南京"
          }
        }
      ],
      "must_not": [
        {
          "match": {
            "_id": 4
          }
        }
      ]
    }
  }
}
# 通过 bool 进行复合查询

GET /bank/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "state": "ND"
          }
        }
      ],
      "filter": [
        {
          "term": {
            "age": "40"
          }
        },
        {
          "range": {
            "balance": {
              "gte": 20000,
              "lte": 30000
            }
          }
        }
      ]
    }
  }
}
# must 和 filter 的区别在于前者是会给文档打分,后者是直接过滤
# term 表示对应字段
# range 中 gte 表示大于,lte 表示小于

6.2.4 聚合查询

ES 中的聚合是 agg,表示聚合。

GET /bank/_search
{
  "size": 0,
  "aggs": {
    "group_by_state": {
      "terms": {
        "field": "state.keyword"
      }
    }
  }
}
# keyword 表示对关键字的统计
# 每次查询会在 hits 中展示具体信息,size 设置为0表示不需要

GET /bank/_search
{
  "size": 0,
  "aggs": {
    "group_by_state": {
      "terms": {
        "field": "state.keyword",
        "order": {
          "average_balance": "desc"
        }
      },
      "aggs": {
        "average_balance": {
          "avg": {
            "field": "balance"
          }
        }
      }
    }
  }
}
# 聚合中的字段进行自命名,可以嵌套使用
# avg 求平均值关键字
# 使用 order + 字段

6.3 整合 SpringBoot

操作 ES 需要添加 Spring Data 依赖。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
spring:  
  elasticsearch:
    host: localhost
    port: 9200

同 MinIO 一致,配置客户端连接,创建 Bean。

import org.elasticsearch.client.RestHighLevelClient;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;

// 配置 ES 连接
@Configuration
public class ESConfig extends AbstractElasticsearchConfiguration {
    @Value("${spring.elasticsearch.host}")
    private String host;

    @Value("${spring.elasticsearch.port}")
    private String port;

    @NotNull
    @Override
    @Bean
    public RestHighLevelClient elasticsearchClient() {
        final ClientConfiguration clientConfiguration =
                ClientConfiguration.builder()
                .connectedTo(host + ":" + port)
                .build();
        return RestClients.create(clientConfiguration).rest();
    }
}

客户端对象:

  • ElasticsearchOperations 通过偏向 oop 的方式操作;
  • RestHighLevelClient 类似 kibana ,通过 rest 操作(推荐)。

6.3.1 ElasticsearchOperations

创建映射实体类,将索引的中的对应 JSON 映射到对象中。

import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
 
@Document(indexName = "products")
// 指定文档的索引名称
public class Product {
    @Id
    // 指定字段作为 _id
    private Integer id;
 
    @Field(type = FieldType.Keyword)
    private String title;
 
    @Field(type = FieldType.Float)
    private Double price;
 
    @Field(type = FieldType.Text,analyzer = "ik_max_word")
    // 指定映射类型和分词器
    private String description;
    //get set...
}

测试,添加返回删除。

@SpringBootTest
class DemoApplicationTests {
 
    @Autowired
    private ElasticsearchOperations elasticsearchOperations;
 
    @Test
    void contextLoads() {
        Product product = new Product();
        product.setId(1);
        product.setTitle("iphone");
        product.setPrice(9999.0);
        product.setDescription("iphone with IOS");
 
        elasticsearchOperations.save(product);
        // save() 当文档 id 不存在时,创建文档
        // 当文档 id 存在时,更新文档
 
        Product res = elasticsearchOperations.get("1", Product.class);
 
        elasticsearchOperations.delete(product);
    }
}

6.3.2 RestHighLevelClient

import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component
public class ESUtil {
    private final RestHighLevelClient restHighLevelClient;

    public ESUtil(RestHighLevelClient restHighLevelClient) {
        this.restHighLevelClient = restHighLevelClient;
    }

    /**
     * 判断索引是否存在
     *
     * @param indexName 索引名称
     * @return
     * @throws IOException
     */
    public boolean isIndexExist(String indexName) throws IOException {
        GetIndexRequest request = new GetIndexRequest(indexName);
        return restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
    }

    /**
     * 创建索引
     *
     * @param indexName   索引名
     * @param mappingJson 映射
     * @throws IOException
     */
    public void createIndex(String indexName, 
                            String mappingJson) throws IOException {
        CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
        if (mappingJson != null) {
            createIndexRequest.mapping(mappingJson, XContentType.JSON);
        }
        restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
    }

    /**
     * 向指定索引中添加文档
     *
     * @param indexName 索引名
     * @param document  被添加的 JSON 文档
     * @param id        指定要添加的文档的 id,为 null 时 ES 会自动生成
     * @throws IOException
     */
    public void addDocument(String indexName, 
                            String document, String id) throws IOException {
        IndexRequest request = new IndexRequest(indexName);
        request.id(id).source(document, XContentType.JSON);
        restHighLevelClient.index(request, RequestOptions.DEFAULT);
    }

    /**
     * 根据 id 更新文档 (在原文档的基础上更新)
     *
     * @param indexName 索引名
     * @param id        id
     * @param document  更新内容
     */
    public void updateDocument(String indexName, 
                               String id, String document) throws IOException {
        UpdateRequest updateRequest = new UpdateRequest(indexName, id);
        updateRequest.doc(document, XContentType.JSON);
        restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
    }

    /**
     * 删除指定文档
     *
     * @param indexName
     * @param id
     */
    public void deleteDocument(String indexName, 
                               String id) throws IOException {
        DeleteRequest deleteRequest = new DeleteRequest(indexName, id);
        restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
    }

    /**
     * 根据 id 返回文档
     *
     * @param indexName
     * @param id
     * @return
     * @throws IOException
     */
    public Map<String, Object> getDocumentById(String indexName, 
                                               String id) throws IOException {
        GetRequest getRequest = new GetRequest(indexName, id);
        GetResponse documentFields = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
        return documentFields.getSource();
    }


    /**
     * 封装分页条件查询
     *
     * @param indexName
     * @param queryBuilder
     * @param pageNum      起始位置(从0开始)
     * @param pageSize     每一页的数量
     * @return
     * @throws IOException
     */
    private SearchResponse query(String indexName,
                                 QueryBuilder queryBuilder,
                                 int pageNum, int pageSize) throws IOException {
        SearchRequest searchRequest = new SearchRequest(indexName);

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(queryBuilder)
                .from((pageNum - 1) * pageSize)
                .size(pageSize);

        searchRequest.source(searchSourceBuilder);
        return restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    }

    /**
     * term 查询
     *
     * @param indexName
     * @param fieldName
     * @param terms
     * @return
     * @throws IOException
     */
    public SearchResponse termQuery(String indexName, 
                                    String fieldName, String... terms) throws IOException {
        return this.query(indexName, QueryBuilders.termsQuery(fieldName, terms), 0, 10);
    }
}

参考文章

  1. MinIO 官方网站
  2. SpringBoot 集成 MinIO 8.3.X 依赖冲突解决
  3. SpringBoot 集成 MinIO
  4. SpringBoot 整合 Minio 对象存储服务
  5. Docker 安装 RabbitMQ 无法访问页面
  6. RabbitMQ 官方网站
  7. 关于 @RunWith(SpringRunner.class)的作用
  8. spring boot 的测试类要不要加 @RunWith(SpringJUnit4ClassRunner.class)
  9. 认识一下 RabbitMQ
  10. SpringBoot 2.X 整合 RestTemplate
  11. Nacos 中 AP 和 CP 模式切换
  12. Windows 搭建 Nacos 服务
  13. 启动 nacos 集群出现端口占用问题
  14. Nacos 2.1.0 无法创建新的配置
  15. Nacos Docker 快速开始
  16. Sentinel 规则 Pull 模式持久化
  17. Netty 学习前基本知识
  18. Reactor 模式
  19. Redis 为什么这么快?
  20. Spring Boot 整合 Shiro 教程
  21. Shiro + JWT + Spring Boot Restful
  22. SpringBoot:集成 Shiro 之自定义 Realm 实现认证授权
  23. Windows 环境下 ES 安装教程
  24. Elasticsearch 8.0 received plaintext http traffic on an https channel, closing connection
  25. 【npm】core-js@2.6.12
  26. Windows 下安装和配置 Kibana
  27. ElasticSearch – 風楪fy

文章作者: 陈鑫扬
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 陈鑫扬 !
评论
  目录