From c9af1a4dc38083df1cb5a68cf85cfdd890fad11a Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Fri, 15 Mar 2024 10:13:31 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=B5=8B=E8=AF=95=E8=84=9A?= =?UTF-8?q?=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test/main.py | 95 ++++++++++++++++++++++++++++++++++++++++++++++++ test/push_zlm.py | 84 ++++++++++++++++++++++++++++++++++++++++++ test/video.py | 44 ++++++++++++++++++++++ 3 files changed, 223 insertions(+) create mode 100644 test/main.py create mode 100644 test/push_zlm.py create mode 100644 test/video.py diff --git a/test/main.py b/test/main.py new file mode 100644 index 0000000..6cbba1a --- /dev/null +++ b/test/main.py @@ -0,0 +1,95 @@ +from concurrent.futures import ThreadPoolExecutor +import os +import os.path as path +import sys +import datetime +import urllib +import urllib.parse +import urllib.request + +work_path = sys.path[0] +# 设备列表 +devices_file = "devices.txt" +devices_file_path = path.join(work_path, devices_file) +# 下载目录 +tmp_dir = "download" +tmp_path = path.join(work_path, tmp_dir) + +server = "http://127.0.0.1:18183/video" + + +# server = "http://httpbin.org/anything/video" + +def check_or_mk_tmp_dir(): + if not path.exists(tmp_path): + os.mkdir(tmp_path) + + +def check_or_create_devices_file(): + if not path.exists(devices_file_path): + with open(devices_file_path, mode="w", encoding="utf8") as f: + f.write("# 设备编码 一行一个\n") + + +def read_devices_file(): + check_or_create_devices_file() + devices = [] + with open(devices_file_path, mode="r", encoding="utf8") as f: + while True: + line = f.readline() + if not line: + break + line = line.strip() + if line.startswith("#") or len(line) == 0: + continue + else: + devices.append(line) + + print(f"读取设备数量: {len(devices)}") + return devices + + +def tasks(device: str, start_time: str, end_time: str): + params = { + "start_time": start_time, + "end_time": end_time, + "device_id": device, + "useDownload": True, + } + url_params = urllib.parse.urlencode(params) + url = urllib.parse.urljoin(server, f"?{url_params}") + start = datetime.datetime.now() + start_str = start.strftime("%Y-%m-%d %H:%M:%S.%f") + print(f"{start_str} 开始下载: {url}") + urllib.request.urlretrieve(url, + f"{tmp_path}/{device}_{start_time}_{end_time}.mp4") + end = datetime.datetime.now() + print( + f"{device} {start_time}-{end_time}: 下载用时: {(end - start).total_seconds()} 秒") + + +if __name__ == '__main__': + check_or_mk_tmp_dir() + check_or_create_devices_file() + + print(work_path) + # workers = os.cpu_count() + workers = 32 + + print(f"最大并发数: {workers}") + + with ThreadPoolExecutor(max_workers=workers) as worker: + devices = read_devices_file() + + # day = datetime.datetime.today() + day = datetime.date(year=2024, month=3, day=11) + # 开始时间 + start = datetime.time(hour=8, minute=11, second=15) + # 结束时间 + end = datetime.time(hour=8, minute=11, second=30) + + start_time = datetime.datetime.combine(day, start).strftime( + "%Y%m%d%H%M%S") + end_time = datetime.datetime.combine(day, end).strftime("%Y%m%d%H%M%S") + for device in devices: + worker.submit(tasks, device, start_time, end_time) diff --git a/test/push_zlm.py b/test/push_zlm.py new file mode 100644 index 0000000..e0ada63 --- /dev/null +++ b/test/push_zlm.py @@ -0,0 +1,84 @@ +import os +import os.path as path +import select +import sys +import urllib.parse +from concurrent.futures import ThreadPoolExecutor +import subprocess +import platform + +work_path = sys.path[0] + +# 下载目录 +tmp_dir = "download" +tmp_path = path.join(work_path, tmp_dir) + +zlm_server = "192.168.3.12" +zlm_rtmp_port = 1936 +zlm_auth_params = { + "sign": "41db35390ddad33f83944f44b8b75ded" +} + +ffmpeg_path = "ffmpeg" +ffmpeg_read_rate = 1 + +ffplay_path = "ffplay" +enable_preview = True + +workers = os.cpu_count() + + +def get_rtmp_url(app: str, stream_id: str): + params = urllib.parse.urlencode(zlm_auth_params) + return "rtmp://{}:{}/{}/{}?{}".format(zlm_server, zlm_rtmp_port, app, + stream_id, params) + + +def check_or_mk_tmp_dir(): + if not path.exists(tmp_path): + os.mkdir(tmp_path) + + +def push_stream(file: str): + stream_id = path.basename(file) + target = get_rtmp_url("ffmpeg", stream_id) + print("开始 ffmpeg 推流 {} => {}", file, target) + cmd = "{} -loglevel error -stream_loop -1 -readrate {} -i {} -t 60 -c copy -f flv {}".format( + ffmpeg_path, ffmpeg_read_rate, file, target) + print(cmd) + proc = subprocess.Popen( + cmd, + stdin=None, + stdout=None, + shell=True + ) + + if enable_preview and len( + ffplay_path) > 0 and platform.system() == "Windows": + subprocess.Popen( + "ffplay {} -x 400 -y 300 -autoexit".format(target), + stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + shell=True + ) + + proc.communicate() + print("ffmpeg 结束 {} =/= {}", file, target) + + +if __name__ == '__main__': + if len(sys.argv) > 1: + try: + workers = int(sys.argv[1]) + except: + print("参数解析错误, 将使用默认线程数 {} ".format(workers)) + + print("最大并发数: {}".format(workers)) + with ThreadPoolExecutor(max_workers=workers) as worker: + for item in os.listdir(tmp_path): + p = path.join(tmp_path, item) + if not path.isfile(p) or not p.endswith(".mp4"): + continue + else: + worker.submit(push_stream, p) diff --git a/test/video.py b/test/video.py new file mode 100644 index 0000000..2c15b5b --- /dev/null +++ b/test/video.py @@ -0,0 +1,44 @@ +import os.path as path +import sys +import shutil + +work_path = sys.path[0] +# 设备列表 +devices_file = "devices.txt" +devices_file_path = path.join(work_path, devices_file) +# 源文件 +source_video_file = "20240311081115_20240311081130.mp4" + + +def check_or_create_devices_file(): + if not path.exists(devices_file_path): + with open(devices_file_path, mode="w", encoding="utf8") as f: + f.write("# 设备编码 一行一个\n") + + +def read_devices_file(): + check_or_create_devices_file() + devices = [] + with open(devices_file_path, mode="r", encoding="utf8") as f: + while True: + line = f.readline() + if not line: + break + line = line.strip() + if line.startswith("#") or len(line) == 0: + continue + else: + devices.append(line) + + print("读取设备数量: {}".format(len(devices))) + return devices + + +if __name__ == '__main__': + check_or_create_devices_file() + devices = read_devices_file() + + src = path.join(work_path, source_video_file) + for device in devices: + dst = path.join(work_path, "{}_{}".format(device, source_video_file)) + shutil.copyfile(src, dst)