562 |
536 |
def _abort(self, exception):
    """Abort by re-raising ``exception`` to the caller unchanged."""
    raise exception
565 |
539 |
566 |
540 |
def sendv2request(
    ui, opener, requestbuilder, apiurl, permission, requests, redirect
):
    """Issue one HTTP request carrying wire protocol version 2 commands.

    ``requests`` is a sequence of ``(command, args, future)`` tuples. Every
    command is registered with a freshly created client handler, the
    resulting frames are concatenated into a single request body and that
    body is sent through ``opener``.

    ``apiurl`` and ``permission`` are joined to form the base of the request
    URL. When there is more than one request the URL ends in
    ``/multirequest``, otherwise it ends in the name of the sole command.

    Returns a ``(handler, response)`` tuple: the client handler that knows
    about the registered commands and the object returned by
    ``opener.open()``.

    Raises ``error.Abort`` when the server replies with HTTP 401. Other HTTP
    errors are re-raised unchanged. ``httplib.HTTPException`` is converted
    into an ``IOError``.
    """
    wireprotoframing.populatestreamencoders()

    uiencoders = ui.configlist(b'experimental', b'httppeer.v2-encoder-order')

    if uiencoders:
        encoders = []

        # Only keep the configured encoders that are actually registered;
        # warn about (and drop) the others.
        for encoder in uiencoders:
            if encoder not in wireprotoframing.STREAM_ENCODERS:
                ui.warn(
                    _(
                        b'wire protocol version 2 encoder referenced in '
                        b'config (%s) is not known; ignoring\n'
                    )
                    % encoder
                )
            else:
                encoders.append(encoder)

    else:
        encoders = wireprotoframing.STREAM_ENCODERS_ORDER

    reactor = wireprotoframing.clientreactor(
        ui,
        hasmultiplesend=False,
        buffersends=True,
        clientcontentencoders=encoders,
    )

    handler = wireprotov2peer.clienthandler(
        ui, reactor, opener=opener, requestbuilder=requestbuilder
    )

    url = b'%s/%s' % (apiurl, permission)

    if len(requests) > 1:
        url += b'/multirequest'
    else:
        url += b'/%s' % requests[0][0]

    ui.debug(b'sending %d commands\n' % len(requests))
    for command, args, f in requests:
        ui.debug(
            b'sending command %s: %s\n'
            % (command, stringutil.pprint(args, indent=2))
        )
        # Register the command *outside* of the assert statement: asserts
        # are stripped when Python runs with -O, which would otherwise
        # silently skip the registration altogether.
        frames = list(
            handler.callcommand(command, args, f, redirect=redirect)
        )
        # Sends are buffered, so registering a command must not hand back
        # any frames to transmit immediately.
        assert not frames

    # TODO stream this.
    body = b''.join(map(bytes, handler.flushcommands()))

    # TODO modify user-agent to reflect v2
    headers = {
        'Accept': wireprotov2server.FRAMINGTYPE,
        'Content-Type': wireprotov2server.FRAMINGTYPE,
    }

    req = requestbuilder(pycompat.strurl(url), body, headers)
    req.add_unredirected_header('Content-Length', '%d' % len(body))

    try:
        res = opener.open(req)
    except urlerr.httperror as e:
        if e.code == 401:
            raise error.Abort(_(b'authorization failed'))

        raise
    except httplib.HTTPException as e:
        ui.traceback()
        raise IOError(None, e)

    return handler, res
