
dataset

Dataset

Bases: ABC

This is the base class for Datasets, providing functions that manage task runs and logs.

Source code in data_manager/dataset.py
class Dataset(ABC):
    """
    This is the base class for Datasets, providing functions that manage task runs and logs.
    """

    backend: str
    name: str
    retries: int
    retry_delay: int

    @abstractmethod
    def main(self):
        """
        Dataset child classes must implement a main function
        This is the function that is called when Dataset.run() is invoked
        """
        raise NotImplementedError("Dataset classes must implement a main function")

    def get_logger(self):
        """
        This function will return a logger that implements the Python logging API:
        https://docs.python.org/3/library/logging.html

        If you are using Prefect, the logs will be managed by Prefect
        """
        if self.backend == "prefect":
            from prefect import get_run_logger

            return get_run_logger()
        else:
            return logging.getLogger("dataset")

    @contextmanager
    def tmp_to_dst_file(
        self,
        final_dst: str | os.PathLike,
        make_dst_dir: bool = False,
        tmp_dir: Optional[str | os.PathLike] = None,
        validate_cog: bool = False,
    ):
        """
        Context manager that provides a temporary file path to write
        output files to, that is automatically moved to a final destination
        once the context is exited. This prevents interrupted jobs
        from leaving partially-written files in the filesystem where
        they might be mistaken for complete files.

        Additionally, this context manager can create output directories
        that don't exist yet, or validate COG files after they've been
        written. See the list of parameters below for more information.

        Here is an example of its use:

        ```python
        with self.tmp_to_dst_file(final_dst, validate_cog=True) as tmp_dst:
            with rasterio.open(tmp_dst, "w", **meta) as dst:
                ...
        ```

        Parameters:
            final_dst: Path to where the file should be written.
            make_dst_dir: If set to true, the parent directory of `final_dst` will be created (and any of its parents, as necessary)
            tmp_dir: Path to directory where file should be temporarily stored. If set to `None`, a default directory will be used.
            validate_cog: If set to `True`, the written file will be validated as a COG, and an exception will be raised if this validation fails.
        """
        logger = self.get_logger()

        final_dst = Path(final_dst)

        # make sure that final_dst parent directory exists
        if not final_dst.parent.exists():
            if make_dst_dir:
                os.makedirs(final_dst.parent, exist_ok=True)
            else:
                raise FileNotFoundError(
                    f"Parent directory of requested filepath {str(final_dst)} does not exist."
                )

        tmp_sub_dir = mkdtemp(dir=tmp_dir)
        _, tmp_path = mkstemp(dir=tmp_sub_dir)
        logger.debug(
            f"Created temporary file {tmp_path} with final destination {str(final_dst)}"
        )
        yield tmp_path

        # validate Cloud Optimized GeoTIFF
        # doing this before move because disk r/w is almost certainly faster
        if validate_cog:
            is_valid, errors, warnings = cog_validate(tmp_path)
            if is_valid:
                logger.info(
                    f"Successfully validated output COG {tmp_path} (destined for {str(final_dst)})"
                )
            else:
                logger.exception(
                    f"Failed to validate COG {tmp_path} (destined for {str(final_dst)})"
                )
            for error in errors:
                logger.error(f"Error encountered when validating COG: {error}")
            for warning in warnings:
                logger.warning(f"Warning encountered when validating COG: {warning}")

        # move file from tmp_path to final_dst
        try:
            logger.debug(f"Attempting to move {tmp_path} to {str(final_dst)}")
            shutil.move(tmp_path, final_dst)
        except Exception:
            logger.exception(
                f"Failed to transfer temporary file {tmp_path} to final destination {str(final_dst)}"
            )
        else:
            logger.debug(
                f"Successfully transferred {tmp_path} to final destination {str(final_dst)}"
            )

    def error_wrapper(self, func: Callable, args: Dict[str, Any]):
        """
        This is the wrapper that is used when running individual tasks
        It will always return a TaskResult!
        """
        logger = self.get_logger()

        for try_no in range(self.retries + 1):
            try:
                return TaskResult(0, "Success", args, func(*args))
            except Exception as e:
                if self.bypass_error_wrapper:
                    logger.info(
                        "Task failed with exception, and error wrapper bypass enabled. Raising..."
                    )
                    raise
                if try_no < self.retries:
                    logger.error(f"Task failed with exception (retrying): {repr(e)}")
                    time.sleep(self.retry_delay)
                    continue
                else:
                    logger.error(f"Task failed with exception (giving up): {repr(e)}")
                    return TaskResult(1, repr(e), args, None)

    def run_serial_tasks(
        self, name, func: Callable, input_list: Iterable[Dict[str, Any]]
    ):
        """
        Run tasks in serial (locally), given a function and list of inputs
        This will always return a list of TaskResults!
        """
        logger = self.get_logger()
        logger.debug(f"run_serial_tasks - input_list: {input_list}")
        return [self.error_wrapper(func, i) for i in input_list]

    def run_concurrent_tasks(
        self,
        name: str,
        func: Callable,
        input_list: Iterable[Dict[str, Any]],
        force_sequential: bool,
        max_workers: int = None,
    ):
        """
        Run tasks concurrently (locally), given a function and a list of inputs
        This will always return a list of TaskResults!
        """

        pool_size = 1 if force_sequential else max_workers
        with multiprocessing.Pool(pool_size) as pool:
            results = pool.starmap(
                self.error_wrapper,
                [(func, i) for i in input_list],
                chunksize=self.chunksize,
            )
        return results

    def run_prefect_tasks(
        self,
        name: str,
        func: Callable,
        input_list: Iterable[Dict[str, Any]],
        force_sequential: bool,
        prefect_concurrency_tag: str = None,
        prefect_concurrency_task_value: int = 1,
    ):
        """
        Run tasks using Prefect, using whichever task runner was chosen in self.run()
        This will always return a list of TaskResults!
        """

        from prefect import task
        from prefect.concurrency.sync import concurrency

        logger = self.get_logger()

        def cfunc(wrapper_args, func_args):
            func, prefect_concurrency_tag, prefect_concurrency_task_value = wrapper_args
            with concurrency(
                prefect_concurrency_tag, occupy=prefect_concurrency_task_value
            ):
                return func(*func_args)

        if not prefect_concurrency_tag:
            task_wrapper = task(
                func,
                name=name,
                retries=self.retries,
                retry_delay_seconds=self.retry_delay,
                persist_result=True,
            )
        else:
            task_wrapper = task(
                cfunc,
                name=name,
                retries=self.retries,
                retry_delay_seconds=self.retry_delay,
                persist_result=True,
            )

        futures = []
        for i in input_list:
            w = [f[1] for f in futures] if force_sequential else None
            if prefect_concurrency_tag:
                args = (
                    (func, prefect_concurrency_tag, prefect_concurrency_task_value),
                    i,
                )
            else:
                args = i
            futures.append(
                (args, task_wrapper.submit(*args, wait_for=w, return_state=False))
            )

        results = []

        states = [(i[0], i[1].wait()) for i in futures]

        while states:
            for ix, (inputs, state) in enumerate(states):
                if state.is_completed():
                    # print('complete', ix, inputs)
                    logger.info(f"complete - {ix} - {inputs}")

                    results.append(TaskResult(0, "Success", inputs, state.result()))
                elif state.is_failed() or state.is_crashed() or state.is_cancelled():
                    # print('fail', ix, inputs)
                    logger.info(f"fail - {ix} - {inputs}")

                    try:
                        msg = repr(state.result(raise_on_failure=True))
                    except Exception as e:
                        msg = f"Unable to retrieve error message - {e}"
                    results.append(TaskResult(1, msg, inputs, None))
                else:
                    # print('not ready', ix, inputs)
                    continue
                _ = states.pop(ix)
            time.sleep(5)

        # for inputs, future in futures:
        #     state = future.wait(60*60*2)
        #     if state.is_completed():
        #         results.append(TaskResult(0, "Success", inputs, state.result()))
        #     elif state.is_failed() or state.is_crashed():
        #         try:
        #             msg = repr(state.result(raise_on_failure=False))
        #         except:
        #             msg = "Unable to retrieve error message"
        #         results.append(TaskResult(1, msg, inputs, None))
        #     else:
        #         pass

        # while futures:
        #     for ix, (inputs, future) in enumerate(futures):
        #         state = future.get_state()
        #         # print(repr(state))
        #         # print(repr(future))
        #         if state.is_completed():
        #             print('complete', ix, inputs)
        #             results.append(TaskResult(0, "Success", inputs, future.result()))
        #         elif state.is_failed() or state.is_crashed() or state.is_cancelled():
        #             print('fail', ix, inputs)
        #             try:
        #                 msg = repr(future.result(raise_on_failure=True))
        #             except Exception as e:
        #                 msg = f"Unable to retrieve error message - {e}"
        #             results.append(TaskResult(1, msg, inputs, None))
        #         else:
        #             # print('not ready', ix, inputs)
        #             continue
        #         _ = futures.pop(ix)
        #         # future.release()
        #     time.sleep(5)

        return results

    def run_mpi_tasks(
        self,
        name: str,
        func: Callable,
        input_list: Iterable[Dict[str, Any]],
        force_sequential: bool,
        max_workers: int = None,
    ):
        """
        Run tasks using MPI, requiring the use of `mpirun`
        Tasks are submitted to an MPIPoolExecutor created within this function
        This will always return a list of TaskResults!
        """
        from mpi4py.futures import MPIPoolExecutor

        if not max_workers:
            max_workers = self.mpi_max_workers

        with MPIPoolExecutor(max_workers=max_workers, chunksize=self.chunksize) as pool:
            futures = []
            for i in input_list:
                f = pool.submit(self.error_wrapper, func, i)
                if force_sequential:
                    wait([f])
                futures.append(f)
        return [f.result() for f in futures]

    def run_tasks(
        self,
        func: Callable,
        input_list: Iterable[Dict[str, Any]],
        name: Optional[str] = None,
        retries: int = 3,
        retry_delay: int = 60,
        force_sequential: bool = False,
        force_serial: bool = False,
        max_workers: Optional[int] = None,
        prefect_concurrency_tag: Optional[str] = None,
        prefect_concurrency_task_value: Optional[int] = None,
    ) -> ResultTuple:
        """
        Run a bunch of tasks, calling one of the above run_tasks functions
        This is the function that should be called most often from self.main()
        It will return a ResultTuple of TaskResults

        Parameters:
            func: The function to run for each task.
            input_list: An iterable of function inputs. For each input, a new task will be created with that input passed as the only parameter.
            name: A name for this task run, for easier reference.
            retries: Number of times to retry a task before giving up.
            retry_delay: Delay (in seconds) to wait between task retries.
            force_sequential: If set to `True`, all tasks in this run will be run in sequence, regardless of backend.
            force_serial: If set to `True`, all tasks will be run locally (using the internal "serial runner") rather than with this Dataset's usual backend. **Please avoid using this parameter, it will likely be deprecated soon!**
            max_workers: Maximum number of tasks to run at once, if using a concurrent mode. This value will not override `force_sequential` or `force_serial`. **Warning: This is not yet supported by the Prefect backend. We hope to fix this soon.**
            prefect_concurrency_tag: If using the Prefect backend, this tag will be used to limit the concurrency of this task. **This will eventually be deprecated in favor of `max_workers` once we have implemented that for the Prefect backend.**
            prefect_concurrency_task_value: If using the Prefect backend, this sets the maximum number of tasks to run at once, similar to `max_workers`. **See warning above.**
        """

        timestamp = datetime.today()

        if not callable(func):
            raise TypeError("Function passed to run_tasks is not callable")

        # Save global retry settings, and override with current values
        old_retries, old_retry_delay = self.retries, self.retry_delay
        self.retries, self.retry_delay = self.init_retries(retries, retry_delay)

        logger = self.get_logger()

        if name is None:
            try:
                name = func.__name__
            except AttributeError:
                logger.warning(
                    "No name given for task run, and function does not have a name (multiple unnamed functions may result in log files being overwritten)"
                )
                name = "unnamed"
        elif not isinstance(name, str):
            raise TypeError("Name of task run must be a string")

        if max_workers is None and hasattr(self, "max_workers"):
            max_workers = self.max_workers

        if self.backend == "serial" or force_serial:
            results = self.run_serial_tasks(name, func, input_list)
        elif self.backend == "concurrent":
            results = self.run_concurrent_tasks(
                name, func, input_list, force_sequential, max_workers=max_workers
            )
        elif self.backend == "prefect":
            results = self.run_prefect_tasks(
                name,
                func,
                input_list,
                force_sequential,
                prefect_concurrency_tag,
                prefect_concurrency_task_value,
            )

        elif self.backend == "mpi":
            results = self.run_mpi_tasks(
                name, func, input_list, force_sequential, max_workers=max_workers
            )
        else:
            raise ValueError(
                "Requested backend not recognized. Have you called this Dataset's run function?"
            )

        if len(results) == 0:
            raise ValueError(
                f"Task run {name} yielded no results. Did it receive any inputs?"
            )

        success_count = sum(1 for r in results if r.status_code == 0)
        error_count = len(results) - success_count
        if error_count == 0:
            logger.info(
                f"Task run {name} completed with {success_count} successes and no errors"
            )
        else:
            logger.warning(
                f"Task run {name} completed with {error_count} errors and {success_count} successes"
            )

        # Restore global retry settings
        self.retries, self.retry_delay = old_retries, old_retry_delay

        return ResultTuple(results, name, timestamp)

    def log_run(
        self,
        results,
        expand_args: list = [],
        expand_results: list = [],
        time_format_str: str = "%Y_%m_%d_%H_%M",
    ):
        """
        Log a task run
        Given a ResultTuple (usually from run_tasks), save its logs to a CSV file
        time_format_str sets the timestamp format to use in the CSV filename

        expand_results is an optional set of labels for each item in TaskResult.result
          - None values in expand_results will exclude that column from output
          - if expand_results is an empty list, each TaskResult's result value will be
            written as-is to a "results" column in the CSV
        """
        time_str = results.timestamp.strftime(time_format_str)
        log_file = self.log_dir / f"{results.name}_{time_str}.csv"

        fieldnames = ["status_code", "status_message"]

        should_expand_args = False
        args_expansion_spec = []

        for ai, ax in enumerate(expand_args):
            if ax is not None:
                should_expand_args = True
                fieldnames.append(ax)
                args_expansion_spec.append((ax, ai))

        if not should_expand_args:
            fieldnames.append("args")

        should_expand_results = False
        results_expansion_spec = []

        for ri, rx in enumerate(expand_results):
            if rx is not None:
                should_expand_results = True
                fieldnames.append(rx)
                results_expansion_spec.append((rx, ri))

        if not should_expand_results:
            fieldnames.append("results")

        rows_to_write = []

        for r in results:
            row = [r[0], r[1]]
            if should_expand_args:
                row.extend(
                    [
                        r[2][i] if r[2] is not None else None
                        for _, i in args_expansion_spec
                    ]
                )
            else:
                row.append(r[2])

            if should_expand_results:
                row.extend(
                    [
                        r[3][i] if r[3] is not None else None
                        for _, i in results_expansion_spec
                    ]
                )
            else:
                row.append(r[3])

            rows_to_write.append(row)

        with open(log_file, "w", newline="") as lf:
            writer = csv.writer(lf)
            writer.writerow(fieldnames)
            writer.writerows(rows_to_write)

    def init_retries(self, retries: int, retry_delay: int, save_settings: bool = False):
        """
        Given a number of task retries and a retry_delay,
        checks to make sure those values are valid
        (ints greater than or equal to zero), and
        optionally sets class variables to keep their
        settings
        """
        if isinstance(retries, int):
            if retries < 0:
                raise ValueError(
                    "Number of task retries must be greater than or equal to zero"
                )
            elif save_settings:
                self.retries = retries
        else:
            raise TypeError("retries must be an int greater than or equal to zero")

        if isinstance(retry_delay, int):
            if retry_delay < 0:
                raise ValueError("Retry delay must be greater than or equal to zero")
            elif save_settings:
                self.retry_delay = retry_delay
        else:
            raise TypeError(
                "retry_delay must be an int greater than or equal to zero, representing the number of seconds to wait before retrying a task"
            )

        return retries, retry_delay

    def _check_env_and_run(self):
        """
        Check if $TMPDIR is in /local, log warning if it is
        Then, run self.main()
        """
        logger = self.get_logger()

        try:
            # $TMPDIR is the default temporary directory that deployments use to store and execute code
            # is $TMPDIR set, and can we resolve it (find it on the filesystem)?
            tmp_dir = Path(os.environ["TMPDIR"]).resolve(strict=True)
        except KeyError:
            # KeyError if there is no such environment variable
            logger.warning("No $TMPDIR environment variable found!")
        except FileNotFoundError:
            # when we tried to resolve the path, the folder wasn't found on filesystem
            logger.warning("$TMPDIR path not found!")
        else:
            # /local points to local storage on W&M HPC
            slash_local = Path("/local").resolve()
            # is /local a parent dir of tmp_dir?
            for p in tmp_dir.parents:
                if p.resolve() == slash_local:
                    logger.warning(
                        "$TMPDIR in /local, deployments won't be accessible to compute nodes."
                    )

        # run the dataset (self.main() should be defined in child class instance)
        self.main()

    def run(
        self,
        params: RunParameters,
        **kwargs,
    ):
        """
        Run a dataset
        Initializes class variables and chosen backend
        This is how Datasets should usually be run
        Eventually calls _check_env_and_run(), starting the dataset (see below)
        """

        # get current timestamp and initialize log directory
        timestamp = datetime.today()
        time_format_str: str = "%Y_%m_%d_%H_%M"
        time_str = timestamp.strftime(time_format_str)
        self.log_dir = Path(params.log_dir) / time_str
        os.makedirs(self.log_dir, exist_ok=True)

        self.init_retries(params.retries, params.retry_delay, save_settings=True)

        self.chunksize = params.chunksize

        self.bypass_error_wrapper = params.bypass_error_wrapper

        # Allow datasets to set their own default max_workers
        if params.max_workers is None and hasattr(self, "max_workers"):
            max_workers = self.max_workers
        else:
            max_workers = params.max_workers
            self.max_workers = max_workers

        # If dataset doesn't come with a name use its class name
        if not self.name:
            self.name = self._type()

        if params.backend == "prefect":
            self.backend = "prefect"

            from prefect import flow
            from prefect.task_runners import (  # , ThreadPoolTaskRunner
                ConcurrentTaskRunner,
                SequentialTaskRunner,
            )

            if params.task_runner == "sequential":
                tr = SequentialTaskRunner
            elif params.task_runner == "concurrent" or params.task_runner is None:
                # placeholder for actual ThreadPoolTaskRunner release in future Prefect versions
                # tr = ThreadPoolTaskRunner(max_workers=max_workers)
                tr = ConcurrentTaskRunner()

            elif params.task_runner == "dask":
                from prefect_dask import DaskTaskRunner

                # if "cluster" in kwargs:
                # del kwargs["cluster"]
                # if "cluster_kwargs" in kwargs:
                # del kwargs["cluster_kwargs"]

                dask_cluster_kwargs = {
                    "n_workers": max_workers,
                    "threads_per_worker": params.threads_per_worker,
                }
                tr = DaskTaskRunner(cluster_kwargs=dask_cluster_kwargs)
            elif params.task_runner == "hpc":
                from hpc import HPCDaskTaskRunner

                job_name = "".join(self.name.split())
                tr = HPCDaskTaskRunner(
                    num_procs=max_workers,
                    job_name=job_name,
                    log_dir=self.log_dir,
                    **kwargs,
                )
            elif params.task_runner == "kubernetes":
                from dask_kubernetes.operator import KubeCluster, make_cluster_spec
                from prefect_dask import DaskTaskRunner

                spec = make_cluster_spec(name="selector-example", n_workers=2)
                spec["spec"]["worker"]["spec"]["containers"][0][
                    "image"
                ] = "docker.io/jacobwhall/geodata-dask"
                spec["spec"]["worker"]["spec"]["containers"][0][
                    "imagePullPolicy"
                ] = "Always"
                spec["spec"]["worker"]["spec"]["containers"][0]["env"] = [
                    {
                        "name": "DATA_MANAGER_VERSION",
                        "value": os.environ["DATA_MANAGER_VERSION"],
                    }
                ]
                spec["spec"]["worker"]["spec"]["containers"][0]["volumeMounts"] = [
                    {"name": "sciclone", "mountPath": "/sciclone"}
                ]

                spec["spec"]["worker"]["spec"]["volumes"] = [
                    {
                        "name": "sciclone",
                        "persistentVolumeClaim": {"claimName": "nova-geodata-prod"},
                    }
                ]

                dask_task_runner_kwargs = {
                    "cluster_class": KubeCluster,
                    "cluster_kwargs": {
                        "custom_cluster_spec": spec,
                    },
                    "adapt_kwargs": {
                        "minimum": 1,
                        "maximum": max_workers,
                    },
                }
                tr = DaskTaskRunner(**dask_task_runner_kwargs)
            else:
                raise ValueError("Prefect task runner not recognized")

            @flow(task_runner=tr, name=self.name)
            def prefect_main_wrapper():
                self._check_env_and_run()

            prefect_main_wrapper()

        else:
            logger = logging.getLogger("dataset")
            logger.setLevel(params.logger_level)
            logger.addHandler(logging.StreamHandler())

            if params.backend == "mpi":
                from mpi4py import MPI

                comm = MPI.COMM_WORLD
                rank = comm.Get_rank()
                if rank != 0:
                    return

                self.backend = "mpi"
                self.mpi_max_workers = max_workers

                self._check_env_and_run()

            elif params.backend == "local":
                if params.run_parallel:
                    self.backend = "concurrent"
                else:
                    self.backend = "serial"
                self._check_env_and_run()

            else:
                raise ValueError(f"Backend {params.backend} not recognized.")
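
Here is a brief sketch of how a child class might use these functions, assuming Dataset is importable from data_manager.dataset; the class name, task function, and inputs below are illustrative, not part of the library.

```python
# A minimal sketch of a Dataset subclass; assumes Dataset is importable
# from data_manager.dataset. MyDataset, count_chars, and the inputs are
# hypothetical examples, not part of the library.
from data_manager.dataset import Dataset


def count_chars(text):
    # illustrative task function; each input tuple is unpacked into
    # this function's arguments (it must be importable/picklable for
    # the concurrent and MPI backends)
    return len(text)


class MyDataset(Dataset):
    name = "my_dataset"

    def main(self):
        logger = self.get_logger()
        logger.info("Starting example task run")
        inputs = [("a",), ("bb",), ("ccc",)]
        results = self.run_tasks(count_chars, inputs, name="count_chars")
        self.log_run(results)
```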

error_wrapper(func, args)

This is the wrapper that is used when running individual tasks. It will always return a TaskResult!

Source code in data_manager/dataset.py
def error_wrapper(self, func: Callable, args: Dict[str, Any]):
    """
    This is the wrapper that is used when running individual tasks
    It will always return a TaskResult!
    """
    logger = self.get_logger()

    for try_no in range(self.retries + 1):
        try:
            return TaskResult(0, "Success", args, func(*args))
        except Exception as e:
            if self.bypass_error_wrapper:
                logger.info(
                    "Task failed with exception, and error wrapper bypass enabled. Raising..."
                )
                raise
            if try_no < self.retries:
                logger.error(f"Task failed with exception (retrying): {repr(e)}")
                time.sleep(self.retry_delay)
                continue
            else:
                logger.error(f"Task failed with exception (giving up): {repr(e)}")
                return TaskResult(1, repr(e), args, None)

get_logger()

This function will return a logger that implements the Python logging API: https://docs.python.org/3/library/logging.html

If you are using Prefect, the logs will be managed by Prefect

Source code in data_manager/dataset.py
def get_logger(self):
    """
    This function will return a logger that implements the Python logging API:
    https://docs.python.org/3/library/logging.html

    If you are using Prefect, the logs will be managed by Prefect
    """
    if self.backend == "prefect":
        from prefect import get_run_logger

        return get_run_logger()
    else:
        return logging.getLogger("dataset")
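
As a sketch, a main() implementation inside a Dataset subclass might use the logger like this (the messages are illustrative):

```python
# Sketch: logging from inside a Dataset subclass's main(); the messages
# are illustrative only
def main(self):
    logger = self.get_logger()
    logger.info("beginning download step")
    logger.debug("this only appears if the logger level allows DEBUG")
    logger.warning("something looks off, but the run can continue")
```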

init_retries(retries, retry_delay, save_settings=False)

Given a number of task retries and a retry_delay, checks to make sure those values are valid (ints greater than or equal to zero), and optionally sets class variables to keep their settings

Source code in data_manager/dataset.py
def init_retries(self, retries: int, retry_delay: int, save_settings: bool = False):
    """
    Given a number of task retries and a retry_delay,
    checks to make sure those values are valid
    (ints greater than or equal to zero), and
    optionally sets class variables to keep their
    settings
    """
    if isinstance(retries, int):
        if retries < 0:
            raise ValueError(
                "Number of task retries must be greater than or equal to zero"
            )
        elif save_settings:
            self.retries = retries
    else:
        raise TypeError("retries must be an int greater than or equal to zero")

    if isinstance(retry_delay, int):
        if retry_delay < 0:
            raise ValueError("Retry delay must be greater than or equal to zero")
        elif save_settings:
            self.retry_delay = retry_delay
    else:
        raise TypeError(
            "retry_delay must be an int greater than or equal to zero, representing the number of seconds to wait before retrying a task"
        )

    return retries, retry_delay

log_run(results, expand_args=[], expand_results=[], time_format_str='%Y_%m_%d_%H_%M')

Log a task run. Given a ResultTuple (usually from run_tasks), save its logs to a CSV file. time_format_str sets the timestamp format to use in the CSV filename.

expand_results is an optional set of labels for each item in TaskResult.result. None values in expand_results will exclude that column from the output. If expand_results is an empty list, each TaskResult's result value will be written as-is to a "results" column in the CSV.

Source code in data_manager/dataset.py
def log_run(
    self,
    results,
    expand_args: list = [],
    expand_results: list = [],
    time_format_str: str = "%Y_%m_%d_%H_%M",
):
    """
    Log a task run
    Given a ResultTuple (usually from run_tasks), save its logs to a CSV file
    time_format_str sets the timestamp format to use in the CSV filename

    expand_results is an optional set of labels for each item in TaskResult.result
      - None values in expand_results will exclude that column from output
      - if expand_results is an empty list, each TaskResult's result value will be
        written as-is to a "results" column in the CSV
    """
    time_str = results.timestamp.strftime(time_format_str)
    log_file = self.log_dir / f"{results.name}_{time_str}.csv"

    fieldnames = ["status_code", "status_message"]

    should_expand_args = False
    args_expansion_spec = []

    for ai, ax in enumerate(expand_args):
        if ax is not None:
            should_expand_args = True
            fieldnames.append(ax)
            args_expansion_spec.append((ax, ai))

    if not should_expand_args:
        fieldnames.append("args")

    should_expand_results = False
    results_expansion_spec = []

    for ri, rx in enumerate(expand_results):
        if rx is not None:
            should_expand_results = True
            fieldnames.append(rx)
            results_expansion_spec.append((rx, ri))

    if not should_expand_results:
        fieldnames.append("results")

    rows_to_write = []

    for r in results:
        row = [r[0], r[1]]
        if should_expand_args:
            row.extend(
                [
                    r[2][i] if r[2] is not None else None
                    for _, i in args_expansion_spec
                ]
            )
        else:
            row.append(r[2])

        if should_expand_results:
            row.extend(
                [
                    r[3][i] if r[3] is not None else None
                    for _, i in results_expansion_spec
                ]
            )
        else:
            row.append(r[3])

        rows_to_write.append(row)

    with open(log_file, "w", newline="") as lf:
        writer = csv.writer(lf)
        writer.writerow(fieldnames)
        writer.writerows(rows_to_write)
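
As a sketch of the expansion options, suppose each task took a single URL argument and returned a (path, size) tuple; download_file, urls, and the column labels below are illustrative:

```python
# Sketch: label args/results columns in the CSV log. Assumes each task's
# args were a single URL and each result was a (path, size) tuple;
# download_file and urls are hypothetical.
results = self.run_tasks(download_file, [(url,) for url in urls], name="download")

# "url" labels the first (and only) argument; "output_path" labels the
# first result field, and None drops the size field from the CSV
self.log_run(results, expand_args=["url"], expand_results=["output_path", None])
```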

main() abstractmethod

Dataset child classes must implement a main function. This is the function that is called when Dataset.run() is invoked.

Source code in data_manager/dataset.py
@abstractmethod
def main(self):
    """
    Dataset child classes must implement a main function
    This is the function that is called when Dataset.run() is invoked
    """
    raise NotImplementedError("Dataset classes must implement a main function")

run(params, **kwargs)

Run a dataset. Initializes class variables and the chosen backend. This is how Datasets should usually be run. Eventually calls _check_env_and_run(), starting the dataset (see below).

Source code in data_manager/dataset.py
def run(
    self,
    params: RunParameters,
    **kwargs,
):
    """
    Run a dataset
    Initializes class variables and chosen backend
    This is how Datasets should usually be run
    Eventually calls _check_env_and_run(), starting the dataset (see below)
    """

    # get current timestamp and initialize log directory
    timestamp = datetime.today()
    time_format_str: str = "%Y_%m_%d_%H_%M"
    time_str = timestamp.strftime(time_format_str)
    self.log_dir = Path(params.log_dir) / time_str
    os.makedirs(self.log_dir, exist_ok=True)

    self.init_retries(params.retries, params.retry_delay, save_settings=True)

    self.chunksize = params.chunksize

    self.bypass_error_wrapper = params.bypass_error_wrapper

    # Allow datasets to set their own default max_workers
    if params.max_workers is None and hasattr(self, "max_workers"):
        max_workers = self.max_workers
    else:
        max_workers = params.max_workers
        self.max_workers = max_workers

    # If dataset doesn't come with a name use its class name
    if not self.name:
        self.name = self._type()

    if params.backend == "prefect":
        self.backend = "prefect"

        from prefect import flow
        from prefect.task_runners import (  # , ThreadPoolTaskRunner
            ConcurrentTaskRunner,
            SequentialTaskRunner,
        )

        if params.task_runner == "sequential":
            tr = SequentialTaskRunner
        elif params.task_runner == "concurrent" or params.task_runner is None:
            # placeholder for actual ThreadPoolTaskRunner release in future Prefect versions
            # tr = ThreadPoolTaskRunner(max_workers=max_workers)
            tr = ConcurrentTaskRunner()

        elif params.task_runner == "dask":
            from prefect_dask import DaskTaskRunner

            # if "cluster" in kwargs:
            # del kwargs["cluster"]
            # if "cluster_kwargs" in kwargs:
            # del kwargs["cluster_kwargs"]

            dask_cluster_kwargs = {
                "n_workers": max_workers,
                "threads_per_worker": params.threads_per_worker,
            }
            tr = DaskTaskRunner(cluster_kwargs=dask_cluster_kwargs)
        elif params.task_runner == "hpc":
            from hpc import HPCDaskTaskRunner

            job_name = "".join(self.name.split())
            tr = HPCDaskTaskRunner(
                num_procs=max_workers,
                job_name=job_name,
                log_dir=self.log_dir,
                **kwargs,
            )
        elif params.task_runner == "kubernetes":
            from dask_kubernetes.operator import KubeCluster, make_cluster_spec
            from prefect_dask import DaskTaskRunner

            spec = make_cluster_spec(name="selector-example", n_workers=2)
            spec["spec"]["worker"]["spec"]["containers"][0][
                "image"
            ] = "docker.io/jacobwhall/geodata-dask"
            spec["spec"]["worker"]["spec"]["containers"][0][
                "imagePullPolicy"
            ] = "Always"
            spec["spec"]["worker"]["spec"]["containers"][0]["env"] = [
                {
                    "name": "DATA_MANAGER_VERSION",
                    "value": os.environ["DATA_MANAGER_VERSION"],
                }
            ]
            spec["spec"]["worker"]["spec"]["containers"][0]["volumeMounts"] = [
                {"name": "sciclone", "mountPath": "/sciclone"}
            ]

            spec["spec"]["worker"]["spec"]["volumes"] = [
                {
                    "name": "sciclone",
                    "persistentVolumeClaim": {"claimName": "nova-geodata-prod"},
                }
            ]

            dask_task_runner_kwargs = {
                "cluster_class": KubeCluster,
                "cluster_kwargs": {
                    "custom_cluster_spec": spec,
                },
                "adapt_kwargs": {
                    "minimum": 1,
                    "maximum": max_workers,
                },
            }
            tr = DaskTaskRunner(**dask_task_runner_kwargs)
        else:
            raise ValueError("Prefect task runner not recognized")

        @flow(task_runner=tr, name=self.name)
        def prefect_main_wrapper():
            self._check_env_and_run()

        prefect_main_wrapper()

    else:
        logger = logging.getLogger("dataset")
        logger.setLevel(params.logger_level)
        logger.addHandler(logging.StreamHandler())

        if params.backend == "mpi":
            from mpi4py import MPI

            comm = MPI.COMM_WORLD
            rank = comm.Get_rank()
            if rank != 0:
                return

            self.backend = "mpi"
            self.mpi_max_workers = max_workers

            self._check_env_and_run()

        elif params.backend == "local":
            if params.run_parallel:
                self.backend = "concurrent"
            else:
                self.backend = "serial"
            self._check_env_and_run()

        else:
            raise ValueError(f"Backend {params.backend} not recognized.")
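
A sketch of launching a dataset locally follows. The RunParameters constructor is not shown on this page, so passing these fields as keyword arguments is an assumption based on the attributes that run() reads; MyDataset stands in for any Dataset subclass.

```python
# Sketch only: RunParameters' real constructor signature is not shown here,
# so these keyword arguments are assumptions based on the attributes that
# run() reads from params. MyDataset is an illustrative subclass.
from data_manager.dataset import RunParameters  # assumed import path

params = RunParameters(
    backend="local",          # "local", "prefect", or "mpi"
    run_parallel=True,        # with "local", selects the concurrent runner
    max_workers=4,
    retries=3,
    retry_delay=60,
    chunksize=1,
    bypass_error_wrapper=False,
    log_dir="logs",
    logger_level="INFO",
)

MyDataset().run(params)
```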

run_concurrent_tasks(name, func, input_list, force_sequential, max_workers=None)

Run tasks concurrently (locally), given a function and a list of inputs. This will always return a list of TaskResults!

Source code in data_manager/dataset.py
def run_concurrent_tasks(
    self,
    name: str,
    func: Callable,
    input_list: Iterable[Dict[str, Any]],
    force_sequential: bool,
    max_workers: int = None,
):
    """
    Run tasks concurrently (locally), given a function and a list of inputs
    This will always return a list of TaskResults!
    """

    pool_size = 1 if force_sequential else max_workers
    with multiprocessing.Pool(pool_size) as pool:
        results = pool.starmap(
            self.error_wrapper,
            [(func, i) for i in input_list],
            chunksize=self.chunksize,
        )
    return results

run_mpi_tasks(name, func, input_list, force_sequential, max_workers=None)

Run tasks using MPI, requiring the use of mpirun. Tasks are submitted to an MPIPoolExecutor created within this function. This will always return a list of TaskResults!

Source code in data_manager/dataset.py
def run_mpi_tasks(
    self,
    name: str,
    func: Callable,
    input_list: Iterable[Dict[str, Any]],
    force_sequential: bool,
    max_workers: int = None,
):
    """
    Run tasks using MPI, requiring the use of `mpirun`
    Tasks are submitted to an MPIPoolExecutor created within this function
    This will always return a list of TaskResults!
    """
    from mpi4py.futures import MPIPoolExecutor

    if not max_workers:
        max_workers = self.mpi_max_workers

    with MPIPoolExecutor(max_workers=max_workers, chunksize=self.chunksize) as pool:
        futures = []
        for i in input_list:
            f = pool.submit(self.error_wrapper, func, i)
            if force_sequential:
                wait([f])
            futures.append(f)
    return [f.result() for f in futures]

run_prefect_tasks(name, func, input_list, force_sequential, prefect_concurrency_tag=None, prefect_concurrency_task_value=1)

Run tasks using Prefect, using whichever task runner was chosen in self.run(). This will always return a list of TaskResults!

Source code in data_manager/dataset.py
def run_prefect_tasks(
    self,
    name: str,
    func: Callable,
    input_list: Iterable[Dict[str, Any]],
    force_sequential: bool,
    prefect_concurrency_tag: str = None,
    prefect_concurrency_task_value: int = 1,
):
    """
    Run tasks using Prefect, using whichever task runner was chosen in self.run()
    This will always return a list of TaskResults!
    """

    from prefect import task
    from prefect.concurrency.sync import concurrency

    logger = self.get_logger()

    def cfunc(wrapper_args, func_args):
        func, prefect_concurrency_tag, prefect_concurrency_task_value = wrapper_args
        with concurrency(
            prefect_concurrency_tag, occupy=prefect_concurrency_task_value
        ):
            return func(*func_args)

    if not prefect_concurrency_tag:
        task_wrapper = task(
            func,
            name=name,
            retries=self.retries,
            retry_delay_seconds=self.retry_delay,
            persist_result=True,
        )
    else:
        task_wrapper = task(
            cfunc,
            name=name,
            retries=self.retries,
            retry_delay_seconds=self.retry_delay,
            persist_result=True,
        )

    futures = []
    for i in input_list:
        w = [f[1] for f in futures] if force_sequential else None
        if prefect_concurrency_tag:
            args = (
                (func, prefect_concurrency_tag, prefect_concurrency_task_value),
                i,
            )
        else:
            args = i
        futures.append(
            (args, task_wrapper.submit(*args, wait_for=w, return_state=False))
        )

    results = []

    states = [(i[0], i[1].wait()) for i in futures]

    while states:
        for ix, (inputs, state) in enumerate(states):
            if state.is_completed():
                # print('complete', ix, inputs)
                logger.info(f"complete - {ix} - {inputs}")

                results.append(TaskResult(0, "Success", inputs, state.result()))
            elif state.is_failed() or state.is_crashed() or state.is_cancelled():
                # print('fail', ix, inputs)
                logger.info(f"fail - {ix} - {inputs}")

                try:
                    msg = repr(state.result(raise_on_failure=True))
                except Exception as e:
                    msg = f"Unable to retrieve error message - {e}"
                results.append(TaskResult(1, msg, inputs, None))
            else:
                # print('not ready', ix, inputs)
                continue
            _ = states.pop(ix)
        time.sleep(5)

    # for inputs, future in futures:
    #     state = future.wait(60*60*2)
    #     if state.is_completed():
    #         results.append(TaskResult(0, "Success", inputs, state.result()))
    #     elif state.is_failed() or state.is_crashed():
    #         try:
    #             msg = repr(state.result(raise_on_failure=False))
    #         except:
    #             msg = "Unable to retrieve error message"
    #         results.append(TaskResult(1, msg, inputs, None))
    #     else:
    #         pass

    # while futures:
    #     for ix, (inputs, future) in enumerate(futures):
    #         state = future.get_state()
    #         # print(repr(state))
    #         # print(repr(future))
    #         if state.is_completed():
    #             print('complete', ix, inputs)
    #             results.append(TaskResult(0, "Success", inputs, future.result()))
    #         elif state.is_failed() or state.is_crashed() or state.is_cancelled():
    #             print('fail', ix, inputs)
    #             try:
    #                 msg = repr(future.result(raise_on_failure=True))
    #             except Exception as e:
    #                 msg = f"Unable to retrieve error message - {e}"
    #             results.append(TaskResult(1, msg, inputs, None))
    #         else:
    #             # print('not ready', ix, inputs)
    #             continue
    #         _ = futures.pop(ix)
    #         # future.release()
    #     time.sleep(5)

    return results

run_serial_tasks(name, func, input_list)

Run tasks in serial (locally), given a function and list of inputs. This will always return a list of TaskResults!

Source code in data_manager/dataset.py
def run_serial_tasks(
    self, name, func: Callable, input_list: Iterable[Dict[str, Any]]
):
    """
    Run tasks in serial (locally), given a function and list of inputs
    This will always return a list of TaskResults!
    """
    logger = self.get_logger()
    logger.debug(f"run_serial_tasks - input_list: {input_list}")
    return [self.error_wrapper(func, i) for i in input_list]

run_tasks(func, input_list, name=None, retries=3, retry_delay=60, force_sequential=False, force_serial=False, max_workers=None, prefect_concurrency_tag=None, prefect_concurrency_task_value=None)

Run a bunch of tasks, calling one of the task-running functions above. This is the function that should be called most often from self.main(). It will return a ResultTuple of TaskResults.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| func | Callable | The function to run for each task. | required |
| input_list | Iterable[Dict[str, Any]] | An iterable of function inputs. For each input, a new task will be created with that input passed as the only parameter. | required |
| name | Optional[str] | A name for this task run, for easier reference. | None |
| retries | int | Number of times to retry a task before giving up. | 3 |
| retry_delay | int | Delay (in seconds) to wait between task retries. | 60 |
| force_sequential | bool | If set to True, all tasks in this run will be run in sequence, regardless of backend. | False |
| force_serial | bool | If set to True, all tasks will be run locally (using the internal "serial runner") rather than with this Dataset's usual backend. Please avoid using this parameter, it will likely be deprecated soon! | False |
| max_workers | Optional[int] | Maximum number of tasks to run at once, if using a concurrent mode. This value will not override force_sequential or force_serial. Warning: This is not yet supported by the Prefect backend. We hope to fix this soon. | None |
| prefect_concurrency_tag | Optional[str] | If using the Prefect backend, this tag will be used to limit the concurrency of this task. This will eventually be deprecated in favor of max_workers once we have implemented that for the Prefect backend. | None |
| prefect_concurrency_task_value | Optional[int] | If using the Prefect backend, this sets the maximum number of tasks to run at once, similar to max_workers. See warning above. | None |
Source code in data_manager/dataset.py
def run_tasks(
    self,
    func: Callable,
    input_list: Iterable[Dict[str, Any]],
    name: Optional[str] = None,
    retries: int = 3,
    retry_delay: int = 60,
    force_sequential: bool = False,
    force_serial: bool = False,
    max_workers: Optional[int] = None,
    prefect_concurrency_tag: Optional[str] = None,
    prefect_concurrency_task_value: Optional[int] = None,
) -> ResultTuple:
    """
    Run a bunch of tasks, calling one of the above run_tasks functions
    This is the function that should be called most often from self.main()
    It will return a ResultTuple of TaskResults

    Parameters:
        func: The function to run for each task.
        input_list: An iterable of function inputs. For each input, a new task will be created with that input passed as the only parameter.
        name: A name for this task run, for easier reference.
        retries: Number of times to retry a task before giving up.
        retry_delay: Delay (in seconds) to wait between task retries.
        force_sequential: If set to `True`, all tasks in this run will be run in sequence, regardless of backend.
        force_serial: If set to `True`, all tasks will be run locally (using the internal "serial runner") rather than with this Dataset's usual backend. **Please avoid using this parameter, it will likely be deprecated soon!**
        max_workers: Maximum number of tasks to run at once, if using a concurrent mode. This value will not override `force_sequential` or `force_serial`. **Warning: This is not yet supported by the Prefect backend. We hope to fix this soon.**
        prefect_concurrency_tag: If using the Prefect backend, this tag will be used to limit the concurrency of this task. **This will eventually be deprecated in favor of `max_workers` once we have implemented that for the Prefect backend.**
        prefect_concurrency_task_value: If using the Prefect backend, this sets the maximum number of tasks to run at once, similar to `max_workers`. **See warning above.**
    """

    timestamp = datetime.today()

    if not callable(func):
        raise TypeError("Function passed to run_tasks is not callable")

    # Save global retry settings, and override with current values
    old_retries, old_retry_delay = self.retries, self.retry_delay
    self.retries, self.retry_delay = self.init_retries(retries, retry_delay)

    logger = self.get_logger()

    if name is None:
        try:
            name = func.__name__
        except AttributeError:
            logger.warning(
                "No name given for task run, and function does not have a name (multiple unnamed functions may result in log files being overwritten)"
            )
            name = "unnamed"
    elif not isinstance(name, str):
        raise TypeError("Name of task run must be a string")

    if max_workers is None and hasattr(self, "max_workers"):
        max_workers = self.max_workers

    if self.backend == "serial" or force_serial:
        results = self.run_serial_tasks(name, func, input_list)
    elif self.backend == "concurrent":
        results = self.run_concurrent_tasks(
            name, func, input_list, force_sequential, max_workers=max_workers
        )
    elif self.backend == "prefect":
        results = self.run_prefect_tasks(
            name,
            func,
            input_list,
            force_sequential,
            prefect_concurrency_tag,
            prefect_concurrency_task_value,
        )

    elif self.backend == "mpi":
        results = self.run_mpi_tasks(
            name, func, input_list, force_sequential, max_workers=max_workers
        )
    else:
        raise ValueError(
            "Requested backend not recognized. Have you called this Dataset's run function?"
        )

    if len(results) == 0:
        raise ValueError(
            f"Task run {name} yielded no results. Did it receive any inputs?"
        )

    success_count = sum(1 for r in results if r.status_code == 0)
    error_count = len(results) - success_count
    if error_count == 0:
        logger.info(
            f"Task run {name} completed with {success_count} successes and no errors"
        )
    else:
        logger.warning(
            f"Task run {name} completed with {error_count} errors and {success_count} successes"
        )

    # Restore global retry settings
    self.retries, self.retry_delay = old_retries, old_retry_delay

    return ResultTuple(results, name, timestamp)
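
For example, a main() implementation might chain two task runs like this (a sketch; fetch_index, parse_page, and the inputs are illustrative):

```python
# Sketch of chaining task runs inside main(); fetch_index and parse_page
# are illustrative task functions, not part of the library
index_results = self.run_tasks(
    fetch_index,
    [(year,) for year in (2020, 2021)],
    name="fetch_index",
    retries=2,
    retry_delay=30,
)

# use only the successful results of the first run as inputs to the second
page_inputs = [(page,) for page in index_results.results()]
parse_results = self.run_tasks(parse_page, page_inputs, name="parse_pages")
self.log_run(parse_results)
```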

tmp_to_dst_file(final_dst, make_dst_dir=False, tmp_dir=None, validate_cog=False)

Context manager that provides a temporary file path to write output files to, that is automatically moved to a final destination once the context is exited. This prevents interrupted jobs from leaving partially-written files in the filesystem where they might be mistaken for complete files.

Additionally, this context manager can create output directories that don't exist yet, or validate COG files after they've been written. See the list of parameters below for more information.

Here is an example of its use:

```python
with self.tmp_to_dst_file(final_dst, validate_cog=True) as tmp_dst:
    with rasterio.open(tmp_dst, "w", **meta) as dst:
        ...
```

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| final_dst | str \| PathLike | Path to where the file should be written. | required |
| make_dst_dir | bool | If set to true, the parent directory of final_dst will be created (and any of its parents, as necessary). | False |
| tmp_dir | Optional[str \| PathLike] | Path to directory where file should be temporarily stored. If set to None, a default directory will be used. | None |
| validate_cog | bool | If set to True, the written file will be validated as a COG, and an exception will be raised if this validation fails. | False |
Source code in data_manager/dataset.py
@contextmanager
def tmp_to_dst_file(
    self,
    final_dst: str | os.PathLike,
    make_dst_dir: bool = False,
    tmp_dir: Optional[str | os.PathLike] = None,
    validate_cog: bool = False,
):
    """
    Context manager that provides a temporary file path to write
    output files to, that is automatically moved to a final destination
    once the context is exited. This prevents interrupted jobs
    from leaving partially-written files in the filesystem where
    they might be mistaken for complete files.

    Additionally, this context manager can create output directories
    that don't exist yet, or validate COG files after they've been
    written. See the list of parameters below for more information.

    Here is an example of its use:

    ```python
    with self.tmp_to_dst_file(final_dst, validate_cog=True) as tmp_dst:
        with rasterio.open(tmp_dst, "w", **meta) as dst:
            ...
    ```

    Parameters:
        final_dst: Path to where the file should be written.
        make_dst_dir: If set to true, the parent directory of `final_dst` will be created (and any of its parents, as necessary)
        tmp_dir: Path to directory where file should be temporarily stored. If set to `None`, a default directory will be used.
        validate_cog: If set to `True`, the written file will be validated as a COG, and an exception will be raised if this validation fails.
    """
    logger = self.get_logger()

    final_dst = Path(final_dst)

    # make sure that final_dst parent directory exists
    if not final_dst.parent.exists():
        if make_dst_dir:
            os.makedirs(final_dst.parent, exist_ok=True)
        else:
            raise FileNotFoundError(
                f"Parent directory of requested filepath {str(final_dst)} does not exist."
            )

    tmp_sub_dir = mkdtemp(dir=tmp_dir)
    _, tmp_path = mkstemp(dir=tmp_sub_dir)
    logger.debug(
        f"Created temporary file {tmp_path} with final destination {str(final_dst)}"
    )
    yield tmp_path

    # validate Cloud Optimized GeoTIFF
    # doing this before move because disk r/w is almost certainly faster
    if validate_cog:
        is_valid, errors, warnings = cog_validate(tmp_path)
        if is_valid:
            logger.info(
                f"Successfully validated output COG {tmp_path} (destined for {str(final_dst)})"
            )
        else:
            logger.exception(
                f"Failed to validate COG {tmp_path} (destined for {str(final_dst)})"
            )
        for error in errors:
            logger.error(f"Error encountered when validating COG: {error}")
        for warning in warnings:
            logger.warning(f"Warning encountered when validating COG: {warning}")

    # move file from tmp_path to final_dst
    try:
        logger.debug(f"Attempting to move {tmp_path} to {str(final_dst)}")
        shutil.move(tmp_path, final_dst)
    except Exception:
        logger.exception(
            f"Failed to transfer temporary file {tmp_path} to final destination {str(final_dst)}"
        )
    else:
        logger.debug(
            f"Successfully transferred {tmp_path} to final destination {str(final_dst)}"
        )
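
The same pattern works for any output file, not just rasters. Here is a sketch that uses make_dst_dir to create the destination directory; the path is illustrative:

```python
# Sketch: write a plain text file through a temporary path, creating the
# destination directory if needed. The path is illustrative.
final_path = "/data/processed/example/summary.txt"
with self.tmp_to_dst_file(final_path, make_dst_dir=True) as tmp_path:
    with open(tmp_path, "w") as f:
        f.write("example output\n")
# on exiting the context, the temporary file is moved to final_path
```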

ResultTuple

Bases: Sequence

This is an immutable sequence designed to hold TaskResults. It also keeps track of the name of a run and the time it started. ResultTuple.results() returns a list of results from each task.

Inherits the Sequence class, and therefore provides methods such as __len__ and __getitem__.

Source code in data_manager/dataset.py
class ResultTuple(Sequence):
    """
    This is an immutable sequence designed to hold TaskResults
    It also keeps track of the name of a run and the time it started
    ResultTuple.results() returns a list of results from each task.

    Inherits the `Sequence` class, and therefore provides methods
    such as `__len__` and `__getitem__`.
    """

    def __init__(
        self,
        iterable: Iterable[TaskResult],
        name: str,
        timestamp: datetime = datetime.today(),
    ):
        """
        Parameters:
            iterable: Iterable of `TaskResult`s to store.
            name: Name of this `ResultTuple`.
            timestamp: Timestamp of the task run that produced these results.
        """
        self.elements = []
        for value in iterable:
            if isinstance(value, TaskResult):
                self.elements.append(value)
            else:
                raise ValueError(
                    "ResultTuples must only consist of TaskResult namedtuples!"
                )
        self.name = name
        self.timestamp = timestamp

    def __getitem__(self, key: int):
        return self.elements[key]

    def __len__(self) -> int:
        """
        Returns the number of results in this `ResultTuple`.
        """
        return len(self.elements)

    def __repr__(self) -> str:
        success_count = sum(1 for t in self.elements if t.status_code == 0)
        error_count = len(self.elements) - success_count
        return f'<ResultTuple named "{self.name}" with {success_count} successes, {error_count} errors>'

    def args(self):
        args = [t.args for t in self.elements if t.status_code == 0]
        if len(args) < len(self.elements):
            logging.getLogger("dataset").warning(
                f"args() function for ResultTuple {self.name} skipping errored tasks"
            )
        return args

    def results(self):
        results = [t.result for t in self.elements if t.status_code == 0]
        if len(results) < len(self.elements):
            logging.getLogger("dataset").warning(
                f"results() function for ResultTuple {self.name} skipping errored tasks"
            )
        return results
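
A sketch of inspecting a ResultTuple returned by run_tasks (the task function and inputs are illustrative):

```python
# Sketch: inspecting a ResultTuple returned by run_tasks; count_chars and
# the inputs are illustrative
results = self.run_tasks(count_chars, [("a",), ("bb",)], name="count_chars")

print(repr(results))         # e.g. <ResultTuple named "count_chars" with 2 successes, 0 errors>
print(len(results))          # total number of TaskResults
first = results[0]           # a single TaskResult namedtuple
outputs = results.results()  # results from successful tasks only
used_args = results.args()   # arguments of successful tasks only
```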

__init__(iterable, name, timestamp=datetime.today())

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| iterable | Iterable[TaskResult] | Iterable of TaskResults to store. | required |
| name | str | Name of this ResultTuple. | required |
| timestamp | datetime | Timestamp of the task run that produced these results. | datetime.today() |
Source code in data_manager/dataset.py
def __init__(
    self,
    iterable: Iterable[TaskResult],
    name: str,
    timestamp: datetime = datetime.today(),
):
    """
    Parameters:
        iterable: Iterable of `TaskResult`s to store.
        name: Name of this `ResultTuple`.
        timestamp: Timestamp of the task run that produced these results.
    """
    self.elements = []
    for value in iterable:
        if isinstance(value, TaskResult):
            self.elements.append(value)
        else:
            raise ValueError(
                "ResultTuples must only consist of TaskResult namedtuples!"
            )
    self.name = name
    self.timestamp = timestamp

__len__()

Returns the number of results in this ResultTuple.

Source code in data_manager/dataset.py
def __len__(self) -> int:
    """
    Returns the number of results in this `ResultTuple`.
    """
    return len(self.elements)