#!/usr/bin/python3
# TODO: option to print out full path name; most useful in the .json output format
# program version string; reported to the user by the -V/--version option
VERSION = "0.4"
import os, sys, argparse, xattr, json, sqlite3
# global mutables (several are re-bound by main() once the command line is parsed)

# accumulates the records that are dumped as one JSON document at the end of the run
answer_json = []

# output levels
verbose = 0
debug = 0

# placeholder for a database handle
db = None

# names of the extended attributes holding the comment, its author and its date
xattr_comment = "user.xdg.comment"
xattr_author = xattr_comment + ".author"
xattr_date = xattr_comment + ".date"

# which comment store has priority: "db" or "xattr"
mode = "db"
#======= debugging/verbose ===========
def print_d(*a):
    """Print the arguments as a debug trace line prefixed with '>>'.

    Nothing is printed unless the module-level ``debug`` flag is truthy.
    """
    if not debug:
        return
    print('>>', *a)
def errorBox(*a):
    """Report a problem to the user.

    The arguments are formatted exactly as ``print`` would format them, but
    are written to stderr so that diagnostics never get mixed into the
    comment/JSON output that scripts read from stdout.
    """
    print(*a, file=sys.stderr)
# >>> snip here <<<
#============ the DnDataBase, UiHelper and FileObj code is shared with other dirnotes programs
import getpass, time, stat, shutil
DEFAULT_CONFIG_FILE = "~/.config/dirnotes/dirnotes.conf" # or /etc/dirnotes.conf

# config
#    we could store the config in the database, in a second table
#    or in a .json file
# the "options for ..." entries only document the legal values; they are not read by the code below
DEFAULT_CONFIG = {"xattr_tag":"user.xdg.comment",
    "database":"~/.local/share/dirnotes/dirnotes.db",
    "start_mode":"xattr",
    "options for database":("~/.local/share/dirnotes/dirnotes.db","~/.dirnotes.db","/etc/dirnotes.db"),
    "options for start_mode":("db","xattr")
}


class ConfigLoader:    # singleton
    """Load the dirnotes settings from a JSON config file.

    Any setting that is missing from the file falls back to the value in
    DEFAULT_CONFIG.  If the file does not exist, a copy of DEFAULT_CONFIG is
    written there (best effort) so that the user has something to edit.

    Attributes:
        dbName: path of the comment database, with ``~`` expanded
        mode: the start mode, "db" or "xattr"
        xattr_comment: name of the extended attribute that holds the comment
    """

    def __init__(self, configFile):
        """Read ``configFile`` (``~`` is expanded); problems are reported via errorBox."""
        configFile = os.path.expanduser(configFile)
        loaded = None
        try:
            with open(configFile, "r") as f:
                loaded = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError):
            errorBox(f"problem reading config file {configFile}; check the JSON syntax")
        except FileNotFoundError:
            errorBox(f"config file {configFile} not found; using the default settings")
            self._writeDefaultConfig(configFile)
        except OSError as e:
            # unreadable file, a directory, etc.: carry on with the defaults
            errorBox(f"problem reading config file {configFile}: {e}")
        else:
            if not isinstance(loaded, dict):
                # valid JSON, but not the object of settings that we need
                errorBox(f"problem reading config file {configFile}; check the JSON syntax")
                loaded = None

        # overlay the user's settings on the defaults, so a partial file still works
        config = dict(DEFAULT_CONFIG)
        if loaded:
            config.update(loaded)

        self.dbName = os.path.expanduser(config["database"])
        self.mode = config["start_mode"] # can get over-ruled by the command line options
        self.xattr_comment = config["xattr_tag"]

    @staticmethod
    def _writeDefaultConfig(configFile):
        """Create ``configFile`` holding DEFAULT_CONFIG; a failure is reported, not raised."""
        try:
            parent = os.path.dirname(configFile)
            if parent:      # makedirs("") would raise for a bare file name
                os.makedirs(parent, exist_ok=True)
            with open(configFile, "w") as f:
                json.dump(DEFAULT_CONFIG, f, indent=4)
        except OSError:
            errorBox(f"problem creating the config file {configFile}")
class DnDataBase:
    ''' the database is flat
        fileName: fully qualified name
        st_mtime: a float
        size: a long
        comment: a string
        comment_time: a float, the time of the comment save
        author: the username that created the comment

        this object: 1) finds or creates the database
            2) determine if it's readonly

        attributes:
            db: the open sqlite3 connection
            writable: False when a probe write to the database was refused

        TODO: the database is usually associated with a user, in $XDG_DATA_HOME (~/.local/share/)
        TODO: if the database is not found, create it in $XDG_DATA_DIRS (/usr/local/share/)
            make it 0666 permissions (rw-rw-rw-)
    '''
    def __init__(self,dbFile):
        '''try to open the database; if not found, create it

        raises sqlite3.OperationalError or OSError when the database can
        neither be opened nor created
        '''
        try:
            self.db = sqlite3.connect(dbFile)
        except sqlite3.OperationalError:
            print_d(f"Database {dbFile} not found")
            try:
                # the usual cause is a missing parent directory: create it and retry
                parent = os.path.dirname(dbFile)
                if parent:      # makedirs("") would raise for a bare file name
                    os.makedirs(parent, exist_ok = True)
                self.db = sqlite3.connect(dbFile)
            except (sqlite3.OperationalError, OSError):
                print_d(f"Failed to create {dbFile}, aborting")
                raise

        # create new table if it doesn't exist
        try:
            self.db.execute("select * from dirnotes")
        except sqlite3.OperationalError:
            self.db.execute("create table dirnotes (name TEXT, date DATETIME, size INTEGER, comment TEXT, comment_date DATETIME, author TEXT)")
            self.db.execute("create index dirnotes_i on dirnotes(name)")
            print_d(f"Table dirnotes created")
            # at this point, if a shared database is required, somebody needs to set perms to 0o666

        # probe for write access by writing the user_version back unchanged;
        # a misspelled pragma name would be ignored by sqlite and never fail
        self.writable = True
        try:
            (version,) = self.db.execute("pragma user_version").fetchone()
            self.db.execute(f"pragma user_version={int(version)}")
        except sqlite3.OperationalError:
            self.writable = False
# the text form used for comment dates, both in the database and in the xattr
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"


class UiHelper:
    """Static helpers that convert dates and sizes between stored and displayed forms."""

    # seconds in a 365-day year; older dates are displayed with the year, not the time
    YEAR = 365 * 24 * 3600
    # one suffix per power of 1000: none, kilo, mega, giga, tera, peta, exa
    SIZE_UNITS = " KMGTPE"

    @staticmethod
    def epochToDb(epoch):
        """Convert seconds-since-epoch to a local-time string in DATE_FORMAT."""
        return time.strftime(DATE_FORMAT,time.localtime(epoch))

    @staticmethod
    def DbToEpoch(dbTime):
        """Convert a local-time string in DATE_FORMAT to seconds-since-epoch (a float)."""
        return time.mktime(time.strptime(dbTime,DATE_FORMAT))

    @staticmethod
    def getShortDate(longDate):
        """Return a compact local date for the epoch time ``longDate``.

        Dates more than a year in the past show month/day/year; more recent
        ones show month/day/hour:minute.
        """
        now = time.time()
        diff = now - longDate
        if diff > UiHelper.YEAR:
            fmt = "%b %e %Y"
        else:
            fmt = "%b %e %H:%M"
        return time.strftime(fmt, time.localtime(longDate))

    @staticmethod
    def getShortSize(fo):
        """Return a 7-character size column for the file object ``fo``.

        ``fo`` must provide isDir(), isLink() and getSize().  Directories and
        symlinks get a marker instead of a size; other sizes are shortened
        with a power-of-1000 suffix and right-justified.
        """
        import math     # local import: only this method needs it

        # NOTE(review): these two markers were garbled in the source (they look
        # like stripped <...> tags); restored as 7-wide text to match rjust(7) below
        if fo.isDir():
            return " <DIR> "
        elif fo.isLink():
            return " <Link>"
        size = fo.getSize()
        units = UiHelper.SIZE_UNITS
        log = int((math.log10(size+1)-2)/3)
        log = max(0, min(log, len(units)-1))    # never index outside the suffix table
        s = units[log]
        base = size // 10**(log*3)
        return f"{base}{s}".strip().rjust(7)
