HEX
Server: Apache/2.4.6 (CentOS) OpenSSL/1.0.2k-fips PHP/7.2.34
System: Linux atalantini.com 3.10.0-1127.13.1.el7.x86_64 #1 SMP Tue Jun 23 15:46:38 UTC 2020 x86_64
User: root (0)
PHP: 7.2.34
Disabled: NONE
Upload Files
File: //lib/python2.7/site-packages/kobo/shortcuts.py
# -*- coding: utf-8 -*-


"""
Various useful shortcuts.
"""


import os
import sys
import subprocess
import random
import re
import hashlib
import threading


__all__ = (
    "force_list",
    "force_tuple",
    "allof",
    "anyof",
    "noneof",
    "oneof",
    "is_empty",
    "iter_chunks",
    "random_string",
    "hex_string",
    "touch",
    "read_from_file",
    "save",
    "save_to_file",
    "find_symlinks_to",
    "run",
    "compute_file_checksums",
    "parse_checksum_line",
    "read_checksum_file",
    "split_path",
    "relative_path",
    "makedirs",
)


def force_list(value):
    """Convert a value to list.

    @rtype: list
    """
    if type(value) is list:
        return value
    if type(value) in (tuple, set):
        return list(value)
    return [value]


def force_tuple(value):
    """Convert a value to tuple.

    @rtype: tuple
    """
    if type(value) is tuple:
        return value
    if type(value) in (list, set):
        return tuple(value)
    return (value, )


def allof(*args, **kwargs):
    """Test if all values are True.

    @rtype: bool
    """
    for i in list(args) + kwargs.values():
        if not i:
            return False
    return True


def anyof(*args, **kwargs):
    """Test if at least one of the values is True.

    @rtype: bool
    """
    for i in list(args) + kwargs.values():
        if i:
            return True
    return False


def noneof(*args, **kwargs):
    """Test if all values are False.

    @rtype: bool
    """
    for i in list(args) + kwargs.values():
        if i:
            return False
    return True


def oneof(*args, **kwargs):
    """Test if just one of the values is True.

    @rtype: bool
    """
    found = False
    for i in list(args) + kwargs.values():
        if i:
            if found:
                return False
            found = True
    return found


def is_empty(value):
    """Test if value is None, empty string, list, tuple or dict.

    @rtype: bool
    """
    if value is None:
        return True
    if type(value) in (list, tuple, dict):
        return len(value) == 0
    if not value:
        return True
    return False


def iter_chunks(input_list, chunk_size):
    """Iterate through input_list and yield chunk_size-d chunks."""

    # TODO: any idea how to detect files better? This works for StringIO at least...
    is_file = type(input_list) is file
    is_file |= hasattr(input_list, "read") and hasattr(input_list, "close") and hasattr(input_list, "seek")
    if is_file:
        while 1:
            chunk = input_list.read(chunk_size)
            if not chunk:
                break
            yield chunk
        return

    # TODO: any idea how to detect generators better?
    if hasattr(input_list, "__iter__") and hasattr(input_list, "next"):
        chunk = []
        for i in input_list:
            chunk.append(i)
            if len(chunk) == chunk_size:
                yield chunk
                chunk = []
        if chunk:
            yield chunk
        return

    can_slice = hasattr(input_list, "__getslice__")
    for i in xrange(0, len(input_list), chunk_size):
        if can_slice:
            # regular list
            yield input_list[i:i + chunk_size]
        else:
            # xrange, etc.
            chunk = []
            for j in xrange(i, i + chunk_size):
                if j < len(input_list):
                    chunk.append(input_list[j])
            yield chunk


def random_string(length=32, alphabet=None):
    """Return random string of given lenght which consists of characters from the alphabet.

    @rtype: str
    """
    alphabet = alphabet or "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
    return "".join([ random.choice(alphabet) for i in xrange(length) ])


def hex_string(string):
    """Convert a string to a string of hex digits.

    @rtype: str
    """
    return "".join(( "%02x" % ord(i) for i in string ))


def touch(filename, mode=0644):
    """Touch a file."""
    save_to_file(filename, "", append=True, mode=mode)


