"""Thread- and process-safe helpers for reading, writing and copying files.

Two layers of locking are combined:

* ``fsLock`` (a re-entrant thread lock) excludes threads of the *same*
  process from each other; it is applied by the ``fssafe`` decorator.
* ``fcntl.lockf`` advisory locks exclude *different* processes from each
  other; they are taken inside every helper.

Functions whose name ends in ``Nb`` only take the fcntl lock and are not
protected against other threads of the same process.
"""

import collections
import contextlib
import fcntl
import functools
import json
import os
import shutil
import threading
from xml.etree import ElementTree as ET

fsLock = threading.RLock()
fsXngLock = threading.RLock()


def fssafe(fn):
    """Decorate ``fn`` so that only one thread at a time can execute it.

    All decorated functions share the module-level re-entrant lock
    ``fsLock``, so a decorated function may call another decorated function
    from the same thread without deadlocking.

    This is used in addition to fcntl file locking because fcntl locks only
    work between different processes; a process is itself responsible for
    keeping its own threads from accessing a file concurrently.
    """
    @functools.wraps(fn)
    def wrap(*args, **argsd):
        with fsLock:
            return fn(*args, **argsd)
    return wrap


@contextlib.contextmanager
def _locked_open(filename, mode, lockType):
    """Open ``filename`` in ``mode`` and hold an fcntl lock of ``lockType``.

    The lock is released before the file is closed, also when the body of
    the ``with`` statement raises.
    """
    with open(filename, mode) as f:
        try:
            fcntl.lockf(f, lockType)
            yield f
        finally:
            fcntl.lockf(f, fcntl.LOCK_UN)


def _replace_contents(filename, mode, fill):
    """Replace the contents of ``filename`` under an exclusive fcntl lock.

    The file is opened in the given append ``mode`` (so it is created when
    missing but not truncated before the lock is held), truncated, filled by
    calling ``fill(f)``, flushed and fsynced.

    Returns the size in bytes of the file as seen while the lock is still
    held.
    """
    with _locked_open(filename, mode, fcntl.LOCK_EX) as f:
        f.truncate(0)
        fill(f)
        f.flush()
        os.fsync(f.fileno())
        return os.fstat(f.fileno()).st_size


@fssafe
def write(filename, data):
    """Replace the contents of ``filename`` with the text ``data``.

    Threads are mutually excluded from access; this also holds for the
    called function ``writeNb``.
    """
    writeNb(filename, data)


def writeNb(filename, data):
    """Replace the contents of ``filename`` with the text ``data``.

    This is only locked with respect to the filesystem (fcntl). Threads are
    not mutually excluded.
    """
    _replace_contents(filename, 'a', lambda f: f.write(data))


@fssafe
def writebfn(filename, fn):
    """Replace the contents of ``filename`` with whatever ``fn`` writes.

    ``fn`` is called with the already truncated, exclusively locked file
    object, which is opened in binary mode.
    """
    _replace_contents(filename, 'ab', fn)


@fssafe
def readb(filename):
    """Return the contents of ``filename`` as bytes (shared fcntl lock)."""
    with _locked_open(filename, 'rb', fcntl.LOCK_SH) as f:
        return f.read()


@fssafe
def read(filename):
    """Return the contents of ``filename`` as text.

    Threads are mutually excluded from access.
    """
    return readNb(filename)


def readNb(filename):
    """Return the contents of ``filename`` as text.

    This is only locked with respect to the filesystem (fcntl). Threads are
    not mutually excluded.
    """
    with _locked_open(filename, 'r', fcntl.LOCK_SH) as f:
        return f.read()


@fssafe
def readXml(filename):
    """Parse ``filename`` and return the resulting ``ElementTree``.

    The file is opened in binary mode so that the XML parser itself decodes
    the document and honours an encoding declaration, instead of relying on
    the locale's default text encoding.
    """
    with _locked_open(filename, 'rb', fcntl.LOCK_SH) as f:
        return ET.parse(f)


@fssafe
def writeXml(filename, tree):
    """Replace the contents of ``filename`` with the serialized ``tree``.

    The file is opened in binary mode: with its default encoding,
    ``ElementTree.write`` emits encoded bytes, which a text-mode file object
    rejects on Python 3.
    """
    _replace_contents(filename, 'ab', tree.write)


