666 self._active = True |
666 self._active = True |
667 |
667 |
668 return makeframe(requestid, self.streamid, streamflags, typeid, flags, |
668 return makeframe(requestid, self.streamid, streamflags, typeid, flags, |
669 payload) |
669 payload) |
670 |
670 |
|
671 class inputstream(stream): |
|
672 """Represents a stream used for receiving data.""" |
|
673 |
671 def setdecoder(self, name, extraobjs): |
674 def setdecoder(self, name, extraobjs): |
672 """Set the decoder for this stream. |
675 """Set the decoder for this stream. |
673 |
676 |
674 Receives the stream profile name and any additional CBOR objects |
677 Receives the stream profile name and any additional CBOR objects |
675 decoded from the stream encoding settings frame payloads. |
678 decoded from the stream encoding settings frame payloads. |
676 """ |
679 """ |
|
680 |
|
681 class outputstream(stream): |
|
682 """Represents a stream used for sending data.""" |
677 |
683 |
678 def ensureserverstream(stream): |
684 def ensureserverstream(stream): |
679 if stream.streamid % 2: |
685 if stream.streamid % 2: |
680 raise error.ProgrammingError('server should only write to even ' |
686 raise error.ProgrammingError('server should only write to even ' |
681 'numbered streams; %d is not even' % |
687 'numbered streams; %d is not even' % |
797 self._state = 'errored' |
803 self._state = 'errored' |
798 return self._makeerrorresult( |
804 return self._makeerrorresult( |
799 _('received frame on unknown inactive stream without ' |
805 _('received frame on unknown inactive stream without ' |
800 'beginning of stream flag set')) |
806 'beginning of stream flag set')) |
801 |
807 |
802 self._incomingstreams[frame.streamid] = stream(frame.streamid) |
808 self._incomingstreams[frame.streamid] = inputstream(frame.streamid) |
803 |
809 |
804 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED: |
810 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED: |
805 # TODO handle decoding frames |
811 # TODO handle decoding frames |
806 self._state = 'errored' |
812 self._state = 'errored' |
807 raise error.ProgrammingError('support for decoding stream payloads ' |
813 raise error.ProgrammingError('support for decoding stream payloads ' |
1010 def makeoutputstream(self): |
1016 def makeoutputstream(self): |
1011 """Create a stream to be used for sending data to the client.""" |
1017 """Create a stream to be used for sending data to the client.""" |
1012 streamid = self._nextoutgoingstreamid |
1018 streamid = self._nextoutgoingstreamid |
1013 self._nextoutgoingstreamid += 2 |
1019 self._nextoutgoingstreamid += 2 |
1014 |
1020 |
1015 s = stream(streamid) |
1021 s = outputstream(streamid) |
1016 self._outgoingstreams[streamid] = s |
1022 self._outgoingstreams[streamid] = s |
1017 |
1023 |
1018 return s |
1024 return s |
1019 |
1025 |
1020 def _makeerrorresult(self, msg): |
1026 def _makeerrorresult(self, msg): |
1370 self._canissuecommands = True |
1376 self._canissuecommands = True |
1371 self._cansend = True |
1377 self._cansend = True |
1372 |
1378 |
1373 self._nextrequestid = 1 |
1379 self._nextrequestid = 1 |
1374 # We only support a single outgoing stream for now. |
1380 # We only support a single outgoing stream for now. |
1375 self._outgoingstream = stream(1) |
1381 self._outgoingstream = outputstream(1) |
1376 self._pendingrequests = collections.deque() |
1382 self._pendingrequests = collections.deque() |
1377 self._activerequests = {} |
1383 self._activerequests = {} |
1378 self._incomingstreams = {} |
1384 self._incomingstreams = {} |
1379 self._streamsettingsdecoders = {} |
1385 self._streamsettingsdecoders = {} |
1380 |
1386 |
1483 return 'error', { |
1489 return 'error', { |
1484 'message': _('received frame on unknown stream ' |
1490 'message': _('received frame on unknown stream ' |
1485 'without beginning of stream flag set'), |
1491 'without beginning of stream flag set'), |
1486 } |
1492 } |
1487 |
1493 |
1488 self._incomingstreams[frame.streamid] = stream(frame.streamid) |
1494 self._incomingstreams[frame.streamid] = inputstream( |
|
1495 frame.streamid) |
1489 |
1496 |
1490 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED: |
1497 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED: |
1491 raise error.ProgrammingError('support for decoding stream ' |
1498 raise error.ProgrammingError('support for decoding stream ' |
1492 'payloads not yet implemneted') |
1499 'payloads not yet implemneted') |
1493 |
1500 |