文件和 I/O

读取文件

在 Python 2 中,从文件系统读取的文件内容不会进行解码。也就是说,文件的内容是字节字符串,而不是 Unicode 字符串。

>>> with open("/etc/passwd") as f:
...    content = f.read()
>>> print(type(content))
<type 'str'>
>>> print(type(content.decode("utf-8")))
<type 'unicode'>

在 Python 3 中,open 提供了 encoding 选项。如果文件不是以二进制模式打开,则编码由用户通过 encoding 参数指定;若未指定,则使用 locale.getpreferredencoding(False) 返回的编码。

>>> with open("/etc/hosts", encoding="utf-8") as f:
...     content = f.read()
...
>>> print(type(content))
<class 'str'>

二进制模式

>>> with open("/etc/hosts", "rb") as f:
...     content = f.read()
...
>>> print(type(content))
<class 'bytes'>

逐行读取

>>> with open("/etc/hosts") as f:
...     for line in f:
...         print(line, end='')
...
127.0.0.1       localhost
255.255.255.255     broadcasthost
::1             localhost

读取文件块

>>> chunk_size = 16
>>> content = ''
>>> with open('/etc/hosts') as f:
...     for c in iter(lambda: f.read(chunk_size), ''):
...         content += c
...
>>> print(content)
127.0.0.1       localhost
255.255.255.255 broadcasthost
::1             localhost

写入文件

>>> content = "Awesome Python!"
>>> with open("foo.txt", "w") as f:
...     f.write(content)

复制文件

>>> from distutils.file_util import copy_file
>>> copy_file("foo", "bar")
('bar', 1)

移动文件

>>> from distutils.file_util import move_file
>>> move_file("./foo", "./bar")
'./bar'

列出目录

>>> import os
>>> dirs = os.listdir(".")

从 Python 3.5 开始,我们可以使用 os.scandir 来列出目录;自 Python 3.6 起,它还可以像下面的例子那样作为上下文管理器使用。它更加方便,因为 os.scandir 返回一个 os.DirEntry 对象的迭代器。在这种情况下,我们可以通过访问 os.DirEntry 的属性来获取文件信息。更多信息可以在官方文档中找到。

>>> with os.scandir("foo") as it:
...     for entry in it:
...         st = entry.stat()
...

创建目录

类似于 mkdir -p /path/to/dest

>>> from distutils.dir_util import mkpath
>>> mkpath("foo/bar/baz")
['foo', 'foo/bar', 'foo/bar/baz']

复制目录

>>> from distutils.dir_util import copy_tree
>>> copy_tree("foo", "bar")
['bar/baz']

删除目录

>>> from distutils.dir_util import remove_tree
>>> remove_tree("dir")

路径拼接

>>> from pathlib import Path
>>> p = Path("/Users")
>>> p = p / "Guido" / "pysheeet"
>>> p
PosixPath('/Users/Guido/pysheeet')

获取绝对路径

>>> from pathlib import Path
>>> p = Path("README.rst")
>>> p.absolute()
PosixPath('/Users/Guido/pysheeet/README.rst')

获取用户主目录

>>> from pathlib import Path
>>> Path.home()
PosixPath('/Users/Guido')

获取当前目录

>>> from pathlib import Path
>>> p = Path("README.rst")
>>> p.cwd()
PosixPath('/Users/Guido/pysheeet')

获取路径属性

>>> from pathlib import Path
>>> p = Path("README.rst").absolute()
>>> p.root
'/'
>>> p.anchor
'/'
>>> p.parent
PosixPath('/Users/Guido/pysheeet')
>>> p.parent.parent
PosixPath('/Users/Guido')
>>> p.name
'README.rst'
>>> p.suffix
'.rst'
>>> p.stem
'README'
>>> p.as_uri()
'file:///Users/Guido/pysheeet/README.rst'

读取 gzip 压缩的 CSV 文件

import gzip
import csv

# Stream the rows of a gzip-compressed CSV file, printing each one as a dict
# keyed by the column names taken from the header row.
archive = "example.gz"

# "rt" makes gzip decode the decompressed bytes to text; newline="" leaves
# line-ending handling to the csv module.
with gzip.open(archive, mode="rt", newline="") as handle:
    for record in csv.DictReader(handle):
        print(record)

Linux Inotify

import selectors
import struct
import ctypes
import sys
import os

from pathlib import Path
from ctypes.util import find_library

# Event mask bits, ref: <sys/inotify.h>
IN_CREATE = 0x00000100
IN_DELETE = 0x00000200

# struct format of the fixed-size header that precedes every event name:
# (int wd, unsigned mask, unsigned cookie, unsigned len).
INOTIFY_EVENT = "iIII"
INOTIFY_EVENT_LEN = struct.calcsize(INOTIFY_EVENT)

