Skip to content

change(get): Change get.py to match idf_tools.py more closely #10861

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 153 additions & 91 deletions tools/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,74 @@
import re
import time
import argparse
import ssl
import contextlib

from urllib.request import urlretrieve
from urllib.request import urlopen
from urllib.response import addinfourl
from typing import IO, Any, Callable, Dict, Iterator, List, Optional, Set, Tuple, Union
from ssl import SSLContext

unicode = lambda s: str(s) # noqa: E731

# the older "DigiCert Global Root CA" certificate used with github.com
DIGICERT_ROOT_CA_CERT = """
-----BEGIN CERTIFICATE-----
MIIDrzCCApegAwIBAgIQCDvgVpBCRrGhdWrJWZHHSjANBgkqhkiG9w0BAQUFADBh
MQswCQYDVQQGEwJVUzEVMBMGA1UEChMMRGlnaUNlcnQgSW5jMRkwFwYDVQQLExB3
d3cuZGlnaWNlcnQuY29tMSAwHgYDVQQDExdEaWdpQ2VydCBHbG9iYWwgUm9vdCBD
QTAeFw0wNjExMTAwMDAwMDBaFw0zMTExMTAwMDAwMDBaMGExCzAJBgNVBAYTAlVT
MRUwEwYDVQQKEwxEaWdpQ2VydCBJbmMxGTAXBgNVBAsTEHd3dy5kaWdpY2VydC5j
b20xIDAeBgNVBAMTF0RpZ2lDZXJ0IEdsb2JhbCBSb290IENBMIIBIjANBgkqhkiG
9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4jvhEXLeqKTTo1eqUKKPC3eQyaKl7hLOllsB
CSDMAZOnTjC3U/dDxGkAV53ijSLdhwZAAIEJzs4bg7/fzTtxRuLWZscFs3YnFo97
nh6Vfe63SKMI2tavegw5BmV/Sl0fvBf4q77uKNd0f3p4mVmFaG5cIzJLv07A6Fpt
43C/dxC//AH2hdmoRBBYMql1GNXRor5H4idq9Joz+EkIYIvUX7Q6hL+hqkpMfT7P
T19sdl6gSzeRntwi5m3OFBqOasv+zbMUZBfHWymeMr/y7vrTC0LUq7dBMtoM1O/4
gdW7jVg/tRvoSSiicNoxBN33shbyTApOB6jtSj1etX+jkMOvJwIDAQABo2MwYTAO
BgNVHQ8BAf8EBAMCAYYwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4EFgQUA95QNVbR
TLtm8KPiGxvDl7I90VUwHwYDVR0jBBgwFoAUA95QNVbRTLtm8KPiGxvDl7I90VUw
DQYJKoZIhvcNAQEFBQADggEBAMucN6pIExIK+t1EnE9SsPTfrgT1eXkIoyQY/Esr
hMAtudXH/vTBH1jLuG2cenTnmCmrEbXjcKChzUyImZOMkXDiqw8cvpOp/2PV5Adg
06O/nVsJ8dWO41P0jmP6P6fbtGbfYmbW0W5BjfIttep3Sp+dWOIrWcBAI+0tKIJF
PnlUkiaY4IBIqDfv8NZ5YBberOgOzW6sRBc4L0na4UU+Krk2U886UAb3LujEV0ls
YSEY1QSteDwsOoBrp+uvFRTp2InBuThs4pFsiv9kuXclVzDAGySj4dzp30d8tbQk
CAUw7C29C79Fv1C5qfPrmAESrciIxpg0X40KPMbp1ZWVbd4=
-----END CERTIFICATE-----
"""

# Initialize start_time globally
start_time = -1

