Coverage for datacite/serializers.py: 99%
540 statements
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-29 15:38 +0000
« prev ^ index » next coverage.py v7.10.1, created at 2025-07-29 15:38 +0000
1# ruff: noqa: FIX002
2import contextlib
3import datetime
4import logging
5import re
6from typing import Any, cast
8from django.contrib.contenttypes.models import ContentType
9from django.db import IntegrityError, models
10from rest_framework import serializers
11from rest_framework.exceptions import ValidationError
13from datacite.models import (
14 Description,
15 Format,
16 Funding,
17 Geolocation,
18 Identifier,
19 IdentifierScheme,
20 Metadata,
21 MetadataContributor,
22 MetadataCreator,
23 Participant,
24 Publisher,
25 ResourceType,
26 Rights,
27 Title,
28 TitleTypes,
29)
30from datacite.utils import (
31 DATE_FORMAT,
32 SCHEME_URI_REGEX,
33 regex_date,
34 regex_date_range,
35 regex_year,
36 str_date_to_date,
37 year_to_date,
38)
39from datacite.validators import validate_uri
41logger = logging.getLogger(__name__)
43# TODO(garciacp): https://github.com/astral-sh/ruff/issues/3870
44# Reformat Serializers: Completely separate multi and single identified serializers,
45# even Read-Only and Write-Only and exploit the multi-inheritance of python. This may be
46# useful in the case of a funder/affiliation, which are participants but have only
47# one identifier in the serialized form.
class ResourceTypeSerializer(serializers.ModelSerializer):
    """Expose the ``resource_type`` and ``resource_type_general`` fields of a
    ResourceType."""

    class Meta:
        model = ResourceType
        fields = ["resource_type", "resource_type_general"]
56##############################
57# Datacite
58##############################
class IdentifierSchemeSerializer(serializers.ModelSerializer):
    """Serialize an IdentifierScheme as its ``scheme`` name and ``uri``."""

    class Meta:
        model = IdentifierScheme
        fields = ["scheme", "uri"]
        # "scheme" runs with no field validators; "uri" only runs validate_uri.
        extra_kwargs: dict[str, Any] = {
            "scheme": {"validators": []},
            "uri": {"validators": [validate_uri]},
        }

    def validate(self, data: dict) -> dict:
        """Reject a URI that is already stored under a different scheme name.

        Raises:
            ValidationError: the URI exists (case-insensitive match) but its
                stored scheme differs from ``data["scheme"]``.
        """
        try:
            existing = IdentifierScheme.objects.get(uri__iexact=data["uri"])
        except IdentifierScheme.DoesNotExist:
            # Unknown URI: there is nothing it could be incoherent with.
            return data

        if existing.scheme == data["scheme"]:
            return data

        raise ValidationError(
            detail="Uri already present but not the right scheme.",
            code="invalid",
        )

    def create(self, validated_data: dict) -> IdentifierScheme:
        """Return the matching scheme, creating it when it does not exist."""
        return self.get_or_create(validated_data)[0]

    @staticmethod
    def get_or_create(data: dict) -> tuple[IdentifierScheme, bool]:
        """Look the scheme up by name (case-insensitive) or create it from ``data``.

        Returns:
            ``(scheme, created)`` where ``created`` tells whether a row was inserted.
        """
        try:
            found = IdentifierScheme.objects.get(scheme__iexact=data["scheme"])
        except IdentifierScheme.DoesNotExist:
            return IdentifierScheme.objects.create(**data), True
        return found, False
class IdentifierSerializer(serializers.ModelSerializer):
    """Serialize an Identifier with its scheme flattened into plain keys.

    Output keys are ``identifier``, ``identifier_scheme`` and ``scheme_uri``.
    Input may use the same flat keys plus an optional ``related_instance`` entry
    naming the object the identifier is attached to.
    """

    # Object the identifier gets attached to; taken from the input data in __init__.
    related_instance = None

    scheme = IdentifierSchemeSerializer()

    class Meta:
        model = Identifier
        fields = ["scheme", "identifier"]
        depth = 1
        extra_kwargs: dict[str, Any] = {
            "scheme": {"validators": []},
            "identifier": {"validators": []},
        }

    def __init__(self, instance: Identifier | None = None, **kwargs: Any) -> None:
        """Pull ``related_instance`` out of the input data and nest the scheme keys.

        Note that the ``data`` mapping given by the caller is modified in place.
        """
        if "data" in kwargs:
            payload = kwargs.pop("data")
            self.related_instance = payload.pop("related_instance", None)
            kwargs["data"] = self.reformat_data(payload)

        super().__init__(instance=instance, **kwargs)

    @staticmethod
    def reformat_data(data: dict) -> dict:
        """Move flat ``identifier_scheme``/``scheme_uri`` keys into a ``scheme`` dict.

        The conversion only happens when both flat keys are present; ``data`` is
        changed in place and returned.
        """
        if all(key in data for key in ("identifier_scheme", "scheme_uri")):
            scheme_name = data.pop("identifier_scheme")
            scheme_uri = data.pop("scheme_uri")
            data["scheme"] = {"scheme": scheme_name, "uri": scheme_uri}
        return data

    def create(self, validated_data: dict) -> object:
        """Get or create the identifier for the related instance.

        Raises:
            KeyError: ``identifier`` or ``scheme`` is missing from the data.
            IntegrityError: no related instance was supplied to the serializer.
        """
        value = validated_data.pop("identifier")
        scheme_data = validated_data.pop("scheme")
        if not self.related_instance:
            raise IntegrityError
        return Identifier.objects.get_or_create(
            identifier=value, scheme=scheme_data, instance=self.related_instance
        )[0]

    def to_representation(self, instance: Identifier) -> dict[str, Any]:
        """Return the identifier with the scheme name and URI as flat keys."""
        scheme = instance.scheme
        return {
            "identifier": instance.identifier,
            "identifier_scheme": scheme.scheme,
            "scheme_uri": scheme.uri,
        }
