查詢?

本節將介紹通常在關系數據庫上執行的基本CRUD操作:

備注

還有大量示例查詢,這些查詢來自 Postgresql Exercises 網站。示例列在 query examples 文件。

創建新記錄?

你可以使用 Model.create() 創建新模型實例。此方法接受關鍵字參數,其中鍵對應于模型字段的名稱。將返回一個新實例,并向表中添加一行。

>>> User.create(username='Charlie')
<__main__.User object at 0x2529350>

本遺囑 INSERT 數據庫中的新行。主鍵將自動檢索并存儲在模型實例上。

或者,可以通過編程構建模型實例,然后調用 save()

>>> user = User(username='Charlie')
>>> user.save()  # save() returns the number of rows modified.
1
>>> user.id
1
>>> huey = User()
>>> huey.username = 'Huey'
>>> huey.save()
1
>>> huey.id
2

當模型具有外鍵時,可以在創建新記錄時將模型實例直接分配給外鍵字段。

>>> tweet = Tweet.create(user=huey, message='Hello!')

還可以使用相關對象的主鍵值:

>>> tweet = Tweet.create(user=2, message='Hello again!')

如果只希望插入數據而不需要創建模型實例,則可以使用 Model.insert()

>>> User.insert(username='Mickey').execute()
3

執行插入查詢后,將返回新行的主鍵。

備注

有幾種方法可以加速大容量插入操作。退房 插入大量數據 配方部分了解更多信息。

插入大量數據?

有幾種方法可以快速加載大量數據。天真的做法是 Model.create() 在循環中:

data_source = [
    {'field1': 'val1-1', 'field2': 'val1-2'},
    {'field1': 'val2-1', 'field2': 'val2-2'},
    # ...
]

for data_dict in data_source:
    MyModel.create(**data_dict)

上述方法緩慢的原因有兩個:

  1. 如果不在事務中包裝循環,則每次調用 create() 發生在它自己的事務中。那真是太慢了!

  2. 有相當多的python邏輯妨礙了您的工作,并且 InsertQuery 必須生成并解析為SQL。

  3. 這是您發送到數據庫進行解析的大量數據(以SQL的原始字節為單位)。

  4. 我們正在檢索 上次插入ID, 這會導致在某些情況下執行額外的查詢。

只需將其包裝在事務中 atomic() .

# This is much faster.
with db.atomic():
    for data_dict in data_source:
        MyModel.create(**data_dict)

上述代碼仍然受到點2、3和4的影響。我們可以通過使用 insert_many() . 此方法接受元組或字典列表,并在單個查詢中插入多行:

data_source = [
    {'field1': 'val1-1', 'field2': 'val1-2'},
    {'field1': 'val2-1', 'field2': 'val2-2'},
    # ...
]

# Fastest way to INSERT multiple rows.
MyModel.insert_many(data_source).execute()

這個 insert_many() 方法還接受行元組列表,前提是還指定了相應的字段:

# We can INSERT tuples as well...
data = [('val1-1', 'val1-2'),
        ('val2-1', 'val2-2'),
        ('val3-1', 'val3-2')]

# But we need to indicate which fields the values correspond to.
MyModel.insert_many(data, fields=[MyModel.field1, MyModel.field2]).execute()

在事務中包裝大容量插入也是一個很好的實踐:

# You can, of course, wrap this in a transaction as well:
with db.atomic():
    MyModel.insert_many(data, fields=fields).execute()

備注

當使用大容量插入時,SQLite用戶應該知道一些警告。具體來說,您的sqlite3版本必須是3.7.11.0或更高版本才能利用大容量插入API。此外,默認情況下,sqlite將SQL查詢中綁定變量的數量限制為 999 對于3.32.0之前的SQLite版本(2020-05-22)和32766(對于3.32.0之后的SQLite版本)。

批量插入行?

根據數據源中的行數,您可能需要將其分成塊。尤其是sqlite通常具有 limit of 999 or 32766 每個查詢的變量(批大小為999//行長度或32766//行長度)。

您可以編寫一個循環,將您的數據批處理成塊(在這種情況下是 strongly recommended 您使用事務處理):

# Insert rows 100 at a time.
with db.atomic():
    for idx in range(0, len(data_source), 100):
        MyModel.insert_many(data_source[idx:idx+100]).execute()

Peewee帶著 chunked() 可用于的助手函數 efficiently 將一個泛型iterable分為一系列 批處理- 尺寸:

from peewee import chunked

# Insert rows 100 at a time.
with db.atomic():
    for batch in chunked(data_source, 100):
        MyModel.insert_many(batch).execute()

選擇?

這個 Model.bulk_create() 方法的行為與 Model.insert_many() ,但它接受要插入的未保存模型實例的列表,并且可以選擇接受批處理大小參數。使用 bulk_create() 應用程序編程接口:

# Read list of usernames from a file, for example.
with open('user_list.txt') as fh:
    # Create a list of unsaved User instances.
    users = [User(username=line.strip()) for line in fh.readlines()]

# Wrap the operation in a transaction and batch INSERT the users
# 100 at a time.
with db.atomic():
    User.bulk_create(users, batch_size=100)

備注

如果您使用的是PostgreSQL(它支持 RETURNING 子句),則以前未保存的模型實例將自動填充其新的主鍵值。

此外,Peewee還提供 Model.bulk_update() ,它可以有效地更新模型列表中的一個或多個列。例如:

# First, create 3 users with usernames u1, u2, u3.
u1, u2, u3 = [User.create(username='u%s' % i) for i in (1, 2, 3)]

# Now we'll modify the user instances.
u1.username = 'u1-x'
u2.username = 'u2-y'
u3.username = 'u3-z'

# Update all three users with a single UPDATE query.
User.bulk_update([u1, u2, u3], fields=[User.username])

備注

對于大型對象列表,應指定一個合理的批處理大小并將調用包裝到 bulk_update() 具有 Database.atomic()

with database.atomic():
    User.bulk_update(list_of_users, fields=['username'], batch_size=50)

或者,您可以使用 Database.batch_commit() 幫助處理內部行塊 批處理- 調整交易規模。該方法還為PostgreSQL之外的數據庫提供了一種解決方案,當必須獲取新創建行的主鍵時。

# List of row data to insert.
row_data = [{'username': 'u1'}, {'username': 'u2'}, ...]

# Assume there are 789 items in row_data. The following code will result in
# 8 total transactions (7x100 rows + 1x89 rows).
for row in db.batch_commit(row_data, 100):
    User.create(**row)

從另一個表批量加載?

如果要批量加載的數據存儲在另一個表中,則還可以創建 INSERT 源為的查詢 SELECT 查詢。使用 Model.insert_from() 方法:

res = (TweetArchive
       .insert_from(
           Tweet.select(Tweet.user, Tweet.message),
           fields=[TweetArchive.user, TweetArchive.message])
       .execute())

上面的查詢等價于以下SQL:

INSERT INTO "tweet_archive" ("user_id", "message")
SELECT "user_id", "message" FROM "tweet";

更新現有記錄?

一旦模型實例具有主鍵,任何后續調用 save() 將導致 UPDATE 而不是另一個 INSERT. 模型的主鍵不會更改:

>>> user.save()  # save() returns the number of rows modified.
1
>>> user.id
1
>>> user.save()
>>> user.id
1
>>> huey.save()
1
>>> huey.id
2

如果要更新多個記錄,請發出 UPDATE 查詢。以下示例將更新所有 Tweet 對象,標記為 出版, 如果它們是在今天之前創建的。 Model.update() 接受關鍵字參數,其中鍵對應于模型的字段名:

>>> today = datetime.today()
>>> query = Tweet.update(is_published=True).where(Tweet.creation_date < today)
>>> query.execute()  # Returns the number of rows that were updated.
4

有關詳細信息,請參閱 Model.update() , UpdateModel.bulk_update() .

備注

如果希望了解有關執行原子更新的更多信息(例如增加列的值),請檢查 atomic update 食譜。

Atomic更新?

Peewee允許您執行原子更新。假設我們需要更新一些計數器。幼稚的方法是寫這樣的東西:

>>> for stat in Stat.select().where(Stat.url == request.url):
...     stat.counter += 1
...     stat.save()

**不要這樣做!**這不僅速度很慢,而且如果多個進程同時更新計數器,它還容易受到競爭條件的影響。

相反,您可以使用 update()

>>> query = Stat.update(counter=Stat.counter + 1).where(Stat.url == request.url)
>>> query.execute()

您可以使這些更新語句盡可能復雜。讓我們給所有員工一筆獎金,相當于他們以前的獎金加上他們工資的10%:

>>> query = Employee.update(bonus=(Employee.bonus + (Employee.salary * .1)))
>>> query.execute()  # Give everyone a bonus!

我們甚至可以使用子查詢來更新列的值。假設我們在 User 這個模型存儲了用戶發的微博數量,我們定期更新這個值。下面是如何編寫這樣的查詢:

>>> subquery = Tweet.select(fn.COUNT(Tweet.id)).where(Tweet.user == User.id)
>>> update = User.update(num_tweets=subquery)
>>> update.execute()

上衣?

Peewee為不同類型的upsert功能提供支持。對于3.24.0之前的sqlite和mysql,peewee提供 replace() ,它允許您插入一條記錄,或者在違反約束的情況下替換現有記錄。

使用示例 replace()on_conflict_replace()

class User(Model):
    username = TextField(unique=True)
    last_login = DateTimeField(null=True)

# Insert or update the user. The "last_login" value will be updated
# regardless of whether the user existed previously.
user_id = (User
           .replace(username='the-user', last_login=datetime.now())
           .execute())

# This query is equivalent:
user_id = (User
           .insert(username='the-user', last_login=datetime.now())
           .on_conflict_replace()
           .execute())

備注

除了 代替, sqlite、mysql和postgresql提供 ignore 行動(見: on_conflict_ignore() )如果您只希望插入并忽略任何潛在的約束沖突。

MySQL 支持通過 ON DUPLICATE KEY UPDATE 條款。例如:

class User(Model):
    username = TextField(unique=True)
    last_login = DateTimeField(null=True)
    login_count = IntegerField()

# Insert a new user.
User.create(username='huey', login_count=0)

# Simulate the user logging in. The login count and timestamp will be
# either created or updated correctly.
now = datetime.now()
rowid = (User
         .insert(username='huey', last_login=now, login_count=1)
         .on_conflict(
             preserve=[User.last_login],  # Use the value we would have inserted.
             update={User.login_count: User.login_count + 1})
         .execute())

在上面的示例中,我們可以根據需要安全地多次調用upsert查詢。登錄計數將自動遞增,最后一個登錄列將被更新,并且不會創建重復的行。

Postgresql and SQLite (3.24.0及更高版本)提供不同的語法,允許對哪些約束沖突應觸發沖突解決以及應更新或保留哪些值進行更精細的控制。

使用示例 on_conflict() 要執行PostgreSQL樣式的upsert(或sqlite 3.24+):

class User(Model):
    username = TextField(unique=True)
    last_login = DateTimeField(null=True)
    login_count = IntegerField()

# Insert a new user.
User.create(username='huey', login_count=0)

# Simulate the user logging in. The login count and timestamp will be
# either created or updated correctly.
now = datetime.now()
rowid = (User
         .insert(username='huey', last_login=now, login_count=1)
         .on_conflict(
             conflict_target=[User.username],  # Which constraint?
             preserve=[User.last_login],  # Use the value we would have inserted.
             update={User.login_count: User.login_count + 1})
         .execute())

在上面的示例中,我們可以根據需要安全地多次調用upsert查詢。登錄計數將自動遞增,最后一個登錄列將被更新,并且不會創建重復的行。

備注

mysql和postgresql/sqlite的主要區別在于postgresql和sqlite要求您指定 conflict_target .

下面是一個更高級的(如果是人為的)示例,使用 EXCLUDED 命名空間。這個 EXCLUDED helper允許我們引用沖突數據中的值。對于我們的示例,我們假設一個簡單的表將唯一鍵(字符串)映射為值(整數):

class KV(Model):
    key = CharField(unique=True)
    value = IntegerField()

# Create one row.
KV.create(key='k1', value=1)

# Demonstrate usage of EXCLUDED.
# Here we will attempt to insert a new value for a given key. If that
# key already exists, then we will update its value with the *sum* of its
# original value and the value we attempted to insert -- provided that
# the new value is larger than the original value.
query = (KV.insert(key='k1', value=10)
         .on_conflict(conflict_target=[KV.key],
                      update={KV.value: KV.value + EXCLUDED.value},
                      where=(EXCLUDED.value > KV.value)))

# Executing the above query will result in the following data being
# present in the "kv" table:
# (key='k1', value=11)
query.execute()

# If we attempted to execute the query *again*, then nothing would be
# updated, as the new value (10) is now less than the value in the
# original row (11).

有關詳細信息,請參閱 Insert.on_conflict()OnConflict .

刪除記錄?

要刪除單個模型實例,可以使用 Model.delete_instance() 捷徑。 delete_instance() 將刪除給定的模型實例,并且可以選擇遞歸刪除任何依賴對象(通過指定 recursive=True) .

>>> user = User.get(User.id == 1)
>>> user.delete_instance()  # Returns the number of rows deleted.
1

>>> User.get(User.id == 1)
UserDoesNotExist: instance matching query does not exist:
SQL: SELECT t1."id", t1."username" FROM "user" AS t1 WHERE t1."id" = ?
PARAMS: [1]

要刪除任意行集,可以發出 DELETE 查詢。以下內容將全部刪除 Tweet 超過一年的對象:

>>> query = Tweet.delete().where(Tweet.creation_date < one_year_ago)
>>> query.execute()  # Returns the number of rows deleted.
7

有關詳細信息,請參閱以下文檔:

選擇單個記錄?

你可以使用 Model.get() 方法來檢索與給定查詢匹配的單個實例。對于主鍵查找,還可以使用快捷方式 Model.get_by_id() .

此方法是調用 Model.select() 使用給定的查詢,但將結果集限制為一行。此外,如果沒有與給定查詢匹配的模型,則 DoesNotExist 將引發異常。

>>> User.get(User.id == 1)
<__main__.User object at 0x25294d0>

>>> User.get_by_id(1)  # Same as above.
<__main__.User object at 0x252df10>

>>> User[1]  # Also same as above.
<__main__.User object at 0x252dd10>

>>> User.get(User.id == 1).username
u'Charlie'

>>> User.get(User.username == 'Charlie')
<__main__.User object at 0x2529410>

>>> User.get(User.username == 'nobody')
UserDoesNotExist: instance matching query does not exist:
SQL: SELECT t1."id", t1."username" FROM "user" AS t1 WHERE t1."username" = ?
PARAMS: ['nobody']

對于更高級的操作,可以使用 SelectBase.get() . 下面的查詢從名為 查理:

>>> (Tweet
...  .select()
...  .join(User)
...  .where(User.username == 'charlie')
...  .order_by(Tweet.created_date.desc())
...  .get())
<__main__.Tweet object at 0x2623410>

有關詳細信息,請參閱以下文檔:

創建或獲取?

Peewee有一個用于執行“get/create”類型操作的助手方法: Model.get_or_create() ,它首先嘗試檢索匹配的行。如果失敗,將創建一個新行。

對于“創建或獲取”類型邏輯,通常需要依賴 unique 用于防止創建重復對象的約束或主鍵。例如,假設我們希望使用 example User model . 這個 User 模型有 unique 限制用戶名字段,因此我們將依賴數據庫的完整性保證,以確保最終不會出現重復的用戶名:

try:
    with db.atomic():
        return User.create(username=username)
except peewee.IntegrityError:
    # `username` is a unique column, so this username already exists,
    # making it safe to call .get().
    return User.get(User.username == username)

您可以很容易地將這種類型的邏輯封裝為 classmethod 靠你自己 Model 類。

上面的示例首先嘗試創建,然后返回到檢索,依賴數據庫來強制執行唯一約束。如果您希望先嘗試檢索記錄,則可以使用 get_or_create() . 此方法與同名的django函數沿同一行實現??梢允褂胐jango-style關鍵字參數篩選器指定 WHERE 條件。函數返回一個包含實例的2元組和一個指示對象是否已創建的布爾值。

下面是如何使用 get_or_create()

user, created = User.get_or_create(username=username)

假設我們有不同的型號 Person 并希望獲取或創建一個Person對象。我們在檢索 Person 是他們的姓和名, but 如果我們最終需要創建一個新記錄,我們還將指定他們的出生日期和最喜歡的顏色:

person, created = Person.get_or_create(
    first_name=first_name,
    last_name=last_name,
    defaults={'dob': dob, 'favorite_color': 'green'})

傳遞給的任何關鍵字參數 get_or_create() 將用于 get() 邏輯的一部分,除了 defaults 字典,用于在新創建的實例上填充值。

有關更多詳細信息,請閱讀 Model.get_or_create() .

選擇多個記錄?

我們可以使用 Model.select() 從表中檢索行。當你構建一個 SELECT 查詢時,數據庫將返回與查詢對應的所有行。Peewee允許您遍歷這些行,并使用索引和切片操作:

>>> query = User.select()
>>> [user.username for user in query]
['Charlie', 'Huey', 'Peewee']

>>> query[1]
<__main__.User at 0x7f83e80f5550>

>>> query[1].username
'Huey'

>>> query[:2]
[<__main__.User at 0x7f83e80f53a8>, <__main__.User at 0x7f83e80f5550>]

Select 查詢是智能的,因為您可以多次迭代、索引和切片查詢,但查詢只執行一次。

在下面的示例中,我們將簡單地調用 select() 并迭代返回值,返回值是 Select . 這將返回 User

>>> for user in User.select():
...     print(user.username)
...
Charlie
Huey
Peewee

備注

在緩存結果時,同一查詢的后續迭代將不會命中數據庫。要禁用此行為(減少內存使用),請調用 Select.iterator() 迭代時。

在對包含外鍵的模型進行迭代時,請注意訪問相關模型上的值的方式。意外地解析外鍵或在后引用上迭代可能導致 N+1 query behavior .

創建外鍵時,例如 Tweet.user ,您可以使用 backref 創建后參照的步驟( User.tweets )背面參考暴露為 Select 實例:

>>> tweet = Tweet.get()
>>> tweet.user  # Accessing a foreign key returns the related model.
<tw.User at 0x7f3ceb017f50>

>>> user = User.get()
>>> user.tweets  # Accessing a back-reference returns a query.
<peewee.ModelSelect at 0x7f73db3bafd0>

您可以迭代 user.tweets 像其他任何參考一樣 Select

>>> for tweet in user.tweets:
...     print(tweet.message)
...
hello world
this is fun
look at this picture of my food

除了返回模型實例外, Select 查詢可以返回字典、元組和namedtuples。根據您的用例,您可能會發現使用行作為字典更容易,例如:

>>> query = User.select().dicts()
>>> for row in query:
...     print(row)

{'id': 1, 'username': 'Charlie'}
{'id': 2, 'username': 'Huey'}
{'id': 3, 'username': 'Peewee'}

namedtuples() , tuples() , dicts() 更多信息。

迭代大型結果集?

默認情況下,peewee將緩存在 Select 查詢。這是一種優化,允許多次迭代,以及索引和切片,而不引起額外的查詢。但是,當您計劃迭代大量行時,這種緩存可能會有問題。

要減少Peewee在迭代查詢時使用的內存量,請使用 iterator() 方法。此方法允許您在不緩存返回的每個模型的情況下進行迭代,在迭代大型結果集時使用更少的內存。

# Let's assume we've got 10 million stat objects to dump to a csv file.
stats = Stat.select()

# Our imaginary serializer class
serializer = CSVSerializer()

# Loop over all the stats and serialize.
for stat in stats.iterator():
    serializer.serialize_object(stat)

對于簡單的查詢,您可以通過將行作為字典、命名上傳或元組返回來進一步提高速度。以下方法可用于任何 Select 更改結果行類型的查詢:

別忘了附加 iterator() 方法調用還可以減少內存消耗。例如,上面的代碼可能如下所示:

# Let's assume we've got 10 million stat objects to dump to a csv file.
stats = Stat.select()

# Our imaginary serializer class
serializer = CSVSerializer()

# Loop over all the stats (rendered as tuples, without caching) and serialize.
for stat_tuple in stats.tuples().iterator():
    serializer.serialize_tuple(stat_tuple)

當遍歷包含多個表中的列的大量行時,Peewee將為返回的每一行重建模型圖。對于復雜的圖,此操作可能很慢。例如,如果我們選擇一個tweet列表以及tweet作者的用戶名和化身,那么peewee必須為每行創建兩個對象(tweet和一個用戶)。除了上面的行類型之外,還有第四種方法 objects() 它將以模型實例的形式返回行,但不會嘗試解析模型圖。

例如:

query = (Tweet
         .select(Tweet, User)  # Select tweet and user data.
         .join(User))

# Note that the user columns are stored in a separate User instance
# accessible at tweet.user:
for tweet in query:
    print(tweet.user.username, tweet.content)

# Using ".objects()" will not create the tweet.user object and assigns all
# user attributes to the tweet instance:
for tweet in query.objects():
    print(tweet.username, tweet.content)

為了獲得最大的性能,可以執行查詢,然后使用底層數據庫光標迭代結果。 Database.execute() 接受查詢對象,執行查詢,并返回db-api 2.0 Cursor 對象。光標將返回原始行元組:

query = Tweet.select(Tweet.content, User.username).join(User)
cursor = database.execute(query)
for (content, username) in cursor:
    print(username, '->', content)

篩選記錄?

您可以使用普通的python操作符過濾特定的記錄。Peewee支持多種 query operators .

>>> user = User.get(User.username == 'Charlie')
>>> for tweet in Tweet.select().where(Tweet.user == user, Tweet.is_published == True):
...     print(tweet.user.username, '->', tweet.message)
...
Charlie -> hello world
Charlie -> this is fun

>>> for tweet in Tweet.select().where(Tweet.created_date < datetime.datetime(2011, 1, 1)):
...     print(tweet.message, tweet.created_date)
...
Really old tweet 2010-01-01 00:00:00

您還可以跨聯接進行篩選:

>>> for tweet in Tweet.select().join(User).where(User.username == 'Charlie'):
...     print(tweet.message)
hello world
this is fun
look at this picture of my food

如果要表示復雜的查詢,請使用括號和python的位 orand 運營商:

>>> Tweet.select().join(User).where(
...     (User.username == 'Charlie') |
...     (User.username == 'Peewee Herman'))

備注

請注意,peewee使用 bitwise 操作員( &| )而不是邏輯運算符( andor )原因是python將邏輯操作的返回值強制為布爾值。這也是必須使用 .in_() 而不是 in 操作員。

退房 the table of query operations 以查看哪些類型的查詢是可能的。

備注

查詢的WHERE子句中可以包含很多有趣的內容,例如:

  • 字段表達式,例如 User.username == 'Charlie'

  • 函數表達式,例如 fn.Lower(fn.Substr(User.username, 1, 1)) == 'a'

  • 一列與另一列的比較,例如 Employee.salary < (Employee.tenure * 1000) + 40000

您還可以嵌套查詢,例如用戶名以“a”開頭的用戶發出的tweets:

# get users whose username starts with "a"
a_users = User.select().where(fn.Lower(fn.Substr(User.username, 1, 1)) == 'a')

# the ".in_()" method signifies an "IN" query
a_user_tweets = Tweet.select().where(Tweet.user.in_(a_users))

更多查詢示例?

備注

有關廣泛的示例查詢,請參見 Query Examples 文檔,其中顯示如何從 PostgreSQL Exercises 網站。

獲取活動用戶:

User.select().where(User.active == True)

獲取員工或超級用戶:

User.select().where(
    (User.is_staff == True) | (User.is_superuser == True))

獲取名為“charlie”的用戶的tweets:

Tweet.select().join(User).where(User.username == 'charlie')

按員工或超級用戶獲取推文(假設為FK關系):

Tweet.select().join(User).where(
    (User.is_staff == True) | (User.is_superuser == True))

使用子查詢按員工或超級用戶獲取推文:

staff_super = User.select(User.id).where(
    (User.is_staff == True) | (User.is_superuser == True))
Tweet.select().where(Tweet.user.in_(staff_super))

分類記錄?

要按順序返回行,請使用 order_by() 方法:

>>> for t in Tweet.select().order_by(Tweet.created_date):
...     print(t.pub_date)
...
2010-01-01 00:00:00
2011-06-07 14:08:48
2011-06-07 14:12:57

>>> for t in Tweet.select().order_by(Tweet.created_date.desc()):
...     print(t.pub_date)
...
2011-06-07 14:12:57
2011-06-07 14:08:48
2010-01-01 00:00:00

您也可以使用 +- 用于指示排序的前綴運算符:

# The following queries are equivalent:
Tweet.select().order_by(Tweet.created_date.desc())

Tweet.select().order_by(-Tweet.created_date)  # Note the "-" prefix.

# Similarly you can use "+" to indicate ascending order, though ascending
# is the default when no ordering is otherwise specified.
User.select().order_by(+User.username)

您還可以跨聯接排序。假設您希望按作者的用戶名訂購tweets,然后按創建日期訂購:

query = (Tweet
         .select()
         .join(User)
         .order_by(User.username, Tweet.created_date.desc()))
SELECT t1."id", t1."user_id", t1."message", t1."is_published", t1."created_date"
FROM "tweet" AS t1
INNER JOIN "user" AS t2
  ON t1."user_id" = t2."id"
ORDER BY t2."username", t1."created_date" DESC

對計算值進行排序時,可以包含必要的SQL表達式,也可以引用分配給該值的別名。以下是兩個例子,說明了這些方法:

# Let's start with our base query. We want to get all usernames and the number of
# tweets they've made. We wish to sort this list from users with most tweets to
# users with fewest tweets.
query = (User
         .select(User.username, fn.COUNT(Tweet.id).alias('num_tweets'))
         .join(Tweet, JOIN.LEFT_OUTER)
         .group_by(User.username))

您可以使用 select 條款。在下面的示例中,我們按照 COUNT() tweet id降序:

query = (User
         .select(User.username, fn.COUNT(Tweet.id).alias('num_tweets'))
         .join(Tweet, JOIN.LEFT_OUTER)
         .group_by(User.username)
         .order_by(fn.COUNT(Tweet.id).desc()))

或者,您可以引用分配給計算值的別名 select 條款。這種方法的好處是更容易閱讀。注意,我們不是直接引用命名別名,而是使用 SQL 幫手:

query = (User
         .select(User.username, fn.COUNT(Tweet.id).alias('num_tweets'))
         .join(Tweet, JOIN.LEFT_OUTER)
         .group_by(User.username)
         .order_by(SQL('num_tweets').desc()))

或者,以“peewee”的方式做事:

ntweets = fn.COUNT(Tweet.id)
query = (User
         .select(User.username, ntweets.alias('num_tweets'))
         .join(Tweet, JOIN.LEFT_OUTER)
         .group_by(User.username)
         .order_by(ntweets.desc())

獲取隨機記錄?

有時您可能希望從數據庫中提取隨機記錄。您可以通過 randomrand 函數(取決于數據庫):

PostgreSQL和SQLite使用 Random 功能:

# Pick 5 lucky winners:
LotteryNumber.select().order_by(fn.Random()).limit(5)

MySQL使用 Rand:

# Pick 5 lucky winners:
LotteryNumber.select().order_by(fn.Rand()).limit(5)

分頁記錄?

這個 paginate() 方法使獲取 page 或記錄。 paginate() 取兩個參數, page_numberitems_per_page .

注意

頁碼以1為基礎,因此結果的第一頁將是第1頁。

>>> for tweet in Tweet.select().order_by(Tweet.id).paginate(2, 10):
...     print(tweet.message)
...
tweet 10
tweet 11
tweet 12
tweet 13
tweet 14
tweet 15
tweet 16
tweet 17
tweet 18
tweet 19

如果您想要更細粒度的控制,可以始終使用 limit()offset() .

盤點記錄?

您可以計算任何選擇查詢中的行數:

>>> Tweet.select().count()
100
>>> Tweet.select().where(Tweet.id > 50).count()
50

Peewee將把您的查詢包裝在執行計數的外部查詢中,這將導致類似SQL的結果:

SELECT COUNT(1) FROM ( ... your query ... );

正在聚合記錄?

假設您有一些用戶,并且想要得到他們的列表以及每個用戶的tweet數量。

query = (User
         .select(User, fn.Count(Tweet.id).alias('count'))
         .join(Tweet, JOIN.LEFT_OUTER)
         .group_by(User))

結果查詢將返回 User 具有所有正常屬性和附加屬性的對象 count 它將包含每個用戶的tweet數。我們使用左外部連接來包括沒有tweet的用戶。

假設您有一個標記應用程序,希望找到具有一定數量相關對象的標記。對于這個例子,我們將在 many-to-many 配置:

class Photo(Model):
    image = CharField()

class Tag(Model):
    name = CharField()

class PhotoTag(Model):
    photo = ForeignKeyField(Photo)
    tag = ForeignKeyField(Tag)

現在假設我們想要找到至少有5張照片與之關聯的標簽:

query = (Tag
         .select()
         .join(PhotoTag)
         .join(Photo)
         .group_by(Tag)
         .having(fn.Count(Photo.id) > 5))

此查詢等效于以下SQL:

SELECT t1."id", t1."name"
FROM "tag" AS t1
INNER JOIN "phototag" AS t2 ON t1."id" = t2."tag_id"
INNER JOIN "photo" AS t3 ON t2."photo_id" = t3."id"
GROUP BY t1."id", t1."name"
HAVING Count(t3."id") > 5

假設我們想要獲取相關的計數并將其存儲在標簽上:

query = (Tag
         .select(Tag, fn.Count(Photo.id).alias('count'))
         .join(PhotoTag)
         .join(Photo)
         .group_by(Tag)
         .having(fn.Count(Photo.id) > 5))

檢索標量值?

您可以通過調用 Query.scalar() . 例如:

>>> PageView.select(fn.Count(fn.Distinct(PageView.url))).scalar()
100

可以通過傳遞來檢索多個標量值 as_tuple=True

>>> Employee.select(
...     fn.Min(Employee.salary), fn.Max(Employee.salary)
... ).scalar(as_tuple=True)
(30000, 50000)

窗口功能?

A Window 函數是指一個聚合函數,該函數在作為 SELECT 查詢。窗口功能使您可以執行以下操作:

  1. 對結果集的子集執行聚合。

  2. 計算運行總數。

  3. 排名結果。

  4. 將行值與前面(或后面)中的值進行比較!行。

Peewee支持SQL窗口函數,可以通過調用 Function.over() 并傳入分區或排序參數。

對于以下示例,我們將使用以下模型和示例數據:

class Sample(Model):
    counter = IntegerField()
    value = FloatField()

data = [(1, 10),
        (1, 20),
        (2, 1),
        (2, 3),
        (3, 100)]
Sample.insert_many(data, fields=[Sample.counter, Sample.value]).execute()

我們的示例表現在包含:

身份證件

柜臺

價值

1

1

10.0

2

1

20.0

3

2

1.0

4

2

3.0

5

3

100.0

有序窗口?

我們來計算 value 字段。為了使它成為一個“連續的”總數,我們需要訂購它,所以我們將訂購關于樣品的 id 領域:

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.SUM(Sample.value).over(order_by=[Sample.id]).alias('total'))

for sample in query:
    print(sample.counter, sample.value, sample.total)

# 1    10.    10.
# 1    20.    30.
# 2     1.    31.
# 2     3.    34.
# 3   100    134.

對于另一個例子,我們將計算當前值和上一個值之間的差異,當 id

difference = Sample.value - fn.LAG(Sample.value, 1).over(order_by=[Sample.id])
query = Sample.select(
    Sample.counter,
    Sample.value,
    difference.alias('diff'))

for sample in query:
    print(sample.counter, sample.value, sample.diff)

# 1    10.   NULL
# 1    20.    10.  -- (20 - 10)
# 2     1.   -19.  -- (1 - 20)
# 2     3.     2.  -- (3 - 1)
# 3   100     97.  -- (100 - 3)

分區窗口?

讓我們計算一下平均值 value 對于每個不同的“計數器”值。注意,有三個可能的值 counter 字段(1、2和3)。我們可以通過計算 AVG()value 列位于根據 counter 領域:

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.AVG(Sample.value).over(partition_by=[Sample.counter]).alias('cavg'))