if sys.version_info[0] == 3:
from urllib.request import urlretrieve
from urllib.request import urlopen
# the newer "DigiCert Global Root G2" certificate used with dl.espressif.com
DIGICERT_ROOT_G2_CERT = """
-----BEGIN CERTIFICATE-----
MIIDjjCCAnagAwIBAgIQAzrx5qcRqaC7KGSxHQn65TANBgkqhkiG9w0BAQsFADBh
MQswCQYDVQQGEwJVUzEVMBMGA1UEChMMRGlnaUNlcnQgSW5jMRkwFwYDVQQLExB3
d3cuZGlnaWNlcnQuY29tMSAwHgYDVQQDExdEaWdpQ2VydCBHbG9iYWwgUm9vdCBH
MjAeFw0xMzA4MDExMjAwMDBaFw0zODAxMTUxMjAwMDBaMGExCzAJBgNVBAYTAlVT
MRUwEwYDVQQKEwxEaWdpQ2VydCBJbmMxGTAXBgNVBAsTEHd3dy5kaWdpY2VydC5j
b20xIDAeBgNVBAMTF0RpZ2lDZXJ0IEdsb2JhbCBSb290IEcyMIIBIjANBgkqhkiG
9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuzfNNNx7a8myaJCtSnX/RrohCgiN9RlUyfuI
2/Ou8jqJkTx65qsGGmvPrC3oXgkkRLpimn7Wo6h+4FR1IAWsULecYxpsMNzaHxmx
1x7e/dfgy5SDN67sH0NO3Xss0r0upS/kqbitOtSZpLYl6ZtrAGCSYP9PIUkY92eQ
q2EGnI/yuum06ZIya7XzV+hdG82MHauVBJVJ8zUtluNJbd134/tJS7SsVQepj5Wz
tCO7TG1F8PapspUwtP1MVYwnSlcUfIKdzXOS0xZKBgyMUNGPHgm+F6HmIcr9g+UQ
vIOlCsRnKPZzFBQ9RnbDhxSJITRNrw9FDKZJobq7nMWxM4MphQIDAQABo0IwQDAP
BgNVHRMBAf8EBTADAQH/MA4GA1UdDwEB/wQEAwIBhjAdBgNVHQ4EFgQUTiJUIBiV
5uNu5g/6+rkS7QYXjzkwDQYJKoZIhvcNAQELBQADggEBAGBnKJRvDkhj6zHd6mcY
1Yl9PMWLSn/pvtsrF9+wX3N3KjITOYFnQoQj8kVnNeyIv/iPsGEMNKSuIEyExtv4
NeF22d+mQrvHRAiGfzZ0JFrabA0UWTW98kndth/Jsw1HKj2ZL7tcu7XUIOGZX1NG
Fdtom/DzMNU+MeKNhJ7jitralj41E6Vf8PlwUHBHQRFXGU7Aj64GxJUTFy8bJZ91
8rGOmaFvE7FBcf6IKshPECBV1/MUReXgRPTqh5Uykw7+U0b6LJ3/iyK5S9kJRaTe
pLiaWN0bfVKfjllDiIGknibVb63dDcY3fe0Dkhvld1927jyNxF1WW6LZZm6zNTfl
MrY=
-----END CERTIFICATE-----
"""

unicode = lambda s: str(s) # noqa: E731
else:
# Not Python 3 - today, it is most likely to be Python 2
from urllib import urlretrieve
from urllib import urlopen
DL_CERT_DICT = {'dl.espressif.com': DIGICERT_ROOT_G2_CERT,
'github.com': DIGICERT_ROOT_CA_CERT}

if "Windows" in platform.system():
import requests
# Initialize start_time globally
start_time = -1

# determine if application is a script file or frozen exe
if getattr(sys, "frozen", False):
Expand Down Expand Up @@ -71,20 +123,18 @@ def format_time(seconds):
return "{:02}:{:05.2f}".format(int(minutes), seconds)


def report_progress(block_count, block_size, total_size, start_time):
def report_progress(block_count, block_size, total_size):
downloaded_size = block_count * block_size
time_elapsed = time.time() - start_time
current_speed = downloaded_size / (time_elapsed)

if sys.stdout.isatty():
if total_size > 0:
percent_complete = min((downloaded_size / total_size) * 100, 100)
sys.stdout.write(
f"\rDownloading... {percent_complete:.2f}% - {downloaded_size / 1024 / 1024:.2f} MB downloaded - Elapsed Time: {format_time(time_elapsed)} - Speed: {current_speed / 1024 / 1024:.2f} MB/s" # noqa: E501
f"\rDownloading... {percent_complete:.2f}% - {downloaded_size / 1024 / 1024:.2f} MB downloaded" # noqa: E501
)
else:
sys.stdout.write(
f"\rDownloading... {downloaded_size / 1024 / 1024:.2f} MB downloaded - Elapsed Time: {format_time(time_elapsed)} - Speed: {current_speed / 1024 / 1024:.2f} MB/s" # noqa: E501
f"\rDownloading... {downloaded_size / 1024 / 1024:.2f} MB downloaded" # noqa: E501
)
sys.stdout.flush()

Expand Down Expand Up @@ -243,18 +293,25 @@ def unpack(filename, destination, force_extract, checksum): # noqa: C901
if filename.endswith("tar.gz"):
if not cfile:
cfile = tarfile.open(filename, "r:gz")
cfile.extractall(destination, filter="tar")
cfile.extractall(destination, filter="fully_trusted")
elif filename.endswith("tar.xz"):
if not cfile:
cfile = tarfile.open(filename, "r:xz")
cfile.extractall(destination, filter="tar")
cfile.extractall(destination, filter="fully_trusted")
elif filename.endswith("zip"):
if not cfile:
cfile = zipfile.ZipFile(filename)
cfile.extractall(destination)
else:
raise NotImplementedError("Unsupported archive type")