class MultiIdentifiedObjectSerializer(serializers.ModelSerializer):
    """Base serializer for models that carry a list of Identifier rows.

    Subclasses provide ``prefix`` and a ``Meta`` with ``model`` and a *list* of
    ``fields``. The prefix builds the external labels: with ``prefix = "name"``
    the list is exposed as ``name_identifiers`` and every entry uses
    ``name_identifier``, ``name_identifier_scheme`` and ``scheme_uri``.
    """

    prefix = ""

    class Meta:
        pass

    def __init__(self, **kwargs: Any) -> None:
        """Initialise the serializer and register the ``identifiers`` field."""
        super().__init__(**kwargs)
        # Meta.fields is a class attribute shared by all instances: appending
        # unconditionally made the list grow by one entry per instantiation.
        if "identifiers" not in self.Meta.fields:
            self.Meta.fields.append("identifiers")

    def to_representation(self, instance: Any) -> dict:
        """
        Serializes the instance, and adds the list of identifiers with the right prefix.

        The prefixed list is only added when the instance has identifiers.
        """
        ret = super().to_representation(instance)

        content_type = ContentType.objects.get_for_model(self.Meta.model)
        identifiers = Identifier.objects.filter(
            content_type=content_type, object_id=instance.pk
        )

        # Creates the list of identifiers with the given prefix.
        identifiers_data = [
            {
                self.get_identifier_scheme_label(): identifier.scheme.scheme,
                self.get_identifier_label(): identifier.identifier,
                self.get_scheme_uri_label(): identifier.scheme.uri,
            }
            for identifier in identifiers
        ]
        if identifiers_data:
            ret[self.get_identifiers_label()] = identifiers_data

        return ret

    def to_internal_value_identifier(self, data: dict) -> dict:
        """Deserializes the identifier:
        {"name_identifier":..., "name_identifier_scheme":..., "scheme_uri":...} ->
        {"identifier":..., "scheme":{"scheme":..., "uri":...}}

        Raises:
            KeyError: the prefixed identifier key is missing from ``data``.
        """
        scheme = {}
        if self.get_identifier_scheme_label() in data:
            scheme["scheme"] = data[self.get_identifier_scheme_label()]
        if self.get_scheme_uri_label() in data:
            scheme["uri"] = data[self.get_scheme_uri_label()]

        return {"scheme": scheme, "identifier": data[self.get_identifier_label()]}

    def to_internal_value(self, data: Any) -> Any:
        """Deserializes the identified object:
        {"name":..., ..., "name_identifiers": [{"name_identifier":...}, ...]} ->
        {"name":..., ..., "identifiers": [{"identifier":...}, ...]}

        Entries without the prefixed identifier key are skipped. The prefixed list
        is popped from ``data``.
        """
        validated_data = super().to_internal_value(data)

        identifiers_label = self.get_identifiers_label()
        if identifiers_label in data:
            identifiers = [
                self.to_internal_value_identifier(identifier_data)
                for identifier_data in data.pop(identifiers_label, [])
                if self.get_identifier_label() in identifier_data
            ]
            if identifiers:
                validated_data["identifiers"] = identifiers
        return validated_data

    def create(self, validated_data: dict) -> Any:
        """Checks if identifier exists, and gets the instance, otherwise try to create
        it and its identifiers.

        The lookup uses the first stored Identifier whose value matches any of the
        given identifiers, and only reuses its object when that object is an
        instance of ``Meta.model``.
        """
        identifiers = validated_data.pop("identifiers", [])
        identified_object = None
        if identifiers:
            identifier_obj = Identifier.objects.filter(
                identifier__in=[
                    identifier_data["identifier"] for identifier_data in identifiers
                ]
            ).first()

            if identifier_obj and isinstance(
                identifier_obj.content_object, self.Meta.model
            ):
                identified_object = identifier_obj.content_object

        if not identified_object:
            identified_object, _ = self.Meta.model.objects.get_or_create(
                **validated_data
            )

        # NOTE(review): the listing this was recovered from had lost its
        # indentation; this loop is assumed to run whether the object was found or
        # created (create_identifier is a get_or_create) -- confirm.
        for identifier_data in identifiers:
            self.create_identifier(identified_object, **identifier_data)

        return identified_object

    @staticmethod
    def create_identifier(
        identified_object: models.Model,
        identifier: str | None = None,
        scheme: dict[str, str] | str | None = None,
        uri: str | None = None,
    ) -> None:
        """If fields ok: {"identifier":..., "scheme:{...}} -> create identifier
        I can handle scheme as plain or as dict.

        Nothing is created when the identifier is empty or when no non-empty
        scheme URI is available.
        """
        if scheme and isinstance(scheme, str) and uri and isinstance(uri, str):
            scheme = {"scheme": scheme, "uri": uri}
        # Check the type first so a plain-string scheme is never probed for "uri".
        if identifier and isinstance(scheme, dict) and scheme.get("uri"):
            Identifier.objects.get_or_create(
                instance=identified_object,
                identifier=identifier,
                scheme=scheme,
            )

    def get_identifiers_label(self) -> str:
        """Return the external key of the identifier list."""
        return self.prefix + "_identifiers"

    def get_identifier_label(self) -> str:
        """Return the external key of one identifier value."""
        return self.prefix + "_identifier"

    def get_identifier_scheme_label(self) -> str:
        """Return the external key of one identifier's scheme name."""
        return self.prefix + "_identifier_scheme"

    @staticmethod
    def get_scheme_uri_label() -> str:
        """Return the external key of the scheme URI (never prefixed)."""
        return "scheme_uri"
