數據庫?

Peewee Database 對象表示到數據庫的連接。這個 Database 類是用打開到數據庫的連接所需的所有信息實例化的,然后可以用于:

  • 打開和關閉連接。

  • 執行查詢。

  • 管理事務(和保存點)。

  • 自省表、列、索引和約束。

Peewee支持sqlite、mysql和postgres。每個數據庫類都提供一些基本的、特定于數據庫的配置選項。

from peewee import *

# SQLite database using WAL journal mode and 64MB cache.
sqlite_db = SqliteDatabase('/path/to/app.db', pragmas={
    'journal_mode': 'wal',
    'cache_size': -1024 * 64})

# Connect to a MySQL database on network.
mysql_db = MySQLDatabase('my_app', user='app', password='db_password',
                         host='10.1.0.8', port=3306)

# Connect to a Postgres database.
pg_db = PostgresqlDatabase('my_app', user='postgres', password='secret',
                           host='10.1.0.9', port=5432)

Peewee通過特定于數據庫的擴展模塊為SQLite、Postgres和CockroachDB提供高級支持。要使用擴展功能,請導入相應的特定于數據庫的模塊并使用提供的數據庫類:

from playhouse.sqlite_ext import SqliteExtDatabase

# Use SQLite (will register a REGEXP function and set busy timeout to 3s).
db = SqliteExtDatabase('/path/to/app.db', regexp_function=True, timeout=3,
                       pragmas={'journal_mode': 'wal'})


from playhouse.postgres_ext import PostgresqlExtDatabase

# Use Postgres (and register hstore extension).
db = PostgresqlExtDatabase('my_app', user='postgres', register_hstore=True)


from playhouse.cockroachdb import CockroachDatabase

# Use CockroachDB.
db = CockroachDatabase('my_app', user='root', port=26257, host='10.1.0.8')

# CockroachDB connections may require a number of parameters, which can
# alternatively be specified using a connection-string.
db = CockroachDatabase('postgresql://...')

有關數據庫擴展的詳細信息,請參閱:

初始化數據庫?

這個 Database 初始化方法要求將數據庫的名稱作為第一個參數。在建立連接時,后續的關鍵字參數將傳遞給基礎數據庫驅動程序,使您能夠輕松傳遞特定于供應商的參數。

例如,對于PostgreSQL,通常需要指定 host , userpassword 創建連接時。這些不是標準的 Peewee Database 參數,因此它們將直接傳遞回 psycopg2 創建連接時:

db = PostgresqlDatabase(
    'database_name',  # Required by Peewee.
    user='postgres',  # Will be passed directly to psycopg2.
    password='secret',  # Ditto.
    host='db.mysite.com')  # Ditto.

另一個例子是 pymysql 駕駛員接受 charset 不是標準Peewee的參數 Database 參數。要設置此值,只需傳入 charset 除了你的其他價值觀:

db = MySQLDatabase('database_name', user='www-data', charset='utf8mb4')

有關可用參數,請參閱數據庫驅動程序的文檔:

使用PostgreSQL?

要連接到PostgreSQL數據庫,我們將使用 PostgresqlDatabase . 第一個參數始終是數據庫的名稱,之后可以指定任意 psycopg2 parameters .

psql_db = PostgresqlDatabase('my_database', user='postgres')

class BaseModel(Model):
    """A base model that will use our Postgresql database"""
    class Meta:
        database = psql_db

class User(BaseModel):
    username = CharField()

這個 Playhouse,擴展到Peewee 包含一個 Postgresql extension module 它提供了許多Postgres特有的功能,例如:

如果您想使用這些出色的功能,請使用 PostgresqlExtDatabaseplayhouse.postgres_ext 模塊:

from playhouse.postgres_ext import PostgresqlExtDatabase

psql_db = PostgresqlExtDatabase('my_database', user='postgres')

隔離級別?

自Peewee 3.9.7起,可以使用中的符號常量將隔離級別指定為初始化參數。 psycopg2.extensions

from psycopg2.extensions import ISOLATION_LEVEL_SERIALIZABLE

db = PostgresqlDatabase('my_app', user='postgres', host='db-host',
                        isolation_level=ISOLATION_LEVEL_SERIALIZABLE)

備注

在舊版本中,可以手動設置基礎psycopg2連接的隔離級別。這可以一次性完成:

db = PostgresqlDatabase(...)
conn = db.connection()  # returns current connection.

from psycopg2.extensions import ISOLATION_LEVEL_SERIALIZABLE
conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)

要在每次創建連接時運行此函數,請對 _initialize_database() 鉤子,設計用于此目的:

class SerializedPostgresqlDatabase(PostgresqlDatabase):
    def _initialize_connection(self, conn):
        conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)

使用CockroachDB?

使用 CockroachDatabase 數據庫類,在中定義 playhouse.cockroachdb

from playhouse.cockroachdb import CockroachDatabase

db = CockroachDatabase('my_app', user='root', port=26257, host='localhost')

如果您使用的是 Cockroach Cloud ,您可能會發現使用連接字符串指定連接參數更容易:

db = CockroachDatabase('postgresql://root:secret@host:26257/defaultdb...')

備注

CockroachDB需要 psycopg2 (Postgres)Python驅動程序。

備注

CockroachDB安裝和入門指南可在此處找到:https://www.cockroachlabs.com/docs/stable/install-cockroachdb.html

CRDB提供客戶端事務重試,使用特殊的 CockroachDatabase.run_transaction() 輔助方法。此方法接受callable,它負責執行任何可能需要重試的事務語句。

最簡單的例子 run_transaction()

