-
Notifications
You must be signed in to change notification settings - Fork 38
Expand file tree
/
Copy pathget_js_unicom_iptv.py
More file actions
145 lines (120 loc) · 4.58 KB
/
get_js_unicom_iptv.py
File metadata and controls
145 lines (120 loc) · 4.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import os
import json
import glob
import re
import sys
import requests
from datetime import datetime
def open_latest_tag_file():
# 获取当前目录中所有json文件
tag_files = glob.glob(os.path.join('.','*.json'))
if not tag_files:
print("当前目录中没有JSON文件。")
return ""
# 找出最新的文件
latest_file = max(tag_files, key=lambda x: os.path.getmtime(x))
print (f"当前目录中已存在数据文件 {latest_file} ...")
return latest_file
def read_json_data():
# 检查命令行参数
if len(sys.argv) == 2:
filename = sys.argv[1] # 获取命令行提供的文件名
elif len(sys.argv) == 1:
filename = open_latest_tag_file() # 读取已经存在的数据文件
if (filename == ""):
print ("尝试从URL获取最新数据...")
# url = 'http://122.96.52.19:29010/tagNewestEpgList/JS_CUCC/1/100/0.json'
url = 'http://live.epg.gitv.tv/tagNewestEpgList/JS_CUCC/1/100/0.json'
try:
# 发送GET请求
response = requests.get(url)
# 检查响应状态码
if response.status_code == 200:
# 解析JSON数据
data = response.json()
else:
print(f"请求失败,状态码:{response.status_code}")
except requests.RequestException as e:
print(f"请求异常:{e}")
else:
print (f"读取文件 {filename} ...")
try:
# 尝试打开并读取JSON文件
with open(filename, 'r', encoding='utf-8') as file:
data = json.load(file)
except FileNotFoundError:
print(f"文件 {filename} 未找到。")
sys.exit(1) # 如果文件不存在,程序退出
except json.JSONDecodeError:
print(f"文件 {filename} 不是有效的JSON格式。")
sys.exit(1) # 如果文件不是有效的JSON格式,程序退出
return data
def save_m3u(data, name = "iptv_js"):
# 获取当前日期和时间
now = datetime.now()
# 将日期格式化为 "yyyy-mm-dd"
date_string = now.strftime('%Y-%m-%d')
output_latest = f"{name}-latest.m3u"
output_filename = f"history/{name}-{date_string}.m3u"
with open(output_filename, 'w', encoding='utf-8') as file:
file.write(data)
with open(output_latest, 'w', encoding='utf-8') as file:
file.write(data)
def get_group_info(chnName):
groups = {
"少儿": ["少儿","卡通","CCTV-14"],
"CCTV": ["CCTV","CGTN"],
"江苏": ["江苏", "南京"],
"卫视": ["卫视"],
"教育": ["CETV","教育"],
}
for key in groups:
if any(k in chnName for k in groups[key]):
return key
return "其他"
def get_js_unicom_source(data):
m3u_data_full = "#EXTM3U\n"
m3u_data_kid = "#EXTM3U\n"
# 从data中提取所需的信息
for item in data['data']:
tag = item['tag']
chnunCode = item['chnunCode']
chnName = item['chnName']
tvgName = re.sub(r"高清|超清|超清|-8M|-", "", chnName)
chnCode = item['chnCode']
playUrl = item['playUrl']
response = requests.get(playUrl)
# 检查响应状态码
if response.status_code == 200:
# 解析JSON数据
play_data = response.json()
playUrl_real = play_data['u']
pass
# 获取 group 信息
groupName = get_group_info(chnName)
# 打印提取的信息
print(f"处理: {tag}-{chnName}-{chnCode}")
m3u_data_full += f'#EXTINF:-1 group-title="{groupName}" tvg-name="{tvgName}",{chnName}\n'
m3u_data_full += f'{playUrl_real}\n'
# 青少年保护频道过滤
if all(k not in groupName for k in ["少儿","其他"]):
m3u_data_kid += f'#EXTINF:-1 group-title="{groupName}",{chnName}\n'
m3u_data_kid += f'{playUrl_real}\n'
# 获取custom目录下所有的文件
custom_files = glob.glob(os.path.join('custom', '*.m3u'))
custom_data = ""
# 遍历所有找到的文件
for custom_file in custom_files:
# 读取当前文件的内容
with open(custom_file, 'r', encoding='utf-8') as file:
content = file.readlines()
custom_data += "".join(content)
m3u_data_full += custom_data
m3u_data_kid += custom_data
# 保存
save_m3u(data = m3u_data_full, name="iptv_js_full")
save_m3u(data = m3u_data_kid, name = "iptv_js_kid")
print("Done.")
if __name__ == "__main__":
data = read_json_data()
get_js_unicom_source(data)