From d17dfb7f764a1261bbcc5759ca12137083b6699d Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Thu, 31 Aug 2023 16:14:59 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8E=86=E5=8F=B2=E5=9B=9E=E6=94=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/api/play/PlayController.java | 18 ++- .../gb28181/api/play/dto/RecordPlayDTO.java | 38 ++++++ .../gb28181/api/play/dto/RecordStopDTO.java | 33 ++++++ .../gb28181/service/play/PlayService.java | 110 ++++++++++++++++++ 4 files changed, 196 insertions(+), 3 deletions(-) create mode 100644 api/src/main/java/cn/skcks/docking/gb28181/api/play/dto/RecordPlayDTO.java create mode 100644 api/src/main/java/cn/skcks/docking/gb28181/api/play/dto/RecordStopDTO.java diff --git a/api/src/main/java/cn/skcks/docking/gb28181/api/play/PlayController.java b/api/src/main/java/cn/skcks/docking/gb28181/api/play/PlayController.java index 17e4361..f38a106 100644 --- a/api/src/main/java/cn/skcks/docking/gb28181/api/play/PlayController.java +++ b/api/src/main/java/cn/skcks/docking/gb28181/api/play/PlayController.java @@ -4,6 +4,8 @@ import cn.skcks.docking.gb28181.annotation.web.JsonMapping; import cn.skcks.docking.gb28181.annotation.web.methods.GetJson; import cn.skcks.docking.gb28181.api.play.dto.RealTimePlayDTO; import cn.skcks.docking.gb28181.api.play.dto.RealTimeStopDTO; +import cn.skcks.docking.gb28181.api.play.dto.RecordPlayDTO; +import cn.skcks.docking.gb28181.api.play.dto.RecordStopDTO; import cn.skcks.docking.gb28181.common.json.JsonResponse; import cn.skcks.docking.gb28181.config.SwaggerConfig; import cn.skcks.docking.gb28181.service.play.PlayService; @@ -16,7 +18,7 @@ import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; -@Tag(name="播放") +@Tag(name = "播放") @RestController @JsonMapping("/api/device/play") @RequiredArgsConstructor @@ -29,12 +31,22 @@ public class PlayController { } @GetJson("/realTimePlay") - public DeferredResult> realTimePlay(@ParameterObject @Validated RealTimePlayDTO dto){ + public DeferredResult> realTimePlay(@ParameterObject @Validated RealTimePlayDTO dto) { return playService.realTimePlay(dto.getDeviceId(), dto.getChannelId(), dto.getTimeout()); } @GetJson("/realtimeStop") - public JsonResponse realTimeStop(@ParameterObject @Validated RealTimeStopDTO dto){ + public JsonResponse realTimeStop(@ParameterObject @Validated RealTimeStopDTO dto) { return playService.realTimeStop(dto.getDeviceId(), dto.getChannelId()); } + + @GetJson("/recordPlay") + public DeferredResult> recordPlay(@ParameterObject @Validated RecordPlayDTO dto) { + return playService.recordPlay(dto.getDeviceId(), dto.getChannelId(), dto.getStartTime(), dto.getEndTime(), dto.getTimeout()); + } + + @GetJson("/recordStop") + public JsonResponse recordStop(@ParameterObject @Validated RecordStopDTO dto) { + return playService.recordStop(dto.getDeviceId(), dto.getChannelId(), dto.getStartTime(), dto.getEndTime()); + } } diff --git a/api/src/main/java/cn/skcks/docking/gb28181/api/play/dto/RecordPlayDTO.java b/api/src/main/java/cn/skcks/docking/gb28181/api/play/dto/RecordPlayDTO.java new file mode 100644 index 0000000..1462423 --- /dev/null +++ b/api/src/main/java/cn/skcks/docking/gb28181/api/play/dto/RecordPlayDTO.java @@ -0,0 +1,38 @@ +package cn.skcks.docking.gb28181.api.play.dto; + +import cn.hutool.core.date.DatePattern; +import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; +import com.fasterxml.jackson.annotation.JsonFormat; +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotBlank; +import lombok.Data; +import org.springframework.format.annotation.DateTimeFormat; + +import java.util.Date; + +@Schema(title = "历史回放") +@Data +public class RecordPlayDTO { + @NotBlank + @Schema(description = "设备id", example = "44050100001180000001") + private String deviceId; + + @NotBlank + @Schema(description = "通道id", example = "44050100001180000001") + private String channelId; + + @DateTimeFormat(pattern= DatePattern.NORM_DATETIME_PATTERN) + @JsonFormat(pattern = DatePattern.UTC_SIMPLE_PATTERN, timezone = GB28181Constant.TIME_ZONE) + @Schema(description = "开始时间", example = "2023-08-31 00:00:00") + private Date startTime; + + @DateTimeFormat(pattern= DatePattern.NORM_DATETIME_PATTERN) + @JsonFormat(pattern = DatePattern.UTC_SIMPLE_PATTERN, timezone = GB28181Constant.TIME_ZONE) + @Schema(description = "结束时间", example = "2023-08-31 00:15:00") + private Date endTime; + + @Min(30) + @Schema(description = "超时时间(秒)", example = "30") + private long timeout = 30; +} diff --git a/api/src/main/java/cn/skcks/docking/gb28181/api/play/dto/RecordStopDTO.java b/api/src/main/java/cn/skcks/docking/gb28181/api/play/dto/RecordStopDTO.java new file mode 100644 index 0000000..56c11b8 --- /dev/null +++ b/api/src/main/java/cn/skcks/docking/gb28181/api/play/dto/RecordStopDTO.java @@ -0,0 +1,33 @@ +package cn.skcks.docking.gb28181.api.play.dto; + +import cn.hutool.core.date.DatePattern; +import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; +import com.fasterxml.jackson.annotation.JsonFormat; +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotBlank; +import lombok.Data; +import org.springframework.format.annotation.DateTimeFormat; + +import java.util.Date; + +@Schema(title = "关闭历史回放") +@Data +public class RecordStopDTO { + @NotBlank + @Schema(description = "设备id", example = "44050100001180000001") + private String deviceId; + + @NotBlank + @Schema(description = "通道id", example = "44050100001180000001") + private String channelId; + + @DateTimeFormat(pattern= DatePattern.NORM_DATETIME_PATTERN) + @JsonFormat(pattern = DatePattern.UTC_SIMPLE_PATTERN, timezone = GB28181Constant.TIME_ZONE) + @Schema(description = "开始时间", example = "2023-08-31 00:00:00") + private Date startTime; + + @DateTimeFormat(pattern= DatePattern.NORM_DATETIME_PATTERN) + @JsonFormat(pattern = DatePattern.UTC_SIMPLE_PATTERN, timezone = GB28181Constant.TIME_ZONE) + @Schema(description = "结束时间", example = "2023-08-31 00:15:00") + private Date endTime; +} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java index bcef74c..2b3984f 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java @@ -215,6 +215,116 @@ public class PlayService { result.setResult(JsonResponse.success(videoUrl(streamId))); return result; } + + GetRtpInfoResp rtpInfo = zlmMediaService.getRtpInfo(streamId); + if(rtpInfo.getExist()){ + result.setResult(JsonResponse.error(MessageFormat.format("流 {0} 已存在", streamId))); + return result; + } + + int streamMode = device.getStreamMode() == null || device.getStreamMode().equalsIgnoreCase(ListeningPoint.UDP) ? 0 : 1; + OpenRtpServer openRtpServer = new OpenRtpServer(); + openRtpServer.setPort(0); + openRtpServer.setStreamId(streamId); + openRtpServer.setTcpMode(streamMode); + OpenRtpServerResp openRtpServerResp = zlmMediaService.openRtpServer(openRtpServer); + log.info("openRtpServerResp => {}", openRtpServerResp); + if(!openRtpServerResp.getCode().equals(ResponseStatus.Success)){ + result.setResult(JsonResponse.error(openRtpServerResp.getCode().getMsg())); + return result; + } + + String ip = zlmMediaConfig.getIp(); + int port = openRtpServerResp.getPort(); + String ssrc = ssrcService.getPlaySsrc(); + GB28181Description description = MediaSdpHelper.playback(deviceId, channelId, Connection.IP4, ip, port, ssrc, StreamMode.of(device.getStreamMode()),startTime,endTime); + + String transport = device.getTransport(); + String senderIp = device.getLocalIp(); + SipProvider provider = sipService.getProvider(transport, senderIp); + CallIdHeader callId = provider.getNewCallId(); + + Request request = SipRequestBuilder.createInviteRequest(device, channelId, description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId); + String subscribeKey = GenericSubscribe.Helper.getKey(MessageProcessor.Method.INVITE, callId.getCallId()); + subscribe.getInviteSubscribe().addPublisher(subscribeKey); + Flow.Subscriber subscriber = new Flow.Subscriber<>() { + private Flow.Subscription subscription; + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + log.info("订阅 {} {}",MessageProcessor.Method.INVITE,subscribeKey); + subscription.request(1); + } + + @Override + public void onNext(SIPResponse item) { + int statusCode = item.getStatusCode(); + log.debug("{} 收到订阅消息 {}", subscribeKey, item); + if(statusCode == Response.TRYING){ + log.info("订阅 {} {} 尝试连接流媒体服务", MessageProcessor.Method.INVITE,subscribeKey); + subscription.request(1); + } else if(statusCode>=Response.OK && statusCode < Response.MULTIPLE_CHOICES){ + log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE,subscribeKey); + RedisUtil.StringOps.set(key, JsonUtils.toCompressJson(new SipTransactionInfo(item))); + RedisUtil.StringOps.set(CacheUtil.getKey(key,"ssrc"), ssrc); + result.setResult(JsonResponse.success(videoUrl(streamId))); + onComplete(); + } else { + log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE,subscribeKey); + RedisUtil.KeyOps.delete(key); + RedisUtil.KeyOps.delete(CacheUtil.getKey(key,"ssrc")); + result.setResult(JsonResponse.error("连接流媒体服务失败")); + ssrcService.releaseSsrc(zlmMediaConfig.getId(), ssrc); + onComplete(); + } + } + + @Override + public void onError(Throwable throwable) { + + } + + @Override + public void onComplete() { + subscribe.getRecordInfoSubscribe().delPublisher(subscribeKey); + } + }; + subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber); + sender.send(senderIp, request); + result.onTimeout(()->{ + subscribe.getInviteSubscribe().delPublisher(subscribeKey); + result.setResult(JsonResponse.error("点播超时")); + }); return result; } + + @SneakyThrows + public JsonResponse recordStop(String deviceId, String channelId, Date startTime, Date endTime){ + DockingDevice device = deviceService.getDevice(deviceId); + if (device == null) { + log.info("未能找到 编码为 => {} 的设备", deviceId); + return JsonResponse.error(null, "未找到设备"); + } + + long start = startTime.toInstant().getEpochSecond(); + long end = endTime.toInstant().getEpochSecond(); + String streamId = MediaSdpHelper.getStreamId(deviceId,channelId,String.valueOf(start), String.valueOf(end)); + String key = CacheUtil.getKey(MediaSdpHelper.Action.PLAY_BACK.getAction(), deviceId, channelId); + String ssrcKey = CacheUtil.getKey(key,"ssrc"); + zlmMediaService.closeRtpServer(new CloseRtpServer(streamId)); + SipTransactionInfo transactionInfo = JsonUtils.parse(RedisUtil.StringOps.get(key), SipTransactionInfo.class); + if(transactionInfo == null){ + return JsonResponse.error("未找到连接信息"); + } + Request request = SipRequestBuilder.createByeRequest(device, channelId, transactionInfo); + String senderIp = device.getLocalIp(); + sender.send(senderIp, request); + + String ssrc = RedisUtil.StringOps.get(ssrcKey); + ssrcService.releaseSsrc(zlmMediaConfig.getId(),ssrc); + RedisUtil.KeyOps.delete(ssrcKey); + RedisUtil.KeyOps.delete(key); + return JsonResponse.success(null); + } + }