if sys.platform != 'win32' and filename.endswith('zip') and isinstance(cfile, zipfile.ZipFile):
for file_info in cfile.infolist():
extracted_file = os.path.join(destination, file_info.filename)
extracted_permissions = file_info.external_attr >> 16 & 0o777 # Extract Unix permissions
if os.path.exists(extracted_file):
os.chmod(extracted_file, extracted_permissions)

if rename_to != dirname:
print("Renaming {0} to {1} ...".format(dirname, rename_to))
shutil.move(dirname, rename_to)
Expand All @@ -275,55 +332,67 @@ def unpack(filename, destination, force_extract, checksum): # noqa: C901
return False


def download_file_with_progress(url, filename, start_time):
import ssl
import contextlib

ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
with contextlib.closing(urlopen(url, context=ctx)) as fp:
total_size = int(fp.getheader("Content-Length", fp.getheader("Content-length", "0")))
block_count = 0
block_size = 1024 * 8
block = fp.read(block_size)
if block:
with open(filename, "wb") as out_file:
out_file.write(block)
block_count += 1
report_progress(block_count, block_size, total_size, start_time)
while True:
block = fp.read(block_size)
if not block:
break
out_file.write(block)
block_count += 1
report_progress(block_count, block_size, total_size, start_time)
else:
raise Exception("Non-existing file or connection error")


def download_file(url, filename):
import ssl
import contextlib

ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
with contextlib.closing(urlopen(url, context=ctx)) as fp:
block_size = 1024 * 8
block = fp.read(block_size)
if block:
with open(filename, "wb") as out_file:
out_file.write(block)
while True:
block = fp.read(block_size)
if not block:
break
out_file.write(block)
else:
raise Exception("Non-existing file or connection error")

def splittype(url: str) -> Tuple[Optional[str], str]:
"""
Splits given url into its type (e.g. https, file) and the rest.
"""
match = re.match('([^/:]+):(.*)', url, re.DOTALL)
if match:
scheme, data = match.groups()
return scheme.lower(), data
return None, url

def urlretrieve_ctx(url: str,
filename: str,
reporthook: Optional[Callable[[int, int, int], None]]=None,
data: Optional[bytes]=None,
context: Optional[SSLContext]=None) -> Tuple[str, addinfourl]:
"""
Retrieve data from given URL. An alternative version of urlretrieve which takes SSL context as an argument.
"""
url_type, path = splittype(url)

# urlopen doesn't have context argument in Python <=2.7.9
extra_urlopen_args = {}
if context:
extra_urlopen_args['context'] = context
with contextlib.closing(urlopen(url, data, **extra_urlopen_args)) as fp: # type: ignore
headers = fp.info()

# Just return the local path and the "headers" for file://
# URLs. No sense in performing a copy unless requested.
if url_type == 'file' and not filename:
return os.path.normpath(path), headers

# Handle temporary file setup.
tfp = open(filename, 'wb')

with tfp:
result = filename, headers
bs = 1024 * 8
size = int(headers.get('content-length', -1))
read = 0
blocknum = 0

if reporthook:
reporthook(blocknum, bs, size)

while True:
block = fp.read(bs)
if not block:
break
read += len(block)
tfp.write(block)
blocknum += 1
if reporthook:
reporthook(blocknum, bs, size)

if size >= 0 and read < size:
raise ContentTooShortError(
'retrieval incomplete: got only %i out of %i bytes'
% (read, size), result)

return result

def get_tool(tool, force_download, force_extract):
sys_name = platform.system()
Expand All @@ -339,29 +408,22 @@ def get_tool(tool, force_download, force_extract):
else:
print("Downloading '" + archive_name + "' ...")
sys.stdout.flush()
if "CYGWIN_NT" in sys_name:
import ssl

ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
urlretrieve(url, local_path, report_progress, context=ctx)
elif "Windows" in sys_name:
r = requests.get(url)
f = open(local_path, "wb")
f.write(r.content)
f.close()
else:
is_ci = os.environ.get("GITHUB_WORKSPACE")
if is_ci:
download_file(url, local_path)

try:
for site, cert in DL_CERT_DICT.items():
if site in url:
ctx = ssl.create_default_context()
ctx.load_verify_locations(cadata=cert)
break
else:
try:
urlretrieve(url, local_path, report_progress)
except: # noqa: E722
download_file_with_progress(url, local_path, start_time)
sys.stdout.write(" - Done\n")
sys.stdout.flush()
ctx = None

urlretrieve_ctx(url, local_path, report_progress, context=ctx)
except Exception as e:
print(f"Failed to download {archive_name}: {e}")
return False
finally:
sys.stdout.flush()
else:
print("Tool {0} already downloaded".format(archive_name))
sys.stdout.flush()
Expand Down
Loading