Seeking on Async FS is bugged / not-working #1772

Open
bluecoconut opened this issue Jan 14, 2025 · 7 comments

@bluecoconut

Here's a minimal example:

import fsspec
import asyncio

async def async_version():
    print("Async Version")
    fs = fsspec.filesystem("http", asynchronous=True)
    session = await fs.set_session()
    file = await fs.open_async("https://example.com/")
    print("Starting Tell", file.tell(), "seeking to 20")
    file.seek(20)
    print(f"Read 5 bytes, from tell of {file.tell()}:", await file.read(5), "now tell:", file.tell())
    file.seek(20)
    print(f"Read 5 bytes, from tell of {file.tell()}:", await file.read(5), "now tell:", file.tell())
    await file.close()
    await session.close()

def sync_version():
    print("Sync Version")
    fs = fsspec.filesystem("http")
    file = fs.open("https://example.com/")
    print("Starting Tell", file.tell(), "seeking to 20")
    file.seek(20)
    print(f"Read 5 bytes, from tell of {file.tell()}:", file.read(5), "now tell:", file.tell())
    file.seek(20)
    print(f"Read 5 bytes, from tell of {file.tell()}:", file.read(5), "now tell:", file.tell())
    file.close()

if __name__ == '__main__':
    asyncio.run(async_version())
    sync_version()

This outputs:

Async Version
Starting Tell 0 seeking to 20
Read 5 bytes, from tell of 20: b'<!doc' now tell: 25
Read 5 bytes, from tell of 20: b'type ' now tell: 25
Sync Version
Starting Tell 0 seeking to 20
Read 5 bytes, from tell of 20: b'l>\n<h' now tell: 25
Read 5 bytes, from tell of 20: b'l>\n<h' now tell: 25

Note that the async version respects seek and tell on the surface: it even updates .loc after a read, so .tell reports the expected position. But the actual bytes that .read returns are wrong.

The document starts with <!doctype, so we can see that the two .read() calls are just reading sequentially: the seek in the async implementation has no effect on the returned bytes, despite updating .loc.

I originally found this via an s3 filesystem with cache_type='background', but as I stripped things away I got all the way down to plain http and found it was still broken.

@martindurant
Member

We should fix this: async/streaming files should not be seekable at all.
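
A minimal sketch of that idea (illustrative only, not fsspec's current classes): a streaming file would advertise and enforce non-seekability rather than silently ignore the new position.

import io

class NonSeekableStream(io.RawIOBase):
    # Illustrative: refuse to seek instead of pretending the seek worked.
    def seekable(self):
        return False

    def seek(self, loc, whence=0):
        raise OSError("streaming file does not support seek")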

@bluecoconut
Author

Hmm... is "asynchronous" flag on the filesystem meant to mean 1:1 with streaming? That feels like an over-specification / a mismatch from what I would expect when trying to use an async interface.

My goal when using async is to prevent IO bound operations from blocking. (I'm trying to read multiple files (from multiple urls / s3 keys) at once). Is there another way I can achieve this without going down the "streaming" path?

This is sort of what I originally thought I could do (just open with open_async)

import fsspec
import asyncio
import random

async def read_bytes(url, start, end):
    my_id = random.randint(0, 100000)
    print(f"[{my_id:05}] Starting")
    print(f"[{my_id:05}] Starting read from {start} to {end}")
    try:
        fs = fsspec.filesystem("http")
        f = await fs.open_async(url, 'rb')
        f.seek(start)
        return await f.read(end - start)
    finally:
        await f.close()
        print(f"[{my_id:05}] Done")


async def batch_read():
    url = "https://ash-speed.hetzner.com/1GB.bin"
    offsets = [random.randint(0, 1024 * 1024 * 1024) for _ in range(5)]
    tasks = [read_bytes(url, o, o + 50 * 1024 * 1024) for o in offsets]
    results = await asyncio.gather(*tasks)
    print([len(r) for r in results])

if __name__ == '__main__':
    asyncio.run(batch_read())

but this raises an error:

File "... lib/python3.12/site-packages/aiohttp/helpers.py", line 636, in __enter__
RuntimeError: Timeout context manager should be used inside a task

Digging around, I found the asynchronous=True flag on the filesystem class, which removes the "Timeout context manager should be used inside a task" error but then breaks the actual behavior: with that exact code plus asynchronous=True in the filesystem(...) constructor, seek and read no longer behave correctly (the bug at the top of this issue).
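
(As an aside: concurrent random-access reads are possible without any file object via the filesystem-level async methods, e.g. _cat_file with a byte range, which the HTTP backend supports. A sketch, assuming asynchronous=True and a session created inside the running loop:)

import fsspec
import asyncio

async def main():
    fs = fsspec.filesystem("http", asynchronous=True)
    session = await fs.set_session()
    url = "https://example.com/"
    # one ranged HTTP request per slice; no file object, cache, or .loc involved
    parts = await asyncio.gather(
        fs._cat_file(url, start=0, end=5),
        fs._cat_file(url, start=20, end=25),
    )
    print(parts)
    await session.close()

asyncio.run(main())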

@martindurant
Member

Hmm... is "asynchronous" flag on the filesystem meant to mean 1:1 with streaming?

No, that flag specifies whether the async def _* methods will be called directly, or their sync counterparts.

However, the sync file-like from open() doesn't make sense in an async context, and it's probably a bad idea to overload open() to produce the streaming async variant when asynchronous=True.

@martindurant
Member

My goal when using async is to prevent IO bound operations from blocking

The sync file-like object does, of course, call down into the async code, so it is possible to get a true async and random-access file, but this is not exposed. We don't know what the API should look like, since IOBase is certainly sync; furthermore, a file-like is stateful (the file position), so races are very possible.
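
(Concretely, the sync wrappers run the backend coroutines on fsspec's dedicated IO loop via fsspec.asyn.sync. A sketch of invoking the same coroutine from sync code; _cat_file is the HTTP backend's real async method, the rest is illustration:)

import fsspec
from fsspec.asyn import sync

fs = fsspec.filesystem("http")  # sync-facing, but implemented with async methods
# roughly what the sync file-like does internally for a ranged read:
data = sync(fs.loop, fs._cat_file, "https://example.com/", start=20, end=25)
print(data)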

@bluecoconut
Author

Here's a version where I don't set the asynchronous=True flag, but I still get unexpected behavior.

import fsspec
import asyncio
import random

async def read_bytes(url, start, end):
    my_id = random.randint(0, 100000)
    print(f"[{my_id:05}] Starting")
    print(f"[{my_id:05}] Starting read from {start} to {end}")
    try:
        fs = fsspec.filesystem("http", loop=asyncio.get_running_loop())
        f = await fs.open_async(url, 'rb')
        f.seek(start)
        return await f.read(end - start)
    finally:
        await f.close()
        print(f"[{my_id:05}] Done")


async def batch_read():
    url = "https://ash-speed.hetzner.com/1GB.bin"
    offsets = [random.randint(0, 1024 * 1024 * 1024) for _ in range(5)]
    tasks = [read_bytes(url, o, o + 50 * 1024 * 1024) for o in offsets]
    results = await asyncio.gather(*tasks)
    print("Results:", [r[:15] for r in results])
    print([len(r) for r in results])

if __name__ == '__main__':
    asyncio.run(batch_read())

This runs, but outputs:

[17262] Starting
[17262] Starting read from 94493796 to 146922596
[00710] Starting
[00710] Starting read from 384436511 to 436865311
[45430] Starting
[45430] Starting read from 632900967 to 685329767
[96631] Starting
[96631] Starting read from 534823591 to 587252391
[02151] Starting
[02151] Starting read from 615312974 to 667741774
[45430] Done
[02151] Done
[00710] Done
[17262] Done
[96631] Done
Results: [b'\x08\x00)3\\\x0f\xb81\xd1\xfa\xcd\xea\xbe#\x97', b'\x08\x00)3\\\x0f\xb81\xd1\xfa\xcd\xea\xbe#\x97', b'\x08\x00)3\\\x0f\xb81\xd1\xfa\xcd\xea\xbe#\x97', b'\x08\x00)3\\\x0f\xb81\xd1\xfa\xcd\xea\xbe#\x97', b'\x08\x00)3\\\x0f\xb81\xd1\xfa\xcd\xea\xbe#\x97']
[16126, 16126, 16126, 16126, 16126]

By specifying the loop, I avoid the coroutine error (which I believe is due to fsspec creating its own event loop when one isn't specified).

Note that all the reads are identical (the seek is effectively ignored) and also not the full size they are supposed to be: 16126 bytes instead of the requested 50 MiB.


In the code above, if I instead remove the async behavior from fsspec, things start to block: the underlying sync -> async bridge doesn't prevent the sync calls from blocking each other.

import fsspec
import asyncio
import random

async def read_bytes(url, start, end):
    my_id = random.randint(0, 100000)
    print(f"[{my_id:05}] Starting")
    print(f"[{my_id:05}] Starting read from {start} to {end}")
    try:
        fs = fsspec.filesystem("http")
        f = fs.open(url, 'rb')
        f.seek(start)
        return f.read(end - start)
    finally:
        print(f"[{my_id:05}] Done")


async def batch_read():
    url = "https://ash-speed.hetzner.com/1GB.bin"
    offsets = [random.randint(0, 1024 * 1024 * 1024) for _ in range(5)]
    tasks = [read_bytes(url, o, o + 50 * 1024 * 1024) for o in offsets]
    results = await asyncio.gather(*tasks)
    print("Results:", [r[:15] for r in results])
    print([len(r) for r in results])

if __name__ == '__main__':
    asyncio.run(batch_read())

The calls block each other, and there are no parallel requests happening.
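
(A stopgap that does parallelize this path, at the cost of one worker thread per read, is asyncio.to_thread; a sketch, not an fsspec API:)

import fsspec
import asyncio

async def read_bytes(url, start, end):
    def blocking_read():
        fs = fsspec.filesystem("http")
        with fs.open(url, "rb") as f:
            f.seek(start)
            return f.read(end - start)
    # the blocking sync read runs on a worker thread, so tasks can overlap
    return await asyncio.to_thread(blocking_read)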

Am I missing something obvious about the API / how to use fsspec?

@bluecoconut
Author

Based on your comment:

The sync file-like object does, of course, call down into the async code, so it is possible to get a true async and random-access file, but this is not exposed.

I decided to try to see if I could get the behavior I wanted by digging in a bit more.

I got something that now works, but it took a lot of monkey-patching.

Specifically, I ended up monkey-patching:

  • fsspec.spec.AbstractBufferedFile.read_async = read_async
  • fsspec.caching.UpdatableLRU.acall = acall
  • fsspec.caching.BackgroundBlockCache._fetch_async = _fetch_async
  • fsspec.caching.BackgroundBlockCache._fetch_block_async = _fetch_block_async

I also created a cleanup method:

  • fsspec.caching.BackgroundBlockCache._cleanup_async_future = _cleanup_async_future

and then, to side-step the .info call in open(...), I called the fsspec.implementations.http.HTTPFile initializer directly and wrote my own code to pre-populate the size.

Example code:
import fsspec
import fsspec.implementations.http  # needed for the direct HTTPFile constructor below
import asyncio
import random
import aiohttp

import logging

logger = logging.getLogger("fsspec")

async def read_async(self, length=-1):
    """
    Return data from cache, or fetch pieces as necessary

    Parameters
    ----------
    length: int (-1)
        Number of bytes to read; if <0, all remaining bytes.
    """
    length = -1 if length is None else int(length)
    if self.mode != "rb":
        raise ValueError("File not in read mode")
    if length < 0:
        length = self.size - self.loc
    if self.closed:
        raise ValueError("I/O operation on closed file.")
    if length == 0:
        # don't even bother calling fetch
        return b""
    out = await self.cache._fetch_async(self.loc, self.loc + length)
    logger.debug(
        "%s read: %i - %i %s",
        self,
        self.loc,
        self.loc + length,
        self.cache._log_stats(),
    )
    self.loc += len(out)
    return out

fsspec.spec.AbstractBufferedFile.read_async = read_async

async def _fetch_block_async(self, block_number: int, log_info: str = "sync") -> bytes:
    """
    Fetch the block of data for `block_number`.
    """
    if block_number > self.nblocks:
        raise ValueError(
            f"'block_number={block_number}' is greater than "
            f"the number of blocks ({self.nblocks})"
        )

    start = block_number * self.blocksize
    end = start + self.blocksize
    logger.info("BlockCache fetching block (%s) %d", log_info, block_number)
    self.total_requested_bytes += end - start
    self.miss_count += 1
    async_fetcher = self.fetcher.__self__.async_fetch_range
    block_contents = await async_fetcher(start, end)
    return block_contents

fsspec.caching.BackgroundBlockCache._fetch_block_async = _fetch_block_async

async def acall(self, asyncfunc, *args, **kwargs):
    if kwargs:
        raise TypeError(f"Got unexpected keyword argument {kwargs.keys()}")
    with self._lock:
        if args in self._cache:
            self._cache.move_to_end(args)
            self._hits += 1
            return self._cache[args]

    result = await asyncfunc(*args, **kwargs)

    with self._lock:
        self._cache[args] = result
        self._misses += 1
        if len(self._cache) > self._max_size:
            self._cache.popitem(last=False)

    return result

fsspec.caching.UpdatableLRU.acall = acall

async def _fetch_async(self, start: int, end: int) -> bytes:
    if start is None:
        start = 0
    if end is None:
        end = self.size
    if start >= self.size or start >= end:
        return b""

    # byte position -> block numbers
    start_block_number = start // self.blocksize
    end_block_number = end // self.blocksize

    fetch_future_block_number = None
    fetch_future = None
    with self._fetch_future_lock:
        # Background thread is running. Check whether we can or must join it.
        if self._fetch_future is not None:
            assert self._fetch_future_block_number is not None
            if self._fetch_future.done():
                logger.info("BlockCache joined background fetch without waiting.")
                self._fetch_block_cached.add_key(
                    await self._fetch_future, self._fetch_future_block_number
                )
                # Cleanup the fetch variables. Done with fetching the block.
                self._fetch_future_block_number = None
                self._fetch_future = None
            else:
                # Must join if we need the block for the current fetch
                must_join = bool(
                    start_block_number
                    <= self._fetch_future_block_number
                    <= end_block_number
                )
                if must_join:
                    # Copy to the local variables to release lock
                    # before waiting for result
                    fetch_future_block_number = self._fetch_future_block_number
                    fetch_future = self._fetch_future

                    # Cleanup the fetch variables. Have a local copy.
                    self._fetch_future_block_number = None
                    self._fetch_future = None

    # Need to wait for the future for the current read
    if fetch_future is not None:
        logger.info("BlockCache waiting for background fetch.")
        # Wait until result and put it in cache
        self._fetch_block_cached.add_key(
            await fetch_future, fetch_future_block_number
        )

    # these are cached, so safe to do multiple calls for the same start and end.
    for block_number in range(start_block_number, end_block_number + 1):
        # self._fetch_block_cached(block_number)
        await self._fetch_block_cached.acall(self._fetch_block_async, block_number)

    # fetch next block in the background if nothing is running in the background,
    # the block is within file and it is not already cached
    end_block_plus_1 = end_block_number + 1
    with self._fetch_future_lock:
        if (
            self._fetch_future is None
            and end_block_plus_1 <= self.nblocks
            and not self._fetch_block_cached.is_key_cached(end_block_plus_1)
        ):
            self._fetch_future_block_number = end_block_plus_1
            self._fetch_future = asyncio.ensure_future(
                self._fetch_block_async(end_block_plus_1, "async")
            )

    return self._read_cache(
        start,
        end,
        start_block_number=start_block_number,
        end_block_number=end_block_number,
    )

fsspec.caching.BackgroundBlockCache._fetch_async = _fetch_async

async def _cleanup_async_future(self):
    with self._fetch_future_lock:
        if self._fetch_future is not None:
            self._fetch_future.cancel()
            self._fetch_future = None

fsspec.caching.BackgroundBlockCache._cleanup_async_future = _cleanup_async_future


async def read_bytes(url, start, end):
    my_id = random.randint(0, 100000)
    print(f"[{my_id:05}] Starting")
    print(f"[{my_id:05}] Starting read from {start} to {end}")
    try:
        async with aiohttp.ClientSession() as session:
            fs = fsspec.filesystem("http")
            async with session.get(url, headers={"Range": "bytes=0-0"}) as response:
                size = int(response.headers.get("Content-Range", "bytes 0-0/0").split("/")[-1])
            f = fsspec.implementations.http.HTTPFile(
                fs,
                url,
                session=session,
                size=size,
                mode="rb",
                block_size=1024*1024*50,
                cache_type='background'
            )
            f.seek(start)
            data = await f.read_async(end - start)
            print(f"[{my_id:05}] Read {len(data)} bytes")
            await f.cache._cleanup_async_future()
            return data
    finally:
        print(f"[{my_id:05}] Done")


async def batch_read():
    url = "https://ash-speed.hetzner.com/1GB.bin"
    offsets = [random.randint(0, 1024 * 1024 * 1024) for _ in range(5)]
    tasks = [read_bytes(url, o, o + 50 * 1024 * 1024) for o in offsets]
    results = await asyncio.gather(*tasks)
    print("Results:", [r[:15] for r in results])
    print([len(r) for r in results])

if __name__ == '__main__':
    asyncio.run(batch_read())

In terms of APIs, I think the most confusing thing (that this thread highlighted) is that the open_async method returns an AsyncStreamFile. To me, conceptually, streaming backends are a separable behavior from async handling, and it'd be nice if fsspec had a set of async/non-blocking APIs. (For my use case, at the file level, all I need are async versions of open, read, and close.)
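
(A sketch of the kind of minimal async, random-access file interface I mean; hypothetical, not anything fsspec currently exposes:)

from typing import Protocol

class AsyncRandomAccessFile(Protocol):
    # Hypothetical: async IO treated as orthogonal to streaming.
    def seek(self, loc: int, whence: int = 0) -> int: ...
    def tell(self) -> int: ...
    async def read(self, length: int = -1) -> bytes: ...
    async def close(self) -> None: ...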

@martindurant
Member

You are quite right, there are two distinct things going on:

  • streaming files (i.e., no random-access), possibly with unknown length
  • async methods on a file

It is possible that you would want both, which is what open_async currently does, but each is independently useful in some cases.

There are some implementations of async file-like objects (aiofiles and such) but nothing standard. You couldn't pass this thing to anything expecting a standard IOBase object. Is a file an iterator of chunks, or lines, or something else? Can multiple coroutines wait on the same file?

There is exactly one sync streaming file, in HTTPFileSystem for the case that the size cannot be determined and/or byte-range requests are not allowed.
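
(For reference, a quick way to see which variant open() returned; HTTPFile and HTTPStreamFile are the actual class names in fsspec.implementations.http:)

import fsspec

fs = fsspec.filesystem("http")
f = fs.open("https://example.com/")
# HTTPFile when the size is known and byte-range requests work,
# HTTPStreamFile otherwise
print(type(f).__name__)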