def read_from_file(filename, lines=None, re_filter=None):
    """Read a text file."""
    result = []
    fo = open(filename, "rt")

    if re_filter is not None:
        re_filter_compiled = re.compile(re_filter)

    for i, line in enumerate(fo):
        if lines is not None and i+1 not in lines:
            continue
        line = line.rstrip("\r\n")
        if re_filter is not None and not re_filter_compiled.match(line):
            continue
        result.append(line)

    fo.close()
    return result


def save_to_file(filename, text, append=False, mode=0644):
    """Save text to a file."""
    if append and os.path.exists(filename):
        fd = os.open(filename, os.O_RDWR | os.O_APPEND, mode)
    else:
        fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode)
    os.write(fd, text)
    os.close(fd)


def save(*args, **kwargs):
    print >> sys.stderr, "DeprecationWarning: kobo.shortcuts.save() is deprecated, use kobo.shortcuts.save_to_file() instead."
    return save_to_file(*args, **kwargs)


def find_symlinks_to(target, directory):
    """Find symlinks which point to a target.

    @param target: the symlink target we're looking for
    @type target: str
    @param directory: directory with symlinks
    @type directory: str
    @return: list of symlinks to the target
    @rtype: list
    """

    target = os.path.abspath(target)
    directory = os.path.abspath(directory)
    result = []
    for fn in os.listdir(directory):
        path = os.path.abspath(os.path.join(directory, fn))

        if not os.path.islink(path):
            continue

        if os.path.abspath(os.path.join(directory, os.path.normpath(os.readlink(path)))) == os.path.abspath(target):
            result.append(path)

    return result


def run(cmd, show_cmd=False, stdout=False, logfile=None, can_fail=False, workdir=None, stdin_data=None, return_stdout=True, buffer_size=4096, **kwargs):
    """Run a command in shell.

    @param show_cmd: show command in stdout/log
    @type show_cmd: bool
    @param stdout: print output to stdout
    @type stdout: bool
    @param logfile: save output to logfile
    @type logfile: str
    @param can_fail: when set, retcode is returned instead of raising RuntimeError
    @type can_fail: bool
    @param workdir: change current directory to workdir before starting a command
    @type workdir: str
    @param stdin_data: stdin data passed to a command
    @type stdin_data: str
    @param buffer_size: size of buffer for reading from proc's stdout, use -1 for line-buffering
    @type buffer_size: int
    @return_stdout: return command stdout as a function result (turn off when working with large data, None is returned instead of stdout)
    @return_stdout: bool
    @return: (command return code, merged stdout+stderr)
    @rtype: (int, str) or (int, None)
    """
    if type(cmd) in (list, tuple):
        import pipes
        cmd = " ".join((pipes.quote(i) for i in cmd))

    log = None
    if logfile:
        logfile = os.path.join(workdir or "", logfile)
        # What happens to log file if it exists already? If show_cmd is True,
        # it will be overwritten. Otherwise the command output will just be
        # appended to the existing file.
        mode = 'a' if not show_cmd and os.path.exists(logfile) else 'w'
        log = open(logfile, mode)

    try:

        if show_cmd:
            command = "COMMAND: %s\n%s\n" % (cmd, "-" * (len(cmd) + 9))
            if stdout:
                print command,
            if logfile:
                log.write(command)

        stdin = None
        if stdin_data is not None:
            stdin = subprocess.PIPE

        proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT, stdin=stdin,
                                cwd=workdir, **kwargs)

        if stdin_data is not None:
            class StdinThread(threading.Thread):
                def run(self):
                    proc.stdin.write(stdin_data)
                    proc.stdin.close()
            stdin_thread = StdinThread()
            stdin_thread.daemon = True
            stdin_thread.start()

        output = ""
        while True:
            if buffer_size == -1:
                lines = proc.stdout.readline()
            else:
                try:
                    lines = proc.stdout.read(buffer_size)
                except (IOError, OSError), ex:
                    import errno
                    if ex.errno == errno.EINTR:
                        continue
                    else:
                        raise

            if lines == "":
                break
            if stdout:
                sys.stdout.write(lines)
            if logfile:
                log.write(lines)
            if return_stdout:
                output += lines
        proc.wait()

    finally:
        if logfile:
            log.close()

    if stdin_data is not None:
        stdin_thread.join()

    err_msg = "ERROR running command: %s" % cmd
    if logfile:
        err_msg += "\nFor more details see %s" % logfile

    if proc.returncode != 0 and not can_fail:
        exc = RuntimeError(err_msg)
        exc.output = output
        raise exc

    if proc.returncode != 0 and show_cmd:
        print >> sys.stderr, err_msg

    if not return_stdout:
        output = None

    return (proc.returncode, output)


def compute_file_checksums(filename, checksum_types):
    """
    Compute a file checksum of given type.
    Checksum must be supported by hashlib.

    @param filename: path to a file
    @type filename: str
    @param checksum_type: checksum types, supported by hashlib
    @type checksum_type: str or list
    @return: {checksum_type: digest in lowercase hex}
    @rtype: dict
    """

    checksum_types = set(force_tuple(checksum_types))
    checksums = {}

    for checksum_type in checksum_types:
        try:
            checksums[checksum_type] = getattr(hashlib, checksum_type)()
        except AttributeError:
            raise ValueError("Checksum is not supported in hashlib: %s" % checksum_type)

    fo = open(filename, "r")
    while True:
        chunk = fo.read(1024**2)
        if not chunk:
            break
        for checksum_type in checksum_types:
            checksums[checksum_type].update(chunk)
    fo.close()

    result = {}
    for checksum_type, checksum in checksums.iteritems():
        result[checksum_type] = checksum.hexdigest().lower()
    return result


CHECKSUM_LINE_RE = re.compile(r"^(?P<escaped>\\)?(?P<checksum>\S+) [ \*](?P<path>.*)$")
def parse_checksum_line(line):
    """Parse a line of md5sum, sha256sum, ... file.

    @param line: line of a *sum file
    @type line: str
    @return: (checksum, path)
    @rtype: (str, str)
    """

    if not line.strip():
        return None
    if line.strip().startswith("#"):
        return None

    match = CHECKSUM_LINE_RE.match(line)
    data = match.groupdict()
    if data["escaped"]:
        data["path"] = data["path"].decode('string-escape')
    return (data["checksum"], data["path"])


def read_checksum_file(file_name):
    """Read checksums from a file.

    @param file_name: checksum file
    @type file_name: str
    @return: [(checksum, path)]
    @rtype: [(str, str)]
    """

    result = []
    fo = open(file_name, "rb")

    for line in fo:
        checksum_tuple = parse_checksum_line(line)
        if checksum_tuple is not None:
            result.append(checksum_tuple)

    fo.close()
    return result


def split_path(path):
    """Split path to a list.

    @param path: a path
    @type path: str
    @return: list with path elements
    @rtype: list
    """

    head, tail = os.path.split(os.path.normpath(path))
    if not head:
        return [tail]
    if not tail:
        if head == os.sep:
            return [head]
        return split_path(head[:-len(os.sep)])
    return split_path(head) + [tail]


def relative_path(src_path, dst_path=None):
    """Determine relative path between two paths.
    This function is primarily meant for tweaking paths for symlinks.
    Paths ending with '/' are considered as directories.

    @param src_path:
    @type src_path:
    @param: dst_path:
    @type dst_path:
    @returns: relative path from src to dst
    @rtype: str
    """

    if dst_path is None:
        dst_path = os.path.curpath(os.curdir) + "/"
    src_path, src_file = os.path.split(src_path)
    dst_path, dst_file = os.path.split(dst_path)

    src_path = split_path(src_path)
    dst_path = split_path(dst_path)

    if not src_file and dst_file:
        raise RuntimeError("")

    while src_path and dst_path and src_path[0] == dst_path[0]:
        src_path.pop(0)
        dst_path.pop(0)

    while dst_path:
        src_path.insert(0, "..")
        dst_path.pop(0)

    src_path.append(src_file)
    dst_path.append(dst_file)
    return os.path.join(*src_path)


def makedirs(path, mode=0777):
    """
    Wrapper to os.makedirs which does not
    throw an exception on existing directory.
    """
    try:
        os.makedirs(path, mode)
    except OSError, ex:
        if ex.errno != 17:
            raise
        if not os.path.isdir(path):
            raise