openai/openai-python

Public

mirrored from https://github.com/openai/openai-python · Available

Code · Commits · Issues · Pull requests · Actions · Insights · Security
dev/hayden/bedrock-aws-auth

Branches

Tags

  • No tags available.
0 Branches · 0 Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

tests/api_resources/admin/organization/test_audit_logs.py

117lines · modecode

1# File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details.
2
3from __future__ import annotations
4
5import os
6from typing import Any, cast
7
8import pytest
9
10from openai import OpenAI, AsyncOpenAI
11from tests.utils import assert_matches_type
12from openai.pagination import SyncConversationCursorPage, AsyncConversationCursorPage
13from openai.types.admin.organization import AuditLogListResponse
14
# Base URL for the API server under test; the TEST_API_BASE_URL environment
# variable overrides the local default.
base_url = os.getenv("TEST_API_BASE_URL", "http://127.0.0.1:4010")
16
17
class TestAuditLogs:
    """Exercise `client.admin.organization.audit_logs.list` on the synchronous client."""

    # Every test runs once per `client` fixture variant ("loose" and "strict").
    parametrize = pytest.mark.parametrize(
        "client",
        [False, True],
        indirect=True,
        ids=["loose", "strict"],
    )

    @parametrize
    def test_method_list(self, client: OpenAI) -> None:
        """Listing with no arguments yields a page of audit-log entries."""
        audit_logs = client.admin.organization.audit_logs
        page = audit_logs.list()
        assert_matches_type(SyncConversationCursorPage[AuditLogListResponse], page, path=["response"])

    @parametrize
    def test_method_list_with_all_params(self, client: OpenAI) -> None:
        """Listing with every optional filter supplied still yields a page."""
        audit_logs = client.admin.organization.audit_logs
        page = audit_logs.list(
            actor_emails=["string"],
            actor_ids=["string"],
            after="after",
            before="before",
            effective_at={
                "gt": 0,
                "gte": 0,
                "lt": 0,
                "lte": 0,
            },
            event_types=["api_key.created"],
            limit=0,
            project_ids=["string"],
            resource_ids=["string"],
            tenant_only=True,
        )
        assert_matches_type(SyncConversationCursorPage[AuditLogListResponse], page, path=["response"])

    @parametrize
    def test_raw_response_list(self, client: OpenAI) -> None:
        """The raw-response wrapper is already closed and parses into a page."""
        raw = client.admin.organization.audit_logs.with_raw_response.list()

        assert raw.is_closed is True
        sent_headers = raw.http_request.headers
        assert sent_headers.get("X-Stainless-Lang") == "python"
        page = raw.parse()
        assert_matches_type(SyncConversationCursorPage[AuditLogListResponse], page, path=["response"])

    @parametrize
    def test_streaming_response_list(self, client: OpenAI) -> None:
        """The streaming wrapper stays open inside the `with` block and is closed after it."""
        streaming = client.admin.organization.audit_logs.with_streaming_response
        with streaming.list() as stream:
            assert not stream.is_closed
            assert stream.http_request.headers.get("X-Stainless-Lang") == "python"

            page = stream.parse()
            assert_matches_type(SyncConversationCursorPage[AuditLogListResponse], page, path=["response"])

        # `cast(Any, ...)` keeps type checkers from narrowing `is_closed` based on
        # the `assert not ...` made inside the block.
        assert cast(Any, stream.is_closed) is True
66
67
class TestAsyncAuditLogs:
    """Exercise `async_client.admin.organization.audit_logs.list` on the asynchronous client."""

    # Every test runs once per `async_client` fixture variant
    # ("loose", "strict" and "aiohttp").
    parametrize = pytest.mark.parametrize(
        "async_client",
        [False, True, {"http_client": "aiohttp"}],
        indirect=True,
        ids=["loose", "strict", "aiohttp"],
    )

    @parametrize
    async def test_method_list(self, async_client: AsyncOpenAI) -> None:
        """Listing with no arguments yields a page of audit-log entries."""
        audit_logs = async_client.admin.organization.audit_logs
        page = await audit_logs.list()
        assert_matches_type(AsyncConversationCursorPage[AuditLogListResponse], page, path=["response"])

    @parametrize
    async def test_method_list_with_all_params(self, async_client: AsyncOpenAI) -> None:
        """Listing with every optional filter supplied still yields a page."""
        audit_logs = async_client.admin.organization.audit_logs
        page = await audit_logs.list(
            actor_emails=["string"],
            actor_ids=["string"],
            after="after",
            before="before",
            effective_at={
                "gt": 0,
                "gte": 0,
                "lt": 0,
                "lte": 0,
            },
            event_types=["api_key.created"],
            limit=0,
            project_ids=["string"],
            resource_ids=["string"],
            tenant_only=True,
        )
        assert_matches_type(AsyncConversationCursorPage[AuditLogListResponse], page, path=["response"])

    @parametrize
    async def test_raw_response_list(self, async_client: AsyncOpenAI) -> None:
        """The raw-response wrapper is already closed and parses into a page."""
        raw = await async_client.admin.organization.audit_logs.with_raw_response.list()

        assert raw.is_closed is True
        sent_headers = raw.http_request.headers
        assert sent_headers.get("X-Stainless-Lang") == "python"
        page = raw.parse()
        assert_matches_type(AsyncConversationCursorPage[AuditLogListResponse], page, path=["response"])

    @parametrize
    async def test_streaming_response_list(self, async_client: AsyncOpenAI) -> None:
        """The streaming wrapper stays open inside the `async with` block and is closed after it."""
        streaming = async_client.admin.organization.audit_logs.with_streaming_response
        async with streaming.list() as stream:
            assert not stream.is_closed
            assert stream.http_request.headers.get("X-Stainless-Lang") == "python"

            page = await stream.parse()
            assert_matches_type(AsyncConversationCursorPage[AuditLogListResponse], page, path=["response"])

        # `cast(Any, ...)` keeps type checkers from narrowing `is_closed` based on
        # the `assert not ...` made inside the block.
        assert cast(Any, stream.is_closed) is True
118