HEX
Server: Apache/2.4.6 (CentOS) OpenSSL/1.0.2k-fips PHP/7.2.34
System: Linux atalantini.com 3.10.0-1127.13.1.el7.x86_64 #1 SMP Tue Jun 23 15:46:38 UTC 2020 x86_64
User: root (0)
PHP: 7.2.34
Disabled: NONE
Upload Files
File: //lib/python2.7/site-packages/kobo/hardlink.py
# -*- coding: utf-8 -*-


import os
import shutil
import logging
import tempfile


# Names exported by a star-import of this module.
__all__ = (
    "Hardlink",
    "UndoHardlink",
)


class Hardlink(object):
    """
    Hardlink files within one filesystem or copy files
    to another filesystem while preserving hardlinks.

    When the source and the destination directory are on different
    devices, the first source seen for a given inode is copied and the
    copy is remembered; later sources sharing that inode are hardlinked
    to the copy.  Files registered through precache() are used as link
    targets when their base name, mtime and size match the source.

    With test=True nothing is created; the intended actions are only
    logged.  Instances are callable: ``hardlink(src, dst)`` is the same
    as ``hardlink.link(src, dst)``.
    """

    def __init__(self, test=False, logger=None):
        # (st_dev, st_ino) of a source file -> path of its copy/link in dst
        self._inode_cache = {}
        # (base name, st_mtime, st_size) -> dict describing a precached file
        self._precache = {}
        self.test = test
        self.logger = logger

    def __call__(self, src, dst):
        return self.link(src, dst)

    def log(self, loglevel, msg):
        """Forward msg to the logger, if one was supplied."""
        if self.logger:
            self.logger.log(loglevel, msg)

    def precache(self, path, recursive=False):
        """
        Register existing files as candidate hardlink targets.

        path may be a single file or a directory.  For a directory all
        regular files directly inside are registered; sub-directories are
        descended into only when recursive is true.
        """
        def get_stats(item):
            return [item[i] for i in ("st_dev", "st_ino", "st_mtime", "st_size")]

        path = os.path.abspath(path)
        if os.path.isfile(path):
            msg = "Precaching %s" % path
            self.log(logging.DEBUG, msg)
            st = os.stat(path)
            item = {
                "st_dev": st.st_dev,
                "st_ino": st.st_ino,
                "st_mtime": st.st_mtime,
                "st_size": st.st_size,
                "path": path,
            }

            fn = os.path.basename(path)
            precache_key = (fn, item["st_mtime"], item["st_size"])
            if precache_key in self._precache:
                # same name/mtime/size but a different inode: report it,
                # the newer entry replaces the older one below
                if get_stats(item) != get_stats(self._precache[precache_key]):
                    self.log(logging.DEBUG, "Caching failed, files are different: %s, %s" % (path, self._precache[precache_key]["path"]))

            self._precache[precache_key] = item
            return

        for fn in os.listdir(path):
            fn_path = os.path.join(path, fn)

            if os.path.isfile(fn_path):
                self.precache(fn_path)
                continue

            # only descend into real directories; special files and broken
            # symlinks would make os.listdir() fail
            if recursive and os.path.isdir(fn_path):
                self.precache(fn_path, recursive=True)

    def link(self, src, dst):
        """ Create hardlinks or copy files."""

        if os.path.isfile(src):
            self._link_file(src, dst)
        else:
            self._link_dir(src, dst)

    def _link_dir(self, src, dst):
        """
        Recreate the directory tree src under dst and link its files.

        Raises IOError when src is not a directory or when dst exists
        and is not a directory.
        """
        if not os.path.isdir(src):
            raise IOError("Not a directory: %s" % src)

        # dst may be missing (it gets created), but must not be a non-directory
        if os.path.exists(dst) and not os.path.isdir(dst):
            raise IOError("Not a directory: %s" % dst)

        if not os.path.isdir(dst):
            if self.test:
                msg = "TEST: would create directory: %s" % dst
                self.log(logging.INFO, msg)
            else:
                msg = "Creating directory: %s" % dst
                self.log(logging.INFO, msg)
                os.makedirs(dst)

        # process the content whether or not dst existed beforehand
        for fn in os.listdir(src):
            new_src = os.path.join(src, fn)
            new_dst = os.path.join(dst, fn)
            self.link(new_src, new_dst)

    def _link_file(self, src, dst):
        """
        Hardlink (or, across devices, copy) the file src to dst.

        If dst is a directory, the file is placed inside it under the
        base name of src.  An already existing destination file is never
        overwritten; the situation is only logged.

        Raises IOError when src is not a regular file.
        """
        if self.test:
            msg = "TEST: would link %s -> %s" % (src, dst)
            self.log(logging.INFO, msg)
            return

        if not os.path.isfile(src):
            raise IOError("Not a file: %s" % src)

        if os.path.isdir(dst):
            dst = os.path.join(dst, os.path.basename(src))

        src_stat = os.stat(src)
        src_key = (src_stat.st_dev, src_stat.st_ino)

        if os.path.isfile(dst):
            dst_stat = os.stat(dst)
            dst_key = (dst_stat.st_dev, dst_stat.st_ino)
            if src_key == dst_key:
                # same device and inode -> same content -> skip silently
                msg = "Files have the same inode: %s -> %s" % (src, dst)
                self.log(logging.DEBUG, msg)
                return

            if (dst_stat.st_mtime, dst_stat.st_size) == (src_stat.st_mtime, src_stat.st_size):
                # file exists, but size and time is the same
                msg = "File already exists: %s" % dst
                self.log(logging.WARNING, msg)
                return

            # file exists and size or time is different
            msg = "File already exists, time or size differs: %s" % dst
            self.log(logging.ERROR, msg)
            return

        # dirname() is "" for a bare file name; stat the current directory then
        dst_dir = os.path.dirname(dst) or "."
        dst_dir_stat = os.stat(dst_dir)

        if src_stat.st_dev == dst_dir_stat.st_dev:
            # same device
            msg = "Linking file: %s -> %s" % (src, dst)
            self.log(logging.INFO, msg)
            os.link(src, dst)
            return

        if src_key in self._inode_cache:
            # (st_dev, st_ino) found in the cache
            msg = "Cache hit. Linking file: %s -> %s" % (src, dst)
            self.log(logging.INFO, msg)
            os.link(self._inode_cache[src_key], dst)
            return

        # precache() stores its entries under (base name, mtime, size), so the
        # lookup has to use the same key
        precache_key = (os.path.basename(src), src_stat.st_mtime, src_stat.st_size)
        cached = self._precache.get(precache_key)
        # a precached file on another device cannot be hardlinked; copy instead
        if cached is not None and cached["st_dev"] == dst_dir_stat.st_dev:
            msg = "Preache hit. Linking file: %s -> %s" % (cached["path"], dst)
            self.log(logging.INFO, msg)
            os.link(cached["path"], dst)
            self._inode_cache[src_key] = dst
            return

        # copy file otherwise and populate _inode_cache
        msg = "Copying file: %s -> %s" % (src, dst)
        self.log(logging.INFO, msg)
        shutil.copy2(src, dst)
        os.utime(dst, (src_stat.st_atime, src_stat.st_mtime))
        self._inode_cache[src_key] = dst


