Skip to content

Commit b159bc2

Browse files
author
Anders Jensen
committed
Merge branch 'release/0.12.0'
2 parents 6846f4f + e8f24b3 commit b159bc2

File tree

6 files changed

+127
-34
lines changed

6 files changed

+127
-34
lines changed

README.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Streaming Plugin
22
https://github.com/JohnDoee/deluge-streaming
33

4-
(c)2019 by Anders Jensen <johndoee@tidalstream.org>
4+
(c)2020 by Anders Jensen <johndoee@tridentstream.org>
55

66
## Description
77

@@ -107,6 +107,11 @@ List of URL GET Arguments
107107

108108
# Version Info
109109

110+
## Version 0.12.0
111+
112+
* Moved to reading pieces through Deluge to avoid unflushed data
113+
* Fixed Deluge 2 / libtorrent related bug
114+
110115
## Version 0.11.0
111116
* Initial support for Deluge 2 / Python 3
112117
* Added support for aggressive piece prioritization when it should not be necessary.

setup.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@
4141

4242
__plugin_name__ = "Streaming"
4343
__author__ = "Anders Jensen"
44-
__author_email__ = "johndoee@tidalstream.org"
45-
__version__ = "0.11.0"
44+
__author_email__ = "johndoee@tridentstream.org"
45+
__version__ = "0.12.0"
4646
__url__ = "https://github.com/JohnDoee/deluge-streaming"
4747
__license__ = "GPLv3"
4848
__description__ = "Enables streaming of files while downloading them."

streaming/core.py

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@
6565
from .resource import Resource
6666
from .torrentfile import DelugeTorrentInput
6767

68-
defer.setDebugging(True)
6968
router.register_handler(DelugeTorrentInput.plugin_name, DelugeTorrentInput, True, False, False)
7069

7170
VIDEO_STREAMABLE_EXTENSIONS = ['mkv', 'mp4', 'iso', 'ogg', 'ogm', 'm4v']
@@ -119,7 +118,7 @@ def get_file_priorities(self):
119118
# Ensure file_priorities option is populated.
120119
self.set_file_priorities([])
121120

122-
return self.options["file_priorities"]
121+
return list(self.options["file_priorities"])
123122

124123
torrent = component.get("TorrentManager").torrents.get(infohash, None)
125124
if torrent and not hasattr(torrent, 'get_file_priorities'):
@@ -200,7 +199,7 @@ def can_read(self, from_byte):
200199
self.torrent.handle.set_piece_deadline(needed_piece, 0)
201200
self.torrent.handle.piece_priority(needed_piece, MAX_PIECE_PRIORITY)
202201

203-
file_priorities = self.torrent.get_file_priorities()
202+
file_priorities = list(self.torrent.get_file_priorities())
204203
if file_priorities[f['index']] != MAX_FILE_PRIORITY:
205204
logger.debug('Also setting file to max %r' % (f, ))
206205
file_priorities[f['index']] = MAX_FILE_PRIORITY
@@ -224,7 +223,8 @@ def can_read(self, from_byte):
224223
logger.debug('Calling read again to get the real number')
225224
return self.can_read(from_byte)
226225
else:
227-
return ((last_available_piece - needed_piece) * self.piece_length) + self.piece_length - rest
226+
logger.debug('Really last available piece is %s' % (last_available_piece, ))
227+
return ((last_available_piece - needed_piece) * self.piece_length) + self.piece_length - rest, last_available_piece
228228

229229
def is_idle(self):
230230
return not self.readers and self.last_activity + TORRENT_CLEANUP_INTERVAL < datetime.now()
@@ -275,7 +275,7 @@ def _cycle(self):
275275
logger.debug('We had a fileset not started, must_whitelist:%r first_files:%r cannot_blacklist:%r' % (must_whitelist, first_files, cannot_blacklist))
276276
status = self.torrent.get_status(['files', 'file_progress'])
277277

278-
file_priorities = self.torrent.get_file_priorities()
278+
file_priorities = list(self.torrent.get_file_priorities())
279279
for f, progress in zip(status['files'], status['file_progress']):
280280
i = f['index']
281281
if progress == 1.0:
@@ -314,7 +314,7 @@ def _cycle(self):
314314
else:
315315
fileset_ranges[fileset_hash] = fileset['files'].index(path)
316316

317-
file_priorities = self.torrent.get_file_priorities()
317+
file_priorities = list(self.torrent.get_file_priorities())
318318
logger.debug('Fileset heads: %r' % (fileset_ranges, ))
319319
for fileset_hash, first_file in fileset_ranges.items():
320320
fileset = self.filesets[fileset_hash]
@@ -387,6 +387,14 @@ def add_fileset(self, fileset):
387387
if fileset_hash not in self.filesets:
388388
self.filesets[fileset_hash] = {'started': False, 'files': files}
389389

