Coverage for datacite/serializers.py: 99%

540 statements  

« prev     ^ index     » next       coverage.py v7.10.1, created at 2025-07-29 15:38 +0000

1# ruff: noqa: FIX002 

2import contextlib 

3import datetime 

4import logging 

5import re 

6from typing import Any, cast 

7 

8from django.contrib.contenttypes.models import ContentType 

9from django.db import IntegrityError, models 

10from rest_framework import serializers 

11from rest_framework.exceptions import ValidationError 

12 

13from datacite.models import ( 

14 Description, 

15 Format, 

16 Funding, 

17 Geolocation, 

18 Identifier, 

19 IdentifierScheme, 

20 Metadata, 

21 MetadataContributor, 

22 MetadataCreator, 

23 Participant, 

24 Publisher, 

25 ResourceType, 

26 Rights, 

27 Title, 

28 TitleTypes, 

29) 

30from datacite.utils import ( 

31 DATE_FORMAT, 

32 SCHEME_URI_REGEX, 

33 regex_date, 

34 regex_date_range, 

35 regex_year, 

36 str_date_to_date, 

37 year_to_date, 

38) 

39from datacite.validators import validate_uri 

40 

41logger = logging.getLogger(__name__) 

42 

43# TODO(garciacp): https://github.com/astral-sh/ruff/issues/3870 

44# Reformat Serializers: Completely separate multi and single identified serializers, 

45# even Read-Only and Write-Only and exploit the multi-inheritance of python. This may be 

46# useful in the case of a funder/affiliation, where they are participants but have only 

47# one identifier on the serialized form. 

48 

49 

class ResourceTypeSerializer(serializers.ModelSerializer):
    """Model serializer for ``ResourceType`` limited to its two type fields."""

    class Meta:
        model = ResourceType
        # Only the specific and the general resource type are exposed.
        fields = ["resource_type", "resource_type_general"]

54 

55 

56############################## 

57# Datacite 

58############################## 

class IdentifierSchemeSerializer(serializers.ModelSerializer):
    """Serializer for ``IdentifierScheme`` (a scheme name and its URI).

    ``scheme`` is declared with an empty validator list and ``uri`` is checked
    with ``validate_uri``; creation goes through :meth:`get_or_create`.
    """

    class Meta:
        model = IdentifierScheme
        fields = ["scheme", "uri"]
        extra_kwargs: dict[str, Any] = {
            "scheme": {"validators": []},
            "uri": {"validators": [validate_uri]},
        }

    def validate(self, data: dict) -> dict:
        """Reject a URI that is already stored under another scheme name.

        Raises:
            ValidationError: a scheme with the same URI (case-insensitive
                match) exists and its name differs from ``data["scheme"]``.
        """
        try:
            stored = IdentifierScheme.objects.get(uri__iexact=data["uri"])
        except IdentifierScheme.DoesNotExist:
            # Unknown URI: nothing it could conflict with.
            return data

        if stored.scheme == data["scheme"]:
            return data

        raise ValidationError(
            detail="Uri already present but not the right scheme.",
            code="invalid",
        )

    def create(self, validated_data: dict) -> IdentifierScheme:
        """Return the matching scheme, creating it when it does not exist."""
        return self.get_or_create(validated_data)[0]

    @staticmethod
    def get_or_create(data: dict) -> tuple[IdentifierScheme, bool]:
        """Fetch the scheme by name (case-insensitive) or create it from ``data``.

        Returns:
            ``(scheme, created)`` where ``created`` tells whether a new row
            was inserted.
        """
        manager = IdentifierScheme.objects
        try:
            found = manager.get(scheme__iexact=data["scheme"])
        except IdentifierScheme.DoesNotExist:
            return manager.create(**data), True
        return found, False

94 

95 

class IdentifierSerializer(serializers.ModelSerializer):
    """Serializer for a single ``Identifier`` and its nested scheme.

    Input may carry the scheme flat (``identifier_scheme`` + ``scheme_uri``);
    it is folded into a nested ``scheme`` dict before validation. The object
    the identifier belongs to is passed through the ``related_instance`` key
    of the input data.
    """

    # Object the identifier is attached to in ``create``; filled in
    # ``__init__`` from the ``related_instance`` key of the input data.
    related_instance = None

    scheme = IdentifierSchemeSerializer()

    class Meta:
        model = Identifier
        fields = ["scheme", "identifier"]
        depth = 1
        extra_kwargs: dict[str, Any] = {
            "scheme": {"validators": []},
            "identifier": {"validators": []},
        }

    def create(self, validated_data: dict) -> object:
        """Get or create the identifier for ``self.related_instance``.

        Raises:
            IntegrityError: no related instance was supplied with the data.
        """
        identifier = validated_data.pop("identifier")
        scheme_data = validated_data.pop("scheme")
        if not self.related_instance:
            raise IntegrityError
        identifier_obj, _ = Identifier.objects.get_or_create(
            identifier=identifier, scheme=scheme_data, instance=self.related_instance
        )
        return identifier_obj

    def to_representation(self, instance: Identifier) -> dict[str, Any]:
        """Serialize the identifier with its scheme flattened to plain keys."""
        return {
            "identifier": instance.identifier,
            "identifier_scheme": instance.scheme.scheme,
            "scheme_uri": instance.scheme.uri,
        }

    def __init__(self, instance: Identifier | None = None, **kwargs: Any) -> None:
        """Extract ``related_instance`` and nest the scheme before DRF sees the data."""
        if "data" in kwargs:
            data = kwargs.pop("data")
            if isinstance(data, dict):
                # Work on a shallow copy: the pop/restructure below must not
                # alter the dict the caller handed in.
                data = data.copy()
            self.related_instance = data.pop("related_instance", None)
            kwargs["data"] = self.reformat_data(data)

        super().__init__(instance=instance, **kwargs)

    @staticmethod
    def reformat_data(data: dict) -> dict:
        """Convert a flat scheme into a nested one, in place.

        Only acts when both ``identifier_scheme`` and ``scheme_uri`` are
        present; otherwise ``data`` is returned untouched.
        """
        if "identifier_scheme" in data and "scheme_uri" in data:
            data["scheme"] = {
                "scheme": data.pop("identifier_scheme"),
                "uri": data.pop("scheme_uri"),
            }
        return data