class SingleIdentifiedObjectSerializer(MultiIdentifiedObjectSerializer):
    """Variant whose external form carries one identifier inline.

    The identifier keys sit directly on the object instead of inside a prefixed
    list; internally the parent's list-based handling is reused.
    """

    prefix = ""

    def __init__(self, **kwargs: Any) -> None:
        """Swap the parent's ``identifiers`` field for a single ``identifier``."""
        super().__init__(**kwargs)
        fields = self.Meta.fields
        fields.remove("identifiers")
        # Meta.fields is shared by every instance of the class: adding the entry
        # unconditionally made the list grow on each instantiation.
        if "identifier" not in fields:
            fields.append("identifier")

    def to_representation(self, instance: Any) -> dict:
        """Gets the multi-identified serialization and extracts the first element to
        insert at identified_object level.

        The list is only flattened when it holds exactly one identifier; otherwise
        the parent's output is returned unchanged.
        """
        ret = super().to_representation(instance)

        identifiers_label = self.get_identifiers_label()
        if identifiers_label in ret and len(ret[identifiers_label]) == 1:
            ret.update(ret.pop(identifiers_label)[0])

        return ret

    def to_internal_value(self, data: Any) -> Any:
        """Extracts identifier from identified_object and inserts it into a list
        (using parent method).

        The inline identifier keys are popped from ``data``.
        """
        inline_labels = [
            self.get_identifier_label(),
            self.get_identifier_scheme_label(),
            self.get_scheme_uri_label(),
        ]
        identifier = {key: data.pop(key) for key in inline_labels if key in data}

        if identifier:
            data[self.get_identifiers_label()] = [identifier]

        return super().to_internal_value(data)
class AffiliationSerializer(MultiIdentifiedObjectSerializer):
    """Serialize a Participant used as an affiliation (labels prefixed with
    ``affiliation``)."""

    prefix = "affiliation"

    class Meta:
        model = Participant
        fields = ["name"]

    @staticmethod
    def transform_string_input_to_name(data: Any) -> Any:
        """Wrap a bare string into ``{"name": <string>}``; return anything else
        unchanged."""
        return {"name": data} if isinstance(data, str) else data

    def transform_affiliation_identifier(self, data: dict[str, Any]) -> list[dict]:
        """Pop the inline identifier keys from ``data`` and return them as a list.

        Returns a one-element list holding the keys that were present, or an empty
        list when none of them was.
        """
        # TODO(garciacp): https://github.com/astral-sh/ruff/issues/3870
        inline_labels = (
            self.get_identifier_label(),
            self.get_identifier_scheme_label(),
            self.get_scheme_uri_label(),
        )
        found = {label: data.pop(label) for label in inline_labels if label in data}
        if not found:
            return []
        return [found]

    def to_internal_value(self, data: Any) -> Any:
        """Normalise string input and inline identifiers, then defer to the parent."""
        normalised = self.transform_string_input_to_name(data)
        inline_identifiers = self.transform_affiliation_identifier(normalised)

        if inline_identifiers:
            normalised[self.get_identifiers_label()] = inline_identifiers

        return super().to_internal_value(normalised)
class DateSerializer(serializers.Serializer):
    """Plain (non-model) serializer for one date entry.

    Carries a required ``date_type`` and ``date`` plus an optional ``date_info``,
    each limited to 200 characters.
    """

    date_type = serializers.CharField(max_length=200)
    date = serializers.CharField(max_length=200)
    date_info = serializers.CharField(max_length=200, required=False)
