commit 6d5398a35cb41307c42a9bfd54dbbf7af67720c0 Author: hellerve Date: Mon Mar 18 20:03:50 2019 +0100 first version diff --git a/main.py b/main.py new file mode 100644 index 0000000..7a96600 --- /dev/null +++ b/main.py @@ -0,0 +1,4 @@ +#!/usr/bin/env python3 + +import mg +mg.cli() diff --git a/mg/__init__.py b/mg/__init__.py new file mode 100644 index 0000000..f3cde89 --- /dev/null +++ b/mg/__init__.py @@ -0,0 +1,707 @@ +import collections +import configparser +import datetime +import hashlib +import os +import re +import stat +import sys +import time +import zlib + +import click + + +class MGException(Exception): + pass + + +class MGRepository: + """ + Our abstraction of a Git repository. Has a work directory, a Git directory, + and a bunch of configuration. + """ + + def __init__(self, path, force=False): + self.worktree = path + self.gitdir = (self.find(path, required=not force) or + os.path.join(path, '.git')) + + # Read configuration file in .git/config + self.conf = configparser.ConfigParser() + cf = self.file('config') + + if cf and os.path.exists(cf): + self.conf.read([cf]) + elif not force: + raise MGException('Can’t find the configuration file!') + + self.ignore_patterns = [] + + if os.path.exists(os.path.expanduser('~/.gitignore')): + with open(os.path.expanduser('~/.gitignore'), 'r') as f: + self.ignore_patterns.extend(l.strip() for l in f.readlines() if l.strip()) + + if not force: + vers = int(self.conf.get('core', 'repositoryformatversion')) + if vers != 0 and not force: + raise MGException( + 'Expected git repo version <= 1, found {}'.format(vers) + ) + + def path(self, *path): + return os.path.join(self.gitdir, *path) + + def file(self, *path, mkdir=False): + if self.dir(*path[:-1], mkdir=mkdir): + return self.path(*path) + + def dir(self, *path, mkdir=False): + path = self.path(*path) + + if os.path.exists(path): + if not os.path.isdir(path): + raise MGException('{} is not a directory!'.format(path)) + return path + + if not mkdir: + return None + + os.makedirs(path) + return path + + def create(self): + if os.path.exists(self.worktree): + if not os.path.isdir(self.worktree): + raise MGException( + '{} is not a directory!',format(self.worktree) + ) + else: + os.makedirs(self.worktree) + + self.dir('branches', mkdir=True) + self.dir('objects', mkdir=True) + self.dir('refs', 'tags', mkdir=True) + self.dir('refs', 'heads', mkdir=True) + + # .git/description + with open(self.file('description'), 'w') as f: + f.write( + 'Unnamed repository; edit this file \'description\' to name the repository.\n' + ) + + # .git/HEAD + with open(self.file('HEAD'), 'w') as f: + f.write('ref: refs/heads/master\n') + + with open(self.file('config'), 'w') as f: + config = self.default_config() + config.write(f) + + def default_config(self): + ret = configparser.ConfigParser() + + ret.add_section('core') + ret.set('core', 'repositoryformatversion', '0') + ret.set('core', 'filemode', 'false') + ret.set('core', 'bare', 'false') + + return ret + + def find(self, path, required=True): + path = os.path.realpath(path) + dir_candidate = os.path.join(path, '.git') + + if os.path.isdir(dir_candidate): + return dir_candidate + + # If we haven't returned, recurse in parent, if w + parent = os.path.realpath(os.path.join(path, '..')) + + if parent == path: + # Bottom case + # os.path.join('/', '..') == '/': + # If parent==path, then path is root. + if required: + raise MGException('not a git repository (or any of the parent directories): .git') + return None + + # Recursive case + return self.find(parent, required) + + +class MGObject: + def __init__(self, repo, data=None, sha=''): + self.repo = repo + self.sha = "{:>040s}".format(sha) + + if data: + self.deserialize(data) + + def serialize(self): + raise NotImplemented('serialize needs to be implemented!') + + def deserialize(self, data): + raise NotImplemented('deserialize needs to be implemented!') + + @staticmethod + def read(repo, sha, candidates): + path = repo.file('objects', str(sha[0:2]), str(sha[2:])) + + if not path: + raise MGException('{}: bad file'.format(sha)) + + with open(path, 'rb') as f: + raw = zlib.decompress(f.read()) + + # Read object type + space = raw.find(b' ') + fmt = raw[0:space].decode('utf-8') + + # Read and validate object size + end = raw.find(b'\x00', space) + size = int(raw[space:end].decode('ascii')) + if size != len(raw)-end-1: + raise MGException('Malformed object {}: bad length'.format(sha)) + + # Call constructor and return object + return MGObject.find_candidate(candidates, fmt)(repo, raw[end+1:], sha=sha) + + @staticmethod + def find_candidate(candidates, fmt): + candidates = [c for c in candidates if c.understands == fmt] + if not candidates: + raise MGException('Unknown object type {}'.format(fmt)) + if len(candidates) != 1: + raise MGException( + 'Multiple classes ({}) claim to understand object type {}'.format(candidates, fmt) + ) + + return candidates[0] + + + def write(self, write=True): + # Serialize object data + data = self.serialize() + # Add header + result = b'%s %d\x00%s' % (self.understands.encode(), len(data), data) + # Compute hash + sha = hashlib.sha1(result).hexdigest() + + if write: + # Compute path + path = self.repo.file('objects', sha[0:2], sha[2:], mkdir=write) + + if not os.path.exists(path): + with open(path, 'wb') as f: + # Compress and write + f.write(zlib.compress(result)) + + return sha + + +class MGBlob(MGObject): + understands = 'blob' + + def serialize(self): + return self.data + + def deserialize(self, data): + self.data = data + + +class MGCommit(MGObject): + understands = 'commit' + + def __init__(self, *args, **kwargs): + self.headers = collections.OrderedDict() + super().__init__(*args, **kwargs) + + def deserialize(self, raw, offs=0): + spc = raw.find(b' ', offs) + nl = raw.find(b'\n', offs) + + if spc < 0 or nl < spc: + self.data = raw[offs+1:] + return + + key = raw[offs:spc] + + end = raw.find(b'\n', offs+1) + while raw[end+1] == ord(' '): + end = raw.find(b'\n', end+1) + + value = raw[spc+1:end].replace(b'\n ', b'\n') + + if key in self.headers: + self.headers[key].append(value) + else: + self.headers[key] = [value] + + return self.deserialize(raw, offs=end+1) + + def serialize(self): + ret = b'' + + for k, v in self.headers.items(): + for val in v: + ret += k + b' ' + val.replace(b'\n', b'\n ') + b'\n' + + ret += b'\n' + self.data + + return ret + + def from_headers(self, header, dflt): + return self.headers.get(header, [dflt])[0] + + def format(self): + fmt_str = '{}\nAuthor: {}\nDate: {}\n\n{}' + author_info = self.from_headers(b'author', b'unknown').decode('utf-8') + name, email, timestamp, tz = author_info.split(' ') + author = '{} {}'.format(name, email) + date = datetime.datetime.fromtimestamp(int(timestamp)) + # were cheating with the timezone, but thats okay + date = '{}{}'.format(date.strftime('%a %b %H:%M:%S %Y %z'), tz) + commit_info = '\n'.join(' {}'.format(l) for l in self.data.decode('utf-8').split('\n')) + yellow = click.style('commit {}'.format(self.sha), fg='yellow') + return fmt_str.format(yellow, author, date, commit_info) + + @staticmethod + def create(repo, msg, parent=None): + res = MGCommit(repo) + res.headers[b'tree'] = [MGTree.create(repo, '.').write().encode('ascii')] + conf = configparser.ConfigParser() + conf.read([os.path.expanduser('~/.gitconfig')]) + res.headers[b'author'] = [b'%s <%s> %d %s' % ( + conf.get('user', 'name').encode('utf-8'), + conf.get('user', 'email').encode('utf-8'), + int(datetime.datetime.now().timestamp()), + time.strftime('%z', time.gmtime()).encode('utf-8') + )] + res.headers[b'committer'] = res.headers[b'author'] + if parent: + res.headers[b'parent'] = [parent.encode('ascii')] + res.data = msg.encode('utf-8') + return res + + +class MGTree(MGObject): + understands = 'tree' + + class MGLeaf: + def __init__(self, mode, path, sha): + self.mode = mode + self.path = path + self.sha = "{:>020s}".format(sha) + + def deserialize_leaf(self, raw, offs=0): + space = raw.find(b' ', offs) + mode = raw[offs:space] + + null = raw.find(b'\x00', space) + path = raw[space+1:null] + end = null+21 + + sha = format(int.from_bytes(raw[null+1:end], 'big'), 'x') + sha = "{:>040s}".format(sha) + + return end, MGTree.MGLeaf(mode, path, sha) + + def deserialize(self, raw): + self.data = [] + pos = 0 + end = len(raw) + while pos < end: + pos, data = self.deserialize_leaf(raw, pos) + self.data.append(data) + + def serialize(self): + ret = b'' + for i in self.data: + sha_str = int(i.sha, 16).to_bytes(20, byteorder='big') + ret += i.mode + b' ' + i.path + b'\x00' + sha_str + return ret + + def check_prefix(self, d, f): + d = os.path.realpath(d) + f = os.path.realpath(f) + + if os.path.isdir(d): f = os.path.join(d, b'') + if os.path.isdir(f): f = os.path.join(f, b'') + + common = os.path.commonprefix([f, d]) + + return common == d or common == f + + def checkout(self, path, prefix=b'.'): + for item in self.data: + obj = MGObject.read(self.repo, item.sha, DFLT_CANDIDATES) + dest = os.path.join(path, item.path) + + if obj.understands == 'tree' and self.check_prefix(prefix, dest): + if not os.path.exists(dest): + os.mkdir(dest) + obj.checkout(dest) + if obj.understands == 'blob': + with open(dest, 'wb') as f: + f.write(obj.data) + + @staticmethod + def create(repo, path): + res = MGTree(repo) + res.data = [] + + for (d, _, files) in os.walk(path): + for f in files: + item = os.path.join(d, f) + if '/.git/' in item or any(p in item for p in repo.ignore_patterns): + continue + mode = format(os.stat(item)[stat.ST_MODE], 'o').encode('ascii') + + candidate = MGObject.find_candidate(DFLT_CANDIDATES, 'blob') + with open(item, 'rb') as f: + obj = candidate(repo, f.read()) + sha = obj.write() + item = item[2:] + res.data.append(MGTree.MGLeaf(mode, item.encode('utf-8'), sha)) + + return res + + +class MGTag(MGCommit): + understands = 'tag' + + @staticmethod + def create(repo, name, to): + with open(repo.file(os.path.join('refs', 'tags', name)), 'w+') as f: + f.write(to.resolve()) + + +DFLT_CANDIDATES = [MGBlob, MGCommit, MGTree, MGTag] + + +class MGRef: + def __init__(self, repo, ref, children=None): + self.repo = repo + self.ref = ref + self.children = children + + def resolve(self): + recursive = True + data = self.ref + while recursive: + data, recursive, _ = self.references(data) + return data + + def references(self, ref=None): + if ref is None: + ref = self.ref + try: + with open(self.repo.file(ref), 'r') as fp: + data = fp.read().strip() + except FileNotFoundError: + prefix_dir = os.path.join('objects', ref[0:2]) + path = self.repo.dir(prefix_dir) + if path: + rem = ref[2:] + candidates = [] + for f in os.listdir(path): + if f.startswith(rem): + candidates.append("{}{}".format(ref, f)) + if len(candidates) > 1: + s = ', '.join(candidates) + raise MGException( + 'Ambiguous revision {}\nCandidates: {}'.format(ref, s) + ) + if len(candidates) == 1: + return candidates[0], False, False + + + head = os.path.join('refs', 'heads', ref) + tag = os.path.join('refs', 'tags', ref) + if os.path.exists(self.repo.file(head)): + res, rec, _ = self.references(head) + return res, rec, head + if os.path.exists(self.repo.file(tag)): + res, rec, _ = self.references(tag) + return res, rec, tag + return ref, False, False + if data.startswith('ref: '): + return data[5:], True, False + return data, False, False + + def changes_ref(self): + recursive = True + data = self.ref + while recursive: + data, recursive, new_ref = self.references(data) + return new_ref + + @staticmethod + def list(repo, path=None): + if not path: + path = repo.dir('refs') + res = [] + + for f in sorted(os.listdir(path)): + can = os.path.join(path, f) + if os.path.isdir(can): + res.append(MGRef(repo, can, MGRef.list(repo, can))) + else: + res.append(MGRef(repo, can)) + + return res + + def show(self): + if self.children is None: + ref = self.ref.replace(self.repo.dir(''), '') + print('{} {}'.format(self.resolve(), ref)) + else: + for ref in self.children: + ref.show() + + +class MGCatch(click.Group): + def invoke(self, ctx): + try: + return super().invoke(ctx) + except MGException as exc: + print('fatal: {}'.format(exc)) + + +@click.group(help='The stupid content tracker', + cls=MGCatch) +def cli(argv=None): + pass + + +@cli.command() +def add(): + raise MGException('Unimplemented') + + +@cli.command(help="List, create, or delete branches") +@click.argument("name", default=None, required=False) +@click.option("-d", is_flag=True, help="Delete a branch.") +def branch(name, d): + repo = MGRepository('.') + ref = MGRef(repo, 'HEAD') + if d and name: + branch_file = repo.file(os.path.join('refs', 'heads', name)) + if not os.path.exists(branch_file): + raise MGException('Branch \'{}\' not found'.format(name)) + os.remove(branch_file) + elif name: + branch_file = repo.file(os.path.join('refs', 'heads', name)) + if os.path.exists(branch_file): + raise MGException('Branch \'{}\' already exists'.format(name)) + with open(branch_file, 'w') as f: + f.write(ref.resolve()) + elif d: + raise MGException('branch name required') + else: + refbranch, _, _2 = ref.references() + refbranch = os.path.basename(refbranch) + for branch in os.listdir(repo.file(os.path.join('refs', 'heads'))): + if refbranch == branch: + print('* {}'.format(click.style(branch, bg='yellow'))) + else: + print(' {}'.format(click.style(branch, fg='yellow'))) + + +@cli.command( + name='cat-file', + help='Provide content or type and size information for repository objects' +) +@click.argument( + 't', + metavar='type', + type=click.Choice(['blob', 'commit', 'tag', 'tree']) +) +@click.argument('o', metavar='object') +def cat_file(t, o): + repo = MGRepository('.') + obj = MGObject.read(repo, o, DFLT_CANDIDATES) + print(obj.serialize().decode('utf-8'), end='') + + +@cli.command(help='Checkout a commit or restore files.') +@click.argument('commit', default='HEAD') +@click.option('-p', 'path', default='.', help='The path to check out.') +def checkout(commit, path): + repo = MGRepository('.') + + commit = MGRef(repo, commit) + changes_ref = commit.changes_ref() + + obj = MGObject.read(repo, commit.resolve(), DFLT_CANDIDATES) + + if obj.understands == 'commit': + obj = MGObject.read(repo, obj.from_headers(b'tree', b'').decode('ascii'), DFLT_CANDIDATES) + + if not os.path.exists(path): + os.makedirs(path) + + obj.checkout(b'.', path.encode('utf-8')) + + if changes_ref: + with open(repo.file('HEAD'), 'w') as f: + f.write('ref: {}'.format(changes_ref)) + + +@cli.command() +@click.argument('message') +def commit(message): + repo = MGRepository('.') + parent = None + ref = MGRef(repo, 'HEAD') + references, _, _ = ref.references() + references = repo.file(references) + resolved = ref.resolve() + if os.path.exists(resolved): + with open(ref.resolve(), 'r') as f: + parent = f.read().strip() + commit = MGCommit.create(repo, message, parent) + os.makedirs(os.path.dirname(references), exist_ok=True) + with open(references, 'w+') as f: + f.write(commit.write()) + f.write('\n') + + +@cli.command( + name='hash-object', + help='Compute object ID and optionally creates a blob from a file' +) +@click.option( + '-t', + 't', + metavar='type', + type=click.Choice(['blob', 'commit', 'tag', 'tree']), + default='blob', + help='Specify the type' +) +@click.option( + '-w', + 'write', + is_flag=True, + help='Actually write the object into the database' +) +@click.argument('path') +def hash_object(t, write, path): + repo = MGRepository('.') if write else None + + try: + with open(path, 'rb') as f: + obj = MGObject.find_candidate(DFLT_CANDIDATES, t)(repo, f.read()) + except OSError: + raise MGException('unable to hash {}'.format(path)) + sha = obj.write(write) + print(sha) + + +@cli.command(help='Initialize a new, empty repository.') +@click.argument('path', default='.') +def init(path): + MGRepository(path, force=True).create() + + +@cli.command(help='Display history of a given commit.') +@click.argument('commit', default='HEAD') +def log(commit): + repo = MGRepository('.') + + ref = MGRef(repo, commit) + + logfn(repo, ref.resolve(), set()) + + +def logfn(repo, sha, seen): + if sha in seen: + return + seen.add(sha) + + commit = MGObject.read(repo, sha, DFLT_CANDIDATES) + + print(commit.format()) + + if not b'parent' in commit.headers: + return + + parents = commit.headers[b'parent'] + + for p in parents: + logfn(repo, p.decode('ascii'), seen) + + +@cli.command(help='Pretty-print a tree object.') +@click.argument('o', metavar='object') +@click.option('-r', is_flag=True, help='Recurse into sub-trees.') +def ls_tree(o, r): + repo = MGRepository('.') + + def internal(o, r, prefix=''): + o = MGObject.read(repo, o, DFLT_CANDIDATES) + + if o.understands != 'tree': + raise MGException('not a tree object') + + for item in o.data: + path = os.path.join(prefix, item.path.decode('ascii')) + t = MGObject.read(repo, item.sha, DFLT_CANDIDATES).understands + if t == 'tree' and r: + internal(item.sha, r, prefix=path) + else: + print('{0} {1} {2}\t{3}'.format( + '0' * (6 - len(item.mode)) + item.mode.decode('ascii'), + t, + item.sha, + path)) + + internal(o, r) + + +@cli.command() +def merge(): + raise MGException('Unimplemented') + + +@cli.command() +def rebase(): + raise MGException('Unimplemented') + + +@cli.command() +@click.argument("name") +def rev_parse(name): + repo = MGRepository(".") + print(MGRef(repo, name).resolve()) + + +@cli.command() +def rm(): + raise MGException('Unimplemented') + + +@cli.command(help='List references.') +def show_ref(): + repo = MGRepository('.') + refs = MGRef.list(repo) + for ref in refs: + ref.show() + + +@cli.command(help='List and create tags') +@click.argument('name', default=None, required=False) +@click.argument('o', metavar='object', default='HEAD') +def tag(name, o): + repo = MGRepository('.') + + if name: + MGTag.create(repo, name, MGRef(repo, o)) + else: + refs = MGRef.list(repo) + for ref in refs: + prefix = repo.dir(os.path.join('refs', 'tags')) + if ref.ref == prefix: + for child in ref.children: + print(child.ref.replace(os.path.join(prefix, ''), '')) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..2a94c04 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +click +lol