146 

147 

class MultiIdentifiedObjectSerializer(serializers.ModelSerializer):
    """Base serializer for objects that carry a list of identifiers.

    Identifiers are read from ``Identifier`` rows matching the model's content
    type and the instance primary key. Subclasses set ``prefix`` (used to
    build the external key names, e.g. ``<prefix>_identifiers``) and provide a
    ``Meta`` with ``model`` and a *list* ``fields``.
    """

    prefix = ""

    class Meta:
        pass

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        # ``Meta.fields`` is a class-level list shared by every instance, so
        # add the entry only once instead of growing it on each instantiation.
        if "identifiers" not in self.Meta.fields:
            self.Meta.fields.append("identifiers")

    def to_representation(self, instance: Any) -> dict:
        """
        Serializes the instance, and adds the list of identifiers with the right prefix.

        The ``<prefix>_identifiers`` key is only added when at least one
        identifier exists.
        """
        ret = super().to_representation(instance)

        content_type = ContentType.objects.get_for_model(self.Meta.model)

        identifiers = Identifier.objects.filter(
            content_type=content_type, object_id=instance.pk
        )

        # Creates the list of identifiers with the given prefix.
        identifiers_data = [
            {
                self.get_identifier_scheme_label(): identifier.scheme.scheme,
                self.get_identifier_label(): identifier.identifier,
                self.get_scheme_uri_label(): identifier.scheme.uri,
            }
            for identifier in identifiers
        ]
        if identifiers_data:
            ret[self.get_identifiers_label()] = identifiers_data

        return ret

    def to_internal_value_identifier(self, data: dict) -> dict:
        """Deserializes the identifier:
        {"name_identifier":..., "name_identifier_scheme":..., "scheme_uri":...} ->
        {"identifier":..., "scheme":{"scheme":..., "uri":...}}"""
        scheme = {}
        if self.get_identifier_scheme_label() in data:
            scheme["scheme"] = data[self.get_identifier_scheme_label()]
        if self.get_scheme_uri_label() in data:
            scheme["uri"] = data[self.get_scheme_uri_label()]

        return {"scheme": scheme, "identifier": data[self.get_identifier_label()]}

    def to_internal_value(self, data: Any) -> Any:
        """Deserializes the identified object:
        {"name":..., ..., "name_identifiers": [{"name_identifier":...}, ...]} ->
        {"name":..., ..., "identifiers": [{"identifier":...}, ...]}

        Entries lacking the ``<prefix>_identifier`` key are dropped. The
        prefixed list is popped from ``data``.
        """
        validated_data = super().to_internal_value(data)

        if self.get_identifiers_label() in data:
            identifiers = [
                self.to_internal_value_identifier(identifier_data)
                for identifier_data in data.pop(self.get_identifiers_label(), [])
                if self.get_identifier_label() in identifier_data
            ]
            if identifiers:
                validated_data["identifiers"] = identifiers
        return validated_data

    def create(self, validated_data: dict) -> Any:
        """Checks if identifier exists, and gets the instance, otherwise try to create
        it and its identifiers.

        An existing object is reused only when one of the given identifier
        strings is already stored and points at an instance of ``Meta.model``.
        """
        identifiers = validated_data.pop("identifiers", [])
        identified_object = None
        if identifiers:
            identifier_obj = Identifier.objects.filter(
                identifier__in=[
                    identifier_data["identifier"] for identifier_data in identifiers
                ]
            ).first()

            if identifier_obj and isinstance(
                identifier_obj.content_object, self.Meta.model
            ):
                identified_object = identifier_obj.content_object

        if not identified_object:
            identified_object, _ = self.Meta.model.objects.get_or_create(
                **validated_data
            )

        # Iterating an empty list is a no-op, so no extra emptiness check.
        for identifier_data in identifiers:
            self.create_identifier(identified_object, **identifier_data)

        return identified_object

    @staticmethod
    def create_identifier(
        identified_object: models.Model,
        identifier: str | None = None,
        scheme: dict[str, str] | str | None = None,
        uri: str | None = None,
    ) -> None:
        """If fields ok: {"identifier":..., "scheme:{...}} -> create identifier
        I can handle scheme as plain or as dict.

        Nothing is created when the identifier is empty or the scheme has no
        non-empty ``uri``.
        """
        if scheme and isinstance(scheme, str) and uri and isinstance(uri, str):
            scheme = {"scheme": scheme, "uri": uri}
        # Type check first, so a plain-string scheme never reaches the dict
        # lookup (``"uri" in "some string"`` would be a substring test).
        if identifier and isinstance(scheme, dict) and scheme.get("uri"):
            Identifier.objects.get_or_create(
                instance=identified_object,
                identifier=identifier,
                scheme=scheme,
            )

    def get_identifiers_label(self) -> str:
        """External key of the identifier list: ``<prefix>_identifiers``."""
        return self.prefix + "_identifiers"

    def get_identifier_label(self) -> str:
        """External key of one identifier value: ``<prefix>_identifier``."""
        return self.prefix + "_identifier"

    def get_identifier_scheme_label(self) -> str:
        """External key of the scheme name: ``<prefix>_identifier_scheme``."""
        return self.prefix + "_identifier_scheme"

    @staticmethod
    def get_scheme_uri_label() -> str:
        """External key of the scheme URI (not prefixed)."""
        return "scheme_uri"

