def __iter__(self):
    """Yield every part contained in the stream, in stream order.

    A header block is fetched with ``self._readpartheader()`` and handed to
    ``unbundlepart`` together with the underlying file object.  Iteration
    ends as soon as ``_readpartheader()`` returns ``None``.
    """
    # Bare attribute access on purpose: it makes sure the params have been
    # loaded before any part is read.
    self.params
    self.ui.debug('start extraction of bundle2 parts\n')
    headerblock = self._readpartheader()
    while headerblock is not None:
        # The part is built from the already-read header block and is given
        # the same file object this reader uses.
        part = unbundlepart(self.ui, headerblock, self._fp)
        yield part
        headerblock = self._readpartheader()
    self.ui.debug('end of bundle2 stream\n')
456 |
457 |
def _readpartheader(self):
    """Read a part header size, then return the header as a bytes blob.

    The size is unpacked from the stream using the ``_fpartheadersize``
    format and logged through ``self.ui.debug``.

    Returns ``None`` when that size is zero (nothing further is read in
    that case); otherwise returns exactly ``headersize`` bytes obtained from
    ``self._readexact``.
    """
    headersize = self._unpack(_fpartheadersize)[0]
    self.ui.debug('part header size: %i\n' % headersize)
    if headersize:
        return self._readexact(headersize)
    return None
|
516 |
467 |
517 |
468 |
518 class bundlepart(object): |
469 class bundlepart(object): |
519 """A bundle2 part contains application level payload |
470 """A bundle2 part contains application level payload |
520 |
471 |
582 yield chunk |
533 yield chunk |
583 chunk = buff.read(preferedchunksize) |
534 chunk = buff.read(preferedchunksize) |
584 elif len(self.data): |
535 elif len(self.data): |
585 yield self.data |
536 yield self.data |
586 |
537 |
|
class unbundlepart(unpackermixin):
    """a bundle part read from a bundle

    The constructor receives the already-read header block plus the file
    object the payload is read from, and eagerly populates ``id``, ``type``,
    ``mandatoryparams``, ``advisoryparams`` and ``data``.
    """

    def __init__(self, ui, header, fp):
        """Store the raw header and immediately decode the whole part.

        ui:     object whose ``debug`` method receives progress messages
        header: raw header block of the part
        fp:     file object handed to the ``unpackermixin`` constructor
        """
        super(unbundlepart, self).__init__(fp)
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.data = None
        self._readdata()

    def _readdata(self):
        """read the header and setup the object"""
        self._parseheader()
        self.data = self._readpayload()

    def _parseheader(self):
        """Decode ``self._headerdata`` into type, id and parameters.

        Fields are consumed from the header block in this order: type size,
        type, part id, (mandatory count, advisory count), one (key size,
        value size) pair per parameter, then the key/value bytes of every
        mandatory parameter followed by those of every advisory parameter.
        """
        headerblock = self._headerdata
        # Read position inside headerblock.  Kept in a one-element list so
        # the nested helpers can advance it without touching instance state.
        cursor = [0]

        def fromheader(size):
            """return the next <size> byte from the header"""
            start = cursor[0]
            cursor[0] = start + size
            return headerblock[start:start + size]

        def unpackheader(fmt):
            """read given format from header

            This automatically compute the size of the format to read."""
            return _unpack(fmt, fromheader(struct.calcsize(fmt)))

        typesize = unpackheader(_fparttypesize)[0]
        self.type = fromheader(typesize)
        self.ui.debug('part type: "%s"\n' % self.type)
        self.id = unpackheader(_fpartid)[0]
        self.ui.debug('part id: "%s"\n' % self.id)
        ## reading parameters
        # param count
        mancount, advcount = unpackheader(_fpartparamcount)
        self.ui.debug('part parameters: %i\n' % (mancount + advcount))
        # param size
        paramsizes = unpackheader(_makefpartparamsizes(mancount + advcount))
        # make it a list of couple again; list() keeps the result sliceable
        # even where zip() returns an iterator
        paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param values; the key is read before the value, and all
        # mandatory params are read before any advisory one
        manparams = [(fromheader(key), fromheader(value))
                     for key, value in mansizes]
        advparams = [(fromheader(key), fromheader(value))
                     for key, value in advsizes]
        self.mandatoryparams = manparams
        self.advisoryparams = advparams

    def _readpayload(self):
        """Read the part payload from the stream and return it joined.

        The payload is a sequence of chunks, each preceded by its size; a
        size of zero ends the sequence.
        """
        payload = []
        payloadsize = self._unpack(_fpayloadsize)[0]
        self.ui.debug('payload chunk size: %i\n' % payloadsize)
        while payloadsize:
            payload.append(self._readexact(payloadsize))
            payloadsize = self._unpack(_fpayloadsize)[0]
            self.ui.debug('payload chunk size: %i\n' % payloadsize)
        return ''.join(payload)
|
607 |
587 @parthandler('changegroup') |
608 @parthandler('changegroup') |
588 def handlechangegroup(op, inpart): |
609 def handlechangegroup(op, inpart): |
589 """apply a changegroup part on the repo |
610 """apply a changegroup part on the repo |
590 |
611 |
591 This is a very early implementation that will massive rework before being |
612 This is a very early implementation that will massive rework before being |
603 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2') |
624 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2') |
604 op.records.add('changegroup', {'return': ret}) |
625 op.records.add('changegroup', {'return': ret}) |
605 if op.reply is not None: |
626 if op.reply is not None: |
606 # This is definitly not the final form of this |
627 # This is definitly not the final form of this |
607 # return. But one need to start somewhere. |
628 # return. But one need to start somewhere. |
608 op.reply.addpart(bundlepart('reply:changegroup', (), |
629 part = bundlepart('reply:changegroup', (), |
609 [('in-reply-to', str(inpart.id)), |
630 [('in-reply-to', str(inpart.id)), |
610 ('return', '%i' % ret)])) |
631 ('return', '%i' % ret)]) |
|
632 op.reply.addpart(part) |
611 |
633 |
612 @parthandler('reply:changegroup') |
634 @parthandler('reply:changegroup') |
613 def handlechangegroup(op, inpart): |
635 def handlechangegroup(op, inpart): |
614 p = dict(inpart.advisoryparams) |
636 p = dict(inpart.advisoryparams) |
615 ret = int(p['return']) |
637 ret = int(p['return']) |