def create_user(email):
    # Callable that accepts a single argument (the database instance) and
    # which is responsible for executing the transactional SQL.
    def callback(db_ref):
        return User.create(email=email)

    return db.run_transaction(callback, max_attempts=10)

huey = create_user('huey@example.com')

備注

這個 cockroachdb.ExceededMaxAttempts 如果在給定的嘗試次數后無法提交事務,將引發異常。如果SQL格式錯誤、違反約束等,則函數將向調用方引發異常。

有關詳細信息,請參閱:

使用SQLite?

要連接到sqlite數據庫,我們將使用 SqliteDatabase . 第一個參數是包含數據庫的文件名或字符串 ':memory:' 創建內存中的數據庫。在數據庫文件名之后,可以指定一個列表或pragma或任何其他任意的 sqlite3 parameters .

sqlite_db = SqliteDatabase('my_app.db', pragmas={'journal_mode': 'wal'})

class BaseModel(Model):
    """A base model that will use our Sqlite database."""
    class Meta:
        database = sqlite_db

class User(BaseModel):
    username = TextField()
    # etc, etc

Peewee包括 SQLite extension module 它提供了許多特定于sqlite的功能,例如 full-text search , json extension support 還有更多。如果您想使用這些出色的功能,請使用 SqliteExtDatabaseplayhouse.sqlite_ext 模塊:

from playhouse.sqlite_ext import SqliteExtDatabase

sqlite_db = SqliteExtDatabase('my_app.db', pragmas={
    'journal_mode': 'wal',  # WAL-mode.
    'cache_size': -64 * 1000,  # 64MB cache.
    'synchronous': 0})  # Let the OS manage syncing.

pragma語句?

sqlite允許通過 PRAGMA 聲明( SQLite documentation )。這些語句通常在創建新的數據庫連接時運行。運行一個或多個 PRAGMA 針對新連接的語句,可以將其指定為字典或包含pragma名稱和值的2個元組的列表:

db = SqliteDatabase('my_app.db', pragmas={
    'journal_mode': 'wal',
    'cache_size': 10000,  # 10000 pages, or ~40MB
    'foreign_keys': 1,  # Enforce foreign-key constraints
})

pragma也可以使用 pragma() 方法或暴露在 SqliteDatabase 對象:

# Set cache size to 64MB for *current connection*.
db.pragma('cache_size', -1024 * 64)

# Same as above.
db.cache_size = -1024 * 64

# Read the value of several pragmas:
print('cache_size:', db.cache_size)
print('foreign_keys:', db.foreign_keys)
print('journal_mode:', db.journal_mode)
print('page_size:', db.page_size)

# Set foreign_keys pragma on current connection *AND* on all
# connections opened subsequently.
db.pragma('foreign_keys', 1, permanent=True)

注意

使用 pragma() 默認情況下,方法在連接關閉后不持久。要將pragma配置為在每次打開連接時運行,請指定 permanent=True .

備注

在sqlite文檔中可以找到pragma設置的完整列表、它們的含義和接受的值:http://sqlite.org/pragma.html

用戶定義函數?

可以使用用戶定義的python代碼擴展sqlite。這個 SqliteDatabase 類支持三種類型的用戶定義擴展:

  • 函數-獲取任意數量的參數并返回單個值。

  • 聚合-從多行聚合參數并返回單個值。

  • 排序規則-描述如何對某個值排序。

備注

有關更多擴展支持,請參見 SqliteExtDatabase ,在 playhouse.sqlite_ext 模塊。

用戶定義函數示例:

db = SqliteDatabase('analytics.db')

from urllib.parse import urlparse

@db.func('hostname')
def hostname(url):
    if url is not None:
        return urlparse(url).netloc

# Call this function in our code:
# The following finds the most common hostnames of referrers by count:
query = (PageView
         .select(fn.hostname(PageView.referrer), fn.COUNT(PageView.id))
         .group_by(fn.hostname(PageView.referrer))
         .order_by(fn.COUNT(PageView.id).desc()))

用戶定義聚合示例:

from hashlib import md5

@db.aggregate('md5')
class MD5Checksum(object):
    def __init__(self):
        self.checksum = md5()

    def step(self, value):
        self.checksum.update(value.encode('utf-8'))

    def finalize(self):
        return self.checksum.hexdigest()

# Usage:
# The following computes an aggregate MD5 checksum for files broken
# up into chunks and stored in the database.
query = (FileChunk
         .select(FileChunk.filename, fn.MD5(FileChunk.data))
         .group_by(FileChunk.filename)
         .order_by(FileChunk.filename, FileChunk.sequence))

排序規則示例:

@db.collation('ireverse')
def collate_reverse(s1, s2):
    # Case-insensitive reverse.
    s1, s2 = s1.lower(), s2.lower()
    return (s1 < s2) - (s1 > s2)  # Equivalent to -cmp(s1, s2)

# To use this collation to sort books in reverse order...
Book.select().order_by(collate_reverse.collation(Book.title))

# Or...
Book.select().order_by(Book.title.asc(collation='reverse'))

用戶定義的表值函數示例(請參見 TableFunctiontable_function )更多詳情:

from playhouse.sqlite_ext import TableFunction

db = SqliteDatabase('my_app.db')

@db.table_function('series')
class Series(TableFunction):
    columns = ['value']
    params = ['start', 'stop', 'step']

    def initialize(self, start=0, stop=None, step=1):
        """
        Table-functions declare an initialize() method, which is
        called with whatever arguments the user has called the
        function with.
        """
        self.start = self.current = start
        self.stop = stop or float('Inf')
        self.step = step

    def iterate(self, idx):
        """
        Iterate is called repeatedly by the SQLite database engine
        until the required number of rows has been read **or** the
        function raises a `StopIteration` signalling no more rows
        are available.
        """
        if self.current > self.stop:
            raise StopIteration

        ret, self.current = self.current, self.current + self.step
        return (ret,)