278 

279 

class SingleIdentifiedObjectSerializer(MultiIdentifiedObjectSerializer):
    """Variant whose serialized form carries one identifier inline.

    The identifier keys sit on the object itself instead of inside a
    ``<prefix>_identifiers`` list; the list handling of the parent is reused.
    """

    prefix = ""

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        # ``Meta.fields`` is shared by all instances of the class: swap the
        # entries idempotently so the list does not grow on every
        # instantiation.
        fields = self.Meta.fields
        if "identifiers" in fields:
            fields.remove("identifiers")
        if "identifier" not in fields:
            fields.append("identifier")

    def to_representation(self, instance: Any) -> dict:
        """Gets the multi-identified serialization and extracts the first element to
        insert at identified_object level.

        The list is only flattened when it holds exactly one identifier.
        """
        ret = super().to_representation(instance)

        if (
            self.get_identifiers_label() in ret
            and len(ret[self.get_identifiers_label()]) == 1
        ):
            identifier = ret.pop(self.get_identifiers_label())[0]
            ret.update(identifier)

        return ret

    def to_internal_value(self, data: Any) -> Any:
        """Extracts identifier from identified_object and inserts it into a list
        (using parent method)

        The identifier keys are popped from ``data``.
        """
        identifier = {
            key: data.pop(key)
            for key in [
                self.get_identifier_label(),
                self.get_identifier_scheme_label(),
                self.get_scheme_uri_label(),
            ]
            if key in data
        }

        if identifier:
            data[self.get_identifiers_label()] = [identifier]

        return super().to_internal_value(data)

319 

320 

class AffiliationSerializer(MultiIdentifiedObjectSerializer):
    """Serializer for an affiliation, stored as a ``Participant``.

    Accepts either a bare string (the affiliation name) or a dict whose
    identifier is given flat with ``affiliation_*`` keys.
    """

    prefix = "affiliation"

    class Meta:
        model = Participant
        fields = ["name"]

    def to_internal_value(self, data: Any) -> Any:
        """Normalise string input and flat identifiers, then defer to the parent."""
        payload = self.transform_string_input_to_name(data)
        flat_identifiers = self.transform_affiliation_identifier(payload)

        if flat_identifiers:
            payload[self.get_identifiers_label()] = flat_identifiers

        return super().to_internal_value(payload)

    @staticmethod
    def transform_string_input_to_name(data: Any) -> Any:
        """Wrap a plain string as ``{"name": <string>}``; pass anything else through."""
        return {"name": data} if isinstance(data, str) else data

    def transform_affiliation_identifier(self, data: dict[str, Any]) -> list[dict]:
        """Move the flat identifier keys out of ``data`` into a one-element list.

        Returns an empty list when none of the three keys is present.
        Redundant code. It should use the SingleIdentifiedSerializer for the import.
        """
        # TODO(garciacp): https://github.com/astral-sh/ruff/issues/3870
        labels = (
            self.get_identifier_label(),
            self.get_identifier_scheme_label(),
            self.get_scheme_uri_label(),
        )
        flat = {label: data.pop(label) for label in labels if label in data}
        return [flat] if flat else []

365 

366 

class DateSerializer(serializers.Serializer):
    """Serializes from DB to Datacite json input"""

    # All three values are plain strings capped at 200 characters.
    date_type = serializers.CharField(max_length=200)
    date = serializers.CharField(max_length=200)
    # Optional free-text complement.
    # NOTE(review): MetadataSerializer.date_serializer in this file emits the key
    # "date_information" while this field is named "date_info" — confirm which
    # key the DataCite payload really uses.
    date_info = serializers.CharField(max_length=200, required=False)

373 

374 

class DataciteParticipantSerializer(serializers.ModelSerializer):
    """Datacite key is in singular (although a list). The source name maps
    the affiliation (in datacite) to affiliations (in our app)."""

    affiliation = AffiliationSerializer(
        many=True, source="affiliations", required=False
    )
    name = serializers.CharField(required=False)

    class Meta:
        model = Participant
        fields = [
            "name",
            "given_name",
            "family_name",
            "name_type",
            "lang",
            "affiliation",
        ]

    def to_internal_value(self, data: dict) -> Any:
        """Rename the ``name_identifier*`` keys to the internal identifier layout.

        Entries without a ``scheme_uri`` are skipped; the renamed list is
        attached to the validated data under ``identifiers``.
        Probable redundant.
        """
        # TODO(garciacp): https://github.com/astral-sh/ruff/issues/3870
        if "name_identifiers" not in data:
            return super().to_internal_value(data)

        # Built before the parent call, exactly like the keys are popped
        # from the raw entries before validation runs.
        renamed = [
            {
                "scheme": raw.pop("name_identifier_scheme", ""),
                "uri": raw.pop("scheme_uri", ""),
                "identifier": raw.pop("name_identifier", ""),
            }
            for raw in data["name_identifiers"]
            if "scheme_uri" in raw
        ]
        validated = super().to_internal_value(data)
        validated["identifiers"] = renamed
        return validated

