From ae99050289f369f7da501de0a31a70c1bf22414e Mon Sep 17 00:00:00 2001 From: hellerve Date: Mon, 18 Mar 2019 20:47:36 +0100 Subject: [PATCH] split up into modules --- main.py | 2 +- mg/__init__.py | 708 +------------------------------------------------ mg/commands.py | 250 +++++++++++++++++ mg/core.py | 452 +++++++++++++++++++++++++++++++ 4 files changed, 704 insertions(+), 708 deletions(-) create mode 100644 mg/commands.py create mode 100644 mg/core.py diff --git a/main.py b/main.py index 7a96600..e74beeb 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,4 @@ #!/usr/bin/env python3 import mg -mg.cli() +mg.commands.cli() diff --git a/mg/__init__.py b/mg/__init__.py index f3cde89..af22667 100644 --- a/mg/__init__.py +++ b/mg/__init__.py @@ -1,707 +1 @@ -import collections -import configparser -import datetime -import hashlib -import os -import re -import stat -import sys -import time -import zlib - -import click - - -class MGException(Exception): - pass - - -class MGRepository: - """ - Our abstraction of a Git repository. Has a work directory, a Git directory, - and a bunch of configuration. - """ - - def __init__(self, path, force=False): - self.worktree = path - self.gitdir = (self.find(path, required=not force) or - os.path.join(path, '.git')) - - # Read configuration file in .git/config - self.conf = configparser.ConfigParser() - cf = self.file('config') - - if cf and os.path.exists(cf): - self.conf.read([cf]) - elif not force: - raise MGException('Can’t find the configuration file!') - - self.ignore_patterns = [] - - if os.path.exists(os.path.expanduser('~/.gitignore')): - with open(os.path.expanduser('~/.gitignore'), 'r') as f: - self.ignore_patterns.extend(l.strip() for l in f.readlines() if l.strip()) - - if not force: - vers = int(self.conf.get('core', 'repositoryformatversion')) - if vers != 0 and not force: - raise MGException( - 'Expected git repo version <= 1, found {}'.format(vers) - ) - - def path(self, *path): - return os.path.join(self.gitdir, *path) - - def file(self, *path, mkdir=False): - if self.dir(*path[:-1], mkdir=mkdir): - return self.path(*path) - - def dir(self, *path, mkdir=False): - path = self.path(*path) - - if os.path.exists(path): - if not os.path.isdir(path): - raise MGException('{} is not a directory!'.format(path)) - return path - - if not mkdir: - return None - - os.makedirs(path) - return path - - def create(self): - if os.path.exists(self.worktree): - if not os.path.isdir(self.worktree): - raise MGException( - '{} is not a directory!',format(self.worktree) - ) - else: - os.makedirs(self.worktree) - - self.dir('branches', mkdir=True) - self.dir('objects', mkdir=True) - self.dir('refs', 'tags', mkdir=True) - self.dir('refs', 'heads', mkdir=True) - - # .git/description - with open(self.file('description'), 'w') as f: - f.write( - 'Unnamed repository; edit this file \'description\' to name the repository.\n' - ) - - # .git/HEAD - with open(self.file('HEAD'), 'w') as f: - f.write('ref: refs/heads/master\n') - - with open(self.file('config'), 'w') as f: - config = self.default_config() - config.write(f) - - def default_config(self): - ret = configparser.ConfigParser() - - ret.add_section('core') - ret.set('core', 'repositoryformatversion', '0') - ret.set('core', 'filemode', 'false') - ret.set('core', 'bare', 'false') - - return ret - - def find(self, path, required=True): - path = os.path.realpath(path) - dir_candidate = os.path.join(path, '.git') - - if os.path.isdir(dir_candidate): - return dir_candidate - - # If we haven't returned, recurse in parent, if w - parent = os.path.realpath(os.path.join(path, '..')) - - if parent == path: - # Bottom case - # os.path.join('/', '..') == '/': - # If parent==path, then path is root. - if required: - raise MGException('not a git repository (or any of the parent directories): .git') - return None - - # Recursive case - return self.find(parent, required) - - -class MGObject: - def __init__(self, repo, data=None, sha=''): - self.repo = repo - self.sha = "{:>040s}".format(sha) - - if data: - self.deserialize(data) - - def serialize(self): - raise NotImplemented('serialize needs to be implemented!') - - def deserialize(self, data): - raise NotImplemented('deserialize needs to be implemented!') - - @staticmethod - def read(repo, sha, candidates): - path = repo.file('objects', str(sha[0:2]), str(sha[2:])) - - if not path: - raise MGException('{}: bad file'.format(sha)) - - with open(path, 'rb') as f: - raw = zlib.decompress(f.read()) - - # Read object type - space = raw.find(b' ') - fmt = raw[0:space].decode('utf-8') - - # Read and validate object size - end = raw.find(b'\x00', space) - size = int(raw[space:end].decode('ascii')) - if size != len(raw)-end-1: - raise MGException('Malformed object {}: bad length'.format(sha)) - - # Call constructor and return object - return MGObject.find_candidate(candidates, fmt)(repo, raw[end+1:], sha=sha) - - @staticmethod - def find_candidate(candidates, fmt): - candidates = [c for c in candidates if c.understands == fmt] - if not candidates: - raise MGException('Unknown object type {}'.format(fmt)) - if len(candidates) != 1: - raise MGException( - 'Multiple classes ({}) claim to understand object type {}'.format(candidates, fmt) - ) - - return candidates[0] - - - def write(self, write=True): - # Serialize object data - data = self.serialize() - # Add header - result = b'%s %d\x00%s' % (self.understands.encode(), len(data), data) - # Compute hash - sha = hashlib.sha1(result).hexdigest() - - if write: - # Compute path - path = self.repo.file('objects', sha[0:2], sha[2:], mkdir=write) - - if not os.path.exists(path): - with open(path, 'wb') as f: - # Compress and write - f.write(zlib.compress(result)) - - return sha - - -class MGBlob(MGObject): - understands = 'blob' - - def serialize(self): - return self.data - - def deserialize(self, data): - self.data = data - - -class MGCommit(MGObject): - understands = 'commit' - - def __init__(self, *args, **kwargs): - self.headers = collections.OrderedDict() - super().__init__(*args, **kwargs) - - def deserialize(self, raw, offs=0): - spc = raw.find(b' ', offs) - nl = raw.find(b'\n', offs) - - if spc < 0 or nl < spc: - self.data = raw[offs+1:] - return - - key = raw[offs:spc] - - end = raw.find(b'\n', offs+1) - while raw[end+1] == ord(' '): - end = raw.find(b'\n', end+1) - - value = raw[spc+1:end].replace(b'\n ', b'\n') - - if key in self.headers: - self.headers[key].append(value) - else: - self.headers[key] = [value] - - return self.deserialize(raw, offs=end+1) - - def serialize(self): - ret = b'' - - for k, v in self.headers.items(): - for val in v: - ret += k + b' ' + val.replace(b'\n', b'\n ') + b'\n' - - ret += b'\n' + self.data - - return ret - - def from_headers(self, header, dflt): - return self.headers.get(header, [dflt])[0] - - def format(self): - fmt_str = '{}\nAuthor: {}\nDate: {}\n\n{}' - author_info = self.from_headers(b'author', b'unknown').decode('utf-8') - name, email, timestamp, tz = author_info.split(' ') - author = '{} {}'.format(name, email) - date = datetime.datetime.fromtimestamp(int(timestamp)) - # were cheating with the timezone, but thats okay - date = '{}{}'.format(date.strftime('%a %b %H:%M:%S %Y %z'), tz) - commit_info = '\n'.join(' {}'.format(l) for l in self.data.decode('utf-8').split('\n')) - yellow = click.style('commit {}'.format(self.sha), fg='yellow') - return fmt_str.format(yellow, author, date, commit_info) - - @staticmethod - def create(repo, msg, parent=None): - res = MGCommit(repo) - res.headers[b'tree'] = [MGTree.create(repo, '.').write().encode('ascii')] - conf = configparser.ConfigParser() - conf.read([os.path.expanduser('~/.gitconfig')]) - res.headers[b'author'] = [b'%s <%s> %d %s' % ( - conf.get('user', 'name').encode('utf-8'), - conf.get('user', 'email').encode('utf-8'), - int(datetime.datetime.now().timestamp()), - time.strftime('%z', time.gmtime()).encode('utf-8') - )] - res.headers[b'committer'] = res.headers[b'author'] - if parent: - res.headers[b'parent'] = [parent.encode('ascii')] - res.data = msg.encode('utf-8') - return res - - -class MGTree(MGObject): - understands = 'tree' - - class MGLeaf: - def __init__(self, mode, path, sha): - self.mode = mode - self.path = path - self.sha = "{:>020s}".format(sha) - - def deserialize_leaf(self, raw, offs=0): - space = raw.find(b' ', offs) - mode = raw[offs:space] - - null = raw.find(b'\x00', space) - path = raw[space+1:null] - end = null+21 - - sha = format(int.from_bytes(raw[null+1:end], 'big'), 'x') - sha = "{:>040s}".format(sha) - - return end, MGTree.MGLeaf(mode, path, sha) - - def deserialize(self, raw): - self.data = [] - pos = 0 - end = len(raw) - while pos < end: - pos, data = self.deserialize_leaf(raw, pos) - self.data.append(data) - - def serialize(self): - ret = b'' - for i in self.data: - sha_str = int(i.sha, 16).to_bytes(20, byteorder='big') - ret += i.mode + b' ' + i.path + b'\x00' + sha_str - return ret - - def check_prefix(self, d, f): - d = os.path.realpath(d) - f = os.path.realpath(f) - - if os.path.isdir(d): f = os.path.join(d, b'') - if os.path.isdir(f): f = os.path.join(f, b'') - - common = os.path.commonprefix([f, d]) - - return common == d or common == f - - def checkout(self, path, prefix=b'.'): - for item in self.data: - obj = MGObject.read(self.repo, item.sha, DFLT_CANDIDATES) - dest = os.path.join(path, item.path) - - if obj.understands == 'tree' and self.check_prefix(prefix, dest): - if not os.path.exists(dest): - os.mkdir(dest) - obj.checkout(dest) - if obj.understands == 'blob': - with open(dest, 'wb') as f: - f.write(obj.data) - - @staticmethod - def create(repo, path): - res = MGTree(repo) - res.data = [] - - for (d, _, files) in os.walk(path): - for f in files: - item = os.path.join(d, f) - if '/.git/' in item or any(p in item for p in repo.ignore_patterns): - continue - mode = format(os.stat(item)[stat.ST_MODE], 'o').encode('ascii') - - candidate = MGObject.find_candidate(DFLT_CANDIDATES, 'blob') - with open(item, 'rb') as f: - obj = candidate(repo, f.read()) - sha = obj.write() - item = item[2:] - res.data.append(MGTree.MGLeaf(mode, item.encode('utf-8'), sha)) - - return res - - -class MGTag(MGCommit): - understands = 'tag' - - @staticmethod - def create(repo, name, to): - with open(repo.file(os.path.join('refs', 'tags', name)), 'w+') as f: - f.write(to.resolve()) - - -DFLT_CANDIDATES = [MGBlob, MGCommit, MGTree, MGTag] - - -class MGRef: - def __init__(self, repo, ref, children=None): - self.repo = repo - self.ref = ref - self.children = children - - def resolve(self): - recursive = True - data = self.ref - while recursive: - data, recursive, _ = self.references(data) - return data - - def references(self, ref=None): - if ref is None: - ref = self.ref - try: - with open(self.repo.file(ref), 'r') as fp: - data = fp.read().strip() - except FileNotFoundError: - prefix_dir = os.path.join('objects', ref[0:2]) - path = self.repo.dir(prefix_dir) - if path: - rem = ref[2:] - candidates = [] - for f in os.listdir(path): - if f.startswith(rem): - candidates.append("{}{}".format(ref, f)) - if len(candidates) > 1: - s = ', '.join(candidates) - raise MGException( - 'Ambiguous revision {}\nCandidates: {}'.format(ref, s) - ) - if len(candidates) == 1: - return candidates[0], False, False - - - head = os.path.join('refs', 'heads', ref) - tag = os.path.join('refs', 'tags', ref) - if os.path.exists(self.repo.file(head)): - res, rec, _ = self.references(head) - return res, rec, head - if os.path.exists(self.repo.file(tag)): - res, rec, _ = self.references(tag) - return res, rec, tag - return ref, False, False - if data.startswith('ref: '): - return data[5:], True, False - return data, False, False - - def changes_ref(self): - recursive = True - data = self.ref - while recursive: - data, recursive, new_ref = self.references(data) - return new_ref - - @staticmethod - def list(repo, path=None): - if not path: - path = repo.dir('refs') - res = [] - - for f in sorted(os.listdir(path)): - can = os.path.join(path, f) - if os.path.isdir(can): - res.append(MGRef(repo, can, MGRef.list(repo, can))) - else: - res.append(MGRef(repo, can)) - - return res - - def show(self): - if self.children is None: - ref = self.ref.replace(self.repo.dir(''), '') - print('{} {}'.format(self.resolve(), ref)) - else: - for ref in self.children: - ref.show() - - -class MGCatch(click.Group): - def invoke(self, ctx): - try: - return super().invoke(ctx) - except MGException as exc: - print('fatal: {}'.format(exc)) - - -@click.group(help='The stupid content tracker', - cls=MGCatch) -def cli(argv=None): - pass - - -@cli.command() -def add(): - raise MGException('Unimplemented') - - -@cli.command(help="List, create, or delete branches") -@click.argument("name", default=None, required=False) -@click.option("-d", is_flag=True, help="Delete a branch.") -def branch(name, d): - repo = MGRepository('.') - ref = MGRef(repo, 'HEAD') - if d and name: - branch_file = repo.file(os.path.join('refs', 'heads', name)) - if not os.path.exists(branch_file): - raise MGException('Branch \'{}\' not found'.format(name)) - os.remove(branch_file) - elif name: - branch_file = repo.file(os.path.join('refs', 'heads', name)) - if os.path.exists(branch_file): - raise MGException('Branch \'{}\' already exists'.format(name)) - with open(branch_file, 'w') as f: - f.write(ref.resolve()) - elif d: - raise MGException('branch name required') - else: - refbranch, _, _2 = ref.references() - refbranch = os.path.basename(refbranch) - for branch in os.listdir(repo.file(os.path.join('refs', 'heads'))): - if refbranch == branch: - print('* {}'.format(click.style(branch, bg='yellow'))) - else: - print(' {}'.format(click.style(branch, fg='yellow'))) - - -@cli.command( - name='cat-file', - help='Provide content or type and size information for repository objects' -) -@click.argument( - 't', - metavar='type', - type=click.Choice(['blob', 'commit', 'tag', 'tree']) -) -@click.argument('o', metavar='object') -def cat_file(t, o): - repo = MGRepository('.') - obj = MGObject.read(repo, o, DFLT_CANDIDATES) - print(obj.serialize().decode('utf-8'), end='') - - -@cli.command(help='Checkout a commit or restore files.') -@click.argument('commit', default='HEAD') -@click.option('-p', 'path', default='.', help='The path to check out.') -def checkout(commit, path): - repo = MGRepository('.') - - commit = MGRef(repo, commit) - changes_ref = commit.changes_ref() - - obj = MGObject.read(repo, commit.resolve(), DFLT_CANDIDATES) - - if obj.understands == 'commit': - obj = MGObject.read(repo, obj.from_headers(b'tree', b'').decode('ascii'), DFLT_CANDIDATES) - - if not os.path.exists(path): - os.makedirs(path) - - obj.checkout(b'.', path.encode('utf-8')) - - if changes_ref: - with open(repo.file('HEAD'), 'w') as f: - f.write('ref: {}'.format(changes_ref)) - - -@cli.command() -@click.argument('message') -def commit(message): - repo = MGRepository('.') - parent = None - ref = MGRef(repo, 'HEAD') - references, _, _ = ref.references() - references = repo.file(references) - resolved = ref.resolve() - if os.path.exists(resolved): - with open(ref.resolve(), 'r') as f: - parent = f.read().strip() - commit = MGCommit.create(repo, message, parent) - os.makedirs(os.path.dirname(references), exist_ok=True) - with open(references, 'w+') as f: - f.write(commit.write()) - f.write('\n') - - -@cli.command( - name='hash-object', - help='Compute object ID and optionally creates a blob from a file' -) -@click.option( - '-t', - 't', - metavar='type', - type=click.Choice(['blob', 'commit', 'tag', 'tree']), - default='blob', - help='Specify the type' -) -@click.option( - '-w', - 'write', - is_flag=True, - help='Actually write the object into the database' -) -@click.argument('path') -def hash_object(t, write, path): - repo = MGRepository('.') if write else None - - try: - with open(path, 'rb') as f: - obj = MGObject.find_candidate(DFLT_CANDIDATES, t)(repo, f.read()) - except OSError: - raise MGException('unable to hash {}'.format(path)) - sha = obj.write(write) - print(sha) - - -@cli.command(help='Initialize a new, empty repository.') -@click.argument('path', default='.') -def init(path): - MGRepository(path, force=True).create() - - -@cli.command(help='Display history of a given commit.') -@click.argument('commit', default='HEAD') -def log(commit): - repo = MGRepository('.') - - ref = MGRef(repo, commit) - - logfn(repo, ref.resolve(), set()) - - -def logfn(repo, sha, seen): - if sha in seen: - return - seen.add(sha) - - commit = MGObject.read(repo, sha, DFLT_CANDIDATES) - - print(commit.format()) - - if not b'parent' in commit.headers: - return - - parents = commit.headers[b'parent'] - - for p in parents: - logfn(repo, p.decode('ascii'), seen) - - -@cli.command(help='Pretty-print a tree object.') -@click.argument('o', metavar='object') -@click.option('-r', is_flag=True, help='Recurse into sub-trees.') -def ls_tree(o, r): - repo = MGRepository('.') - - def internal(o, r, prefix=''): - o = MGObject.read(repo, o, DFLT_CANDIDATES) - - if o.understands != 'tree': - raise MGException('not a tree object') - - for item in o.data: - path = os.path.join(prefix, item.path.decode('ascii')) - t = MGObject.read(repo, item.sha, DFLT_CANDIDATES).understands - if t == 'tree' and r: - internal(item.sha, r, prefix=path) - else: - print('{0} {1} {2}\t{3}'.format( - '0' * (6 - len(item.mode)) + item.mode.decode('ascii'), - t, - item.sha, - path)) - - internal(o, r) - - -@cli.command() -def merge(): - raise MGException('Unimplemented') - - -@cli.command() -def rebase(): - raise MGException('Unimplemented') - - -@cli.command() -@click.argument("name") -def rev_parse(name): - repo = MGRepository(".") - print(MGRef(repo, name).resolve()) - - -@cli.command() -def rm(): - raise MGException('Unimplemented') - - -@cli.command(help='List references.') -def show_ref(): - repo = MGRepository('.') - refs = MGRef.list(repo) - for ref in refs: - ref.show() - - -@cli.command(help='List and create tags') -@click.argument('name', default=None, required=False) -@click.argument('o', metavar='object', default='HEAD') -def tag(name, o): - repo = MGRepository('.') - - if name: - MGTag.create(repo, name, MGRef(repo, o)) - else: - refs = MGRef.list(repo) - for ref in refs: - prefix = repo.dir(os.path.join('refs', 'tags')) - if ref.ref == prefix: - for child in ref.children: - print(child.ref.replace(os.path.join(prefix, ''), '')) +from mg import commands diff --git a/mg/commands.py b/mg/commands.py new file mode 100644 index 0000000..4f78b16 --- /dev/null +++ b/mg/commands.py @@ -0,0 +1,250 @@ +import os + +import click + +from .core import * + +class MGCatch(click.Group): + def invoke(self, ctx): + try: + return super().invoke(ctx) + except MGException as exc: + print('fatal: {}'.format(exc)) + + +@click.group(help='The stupid content tracker', + cls=MGCatch) +def cli(argv=None): + pass + + +@cli.command() +def add(): + raise MGException('Unimplemented') + + +@cli.command(help="List, create, or delete branches") +@click.argument("name", default=None, required=False) +@click.option("-d", is_flag=True, help="Delete a branch.") +def branch(name, d): + repo = MGRepository('.') + ref = MGRef(repo, 'HEAD') + if d and name: + branch_file = repo.file(os.path.join('refs', 'heads', name)) + if not os.path.exists(branch_file): + raise MGException('Branch \'{}\' not found'.format(name)) + os.remove(branch_file) + elif name: + branch_file = repo.file(os.path.join('refs', 'heads', name)) + if os.path.exists(branch_file): + raise MGException('Branch \'{}\' already exists'.format(name)) + with open(branch_file, 'w') as f: + f.write(ref.resolve()) + elif d: + raise MGException('branch name required') + else: + refbranch, _, _2 = ref.references() + refbranch = os.path.basename(refbranch) + for branch in os.listdir(repo.file(os.path.join('refs', 'heads'))): + if refbranch == branch: + print('* {}'.format(click.style(branch, bg='yellow'))) + else: + print(' {}'.format(click.style(branch, fg='yellow'))) + + +@cli.command( + name='cat-file', + help='Provide content or type and size information for repository objects' +) +@click.argument( + 't', + metavar='type', + type=click.Choice(['blob', 'commit', 'tag', 'tree']) +) +@click.argument('o', metavar='object') +def cat_file(t, o): + repo = MGRepository('.') + obj = MGObject.read(repo, o, DFLT_CANDIDATES) + print(obj.serialize().decode('utf-8'), end='') + + +@cli.command(help='Checkout a commit or restore files.') +@click.argument('commit', default='HEAD') +@click.option('-p', 'path', default='.', help='The path to check out.') +def checkout(commit, path): + repo = MGRepository('.') + + commit = MGRef(repo, commit) + changes_ref = commit.changes_ref() + + obj = MGObject.read(repo, commit.resolve(), DFLT_CANDIDATES) + + if obj.understands == 'commit': + obj = MGObject.read(repo, obj.from_headers(b'tree', b'').decode('ascii'), DFLT_CANDIDATES) + + if not os.path.exists(path): + os.makedirs(path) + + obj.checkout(b'.', path.encode('utf-8')) + + if changes_ref: + with open(repo.file('HEAD'), 'w') as f: + f.write('ref: {}'.format(changes_ref)) + + +@cli.command() +@click.argument('message') +def commit(message): + repo = MGRepository('.') + parent = None + ref = MGRef(repo, 'HEAD') + references, _, _ = ref.references() + references = repo.file(references) + resolved = ref.resolve() + if os.path.exists(resolved): + with open(ref.resolve(), 'r') as f: + parent = f.read().strip() + commit = MGCommit.create(repo, message, parent) + os.makedirs(os.path.dirname(references), exist_ok=True) + with open(references, 'w+') as f: + f.write(commit.write()) + f.write('\n') + + +@cli.command( + name='hash-object', + help='Compute object ID and optionally creates a blob from a file' +) +@click.option( + '-t', + 't', + metavar='type', + type=click.Choice(['blob', 'commit', 'tag', 'tree']), + default='blob', + help='Specify the type' +) +@click.option( + '-w', + 'write', + is_flag=True, + help='Actually write the object into the database' +) +@click.argument('path') +def hash_object(t, write, path): + repo = MGRepository('.') if write else None + + try: + with open(path, 'rb') as f: + obj = MGObject.find_candidate(DFLT_CANDIDATES, t)(repo, f.read()) + except OSError: + raise MGException('unable to hash {}'.format(path)) + sha = obj.write(write) + print(sha) + + +@cli.command(help='Initialize a new, empty repository.') +@click.argument('path', default='.') +def init(path): + MGRepository(path, force=True).create() + + +@cli.command(help='Display history of a given commit.') +@click.argument('commit', default='HEAD') +def log(commit): + repo = MGRepository('.') + + ref = MGRef(repo, commit) + + logfn(repo, ref.resolve(), set()) + + +def logfn(repo, sha, seen): + if sha in seen: + return + seen.add(sha) + + commit = MGObject.read(repo, sha, DFLT_CANDIDATES) + + print(commit.format()) + + if not b'parent' in commit.headers: + return + + parents = commit.headers[b'parent'] + + for p in parents: + logfn(repo, p.decode('ascii'), seen) + + +@cli.command(help='Pretty-print a tree object.') +@click.argument('o', metavar='object') +@click.option('-r', is_flag=True, help='Recurse into sub-trees.') +def ls_tree(o, r): + repo = MGRepository('.') + + def internal(o, r, prefix=''): + o = MGObject.read(repo, o, DFLT_CANDIDATES) + + if o.understands != 'tree': + raise MGException('not a tree object') + + for item in o.data: + path = os.path.join(prefix, item.path.decode('ascii')) + t = MGObject.read(repo, item.sha, DFLT_CANDIDATES).understands + if t == 'tree' and r: + internal(item.sha, r, prefix=path) + else: + print('{0} {1} {2}\t{3}'.format( + '0' * (6 - len(item.mode)) + item.mode.decode('ascii'), + t, + item.sha, + path)) + + internal(o, r) + + +@cli.command() +def merge(): + raise MGException('Unimplemented') + + +@cli.command() +def rebase(): + raise MGException('Unimplemented') + + +@cli.command() +@click.argument("name") +def rev_parse(name): + repo = MGRepository(".") + print(MGRef(repo, name).resolve()) + + +@cli.command() +def rm(): + raise MGException('Unimplemented') + + +@cli.command(help='List references.') +def show_ref(): + repo = MGRepository('.') + refs = MGRef.list(repo) + for ref in refs: + ref.show() + + +@cli.command(help='List and create tags') +@click.argument('name', default=None, required=False) +@click.argument('o', metavar='object', default='HEAD') +def tag(name, o): + repo = MGRepository('.') + + if name: + MGTag.create(repo, name, MGRef(repo, o)) + else: + refs = MGRef.list(repo) + for ref in refs: + prefix = repo.dir(os.path.join('refs', 'tags')) + if ref.ref == prefix: + for child in ref.children: + print(child.ref.replace(os.path.join(prefix, ''), '')) diff --git a/mg/core.py b/mg/core.py new file mode 100644 index 0000000..615007e --- /dev/null +++ b/mg/core.py @@ -0,0 +1,452 @@ +import collections +import configparser +import datetime +import hashlib +import os +import stat +import time +import zlib + + +class MGException(Exception): + pass + + +class MGRepository: + def __init__(self, path, force=False): + self.worktree = path + self.gitdir = (self.find(path, required=not force) or + os.path.join(path, '.git')) + + # Read configuration file in .git/config + self.conf = configparser.ConfigParser() + cf = self.file('config') + + if cf and os.path.exists(cf): + self.conf.read([cf]) + elif not force: + raise MGException('Can’t find the configuration file!') + + self.ignore_patterns = [] + + if os.path.exists(os.path.expanduser('~/.gitignore')): + with open(os.path.expanduser('~/.gitignore'), 'r') as f: + self.ignore_patterns.extend(l.strip() for l in f.readlines() if l.strip()) + + if not force: + vers = int(self.conf.get('core', 'repositoryformatversion')) + if vers != 0 and not force: + raise MGException( + 'Expected git repo version <= 1, found {}'.format(vers) + ) + + def path(self, *path): + return os.path.join(self.gitdir, *path) + + def file(self, *path, mkdir=False): + if self.dir(*path[:-1], mkdir=mkdir): + return self.path(*path) + + def dir(self, *path, mkdir=False): + path = self.path(*path) + + if os.path.exists(path): + if not os.path.isdir(path): + raise MGException('{} is not a directory!'.format(path)) + return path + + if not mkdir: + return None + + os.makedirs(path) + return path + + def create(self): + if os.path.exists(self.worktree): + if not os.path.isdir(self.worktree): + raise MGException( + '{} is not a directory!',format(self.worktree) + ) + else: + os.makedirs(self.worktree) + + self.dir('branches', mkdir=True) + self.dir('objects', mkdir=True) + self.dir('refs', 'tags', mkdir=True) + self.dir('refs', 'heads', mkdir=True) + + # .git/description + with open(self.file('description'), 'w') as f: + f.write( + 'Unnamed repository; edit this file \'description\' to name the repository.\n' + ) + + # .git/HEAD + with open(self.file('HEAD'), 'w') as f: + f.write('ref: refs/heads/master\n') + + with open(self.file('config'), 'w') as f: + config = self.default_config() + config.write(f) + + def default_config(self): + ret = configparser.ConfigParser() + + ret.add_section('core') + ret.set('core', 'repositoryformatversion', '0') + ret.set('core', 'filemode', 'false') + ret.set('core', 'bare', 'false') + + return ret + + def find(self, path, required=True): + path = os.path.realpath(path) + dir_candidate = os.path.join(path, '.git') + + if os.path.isdir(dir_candidate): + return dir_candidate + + # If we haven't returned, recurse in parent, if w + parent = os.path.realpath(os.path.join(path, '..')) + + if parent == path: + # Bottom case + # os.path.join('/', '..') == '/': + # If parent==path, then path is root. + if required: + raise MGException('not a git repository (or any of the parent directories): .git') + return None + + # Recursive case + return self.find(parent, required) + + +class MGObject: + def __init__(self, repo, data=None, sha=''): + self.repo = repo + self.sha = "{:>040s}".format(sha) + + if data: + self.deserialize(data) + + def serialize(self): + raise NotImplemented('serialize needs to be implemented!') + + def deserialize(self, data): + raise NotImplemented('deserialize needs to be implemented!') + + @staticmethod + def read(repo, sha, candidates): + path = repo.file('objects', str(sha[0:2]), str(sha[2:])) + + if not path: + raise MGException('{}: bad file'.format(sha)) + + with open(path, 'rb') as f: + raw = zlib.decompress(f.read()) + + # Read object type + space = raw.find(b' ') + fmt = raw[0:space].decode('utf-8') + + # Read and validate object size + end = raw.find(b'\x00', space) + size = int(raw[space:end].decode('ascii')) + if size != len(raw)-end-1: + raise MGException('Malformed object {}: bad length'.format(sha)) + + # Call constructor and return object + return MGObject.find_candidate(candidates, fmt)(repo, raw[end+1:], sha=sha) + + @staticmethod + def find_candidate(candidates, fmt): + candidates = [c for c in candidates if c.understands == fmt] + if not candidates: + raise MGException('Unknown object type {}'.format(fmt)) + if len(candidates) != 1: + raise MGException( + 'Multiple classes ({}) claim to understand object type {}'.format(candidates, fmt) + ) + + return candidates[0] + + + def write(self, write=True): + # Serialize object data + data = self.serialize() + # Add header + result = b'%s %d\x00%s' % (self.understands.encode(), len(data), data) + # Compute hash + sha = hashlib.sha1(result).hexdigest() + + if write: + # Compute path + path = self.repo.file('objects', sha[0:2], sha[2:], mkdir=write) + + if not os.path.exists(path): + with open(path, 'wb') as f: + # Compress and write + f.write(zlib.compress(result)) + + return sha + + +class MGBlob(MGObject): + understands = 'blob' + + def serialize(self): + return self.data + + def deserialize(self, data): + self.data = data + + +class MGCommit(MGObject): + understands = 'commit' + + def __init__(self, *args, **kwargs): + self.headers = collections.OrderedDict() + super().__init__(*args, **kwargs) + + def deserialize(self, raw, offs=0): + spc = raw.find(b' ', offs) + nl = raw.find(b'\n', offs) + + if spc < 0 or nl < spc: + self.data = raw[offs+1:] + return + + key = raw[offs:spc] + + end = raw.find(b'\n', offs+1) + while raw[end+1] == ord(' '): + end = raw.find(b'\n', end+1) + + value = raw[spc+1:end].replace(b'\n ', b'\n') + + if key in self.headers: + self.headers[key].append(value) + else: + self.headers[key] = [value] + + return self.deserialize(raw, offs=end+1) + + def serialize(self): + ret = b'' + + for k, v in self.headers.items(): + for val in v: + ret += k + b' ' + val.replace(b'\n', b'\n ') + b'\n' + + ret += b'\n' + self.data + + return ret + + def from_headers(self, header, dflt): + return self.headers.get(header, [dflt])[0] + + def format(self): + fmt_str = '{}\nAuthor: {}\nDate: {}\n\n{}' + author_info = self.from_headers(b'author', b'unknown').decode('utf-8') + name, email, timestamp, tz = author_info.split(' ') + author = '{} {}'.format(name, email) + date = datetime.datetime.fromtimestamp(int(timestamp)) + # were cheating with the timezone, but thats okay + date = '{}{}'.format(date.strftime('%a %b %H:%M:%S %Y %z'), tz) + commit_info = '\n'.join(' {}'.format(l) for l in self.data.decode('utf-8').split('\n')) + yellow = click.style('commit {}'.format(self.sha), fg='yellow') + return fmt_str.format(yellow, author, date, commit_info) + + @staticmethod + def create(repo, msg, parent=None): + res = MGCommit(repo) + res.headers[b'tree'] = [MGTree.create(repo, '.').write().encode('ascii')] + conf = configparser.ConfigParser() + conf.read([os.path.expanduser('~/.gitconfig')]) + res.headers[b'author'] = [b'%s <%s> %d %s' % ( + conf.get('user', 'name').encode('utf-8'), + conf.get('user', 'email').encode('utf-8'), + int(datetime.datetime.now().timestamp()), + time.strftime('%z', time.gmtime()).encode('utf-8') + )] + res.headers[b'committer'] = res.headers[b'author'] + if parent: + res.headers[b'parent'] = [parent.encode('ascii')] + res.data = msg.encode('utf-8') + return res + + +class MGTree(MGObject): + understands = 'tree' + + class MGLeaf: + def __init__(self, mode, path, sha): + self.mode = mode + self.path = path + self.sha = "{:>020s}".format(sha) + + def deserialize_leaf(self, raw, offs=0): + space = raw.find(b' ', offs) + mode = raw[offs:space] + + null = raw.find(b'\x00', space) + path = raw[space+1:null] + end = null+21 + + sha = format(int.from_bytes(raw[null+1:end], 'big'), 'x') + sha = "{:>040s}".format(sha) + + return end, MGTree.MGLeaf(mode, path, sha) + + def deserialize(self, raw): + self.data = [] + pos = 0 + end = len(raw) + while pos < end: + pos, data = self.deserialize_leaf(raw, pos) + self.data.append(data) + + def serialize(self): + ret = b'' + for i in self.data: + sha_str = int(i.sha, 16).to_bytes(20, byteorder='big') + ret += i.mode + b' ' + i.path + b'\x00' + sha_str + return ret + + def check_prefix(self, d, f): + d = os.path.realpath(d) + f = os.path.realpath(f) + + if os.path.isdir(d): f = os.path.join(d, b'') + if os.path.isdir(f): f = os.path.join(f, b'') + + common = os.path.commonprefix([f, d]) + + return common == d or common == f + + def checkout(self, path, prefix=b'.'): + for item in self.data: + obj = MGObject.read(self.repo, item.sha, DFLT_CANDIDATES) + dest = os.path.join(path, item.path) + + if obj.understands == 'tree' and self.check_prefix(prefix, dest): + if not os.path.exists(dest): + os.mkdir(dest) + obj.checkout(dest) + if obj.understands == 'blob': + with open(dest, 'wb') as f: + f.write(obj.data) + + @staticmethod + def create(repo, path): + res = MGTree(repo) + res.data = [] + + for (d, _, files) in os.walk(path): + for f in files: + item = os.path.join(d, f) + if '/.git/' in item or any(p in item for p in repo.ignore_patterns): + continue + mode = format(os.stat(item)[stat.ST_MODE], 'o').encode('ascii') + + candidate = MGObject.find_candidate(DFLT_CANDIDATES, 'blob') + with open(item, 'rb') as f: + obj = candidate(repo, f.read()) + sha = obj.write() + item = item[2:] + res.data.append(MGTree.MGLeaf(mode, item.encode('utf-8'), sha)) + + return res + + +class MGTag(MGCommit): + understands = 'tag' + + @staticmethod + def create(repo, name, to): + with open(repo.file(os.path.join('refs', 'tags', name)), 'w+') as f: + f.write(to.resolve()) + + +DFLT_CANDIDATES = [MGBlob, MGCommit, MGTree, MGTag] + + +class MGRef: + def __init__(self, repo, ref, children=None): + self.repo = repo + self.ref = ref + self.children = children + + def resolve(self): + recursive = True + data = self.ref + while recursive: + data, recursive, _ = self.references(data) + return data + + def references(self, ref=None): + if ref is None: + ref = self.ref + try: + with open(self.repo.file(ref), 'r') as fp: + data = fp.read().strip() + except FileNotFoundError: + prefix_dir = os.path.join('objects', ref[0:2]) + path = self.repo.dir(prefix_dir) + if path: + rem = ref[2:] + candidates = [] + for f in os.listdir(path): + if f.startswith(rem): + candidates.append("{}{}".format(ref, f)) + if len(candidates) > 1: + s = ', '.join(candidates) + raise MGException( + 'Ambiguous revision {}\nCandidates: {}'.format(ref, s) + ) + if len(candidates) == 1: + return candidates[0], False, False + + + head = os.path.join('refs', 'heads', ref) + tag = os.path.join('refs', 'tags', ref) + if os.path.exists(self.repo.file(head)): + res, rec, _ = self.references(head) + return res, rec, head + if os.path.exists(self.repo.file(tag)): + res, rec, _ = self.references(tag) + return res, rec, tag + return ref, False, False + if data.startswith('ref: '): + return data[5:], True, False + return data, False, False + + def changes_ref(self): + recursive = True + data = self.ref + while recursive: + data, recursive, new_ref = self.references(data) + return new_ref + + @staticmethod + def list(repo, path=None): + if not path: + path = repo.dir('refs') + res = [] + + for f in sorted(os.listdir(path)): + can = os.path.join(path, f) + if os.path.isdir(can): + res.append(MGRef(repo, can, MGRef.list(repo, can))) + else: + res.append(MGRef(repo, can)) + + return res + + def show(self): + if self.children is None: + ref = self.ref.replace(self.repo.dir(''), '') + print('{} {}'.format(self.resolve(), ref)) + else: + for ref in self.children: + ref.show()