first version

This commit is contained in:
2019-03-18 20:03:50 +01:00
commit 6d5398a35c
3 changed files with 713 additions and 0 deletions

4
main.py Normal file
View File

@@ -0,0 +1,4 @@
#!/usr/bin/env python3
import mg
mg.cli()

707
mg/__init__.py Normal file
View File

@@ -0,0 +1,707 @@
import collections
import configparser
import datetime
import hashlib
import os
import re
import stat
import sys
import time
import zlib
import click
class MGException(Exception):
pass
class MGRepository:
"""
Our abstraction of a Git repository. Has a work directory, a Git directory,
and a bunch of configuration.
"""
def __init__(self, path, force=False):
self.worktree = path
self.gitdir = (self.find(path, required=not force) or
os.path.join(path, '.git'))
# Read configuration file in .git/config
self.conf = configparser.ConfigParser()
cf = self.file('config')
if cf and os.path.exists(cf):
self.conf.read([cf])
elif not force:
raise MGException('Cant find the configuration file!')
self.ignore_patterns = []
if os.path.exists(os.path.expanduser('~/.gitignore')):
with open(os.path.expanduser('~/.gitignore'), 'r') as f:
self.ignore_patterns.extend(l.strip() for l in f.readlines() if l.strip())
if not force:
vers = int(self.conf.get('core', 'repositoryformatversion'))
if vers != 0 and not force:
raise MGException(
'Expected git repo version <= 1, found {}'.format(vers)
)
def path(self, *path):
return os.path.join(self.gitdir, *path)
def file(self, *path, mkdir=False):
if self.dir(*path[:-1], mkdir=mkdir):
return self.path(*path)
def dir(self, *path, mkdir=False):
path = self.path(*path)
if os.path.exists(path):
if not os.path.isdir(path):
raise MGException('{} is not a directory!'.format(path))
return path
if not mkdir:
return None
os.makedirs(path)
return path
def create(self):
if os.path.exists(self.worktree):
if not os.path.isdir(self.worktree):
raise MGException(
'{} is not a directory!',format(self.worktree)
)
else:
os.makedirs(self.worktree)
self.dir('branches', mkdir=True)
self.dir('objects', mkdir=True)
self.dir('refs', 'tags', mkdir=True)
self.dir('refs', 'heads', mkdir=True)
# .git/description
with open(self.file('description'), 'w') as f:
f.write(
'Unnamed repository; edit this file \'description\' to name the repository.\n'
)
# .git/HEAD
with open(self.file('HEAD'), 'w') as f:
f.write('ref: refs/heads/master\n')
with open(self.file('config'), 'w') as f:
config = self.default_config()
config.write(f)
def default_config(self):
ret = configparser.ConfigParser()
ret.add_section('core')
ret.set('core', 'repositoryformatversion', '0')
ret.set('core', 'filemode', 'false')
ret.set('core', 'bare', 'false')
return ret
def find(self, path, required=True):
path = os.path.realpath(path)
dir_candidate = os.path.join(path, '.git')
if os.path.isdir(dir_candidate):
return dir_candidate
# If we haven't returned, recurse in parent, if w
parent = os.path.realpath(os.path.join(path, '..'))
if parent == path:
# Bottom case
# os.path.join('/', '..') == '/':
# If parent==path, then path is root.
if required:
raise MGException('not a git repository (or any of the parent directories): .git')
return None
# Recursive case
return self.find(parent, required)
class MGObject:
def __init__(self, repo, data=None, sha=''):
self.repo = repo
self.sha = "{:>040s}".format(sha)
if data:
self.deserialize(data)
def serialize(self):
raise NotImplemented('serialize needs to be implemented!')
def deserialize(self, data):
raise NotImplemented('deserialize needs to be implemented!')
@staticmethod
def read(repo, sha, candidates):
path = repo.file('objects', str(sha[0:2]), str(sha[2:]))
if not path:
raise MGException('{}: bad file'.format(sha))
with open(path, 'rb') as f:
raw = zlib.decompress(f.read())
# Read object type
space = raw.find(b' ')
fmt = raw[0:space].decode('utf-8')
# Read and validate object size
end = raw.find(b'\x00', space)
size = int(raw[space:end].decode('ascii'))
if size != len(raw)-end-1:
raise MGException('Malformed object {}: bad length'.format(sha))
# Call constructor and return object
return MGObject.find_candidate(candidates, fmt)(repo, raw[end+1:], sha=sha)
@staticmethod
def find_candidate(candidates, fmt):
candidates = [c for c in candidates if c.understands == fmt]
if not candidates:
raise MGException('Unknown object type {}'.format(fmt))
if len(candidates) != 1:
raise MGException(
'Multiple classes ({}) claim to understand object type {}'.format(candidates, fmt)
)
return candidates[0]
def write(self, write=True):
# Serialize object data
data = self.serialize()
# Add header
result = b'%s %d\x00%s' % (self.understands.encode(), len(data), data)
# Compute hash
sha = hashlib.sha1(result).hexdigest()
if write:
# Compute path
path = self.repo.file('objects', sha[0:2], sha[2:], mkdir=write)
if not os.path.exists(path):
with open(path, 'wb') as f:
# Compress and write
f.write(zlib.compress(result))
return sha
class MGBlob(MGObject):
understands = 'blob'
def serialize(self):
return self.data
def deserialize(self, data):
self.data = data
class MGCommit(MGObject):
understands = 'commit'
def __init__(self, *args, **kwargs):
self.headers = collections.OrderedDict()
super().__init__(*args, **kwargs)
def deserialize(self, raw, offs=0):
spc = raw.find(b' ', offs)
nl = raw.find(b'\n', offs)
if spc < 0 or nl < spc:
self.data = raw[offs+1:]
return
key = raw[offs:spc]
end = raw.find(b'\n', offs+1)
while raw[end+1] == ord(' '):
end = raw.find(b'\n', end+1)
value = raw[spc+1:end].replace(b'\n ', b'\n')
if key in self.headers:
self.headers[key].append(value)
else:
self.headers[key] = [value]
return self.deserialize(raw, offs=end+1)
def serialize(self):
ret = b''
for k, v in self.headers.items():
for val in v:
ret += k + b' ' + val.replace(b'\n', b'\n ') + b'\n'
ret += b'\n' + self.data
return ret
def from_headers(self, header, dflt):
return self.headers.get(header, [dflt])[0]
def format(self):
fmt_str = '{}\nAuthor: {}\nDate: {}\n\n{}'
author_info = self.from_headers(b'author', b'unknown').decode('utf-8')
name, email, timestamp, tz = author_info.split(' ')
author = '{} {}'.format(name, email)
date = datetime.datetime.fromtimestamp(int(timestamp))
# were cheating with the timezone, but thats okay
date = '{}{}'.format(date.strftime('%a %b %H:%M:%S %Y %z'), tz)
commit_info = '\n'.join(' {}'.format(l) for l in self.data.decode('utf-8').split('\n'))
yellow = click.style('commit {}'.format(self.sha), fg='yellow')
return fmt_str.format(yellow, author, date, commit_info)
@staticmethod
def create(repo, msg, parent=None):
res = MGCommit(repo)
res.headers[b'tree'] = [MGTree.create(repo, '.').write().encode('ascii')]
conf = configparser.ConfigParser()
conf.read([os.path.expanduser('~/.gitconfig')])
res.headers[b'author'] = [b'%s <%s> %d %s' % (
conf.get('user', 'name').encode('utf-8'),
conf.get('user', 'email').encode('utf-8'),
int(datetime.datetime.now().timestamp()),
time.strftime('%z', time.gmtime()).encode('utf-8')
)]
res.headers[b'committer'] = res.headers[b'author']
if parent:
res.headers[b'parent'] = [parent.encode('ascii')]
res.data = msg.encode('utf-8')
return res
class MGTree(MGObject):
understands = 'tree'
class MGLeaf:
def __init__(self, mode, path, sha):
self.mode = mode
self.path = path
self.sha = "{:>020s}".format(sha)
def deserialize_leaf(self, raw, offs=0):
space = raw.find(b' ', offs)
mode = raw[offs:space]
null = raw.find(b'\x00', space)
path = raw[space+1:null]
end = null+21
sha = format(int.from_bytes(raw[null+1:end], 'big'), 'x')
sha = "{:>040s}".format(sha)
return end, MGTree.MGLeaf(mode, path, sha)
def deserialize(self, raw):
self.data = []
pos = 0
end = len(raw)
while pos < end:
pos, data = self.deserialize_leaf(raw, pos)
self.data.append(data)
def serialize(self):
ret = b''
for i in self.data:
sha_str = int(i.sha, 16).to_bytes(20, byteorder='big')
ret += i.mode + b' ' + i.path + b'\x00' + sha_str
return ret
def check_prefix(self, d, f):
d = os.path.realpath(d)
f = os.path.realpath(f)
if os.path.isdir(d): f = os.path.join(d, b'')
if os.path.isdir(f): f = os.path.join(f, b'')
common = os.path.commonprefix([f, d])
return common == d or common == f
def checkout(self, path, prefix=b'.'):
for item in self.data:
obj = MGObject.read(self.repo, item.sha, DFLT_CANDIDATES)
dest = os.path.join(path, item.path)
if obj.understands == 'tree' and self.check_prefix(prefix, dest):
if not os.path.exists(dest):
os.mkdir(dest)
obj.checkout(dest)
if obj.understands == 'blob':
with open(dest, 'wb') as f:
f.write(obj.data)
@staticmethod
def create(repo, path):
res = MGTree(repo)
res.data = []
for (d, _, files) in os.walk(path):
for f in files:
item = os.path.join(d, f)
if '/.git/' in item or any(p in item for p in repo.ignore_patterns):
continue
mode = format(os.stat(item)[stat.ST_MODE], 'o').encode('ascii')
candidate = MGObject.find_candidate(DFLT_CANDIDATES, 'blob')
with open(item, 'rb') as f:
obj = candidate(repo, f.read())
sha = obj.write()
item = item[2:]
res.data.append(MGTree.MGLeaf(mode, item.encode('utf-8'), sha))
return res
class MGTag(MGCommit):
understands = 'tag'
@staticmethod
def create(repo, name, to):
with open(repo.file(os.path.join('refs', 'tags', name)), 'w+') as f:
f.write(to.resolve())
DFLT_CANDIDATES = [MGBlob, MGCommit, MGTree, MGTag]
class MGRef:
def __init__(self, repo, ref, children=None):
self.repo = repo
self.ref = ref
self.children = children
def resolve(self):
recursive = True
data = self.ref
while recursive:
data, recursive, _ = self.references(data)
return data
def references(self, ref=None):
if ref is None:
ref = self.ref
try:
with open(self.repo.file(ref), 'r') as fp:
data = fp.read().strip()
except FileNotFoundError:
prefix_dir = os.path.join('objects', ref[0:2])
path = self.repo.dir(prefix_dir)
if path:
rem = ref[2:]
candidates = []
for f in os.listdir(path):
if f.startswith(rem):
candidates.append("{}{}".format(ref, f))
if len(candidates) > 1:
s = ', '.join(candidates)
raise MGException(
'Ambiguous revision {}\nCandidates: {}'.format(ref, s)
)
if len(candidates) == 1:
return candidates[0], False, False
head = os.path.join('refs', 'heads', ref)
tag = os.path.join('refs', 'tags', ref)
if os.path.exists(self.repo.file(head)):
res, rec, _ = self.references(head)
return res, rec, head
if os.path.exists(self.repo.file(tag)):
res, rec, _ = self.references(tag)
return res, rec, tag
return ref, False, False
if data.startswith('ref: '):
return data[5:], True, False
return data, False, False
def changes_ref(self):
recursive = True
data = self.ref
while recursive:
data, recursive, new_ref = self.references(data)
return new_ref
@staticmethod
def list(repo, path=None):
if not path:
path = repo.dir('refs')
res = []
for f in sorted(os.listdir(path)):
can = os.path.join(path, f)
if os.path.isdir(can):
res.append(MGRef(repo, can, MGRef.list(repo, can)))
else:
res.append(MGRef(repo, can))
return res
def show(self):
if self.children is None:
ref = self.ref.replace(self.repo.dir(''), '')
print('{} {}'.format(self.resolve(), ref))
else:
for ref in self.children:
ref.show()
class MGCatch(click.Group):
def invoke(self, ctx):
try:
return super().invoke(ctx)
except MGException as exc:
print('fatal: {}'.format(exc))
@click.group(help='The stupid content tracker',
cls=MGCatch)
def cli(argv=None):
pass
@cli.command()
def add():
raise MGException('Unimplemented')
@cli.command(help="List, create, or delete branches")
@click.argument("name", default=None, required=False)
@click.option("-d", is_flag=True, help="Delete a branch.")
def branch(name, d):
repo = MGRepository('.')
ref = MGRef(repo, 'HEAD')
if d and name:
branch_file = repo.file(os.path.join('refs', 'heads', name))
if not os.path.exists(branch_file):
raise MGException('Branch \'{}\' not found'.format(name))
os.remove(branch_file)
elif name:
branch_file = repo.file(os.path.join('refs', 'heads', name))
if os.path.exists(branch_file):
raise MGException('Branch \'{}\' already exists'.format(name))
with open(branch_file, 'w') as f:
f.write(ref.resolve())
elif d:
raise MGException('branch name required')
else:
refbranch, _, _2 = ref.references()
refbranch = os.path.basename(refbranch)
for branch in os.listdir(repo.file(os.path.join('refs', 'heads'))):
if refbranch == branch:
print('* {}'.format(click.style(branch, bg='yellow')))
else:
print(' {}'.format(click.style(branch, fg='yellow')))
@cli.command(
name='cat-file',
help='Provide content or type and size information for repository objects'
)
@click.argument(
't',
metavar='type',
type=click.Choice(['blob', 'commit', 'tag', 'tree'])
)
@click.argument('o', metavar='object')
def cat_file(t, o):
repo = MGRepository('.')
obj = MGObject.read(repo, o, DFLT_CANDIDATES)
print(obj.serialize().decode('utf-8'), end='')
@cli.command(help='Checkout a commit or restore files.')
@click.argument('commit', default='HEAD')
@click.option('-p', 'path', default='.', help='The path to check out.')
def checkout(commit, path):
repo = MGRepository('.')
commit = MGRef(repo, commit)
changes_ref = commit.changes_ref()
obj = MGObject.read(repo, commit.resolve(), DFLT_CANDIDATES)
if obj.understands == 'commit':
obj = MGObject.read(repo, obj.from_headers(b'tree', b'').decode('ascii'), DFLT_CANDIDATES)
if not os.path.exists(path):
os.makedirs(path)
obj.checkout(b'.', path.encode('utf-8'))
if changes_ref:
with open(repo.file('HEAD'), 'w') as f:
f.write('ref: {}'.format(changes_ref))
@cli.command()
@click.argument('message')
def commit(message):
repo = MGRepository('.')
parent = None
ref = MGRef(repo, 'HEAD')
references, _, _ = ref.references()
references = repo.file(references)
resolved = ref.resolve()
if os.path.exists(resolved):
with open(ref.resolve(), 'r') as f:
parent = f.read().strip()
commit = MGCommit.create(repo, message, parent)
os.makedirs(os.path.dirname(references), exist_ok=True)
with open(references, 'w+') as f:
f.write(commit.write())
f.write('\n')
@cli.command(
name='hash-object',
help='Compute object ID and optionally creates a blob from a file'
)
@click.option(
'-t',
't',
metavar='type',
type=click.Choice(['blob', 'commit', 'tag', 'tree']),
default='blob',
help='Specify the type'
)
@click.option(
'-w',
'write',
is_flag=True,
help='Actually write the object into the database'
)
@click.argument('path')
def hash_object(t, write, path):
repo = MGRepository('.') if write else None
try:
with open(path, 'rb') as f:
obj = MGObject.find_candidate(DFLT_CANDIDATES, t)(repo, f.read())
except OSError:
raise MGException('unable to hash {}'.format(path))
sha = obj.write(write)
print(sha)
@cli.command(help='Initialize a new, empty repository.')
@click.argument('path', default='.')
def init(path):
MGRepository(path, force=True).create()
@cli.command(help='Display history of a given commit.')
@click.argument('commit', default='HEAD')
def log(commit):
repo = MGRepository('.')
ref = MGRef(repo, commit)
logfn(repo, ref.resolve(), set())
def logfn(repo, sha, seen):
if sha in seen:
return
seen.add(sha)
commit = MGObject.read(repo, sha, DFLT_CANDIDATES)
print(commit.format())
if not b'parent' in commit.headers:
return
parents = commit.headers[b'parent']
for p in parents:
logfn(repo, p.decode('ascii'), seen)
@cli.command(help='Pretty-print a tree object.')
@click.argument('o', metavar='object')
@click.option('-r', is_flag=True, help='Recurse into sub-trees.')
def ls_tree(o, r):
repo = MGRepository('.')
def internal(o, r, prefix=''):
o = MGObject.read(repo, o, DFLT_CANDIDATES)
if o.understands != 'tree':
raise MGException('not a tree object')
for item in o.data:
path = os.path.join(prefix, item.path.decode('ascii'))
t = MGObject.read(repo, item.sha, DFLT_CANDIDATES).understands
if t == 'tree' and r:
internal(item.sha, r, prefix=path)
else:
print('{0} {1} {2}\t{3}'.format(
'0' * (6 - len(item.mode)) + item.mode.decode('ascii'),
t,
item.sha,
path))
internal(o, r)
@cli.command()
def merge():
raise MGException('Unimplemented')
@cli.command()
def rebase():
raise MGException('Unimplemented')
@cli.command()
@click.argument("name")
def rev_parse(name):
repo = MGRepository(".")
print(MGRef(repo, name).resolve())
@cli.command()
def rm():
raise MGException('Unimplemented')
@cli.command(help='List references.')
def show_ref():
repo = MGRepository('.')
refs = MGRef.list(repo)
for ref in refs:
ref.show()
@cli.command(help='List and create tags')
@click.argument('name', default=None, required=False)
@click.argument('o', metavar='object', default='HEAD')
def tag(name, o):
repo = MGRepository('.')
if name:
MGTag.create(repo, name, MGRef(repo, o))
else:
refs = MGRef.list(repo)
for ref in refs:
prefix = repo.dir(os.path.join('refs', 'tags'))
if ref.ref == prefix:
for child in ref.children:
print(child.ref.replace(os.path.join(prefix, ''), ''))

2
requirements.txt Normal file
View File

@@ -0,0 +1,2 @@
click
lol