cachefs.py (10113B)
#!/usr/bin/python

"""FUSE filesystem to maintain a fixed-size cache for a hierarchy"""

import fuse
import errno
import logging
import os
import shutil
import sys
import pickle
import time
import heapq
from threading import Lock
from metacachefs import MetaCacheFS

# Name of the pickled cache-state file kept inside the cache root.
DB = ".cache.db"

class __Struct:
    """Bag of attributes built from keyword arguments."""
    def __init__(self, **kw):
        for k, v in kw.iteritems():
            setattr(self, k, v)

class Stat(__Struct):
    """statvfs-like result object (built in CacheFS.statfs)."""
    pass

class FileTooBigException(Exception):
    """Raised when a file cannot possibly fit in the cache budget."""
    pass

fuse.fuse_python_api = (0, 2)

class File:
    """One cached file, identified by its path relative to the cache root."""

    @property
    def size(self):
        # Always read the size from disk so external resizes are picked up.
        return os.lstat(os.path.join(self.cache.path, self.path)).st_size

    def __init__(self, cache, path):
        self.cache = cache
        self.path = path
        self.deleted = False
        self.touch()

    def touch(self):
        """Record an access; drives the LRU eviction order."""
        self.lastAccessed = time.time()

class Cache:
    """LRU bookkeeping for the on-disk cache directory."""

    def __init__(self, maxSize, path):
        self.maxSize = maxSize    # byte budget for the whole cache
        self.currentSize = 0      # bytes currently accounted for
        self.heap = []            # (lastAccessed, File) min-heap; entries may be stale
        self.files = {}           # relative path -> File
        self.path = path
        # TODO self.discover()

    @property
    def freeSize(self):
        return self.maxSize - self.currentSize

    def discover(self):
        """Register every file already present under the cache root."""
        for dirpath, dirnames, filenames in os.walk(self.path):
            for name in filenames:
                # BUGFIX: strip the leading separator left by the slice so
                # keys match the "path[1:]" convention used everywhere else
                # (the old value also broke the DB comparison below).
                rel = os.path.join(dirpath, name)[len(self.path):].lstrip(os.sep)
                if rel == DB:
                    continue
                if rel not in self.files:
                    self.addFile(rel)

    def addFile(self, path):
        self.files[path] = File(self, path)
        heapq.heappush(self.heap, (time.time(), self.files[path]))
        self.currentSize += self.files[path].size

    def resizeFile(self, path, newSize):
        f = self.files[path]
        self.currentSize -= f.size
        self.currentSize += newSize

    def moveFile(self, path, newPath):
        self.files[path].path = newPath
        # TODO check no overwrite!
        self.files[newPath] = self.files[path]
        del self.files[path]

    def deleteFile(self, path):
        self.currentSize -= self.files[path].size
        self.files[path].deleted = True
        del self.files[path]

    def oldestFile(self):
        """Pop and return the least-recently-used live File, or None if empty.

        Heap entries are invalidated lazily: when the stored timestamp no
        longer equals File.lastAccessed, the file was touched after being
        pushed and the entry is stale.
        """
        while self.heap:
            # BUGFIX: the local used to be named 'time', shadowing the module.
            ts, f = heapq.heappop(self.heap)
            if ts != f.lastAccessed:
                # Stale entry: requeue with the fresh timestamp and keep
                # looking.  BUGFIX: the original fell through here and could
                # return a recently-touched file as the eviction victim.
                heapq.heappush(self.heap, (f.lastAccessed, f))
                continue
            if f.deleted:
                continue
            return f

