Method: rich.progress.Progress.wrap_file
Calls: 2, Exceptions: 0, Paths: 2
Path 1: 1 calls (0.5)
BufferedReader (1)
13 (1)
None (1)
'Reading...' (1)
_Reader (1)
def wrap_file(
    self,
    file: BinaryIO,
    total: Optional[int] = None,
    *,
    task_id: Optional[TaskID] = None,
    description: str = "Reading...",
) -> BinaryIO:
    """Track progress of reading from a binary file.

    Args:
        file (BinaryIO): A file-like object opened in binary mode.
        total (int, optional): Total number of bytes to read. This must be provided unless a task with a total is also given.
        task_id (TaskID, optional): Task to track. Default is new task.
        description (str, optional): Description of task, if new task is created.

    Returns:
        BinaryIO: A readable file-like object in binary mode.

    Raises:
        ValueError: When no total value can be extracted from the arguments or the task.
    """
    # An explicit ``total`` takes precedence; otherwise fall back to the total
    # already stored on the given task (read while holding the lock).
    total_bytes: Optional[float] = None
    if total is not None:
        total_bytes = total
    elif task_id is not None:
        with self._lock:
            total_bytes = self._tasks[task_id].total
    if total_bytes is None:
        # Plain string literal: the message has no placeholders to format.
        raise ValueError(
            "unable to get the total number of bytes, please specify 'total'"
        )

    # update total of task or create new task
    if task_id is None:
        task_id = self.add_task(description, total=total_bytes)
    else:
        self.update(task_id, total=total_bytes)

    # NOTE(review): close_handle=False presumably leaves closing ``file`` to
    # the caller -- confirm against _Reader.
    return _Reader(file, self, task_id, close_handle=False)
Path 2: 1 calls (0.5)
BufferedReader (1)
None (1)
0 (1)
'Reading...' (1)
_Reader (1)
def wrap_file(
    self,
    file: BinaryIO,
    total: Optional[int] = None,
    *,
    task_id: Optional[TaskID] = None,
    description: str = "Reading...",
) -> BinaryIO:
    """Track progress of reading from a binary file.

    Args:
        file (BinaryIO): A file-like object opened in binary mode.
        total (int, optional): Total number of bytes to read. This must be provided unless a task with a total is also given.
        task_id (TaskID, optional): Task to track. Default is new task.
        description (str, optional): Description of task, if new task is created.

    Returns:
        BinaryIO: A readable file-like object in binary mode.

    Raises:
        ValueError: When no total value can be extracted from the arguments or the task.
    """
    # An explicit ``total`` takes precedence; otherwise fall back to the total
    # already stored on the given task (read while holding the lock).
    total_bytes: Optional[float] = None
    if total is not None:
        total_bytes = total
    elif task_id is not None:
        with self._lock:
            total_bytes = self._tasks[task_id].total
    if total_bytes is None:
        # Plain string literal: the message has no placeholders to format.
        raise ValueError(
            "unable to get the total number of bytes, please specify 'total'"
        )

    # update total of task or create new task
    if task_id is None:
        task_id = self.add_task(description, total=total_bytes)
    else:
        self.update(task_id, total=total_bytes)

    # NOTE(review): close_handle=False presumably leaves closing ``file`` to
    # the caller -- confirm against _Reader.
    return _Reader(file, self, task_id, close_handle=False)