この記事では、PythonのQueue(キュー)の種類と使い方を解説します。
Pythonでは、queue
モジュールを使うことで様々なキューを扱うことができます。これらのオブジェクトはマルチスレッドプログラミングにおいて安全に値の受け渡しが可能です。
Queue
は、先入先出(FIFO)のコレクションというイメージが強いですが、queue
モジュールには以下のような3種類のキューが用意されています。
- 先入先出(FIFO)キュー
- 後入先出(LIFO)キュー
- 優先順位付きキュー
また、collections
モジュールにはdeque(デック)
と呼ばれるスタックとキューを一般化したものが定義されており、こちらのオブジェクトもキューとして使用可能です。
Linkスタックやキューのように扱えるdeque(デック)の使い方
それでは、Pythonで使えるキューの種類と使い方を見ていきましょう!
queueモジュールで扱えるキューの種類
queue
モジュールには以下の3種類のキューが用意されています。
- queue.Queue(maxsize=0)
- 先入先出(FIFO)キューのコンストラクタ。
- queue.LifoQueue(maxsize=0)
- 後入先出(LIFO)キューのコンストラクタ。
- queue.PriorityQueue(maxsize=0)
- 優先順位付きキューのコンストラクタ。このキューでは要素がソートされ、 最も低い値が最初に取り出されます。
- maxsize引数は、キューに入れられる要素数の上限を設定する整数
- 要素数が上限に達している場合、挿入処理はキューの要素が消費されるまでブロックされる
- maxsize引数が0以下の場合は、キューのサイズは無限となる
これら3つのキューは、キューから取り出されるエントリの順番だけが異なり、使用できる機能(メソッド)は同じです。
また、タスク追跡などを行わない、よりシンプルなキューも用意されています。
- queue.SimpleQueue()
- シンプルな先入先出(FIFO)キューのコンストラクタ。
- Python 3.7 で追加
キューオブジェクトで使えるメソッド
Queue
、LifoQueue
、PriorityQueue
では、以下のようなメソッドが使用可能です。
アイテムの追加
- put(item, block=True, timeout=None)
-
item
引数をキューに追加する。import queue q = queue.Queue() q.put(1) # 1を追加 q.put(2) # 2を追加 q.put(3) # 3を追加
block
引数がTrue
、timeout
引数がNone
の場合、キューに空きなかったら空きがでるまでブロックし、空きが出た瞬間にアイテムを追加します。import queue import time, threading # 1つしかアイテムが入らないキュー q = queue.Queue(1) # 1秒ごとにキューのアイテムを取り出す関数 def worker(): while True: time.sleep(1) item = q.get() print(f'item: {item}') # スレッド生成・実行 threading.Thread(target=worker, daemon=True).start() # キューにアイテムを追加 for item in range(5): q.put(item)
timeout
引数が正の数の場合、最大でtimeout秒間ブロックする。指定した秒数以内に空きができなかった場合は、Full
例外が送出されます。先ほどのコードのtimeoutを0.5とかに設定すると指定した秒数以内にキューに空きが出ないのでFull例外が送出される。
# キューにアイテムを追加 for item in range(5): q.put(item, timeout=0.5)
block
引数がFalse
の場合、キューに空きがなかったら直ちにFull
例外が送出されます。put(item, block=False)
と同等のput_nowait(item)
メソッドも用意されている。 - get(block=True, timeout=None)
-
キューからアイテムを取り除き、返します。
import queue q = queue.Queue() q.put(1) # 1を追加 q.put(2) # 2を追加 q.put(3) # 3を追加 print(q.get()) # 1 print(q.get()) # 2 print(q.get()) # 3
block
引数がTrue
、timeout
引数がNone
の場合、アイテムが取り出せるようになるまでブロックします。import queue import time, threading q = queue.Queue() # 1秒ごとにキューにアイテムを追加する関数 def worker(): for item in range(5): q.put(item) time.sleep(1) # スレッド生成・実行 threading.Thread(target=worker, daemon=True).start() # キューからアイテムを取り出す for item in range(5): print(f'item: {q.get()}')
timeout
引数が正の数の場合、最大でtimeout秒間ブロックする。指定した秒数以内にアイテムを取り出せない場合は、Empty
例外が送出されます。先ほどのコードのtimeoutを0.5とかに設定すると指定した秒数以内にアイテムが取り出せないのでEmpty例外が送出される。
# キューからアイテムを取り出す for item in range(5): print(f'item: {q.get(timeout=0.5)}')
block
引数がFalse
の場合、キューからアイテムが取り出せなければ直ちにEmpty
例外が送出されます。get(block=False)
と同等のget_nowait()
メソッドも用意されている。 - qsize()
-
キューの近似サイズを返す。
import queue q = queue.Queue() q.put(1) q.put(2) q.put(3) print(q.qsize()) # 3
- empty()
-
キューが空の場合は
True
、そうでなければFalse
を返す。import queue q = queue.Queue() print(q.empty()) # True q.put(1) q.put(2) q.put(3) print(q.empty()) # False
- full()
-
キューが最大の場合は
True
、そうでなければFalse
を返す。import queue q = queue.Queue(3) print(q.full()) # False q.put(1) q.put(2) q.put(3) print(q.full()) # True
アイテムを取り出す
サイズの取得
空かどうかの取得
最大かどうかの取得
処理待ち
キューのアイテムが全てコンシューマースレッドで処理されるまで待ちたい場合は、以下の2つのメソッドを組み合わせて使います。
- 「コンシューマー(消費者)スレッド」とは「データを消費、つまりは取り出す側のスレッド」のこと
- データを追加する側は「プロデューサー(生産者)スレッド」と呼ばれる
- task_done()
- join()
-
join()
は、キューのすべてのアイテムが取り出されて処理されるまでブロックするメソッド。キューにアイテムが追加される度に未完了タスクカウントが増加します。コンシューマースレッドで
get()
の後にtask_done()
を呼び出すと未完了タスクカウントが減少します。未完了タスクカウントが0になったときにjoin()
のブロックが解除されます。import queue import time, threading q = queue.Queue() q.put(1) # 未完了タスクカウント増加 q.put(2) # 未完了タスクカウント増加 q.put(3) # 未完了タスクカウント増加 def worker(): while True: item = q.get() print(f'アイテム: {item} 処理中') time.sleep(1) print(f'アイテム: {item} 完了') q.task_done() # 未完了タスクカウント減少 threading.Thread(target=worker, daemon=True).start() q.join() print('全てのキューを処理完了')
実行結果
アイテム: 1 処理中 アイテム: 1 完了 アイテム: 2 処理中 アイテム: 2 完了 アイテム: 3 処理中 アイテム: 3 完了 全てのキューを処理完了
キューオブジェクトで使えるメソッドは以上となります。
まとめ
この記事では、PythonのQueue(キュー)の種類と使い方を解説しました。
Pythonでマルチスレッドを使う際には、queue
モジュールのキューを使うことで簡単で安全にデータの受け渡しができてとても便利です。
それでは今回の内容はここまでです。ではまたどこかで〜( ・∀・)ノ