文件和 I/O¶
读取文件¶
在 Python 2 中,从文件系统读取的文件内容不会进行解码。也就是说,文件的内容是字节字符串,而不是 Unicode 字符串。
>>> with open("/etc/passwd") as f:
... content = f.read()
>>> print(type(content))
<type 'str'>
>>> print(type(content.decode("utf-8")))
<type 'unicode'>
在 Python 3 中,open 提供了 encoding
选项。如果文件不是以二进制模式打开,则编码将由 locale.getpreferredencoding(False)
或用户通过 encoding 参数显式指定的值确定。
>>> with open("/etc/hosts", encoding="utf-8") as f:
... content = f.read()
...
>>> print(type(content))
<class 'str'>
二进制模式
>>> with open("/etc/hosts", "rb") as f:
... content = f.read()
...
>>> print(type(content))
<class 'bytes'>
逐行读取¶
>>> with open("/etc/hosts") as f:
... for line in f:
... print(line, end='')
...
127.0.0.1 localhost
255.255.255.255 broadcasthost
::1 localhost
读取文件块¶
>>> chunk_size = 16
>>> content = ''
>>> with open('/etc/hosts') as f:
... for c in iter(lambda: f.read(chunk_size), ''):
... content += c
...
>>> print(content)
127.0.0.1 localhost
255.255.255.255 broadcasthost
::1 localhost
写入文件¶
>>> content = "Awesome Python!"
>>> with open("foo.txt", "w") as f:
... f.write(content)
创建符号链接¶
>>> import os
>>> os.symlink("foo", "bar")
>>> os.readlink("bar")
'foo'
复制文件¶
>>> from distutils.file_util import copy_file
>>> copy_file("foo", "bar")
('bar', 1)
移动文件¶
>>> from distutils.file_util import move_file
>>> move_file("./foo", "./bar")
'./bar'
列出目录¶
>>> import os
>>> dirs = os.listdir(".")
从 Python 3.5 起(3.6 起支持 with 语句),我们可以使用 os.scandir
来列出目录。它更加方便,因为 os.scandir
返回一个 os.DirEntry
对象的迭代器。在这种情况下,我们可以通过访问 os.DirEntry
的属性来获取文件信息。更多信息可以在 文档 中找到。
>>> with os.scandir("foo") as it:
... for entry in it:
... st = entry.stat()
...
创建目录¶
类似于 mkdir -p /path/to/dest
>>> from distutils.dir_util import mkpath
>>> mkpath("foo/bar/baz")
['foo', 'foo/bar', 'foo/bar/baz']
复制目录¶
>>> from distutils.dir_util import copy_tree
>>> copy_tree("foo", "bar")
['bar/baz']
删除目录¶
>>> from distutils.dir_util import remove_tree
>>> remove_tree("dir")
路径拼接¶
>>> from pathlib import Path
>>> p = Path("/Users")
>>> p = p / "Guido" / "pysheeet"
>>> p
PosixPath('/Users/Guido/pysheeet')
获取绝对路径¶
>>> from pathlib import Path
>>> p = Path("README.rst")
>>> p.absolute()
PosixPath('/Users/Guido/pysheeet/README.rst')
获取用户主目录¶
>>> from pathlib import Path
>>> Path.home()
PosixPath('/Users/Guido')
获取当前目录¶
>>> from pathlib import Path
>>> p = Path("README.rst")
>>> p.cwd()
PosixPath('/Users/Guido/pysheeet')
获取路径属性¶
>>> from pathlib import Path
>>> p = Path("README.rst").absolute()
>>> p.root
'/'
>>> p.anchor
'/'
>>> p.parent
PosixPath('/Users/Guido/pysheeet')
>>> p.parent.parent
PosixPath('/Users/Guido')
>>> p.name
'README.rst'
>>> p.suffix
'.rst'
>>> p.stem
'README'
>>> p.as_uri()
'file:///Users/Guido/pysheeet/README.rst'
读取 gzip 压缩的 CSV 文件¶
import gzip
import csv
# Path of the gzip-compressed CSV file to read.
f = "example.gz"

# "rt" exposes the decompressed stream as text; newline="" leaves line-ending
# handling to the csv module.
with gzip.open(f, "rt", newline="") as gz:
    # Each row is a dict keyed by the names found in the CSV header line.
    for row in csv.DictReader(gz):
        print(row)
Linux Inotify¶
import selectors
import struct
import ctypes
import sys
import os
from pathlib import Path
from ctypes.util import find_library
# ref: <sys/inotify.h>
# Event mask bits passed to inotify_add_watch() and reported back per event.
IN_CREATE = 0x00000100
IN_DELETE = 0x00000200

# struct format of the fixed-size event header: one signed int followed by
# three unsigned ints (unpacked elsewhere as wd, mask, cookie, length).
INOTIFY_EVENT = "iIII"
INOTIFY_EVENT_LEN = struct.calcsize(INOTIFY_EVENT)

