1. MinIO
对于 OSS 存储,最常用的就是阿里云存储,但是我们也可以使用免费的 MinIO 在服务器上搭建自己的 OSS 存储服务。
这里是 Java SpringBoot 中所需要的 MinIO 的依赖:
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>8.2.1</version>
</dependency>
1.1 配置单个容器
因为中文文档过老的缘故,建议查看英文最新文档。这里我们使用 Docker 搭建单个 MinIO 的容器:
docker run \
-p 9000:9000 \
-p 9001:9001 \
--name minio1 \
-e "MINIO_ROOT_USER=username" \
-e "MINIO_ROOT_PASSWORD=password" \
-v ~/minio/data:/data \
minio/minio server /data --console-address ":9001"
1.2 SpringBoot 整合 MinIO
SpringBoot yaml 配置如下:
minio:
endpoint: http://localhost:9000
accessKey: root
secretKey: sast_forever_minio
bucketName: exam
将 yaml 配置的属性绑定到实体类中,将各个名称对应:
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
//绑定 yaml 配置文档的属性
@ConfigurationProperties(prefix = "minio")
public class MinioYamlConfig {
private String endpoint;
private String accessKey;
private String secretKey;
private String bucketName;
}
配置好之后,我们就可以自动创建 MinIO 客户端的连接:
import io.minio.MinioClient;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
@Configuration
// 指定组件生效,并表明所在类
@EnableConfigurationProperties(MinioYamlConfig.class)
public class MinioConfig {
// 使用组件扫描注入,不需要再配置
@Resource
private MinioYamlConfig minioYamlConfig;
// 创建 minio 客户端连接
@Bean
public MinioClient minioClient() {
return MinioClient.builder()
.endpoint(minioYamlConfig.getEndpoint())
.credentials(minioYamlConfig.getAccessKey(), minioYamlConfig.getSecretKey())
.build();
}
}
之后我们确认 MinIO 的工具类:
import com.baomidou.mybatisplus.core.toolkit.Constants;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.sast.jwt.exception.LocalRuntimeException;
import io.minio.*;
import io.minio.errors.*;
import io.minio.http.Method;
import lombok.extern.slf4j.Slf4j;
import org.apache.tomcat.util.http.fileupload.IOUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@Slf4j
@Component
public class MinioUtil {
private final MinioClient minioClient;
public MinioUtil(MinioClient minioClient) {
this.minioClient = minioClient;
}
//判断是否存在对应的桶,如果没有就创建一个
public void existBucket(String name) {
try {
boolean exists = minioClient.bucketExists(BucketExistsArgs.builder().bucket(name).build());
if (!exists) {
minioClient.makeBucket(MakeBucketArgs.builder().bucket(name).build());
}
} catch (Exception e) {
throw new LocalRuntimeException("创建桶失败");
}
}
//上传文件,返回文件名(文件为数组的形式)
public List<String> upload(MultipartFile[] multipartFile, String bucketName) {
List<String> names = new ArrayList<>(multipartFile.length);
for (MultipartFile file : multipartFile) {
String fileName = file.getOriginalFilename();
//使用 hutools 的判断,文件名为空或者 null 都会抛出异常
if (StringUtils.isEmpty(fileName)) {
throw new LocalRuntimeException("文件名为空");
}
String[] split = fileName.split("\\.");
if (split.length > 1) {
fileName = System.currentTimeMillis() + "_" + split[0] + "." + split[split.length - 1];
} else {
fileName = System.currentTimeMillis() + fileName;
}
InputStream in = null;
try {
in = file.getInputStream();
minioClient.putObject(PutObjectArgs.builder()
.bucket(bucketName)
.object(fileName)
//在 partSize 填入-1时表示大小不确定
.stream(in, in.available(), -1)
.contentType(file.getContentType())
.build()
);
} catch (Exception e) {
log.error(e.getMessage());
} finally {
if (in != null) {
try {
in.close();
} catch (IOException e) {
log.error(e.getMessage());
}
}
}
names.add(fileName);
}
return names;
}
public ResponseEntity<byte[]> download(String fileName, String bucketName) {
ResponseEntity<byte[]> responseEntity = null;
InputStream in = null;
ByteArrayOutputStream out = null;
try {
in = minioClient.getObject(GetObjectArgs.builder()
.bucket(bucketName).object(fileName).build());
out = new ByteArrayOutputStream();
IOUtils.copy(in, out);
//封装返回值
byte[] bytes = out.toByteArray();
HttpHeaders headers = new HttpHeaders();
try {
headers.add("Content-Disposition",
"attachment;filename=" + URLEncoder.encode(fileName, Constants.UTF_8));
} catch (UnsupportedEncodingException e) {
log.error(e.getMessage());
}
headers.setContentLength(bytes.length);
headers.setContentType(MediaType.APPLICATION_OCTET_STREAM);
headers.setAccessControlExposeHeaders(Collections.singletonList("*"));
responseEntity = new ResponseEntity<>(bytes, headers, HttpStatus.OK);
} catch (Exception e) {
log.error(e.getMessage());
} finally {
try {
if (in != null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (out != null) {
out.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
return responseEntity;
}
//删除对应桶的对应文件
public void removeObject(String bucketName, String objectName) {
try {
RemoveObjectArgs objectArgs = RemoveObjectArgs.builder().object(objectName)
.bucket(bucketName).build();
minioClient.removeObject(objectArgs);
} catch (Exception e) {
throw new LocalRuntimeException("文件删除失败");
}
}
//获取文件外链
public String getObjectURL(String bucketName, String objectName, Integer expires) {
try {
GetPresignedObjectUrlArgs objectArgs = GetPresignedObjectUrlArgs.builder().object(objectName)
.bucket(bucketName)
.expiry(expires).build();
String url = minioClient.getPresignedObjectUrl(objectArgs);
return URLDecoder.decode(url, "UTF-8");
} catch (Exception e) {
log.error("文件路径获取失败" + e.getMessage());
}
return null;
}
//获取文件流
public InputStream getMinioFile(String bucketName,String objectName){
InputStream inputStream = null;
try {
GetObjectArgs objectArgs = GetObjectArgs.builder().object(objectName)
.bucket(bucketName).build();
inputStream = minioClient.getObject(objectArgs);
} catch (Exception e) {
log.error("文件获取失败" + e.getMessage());
}
return inputStream;
}
/**
* 获取某一个存储对象的下载链接
*
* @param bucketName 桶名
* @param method 方法类型
* @param objectName 对象名
* @return url 下载链接
* @throws ServerException 服务异常
*/
public String getObjectUrl(String bucketName, Method method, String objectName)
throws ServerException, InsufficientDataException,
ErrorResponseException, IOException, NoSuchAlgorithmException,
InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {
return minioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder()
.method(method)
.bucket(bucketName)
.object(objectName).build());
}
}
1.3 使用 Compose 配置分布式 MinIO
我们也可以使用官方自带的 docker-compose.yaml 和 nginx.conf 搭建 MinIO 环境:
version: '3.7'
# Settings and configurations that are common for all containers
x-minio-common: &minio-common
image: quay.io/minio/minio:RELEASE.2022-04-01T03-41-39Z
command: server --console-address ":9001" http://minio{1...4}/data{1...2}
expose:
- "9000"
- "9001"
environment:
MINIO_ROOT_USER: root
MINIO_ROOT_PASSWORD: sast_forever
#如果不指定,用户名和密码都是 minioadmin
#需要注意的是,这里的用户名至少为3位,密码至少为8位
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
# starts 4 docker containers running minio server instances.
# using nginx reverse proxy, load balancing, you can access
# it through port 9000.
services:
minio1:
<<: *minio-common
hostname: minio1
volumes:
- data1-1:/data1
- data1-2:/data2
minio2:
<<: *minio-common
hostname: minio2
volumes:
- data2-1:/data1
- data2-2:/data2
minio3:
<<: *minio-common
hostname: minio3
volumes:
- data3-1:/data1
- data3-2:/data2
minio4:
<<: *minio-common
hostname: minio4
volumes:
- data4-1:/data1
- data4-2:/data2
nginx:
image: nginx:1.19.2-alpine
hostname: nginx
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
ports:
- "9000:9000"
- "9001:9001"
depends_on:
- minio1
- minio2
- minio3
- minio4
## By default this config uses default local driver,
## For custom volumes replace with volume driver configuration.
volumes:
data1-1:
data1-2:
data2-1:
data2-2:
data3-1:
data3-2:
data4-1:
data4-2:
nginx.conf 如下:
user nginx;
worker_processes auto;
error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;
events {
worker_connections 4096;
}
http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
access_log /var/log/nginx/access.log main;
sendfile on;
keepalive_timeout 65;
# include /etc/nginx/conf.d/*.conf;
upstream minio {
server minio1:9000;
server minio2:9000;
server minio3:9000;
server minio4:9000;
}
upstream console {
ip_hash;
server minio1:9001;
server minio2:9001;
server minio3:9001;
server minio4:9001;
}
server {
listen 9000;
listen [::]:9000;
server_name localhost;
# To allow special characters in headers
ignore_invalid_headers off;
# Allow any size file to be uploaded.
# Set to a value such as 1000m; to restrict file size to a specific value
client_max_body_size 0;
# To disable buffering
proxy_buffering off;
proxy_request_buffering off;
location / {
proxy_set_header Host $http_host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_connect_timeout 300;
# Default is HTTP/1, keepalive is only enabled in HTTP/1.1
proxy_http_version 1.1;
proxy_set_header Connection "";
chunked_transfer_encoding off;
proxy_pass http://minio;
}
}
server {
listen 9001;
listen [::]:9001;
server_name localhost;
# To allow special characters in headers
ignore_invalid_headers off;
# Allow any size file to be uploaded.
# Set to a value such as 1000m; to restrict file size to a specific value
client_max_body_size 0;
# To disable buffering
proxy_buffering off;
proxy_request_buffering off;
location / {
proxy_set_header Host $http_host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header X-NginX-Proxy true;
# This is necessary to pass the correct IP to be hashed
real_ip_header X-Real-IP;
proxy_connect_timeout 300;
# To support websocket
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
chunked_transfer_encoding off;
proxy_pass http://console;
}
}
}
1.4 小坑
在 docker-compose up -d 的时候,MinIO 一直无法正常在后台启动。
在不使用后台命令的情况下发现,原来是密码的设置没有达到至少8个字符(后台启动没法看到进入容器内部的报错)。
2. RabbitMQ
对于 RabbitMQ 通过 Docker 进行安装,采取最新的镜像。
docker run \
-d --name mrabbitmq \
-p 5672:5672 -p 15672:15672 \
-v rabbitmqData:/var/lib/rabbitmq \
-e RABBITMQ_DEFAULT_USER=username \
-e RABBITMQ_DEFAULT_PASS=pas \
rabbitmq:latest
#左边为 api 端口,右边为 gui 端口
2.1 解决页面问题
直接启动 RabbitMQ 容器的时候,页面会无法打开。
如果查阅 RabbitMQ 官网,会发现我们缺少一个后台管理的插件,需要在 Docker 容器中执行rabbitmq-plugins enable rabbitmq_management
命令执行这个插件。这样就能够使用 ip + 15672 访问后端管理页面。
3. RPC
4. I/O 模型
Java 支持3种网络编程模型(使用什么样的通道进行数据的发送和接收):BIO、NIO、AIO。
4.1 BIO
Java BIO:同步并阻塞
(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不作任何事情会造成不必要的线程开销。
服务器对于每一个客户端的连接都会分配一个线程处理,为了避免资源消耗过多,采用了线程池模型(创建一个固定大小的线程池,如果有客户端请求,就从线程池中取一个空闲线程来处理,当客户端处理完操作之后,就会释放对线程的占用)。但如果有过多长时间处理的线程,依然会导致没有空余线程。
4.2 NIO
Java NIO:同步非阻塞
,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求会被注册到多路复用器上,多路复用器轮询到有 I/O 请求就会进行处理。
Selector 注册处理各种 I/O 事件,分配一个线程轮询每个客户端是否有事件发生,有事件发生时,顺序处理每个事件,当所有事件处理
完之后,便再转去继续轮询。
在 NIO 中,当一个连接创建后,不需要对应一个线程,连接会被注册到多路复用器上面,一个选择器线程可以同时处理成千上万个连接,系统不必创建大量的线程,也不必维护这些线程,从而大大减小了系统的开销。
4.2.1 Reactor 模式
传统处理模式:
采用阻塞I/O模式获取输入数据;
每个连接都需要独立的线程完成数据的输入,业务处理,数据返回。
React 处理模式:
Reactor 模式称为反应器模式或应答者模式,拥有一个或多个并发输入源,有一个服务处理器和多个请求处理器,服务处理器会同步的将输入的请求事件以多路复用的方式分发给相应的请求处理器。当客户端请求抵达后,服务处理程序使用多路分配策略,由一个非阻塞的线程来接收所有请求,然后将请求派发到相关的工作线程并进行处理的过程。
Reactor 包含三种模式:
- 单 Reactor 单线程;
- 单 Reactor 多线程;
- 主从 Reactor 多线程。
4.2.2 结构
NIO 通过 Channels、Buffers、Selectors 组成。
Channel 和 IO 中的 Stream 近似一个等级的。但 Stream 是单向的,譬如:InputStream, OutputStream。而 Channel 是双向的,既可以用来进行读操作,又可以用来进行写操作。
Channel:FileChannel、DatagramChannel、SocketChannel 和 ServerSocketChannel(对应 IO、UDP、TCP(Server 和 Client))。
Buffer:ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer。
Selector 运行单线程处理多个 Channel。要使用 Selector, 得向 Selector 注册 Channel,然后调用 select() 方法。select() 会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件。
Channel
Channel 中,可以进行异步的读写,数据需要读入进一个 Buffer 或者写入进一个 Buffer。
FileChannel
read() 方法返回的int 值表示了有多少字节被读到了Buffer 中。如果返回-1,表示到了文件末尾。
通过调用 position() 方法获取 FileChannel 的当前位置。也可以通过调用 position(long pos) 方法设置 FileChannel 的当前位置。
FileChannel 实例的 size() 方法将返回该实例所关联文件的大小。
可以使用 FileChannel.truncate() 方法截取一个文件,文件将中指定长度后面的部分将被删除。channel.truncate(1024);
,截取文件的前1024 个字节。
// 读取数据的例子
public class Test {
public static void main(String[] args) throws IOException {
// 获取对应文件
RandomAccessFile aFile = new RandomAccessFile("D:\\txt\\1.txt", "rw");
// 文件开启 Channel
FileChannel inChannel = aFile.getChannel();
// 创建 buffer 并分配空间
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf);
// 读取到文件末尾,返回-1
while (bytesRead != -1) {
System.out.println("Read " + bytesRead);
// 反转读写模式
buf.flip();
while (buf.hasRemaining()) {
// 读取数据,并输出
System.out.print((char) buf.get());
}
// 清除缓冲区的内容
buf.clear();
bytesRead = inChannel.read(buf);
}
inChannel.close();
aFile.close();
System.out.println();
System.out.println("End of file");
}
}
/*==========================================================================*/
// 写入数据例子
public class Test {
public static void main(String[] args) throws IOException {
RandomAccessFile aFile = new RandomAccessFile("D:\\txt\\2.txt", "rw");
FileChannel inChannel = aFile.getChannel();
String data = "New String to write to file..." + LocalDateTime.now();
ByteBuffer buf1 = ByteBuffer.allocate(1024);
buf1.clear();
buf1.put(data.getBytes());
buf1.flip();
// 判断是否有剩余数据
while (buf1.hasRemaining()) {
inChannel.write(buf1);
}
inChannel.close();
aFile.close();
System.out.println();
System.out.println("End of file");
}
}
分散(scatter)从 Channel 中读取是指在读操作时将读取的数据写入多个 Buffer 中。因此,Channel 将从 Channel 中读取的数据“分散(scatter)”到多个 Buffer 中。
聚集(gather)写入 Channel 是指在写操作时将多个 Buffer 的数据写入同一个 Channel,因此,Channel 将多个 Buffer 中的数据“聚集(gather)”后发送到 Channel。
scatter 在读取的时候,需要将 buffer 的空间填满,才能读取对应数据,gather 则没有这个限制。
SocketChannel
- SocketChannel 用来连接 Socket 套接字;
- SocketChannel 用途用来处理网络 I/O 的通道;
- SocketChannel 基于 TCP 连接传输;
- SocketChannel 实现了可选择通道,可以被多路复用。
SocketChannel 是支持非阻塞 socket,改进传统单向流 API,同时支持读写。可以设置 Channel 的(非)阻塞模式,调用 configureBlocking(),通过 isBlocking() 方法判断模式。
public class Test {
public static final String GREETING = "Hello, World!\n";
public static void main(String[] args) throws Exception {
int port = 1234;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
ByteBuffer buffer = ByteBuffer.wrap(GREETING.getBytes());
// 使用 open 打开 ServerSocketChannel
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(port));
// false 表示非阻塞,true 表示阻塞
ssc.configureBlocking(false);
while (true) {
System.out.println("Waiting for connection...");
// 使用 accept 接受连接,如果是阻塞模式会在这里进行阻塞
SocketChannel sc = ssc.accept();
if (sc == null) {
System.out.println("No connection");
Thread.sleep(2000);
} else {
System.out.println("Connection established: " + sc.socket().getRemoteSocketAddress());
// 重置 buffer 的状态
buffer.rewind();
sc.write(buffer);
sc.close();
}
}
}
}
DatagramChannel
DatagramChannel 模拟包导向的无连接协议(如 UDP/IP)。DatagramChannel 是无连接的,每个数据报(datagram)都是一个自包含的实体,拥有它自己的目的地址及不依赖其他数据报的数据负载。与面向流的的 socket 不同,DatagramChannel 可以发送单独的数据报给不同的目的地址。同样,DatagramChannel 对象也可以接收来自任意地址的数据包。每个到达的数据报都含有关于它来自何处的信息(源地址)
@DisplayName("发包的 DatagramChannel")
@Test
public void sendDatagram() throws IOException, InterruptedException {
// 使用 try-catch-resource 的方式,在结束自动执行 close() 方法
try (DatagramChannel sendChannel = DatagramChannel.open()) {
InetSocketAddress sendAddress = new InetSocketAddress("127.0.0.1", 9999);
while (true) {
sendChannel.send(ByteBuffer.wrap(
"测试包发送".getBytes(StandardCharsets.UTF_8)),
sendAddress
);
log.info("发包端发包");
Thread.sleep(2000);
}
}
}
@DisplayName("收包端 DatagramChannel")
@Test
public void receive() throws IOException {
try (DatagramChannel receiveChannel = DatagramChannel.open()) {
InetSocketAddress receiveAddress = new InetSocketAddress(9999);
// 绑定端口为9999
receiveChannel.bind(receiveAddress);
ByteBuffer receiveBuffer = ByteBuffer.allocate(512);
while (true) {
receiveBuffer.clear();
// 可以获取发包端的 ip + 端口等信息
// 格式:/127.0.0.1:58857
SocketAddress sendAddress = receiveChannel.receive(receiveBuffer);
receiveBuffer.flip();
log.info(sendAddress.toString());
log.info("收包端收包:{}", StandardCharsets.UTF_8.decode(receiveBuffer));
}
}
}
@Test
public void testConnect1() throws IOException {
try (DatagramChannel connChannel = DatagramChannel.open()) {
connChannel.bind(new InetSocketAddress(9998));
connChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
// 向9999端口发送信息
connChannel.write(ByteBuffer.wrap("发包".getBytes(StandardCharsets.UTF_8)));
ByteBuffer readBuffer = ByteBuffer.allocate(512);
while (true) {
try {
readBuffer.clear();
connChannel.read(readBuffer);
readBuffer.flip();
log.info(StandardCharsets.UTF_8.decode(readBuffer).toString());
} catch (Exception e) {
}
}
}
}
Buffer
在 NIO 库中,所有数据都是用缓冲区处理。
当向 Buffer 写入数据时,Buffer 会记录下写了多少数据。一旦要读取数据,需要通过 flip() 方法将 Buffer 从写模式切换到读模式。在读模式下,可以读取之前写入到 Buffer 的所有数据。一旦读完了所有的数据,就需要清空缓冲区,让它可以再次被写入。
@Test
@DisplayName("IntBuffer 测试")
public void IntBufferTest() {
// 分配新的int 缓冲区,参数为缓冲区容量
// 新缓冲区的当前位置将为零,其界限(限制位置)将为其容量。
// 它将具有一个底层实现数组,其数组偏移量将为零。
IntBuffer intBuffer = IntBuffer.allocate(10);
for (int i = 0; i < intBuffer.capacity(); i++) {
// 将给定整数写入此缓冲区的当前位置,当前位置递增
intBuffer.put(i * 2 + 1);
}
// 重设此缓冲区,将限制设置为当前位置,然后将当前位置设置为0
intBuffer.flip();
while (intBuffer.hasRemaining()) {
log.info("intBuffer.get() = {}", intBuffer.get());
}
}
基本属性/方法
Buffer 具有三个属性:
capacity:Buffer 有一个固定的大小值,有一个容量;
position:
写入数据:
position 表示写入数据的当前位置,position 的初始值为0,position 最大可为capacity – 1。
读取数据:
position 表示读入数据的当前位置,如 position=2 时表示已开始读入了3个 byte,或从第3个 byte 开始读取。通过ByteBuffer.flip() 切换到读模式时 position 会被重置为0。
limit:
写入数据:
limit 表示可对 Buffer 最多写入多少个数据。写模式下,limit 等于 Buffer 的 capacity。
读取数据:
limit 表示 Buffer 里有多少可读数据(not null 的数据),因此能读到之前写入的所有数据(limit 被设置成已写数据的数量,这个值在写模式下就是 position,通俗来说,就是你能读取到对应的数据的位置)。
Buffer 方法:
- rewind():将 position 重置为0,limit 保持不变;
- clear() & compact():
- clear():position 重置为0,limit 被设置为 capacity 的值,如果存在未读数据,数据将“被遗忘”,不会被标记出来。Buffer 中的数据并未清除;
- compact():所有未读的数据拷贝到 Buffer 起始处。将 position 设为最后一个未读元素之后。limit 属性依然设置 capacity。Buffer 写入数据时不会覆盖未读数据;
- mark() & reset():mark() 可以标记 Buffer 中的一个特定 position,调用Buffer.reset()方法恢复到这个position。
@Test
public void testConnect3() throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(10);
// 缓冲区中的数据0-9
for (int i = 0; i < buffer.capacity(); ++i) {
buffer.put((byte) i);
}
// 创建子缓冲区
buffer.position(3);
// 设置最多操作到第七位的数据
buffer.limit(7);
// slice 与原来的 buffer 共享数据
ByteBuffer slice = buffer.slice();
// 改变子缓冲区的内容
for (int i = 0; i < slice.capacity(); ++i) {
byte b = slice.get(i);
b *= 10;
slice.put(i, b);
}
buffer.position(0);
buffer.limit(buffer.capacity());
while (buffer.remaining() > 0) {
System.out.println(buffer.get());
}
/*
0
1
2
30
40
50
60
7
8
9
*/
}
缓冲区操作
调用buffer.asReadOnlyBuffer();
,创建一个只读缓冲区,原缓冲区内容变化时,只读缓冲区也随之变化。
allocateDirect()
生成一个直接缓冲区,尝试避免将缓冲区的内容拷贝到一个中间缓冲区中或者从一个中间缓冲区中拷贝数据。
Selector
更少的线程处理通道,相比使用多个线程,避免了线程上下文切换带来的开销。
SelectableChannel 类提供了实现通道的可选择性所需要的公共方法,socket 通道都继承自这个父类,而 FileChannel 类没有。
使用 Channel.register(Selector sel, int ops) 方法,将一个通道注册到一个选择器。
第二个参数表示查询的通道操作,包括以下四种(多种模式可以通过|
连接):
- 可读: SelectionKey.OP_READ;
- 可写: SelectionKey.OP_WRITE;
- 连接: SelectionKey.OP_CONNECT;
- 接收: SelectionKey.OP_ACCEPT。
使用 select() 可以查询出已经就绪的通道操作,就绪的状态集合存放在Set<SelectionKey>
集合中。
select() 有几个重载的方法:
- select():阻塞到至少有一个通道在你注册的事件上就绪了;
- select(long timeout):和 select() 一样,但最长阻塞事件为 timeout 毫秒;
- selectNow():非阻塞,只要有通道就绪就立刻返回。
select() 方法返回的 int 值,表示有多少通道已经就绪,更准确的说,是自前一次 select方法以来到这一次 select 方法之间的时间段上,有多少通道变成就绪状态。
@Test
public void ServerDemo() {
try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
ssc.socket().bind(new InetSocketAddress("127.0.0.1", 9000));
// 设置非阻塞,因为 Selector 只能接受非阻塞通道
// 如果是阻塞通道,那么这里就会抛出 IOException
// 所以 FileChannel 不能注册到 Selector 上
ssc.configureBlocking(false);
Selector selector = Selector.open();
// 通道不是支持所有的模式,ServerSocketChannel 支持 Accept
// 查看类内部的 validOps() 方法,获取通道支持的模式
ssc.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put("received".getBytes());
writeBuffer.flip();
while (true) {
int nReady = selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
if (key.isAcceptable()) {
// 创建新的连接,并且把连接注册到 selector
// 声明 channel 只对读操作感兴趣
SocketChannel socketChannel = ssc.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
// 获取 key 对应的 channel
try (SocketChannel socketChannel = (SocketChannel) key.channel()) {
readBuffer.clear();
socketChannel.read(readBuffer);
readBuffer.flip();
log.info("received: " + StandardCharsets.UTF_8.decode(readBuffer));
key.interestOps(SelectionKey.OP_WRITE);
}
} else if (key.isWritable()) {
writeBuffer.rewind();
try (SocketChannel socketChannel = (SocketChannel) key.channel()) {
socketChannel.write(writeBuffer);
key.interestOps(SelectionKey.OP_READ);
}
}
it.remove();
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Test
public void ClientDemo() {
try (SocketChannel socketChannel = SocketChannel.open()) {
socketChannel.connect(new InetSocketAddress("127.0.0.1", 9000));
ByteBuffer writeBuffer = ByteBuffer.allocate(32);
ByteBuffer readBuffer = ByteBuffer.allocate(32);
writeBuffer.put("hello server".getBytes());
writeBuffer.flip();
while (true) {
writeBuffer.rewind();
socketChannel.write(writeBuffer);
readBuffer.clear();
socketChannel.read(readBuffer);
}
} catch (IOException e) {
}
}
4.2.3 Pipe
Pipe 用于实现两个线程之间数据的单向流通。
@Test
public void testPipe() throws IOException {
// 1、获取通道
Pipe pipe = Pipe.open();
// 2、获取 sink 管道,用来传送数据
Pipe.SinkChannel sinkChannel = pipe.sink();
// 3、申请一定大小的缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put("hello, world".getBytes());
byteBuffer.flip();
// 4、sink 发送数据
sinkChannel.write(byteBuffer);
// 5、创建接收 pipe 数据的 source 管道
Pipe.SourceChannel sourceChannel = pipe.source();
// 6、接收数据,并保存到缓冲区中
ByteBuffer byteBuffer2 = ByteBuffer.allocate(1024);
int length = sourceChannel.read(byteBuffer2);
System.out.println(new String(byteBuffer2.array(), 0, length));
sourceChannel.close();
sinkChannel.close();
}
4.3 AIO(NIO 2)
Java AIO:异步非阻塞
,AIO 引入了异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用。
异步 IO 是基于事件和回调机制实现的,AIO 模式不需要 selector 操作,而是事件驱动形式。当客户端发送数据之后,会主动通知服
务器,接着服务器再进行读写操作。