414 

415 

class ParticipantSerializer(MultiIdentifiedObjectSerializer):
    """Serializer for a ``Participant`` with ``name_*`` identifiers and affiliations."""

    prefix = "name"

    affiliation = AffiliationSerializer(
        source="affiliations", many=True, required=False
    )

    class Meta:
        model = Participant
        fields = [
            "name",
            "given_name",
            "family_name",
            "name_type",
            "lang",
            "affiliation",
        ]

    def create(self, data: dict) -> Any:
        """Creates the participant with the affiliations.

        When several participants match the data, the first one is used.
        """
        affiliations = data.pop("affiliations", [])

        try:
            participant = super().create(data)
        except Participant.MultipleObjectsReturned:
            # NOTE(review): the parent pops "identifiers" before its
            # get_or_create raises, so on this path the identifiers of the
            # input are not attached to the participant — confirm intended.
            participant = Participant.objects.filter(**data).first()

        for affiliation_data in affiliations:
            affiliation = AffiliationSerializer().create(affiliation_data)
            participant.affiliations.add(affiliation)

        return participant

    def to_representation(self, instance: Any) -> dict:
        """Serialize the participant in the Datacite layout.

        ``name_identifiers`` is always present (possibly empty) and each
        affiliation is emitted with at most one identifier, preferring a
        ``ROR`` one.
        Redundant.
        """
        # TODO(garciacp): https://github.com/astral-sh/ruff/issues/3870
        ret = super().to_representation(instance)

        if "name_type" in ret and not ret["name_type"]:
            ret.pop("name_type")

        ret.pop("identifiers", [])
        # The parent has already queried the identifiers and stored them under
        # "name_identifiers" with these exact keys whenever at least one
        # exists; only the empty default is missing, so no second query.
        ret.setdefault("name_identifiers", [])

        ret.pop("affiliations", [])
        ret["affiliation"] = []
        for affiliation in instance.affiliations.all():
            # Prefer the ROR identifier, fall back to whichever comes first.
            identifier = affiliation.identifiers.filter(scheme__scheme="ROR").first()
            if identifier is None:
                identifier = affiliation.identifiers.first()

            identifier_data = (
                {
                    "affiliation_identifier_scheme": identifier.scheme.scheme,
                    "affiliation_identifier": identifier.identifier,
                    "scheme_uri": identifier.scheme.uri,
                }
                if identifier
                else {}
            )

            ret["affiliation"].append({"name": affiliation.name} | identifier_data)

        return ret

494 

495 

class MetadataContributorSerializer(serializers.ModelSerializer):
    """Serializer for a ``MetadataContributor``: a nested participant plus its type."""

    # The participant is handled by the Datacite-flavoured participant serializer.
    contributor = DataciteParticipantSerializer()

    class Meta:
        model = MetadataContributor
        fields = ("contributor", "contributor_type")

502 

503 

class FormatSerializer(serializers.ModelSerializer):
    """Model serializer for ``Format`` exposing only its ``format`` field."""

    class Meta:
        model = Format
        fields = ("format",)

508 

509 

