API Reference

FetchFox

Source code in src/fetchfox_sdk/client.py
class FetchFox:
    def __init__(self,
            api_key: Optional[str] = None, host: str = "https://fetchfox.ai",
            quiet=False):
        """Initialize the FetchFox SDK.

        You may also provide an API key in the environment variable `FETCHFOX_API_KEY`.

        Args:
            api_key: Your FetchFox API key.  Overrides the environment variable.
            host: API host URL (defaults to production)
            quiet: set to True to suppress printing
        """
        self.base_url = urljoin(host, _API_PREFIX)

        self.api_key = api_key
        if self.api_key is None:
            self.api_key = os.environ.get("FETCHFOX_API_KEY")

        if not self.api_key:
            raise ValueError(
                "API key must be provided either as argument or "
                "in FETCHFOX_API_KEY environment variable")

        self.headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer: {self.api_key}'
        }

        self.quiet = quiet
        self._executor = ThreadPoolExecutor(max_workers=1)
        # TODO: this needs to be changed to support concurrent job polling,
        # but I am setting it to 1 right now as a sanity-check


    def _request(self, method: str, path: str, json_data: Optional[dict] = None,
                    params: Optional[dict] = None) -> dict:
        """Make an API request.

        Args:
            method: HTTP method
            path: API path
            json_data: Optional JSON body
            params: Optional query string parameters
        """
        url = urljoin(self.base_url, path)

        response = requests.request(
            method,
            url,
            headers=self.headers,
            json=json_data,
            params=params,
            timeout=(30,30)
        )

        response.raise_for_status()
        body = response.json()

        logger.debug(
            "Response from %s %s:\n%s  at %s",
            method, path, pformat(body), datetime.now())
        return body

    def _nqprint(self, *args, **kwargs):
        if not self.quiet:
            print(*args, **kwargs)

    def _workflow(self, url_or_urls: Union[str, List[str]] = None) -> "Workflow":
        """Create a new workflow using this SDK instance.

        Examples of how to use a workflow:

        ```
        city_pages = fox \
            .workflow("https://locations.traderjoes.com/pa/") \
            .extract(
                item_template = {
                    "url": "Find me all the URLs for the city directories"
                }
            )
        ```

        A workflow is kind of like a Django QuerySet.  It will not be executed
        until you attempt to use the results.

        ```
        list_of_city_pages = list(city_pages)
        # This would run the workflow and give you a list of items like:
        #   {'url': 'https://....'}
        ```

        You could export those results to a file:
        ```
        city_pages.export("city_urls.jsonl")
        city_pages.export("city_urls.csv")
        ```

        And then you could create a new workflow (or two) that use those results:

        ```
        store_info = city_pages.extract(
            item_template = {
                "store_address": "find me the address of the store",
                "store_number": "Find me the number of the store (it's in parentheses)",
                "store_phone": "Find me the phone number of the store"
                }
        )

        store_urls = city_pages.extract(
            item_template = {
                "url": "Find me the URLs of Store detail pages."
            }
        )
        ```

        In the above snippets, the `city_pages` workflow was only ever executed
        once.

        Optionally, a URL and/or params may be passed here to initialize
        the workflow with them.

        Workflow parameters are given in a dictionary.  E.g. if your workflow
        has a `{{state_name}}` parameter, you might pass:

            { 'state_name': 'Alaska' }

        or perhaps

            { 'state_name': ['Alaska', 'Hawaii'] }

        if you wish to run the workflow for both states and collect the results.

        Args:
            url_or_urls: a URL as a string, or a list of URLs, to start from
        """
        w = Workflow(self)
        if url_or_urls:
            w = w.init(url_or_urls)
        # if params:
        #     w = w.configure_params(params)

        return w

    def workflow_from_json(self, json_workflow) -> "Workflow":
        """Given a JSON string, such as you can generate in the wizard at
        https://fetchfox.ai, create a workflow from it.

        Once created, it can be used like a regular workflow.

        Args:
            json_workflow: This must be a valid JSON string that represents a FetchFox Workflow.  You should not usually try to write these manually, but simply copy-paste from the web interface.
        """
        return self._workflow_from_dict(json.loads(json_workflow))

    def _workflow_from_dict(self, workflow_dict):
        w = Workflow(self)
        w._workflow = workflow_dict
        return w

    def workflow_by_id(self, workflow_id) -> "Workflow":
        """Use a public workflow ID

        Something like fox.workflow_by_id(ID).configure_params({"state": "AK"}).export("blah.csv")

        """
        workflow_json = self._get_workflow(workflow_id)
        return self.workflow_from_json(workflow_json)

    def _register_workflow(self, workflow: Workflow) -> str:
        """Create a new workflow.

        Args:
            workflow: Workflow object

        Returns:
            Workflow ID
        """
        response = self._request('POST', 'workflows', workflow.to_dict())

        # NOTE: If we need to return anything else here, we should keep this
        # default behavior, but add an optional kwarg so "full_response=True"
        # can be supplied, and then we return everything
        return response['id']

    def _get_workflows(self) -> list:
        """Get workflows

        Returns:
            List of workflows
        """
        response = self._request("GET", "workflows")

        # NOTE: Should we return Workflow objects instead?
        return response['results']

    def _get_workflow(self, id) -> dict:
        """Get a registered workflow by ID."""
        response = self._request("GET", f"workflow/{id}")
        return response

    def _run_workflow(self, workflow_id: Optional[str] = None,
                    workflow: Optional[Workflow] = None,
                    params: Optional[dict] = None) -> str:
        """Run a workflow. Either provide the ID of a registered workflow,
        or provide a workflow object (which will be registered
        automatically, for convenience).

        You can browse https://fetchfox.ai to find publicly available workflows
        authored by others.  Copy the workflow ID and use it here.  Often,
        in this case, you will also want to provide parameters.

        Args:
            workflow_id: ID of an existing workflow to run
            workflow: A Workflow object to register and run
            params: Optional parameters for the workflow

        Returns:
            Job ID

        Raises:
            ValueError: If neither workflow_id nor workflow is provided
        """
        if workflow_id is None and workflow is None:
            raise ValueError(
                "Either workflow_id or workflow must be provided")

        if workflow_id is not None and workflow is not None:
            raise ValueError(
                "Provide only a workflow or a workflow_id, not both.")

        if workflow is not None and not isinstance(workflow, Workflow):
            raise ValueError(
                "The workflow argument must be a fetchfox_sdk.Workflow")
        if workflow_id and not isinstance(workflow_id, str):
            raise ValueError(
                "The workflow_id argument must be a string "
                "representing a registered workflow's ID")

        if params is not None:
            raise NotImplementedError("Cannot pass params to workflows yet")
            # TODO:
            #   It sounds like these might be passed in the const/init step?
            #   Or, maybe they need to go in as a dictionary on the side?
            # TODO:
            #   https://docs.google.com/document/d/17ieru_HfU3jXBilcZqL1Ksf27rsVPvOIQ8uxmHi2aeE/edit?disco=AAABdjyFjgw
            #   allow list-expansion here like above, pretty cool

        if workflow_id is None:
            workflow_id = self._register_workflow(workflow) # type: ignore
            logger.info("Registered new workflow with id: %s", workflow_id)

        #response = self._request('POST', f'workflows/{workflow_id}/run', params or {})
        response = self._request('POST', f'workflows/{workflow_id}/run')

        # NOTE: If we need to return anything else here, we should keep this
        # default behavior, but add an optional kwarg so "full_response=True"
        # can be supplied, and then we return everything
        return response['jobId']

    def _get_job_status(self, job_id: str) -> dict:
        """Get the status and results of a job.  Returns partial results before
        eventually returning the full results.

        When response['done'] == True, the full results are present in
        response['results']['items'].

        If you want to manage your own polling, you can use this instead of
        await_job_completion()

        NOTE: Jobs are not created immediately after you call run_workflow().
        The status will not be available until the job is scheduled, so this
        will 404 initially.
        """
        return self._request('GET', f'jobs/{job_id}')

    def _poll_status_once(self, job_id):
        """Poll until we get one status response.  This may be more than one poll,
        if it is the first one, since the job will 404 for a while before
        it is scheduled."""
        MAX_WAIT_FOR_JOB_ALIVE_MINUTES = 5 #TODO: reasonable?
        started_waiting_for_job_dt = None
        while True:
            try:
                status = self._get_job_status(job_id)
                self._nqprint(".", end="")
                sys.stdout.flush()

                #TODO print partial status?

                return status
            except requests.exceptions.HTTPError as e:
                if e.response.status_code in [404, 500]:
                    self._nqprint("x", end="")
                    sys.stdout.flush()
                    logger.info("Waiting for job %s to be scheduled.", job_id)

                    if started_waiting_for_job_dt is None:
                        started_waiting_for_job_dt = datetime.now()
                    else:
                        waited = datetime.now() - started_waiting_for_job_dt
                        if waited > timedelta(minutes=MAX_WAIT_FOR_JOB_ALIVE_MINUTES):
                            raise RuntimeError(
                                f"Job {job_id} is taking unusually long to schedule.")

                else:
                    raise

    def _cleanup_job_result_item(self, item):
        filtered_item = {
            k: v
            for k, v
            in item.items()
            if not k.startswith('_')
        }

        # TODO: What should we be doing with `_url`?
        # # Keep _url if we have no other keys
        # if not filtered_item and '_url' in item:
        filtered_item['_url'] = item['_url']
        return filtered_item

    def _job_result_items_gen(self, job_id):
        """Yield new result items as they arrive."""
        self._nqprint(f"Streaming results from: [{job_id}]: ")

        seen_ids = set() # We need to track which have been yielded already

        MAX_WAIT_FOR_CHANGE_MINUTES = 5
        # Job will be assumed done/stalled after this much time passes without
        # a new result coming in.
        first_response_dt = None
        results_changed_dt = None

        while True:
            response = self._poll_status_once(job_id)
            # The above will block until we get one successful response
            if not first_response_dt:
                first_response_dt = datetime.now()

            # We are considering only the result_items here, not partials
            if 'items' not in response['results']:
                waited_dur = datetime.now() - first_response_dt
                if waited_dur > timedelta(minutes=MAX_WAIT_FOR_CHANGE_MINUTES):
                    raise RuntimeError(
                        "This job is taking too long - please retry.")
                continue

            for job_result_item in response['results']['items']:
                jri_id = job_result_item['_meta']['id']
                if jri_id not in seen_ids:
                    # We have a new result_item
                    results_changed_dt = datetime.now()
                    seen_ids.add(jri_id)
                    self._nqprint("")
                    yield self._cleanup_job_result_item(job_result_item)

            if results_changed_dt:
                waited_dur2 = datetime.now() - results_changed_dt
                if waited_dur2 > timedelta(minutes=MAX_WAIT_FOR_CHANGE_MINUTES):
                    # It has been too long since we've seen a new result, so
                    # we will assume the job is stalled on the server
                    break

            if response.get("done") == True:
                break

            time.sleep(1)

    def extract(self, url_or_urls, *args, **kwargs):
        """Extract items from a given URL, given an item template.

        An item template is a dictionary where the keys are the desired
        output fieldnames and the values are the instructions for extraction of
        that field.

        Example item templates:
        {
            "magnitude": "What is the magnitude of this earthquake?",
            "location": "What is the location of this earthquake?",
            "time": "What is the time of this earthquake?"
        }

        {
            "url": "Find me all the links to the product detail pages."
        }

        To follow pagination, provide max_pages > 1.

        Args:
            url_or_urls: a URL as a string, or a list of URLs, to extract from
            item_template: the item template described above
            mode: 'single'|'multiple'|'auto' - defaults to 'auto'.  Set this to 'single' if each URL has only a single item.  Set this to 'multiple' if each URL should yield multiple items
            max_pages: enable pagination from the given URL.  Defaults to one page only.
            limit: limit the number of items yielded by this step
        """
        return self._workflow(url_or_urls).extract(*args, **kwargs)

    def init(self, url_or_urls, *args, **kwargs):
        """Initialize the workflow with one or more URLs.

        Args:
            url_or_urls: Can be a single URL as a string, or a list of URLs.
        """
        return self._workflow(url_or_urls)

    def filter(*args, **kwargs):
        raise RuntimeError("Filter cannot be the first step.")


    def unique(*args, **kwargs):
        raise RuntimeError("Unique cannot be the first step.")
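
A typical end-to-end flow with this class looks like the following sketch (the URL, instructions, and filename are illustrative, not part of the SDK):

    fox = FetchFox(api_key="YOUR_KEY")

    products = fox \
        .extract(
            "https://example.com/products",
            item_template={"url": "Find me the links to the product detail pages."}) \
        .filter("Exclude any accessory products.")

    products.export("products.jsonl")

Nothing executes until the results are used; export() runs the workflow and blocks until the job completes.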

__init__(api_key=None, host='https://fetchfox.ai', quiet=False)

Initialize the FetchFox SDK.

You may also provide an API key in the environment variable FETCHFOX_API_KEY.

Parameters:
  • api_key (Optional[str], default: None ) –

    Your FetchFox API key. Overrides the environment variable.

  • host (str, default: 'https://fetchfox.ai' ) –

    API host URL (defaults to production)

  • quiet (default: False ) –

    set to True to suppress printing

Source code in src/fetchfox_sdk/client.py
def __init__(self,
        api_key: Optional[str] = None, host: str = "https://fetchfox.ai",
        quiet=False):
    """Initialize the FetchFox SDK.

    You may also provide an API key in the environment variable `FETCHFOX_API_KEY`.

    Args:
        api_key: Your FetchFox API key.  Overrides the environment variable.
        host: API host URL (defaults to production)
        quiet: set to True to suppress printing
    """
    self.base_url = urljoin(host, _API_PREFIX)

    self.api_key = api_key
    if self.api_key is None:
        self.api_key = os.environ.get("FETCHFOX_API_KEY")

    if not self.api_key:
        raise ValueError(
            "API key must be provided either as argument or "
            "in FETCHFOX_API_KEY environment variable")

    self.headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer: {self.api_key}'
    }

    self.quiet = quiet
    self._executor = ThreadPoolExecutor(max_workers=1)
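
Construction is a one-liner; a short sketch (assuming FetchFox is importable from the fetchfox_sdk package top level, with a placeholder key):

    from fetchfox_sdk import FetchFox

    fox = FetchFox(api_key="YOUR_KEY")   # explicit key
    fox = FetchFox(quiet=True)           # key read from FETCHFOX_API_KEY instead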

extract(url_or_urls, *args, **kwargs)

Extract items from a given URL, given an item template.

An item template is a dictionary where the keys are the desired output fieldnames and the values are the instructions for extraction of that field.

Example item templates:

    {
        "magnitude": "What is the magnitude of this earthquake?",
        "location": "What is the location of this earthquake?",
        "time": "What is the time of this earthquake?"
    }

    {
        "url": "Find me all the links to the product detail pages."
    }

To follow pagination, provide max_pages > 1.

Parameters:
  • url_or_urls

    a URL as a string, or a list of URLs, to extract from

  • item_template

    the item template described above

  • mode

    'single'|'multiple'|'auto' - defaults to 'auto'. Set this to 'single' if each URL has only a single item. Set this to 'multiple' if each URL should yield multiple items

  • max_pages

    enable pagination from the given URL. Defaults to one page only.

  • limit

    limit the number of items yielded by this step

Source code in src/fetchfox_sdk/client.py
def extract(self, url_or_urls, *args, **kwargs):
    """Extract items from a given URL, given an item template.

    An item template is a dictionary where the keys are the desired
    output fieldnames and the values are the instructions for extraction of
    that field.

    Example item templates:
    {
        "magnitude": "What is the magnitude of this earthquake?",
        "location": "What is the location of this earthquake?",
        "time": "What is the time of this earthquake?"
    }

    {
        "url": "Find me all the links to the product detail pages."
    }

    To follow pagination, provide max_pages > 1.

    Args:
        url_or_urls: a URL as a string, or a list of URLs, to extract from
        item_template: the item template described above
        mode: 'single'|'multiple'|'auto' - defaults to 'auto'.  Set this to 'single' if each URL has only a single item.  Set this to 'multiple' if each URL should yield multiple items
        max_pages: enable pagination from the given URL.  Defaults to one page only.
        limit: limit the number of items yielded by this step
    """
    return self._workflow(url_or_urls).extract(*args, **kwargs)
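
For instance (the URL and instructions are illustrative):

    items = fox.extract(
        "https://example.com/earthquakes",
        item_template={
            "magnitude": "What is the magnitude of this earthquake?",
            "location": "What is the location of this earthquake?",
        },
        mode="multiple",
        max_pages=3,
        limit=50)

    for item in items:
        print(item.magnitude, item.location)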

init(url_or_urls, *args, **kwargs)

Initialize the workflow with one or more URLs.

Parameters:
  • url_or_urls

    Can be a single URL as a string, or a list of URLs.

Source code in src/fetchfox_sdk/client.py
def init(self, url_or_urls, *args, **kwargs):
    """Initialize the workflow with one or more URLs.

    Args:
        url_or_urls: Can be a single URL as a string, or a list of URLs.
    """
    return self._workflow(url_or_urls)
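
A short sketch (the URLs are placeholders):

    pages = fox.init([
        "https://example.com/page1",
        "https://example.com/page2",
    ])
    links = pages.extract(item_template={"url": "Find me the links to the detail pages."})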

workflow_by_id(workflow_id)

Use a public workflow ID

Something like fox.workflow_by_id(ID).configure_params({"state": "AK"}).export("blah.csv")

Source code in src/fetchfox_sdk/client.py
def workflow_by_id(self, workflow_id) -> "Workflow":
    """Use a public workflow ID

    Something like fox.workflow_by_id(ID).configure_params({"state": "AK"}).export("blah.csv")

    """
    workflow_json = self._get_workflow(workflow_id)
    return self.workflow_from_json(workflow_json)
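
A minimal sketch (the ID is a placeholder; configure_params is not yet implemented in this SDK version, so parameters are omitted here):

    wf = fox.workflow_by_id("abc123")
    wf.export("results.csv")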

workflow_from_json(json_workflow)

Given a JSON string, such as you can generate in the wizard at https://fetchfox.ai, create a workflow from it.

Once created, it can be used like a regular workflow.

Parameters:
  • json_workflow

    This must be a valid JSON string that represents a FetchFox Workflow. You should not usually try to write these manually, but simply copy-paste from the web interface.

Source code in src/fetchfox_sdk/client.py
def workflow_from_json(self, json_workflow) -> "Workflow":
    """Given a JSON string, such as you can generate in the wizard at
    https://fetchfox.ai, create a workflow from it.

    Once created, it can be used like a regular workflow.

    Args:
        json_workflow: This must be a valid JSON string that represents a FetchFox Workflow.  You should not usually try to write these manually, but simply copy-paste from the web interface.
    """
    return self._workflow_from_dict(json.loads(json_workflow))
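
For example, after saving the wizard's JSON to a file (the filename is illustrative):

    with open("my_workflow.json") as f:
        wf = fox.workflow_from_json(f.read())

    for item in wf:
        print(item)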

Item

Wrapper for result items that provides attribute access with dot notation while maintaining dictionary-like compatibility.

Source code in src/fetchfox_sdk/item.py
class Item:
    """
    Wrapper for result items that provides attribute access with dot notation
    while maintaining dictionary-like compatibility.
    """
    def __init__(self, data):
        self._data = data

    def __getattr__(self, name):
        if name in self._data:
            return self._data[name]
        raise AttributeError(f"'Item' object has no attribute '{name}'")

    def __getitem__(self, key):
        return self._data[key]

    def __contains__(self, item):
        return item in self._data

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)

    def __repr__(self):
        return f"Item({self._data})"

    def __str__(self):
        return str(self._data)

    # Support dict() conversion
    def keys(self):
        return self._data.keys()

    def items(self):
        return self._data.items()

    def values(self):
        return self._data.values()

    def to_dict(self):
        """Convert back to a regular dictionary."""
        return self._data.copy()

    def get(self, key, default=None):
        """Get a value with a default if the key doesn't exist."""
        return self._data.get(key, default)

    def __eq__(self, other):
        if isinstance(other, Item):
            return self._data == other._data
        elif isinstance(other, dict):
            return self._data == other
        return False

    def __bool__(self):
        return bool(self._data)
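
Items normally come back from workflow results, but the dual access styles are easy to show directly (a small sketch):

    item = Item({"url": "https://example.com", "price": "9.99"})
    item.url           # 'https://example.com'  (attribute access)
    item["price"]      # '9.99'                 (dictionary access)
    item.to_dict()     # back to a plain dict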

get(key, default=None)

Get a value with a default if the key doesn't exist.

Source code in src/fetchfox_sdk/item.py
def get(self, key, default=None):
    """Get a value with a default if the key doesn't exist."""
    return self._data.get(key, default)

to_dict()

Convert back to a regular dictionary.

Source code in src/fetchfox_sdk/item.py
def to_dict(self):
    """Convert back to a regular dictionary."""
    return self._data.copy()

Workflow

Source code in src/fetchfox_sdk/workflow.py
class Workflow:

    _executor = concurrent.futures.ThreadPoolExecutor()

    def __init__(self, sdk_context):

        self._sdk = sdk_context

        self._workflow = {
            "steps": [],
            "options": {}
        }

        self._results = None
        self._ran_job_id = None
        self._future = None

    @property
    def all_results(self):
        """Get all results, executing the query if necessary, blocks until done.
        Returns results as Item objects for easier attribute access.
        """
        if not self.has_results:
            self._run__block_until_done() # writes to self._results

        return [Item(item) for item in self._results]

    def results(self):
        yield from self._results_gen()

    @property
    def has_results(self):
        """If you want to check whether a workflow has results already, but
        do NOT want to trigger execution yet."""
        if self._results is None:
            return False
        return True

    @property
    def has_run(self):
        """If this workflow has been executed before (even if there were no
        results)
        """
        if self._ran_job_id is not None:
            return True
        return False

    def __iter__(self) -> Generator[Item, None, None]:
        """Make the workflow iterable.
        Accessing the results property will execute the workflow if necessary.
        """
        # Use the results property which already returns Items
        yield from self.results()

    def __getitem__(self, key):
        """Allow indexing into the workflow results.
        Accessing the results property will execute the workflow if necessary.
        NOTE: Workflows will NEVER execute partially.  Accessing any item of
        the results will always trigger a complete execution.

        Args:
            key: Can be an integer index or a slice
        """
        # Results property already returns Items
        return self.all_results[key]

    def __bool__(self):
        """Return True if the workflow has any results, False otherwise.
        Accessing the results property will execute the workflow if necessary.
        """
        return bool(self.all_results)

    def __len__(self):
        """Return the number of results.
        Accessing the results property will execute the workflow if necessary.
        """
        return len(self.all_results)

    def __contains__(self, item):
        """Check if an item exists in the results.
        Accessing the results property will execute the workflow if necessary.
        """
        return item in self.all_results

    def _clone(self):
        """Create a new instance with copied workflow OR copied results"""
        # check underlying, not property, because we don't want to trigger exec
        if self._results is None or len(self._results) < 1:
            # If there are no results, we are extending the steps of this workflow
            # so that, when it runs, we'll produce the desired results
            if self._ran_job_id is not None:
                #TODO - anything else we should do when we've run but no results?
                logger.debug("Cloning a job that ran, but which had no results")

            new_instance = Workflow(self._sdk)
            new_instance._workflow = copy.deepcopy(self._workflow)
            return new_instance
        else:
            # We purportedly have more than zero results:
            # We are disposing of the steps that have been executed.
            # The results are now used for workflows that derive from this one,
            # This allows re-using a workflow to make many derivatives without
            # re-executing it or having to manually initialize them from
            # the results
            new_instance = Workflow(self._sdk)
            new_instance._workflow["steps"] = [
                {
                    "name": "const",
                    "args": {
                        "items": copy.deepcopy(self._results)
                        # We use the internal _results field, because it's a
                        # list of dictionaries rather than Items
                    }
                }
            ]
            return new_instance

    #TODO: refresh?
    #Force a re-run, even though results are present?

    def _run__block_until_done(self) -> List[Dict]:
        """Execute the workflow and return results.

        Note that running the workflow will attach the results to it.  After it
        has results, derived workflows will be given the _results_ from this workflow,
        NOT the steps of this workflow.
        """
        logger.debug("Running workflow to completion")
        return list(self._results_gen())

    def _results_gen(self):
        """Generator yields results as they are available from the job.
        Attaches results to workflow as it proceeds, so they are later available
        without running again.
        """

        logger.debug("Streaming Results")
        if not self.has_results:
            self._results = []
            job_id = self._sdk._run_workflow(workflow=self)
            self._ran_job_id = job_id #track that we have ran
            for item in self._sdk._job_result_items_gen(job_id):
                self._results.append(item)
                yield Item(item)
        else:
            yield from self.all_results #yields Items

    def _future_done_cb(self, future):
        """Done-callback: triggered when the future completes
        (success, fail, or cancelled).
        We store final results if everything’s okay;
        otherwise, we can handle exceptions.
        """
        if not future.cancelled():
            self._results = future.result()
        else:
            self._future = None

    def results_future(self):
        """Returns a plain concurrent.futures.Future object that yields ALL results
        when the job is complete.  Access the_future.result() to block, or use
        the_future.done() to check for completion without any blocking.

        If we already have results, they will be immediately available in the
        `future.result()`
        """

        if self._results is not None:
            # Already have final results: return a completed future
            completed_future = concurrent.futures.Future()
            completed_future.set_result(self._results)
            self._future = completed_future

        if self._future is not None:
            # Already started, so reuse existing future
            return self._future

        self._future = self._executor.submit(self._run__block_until_done)
        self._future.add_done_callback(self._future_done_cb)
        return self._future

    def init(self, url: Union[str, List[str]]) -> "Workflow":
        """Initialize the workflow with one or more URLs.

        Args:
            url: Can be a single URL as a string, or a list of URLs.
        """
        #TODO: if used more than once, raise error and print helpful message
        #TODO: do params here?

        new_instance = self._clone()

        if isinstance(url, str):
            items = [{"url": url}]
        else:
            items = [{"url": u} for u in url]

        new_instance._workflow["steps"].append({
            "name": "const",
            "args": {
                "items": items,
                "maxPages": 1 #TODO
            }
        })
        return new_instance

    def configure_params(self, params) -> "Workflow":
        raise NotImplementedError()

    def export(self, filename: str, overwrite: bool = False) -> None:
        """Execute workflow and save results to file.

        Args:
            filename: Path to output file, must end with .csv or .jsonl
            overwrite: Defaults to False, which causes an error to be raised if the file exists already.  Set it to true if you want to overwrite.

        Raises:
            ValueError: If filename doesn't end with .csv or .jsonl
            FileExistsError: If file exists and overwrite is False
        """

        if not (filename.endswith('.csv') or filename.endswith('.jsonl')):
            raise ValueError("Output filename must end with .csv or .jsonl")

        if os.path.exists(filename) and not overwrite:
            raise FileExistsError(
                f"File {filename} already exists. Use overwrite=True to overwrite.")

        if self.has_run:
            if not self.has_results:
                raise RuntimeError("A job ran, but there are no results.")

            # If it has run, and results is not None, results could still be []
            # anyway, accessing it here won't trigger another run
            if len(self.all_results) < 1:
                if os.path.exists(filename) and overwrite:
                    raise RuntimeError("No results.  Refusing to overwrite.")
                else:
                    self._sdk._nqprint("No results to export.")

        # Now we access the magic property, so execution will occur if needed
        raw_results = [ dict(result_item) for result_item in self.all_results ]

        if filename.endswith('.csv'):
            fieldnames = set()
            for item in raw_results:
                fieldnames.update(item.keys())

            with open(filename, 'w', newline='') as f:
                writer = csv.DictWriter(f, fieldnames=sorted(fieldnames))
                writer.writeheader()
                writer.writerows(raw_results)

        else:
            with open(filename, 'w') as f:
                for item in raw_results:
                    f.write(json.dumps(item) + '\n')


    def extract(self, item_template: dict, mode=None, view=None,
            limit=None, max_pages=1) -> "Workflow":
        """Provide an item_template which describes what you want to extract
        from the URLs processed by this step.

        The keys of this template are the fieldnames,
        and the values are the instructions for extracting that field.

        Examples:
        {
            "magnitude": "What is the magnitude of this earthquake?",
            "location": "What is the location of this earthquake?",
            "time": "What is the time of this earthquake?"
        }

        {
            "url": "Find me the URLs of the product detail pages."
        }

        Args:
            item_template: the item template described above
            mode: 'single'|'multiple'|'auto' - defaults to 'auto'.  Set this to 'single' if each URL has only a single item.  Set this to 'multiple' if each URL should yield multiple items
            max_pages: enable pagination from the given URL.  Defaults to one page only.
            limit: limit the number of items yielded by this step
            view: 'html' | 'selectHtml' | 'text' - defaults to 'html' (the full HTML).  Use 'selectHtml' to have the AI see only text and links.  Use 'text' to have the AI see only text.
        """
        # Validate field names to prevent collisions with Item methods
        RESERVED_PROPERTIES = {'keys', 'items', 'values', 'to_dict', 'get'}

        for field_name in item_template.keys():
            if field_name in RESERVED_PROPERTIES:
                raise ValueError(
                    f"Field name '{field_name}' is a reserved property name. "
                    f"Please choose a different field name. "
                    f"Reserved names are: {', '.join(RESERVED_PROPERTIES)}"
                )

        if mode is not None and mode not in ["single", "multiple", "auto"]:
            raise ValueError("Mode may only be 'single'|'multiple'|'auto'")

        new_instance = self._clone()

        new_step = {
            "name": "extract",
            "args": {
                "questions": item_template,
                "maxPages": max_pages,
                "limit": limit,
            }
        }

        if view is not None:
            new_step['args']['view'] = view

        if mode is not None:
            new_step['args']['mode'] = mode

        new_instance._workflow["steps"].append(new_step)

        return new_instance

    def limit(self, n: int) -> "Workflow":
        """
        Limit the total number of results that this workflow will produce.
        """
        if self._workflow['options'].get('limit') is not None:
            raise ValueError(
                "This limit is per-workflow, and may only be set once.")

        #TODO: if there are results, I think we could actually carry them through?
        new_instance = self._clone()
        new_instance._workflow['options']["limit"] = n
        return new_instance

    def unique(self, fields_list: List[str], limit=None) -> "Workflow":
        """Provide a list of fields which will be used to check the uniqueness
        of the items passing through this step.

        Any items which are duplicates (as determined by these fields only),
        will be filtered and will not be seen by the next step in your workflow.

        Args:
            fields_list: the list of fields described above
            limit: limit the number of items yielded by this step
        """
        new_instance = self._clone()

        new_instance._workflow['steps'].append({
            "name": "unique",
            "args": {
                "fields": fields_list,
                "limit": limit
            }
        })

        return new_instance

    def filter(self, instruction: str, limit=None) -> "Workflow":
        """Provide instructions for how to filter items.

        Example: "Exclude any earthquakes that were unlikely to cause significant property damage."

        Args:
            instruction: the instruction described above
            limit: limit the number of items yielded by this step
        """
        new_instance = self._clone()
        new_instance._workflow['steps'].append({
            "name": "filter",
            "args": {
                "query": instruction,
                "limit": limit
            }
        })

        return new_instance

    def to_dict(self) -> Dict[str, Any]:
        """Convert workflow to dictionary format."""
        return self._workflow

    def to_json(self):
        return json.dumps(self._workflow)

all_results property

Get all results, executing the query if necessary, blocks until done. Returns results as Item objects for easier attribute access.

has_results property

If you want to check whether a workflow has results already, but do NOT want to trigger execution yet.

has_run property

If this workflow has been executed before (even if there were no results)
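
These checks never trigger execution, so they are safe to use in polling-style code (a small sketch; the URL is illustrative):

    wf = fox.extract("https://example.com", item_template={"title": "Find the page title."})
    wf.has_run        # False - nothing has executed yet
    wf.has_results    # False
    _ = wf.all_results  # blocks, runs the workflow
    wf.has_run        # True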

__bool__()

Return True if the workflow has any results, False otherwise. Accessing the results property will execute the workflow if necessary.

Source code in src/fetchfox_sdk/workflow.py
def __bool__(self):
    """Return True if the workflow has any results, False otherwise.
    Accessing the results property will execute the workflow if necessary.
    """
    return bool(self.all_results)

__contains__(item)

Check if an item exists in the results. Accessing the results property will execute the workflow if necessary.

Source code in src/fetchfox_sdk/workflow.py
def __contains__(self, item):
    """Check if an item exists in the results.
    Accessing the results property will execute the workflow if necessary.
    """
    return item in self.all_results

__getitem__(key)

Allow indexing into the workflow results. Accessing the results property will execute the workflow if necessary. NOTE: Workflows will NEVER execute partially. Accessing any item of the results will always trigger a complete execution.

Parameters:
  • key

    Can be an integer index or a slice

Source code in src/fetchfox_sdk/workflow.py
def __getitem__(self, key):
    """Allow indexing into the workflow results.
    Accessing the results property will execute the workflow if necessary.
    NOTE: Workflows will NEVER execute partially.  Accessing any item of
    the results will always trigger a complete execution.

    Args:
        key: Can be an integer index or a slice
    """
    # Results property already returns Items
    return self.all_results[key]

__iter__()

Make the workflow iterable. Accessing the results property will execute the workflow if necessary.

Source code in src/fetchfox_sdk/workflow.py
def __iter__(self) -> Generator[Item, None, None]:
    """Make the workflow iterable.
    Accessing the results property will execute the workflow if necessary.
    """
    # Use the results property which already returns Items
    yield from self.results()

__len__()

Return the number of results. Accessing the results property will execute the workflow if necessary.

Source code in src/fetchfox_sdk/workflow.py
def __len__(self):
    """Return the number of results.
    Accessing the results property will execute the workflow if necessary.
    """
    return len(self.all_results)

export(filename, overwrite=False)

Execute workflow and save results to file.

Parameters:
  • filename (str) –

    Path to output file, must end with .csv or .jsonl

  • overwrite (bool, default: False ) –

    Defaults to False, which causes an error to be raised if the file exists already. Set it to true if you want to overwrite.

Raises:
  • ValueError

    If filename doesn't end with .csv or .jsonl

  • FileExistsError

    If file exists and overwrite is False

Source code in src/fetchfox_sdk/workflow.py
def export(self, filename: str, overwrite: bool = False) -> None:
    """Execute workflow and save results to file.

    Args:
        filename: Path to output file, must end with .csv or .jsonl
        overwrite: Defaults to False, which causes an error to be raised if the file exists already.  Set it to true if you want to overwrite.

    Raises:
        ValueError: If filename doesn't end with .csv or .jsonl
        FileExistsError: If file exists and overwrite is False
    """

    if not (filename.endswith('.csv') or filename.endswith('.jsonl')):
        raise ValueError("Output filename must end with .csv or .jsonl")

    if os.path.exists(filename) and not overwrite:
        raise FileExistsError(
            f"File {filename} already exists. Use overwrite=True to overwrite.")

    if self.has_run:
        if not self.has_results:
            raise RuntimeError("A job ran, but there are no results.")

        # If it has run, and results is not None, results could still be []
        # anyway, accessing it here won't trigger another run
        if len(self.all_results) < 1:
            if os.path.exists(filename) and overwrite:
                raise RuntimeError("No results.  Refusing to overwrite.")
            else:
                self._sdk._nqprint("No results to export.")

    # Now we access the magic property, so execution will occur if needed
    raw_results = [ dict(result_item) for result_item in self.all_results ]

    if filename.endswith('.csv'):
        fieldnames = set()
        for item in raw_results:
            fieldnames.update(item.keys())

        with open(filename, 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=sorted(fieldnames))
            writer.writeheader()
            writer.writerows(raw_results)

    else:
        with open(filename, 'w') as f:
            for item in raw_results:
                f.write(json.dumps(item) + '\n')
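
For instance (filenames are illustrative):

    wf.export("results.jsonl")
    wf.export("results.csv", overwrite=True)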

extract(item_template, mode=None, view=None, limit=None, max_pages=1)

Provide an item_template which describes what you want to extract from the URLs processed by this step.

The keys of this template are the fieldnames, and the values are the instructions for extracting that field.

Examples:

{ "magnitude": "What is the magnitude of this earthquake?", "location": "What is the location of this earthquake?", "time": "What is the time of this earthquake?" }

{ "url": "Find me the URLs of the product detail pages." }

Parameters:
  • item_template (dict) –

    the item template described above

  • mode

    'single'|'multiple'|'auto' - defaults to 'auto'. Set this to 'single' if each URL has only a single item. Set this to 'multiple' if each URL should yield multiple items

  • max_pages

    enable pagination from the given URL. Defaults to one page only.

  • limit

    limit the number of items yielded by this step

  • view

    'html' | 'selectHtml' | 'text' - defaults to 'html' (the full HTML). Use 'selectHtml' to have the AI see only text and links. Use 'text' to have the AI see only text.

Source code in src/fetchfox_sdk/workflow.py
def extract(self, item_template: dict, mode=None, view=None,
        limit=None, max_pages=1) -> "Workflow":
    """Provide an item_template which describes what you want to extract
    from the URLs processed by this step.

    The keys of this template are the fieldnames,
    and the values are the instructions for extracting that field.

    Examples:
    {
        "magnitude": "What is the magnitude of this earthquake?",
        "location": "What is the location of this earthquake?",
        "time": "What is the time of this earthquake?"
    }

    {
        "url": "Find me the URLs of the product detail pages."
    }

    Args:
        item_template: the item template described above
        mode: 'single'|'multiple'|'auto' - defaults to 'auto'.  Set this to 'single' if each URL has only a single item.  Set this to 'multiple' if each URL should yield multiple items
        max_pages: enable pagination from the given URL.  Defaults to one page only.
        limit: limit the number of items yielded by this step
        view: 'html' | 'selectHtml' | 'text' - defaults to 'html' (the full HTML).  Use 'selectHtml' to have the AI see only text and links.  Use 'text' to have the AI see only text.
    """
    # Validate field names to prevent collisions with Item methods
    RESERVED_PROPERTIES = {'keys', 'items', 'values', 'to_dict', 'get'}

    for field_name in item_template.keys():
        if field_name in RESERVED_PROPERTIES:
            raise ValueError(
                f"Field name '{field_name}' is a reserved property name. "
                f"Please choose a different field name. "
                f"Reserved names are: {', '.join(RESERVED_PROPERTIES)}"
            )

    if mode is not None and mode not in ["single", "multiple", "auto"]:
        raise ValueError("Mode may only be 'single'|'multiple'|'auto'")

    new_instance = self._clone()

    new_step = {
        "name": "extract",
        "args": {
            "questions": item_template,
            "maxPages": max_pages,
            "limit": limit,
        }
    }

    if view is not None:
        new_step['args']['view'] = view

    if mode is not None:
        new_step['args']['mode'] = mode

    new_instance._workflow["steps"].append(new_step)

    return new_instance
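
A sketch of a follow-on extraction step (links stands for a prior step; the instructions are illustrative):

    details = links.extract(
        item_template={"price": "What is the price of this product?"},
        mode="single",      # each URL is one product page
        view="selectHtml",  # let the AI see only text and links
        max_pages=1)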

filter(instruction, limit=None)

Provide instructions for how to filter items.

Example: "Exclude any earthquakes that were unlikely to cause significant property damage."

Parameters:
  • instruction (str) –

    the instruction described above

  • limit

    limit the number of items yielded by this step

Source code in src/fetchfox_sdk/workflow.py
def filter(self, instruction: str, limit=None) -> "Workflow":
    """Provide instructions for how to filter items.

    Example: "Exclude any earthquakes that were unlikely to cause significant property damage."

    Args:
        instruction: the instruction described above
        limit: limit the number of items yielded by this step
    """
    new_instance = self._clone()
    new_instance._workflow['steps'].append({
        "name": "filter",
        "args": {
            "query": instruction,
            "limit": limit
        }
    })

    return new_instance
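
For example, where quakes stands for an earlier extract step (illustrative name):

    significant = quakes.filter(
        "Exclude any earthquakes that were unlikely to cause significant property damage.",
        limit=100)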

init(url)

Initialize the workflow with one or more URLs.

Parameters:
  • url (Union[str, List[str]]) –

    Can be a single URL as a string, or a list of URLs.

Source code in src/fetchfox_sdk/workflow.py
def init(self, url: Union[str, List[str]]) -> "Workflow":
    """Initialize the workflow with one or more URLs.

    Args:
        url: Can be a single URL as a string, or a list of URLs.
    """
    #TODO: if used more than once, raise error and print helpful message
    #TODO: do params here?

    new_instance = self._clone()

    if isinstance(url, str):
        items = [{"url": url}]
    else:
        items = [{"url": u} for u in url]

    new_instance._workflow["steps"].append({
        "name": "const",
        "args": {
            "items": items,
            "maxPages": 1 #TODO
        }
    })
    return new_instance

limit(n)

Limit the total number of results that this workflow will produce.

Source code in src/fetchfox_sdk/workflow.py
def limit(self, n: int) -> "Workflow":
    """
    Limit the total number of results that this workflow will produce.
    """
    if self._workflow['options'].get('limit') is not None:
        raise ValueError(
            "This limit is per-workflow, and may only be set once.")

    #TODO: if there are results, I think we could actually carry them through?
    new_instance = self._clone()
    new_instance._workflow['options']["limit"] = n
    return new_instance
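
For example:

    first_ten = wf.limit(10)   # caps the whole workflow at 10 results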

results_future()

Returns a plain concurrent.futures.Future object that yields ALL results when the job is complete. Access the_future.result() to block, or use the_future.done() to check for completion without any blocking.

If we already have results, they will be immediately available in the future.result()

Source code in src/fetchfox_sdk/workflow.py
def results_future(self):
    """Returns a plain concurrent.futures.Future object that yields ALL results
    when the job is complete.  Access the_future.result() to block, or use
    the_future.done() to check for completion without any blocking.

    If we already have results, they will be immediately available in the
    `future.result()`
    """

    if self._results is not None:
        # Already have final results: return a completed future
        completed_future = concurrent.futures.Future()
        completed_future.set_result(self._results)
        self._future = completed_future

    if self._future is not None:
        # Already started, so reuse existing future
        return self._future

    self._future = self._executor.submit(self._run__block_until_done)
    self._future.add_done_callback(self._future_done_cb)
    return self._future
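
A non-blocking polling sketch:

    import time

    future = wf.results_future()
    while not future.done():
        time.sleep(1)          # do other work here instead of sleeping
    items = future.result()    # all result items, as a list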

to_dict()

Convert workflow to dictionary format.

Source code in src/fetchfox_sdk/workflow.py
def to_dict(self) -> Dict[str, Any]:
    """Convert workflow to dictionary format."""
    return self._workflow

unique(fields_list, limit=None)

Provide a list of fields which will be used to check the uniqueness of the items passing through this step.

Any items which are duplicates (as determined by these fields only), will be filtered and will not be seen by the next step in your workflow.

Parameters:
  • fields_list (List[str]) –

    the list of fields described above

  • limit

    limit the number of items yielded by this step

Source code in src/fetchfox_sdk/workflow.py
def unique(self, fields_list: List[str], limit=None) -> "Workflow":
    """Provide a list of fields which will be used to check the uniqueness
    of the items passing through this step.

    Any items which are duplicates (as determined by these fields only),
    will be filtered and will not be seen by the next step in your workflow.

    Args:
        fields_list: the list of fields described above
        limit: limit the number of items yielded by this step
    """
    new_instance = self._clone()

    new_instance._workflow['steps'].append({
        "name": "unique",
        "args": {
            "fields": fields_list,
            "limit": limit
        }
    })

    return new_instance
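
For example, de-duplicating product links by URL before extracting details (links stands for a prior extract step):

    unique_links = links.unique(["url"])
    details = unique_links.extract(item_template={"price": "What is the price of this product?"})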