|
51 | 51 | ) |
52 | 52 | from .writablebagfile import create_job, write_bag_file # change this later |
53 | 53 |
|
| 54 | +# from schema_salad.utils import convert_to_dict |
| 55 | + |
| 56 | + |
54 | 57 | if TYPE_CHECKING: |
55 | 58 | from .ro import ResearchObject |
56 | 59 |
|
| 60 | +_attributes_type = Dict[Union[str, Identifier], Any] |
| 61 | + |
57 | 62 |
|
58 | 63 | def copy_job_order(job: Union[Process, JobsType], job_order_object: CWLObjectType) -> CWLObjectType: |
59 | 64 | """Create copy of job object for provenance.""" |
@@ -235,13 +240,13 @@ def evaluate( |
235 | 240 | """Evaluate the nature of job.""" |
236 | 241 | if not hasattr(process, "steps"): |
237 | 242 | # record provenance of independent commandline tool executions |
238 | | - self.prospective_prov(job) |
| 243 | + self.prospective_prov(job, process) |
239 | 244 | customised_job = copy_job_order(job, job_order_object) |
240 | 245 | self.used_artefacts(customised_job, self.workflow_run_uri) |
241 | 246 | create_job(research_obj, customised_job) |
242 | 247 | elif hasattr(job, "workflow"): |
243 | 248 | # record provenance of workflow executions |
244 | | - self.prospective_prov(job) |
| 249 | + self.prospective_prov(job, process) |
245 | 250 | customised_job = copy_job_order(job, job_order_object) |
246 | 251 | self.used_artefacts(customised_job, self.workflow_run_uri) |
247 | 252 | # if CWLPROV['prov'].uri in job_order_object: # maybe move this to another place |
@@ -306,8 +311,7 @@ def _add_nested_annotations( |
306 | 311 | ) -> ProvEntity: |
307 | 312 | """Propagate input data annotations to provenance.""" |
308 | 313 | # Change https:// into http:// first |
309 | | - schema2_uri = "https://schema.org/" |
310 | | - if schema2_uri in annotation_key: |
| 314 | + if (schema2_uri := "https://schema.org/") in annotation_key: |
311 | 315 | annotation_key = SCHEMA[annotation_key.replace(schema2_uri, "")].uri |
312 | 316 |
|
313 | 317 | if not isinstance(annotation_value, (MutableSequence, MutableMapping)): |
@@ -377,9 +381,9 @@ def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, st |
377 | 381 | self.document.specializationOf(file_entity, entity) |
378 | 382 |
|
379 | 383 | # Identify all schema annotations |
380 | | - schema_annotations = dict( |
381 | | - [(v, value[v]) for v in value.keys() if v.startswith("https://schema.org")] |
382 | | - ) |
| 384 | + schema_annotations = { |
| 385 | + v: value[v] for v in value.keys() if v.startswith("https://schema.org") |
| 386 | + } |
383 | 387 |
|
384 | 388 | # Transfer SCHEMA annotations to provenance |
385 | 389 | for s in schema_annotations: |
@@ -509,9 +513,9 @@ def declare_directory(self, value: CWLObjectType) -> ProvEntity: |
509 | 513 | coll_b.add_attributes(coll_b_attribs) |
510 | 514 |
|
511 | 515 | # Identify all schema annotations |
512 | | - schema_annotations = dict( |
513 | | - [(v, value[v]) for v in value.keys() if v.startswith("https://schema.org")] |
514 | | - ) |
| 516 | + schema_annotations = { |
| 517 | + v: value[v] for v in value.keys() if v.startswith("https://schema.org") |
| 518 | + } |
515 | 519 |
|
516 | 520 | # Transfer SCHEMA annotations to provenance |
517 | 521 | for s in schema_annotations: |
@@ -571,7 +575,7 @@ def declare_artefact(self, value: Any) -> ProvEntity: |
571 | 575 | self.research_object.add_uri(entity.identifier.uri) |
572 | 576 | return entity |
573 | 577 |
|
574 | | - if isinstance(value, (str, str)): |
| 578 | + if isinstance(value, str): |
575 | 579 | (entity, _) = self.declare_string(value) |
576 | 580 | return entity |
577 | 581 |
|
@@ -734,35 +738,38 @@ def generate_output_prov( |
734 | 738 | entity, process_run_id, timestamp, None, {"prov:role": role} |
735 | 739 | ) |
736 | 740 |
|
737 | | - def prospective_prov(self, job: JobsType) -> None: |
| 741 | + def prospective_prov(self, job: JobsType, process: Process) -> None: |
738 | 742 | """Create prospective prov recording as wfdesc prov:Plan.""" |
| 743 | + prov_items: _attributes_type = { |
| 744 | + PROV_TYPE: WFDESC["Workflow"] if isinstance(job, WorkflowJob) else WFDESC["Process"], |
| 745 | + "prov:type": PROV["Plan"], |
| 746 | + "prov:label": "Prospective provenance", |
| 747 | + } |
| 748 | + if "doc" in process.tool: |
| 749 | + prov_items["schema:description"] = process.tool["doc"] |
| 750 | + if "label" in process.tool: |
| 751 | + prov_items["schema:name"] = process.tool["label"] |
| 752 | + # # TypeError: unhashable type: 'list' |
| 753 | + # if "intent" in process.tool: |
| 754 | + # prov_items["schema:featureList"] = convert_to_dict(process.tool["intent"]) |
| 755 | + self.document.entity("wf:main", prov_items) |
739 | 756 | if not isinstance(job, WorkflowJob): |
740 | | - # direct command line tool execution |
741 | | - self.document.entity( |
742 | | - "wf:main", |
743 | | - { |
744 | | - PROV_TYPE: WFDESC["Process"], |
745 | | - "prov:type": PROV["Plan"], |
746 | | - "prov:label": "Prospective provenance", |
747 | | - }, |
748 | | - ) |
749 | 757 | return |
750 | 758 |
|
751 | | - self.document.entity( |
752 | | - "wf:main", |
753 | | - { |
754 | | - PROV_TYPE: WFDESC["Workflow"], |
755 | | - "prov:type": PROV["Plan"], |
756 | | - "prov:label": "Prospective provenance", |
757 | | - }, |
758 | | - ) |
759 | | - |
760 | 759 | for step in job.steps: |
761 | 760 | stepnametemp = "wf:main/" + str(step.name)[5:] |
762 | 761 | stepname = urllib.parse.quote(stepnametemp, safe=":/,#") |
| 762 | + provstep_items: _attributes_type = { |
| 763 | + PROV_TYPE: WFDESC["Process"], |
| 764 | + "prov:type": PROV["Plan"], |
| 765 | + } |
| 766 | + if "doc" in step.tool: |
| 767 | + provstep_items["schema:description"] = step.tool["doc"] |
| 768 | + if "label" in step.tool: |
| 769 | + provstep_items["schema:name"] = step.tool["label"] |
763 | 770 | provstep = self.document.entity( |
764 | 771 | stepname, |
765 | | - {PROV_TYPE: WFDESC["Process"], "prov:type": PROV["Plan"]}, |
| 772 | + provstep_items, |
766 | 773 | ) |
767 | 774 | self.document.entity( |
768 | 775 | "wf:main", |
|
0 commit comments