## one for each file
## and a special one for ".." parent directory
class FileObj:
    """ The FileObj knows about both kinds of comments.

    One object describes one file: its absolute name, its display name, the
    lstat() result taken at construction, and (lazily, then cached) the
    comment stored in the database and the comment stored in the xattr.
    ``db`` is the open sqlite3 connection that holds the dirnotes table.
    """

    def __init__(self, fileName, db):
        """ lstat the file; raises FileNotFoundError if it does not exist """
        self.fileName = os.path.abspath(fileName)     # full path; dirs end WITHOUT a terminal /
        self.stat = os.lstat(self.fileName)
        # drop any trailing '/' first, otherwise "some/dir/" has an empty base name
        trimmed = fileName.rstrip('/') or fileName
        self.displayName = os.path.split(trimmed)[1]  # base name; dirs end with a /
        if self.isDir() and not self.displayName.endswith('/'):
            self.displayName += '/'
        self.date = self.stat.st_mtime
        self.size = self.stat.st_size
        self.db = db

    def getName(self):
        """ returns the absolute pathname """
        return self.fileName

    def getDisplayName(self):
        """ returns just the basename of the file; dirs end in / """
        return self.displayName

    def getDbData(self):
        """ returns (comment, author, comment_date) of the newest database record,
            or (None, None, None) when the file has no record; the answer is cached """
        if not hasattr(self,'dbCommentAuthorDate'):
            cad = self.db.execute("select comment, author, comment_date from dirnotes where name=? order by comment_date desc",(self.fileName,)).fetchone()
            self.dbCommentAuthorDate = cad if cad else (None, None, None)
        return self.dbCommentAuthorDate

    def getDbComment(self):
        """ returns the newest database comment, or None """
        return self.getDbData()[0]

    def getXattrData(self):
        """ returns (comment, author, comment_date) from the xattr; any attribute
            that cannot be read is None; the answer is cached """
        if not hasattr(self,'xattrCommentAuthorDate'):
            c = a = d = None
            try:
                c = os.getxattr(self.fileName, xattr_comment, follow_symlinks=False).decode()
                a = os.getxattr(self.fileName, xattr_author, follow_symlinks=False).decode()
                d = os.getxattr(self.fileName, xattr_date, follow_symlinks=False).decode()
            except (OSError, UnicodeDecodeError):  # no (readable) xattr comment
                pass
            self.xattrCommentAuthorDate = c,a,d
        return self.xattrCommentAuthorDate

    def getXattrComment(self):
        """ returns the xattr comment, or None """
        return self.getXattrData()[0]

    def setDbComment(self,newComment):
        """ adds a new comment record for this file; older records are kept as history """
        # how are we going to hook this?
        #if not self.db.writable:
        #  errorBox("The database is readonly; you cannot add or edit comments")
        #  return
        s = os.lstat(self.fileName)
        now = time.time()
        user = getpass.getuser()
        try:
            print_d(f"setDbComment db {self.db}, file: {self.fileName}")
            self.db.execute("insert into dirnotes (name,date,size,comment,comment_date,author) values (?,datetime(?,'unixepoch','localtime'),?,?,datetime(?,'unixepoch','localtime'),?)",
                    (self.fileName, s.st_mtime, s.st_size,
                    str(newComment), now, user))
            self.db.commit()
            self.dbCommentAuthorDate = newComment, user, UiHelper.epochToDb(now)
        except sqlite3.OperationalError:
            print_d("database is locked or unwritable")
            errorBox("the database that stores comments is locked or unwritable")

    def setXattrComment(self,newComment):
        """ stores the comment, the current user and the current time in the xattr;
            returns True on success; on failure reports the likely reason and returns False """
        print_d(f"set comment {newComment} on file {self.fileName}")
        user = getpass.getuser()
        stamp = time.strftime(DATE_FORMAT)
        try:
            os.setxattr(self.fileName,xattr_comment,bytes(newComment,'utf8'),follow_symlinks=False)
            os.setxattr(self.fileName,xattr_author,bytes(user,'utf8'),follow_symlinks=False)
            os.setxattr(self.fileName,xattr_date,bytes(stamp,'utf8'),follow_symlinks=False)
            self.xattrCommentAuthorDate = newComment, user, stamp
            return True
        # we need to move these cases out to a handler
        except Exception as e:
            if self.isLink():
                errorBox("Linux does not allow xattr comments on symlinks; comment is stored in database")
            elif self.isSock():
                errorBox("Linux does not allow comments on sockets; comment is stored in database")
            elif not os.access(self.fileName, os.W_OK):
                errorBox(f"you don't appear to have write permissions on this file: {self.fileName}")
                # change the listbox background to yellow
            elif "Errno 95" in str(e):
                errorBox("is this a VFAT or EXFAT volume? these don't allow comments")
            else:
                # callers ignore the return value, so never fail silently
                errorBox(f"unable to store the xattr comment on {self.fileName}: {e}")
            return False

    def getComment(self,mode):
        """ returns the comment for the given mode """
        return self.getDbComment() if mode == "db" else self.getXattrComment()

    def getOtherComment(self,mode):
        """ returns the comment for the 'other' mode """
        return self.getDbComment() if mode == "xattr" else self.getXattrComment()

    def getData(self,mode):
        """ returns (comment, author, comment_date) for the given mode """
        return self.getDbData() if mode == "db" else self.getXattrData()

    def getOtherData(self,mode):
        """ returns (comment, author, comment_date) for the 'other' mode """
        return self.getDbData() if mode == "xattr" else self.getXattrData()

    def getDate(self):
        """ returns the st_mtime recorded at construction """
        return self.date

    def getSize(self):
        """ returns the st_size recorded at construction """
        return self.size

    def isDir(self):
        return stat.S_ISDIR(self.stat.st_mode)

    def isLink(self):
        return stat.S_ISLNK(self.stat.st_mode)

    def isSock(self):
        return stat.S_ISSOCK(self.stat.st_mode)

    def copyFile(self, dest, doMove = False):
        """ dest is either a FQ filename or a FQ directory, to be expanded with same basename """
        # NOTE: this method copies the xattr (comment + old author + old date)
        #    but creates new db (comment + this author + new date)
        if os.path.isdir(dest):
            dest = os.path.join(dest,self.displayName)
        try:
            print_d("try copy from",self.fileName,"to",dest)
            # shutil methods preserve dates & chmod/chown & xattr
            if doMove:
                shutil.move(self.fileName, dest)
            else:
                shutil.copy2(self.fileName, dest)
            # can raise FileNotFoundError, Permission Error, shutil.SameFileError, IsADirectoryError
        except OSError:     # shutil.Error and SameFileError are OSError subclasses
            errorBox(f"file copy/move to <{dest}> failed; check permissions")
            return
        # and copy the database record, but only if there is one: storing
        # str(None) would give the new file the bogus comment "None"
        comment = self.getDbComment()
        if comment is not None:
            f = FileObj(dest, self.db)
            f.setDbComment(comment)

    def moveFile(self, dest):
        """ dest is either a FQ filename or a FQ directory, to be expanded with same basename """
        self.copyFile(dest, doMove = True)