# find_library() depends on external helper tools and returns None when it
# cannot locate the library. On POSIX, CDLL(None) then resolves symbols from
# the libraries already loaded into the current process, which include libc.
lib = find_library("c")

# use_errno=True makes ctypes save errno after every foreign call, so that
# ctypes.get_errno() reports the real failure code instead of always 0.
libc = ctypes.CDLL(lib, use_errno=True)
class Inotify:
    """Watch one path for create/delete events through libc's inotify calls.

    Use it as a context manager: entering creates the inotify descriptor,
    adds the watch and registers the descriptor with a selector; leaving
    undoes all three. ``run()`` yields ``(mask, filename)`` tuples.
    """

    def __init__(self, path):
        """Remember *path*; no system resources are acquired yet."""
        self._path = path
        self._fd = None  # inotify descriptor, set by __enter__
        self._wd = None  # watch descriptor, set by __enter__
        self._buf = b""  # bytes read but not yet parsed into whole events
        self._sel = selectors.DefaultSelector()

    def init(self):
        """Create an inotify instance and return its file descriptor.

        Raises:
            OSError: if ``inotify_init`` returns a negative value.
        """
        fd = libc.inotify_init()
        if fd < 0:
            # NOTE: ctypes.get_errno() only reflects the failing call when
            # the library was loaded with use_errno=True.
            err = ctypes.get_errno()
            raise OSError(err, os.strerror(err))
        return fd

    def watch(self, fd, path):
        """Watch *path* for IN_CREATE and IN_DELETE; return the watch descriptor.

        *path* may be ``str``, ``bytes`` or any ``os.PathLike``.

        Raises:
            OSError: if ``inotify_add_watch`` returns a negative value.
        """
        # os.fsencode() uses the filesystem encoding and accepts path-like
        # objects directly, so no manual str()/encode() round trip is needed.
        wd = libc.inotify_add_watch(fd, os.fsencode(path), IN_CREATE | IN_DELETE)
        if wd < 0:
            err = ctypes.get_errno()
            raise OSError(err, os.strerror(err))
        return wd

    def remove(self, fd, wd):
        """Remove watch *wd* from inotify descriptor *fd*.

        The return value is deliberately ignored (best effort).
        """
        libc.inotify_rm_watch(fd, wd)

    def handle(self, fd, *a):
        """Selector callback: read from *fd* and yield the parsed events.

        Extra positional arguments (the selector event mask) are ignored.
        """
        data = os.read(fd, 1024)
        if data:
            yield from self.parse(data)

    def parse(self, buf):
        """Append *buf* to the pending bytes and yield every complete event.

        Yields ``(mask, filename)`` tuples. Bytes belonging to an incomplete
        trailing event stay buffered until a later call supplies the rest.
        """
        self._buf += buf
        while len(self._buf) >= INOTIFY_EVENT_LEN:
            _, mask, _, length = struct.unpack_from(INOTIFY_EVENT, self._buf)
            end = INOTIFY_EVENT_LEN + length
            if len(self._buf) < end:
                break
            name = self._buf[INOTIFY_EVENT_LEN:end]
            self._buf = self._buf[end:]
            # The name field is NUL padded. os.fsdecode() mirrors how the os
            # module decodes file names, so names that are not valid in the
            # filesystem encoding do not raise UnicodeDecodeError.
            yield mask, os.fsdecode(name.rstrip(b"\0"))

    def __enter__(self):
        """Acquire the inotify descriptor, add the watch and register it."""
        fd = self.init()
        try:
            wd = self.watch(fd, self._path)
            self._sel.register(fd, selectors.EVENT_READ, self.handle)
        except BaseException:
            # Do not leak the descriptor when setup fails half way.
            os.close(fd)
            raise
        self._fd, self._wd = fd, wd
        return self

    def __exit__(self, *e):
        """Release everything acquired in ``__enter__``.

        If an exception is active, its ``(type, value, traceback)`` tuple is
        printed to stderr; the exception is not suppressed.
        """
        fd, wd = self._fd, self._wd
        self._fd = self._wd = None
        try:
            self._sel.unregister(fd)
            self.remove(fd, wd)
        finally:
            os.close(fd)
        if e and e[0]:
            print(e, file=sys.stderr)

    def run(self):
        """Block on the selector forever, yielding events as they arrive."""
        while True:
            for key, mask in self._sel.select():
                # key.data is the callback given at registration (handle).
                yield from key.data(key.fileobj, mask)
# Watch /tmp and print each event as "<mask> <filename>". run() loops
# forever, so this only stops on an exception such as KeyboardInterrupt.
with Inotify(Path("/tmp")) as watcher:
    for event_mask, filename in watcher.run():
        print(event_mask, filename)