Ingestions

Ingestions pull external source lists into the canonical persons table. Each source owns its raw API shape and mapping logic, then hands normalized rows to the shared ingestion runner.

Files

  • app/lib/ingestions/types.ts: shared normalized ingestion contract.
  • app/lib/ingestions/actions.ts: run lifecycle, dedupe planning, DB writes.
  • app/lib/ingestions/registry.ts: list of enabled ingestion sources.
  • app/lib/ingestions/scheduled.ts: runs all registered sources.
  • app/lib/ingestions/sources/*.ts: one file per external source.
  • drizzle/0021_ingestions.sql: run log table.
  • workers/app.ts: Cloudflare scheduled handler.
  • wrangler.jsonc: cron trigger config.

Source Pattern

Each source file should export one IngestionSource object:

export const pacientesTerremotoSource = {
  handle: SOURCES.PACIENTES_TERREMOTO_VENEZUELA.handle,
  label: SOURCES.PACIENTES_TERREMOTO_VENEZUELA.label,
  async ingest(context) {
    // context.cursor is the latest completed cursor for this source.
    const raw = await fetchSourceRows(context.cursor)
    const rows = raw.map(mapPerson)
    // skippedCount and cursor are computed by the source itself (elided here).
    return ingestPersons(context, {
      source: SOURCES.PACIENTES_TERREMOTO_VENEZUELA.handle,
      fetchedCount: raw.length,
      skippedCount,
      rows,
      cursor,
    })
  },
} satisfies IngestionSource

Keep raw source types in the source file:

interface SourcePerson {
  id: string
  full_name: string
  updated_at: string
}

Do not add Dto suffixes. Use short source-local names like SourcePerson, SourceHospital, and mapPerson.

Normalized Row

Sources map raw records into IngestionPerson. This is intentionally close to the persons table because the shared runner performs one dedupe and write path for every source.

Identity fields (required unless marked as conditional):

  • source
  • source_id
  • first_name
  • last_name
  • national_id when available
  • hospital when the row is hospital-based
  • found_at
  • created_at
  • updated_at
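
The mapping step can be sketched roughly as follows. This is a minimal illustration, not the project's code: the IngestionPerson shape here is a guess built from the field list above (the real contract lives in app/lib/ingestions/types.ts), and SOURCE_HANDLE, splitName, and the timestamp choices are assumptions made for the example.

```typescript
// Hypothetical shape; the real IngestionPerson is defined in app/lib/ingestions/types.ts.
interface IngestionPerson {
  source: string
  source_id: string
  first_name: string
  last_name: string
  national_id: string | null
  hospital: string | null
  found_at: string
  created_at: string
  updated_at: string
}

// Raw shape owned by the source file.
interface SourcePerson {
  id: string
  full_name: string
  updated_at: string
}

// Placeholder handle; a real source would read it from SOURCES.
const SOURCE_HANDLE = "example-source"

// Illustrative helper: first token is the first name, the rest is the last name.
function splitName(fullName: string): { first: string; last: string } {
  const tokens = fullName.trim().split(/\s+/).filter(Boolean)
  return { first: tokens[0] ?? "", last: tokens.slice(1).join(" ") }
}

function mapPerson(raw: SourcePerson): IngestionPerson {
  const { first, last } = splitName(raw.full_name)
  return {
    source: SOURCE_HANDLE,
    source_id: raw.id,
    first_name: first,
    last_name: last,
    national_id: null, // this raw shape carries no national ID
    hospital: null, // this raw shape is not hospital-based
    // Assumption: the only timestamp available is updated_at, so it is reused.
    found_at: raw.updated_at,
    created_at: raw.updated_at,
    updated_at: raw.updated_at,
  }
}
```

Because mapPerson takes a single raw record, it can be passed directly to raw.map(mapPerson) as in the source pattern.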

Dedupe

The shared runner applies matches in this order:

  1. Existing sources.source + sources.source_id.
  2. Existing active person with the same national_id.
  3. Strong identity match for rows without a national_id (no cédula): same hospital and name tokens, plus a matching age, gender, or date, or source data too sparse to compare.
  4. Insert when no match exists.

Within the same source run, duplicate source IDs, repeated national IDs, and repeated strong identities are skipped.
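
The match order and in-run skips above could be planned along these lines. This is a sketch under stated assumptions, not the runner in actions.ts: Row, ExistingIndex, Plan, and planRows are hypothetical names, existing persons are represented as in-memory maps, and the strong identity check is reduced to a precomputed identityKey string.

```typescript
// Hypothetical shapes for illustration only.
interface Row {
  source: string
  source_id: string
  national_id: string | null
  identityKey: string | null // e.g. hospital plus name tokens; null when too weak to use
}

interface ExistingIndex {
  bySource: Map<string, number> // "source:source_id" -> person id
  byNationalId: Map<string, number> // national_id -> active person id
  byIdentity: Map<string, number> // strong identity key -> person id
}

type Plan =
  | { action: "update"; personId: number; matchedBy: "source" | "national_id" | "identity" }
  | { action: "insert" }
  | { action: "skip"; reason: string }

function planRows(rows: Row[], existing: ExistingIndex): Plan[] {
  const seenSource = new Set<string>()
  const seenNational = new Set<string>()
  const seenIdentity = new Set<string>()

  return rows.map((row): Plan => {
    const sourceKey = `${row.source}:${row.source_id}`
    // Assumption: identity matching only applies to rows without a national_id.
    const identity = row.national_id === null ? row.identityKey : null

    // Duplicates within the same run are skipped before any matching.
    if (seenSource.has(sourceKey)) return { action: "skip", reason: "duplicate source_id" }
    if (row.national_id !== null && seenNational.has(row.national_id)) {
      return { action: "skip", reason: "repeated national_id" }
    }
    if (identity !== null && seenIdentity.has(identity)) {
      return { action: "skip", reason: "repeated identity" }
    }
    seenSource.add(sourceKey)
    if (row.national_id !== null) seenNational.add(row.national_id)
    if (identity !== null) seenIdentity.add(identity)

    // 1. Existing source + source_id link.
    const bySource = existing.bySource.get(sourceKey)
    if (bySource !== undefined) return { action: "update", personId: bySource, matchedBy: "source" }

    // 2. Existing active person with the same national_id.
    const byNational = row.national_id !== null ? existing.byNationalId.get(row.national_id) : undefined
    if (byNational !== undefined) return { action: "update", personId: byNational, matchedBy: "national_id" }

    // 3. Strong identity match for rows without a national_id.
    const byIdentity = identity !== null ? existing.byIdentity.get(identity) : undefined
    if (byIdentity !== undefined) return { action: "update", personId: byIdentity, matchedBy: "identity" }

    // 4. No match: insert.
    return { action: "insert" }
  })
}
```

Planning first and writing afterwards keeps the counts for the run log easy to derive: updates, inserts, and skips can each be tallied from the returned plans.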

Run Log

Every non-dry run creates one ingestions row:

  • starts as in_progress
  • ends as completed with counts and cursor
  • ends as failed with message if the source throws

Counts:

  • fetched_count: raw source rows fetched
  • created_count: new persons rows planned for insert
  • updated_count: existing persons rows planned for update
  • skipped_count: source-level skips plus dedupe skips
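
The run lifecycle can be pictured with the sketch below. It is illustrative only: the log is an in-memory array rather than the ingestions table, and IngestionRun, RunResult, and runWithLog are hypothetical names whose fields simply mirror the statuses and counts listed above.

```typescript
type IngestionStatus = "in_progress" | "completed" | "failed"

// Hypothetical row shape mirroring the columns named in this section.
interface IngestionRun {
  source: string
  status: IngestionStatus
  fetched_count: number
  created_count: number
  updated_count: number
  skipped_count: number
  cursor: string | null
  message: string | null
}

// Hypothetical result returned by a source's ingest step.
interface RunResult {
  fetchedCount: number
  createdCount: number
  updatedCount: number
  skippedCount: number
  cursor: string | null
}

async function runWithLog(
  source: string,
  log: IngestionRun[],
  ingest: () => Promise<RunResult>,
): Promise<IngestionRun> {
  // The row is created up front so a crash mid-run still leaves a trace.
  const run: IngestionRun = {
    source,
    status: "in_progress",
    fetched_count: 0,
    created_count: 0,
    updated_count: 0,
    skipped_count: 0,
    cursor: null,
    message: null,
  }
  log.push(run)

  try {
    const result = await ingest()
    run.status = "completed"
    run.fetched_count = result.fetchedCount
    run.created_count = result.createdCount
    run.updated_count = result.updatedCount
    run.skipped_count = result.skippedCount
    run.cursor = result.cursor
  } catch (error) {
    // A throwing source ends the run as failed, with the error text as the message.
    run.status = "failed"
    run.message = error instanceof Error ? error.message : String(error)
  }
  return run
}
```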

Cursor

The runner passes the cursor from the latest completed run to each source. Each source decides how to use it. Pacientes Terremoto, for example, fetches only rows where updated_at > cursor.

Dry runs do not read or write ingestion cursors.
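
One way a source might apply and advance an updated_at cursor is sketched below. The helper names rowsSince and nextCursor are hypothetical, timestamps are assumed to be ISO 8601 strings, and the filter runs in memory for clarity; a real source would more likely pass the cursor to the upstream API query.

```typescript
interface SourcePerson {
  id: string
  full_name: string
  updated_at: string // assumed ISO 8601
}

// Keep rows strictly newer than the cursor. A null cursor (first run) keeps everything.
function rowsSince(rows: SourcePerson[], cursor: string | null): SourcePerson[] {
  if (cursor === null) return rows
  const since = Date.parse(cursor)
  return rows.filter((row) => Date.parse(row.updated_at) > since)
}

// The next cursor is the newest updated_at seen, or the previous cursor when no rows arrived.
function nextCursor(rows: SourcePerson[], previous: string | null): string | null {
  let latest = previous
  for (const row of rows) {
    if (latest === null || Date.parse(row.updated_at) > Date.parse(latest)) {
      latest = row.updated_at
    }
  }
  return latest
}
```

Using a strict comparison avoids re-ingesting the row that set the cursor, at the cost of missing rows that share its exact timestamp; whether that trade-off is acceptable depends on the source.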

Schedule

Cloudflare Cron runs hourly:

"triggers": {
  "crons": ["0 * * * *"]
}

The Worker handler calls:

runScheduledIngestions(env.DB)
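
For orientation, a scheduled handler that makes this call might look like the sketch below. It is not the contents of workers/app.ts: Env and ExecutionContextLike are minimal local stand-ins for the Cloudflare Workers types, and runScheduledIngestions is stubbed so the example is self-contained.

```typescript
// Minimal stand-ins for the Workers runtime types, to keep the sketch self-contained.
interface Env {
  DB: unknown // the D1 binding in the real Worker
}
interface ExecutionContextLike {
  waitUntil(promise: Promise<unknown>): void
}

// Records each call so the sketch can be exercised without a database.
const scheduledRuns: unknown[] = []

// Stub for the real function in app/lib/ingestions/scheduled.ts.
async function runScheduledIngestions(db: unknown): Promise<void> {
  scheduledRuns.push(db)
}

// In a Worker entry file this object would be the default export.
const worker = {
  async scheduled(_controller: unknown, env: Env, ctx: ExecutionContextLike): Promise<void> {
    // waitUntil keeps the invocation alive until the ingestion promise settles.
    ctx.waitUntil(runScheduledIngestions(env.DB))
  },
}
```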

Scheduled runs start in production once both of the following are true:

  1. Remote D1 migrations are applied.
  2. The Worker is deployed.

Adding A Source

  1. Add the source to SOURCES in app/lib/constants.ts.
  2. Create app/lib/ingestions/sources/<source>.ts.
  3. Define source-local raw interfaces.
  4. Fetch raw rows and map them into IngestionPerson.
  5. Call ingestPersons(context, ...).
  6. Register the source in app/lib/ingestions/registry.ts.
  7. Run typecheck, tests, and a dry run where possible.
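
Steps 5 and 6 can be pictured with the following sketch of a registry and a loop over it. Everything here is an assumption made for illustration: the IngestionSource and IngestionContext shapes are simplified guesses at the contract in types.ts, exampleSource and runAll are hypothetical, and isolating one source's failure from the others is a design choice this example makes, not something confirmed about scheduled.ts.

```typescript
// Simplified, hypothetical contract; the real one lives in app/lib/ingestions/types.ts.
interface IngestionContext {
  cursor: string | null
  dryRun: boolean
}
interface IngestionSource {
  handle: string
  label: string
  ingest(context: IngestionContext): Promise<{ createdCount: number }>
}

// A new source file exports one object that satisfies the contract.
const exampleSource = {
  handle: "example-source",
  label: "Example Source",
  async ingest(_context: IngestionContext) {
    return { createdCount: 0 }
  },
} satisfies IngestionSource

// Registering a source means adding it to this list.
const ingestionSources: IngestionSource[] = [exampleSource]

// Run every registered source in order; one failing source does not stop the rest.
async function runAll(sources: IngestionSource[]): Promise<Record<string, "ok" | "failed">> {
  const outcome: Record<string, "ok" | "failed"> = {}
  for (const source of sources) {
    try {
      await source.ingest({ cursor: null, dryRun: true })
      outcome[source.handle] = "ok"
    } catch {
      outcome[source.handle] = "failed"
    }
  }
  return outcome
}
```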

Manual Checks

Local dry run for Pacientes Terremoto:

bun scripts/migrate-pacientes-terremoto.ts --dry-run

Remote migration status:

bunx wrangler d1 migrations list venezuela-te-busca-database --remote

Recent ingestion rows:

bunx wrangler d1 execute venezuela-te-busca-database --remote \
  --command "SELECT source, status, fetched_count, created_count, updated_count, skipped_count, message, started_at, finished_at FROM ingestions ORDER BY created_at DESC LIMIT 10;"