class DataciteParticipantSerializer(serializers.ModelSerializer):
    """Participant as it appears in Datacite input.

    The Datacite key ``affiliation`` is singular although it holds a list; it is
    mapped onto the ``affiliations`` source of the model.
    """

    affiliation = AffiliationSerializer(
        many=True, source="affiliations", required=False
    )
    name = serializers.CharField(required=False)

    class Meta:
        model = Participant
        fields = [
            "name",
            "given_name",
            "family_name",
            "name_type",
            "lang",
            "affiliation",
        ]

    def to_internal_value(self, data: dict) -> Any:
        """Validate ``data`` and attach the renamed ``name_identifiers`` entries.

        Each entry that has a ``scheme_uri`` is converted to
        ``{"scheme", "uri", "identifier"}`` (missing parts default to ``""``);
        entries without a ``scheme_uri`` are dropped. The converted keys are
        popped from the input entries.
        """
        # TODO(garciacp): https://github.com/astral-sh/ruff/issues/3870
        if "name_identifiers" not in data:
            return super().to_internal_value(data)

        converted = [
            {
                "scheme": raw.pop("name_identifier_scheme", ""),
                "uri": raw.pop("scheme_uri", ""),
                "identifier": raw.pop("name_identifier", ""),
            }
            for raw in data["name_identifiers"]
            if "scheme_uri" in raw
        ]
        validated = super().to_internal_value(data)
        validated["identifiers"] = converted
        return validated
class ParticipantSerializer(MultiIdentifiedObjectSerializer):
    """Serialize a Participant with its ``name_identifiers`` and affiliations."""

    prefix = "name"

    affiliation = AffiliationSerializer(
        source="affiliations", many=True, required=False
    )

    class Meta:
        model = Participant
        fields = [
            "name",
            "given_name",
            "family_name",
            "name_type",
            "lang",
            "affiliation",
        ]

    def create(self, data: dict) -> Any:
        """Create (or fetch) the participant, then attach each affiliation.

        When the parent lookup finds several matching participants, the first one
        matching the remaining data is used.
        """
        affiliations = data.pop("affiliations", [])

        try:
            participant = super().create(data)
        except Participant.MultipleObjectsReturned:
            participant = Participant.objects.filter(**data).first()

        for affiliation_data in affiliations:
            participant.affiliations.add(
                AffiliationSerializer().create(affiliation_data)
            )

        return participant

    @staticmethod
    def _affiliation_entry(affiliation: Any) -> dict:
        """Build one affiliation dict, preferring its "ROR" identifier if any."""
        chosen = affiliation.identifiers.filter(scheme__scheme="ROR").first()
        if chosen is None:
            chosen = affiliation.identifiers.first()

        entry = {"name": affiliation.name}
        if chosen:
            entry = entry | {
                "affiliation_identifier_scheme": chosen.scheme.scheme,
                "affiliation_identifier": chosen.identifier,
                "scheme_uri": chosen.scheme.uri,
            }
        return entry

    def to_representation(self, instance: Any) -> dict:
        """Serialize the participant by hand.

        Drops an empty ``name_type``, always emits ``name_identifiers`` (possibly
        empty) and rebuilds ``affiliation`` with at most one identifier each.
        """
        # TODO(garciacp): https://github.com/astral-sh/ruff/issues/3870
        ret = super().to_representation(instance)

        if "name_type" in ret and not ret["name_type"]:
            ret.pop("name_type")

        stored_identifiers = Identifier.objects.filter(
            content_type=ContentType.objects.get_for_model(self.Meta.model),
            object_id=instance.pk,
        )
        ret.pop("identifiers", [])
        ret["name_identifiers"] = [
            {
                "name_identifier_scheme": stored.scheme.scheme,
                "name_identifier": stored.identifier,
                "scheme_uri": stored.scheme.uri,
            }
            for stored in stored_identifiers
        ]

        ret.pop("affiliations", [])
        ret["affiliation"] = [
            self._affiliation_entry(affiliation)
            for affiliation in instance.affiliations.all()
        ]

        return ret
class MetadataContributorSerializer(serializers.ModelSerializer):
    """Serialize a MetadataContributor as a nested ``contributor`` plus its
    ``contributor_type``."""

    contributor = DataciteParticipantSerializer()

    class Meta:
        model = MetadataContributor
        fields = ("contributor", "contributor_type")
class FormatSerializer(serializers.ModelSerializer):
    """Expose only the ``format`` field of a Format."""

    class Meta:
        model = Format
        fields = ("format",)
