Coverage for src/pyrsslocal/xmlhelper/xml_tree.py: 95%

112 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2023-02-02 02:59 +0100

1""" 

2@file 

3 

4@brief parsing XML 

5""" 

6 

7import xml.sax.handler 

8import io 

9import xml.sax.expatreader 

10import xml.sax.saxutils as xsaxutils 

11from xml.parsers import expat 

12 

13from .xml_tree_node import XMLHandlerDictNode 

14 

15 

16class XMLHandlerDict (xml.sax.handler.ContentHandler): 

17 """ 

18 Overloads functions about XML, it produces objects at the end 

19 we assume the file contains a list of objects. 

20 """ 

21 

22 def __init__(self, no_content=False): 

23 """ 

24 @param no_content avoid loading the content of every record 

25 """ 

26 xml.sax.handler.ContentHandler.__init__(self) 

27 self._objs = [] 

28 self._being = None 

29 self._level = 0 

30 self._tag = None 

31 self._tile = [] 

32 self._pointer = None 

33 self._forget_root = True # always True 

34 self._no_content = no_content 

35 self._prepare_stringio() 

36 

37 def _prepare_stringio(self): 

38 """prepare the StringIO stream 

39 """ 

40 

41 if not self._no_content: 

42 self._xmlio = io.StringIO() 

43 self._xmlgen = xsaxutils.XMLGenerator(self._xmlio, "utf8") 

44 self._xmlgen.startDocument() 

45 else: 

46 self._xmlgen = None 

47 

48 def startElement(self, name, attrs): 

49 """ 

50 When enters a section. 

51 """ 

52 if self._level == 0 and self._forget_root: 

53 self._level = 1 

54 return 

55 

56 if self._xmlgen is not None: 

57 self._xmlgen.startElement(name, attrs) 

58 

59 self._tile.append(name) 

60 if self._being is None: 

61 self._tag = name 

62 self._being = XMLHandlerDictNode( 

63 None, name, self._level, root=True) 

64 self._pointer = self._being 

65 else: 

66 node = XMLHandlerDictNode( 

67 self._pointer, name, self._level, root=False) 

68 self._pointer.set(name, node) 

69 self._pointer = node 

70 

71 for k in attrs.getNames(): 

72 self._pointer.set(k, attrs[k].strip()) 

73 self._level += 1 

74 

75 def endElement(self, name): 

76 """ 

77 After a tag. 

78 """ 

79 if len(self._tile) == 0: 

80 return 

81 

82 if self._xmlgen is not None: 

83 self._xmlgen.endElement(name) 

84 

85 self._pointer.strip() 

86 self._tile.pop() 

87 self._level -= 1 

88 if len(self._tile) == 0: 

89 self._being.rearrange() 

90 if self._xmlgen is not None: 

91 self._xmlgen.endDocument() 

92 self._xmlio.write("\n") 

93 content = self._xmlio.getvalue() 

94 if content.startswith("<?xml"): 

95 end = content.find("\n") + 1 

96 if len(content) > end and content[end] == "\n": 

97 end += 1 

98 content = content[end:] 

99 else: 

100 content = "" 

101 

102 if isinstance(content, bytes): 

103 raise AssertionError( # pragma: no cover 

104 "this should not happen") 

105 

106 self._being.add_xml_content(content) 

107 self._objs.append(self._being) 

108 self._being = None 

109 self._pointer = None 

110 self._prepare_stringio() 

111 else: 

112 self._pointer = self._pointer.father 

113 

114 def characters(self, content): 

115 """ 

116 Adds characters. 

117 """ 

118 if self._xmlgen is not None: 

119 self._xmlgen.characters(content) 

120 

121 if self._pointer is not None: 

122 self._pointer.buffer += content 

123 

124# iteration version 

125 

126 

127class XMLIterParser(xml.sax.expatreader.ExpatParser): 

128 

129 """ 

130 To use a parser like an iterator. 

131 Example: 

132 

133 :: 

134 

135 zxml = \"\"\" 

136 <mixed engine___="conf1" fid="3" grade___="Fair" query___="queryA" rank="3"> 

137 <urls> 

138 <url___>http://www.shop.com/Soloxine_1_0mg_Tab-181378988-214010464-p!.shtml</url___> 

139 <url___>http://fake</url___> 

140 </urls> 

141 </mixed> 

142 <mixed engine___="conf1" fid="4" grade___="Good" query___="queryA" rank="4" 

143 url___="http%3A//www.lamars.com/products/nutrition.html" /> 

144 \"\"\" 

145 

146 zxml = "<root>%s</root>" % zxml 

147 f = StringIO.StringIO (zxml) 

148 assert len(f.getvalue()) > 0 

149 

150 parser = XMLIterParser() 

151 handler = XMLHandlerDict(no_content = False) 

152 parser.setContentHandler(handler) 

153 nb = 0 

154 for o in parser.parse(f) : 

155 assert o["query___"] == "queryA" 

156 nb += 1 

157 assert nb > 0 

158 """ 

159 

160 def __init__(self, namespaceHandling=0, bufsize=2 ** 17): 

161 if bufsize is None: 

162 bufsize = 2 ** 17 

163 xml.sax.expatreader.ExpatParser.__init__( 

164 self, 

165 namespaceHandling=namespaceHandling, 

166 bufsize=bufsize) 

167 

168 def parse(self, source): 

169 """ 

170 Parses an :epkg:`XML` document from a URL or an *InputSource*. 

171 @param source a file or a stream 

172 """ 

173 source0 = source 

174 source = xsaxutils.prepare_input_source(source) 

175 

176 self._source = source 

177 self.reset() 

178 self._cont_handler.setDocumentLocator( 

179 xml.sax.expatreader.ExpatLocator(self)) 

180 

181 # xmlreader.IncrementalParser.parse(self, source) 

182 # source = xsaxutils.prepare_input_source(source) 

183 

184 self.prepareParser(source) 

185 file_char = source.getCharacterStream() 

186 if file_char is None: 

187 file_bytes = source.getByteStream() 

188 file = file_bytes 

189 else: 

190 file = file_char 

191 

192 if file is None: 

193 raise FileNotFoundError( # pragma: no cover 

194 "File is None, it should not, source='{0}'\n{1}".format( 

195 source0, source0.name)) 

196 

197 buffer = file.read(self._bufsize) 

198 isFinal = 0 

199 while buffer != "" or isFinal == 0: 

200 # self.feed(buffer) 

201 data = buffer 

202 isFinal = 1 if len(buffer) == 0 else 0 

203 

204 if not self._parsing: 

205 self.reset() 

206 self._parsing = 1 

207 self._cont_handler.startDocument() 

208 

209 try: 

210 # The isFinal parameter is internal to the expat reader. 

211 # If it is set to true, expat will check validity of the entire 

212 # document. When feeding chunks, they are not normally final - 

213 # except when invoked from close. 

214 self._parser.Parse(data, isFinal) 

215 

216 for o in self._cont_handler._objs: 

217 yield o 

218 del self._cont_handler._objs[:] 

219 

220 except expat.error as e: # pragma: no cover 

221 exc = xml.sax.SAXParseException( 

222 expat.ErrorString( 

223 e.code), 

224 e, 

225 self) 

226 self._err_handler.fatalError(exc) 

227 

228 buffer = file.read(self._bufsize) 

229 

230 # self.close() 

231 self._cont_handler.endDocument() 

232 self._parsing = 0 

233 # break cycle created by expat handlers pointing to our methods 

234 self._parser = None 

235 

236 for o in self._cont_handler._objs: 

237 yield o 

238 del self._cont_handler._objs[:]