390+
def request_piece(self, piece):
391+
self.torrent.handle.read_piece(piece)
392+
393+
def new_piece_available(self, piece, data):
394+
logger.debug("New pice available: %s" % (piece, ))
395+
for reader in self.readers.keys():
396+
reader.new_piece_available(piece, data)
397+
390398

391399
class TorrentHandler(object):
392400
def __init__(self, reset_priorities_on_finish, aggressive_prioritizing=False):
@@ -397,6 +405,7 @@ def __init__(self, reset_priorities_on_finish, aggressive_prioritizing=False):
397405
self.alerts = component.get("AlertManager")
398406
self.alerts.register_handler("torrent_removed_alert", self.on_alert_torrent_removed)
399407
self.alerts.register_handler("torrent_finished_alert", self.on_alert_torrent_finished)
408+
self.alerts.register_handler("read_piece_alert", self.on_alert_read_piece)
400409

401410
self.cleanup_looping_call = task.LoopingCall(self.cleanup)
402411
self.cleanup_looping_call.start(60)
@@ -427,6 +436,18 @@ def on_alert_torrent_finished(self, alert):
427436
if self.reset_priorities_on_finish:
428437
self.torrents[infohash].reset_priorities()
429438

439+
def on_alert_read_piece(self, alert):
440+
try:
441+
infohash = str(alert.handle.info_hash())
442+
except (RuntimeError, KeyError):
443+
logger.warning('Failed to handle on read piece alert')
444+
return
445+
446+
if infohash not in self.torrents:
447+
return
448+
449+
self.torrents[infohash].new_piece_available(alert.piece, alert.buffer)
450+
430451
def shutdown(self):
431452
for torrent in self.torrents.values():
432453
if self.reset_priorities_on_finish:

streaming/data/config.ui

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -228,20 +228,18 @@
228228
</packing>
229229
</child>
230230
<!-- <child>
231-
<widget class="GtkRadioButton" id="input_serve_webui">
231+
<object class="GtkRadioButton" id="input_serve_webui">
232232
<property name="label" translatable="yes">Serve files via WebUI</property>
233-
<property name="visible">False</property>
234-
<property name="sensitive">False</property>
233+
<property name="visible">True</property>
235234
<property name="can_focus">True</property>
236235
<property name="receives_default">False</property>
237236
<property name="use_action_appearance">False</property>
238-
<property name="active">True</property>
239237
<property name="draw_indicator">True</property>
240-
</widget>
238+
</object>
241239
<packing>
242240
<property name="expand">True</property>
243241
<property name="fill">True</property>
244-
<property name="position">2</property>
242+
<property name="position">4</property>
245243
</packing>
246244
</child> -->
247245
<child>
@@ -250,17 +248,15 @@
250248
<property name="can_focus">False</property>
251249
<property name="spacing">5</property>
252250
<!-- <child>
253-
<widget class="GtkRadioButton" id="input_serve_standalone">
251+
<object class="GtkRadioButton" id="input_serve_standalone">
254252
<property name="label" translatable="yes">Serve files via standalone</property>
255-
<property name="visible">False</property>
256-
<property name="sensitive">False</property>
253+
<property name="visible">True</property>
257254
<property name="can_focus">True</property>
258255
<property name="receives_default">False</property>
259256
<property name="use_action_appearance">False</property>
260-
<property name="active">True</property>
261257
<property name="draw_indicator">True</property>
262258
<property name="group">input_serve_webui</property>
263-
</widget>
259+
</object>
264260
<packing>
265261
<property name="expand">True</property>
266262
<property name="fill">True</property>

streaming/resource.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def render(self, request): # Authorization: Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==
3232

3333
if not authenticated:
3434
request.setResponseCode(401)
35-
return 'Unauthorized'
35+
return b'Unauthorized'
3636

3737
m = getattr(self, 'render_' + request.method.decode('utf-8'), None)
3838
if not m:

streaming/torrentfile.py

Lines changed: 84 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,27 @@
11
import logging
22
import mimetypes
33
import os
4+
import time
5+
import threading
6+
7+
from io import BytesIO
48

59
from thomas import InputBase
610

711
logger = logging.getLogger(__name__)
812

13+
PIECE_REQUEST_HISTORY_TIME = 10
14+
MAX_PIECE_REQUEST_COUNT = 20
915

10-
class DelugeTorrentInput(InputBase.find_plugin('file')):
16+
class DelugeTorrentInput(InputBase):
1117
plugin_name = 'torrent_file'
1218
protocols = []
1319

20+
current_piece_data = None
1421
can_read_to = None
22+
last_available_piece = None
23+
_pos = None
24+
_closed = False
1525

1626
def __init__(self, item, torrent_handler, infohash, offset, path):
1727
self.item = item
@@ -20,6 +30,9 @@ def __init__(self, item, torrent_handler, infohash, offset, path):
2030
self.infohash = infohash
2131
self.offset = offset
2232
self.path = path
33+
self.piece_buffer = {}
34+
self.requested_pieces = {}
35+
self.piece_request_queue = []
2336
self.size, self.filename, self.content_type = self.get_info()
2437

