diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/ack/request/AckRequestProcessor.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/ack/request/AckRequestProcessor.java new file mode 100644 index 0000000..88c4f29 --- /dev/null +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/ack/request/AckRequestProcessor.java @@ -0,0 +1,40 @@ +package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.ack.request; + +import cn.skcks.docking.gb28181.core.sip.listener.SipListener; +import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; +import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe; +import gov.nist.javax.sip.message.SIPRequest; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import javax.sip.RequestEvent; +import javax.sip.message.Request; +import java.util.EventObject; +import java.util.Optional; + +@Slf4j +@RequiredArgsConstructor +@Component +public class AckRequestProcessor implements MessageProcessor { + private final SipListener sipListener; + private final SipSubscribe subscribe; + + @PostConstruct + @Override + public void init() { + sipListener.addRequestProcessor(Request.ACK, this); + } + + @Override + public void process(EventObject eventObject) { + RequestEvent requestEvent = (RequestEvent) eventObject; + SIPRequest request = (SIPRequest) requestEvent.getRequest(); + String callId = request.getCallId().getCallId(); + String key = GenericSubscribe.Helper.getKey(Request.ACK, callId); + Optional.ofNullable(subscribe.getAckSubscribe().getPublisher(key)) + .ifPresent(publisher -> publisher.submit(request)); + } +} diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java index 8847d25..0dc891a 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java @@ -2,15 +2,22 @@ package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.invite.reque import cn.hutool.core.date.DateUtil; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description; +import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper; import cn.skcks.docking.gb28181.core.sip.listener.SipListener; import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.mocking.core.sip.gb28181.sdp.GB28181DescriptionParser; +import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.mocking.core.sip.response.SipResponseBuilder; import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender; import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice; import cn.skcks.docking.gb28181.mocking.service.device.DeviceProxyService; import cn.skcks.docking.gb28181.mocking.service.device.DeviceService; +import gov.nist.core.Separators; +import gov.nist.javax.sdp.SessionDescriptionImpl; import gov.nist.javax.sdp.TimeDescriptionImpl; +import gov.nist.javax.sdp.fields.AttributeField; +import gov.nist.javax.sdp.fields.ConnectionField; import gov.nist.javax.sdp.fields.TimeField; import gov.nist.javax.sip.message.SIPRequest; import jakarta.annotation.PostConstruct; @@ -20,16 +27,15 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; -import javax.sdp.Media; -import javax.sdp.MediaDescription; -import javax.sdp.SdpParseException; -import javax.sdp.SessionName; +import javax.sdp.*; import javax.sip.RequestEvent; import javax.sip.message.Request; import javax.sip.message.Response; +import java.util.Arrays; import java.util.Date; import java.util.EventObject; import java.util.Vector; +import java.util.concurrent.*; @Slf4j @RequiredArgsConstructor @@ -43,6 +49,8 @@ public class InviteRequestProcessor implements MessageProcessor { private final DeviceService deviceService; + private final SipSubscribe subscribe; + @PostConstruct @Override public void init() { @@ -87,9 +95,9 @@ public class InviteRequestProcessor implements MessageProcessor { if (StringUtils.equalsAnyIgnoreCase(type, "Play", "PlayBack")) { log.info("点播/回放请求"); if (StringUtils.equalsIgnoreCase(type, "Play")) { - play(device, gb28181Description, (MediaDescription) item); + play(request, device, gb28181Description, (MediaDescription) item); } else { - playback(device, gb28181Description, (MediaDescription) item); + playback(request, device, gb28181Description, (MediaDescription) item); } } else if (StringUtils.equalsIgnoreCase(type, "Download")) { log.info("下载请求"); @@ -123,11 +131,11 @@ public class InviteRequestProcessor implements MessageProcessor { * @param mediaDescription 媒体描述符 */ @SneakyThrows - private void play(MockingDevice device, GB28181Description gb28181Description, MediaDescription mediaDescription) { + private void play(SIPRequest request, MockingDevice device, GB28181Description gb28181Description, MediaDescription mediaDescription) { TimeField time = new TimeField(); time.setStart(DateUtil.offsetMinute(DateUtil.date(), -15)); time.setStop(DateUtil.date()); - playback(device, gb28181Description, mediaDescription, time); + playback(request, device, gb28181Description, mediaDescription, time); } /** @@ -137,14 +145,14 @@ public class InviteRequestProcessor implements MessageProcessor { * @param mediaDescription 媒体描述符 */ @SneakyThrows - private void playback(MockingDevice device, GB28181Description gb28181Description, MediaDescription mediaDescription) { + private void playback(SIPRequest request, MockingDevice device, GB28181Description gb28181Description, MediaDescription mediaDescription) { TimeDescriptionImpl timeDescription = (TimeDescriptionImpl) gb28181Description.getTimeDescriptions(true).get(0); TimeField time = (TimeField) timeDescription.getTime(); - playback(device, gb28181Description, mediaDescription, time); + playback(request, device, gb28181Description, mediaDescription, time); } @SneakyThrows - private void playback(MockingDevice device, GB28181Description gb28181Description, MediaDescription mediaDescription, TimeField time) { + private void playback(SIPRequest request, MockingDevice device, GB28181Description gb28181Description, MediaDescription mediaDescription, TimeField time) { Date start = new Date(time.getStartTime() * 1000); Date stop = new Date(time.getStopTime() * 1000); log.info("{} ~ {}", start, stop); @@ -156,7 +164,68 @@ public class InviteRequestProcessor implements MessageProcessor { int port = media.getMediaPort(); log.info("目标端口号: {}", port); - deviceProxyService.proxyVideo2Rtp(device, start, stop, address, port); - // TODO 推流 && 关流事件订阅 + String senderIp = request.getLocalAddress().getHostAddress(); + SdpFactory sdpFactory = SdpFactory.getInstance(); + SessionDescriptionImpl sessionDescription = new SessionDescriptionImpl(); + sessionDescription.setVersion(sdpFactory.createVersion(0)); + // 目前只配置 ipv4 + sessionDescription.setOrigin(sdpFactory.createOrigin(channelId, 0, 0, ConnectionField.IN, Connection.IP4, senderIp)); + sessionDescription.setSessionName(gb28181Description.getSessionName()); + sessionDescription.setConnection(sdpFactory.createConnection(ConnectionField.IN, Connection.IP4, senderIp)); + TimeField respTime = new TimeField(); + respTime.setZero(); + TimeDescription timeDescription = SdpFactory.getInstance().createTimeDescription(respTime); + sessionDescription.setTimeDescriptions(new Vector<>() {{ + add(timeDescription); + }}); + String[] mediaTypeCodes = new String[]{"98","96"}; + MediaDescription respMediaDescription = SdpFactory.getInstance().createMediaDescription("video", port, 0, SdpConstants.RTP_AVP, mediaTypeCodes); + Arrays.stream(mediaTypeCodes).forEach((k)->{ + String v = MediaSdpHelper.RTPMAP.get(k); + mediaDescription.addAttribute((AttributeField) SdpFactory.getInstance().createAttribute(SdpConstants.RTPMAP, StringUtils.joinWith(Separators.SP,k,v))); + }); + respMediaDescription.addAttribute((AttributeField) SdpFactory.getInstance().createAttribute("sendonly", null)); + GB28181Description description = GB28181Description.Convertor.convert(sessionDescription); + description.setSsrcField(gb28181Description.getSsrcField()); + + String transport = request.getTopmostViaHeader().getTransport(); + + String callId = request.getCallId().getCallId(); + String key = GenericSubscribe.Helper.getKey(Request.ACK, callId); + subscribe.getAckSubscribe().addPublisher(key); + ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + final ScheduledFuture[] schedule = new ScheduledFuture[1]; + Flow.Subscriber subscriber = new Flow.Subscriber<>() { + @Override + public void onSubscribe(Flow.Subscription subscription) { + log.info("创建 ack 订阅 {}", key); + subscription.request(1); + } + + @Override + public void onNext(SIPRequest item) { + log.info("收到 ack 确认请求: {} 开始推流",key); + // RTP 推流 + deviceProxyService.proxyVideo2Rtp(device, start, stop, address, port); + onComplete(); + } + + @Override + public void onError(Throwable throwable) { + + } + + @Override + public void onComplete() { + subscribe.getAckSubscribe().delPublisher(key); + schedule[0].cancel(true); + } + }; + // 60秒超时计时器 + schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, 60 , TimeUnit.SECONDS); + // 推流 ack 事件订阅 + subscribe.getAckSubscribe().addSubscribe(key, subscriber); + // 发送 sdp 响应 + sender.sendResponse(senderIp, transport, (ignore, ignore2, ignore3) -> SipResponseBuilder.responseSdp(request, description)); } } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/subscribe/AckSubscribe.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/subscribe/AckSubscribe.java new file mode 100644 index 0000000..ec9eda8 --- /dev/null +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/subscribe/AckSubscribe.java @@ -0,0 +1,39 @@ +package cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe; + +import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; +import gov.nist.javax.sip.message.SIPRequest; +import lombok.RequiredArgsConstructor; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Flow; +import java.util.concurrent.SubmissionPublisher; + +@RequiredArgsConstructor +public class AckSubscribe implements GenericSubscribe { + private final Executor executor; + + private static final Map> publishers = new ConcurrentHashMap<>(); + + public void close() { + Helper.close(publishers); + } + + public void addPublisher(String key) { + Helper.addPublisher(executor, publishers, key); + } + + public SubmissionPublisher getPublisher(String key) { + return Helper.getPublisher(publishers, key); + } + + public void addSubscribe(String key, Flow.Subscriber subscribe) { + Helper.addSubscribe(publishers, key, subscribe); + } + + @Override + public void delPublisher(String key) { + Helper.delPublisher(publishers, key); + } +} diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/subscribe/SipSubscribe.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/subscribe/SipSubscribe.java index b0f66da..548efb0 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/subscribe/SipSubscribe.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/subscribe/SipSubscribe.java @@ -2,6 +2,7 @@ package cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe; import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; +import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; @@ -21,14 +22,17 @@ public class SipSubscribe { @Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME) private final Executor executor; private GenericSubscribe registerSubscribe; + private GenericSubscribe ackSubscribe; @PostConstruct private void init() { registerSubscribe = new RegisterSubscribe(executor); + ackSubscribe = new AckSubscribe(executor); } @PreDestroy private void destroy() { registerSubscribe.close(); + ackSubscribe.close(); } } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/response/SipResponseBuilder.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/response/SipResponseBuilder.java index f10a06d..a7d27aa 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/response/SipResponseBuilder.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/response/SipResponseBuilder.java @@ -1,6 +1,7 @@ package cn.skcks.docking.gb28181.mocking.core.sip.response; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; +import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description; import cn.skcks.docking.gb28181.core.sip.message.MessageHelper; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import gov.nist.javax.sip.message.MessageFactoryImpl; @@ -8,7 +9,12 @@ import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import javax.sip.SipFactory; +import javax.sip.address.Address; +import javax.sip.address.SipURI; +import javax.sip.header.ContentTypeHeader; import javax.sip.message.Response; @Slf4j @@ -28,4 +34,20 @@ public class SipResponseBuilder { } return response; } + + @SneakyThrows + public static Response responseSdp(SIPRequest request, GB28181Description sdp) { + MessageFactoryImpl messageFactory = (MessageFactoryImpl)MessageHelper.getSipFactory().createMessageFactory(); + // 使用 GB28181 默认编码 否则中文将会乱码 + messageFactory.setDefaultContentEncodingCharset(GB28181Constant.CHARSET); + SIPResponse response = (SIPResponse)messageFactory.createResponse(Response.OK, request); + SipFactory sipFactory = SipFactory.getInstance(); + ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP"); + response.setContent(sdp.toString(), contentTypeHeader); + SipURI sipURI = (SipURI) request.getRequestURI(); + SipURI uri = MessageHelper.createSipURI(sipURI.getUser(), StringUtils.joinWith(":", sipURI.getHost() + ":" + sipURI.getPort())); + Address concatAddress = sipFactory.createAddressFactory().createAddress(uri); + response.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress)); + return response; + } } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java index b00451f..5ed7b11 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java @@ -44,7 +44,6 @@ public class DeviceProxyService { String query = URLUtil.buildQuery(map, StandardCharsets.UTF_8); fromUrl = StringUtils.joinWith("?", fromUrl, query); log.info("设备: {} 视频 url: {}", deviceCode, fromUrl); - rtpAddr = "192.168.1.241"; String toUrl = StringUtils.joinWith("", "rtp://", rtpAddr, ":", rtpPort); long time = DateUtil.between(startTime, endTime, DateUnit.SECOND); pushRtp(fromUrl, toUrl, time); @@ -56,6 +55,7 @@ public class DeviceProxyService { // FFmpeg 调试日志 // FFmpegLogCallback.set(); FFmpegFrameGrabber grabber = new FFmpegFrameGrabber(fromUrl); + grabber.setFrameRate(30); grabber.start(); FFmpegFrameRecorder recorder = new FFmpegFrameRecorder(toUrl, grabber.getImageWidth(), grabber.getImageHeight(), grabber.getAudioChannels()); diff --git a/gb28181-mocking-starter/src/main/resources/application.yml b/gb28181-mocking-starter/src/main/resources/application.yml index cf10622..7ccb140 100644 --- a/gb28181-mocking-starter/src/main/resources/application.yml +++ b/gb28181-mocking-starter/src/main/resources/application.yml @@ -53,8 +53,8 @@ gb28181: expire: 3600 transport: "UDP" server: -# ip: 192.168.10.32 - ip: 192.168.3.12 + ip: 192.168.10.32 +# ip: 192.168.3.12 port: 5060 password: 123456 domain: 4405010000