class FundingSerializer(serializers.ModelSerializer):
    """Serializer for a ``Funding`` with its funder.

    Externally the funder is flat (``funder_name``, ``funder_identifier``,
    ``funder_identifier_type``, ``scheme_uri``); internally it is a nested
    participant with a list of identifiers.
    """

    funder = DataciteParticipantSerializer(required=True)

    class Meta:
        model = Funding
        fields = ["award_uri", "award_title", "award_number", "funder"]

    def to_internal_value(self, data: dict) -> Any:
        """Fold the flat ``funder_*`` keys into a nested ``funder`` entry.

        The identifier is only kept when ``funder_identifier`` is present.
        The flat keys are popped from ``data``.
        Redundant.
        """
        # TODO(garciacp): https://github.com/astral-sh/ruff/issues/3870
        # Must run first: it reads the funder keys before they are popped.
        scheme_uri = self.try_to_get_scheme_uri_if_missing(data)

        identifier: dict[str, Any] = {}
        if scheme_uri:
            identifier["scheme_uri"] = scheme_uri
        if "funder_identifier_type" in data:
            identifier["name_identifier_scheme"] = data.pop("funder_identifier_type")
        if "funder_identifier" in data:
            identifier["name_identifier"] = data.pop("funder_identifier")
        else:
            # Without the identifier value, scheme and URI alone are useless.
            identifier = {}

        if "funder_name" in data:
            funder: dict[str, Any] = {"name": data.pop("funder_name")}
            if identifier:
                funder["name_identifiers"] = [identifier]
            data["funder"] = funder

        return super().to_internal_value(data)

    def create(self, validated_data: dict) -> Any:
        """Get or create the funding, resolving or creating its funder first.

        An existing participant is reused when one of the funder identifiers
        is already stored and points at a ``Participant``; otherwise the
        funder is created, its identifiers attached and it is flagged with
        ``is_funder``.
        Redundant, duplicated somewhere.
        """
        # TODO(garciacp): https://github.com/astral-sh/ruff/issues/3870
        funder = validated_data.pop("funder", {})
        identifier_obj = None
        for identifier in funder.get("identifiers", []):
            # filter().first() rather than get(): the same identifier string
            # may be attached to several objects, and get() would then raise
            # MultipleObjectsReturned. Same lookup style as
            # MultiIdentifiedObjectSerializer.create.
            identifier_obj = Identifier.objects.filter(
                identifier=identifier["identifier"]
            ).first()
            if identifier_obj is not None:
                break

        if identifier_obj and isinstance(identifier_obj.content_object, Participant):
            funder_obj = identifier_obj.content_object
        else:
            identifiers = funder.pop("identifiers", [])
            funder_obj = ParticipantSerializer().create(funder)
            for identifier in identifiers:
                Identifier.objects.get_or_create(
                    instance=funder_obj,
                    identifier=identifier["identifier"],
                    scheme={
                        "scheme": identifier["scheme"],
                        "uri": identifier["uri"],
                    },
                )
            funder_obj.is_funder = True
            funder_obj.save()
        validated_data["funder"] = funder_obj

        funding, _ = Funding.objects.get_or_create(**validated_data)
        return funding

    @staticmethod
    def try_to_get_scheme_uri_if_missing(data: dict[str, Any]) -> str | None:
        """Try to fetch the field "scheme_uri", otherwise try to fetch the scheme,
        otherwise try to get from identifier, otherwise None+warning

        ``scheme_uri`` is popped from ``data`` when present; the other keys
        are only read.
        """
        if "scheme_uri" in data:
            # Try to fetch the uri from input
            return str(data.pop("scheme_uri"))

        if "funder_identifier_type" in data:
            # Try to fetch the uri from an existing instance of scheme
            with contextlib.suppress(IdentifierScheme.DoesNotExist):
                return IdentifierScheme.objects.get(
                    scheme=data["funder_identifier_type"]
                ).uri

        # "scheme_uri" is known to be absent here (handled by the early return).
        if "funder_identifier" in data:
            # Try to fetch the uri from the identifier
            match_uri = re.match(SCHEME_URI_REGEX, data["funder_identifier"])
            if match_uri is not None:
                return match_uri.group(0)

        logger.warning("Error fetching the uri of an identifier scheme ")
        return None

    def to_representation(self, instance: Any) -> dict:
        """Serialize the funding with the funder flattened to ``funder_*`` keys.

        Only the first identifier of the funder is emitted, and every entry
        with a falsy value is removed from the result.
        Probably redundant.
        """
        # TODO(garciacp): https://github.com/astral-sh/ruff/issues/3870
        ret = super().to_representation(instance)

        # change field name
        ret.pop("funder", "")
        ret["funder_name"] = instance.funder.name
        # One query instead of count() followed by first().
        first_identifier = instance.funder.identifiers.first()
        if first_identifier is not None:
            ret["funder_identifier"] = first_identifier.identifier
            ret["funder_identifier_type"] = first_identifier.scheme.scheme
            ret["scheme_uri"] = first_identifier.scheme.uri

        # clean empty fields (iterate over a snapshot: the dict shrinks)
        for key, val in list(ret.items()):
            if not val:
                ret.pop(key)

        return ret

    def validate(self, data: dict) -> Any:
        """Reject a lone funder identifier that has no scheme (funder type).

        Raises:
            serializers.ValidationError: the funder has exactly one
                identifier and its ``scheme`` is empty.
        """
        # TODO(garciacp): https://github.com/astral-sh/ruff/issues/3870
        validated_data = super().validate(data)

        if (
            "funder" in validated_data
            and "identifiers" in validated_data["funder"]
            and len(validated_data["funder"]["identifiers"]) == 1
            and "identifier" in validated_data["funder"]["identifiers"][0]
            and "scheme" in validated_data["funder"]["identifiers"][0]
            and not validated_data["funder"]["identifiers"][0]["scheme"]
        ):
            msg = "Must include the funder type"
            raise serializers.ValidationError(msg)
        return validated_data

644 

645 

class RightsSerializer(SingleIdentifiedObjectSerializer):
    """Serializer for ``Rights`` with an inline ``rights_*`` identifier."""

    prefix = "rights"

    rights_uri = serializers.CharField(validators=[validate_uri])

    class Meta:
        model = Rights
        fields = ["rights", "rights_uri", "lang"]

    def to_representation(self, instance: Any) -> dict:
        """Serialize, dropping an empty ``lang`` and the unused ``identifier`` key."""
        rep = super().to_representation(instance)

        lang_is_blank = "lang" in rep and not rep["lang"]
        if lang_is_blank:
            del rep["lang"]

        # Not part of the output; absent keys are ignored.
        rep.pop("identifier", "")
        return rep

664 

665 

class TitleSerializer(serializers.ModelSerializer):
    """Serializer for ``Title`` that hides the main-title type on output."""

    class Meta:
        model = Title
        fields = ["title", "title_type", "lang"]

    def to_internal_value(self, data: dict) -> Any:
        """Validate ``data`` and fall back to ``TitleTypes.DEFAULT`` for the type."""
        validated = super().to_internal_value(data)
        validated.setdefault("title_type", TitleTypes.DEFAULT)
        return validated

    def to_representation(self, instance: Title) -> Any:
        """Serialize the title, leaving out ``title_type`` when it is "MainTitle"
        (not handled by datacite)."""
        rep = super().to_representation(instance)
        if rep["title_type"] != "MainTitle":
            return rep
        rep.pop("title_type")
        return rep

684 

685 