|
644 |
|
645 |
|
646 class queuedcommandfuture(pycompat.futures.Future): |
541 class queuedcommandfuture(pycompat.futures.Future): |
647 """Wraps result() on command futures to trigger submission on call.""" |
542 """Wraps result() on command futures to trigger submission on call.""" |
648 |
543 |
649 def result(self, timeout=None): |
544 def result(self, timeout=None): |
650 if self.done(): |
545 if self.done(): |
655 # sendcommands() will restore the original __class__ and self.result |
550 # sendcommands() will restore the original __class__ and self.result |
656 # will resolve to Future.result. |
551 # will resolve to Future.result. |
657 return self.result(timeout) |
552 return self.result(timeout) |
658 |
553 |
659 |
554 |
@interfaceutil.implementer(repository.ipeercommandexecutor)
class httpv2executor(object):
    """Command executor that sends queued commands in one HTTP request.

    Commands are queued with ``callcommand()``, which returns a future per
    command. ``sendcommands()`` submits everything queued so far through
    ``sendv2request()`` and starts a worker thread that reads the response.
    ``close()`` (also invoked when leaving the ``with`` block) sends any
    pending commands, waits for the response to be consumed and fails
    every future that is still unresolved.
    """

    def __init__(
        self, ui, opener, requestbuilder, apiurl, descriptor, redirect
    ):
        self._ui = ui
        self._opener = opener
        self._requestbuilder = requestbuilder
        self._apiurl = apiurl
        self._descriptor = descriptor
        self._redirect = redirect
        self._sent = False
        self._closed = False
        # Union of the permissions required by all queued commands.
        self._neededpermissions = set()
        # Queued (command, args, future) tuples; set to None once sent.
        self._calls = []
        self._futures = weakref.WeakSet()
        # Thread pool and future used to read the response in a thread.
        self._responseexecutor = None
        self._responsef = None

    def __enter__(self):
        return self

    def __exit__(self, exctype, excvalue, exctb):
        self.close()

    def callcommand(self, command, args):
        """Queue ``command`` with ``args`` and return a future for its result.

        Raises ``error.ProgrammingError`` if commands were already sent, if
        the executor is closed, if the command is not listed in the API
        descriptor or if an argument is not accepted by the command.
        """
        if self._sent:
            raise error.ProgrammingError(
                b'callcommand() cannot be used after commands are sent'
            )

        if self._closed:
            raise error.ProgrammingError(
                b'callcommand() cannot be used after close()'
            )

        # The service advertises which commands are available. So if we attempt
        # to call an unknown command or pass an unknown argument, we can screen
        # for this.
        if command not in self._descriptor[b'commands']:
            raise error.ProgrammingError(
                b'wire protocol command %s is not available' % command
            )

        cmdinfo = self._descriptor[b'commands'][command]
        unknownargs = set(args.keys()) - set(cmdinfo.get(b'args', {}))

        if unknownargs:
            raise error.ProgrammingError(
                b'wire protocol command %s does not accept argument: %s'
                % (command, b', '.join(sorted(unknownargs)))
            )

        self._neededpermissions |= set(cmdinfo[b'permissions'])

        # TODO we /could/ also validate types here, since the API descriptor
        # includes types...

        f = pycompat.futures.Future()

        # Monkeypatch it so result() triggers sendcommands(), otherwise result()
        # could deadlock.
        f.__class__ = queuedcommandfuture
        f._peerexecutor = self

        self._futures.add(f)
        self._calls.append((command, args, f))

        return f

    def sendcommands(self):
        """Send all queued commands; a no-op if sent already or none queued.

        Raises ``error.RepoError`` when the queued commands need more than
        one permission (``push`` together with ``pull`` counts as ``push``
        only).
        """
        if self._sent:
            return

        if not self._calls:
            return

        self._sent = True

        # Unhack any future types so caller sees a clean type and so we
        # break reference cycle.
        for f in self._futures:
            if isinstance(f, queuedcommandfuture):
                f.__class__ = pycompat.futures.Future
                f._peerexecutor = None

        # Mark the future as running and filter out cancelled futures.
        calls = [
            (command, args, f)
            for command, args, f in self._calls
            if f.set_running_or_notify_cancel()
        ]

        # Clear out references, prevent improper object usage.
        self._calls = None

        if not calls:
            return

        permissions = set(self._neededpermissions)

        if b'push' in permissions and b'pull' in permissions:
            permissions.remove(b'pull')

        if len(permissions) > 1:
            raise error.RepoError(
                _(b'cannot make request requiring multiple permissions: %s')
                % _(b', ').join(sorted(permissions))
            )

        # Translate the permission name into the URL path component.
        permission = {
            b'push': b'rw',
            b'pull': b'ro',
        }[permissions.pop()]

        handler, resp = sendv2request(
            self._ui,
            self._opener,
            self._requestbuilder,
            self._apiurl,
            permission,
            calls,
            self._redirect,
        )

        # TODO we probably want to validate the HTTP code, media type, etc.

        self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
        self._responsef = self._responseexecutor.submit(
            self._handleresponse, handler, resp
        )

    def close(self):
        """Send pending commands, wait for the response and release state.

        Any future that is still unresolved afterwards is failed with
        ``error.ResponseError``. Calling ``close()`` again is a no-op.
        """
        if self._closed:
            return

        self.sendcommands()

        self._closed = True

        # TODO ^C here may not result in immediate program termination.

        try:
            if self._responsef:
                self._responsef.result()
        finally:
            if self._responseexecutor:
                self._responseexecutor.shutdown(wait=True)

            self._responsef = None
            self._responseexecutor = None

            # If any of our futures are still in progress, mark them as
            # errored, otherwise a result() could wait indefinitely. This
            # is done even when no response reader was ever started (e.g.
            # sending the request failed after the futures had been marked
            # as running), since nothing else would resolve them.
            for f in self._futures:
                if not f.done():
                    f.set_exception(
                        error.ResponseError(_(b'unfulfilled command response'))
                    )

            self._futures = None

    def _handleresponse(self, handler, resp):
        # Called in a thread to read the response.

        while handler.readdata(resp):
            pass