# Usage:
cursor = db.execute_sql('SELECT * FROM series(?, ?, ?)', (0, 5, 2))
for value, in cursor:
    print(value)

# Prints:
# 0
# 2
# 4

有關詳細信息,請參閱:

設置事務的鎖定模式?

可以以三種不同的模式打開SQLite事務:

  • Deferred默認)-僅在執行讀或寫時獲取鎖。第一次讀取創建 shared lock 第一次寫會創建一個 reserved lock . 由于鎖的獲取被延遲到實際需要時,另一個線程或進程可能會創建一個單獨的事務,并在當前線程上的begin執行之后寫入數據庫。

  • Immediate -A reserved lock 立即獲得。在此模式下,其他數據庫不能寫入數據庫或打開 immediateexclusive 交易。但是,其他進程可以繼續從數據庫中讀取數據。

  • Exclusive -打開一個 exclusive lock 這將阻止所有(除了讀取未提交的)連接在事務完成之前訪問數據庫。

指定鎖定模式的示例:

db = SqliteDatabase('app.db')

with db.atomic('EXCLUSIVE'):
    do_something()


@db.atomic('IMMEDIATE')
def some_other_function():
    # This function is wrapped in an "IMMEDIATE" transaction.
    do_something_else()

有關詳細信息,請參見sqlite locking documentation. To learn more about transactions in Peewee, see the 管理交易 文檔。

高級sqlite驅動程序apsw?

Peewee還附帶了一個備用的SQLite數據庫,該數據庫使用 高級sqlite驅動程序apsw 。有關APSW的更多信息,請訪問 APSW project website 。APSW提供以下特殊功能:

  • 虛擬表、虛擬文件系統、Blob I/O、備份和文件控制。

  • 連接可以跨線程共享,而無需任何附加鎖定。

  • 事務由代碼顯式管理。

  • 處理Unicode 正確地.

  • APSW比標準庫sqlite3模塊更快。

  • 向您的python應用程序公開幾乎所有的sqlite c API。

如果要使用APSW,請使用 APSWDatabaseapsw_ext 模塊:

from playhouse.apsw_ext import APSWDatabase

apsw_db = APSWDatabase('my_app.db')

使用MySQL?

要連接到MySQL數據庫,我們將使用 MySQLDatabase 。在數據庫名稱之后,您可以指定將傳遞回驅動程序的任意連接參數(例如 pymysqlmysqlclient )。

mysql_db = MySQLDatabase('my_database')

class BaseModel(Model):
    """A base model that will use our MySQL database"""
    class Meta:
        database = mysql_db

class User(BaseModel):
    username = CharField()
    # etc, etc

驅動程序信息:

  • pymysql 是一個純PYTHON的MySQL客戶端,支持PYTHON2和PYTHON3。peewee會先嘗試使用pymysql。

  • mysqlclient 使用c擴展并支持python3。它公開了一個 MySQLdb 模塊。如果未安裝pymysql,Peewee將嘗試使用此模塊。

  • mysql-python 也被稱為 MySQLdb1 而且是遺留下來的,不應該使用。因為它與mysqlclient共享相同的模塊名稱,所以同樣適用。

  • mysql-connector python 純 Python (我想??)支持python3。要使用此驅動程序,您可以使用 MySQLConnectorDatabaseplayhouse.mysql_ext 分機。

錯誤2006:MySQL服務器已離開?

當MySQL終止一個空閑的數據庫連接時,可能會發生這個特定的錯誤。這通常發生在不顯式管理數據庫連接的Web應用程序上。發生的情況是應用程序啟動,連接打開以處理執行的第一個查詢,并且,由于該連接從未關閉,它將保持打開狀態,等待更多查詢。

要解決此問題,請確保在需要執行查詢時顯式連接到數據庫,并在完成后關閉連接。在Web應用程序中,這通常意味著您將在請求進入時打開連接,并在返回響應時關閉連接。

框架集成 有關配置通用Web框架以管理數據庫連接的示例的部分。

使用數據庫URL連接?

Playhouse模塊 數據庫URL 提供幫助者 connect() 接受數據庫URL并返回 Database 實例。

示例代碼:

import os

from peewee import *
from playhouse.db_url import connect

# Connect to the database URL defined in the environment, falling
# back to a local Sqlite database if no database URL is specified.
db = connect(os.environ.get('DATABASE') or 'sqlite:///default.db')

class BaseModel(Model):
    class Meta:
        database = db

數據庫URL示例:

運行時數據庫配置?

有時數據庫連接設置直到運行時才知道,此時可以從配置文件或環境加載這些值。在這種情況下,你可以 defer 通過指定 None 作為數據庫的名稱。

database = PostgresqlDatabase(None)  # Un-initialized database.

class SomeModel(Model):
    class Meta:
        database = database

如果在數據庫未初始化時嘗試連接或發出任何查詢,將出現異常:

>>> database.connect()
Exception: Error, database not properly initialized before opening connection

要初始化數據庫,請調用 init() 具有數據庫名稱和任何其他關鍵字參數的方法:

database_name = input('What is the name of the db? ')
database.init(database_name, host='localhost', user='postgres')

有關初始化數據庫的更多控制,請參閱下一節, 動態定義數據庫 .

動態定義數據庫?

為了更好地控制數據庫的定義/初始化方式,可以使用 DatabaseProxy 幫手。 DatabaseProxy 對象充當占位符,然后在運行時可以將其替換為其他對象。在下面的示例中,我們將根據應用程序的配置方式交換數據庫:

database_proxy = DatabaseProxy()  # Create a proxy for our db.

class BaseModel(Model):
    class Meta:
        database = database_proxy  # Use proxy for our DB.

class User(BaseModel):
    username = CharField()

# Based on configuration, use a different database.
if app.config['DEBUG']:
    database = SqliteDatabase('local.db')
elif app.config['TESTING']:
    database = SqliteDatabase(':memory:')
else:
    database = PostgresqlDatabase('mega_production_db')

# Configure our proxy to use the db we specified in config.
database_proxy.initialize(database)

警告

只有當實際的數據庫驅動程序在運行時發生變化時才使用此方法。例如,如果測試和本地開發環境在sqlite上運行,但部署的應用程序使用PostgreSQL,則可以使用 DatabaseProxy 在運行時更換發動機。

但是,如果只有連接值在運行時發生變化,例如到數據庫文件的路徑或數據庫主機的路徑,則應改為使用 Database.init() . 見 運行時數據庫配置 了解更多詳細信息。

備注

可能更容易避免使用 DatabaseProxy 而是使用 Database.bind() 以及設置或更改數據庫的相關方法。參見 在運行時設置數據庫 有關詳細信息。

在運行時設置數據庫?

我們已經看到了三種使用peewee配置數據庫的方法:

# The usual way:
db = SqliteDatabase('my_app.db', pragmas={'journal_mode': 'wal'})


# Specify the details at run-time:
db = SqliteDatabase(None)
...
db.init(db_filename, pragmas={'journal_mode': 'wal'})


# Or use a placeholder:
db = DatabaseProxy()
...
db.initialize(SqliteDatabase('my_app.db', pragmas={'journal_mode': 'wal'}))

Peewee還可以為模型類設置或更改數據庫。Peewee測試套件使用此技術在運行測試時將測試模型類綁定到各種數據庫實例。

有兩套互補的方法:

例如,我們將聲明兩個模型 沒有 指定任何數據庫:

class User(Model):
    username = TextField()

class Tweet(Model):
    user = ForeignKeyField(User, backref='tweets')
    content = TextField()
    timestamp = TimestampField()

在運行時將模型綁定到數據庫:

postgres_db = PostgresqlDatabase('my_app', user='postgres')
sqlite_db = SqliteDatabase('my_app.db')

# At this point, the User and Tweet models are NOT bound to any database.

# Let's bind them to the Postgres database:
postgres_db.bind([User, Tweet])

# Now we will temporarily bind them to the sqlite database:
with sqlite_db.bind_ctx([User, Tweet]):
    # User and Tweet are now bound to the sqlite database.
    assert User._meta.database is sqlite_db

# User and Tweet are once again bound to the Postgres database.
assert User._meta.database is postgres_db

這個 Model.bind()Model.bind_ctx() 方法對綁定給定模型類的作用相同:

# Bind the user model to the sqlite db. By default, Peewee will also
# bind any models that are related to User via foreign-key as well.
User.bind(sqlite_db)

assert User._meta.database is sqlite_db
assert Tweet._meta.database is sqlite_db  # Related models bound too.

# Here we will temporarily bind *just* the User model to the postgres db.
with User.bind_ctx(postgres_db, bind_backrefs=False):
    assert User._meta.database is postgres_db
    assert Tweet._meta.database is sqlite_db  # Has not changed.

# And now User is back to being bound to the sqlite_db.
assert User._meta.database is sqlite_db

這個 測試Peewee應用程序 本文檔的部分還包含一些使用 bind() 方法。

線程安全與多數據庫?

如果您計劃在多線程應用程序中的運行時更改數據庫,則將模型的數據庫存儲在本地線程中將防止爭用條件。這可以通過自定義模型來實現 Metadata 類(請參見 ThreadSafeDatabaseMetadata ,包含在 playhouse.shortcuts ):

from peewee import *
from playhouse.shortcuts import ThreadSafeDatabaseMetadata

class BaseModel(Model):
    class Meta:
        # Instruct peewee to use our thread-safe metadata implementation.
        model_metadata_class = ThreadSafeDatabaseMetadata

現在可以使用熟悉的在多線程環境中運行時安全地交換數據庫 Database.bind()Database.bind_ctx() 方法。

連接管理?

要打開到數據庫的連接,請使用 Database.connect() 方法:

>>> db = SqliteDatabase(':memory:')  # In-memory SQLite database.
>>> db.connect()
True

如果我們試著調用 connect() 在一個已經打開的數據庫中,我們得到一個 OperationalError

>>> db.connect()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/charles/pypath/peewee.py", line 2390, in connect
    raise OperationalError('Connection already opened.')
peewee.OperationalError: Connection already opened.

為了防止引發此異常,我們可以調用 connect() 再加上一個論點, reuse_if_open

>>> db.close()  # Close connection.
True
>>> db.connect()
True
>>> db.connect(reuse_if_open=True)
False

注意,呼叫 connect() 收益率 False 如果數據庫連接已打開。

要關閉連接,請使用 Database.close() 方法:

>>> db.close()
True

調用 close() 在已關閉的連接上,不會導致異常,但會返回 False

>>> db.connect()  # Open connection.
True
>>> db.close()  # Close connection.
True
>>> db.close()  # Connection already closed, returns False.
False

您可以使用 Database.is_closed() 方法:

>>> db.is_closed()
True

使用自動連接?

如果數據庫初始化為 autoconnect=True (默認)。顯式管理連接被視為 最佳實踐 因此,您可以考慮禁用 autoconnect 行為。

明確您的連接生命周期是非常有幫助的。例如,如果連接失敗,則在打開連接時將捕獲異常,而不是在執行查詢后的某個任意時間。此外,如果使用 connection pool ,有必要打電話 connect()close() 以確保正確回收連接。

為確保正確性,請禁用 autoconnect

db = PostgresqlDatabase('my_app', user='postgres', autoconnect=False)

線程安全性?

peewee使用線程本地存儲跟蹤連接狀態,使peewee Database 對象可以安全地與多個線程一起使用。每個線程都有自己的連接,因此任何給定的線程在給定的時間只能有一個打開的連接。

上下文管理器?

數據庫對象本身可以用作上下文管理器,它在打包的代碼塊期間打開連接。此外,事務在包裝塊的開頭打開,并在連接關閉之前提交(除非發生錯誤,在這種情況下事務將回滾)。

>>> db.is_closed()
True
>>> with db:
...     print(db.is_closed())  # db is open inside context manager.
...
False
>>> db.is_closed()  # db is closed.
True

如果要單獨管理事務,可以使用 Database.connection_context() 上下文管理器。

>>> with db.connection_context():
...     # db connection is open.
...     pass
...
>>> db.is_closed()  # db connection is closed.
True

這個 connection_context() 方法也可以用作修飾器:

@db.connection_context()
def prepare_database():
    # DB connection will be managed by the decorator, which opens
    # a connection, calls function, and closes upon returning.
    db.create_tables(MODELS)  # Create schema.
    load_fixture_data(db)

DB-API連接對象?

要獲取對基礎DB-API 2.0連接的引用,請使用 Database.connection() 方法。此方法將返回當前打開的連接對象(如果存在),否則將打開新的連接。

>>> db.connection()
<sqlite3.Connection object at 0x7f94e9362f10>

連接池?

連接池由 pool module ,包括在 playhouse 擴展庫。池支持:

  • 超時后將回收連接。

  • 打開的連接數的上限。

from playhouse.pool import PooledPostgresqlExtDatabase

db = PooledPostgresqlExtDatabase(
    'my_database',
    max_connections=8,
    stale_timeout=300,
    user='postgres')

class BaseModel(Model):
    class Meta:
        database = db

以下集合數據庫類可用:

有關Peewee連接池的深入討論,請參見 連接池 剖面圖 playhouse 文檔。

測試Peewee應用程序?

當為使用peewee的應用程序編寫測試時,可能需要使用特殊的測試數據庫。另一種常見的做法是對干凈的數據庫運行測試,這意味著確保在每個測試開始時表是空的。

要在運行時將模型綁定到數據庫,可以使用以下方法:

  • Database.bind_ctx() 返回一個上下文管理器,該管理器將給定模型綁定到包裝塊期間的數據庫實例。

  • Model.bind_ctx() ,它同樣返回一個上下文管理器,該管理器在包裝塊期間將模型(及其依賴項)綁定到給定的數據庫。

  • Database.bind() 這是一個一次性操作,將模型(及其依賴項)綁定到給定的數據庫。

  • Model.bind() 這是一個一次性操作,將模型(及其依賴項)綁定到給定的數據庫。

根據您的用例,這些選項中的一個可能更有意義。對于下面的示例,我將使用 Model.bind() .

測試用例設置示例:

# tests.py
import unittest
from my_app.models import EventLog, Relationship, Tweet, User

MODELS = [User, Tweet, EventLog, Relationship]

# use an in-memory SQLite for tests.
test_db = SqliteDatabase(':memory:')

class BaseTestCase(unittest.TestCase):
    def setUp(self):
        # Bind model classes to test db. Since we have a complete list of
        # all models, we do not need to recursively bind dependencies.
        test_db.bind(MODELS, bind_refs=False, bind_backrefs=False)

        test_db.connect()
        test_db.create_tables(MODELS)

    def tearDown(self):
        # Not strictly necessary since SQLite in-memory databases only live
        # for the duration of the connection, and in the next step we close
        # the connection...but a good practice all the same.
        test_db.drop_tables(MODELS)

        # Close connection to db.
        test_db.close()

        # If we wanted, we could re-bind the models to their original
        # database here. But for tests this is probably not necessary.

除此之外,根據經驗,我建議您使用生產中使用的相同數據庫后端測試應用程序,以避免任何潛在的兼容性問題。

如果您想看到更多關于如何使用peewee運行測試的示例,請查看peewee自己的 test-suite .

與gevent異步?

gevent 建議使用PostgreSQL或MySQL進行異步I/O。我喜歡GEvent的原因:

  • 無需特殊用途的“循環感知”重新實現 一切. 使用Asyncio的第三方庫通常必須重新實現代碼的層和層,以及重新實現協議本身。

  • gevent允許您用普通、干凈、慣用的Python編寫應用程序。不需要亂丟每一行“異步”、“等待”和其他雜音。沒有回電,未來,任務,承諾。沒有污點。

  • gevent與python 2一起工作 and Python 3。

  • 蓋特斯群島 Python 的. Asyncio是一個不喜歡吃 Python 的人。

除猴子修補插座外,如果您使用 MySQL 使用純python驅動程序 pymysql 或正在使用 mysql-connector 在純Python模式下。用C語言編寫的MySQL驅動程序需要特殊的配置,這超出了本文檔的范圍。

