Adjust type hints for NonBlockingStreamReader()

This commit is contained in:
2Shirt 2023-07-02 14:05:20 -07:00
parent 86203a4b86
commit 172f00e4e9
Signed by: 2Shirt
GPG key ID: 152FAC923B0E132C

View file

@ -9,7 +9,7 @@ import re
import subprocess import subprocess
import time import time
from io import BufferedReader, TextIOWrapper from io import IOBase
from queue import Queue, Empty from queue import Queue, Empty
from threading import Thread from threading import Thread
from typing import Any, Callable, Iterable from typing import Any, Callable, Iterable
@ -28,11 +28,11 @@ class NonBlockingStreamReader():
## https://gist.github.com/EyalAr/7915597 ## https://gist.github.com/EyalAr/7915597
## https://stackoverflow.com/a/4896288 ## https://stackoverflow.com/a/4896288
def __init__(self, stream: BufferedReader | TextIOWrapper): def __init__(self, stream: IOBase):
self.stream: BufferedReader | TextIOWrapper = stream self.stream: IOBase = stream
self.queue: Queue = Queue() self.queue: Queue = Queue()
def populate_queue(stream: BufferedReader | TextIOWrapper, queue: Queue) -> None: def populate_queue(stream: IOBase, queue: Queue) -> None:
"""Collect lines from stream and put them in queue.""" """Collect lines from stream and put them in queue."""
while not stream.closed: while not stream.closed:
try: try: