|
1 from __future__ import absolute_import, print_function |
|
2 |
|
3 import unittest |
|
4 |
|
5 from mercurial import ( |
|
6 util, |
|
7 wireprotoframing as framing, |
|
8 ) |
|
9 |
|
# Short alias used throughout the tests below to build a frame from a
# human-readable byte string such as b'command-name have-data mycommand'.
ffs = framing.makeframefromhumanstring
|
11 |
|
def makereactor():
    """Return a newly constructed ``framing.serverreactor`` instance."""
    reactor = framing.serverreactor()
    return reactor
|
14 |
|
def sendframes(reactor, gen):
    """Feed every frame from an iterable of frames into a reactor.

    Each frame's header is parsed into its type, flags and declared payload
    length. The bytes following the header are the payload, and their count
    must equal the declared length.

    This is a generator: it yields the return value of each
    ``reactor.onframerecv()`` call, one per frame, in order.
    """
    for rawframe in gen:
        ftype, fflags, declaredlen = framing.parseheader(rawframe)
        body = rawframe[framing.FRAME_HEADER_SIZE:]
        # Sanity check: the header must describe the payload it precedes.
        assert declaredlen == len(body)

        yield reactor.onframerecv(ftype, fflags, body)
|
26 |
|
def sendcommandframes(reactor, cmd, args, datafh=None):
    """Create the frames for a command and feed them to a reactor.

    The frames come from ``framing.createcommandframes()`` and are passed
    through ``sendframes()``, whose generator of results is returned.
    """
    cmdframes = framing.createcommandframes(cmd, args, datafh)
    return sendframes(reactor, cmdframes)
|
30 |
|
class FrameTests(unittest.TestCase):
    """Checks on the frames produced by ``framing.createcommandframes()``."""

    def testdataexactframesize(self):
        """Data whose size is exactly the default maximum frame size.

        Expected output: one full continuation data frame followed by an
        ``eos`` data frame with an empty payload.
        """
        datafh = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)

        actual = list(framing.createcommandframes(b'command', {}, datafh))
        expected = [
            ffs(b'command-name have-data command'),
            ffs(b'command-data continuation %s' % datafh.getvalue()),
            ffs(b'command-data eos '),
        ]
        self.assertEqual(actual, expected)

    def testdatamultipleframes(self):
        """Data one byte larger than the default maximum frame size.

        Expected output: one full continuation data frame followed by an
        ``eos`` data frame carrying the single remaining byte.
        """
        datafh = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
        actual = list(framing.createcommandframes(b'command', {}, datafh))
        expected = [
            ffs(b'command-name have-data command'),
            ffs(b'command-data continuation %s' % (
                b'x' * framing.DEFAULT_MAX_FRAME_SIZE)),
            ffs(b'command-data eos x'),
        ]
        self.assertEqual(actual, expected)

    def testargsanddata(self):
        """A command with three arguments and 100 bytes of data.

        Expected output: the command name frame, one argument frame per
        key (the last flagged ``eoa``) and a single ``eos`` data frame.
        """
        datafh = util.bytesio(b'x' * 100)
        cmdargs = {
            b'key1': b'key1value',
            b'key2': b'key2value',
            b'key3': b'key3value',
        }

        actual = list(
            framing.createcommandframes(b'command', cmdargs, datafh))

        expected = [
            ffs(b'command-name have-args|have-data command'),
            ffs(br'command-argument 0 \x04\x00\x09\x00key1key1value'),
            ffs(br'command-argument 0 \x04\x00\x09\x00key2key2value'),
            ffs(br'command-argument eoa \x04\x00\x09\x00key3key3value'),
            ffs(b'command-data eos %s' % datafh.getvalue()),
        ]
        self.assertEqual(actual, expected)