為了 Postgrespsycopg2 ,這是C擴展,您可以使用以下代碼段注冊將使您的連接異步的事件掛鉤:

from gevent.socket import wait_read, wait_write
from psycopg2 import extensions

# Call this function after monkey-patching socket (etc).
def patch_psycopg2():
    extensions.set_wait_callback(_psycopg2_gevent_callback)

def _psycopg2_gevent_callback(conn, timeout=None):
    while True:
        state = conn.poll()
        if state == extensions.POLL_OK:
            break
        elif state == extensions.POLL_READ:
            wait_read(conn.fileno(), timeout=timeout)
        elif state == extensions.POLL_WRITE:
            wait_write(conn.fileno(), timeout=timeout)
        else:
            raise ValueError('poll() returned unexpected result')

數據庫, 因為它是嵌入在Python應用程序本身中的,所以不執行任何套接字操作,這將是非阻塞的候選操作。異步對SQLite數據庫沒有任何影響。

框架集成?

對于Web應用程序,通常在收到請求時打開連接,在傳遞響應時關閉連接。在這一節中,我將描述如何向Web應用添加鉤子,以確保正確處理數據庫連接。

這些步驟將確保無論您使用的是簡單的sqlite數據庫還是多個postgres連接池,peewee都能正確處理這些連接。

備注

接收大量流量的應用程序可能會從使用 connection pool 減少每次請求時建立和斷開連接的成本。

Flask?

Flask和peewee是一個很好的組合,我可以選擇任何規模的項目。Flask提供兩個鉤子,我們將使用它們來打開和關閉DB連接。我們將在收到請求時打開連接,然后在返回響應時關閉連接。

from flask import Flask
from peewee import *

database = SqliteDatabase('my_app.db')
app = Flask(__name__)

# This hook ensures that a connection is opened to handle any queries
# generated by the request.
@app.before_request
def _db_connect():
    database.connect()

# This hook ensures that the connection is closed when we've finished
# processing the request.
@app.teardown_request
def _db_close(exc):
    if not database.is_closed():
        database.close()

丹戈?

雖然Peewee和Django一起使用不太常見,但實際上很容易使用這兩者。為了管理與Django的Peewee數據庫連接,我認為最簡單的方法是向應用程序添加中間件。中間件應該是中間件列表中的第一個,以確保它在處理請求時首先運行,在返回響應時最后運行。

如果你有一個叫Django的項目 my_blog Peewee數據庫在模塊中定義。 my_blog.db ,您可以添加以下中間件類:

# middleware.py
from my_blog.db import database  # Import the peewee database instance.


def PeeweeConnectionMiddleware(get_response):
    def middleware(request):
        database.connect()
        try:
            response = get_response(request)
        finally:
            if not database.is_closed():
                database.close()
        return response
    return middleware


# Older Django < 1.10 middleware.
class PeeweeConnectionMiddleware(object):
    def process_request(self, request):
        database.connect()

    def process_response(self, request, response):
        if not database.is_closed():
            database.close()
        return response

為了確保中間件得到執行,請將其添加到 settings 模塊:

# settings.py
MIDDLEWARE_CLASSES = (
    # Our custom middleware appears first in the list.
    'my_blog.middleware.PeeweeConnectionMiddleware',

    # These are the default Django 1.7 middlewares. Yours may differ,
    # but the important this is that our Peewee middleware comes first.
    'django.middleware.common.CommonMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
)

# ... other Django settings ...

Boottle?

我自己沒有使用瓶子,但查看文檔時,我認為以下代碼應該確保數據庫連接得到正確管理:

# app.py
from bottle import hook  #, route, etc, etc.
from peewee import *

db = SqliteDatabase('my-bottle-app.db')

@hook('before_request')
def _connect_db():
    db.connect()

@hook('after_request')
def _close_db():
    if not db.is_closed():
        db.close()

# Rest of your bottle app goes here.

Web.py?

參見文檔 application processors .

db = SqliteDatabase('my_webpy_app.db')

def connection_processor(handler):
    db.connect()
    try:
        return handler()
    finally:
        if not db.is_closed():
            db.close()

app.add_processor(connection_processor)

Tornado?

看起來像龍卷風 RequestHandler 類實現了兩個鉤子,可以在處理請求時用于打開和關閉連接。

from tornado.web import RequestHandler

db = SqliteDatabase('my_db.db')

class PeeweeRequestHandler(RequestHandler):
    def prepare(self):
        db.connect()
        return super(PeeweeRequestHandler, self).prepare()

    def on_finish(self):
        if not db.is_closed():
            db.close()
        return super(PeeweeRequestHandler, self).on_finish()

在應用程序中,而不是擴展默認值 RequestHandler ,現在可以擴展 PeeweeRequestHandler .

注意,這并不能解決如何在Tornado或其他事件循環中異步使用Peewee。

Wheezy.web?

連接處理代碼可以放在 middleware .

def peewee_middleware(request, following):
    db.connect()
    try:
        response = following(request)
    finally:
        if not db.is_closed():
            db.close()
    return response

app = WSGIApplication(middleware=[
    lambda x: peewee_middleware,
    # ... other middlewares ...
])

感謝github用戶*@tuukkamustonen*提交此代碼。

Falcon?

連接處理代碼可以放在 middleware component .

import falcon
from peewee import *

database = SqliteDatabase('my_app.db')

class PeeweeConnectionMiddleware(object):
    def process_request(self, req, resp):
        database.connect()

    def process_response(self, req, resp, resource, req_succeeded):
        if not database.is_closed():
            database.close()

application = falcon.API(middleware=[
    PeeweeConnectionMiddleware(),
    # ... other middlewares ...
])