# >>> snip here <<<
#============= the functions that are called from the main.loop ===============
def file_copy(f,target,target_is_dir,is_copy,force):
    """Copy (is_copy true) or move file object ``f`` to ``target``.

    When ``target_is_dir`` is set the file keeps its display name inside that
    directory.  An existing destination is only overwritten after the user
    confirms at the prompt, unless ``force`` is set.
    """
    print_d(f"call file_copy/move with args={target},{target_is_dir} and {force}")
    if target_is_dir:
        dest = os.path.join(target,f.getDisplayName())
    else:
        dest = target

    # ask before clobbering anything, unless told not to
    if not force and os.path.exists(dest):
        answer = input(f"The copy/move target <<{dest}>> exists. Overwrite? (y or n) ")
        if answer not in ('y', 'Y'):
            return

    print_d(f"copy/move from {f} to {dest}")
    transfer = f.copyFile if is_copy else f.moveFile
    transfer(dest)
def file_zap(f,all_flag):
    """Delete the old comment records, keeping only the newest one per file.

    With ``all_flag`` the whole database is trimmed, after the user confirms
    with 'Y'; otherwise only the history of file object ``f`` is trimmed.
    """
    conn = f.db
    print_d(f"zapping the comment history of {f.getName()}")
    if not all_flag:
        conn.execute("delete from dirnotes where name=? and comment_date < (select max(comment_date) from dirnotes where name=?)",(f.getName(),f.getName()))
    elif input("You requested a complete flush of the comment database history. Please hit 'Y' to confirm") == 'Y':
        print_d("zapping the entire database")
        conn.execute("delete from dirnotes where comment_date < (select max(comment_date) from dirnotes d2 where d2.name = dirnotes.name)")
    conn.commit()
