From 45ac5d5d563ee7a685d1d388687088f7598d3f19 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Sat, 9 Sep 2023 20:53:52 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4/=E8=B0=83=E8=AF=95=20?= =?UTF-8?q?=E9=87=8D=E8=AF=95=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../wvp/api/video/VideoController.java | 6 +- .../gb28181/wvp/api/video/dto/VideoReq.java | 31 ++++ .../wvp/service/download/DownloadService.java | 17 ++- .../gb28181/wvp/service/wvp/WvpService.java | 137 +++++++++++++++--- .../src/main/resources/application.yml | 4 +- 5 files changed, 170 insertions(+), 25 deletions(-) create mode 100644 gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/dto/VideoReq.java diff --git a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/VideoController.java b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/VideoController.java index dfdaa60..f63db40 100644 --- a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/VideoController.java +++ b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/VideoController.java @@ -1,6 +1,7 @@ package cn.skcks.docking.gb28181.wvp.api.video; import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig; +import cn.skcks.docking.gb28181.wvp.api.video.dto.VideoReq; import cn.skcks.docking.gb28181.wvp.config.SwaggerConfig; import cn.skcks.docking.gb28181.wvp.service.video.RecordService; import cn.skcks.docking.gb28181.wvp.service.wvp.WvpService; @@ -10,6 +11,7 @@ import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springdoc.core.annotations.ParameterObject; import org.springdoc.core.models.GroupedOpenApi; import org.springframework.context.annotation.Bean; import org.springframework.http.MediaType; @@ -36,7 +38,7 @@ public class VideoController { @Operation(summary = "获取视频 (目前仅供测试)") @GetMapping(produces = MediaType.APPLICATION_OCTET_STREAM_VALUE) @ResponseBody - public void video(HttpServletRequest request, HttpServletResponse response) { - wvpService.video(request,response); + public void video(HttpServletRequest request, HttpServletResponse response, @ParameterObject VideoReq req) { + wvpService.video(request,response,req.getDeviceCode(), req.getStartTime(), req.getEndTime()); } } diff --git a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/dto/VideoReq.java b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/dto/VideoReq.java new file mode 100644 index 0000000..9b0c02a --- /dev/null +++ b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/dto/VideoReq.java @@ -0,0 +1,31 @@ +package cn.skcks.docking.gb28181.wvp.api.video.dto; + +import cn.hutool.core.date.DatePattern; +import com.fasterxml.jackson.annotation.JsonFormat; +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotBlank; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.format.annotation.DateTimeFormat; + +import java.util.Date; + +@NoArgsConstructor +@Data +public class VideoReq { + @NotBlank(message = "设备编码 不能为空") + @Schema(description = "设备编码") + private String deviceCode; + + @Schema(description = "开始时间",example = "20230909000000") + @NotBlank(message = "开始时间 不能为空") + @DateTimeFormat(pattern= DatePattern.PURE_DATETIME_PATTERN) + @JsonFormat(pattern = DatePattern.PURE_DATETIME_PATTERN) + private Date startTime; + + @Schema(description = "结束时间",example = "20230909000500") + @NotBlank(message = "结束时间 不能为空") + @DateTimeFormat(pattern= DatePattern.PURE_DATETIME_PATTERN) + @JsonFormat(pattern = DatePattern.PURE_DATETIME_PATTERN) + private Date endTime; +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/download/DownloadService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/download/DownloadService.java index 3162ea8..0d6aa33 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/download/DownloadService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/download/DownloadService.java @@ -1,6 +1,7 @@ package cn.skcks.docking.gb28181.wvp.service.download; import cn.hutool.core.io.IoUtil; +import cn.skcks.docking.gb28181.common.json.JsonResponse; import jakarta.servlet.AsyncContext; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; @@ -11,10 +12,13 @@ import org.apache.hc.client5.http.classic.methods.HttpHead; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.apache.hc.client5.http.impl.classic.HttpClients; import org.apache.hc.core5.http.ClassicHttpResponse; +import org.springframework.http.MediaType; import org.springframework.stereotype.Service; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.util.Optional; @Slf4j @@ -56,9 +60,7 @@ public class DownloadService { } @SneakyThrows - public void download(HttpServletRequest request, HttpServletResponse response, String url) { - AsyncContext asyncContext = request.startAsync(); - asyncContext.setTimeout(0); + public void download(AsyncContext asyncContext , HttpServletResponse response, String url) { asyncContext.start(() -> { try { response.setHeader("Accept-Ranges", "none"); @@ -72,7 +74,14 @@ public class DownloadService { } @SneakyThrows - private void download(HttpServletResponse response, String url) { + public void download(HttpServletRequest request, HttpServletResponse response, String url) { + AsyncContext asyncContext = request.startAsync(); + asyncContext.setTimeout(0); + download(asyncContext, response, url); + } + + @SneakyThrows + public void download(HttpServletResponse response, String url) { OutputStream outputStream = response.getOutputStream(); try (CloseableHttpClient client = HttpClients.custom().build()) { diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/wvp/WvpService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/wvp/WvpService.java index d38366e..178bbfe 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/wvp/WvpService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/wvp/WvpService.java @@ -1,23 +1,37 @@ package cn.skcks.docking.gb28181.wvp.service.wvp; +import cn.hutool.core.io.IoUtil; import cn.hutool.crypto.digest.MD5; +import cn.skcks.docking.gb28181.common.json.JsonException; import cn.skcks.docking.gb28181.common.json.JsonResponse; +import cn.skcks.docking.gb28181.common.json.JsonUtils; import cn.skcks.docking.gb28181.wvp.config.WvpProxyConfig; +import cn.skcks.docking.gb28181.wvp.dto.device.DeviceChannel; +import cn.skcks.docking.gb28181.wvp.dto.device.GetDeviceChannelsReq; +import cn.skcks.docking.gb28181.wvp.dto.device.GetDeviceChannelsResp; import cn.skcks.docking.gb28181.wvp.dto.login.WvpLoginReq; import cn.skcks.docking.gb28181.wvp.dto.login.WvpLoginResp; +import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice; import cn.skcks.docking.gb28181.wvp.proxy.WvpProxyClient; +import cn.skcks.docking.gb28181.wvp.service.device.DeviceService; import cn.skcks.docking.gb28181.wvp.service.download.DownloadService; -import com.github.rholder.retry.Retryer; -import com.github.rholder.retry.RetryerBuilder; -import com.github.rholder.retry.StopStrategies; -import com.github.rholder.retry.WaitStrategies; +import com.github.rholder.retry.*; +import jakarta.servlet.AsyncContext; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.springframework.http.MediaType; import org.springframework.stereotype.Service; +import java.io.PrintWriter; +import java.nio.charset.StandardCharsets; +import java.text.MessageFormat; +import java.util.Date; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @Slf4j @@ -26,34 +40,123 @@ import java.util.concurrent.TimeUnit; public class WvpService { private final WvpProxyClient wvpProxyClient; private final WvpProxyConfig wvpProxyConfig; - + private final DeviceService deviceService; private final DownloadService downloadService; - @SneakyThrows - public void video(HttpServletRequest request, HttpServletResponse response) { - Retryer> retryer = RetryerBuilder.>newBuilder() + /** + * 默认重试次数 + */ + public final static int DEFAULT_RETRY_TIME = 3; + /** + * 默认每次重试等待时间 + */ + public final static int DEFAULT_RETRY_WAIT_TIME = 3; + + @SuppressWarnings("UnstableApiUsage") + private static RetryListener defaultRetryListener(){ + return new RetryListener() { + @Override + public void onRetry(Attempt attempt) { + log.info("第 {} 次 执行结束",attempt.getAttemptNumber()); + if(attempt.hasException()){ + log.info("异常 {}", attempt.getExceptionCause().getMessage()); + } + } + }; + } + + @SuppressWarnings("UnstableApiUsage") + private static Retryer> getDefaultRetryer() { + return RetryerBuilder.>newBuilder() // 异常就重试 .retryIfException() + .retryIfRuntimeException() // 重试间隔 - .withWaitStrategy(WaitStrategies.fixedWait(5, TimeUnit.SECONDS)) + .withWaitStrategy(WaitStrategies.fixedWait(DEFAULT_RETRY_WAIT_TIME, TimeUnit.SECONDS)) // 重试次数 - .withStopStrategy(StopStrategies.stopAfterAttempt(5)) - .retryIfResult(result -> result == null || (result.getCode() != 0 && result.getCode() != 200)) + .withStopStrategy(StopStrategies.stopAfterAttempt(DEFAULT_RETRY_TIME)) + .retryIfResult(result -> result != null && (result.getCode() != 0 || result.getCode() != 200)) + .withRetryListener(defaultRetryListener()) .build(); + } + @SuppressWarnings("UnstableApiUsage") + private static Retryer> getDefaultGenericRetryer() { + return RetryerBuilder.>newBuilder() + // 异常就重试 + .retryIfException() + .retryIfRuntimeException() + // 重试间隔 + .withWaitStrategy(WaitStrategies.fixedWait(DEFAULT_RETRY_WAIT_TIME, TimeUnit.SECONDS)) + // 重试次数 + .withStopStrategy(StopStrategies.stopAfterAttempt(DEFAULT_RETRY_TIME)) + .retryIfResult(result -> result != null && (result.getCode() != 0 || result.getCode() != 200)) + .withRetryListener(defaultRetryListener()) + .build(); + } + + @SneakyThrows + private void writeErrorToResponse(HttpServletResponse response, JsonResponse json) { + response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + response.setCharacterEncoding(StandardCharsets.UTF_8.name()); + response.setContentType(MediaType.APPLICATION_JSON_VALUE); + IoUtil.writeUtf8(response.getOutputStream(), false, json); + } + + @SneakyThrows + public void video(HttpServletRequest request, HttpServletResponse response, String deviceCode, Date startTime, Date endTime) { + WvpProxyDevice wvpProxyDevice = deviceService.getDeviceByDeviceCode(deviceCode).orElse(null); + if (wvpProxyDevice == null) { + writeErrorToResponse(response, JsonResponse.error("设备不存在")); + return; + } + String deviceId = wvpProxyDevice.getGbDeviceId(); + String channelId = wvpProxyDevice.getGbDeviceChannelId(); + log.info("设备编码 (deviceCode=>{}) 查询到的设备信息 国标id(gbDeviceId => {}), 通道(channelId => {})", deviceCode, deviceId, channelId); + + Retryer> genericRetryer = getDefaultGenericRetryer(); String passwdMd5 = MD5.create().digestHex(wvpProxyConfig.getPasswd()); WvpLoginReq loginReq = WvpLoginReq.builder() .username(wvpProxyConfig.getUser()) .password(passwdMd5) .build(); - retryer.call(()->{ - JsonResponse login = wvpProxyClient.login(loginReq); - String accessToken = login.getData().getAccessToken(); - log.info("wvp 登录成功 accessToken => {}", accessToken); + AsyncContext asyncContext = request.startAsync(); + asyncContext.setTimeout(0); + asyncContext.start(() -> { + HttpServletResponse asyncResponse = (HttpServletResponse) asyncContext.getResponse(); + try { + genericRetryer.call(() -> { + JsonResponse login = wvpProxyClient.login(loginReq); + String accessToken = login.getData().getAccessToken(); + log.info("wvp 登录成功 accessToken => {}", accessToken); - downloadService.download(request,response,"http://192.168.1.241:18979/download/recordTemp/0490d767d94ce20aedce57c862b6bfe9/rtp/59777645.mp4"); - return login; + log.debug("通过 wvp 查询设备 国标id(gbDeviceId => {}) 通道信息", deviceId); + JsonResponse deviceChannels = wvpProxyClient.getDeviceChannels(accessToken, deviceId, GetDeviceChannelsReq.builder().build()); + if (deviceChannels.getData() == null || deviceChannels.getData().getTotal() == 0) { + writeErrorToResponse(asyncResponse, JsonResponse.error(MessageFormat.format("未能获取 设备:{0}, 国标id: {1}, 的通道信息", deviceCode, deviceId))); + return JsonResponse.success(null); + } + List list = deviceChannels.getData().getList(); + log.info("通过 wvp 获取到 查询设备 国标id(gbDeviceId => {}), 通道数量 => {}", deviceId, list.size()); + DeviceChannel deviceChannel = list.parallelStream().filter(item -> item.getChannelId().equalsIgnoreCase(channelId)).findFirst().orElse(null); + if (deviceChannel == null) { + writeErrorToResponse(asyncResponse, JsonResponse.error(MessageFormat.format("未查询到 设备:{0}, 国标id: {1}, 通道: {2} 信息", deviceCode, deviceId, channelId))); + return JsonResponse.success(null); + } + + downloadService.download(asyncResponse, "http://192.168.1.241:18979/download/recordTemp/0490d767d94ce20aedce57c862b6bfe9/rtp/59777645.mp4"); + return login; + }); + } catch (RetryException e) { + String reason = MessageFormat.format("查询失败, 已重试 {0} 次", e.getNumberOfFailedAttempts()); + log.error(reason); + writeErrorToResponse(asyncResponse, JsonResponse.error(reason)); + } catch (Exception e) { + writeErrorToResponse(asyncResponse, JsonResponse.error(e.getMessage())); + } finally { + asyncContext.complete(); + } }); } } diff --git a/gb28181-wvp-proxy-starter/src/main/resources/application.yml b/gb28181-wvp-proxy-starter/src/main/resources/application.yml index c42a141..714280f 100644 --- a/gb28181-wvp-proxy-starter/src/main/resources/application.yml +++ b/gb28181-wvp-proxy-starter/src/main/resources/application.yml @@ -25,8 +25,8 @@ spring: username: root password: 123456a url: jdbc:mysql://192.168.1.241:3306/gb28181_docking_platform?createDatabaseIfNotExist=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai -# profiles: -# active: local + profiles: + active: local cloud: openfeign: httpclient: