# Source code for parfive.downloader

import asyncio
import contextlib
import os
import pathlib
import urllib.parse
from concurrent.futures import ThreadPoolExecutor
from functools import partial

import aiohttp
from tqdm import tqdm, tqdm_notebook

from .results import Results
from .utils import (FailedDownload, Token, default_name, get_filepath,
                    get_ftp_size, get_http_size, in_notebook, run_in_thread)

try:
    import aioftp
except ImportError:
    aioftp = None


__all__ = ['Downloader']


class Downloader:
    """
    Download files in parallel.

    Parameters
    ----------
    max_conn : `int`, optional
        The number of parallel download slots.
    progress : `bool`, optional
        If `True` show a main progress bar showing how many of the total
        files have been downloaded. If `False`, no progress bars will be
        shown at all.
    file_progress : `bool`, optional
        If `True` and ``progress`` is `True`, show ``max_conn`` progress
        bars detailing the progress of each individual file being
        downloaded.
    loop : `asyncio.AbstractEventLoop`, optional
        The event loop to use to download the files. If not specified a new
        loop will be created and executed in a new thread so it does not
        interfere with any currently running event loop.
    notebook : `bool`, optional
        If `True` tqdm will be used in notebook mode. If `None` an attempt
        will be made to detect the notebook and guess which progress bar to
        use.
    overwrite : `bool` or `str`, optional
        Determine how to handle downloading if a file already exists with
        the same name. If `False` the file download will be skipped and the
        path returned to the existing file, if `True` the file will be
        downloaded and the existing file will be overwritten, if
        ``'unique'`` the filename will be modified to be unique.
    """

    def __init__(self, max_conn=5, progress=True, file_progress=True,
                 loop=None, notebook=None, overwrite=False):
        self.max_conn = max_conn
        self._start_loop(loop)

        # Configure progress bars
        if notebook is None:
            notebook = in_notebook()
        self.progress = progress
        self.file_progress = file_progress if self.progress else False
        self.tqdm = tqdm if not notebook else tqdm_notebook

        self.overwrite = overwrite

    def _start_loop(self, loop):
        # Setup asyncio loops
        if not loop:
            aio_pool = ThreadPoolExecutor(1)
            self.loop = asyncio.new_event_loop()
            self.run_until_complete = partial(run_in_thread, aio_pool, self.loop)
        else:
            self.loop = loop
            self.run_until_complete = self.loop.run_until_complete

        # Setup queues
        self.http_queue = asyncio.Queue(loop=self.loop)
        self.http_tokens = asyncio.Queue(maxsize=self.max_conn, loop=self.loop)
        self.ftp_queue = asyncio.Queue(loop=self.loop)
        self.ftp_tokens = asyncio.Queue(maxsize=self.max_conn, loop=self.loop)
        for i in range(self.max_conn):
            self.http_tokens.put_nowait(Token(i + 1))
            self.ftp_tokens.put_nowait(Token(i + 1))

    @property
    def queued_downloads(self):
        """
        The total number of files already queued for download.
        """
        return self.http_queue.qsize() + self.ftp_queue.qsize()
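    # A minimal usage sketch (the URL below is hypothetical, not from this
    # module): the class is typically constructed once and reused for a
    # batch of files.
    #
    #     dl = Downloader(max_conn=10, overwrite='unique')
    #     dl.enqueue_file("http://example.com/file.fits", path="./data")
    #     print(dl.queued_downloads)  # -> 1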
    def enqueue_file(self, url, path=None, filename=None, overwrite=None, **kwargs):
        """
        Add a file to the download queue.

        Parameters
        ----------
        url : `str`
            The URL to retrieve.
        path : `str`, optional
            The directory to retrieve the file into, if `None` defaults to
            the current directory.
        filename : `str` or `callable`, optional
            The filename to save the file as. Can also be a callable which
            takes two arguments, the url and the response object from
            opening that URL, and returns the filename. (Note, for FTP
            downloads the response will be `None`.) If `None` the HTTP
            headers will be read for the filename, or the last segment of
            the URL will be used.
        overwrite : `bool` or `str`, optional
            Determine how to handle downloading if a file already exists
            with the same name. If `False` the file download will be
            skipped and the path returned to the existing file, if `True`
            the file will be downloaded and the existing file will be
            overwritten, if ``'unique'`` the filename will be modified to
            be unique. If `None` the value set when constructing the
            `~parfive.Downloader` object will be used.
        kwargs : `dict`
            Extra keyword arguments are passed to `aiohttp.ClientSession.get`
            or `aioftp.ClientSession` depending on the protocol.
        """
        overwrite = overwrite or self.overwrite

        if path is None and filename is None:
            raise ValueError("Either path or filename must be specified.")
        elif path is None:
            path = './'

        path = pathlib.Path(path)
        if not filename:
            filepath = partial(default_name, path)
        elif callable(filename):
            filepath = filename
        else:
            # Define a function because get_file expects a callback
            def filepath(*args):
                return path / filename

        scheme = urllib.parse.urlparse(url).scheme

        if scheme in ('http', 'https'):
            get_file = partial(self._get_http, url=url, filepath_partial=filepath,
                               overwrite=overwrite, **kwargs)
            self.http_queue.put_nowait(get_file)
        elif scheme == 'ftp':
            if aioftp is None:
                raise ValueError("The aioftp package must be installed to download over FTP.")
            get_file = partial(self._get_ftp, url=url, filepath_partial=filepath,
                               overwrite=overwrite, **kwargs)
            self.ftp_queue.put_nowait(get_file)
        else:
            raise ValueError("URL must start with either 'http' or 'ftp'.")
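    # Sketch of the callable ``filename`` form described above: the callback
    # receives ``(resp, url)`` and must return a path, so HTTP headers can
    # inform the name (``resp`` is `None` for FTP). The URL is hypothetical.
    #
    #     def name_from_url(resp, url):
    #         return pathlib.Path("./data") / url.split("/")[-1]
    #
    #     dl.enqueue_file("http://example.com/a.dat", filename=name_from_url)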
    def download(self, timeouts=None):
        """
        Download all files in the queue.

        Parameters
        ----------
        timeouts : `dict`, optional
            Overrides for the default timeouts for http downloads. Supported
            keys are any accepted by the `aiohttp.ClientTimeout` class.
            Defaults to 5 minutes for total session timeout and 90 seconds
            for socket read timeout.

        Returns
        -------
        filenames : `parfive.Results`
            A list of files downloaded.

        Notes
        -----
        The defaults for the ``total`` and ``sock_read`` timeouts can be
        overridden by two environment variables ``PARFIVE_TOTAL_TIMEOUT``
        and ``PARFIVE_SOCK_READ_TIMEOUT``.
        """
        # Environment variables are strings, so coerce to float before they
        # reach aiohttp.ClientTimeout.
        timeouts = timeouts or {"total": float(os.environ.get("PARFIVE_TOTAL_TIMEOUT", 5 * 60)),
                                "sock_read": float(os.environ.get("PARFIVE_SOCK_READ_TIMEOUT", 90))}
        try:
            future = self.run_until_complete(self._run_download(timeouts))
        finally:
            self.loop.stop()
        dlresults = future.result()

        results = Results()

        # Iterate through the results and store any failed download errors in
        # the errors list of the results object.
        for res in dlresults:
            if isinstance(res, FailedDownload):
                results.add_error(res.filepath_partial, res.url, res.exception)
            elif isinstance(res, Exception):
                raise res
            else:
                results.append(res)

        return results
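    # The ``timeouts`` dict maps straight onto `aiohttp.ClientTimeout`; a
    # sketch with illustrative values only:
    #
    #     files = dl.download(timeouts={"total": 600, "sock_read": 30})
    #
    # or, equivalently, via the environment before calling download():
    #
    #     os.environ["PARFIVE_TOTAL_TIMEOUT"] = "600"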
    def retry(self, results):
        """
        Retry any failed downloads in a results object.

        .. note::
            This will start a new event loop.

        Parameters
        ----------
        results : `parfive.Results`
            A previous results object, the ``.errors`` property will be read
            and the downloads retried.

        Returns
        -------
        results : `parfive.Results`
            A modified version of the input ``results`` with all the errors
            from this download attempt and any new files appended to the
            list of file paths.
        """
        # Restart the loop.
        self._start_loop(None)

        for err in results.errors:
            self.enqueue_file(err.url, filename=err.filepath_partial)

        new_res = self.download()

        results += new_res
        results._errors = new_res._errors

        return results
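    # Sketch of a retry step built on this method, assuming ``results`` came
    # from a previous ``download()`` call:
    #
    #     results = dl.download()
    #     if results.errors:
    #         results = dl.retry(results)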
    def _get_main_pb(self, total):
        """
        Return the tqdm instance if we want it, else return a contextmanager
        that just returns None.
        """
        if self.progress:
            return self.tqdm(total=total, unit='file',
                             desc="Files Downloaded", position=0)
        else:
            return contextlib.contextmanager(lambda: iter([None]))()

    async def _run_download(self, timeouts):
        """
        Download all files in the queue.

        Returns
        -------
        results : `parfive.Results`
            A list of filenames which successfully downloaded. This list
            also has an attribute ``errors`` which lists any failed urls and
            their error.
        """
        total_files = self.http_queue.qsize() + self.ftp_queue.qsize()
        done = set()
        with self._get_main_pb(total_files) as main_pb:
            if not self.http_queue.empty():
                done.update(await self._run_http_download(main_pb, timeouts))
            if not self.ftp_queue.empty():
                done.update(await self._run_ftp_download(main_pb, timeouts))

        # Return one future to represent all the results.
        return asyncio.gather(*done, return_exceptions=True)

    async def _run_http_download(self, main_pb, timeouts):
        async with aiohttp.ClientSession(loop=self.loop) as session:
            futures = await self._run_from_queue(self.http_queue, self.http_tokens,
                                                 main_pb, session=session,
                                                 timeouts=timeouts)

            # Wait for all the coroutines to finish
            done, _ = await asyncio.wait(futures)

            return done

    async def _run_ftp_download(self, main_pb, timeouts):
        futures = await self._run_from_queue(self.ftp_queue, self.ftp_tokens,
                                             main_pb, timeouts=timeouts)
        # Wait for all the coroutines to finish
        done, _ = await asyncio.wait(futures)

        return done

    async def _run_from_queue(self, queue, tokens, main_pb, *, session=None, timeouts):
        futures = []
        while not queue.empty():
            get_file = await queue.get()
            token = await tokens.get()
            file_pb = self.tqdm if self.file_progress else False
            future = asyncio.ensure_future(get_file(session, token=token,
                                                    file_pb=file_pb,
                                                    timeouts=timeouts))

            def callback(token, future, main_pb):
                # Return the token to the pool so the next download can start.
                tokens.put_nowait(token)
                # Update the main progressbar
                if main_pb and not future.exception():
                    main_pb.update(1)

            future.add_done_callback(partial(callback, token, main_pb=main_pb))
            futures.append(future)

        return futures

    @staticmethod
    async def _get_http(session, *, url, filepath_partial, chunksize=100,
                        file_pb=None, token, overwrite, timeouts, **kwargs):
        """
        Read the file from the given url into the filename given by
        ``filepath_partial``.

        Parameters
        ----------
        session : `aiohttp.ClientSession`
            The `aiohttp.ClientSession` to use to retrieve the files.
        url : `str`
            The url to retrieve.
        filepath_partial : `callable`
            A function to call which returns the filepath to save the url
            to. Takes two arguments ``resp, url``.
        chunksize : `int`
            The number of bytes to read into the file at a time.
        file_pb : `tqdm.tqdm` or `False`
            Should progress bars be displayed for each file downloaded.
        token : `parfive.downloader.Token`
            A token for this download slot.
        kwargs : `dict`
            Extra keyword arguments are passed to `aiohttp.ClientSession.get`.

        Returns
        -------
        filepath : `str`
            The name of the file saved.
        """
        timeout = aiohttp.ClientTimeout(**timeouts)
        try:
            async with session.get(url, timeout=timeout, **kwargs) as resp:
                if resp.status != 200:
                    raise FailedDownload(filepath_partial, url, resp)
                else:
                    filepath, skip = get_filepath(filepath_partial(resp, url), overwrite)
                    if skip:
                        return str(filepath)
                    if callable(file_pb):
                        file_pb = file_pb(position=token.n, unit='B', unit_scale=True,
                                          desc=filepath.name, leave=False,
                                          total=get_http_size(resp))
                    else:
                        file_pb = None
                    with open(str(filepath), 'wb') as fd:
                        while True:
                            chunk = await resp.content.read(chunksize)
                            if not chunk:
                                # Close the file progressbar
                                if file_pb is not None:
                                    file_pb.close()

                                return str(filepath)

                            # Write this chunk to the output file.
                            fd.write(chunk)

                            # Update the progressbar for file
                            if file_pb is not None:
                                file_pb.update(chunksize)

        except Exception as e:
            raise FailedDownload(filepath_partial, url, e)

    @staticmethod
    async def _get_ftp(session=None, *, url, filepath_partial,
                       file_pb=None, token, overwrite, timeouts, **kwargs):
        """
        Read the file from the given url into the filename given by
        ``filepath_partial``.

        Parameters
        ----------
        session : `None`
            A placeholder for API compatibility with ``_get_http``.
        url : `str`
            The url to retrieve.
        filepath_partial : `callable`
            A function to call which returns the filepath to save the url
            to. Takes two arguments ``resp, url``.
        file_pb : `tqdm.tqdm` or `False`
            Should progress bars be displayed for each file downloaded.
        token : `parfive.downloader.Token`
            A token for this download slot.
        kwargs : `dict`
            Extra keyword arguments are passed to `aioftp.ClientSession`.

        Returns
        -------
        filepath : `str`
            The name of the file saved.
        """
        parse = urllib.parse.urlparse(url)
        try:
            async with aioftp.ClientSession(parse.hostname, **kwargs) as client:
                if parse.username and parse.password:
                    # login is a coroutine, so it must be awaited.
                    await client.login(parse.username, parse.password)

                # This has to be done before we start streaming the file:
                total_size = await get_ftp_size(client, parse.path)
                async with client.download_stream(parse.path) as stream:
                    filepath, skip = get_filepath(filepath_partial(None, url), overwrite)
                    if skip:
                        return str(filepath)
                    if callable(file_pb):
                        file_pb = file_pb(position=token.n, unit='B', unit_scale=True,
                                          desc=filepath.name, leave=False,
                                          total=total_size)
                    else:
                        file_pb = None

                    with open(str(filepath), 'wb') as fd:
                        async for chunk in stream.iter_by_block():
                            # Write this chunk to the output file.
                            fd.write(chunk)

                            # Update the progressbar for file
                            if file_pb is not None:
                                file_pb.update(len(chunk))

                    # Close the file progressbar
                    if file_pb is not None:
                        file_pb.close()

                    return str(filepath)

        except Exception as e:
            raise FailedDownload(filepath_partial, url, e)