From 2912cff24f0463d7f442a6ea1f7628258877c580 Mon Sep 17 00:00:00 2001
From: shikong <919411476@qq.com>
Date: Thu, 7 Sep 2023 02:42:28 +0800
Subject: [PATCH] =?UTF-8?q?webflux=20=E8=AF=95=E6=B0=B4?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
gb28181-wvp-proxy-api/pom.xml | 6 +-
.../wvp/advice/BasicExceptionAdvice.java | 6 +-
.../wvp/api/video/RecordController.java | 13 ++--
.../wvp/api/{ => video}/VideoController.java | 2 +-
.../docking/gb28181/wvp/config/WebConfig.java | 30 ++++++---
.../wvp/interceptor/RequestInterceptor.java | 20 ------
gb28181-wvp-proxy-service/pom.xml | 8 ++-
.../wvp/service/video/RecordService.java | 61 ++++++++++++-------
gb28181-wvp-proxy-starter/pom.xml | 4 +-
.../src/main/resources/application-local.yml | 5 ++
.../src/main/resources/application.yml | 2 +
pom.xml | 8 ++-
12 files changed, 99 insertions(+), 66 deletions(-)
rename gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/{ => video}/VideoController.java (98%)
delete mode 100644 gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/interceptor/RequestInterceptor.java
diff --git a/gb28181-wvp-proxy-api/pom.xml b/gb28181-wvp-proxy-api/pom.xml
index 265ea2c..a1ee7f8 100644
--- a/gb28181-wvp-proxy-api/pom.xml
+++ b/gb28181-wvp-proxy-api/pom.xml
@@ -37,7 +37,7 @@
org.springdoc
- springdoc-openapi-starter-webmvc-ui
+ springdoc-openapi-starter-webflux-ui
${springdoc.version}
@@ -48,7 +48,7 @@
org.springframework.boot
- spring-boot-starter-web
+ spring-boot-starter-webflux
@@ -62,4 +62,4 @@
jakarta.servlet-api
-
\ No newline at end of file
+
diff --git a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/advice/BasicExceptionAdvice.java b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/advice/BasicExceptionAdvice.java
index 25a10cb..665e22f 100644
--- a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/advice/BasicExceptionAdvice.java
+++ b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/advice/BasicExceptionAdvice.java
@@ -1,7 +1,7 @@
package cn.skcks.docking.gb28181.wvp.advice;
-import jakarta.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
@@ -14,8 +14,8 @@ import org.springframework.web.bind.annotation.ExceptionHandler;
@ControllerAdvice
public class BasicExceptionAdvice {
@ExceptionHandler(Exception.class)
- public void exception(HttpServletRequest request, Exception e) {
- if(request.getRequestURI().equals("/video")){
+ public void exception(ServerHttpRequest request, Exception e) {
+ if(request.getURI().getPath().equals("/video")){
return;
}
e.printStackTrace();
diff --git a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/RecordController.java b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/RecordController.java
index b792a33..efd7cf4 100644
--- a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/RecordController.java
+++ b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/RecordController.java
@@ -1,10 +1,12 @@
package cn.skcks.docking.gb28181.wvp.api.video;
import cn.skcks.docking.gb28181.wvp.service.video.RecordService;
-import jakarta.servlet.http.HttpServletResponse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.bind.annotation.*;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Mono;
@Slf4j
@RequiredArgsConstructor
@@ -14,12 +16,13 @@ public class RecordController {
private final RecordService recordService;
@RequestMapping(method = {RequestMethod.HEAD,RequestMethod.OPTIONS})
- public void record(HttpServletResponse response){
- recordService.header(response);
+ public void record(ServerWebExchange exchange){
+ recordService.header(exchange.getResponse());
}
@GetMapping
- public void record(HttpServletResponse response, @RequestParam String url,@RequestParam long time){
- recordService.record(response,url,time);
+ public Mono record(ServerWebExchange exchange, @RequestParam String url, @RequestParam long time){
+ log.info("{} {}", url,time);
+ return recordService.record(exchange.getResponse(),url,time);
}
}
diff --git a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/VideoController.java b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/VideoController.java
similarity index 98%
rename from gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/VideoController.java
rename to gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/VideoController.java
index 801c915..b168e56 100644
--- a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/VideoController.java
+++ b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/VideoController.java
@@ -1,4 +1,4 @@
-package cn.skcks.docking.gb28181.wvp.api;
+package cn.skcks.docking.gb28181.wvp.api.video;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.util.IdUtil;
diff --git a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/config/WebConfig.java b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/config/WebConfig.java
index 26452d2..82e79b7 100644
--- a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/config/WebConfig.java
+++ b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/config/WebConfig.java
@@ -1,22 +1,34 @@
package cn.skcks.docking.gb28181.wvp.config;
-import cn.skcks.docking.gb28181.wvp.interceptor.RequestInterceptor;
+import cn.hutool.core.util.ReUtil;
import jakarta.validation.constraints.NotNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
-import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
-import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
+import org.springframework.web.server.ServerWebExchange;
+import org.springframework.web.server.WebFilter;
+import org.springframework.web.server.WebFilterChain;
+import reactor.core.publisher.Mono;
+
+import java.net.InetSocketAddress;
+import java.util.Optional;
+
@Slf4j
@Configuration
@RequiredArgsConstructor
-public class WebConfig implements WebMvcConfigurer {
- private final RequestInterceptor requestInterceptor;
+public class WebConfig implements WebFilter {
+
@Override
- public void addInterceptors(@NotNull InterceptorRegistry registry) {
- registry.addInterceptor(requestInterceptor)
- .excludePathPatterns("/swagger-ui/**","/v3/api-docs/**")
- .addPathPatterns("/**");
+ public Mono filter(ServerWebExchange exchange, WebFilterChain chain) {
+ String path = exchange.getRequest().getPath().toString();
+ if(ReUtil.isMatch(".*/swagger-ui/.*",path) || ReUtil.isMatch("/v3/api-docs.*",path) ){
+ return chain.filter(exchange);
+ } else {
+ String ip = Optional.ofNullable(exchange.getRequest().getRemoteAddress())
+ .orElse(new InetSocketAddress("0.0.0.0",0)).getHostString();
+ log.info("{} 访问 {}", ip, path);
+ }
+ return chain.filter(exchange);
}
}
diff --git a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/interceptor/RequestInterceptor.java b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/interceptor/RequestInterceptor.java
deleted file mode 100644
index 1467394..0000000
--- a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/interceptor/RequestInterceptor.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package cn.skcks.docking.gb28181.wvp.interceptor;
-
-import jakarta.servlet.http.HttpServletRequest;
-import jakarta.servlet.http.HttpServletResponse;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
-import org.springframework.web.servlet.HandlerInterceptor;
-
-@Slf4j
-@Component
-@SuppressWarnings({"unused"})
-@RequiredArgsConstructor
-public class RequestInterceptor implements HandlerInterceptor {
- @Override
- public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
- log.info("{} 访问 {}",request.getRemoteHost(), request.getRequestURI());
- return true;
- }
-}
diff --git a/gb28181-wvp-proxy-service/pom.xml b/gb28181-wvp-proxy-service/pom.xml
index 84e8fdb..b3844e1 100644
--- a/gb28181-wvp-proxy-service/pom.xml
+++ b/gb28181-wvp-proxy-service/pom.xml
@@ -32,11 +32,17 @@
cn.skcks.docking
zlmediakit-service
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
org.springframework.boot
- spring-boot-starter-web
+ spring-boot-starter-webflux
diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/video/RecordService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/video/RecordService.java
index 4dbfc54..3cbb0ba 100644
--- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/video/RecordService.java
+++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/video/RecordService.java
@@ -2,15 +2,24 @@ package cn.skcks.docking.gb28181.wvp.service.video;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.util.IdUtil;
+import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.http.HttpServletResponse;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.bytedeco.ffmpeg.global.avcodec;
import org.bytedeco.ffmpeg.global.avutil;
import org.bytedeco.javacv.*;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DefaultDataBuffer;
+import org.springframework.core.io.buffer.DefaultDataBufferFactory;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
import java.io.*;
+import java.net.http.HttpClient;
import java.nio.file.Path;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -20,45 +29,48 @@ import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Service
public class RecordService {
- public void header(HttpServletResponse response){
- response.setContentType("video/mp4");
- response.setHeader("Accept-Ranges","none");
- response.setHeader("Connection","close");
+ public void header(ServerHttpResponse response){
+ HttpHeaders headers = response.getHeaders();
+ headers.setContentType(MediaType.APPLICATION_OCTET_STREAM);
+ headers.set("Accept-Ranges","none");
+ headers.setConnection("close");
}
+ // @Async(Default)
@SneakyThrows
- public void record(HttpServletResponse response, String url, long timeout){
- response.reset();
+ public Mono record(ServerHttpResponse response, String url, long timeout){
+ Mono mono = Mono.empty();
// response.setContentType(MediaType.APPLICATION_OCTET_STREAM_VALUE);
header(response);
-
Path tmp = Path.of(System.getProperty("java.io.tmpdir"), IdUtil.getSnowflakeNextIdStr()).toAbsolutePath();
File file = new File(tmp + ".mp4");
log.info("创建文件 {}, {}", file, file.createNewFile());
-
log.info("url {}", url);
+ DataBuffer dataBuffer = response.bufferFactory().allocateBuffer(DefaultDataBufferFactory.DEFAULT_INITIAL_CAPACITY);
+ OutputStream outputStream = dataBuffer.asOutputStream();
try (FFmpegFrameGrabber grabber = new FFmpegFrameGrabber(url)) {
grabber.start();
- try(FFmpegFrameRecorder recorder = new FFmpegFrameRecorder(file, grabber.getImageWidth(), grabber.getImageHeight(),grabber.getAudioChannels())){
+
+ try (FFmpegFrameRecorder recorder = new FFmpegFrameRecorder(file, grabber.getImageWidth(), grabber.getImageHeight(), grabber.getAudioChannels())) {
recorder.start();
log.info("开始录像");
log.info("{}", file);
recorder.setVideoCodec(avcodec.AV_CODEC_ID_H264);
- recorder.setPixelFormat(avutil.AV_PIX_FMT_YUV420P); //视频源数据yuv
- recorder.setAudioCodec(avcodec.AV_CODEC_ID_AAC); //设置音频压缩方式
+ recorder.setPixelFormat(avutil.AV_PIX_FMT_YUV420P); // 视频源数据yuv
+ recorder.setAudioCodec(avcodec.AV_CODEC_ID_AAC); // 设置音频压缩方式
recorder.setFormat("mp4");
- recorder.setVideoOption("threads", String.valueOf(Runtime.getRuntime().availableProcessors())); //解码线程数
+ recorder.setVideoOption("threads", String.valueOf(Runtime.getRuntime().availableProcessors())); // 解码线程数
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
AtomicBoolean record = new AtomicBoolean(true);
- scheduledExecutorService.schedule(()->{
+ scheduledExecutorService.schedule(() -> {
log.info("到达超时时间, 结束录制");
record.set(false);
}, timeout, TimeUnit.SECONDS);
try {
Frame frame;
- while (record.get() && (frame = grabber.grab()) != null) {
+ while (!response.isCommitted() && record.get() && (frame = grabber.grab()) != null) {
recorder.record(frame);
}
grabber.stop();
@@ -67,15 +79,22 @@ public class RecordService {
throw new RuntimeException(e);
}
}
- } finally {
+ // response.getOutputStream()
+
+ } catch (Exception e){
+ log.info("{}",e.getMessage());
+ }finally {
log.info("结束录制");
- InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
- OutputStream outputStream = new BufferedOutputStream(response.getOutputStream());
- try{
- IoUtil.copy(inputStream, outputStream);
- } catch (Exception ignore){}
- log.info("临时文件 {} 写入 响应 完成", file);
+ if(!response.isCommitted()){
+ InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
+ try{
+ IoUtil.copy(inputStream, outputStream);
+ } catch (Exception ignore){}
+ log.info("临时文件 {} 写入 响应", file);
+ mono = response.writeWith(Mono.just(dataBuffer));
+ }
log.info("删除临时文件 {} {}", file, file.delete());
}
+ return mono;
}
}
diff --git a/gb28181-wvp-proxy-starter/pom.xml b/gb28181-wvp-proxy-starter/pom.xml
index 2ec8d74..1e80651 100644
--- a/gb28181-wvp-proxy-starter/pom.xml
+++ b/gb28181-wvp-proxy-starter/pom.xml
@@ -53,7 +53,7 @@
org.springframework.boot
- spring-boot-starter-web
+ spring-boot-starter-webflux
@@ -158,4 +158,4 @@
-
\ No newline at end of file
+
diff --git a/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml b/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml
index b8215c9..ebb4f4f 100644
--- a/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml
+++ b/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml
@@ -4,6 +4,8 @@ project:
version: @project.version@
spring:
+ main:
+ web-application-type: reactive
data:
redis:
# [必须修改] Redis服务器IP, REDIS安装在本机的,使用127.0.0.1
@@ -45,3 +47,6 @@ proxy:
url: http://192.168.3.13:18978
user: admin
passwd: admin
+springdoc:
+ api-docs:
+ enabled: true
diff --git a/gb28181-wvp-proxy-starter/src/main/resources/application.yml b/gb28181-wvp-proxy-starter/src/main/resources/application.yml
index 714280f..87bb3eb 100644
--- a/gb28181-wvp-proxy-starter/src/main/resources/application.yml
+++ b/gb28181-wvp-proxy-starter/src/main/resources/application.yml
@@ -6,6 +6,8 @@ project:
spring:
+ main:
+ web-application-type: reactive
data:
redis:
# [必须修改] Redis服务器IP, REDIS安装在本机的,使用127.0.0.1
diff --git a/pom.xml b/pom.xml
index e8c96ad..e756259 100644
--- a/pom.xml
+++ b/pom.xml
@@ -118,6 +118,12 @@
cn.skcks.docking
zlmediakit-service
${gb28181.docking.version}
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
@@ -202,7 +208,7 @@
org.springdoc
- springdoc-openapi-starter-webmvc-ui
+ springdoc-openapi-starter-webflux-ui
${springdoc.version}