|
68 |
|
class ServerReactorTests(unittest.TestCase):
    """Tests that feed frames to a server reactor and check its results.

    Every result returned by the reactor is expected to be a 2-tuple of
    ``(action, dict)``; see ``assertaction()``.
    """

    def _sendsingleframe(self, reactor, s):
        """Send the one frame described by ``s`` and return its result.

        ``s`` is handed to ``ffs()`` to build the frame. Exactly one result
        must come back from ``sendframes()``.
        """
        results = list(sendframes(reactor, [ffs(s)]))
        self.assertEqual(len(results), 1)

        return results[0]

    def assertaction(self, res, expected):
        """Assert ``res`` is a 2-tuple ``(expected, <dict>)``."""
        self.assertIsInstance(res, tuple)
        self.assertEqual(len(res), 2)
        self.assertIsInstance(res[1], dict)
        self.assertEqual(res[0], expected)

    def test1framecommand(self):
        """Receiving a command in a single frame yields request to run it."""
        reactor = makereactor()
        results = list(sendcommandframes(reactor, b'mycommand', {}))
        self.assertEqual(len(results), 1)
        self.assertaction(results[0], 'runcommand')
        self.assertEqual(results[0][1], {
            'command': b'mycommand',
            'args': {},
            'data': None,
        })

    def test1argument(self):
        """A command with one argument runs once the argument is received."""
        reactor = makereactor()
        results = list(sendcommandframes(reactor, b'mycommand',
                                         {b'foo': b'bar'}))
        self.assertEqual(len(results), 2)
        self.assertaction(results[0], 'wantframe')
        self.assertaction(results[1], 'runcommand')
        self.assertEqual(results[1][1], {
            'command': b'mycommand',
            'args': {b'foo': b'bar'},
            'data': None,
        })

    def testmultiarguments(self):
        """A command with two arguments runs once both are received."""
        reactor = makereactor()
        results = list(sendcommandframes(reactor, b'mycommand',
                                         {b'foo': b'bar', b'biz': b'baz'}))
        self.assertEqual(len(results), 3)
        self.assertaction(results[0], 'wantframe')
        self.assertaction(results[1], 'wantframe')
        self.assertaction(results[2], 'runcommand')
        self.assertEqual(results[2][1], {
            'command': b'mycommand',
            'args': {b'foo': b'bar', b'biz': b'baz'},
            'data': None,
        })

    def testsimplecommanddata(self):
        """A command followed by one data frame runs with that data."""
        reactor = makereactor()
        results = list(sendcommandframes(reactor, b'mycommand', {},
                                         util.bytesio(b'data!')))
        self.assertEqual(len(results), 2)
        self.assertaction(results[0], 'wantframe')
        self.assertaction(results[1], 'runcommand')
        self.assertEqual(results[1][1], {
            'command': b'mycommand',
            'args': {},
            'data': b'data!',
        })

    def testmultipledataframes(self):
        """Payloads of successive data frames are concatenated in order."""
        frames = [
            ffs(b'command-name have-data mycommand'),
            ffs(b'command-data continuation data1'),
            ffs(b'command-data continuation data2'),
            ffs(b'command-data eos data3'),
        ]

        reactor = makereactor()
        results = list(sendframes(reactor, frames))
        self.assertEqual(len(results), 4)
        for i in range(3):
            self.assertaction(results[i], 'wantframe')
        self.assertaction(results[3], 'runcommand')
        self.assertEqual(results[3][1], {
            'command': b'mycommand',
            'args': {},
            'data': b'data1data2data3',
        })

    def testargumentanddata(self):
        """A command may carry both argument frames and data frames."""
        frames = [
            ffs(b'command-name have-args|have-data command'),
            ffs(br'command-argument 0 \x03\x00\x03\x00keyval'),
            ffs(br'command-argument eoa \x03\x00\x03\x00foobar'),
            ffs(b'command-data continuation value1'),
            ffs(b'command-data eos value2'),
        ]

        reactor = makereactor()
        results = list(sendframes(reactor, frames))

        # As in the sibling tests: one result per frame, and every frame
        # before the last one only asks for more input.
        self.assertEqual(len(results), 5)
        for res in results[:-1]:
            self.assertaction(res, 'wantframe')

        self.assertaction(results[-1], 'runcommand')
        self.assertEqual(results[-1][1], {
            'command': b'command',
            'args': {
                b'key': b'val',
                b'foo': b'bar',
            },
            'data': b'value1value2',
        })

    def testunexpectedcommandargument(self):
        """Command argument frame when not running a command is an error."""
        result = self._sendsingleframe(makereactor(),
                                       b'command-argument 0 ignored')
        self.assertaction(result, 'error')
        self.assertEqual(result[1], {
            'message': b'expected command frame; got 2',
        })

    def testunexpectedcommanddata(self):
        """Command data frame when not running a command is an error."""
        result = self._sendsingleframe(makereactor(),
                                       b'command-data 0 ignored')
        self.assertaction(result, 'error')
        self.assertEqual(result[1], {
            'message': b'expected command frame; got 3',
        })

    def testmissingcommandframeflags(self):
        """Command name frame must have flags set."""
        result = self._sendsingleframe(makereactor(),
                                       b'command-name 0 command')
        self.assertaction(result, 'error')
        self.assertEqual(result[1], {
            'message': b'missing frame flags on command frame',
        })

    def testmissingargumentframe(self):
        """A non-argument frame while arguments are expected is an error."""
        frames = [
            ffs(b'command-name have-args command'),
            ffs(b'command-name 0 ignored'),
        ]

        results = list(sendframes(makereactor(), frames))
        self.assertEqual(len(results), 2)
        self.assertaction(results[0], 'wantframe')
        self.assertaction(results[1], 'error')
        self.assertEqual(results[1][1], {
            'message': b'expected command argument frame; got 1',
        })

    def testincompleteargumentname(self):
        """Argument frame with incomplete name."""
        frames = [
            ffs(b'command-name have-args command1'),
            ffs(br'command-argument eoa \x04\x00\xde\xadfoo'),
        ]

        results = list(sendframes(makereactor(), frames))
        self.assertEqual(len(results), 2)
        self.assertaction(results[0], 'wantframe')
        self.assertaction(results[1], 'error')
        self.assertEqual(results[1][1], {
            'message': b'malformed argument frame: partial argument name',
        })

    def testincompleteargumentvalue(self):
        """Argument frame with incomplete value."""
        frames = [
            ffs(b'command-name have-args command'),
            ffs(br'command-argument eoa \x03\x00\xaa\xaafoopartialvalue'),
        ]

        results = list(sendframes(makereactor(), frames))
        self.assertEqual(len(results), 2)
        self.assertaction(results[0], 'wantframe')
        self.assertaction(results[1], 'error')
        self.assertEqual(results[1][1], {
            'message': b'malformed argument frame: partial argument value',
        })

    def testmissingcommanddataframe(self):
        """A non-data frame while command data is expected is an error."""
        frames = [
            ffs(b'command-name have-data command1'),
            ffs(b'command-name eos command2'),
        ]
        results = list(sendframes(makereactor(), frames))
        self.assertEqual(len(results), 2)
        self.assertaction(results[0], 'wantframe')
        self.assertaction(results[1], 'error')
        self.assertEqual(results[1][1], {
            'message': b'expected command data frame; got 1',
        })

    def testmissingcommanddataframeflags(self):
        """Command data frame must have flags set."""
        frames = [
            ffs(b'command-name have-data command1'),
            ffs(b'command-data 0 data'),
        ]
        results = list(sendframes(makereactor(), frames))
        self.assertEqual(len(results), 2)
        self.assertaction(results[0], 'wantframe')
        self.assertaction(results[1], 'error')
        self.assertEqual(results[1][1], {
            'message': b'command data frame without flags',
        })
|
272 |
|
if __name__ == '__main__':
    # Script entry point: the runner is imported only when this file is
    # executed directly, then given this module's name.
    import silenttestrunner
    silenttestrunner.main(__name__)