def _dump_json(filename, data, **dumpArgs):
    """Serialize ``data`` as JSON (indent 2) and store it in ``filename``.

    Serialization happens *before* the file is touched, so data that cannot
    be serialized leaves the previous contents of the file intact.

    Raises AssertionError if the file is empty after writing.
    """
    text = json.dumps(data, indent=2, **dumpArgs)
    size = _replace_contents(filename, 'a', lambda f: f.write(text))
    # Explicit raise instead of ``assert`` so the check survives ``python -O``.
    if size <= 0:
        raise AssertionError("Write Stat:Empty file")


@fssafe
def jsondump(filename, data):
    """Write ``data`` as indented JSON to ``filename``.

    Raises AssertionError if the file is empty after writing.
    """
    _dump_json(filename, data)


@fssafe
def jsondump_sorted(filename, data):
    """Write ``data`` as indented JSON with sorted keys to ``filename``.

    Raises AssertionError if the file is empty after writing.
    """
    _dump_json(filename, data, sort_keys=True)


def _load_json(filename, **loadArgs):
    """Load JSON from ``filename`` under a shared fcntl lock.

    The emptiness check is done while the lock is held, so a file that
    another process is in the middle of rewriting is not mistaken for an
    empty one.

    Raises AssertionError if the file is empty.
    """
    with _locked_open(filename, 'r', fcntl.LOCK_SH) as f:
        # Explicit raise instead of ``assert`` so the check survives
        # ``python -O``.
        if os.fstat(f.fileno()).st_size <= 0:
            raise AssertionError("Read Stat:Empty file")
        return json.load(f, **loadArgs)


@fssafe
def jsonload(filename):
    """Return the JSON document stored in ``filename``.

    Raises AssertionError if the file is empty.
    """
    return _load_json(filename)


@fssafe
def jsonloadordered(filename):
    """Return the JSON document stored in ``filename``, keeping key order.

    JSON objects are returned as ``collections.OrderedDict``.

    Raises AssertionError if the file is empty.
    """
    return _load_json(filename, object_pairs_hook=collections.OrderedDict)


def _copy_contents(srcFilename, destFilename):
    """Copy the bytes and permission bits of ``srcFilename``.

    If ``destFilename`` is a directory, the copy is placed inside it under
    the base name of ``srcFilename``. The source stays locked (shared) while
    the destination is written under an exclusive lock.
    """
    with _locked_open(srcFilename, 'rb', fcntl.LOCK_SH) as srcFile:
        data = srcFile.read()
        if os.path.isdir(destFilename):
            destFilename = os.path.join(destFilename,
                                        os.path.basename(srcFilename))

        def fill(destFile):
            destFile.write(data)
            shutil.copymode(srcFilename, destFilename)

        _replace_contents(destFilename, 'ab', fill)


@fssafe
def shcopy(srcFilename, destFilename):
    """Copy ``srcFilename`` to ``destFilename`` (file or directory).

    Threads are mutually excluded from access.
    """
    shcopyNb(srcFilename, destFilename)


def shcopyNb(srcFilename, destFilename):
    """Copy contents and permission bits of ``srcFilename``.

    ``destFilename`` may be a file name or an existing directory. This is
    only locked with respect to the filesystem (fcntl). Threads are not
    mutually excluded.
    """
    _copy_contents(srcFilename, destFilename)


@fssafe
def shcopyfile(srcFilename, destFilename):
    """Copy ``srcFilename`` to ``destFilename`` (file or directory).

    Threads are mutually excluded from access.
    """
    shcopyfileNb(srcFilename, destFilename)


def shcopyfileNb(srcFilename, destFilename):
    """Copy contents and permission bits of ``srcFilename``.

    ``destFilename`` may be a file name or an existing directory. This is
    only locked with respect to the filesystem (fcntl). Threads are not
    mutually excluded.
    """
    # NOTE(review): the original carried the remark "no metadata copying!"
    # next to a shutil.copymode() call, so the permission bits are still
    # copied here and this function behaves exactly like shcopyNb. Confirm
    # whether the mode copy was meant to be dropped for this variant.
    _copy_contents(srcFilename, destFilename)