class PublisherSerializer(SingleIdentifiedObjectSerializer):
    """Serializer for ``Publisher`` with an inline ``publisher_*`` identifier."""

    prefix = "publisher"

    class Meta:
        model = Publisher
        # Must stay a list: the parent serializers edit it in ``__init__``.
        fields = ["name", "lang"]

692 

693 

class DescriptionSerializer(serializers.ModelSerializer):
    """Model serializer for ``Description``: text, type and language."""

    class Meta:
        model = Description
        fields = ["description", "description_type", "lang"]

698 

699 

class GeolocationSerializer(serializers.ModelSerializer):
    """Serializer for ``Geolocation``: a place name and an optional bounding box.

    Externally the four bounds are nested under ``geo_location_box``;
    internally they are flat fields next to ``place``.
    """

    geo_location_place = serializers.CharField(source="place", max_length=255)

    class Meta:
        model = Geolocation
        fields = [
            "geo_location_place",
            "west_bound_longitude",
            "east_bound_longitude",
            "south_bound_latitude",
            "north_bound_latitude",
        ]

    def to_internal_value(self, data: dict) -> dict:
        """Flatten the input in place: rename the place and unpack the box.

        NOTE(review): the reshaped input is returned without calling
        ``super().to_internal_value``, so no field validation runs here —
        confirm this is intended.
        """
        data["place"] = data.pop("geo_location_place", "")
        box = data.pop("geo_location_box", {})
        data.update(box)
        return data

    def to_representation(self, instance: Any) -> dict:
        """Serialize to ``geo_location_place`` and/or ``geo_location_box``.

        With a non-empty place the box is only included when it has at least
        one bound; without a place only the box is returned.
        """
        data = super().to_representation(instance=instance)
        # Always detach the place so that only the bounds remain; otherwise an
        # empty place would end up as a key inside the box.
        geo_location_place = data.pop("geo_location_place", None)
        box = self.clean_location_box(box_data=data)

        if not geo_location_place:
            return {"geo_location_box": box}
        if box:
            return {
                "geo_location_place": geo_location_place,
                "geo_location_box": box,
            }
        return {"geo_location_place": geo_location_place}

    @staticmethod
    def clean_location_box(box_data: dict[str, Any]) -> dict[str, Any]:
        """As coordinates can be null, they are None on default serialization and
        should be removed.

        ``box_data`` is modified in place and returned; a bound that is
        missing altogether is tolerated.
        """
        for bound in (
            "west_bound_longitude",
            "east_bound_longitude",
            "south_bound_latitude",
            "north_bound_latitude",
        ):
            if box_data.get(bound) is None:
                box_data.pop(bound, None)
        return box_data

747 

748 

class MetadataSerializer(serializers.ModelSerializer):
    """Serializes from DB to Datacite json format"""

    rights = RightsSerializer(many=True)
    formats = FormatSerializer(many=True)
    funding_references = FundingSerializer(source="fundings", many=True)
    geo_locations = GeolocationSerializer(source="geolocation_set", many=True)

    class Meta:
        model = Metadata
        fields = [
            "id",
            "url",
            "publication_year",
            "state",
            "types",
            "publisher",
            "rights",
            "contributors",
            "formats",
            "funding_references",
            "geo_locations",
        ]
        depth = 2

    @staticmethod
    def date_serializer(
        date_type: str,
        date_start: datetime.date | None = None,
        date_end: datetime.date | None = None,
    ) -> dict[str, str]:
        """Build one date entry, or ``{}`` when there is no start date.

        With an end date the value is the range ``<start>/<end>``; both dates
        are formatted with ``DATE_FORMAT``.
        """
        if not date_start:
            return {}

        date = {"date_type": date_type, "date_information": ""}

        if date_end:
            date["date"] = (
                f"{date_start.strftime(DATE_FORMAT)}/{date_end.strftime(DATE_FORMAT)}"
            )
        else:
            date["date"] = date_start.strftime(DATE_FORMAT)

        return date

    @staticmethod
    def dates_serializer(metadata: Metadata) -> list[dict]:
        """List the "Issued", "Collected" and "Available" dates that are set."""
        dates = []

        # Guarded like the other two dates: an unset date would otherwise
        # contribute an empty dict to the list.
        if metadata.issued:
            dates.append(MetadataSerializer.date_serializer("Issued", metadata.issued))

        if metadata.collected_start:
            dates.append(
                MetadataSerializer.date_serializer(
                    "Collected", metadata.collected_start, metadata.collected_end
                )
            )

        # if embargoed, add the availability date too
        if metadata.available:
            dates.append(
                MetadataSerializer.date_serializer("Available", metadata.available)
            )
        return dates

    @staticmethod
    def sizes_serializer(metadata: Metadata) -> list[str]:
        """Return the non-empty size values: information, increment, total."""
        return [
            size
            for size in [
                metadata.size_information,
                metadata.size_increment,
                metadata.size_total,
            ]
            if size
        ]

    def to_representation(self, instance: Metadata) -> Any:
        """Serializes the Metadata object.

        On top of the declared fields this adds dates, sizes, doi, titles,
        creators (ordered by ``order``) and descriptions, flattens formats to
        plain strings and renames ``rights`` to ``rights_list``.
        """
        ret = super().to_representation(instance)
        ret["dates"] = self.dates_serializer(metadata=instance)
        ret["sizes"] = self.sizes_serializer(metadata=instance)
        ret["formats"] = [format_obj["format"] for format_obj in ret.pop("formats", [])]
        ret["contributors"] = [
            ParticipantSerializer(instance=contributor.contributor).data
            | {"contributor_type": contributor.contributor_type}
            for contributor in MetadataContributor.objects.filter(metadata=instance)
        ]
        ret["doi"] = instance.network.doi
        ret["titles"] = TitleSerializer(instance.title_set.all(), many=True).data
        ret["creators"] = [
            ParticipantSerializer(instance=metadatacreator.creator).data
            for metadatacreator in MetadataCreator.objects.filter(metadata=instance)
            .order_by("order")
            .only("creator")
        ]
        ret["rights_list"] = ret.pop("rights")
        ret["descriptions"] = [
            DescriptionSerializer(instance=description).data
            for description in instance.description_set.all()
        ]
        ret["funding_references"] = [
            FundingSerializer(instance=funding).data
            for funding in instance.fundings.all()
        ]

        return ret

