-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathwebhook_collection.py
More file actions
180 lines (150 loc) · 7.56 KB
/
webhook_collection.py
File metadata and controls
180 lines (150 loc) · 7.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from apify_client._utils import filter_out_none_values_recursively
from apify_client.clients.base import ResourceCollectionClient, ResourceCollectionClientAsync
from apify_client.clients.resource_clients.webhook import get_webhook_representation
if TYPE_CHECKING:
from apify_shared.consts import WebhookEventType
from apify_client.clients.base.resource_collection_client import ListPage
class WebhookCollectionClient(ResourceCollectionClient):
    """Sub-client for manipulating webhooks.

    Offers listing of existing webhooks and creation of new ones. The requests themselves are
    delegated to the `_list` and `_create` helpers provided by the base collection client.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the client, using 'webhooks' as `resource_path` unless the caller supplies one."""
        # A caller-provided `resource_path` overrides the default while keeping its leading position.
        init_kwargs = {'resource_path': 'webhooks', **kwargs}
        super().__init__(*args, **init_kwargs)

    def list(
        self,
        *,
        limit: int | None = None,
        offset: int | None = None,
        desc: bool | None = None,
    ) -> ListPage[dict]:
        """List the available webhooks.

        https://docs.apify.com/api/v2#/reference/webhooks/webhook-collection/get-list-of-webhooks

        Args:
            limit: How many webhooks to retrieve.
            offset: What webhook to include as first when retrieving the list.
            desc: Whether to sort the webhooks in descending order based on their date of creation.

        Returns:
            The list of available webhooks matching the specified filters.
        """
        listing_options = {'limit': limit, 'offset': offset, 'desc': desc}
        return self._list(**listing_options)

    def create(
        self,
        *,
        event_types: list[WebhookEventType],  # ty: ignore[invalid-type-form]
        request_url: str,
        payload_template: str | None = None,
        headers_template: str | None = None,
        actor_id: str | None = None,
        actor_task_id: str | None = None,
        actor_run_id: str | None = None,
        ignore_ssl_errors: bool | None = None,
        do_not_retry: bool | None = None,
        idempotency_key: str | None = None,
        is_ad_hoc: bool | None = None,
    ) -> dict:
        """Create a new webhook.

        You have to specify exactly one out of actor_id, actor_task_id or actor_run_id.

        https://docs.apify.com/api/v2#/reference/webhooks/webhook-collection/create-webhook

        Args:
            event_types: List of event types that should trigger the webhook. At least one is required.
            request_url: URL that will be invoked once the webhook is triggered.
            payload_template: Specification of the payload that will be sent to request_url.
            headers_template: Headers that will be sent to the request_url.
            actor_id: Id of the Actor whose runs should trigger the webhook.
            actor_task_id: Id of the Actor task whose runs should trigger the webhook.
            actor_run_id: Id of the Actor run which should trigger the webhook.
            ignore_ssl_errors: Whether the webhook should ignore SSL errors returned by request_url.
            do_not_retry: Whether the webhook should skip retrying the request to request_url upon failure.
            idempotency_key: A unique identifier of a webhook. You can use it to ensure that you won't create
                the same webhook multiple times.
            is_ad_hoc: Set to True if you want the webhook to be triggered only the first time the condition
                is fulfilled. Only applicable when actor_run_id is filled.

        Returns:
            The created webhook.
        """
        webhook_fields = {
            'event_types': event_types,
            'request_url': request_url,
            'payload_template': payload_template,
            'headers_template': headers_template,
            'actor_id': actor_id,
            'actor_task_id': actor_task_id,
            'actor_run_id': actor_run_id,
            'ignore_ssl_errors': ignore_ssl_errors,
            'do_not_retry': do_not_retry,
            'idempotency_key': idempotency_key,
            'is_ad_hoc': is_ad_hoc,
        }
        representation = get_webhook_representation(**webhook_fields)

        # Going by the helper's name, None-valued entries are filtered out before the payload is sent.
        request_body = filter_out_none_values_recursively(representation)
        return self._create(request_body)
class WebhookCollectionClientAsync(ResourceCollectionClientAsync):
    """Async sub-client for manipulating webhooks.

    Offers listing of existing webhooks and creation of new ones. The requests themselves are
    delegated to the awaitable `_list` and `_create` helpers provided by the base collection client.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the client, using 'webhooks' as `resource_path` unless the caller supplies one."""
        # A caller-provided `resource_path` overrides the default while keeping its leading position.
        init_kwargs = {'resource_path': 'webhooks', **kwargs}
        super().__init__(*args, **init_kwargs)

    async def list(
        self,
        *,
        limit: int | None = None,
        offset: int | None = None,
        desc: bool | None = None,
    ) -> ListPage[dict]:
        """List the available webhooks.

        https://docs.apify.com/api/v2#/reference/webhooks/webhook-collection/get-list-of-webhooks

        Args:
            limit: How many webhooks to retrieve.
            offset: What webhook to include as first when retrieving the list.
            desc: Whether to sort the webhooks in descending order based on their date of creation.

        Returns:
            The list of available webhooks matching the specified filters.
        """
        listing_options = {'limit': limit, 'offset': offset, 'desc': desc}
        return await self._list(**listing_options)

    async def create(
        self,
        *,
        event_types: list[WebhookEventType],  # ty: ignore[invalid-type-form]
        request_url: str,
        payload_template: str | None = None,
        headers_template: str | None = None,
        actor_id: str | None = None,
        actor_task_id: str | None = None,
        actor_run_id: str | None = None,
        ignore_ssl_errors: bool | None = None,
        do_not_retry: bool | None = None,
        idempotency_key: str | None = None,
        is_ad_hoc: bool | None = None,
    ) -> dict:
        """Create a new webhook.

        You have to specify exactly one out of actor_id, actor_task_id or actor_run_id.

        https://docs.apify.com/api/v2#/reference/webhooks/webhook-collection/create-webhook

        Args:
            event_types: List of event types that should trigger the webhook. At least one is required.
            request_url: URL that will be invoked once the webhook is triggered.
            payload_template: Specification of the payload that will be sent to request_url.
            headers_template: Headers that will be sent to the request_url.
            actor_id: Id of the Actor whose runs should trigger the webhook.
            actor_task_id: Id of the Actor task whose runs should trigger the webhook.
            actor_run_id: Id of the Actor run which should trigger the webhook.
            ignore_ssl_errors: Whether the webhook should ignore SSL errors returned by request_url.
            do_not_retry: Whether the webhook should skip retrying the request to request_url upon failure.
            idempotency_key: A unique identifier of a webhook. You can use it to ensure that you won't create
                the same webhook multiple times.
            is_ad_hoc: Set to True if you want the webhook to be triggered only the first time the condition
                is fulfilled. Only applicable when actor_run_id is filled.

        Returns:
            The created webhook.
        """
        webhook_fields = {
            'event_types': event_types,
            'request_url': request_url,
            'payload_template': payload_template,
            'headers_template': headers_template,
            'actor_id': actor_id,
            'actor_task_id': actor_task_id,
            'actor_run_id': actor_run_id,
            'ignore_ssl_errors': ignore_ssl_errors,
            'do_not_retry': do_not_retry,
            'idempotency_key': idempotency_key,
            'is_ad_hoc': is_ad_hoc,
        }
        representation = get_webhook_representation(**webhook_fields)

        # Going by the helper's name, None-valued entries are filtered out before the payload is sent.
        request_body = filter_out_none_values_recursively(representation)
        return await self._create(request_body)