class FundingSerializer(serializers.ModelSerializer):
    """Serialize a Funding using flat funder keys.

    Externally the funder is described by ``funder_name``, ``funder_identifier``,
    ``funder_identifier_type`` and ``scheme_uri``; internally it is a nested
    ``funder`` participant with ``name_identifiers``.
    """

    funder = DataciteParticipantSerializer(required=True)

    class Meta:
        model = Funding
        fields = ["award_uri", "award_title", "award_number", "funder"]

    def to_internal_value(self, data: dict) -> Any:
        """Fold the flat funder keys of ``data`` into a nested ``funder`` entry.

        The flat keys are popped from ``data``. The identifier is dropped when
        ``funder_identifier`` is missing, and no funder is built without
        ``funder_name``.
        """
        # TODO(garciacp): https://github.com/astral-sh/ruff/issues/3870
        identifier = {}
        # Must run before the pops below: it reads the type and the identifier.
        scheme_uri = self.try_to_get_scheme_uri_if_missing(data)
        if scheme_uri:
            identifier["scheme_uri"] = scheme_uri
        if "funder_identifier_type" in data:
            identifier["name_identifier_scheme"] = data.pop("funder_identifier_type")
        if "funder_identifier" in data:
            identifier["name_identifier"] = data.pop("funder_identifier")
        else:
            # A scheme without an identifier value is useless.
            identifier = {}

        if "funder_name" in data:
            funder = {"name": data.pop("funder_name")}
            if identifier:
                funder["name_identifiers"] = [identifier]
        else:
            funder = {}

        if funder:
            data["funder"] = funder

        return super().to_internal_value(data)

    def create(self, validated_data: dict) -> Any:
        """Resolve or create the funder participant, then get or create the Funding.

        The funder is reused when one of its identifiers is already stored and
        attached to a Participant; otherwise it is created together with its
        identifiers.
        """
        # TODO(garciacp): https://github.com/astral-sh/ruff/issues/3870
        funder = validated_data.pop("funder", {})
        identifier_obj = None
        for identifier in funder.get("identifiers", []):
            # filter().first() rather than get(): the same identifier value may be
            # stored more than once, which made get() raise
            # MultipleObjectsReturned.
            identifier_obj = Identifier.objects.filter(
                identifier=identifier["identifier"]
            ).first()
            if identifier_obj is not None:
                break

        if identifier_obj and isinstance(identifier_obj.content_object, Participant):
            funder_obj = identifier_obj.content_object
        else:
            identifiers = funder.pop("identifiers", [])
            funder_obj = ParticipantSerializer().create(funder)
            for identifier in identifiers:
                Identifier.objects.get_or_create(
                    instance=funder_obj,
                    identifier=identifier["identifier"],
                    scheme={
                        "scheme": identifier["scheme"],
                        "uri": identifier["uri"],
                    },
                )
        # NOTE(review): the listing this was recovered from had lost its
        # indentation; flagging the participant as funder is assumed to apply to
        # reused and newly created funders alike -- confirm.
        funder_obj.is_funder = True
        funder_obj.save()
        validated_data["funder"] = funder_obj

        funding, _ = Funding.objects.get_or_create(**validated_data)
        return funding

    @staticmethod
    def try_to_get_scheme_uri_if_missing(data: dict[str, Any]) -> str | None:
        """Find the scheme URI of the funder identifier.

        Tried in order: the ``scheme_uri`` key (popped from ``data``), the URI of
        a stored IdentifierScheme named like ``funder_identifier_type``, then a
        SCHEME_URI_REGEX match at the start of ``funder_identifier``. Logs a
        warning and returns ``None`` when all of them fail.
        """
        if "scheme_uri" in data:
            # Try to fetch the uri from input
            return str(data.pop("scheme_uri"))

        if "funder_identifier_type" in data:
            # Try to fetch the uri from an existing instance of scheme
            with contextlib.suppress(IdentifierScheme.DoesNotExist):
                return IdentifierScheme.objects.get(
                    scheme=data["funder_identifier_type"]
                ).uri

        if "funder_identifier" in data:
            # Try to fetch the uri from the identifier
            match_uri = re.match(SCHEME_URI_REGEX, data["funder_identifier"])
            if match_uri is not None:
                return match_uri.group(0)

        logger.warning("Error fetching the uri of an identifier scheme ")
        return None

    def to_representation(self, instance: Any) -> dict:
        """Serialize the funding with the funder flattened into ``funder_*`` keys.

        Only the first identifier of the funder is exported, and every key with
        an empty value is removed.
        """
        # TODO(garciacp): https://github.com/astral-sh/ruff/issues/3870
        ret = super().to_representation(instance)

        # change field name
        ret.pop("funder", "")
        ret["funder_name"] = instance.funder.name
        # A single query: first() already returns None when there is none.
        first_identifier = instance.funder.identifiers.first()
        if first_identifier is not None:
            ret["funder_identifier"] = first_identifier.identifier
            ret["funder_identifier_type"] = first_identifier.scheme.scheme
            ret["scheme_uri"] = first_identifier.scheme.uri

        # clean empty fields (keys are collected first: no mutation while iterating)
        for key in [key for key, val in ret.items() if not val]:
            del ret[key]

        return ret

    def validate(self, data: dict) -> Any:
        """Reject a lone funder identifier that has an empty scheme.

        Raises:
            serializers.ValidationError: the funder has exactly one identifier
                and its ``scheme`` is empty.
        """
        # TODO(garciacp): https://github.com/astral-sh/ruff/issues/3870
        validated_data = super().validate(data)

        if (
            "funder" in validated_data
            and "identifiers" in validated_data["funder"]
            and len(validated_data["funder"]["identifiers"]) == 1
        ):
            only_identifier = validated_data["funder"]["identifiers"][0]
            if (
                "identifier" in only_identifier
                and "scheme" in only_identifier
                and not only_identifier["scheme"]
            ):
                msg = "Must include the funder type"
                raise serializers.ValidationError(msg)
        return validated_data
class RightsSerializer(SingleIdentifiedObjectSerializer):
    """Serialize a Rights entry (labels prefixed with ``rights``)."""

    prefix = "rights"

    rights_uri = serializers.CharField(validators=[validate_uri])

    class Meta:
        model = Rights
        fields = ["rights", "rights_uri", "lang"]

    def to_representation(self, instance: Any) -> dict:
        """Serialize the rights, without ``identifier`` and without an empty
        ``lang``."""
        ret = super().to_representation(instance)

        lang_is_empty = "lang" in ret and not ret["lang"]
        if lang_is_empty:
            del ret["lang"]
        # The 'identifier' entry is never exported.
        ret.pop("identifier", "")

        return ret