854 

855 

856def convert_from_plain_to_nested( 

857 plain_data: list, label: str, external_fields: list | None = None 

858) -> list: 

859 """All the fields present in plain_data, that are not present in external fields, 

860 are moved into another dict, which is itself added with the given label: 

861 ({"a":1,"b1":2, "b2":3}, "b", ["a"]) -> {"a":1,"b":{b1":2, "b2":3}}""" 

862 if external_fields is None: 

863 external_fields = [] 

864 

865 nested_data = [] 

866 for item in plain_data: 

867 nested_item = {} 

868 keys = list(item.keys()) 

869 for k in keys: 

870 if k not in external_fields: 

871 nested_item[k] = item.pop(k) 

872 item.update({label: nested_item}) 

873 nested_data.append(item) 

874 return nested_data 

875 

876 

class MetadataCreatorDeserializer(serializers.ModelSerializer):
    """Deserializer for a ``MetadataCreator`` wrapping one nested ``creator``."""

    # The participant is handled by the Datacite-flavoured participant serializer.
    creator = DataciteParticipantSerializer()

    class Meta:
        model = MetadataCreator
        fields = ("creator",)

883 

884 

885class MetadataDeserializer(serializers.ModelSerializer): 

886 """Deserializes from Datacite json output to DB. One shot import....""" 

887 

888 titles = TitleSerializer(many=True) 

889 creators = MetadataCreatorDeserializer(many=True) 

890 contributors = MetadataContributorSerializer(many=True, required=False) 

891 dates = DateSerializer(many=True) 

892 publisher = PublisherSerializer() 

893 types = ResourceTypeSerializer() 

894 rights_list = RightsSerializer(source="rights", many=True, required=False) 

895 funding_references = FundingSerializer(source="fundings", many=True, required=False) 

896 sizes = serializers.ListField( 

897 child=serializers.CharField(max_length=255), allow_empty=True, required=False 

898 ) 

899 formats = serializers.ListField( 

900 child=serializers.CharField(max_length=255), allow_empty=True, required=False 

901 ) 

902 descriptions = DescriptionSerializer(many=True, required=False) 

903 geo_locations = GeolocationSerializer(many=True, required=False) 

904 

905 class Meta: 

906 model = Metadata 

907 fields = [ 

908 "id", 

909 "url", 

910 "publication_year", 

911 "state", 

912 "types", 

913 "publisher", 

914 "titles", 

915 "creators", 

916 "contributors", 

917 "network", 

918 "dates", 

919 "rights_list", 

920 "funding_references", 

921 "sizes", 

922 "formats", 

923 "descriptions", 

924 "geo_locations", 

925 ] 

926 

927 def __init__(self, **kwargs: Any) -> None: 

928 """Fixes creators and contributors, before init""" 

929 if "data" in kwargs: 

930 if "creators" in kwargs["data"]: 

931 kwargs["data"]["creators"] = convert_from_plain_to_nested( 

932 kwargs["data"]["creators"], "creator" 

933 ) 

934 if "contributors" in kwargs["data"]: 

935 kwargs["data"]["contributors"] = convert_from_plain_to_nested( 

936 kwargs["data"]["contributors"], "contributor", ["contributor_type"] 

937 ) 

938 super().__init__(**kwargs) 

939 

