#!/usr/bin/python3 # TODO: option to print out full path name; most useful in the .json output format VERSION = "0.4" import os, sys, argparse, xattr, json, sqlite3 # global mutables answer_json = [] verbose = debug = 0 db = None xattr_comment = "user.xdg.comment" xattr_author = "user.xdg.comment.author" xattr_date = "user.xdg.comment.date" mode = "db" #======= debugging/verbose =========== def print_d(*a): if debug: print('>>', *a) def errorBox(*a): print(*a) # >>> snip here <<< #============ the DnDataBase, UiHelper and FileObj code is shared with other dirnotes programs import getpass, time, stat, shutil DEFAULT_CONFIG_FILE = "~/.config/dirnotes/dirnotes.conf" # or /etc/dirnotes.conf # config # we could store the config in the database, in a second table # or in a .json file DEFAULT_CONFIG = {"xattr_tag":"user.xdg.comment", "database":"~/.local/share/dirnotes/dirnotes.db", "start_mode":"xattr", "options for database":("~/.local/share/dirnotes/dirnotes.db","~/.dirnotes.db","/etc/dirnotes.db"), "options for start_mode":("db","xattr") } class ConfigLoader: # singleton def __init__(self, configFile): configFile = os.path.expanduser(configFile) try: with open(configFile,"r") as f: config = json.load(f) except json.JSONDecodeError: errorBox(f"problem reading config file {configFile}; check the JSON syntax") config = DEFAULT_CONFIG except FileNotFoundError: errorBox(f"config file {configFile} not found; using the default settings") config = DEFAULT_CONFIG try: os.makedirs(os.path.dirname(configFile),exist_ok = True) with open(configFile,"w") as f: json.dump(config,f,indent=4) except: errorBox(f"problem creating the config file {configFile}") self.dbName = os.path.expanduser(config["database"]) self.mode = config["start_mode"] # can get over-ruled by the command line options self.xattr_comment = config["xattr_tag"] class DnDataBase: ''' the database is flat fileName: fully qualified name st_mtime: a float size: a long comment: a string comment_time: a float, the time of the comment save author: the username that created the comment this object: 1) finds or creates the database 2) determine if it's readonly TODO: the database is usually associated with a user, in $XDG_DATA_HOME (~/.local/share/) TODO: if the database is not found, create it in $XDG_DATA_DIRS (/usr/local/share/) make it 0666 permissions (rw-rw-rw-) ''' def __init__(self,dbFile): '''try to open the database; if not found, create it''' try: self.db = sqlite3.connect(dbFile) except sqlite3.OperationalError: print_d(f"Database {dbFile} not found") try: os.makedirs(os.path.dirname(dbFile), exist_ok = True) self.db = sqlite3.connect(dbFile) except (sqlite3.OperationalError, PermissionError): printd(f"Failed to create {dbFile}, aborting") raise # create new table if it doesn't exist try: self.db.execute("select * from dirnotes") except sqlite3.OperationalError: self.db.execute("create table dirnotes (name TEXT, date DATETIME, size INTEGER, comment TEXT, comment_date DATETIME, author TEXT)") self.db.execute("create index dirnotes_i on dirnotes(name)") print_d(f"Table dirnotes created") # at this point, if a shared database is required, somebody needs to set perms to 0o666 self.writable = True try: self.db.execute("pragma user_verson=0") except sqlite3.OperationalError: self.writable = False DATE_FORMAT = "%Y-%m-%d %H:%M:%S" class UiHelper: @staticmethod def epochToDb(epoch): return time.strftime(DATE_FORMAT,time.localtime(epoch)) @staticmethod def DbToEpoch(dbTime): return time.mktime(time.strptime(dbTime,DATE_FORMAT)) @staticmethod def getShortDate(longDate): now = time.time() diff = now - longDate if diff > YEAR: fmt = "%b %e %Y" else: fmt = "%b %e %H:%M" return time.strftime(fmt, time.localtime(longDate)) @staticmethod def getShortSize(fo): if fo.isDir(): return " " elif fo.isLink(): return " " size = fo.getSize() log = int((math.log10(size+1)-2)/3) s = " KMGTE"[log] base = int(size/math.pow(10,log*3)) return f"{base}{s}".strip().rjust(7) ## one for each file ## and a special one for ".." parent directory class FileObj: """ The FileObj knows about both kinds of comments. """ def __init__(self, fileName, db): self.fileName = os.path.abspath(fileName) # full path; dirs end WITHOUT a terminal / self.stat = os.lstat(self.fileName) self.displayName = os.path.split(fileName)[1] # base name; dirs end with a / if self.isDir(): if not self.displayName.endswith('/'): self.displayName += '/' self.date = self.stat.st_mtime self.size = self.stat.st_size self.db = db def getName(self): """ returns the absolute pathname """ return self.fileName def getDisplayName(self): """ returns just the basename of the file; dirs end in / """ return self.displayName def getDbData(self): """ returns (comment, author, comment_date) """ if not hasattr(self,'dbCommentAuthorDate'): cad = self.db.execute("select comment, author, comment_date from dirnotes where name=? order by comment_date desc",(self.fileName,)).fetchone() self.dbCommentAuthorDate = cad if cad else (None, None, None) return self.dbCommentAuthorDate def getDbComment(self): return self.getDbData()[0] def getXattrData(self): """ returns (comment, author, comment_date) """ if not hasattr(self,'xattrCommentAuthorDate'): c = a = d = None try: c = os.getxattr(self.fileName, xattr_comment, follow_symlinks=False).decode() a = os.getxattr(self.fileName, xattr_author, follow_symlinks=False).decode() d = os.getxattr(self.fileName, xattr_date, follow_symlinks=False).decode() except: # no xattr comment pass self.xattrCommentAuthorDate = c,a,d return self.xattrCommentAuthorDate def getXattrComment(self): return self.getXattrData()[0] def setDbComment(self,newComment): # how are we going to hook this? #if not self.db.writable: # errorBox("The database is readonly; you cannot add or edit comments") # return s = os.lstat(self.fileName) try: print_d(f"setDbComment db {self.db}, file: {self.fileName}") self.db.execute("insert into dirnotes (name,date,size,comment,comment_date,author) values (?,datetime(?,'unixepoch','localtime'),?,?,datetime(?,'unixepoch','localtime'),?)", (self.fileName, s.st_mtime, s.st_size, str(newComment), time.time(), getpass.getuser())) self.db.commit() self.dbCommentAuthorDate = newComment, getpass.getuser(), UiHelper.epochToDb(time.time()) except sqlite3.OperationalError: print_d("database is locked or unwritable") errorBox("the database that stores comments is locked or unwritable") def setXattrComment(self,newComment): print_d(f"set comment {newComment} on file {self.fileName}") try: os.setxattr(self.fileName,xattr_comment,bytes(newComment,'utf8'),follow_symlinks=False) os.setxattr(self.fileName,xattr_author,bytes(getpass.getuser(),'utf8'),follow_symlinks=False) os.setxattr(self.fileName,xattr_date,bytes(time.strftime(DATE_FORMAT),'utf8'),follow_symlinks=False) self.xattrCommentAuthorDate = newComment, getpass.getuser(), time.strftime(DATE_FORMAT) return True # we need to move these cases out to a handler except Exception as e: if self.isLink(): errorBox("Linux does not allow xattr comments on symlinks; comment is stored in database") elif self.isSock(): errorBox("Linux does not allow comments on sockets; comment is stored in database") elif os.access(self.fileName, os.W_OK)!=True: errorBox(f"you don't appear to have write permissions on this file: {self.fileName}") # change the listbox background to yellow elif "Errno 95" in str(e): errorBox("is this a VFAT or EXFAT volume? these don't allow comments") return False def getComment(self,mode): """ returns the comment for the given mode """ return self.getDbComment() if mode == "db" else self.getXattrComment() def getOtherComment(self,mode): return self.getDbComment() if mode == "xattr" else self.getXattrComment() def getData(self,mode): """ returns (comment, author, comment_date) for the given mode """ return self.getDbData() if mode == "db" else self.getXattrData() def getOtherData(self,mode): """ returns (comment, author, comment_date) for the 'other' mode """ return self.getDbData() if mode == "xattr" else self.getXattrData() def getDate(self): return self.date def getSize(self): return self.size def isDir(self): return stat.S_ISDIR(self.stat.st_mode) def isLink(self): return stat.S_ISLNK(self.stat.st_mode) def isSock(self): return stat.S_ISSOCK(self.stat.st_mode) def copyFile(self, dest, doMove = False): """ dest is either a FQ filename or a FQ directory, to be expanded with same basename """ # NOTE: this method copies the xattr (comment + old author + old date) # but creates new db (comment + this author + new date) if os.path.isdir(dest): dest = os.path.join(destDir,self.displayName) try: print_d("try copy from",self.fileName,"to",dest) # shutil methods preserve dates & chmod/chown & xattr if doMove: shutil.move(self.fileName, dest) else: shutil.copy2(self.fileName, dest) # can raise FileNotFoundError, Permission Error, shutil.SameFileError, IsADirectoryError except: errorBox(f"file copy/move to <{dest}> failed; check permissions") return # and copy the database record f = FileObj(dest, self.db) f.setDbComment(self.getDbComment()) def moveFile(self, dest): """ dest is either a FQ filename or a FQ directory, to be expanded with same basename """ self.copyFile(dest, doMove = True) # >>> snip here <<< #============= the functions that are called from the main.loop =============== def file_copy(f,target,target_is_dir,is_copy,force): print_d(f"call file_copy/move with args={target},{target_is_dir} and {force}") dest = target if not target_is_dir else os.path.join(target,f.getDisplayName()) if os.path.exists(dest) and not force: go = input("The copy/move target <<" + dest + ">> exists. Overwrite? (y or n) ") if go != 'y' and go != 'Y': return print_d(f"copy/move from {f} to {dest}") if is_copy: f.copyFile(dest) else: f.moveFile(dest) def file_zap(f,all_flag): db = f.db print_d(f"zapping the comment history of {f.getName()}") if all_flag: confirm = input("You requested a complete flush of the comment database history. Please hit 'Y' to confirm") if confirm == 'Y': print_d("zapping the entire database") db.execute("delete from dirnotes where comment_date < (select max(comment_date) from dirnotes d2 where d2.name = dirnotes.name)") else: db.execute("delete from dirnotes where name=? and comment_date < (select max(comment_date) from dirnotes where name=?)",(f.getName(),f.getName())) db.commit() def file_modify_comment(f, create, append, erase): print_d(f"modify the comment on file {f} with extra={(create,append,erase)}") if not os.path.exists(f.getName()): print(f"the target file does not exist; please check the spelling of the file: {f}") sys.exit(10) if create: f.setXattrComment(create) f.setDbComment(create) elif append: c = f.getComment(mode) f.setXattrComment(f"{c}; {append}") f.setDbComment(f"{c}; {append}") elif erase: f.setXattrComment('') f.setDbComment('') def file_display(f, listall, json, minimal): fn = f.getDisplayName() print_d(f"list file details {fn}") c,a,d = f.getData(mode) c1,a1,d1 = f.getOtherData(mode) diffFlag = '*' if c and (c != c1) else '' if c or listall: if not json: if minimal: print(f"{c}{diffFlag}") elif verbose: print(f"{f.getName()}: {repr(c)}{diffFlag}, {repr(a)}, {repr(d)}") else: print(f"{fn}: {repr(c)}{diffFlag}") else: entry = {"file":fn, "comment":c} if verbose: entry.update({"file":f.getName(),"author":a, "date": d}) if diffFlag: entry["diffFlag"] = True answer_json.append(entry) def file_history(f,json): db = f.db c = db.execute("select comment, author, comment_date from dirnotes where name=? order by comment_date desc",(f.getName(),)) if not json: print(f"file: \t\t{f.getName()}\n") else: answer_json.append ( {"file":f.getName()} ) for a in c.fetchall(): if not json: print(f"comment: \t{a[0]}\nauthor: \t{a[1]}\t\tdate: \t\t{a[2]}\n") else: answer_json.append( {"comment":a[0],"author":a[1],"date":a[2]} ) def main(args): parser = argparse.ArgumentParser(description="Display or add comments to files", epilog="Some options conflict. Use only one of: -l -c -a -H -e -z -Z and one of -d -x") parser.add_argument('-V',"--version", action="version", version=f"dncli ver:{VERSION}") parser.add_argument('-v',"--verbose", action='count', help="verbose output (include comment author & date)",default=0) parser.add_argument('-D',"--debug", action='store_true',help="include debugging output; do not use in scripts",default=0) parser.add_argument('-j',"--json", action="store_true",help="output in JSON format") pars_m = parser.add_mutually_exclusive_group() pars_m.add_argument('-l',"--listall", action="store_true",help="list all files, including those without comments") parser.add_argument('-d',"--db", action="store_true",help="list comments from database") parser.add_argument('-x',"--xattr", action="store_true",help="list comments from xattr") parser.add_argument('-n',"--minimal", action="store_true",help="output only comments; useful in scripting") parser.add_argument('-H',"--history", action="store_true",help="output the history of database comments for a file") pars_m.add_argument('-c',"--create", metavar="comment", help="add a comment to a file") pars_m.add_argument('-a',"--append", metavar="comment", help="append to a comment on a file, separator=';'") pars_m.add_argument('-C',"--copy", action="store_true",help="copy a file with its comments") pars_m.add_argument('-M',"--move", action="store_true",help="move a file with its comments") parser.add_argument('-y',"--cp_force",action="store_true",help="copy over existing files") pars_m.add_argument('-e',"--erase", action="store_true",help="erase the comment on a file") pars_m.add_argument('-z',"--zap", action="store_true",help="clear the database comment history on a file") pars_m.add_argument('-Z',"--zapall", action="store_true",help="clear the comment history in the entire database") parser.add_argument( "--config", dest="config_file", help="use config file (default ~/.config/dirnotes/dirnotes.conf)") parser.add_argument('file_list',nargs='*',help="file(s); list commands may omit this") args = parser.parse_args() # default is to display all files that have comments # major modes are: display ( -l -H), add-comment (-a -c -e), clear-history(-z -Z), copy (-C) # determine the major mode, then apply an appropriate function over the file_list args.display = not (args.create or args.append or args.copy or args.erase or args.zap or args.zapall) if args.cp_force and not args.copy: print("the -y/--cp_force options can only be used with the -C/--copy command") sys.exit(3) if args.json and not args.display: print("the -j/--json option can only be used with the display modes") sys.exit(4) if args.minimal and not args.display: print("the -n/--minimal option only applies to the display modes") sys.exit(5) if args.history and not args.display: print("the -H/--history option only applies to the display modes") sys.exit(5) if args.xattr and (args.zap or args.zapall): print("the -x/--xattr option doesn't apply to the -z/--zap and -Z/--zapall commands") sys.exit(7) global verbose, debug verbose = args.verbose debug = args.debug config = ConfigLoader(args.config_file or DEFAULT_CONFIG_FILE) global mode mode = config.mode mode = "xattr" if args.xattr else ("db" if args.db else mode) db = DnDataBase(config.dbName).db #====== 1) build the file list ============= files = args.file_list # for the list commands, auto-fill the file list with the current directory if not files and args.display: files = os.listdir(".") files.sort() # other command require explicity file lists if not files: print("please specify a file or files to use") sys.exit(10) print_d("got the files:", files) #======= 2) build the function if args.create or args.append or args.erase: print_d(f"create/append/erase: {args.create} . {args.append} . {args.erase}") func = file_modify_comment loop_args = (args.create, args.append, args.erase) elif args.zap or args.zapall: print_d(f"got a zap command {args.zap} . {args.zapall}") func = file_zap loop_args = (args.zapall,) if args.zapall: files = ('.',) elif args.copy or args.move: print_d(f"got a copy/move command to copy={args.copy}, move={args.move}") # the last item on the file list is the target n_files = len(files) if n_files < 2: print("the copy/move command requires at least two arguments, the last one is the destination") sys.exit(1) files, target = files[:-1], files[-1] target_is_directory = os.path.isdir(target) print_d(f"copy/move from {files} to {target}") if n_files > 2 and not target_is_directory: print("multiple copy/move files must go to a target directory") sys.exit(3) func = file_copy loop_args = (target, target_is_directory, args.copy, args.cp_force) elif args.history: func = file_history loop_args = (args.json,) else: assert args.display print_d(f"list files using {mode} priority") print_d(f"display command with option: {args.listall} and {args.history} and {args.json} and {args.minimal}") loop_args = (args.listall, args.json, args.minimal) func = file_display #====== 3) loop on the list, execute the function ============= for f in files: func(FileObj(f,db),*loop_args) if answer_json: print(json.dumps(answer_json)) if __name__ == "__main__": main(sys.argv)