From e37e9b267708c6228b97907d1a3c6de3c5982882 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Wed, 13 Mar 2024 12:36:25 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=20ffmpeg=20=E5=90=AF?= =?UTF-8?q?=E5=8A=A8=E6=97=B6=E6=9C=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/Gb28181DownloadService.java | 35 ++++++------------- 1 file changed, 10 insertions(+), 25 deletions(-) diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java index 5fab897..e786cf6 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java @@ -570,8 +570,8 @@ public class Gb28181DownloadService { Flow.Subscriber subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, time + 60, TimeUnit.SECONDS); subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber); RedisUtil.StringOps.set(cacheKey, callId.getCallId()); - // 用以 提前 启动 ffmpeg 预备录制 - // result.completeAsync(() -> new VideoInfo(streamId, videoRtmpUrl(streamId), callId.getCallId(), device), executor); + // 用以 提前 启动 ffmpeg 预备录制, 需要配置 ffmpeg rw_timeout 时长 避免收不到流 + result.completeAsync(() -> new VideoInfo(streamId, videoRtmpUrl(streamId), callId.getCallId(), device), executor); return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId); }; } @@ -580,7 +580,6 @@ public class Gb28181DownloadService { ScheduledFuture[] schedule = new ScheduledFuture[1]; Flow.Subscriber subscriber = new Flow.Subscriber<>() { private Flow.Subscription subscription; - private boolean isStart = false; @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; @@ -592,14 +591,6 @@ public class Gb28181DownloadService { public void onNext(SIPResponse item) { int statusCode = item.getStatusCode(); log.debug("{} 收到订阅消息 {}", subscribeKey, item); - if(statusCode == Response.OK){ - String callId = item.getCallId().getCallId(); - if(!isStart){ - isStart = true; - result.completeAsync(() -> new VideoInfo(streamId,videoRtmpUrl(streamId), callId, device), executor); - } - } - if (statusCode == Response.TRYING) { log.info("订阅 {} {} 尝试连接流媒体服务", MessageProcessor.Method.INVITE, subscribeKey); subscription.request(1); @@ -607,20 +598,14 @@ public class Gb28181DownloadService { log.info("订阅 {} {} 流媒体服务连接成功, 开始推送视频流", MessageProcessor.Method.INVITE, subscribeKey); log.info("收到响应状态 {}", statusCode); String callId = item.getCallId().getCallId(); - if(!isStart){ - isStart = true; - result.completeAsync(() -> new VideoInfo(streamId,videoRtmpUrl(streamId), callId, device), executor); - } - scheduledExecutorService.schedule(()->{ - sender.sendRequest(((provider, ip, port) -> { - String fromTag = item.getFromTag(); - String toTag = item.getToTag(); - String key = GenericSubscribe.Helper.getKey(Request.BYE, callId); - subscribe.getByeSubscribe().addPublisher(key); - subscribe.getByeSubscribe().addSubscribe(key, byeSubscriber(key, device, cacheKey, streamId, time, unit)); - return SipRequestBuilder.createAckRequest(Response.OK, ip, port, docking, device.getGbDeviceChannelId(), fromTag, toTag, callId); - })); - },1000, TimeUnit.MILLISECONDS); + sender.sendRequest(((provider, ip, port) -> { + String fromTag = item.getFromTag(); + String toTag = item.getToTag(); + String key = GenericSubscribe.Helper.getKey(Request.BYE, callId); + subscribe.getByeSubscribe().addPublisher(key); + subscribe.getByeSubscribe().addSubscribe(key, byeSubscriber(key, device, cacheKey, streamId, time, unit)); + return SipRequestBuilder.createAckRequest(Response.OK, ip, port, docking, device.getGbDeviceChannelId(), fromTag, toTag, callId); + })); } else { log.info("订阅 {} {} 连接流媒体服务时出现异常, 终止订阅", MessageProcessor.Method.INVITE, subscribeKey); zlmMediaService.closeRtpServer(new CloseRtpServer(streamId));