|
827 |
|
828 |
|
@interfaceutil.implementer(repository.ipeerv2)
class httpv2peer(object):
    """Peer object bound to an HTTP API endpoint of a repository.

    The API URL is ``<repourl>/<apipath>``; commands are issued through
    executors created by ``commandexecutor()``.
    """

    limitedarguments = False

    def __init__(
        self, ui, repourl, apipath, opener, requestbuilder, apidescriptor
    ):
        self.ui = ui
        self.apidescriptor = apidescriptor

        # Drop a single trailing slash so joining with the API path below
        # does not produce a double slash.
        baseurl = repourl[:-1] if repourl.endswith(b'/') else repourl

        self._url = baseurl
        self._apipath = apipath
        self._apiurl = b'%s/%s' % (baseurl, apipath)
        self._opener = opener
        self._requestbuilder = requestbuilder

        self._redirect = wireprotov2peer.supportedredirects(ui, apidescriptor)

    # Start of ipeerconnection.

    def url(self):
        return self._url

    def local(self):
        return None

    def peer(self):
        return self

    def canpush(self):
        # TODO change once implemented.
        return False

    def close(self):
        """Emit a note summarizing the opener's request and byte counters."""
        opener = self._opener
        stats = (
            opener.requestscount,
            opener.sentbytescount,
            opener.receivedbytescount,
        )
        template = _(
            b'(sent %d HTTP requests and %d bytes; '
            b'received %d bytes in responses)\n'
        )
        self.ui.note(template % stats)

    # End of ipeerconnection.

    # Start of ipeercapabilities.

    def capable(self, name):
        """Report whether the capability ``name`` is considered supported."""
        # The capabilities used internally historically map to capabilities
        # advertised from the "capabilities" wire protocol command. However,
        # version 2 of that command works differently.

        # Names that map to available commands, followed by other concepts
        # (bundle2), are unconditionally reported as supported.
        alwayssupported = (
            b'branchmap',
            b'getbundle',
            b'known',
            b'lookup',
            b'pushkey',
            b'bundle2',
        )
        if name in alwayssupported:
            return True

        # Alias command-* to presence of command of that name.
        prefix = b'command-'
        if not name.startswith(prefix):
            return False

        return name[len(prefix) :] in self.apidescriptor[b'commands']

    def requirecap(self, name, purpose):
        """Raise ``error.CapabilityError`` unless ``name`` is supported."""
        if not self.capable(name):
            raise error.CapabilityError(
                _(
                    b'cannot %s; client or remote repository does not support the '
                    b'\'%s\' capability'
                )
                % (purpose, name)
            )

    # End of ipeercapabilities.

    def _call(self, name, **args):
        """Run the single command ``name`` and return its result."""
        with self.commandexecutor() as executor:
            future = executor.callcommand(name, args)
            return future.result()

    def commandexecutor(self):
        """Create a new ``httpv2executor`` configured from this peer."""
        return httpv2executor(
            self.ui,
            self._opener,
            self._requestbuilder,
            self._apiurl,
            self.apidescriptor,
            self._redirect,
        )
