Skip to content

Commit bdce982

Browse files
committed
Add Cloud Flow Task endpoint
1 parent eaedc29 commit bdce982

5 files changed

Lines changed: 130 additions & 0 deletions

File tree

tableauserverclient/models/task_item.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ class Type:
1818
_TASK_TYPE_MAPPING = {
1919
"RefreshExtractTask": Type.ExtractRefresh,
2020
"MaterializeViewsTask": Type.DataAcceleration,
21+
"RunFlowTask": Type.RunFlow,
2122
}
2223

2324
def __init__(
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import logging
2+
from typing import List, Optional, Tuple, TYPE_CHECKING
3+
4+
from tableauserverclient.server.endpoint.endpoint import Endpoint, api
5+
from tableauserverclient.server.endpoint.exceptions import MissingRequiredFieldError
6+
from tableauserverclient.models import TaskItem, PaginationItem
7+
from tableauserverclient.server import RequestFactory
8+
9+
from tableauserverclient.helpers.logging import logger
10+
11+
if TYPE_CHECKING:
12+
from tableauserverclient.server.request_options import RequestOptions
13+
14+
15+
class FlowTasks(Endpoint):
16+
@property
17+
def baseurl(self) -> str:
18+
return "{0}/sites/{1}/tasks/flows".format(self.parent_srv.baseurl, self.parent_srv.site_id)
19+
20+
@api(version="3.22")
21+
def create(self, flow_item: TaskItem) -> TaskItem:
22+
if not flow_item:
23+
error = "No flow provided"
24+
raise ValueError(error)
25+
logger.info("Creating an flow task %s", flow_item)
26+
url = self.baseurl
27+
create_req = RequestFactory.Task.create_flow_task_req(flow_item)
28+
server_response = self.post_request(url, create_req)
29+
return server_response.content

tableauserverclient/server/request_factory.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1113,6 +1113,43 @@ def create_extract_req(self, xml_request: ET.Element, extract_item: "TaskItem")
11131113

11141114
return ET.tostring(xml_request)
11151115

1116+
class FlowTaskRequest(object):
1117+
@_tsrequest_wrapped
1118+
def run_req(self, xml_request, task_item):
1119+
# Send an empty tsRequest
1120+
pass
1121+
1122+
@_tsrequest_wrapped
1123+
def create_flow_task_req(self, xml_request: ET.Element, flow_item: "TaskItem") -> bytes:
1124+
flow_element = ET.SubElement(xml_request, "runFlow")
1125+
1126+
# Main attributes
1127+
flow_element.attrib["type"] = flow_item.task_type
1128+
1129+
if flow_item.target is not None:
1130+
target_element = ET.SubElement(flow_element, flow_item.target.type)
1131+
target_element.attrib["id"] = flow_item.target.id
1132+
1133+
if flow_item.schedule_item is None:
1134+
return ET.tostring(xml_request)
1135+
1136+
# Schedule attributes
1137+
schedule_element = ET.SubElement(xml_request, "schedule")
1138+
1139+
interval_item = flow_item.schedule_item.interval_item
1140+
schedule_element.attrib["frequency"] = interval_item._frequency
1141+
frequency_element = ET.SubElement(schedule_element, "frequencyDetails")
1142+
frequency_element.attrib["start"] = str(interval_item.start_time)
1143+
if hasattr(interval_item, "end_time") and interval_item.end_time is not None:
1144+
frequency_element.attrib["end"] = str(interval_item.end_time)
1145+
if hasattr(interval_item, "interval") and interval_item.interval:
1146+
intervals_element = ET.SubElement(frequency_element, "intervals")
1147+
for interval in interval_item._interval_type_pairs(): # type: ignore
1148+
expression, value = interval
1149+
single_interval_element = ET.SubElement(intervals_element, "interval")
1150+
single_interval_element.attrib[expression] = value
1151+
1152+
return ET.tostring(xml_request)
11161153

11171154
class SubscriptionRequest(object):
11181155
@_tsrequest_wrapped

tableauserverclient/server/server.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
Databases,
2626
Tables,
2727
Flows,
28+
FlowTasks,
2829
Webhooks,
2930
DataAccelerationReport,
3031
Favorites,
@@ -82,6 +83,7 @@ def __init__(self, server_address, use_server_version=False, http_options=None,
8283
self.datasources = Datasources(self)
8384
self.favorites = Favorites(self)
8485
self.flows = Flows(self)
86+
self.flow_tasks = FlowTasks(self)
8587
self.projects = Projects(self)
8688
self.schedules = Schedules(self)
8789
self.server_info = ServerInfo(self)

test/test_flowtask.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import os
2+
import unittest
3+
from datetime import time
4+
from pathlib import Path
5+
6+
import requests_mock
7+
8+
import tableauserverclient as TSC
9+
from tableauserverclient.datetime_helpers import parse_datetime
10+
from tableauserverclient.models.task_item import TaskItem
11+
12+
TEST_ASSET_DIR = Path(__file__).parent / "assets"
13+
14+
GET_XML_NO_WORKBOOK = os.path.join(TEST_ASSET_DIR, "tasks_no_workbook_or_datasource.xml")
15+
GET_XML_WITH_WORKBOOK = os.path.join(TEST_ASSET_DIR, "tasks_with_workbook.xml")
16+
GET_XML_WITH_DATASOURCE = os.path.join(TEST_ASSET_DIR, "tasks_with_datasource.xml")
17+
GET_XML_RUN_NOW_RESPONSE = os.path.join(TEST_ASSET_DIR, "tasks_run_now_response.xml")
18+
GET_XML_CREATE_TASK_RESPONSE = os.path.join(TEST_ASSET_DIR, "tasks_create_extract_task.xml")
19+
GET_XML_WITHOUT_SCHEDULE = TEST_ASSET_DIR / "tasks_without_schedule.xml"
20+
GET_XML_WITH_INTERVAL = TEST_ASSET_DIR / "tasks_with_interval.xml"
21+
22+
GET_XML_CREATE_FLOW_TASK_RESPONSE = os.path.join(TEST_ASSET_DIR, "tasks_create_flow_task.xml")
23+
24+
25+
26+
class TaskTests(unittest.TestCase):
27+
def setUp(self):
28+
self.server = TSC.Server("http://test", False)
29+
self.server.version = "3.22"
30+
31+
# Fake Signin
32+
self.server._site_id = "dad65087-b08b-4603-af4e-2887b8aafc67"
33+
self.server._auth_token = "j80k54ll2lfMZ0tv97mlPvvSCRyD0DOM"
34+
35+
# default task type is extractRefreshes TODO change this
36+
# self.baseurl = "{}/{}".format(self.server.tasks.baseurl, "extractRefreshes")
37+
self.baseurl = self.server.flow_tasks.baseurl
38+
39+
def test_create_flow_task(self):
40+
monthly_interval = TSC.MonthlyInterval(start_time=time(23, 30), interval_value=15)
41+
monthly_schedule = TSC.ScheduleItem(
42+
None,
43+
None,
44+
None,
45+
None,
46+
monthly_interval,
47+
)
48+
target_item = TSC.Target("flow_id", "flow")
49+
50+
task = TaskItem(schedule_item=monthly_schedule, target=target_item)
51+
# task = TaskItem(None, "FullRefresh", None, schedule_item=monthly_schedule, target=target_item)
52+
53+
with open(GET_XML_CREATE_FLOW_TASK_RESPONSE, "rb") as f:
54+
response_xml = f.read().decode("utf-8")
55+
with requests_mock.mock() as m:
56+
m.post("{}".format(self.baseurl), text=response_xml)
57+
create_response_content = self.server.flow_tasks.create(task).decode("utf-8")
58+
59+
self.assertTrue("task_id" in create_response_content)
60+
self.assertTrue("flow_id" in create_response_content)
61+
#self.assertTrue("FullRefresh" in create_response_content)

0 commit comments

Comments
 (0)