def file_modify_comment(f, create, append, erase):
    """Create, append to, or erase the comment on file object ``f``.

    Exactly one of the three requests is honoured, in this priority:
    ``create`` (the new comment text), ``append`` (text added after the
    current comment with a '; ' separator), ``erase`` (store an empty
    comment).  The result is written to both the xattr and the database.
    Exits the program with status 10 if the file does not exist.
    """
    print_d(f"modify the comment on file {f} with extra={(create,append,erase)}")
    if not os.path.exists(f.getName()):
        # show the path: the FileObj itself has no readable text form
        print(f"the target file does not exist; please check the spelling of the file: {f.getName()}")
        sys.exit(10)

    if create:
        comment = create
    elif append:
        existing = f.getComment(mode)
        # with nothing to append to, the result is just the new text (not "None; text")
        comment = f"{existing}; {append}" if existing else append
    elif erase:
        comment = ''
    else:
        return

    f.setXattrComment(comment)
    f.setDbComment(comment)
def file_display(f, listall, json, minimal):
    """Show the comment of file object ``f`` taken from the current mode's store.

    Files without a comment are skipped unless ``listall`` is set.  A '*' is
    added to a comment that differs from the one in the other store.  With
    ``json`` the record is queued in answer_json rather than printed;
    ``minimal`` prints the bare comment; the verbose level adds the full
    path, author and date.
    """
    fn = f.getDisplayName()
    print_d(f"list file details {fn}")
    c, a, d = f.getData(mode)
    other_comment = f.getOtherData(mode)[0]

    if not (c or listall):
        return
    if c and c != other_comment:
        c += '*'

    if json:
        if verbose:
            record = {"file":f.getName(),"comment":c,"author":a,"date":d }
        else:
            record = {"file":fn,"comment":c}
        answer_json.append(record)
        return

    if minimal:
        print(f"{c}")
    elif verbose:
        print(f"{f.getName()}: {c!r}, {a!r}, {d!r}")
    else:
        print(f"{fn}: {c!r}")
def file_history(f,json):
    """Output every database comment recorded for file object ``f``, newest first.

    With ``json`` the file name and then one record per comment are queued in
    answer_json; otherwise the same information is printed as text.
    """
    cursor = f.db.execute("select comment, author, comment_date from dirnotes where name=? order by comment_date desc",(f.getName(),))
    if json:
        answer_json.append ( {"file":f.getName()} )
        answer_json.extend(
            {"comment":comment,"author":author,"date":when}
            for comment, author, when in cursor
        )
        return

    print(f"file: \t\t{f.getName()}\n")
    for comment, author, when in cursor:
        print(f"comment: \t{comment}\nauthor: \t{author}\t\tdate: \t\t{when}\n")
def _build_parser():
    """Create the argparse parser that describes every command-line option."""
    parser = argparse.ArgumentParser(description="Display or add comments to files",
        epilog="Some options conflict. Use only one of: -l -c -a -H -e -z -Z and one of -d -x")
    parser.add_argument('-V',"--version", action="version", version=f"dncli ver:{VERSION}")
    parser.add_argument('-v',"--verbose", action='count', help="verbose output (include comment author & date)",default=0)
    parser.add_argument('-D',"--debug", action='store_true',help="include debugging output; do not use in scripts",default=0)
    parser.add_argument('-j',"--json", action="store_true",help="output in JSON format")
    pars_m = parser.add_mutually_exclusive_group()
    pars_m.add_argument('-l',"--listall", action="store_true",help="list all files, including those without comments")
    parser.add_argument('-d',"--db", action="store_true",help="list comments from database")
    parser.add_argument('-x',"--xattr", action="store_true",help="list comments from xattr")
    parser.add_argument('-n',"--minimal", action="store_true",help="output only comments; useful in scripting")
    parser.add_argument('-H',"--history", action="store_true",help="output the history of database comments for a file")
    pars_m.add_argument('-c',"--create", metavar="comment", help="add a comment to a file")
    pars_m.add_argument('-a',"--append", metavar="comment", help="append to a comment on a file, separator=';'")
    pars_m.add_argument('-C',"--copy", action="store_true",help="copy a file with its comments")
    pars_m.add_argument('-M',"--move", action="store_true",help="move a file with its comments")
    parser.add_argument('-y',"--cp_force",action="store_true",help="copy over existing files")
    pars_m.add_argument('-e',"--erase", action="store_true",help="erase the comment on a file")
    pars_m.add_argument('-z',"--zap", action="store_true",help="clear the database comment history on a file")
    pars_m.add_argument('-Z',"--zapall", action="store_true",help="clear the comment history in the entire database")
    parser.add_argument( "--config", dest="config_file", help="use config file (default ~/.config/dirnotes/dirnotes.conf)")
    parser.add_argument('file_list',nargs='*',help="file(s); list commands may omit this")
    return parser