Pyramid?

設置處理數據庫連接生存期的請求工廠,如下所示:

from pyramid.request import Request

db = SqliteDatabase('pyramidapp.db')

class MyRequest(Request):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        db.connect()
        self.add_finished_callback(self.finish)

    def finish(self, request):
        if not db.is_closed():
            db.close()

在您的應用程序中 main() make sure MyRequest 用作 request_factory:

def main(global_settings, **settings):
    config = Configurator(settings=settings, ...)
    config.set_request_factory(MyRequest)

CherryPy?

Publish/Subscribe pattern .

def _db_connect():
    db.connect()

def _db_close():
    if not db.is_closed():
        db.close()

cherrypy.engine.subscribe('before_request', _db_connect)
cherrypy.engine.subscribe('after_request', _db_close)

Sanic?

在SANIC中,連接處理代碼可以放在請求和響應中間件中。 sanic middleware .

# app.py
@app.middleware('request')
async def handle_request(request):
    db.connect()

@app.middleware('response')
async def handle_response(request, response):
    if not db.is_closed():
        db.close()

快速API?

與Flask類似,FastAPI提供了兩個基于事件的鉤子,我們將使用它們來打開和關閉數據庫連接。我們將在收到請求時打開連接,然后在返回響應時關閉連接。

from fastapi import FastAPI
from peewee import *

db = SqliteDatabase('my_app.db')
app = FastAPI()

# This hook ensures that a connection is opened to handle any queries
# generated by the request.
@app.on_event("startup")
def startup():
    db.connect()


# This hook ensures that the connection is closed when we've finished
# processing the request.
@app.on_event("shutdown")
def shutdown():
    if not db.is_closed():
        db.close()

其他框架?

在這里看不到您的框架?拜托 open a GitHub ticket 我將看到如何添加一個部分,或者更好地說,提交一個文檔請求。

正在執行查詢?

SQL查詢通常通過調用 execute() 在使用查詢生成器API構造的查詢上(或者在 Select 查詢)。對于希望直接執行SQL的情況,可以使用 Database.execute_sql() 方法。

db = SqliteDatabase('my_app.db')
db.connect()

# Example of executing a simple query and ignoring the results.
db.execute_sql("ATTACH DATABASE ':memory:' AS cache;")

# Example of iterating over the results of a query using the cursor.
cursor = db.execute_sql('SELECT * FROM users WHERE status = ?', (ACTIVE,))
for row in cursor.fetchall():
    # Do something with row, which is a tuple containing column data.
    pass

管理交易?

Peewee提供了幾個處理事務的接口。最普遍的是 Database.atomic() 方法,它還支持嵌套事務。 atomic() 塊將在事務或保存點中運行,具體取決于嵌套的級別。

如果包裝塊中發生異常,則當前事務/保存點將回滾。否則,語句將在包裝塊的末尾提交。

備注

而在一塊被 atomic() 上下文管理器,您可以通過調用 Transaction.rollback()Transaction.commit() . 當您在包裝好的代碼塊內執行此操作時,將自動啟動新事務。

with db.atomic() as transaction:  # Opens new transaction.
    try:
        save_some_objects()
    except ErrorSavingData:
        # Because this block of code is wrapped with "atomic", a
        # new transaction will begin automatically after the call
        # to rollback().
        transaction.rollback()
        error_saving = True

    create_report(error_saving=error_saving)
    # Note: no need to call commit. Since this marks the end of the
    # wrapped block of code, the `atomic` context manager will
    # automatically call commit for us.

備注

atomic() 可以用作 context manager 或A 裝飾者.

上下文管理器?

使用 atomic 作為上下文管理器:

db = SqliteDatabase(':memory:')

with db.atomic() as txn:
    # This is the outer-most level, so this block corresponds to
    # a transaction.
    User.create(username='charlie')

    with db.atomic() as nested_txn:
        # This block corresponds to a savepoint.
        User.create(username='huey')

        # This will roll back the above create() query.
        nested_txn.rollback()

    User.create(username='mickey')

# When the block ends, the transaction is committed (assuming no error
# occurs). At that point there will be two users, "charlie" and "mickey".

你可以使用 atomic 執行方法 get or create 操作也是如此:

try:
    with db.atomic():
        user = User.create(username=username)
    return 'Success'
except peewee.IntegrityError:
    return 'Failure: %s is already in use.' % username

裝飾者?

使用 atomic 作為裝飾者:

@db.atomic()
def create_user(username):
    # This statement will run in a transaction. If the caller is already
    # running in an `atomic` block, then a savepoint will be used instead.
    return User.create(username=username)

create_user('charlie')

嵌套事務?

atomic() 提供事務的透明嵌套。使用時 atomic() ,最外部的調用將包裝在事務中,并且任何嵌套調用都將使用保存點。

with db.atomic() as txn:
    perform_operation()

    with db.atomic() as nested_txn:
        perform_another_operation()

