Coverage for src/pyrsslocal/helper/download_helper.py: 67%

27 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2023-02-02 02:59 +0100

1""" 

2@file 

3 

4@brief various functions to get the content from a url

5""" 

6 

7from urllib.error import HTTPError, URLError 

8import urllib 

9import urllib.request 

10import socket 

11import http 

12import gzip 

13 

14from pyquickhelper.loghelper import fLOG 

15 

16 

def get_url_content_timeout(url, timeout=10, output=None, encoding="utf8"):
    """
    Downloads a file from internet
    (it assumes it is text information, otherwise, encoding should be None).

    @param      url         (str) url
    @param      timeout     (in seconds), after this time, the function gives up
                            and returns None, -1 means no timeout is passed to
                            ``urlopen`` (it then relies on the default socket timeout)
    @param      output      (str) if not None, the content is also stored in that file
    @param      encoding    (str) utf8 by default, but if it is None,
                            the returned information is binary
    @return                 content of the url (*str*, or *bytes* if *encoding* is None),
                            None if the download failed with one of the handled
                            network errors (HTTP/URL error, timeout, connection reset,
                            bad status line, incomplete read)

    If the function automatically detects that the downloaded data is in gzip
    format, it will decompress it.

    If the data cannot be decoded with *encoding*, the function falls back to
    ``iso-8859-1``. Any unexpected exception raised while downloading
    is logged and raised again.
    """
    prefix = "[get_url_content_timeout] unable to retrieve content from "

    # -1 means "no explicit timeout": do not pass the argument at all so that
    # urlopen keeps its own default.
    open_kwargs = {} if timeout == -1 else {"timeout": timeout}

    try:
        with urllib.request.urlopen(url, **open_kwargs) as ur:
            res = ur.read()
    except (HTTPError, URLError) as error:  # pragma: no cover
        fLOG(prefix, url, "exc:", str(error))
        return None
    except socket.timeout:  # pragma: no cover
        fLOG(prefix, url, " because of timeout: ", timeout)
        return None
    except ConnectionResetError as e:  # pragma: no cover
        fLOG(prefix, url, " because of ConnectionResetError: ", e)
        return None
    except http.client.BadStatusLine as e:  # pragma: no cover
        fLOG(prefix, url, " because of http.client.BadStatusLine: ", e)
        return None
    except http.client.IncompleteRead as e:  # pragma: no cover
        fLOG(prefix, url, " because of http.client.IncompleteRead: ", e)
        return None
    except Exception as e:  # pragma: no cover
        fLOG(prefix, url, " because of unknown exception: ", e)
        # bare raise: propagate the very same exception to the caller
        raise

    # gzip streams start with the two magic bytes 0x1f 0x8b
    if res.startswith(b"\x1f\x8b"):
        res = gzip.decompress(res)

    if encoding is None:
        # binary mode: the raw bytes are returned untouched
        content = res
    else:
        try:
            content = res.decode(encoding)
        except UnicodeDecodeError as first_error:  # pragma: no cover
            # we try different encodings; "latin-1" is only an alias
            # of "iso-8859-1", listing it twice would be redundant
            fallbacks = ["iso-8859-1"]
            errors = [first_error]
            content = None

            for candidate in fallbacks:
                try:
                    content = res.decode(candidate)
                    break
                except UnicodeDecodeError as exc:
                    errors.append(exc)

            if content is None:
                # kept as a safeguard in case the fallback list changes
                mes = ["unable to parse blog post: " + url]
                mes.append("tried:" + str([encoding] + fallbacks))
                mes.append("beginning:\n" + str([res])[:50])
                mes.extend("Exception: " + str(err) for err in errors)
                raise ValueError("\n".join(mes)) from first_error

    if output is not None:
        if encoding is not None:
            with open(output, "w", encoding=encoding) as f:
                f.write(content)
        else:
            with open(output, "wb") as f:
                f.write(content)

    return content