class TitleSerializer(serializers.ModelSerializer):
    """Serialize a Title as ``title``, ``title_type`` and ``lang``."""

    class Meta:
        model = Title
        fields = ["title", "title_type", "lang"]

    def to_internal_value(self, data: dict) -> Any:
        """Validate ``data`` and fall back to ``TitleTypes.DEFAULT`` when no
        ``title_type`` came out of validation."""
        validated = super().to_internal_value(data)
        validated.setdefault("title_type", TitleTypes.DEFAULT)
        return validated

    def to_representation(self, instance: Title) -> Any:
        """Serialize the title, omitting ``title_type`` when it is "MainTitle"
        (not handled by datacite)."""
        ret = super().to_representation(instance)
        if ret["title_type"] == "MainTitle":
            ret.pop("title_type")
        return ret
class PublisherSerializer(SingleIdentifiedObjectSerializer):
    """Serialize a Publisher with one inline identifier (labels prefixed with
    ``publisher``)."""

    prefix = "publisher"

    class Meta:
        model = Publisher
        fields = ["name", "lang"]
class DescriptionSerializer(serializers.ModelSerializer):
    """Expose ``description``, ``description_type`` and ``lang`` of a
    Description."""

    class Meta:
        model = Description
        fields = ["description", "description_type", "lang"]
class GeolocationSerializer(serializers.ModelSerializer):
    """Serialize a Geolocation as a place name and/or a bounding box.

    Externally the four bounds live in a nested ``geo_location_box``; internally
    they are flat fields next to ``place``.
    """

    geo_location_place = serializers.CharField(source="place", max_length=255)

    class Meta:
        model = Geolocation
        fields = [
            "geo_location_place",
            "west_bound_longitude",
            "east_bound_longitude",
            "south_bound_latitude",
            "north_bound_latitude",
        ]

    def to_internal_value(self, data: dict) -> dict:
        """Flatten the input: rename the place key and merge the box into ``data``.

        ``data`` is modified in place and returned as is; no field validation is
        run here. A missing place becomes ``""``.
        """
        data["place"] = data.pop("geo_location_place", "")
        # "or {}" also covers an explicit null box, which update() cannot take.
        box = data.pop("geo_location_box", {}) or {}
        data.update(box)
        return data

    def to_representation(self, instance: Any) -> dict:
        """Return the place and/or the box, leaving out what is empty.

        With a place, the box is only included when at least one bound is set.
        Without a place, only ``geo_location_box`` is returned.
        """
        data = super().to_representation(instance=instance)
        # Always take the place out first: an empty place used to stay in the
        # mapping and ended up inside the returned box.
        geo_location_place = data.pop("geo_location_place", None)
        box = self.clean_location_box(box_data=data)

        if not geo_location_place:
            return {"geo_location_box": box}
        if box:
            return {
                "geo_location_place": geo_location_place,
                "geo_location_box": box,
            }
        return {"geo_location_place": geo_location_place}

    @staticmethod
    def clean_location_box(box_data: dict[str, Any]) -> dict[str, Any]:
        """Remove the bounds that are ``None`` (or absent) from ``box_data``.

        The mapping is modified in place and returned.
        """
        for bound in (
            "west_bound_longitude",
            "east_bound_longitude",
            "south_bound_latitude",
            "north_bound_latitude",
        ):
            if box_data.get(bound) is None:
                box_data.pop(bound, None)
        return box_data