for sample in query:
    print(sample.counter, sample.value, sample.cavg)

# 1    10.    15.
# 1    20.    15.
# 2     1.     2.
# 2     3.     2.
# 3   100    100.

我們可以通過指定 order_bypartition_by 參數。例如,讓我們在每個不同的 counter 組。

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.RANK().over(
        order_by=[Sample.value],
        partition_by=[Sample.counter]).alias('rank'))

for sample in query:
    print(sample.counter, sample.value, sample.rank)

# 1    10.    1
# 1    20.    2
# 2     1.    1
# 2     3.    2
# 3   100     1

有界窗口?

默認情況下,使用 unbounded preceding 從窗口開始,然后 current row 作為結束。我們可以通過指定 start 和/或 end 在召喚 Function.over() . 此外,Peewee在 Window 用于生成適當邊界引用的對象:

為了檢查邊界是如何工作的,我們將計算 value 列,按 id , but 我們只查看當前行的運行合計,它是前面兩行:

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.SUM(Sample.value).over(
        order_by=[Sample.id],
        start=Window.preceding(2),
        end=Window.CURRENT_ROW).alias('rsum'))

for sample in query:
    print(sample.counter, sample.value, sample.rsum)

# 1    10.    10.
# 1    20.    30.  -- (20 + 10)
# 2     1.    31.  -- (1 + 20 + 10)
# 2     3.    24.  -- (3 + 1 + 20)
# 3   100    104.  -- (100 + 3 + 1)

備注

從技術上講,我們不需要具體說明 end=Window.CURRENT 因為這是默認值。示例中顯示了它以供演示。

讓我們來看另一個例子。在這個例子中,我們將計算一個運行總數的“相反”值,其中所有值的總和都會被樣本值減少,排序是 id . 為此,我們將計算從當前行到最后一行的總和。

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.SUM(Sample.value).over(
        order_by=[Sample.id],
        start=Window.CURRENT_ROW,
        end=Window.following()).alias('rsum'))

# 1    10.   134.  -- (10 + 20 + 1 + 3 + 100)
# 1    20.   124.  -- (20 + 1 + 3 + 100)
# 2     1.   104.  -- (1 + 3 + 100)
# 2     3.   103.  -- (3 + 100)
# 3   100    100.  -- (100)

過濾集料?

聚合函數也可以支持過濾函數(postgres和sqlite 3.25+),這些函數被轉換為 FILTER (WHERE...) 條款。將篩選表達式添加到聚合函數中, Function.filter() 方法。

例如,我們將計算 value 字段相對于 id ,但我們將篩選出 counter=2 .

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.SUM(Sample.value).filter(Sample.counter != 2).over(
        order_by=[Sample.id]).alias('csum'))

for sample in query:
    print(sample.counter, sample.value, sample.csum)

