mergerfs.dup 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. #!/usr/bin/env python3
  2. # Copyright (c) 2016, Antonio SJ Musumeci <trapexit@spawn.link>
  3. #
  4. # Permission to use, copy, modify, and/or distribute this software for any
  5. # purpose with or without fee is hereby granted, provided that the above
  6. # copyright notice and this permission notice appear in all copies.
  7. #
  8. # THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
  9. # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  10. # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
  11. # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  12. # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  13. # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
  14. # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. import argparse
  16. import ctypes
  17. import errno
  18. import fnmatch
  19. import io
  20. import os
  21. import shlex
  22. import subprocess
  23. import sys
  24. _libc = ctypes.CDLL("libc.so.6",use_errno=True)
  25. _lgetxattr = _libc.lgetxattr
  26. _lgetxattr.argtypes = [ctypes.c_char_p,ctypes.c_char_p,ctypes.c_void_p,ctypes.c_size_t]
  27. def lgetxattr(path,name):
  28. if type(path) == str:
  29. path = path.encode(errors='backslashreplace')
  30. if type(name) == str:
  31. name = name.encode(errors='backslashreplace')
  32. length = 64
  33. while True:
  34. buf = ctypes.create_string_buffer(length)
  35. res = _lgetxattr(path,name,buf,ctypes.c_size_t(length))
  36. if res >= 0:
  37. return buf.raw[0:res].decode(errors='backslashreplace')
  38. else:
  39. err = ctypes.get_errno()
  40. if err == errno.ERANGE:
  41. length *= 2
  42. elif err == errno.ENODATA:
  43. return None
  44. else:
  45. raise IOError(err,os.strerror(err),path)
  46. def ismergerfs(path):
  47. try:
  48. lgetxattr(path,'user.mergerfs.basepath')
  49. return True
  50. except IOError as e:
  51. return False
  52. def mergerfs_control_file(basedir):
  53. if basedir == '/':
  54. return None
  55. ctrlfile = os.path.join(basedir,'.mergerfs')
  56. if os.path.exists(ctrlfile):
  57. return ctrlfile
  58. basedir = os.path.dirname(basedir)
  59. return mergerfs_control_file(basedir)
  60. def mergerfs_branches(ctrlfile):
  61. branches = lgetxattr(ctrlfile,'user.mergerfs.srcmounts')
  62. branches = branches.split(':')
  63. return branches
  64. def match(filename,matches):
  65. for match in matches:
  66. if fnmatch.fnmatch(filename,match):
  67. return True
  68. return False
  69. def execute_cmd(args):
  70. return subprocess.call(args)
  71. def print_args(args):
  72. quoted = [shlex.quote(arg) for arg in args]
  73. print(' '.join(quoted))
  74. def build_copy_file(src,tgt,rel):
  75. srcpath = os.path.join(src,'./',rel)
  76. tgtpath = tgt + '/'
  77. return ['rsync',
  78. '-avHAXWE',
  79. '--numeric-ids',
  80. '--progress',
  81. '--relative',
  82. srcpath,
  83. tgtpath]
  84. def build_branches_freespace(branches):
  85. rv = dict()
  86. for branch in branches:
  87. st = os.statvfs(branch)
  88. rv[branch] = st.f_bavail * st.f_frsize
  89. return rv
  90. def print_help():
  91. help = \
  92. '''
  93. usage: mergerfs.dup [<options>] <dir>
  94. Duplicate files & directories across multiple drives in a pool.
  95. Will print out commands for inspection and out of band use.
  96. positional arguments:
  97. dir starting directory
  98. optional arguments:
  99. -c, --count= Number of copies to create. (default: 2)
  100. -d, --dup= Which file (if more than one exists) to choose to
  101. duplicate. Each one falls back to `mergerfs` if
  102. all files have the same value. (default: newest)
  103. * newest : file with largest mtime
  104. * oldest : file with smallest mtime
  105. * smallest : file with smallest size
  106. * largest : file with largest size
  107. * mergerfs : file chosen by mergerfs' getattr
  108. -p, --prune Remove files above `count`. Without this enabled
  109. it will update all existing files.
  110. -e, --execute Execute `rsync` and `rm` commands. Not just
  111. print them.
  112. -I, --include= fnmatch compatible filter to include files.
  113. Can be used multiple times.
  114. -E, --exclude= fnmatch compatible filter to exclude files.
  115. Can be used multiple times.
  116. '''
  117. print(help)
  118. def buildargparser():
  119. parser = argparse.ArgumentParser(add_help=False)
  120. parser.add_argument('dir',
  121. type=str,
  122. nargs='?',
  123. default=None)
  124. parser.add_argument('-c','--count',
  125. dest='count',
  126. type=int,
  127. default=2)
  128. parser.add_argument('-p','--prune',
  129. dest='prune',
  130. action='store_true')
  131. parser.add_argument('-d','--dup',
  132. choices=['newest','oldest',
  133. 'smallest','largest',
  134. 'mergerfs'],
  135. default='newest')
  136. parser.add_argument('-e','--execute',
  137. dest='execute',
  138. action='store_true')
  139. parser.add_argument('-I','--include',
  140. dest='include',
  141. type=str,
  142. action='append',
  143. default=[])
  144. parser.add_argument('-E','--exclude',
  145. dest='exclude',
  146. type=str,
  147. action='append',
  148. default=[])
  149. parser.add_argument('-h','--help',
  150. action='store_true')
  151. return parser
  152. def xattr_basepath(fullpath):
  153. return lgetxattr(fullpath,'user.mergerfs.basepath')
  154. def xattr_allpaths(fullpath):
  155. return lgetxattr(fullpath,'user.mergerfs.allpaths')
  156. def xattr_relpath(fullpath):
  157. return lgetxattr(fullpath,'user.mergerfs.relpath')
  158. def exists(base,rel,name):
  159. fullpath = os.path.join(base,rel,name)
  160. return os.path.lexists(fullpath)
  161. def mergerfs_all_basepaths(fullpath,relpath):
  162. attr = xattr_allpaths(fullpath)
  163. if not attr:
  164. dirname = os.path.dirname(fullpath)
  165. basename = os.path.basename(fullpath)
  166. attr = xattr_allpaths(dirname)
  167. attr = attr.split('\0')
  168. attr = [os.path.join(path,basename)
  169. for path in attr
  170. if os.path.lexists(os.path.join(path,basename))]
  171. else:
  172. attr = attr.split('\0')
  173. return [x[:-len(relpath)].rstrip('/') for x in attr]
  174. def mergerfs_basepath(fullpath):
  175. attr = xattr_basepath(fullpath)
  176. if not attr:
  177. dirname = os.path.dirname(fullpath)
  178. basename = os.path.basename(fullpath)
  179. attr = xattr_allpaths(dirname)
  180. attr = attr.split('\0')
  181. for path in attr:
  182. fullpath = os.path.join(path,basename)
  183. if os.path.lexists(fullpath):
  184. relpath = xattr_relpath(dirname)
  185. return path[:-len(relpath)].rstrip('/')
  186. return attr
  187. def mergerfs_relpath(fullpath):
  188. attr = xattr_relpath(fullpath)
  189. if not attr:
  190. dirname = os.path.dirname(fullpath)
  191. basename = os.path.basename(fullpath)
  192. attr = xattr_relpath(dirname)
  193. attr = os.path.join(attr,basename)
  194. return attr.lstrip('/')
  195. def newest_dupfun(default_basepath,relpath,basepaths):
  196. sts = dict([(f,os.lstat(os.path.join(f,relpath))) for f in basepaths])
  197. mtime = sts[basepaths[0]].st_mtime
  198. if not all([st.st_mtime == mtime for st in sts.values()]):
  199. return sorted(sts,key=lambda x: sts.get(x).st_mtime,reverse=True)[0]
  200. ctime = sts[basepaths[0]].st_ctime
  201. if not all([st.st_ctime == ctime for st in sts.values()]):
  202. return sorted(sts,key=lambda x: sts.get(x).st_ctime,reverse=True)[0]
  203. return default_basepath
  204. def oldest_dupfun(default_basepath,relpath,basepaths):
  205. sts = dict([(f,os.lstat(os.path.join(f,relpath))) for f in basepaths])
  206. mtime = sts[basepaths[0]].st_mtime
  207. if not all([st.st_mtime == mtime for st in sts.values()]):
  208. return sorted(sts,key=lambda x: sts.get(x).st_mtime,reverse=False)[0]
  209. ctime = sts[basepaths[0]].st_ctime
  210. if not all([st.st_ctime == ctime for st in sts.values()]):
  211. return sorted(sts,key=lambda x: sts.get(x).st_ctime,reverse=False)[0]
  212. return default_basepath
  213. def largest_dupfun(default_basepath,relpath,basepaths):
  214. sts = dict([(f,os.lstat(os.path.join(f,relpath))) for f in basepaths])
  215. size = sts[basepaths[0]].st_size
  216. if not all([st.st_size == size for st in sts.values()]):
  217. return sorted(sts,key=lambda x: sts.get(x).st_size,reverse=True)[0]
  218. return default_basepath
  219. def smallest_dupfun(default_basepath,relpath,basepaths):
  220. sts = dict([(f,os.lstat(os.path.join(f,relpath))) for f in basepaths])
  221. size = sts[basepaths[0]].st_size
  222. if not all([st.st_size == size for st in sts.values()]):
  223. return sorted(sts,key=lambda x: sts.get(x).st_size,reverse=False)[0]
  224. return default_basepath
  225. def mergerfs_dupfun(default_basepath,relpath,basepaths):
  226. return default_basepath
  227. def getdupfun(name):
  228. funs = {'newest': newest_dupfun,
  229. 'oldest': oldest_dupfun,
  230. 'smallest': smallest_dupfun,
  231. 'largest': largest_dupfun,
  232. 'mergerfs': mergerfs_dupfun}
  233. return funs[name]
  234. def main():
  235. sys.stdout = io.TextIOWrapper(sys.stdout.buffer,
  236. encoding='utf8',
  237. errors='backslashreplace',
  238. line_buffering=True)
  239. sys.stderr = io.TextIOWrapper(sys.stderr.buffer,
  240. encoding='utf8',
  241. errors='backslashreplace',
  242. line_buffering=True)
  243. parser = buildargparser()
  244. args = parser.parse_args()
  245. if args.help or not args.dir:
  246. print_help()
  247. sys.exit(0)
  248. args.dir = os.path.realpath(args.dir)
  249. if not ismergerfs(args.dir):
  250. print("%s is not a mergerfs mount" % args.dir)
  251. sys.exit(1)
  252. prune = args.prune
  253. execute = args.execute
  254. includes = ['*'] if not args.include else args.include
  255. excludes = args.exclude
  256. dupfun = getdupfun(args.dup)
  257. ctrlfile = mergerfs_control_file(args.dir)
  258. branches = mergerfs_branches(ctrlfile)
  259. branches = build_branches_freespace(branches)
  260. count = min(args.count,len(branches))
  261. try:
  262. for (dirpath,dirnames,filenames) in os.walk(args.dir):
  263. for filename in filenames:
  264. if match(filename,excludes):
  265. continue
  266. if not match(filename,includes):
  267. continue
  268. fullpath = os.path.join(dirpath,filename)
  269. basepath = mergerfs_basepath(fullpath)
  270. relpath = mergerfs_relpath(fullpath)
  271. existing = mergerfs_all_basepaths(fullpath,relpath)
  272. srcpath = dupfun(basepath,relpath,existing)
  273. srcfile = os.path.join(srcpath,relpath)
  274. srcfile_size = os.lstat(srcfile).st_size
  275. existing.remove(srcpath)
  276. i = 1
  277. copies = []
  278. for tgtpath in existing:
  279. if prune and i >= count:
  280. break
  281. copies.append(tgtpath)
  282. args = build_copy_file(srcpath,tgtpath,relpath)
  283. print('# overwrite')
  284. print_args(args)
  285. if execute:
  286. execute_cmd(args)
  287. i += 1
  288. for _ in range(i,count):
  289. for branch in sorted(branches,key=branches.get,reverse=True):
  290. tgtfile = os.path.join(branch,relpath)
  291. if branch in copies or os.path.exists(tgtfile):
  292. continue
  293. copies.append(branch)
  294. branches[branch] -= srcfile_size
  295. args = build_copy_file(srcpath,branch,relpath)
  296. print('# copy')
  297. print_args(args)
  298. if execute:
  299. execute_cmd(args)
  300. break
  301. if prune:
  302. leftovers = set(existing) - set(copies)
  303. for branch in leftovers:
  304. branches[branch] += srcfile_size
  305. tgtfile = os.path.join(branch,relpath)
  306. print('# remove')
  307. args = ['rm','-vf',tgtfile]
  308. print_args(args)
  309. if execute:
  310. execute_cmd(args)
  311. except KeyboardInterrupt:
  312. print("exiting: CTRL-C pressed")
  313. except BrokenPipeError:
  314. pass
  315. sys.exit(0)
  316. if __name__ == "__main__":
  317. main()