|
935 |
|
936 |
|
# Registry mapping API service names to metadata about the peers that
# handle them.
#
# Each entry is a dict with the following meaningful keys:
#
# init
#    Callable invoked as (ui, repourl, servicepath, opener, requestbuilder,
#    apidescriptor) that creates a peer.
#
# priority
#    Integer priority of the service. When several services could be used,
#    the one with the highest priority is chosen.
API_PEERS = {
    wireprototypes.HTTP_WIREPROTO_V2: {b'init': httpv2peer, b'priority': 50},
}
|
954 |
|
955 |
|
956 def performhandshake(ui, url, opener, requestbuilder): |
555 def performhandshake(ui, url, opener, requestbuilder): |
957 # The handshake is a request to the capabilities command. |
556 # The handshake is a request to the capabilities command. |
958 |
557 |
959 caps = None |
558 caps = None |
960 |
559 |
961 def capable(x): |
560 def capable(x): |
962 raise error.ProgrammingError(b'should not be called') |
561 raise error.ProgrammingError(b'should not be called') |
963 |
562 |
964 args = {} |
563 args = {} |
965 |
|
966 # The client advertises support for newer protocols by adding an |
|
967 # X-HgUpgrade-* header with a list of supported APIs and an |
|
968 # X-HgProto-* header advertising which serializing formats it supports. |
|
969 # We only support the HTTP version 2 transport and CBOR responses for |
|
970 # now. |
|
971 advertisev2 = ui.configbool(b'experimental', b'httppeer.advertise-v2') |
|
972 |
|
973 if advertisev2: |
|
974 args[b'headers'] = { |
|
975 'X-HgProto-1': 'cbor', |
|
976 } |
|
977 |
|
978 args[b'headers'].update( |
|
979 encodevalueinheaders( |
|
980 b' '.join(sorted(API_PEERS)), |
|
981 b'X-HgUpgrade', |
|
982 # We don't know the header limit this early. |
|
983 # So make it small. |
|
984 1024, |
|
985 ) |
|
986 ) |
|
987 |
564 |
988 req, requrl, qs = makev1commandrequest( |
565 req, requrl, qs = makev1commandrequest( |
989 ui, requestbuilder, caps, capable, url, b'capabilities', args |
566 ui, requestbuilder, caps, capable, url, b'capabilities', args |
990 ) |
567 ) |
991 resp = sendrequest(ui, opener, req) |
568 resp = sendrequest(ui, opener, req) |
1002 # issue without behavior degradation. And according to issue 5860, it may |
579 # issue without behavior degradation. And according to issue 5860, it may |
1003 # be a longstanding bug in some server implementations. So we allow a |
580 # be a longstanding bug in some server implementations. So we allow a |
1004 # redirect that drops the query string to "just work." |
581 # redirect that drops the query string to "just work." |
1005 try: |
582 try: |
1006 respurl, ct, resp = parsev1commandresponse( |
583 respurl, ct, resp = parsev1commandresponse( |
1007 ui, url, requrl, qs, resp, compressible=False, allowcbor=advertisev2 |
584 ui, url, requrl, qs, resp, compressible=False |
1008 ) |
585 ) |
1009 except RedirectedRepoError as e: |
586 except RedirectedRepoError as e: |
1010 req, requrl, qs = makev1commandrequest( |
587 req, requrl, qs = makev1commandrequest( |
1011 ui, requestbuilder, caps, capable, e.respurl, b'capabilities', args |
588 ui, requestbuilder, caps, capable, e.respurl, b'capabilities', args |
1012 ) |
589 ) |
1013 resp = sendrequest(ui, opener, req) |
590 resp = sendrequest(ui, opener, req) |
1014 respurl, ct, resp = parsev1commandresponse( |
591 respurl, ct, resp = parsev1commandresponse( |
1015 ui, url, requrl, qs, resp, compressible=False, allowcbor=advertisev2 |
592 ui, url, requrl, qs, resp, compressible=False |
1016 ) |
593 ) |
1017 |
594 |
1018 try: |
595 try: |
1019 rawdata = resp.read() |
596 rawdata = resp.read() |
1020 finally: |
597 finally: |
1021 resp.close() |
598 resp.close() |
1022 |
599 |
1023 if not ct.startswith(b'application/mercurial-'): |
600 if not ct.startswith(b'application/mercurial-'): |
1024 raise error.ProgrammingError(b'unexpected content-type: %s' % ct) |
601 raise error.ProgrammingError(b'unexpected content-type: %s' % ct) |
1025 |
602 |
1026 if advertisev2: |
603 info = {b'v1capabilities': set(rawdata.split())} |
1027 if ct == b'application/mercurial-cbor': |
|
1028 try: |
|
1029 info = cborutil.decodeall(rawdata)[0] |
|
1030 except cborutil.CBORDecodeError: |
|
1031 raise error.Abort( |
|
1032 _(b'error decoding CBOR from remote server'), |
|
1033 hint=_( |
|
1034 b'try again and consider contacting ' |
|
1035 b'the server operator' |
|
1036 ), |
|
1037 ) |
|
1038 |
|
1039 # We got a legacy response. That's fine. |
|
1040 elif ct in (b'application/mercurial-0.1', b'application/mercurial-0.2'): |
|
1041 info = {b'v1capabilities': set(rawdata.split())} |
|
1042 |
|
1043 else: |
|
1044 raise error.RepoError( |
|
1045 _(b'unexpected response type from server: %s') % ct |
|
1046 ) |
|
1047 else: |
|
1048 info = {b'v1capabilities': set(rawdata.split())} |
|
1049 |
604 |
1050 return respurl, info |
605 return respurl, info |
1051 |
606 |
1052 |
607 |
1053 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request): |
608 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request): |