# 1    10.    10.
# 1    20.    30.
# 2     1.    30.
# 2     3.    30.
# 3   100    130.

備注

呼喚 filter() 必須在調用之前 over() .

重用窗口定義?

如果要對多個聚合使用相同的窗口定義,可以創建 Window 對象。這個 Window 對象的參數與 Function.over() ,并可以傳遞給 over() 方法代替單個參數。

在這里,我們將申報一個單窗,按樣品訂購。 id ,并使用該窗口定義調用多個窗口函數:

win = Window(order_by=[Sample.id])
query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.LEAD(Sample.value).over(win),
    fn.LAG(Sample.value).over(win),
    fn.SUM(Sample.value).over(win)
).window(win)  # Include our window definition in query.

for row in query.tuples():
    print(row)

# counter  value  lead()  lag()  sum()
# 1          10.     20.   NULL    10.
# 1          20.      1.    10.    30.
# 2           1.      3.    20.    31.
# 2           3.    100.     1.    34.
# 3         100.    NULL     3.   134.

多窗口定義?

在前面的示例中,我們看到了如何聲明 Window 定義并重新用于多個不同的聚合。您可以根據需要在查詢中包含盡可能多的窗口定義,但必須確保每個窗口具有唯一的別名:

w1 = Window(order_by=[Sample.id]).alias('w1')
w2 = Window(partition_by=[Sample.counter]).alias('w2')
query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.SUM(Sample.value).over(w1).alias('rsum'),  # Running total.
    fn.AVG(Sample.value).over(w2).alias('cavg')   # Avg per category.
).window(w1, w2)  # Include our window definitions.