class UndoHardlink(object):
    """
    Break hardlinks by replacing each multiply-linked file with a
    private copy of itself.

    Directories are processed recursively, and anything that is not a
    regular file is skipped.  Owner, group, mode and access/modification
    times of the original are re-applied to the copy.

    With test=True nothing is modified; the files that would be
    processed are only logged.
    """

    def __init__(self, test=False, logger=None):
        self.test = test
        self.logger = logger

    def __call__(self, file_path):
        self.undo_hardlink(file_path)

    def log(self, loglevel, msg):
        """Forward msg to the logger, if one was supplied."""
        if self.logger:
            self.logger.log(loglevel, msg)

    def undo_hardlink(self, file_path):
        """
        Replace file_path (or every file below it) with a standalone copy.

        Files with fewer than two links are left alone.  Errors from
        stat, copy, move and the final directory removal are logged and
        re-raised.
        """
        if os.path.isdir(file_path):
            for fn in os.listdir(file_path):
                self.undo_hardlink(os.path.join(file_path, fn))
            return

        if not os.path.isfile(file_path):
            # skip special files (devices, etc.)
            return

        if self.test:
            self.log(logging.INFO, "TEST: Would remove hardlink: %s" % file_path)
            return

        self.log(logging.DEBUG, "Removing hardlink: %s" % file_path)

        try:
            st = os.stat(file_path)
        except (IOError, OSError) as ex:
            # os.stat() reports failures as OSError; without a stat result
            # there is nothing sensible to continue with
            self.log(logging.ERROR, "Stat failed: %s" % ex)
            raise

        if st.st_nlink < 2:
            return

        dn = os.path.dirname(file_path)
        fn = os.path.basename(file_path)

        # the temporary directory lives next to the file, so the final move
        # stays within the same directory tree
        tmp_dir = tempfile.mkdtemp(dir=dn)
        tmp_file = os.path.join(tmp_dir, fn)

        try:
            try:
                shutil.copy(file_path, tmp_file)
            except (IOError, OSError) as ex:
                self.log(logging.ERROR, "Copy failed: %s" % ex)
                raise

            try:
                shutil.move(tmp_file, file_path)
            except (IOError, OSError) as ex:
                self.log(logging.ERROR, "Move failed: %s" % ex)
                raise

            os.chown(file_path, st.st_uid, st.st_gid)
            os.chmod(file_path, st.st_mode)
            os.utime(file_path, (st.st_atime, st.st_mtime))
        except Exception:
            # do not leave the temporary directory (and a partial copy) behind
            shutil.rmtree(tmp_dir, ignore_errors=True)
            raise

        try:
            os.rmdir(tmp_dir)
        except (IOError, OSError) as ex:
            self.log(logging.ERROR, "Rmdir failed: %s" % ex)
            raise