def create(self, validated_data: dict) -> Metadata:  # noqa: C901
    """Create a ``Metadata`` row (or overwrite ``self.instance``) and its relations.

    The nested parts are popped from ``validated_data`` and persisted one by
    one; whatever remains, plus the values returned by ``extract_dates``, is
    written to the ``Metadata`` row itself.

    Args:
        validated_data: Validated payload. ``publisher``, ``types``,
            ``titles``, ``creators`` and ``dates`` must be present; every
            other nested key is optional and treated as empty when missing.

    Returns:
        The created ``Metadata``, or ``self.instance`` after being updated
        and refreshed from the database.

    Raises:
        KeyError: If one of the mandatory keys listed above is missing.
    """
    publisher_data = validated_data.pop("publisher")
    rs_data = validated_data.pop("types")
    titles_data = validated_data.pop("titles")
    creators_data = validated_data.pop("creators")
    contributors_data = validated_data.pop("contributors", [])
    dates_data = validated_data.pop("dates")
    # ``rights_list`` (source "rights") and ``funding_references`` (source
    # "fundings") are declared with required=False, so their keys may be
    # absent; default to empty lists like the other optional collections.
    rights_data = validated_data.pop("rights", [])
    fundings_data = validated_data.pop("fundings", [])
    formats_data = validated_data.pop("formats", [])
    descriptions_data = validated_data.pop("descriptions", [])
    geo_locations_data = validated_data.pop("geo_locations", [])

    publisher = PublisherSerializer().create(publisher_data)

    # Turn the list of typed dates into flat keyword values for the row.
    validated_data.update(self.extract_dates(dates_data))

    rs, _ = ResourceType.objects.get_or_create(**rs_data)
    if self.instance:
        # Overwrite the existing row through a queryset update, then reload
        # it so the in-memory object reflects the stored values.
        metadata = cast("Metadata", self.instance)
        Metadata.objects.filter(pk=metadata.pk).update(
            **validated_data, publisher=publisher, types=rs
        )
        metadata.refresh_from_db()
    else:
        metadata = Metadata.objects.create(
            **validated_data, publisher=publisher, types=rs
        )
    metadata.formats.set(self.get_or_create_formats(formats_data))
    # Create titles. If only one and not type, assign as a MainTitle.
    # ``.get`` keeps a title without a ``title_type`` key from raising.
    if len(titles_data) == 1 and not titles_data[0].get("title_type"):
        titles_data[0]["title_type"] = TitleTypes.MAIN_TITLE
    for title_data in titles_data:
        Title.objects.get_or_create(**title_data, metadata=metadata)
    # Create Creators in order.
    for i, creator_data in enumerate(creators_data):
        if creator_data.get("creator", {}):
            creator = ParticipantSerializer().create(creator_data.pop("creator"))
            metadata.add_ordered_creator(creator, i)
    # Create Contributors.
    for contributor_data in contributors_data:
        if contributor_data.get("contributor", {}):
            contributor = ParticipantSerializer().create(
                contributor_data.pop("contributor")
            )
            MetadataContributor.objects.get_or_create(
                metadata=metadata,
                contributor=contributor,
                contributor_type=contributor_data.pop("contributor_type"),
            )
    # Rights and licenses.
    for right_data in rights_data:
        rights = RightsSerializer().create(right_data)
        metadata.rights.add(rights)
    # Descriptions.
    for description_data in descriptions_data:
        Description.objects.get_or_create(metadata=metadata, **description_data)
    # Funding.
    for funding_data in fundings_data:
        funding = FundingSerializer().create(funding_data)
        metadata.fundings.add(funding)
    # Geolocations.
    for geolocation_data in geo_locations_data:
        GeolocationSerializer().create(geolocation_data | {"metadata": metadata})

    return metadata

1007 

def to_internal_value(self, data: dict) -> Any:
    """Validate ``data`` and spread ``sizes`` over the individual size keys.

    The ``sizes`` list is removed from the validated data. When it holds one
    to three entries they are stored, in order, under ``size_information``,
    ``size_increment`` and ``size_total``; an empty or longer list sets
    none of them.
    """
    internal = super().to_internal_value(data)

    size_values = internal.pop("sizes", [])
    size_keys = ("size_information", "size_increment", "size_total")

    # Only lists that fit the known slots are mapped; anything else is ignored.
    if 1 <= len(size_values) <= len(size_keys):
        for key, value in zip(size_keys, size_values):
            internal[key] = value

    return internal

1028 

1029 @staticmethod 

1030 def get_or_create_formats(data: list) -> list: 

1031 """From a list of formats, get_or_creates each format, 

1032 and returns the list of ids""" 

1033 indexes = [] 

1034 for format_str in data: 

1035 try: 

1036 index = Format.objects.get(format__iexact=format_str) 

1037 except Format.DoesNotExist: 

1038 index = Format.objects.create(format=format_str) 

1039 indexes.append(index.pk) 

1040 return indexes 

1041 

@staticmethod
def extract_date(
    raw_date: str, label_start: str, label_end: str | None = None
) -> dict:
    """Parse ``raw_date`` into a dict keyed by the given labels.

    The patterns are tried in this order, first match wins:

    * ``regex_year``: ``{label_start: year_to_date(raw_date)}``
    * ``regex_date``: ``{label_start: str_date_to_date(raw_date)}``
    * ``regex_date_range`` (only when ``label_end`` is given): the value is
      split on ``/`` and both halves are converted with ``str_date_to_date``
      and stored under ``label_start`` and ``label_end``.

    An empty dict is returned when nothing applies.
    """
    if re.search(regex_year, raw_date):
        return {label_start: year_to_date(raw_date)}
    if re.search(regex_date, raw_date):
        return {label_start: str_date_to_date(raw_date)}
    if re.search(regex_date_range, raw_date) and label_end:
        range_start, range_end = raw_date.split("/")
        return {
            label_start: str_date_to_date(range_start),
            label_end: str_date_to_date(range_end),
        }
    return {}

1056 

1057 @staticmethod 

1058 def extract_dates(dates_data: list) -> dict: 

1059 dates = {} 

1060 

1061 for date in dates_data: 

1062 # Issued date 

1063 if "date_type" in date and date["date_type"] == "Issued": 

1064 dates.update(MetadataDeserializer.extract_date(date["date"], "issued")) 

1065 # no date range is present in Datacite prod. 

1066 # Collected date 

1067 if "date_type" in date and date["date_type"] == "Collected": 

1068 dates.update( 

1069 MetadataDeserializer.extract_date( 

1070 date["date"], "collected_start", "collected_end" 

1071 ) 

1072 ) 

1073 # Available date 

1074 elif "date_type" in date and date["date_type"] == "Available": 

1075 dates.update( 

1076 MetadataDeserializer.extract_date(date["date"], "available") 

1077 ) 

1078 

1079 return dates