split up into modules
This commit is contained in:
2
main.py
2
main.py
@@ -1,4 +1,4 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
import mg
|
import mg
|
||||||
mg.cli()
|
mg.commands.cli()
|
||||||
|
708
mg/__init__.py
708
mg/__init__.py
@@ -1,707 +1 @@
|
|||||||
import collections
|
from mg import commands
|
||||||
import configparser
|
|
||||||
import datetime
|
|
||||||
import hashlib
|
|
||||||
import os
|
|
||||||
import re
|
|
||||||
import stat
|
|
||||||
import sys
|
|
||||||
import time
|
|
||||||
import zlib
|
|
||||||
|
|
||||||
import click
|
|
||||||
|
|
||||||
|
|
||||||
class MGException(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class MGRepository:
|
|
||||||
"""
|
|
||||||
Our abstraction of a Git repository. Has a work directory, a Git directory,
|
|
||||||
and a bunch of configuration.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, path, force=False):
|
|
||||||
self.worktree = path
|
|
||||||
self.gitdir = (self.find(path, required=not force) or
|
|
||||||
os.path.join(path, '.git'))
|
|
||||||
|
|
||||||
# Read configuration file in .git/config
|
|
||||||
self.conf = configparser.ConfigParser()
|
|
||||||
cf = self.file('config')
|
|
||||||
|
|
||||||
if cf and os.path.exists(cf):
|
|
||||||
self.conf.read([cf])
|
|
||||||
elif not force:
|
|
||||||
raise MGException('Can’t find the configuration file!')
|
|
||||||
|
|
||||||
self.ignore_patterns = []
|
|
||||||
|
|
||||||
if os.path.exists(os.path.expanduser('~/.gitignore')):
|
|
||||||
with open(os.path.expanduser('~/.gitignore'), 'r') as f:
|
|
||||||
self.ignore_patterns.extend(l.strip() for l in f.readlines() if l.strip())
|
|
||||||
|
|
||||||
if not force:
|
|
||||||
vers = int(self.conf.get('core', 'repositoryformatversion'))
|
|
||||||
if vers != 0 and not force:
|
|
||||||
raise MGException(
|
|
||||||
'Expected git repo version <= 1, found {}'.format(vers)
|
|
||||||
)
|
|
||||||
|
|
||||||
def path(self, *path):
|
|
||||||
return os.path.join(self.gitdir, *path)
|
|
||||||
|
|
||||||
def file(self, *path, mkdir=False):
|
|
||||||
if self.dir(*path[:-1], mkdir=mkdir):
|
|
||||||
return self.path(*path)
|
|
||||||
|
|
||||||
def dir(self, *path, mkdir=False):
|
|
||||||
path = self.path(*path)
|
|
||||||
|
|
||||||
if os.path.exists(path):
|
|
||||||
if not os.path.isdir(path):
|
|
||||||
raise MGException('{} is not a directory!'.format(path))
|
|
||||||
return path
|
|
||||||
|
|
||||||
if not mkdir:
|
|
||||||
return None
|
|
||||||
|
|
||||||
os.makedirs(path)
|
|
||||||
return path
|
|
||||||
|
|
||||||
def create(self):
|
|
||||||
if os.path.exists(self.worktree):
|
|
||||||
if not os.path.isdir(self.worktree):
|
|
||||||
raise MGException(
|
|
||||||
'{} is not a directory!',format(self.worktree)
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
os.makedirs(self.worktree)
|
|
||||||
|
|
||||||
self.dir('branches', mkdir=True)
|
|
||||||
self.dir('objects', mkdir=True)
|
|
||||||
self.dir('refs', 'tags', mkdir=True)
|
|
||||||
self.dir('refs', 'heads', mkdir=True)
|
|
||||||
|
|
||||||
# .git/description
|
|
||||||
with open(self.file('description'), 'w') as f:
|
|
||||||
f.write(
|
|
||||||
'Unnamed repository; edit this file \'description\' to name the repository.\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
# .git/HEAD
|
|
||||||
with open(self.file('HEAD'), 'w') as f:
|
|
||||||
f.write('ref: refs/heads/master\n')
|
|
||||||
|
|
||||||
with open(self.file('config'), 'w') as f:
|
|
||||||
config = self.default_config()
|
|
||||||
config.write(f)
|
|
||||||
|
|
||||||
def default_config(self):
|
|
||||||
ret = configparser.ConfigParser()
|
|
||||||
|
|
||||||
ret.add_section('core')
|
|
||||||
ret.set('core', 'repositoryformatversion', '0')
|
|
||||||
ret.set('core', 'filemode', 'false')
|
|
||||||
ret.set('core', 'bare', 'false')
|
|
||||||
|
|
||||||
return ret
|
|
||||||
|
|
||||||
def find(self, path, required=True):
|
|
||||||
path = os.path.realpath(path)
|
|
||||||
dir_candidate = os.path.join(path, '.git')
|
|
||||||
|
|
||||||
if os.path.isdir(dir_candidate):
|
|
||||||
return dir_candidate
|
|
||||||
|
|
||||||
# If we haven't returned, recurse in parent, if w
|
|
||||||
parent = os.path.realpath(os.path.join(path, '..'))
|
|
||||||
|
|
||||||
if parent == path:
|
|
||||||
# Bottom case
|
|
||||||
# os.path.join('/', '..') == '/':
|
|
||||||
# If parent==path, then path is root.
|
|
||||||
if required:
|
|
||||||
raise MGException('not a git repository (or any of the parent directories): .git')
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Recursive case
|
|
||||||
return self.find(parent, required)
|
|
||||||
|
|
||||||
|
|
||||||
class MGObject:
|
|
||||||
def __init__(self, repo, data=None, sha=''):
|
|
||||||
self.repo = repo
|
|
||||||
self.sha = "{:>040s}".format(sha)
|
|
||||||
|
|
||||||
if data:
|
|
||||||
self.deserialize(data)
|
|
||||||
|
|
||||||
def serialize(self):
|
|
||||||
raise NotImplemented('serialize needs to be implemented!')
|
|
||||||
|
|
||||||
def deserialize(self, data):
|
|
||||||
raise NotImplemented('deserialize needs to be implemented!')
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def read(repo, sha, candidates):
|
|
||||||
path = repo.file('objects', str(sha[0:2]), str(sha[2:]))
|
|
||||||
|
|
||||||
if not path:
|
|
||||||
raise MGException('{}: bad file'.format(sha))
|
|
||||||
|
|
||||||
with open(path, 'rb') as f:
|
|
||||||
raw = zlib.decompress(f.read())
|
|
||||||
|
|
||||||
# Read object type
|
|
||||||
space = raw.find(b' ')
|
|
||||||
fmt = raw[0:space].decode('utf-8')
|
|
||||||
|
|
||||||
# Read and validate object size
|
|
||||||
end = raw.find(b'\x00', space)
|
|
||||||
size = int(raw[space:end].decode('ascii'))
|
|
||||||
if size != len(raw)-end-1:
|
|
||||||
raise MGException('Malformed object {}: bad length'.format(sha))
|
|
||||||
|
|
||||||
# Call constructor and return object
|
|
||||||
return MGObject.find_candidate(candidates, fmt)(repo, raw[end+1:], sha=sha)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def find_candidate(candidates, fmt):
|
|
||||||
candidates = [c for c in candidates if c.understands == fmt]
|
|
||||||
if not candidates:
|
|
||||||
raise MGException('Unknown object type {}'.format(fmt))
|
|
||||||
if len(candidates) != 1:
|
|
||||||
raise MGException(
|
|
||||||
'Multiple classes ({}) claim to understand object type {}'.format(candidates, fmt)
|
|
||||||
)
|
|
||||||
|
|
||||||
return candidates[0]
|
|
||||||
|
|
||||||
|
|
||||||
def write(self, write=True):
|
|
||||||
# Serialize object data
|
|
||||||
data = self.serialize()
|
|
||||||
# Add header
|
|
||||||
result = b'%s %d\x00%s' % (self.understands.encode(), len(data), data)
|
|
||||||
# Compute hash
|
|
||||||
sha = hashlib.sha1(result).hexdigest()
|
|
||||||
|
|
||||||
if write:
|
|
||||||
# Compute path
|
|
||||||
path = self.repo.file('objects', sha[0:2], sha[2:], mkdir=write)
|
|
||||||
|
|
||||||
if not os.path.exists(path):
|
|
||||||
with open(path, 'wb') as f:
|
|
||||||
# Compress and write
|
|
||||||
f.write(zlib.compress(result))
|
|
||||||
|
|
||||||
return sha
|
|
||||||
|
|
||||||
|
|
||||||
class MGBlob(MGObject):
|
|
||||||
understands = 'blob'
|
|
||||||
|
|
||||||
def serialize(self):
|
|
||||||
return self.data
|
|
||||||
|
|
||||||
def deserialize(self, data):
|
|
||||||
self.data = data
|
|
||||||
|
|
||||||
|
|
||||||
class MGCommit(MGObject):
|
|
||||||
understands = 'commit'
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
self.headers = collections.OrderedDict()
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
|
|
||||||
def deserialize(self, raw, offs=0):
|
|
||||||
spc = raw.find(b' ', offs)
|
|
||||||
nl = raw.find(b'\n', offs)
|
|
||||||
|
|
||||||
if spc < 0 or nl < spc:
|
|
||||||
self.data = raw[offs+1:]
|
|
||||||
return
|
|
||||||
|
|
||||||
key = raw[offs:spc]
|
|
||||||
|
|
||||||
end = raw.find(b'\n', offs+1)
|
|
||||||
while raw[end+1] == ord(' '):
|
|
||||||
end = raw.find(b'\n', end+1)
|
|
||||||
|
|
||||||
value = raw[spc+1:end].replace(b'\n ', b'\n')
|
|
||||||
|
|
||||||
if key in self.headers:
|
|
||||||
self.headers[key].append(value)
|
|
||||||
else:
|
|
||||||
self.headers[key] = [value]
|
|
||||||
|
|
||||||
return self.deserialize(raw, offs=end+1)
|
|
||||||
|
|
||||||
def serialize(self):
|
|
||||||
ret = b''
|
|
||||||
|
|
||||||
for k, v in self.headers.items():
|
|
||||||
for val in v:
|
|
||||||
ret += k + b' ' + val.replace(b'\n', b'\n ') + b'\n'
|
|
||||||
|
|
||||||
ret += b'\n' + self.data
|
|
||||||
|
|
||||||
return ret
|
|
||||||
|
|
||||||
def from_headers(self, header, dflt):
|
|
||||||
return self.headers.get(header, [dflt])[0]
|
|
||||||
|
|
||||||
def format(self):
|
|
||||||
fmt_str = '{}\nAuthor: {}\nDate: {}\n\n{}'
|
|
||||||
author_info = self.from_headers(b'author', b'unknown').decode('utf-8')
|
|
||||||
name, email, timestamp, tz = author_info.split(' ')
|
|
||||||
author = '{} {}'.format(name, email)
|
|
||||||
date = datetime.datetime.fromtimestamp(int(timestamp))
|
|
||||||
# were cheating with the timezone, but thats okay
|
|
||||||
date = '{}{}'.format(date.strftime('%a %b %H:%M:%S %Y %z'), tz)
|
|
||||||
commit_info = '\n'.join(' {}'.format(l) for l in self.data.decode('utf-8').split('\n'))
|
|
||||||
yellow = click.style('commit {}'.format(self.sha), fg='yellow')
|
|
||||||
return fmt_str.format(yellow, author, date, commit_info)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def create(repo, msg, parent=None):
|
|
||||||
res = MGCommit(repo)
|
|
||||||
res.headers[b'tree'] = [MGTree.create(repo, '.').write().encode('ascii')]
|
|
||||||
conf = configparser.ConfigParser()
|
|
||||||
conf.read([os.path.expanduser('~/.gitconfig')])
|
|
||||||
res.headers[b'author'] = [b'%s <%s> %d %s' % (
|
|
||||||
conf.get('user', 'name').encode('utf-8'),
|
|
||||||
conf.get('user', 'email').encode('utf-8'),
|
|
||||||
int(datetime.datetime.now().timestamp()),
|
|
||||||
time.strftime('%z', time.gmtime()).encode('utf-8')
|
|
||||||
)]
|
|
||||||
res.headers[b'committer'] = res.headers[b'author']
|
|
||||||
if parent:
|
|
||||||
res.headers[b'parent'] = [parent.encode('ascii')]
|
|
||||||
res.data = msg.encode('utf-8')
|
|
||||||
return res
|
|
||||||
|
|
||||||
|
|
||||||
class MGTree(MGObject):
|
|
||||||
understands = 'tree'
|
|
||||||
|
|
||||||
class MGLeaf:
|
|
||||||
def __init__(self, mode, path, sha):
|
|
||||||
self.mode = mode
|
|
||||||
self.path = path
|
|
||||||
self.sha = "{:>020s}".format(sha)
|
|
||||||
|
|
||||||
def deserialize_leaf(self, raw, offs=0):
|
|
||||||
space = raw.find(b' ', offs)
|
|
||||||
mode = raw[offs:space]
|
|
||||||
|
|
||||||
null = raw.find(b'\x00', space)
|
|
||||||
path = raw[space+1:null]
|
|
||||||
end = null+21
|
|
||||||
|
|
||||||
sha = format(int.from_bytes(raw[null+1:end], 'big'), 'x')
|
|
||||||
sha = "{:>040s}".format(sha)
|
|
||||||
|
|
||||||
return end, MGTree.MGLeaf(mode, path, sha)
|
|
||||||
|
|
||||||
def deserialize(self, raw):
|
|
||||||
self.data = []
|
|
||||||
pos = 0
|
|
||||||
end = len(raw)
|
|
||||||
while pos < end:
|
|
||||||
pos, data = self.deserialize_leaf(raw, pos)
|
|
||||||
self.data.append(data)
|
|
||||||
|
|
||||||
def serialize(self):
|
|
||||||
ret = b''
|
|
||||||
for i in self.data:
|
|
||||||
sha_str = int(i.sha, 16).to_bytes(20, byteorder='big')
|
|
||||||
ret += i.mode + b' ' + i.path + b'\x00' + sha_str
|
|
||||||
return ret
|
|
||||||
|
|
||||||
def check_prefix(self, d, f):
|
|
||||||
d = os.path.realpath(d)
|
|
||||||
f = os.path.realpath(f)
|
|
||||||
|
|
||||||
if os.path.isdir(d): f = os.path.join(d, b'')
|
|
||||||
if os.path.isdir(f): f = os.path.join(f, b'')
|
|
||||||
|
|
||||||
common = os.path.commonprefix([f, d])
|
|
||||||
|
|
||||||
return common == d or common == f
|
|
||||||
|
|
||||||
def checkout(self, path, prefix=b'.'):
|
|
||||||
for item in self.data:
|
|
||||||
obj = MGObject.read(self.repo, item.sha, DFLT_CANDIDATES)
|
|
||||||
dest = os.path.join(path, item.path)
|
|
||||||
|
|
||||||
if obj.understands == 'tree' and self.check_prefix(prefix, dest):
|
|
||||||
if not os.path.exists(dest):
|
|
||||||
os.mkdir(dest)
|
|
||||||
obj.checkout(dest)
|
|
||||||
if obj.understands == 'blob':
|
|
||||||
with open(dest, 'wb') as f:
|
|
||||||
f.write(obj.data)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def create(repo, path):
|
|
||||||
res = MGTree(repo)
|
|
||||||
res.data = []
|
|
||||||
|
|
||||||
for (d, _, files) in os.walk(path):
|
|
||||||
for f in files:
|
|
||||||
item = os.path.join(d, f)
|
|
||||||
if '/.git/' in item or any(p in item for p in repo.ignore_patterns):
|
|
||||||
continue
|
|
||||||
mode = format(os.stat(item)[stat.ST_MODE], 'o').encode('ascii')
|
|
||||||
|
|
||||||
candidate = MGObject.find_candidate(DFLT_CANDIDATES, 'blob')
|
|
||||||
with open(item, 'rb') as f:
|
|
||||||
obj = candidate(repo, f.read())
|
|
||||||
sha = obj.write()
|
|
||||||
item = item[2:]
|
|
||||||
res.data.append(MGTree.MGLeaf(mode, item.encode('utf-8'), sha))
|
|
||||||
|
|
||||||
return res
|
|
||||||
|
|
||||||
|
|
||||||
class MGTag(MGCommit):
|
|
||||||
understands = 'tag'
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def create(repo, name, to):
|
|
||||||
with open(repo.file(os.path.join('refs', 'tags', name)), 'w+') as f:
|
|
||||||
f.write(to.resolve())
|
|
||||||
|
|
||||||
|
|
||||||
DFLT_CANDIDATES = [MGBlob, MGCommit, MGTree, MGTag]
|
|
||||||
|
|
||||||
|
|
||||||
class MGRef:
|
|
||||||
def __init__(self, repo, ref, children=None):
|
|
||||||
self.repo = repo
|
|
||||||
self.ref = ref
|
|
||||||
self.children = children
|
|
||||||
|
|
||||||
def resolve(self):
|
|
||||||
recursive = True
|
|
||||||
data = self.ref
|
|
||||||
while recursive:
|
|
||||||
data, recursive, _ = self.references(data)
|
|
||||||
return data
|
|
||||||
|
|
||||||
def references(self, ref=None):
|
|
||||||
if ref is None:
|
|
||||||
ref = self.ref
|
|
||||||
try:
|
|
||||||
with open(self.repo.file(ref), 'r') as fp:
|
|
||||||
data = fp.read().strip()
|
|
||||||
except FileNotFoundError:
|
|
||||||
prefix_dir = os.path.join('objects', ref[0:2])
|
|
||||||
path = self.repo.dir(prefix_dir)
|
|
||||||
if path:
|
|
||||||
rem = ref[2:]
|
|
||||||
candidates = []
|
|
||||||
for f in os.listdir(path):
|
|
||||||
if f.startswith(rem):
|
|
||||||
candidates.append("{}{}".format(ref, f))
|
|
||||||
if len(candidates) > 1:
|
|
||||||
s = ', '.join(candidates)
|
|
||||||
raise MGException(
|
|
||||||
'Ambiguous revision {}\nCandidates: {}'.format(ref, s)
|
|
||||||
)
|
|
||||||
if len(candidates) == 1:
|
|
||||||
return candidates[0], False, False
|
|
||||||
|
|
||||||
|
|
||||||
head = os.path.join('refs', 'heads', ref)
|
|
||||||
tag = os.path.join('refs', 'tags', ref)
|
|
||||||
if os.path.exists(self.repo.file(head)):
|
|
||||||
res, rec, _ = self.references(head)
|
|
||||||
return res, rec, head
|
|
||||||
if os.path.exists(self.repo.file(tag)):
|
|
||||||
res, rec, _ = self.references(tag)
|
|
||||||
return res, rec, tag
|
|
||||||
return ref, False, False
|
|
||||||
if data.startswith('ref: '):
|
|
||||||
return data[5:], True, False
|
|
||||||
return data, False, False
|
|
||||||
|
|
||||||
def changes_ref(self):
|
|
||||||
recursive = True
|
|
||||||
data = self.ref
|
|
||||||
while recursive:
|
|
||||||
data, recursive, new_ref = self.references(data)
|
|
||||||
return new_ref
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def list(repo, path=None):
|
|
||||||
if not path:
|
|
||||||
path = repo.dir('refs')
|
|
||||||
res = []
|
|
||||||
|
|
||||||
for f in sorted(os.listdir(path)):
|
|
||||||
can = os.path.join(path, f)
|
|
||||||
if os.path.isdir(can):
|
|
||||||
res.append(MGRef(repo, can, MGRef.list(repo, can)))
|
|
||||||
else:
|
|
||||||
res.append(MGRef(repo, can))
|
|
||||||
|
|
||||||
return res
|
|
||||||
|
|
||||||
def show(self):
|
|
||||||
if self.children is None:
|
|
||||||
ref = self.ref.replace(self.repo.dir(''), '')
|
|
||||||
print('{} {}'.format(self.resolve(), ref))
|
|
||||||
else:
|
|
||||||
for ref in self.children:
|
|
||||||
ref.show()
|
|
||||||
|
|
||||||
|
|
||||||
class MGCatch(click.Group):
|
|
||||||
def invoke(self, ctx):
|
|
||||||
try:
|
|
||||||
return super().invoke(ctx)
|
|
||||||
except MGException as exc:
|
|
||||||
print('fatal: {}'.format(exc))
|
|
||||||
|
|
||||||
|
|
||||||
@click.group(help='The stupid content tracker',
|
|
||||||
cls=MGCatch)
|
|
||||||
def cli(argv=None):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command()
|
|
||||||
def add():
|
|
||||||
raise MGException('Unimplemented')
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command(help="List, create, or delete branches")
|
|
||||||
@click.argument("name", default=None, required=False)
|
|
||||||
@click.option("-d", is_flag=True, help="Delete a branch.")
|
|
||||||
def branch(name, d):
|
|
||||||
repo = MGRepository('.')
|
|
||||||
ref = MGRef(repo, 'HEAD')
|
|
||||||
if d and name:
|
|
||||||
branch_file = repo.file(os.path.join('refs', 'heads', name))
|
|
||||||
if not os.path.exists(branch_file):
|
|
||||||
raise MGException('Branch \'{}\' not found'.format(name))
|
|
||||||
os.remove(branch_file)
|
|
||||||
elif name:
|
|
||||||
branch_file = repo.file(os.path.join('refs', 'heads', name))
|
|
||||||
if os.path.exists(branch_file):
|
|
||||||
raise MGException('Branch \'{}\' already exists'.format(name))
|
|
||||||
with open(branch_file, 'w') as f:
|
|
||||||
f.write(ref.resolve())
|
|
||||||
elif d:
|
|
||||||
raise MGException('branch name required')
|
|
||||||
else:
|
|
||||||
refbranch, _, _2 = ref.references()
|
|
||||||
refbranch = os.path.basename(refbranch)
|
|
||||||
for branch in os.listdir(repo.file(os.path.join('refs', 'heads'))):
|
|
||||||
if refbranch == branch:
|
|
||||||
print('* {}'.format(click.style(branch, bg='yellow')))
|
|
||||||
else:
|
|
||||||
print(' {}'.format(click.style(branch, fg='yellow')))
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command(
|
|
||||||
name='cat-file',
|
|
||||||
help='Provide content or type and size information for repository objects'
|
|
||||||
)
|
|
||||||
@click.argument(
|
|
||||||
't',
|
|
||||||
metavar='type',
|
|
||||||
type=click.Choice(['blob', 'commit', 'tag', 'tree'])
|
|
||||||
)
|
|
||||||
@click.argument('o', metavar='object')
|
|
||||||
def cat_file(t, o):
|
|
||||||
repo = MGRepository('.')
|
|
||||||
obj = MGObject.read(repo, o, DFLT_CANDIDATES)
|
|
||||||
print(obj.serialize().decode('utf-8'), end='')
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command(help='Checkout a commit or restore files.')
|
|
||||||
@click.argument('commit', default='HEAD')
|
|
||||||
@click.option('-p', 'path', default='.', help='The path to check out.')
|
|
||||||
def checkout(commit, path):
|
|
||||||
repo = MGRepository('.')
|
|
||||||
|
|
||||||
commit = MGRef(repo, commit)
|
|
||||||
changes_ref = commit.changes_ref()
|
|
||||||
|
|
||||||
obj = MGObject.read(repo, commit.resolve(), DFLT_CANDIDATES)
|
|
||||||
|
|
||||||
if obj.understands == 'commit':
|
|
||||||
obj = MGObject.read(repo, obj.from_headers(b'tree', b'').decode('ascii'), DFLT_CANDIDATES)
|
|
||||||
|
|
||||||
if not os.path.exists(path):
|
|
||||||
os.makedirs(path)
|
|
||||||
|
|
||||||
obj.checkout(b'.', path.encode('utf-8'))
|
|
||||||
|
|
||||||
if changes_ref:
|
|
||||||
with open(repo.file('HEAD'), 'w') as f:
|
|
||||||
f.write('ref: {}'.format(changes_ref))
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command()
|
|
||||||
@click.argument('message')
|
|
||||||
def commit(message):
|
|
||||||
repo = MGRepository('.')
|
|
||||||
parent = None
|
|
||||||
ref = MGRef(repo, 'HEAD')
|
|
||||||
references, _, _ = ref.references()
|
|
||||||
references = repo.file(references)
|
|
||||||
resolved = ref.resolve()
|
|
||||||
if os.path.exists(resolved):
|
|
||||||
with open(ref.resolve(), 'r') as f:
|
|
||||||
parent = f.read().strip()
|
|
||||||
commit = MGCommit.create(repo, message, parent)
|
|
||||||
os.makedirs(os.path.dirname(references), exist_ok=True)
|
|
||||||
with open(references, 'w+') as f:
|
|
||||||
f.write(commit.write())
|
|
||||||
f.write('\n')
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command(
|
|
||||||
name='hash-object',
|
|
||||||
help='Compute object ID and optionally creates a blob from a file'
|
|
||||||
)
|
|
||||||
@click.option(
|
|
||||||
'-t',
|
|
||||||
't',
|
|
||||||
metavar='type',
|
|
||||||
type=click.Choice(['blob', 'commit', 'tag', 'tree']),
|
|
||||||
default='blob',
|
|
||||||
help='Specify the type'
|
|
||||||
)
|
|
||||||
@click.option(
|
|
||||||
'-w',
|
|
||||||
'write',
|
|
||||||
is_flag=True,
|
|
||||||
help='Actually write the object into the database'
|
|
||||||
)
|
|
||||||
@click.argument('path')
|
|
||||||
def hash_object(t, write, path):
|
|
||||||
repo = MGRepository('.') if write else None
|
|
||||||
|
|
||||||
try:
|
|
||||||
with open(path, 'rb') as f:
|
|
||||||
obj = MGObject.find_candidate(DFLT_CANDIDATES, t)(repo, f.read())
|
|
||||||
except OSError:
|
|
||||||
raise MGException('unable to hash {}'.format(path))
|
|
||||||
sha = obj.write(write)
|
|
||||||
print(sha)
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command(help='Initialize a new, empty repository.')
|
|
||||||
@click.argument('path', default='.')
|
|
||||||
def init(path):
|
|
||||||
MGRepository(path, force=True).create()
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command(help='Display history of a given commit.')
|
|
||||||
@click.argument('commit', default='HEAD')
|
|
||||||
def log(commit):
|
|
||||||
repo = MGRepository('.')
|
|
||||||
|
|
||||||
ref = MGRef(repo, commit)
|
|
||||||
|
|
||||||
logfn(repo, ref.resolve(), set())
|
|
||||||
|
|
||||||
|
|
||||||
def logfn(repo, sha, seen):
|
|
||||||
if sha in seen:
|
|
||||||
return
|
|
||||||
seen.add(sha)
|
|
||||||
|
|
||||||
commit = MGObject.read(repo, sha, DFLT_CANDIDATES)
|
|
||||||
|
|
||||||
print(commit.format())
|
|
||||||
|
|
||||||
if not b'parent' in commit.headers:
|
|
||||||
return
|
|
||||||
|
|
||||||
parents = commit.headers[b'parent']
|
|
||||||
|
|
||||||
for p in parents:
|
|
||||||
logfn(repo, p.decode('ascii'), seen)
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command(help='Pretty-print a tree object.')
|
|
||||||
@click.argument('o', metavar='object')
|
|
||||||
@click.option('-r', is_flag=True, help='Recurse into sub-trees.')
|
|
||||||
def ls_tree(o, r):
|
|
||||||
repo = MGRepository('.')
|
|
||||||
|
|
||||||
def internal(o, r, prefix=''):
|
|
||||||
o = MGObject.read(repo, o, DFLT_CANDIDATES)
|
|
||||||
|
|
||||||
if o.understands != 'tree':
|
|
||||||
raise MGException('not a tree object')
|
|
||||||
|
|
||||||
for item in o.data:
|
|
||||||
path = os.path.join(prefix, item.path.decode('ascii'))
|
|
||||||
t = MGObject.read(repo, item.sha, DFLT_CANDIDATES).understands
|
|
||||||
if t == 'tree' and r:
|
|
||||||
internal(item.sha, r, prefix=path)
|
|
||||||
else:
|
|
||||||
print('{0} {1} {2}\t{3}'.format(
|
|
||||||
'0' * (6 - len(item.mode)) + item.mode.decode('ascii'),
|
|
||||||
t,
|
|
||||||
item.sha,
|
|
||||||
path))
|
|
||||||
|
|
||||||
internal(o, r)
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command()
|
|
||||||
def merge():
|
|
||||||
raise MGException('Unimplemented')
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command()
|
|
||||||
def rebase():
|
|
||||||
raise MGException('Unimplemented')
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command()
|
|
||||||
@click.argument("name")
|
|
||||||
def rev_parse(name):
|
|
||||||
repo = MGRepository(".")
|
|
||||||
print(MGRef(repo, name).resolve())
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command()
|
|
||||||
def rm():
|
|
||||||
raise MGException('Unimplemented')
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command(help='List references.')
|
|
||||||
def show_ref():
|
|
||||||
repo = MGRepository('.')
|
|
||||||
refs = MGRef.list(repo)
|
|
||||||
for ref in refs:
|
|
||||||
ref.show()
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command(help='List and create tags')
|
|
||||||
@click.argument('name', default=None, required=False)
|
|
||||||
@click.argument('o', metavar='object', default='HEAD')
|
|
||||||
def tag(name, o):
|
|
||||||
repo = MGRepository('.')
|
|
||||||
|
|
||||||
if name:
|
|
||||||
MGTag.create(repo, name, MGRef(repo, o))
|
|
||||||
else:
|
|
||||||
refs = MGRef.list(repo)
|
|
||||||
for ref in refs:
|
|
||||||
prefix = repo.dir(os.path.join('refs', 'tags'))
|
|
||||||
if ref.ref == prefix:
|
|
||||||
for child in ref.children:
|
|
||||||
print(child.ref.replace(os.path.join(prefix, ''), ''))
|
|
||||||
|
250
mg/commands.py
Normal file
250
mg/commands.py
Normal file
@@ -0,0 +1,250 @@
|
|||||||
|
import os
|
||||||
|
|
||||||
|
import click
|
||||||
|
|
||||||
|
from .core import *
|
||||||
|
|
||||||
|
class MGCatch(click.Group):
|
||||||
|
def invoke(self, ctx):
|
||||||
|
try:
|
||||||
|
return super().invoke(ctx)
|
||||||
|
except MGException as exc:
|
||||||
|
print('fatal: {}'.format(exc))
|
||||||
|
|
||||||
|
|
||||||
|
@click.group(help='The stupid content tracker',
|
||||||
|
cls=MGCatch)
|
||||||
|
def cli(argv=None):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
def add():
|
||||||
|
raise MGException('Unimplemented')
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command(help="List, create, or delete branches")
|
||||||
|
@click.argument("name", default=None, required=False)
|
||||||
|
@click.option("-d", is_flag=True, help="Delete a branch.")
|
||||||
|
def branch(name, d):
|
||||||
|
repo = MGRepository('.')
|
||||||
|
ref = MGRef(repo, 'HEAD')
|
||||||
|
if d and name:
|
||||||
|
branch_file = repo.file(os.path.join('refs', 'heads', name))
|
||||||
|
if not os.path.exists(branch_file):
|
||||||
|
raise MGException('Branch \'{}\' not found'.format(name))
|
||||||
|
os.remove(branch_file)
|
||||||
|
elif name:
|
||||||
|
branch_file = repo.file(os.path.join('refs', 'heads', name))
|
||||||
|
if os.path.exists(branch_file):
|
||||||
|
raise MGException('Branch \'{}\' already exists'.format(name))
|
||||||
|
with open(branch_file, 'w') as f:
|
||||||
|
f.write(ref.resolve())
|
||||||
|
elif d:
|
||||||
|
raise MGException('branch name required')
|
||||||
|
else:
|
||||||
|
refbranch, _, _2 = ref.references()
|
||||||
|
refbranch = os.path.basename(refbranch)
|
||||||
|
for branch in os.listdir(repo.file(os.path.join('refs', 'heads'))):
|
||||||
|
if refbranch == branch:
|
||||||
|
print('* {}'.format(click.style(branch, bg='yellow')))
|
||||||
|
else:
|
||||||
|
print(' {}'.format(click.style(branch, fg='yellow')))
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command(
|
||||||
|
name='cat-file',
|
||||||
|
help='Provide content or type and size information for repository objects'
|
||||||
|
)
|
||||||
|
@click.argument(
|
||||||
|
't',
|
||||||
|
metavar='type',
|
||||||
|
type=click.Choice(['blob', 'commit', 'tag', 'tree'])
|
||||||
|
)
|
||||||
|
@click.argument('o', metavar='object')
|
||||||
|
def cat_file(t, o):
|
||||||
|
repo = MGRepository('.')
|
||||||
|
obj = MGObject.read(repo, o, DFLT_CANDIDATES)
|
||||||
|
print(obj.serialize().decode('utf-8'), end='')
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command(help='Checkout a commit or restore files.')
|
||||||
|
@click.argument('commit', default='HEAD')
|
||||||
|
@click.option('-p', 'path', default='.', help='The path to check out.')
|
||||||
|
def checkout(commit, path):
|
||||||
|
repo = MGRepository('.')
|
||||||
|
|
||||||
|
commit = MGRef(repo, commit)
|
||||||
|
changes_ref = commit.changes_ref()
|
||||||
|
|
||||||
|
obj = MGObject.read(repo, commit.resolve(), DFLT_CANDIDATES)
|
||||||
|
|
||||||
|
if obj.understands == 'commit':
|
||||||
|
obj = MGObject.read(repo, obj.from_headers(b'tree', b'').decode('ascii'), DFLT_CANDIDATES)
|
||||||
|
|
||||||
|
if not os.path.exists(path):
|
||||||
|
os.makedirs(path)
|
||||||
|
|
||||||
|
obj.checkout(b'.', path.encode('utf-8'))
|
||||||
|
|
||||||
|
if changes_ref:
|
||||||
|
with open(repo.file('HEAD'), 'w') as f:
|
||||||
|
f.write('ref: {}'.format(changes_ref))
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
@click.argument('message')
|
||||||
|
def commit(message):
|
||||||
|
repo = MGRepository('.')
|
||||||
|
parent = None
|
||||||
|
ref = MGRef(repo, 'HEAD')
|
||||||
|
references, _, _ = ref.references()
|
||||||
|
references = repo.file(references)
|
||||||
|
resolved = ref.resolve()
|
||||||
|
if os.path.exists(resolved):
|
||||||
|
with open(ref.resolve(), 'r') as f:
|
||||||
|
parent = f.read().strip()
|
||||||
|
commit = MGCommit.create(repo, message, parent)
|
||||||
|
os.makedirs(os.path.dirname(references), exist_ok=True)
|
||||||
|
with open(references, 'w+') as f:
|
||||||
|
f.write(commit.write())
|
||||||
|
f.write('\n')
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command(
|
||||||
|
name='hash-object',
|
||||||
|
help='Compute object ID and optionally creates a blob from a file'
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
'-t',
|
||||||
|
't',
|
||||||
|
metavar='type',
|
||||||
|
type=click.Choice(['blob', 'commit', 'tag', 'tree']),
|
||||||
|
default='blob',
|
||||||
|
help='Specify the type'
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
'-w',
|
||||||
|
'write',
|
||||||
|
is_flag=True,
|
||||||
|
help='Actually write the object into the database'
|
||||||
|
)
|
||||||
|
@click.argument('path')
|
||||||
|
def hash_object(t, write, path):
|
||||||
|
repo = MGRepository('.') if write else None
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(path, 'rb') as f:
|
||||||
|
obj = MGObject.find_candidate(DFLT_CANDIDATES, t)(repo, f.read())
|
||||||
|
except OSError:
|
||||||
|
raise MGException('unable to hash {}'.format(path))
|
||||||
|
sha = obj.write(write)
|
||||||
|
print(sha)
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command(help='Initialize a new, empty repository.')
|
||||||
|
@click.argument('path', default='.')
|
||||||
|
def init(path):
|
||||||
|
MGRepository(path, force=True).create()
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command(help='Display history of a given commit.')
|
||||||
|
@click.argument('commit', default='HEAD')
|
||||||
|
def log(commit):
|
||||||
|
repo = MGRepository('.')
|
||||||
|
|
||||||
|
ref = MGRef(repo, commit)
|
||||||
|
|
||||||
|
logfn(repo, ref.resolve(), set())
|
||||||
|
|
||||||
|
|
||||||
|
def logfn(repo, sha, seen):
|
||||||
|
if sha in seen:
|
||||||
|
return
|
||||||
|
seen.add(sha)
|
||||||
|
|
||||||
|
commit = MGObject.read(repo, sha, DFLT_CANDIDATES)
|
||||||
|
|
||||||
|
print(commit.format())
|
||||||
|
|
||||||
|
if not b'parent' in commit.headers:
|
||||||
|
return
|
||||||
|
|
||||||
|
parents = commit.headers[b'parent']
|
||||||
|
|
||||||
|
for p in parents:
|
||||||
|
logfn(repo, p.decode('ascii'), seen)
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command(help='Pretty-print a tree object.')
|
||||||
|
@click.argument('o', metavar='object')
|
||||||
|
@click.option('-r', is_flag=True, help='Recurse into sub-trees.')
|
||||||
|
def ls_tree(o, r):
|
||||||
|
repo = MGRepository('.')
|
||||||
|
|
||||||
|
def internal(o, r, prefix=''):
|
||||||
|
o = MGObject.read(repo, o, DFLT_CANDIDATES)
|
||||||
|
|
||||||
|
if o.understands != 'tree':
|
||||||
|
raise MGException('not a tree object')
|
||||||
|
|
||||||
|
for item in o.data:
|
||||||
|
path = os.path.join(prefix, item.path.decode('ascii'))
|
||||||
|
t = MGObject.read(repo, item.sha, DFLT_CANDIDATES).understands
|
||||||
|
if t == 'tree' and r:
|
||||||
|
internal(item.sha, r, prefix=path)
|
||||||
|
else:
|
||||||
|
print('{0} {1} {2}\t{3}'.format(
|
||||||
|
'0' * (6 - len(item.mode)) + item.mode.decode('ascii'),
|
||||||
|
t,
|
||||||
|
item.sha,
|
||||||
|
path))
|
||||||
|
|
||||||
|
internal(o, r)
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
def merge():
|
||||||
|
raise MGException('Unimplemented')
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
def rebase():
|
||||||
|
raise MGException('Unimplemented')
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
@click.argument("name")
|
||||||
|
def rev_parse(name):
|
||||||
|
repo = MGRepository(".")
|
||||||
|
print(MGRef(repo, name).resolve())
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
def rm():
|
||||||
|
raise MGException('Unimplemented')
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command(help='List references.')
|
||||||
|
def show_ref():
|
||||||
|
repo = MGRepository('.')
|
||||||
|
refs = MGRef.list(repo)
|
||||||
|
for ref in refs:
|
||||||
|
ref.show()
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command(help='List and create tags')
|
||||||
|
@click.argument('name', default=None, required=False)
|
||||||
|
@click.argument('o', metavar='object', default='HEAD')
|
||||||
|
def tag(name, o):
|
||||||
|
repo = MGRepository('.')
|
||||||
|
|
||||||
|
if name:
|
||||||
|
MGTag.create(repo, name, MGRef(repo, o))
|
||||||
|
else:
|
||||||
|
refs = MGRef.list(repo)
|
||||||
|
for ref in refs:
|
||||||
|
prefix = repo.dir(os.path.join('refs', 'tags'))
|
||||||
|
if ref.ref == prefix:
|
||||||
|
for child in ref.children:
|
||||||
|
print(child.ref.replace(os.path.join(prefix, ''), ''))
|
452
mg/core.py
Normal file
452
mg/core.py
Normal file
@@ -0,0 +1,452 @@
|
|||||||
|
import collections
|
||||||
|
import configparser
|
||||||
|
import datetime
|
||||||
|
import hashlib
|
||||||
|
import os
|
||||||
|
import stat
|
||||||
|
import time
|
||||||
|
import zlib
|
||||||
|
|
||||||
|
|
||||||
|
class MGException(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class MGRepository:
|
||||||
|
def __init__(self, path, force=False):
|
||||||
|
self.worktree = path
|
||||||
|
self.gitdir = (self.find(path, required=not force) or
|
||||||
|
os.path.join(path, '.git'))
|
||||||
|
|
||||||
|
# Read configuration file in .git/config
|
||||||
|
self.conf = configparser.ConfigParser()
|
||||||
|
cf = self.file('config')
|
||||||
|
|
||||||
|
if cf and os.path.exists(cf):
|
||||||
|
self.conf.read([cf])
|
||||||
|
elif not force:
|
||||||
|
raise MGException('Can’t find the configuration file!')
|
||||||
|
|
||||||
|
self.ignore_patterns = []
|
||||||
|
|
||||||
|
if os.path.exists(os.path.expanduser('~/.gitignore')):
|
||||||
|
with open(os.path.expanduser('~/.gitignore'), 'r') as f:
|
||||||
|
self.ignore_patterns.extend(l.strip() for l in f.readlines() if l.strip())
|
||||||
|
|
||||||
|
if not force:
|
||||||
|
vers = int(self.conf.get('core', 'repositoryformatversion'))
|
||||||
|
if vers != 0 and not force:
|
||||||
|
raise MGException(
|
||||||
|
'Expected git repo version <= 1, found {}'.format(vers)
|
||||||
|
)
|
||||||
|
|
||||||
|
def path(self, *path):
|
||||||
|
return os.path.join(self.gitdir, *path)
|
||||||
|
|
||||||
|
def file(self, *path, mkdir=False):
|
||||||
|
if self.dir(*path[:-1], mkdir=mkdir):
|
||||||
|
return self.path(*path)
|
||||||
|
|
||||||
|
def dir(self, *path, mkdir=False):
|
||||||
|
path = self.path(*path)
|
||||||
|
|
||||||
|
if os.path.exists(path):
|
||||||
|
if not os.path.isdir(path):
|
||||||
|
raise MGException('{} is not a directory!'.format(path))
|
||||||
|
return path
|
||||||
|
|
||||||
|
if not mkdir:
|
||||||
|
return None
|
||||||
|
|
||||||
|
os.makedirs(path)
|
||||||
|
return path
|
||||||
|
|
||||||
|
def create(self):
|
||||||
|
if os.path.exists(self.worktree):
|
||||||
|
if not os.path.isdir(self.worktree):
|
||||||
|
raise MGException(
|
||||||
|
'{} is not a directory!',format(self.worktree)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
os.makedirs(self.worktree)
|
||||||
|
|
||||||
|
self.dir('branches', mkdir=True)
|
||||||
|
self.dir('objects', mkdir=True)
|
||||||
|
self.dir('refs', 'tags', mkdir=True)
|
||||||
|
self.dir('refs', 'heads', mkdir=True)
|
||||||
|
|
||||||
|
# .git/description
|
||||||
|
with open(self.file('description'), 'w') as f:
|
||||||
|
f.write(
|
||||||
|
'Unnamed repository; edit this file \'description\' to name the repository.\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
# .git/HEAD
|
||||||
|
with open(self.file('HEAD'), 'w') as f:
|
||||||
|
f.write('ref: refs/heads/master\n')
|
||||||
|
|
||||||
|
with open(self.file('config'), 'w') as f:
|
||||||
|
config = self.default_config()
|
||||||
|
config.write(f)
|
||||||
|
|
||||||
|
def default_config(self):
|
||||||
|
ret = configparser.ConfigParser()
|
||||||
|
|
||||||
|
ret.add_section('core')
|
||||||
|
ret.set('core', 'repositoryformatversion', '0')
|
||||||
|
ret.set('core', 'filemode', 'false')
|
||||||
|
ret.set('core', 'bare', 'false')
|
||||||
|
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def find(self, path, required=True):
|
||||||
|
path = os.path.realpath(path)
|
||||||
|
dir_candidate = os.path.join(path, '.git')
|
||||||
|
|
||||||
|
if os.path.isdir(dir_candidate):
|
||||||
|
return dir_candidate
|
||||||
|
|
||||||
|
# If we haven't returned, recurse in parent, if w
|
||||||
|
parent = os.path.realpath(os.path.join(path, '..'))
|
||||||
|
|
||||||
|
if parent == path:
|
||||||
|
# Bottom case
|
||||||
|
# os.path.join('/', '..') == '/':
|
||||||
|
# If parent==path, then path is root.
|
||||||
|
if required:
|
||||||
|
raise MGException('not a git repository (or any of the parent directories): .git')
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Recursive case
|
||||||
|
return self.find(parent, required)
|
||||||
|
|
||||||
|
|
||||||
|
class MGObject:
|
||||||
|
def __init__(self, repo, data=None, sha=''):
|
||||||
|
self.repo = repo
|
||||||
|
self.sha = "{:>040s}".format(sha)
|
||||||
|
|
||||||
|
if data:
|
||||||
|
self.deserialize(data)
|
||||||
|
|
||||||
|
def serialize(self):
|
||||||
|
raise NotImplemented('serialize needs to be implemented!')
|
||||||
|
|
||||||
|
def deserialize(self, data):
|
||||||
|
raise NotImplemented('deserialize needs to be implemented!')
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def read(repo, sha, candidates):
|
||||||
|
path = repo.file('objects', str(sha[0:2]), str(sha[2:]))
|
||||||
|
|
||||||
|
if not path:
|
||||||
|
raise MGException('{}: bad file'.format(sha))
|
||||||
|
|
||||||
|
with open(path, 'rb') as f:
|
||||||
|
raw = zlib.decompress(f.read())
|
||||||
|
|
||||||
|
# Read object type
|
||||||
|
space = raw.find(b' ')
|
||||||
|
fmt = raw[0:space].decode('utf-8')
|
||||||
|
|
||||||
|
# Read and validate object size
|
||||||
|
end = raw.find(b'\x00', space)
|
||||||
|
size = int(raw[space:end].decode('ascii'))
|
||||||
|
if size != len(raw)-end-1:
|
||||||
|
raise MGException('Malformed object {}: bad length'.format(sha))
|
||||||
|
|
||||||
|
# Call constructor and return object
|
||||||
|
return MGObject.find_candidate(candidates, fmt)(repo, raw[end+1:], sha=sha)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def find_candidate(candidates, fmt):
|
||||||
|
candidates = [c for c in candidates if c.understands == fmt]
|
||||||
|
if not candidates:
|
||||||
|
raise MGException('Unknown object type {}'.format(fmt))
|
||||||
|
if len(candidates) != 1:
|
||||||
|
raise MGException(
|
||||||
|
'Multiple classes ({}) claim to understand object type {}'.format(candidates, fmt)
|
||||||
|
)
|
||||||
|
|
||||||
|
return candidates[0]
|
||||||
|
|
||||||
|
|
||||||
|
def write(self, write=True):
|
||||||
|
# Serialize object data
|
||||||
|
data = self.serialize()
|
||||||
|
# Add header
|
||||||
|
result = b'%s %d\x00%s' % (self.understands.encode(), len(data), data)
|
||||||
|
# Compute hash
|
||||||
|
sha = hashlib.sha1(result).hexdigest()
|
||||||
|
|
||||||
|
if write:
|
||||||
|
# Compute path
|
||||||
|
path = self.repo.file('objects', sha[0:2], sha[2:], mkdir=write)
|
||||||
|
|
||||||
|
if not os.path.exists(path):
|
||||||
|
with open(path, 'wb') as f:
|
||||||
|
# Compress and write
|
||||||
|
f.write(zlib.compress(result))
|
||||||
|
|
||||||
|
return sha
|
||||||
|
|
||||||
|
|
||||||
|
class MGBlob(MGObject):
|
||||||
|
understands = 'blob'
|
||||||
|
|
||||||
|
def serialize(self):
|
||||||
|
return self.data
|
||||||
|
|
||||||
|
def deserialize(self, data):
|
||||||
|
self.data = data
|
||||||
|
|
||||||
|
|
||||||
|
class MGCommit(MGObject):
|
||||||
|
understands = 'commit'
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
self.headers = collections.OrderedDict()
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
def deserialize(self, raw, offs=0):
|
||||||
|
spc = raw.find(b' ', offs)
|
||||||
|
nl = raw.find(b'\n', offs)
|
||||||
|
|
||||||
|
if spc < 0 or nl < spc:
|
||||||
|
self.data = raw[offs+1:]
|
||||||
|
return
|
||||||
|
|
||||||
|
key = raw[offs:spc]
|
||||||
|
|
||||||
|
end = raw.find(b'\n', offs+1)
|
||||||
|
while raw[end+1] == ord(' '):
|
||||||
|
end = raw.find(b'\n', end+1)
|
||||||
|
|
||||||
|
value = raw[spc+1:end].replace(b'\n ', b'\n')
|
||||||
|
|
||||||
|
if key in self.headers:
|
||||||
|
self.headers[key].append(value)
|
||||||
|
else:
|
||||||
|
self.headers[key] = [value]
|
||||||
|
|
||||||
|
return self.deserialize(raw, offs=end+1)
|
||||||
|
|
||||||
|
def serialize(self):
|
||||||
|
ret = b''
|
||||||
|
|
||||||
|
for k, v in self.headers.items():
|
||||||
|
for val in v:
|
||||||
|
ret += k + b' ' + val.replace(b'\n', b'\n ') + b'\n'
|
||||||
|
|
||||||
|
ret += b'\n' + self.data
|
||||||
|
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def from_headers(self, header, dflt):
|
||||||
|
return self.headers.get(header, [dflt])[0]
|
||||||
|
|
||||||
|
def format(self):
|
||||||
|
fmt_str = '{}\nAuthor: {}\nDate: {}\n\n{}'
|
||||||
|
author_info = self.from_headers(b'author', b'unknown').decode('utf-8')
|
||||||
|
name, email, timestamp, tz = author_info.split(' ')
|
||||||
|
author = '{} {}'.format(name, email)
|
||||||
|
date = datetime.datetime.fromtimestamp(int(timestamp))
|
||||||
|
# were cheating with the timezone, but thats okay
|
||||||
|
date = '{}{}'.format(date.strftime('%a %b %H:%M:%S %Y %z'), tz)
|
||||||
|
commit_info = '\n'.join(' {}'.format(l) for l in self.data.decode('utf-8').split('\n'))
|
||||||
|
yellow = click.style('commit {}'.format(self.sha), fg='yellow')
|
||||||
|
return fmt_str.format(yellow, author, date, commit_info)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def create(repo, msg, parent=None):
|
||||||
|
res = MGCommit(repo)
|
||||||
|
res.headers[b'tree'] = [MGTree.create(repo, '.').write().encode('ascii')]
|
||||||
|
conf = configparser.ConfigParser()
|
||||||
|
conf.read([os.path.expanduser('~/.gitconfig')])
|
||||||
|
res.headers[b'author'] = [b'%s <%s> %d %s' % (
|
||||||
|
conf.get('user', 'name').encode('utf-8'),
|
||||||
|
conf.get('user', 'email').encode('utf-8'),
|
||||||
|
int(datetime.datetime.now().timestamp()),
|
||||||
|
time.strftime('%z', time.gmtime()).encode('utf-8')
|
||||||
|
)]
|
||||||
|
res.headers[b'committer'] = res.headers[b'author']
|
||||||
|
if parent:
|
||||||
|
res.headers[b'parent'] = [parent.encode('ascii')]
|
||||||
|
res.data = msg.encode('utf-8')
|
||||||
|
return res
|
||||||
|
|
||||||
|
|
||||||
|
class MGTree(MGObject):
|
||||||
|
understands = 'tree'
|
||||||
|
|
||||||
|
class MGLeaf:
|
||||||
|
def __init__(self, mode, path, sha):
|
||||||
|
self.mode = mode
|
||||||
|
self.path = path
|
||||||
|
self.sha = "{:>020s}".format(sha)
|
||||||
|
|
||||||
|
def deserialize_leaf(self, raw, offs=0):
|
||||||
|
space = raw.find(b' ', offs)
|
||||||
|
mode = raw[offs:space]
|
||||||
|
|
||||||
|
null = raw.find(b'\x00', space)
|
||||||
|
path = raw[space+1:null]
|
||||||
|
end = null+21
|
||||||
|
|
||||||
|
sha = format(int.from_bytes(raw[null+1:end], 'big'), 'x')
|
||||||
|
sha = "{:>040s}".format(sha)
|
||||||
|
|
||||||
|
return end, MGTree.MGLeaf(mode, path, sha)
|
||||||
|
|
||||||
|
def deserialize(self, raw):
|
||||||
|
self.data = []
|
||||||
|
pos = 0
|
||||||
|
end = len(raw)
|
||||||
|
while pos < end:
|
||||||
|
pos, data = self.deserialize_leaf(raw, pos)
|
||||||
|
self.data.append(data)
|
||||||
|
|
||||||
|
def serialize(self):
|
||||||
|
ret = b''
|
||||||
|
for i in self.data:
|
||||||
|
sha_str = int(i.sha, 16).to_bytes(20, byteorder='big')
|
||||||
|
ret += i.mode + b' ' + i.path + b'\x00' + sha_str
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def check_prefix(self, d, f):
|
||||||
|
d = os.path.realpath(d)
|
||||||
|
f = os.path.realpath(f)
|
||||||
|
|
||||||
|
if os.path.isdir(d): f = os.path.join(d, b'')
|
||||||
|
if os.path.isdir(f): f = os.path.join(f, b'')
|
||||||
|
|
||||||
|
common = os.path.commonprefix([f, d])
|
||||||
|
|
||||||
|
return common == d or common == f
|
||||||
|
|
||||||
|
def checkout(self, path, prefix=b'.'):
|
||||||
|
for item in self.data:
|
||||||
|
obj = MGObject.read(self.repo, item.sha, DFLT_CANDIDATES)
|
||||||
|
dest = os.path.join(path, item.path)
|
||||||
|
|
||||||
|
if obj.understands == 'tree' and self.check_prefix(prefix, dest):
|
||||||
|
if not os.path.exists(dest):
|
||||||
|
os.mkdir(dest)
|
||||||
|
obj.checkout(dest)
|
||||||
|
if obj.understands == 'blob':
|
||||||
|
with open(dest, 'wb') as f:
|
||||||
|
f.write(obj.data)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def create(repo, path):
|
||||||
|
res = MGTree(repo)
|
||||||
|
res.data = []
|
||||||
|
|
||||||
|
for (d, _, files) in os.walk(path):
|
||||||
|
for f in files:
|
||||||
|
item = os.path.join(d, f)
|
||||||
|
if '/.git/' in item or any(p in item for p in repo.ignore_patterns):
|
||||||
|
continue
|
||||||
|
mode = format(os.stat(item)[stat.ST_MODE], 'o').encode('ascii')
|
||||||
|
|
||||||
|
candidate = MGObject.find_candidate(DFLT_CANDIDATES, 'blob')
|
||||||
|
with open(item, 'rb') as f:
|
||||||
|
obj = candidate(repo, f.read())
|
||||||
|
sha = obj.write()
|
||||||
|
item = item[2:]
|
||||||
|
res.data.append(MGTree.MGLeaf(mode, item.encode('utf-8'), sha))
|
||||||
|
|
||||||
|
return res
|
||||||
|
|
||||||
|
|
||||||
|
class MGTag(MGCommit):
|
||||||
|
understands = 'tag'
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def create(repo, name, to):
|
||||||
|
with open(repo.file(os.path.join('refs', 'tags', name)), 'w+') as f:
|
||||||
|
f.write(to.resolve())
|
||||||
|
|
||||||
|
|
||||||
|
DFLT_CANDIDATES = [MGBlob, MGCommit, MGTree, MGTag]
|
||||||
|
|
||||||
|
|
||||||
|
class MGRef:
|
||||||
|
def __init__(self, repo, ref, children=None):
|
||||||
|
self.repo = repo
|
||||||
|
self.ref = ref
|
||||||
|
self.children = children
|
||||||
|
|
||||||
|
def resolve(self):
|
||||||
|
recursive = True
|
||||||
|
data = self.ref
|
||||||
|
while recursive:
|
||||||
|
data, recursive, _ = self.references(data)
|
||||||
|
return data
|
||||||
|
|
||||||
|
def references(self, ref=None):
|
||||||
|
if ref is None:
|
||||||
|
ref = self.ref
|
||||||
|
try:
|
||||||
|
with open(self.repo.file(ref), 'r') as fp:
|
||||||
|
data = fp.read().strip()
|
||||||
|
except FileNotFoundError:
|
||||||
|
prefix_dir = os.path.join('objects', ref[0:2])
|
||||||
|
path = self.repo.dir(prefix_dir)
|
||||||
|
if path:
|
||||||
|
rem = ref[2:]
|
||||||
|
candidates = []
|
||||||
|
for f in os.listdir(path):
|
||||||
|
if f.startswith(rem):
|
||||||
|
candidates.append("{}{}".format(ref, f))
|
||||||
|
if len(candidates) > 1:
|
||||||
|
s = ', '.join(candidates)
|
||||||
|
raise MGException(
|
||||||
|
'Ambiguous revision {}\nCandidates: {}'.format(ref, s)
|
||||||
|
)
|
||||||
|
if len(candidates) == 1:
|
||||||
|
return candidates[0], False, False
|
||||||
|
|
||||||
|
|
||||||
|
head = os.path.join('refs', 'heads', ref)
|
||||||
|
tag = os.path.join('refs', 'tags', ref)
|
||||||
|
if os.path.exists(self.repo.file(head)):
|
||||||
|
res, rec, _ = self.references(head)
|
||||||
|
return res, rec, head
|
||||||
|
if os.path.exists(self.repo.file(tag)):
|
||||||
|
res, rec, _ = self.references(tag)
|
||||||
|
return res, rec, tag
|
||||||
|
return ref, False, False
|
||||||
|
if data.startswith('ref: '):
|
||||||
|
return data[5:], True, False
|
||||||
|
return data, False, False
|
||||||
|
|
||||||
|
def changes_ref(self):
|
||||||
|
recursive = True
|
||||||
|
data = self.ref
|
||||||
|
while recursive:
|
||||||
|
data, recursive, new_ref = self.references(data)
|
||||||
|
return new_ref
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def list(repo, path=None):
|
||||||
|
if not path:
|
||||||
|
path = repo.dir('refs')
|
||||||
|
res = []
|
||||||
|
|
||||||
|
for f in sorted(os.listdir(path)):
|
||||||
|
can = os.path.join(path, f)
|
||||||
|
if os.path.isdir(can):
|
||||||
|
res.append(MGRef(repo, can, MGRef.list(repo, can)))
|
||||||
|
else:
|
||||||
|
res.append(MGRef(repo, can))
|
||||||
|
|
||||||
|
return res
|
||||||
|
|
||||||
|
def show(self):
|
||||||
|
if self.children is None:
|
||||||
|
ref = self.ref.replace(self.repo.dir(''), '')
|
||||||
|
print('{} {}'.format(self.resolve(), ref))
|
||||||
|
else:
|
||||||
|
for ref in self.children:
|
||||||
|
ref.show()
|
Reference in New Issue
Block a user