diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java new file mode 100644 index 0000000..89a5523 --- /dev/null +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java @@ -0,0 +1,77 @@ +package cn.skcks.docking.gb28181.service.play; + +import cn.skcks.docking.gb28181.common.json.JsonResponse; +import cn.skcks.docking.gb28181.common.redis.RedisUtil; +import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil; +import cn.skcks.docking.gb28181.media.dto.rtp.GetRtpInfoResp; +import cn.skcks.docking.gb28181.media.dto.rtp.OpenRtpServer; +import cn.skcks.docking.gb28181.media.dto.rtp.OpenRtpServerResp; +import cn.skcks.docking.gb28181.media.dto.status.ResponseStatus; +import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService; +import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice; +import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService; +import cn.skcks.docking.gb28181.service.ssrc.SsrcService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.web.context.request.async.DeferredResult; + +import javax.sip.ListeningPoint; +import java.text.MessageFormat; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Service +@RequiredArgsConstructor +public class PlayService { + private static final String PREFIX = "RealTimePlay"; + private final DockingDeviceService deviceService; + private final ZlmMediaService zlmMediaService; + private final SsrcService ssrcService; + + /** + * + * @param deviceId 设备id + * @param channelId 通道id + */ + public DeferredResult> realTimePlay(String deviceId, String channelId, long timeout){ + DeferredResult> result = new DeferredResult<>(TimeUnit.SECONDS.toMillis(timeout)); + DockingDevice device = deviceService.getDevice(deviceId); + if (device == null) { + log.info("未能找到 编码为 => {} 的设备", deviceId); + result.setResult(JsonResponse.error(null, "未找到设备")); + return result; + } + + String streamId = CacheUtil.getKey(deviceId,channelId); + String key = CacheUtil.getKey(PREFIX, streamId); + if(RedisUtil.KeyOps.hasKey(key)){ + String url = RedisUtil.StringOps.get(key); + result.setResult(JsonResponse.success(url)); + return result; + } + + GetRtpInfoResp rtpInfo = zlmMediaService.getRtpInfo(streamId); + if(rtpInfo.getExist()){ + result.setResult(JsonResponse.error(MessageFormat.format("实时流 {0} 已存在", streamId))); + return result; + } + + int streamMode = device.getStreamMode() == null || device.getStreamMode().equalsIgnoreCase(ListeningPoint.UDP) ? 0 : 1; + OpenRtpServer openRtpServer = new OpenRtpServer(); + openRtpServer.setPort(0); + openRtpServer.setStreamId(streamId); + openRtpServer.setTcpMode(streamMode); + OpenRtpServerResp openRtpServerResp = zlmMediaService.openRtpServer(openRtpServer); + if(!openRtpServerResp.getCode().equals(ResponseStatus.Success)){ + result.setResult(JsonResponse.error(openRtpServerResp.getCode().getMsg())); + return result; + } + + return result; +// zlmMediaService.getRtpInfo(); +// GetMediaList getMediaList = new GetMediaList(); +// getMediaList.set +// zlmMediaService.getMediaList() + } +} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/ssrc/SsrcService.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/ssrc/SsrcService.java new file mode 100644 index 0000000..d726bde --- /dev/null +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/ssrc/SsrcService.java @@ -0,0 +1,98 @@ +package cn.skcks.docking.gb28181.service.ssrc; + +import cn.skcks.docking.gb28181.common.redis.RedisUtil; +import cn.skcks.docking.gb28181.config.sip.SipConfig; +import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil; +import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.List; + +@Slf4j +@Service +@RequiredArgsConstructor +public class SsrcService { + private static final String PREFIX = "SSRC"; + private static final int MAX_STREAM_COUNT = 10000; + private final SipConfig sipConfig; + private final ZlmMediaConfig mediaConfig; + + private String ssrcKey; + + @PostConstruct + private void init(){ + String ssrcPrefix = sipConfig.getDomain().substring(3, 8); + ssrcKey = CacheUtil.getKey(PREFIX,sipConfig.getId(),mediaConfig.getId()); + List ssrcList = new ArrayList<>(MAX_STREAM_COUNT); + for (int i = 1; i < MAX_STREAM_COUNT; i++) { + String ssrc = String.format("%s%04d", ssrcPrefix, i); + ssrcList.add(ssrc); + } + + RedisUtil.KeyOps.delete(ssrcKey); + RedisUtil.SetOps.sAdd(ssrcKey, ssrcList.toArray(String[]::new)); + } + + @PreDestroy + private void destroy(){ + if(ssrcKey != null){ + RedisUtil.KeyOps.delete(ssrcKey); + } + } + + private String getSN() { + String sn; + long size = RedisUtil.SetOps.sSize(ssrcKey); + if (size == 0) { + throw new RuntimeException("ssrc已经用完"); + } else { + // 在集合中移除并返回一个随机成员。 + sn = RedisUtil.SetOps.sPop(ssrcKey); + RedisUtil.SetOps.sRemove(ssrcKey,sn); + } + return sn; + } + + /** + * 重置一个流媒体服务的所有ssrc + * + * @param mediaServerId 流媒体服务ID + */ + public void reset(String mediaServerId) { + init(); + } + + /** + * 获取视频预览的SSRC值,第一位固定为0 + * + * @return ssrc + */ + public String getPlaySsrc() { + return "0" + getSN(); + } + + /** + * 获取录像回放的SSRC值,第一位固定为1 + */ + public String getPlayBackSsrc(String mediaServerI) { + return "1" + getSN(); + } + + /** + * 释放ssrc,主要用完的ssrc一定要释放,否则会耗尽 + * + * @param ssrc 需要重置的ssrc + */ + public void releaseSsrc(String mediaServerId, String ssrc) { + if (ssrc == null) { + return; + } + String sn = ssrc.substring(1); + RedisUtil.SetOps.sAdd(ssrcKey, sn); + } +}