# File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details.

from __future__ import annotations

import io
import os
import logging
import builtins
from typing import List, overload
from pathlib import Path

import anyio
import httpx

from ... import _legacy_response
from .parts import (
    Parts,
    AsyncParts,
    PartsWithRawResponse,
    AsyncPartsWithRawResponse,
    PartsWithStreamingResponse,
    AsyncPartsWithStreamingResponse,
)
from ...types import FilePurpose, upload_create_params, upload_complete_params
from ..._types import NOT_GIVEN, Body, Query, Headers, NotGiven
from ..._utils import (
    maybe_transform,
    async_maybe_transform,
)
from ..._compat import cached_property
from ..._resource import SyncAPIResource, AsyncAPIResource
from ..._response import to_streamed_response_wrapper, async_to_streamed_response_wrapper
from ..._base_client import make_request_options
from ...types.upload import Upload
from ...types.file_purpose import FilePurpose

__all__ = ["Uploads", "AsyncUploads"]


# 64MB
DEFAULT_PART_SIZE = 64 * 1024 * 1024

log: logging.Logger = logging.getLogger(__name__)


class Uploads(SyncAPIResource):
    @cached_property
    def parts(self) -> Parts:
        return Parts(self._client)

    @cached_property
    def with_raw_response(self) -> UploadsWithRawResponse:
        return UploadsWithRawResponse(self)

    @cached_property
    def with_streaming_response(self) -> UploadsWithStreamingResponse:
        return UploadsWithStreamingResponse(self)

    @overload
    def upload_file_chunked(
        self,
        *,
        file: os.PathLike[str],
        mime_type: str,
        purpose: FilePurpose,
        bytes: int | None = None,
        part_size: int | None = None,
        md5: str | NotGiven = NOT_GIVEN,
    ) -> Upload:
        """Splits a file into multiple 64MB parts and uploads them sequentially."""

    @overload
    def upload_file_chunked(
        self,
        *,
        file: bytes,
        filename: str,
        bytes: int,
        mime_type: str,
        purpose: FilePurpose,
        part_size: int | None = None,
        md5: str | NotGiven = NOT_GIVEN,
    ) -> Upload:
        """Splits an in-memory file into multiple 64MB parts and uploads them sequentially."""

    def upload_file_chunked(
        self,
        *,
        file: os.PathLike[str] | bytes,
        mime_type: str,
        purpose: FilePurpose,
        filename: str | None = None,
        bytes: int | None = None,
        part_size: int | None = None,
        md5: str | NotGiven = NOT_GIVEN,
    ) -> Upload:
        """Splits the given file into multiple parts and uploads them sequentially.

        ```py
        from pathlib import Path

        client.uploads.upload_file(
            file=Path("my-paper.pdf"),
            mime_type="pdf",
            purpose="assistants",
        )
        ```
        """
        if isinstance(file, builtins.bytes):
            if filename is None:
                raise TypeError("The `filename` argument must be given for in-memory files")

            if bytes is None:
                raise TypeError("The `bytes` argument must be given for in-memory files")
        else:
            if not isinstance(file, Path):
                file = Path(file)

            if not filename:
                filename = file.name

            if bytes is None:
                bytes = file.stat().st_size

        upload = self.create(
            bytes=bytes,
            filename=filename,
            mime_type=mime_type,
            purpose=purpose,
        )

        part_ids: list[str] = []

        if part_size is None:
            part_size = DEFAULT_PART_SIZE

        if isinstance(file, builtins.bytes):
            buf: io.FileIO | io.BytesIO = io.BytesIO(file)
        else:
            buf = io.FileIO(file)

        try:
            while True:
                data = buf.read(part_size)
                if not data:
                    # EOF
                    break

                part = self.parts.create(upload_id=upload.id, data=data)
                log.info("Uploaded part %s for upload %s", part.id, upload.id)
                part_ids.append(part.id)
        except Exception:
            buf.close()
            raise

        return self.complete(upload_id=upload.id, part_ids=part_ids, md5=md5)

    def create(
        self,
        *,
        bytes: int,
        filename: str,
        mime_type: str,
        purpose: FilePurpose,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    ) -> Upload:
        """
        Creates an intermediate
        [Upload](https://platform.openai.com/docs/api-reference/uploads/object) object
        that you can add
        [Parts](https://platform.openai.com/docs/api-reference/uploads/part-object) to.
        Currently, an Upload can accept at most 8 GB in total and expires after an hour
        after you create it.

        Once you complete the Upload, we will create a
        [File](https://platform.openai.com/docs/api-reference/files/object) object that
        contains all the parts you uploaded. This File is usable in the rest of our
        platform as a regular File object.

        For certain `purpose`s, the correct `mime_type` must be specified. Please refer
        to documentation for the supported MIME types for your use case:

        - [Assistants](https://platform.openai.com/docs/assistants/tools/file-search/supported-files)

        For guidance on the proper filename extensions for each purpose, please follow
        the documentation on
        [creating a File](https://platform.openai.com/docs/api-reference/files/create).

        Args:
          bytes: The number of bytes in the file you are uploading.

          filename: The name of the file to upload.

          mime_type: The MIME type of the file.

              This must fall within the supported MIME types for your file purpose. See the
              supported MIME types for assistants and vision.

          purpose: The intended purpose of the uploaded file.

              See the
              [documentation on File purposes](https://platform.openai.com/docs/api-reference/files/create#files-create-purpose).

          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        return self._post(
            "/uploads",
            body=maybe_transform(
                {
                    "bytes": bytes,
                    "filename": filename,
                    "mime_type": mime_type,
                    "purpose": purpose,
                },
                upload_create_params.UploadCreateParams,
            ),
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=Upload,
        )

    def cancel(
        self,
        upload_id: str,
        *,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    ) -> Upload:
        """Cancels the Upload.

        No Parts may be added after an Upload is cancelled.

        Args:
          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not upload_id:
            raise ValueError(f"Expected a non-empty value for `upload_id` but received {upload_id!r}")
        return self._post(
            f"/uploads/{upload_id}/cancel",
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=Upload,
        )

    def complete(
        self,
        upload_id: str,
        *,
        part_ids: List[str],
        md5: str | NotGiven = NOT_GIVEN,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    ) -> Upload:
        """
        Completes the
        [Upload](https://platform.openai.com/docs/api-reference/uploads/object).

        Within the returned Upload object, there is a nested
        [File](https://platform.openai.com/docs/api-reference/files/object) object that
        is ready to use in the rest of the platform.

        You can specify the order of the Parts by passing in an ordered list of the Part
        IDs.

        The number of bytes uploaded upon completion must match the number of bytes
        initially specified when creating the Upload object. No Parts may be added after
        an Upload is completed.

        Args:
          part_ids: The ordered list of Part IDs.

          md5: The optional md5 checksum for the file contents to verify if the bytes uploaded
              matches what you expect.

          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not upload_id:
            raise ValueError(f"Expected a non-empty value for `upload_id` but received {upload_id!r}")
        return self._post(
            f"/uploads/{upload_id}/complete",
            body=maybe_transform(
                {
                    "part_ids": part_ids,
                    "md5": md5,
                },
                upload_complete_params.UploadCompleteParams,
            ),
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=Upload,
        )


class AsyncUploads(AsyncAPIResource):
    @cached_property
    def parts(self) -> AsyncParts:
        return AsyncParts(self._client)

    @cached_property
    def with_raw_response(self) -> AsyncUploadsWithRawResponse:
        return AsyncUploadsWithRawResponse(self)

    @cached_property
    def with_streaming_response(self) -> AsyncUploadsWithStreamingResponse:
        return AsyncUploadsWithStreamingResponse(self)

    @overload
    async def upload_file_chunked(
        self,
        *,
        file: os.PathLike[str],
        mime_type: str,
        purpose: FilePurpose,
        bytes: int | None = None,
        part_size: int | None = None,
        md5: str | NotGiven = NOT_GIVEN,
    ) -> Upload:
        """Splits a file into multiple 64MB parts and uploads them sequentially."""

    @overload
    async def upload_file_chunked(
        self,
        *,
        file: bytes,
        filename: str,
        bytes: int,
        mime_type: str,
        purpose: FilePurpose,
        part_size: int | None = None,
        md5: str | NotGiven = NOT_GIVEN,
    ) -> Upload:
        """Splits an in-memory file into multiple 64MB parts and uploads them sequentially."""

    async def upload_file_chunked(
        self,
        *,
        file: os.PathLike[str] | bytes,
        mime_type: str,
        purpose: FilePurpose,
        filename: str | None = None,
        bytes: int | None = None,
        part_size: int | None = None,
        md5: str | NotGiven = NOT_GIVEN,
    ) -> Upload:
        """Splits the given file into multiple parts and uploads them sequentially.

        ```py
        from pathlib import Path

        client.uploads.upload_file(
            file=Path("my-paper.pdf"),
            mime_type="pdf",
            purpose="assistants",
        )
        ```
        """
        if isinstance(file, builtins.bytes):
            if filename is None:
                raise TypeError("The `filename` argument must be given for in-memory files")

            if bytes is None:
                raise TypeError("The `bytes` argument must be given for in-memory files")
        else:
            if not isinstance(file, anyio.Path):
                file = anyio.Path(file)

            if not filename:
                filename = file.name

            if bytes is None:
                stat = await file.stat()
                bytes = stat.st_size

        upload = await self.create(
            bytes=bytes,
            filename=filename,
            mime_type=mime_type,
            purpose=purpose,
        )

        part_ids: list[str] = []

        if part_size is None:
            part_size = DEFAULT_PART_SIZE

        if isinstance(file, anyio.Path):
            fd = await file.open("rb")
            async with fd:
                while True:
                    data = await fd.read(part_size)
                    if not data:
                        # EOF
                        break

                    part = await self.parts.create(upload_id=upload.id, data=data)
                    log.info("Uploaded part %s for upload %s", part.id, upload.id)
                    part_ids.append(part.id)
        else:
            buf = io.BytesIO(file)

            try:
                while True:
                    data = buf.read(part_size)
                    if not data:
                        # EOF
                        break

                    part = await self.parts.create(upload_id=upload.id, data=data)
                    log.info("Uploaded part %s for upload %s", part.id, upload.id)
                    part_ids.append(part.id)
            except Exception:
                buf.close()
                raise

        return await self.complete(upload_id=upload.id, part_ids=part_ids, md5=md5)

    async def create(
        self,
        *,
        bytes: int,
        filename: str,
        mime_type: str,
        purpose: FilePurpose,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    ) -> Upload:
        """
        Creates an intermediate
        [Upload](https://platform.openai.com/docs/api-reference/uploads/object) object
        that you can add
        [Parts](https://platform.openai.com/docs/api-reference/uploads/part-object) to.
        Currently, an Upload can accept at most 8 GB in total and expires after an hour
        after you create it.

        Once you complete the Upload, we will create a
        [File](https://platform.openai.com/docs/api-reference/files/object) object that
        contains all the parts you uploaded. This File is usable in the rest of our
        platform as a regular File object.

        For certain `purpose`s, the correct `mime_type` must be specified. Please refer
        to documentation for the supported MIME types for your use case:

        - [Assistants](https://platform.openai.com/docs/assistants/tools/file-search/supported-files)

        For guidance on the proper filename extensions for each purpose, please follow
        the documentation on
        [creating a File](https://platform.openai.com/docs/api-reference/files/create).

        Args:
          bytes: The number of bytes in the file you are uploading.

          filename: The name of the file to upload.

          mime_type: The MIME type of the file.

              This must fall within the supported MIME types for your file purpose. See the
              supported MIME types for assistants and vision.

          purpose: The intended purpose of the uploaded file.

              See the
              [documentation on File purposes](https://platform.openai.com/docs/api-reference/files/create#files-create-purpose).

          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        return await self._post(
            "/uploads",
            body=await async_maybe_transform(
                {
                    "bytes": bytes,
                    "filename": filename,
                    "mime_type": mime_type,
                    "purpose": purpose,
                },
                upload_create_params.UploadCreateParams,
            ),
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=Upload,
        )

    async def cancel(
        self,
        upload_id: str,
        *,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    ) -> Upload:
        """Cancels the Upload.

        No Parts may be added after an Upload is cancelled.

        Args:
          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not upload_id:
            raise ValueError(f"Expected a non-empty value for `upload_id` but received {upload_id!r}")
        return await self._post(
            f"/uploads/{upload_id}/cancel",
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=Upload,
        )

    async def complete(
        self,
        upload_id: str,
        *,
        part_ids: List[str],
        md5: str | NotGiven = NOT_GIVEN,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    ) -> Upload:
        """
        Completes the
        [Upload](https://platform.openai.com/docs/api-reference/uploads/object).

        Within the returned Upload object, there is a nested
        [File](https://platform.openai.com/docs/api-reference/files/object) object that
        is ready to use in the rest of the platform.

        You can specify the order of the Parts by passing in an ordered list of the Part
        IDs.

        The number of bytes uploaded upon completion must match the number of bytes
        initially specified when creating the Upload object. No Parts may be added after
        an Upload is completed.

        Args:
          part_ids: The ordered list of Part IDs.

          md5: The optional md5 checksum for the file contents to verify if the bytes uploaded
              matches what you expect.

          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not upload_id:
            raise ValueError(f"Expected a non-empty value for `upload_id` but received {upload_id!r}")
        return await self._post(
            f"/uploads/{upload_id}/complete",
            body=await async_maybe_transform(
                {
                    "part_ids": part_ids,
                    "md5": md5,
                },
                upload_complete_params.UploadCompleteParams,
            ),
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=Upload,
        )


class UploadsWithRawResponse:
    def __init__(self, uploads: Uploads) -> None:
        self._uploads = uploads

        self.create = _legacy_response.to_raw_response_wrapper(
            uploads.create,
        )
        self.cancel = _legacy_response.to_raw_response_wrapper(
            uploads.cancel,
        )
        self.complete = _legacy_response.to_raw_response_wrapper(
            uploads.complete,
        )

    @cached_property
    def parts(self) -> PartsWithRawResponse:
        return PartsWithRawResponse(self._uploads.parts)


class AsyncUploadsWithRawResponse:
    def __init__(self, uploads: AsyncUploads) -> None:
        self._uploads = uploads

        self.create = _legacy_response.async_to_raw_response_wrapper(
            uploads.create,
        )
        self.cancel = _legacy_response.async_to_raw_response_wrapper(
            uploads.cancel,
        )
        self.complete = _legacy_response.async_to_raw_response_wrapper(
            uploads.complete,
        )

    @cached_property
    def parts(self) -> AsyncPartsWithRawResponse:
        return AsyncPartsWithRawResponse(self._uploads.parts)


class UploadsWithStreamingResponse:
    def __init__(self, uploads: Uploads) -> None:
        self._uploads = uploads

        self.create = to_streamed_response_wrapper(
            uploads.create,
        )
        self.cancel = to_streamed_response_wrapper(
            uploads.cancel,
        )
        self.complete = to_streamed_response_wrapper(
            uploads.complete,
        )

    @cached_property
    def parts(self) -> PartsWithStreamingResponse:
        return PartsWithStreamingResponse(self._uploads.parts)


class AsyncUploadsWithStreamingResponse:
    def __init__(self, uploads: AsyncUploads) -> None:
        self._uploads = uploads

        self.create = async_to_streamed_response_wrapper(
            uploads.create,
        )
        self.cancel = async_to_streamed_response_wrapper(
            uploads.cancel,
        )
        self.complete = async_to_streamed_response_wrapper(
            uploads.complete,
        )

    @cached_property
    def parts(self) -> AsyncPartsWithStreamingResponse:
        return AsyncPartsWithStreamingResponse(self._uploads.parts)