for sample in query:
    print(sample.counter, sample.value, sample.rsum, sample.cavg)

# counter  value   rsum     cavg
# 1          10.     10.     15.
# 1          20.     30.     15.
# 2           1.     31.      2.
# 2           3.     34.      2.
# 3         100     134.    100.

同樣,如果有多個窗口定義共享類似的定義,則可以擴展以前定義的窗口定義。例如,這里我們將按計數器值對數據集進行分區,因此我們將針對計數器進行聚合。然后,我們將定義擴展此分區的第二個窗口,并添加一個排序子句:

w1 = Window(partition_by=[Sample.counter]).alias('w1')

# By extending w1, this window definition will also be partitioned
# by "counter".
w2 = Window(extends=w1, order_by=[Sample.value.desc()]).alias('w2')

query = (Sample
         .select(Sample.counter, Sample.value,
                 fn.SUM(Sample.value).over(w1).alias('group_sum'),
                 fn.RANK().over(w2).alias('revrank'))
         .window(w1, w2)
         .order_by(Sample.id))

for sample in query:
    print(sample.counter, sample.value, sample.group_sum, sample.revrank)

# counter  value   group_sum   revrank
# 1        10.     30.         2
# 1        20.     30.         1
# 2        1.      4.          2
# 2        3.      4.          1
# 3        100.    100.        1

幀類型:范圍vs行vs組?

根據幀類型,數據庫將以不同的方式處理有序組。我們再創建兩個 Sample 顯示差異的行:

>>> Sample.create(counter=1, value=20.)
<Sample 6>
>>> Sample.create(counter=2, value=1.)
<Sample 7>

我們的表現在包含:

身份證件

柜臺

價值

1

1

10.0

2

1

20.0

3

2

1.0

4

2

3.0

5

3

100.0

6

1

20.0

7

2

1.0

讓我們通過計算樣本的“運行總和”來檢查差異,這些樣本是按照 countervalue 領域。要指定幀類型,可以使用以下任一項:

行為 RANGE 如果存在邏輯重復項,則可能導致意外結果:

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.SUM(Sample.value).over(
        order_by=[Sample.counter, Sample.value],
        frame_type=Window.RANGE).alias('rsum'))

for sample in query.order_by(Sample.counter, Sample.value):
    print(sample.counter, sample.value, sample.rsum)

# counter  value   rsum
# 1          10.     10.
# 1          20.     50.
# 1          20.     50.
# 2           1.     52.
# 2           1.     52.
# 2           3.     55.
# 3         100     155.

通過包含新行,我們現在有了一些重復的行 categoryvalue 價值觀。這個 RANGE 幀類型使這些重復項一起計算,而不是單獨計算。

通過使用 ROWS 作為框架類型:

query = Sample.select(
    Sample.counter,
    Sample.value,
    fn.SUM(Sample.value).over(
        order_by=[Sample.counter, Sample.value],
        frame_type=Window.ROWS).alias('rsum'))

for sample in query.order_by(Sample.counter, Sample.value):
    print(sample.counter, sample.value, sample.rsum)

# counter  value   rsum
# 1          10.     10.
# 1          20.     30.
# 1          20.     50.
# 2           1.     51.
# 2           1.     52.
# 2           3.     55.
# 3         100     155.

Peewee使用以下規則確定要使用的幀類型:

  • 如果用戶指定 frame_type ,將使用該框架類型。

  • 如果 start 和/或 end 邊界是指定的,peewee將默認為使用 ROWS .

  • 如果用戶沒有指定幀類型或開始/結束邊界,Peewee將使用數據庫默認值,即 RANGE .

這個 Window.GROUPS 框架類型根據排序術語按行組查看窗口范圍規范。使用 GROUPS ,我們可以定義框架,以便它覆蓋不同的行分組。讓我們來看一個例子:

query = (Sample
         .select(Sample.counter, Sample.value,
                 fn.SUM(Sample.value).over(
                    order_by=[Sample.counter, Sample.value],
                    frame_type=Window.GROUPS,
                    start=Window.preceding(1)).alias('gsum'))
         .order_by(Sample.counter, Sample.value))

for sample in query:
    print(sample.counter, sample.value, sample.gsum)

#  counter   value    gsum
#  1         10       10
#  1         20       50
#  1         20       50   (10) + (20+0)
#  2         1        42
#  2         1        42   (20+20) + (1+1)
#  2         3        5    (1+1) + 3
#  3         100      103  (3) + 100

正如您希望推斷的那樣,窗口按其排序術語分組,即 (counter, value) .我們正在查看一個在前一組和當前組之間擴展的窗口。

備注

有關窗口函數API的信息,請參見:

有關窗口函數的一般信息,請閱讀Postgres window functions tutorial

另外, postgres docs 以及 sqlite docs 包含很多好信息。

檢索行元組/字典/名稱雙重?

有時,您不需要創建模型實例的開銷,只需要遍歷行數據,而不需要提供所有API。 Model . 為此,請使用:

stats = (Stat
         .select(Stat.url, fn.Count(Stat.url))
         .group_by(Stat.url)
         .tuples())

# iterate over a list of 2-tuples containing the url and count
for stat_url, stat_count in stats:
    print(stat_url, stat_count)

同樣,您可以使用 dicts()

stats = (Stat
         .select(Stat.url, fn.Count(Stat.url).alias('ct'))
         .group_by(Stat.url)
         .dicts())

# iterate over a list of 2-tuples containing the url and count
for stat in stats:
    print(stat['url'], stat['ct'])

退回條款?

PostgresqlDatabase 支持 RETURNING 條款 UPDATE , INSERTDELETE 查詢。指定一個 RETURNING 子句允許您迭代查詢訪問的行。

默認情況下,執行不同查詢時的返回值為:

  • INSERT -自動遞增新插入行的主鍵值。如果不使用自動遞增的主鍵,Postgres將返回新行的主鍵,但SQLite和MySQL將不返回。

  • UPDATE -修改的行數

  • DELETE -刪除的行數

當使用返回子句時,執行查詢時的返回值將是一個可ITerable光標對象。

PostgreSQL允許通過 RETURNING 子句,從查詢插入或修改的行返回數據。

例如,假設您有 Update 這將停用注冊已過期的所有用戶帳戶。停用后,您希望向每個用戶發送一封電子郵件,讓他們知道他們的帳戶已停用。而不是寫兩個查詢, SELECT 和一個 UPDATE ,您可以在一個 UPDATE 用A查詢 RETURNING 條款:

query = (User
         .update(is_active=False)
         .where(User.registration_expired == True)
         .returning(User))

# Send an email to every user that was deactivated.
for deactivate_user in query.execute():
    send_deactivation_email(deactivated_user.email)

這個 RETURNING 條款也可用于 InsertDelete . 當使用時 INSERT ,將返回新創建的行。當使用時 DELETE ,將返回已刪除的行。

唯一的限制 RETURNING 子句是它只能由查詢中列出的表中的列組成 FROM 條款。要從特定表中選擇所有列,只需傳入 Model 班級。

作為另一個示例,我們添加一個用戶,并將其創建日期設置為服務器生成的當前時間戳。我們將在單個查詢中創建和檢索新用戶的ID、電子郵件和創建時間戳:

query = (User
         .insert(email='foo@bar.com', created=fn.now())
         .returning(User))  # Shorthand for all columns on User.

# When using RETURNING, execute() returns a cursor.
cursor = query.execute()

# Get the user object we just inserted and log the data:
user = cursor[0]
logger.info('Created user %s (id=%s) at %s', user.email, user.id, user.created)

默認情況下,光標將返回 Model 實例,但可以指定其他行類型:

data = [{'name': 'charlie'}, {'name': 'huey'}, {'name': 'mickey'}]
query = (User
         .insert_many(data)
         .returning(User.id, User.username)
         .dicts())

for new_user in query.execute():
    print('Added user "%s", id=%s' % (new_user['username'], new_user['id']))

就像 Select 查詢,您可以指定 result row types .

公用表表達式?

Peewee支持在所有類型的查詢中包含公共表表達式(CTE)。CTE可用于:

  • 分解出一個公共子查詢。

  • 按CTE結果集中派生的列進行分組或篩選。

  • 正在寫入遞歸查詢。

宣布一項 Select 查詢用作CTE,使用 cte() 方法,它將查詢包裝在 CTE 對象。表示 CTE 應作為查詢的一部分包含,使用 Query.with_cte() 方法,傳遞CTE對象列表。

簡單實例?