2538
def get_info(self):
@@ -33,36 +46,94 @@ def ensure_exists(self):
3346
if not os.path.exists(self.path):
3447
self.torrent.can_read(self.offset)
3548

49+
def tell(self):
50+
return self._pos
51+
3652
def seek(self, pos):
3753
self.ensure_exists()
38-
super(DelugeTorrentInput, self).seek(pos)
54+
self._pos = pos
3955
logger.debug('Seeking at %s torrentfile_id %r' % (self.tell(), id(self)))
4056
self.torrent.add_reader(self, self.item.path, self.offset + self.tell(), self.offset + self.size)
4157

58+
def _read(self, num):
59+
data = self.current_piece_data.read(num)
60+
self._pos += len(data)
61+
return data
62+
4263
def read(self, num):
64+
if self.current_piece_data:
65+
data = self._read(num)
66+
if data:
67+
return data
68+
4369
self.ensure_exists()
4470

45-
if not self._open_file:
71+
if self._pos is None:
4672
self.seek(0)
4773

4874
logger.debug('Trying to read %s from %i torrentfile_id %r' % (self.path, self.tell(), id(self)))
4975
tell = self.tell()
5076
if self.can_read_to is None or self.can_read_to <= tell:
51-
self.can_read_to = self.torrent.can_read(self.offset + tell) + tell
77+
can_read_result = self.torrent.can_read(self.offset + tell)
78+
self.last_available_piece = can_read_result[1]
79+
self.can_read_to = can_read_result[0] + tell
80+
81+
current_piece, rest = self.current_piece
82+
logger.debug('Calculated last available piece is %s offset %s can_read_to %s piece_length %s' % (self.last_available_piece, self.offset, self.can_read_to, self.torrent.piece_length))
83+
84+
while self.piece_consumption_time and self.piece_consumption_time[0] < time.time() - PIECE_REQUEST_HISTORY_TIME:
85+
self.piece_consumption_time.pop(0)
86+
87+
max_piece_count = (self.last_available_piece - current_piece) + 1
88+
pieces_to_request = min(min(max(2, len(self.piece_consumption_time)), max_piece_count), MAX_PIECE_REQUEST_COUNT)
5289

53-
if self._open_file:
54-
self._open_file.seek(tell)
90+
logger.debug('New piece request status pieces_to_request: %s piece_consumption_time: %s max_piece_count: %s' % (pieces_to_request, len(self.piece_consumption_time), max_piece_count, ))
91+
logger.debug('Requested pieces: %r' % (self.requested_pieces.items()))
92+
logger.debug('Piece buffer: %r' % (self.piece_buffer.keys()))
5593

56-
real_num = min(num, self.can_read_to - tell)
57-
if num != real_num:
58-
logger.info('The real number we can read to is %s and not %s at position %s' % (real_num, num, tell))
94+
for piece in range(current_piece, current_piece + pieces_to_request):
95+
if piece in self.requested_pieces:
96+
continue
5997

60-
if not self._open_file: # the file was closed while we waited
98+
logger.debug('Requesting piece %s' % (piece, ))
99+
self.requested_pieces[piece] = threading.Event()
100+
self.torrent.request_piece(piece)
101+
102+
for _ in range(1000):
103+
if self.requested_pieces[current_piece].wait(1):
104+
break
105+
if self._closed:
106+
return b''
107+
else:
61108
return b''
62109

63-
data = super(DelugeTorrentInput, self).read(real_num)
64-
return data
110+
for delete_piece in [p for p in self.piece_buffer.keys() if p < current_piece]:
111+
del self.piece_buffer[delete_piece]
112+
113+
for delete_piece in [p for p in self.requested_pieces.keys() if p < current_piece]:
114+
del self.requested_pieces[delete_piece]
115+
116+
self.current_piece_data = self.piece_buffer[current_piece]
117+
self.current_piece_data.seek(rest)
118+
self.piece_consumption_time.append(time.time())
119+
logger.debug('Returning %s bytes' % (num, ))
120+
return self._read(num)
121+
122+
@property
123+
def current_piece(self):
124+
from_byte = self.offset + self.tell()
125+
piece_length = self.torrent.piece_length
126+
piece, rest = divmod(from_byte, piece_length)
127+
return piece, rest
128+
129+
def new_piece_available(self, piece, data):
130+
if piece not in self.requested_pieces or self.requested_pieces[piece].is_set():
131+
return
132+
133+
logger.debug("Setting data for piece %s" % (piece, ))
134+
self.piece_buffer[piece] = BytesIO(data)
135+
self.requested_pieces[piece].set()
65136

66137
def close(self):
67138
self.torrent.remove_reader(self)
68-
super(DelugeTorrentInput, self).close()
139+
self._closed = True

0 commit comments

Comments
 (0)