lib = find_library("c")
assert lib, "could not locate the C library"

# use_errno=True makes ctypes save the C errno after each foreign call so that
# ctypes.get_errno() reports the real failure reason; without it get_errno()
# always returns 0.
libc = ctypes.CDLL(lib, use_errno=True)


class Inotify(object):
    """Watch a directory for created and deleted entries via Linux inotify.

    Use as a context manager: entering creates the inotify instance and adds
    the watch, leaving removes the watch and closes the descriptor.  Inside
    the ``with`` block, iterate over :meth:`run` to receive
    ``(mask, filename)`` tuples.

    Failures of the libc calls are raised as ``OSError`` built from
    ``ctypes.get_errno()``; that value is only meaningful when the C library
    was loaded with ``use_errno=True``.
    """

    def __init__(self, path):
        """Remember *path* (``str``, ``bytes`` or path-like); opens nothing yet."""
        self._path = path
        self._fd = None  # inotify instance descriptor, set by __enter__
        self._wd = None  # watch descriptor for self._path, set by __enter__
        self._buf = b""  # bytes read from the fd that are not yet parsed
        self._sel = selectors.DefaultSelector()

    def init(self) -> int:
        """Create an inotify instance and return its file descriptor.

        Raises:
            OSError: if ``inotify_init`` fails.
        """
        fd = libc.inotify_init()
        if fd < 0:
            err = ctypes.get_errno()
            raise OSError(err, os.strerror(err))
        return fd

    def watch(self, fd, path) -> int:
        """Watch *path* on *fd* for IN_CREATE and IN_DELETE events.

        Returns:
            The watch descriptor.

        Raises:
            OSError: if ``inotify_add_watch`` fails.
        """
        # fsencode accepts str, bytes and path-like objects and uses the
        # filesystem encoding, so any name the OS handed out round-trips.
        raw_path = os.fsencode(path)
        wd = libc.inotify_add_watch(fd, raw_path, IN_CREATE | IN_DELETE)
        if wd < 0:
            err = ctypes.get_errno()
            raise OSError(err, os.strerror(err))
        return wd

    def remove(self, fd, wd) -> None:
        """Remove watch *wd* from inotify instance *fd* (result is ignored)."""
        libc.inotify_rm_watch(fd, wd)

    def handle(self, fd, *a):
        """Read pending data from *fd* and yield the events it contains.

        Extra positional arguments (the selector event mask) are ignored.
        """
        data = os.read(fd, 1024)
        if not data:
            return
        yield from self.parse(data)

    def parse(self, buf):
        """Append *buf* to the internal buffer and yield each complete event.

        Yields:
            ``(mask, filename)`` tuples.  Bytes belonging to an incomplete
            trailing event stay buffered until a later call completes it.
        """
        self._buf += buf
        while len(self._buf) >= INOTIFY_EVENT_LEN:
            _wd, mask, _cookie, length = struct.unpack_from(
                INOTIFY_EVENT, self._buf
            )
            event_length = INOTIFY_EVENT_LEN + length
            if len(self._buf) < event_length:
                break

            raw_name = self._buf[INOTIFY_EVENT_LEN:event_length]
            self._buf = self._buf[event_length:]
            # The name field is NUL-padded.  fsdecode never raises on names
            # that are not valid in the filesystem encoding.
            yield mask, os.fsdecode(raw_name.rstrip(b"\0"))

    def __enter__(self):
        """Open the inotify instance, add the watch and register the fd."""
        fd = self.init()
        try:
            wd = self.watch(fd, self._path)
            self._sel.register(fd, selectors.EVENT_READ, self.handle)
        except BaseException:
            # Do not leak the descriptor when setup fails half-way.
            os.close(fd)
            raise
        self._fd, self._wd = fd, wd
        return self

    def __exit__(self, *e):
        """Release the watch and the descriptor; report any pending exception.

        The exception is printed to stderr and then propagates, because this
        method returns ``None``.
        """
        fd, wd = self._fd, self._wd
        self._fd = self._wd = None
        if fd is not None:
            try:
                # Only the fd is unregistered; the selector itself stays open
                # so the same object can be entered again.
                self._sel.unregister(fd)
                self.remove(fd, wd)
            finally:
                os.close(fd)
        if e and e[0]:
            print(e, file=sys.stderr)

    def run(self):
        """Block on the selector forever, yielding ``(mask, filename)`` events."""
        while True:
            for key, mask in self._sel.select():
                callback = key.data
                yield from callback(key.fileobj, mask)


# Watch /tmp and print the event mask and entry name of every event reported;
# run() never returns, so this loops until the process is interrupted.
watched_dir = Path("/tmp")
with Inotify(watched_dir) as notifier:
    for event_mask, name in notifier.run():
        print(event_mask, name)