def _check_option_conflicts(args):
    """Exit with a message when a modifier option is used with the wrong command."""
    if args.cp_force and not args.copy:
        print("the -y/--cp_force options can only be used with the -C/--copy command")
        sys.exit(3)
    if args.json and not args.display:
        print("the -j/--json option can only be used with the display modes")
        sys.exit(4)
    if args.minimal and not args.display:
        print("the -n/--minimal option only applies to the display modes")
        sys.exit(5)
    if args.history and not args.display:
        print("the -H/--history option only applies to the display modes")
        sys.exit(5)
    if args.xattr and (args.zap or args.zapall):
        print("the -x/--xattr option doesn't apply to the -z/--zap and -Z/--zapall commands")
        sys.exit(7)


def main(args):
    """Run the command-line tool.

    ``args`` is the complete argument vector, program name first (normally
    sys.argv); if it is empty or None the interpreter's own sys.argv is
    parsed instead.  The function works in three steps: build the list of
    files, pick the function for the requested command, then apply that
    function to every file.  Usage errors end the program through sys.exit().
    """
    global verbose, debug, mode
    global xattr_comment, xattr_author, xattr_date

    parser = _build_parser()
    args = parser.parse_args(args[1:] if args else None)

    # default is to display all files that have comments
    # major modes are: display ( -l -H), add-comment (-a -c -e), clear-history(-z -Z), copy/move (-C -M)
    # determine the major mode, then apply an appropriate function over the file_list
    # NOTE: --move must be excluded here too; otherwise "-M" with no files
    #       would auto-fill the file list from the current directory and move it
    args.display = not (args.create or args.append or args.copy or args.move
                        or args.erase or args.zap or args.zapall)
    _check_option_conflicts(args)

    verbose = args.verbose
    debug = args.debug

    config = ConfigLoader(args.config_file or DEFAULT_CONFIG_FILE)
    mode = config.mode
    mode = "xattr" if args.xattr else ("db" if args.db else mode)
    # honour the xattr tag from the config file; author and date hang off the same tag
    xattr_comment = config.xattr_comment
    xattr_author = xattr_comment + ".author"
    xattr_date = xattr_comment + ".date"

    db = DnDataBase(config.dbName).db

    #====== 1) build the file list =============
    files = args.file_list
    # for the list commands, auto-fill the file list with the current directory
    if not files and args.display:
        files = os.listdir(".")
        files.sort()
    # other commands require explicit file lists (-Z works on the whole database)
    if not files and not args.zapall:
        print("please specify a file or files to use")
        sys.exit(10)
    print_d("got the files:", files)

    #======= 2) build the function
    if args.create or args.append or args.erase:
        print_d(f"create/append/erase: {args.create} . {args.append} . {args.erase}")
        func = file_modify_comment
        loop_args = (args.create, args.append, args.erase)
    elif args.zap or args.zapall:
        print_d(f"got a zap command {args.zap} . {args.zapall}")
        func = file_zap
        loop_args = (args.zapall,)
        if args.zapall:
            files = ('.',)      # a single pass over the loop is enough
    elif args.copy or args.move:
        print_d(f"got a copy/move command to copy={args.copy}, move={args.move}")
        # the last item on the file list is the target
        n_files = len(files)
        if n_files < 2:
            print("the copy/move command requires at least two arguments, the last one is the destination")
            sys.exit(1)
        files, target = files[:-1], files[-1]
        target_is_directory = os.path.isdir(target)
        print_d(f"copy/move from {files} to {target}")
        if n_files > 2 and not target_is_directory:
            print("multiple copy/move files must go to a target directory")
            sys.exit(3)
        func = file_copy
        loop_args = (target, target_is_directory, args.copy, args.cp_force)
    elif args.history:
        func = file_history
        loop_args = (args.json,)
    else:
        assert args.display
        print_d(f"list files using {mode} priority")
        print_d(f"display command with option: {args.listall} and {args.history} and {args.json} and {args.minimal}")
        loop_args = (args.listall, args.json, args.minimal)
        func = file_display

    #====== 3) loop on the list, execute the function =============
    for name in files:
        try:
            fo = FileObj(name, db)
        except FileNotFoundError:
            # FileObj lstat()s the file; give a usage error rather than a traceback
            print(f"the target file does not exist; please check the spelling of the file: {name}")
            sys.exit(10)
        func(fo, *loop_args)

    if answer_json:
        print(json.dumps(answer_json))


if __name__ == "__main__":
    main(sys.argv)