class MetadataSerializer(serializers.ModelSerializer):
    """Serializes from DB to Datacite json format"""

    rights = RightsSerializer(many=True)
    formats = FormatSerializer(many=True)
    funding_references = FundingSerializer(source="fundings", many=True)
    geo_locations = GeolocationSerializer(source="geolocation_set", many=True)

    class Meta:
        model = Metadata
        fields = [
            "id",
            "url",
            "publication_year",
            "state",
            "types",
            "publisher",
            "rights",
            "contributors",
            "formats",
            "funding_references",
            "geo_locations",
        ]
        depth = 2

    @staticmethod
    def date_serializer(
        date_type: str,
        date_start: datetime.date | None = None,
        date_end: datetime.date | None = None,
    ) -> dict[str, str]:
        """Build one date entry, or ``{}`` when there is no start date.

        With an end date the ``date`` value is the range ``<start>/<end>``; both
        dates are formatted with DATE_FORMAT.
        """
        if not date_start:
            return {}

        date = {"date_type": date_type, "date_information": ""}

        start = date_start.strftime(DATE_FORMAT)
        if date_end:
            date["date"] = f"{start}/{date_end.strftime(DATE_FORMAT)}"
        else:
            date["date"] = start

        return date

    @staticmethod
    def dates_serializer(metadata: Metadata) -> list[dict]:
        """Return the "Issued", "Collected" and "Available" entries that are set.

        A date without a start value is left out. The "Issued" entry used to be
        inserted unconditionally, yielding an empty dict when it was unset.
        """
        candidates = [
            MetadataSerializer.date_serializer("Issued", metadata.issued),
            MetadataSerializer.date_serializer(
                "Collected", metadata.collected_start, metadata.collected_end
            ),
            # if embargoed, add the availability date too
            MetadataSerializer.date_serializer("Available", metadata.available),
        ]
        # date_serializer returns {} for a missing start date: drop those.
        return [date for date in candidates if date]

    @staticmethod
    def sizes_serializer(metadata: Metadata) -> list[str]:
        """Return the non-empty size values, in the order information, increment,
        total."""
        return [
            size
            for size in [
                metadata.size_information,
                metadata.size_increment,
                metadata.size_total,
            ]
            if size
        ]

    def to_representation(self, instance: Metadata) -> Any:
        """Serializes the Metadata object.

        The automatic output is completed by hand with dates, sizes, doi, titles,
        ordered creators, contributors, descriptions and funding references;
        ``formats`` is flattened to a list of strings and ``rights`` is renamed to
        ``rights_list``.
        """
        ret = super().to_representation(instance)
        ret["dates"] = self.dates_serializer(metadata=instance)
        ret["sizes"] = self.sizes_serializer(metadata=instance)
        ret["formats"] = [format_obj["format"] for format_obj in ret.pop("formats", [])]
        ret["contributors"] = [
            ParticipantSerializer(instance=contributor.contributor).data
            | {"contributor_type": contributor.contributor_type}
            for contributor in MetadataContributor.objects.filter(metadata=instance)
        ]
        ret["doi"] = instance.network.doi
        ret["titles"] = TitleSerializer(instance.title_set.all(), many=True).data
        ret["creators"] = [
            ParticipantSerializer(instance=metadatacreator.creator).data
            for metadatacreator in MetadataCreator.objects.filter(metadata=instance)
            .order_by("order")
            .only("creator")
        ]
        ret["rights_list"] = ret.pop("rights")
        ret["descriptions"] = [
            DescriptionSerializer(instance=description).data
            for description in instance.description_set.all()
        ]
        ret["funding_references"] = [
            FundingSerializer(instance=funding).data
            for funding in instance.fundings.all()
        ]

        return ret
856def convert_from_plain_to_nested(
857 plain_data: list, label: str, external_fields: list | None = None
858) -> list:
859 """All the fields present in plain_data, that are not present in external fields,
860 are moved into another dict, which is itself added with the given label:
861 ({"a":1,"b1":2, "b2":3}, "b", ["a"]) -> {"a":1,"b":{b1":2, "b2":3}}"""
862 if external_fields is None:
863 external_fields = []
865 nested_data = []
866 for item in plain_data:
867 nested_item = {}
868 keys = list(item.keys())
869 for k in keys:
870 if k not in external_fields:
871 nested_item[k] = item.pop(k)
872 item.update({label: nested_item})
873 nested_data.append(item)
874 return nested_data
class MetadataCreatorDeserializer(serializers.ModelSerializer):
    """Deserialize a MetadataCreator from its nested ``creator`` participant."""

    creator = DataciteParticipantSerializer()

    class Meta:
        model = MetadataCreator
        fields = ("creator",)
