From 86b87ba55e88378fbf8d66fb4660d1c86c2cc657 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 3 Aug 2023 15:50:15 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dtcp=E4=B8=BB=E5=8A=A8?= =?UTF-8?q?=E5=BD=95=E5=83=8F=E5=9B=9E=E6=94=BE=E6=97=B6ssrc=E4=B8=8D?= =?UTF-8?q?=E4=B8=80=E8=87=B4=E6=97=B6=E7=82=B9=E6=92=AD=E5=A4=B1=E8=B4=A5?= =?UTF-8?q?=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../genersoft/iot/vmp/conf/DynamicTask.java | 3 +- .../iot/vmp/service/impl/PlayServiceImpl.java | 43 +++---------------- 2 files changed, 9 insertions(+), 37 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java index 041d7388..f83eaf1c 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.conf; +import org.apache.commons.lang3.ObjectUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; @@ -100,7 +101,7 @@ public class DynamicTask { public boolean stop(String key) { boolean result = false; - if (futureMap.get(key) != null && !futureMap.get(key).isCancelled() && !futureMap.get(key).isDone()) { + if (!ObjectUtils.isEmpty(futureMap.get(key)) && !futureMap.get(key).isCancelled() && !futureMap.get(key).isDone()) { result = futureMap.get(key).cancel(false); futureMap.remove(key); runnableMap.remove(key); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 2fe72f45..99b7c52b 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -422,11 +422,11 @@ public class PlayServiceImpl implements IPlayService { break; } } - logger.info("[点播-TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); + logger.info("[TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream()); - logger.info("[点播-TCP主动连接对方] 结果: {}", jsonObject); + logger.info("[TCP主动连接对方] 结果: {}", jsonObject); } catch (SdpException e) { - logger.error("[点播-TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e); + logger.error("[TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e); dynamicTask.stop(timeOutTaskKey); mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); // 释放ssrc @@ -648,39 +648,7 @@ public class PlayServiceImpl implements IPlayService { // 查询到ssrc不一致且开启了ssrc校验则需要针对处理 if (ssrcInfo.getSsrc().equals(ssrcInResponse)) { if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { - String substring = contentString.substring(0, contentString.indexOf("y=")); - try { - SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring); - int port = -1; - Vector mediaDescriptions = sdp.getMediaDescriptions(true); - for (Object description : mediaDescriptions) { - MediaDescription mediaDescription = (MediaDescription) description; - Media media = mediaDescription.getMedia(); - - Vector mediaFormats = media.getMediaFormats(false); - if (mediaFormats.contains("96")) { - port = media.getMediaPort(); - break; - } - } - logger.info("[录像回放-TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); - JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream()); - logger.info("[录像回放-TCP主动连接对方] 结果: {}", jsonObject); - } catch (SdpException e) { - logger.error("[录像回放-TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e); - dynamicTask.stop(playBackTimeOutTaskKey); - mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - - callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), - InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null); - inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, - InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), - InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null); - } + tcpActiveHandler(device, channelId, contentString, mediaServerItem, playBackTimeOutTaskKey, ssrcInfo, callback); } return; } @@ -734,6 +702,9 @@ public class PlayServiceImpl implements IPlayService { ssrcInfo.setSsrc(ssrcInResponse); inviteInfo.setSsrcInfo(ssrcInfo); inviteInfo.setStream(ssrcInfo.getStream()); + if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { + tcpActiveHandler(device, channelId, contentString, mediaServerItem, playBackTimeOutTaskKey, ssrcInfo, callback); + } } }else { logger.info("[点播消息] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正"); From 626959d0153a84cb3137569ce46e6928320e4e20 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 3 Aug 2023 16:08:22 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E9=BB=98=E8=AE=A4=E5=85=B3=E9=97=ADssrc=20?= =?UTF-8?q?check?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/bean/Device.java | 2 +- .../iot/vmp/service/impl/PlayServiceImpl.java | 37 ++----------------- 2 files changed, 5 insertions(+), 34 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java index 1c7baac9..34d624f4 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java @@ -165,7 +165,7 @@ public class Device { * 是否开启ssrc校验,默认关闭,开启可以防止串流 */ @Schema(description = "是否开启ssrc校验,默认关闭,开启可以防止串流") - private boolean ssrcCheck = true; + private boolean ssrcCheck = false; /** * 地理坐标系, 目前支持 WGS84,GCJ02 diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 99b7c52b..113c8592 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -818,39 +818,7 @@ public class PlayServiceImpl implements IPlayService { // 查询到ssrc不一致且开启了ssrc校验则需要针对处理 if (ssrcInfo.getSsrc().equals(ssrcInResponse)) { if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { - String substring = contentString.substring(0, contentString.indexOf("y=")); - try { - SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring); - int port = -1; - Vector mediaDescriptions = sdp.getMediaDescriptions(true); - for (Object description : mediaDescriptions) { - MediaDescription mediaDescription = (MediaDescription) description; - Media media = mediaDescription.getMedia(); - - Vector mediaFormats = media.getMediaFormats(false); - if (mediaFormats.contains("96")) { - port = media.getMediaPort(); - break; - } - } - logger.info("[录像下载-TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); - JSONObject jsonObject = zlmresTfulUtils.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream()); - logger.info("[录像下载-TCP主动连接对方] 结果: {}", jsonObject); - } catch (SdpException e) { - logger.error("[录像下载-TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channelId, e); - dynamicTask.stop(downLoadTimeOutTaskKey); - mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - - callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), - InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null); - inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, - InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), - InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null); - } + tcpActiveHandler(device, channelId, contentString, mediaServerItem, downLoadTimeOutTaskKey, ssrcInfo, callback); } return; } @@ -902,6 +870,9 @@ public class PlayServiceImpl implements IPlayService { ssrcInfo.setSsrc(ssrcInResponse); inviteInfo.setSsrcInfo(ssrcInfo); inviteInfo.setStream(ssrcInfo.getStream()); + if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { + tcpActiveHandler(device, channelId, contentString, mediaServerItem, downLoadTimeOutTaskKey, ssrcInfo, callback); + } } }else { logger.info("[录像下载] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正"); From 16564481c4ddf8bf03669e72b828a0d210161650 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Fri, 4 Aug 2023 15:47:56 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E9=87=8D=E6=9E=84=E7=82=B9=E6=92=AD?= =?UTF-8?q?=EF=BC=8C=E5=9B=9E=E6=94=BE=EF=BC=8C=E4=B8=8B=E8=BD=BD=E6=97=B6?= =?UTF-8?q?ssrc=E4=B8=8D=E4=B8=80=E8=87=B4=E4=BB=A5=E5=8F=8ATCP=E4=B8=BB?= =?UTF-8?q?=E5=8A=A8=E6=92=AD=E6=94=BE=E7=9A=84=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../genersoft/iot/vmp/conf/DynamicTask.java | 17 +- .../request/impl/ByeRequestProcessor.java | 2 +- .../cmd/KeepaliveNotifyMessageHandler.java | 11 +- .../vmp/media/zlm/ZLMHttpHookListener.java | 1 + .../service/impl/InviteStreamServiceImpl.java | 3 + .../service/impl/MediaServerServiceImpl.java | 7 +- .../iot/vmp/service/impl/PlayServiceImpl.java | 420 +++++++----------- .../com/genersoft/iot/vmp/utils/DateUtil.java | 6 + .../vmanager/gb28181/play/PlayController.java | 4 +- 9 files changed, 198 insertions(+), 273 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java index f83eaf1c..873feab5 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java @@ -46,6 +46,9 @@ public class DynamicTask { * @return */ public void startCron(String key, Runnable task, int cycleForCatalog) { + if(ObjectUtils.isEmpty(key)) { + return; + } ScheduledFuture future = futureMap.get(key); if (future != null) { if (future.isCancelled()) { @@ -74,6 +77,9 @@ public class DynamicTask { * @return */ public void startDelay(String key, Runnable task, int delay) { + if(ObjectUtils.isEmpty(key)) { + return; + } stop(key); // 获取执行的时刻 @@ -100,9 +106,12 @@ public class DynamicTask { } public boolean stop(String key) { + if(ObjectUtils.isEmpty(key)) { + return false; + } boolean result = false; if (!ObjectUtils.isEmpty(futureMap.get(key)) && !futureMap.get(key).isCancelled() && !futureMap.get(key).isDone()) { - result = futureMap.get(key).cancel(false); + result = futureMap.get(key).cancel(true); futureMap.remove(key); runnableMap.remove(key); } @@ -110,6 +119,9 @@ public class DynamicTask { } public boolean contains(String key) { + if(ObjectUtils.isEmpty(key)) { + return false; + } return futureMap.get(key) != null; } @@ -118,6 +130,9 @@ public class DynamicTask { } public Runnable get(String key) { + if(ObjectUtils.isEmpty(key)) { + return null; + } return runnableMap.get(key); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 374e5dcc..b6aac9c7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -138,7 +138,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In logger.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId); } try { - logger.warn("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); + logger.info("[停止点播] {}/{}", sendRtpItem.getDeviceId(), sendRtpItem.getChannelId()); cmder.streamByeCmd(device, sendRtpItem.getChannelId(), streamId, null); } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index 5c577ba6..7b9f69a3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -33,7 +33,7 @@ import java.text.ParseException; public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { - private Logger logger = LoggerFactory.getLogger(KeepaliveNotifyMessageHandler.class); + private final Logger logger = LoggerFactory.getLogger(KeepaliveNotifyMessageHandler.class); private final static String cmdType = "Keepalive"; @Autowired @@ -59,14 +59,19 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp // 未注册的设备不做处理 return; } - logger.info("[收到心跳], device: {}", device.getDeviceId()); SIPRequest request = (SIPRequest) evt.getRequest(); + logger.info("[收到心跳], device: {}, callId: {}", device.getDeviceId(), request.getCallIdHeader().getCallId()); + // 回复200 OK try { responseAck(request, Response.OK); } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] 心跳回复: {}", e.getMessage()); } + if (DateUtil.getDifferenceForNow(device.getKeepaliveTime()) <= 3000L){ + logger.info("[收到心跳] 心跳发送过于频繁,已忽略 device: {}, callId: {}", device.getDeviceId(), request.getCallIdHeader().getCallId()); + return; + } RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request, userSetting.getSipUseSourceIpAsRemoteAddress()); if (!device.getIp().equalsIgnoreCase(remoteAddressInfo.getIp()) || device.getPort() != remoteAddressInfo.getPort()) { @@ -80,7 +85,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp }else { long lastTime = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(device.getKeepaliveTime()); if (System.currentTimeMillis()/1000-lastTime > 10) { - device.setKeepaliveIntervalTime(new Long(System.currentTimeMillis()/1000-lastTime).intValue()); + device.setKeepaliveIntervalTime(Long.valueOf(System.currentTimeMillis()/1000-lastTime).intValue()); } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 9b2864fd..465aa2f7 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -494,6 +494,7 @@ public class ZLMHttpHookListener { Device device = deviceService.getDevice(inviteInfo.getDeviceId()); if (device != null) { try { + // 多查询一次防止已经被处理了 InviteInfo info = inviteStreamService.getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream()); if (info != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java index f4128163..6e460823 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java @@ -98,6 +98,9 @@ public class InviteStreamServiceImpl implements IInviteStreamService { "_" + inviteInfo.getChannelId() + "_" + stream; inviteInfoInDb.setStream(stream); + if (inviteInfoInDb.getSsrcInfo() != null) { + inviteInfoInDb.getSsrcInfo().setStream(stream); + } redisTemplate.opsForValue().set(key, inviteInfoInDb); return inviteInfoInDb; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index 142b8100..580561b3 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -151,9 +151,14 @@ public class MediaServerServiceImpl implements IMediaServerService { if (streamId == null) { streamId = String.format("%08x", Integer.parseInt(ssrc)).toUpperCase(); } + int ssrcCheckParam = 0; + if (ssrcCheck && tcpMode > 1) { + // 目前zlm不支持 tcp模式更新ssrc,暂时关闭ssrc校验 + logger.warn("[openRTPServer] TCP被动/TCP主动收流时,默认关闭ssrc检验"); + } int rtpServerPort; if (mediaServerItem.isRtpEnable()) { - rtpServerPort = zlmServerFactory.createRTPServer(mediaServerItem, streamId, ssrcCheck?Integer.parseInt(ssrc):0, port, reUsePort, tcpMode); + rtpServerPort = zlmServerFactory.createRTPServer(mediaServerItem, streamId, (ssrcCheck && tcpMode == 0)?Integer.parseInt(ssrc):0, port, reUsePort, tcpMode); } else { rtpServerPort = mediaServerItem.getRtpProxyPort(); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 113c8592..260b9a42 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -17,6 +17,7 @@ import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; +import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; @@ -34,6 +35,7 @@ import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import gov.nist.javax.sip.message.SIPResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -95,7 +97,6 @@ public class PlayServiceImpl implements IPlayService { @Autowired private VideoStreamSessionManager streamSession; - @Autowired private IDeviceService deviceService; @@ -108,25 +109,25 @@ public class PlayServiceImpl implements IPlayService { @Autowired private ZlmHttpHookSubscribe subscribe; - @Autowired - private SSRCFactory ssrcFactory; - - @Autowired - private RedisTemplate redisTemplate; - @Override public SSRCInfo play(MediaServerItem mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback callback) { if (mediaServerItem == null) { + logger.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", deviceId, channelId); throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm"); } Device device = redisCatchStorage.getDevice(deviceId); + if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && !mediaServerItem.isRtpEnable()) { + logger.warn("[点播] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "单端口收流时不支持TCP主动方式收流"); + } InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); if (inviteInfo != null ) { if (inviteInfo.getStreamInfo() == null) { // 点播发起了但是尚未成功, 仅注册回调等待结果即可 inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback); + logger.info("[点播开始] 已经请求中,等待结果, deviceId: {}, channelId: {}", device.getDeviceId(), channelId); return inviteInfo.getSsrcInfo(); }else { StreamInfo streamInfo = inviteInfo.getStreamInfo(); @@ -149,6 +150,7 @@ public class PlayServiceImpl implements IPlayService { InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); + logger.info("[点播已存在] 直接返回, deviceId: {}, channelId: {}", device.getDeviceId(), channelId); return inviteInfo.getSsrcInfo(); }else { // 点播发起了但是尚未成功, 仅注册回调等待结果即可 @@ -171,7 +173,6 @@ public class PlayServiceImpl implements IPlayService { null); return null; } - // TODO 记录点播的状态 play(mediaServerItem, ssrcInfo, device, channelId, callback); return ssrcInfo; } @@ -187,8 +188,8 @@ public class PlayServiceImpl implements IPlayService { null); return; } - logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", - device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流", ssrcInfo.getPort(), + logger.info("[点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, STREAM:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", + device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流", ssrcInfo.getPort(), ssrcInfo.getStream(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); //端口获取失败的ssrcInfo 没有必要发送点播指令 if (ssrcInfo.getPort() <= 0) { @@ -219,16 +220,6 @@ public class PlayServiceImpl implements IPlayService { device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流", ssrcInfo.getPort(), ssrcInfo.getSsrc()); - // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 -// InviteInfo inviteInfoForTimeout = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.play, device.getDeviceId(), channelId); -// if (inviteInfoForTimeout == null) { -// return; -// } -// if (InviteSessionStatus.ok == inviteInfoForTimeout.getStatus() ) { -// // TODO 发送bye -// }else { -// // TODO 发送cancel -// } callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null); inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null); @@ -272,99 +263,10 @@ public class PlayServiceImpl implements IPlayService { logger.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(), channelId, device.isSwitchPrimarySubStream() ? "辅码流" : "主码流"); snapOnPlay(mediaServerItemInuse, device.getDeviceId(), channelId, ssrcInfo.getStream()); - }, (event) -> { - inviteInfo.setStatus(InviteSessionStatus.ok); - - ResponseEvent responseEvent = (ResponseEvent) event.event; - String contentString = new String(responseEvent.getResponse().getRawContent()); - // 获取ssrc - int ssrcIndex = contentString.indexOf("y="); - // 检查是否有y字段 - if (ssrcIndex >= 0) { - //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容 - String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12).trim(); - // 查询到ssrc不一致且开启了ssrc校验则需要针对处理 - if (ssrcInfo.getSsrc().equals(ssrcInResponse)) { - if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { - tcpActiveHandler(device, channelId, contentString, mediaServerItem, timeOutTaskKey, ssrcInfo, callback); - } - return; - } - logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); - if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) { - logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - // 单端口模式streamId也有变化,重新设置监听即可 - if (!mediaServerItem.isRtpEnable()) { - // 添加订阅 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); - subscribe.removeSubscribe(hookSubscribe); - String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase(); - hookSubscribe.getContent().put("stream", stream); - inviteStreamService.updateInviteInfoForStream(inviteInfo, stream); - subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { - logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam); - dynamicTask.stop(timeOutTaskKey); - // hook响应 - StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, hookParam, device.getDeviceId(), channelId); - if (streamInfo == null){ - callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), - InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); - inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, - InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), - InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); - return; - } - callback.run(InviteErrorCode.SUCCESS.getCode(), - InviteErrorCode.SUCCESS.getMsg(), streamInfo); - inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, - InviteErrorCode.SUCCESS.getCode(), - InviteErrorCode.SUCCESS.getMsg(), - streamInfo); - snapOnPlay(mediaServerItemInUse, device.getDeviceId(), channelId, stream); - }); - return; - } - - // 更新ssrc - Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse); - if (!result) { - try { - logger.warn("[点播] 更新ssrc失败,停止点播 {}/{}", device.getDeviceId(), channelId); - cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null); - } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { - logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage()); - } - - dynamicTask.stop(timeOutTaskKey); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - - callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), - "下级自定义了ssrc,重新设置收流信息失败", null); - inviteStreamService.call(InviteSessionType.PLAY, device.getDeviceId(), channelId, null, - InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), - "下级自定义了ssrc,重新设置收流信息失败", null); - - }else { - if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) { - inviteStreamService.removeInviteInfo(inviteInfo); - } - ssrcInfo.setSsrc(ssrcInResponse); - inviteInfo.setSsrcInfo(ssrcInfo); - inviteInfo.setStream(ssrcInfo.getStream()); - if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { - tcpActiveHandler(device, channelId, contentString, mediaServerItem, timeOutTaskKey, ssrcInfo, callback); - } - } - }else { - logger.info("[点播消息] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正"); - } - } - inviteStreamService.updateInviteInfo(inviteInfo); + }, (eventResult) -> { + // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题 + InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channelId, + timeOutTaskKey, callback, inviteInfo, InviteSessionType.PLAY); }, (event) -> { dynamicTask.stop(timeOutTaskKey); mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); @@ -548,19 +450,23 @@ public class PlayServiceImpl implements IPlayService { String endTime, ErrorCallback callback) { Device device = storager.queryVideoDevice(deviceId); if (device == null) { - return; + logger.warn("[录像回放] 未找到设备 deviceId: {},channelId:{}", deviceId, channelId); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到设备:" + deviceId); } + MediaServerItem newMediaServerItem = getNewMediaServerItem(device); + if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && ! newMediaServerItem.isRtpEnable()) { + logger.warn("[录像回放] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "单端口收流时不支持TCP主动方式收流"); + } String stream = null; if (newMediaServerItem.isRtpEnable()) { String startTimeStr = startTime.replace("-", "") .replace(":", "") .replace(" ", ""); - System.out.println(startTimeStr); String endTimeTimeStr = endTime.replace("-", "") .replace(":", "") .replace(" ", ""); - System.out.println(endTimeTimeStr); stream = deviceId + "_" + channelId + "_" + startTimeStr + "_" + endTimeTimeStr; } SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); @@ -636,84 +542,13 @@ public class PlayServiceImpl implements IPlayService { try { cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, hookEvent, eventResult -> { - inviteInfo.setStatus(InviteSessionStatus.ok); - ResponseEvent responseEvent = (ResponseEvent) eventResult.event; - String contentString = new String(responseEvent.getResponse().getRawContent()); - // 获取ssrc - int ssrcIndex = contentString.indexOf("y="); - // 检查是否有y字段 - if (ssrcIndex >= 0) { - //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容 - String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); - // 查询到ssrc不一致且开启了ssrc校验则需要针对处理 - if (ssrcInfo.getSsrc().equals(ssrcInResponse)) { - if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { - tcpActiveHandler(device, channelId, contentString, mediaServerItem, playBackTimeOutTaskKey, ssrcInfo, callback); - } - return; - } - logger.info("[录像回放] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); - if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) { - logger.info("[录像回放] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); + // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题 + InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channelId, + playBackTimeOutTaskKey, callback, inviteInfo, InviteSessionType.PLAYBACK); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - - // 单端口模式streamId也有变化,需要重新设置监听 - if (!mediaServerItem.isRtpEnable()) { - // 添加订阅 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); - subscribe.removeSubscribe(hookSubscribe); - String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase(); - hookSubscribe.getContent().put("stream", stream); - inviteStreamService.updateInviteInfoForStream(inviteInfo, stream); - subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { - logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam); - dynamicTask.stop(playBackTimeOutTaskKey); - // hook响应 - hookEvent.response(mediaServerItemInUse, hookParam); - }); - } - // 更新ssrc - Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse); - if (!result) { - try { - logger.warn("[录像回放] 更新ssrc失败,停止录像回放 {}/{}", device.getDeviceId(), channelId); - cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null); - } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { - logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage()); - - } - - dynamicTask.stop(playBackTimeOutTaskKey); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - - callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), - "下级自定义了ssrc,重新设置收流信息失败", null); - - }else { - if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) { - inviteStreamService.removeInviteInfo(inviteInfo); - } - - ssrcInfo.setSsrc(ssrcInResponse); - inviteInfo.setSsrcInfo(ssrcInfo); - inviteInfo.setStream(ssrcInfo.getStream()); - if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { - tcpActiveHandler(device, channelId, contentString, mediaServerItem, playBackTimeOutTaskKey, ssrcInfo, callback); - } - } - }else { - logger.info("[点播消息] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正"); - } - } - inviteStreamService.updateInviteInfo(inviteInfo); }, errorEvent); } catch (InvalidArgumentException | SipException | ParseException e) { - logger.error("[命令发送失败] 回放: {}", e.getMessage()); + logger.error("[命令发送失败] 录像回放: {}", e.getMessage()); SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(); eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent; @@ -724,6 +559,121 @@ public class PlayServiceImpl implements IPlayService { } + private void InviteOKHandler(SipSubscribe.EventResult eventResult, SSRCInfo ssrcInfo, MediaServerItem mediaServerItem, + Device device, String channelId, String timeOutTaskKey, ErrorCallback callback, + InviteInfo inviteInfo, InviteSessionType inviteSessionType){ + inviteInfo.setStatus(InviteSessionStatus.ok); + ResponseEvent responseEvent = (ResponseEvent) eventResult.event; + String contentString = new String(responseEvent.getResponse().getRawContent()); + String ssrcInResponse = SipUtils.getSsrcFromSdp(contentString); + if (ssrcInfo.getSsrc().equals(ssrcInResponse)) { + // ssrc 一致 + if (mediaServerItem.isRtpEnable()) { + // 多端口 + if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { + tcpActiveHandler(device, channelId, contentString, mediaServerItem, timeOutTaskKey, ssrcInfo, callback); + } + }else { + // 单端口 + if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { + logger.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流"); + } + + } + }else { + logger.info("[Invite 200OK] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); + // ssrc 不一致 + if (mediaServerItem.isRtpEnable()) { + // 多端口 + if (device.isSsrcCheck()) { + // ssrc检验 + // 更新ssrc + logger.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); + // 释放ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse); + if (!result) { + try { + logger.warn("[Invite 200OK] 更新ssrc失败,停止点播 {}/{}", device.getDeviceId(), channelId); + cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null); + } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { + logger.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage()); + } + + dynamicTask.stop(timeOutTaskKey); + // 释放ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + + streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); + + callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), + "下级自定义了ssrc,重新设置收流信息失败", null); + inviteStreamService.call(inviteSessionType, device.getDeviceId(), channelId, null, + InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), + "下级自定义了ssrc,重新设置收流信息失败", null); + + }else { + ssrcInfo.setSsrc(ssrcInResponse); + inviteInfo.setSsrcInfo(ssrcInfo); + inviteInfo.setStream(ssrcInfo.getStream()); + if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { + if (mediaServerItem.isRtpEnable()) { + tcpActiveHandler(device, channelId, contentString, mediaServerItem, timeOutTaskKey, ssrcInfo, callback); + }else { + logger.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流"); + } + } + inviteStreamService.updateInviteInfo(inviteInfo); + } + } + }else { + if (ssrcInResponse != null) { + // 单端口 + // 重新订阅流上线 + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", + ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); + subscribe.removeSubscribe(hookSubscribe); + SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(inviteInfo.getDeviceId(), + inviteInfo.getChannelId(), null, inviteInfo.getStream()); + streamSession.remove(inviteInfo.getDeviceId(), + inviteInfo.getChannelId(), inviteInfo.getStream()); + + String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase(); + hookSubscribe.getContent().put("stream", stream); + + inviteStreamService.updateInviteInfoForStream(inviteInfo, stream); + streamSession.put(device.getDeviceId(), channelId, ssrcTransaction.getCallId(), + stream, ssrcInResponse, mediaServerItem.getId(), (SIPResponse) responseEvent.getResponse(), inviteSessionType); + subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { + logger.info("[Invite 200OK] ssrc修正后收到订阅消息: " + hookParam); + dynamicTask.stop(timeOutTaskKey); + subscribe.removeSubscribe(hookSubscribe); + // hook响应 + StreamInfo streamInfo = onPublishHandlerForPlay(mediaServerItemInUse, hookParam, device.getDeviceId(), channelId); + if (streamInfo == null){ + callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), + InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); + inviteStreamService.call(inviteSessionType, device.getDeviceId(), channelId, null, + InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(), + InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null); + return; + } + callback.run(InviteErrorCode.SUCCESS.getCode(), + InviteErrorCode.SUCCESS.getMsg(), streamInfo); + inviteStreamService.call(inviteSessionType, device.getDeviceId(), channelId, null, + InviteErrorCode.SUCCESS.getCode(), + InviteErrorCode.SUCCESS.getMsg(), + streamInfo); + if (inviteSessionType == InviteSessionType.PLAY) { + snapOnPlay(mediaServerItemInUse, device.getDeviceId(), channelId, stream); + } + }); + } + } + } + } + + @Override public void download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, ErrorCallback callback) { @@ -738,7 +688,17 @@ public class PlayServiceImpl implements IPlayService { null); return; } - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); + String stream = null; + if (newMediaServerItem.isRtpEnable()) { + String startTimeStr = startTime.replace("-", "") + .replace(":", "") + .replace(" ", ""); + String endTimeTimeStr = endTime.replace("-", "") + .replace(":", "") + .replace(" ", ""); + stream = deviceId + "_" + channelId + "_" + startTimeStr + "_" + endTimeTimeStr; + } + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, callback); } @@ -806,79 +766,9 @@ public class PlayServiceImpl implements IPlayService { try { cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed, hookEvent, errorEvent, eventResult ->{ - inviteInfo.setStatus(InviteSessionStatus.ok); - ResponseEvent responseEvent = (ResponseEvent) eventResult.event; - String contentString = new String(responseEvent.getResponse().getRawContent()); - // 获取ssrc - int ssrcIndex = contentString.indexOf("y="); - // 检查是否有y字段 - if (ssrcIndex >= 0) { - //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容 - String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); - // 查询到ssrc不一致且开启了ssrc校验则需要针对处理 - if (ssrcInfo.getSsrc().equals(ssrcInResponse)) { - if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { - tcpActiveHandler(device, channelId, contentString, mediaServerItem, downLoadTimeOutTaskKey, ssrcInfo, callback); - } - return; - } - logger.info("[录像下载] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); - if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) { - logger.info("[录像下载] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); - - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - - // 单端口模式streamId也有变化,需要重新设置监听 - if (!mediaServerItem.isRtpEnable()) { - // 添加订阅 - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); - subscribe.removeSubscribe(hookSubscribe); - String stream = String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase(); - hookSubscribe.getContent().put("stream", stream); - inviteStreamService.updateInviteInfoForStream(inviteInfo, stream); - subscribe.addSubscribe(hookSubscribe, (mediaServerItemInUse, hookParam) -> { - logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam); - dynamicTask.stop(downLoadTimeOutTaskKey); - hookEvent.response(mediaServerItemInUse, hookParam); - }); - } - - // 更新ssrc - Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse); - if (!result) { - try { - logger.warn("[录像下载] 更新ssrc失败,停止录像回放 {}/{}", device.getDeviceId(), channelId); - cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null); - } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { - logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage()); - } - - dynamicTask.stop(downLoadTimeOutTaskKey); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - - streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - - callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), - "下级自定义了ssrc,重新设置收流信息失败", null); - - }else { - if (ssrcInfo.getStream()!= null && !ssrcInfo.getStream().equals(inviteInfo.getStream())) { - inviteStreamService.removeInviteInfo(inviteInfo); - } - ssrcInfo.setSsrc(ssrcInResponse); - inviteInfo.setSsrcInfo(ssrcInfo); - inviteInfo.setStream(ssrcInfo.getStream()); - if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { - tcpActiveHandler(device, channelId, contentString, mediaServerItem, downLoadTimeOutTaskKey, ssrcInfo, callback); - } - } - }else { - logger.info("[录像下载] 收到invite 200, 下级自定义了ssrc, 但是当前模式无需修正"); - } - } - inviteStreamService.updateInviteInfo(inviteInfo); + // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题 + InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channelId, + downLoadTimeOutTaskKey, callback, inviteInfo, InviteSessionType.DOWNLOAD); }); } catch (InvalidArgumentException | SipException | ParseException e) { logger.error("[命令发送失败] 录像下载: {}", e.getMessage()); diff --git a/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java b/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java index 923f8346..23cb9dac 100644 --- a/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java +++ b/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java @@ -7,6 +7,7 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeParseException; +import java.time.temporal.ChronoUnit; import java.time.temporal.TemporalAccessor; import java.util.Locale; @@ -106,4 +107,9 @@ public class DateUtil { LocalDateTime nowDateTime = LocalDateTime.now(); return formatterISO8601.format(nowDateTime); } + + public static long getDifferenceForNow(String keepaliveTime) { + Instant beforeInstant = Instant.from(formatter.parse(keepaliveTime)); + return ChronoUnit.MILLIS.between(beforeInstant, Instant.now()); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java index 445e42f4..3d233dcf 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java @@ -111,7 +111,7 @@ public class PlayController { wvpResult.setCode(ErrorCode.ERROR100.getCode()); wvpResult.setMsg("点播超时"); requestMessage.setData(wvpResult); - resultHolder.invokeResult(requestMessage); + resultHolder.invokeAllResult(requestMessage); inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); storager.stopPlay(deviceId, channelId); }); @@ -166,7 +166,7 @@ public class PlayController { } if (InviteSessionStatus.ok == inviteInfo.getStatus()) { try { - logger.warn("[停止点播] {}/{}", device.getDeviceId(), channelId); + logger.info("[停止点播] {}/{}", device.getDeviceId(), channelId); cmder.streamByeCmd(device, channelId, inviteInfo.getStream(), null, null); } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage()); From d099daeb31bb8f1b2e14e0bc128f9b2f7c4fca45 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Fri, 4 Aug 2023 16:41:29 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E5=BD=95=E5=83=8F=E4=B8=8B=E8=BD=BD?= =?UTF-8?q?=E4=B8=8D=E4=BD=BF=E7=94=A8=E5=9B=BA=E5=AE=9A=E5=9C=B0=E5=9D=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cmd/SIPRequestHeaderPlarformProvider.java | 1 + .../event/request/impl/AckRequestProcessor.java | 17 ++++++++++++----- .../iot/vmp/service/impl/PlayServiceImpl.java | 15 ++------------- 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java index fe130a54..9a00c179 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java @@ -166,6 +166,7 @@ public class SIPRequestHeaderPlarformProvider { public Request createMessageRequest(ParentPlatform parentPlatform, String content, SendRtpItem sendRtpItem) throws PeerUnavailableException, ParseException, InvalidArgumentException { CallIdHeader callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(sendRtpItem.getCallId()); + callIdHeader.setCallId(sendRtpItem.getCallId()); return createMessageRequest(parentPlatform, content, sendRtpItem.getToTag(), SipUtils.getNewViaTag(), sendRtpItem.getFromTag(), callIdHeader); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 61a60041..d189048b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -111,9 +111,20 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In logger.warn("[收到ACK]:未找到通道({})的推流信息", channelId); return; } + // tcp主动时,此时是级联下级平台,在回复200ok时,本地已经请求zlm开启监听,跳过下面步骤 + if (sendRtpItem.isTcpActive()) { + logger.info("收到ACK,rtp/{} TCP主动方式后续处理", sendRtpItem.getStreamId()); + return; + } String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}", sendRtpItem.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc()); + logger.info("收到ACK,rtp/{}开始向上级推流, 目标={}:{},SSRC={}, 协议:{}", + sendRtpItem.getStreamId(), + sendRtpItem.getIp(), + sendRtpItem.getPort(), + sendRtpItem.getSsrc(), + sendRtpItem.isTcp()?(sendRtpItem.isTcpActive()?"TCP主动":"TCP被动"):"UDP" + ); Map param = new HashMap<>(12); param.put("vhost","__defaultVhost__"); param.put("app",sendRtpItem.getApp()); @@ -130,10 +141,6 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In // 开启rtcp保活 param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0"); } - // tcp主动时,此时是级联下级平台,在回复200ok时,本地已经请求zlm开启监听,跳过下面步骤 - if (sendRtpItem.isTcpActive()) { - return; - } if (mediaInfo == null) { RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 260b9a42..cf8bdd24 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -12,7 +12,6 @@ import com.genersoft.iot.vmp.conf.exception.ServiceException; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; -import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; @@ -39,7 +38,6 @@ import gov.nist.javax.sip.message.SIPResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import org.springframework.util.ObjectUtils; @@ -688,17 +686,8 @@ public class PlayServiceImpl implements IPlayService { null); return; } - String stream = null; - if (newMediaServerItem.isRtpEnable()) { - String startTimeStr = startTime.replace("-", "") - .replace(":", "") - .replace(" ", ""); - String endTimeTimeStr = endTime.replace("-", "") - .replace(":", "") - .replace(" ", ""); - stream = deviceId + "_" + channelId + "_" + startTimeStr + "_" + endTimeTimeStr; - } - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, stream, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); + // 录像下载不使用固定流地址,固定流地址会导致如果开始时间与结束时间一致时文件错误的叠加在一起 + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, device.getStreamModeForParam()); download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed, callback); }