class CacheFS(MetaCacheFS):
    """FUSE filesystem mirroring a source tree through a bounded local cache."""

    def __init__(self, *args, **kw):
        MetaCacheFS.__init__(self, *args, **kw)

        self.rwlock = Lock()
        self.cache = None

        # Initialize a Logger() object to handle logging.
        self.logger = logging.getLogger('cachefs')
        self.logger.setLevel(logging.INFO)
        self.logger.addHandler(logging.StreamHandler(sys.stderr))

        self.parser.add_option('--cache', dest='cache', metavar='CACHE',
                               help="cache (mandatory)")
        self.parser.add_option('--db', dest='db', metavar='DB', default=DB,
                               help="db location (absolute or relative to the cache)")
        self.parser.add_option('--size', dest='size', metavar='SIZE',
                               type='int', help="maximal size of cache")

    def fsinit(self):
        MetaCacheFS.fsinit(self)

        options = self.cmdline[0]
        self.cacheRoot = options.cache
        self.db = os.path.join(self.cacheRoot, options.db)
        self.size = options.size
        if not self.size:
            vfs = os.statvfs(self.cacheRoot)
            # Default budget: half of available space on the cache fs.
            self.size = (vfs.f_bavail * vfs.f_bsize) / 2
        try:
            with open(self.db, 'rb') as f:
                self.cache = pickle.load(f)
            assert self.cache is not None
        except Exception:
            # No (or unreadable) saved state: start from an empty cache.
            self.cache = Cache(self.size, self.cacheRoot)
        if self.cache.maxSize > self.size:
            # The budget shrank since the last run: evict down to the new limit.
            self.makeRoom(self.cache.maxSize - self.size)
        self.cache.maxSize = self.size

    def fsdestroy(self):
        # Persist the bookkeeping so the next mount can reuse the cache.
        with open(self.db, 'wb+') as f:
            pickle.dump(self.cache, f)

    def cachePath(self, path):
        """Map an absolute FUSE path ('/x/y') to its location in the cache."""
        return os.path.join(self.cacheRoot, path[1:])

    def makeRoom(self, bytes):
        """Evict LRU files until `bytes` more bytes fit in the cache.

        Raises FileTooBigException when the request exceeds the whole budget.
        """
        # TODO maybe don't flush all the cache for a big file even if it fits...
        # TODO adjust for the available size of the underlying FS
        if bytes > self.cache.maxSize:
            raise FileTooBigException()
        while bytes > self.cache.freeSize:
            victim = self.cache.oldestFile()
            self.cache.deleteFile(victim.path)
            self.logger.info("remove %s", self.cachePath("/" + victim.path))
            os.unlink(self.cachePath("/" + victim.path))
            self.logger.info("current size is now %d", self.cache.currentSize)

    def registerHit(self, path):
        """register a hit for path in the cache"""
        self.cache.files[path[1:]].touch()

    def isCached(self, path):
        """is a path cached (present and apparently up to date)?"""
        if path == "/" + DB:
            return False
        if os.path.exists(self.cachePath(path)) \
                and os.path.exists(self.sourcePath(path)):
            # BUGFIX: compare the ORIGINAL file against the cached copy; the
            # old code stat'ed the cache path twice, so the check always
            # passed and stale copies were served.
            statOriginal = os.lstat(self.sourcePath(path))
            statCache = os.lstat(self.cachePath(path))
            if statOriginal.st_size == statCache.st_size:
                # the cache file is good
                # TODO better checks
                return True
        return False

    def prepare(self, path):
        """Make `path` available locally and return the pathname to open.

        Returns the cache copy when possible, otherwise the source path.
        """
        if path == "/" + DB:
            return self.sourcePath(path)
        if not os.path.exists(self.sourcePath(path)):
            # no such original file, let the source handle it
            return self.sourcePath(path)
        if self.isCached(path):
            self.registerHit(path)
            return self.cachePath(path)
        statOriginal = os.lstat(self.sourcePath(path))
        # cache the file and then open it
        with self.rwlock:
            try:
                self.makeRoom(statOriginal.st_size)
            except FileTooBigException:
                # no room to cache, open the original file
                return self.sourcePath(path)
            # create folder hierarchy
            head, tail = os.path.split(self.cachePath(path))
            self.logger.debug("will create folders")
            try:
                os.makedirs(head)
            except OSError as exc:  # Python >2.5
                if not (exc.errno == errno.EEXIST and os.path.isdir(head)):
                    raise
            self.logger.debug("will copy %s to %s",
                              self.sourcePath(path), self.cachePath(path))
            shutil.copy2(self.sourcePath(path), self.cachePath(path))
            self.cache.addFile(path[1:])
            return self.cachePath(path)

    def chmod(self, path, mode):
        wasCached = self.isCached(path)
        retval = super(CacheFS, self).chmod(path, mode)
        if wasCached:
            os.chmod(self.cachePath(path), mode)  # ignore errors
        return retval

    def chown(self, path, uid, gid):
        # BUGFIX: FUSE invokes chown(path, uid, gid); the original accepted a
        # single 'mode' argument and passed it alone to os.chown, which
        # requires both ids (guaranteed TypeError).  NOTE(review): assumes
        # MetaCacheFS.chown takes (path, uid, gid) too -- confirm.
        wasCached = self.isCached(path)
        retval = super(CacheFS, self).chown(path, uid, gid)
        if wasCached:
            os.chown(self.cachePath(path), uid, gid)  # ignore errors
        return retval

    def read(self, path, size, offset):
        f = self.prepare(path)
        with self.rwlock:
            fh = os.open(f, os.O_RDONLY)
            try:
                os.lseek(fh, offset, 0)
                return os.read(fh, size)
            finally:
                os.close(fh)

    def rename(self, old, new):
        wasCached = self.isCached(old)
        retval = super(CacheFS, self).rename(old, new)
        if wasCached:
            self.cache.moveFile(old[1:], new[1:])
            os.rename(self.cachePath(old), self.cachePath(new))
        return retval

    def statfs(self):
        # Report the cache's budget/usage rather than the backing fs figures.
        stv = os.statvfs(self.cacheRoot)
        stv = Stat(**dict((key, getattr(stv, key)) for key in ["f_bavail",
            "f_bfree", "f_blocks", "f_bsize", "f_favail", "f_ffree", "f_files",
            "f_flag", "f_frsize", "f_namemax"]))
        stv.f_bfree = (self.cache.maxSize - self.cache.currentSize) / stv.f_bsize
        stv.f_bavail = stv.f_bfree
        stv.f_blocks = self.cache.maxSize / stv.f_bsize
        return stv

    def doTruncate(self, path, length):
        with open(path, 'r+') as f:
            f.truncate(length)

    def truncate(self, path, length):
        wasCached = self.isCached(path)
        retval = super(CacheFS, self).truncate(path, length)
        # NOTE(review): this also truncates the source directly; verify that
        # MetaCacheFS.truncate does not already do so (double truncation is
        # harmless for shrinking but worth confirming).
        self.doTruncate(self.sourcePath(path), length)
        if wasCached:
            # Re-account the cached copy's size around the truncation.
            self.cache.currentSize -= self.cache.files[path[1:]].size
            self.doTruncate(self.cachePath(path), length)
            self.cache.currentSize += self.cache.files[path[1:]].size
        return retval

    def unlink(self, path):
        wasCached = self.isCached(path)
        retval = super(CacheFS, self).unlink(path)
        if wasCached:
            self.cache.deleteFile(path[1:])
            os.unlink(self.cachePath(path))
        return retval

    def utimens(self, path, ts_acc, ts_mod):
        wasCached = self.isCached(path)
        times = (ts_acc.tv_sec, ts_mod.tv_sec)
        retval = super(CacheFS, self).utimens(path, ts_acc, ts_mod)
        if wasCached:
            os.utime(self.cachePath(path), times)
        return retval

    def doWrite(self, path, data, offset):
        with self.rwlock:
            fh = os.open(path, os.O_WRONLY)
            try:
                os.lseek(fh, offset, 0)
                return os.write(fh, data)
            finally:
                os.close(fh)

    def write(self, path, data, offset):
        wasCached = self.isCached(path)
        self.logger.debug("writing to a %s file",
                          "cached" if wasCached else "non-cached")
        retval = super(CacheFS, self).write(path, data, offset)
        if retval > 0 and wasCached:
            # Conservative: reserve len(data) even though the write may only
            # grow the file by part of that.
            self.makeRoom(len(data))
            self.cache.currentSize -= self.cache.files[path[1:]].size
            self.doWrite(self.cachePath(path), data, offset)
            self.cache.currentSize += self.cache.files[path[1:]].size
        return retval

if __name__ == "__main__":
    cachefs = CacheFS()
    fuse_opts = cachefs.parse(['-o', 'fsname=cachefs'] + sys.argv[1:])
    cachefs.main()