class MetadataDeserializer(serializers.ModelSerializer):
    """Deserializes from Datacite json output to DB. One shot import...."""

    titles = TitleSerializer(many=True)
    creators = MetadataCreatorDeserializer(many=True)
    contributors = MetadataContributorSerializer(many=True, required=False)
    dates = DateSerializer(many=True)
    publisher = PublisherSerializer()
    types = ResourceTypeSerializer()
    rights_list = RightsSerializer(source="rights", many=True, required=False)
    funding_references = FundingSerializer(source="fundings", many=True, required=False)
    sizes = serializers.ListField(
        child=serializers.CharField(max_length=255), allow_empty=True, required=False
    )
    formats = serializers.ListField(
        child=serializers.CharField(max_length=255), allow_empty=True, required=False
    )
    descriptions = DescriptionSerializer(many=True, required=False)
    geo_locations = GeolocationSerializer(many=True, required=False)

    class Meta:
        model = Metadata
        fields = [
            "id",
            "url",
            "publication_year",
            "state",
            "types",
            "publisher",
            "titles",
            "creators",
            "contributors",
            "network",
            "dates",
            "rights_list",
            "funding_references",
            "sizes",
            "formats",
            "descriptions",
            "geo_locations",
        ]

    def __init__(self, **kwargs: Any) -> None:
        """Fixes creators and contributors, before init.

        The flat creator/contributor entries of ``data`` are nested under
        ``creator``/``contributor`` (``contributor_type`` stays outside); the
        given ``data`` is modified in place.
        """
        if "data" in kwargs:
            if "creators" in kwargs["data"]:
                kwargs["data"]["creators"] = convert_from_plain_to_nested(
                    kwargs["data"]["creators"], "creator"
                )
            if "contributors" in kwargs["data"]:
                kwargs["data"]["contributors"] = convert_from_plain_to_nested(
                    kwargs["data"]["contributors"], "contributor", ["contributor_type"]
                )
        super().__init__(**kwargs)

    def create(self, validated_data: dict) -> Metadata:  # noqa: C901
        """Creates all the items 1 by 1.

        When the serializer was given an instance, that Metadata row is updated
        instead of a new one being created. Related objects are always fetched or
        created and attached afterwards.
        """
        publisher_data = validated_data.pop("publisher")
        rs_data = validated_data.pop("types")
        titles_data = validated_data.pop("titles")
        creators_data = validated_data.pop("creators")
        contributors_data = validated_data.pop("contributors", [])
        dates_data = validated_data.pop("dates")
        # Both fields are declared required=False: popping without a default
        # raised KeyError whenever the input omitted them.
        rights_data = validated_data.pop("rights", [])
        fundings_data = validated_data.pop("fundings", [])
        formats_data = validated_data.pop("formats", [])
        descriptions_data = validated_data.pop("descriptions", [])
        geo_locations_data = validated_data.pop("geo_locations", [])

        publisher = PublisherSerializer().create(publisher_data)

        validated_data.update(self.extract_dates(dates_data))

        rs, _ = ResourceType.objects.get_or_create(**rs_data)
        if self.instance:
            metadata = cast("Metadata", self.instance)
            Metadata.objects.filter(pk=metadata.pk).update(
                **validated_data, publisher=publisher, types=rs
            )
            metadata.refresh_from_db()
        else:
            metadata = Metadata.objects.create(
                **validated_data, publisher=publisher, types=rs
            )
        metadata.formats.set(self.get_or_create_formats(formats_data))
        # Create titles. If only one and not type, assign as a MainTitle
        if len(titles_data) == 1 and not titles_data[0]["title_type"]:
            titles_data[0]["title_type"] = TitleTypes.MAIN_TITLE
        for title_data in titles_data:
            Title.objects.get_or_create(**title_data, metadata=metadata)
        # Create Creators in order.
        for i, creator_data in enumerate(creators_data):
            if creator_data.get("creator", {}):
                creator = ParticipantSerializer().create(creator_data.pop("creator"))
                metadata.add_ordered_creator(creator, i)
        # Create Contributors.
        for contributor_data in contributors_data:
            if contributor_data.get("contributor", {}):
                contributor = ParticipantSerializer().create(
                    contributor_data.pop("contributor")
                )
                MetadataContributor.objects.get_or_create(
                    metadata=metadata,
                    contributor=contributor,
                    contributor_type=contributor_data.pop("contributor_type"),
                )
        # Rights and licenses.
        for right_data in rights_data:
            rights = RightsSerializer().create(right_data)
            metadata.rights.add(rights)
        # Descriptions.
        for description_data in descriptions_data:
            Description.objects.get_or_create(metadata=metadata, **description_data)
        # Funding.
        for funding_data in fundings_data:
            funding = FundingSerializer().create(funding_data)
            metadata.fundings.add(funding)
        # Geolocations.
        for geolocation_data in geo_locations_data:
            GeolocationSerializer().create(geolocation_data | {"metadata": metadata})

        return metadata

    def to_internal_value(self, data: dict) -> Any:
        """Validate ``data`` and spread ``sizes`` over the three size fields.

        One to three sizes fill ``size_information``, ``size_increment`` and
        ``size_total`` in that order. A longer list is not stored; a warning is
        logged.
        """
        validated_data = super().to_internal_value(data)

        sizes = validated_data.pop("sizes", [])
        size_fields = ("size_information", "size_increment", "size_total")

        if len(sizes) <= len(size_fields):
            # zip stops at the shorter sequence: only the given sizes are set.
            validated_data.update(zip(size_fields, sizes))
        else:
            logger.warning(
                "Ignoring %d sizes: at most %d are supported",
                len(sizes),
                len(size_fields),
            )

        return validated_data

    @staticmethod
    def get_or_create_formats(data: list) -> list:
        """From a list of formats, get_or_creates each format,
        and returns the list of ids.

        Existing formats are matched case-insensitively.
        """
        indexes = []
        for format_str in data:
            try:
                index = Format.objects.get(format__iexact=format_str)
            except Format.DoesNotExist:
                index = Format.objects.create(format=format_str)
            indexes.append(index.pk)
        return indexes

    @staticmethod
    def extract_date(
        raw_date: str, label_start: str, label_end: str | None = None
    ) -> dict:
        """Parse ``raw_date`` into a dict keyed by the given label(s).

        A year or a single date fills ``label_start``. A date range fills both
        labels, but only when ``label_end`` is given. Anything else yields ``{}``.
        """
        date = {}
        if re.search(regex_year, raw_date):
            date[label_start] = year_to_date(raw_date)
        elif re.search(regex_date, raw_date):
            date[label_start] = str_date_to_date(raw_date)
        elif re.search(regex_date_range, raw_date) and label_end:
            date_start, date_end = raw_date.split("/")
            date[label_start] = str_date_to_date(date_start)
            date[label_end] = str_date_to_date(date_end)
        return date

    @staticmethod
    def extract_dates(dates_data: list) -> dict:
        """Collect the "Issued", "Collected" and "Available" dates as model fields.

        Entries with another (or no) ``date_type`` are ignored.
        """
        dates = {}

        for date in dates_data:
            date_type = date.get("date_type")
            if date_type == "Issued":
                # no date range is present in Datacite prod.
                dates.update(MetadataDeserializer.extract_date(date["date"], "issued"))
            elif date_type == "Collected":
                dates.update(
                    MetadataDeserializer.extract_date(
                        date["date"], "collected_start", "collected_end"
                    )
                )
            elif date_type == "Available":
                dates.update(
                    MetadataDeserializer.extract_date(date["date"], "available")
                )

        return dates