Stop using the deprecated cgi module.

Signed-off-by: Pedro Algarvio <palgarvio@vmware.com>
This commit is contained in:
Pedro Algarvio 2023-02-22 16:08:35 +00:00 committed by Pedro Algarvio
parent 7d57774710
commit 72fc1094ce

View file

@ -5,7 +5,7 @@ and the like, but also useful for basic HTTP testing.
.. versionadded:: 2015.5.0
"""
import cgi
import email.message
import gzip
import http.client
import http.cookiejar
@ -85,7 +85,7 @@ except ImportError:
HAS_CERTIFI = False
log = logging.getLogger(__name__)
USERAGENT = "Salt/{}".format(salt.version.__version__)
USERAGENT = f"Salt/{salt.version.__version__}"
def __decompressContent(coding, pgctnt):
@ -171,7 +171,7 @@ def query(
formdata_fieldname=None,
formdata_filename=None,
decode_body=True,
**kwargs
**kwargs,
):
"""
Query a resource, and decode the return data
@ -296,7 +296,7 @@ def query(
auth = (username, password)
if agent == USERAGENT:
agent = "{} http.query()".format(agent)
agent = f"{agent} http.query()"
header_dict["User-agent"] = agent
if backend == "requests":
@ -361,14 +361,14 @@ def query(
url,
params=params,
files={formdata_fieldname: (formdata_filename, io.StringIO(data))},
**req_kwargs
**req_kwargs,
)
else:
result = sess.request(method, url, params=params, data=data, **req_kwargs)
result.raise_for_status()
if stream is True:
# fake a HTTP response header
header_callback("HTTP/1.0 {} MESSAGE".format(result.status_code))
header_callback(f"HTTP/1.0 {result.status_code} MESSAGE")
# fake streaming the content
streaming_callback(result.content)
return {
@ -484,15 +484,12 @@ def query(
result_headers = dict(result.info())
result_text = result.read()
if "Content-Type" in result_headers:
res_content_type, res_params = cgi.parse_header(
result_headers["Content-Type"]
)
if (
res_content_type.startswith("text/")
and "charset" in res_params
and not isinstance(result_text, str)
):
result_text = result_text.decode(res_params["charset"])
msg = email.message.EmailMessage()
msg.add_header("Content-Type", result_headers["Content-Type"])
if msg.get_content_type().startswith("text/"):
content_charset = msg.get_content_charset()
if content_charset and not isinstance(result_text, str):
result_text = result_text.decode(content_charset)
if isinstance(result_text, bytes) and decode_body:
result_text = result_text.decode("utf-8")
ret["body"] = result_text
@ -637,15 +634,12 @@ def query(
result_headers = result.headers
result_text = result.body
if "Content-Type" in result_headers:
res_content_type, res_params = cgi.parse_header(
result_headers["Content-Type"]
)
if (
res_content_type.startswith("text/")
and "charset" in res_params
and not isinstance(result_text, str)
):
result_text = result_text.decode(res_params["charset"])
msg = email.message.EmailMessage()
msg.add_header("Content-Type", result_headers["Content-Type"])
if msg.get_content_type().startswith("text/"):
content_charset = msg.get_content_charset()
if content_charset and not isinstance(result_text, str):
result_text = result_text.decode(content_charset)
if isinstance(result_text, bytes) and decode_body:
result_text = result_text.decode("utf-8")
ret["body"] = result_text
@ -1039,12 +1033,12 @@ def _sanitize_url_components(comp_list, field):
"""
if not comp_list:
return ""
elif comp_list[0].startswith("{}=".format(field)):
ret = "{}=XXXXXXXXXX&".format(field)
elif comp_list[0].startswith(f"{field}="):
ret = f"{field}=XXXXXXXXXX&"
comp_list.remove(comp_list[0])
return ret + _sanitize_url_components(comp_list, field)
else:
ret = "{}&".format(comp_list[0])
ret = f"{comp_list[0]}&"
comp_list.remove(comp_list[0])
return ret + _sanitize_url_components(comp_list, field)