例如,假設我們有一些由鍵和浮點值組成的數據點。讓我們定義模型并填充一些測試數據:

class Sample(Model):
    key = TextField()
    value = FloatField()

data = (
    ('a', (1.25, 1.5, 1.75)),
    ('b', (2.1, 2.3, 2.5, 2.7, 2.9)),
    ('c', (3.5, 3.5)))

# Populate data.
for key, values in data:
    Sample.insert_many([(key, value) for value in values],
                       fields=[Sample.key, Sample.value]).execute()

讓我們使用一個CTE來計算每個不同的鍵,哪個值高于該鍵的平均值。

# First we'll declare the query that will be used as a CTE. This query
# simply determines the average value for each key.
cte = (Sample
       .select(Sample.key, fn.AVG(Sample.value).alias('avg_value'))
       .group_by(Sample.key)
       .cte('key_avgs', columns=('key', 'avg_value')))

# Now we'll query the sample table, using our CTE to find rows whose value
# exceeds the average for the given key. We'll calculate how far above the
# average the given sample's value is, as well.
query = (Sample
         .select(Sample.key, Sample.value)
         .join(cte, on=(Sample.key == cte.c.key))
         .where(Sample.value > cte.c.avg_value)
         .order_by(Sample.value)
         .with_cte(cte))

我們可以迭代查詢返回的樣本,以查看哪些樣本的給定組的值高于平均值:

>>> for sample in query:
...     print(sample.key, sample.value)

# 'a', 1.75
# 'b', 2.7
# 'b', 2.9

復雜實例?

對于更完整的示例,讓我們考慮下面的查詢,它使用多個CTE來查找僅在頂部銷售區域中的每個產品銷售總額。我們的模型如下:

class Order(Model):
    region = TextField()
    amount = FloatField()
    product = TextField()
    quantity = IntegerField()

下面是如何用SQL編寫查詢。這個例子可以在 postgresql documentation .

WITH regional_sales AS (
    SELECT region, SUM(amount) AS total_sales
    FROM orders
    GROUP BY region
  ), top_regions AS (
    SELECT region
    FROM regional_sales
    WHERE total_sales > (SELECT SUM(total_sales) / 10 FROM regional_sales)
  )
SELECT region,
       product,
       SUM(quantity) AS product_units,
       SUM(amount) AS product_sales
FROM orders
WHERE region IN (SELECT region FROM top_regions)
GROUP BY region, product;

用peewee,我們會寫:

reg_sales = (Order
             .select(Order.region,
                     fn.SUM(Order.amount).alias('total_sales'))
             .group_by(Order.region)
             .cte('regional_sales'))

top_regions = (reg_sales
               .select(reg_sales.c.region)
               .where(reg_sales.c.total_sales > (
                   reg_sales.select(fn.SUM(reg_sales.c.total_sales) / 10)))
               .cte('top_regions'))

query = (Order
         .select(Order.region,
                 Order.product,
                 fn.SUM(Order.quantity).alias('product_units'),
                 fn.SUM(Order.amount).alias('product_sales'))
         .where(Order.region.in_(top_regions.select(top_regions.c.region)))
         .group_by(Order.region, Order.product)
         .with_cte(reg_sales, top_regions))

遞歸CTE?

Peewee支持遞歸CTE。例如,當您具有由父鏈接外鍵表示的樹數據結構時,遞歸CTE可能很有用。例如,假設我們有一個在線書店的類別層次結構。我們希望生成一個顯示所有類別及其絕對深度的表,以及從根目錄到類別的路徑。

我們將假設以下模型定義,其中每個類別都有其直接父類別的外鍵:

class Category(Model):
    name = TextField()
    parent = ForeignKeyField('self', backref='children', null=True)

要列出所有類別及其深度和父級,我們可以使用遞歸CTE:

# Define the base case of our recursive CTE. This will be categories that
# have a null parent foreign-key.
Base = Category.alias()
level = Value(1).alias('level')
path = Base.name.alias('path')
base_case = (Base
             .select(Base.id, Base.name, Base.parent, level, path)
             .where(Base.parent.is_null())
             .cte('base', recursive=True))

# Define the recursive terms.
RTerm = Category.alias()
rlevel = (base_case.c.level + 1).alias('level')
rpath = base_case.c.path.concat('->').concat(RTerm.name).alias('path')
recursive = (RTerm
             .select(RTerm.id, RTerm.name, RTerm.parent, rlevel, rpath)
             .join(base_case, on=(RTerm.parent == base_case.c.id)))

# The recursive CTE is created by taking the base case and UNION ALL with
# the recursive term.
cte = base_case.union_all(recursive)

# We will now query from the CTE to get the categories, their levels,  and
# their paths.
query = (cte
         .select_from(cte.c.name, cte.c.level, cte.c.path)
         .order_by(cte.c.path))

# We can now iterate over a list of all categories and print their names,
# absolute levels, and path from root -> category.
for category in query:
    print(category.name, category.level, category.path)

# Example output:
# root, 1, root
# p1, 2, root->p1
# c1-1, 3, root->p1->c1-1
# c1-2, 3, root->p1->c1-2
# p2, 2, root->p2
# c2-1, 3, root->p2->c2-1

數據修改CTE?

Peewee支持數據修改CTE。

使用數據修改CTE使用單個查詢將數據從一個表移動到存檔表的示例:

class Event(Model):
    name = CharField()
    timestamp = DateTimeField()

class Archive(Model):
    name = CharField()
    timestamp = DateTimeField()

# Move rows older than 24 hours from the Event table to the Archive.
cte = (Event
       .delete()
       .where(Event.timestamp < (datetime.now() - timedelta(days=1)))
       .returning(Event)
       .cte('moved_rows'))

# Create a simple SELECT to get the resulting rows from the CTE.
src = Select((cte,), (cte.c.id, cte.c.name, cte.c.timestamp))

# Insert into the archive table whatever data was returned by the DELETE.
res = (Archive
       .insert_from(src, (Archive.id, Archive.name, Archive.timestamp))
       .with_cte(cte)
       .execute())

上面的內容大致對應于下面的SQL:

WITH "moved_rows" AS (
    DELETE FROM "event" WHERE ("timestamp" < XXXX-XX-XXTXX:XX:XX)
    RETURNING "id", "name", "timestamp")
INSERT INTO "archive" ("id", "name", "timestamp")
SELECT "moved_rows"."id", "moved_rows"."name", "moved_rows"."timestamp"
FROM "moved_rows";

有關其他示例,請參閱中的測試 models.pysql.py

外鍵和聯接?

本節已移至其自己的文檔中: 關系和連接 。