內容

名稱

Thread::Queue - 執行緒安全佇列

版本

本文件說明 Thread::Queue 版本 3.14

語法

use strict;
use warnings;

use threads;
use Thread::Queue;

my $q = Thread::Queue->new();    # A new empty queue

# Worker thread
my $thr = threads->create(
    sub {
        # Thread will loop until no more work
        while (defined(my $item = $q->dequeue())) {
            # Do work on $item
            ...
        }
    }
);

# Send work to the thread
$q->enqueue($item1, ...);
# Signal that there is no more work to be sent
$q->end();
# Join up with the thread when it finishes
$thr->join();

...

# Count of items in the queue
my $left = $q->pending();

# Non-blocking dequeue
if (defined(my $item = $q->dequeue_nb())) {
    # Work on $item
}

# Blocking dequeue with 5-second timeout
if (defined(my $item = $q->dequeue_timed(5))) {
    # Work on $item
}

# Set a size for a queue
$q->limit = 5;

# Get the second item in the queue without dequeuing anything
my $item = $q->peek(1);

# Insert two items into the queue just behind the head
$q->insert(1, $item1, $item2);

# Extract the last two items on the queue
my ($item1, $item2) = $q->extract(-2, 2);

描述

本模組提供執行緒安全的 FIFO 佇列,任何數量的執行緒都可以安全地存取。

任何由 threads::shared 支援的資料類型都可以透過佇列傳遞

一般純量
陣列參考
雜湊參考
純量參考
基於上述項目的物件

一般純量會原樣加入佇列。

如果尚未執行緒共用,其他複雜資料類型將在放入佇列之前複製到執行緒共用結構中(如果需要,將遞迴複製,並包含任何 bless 和唯讀設定)。

例如,下列會導致 Thread::Queue 透過 &shared([]) 建立一個空的共用陣列參考,將元素「foo」、「bar」和「baz」從 @ary 複製到其中,然後將該共用參考放入佇列中

my @ary = qw/foo bar baz/;
$q->enqueue(\@ary);

不過,對於下列情況,項目已經共用,因此其參考會直接加入佇列,而且不會執行複製

my @ary :shared = qw/foo bar baz/;
$q->enqueue(\@ary);

my $obj = &shared({});
$$obj{'foo'} = 'bar';
$$obj{'qux'} = 99;
bless($obj, 'My::Class');
$q->enqueue($obj);

請參閱 "LIMITATIONS",以了解透過佇列傳遞物件相關的注意事項。

佇列建立

->new()

建立新的空佇列。

->new(LIST)

建立新的佇列,並預先填入提供的項目清單。

基本方法

下列方法以 FIFO 為基礎處理佇列。

->enqueue(LIST)

將項目清單加入佇列尾端。

->dequeue()
->dequeue(COUNT)

從佇列頭端移除請求的項目數量(預設為 1),並傳回這些項目。如果佇列包含的項目數量少於請求的數量,則執行緒會被封鎖,直到有足夠數量的項目可用(即,直到其他執行緒 enqueue 更多項目)。

->dequeue_nb()
->dequeue_nb(COUNT)

從佇列頭端移除請求的項目數量(預設為 1),並傳回這些項目。如果佇列包含的項目數量少於請求的數量,則會立即(即,非封鎖)傳回佇列中的所有項目。如果佇列為空,則傳回 undef

->dequeue_timed(TIMEOUT)
->dequeue_timed(TIMEOUT, COUNT)

從佇列頭端移除請求的項目數量(預設為 1),並傳回這些項目。如果佇列包含的項目數量少於請求的數量,則執行緒會被封鎖,直到有足夠數量的項目可用,或達到逾時時間。如果達到逾時時間,則會傳回佇列中的所有項目,或如果佇列為空,則傳回 undef

逾時時間可以是相對於目前時間的秒數(例如,從呼叫時起 5 秒),或可以是與 cond_timedwait() 相同的絕對逾時時間(以紀元秒表示)。也支援小數秒(例如,2.5 秒)(取決於底層實作)。

如果 TIMEOUT 遺失、為 undef,或小於或等於 0,則此呼叫的行為與 dequeue_nb 相同。

->pending()

傳回佇列中尚有的項目數。如果佇列已結束(見下文),且佇列中沒有更多項目,則傳回 undef

->limit

設定佇列大小。如果設定,則呼叫 enqueue() 將會封鎖,直到佇列中待處理項目的數量降至低於 limitlimit 不會阻止佇列項目超過該數量

my $q = Thread::Queue->new(1, 2);
$q->limit = 4;
$q->enqueue(3, 4, 5);   # Does not block
$q->enqueue(6);         # Blocks until at least 2 items are
                        # dequeued