Peewee通過使用保存點支持嵌套事務(有關詳細信息,請參閱 savepoint()

顯式事務?

如果希望在事務中顯式運行代碼,可以使用 transaction() . 喜歡 atomic() , transaction() 可以用作上下文管理器或裝飾器。

如果包裝塊中發生異常,則事務將回滾。否則,語句將在包裝塊的末尾提交。

db = SqliteDatabase(':memory:')

with db.transaction() as txn:
    # Delete the user and their associated tweets.
    user.delete_instance(recursive=True)

事務可以在包裝的塊中顯式提交或回滾。發生這種情況時,將啟動一個新事務。

with db.transaction() as txn:
    User.create(username='mickey')
    txn.commit()  # Changes are saved and a new transaction begins.
    User.create(username='huey')

    # Roll back. "huey" will not be saved, but since "mickey" was already
    # committed, that row will remain in the database.
    txn.rollback()

with db.transaction() as txn:
    User.create(username='whiskers')
    # Roll back changes, which removes "whiskers".
    txn.rollback()

    # Create a new row for "mr. whiskers" which will be implicitly committed
    # at the end of the `with` block.
    User.create(username='mr. whiskers')

備注

如果嘗試使用peewee嵌套事務,請使用 transaction() 上下文管理器,只使用最外部的事務。但是,如果嵌套塊中發生異常,這可能會導致不可預測的行為,因此強烈建議您使用 atomic() .

顯式保存點?

正如可以顯式創建事務一樣,也可以使用 savepoint() 方法。保存點必須出現在事務中,但可以任意深度嵌套。

with db.transaction() as txn:
    with db.savepoint() as sp:
        User.create(username='mickey')

    with db.savepoint() as sp2:
        User.create(username='zaizee')
        sp2.rollback()  # "zaizee" will not be saved, but "mickey" will be.

警告

如果手動提交或回滾保存點,則為新的保存點 will not 自動創建。這與 transaction ,這將在手動提交/回滾后自動打開新事務。

自動提交模式?

默認情況下,Peewee在 自動提交模式, 這樣,在事務外部執行的任何語句都在自己的事務中運行。要將多個語句分組為一個事務,peewee提供 atomic() 上下文管理器/裝飾器。這應該涵蓋所有用例,但在不太可能的情況下,您希望暫時完全禁用Peewee的事務管理,您可以使用 Database.manual_commit() 上下文管理器/裝飾器。

以下是您如何模擬 transaction() 上下文管理器:

with db.manual_commit():
    db.begin()  # Have to begin transaction explicitly.
    try:
        user.delete_instance(recursive=True)
    except:
        db.rollback()  # Rollback! An error occurred.
        raise
    else:
        try:
            db.commit()  # Commit changes.
        except:
            db.rollback()
            raise

再說一遍——我沒料到會有人需要這個,但它在這里以防萬一。

數據庫錯誤?

python db-api 2.0規范描述了 several types of exceptions . 因為大多數數據庫驅動程序都有它們自己的異常實現,Peewee通過在任何特定于實現的異常類周圍提供自己的包裝器來簡化這些事情。這樣,您就不必擔心導入任何特殊的異常類,只需使用peewee中的類:

  • DatabaseError

  • DataError

  • IntegrityError

  • InterfaceError

  • InternalError

  • NotSupportedError

  • OperationalError

  • ProgrammingError

備注

所有這些錯誤類都擴展 PeeweeException .

日志查詢?

所有查詢都記錄到 peewee 使用標準庫的命名空間 logging 模塊。查詢記錄使用 DEBUG 水平。如果您對查詢感興趣,可以簡單地注冊一個處理程序。

# Print all queries to stderr.
import logging
logger = logging.getLogger('peewee')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)

添加新的數據庫驅動程序?

Peewee內置了對Postgres、MySQL和SQLite的支持。這些數據庫非常流行,可以從快速、可嵌入的數據庫到適合大規模部署的重量級服務器。也就是說,如果驅動程序支持 DB-API 2.0 spec .

如果您已經使用了標準庫sqlite3驅動程序、psycopg2等,那么DB-API 2.0規范應該對您很熟悉。Peewee目前依賴于以下幾個部分:

  • Connection.commit

  • Connection.execute

  • Connection.rollback

  • Cursor.description

  • Cursor.fetchone

這些方法通常包含在更高級別的抽象中,并由 Database 所以,即使你的司機沒有做到這些,你仍然可以得到很多里程的peewee。一個例子是 apsw sqlite driver 在“Playhouse”模塊中。

第一件事是提供 Database 這將打開連接。

from peewee import Database
import foodb  # Our fictional DB-API 2.0 driver.


class FooDatabase(Database):
    def _connect(self, database, **kwargs):
        return foodb.connect(database, **kwargs)

這個 Database 提供更高級別的API,負責執行查詢、創建表和索引,以及自省數據庫以獲取表列表。上面的實現是所需的絕對最小值,盡管有些功能不起作用——為了獲得最佳結果,您需要另外添加一個方法,用于從數據庫中提取表的表和索引列表。我們會假裝的 FooDB 與MySQL非常相似,并且有特殊的“show”語句:

class FooDatabase(Database):
    def _connect(self):
        return foodb.connect(self.database, **self.connect_params)

    def get_tables(self):
        res = self.execute('SHOW TABLES;')
        return [r[0] for r in res.fetchall()]

這里不介紹的數據庫處理的其他內容包括:

  • last_insert_id() and rows_affected()

  • paramquote 它告訴SQL生成代碼如何添加參數占位符和引用實體名稱。

  • field_types 用于將int或text等數據類型映射到其供應商特定的類型名稱。

  • operations 用于將“like/ilike”等操作映射到其等效數據庫

參考 Database API參考或 source code . 詳情。

備注

如果您的驅動程序符合DB-API 2.0規范,那么就不需要太多的工作來啟動和運行。

我們的新數據庫可以像其他任何數據庫子類那樣使用:

from peewee import *
from foodb_ext import FooDatabase

db = FooDatabase('my_database', user='foo', password='secret')

class BaseModel(Model):
    class Meta:
        database = db

class Blog(BaseModel):
    title = CharField()
    contents = TextField()
    pub_date = DateTimeField()