my $size = $q->limit;   # Returns the current limit (may return
                        # 'undef')
$q->limit = 0;          # Queue size is now unlimited

如果使用大於佇列 limitCOUNT 呼叫任何出列方法,將會產生錯誤。

->end()

宣告不再會將更多項目加入佇列。

所有封鎖於 dequeue() 呼叫的執行緒將會解除封鎖,且佇列中任何剩餘項目和/或 undef 將會傳回。任何後續呼叫 dequeue() 的行為將如同 dequeue_nb()

一旦結束,就不會再有更多項目放入佇列。

進階方法

以下方法可用於操作佇列中任何位置的項目。

若要防止佇列內容在檢查和/或變更時被其他執行緒修改,請在區塊內 鎖定 佇列

{
    lock($q);   # Keep other threads from changing the queue's contents
    my $item = $q->peek();
    if ($item ...) {
        ...
    }
}
# Queue is now unlocked
->peek()
->peek(INDEX)

從佇列傳回項目,但不會出列任何項目。如果未指定索引,則預設為佇列開頭(索引位置 0)。負索引值受支援,如同 陣列(即 -1 是佇列結尾,-2 是倒數第二個,依此類推)。

如果在指定索引處不存在任何項目(即佇列為空,或索引超出佇列中項目的數量),則傳回 undef

請記住,傳回的項目不會從佇列中移除,因此操作 peek 的參考會影響佇列中的項目。

->insert(INDEX, LIST)

將項目清單新增至佇列中指定索引位置(0 是清單開頭)。該位置及之後的任何現有項目都會被推到新加入項目的後面

$q->enqueue(1, 2, 3, 4);
$q->insert(1, qw/foo bar/);
# Queue now contains:  1, foo, bar, 2, 3, 4

指定大於佇列中項目數的索引位置,只會將清單新增至結尾。

負索引位置受支援

$q->enqueue(1, 2, 3, 4);
$q->insert(-2, qw/foo bar/);
# Queue now contains:  1, 2, foo, bar, 3, 4

指定大於佇列中項目數的負索引位置,會將清單新增至佇列開頭。

->extract()
->extract(INDEX)
->extract(INDEX, COUNT)

移除並傳回指定數量的項目(預設為 1)至佇列中指定的索引位置(0 為佇列的開頭)。當不帶任何參數呼叫時,extract 的運作方式與 dequeue_nb 相同。

此方法是非封鎖的,且僅會傳回可滿足要求的項目數量

$q->enqueue(1, 2, 3, 4);
my $item  = $q->extract(2)     # Returns 3
                               # Queue now contains:  1, 2, 4
my @items = $q->extract(1, 3)  # Returns (2, 4)
                               # Queue now contains:  1

指定大於佇列中項目數量的索引位置會導致傳回 undef 或空清單。

$q->enqueue('foo');
my $nada = $q->extract(3)      # Returns undef
my @nada = $q->extract(1, 3)   # Returns ()

支援負索引位置。指定大於佇列中項目數量的負索引位置可能會傳回佇列開頭的項目(類似於 dequeue_nb),如果數量與從指定位置開始的佇列開頭重疊(即佇列大小 + 索引 + 數量大於零)

$q->enqueue(qw/foo bar baz/);
my @nada = $q->extract(-6, 2);  # Returns ()      - (3+(-6)+2) <= 0
my @some = $q->extract(-6, 4);  # Returns (foo)   - (3+(-6)+4) > 0
                                # Queue now contains:  bar, baz
my @rest = $q->extract(-3, 4);  # Returns (bar, baz) -
                                #                   (2+(-3)+4) > 0

備註

Thread::Queue 建立的佇列可以在有執行緒或沒有執行緒的應用程式中使用。

限制

如果物件的類別不支援共用,則在佇列中傳遞物件可能無法運作。有關更多資訊,請參閱 threads::shared 中的「BUGS AND LIMITATIONS」

傳遞包含物件的陣列/雜湊參照在 Perl 5.10.0 之前可能無法運作。

另請參閱

MetaCPAN 上的 Thread::Queue:https://metacpan.org/release/Thread-Queue

CPAN 發行版的程式碼存放庫:https://github.com/Dual-Life/Thread-Queue

threadsthreads::shared

此 CPAN 發行版的 examples 目錄中的範例程式碼。

維護人員

Jerry D. Hedden,<jdhedden AT cpan DOT org>

授權

此程式為自由軟體;您可以在與 Perl 相同的條款下重新發布或修改它。