Ingestions
Ingestions pull external source lists into the canonical persons table.
Each source owns its raw API shape and mapping logic, then hands normalized
rows to the shared ingestion runner.
Files
- app/lib/ingestions/types.ts: shared normalized ingestion contract.
- app/lib/ingestions/actions.ts: run lifecycle, dedupe planning, DB writes.
- app/lib/ingestions/registry.ts: list of enabled ingestion sources.
- app/lib/ingestions/scheduled.ts: runs all registered sources.
- app/lib/ingestions/sources/*.ts: one file per external source.
- drizzle/0021_ingestions.sql: run log table.
- workers/app.ts: Cloudflare scheduled handler.
- wrangler.jsonc: cron trigger config.
Source Pattern
Each source file should export one IngestionSource object:
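// Import paths are inferred from the Files list above.
import { SOURCES } from '../../constants'
import { ingestPersons } from '../actions'
import type { IngestionPerson, IngestionSource } from '../types'
// fetchSourceRows and mapPerson are source-local helpers defined in this file.
export const pacientesTerremotoSource = {
  handle: SOURCES.PACIENTES_TERREMOTO_VENEZUELA.handle,
  label: SOURCES.PACIENTES_TERREMOTO_VENEZUELA.label,
  async ingest(context) {
    const raw = await fetchSourceRows(context.cursor)
    // Assumed convention: mapPerson returns null for rows it cannot normalize.
    const mapped = raw.map(mapPerson)
    const rows = mapped.filter((row): row is IngestionPerson => row !== null)
    const skippedCount = mapped.length - rows.length
    // Next cursor: newest updated_at fetched, else the cursor we were given.
    const cursor = raw.reduce<string | null>(
      (latest, row) => (latest === null || row.updated_at > latest ? row.updated_at : latest),
      context.cursor ?? null,
    )
    return ingestPersons(context, {
      source: SOURCES.PACIENTES_TERREMOTO_VENEZUELA.handle,
      fetchedCount: raw.length,
      skippedCount,
      rows,
      cursor,
    })
  },
} satisfies IngestionSource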
Keep raw source types in the source file:
interface SourcePerson {
  id: string
  full_name: string
  updated_at: string
}
Do not add Dto suffixes. Use short source-local names like SourcePerson,
SourceHospital, and mapPerson.
Normalized Row
Sources map raw records into IngestionPerson. This shape is intentionally close
to the persons table because the shared runner uses a single dedupe and write
path for every source.
Identity fields (required unless a condition is given):
- source
- source_id
- first_name
- last_name
- national_id, when available
- hospital, when the row is hospital-based
- found_at
- created_at
- updated_at
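The sketch below shows the mapping step under the assumption that the field list above is the whole contract. IngestionPerson here is only a local approximation of the type in app/lib/ingestions/types.ts. The nullable types, the cedula and hospital raw fields, SOURCE_HANDLE, and the first-token name split are all hypothetical. mapPerson returning null for unusable rows is an assumed convention for counting source-level skips.

```typescript
// Approximation of the normalized row; the real IngestionPerson in
// app/lib/ingestions/types.ts may have more fields or different types.
interface IngestionPerson {
  source: string
  source_id: string
  first_name: string
  last_name: string
  national_id: string | null // only when the source provides one
  hospital: string | null // only for hospital-based rows
  found_at: string // ISO-8601 strings are an assumption
  created_at: string
  updated_at: string
}

// The doc's SourcePerson plus two hypothetical optional raw fields.
interface SourcePerson {
  id: string
  full_name: string
  updated_at: string
  cedula?: string | null // hypothetical raw field name
  hospital?: string | null // hypothetical raw field name
}

const SOURCE_HANDLE = 'example-source' // hypothetical; real handles come from SOURCES

// Naive split: first token is the first name, the rest is the last name.
// Names such as "María José Pérez González" would need a better rule.
function splitName(fullName: string): { first_name: string; last_name: string } {
  const tokens = fullName.trim().split(/\s+/).filter(Boolean)
  return { first_name: tokens[0] ?? '', last_name: tokens.slice(1).join(' ') }
}

function mapPerson(row: SourcePerson): IngestionPerson | null {
  const { first_name, last_name } = splitName(row.full_name)
  if (!first_name) return null // unusable row: counted as a source-level skip
  return {
    source: SOURCE_HANDLE,
    source_id: row.id,
    first_name,
    last_name,
    national_id: row.cedula?.trim() || null, // empty strings become null
    hospital: row.hospital?.trim() || null,
    // Placeholder: this raw shape has one timestamp, so it fills all three.
    found_at: row.updated_at,
    created_at: row.updated_at,
    updated_at: row.updated_at,
  }
}
```

Because mapPerson takes a single argument, it can be passed straight to raw.map as in the source pattern.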
Dedupe
The shared runner applies matches in this order:
- An existing sources row with the same sources.source and sources.source_id.
- An existing active person with the same national_id.
- A strong identity match for rows without a cédula (national_id): same hospital and name tokens, plus the same age, gender, or date, or sparse source data.
- Otherwise, insert a new person.
Within the same source run, duplicate source IDs, repeated national IDs, and repeated strong identities are skipped.
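The match order and the in-run skips can be sketched as a pure planning function. This is not the runner in actions.ts. Several details are assumptions made for illustration:
- the optional age, gender and found_at fields
- the sourceKeys list
- exact string comparison of hospitals
- reading "sparse source data" as a row with no age, gender or date

```typescript
// Identity fields; age, gender and found_at are assumed optional extras.
interface Identity {
  hospital: string | null
  first_name: string
  last_name: string
  age?: number | null
  gender?: string | null
  found_at?: string | null
}
interface Row extends Identity {
  source: string
  source_id: string
  national_id: string | null
}
interface ExistingPerson extends Identity {
  id: string
  active: boolean
  national_id: string | null
  sourceKeys: string[] // "source:source_id" pairs already linked to this person
}
type Action =
  | { kind: 'update'; personId: string; via: 'source_id' | 'national_id' | 'identity' }
  | { kind: 'insert'; row: Row }
  | { kind: 'skip'; sourceId: string; reason: string }

// Accent- and order-insensitive name tokens, e.g. "Pérez Ana" -> "ana perez".
function nameTokens(p: Identity): string {
  return `${p.first_name} ${p.last_name}`
    .normalize('NFD')
    .replace(/[\u0300-\u036f]/g, '')
    .toLowerCase()
    .split(/\s+/)
    .filter(Boolean)
    .sort()
    .join(' ')
}

function isStrongMatch(a: Identity, b: Identity): boolean {
  if (!a.hospital || a.hospital !== b.hospital) return false
  if (nameTokens(a) !== nameTokens(b)) return false
  const sameAge = a.age != null && a.age === b.age
  const sameGender = !!a.gender && a.gender === b.gender
  const sameDate = !!a.found_at && a.found_at === b.found_at
  // Assumption: "sparse" means the incoming row has no age, gender, or date.
  const sparse = a.age == null && !a.gender && !a.found_at
  return sameAge || sameGender || sameDate || sparse
}

function planDedupe(rows: Row[], existing: ExistingPerson[]): Action[] {
  const bySourceKey = new Map<string, ExistingPerson>()
  const byNationalId = new Map<string, ExistingPerson>()
  for (const person of existing) {
    for (const key of person.sourceKeys) bySourceKey.set(key, person)
    if (person.active && person.national_id) byNationalId.set(person.national_id, person)
  }
  const seenSourceIds = new Set<string>()
  const seenNationalIds = new Set<string>()
  const seenNoCedula: Row[] = []
  const actions: Action[] = []
  for (const row of rows) {
    // Repeats inside this run are skipped before matching existing persons.
    let reason: string | null = null
    if (seenSourceIds.has(row.source_id)) reason = 'duplicate source_id'
    else if (row.national_id && seenNationalIds.has(row.national_id)) reason = 'repeated national_id'
    else if (!row.national_id && seenNoCedula.some((prev) => isStrongMatch(row, prev))) reason = 'repeated identity'
    if (reason) {
      actions.push({ kind: 'skip', sourceId: row.source_id, reason })
      continue
    }
    seenSourceIds.add(row.source_id)
    if (row.national_id) seenNationalIds.add(row.national_id)
    else seenNoCedula.push(row)
    // Match order: source link, then active national_id, then strong identity.
    const bySource = bySourceKey.get(`${row.source}:${row.source_id}`)
    const byCedula = row.national_id ? byNationalId.get(row.national_id) : undefined
    const byIdentity = row.national_id ? undefined : existing.find((p) => isStrongMatch(row, p))
    if (bySource) actions.push({ kind: 'update', personId: bySource.id, via: 'source_id' })
    else if (byCedula) actions.push({ kind: 'update', personId: byCedula.id, via: 'national_id' })
    else if (byIdentity) actions.push({ kind: 'update', personId: byIdentity.id, via: 'identity' })
    else actions.push({ kind: 'insert', row })
  }
  return actions
}
```

Separating the plan from the writes mirrors the "dedupe planning, DB writes" split listed for actions.ts. It also lets a dry run report outcomes without writing anything.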
Run Log
Every non-dry run creates one ingestions row:
- starts as in_progress
- ends as completed, with counts and cursor, on success
- ends as failed, with message set, if the source throws
Counts:
- fetched_count: raw source rows fetched
- created_count: new persons rows planned
- updated_count: existing persons rows planned
- skipped_count: source-level skips plus dedupe skips
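The lifecycle and counts above can be sketched with an in-memory array standing in for the ingestions table. runWithLog, summarize and the column types are hypothetical. Rethrowing after marking a run failed is also an assumption, since the document only says the row ends as failed with a message.

```typescript
type RunStatus = 'in_progress' | 'completed' | 'failed'

// One ingestions row as described above (column types are assumed).
interface IngestionRun {
  source: string
  status: RunStatus
  fetched_count: number
  created_count: number
  updated_count: number
  skipped_count: number
  cursor: string | null
  message: string | null
}

interface RunResult {
  fetchedCount: number
  createdCount: number
  updatedCount: number
  skippedCount: number
  cursor: string | null
}

// Counts from a dedupe plan: inserts are created, updates are updated,
// and dedupe skips are added to the source-level skips.
function summarize(
  fetchedCount: number,
  sourceSkipped: number,
  plan: { kind: 'insert' | 'update' | 'skip' }[],
  cursor: string | null,
): RunResult {
  const count = (kind: 'insert' | 'update' | 'skip') => plan.filter((a) => a.kind === kind).length
  return {
    fetchedCount,
    createdCount: count('insert'),
    updatedCount: count('update'),
    skippedCount: sourceSkipped + count('skip'),
    cursor,
  }
}

// In-memory stand-in for the ingestions table; the real runner writes to D1.
const runs: IngestionRun[] = []

async function runWithLog(source: string, dryRun: boolean, ingest: () => Promise<RunResult>): Promise<RunResult> {
  if (dryRun) return ingest() // dry runs leave no ingestions row
  const run: IngestionRun = {
    source,
    status: 'in_progress',
    fetched_count: 0,
    created_count: 0,
    updated_count: 0,
    skipped_count: 0,
    cursor: null,
    message: null,
  }
  runs.push(run)
  try {
    const result = await ingest()
    run.status = 'completed'
    run.fetched_count = result.fetchedCount
    run.created_count = result.createdCount
    run.updated_count = result.updatedCount
    run.skipped_count = result.skippedCount
    run.cursor = result.cursor
    return result
  } catch (error) {
    run.status = 'failed'
    run.message = error instanceof Error ? error.message : String(error)
    throw error // assumption: the caller decides whether other sources keep running
  }
}
```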
Cursor
The runner passes each source the cursor saved by that source's latest
completed run. Sources decide how to use it; Pacientes Terremoto fetches only
rows with updated_at > cursor.
Dry runs do not read or write ingestion cursors.
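Here is a sketch of the cursor handling described above. It assumes updated_at values are ISO-8601 UTC strings in one fixed format, so plain string comparison follows time order. selectSince filters in memory only to keep the example self-contained; the real source presumably applies updated_at > cursor when fetching. cursorForRun is a hypothetical helper showing that dry runs do not use the stored cursor.

```typescript
interface SourcePerson {
  id: string
  full_name: string
  updated_at: string
}

// In-memory stand-in for "updated_at > cursor". Assumes one fixed ISO-8601
// UTC format, so string comparison matches chronological order.
function selectSince(all: SourcePerson[], cursor: string | null): { rows: SourcePerson[]; nextCursor: string | null } {
  let rows = all
  if (cursor !== null) {
    const since = cursor
    rows = all.filter((row) => row.updated_at > since)
  }
  // Next cursor: newest updated_at seen, or the previous cursor if nothing changed.
  let nextCursor = cursor
  for (const row of rows) {
    if (nextCursor === null || row.updated_at > nextCursor) nextCursor = row.updated_at
  }
  return { rows, nextCursor }
}

// Dry runs do not read the stored cursor, so this sketch passes null (hypothetical helper).
function cursorForRun(dryRun: boolean, latestCompleted: string | null): string | null {
  return dryRun ? null : latestCompleted
}
```

Keeping the previous cursor when no rows come back stops an empty run from resetting the source to a full fetch.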
Schedule
A Cloudflare Cron Trigger in wrangler.jsonc runs the ingestions hourly:
"triggers": {
  "crons": ["0 * * * *"]
}
The Worker handler calls:
runScheduledIngestions(env.DB)
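For orientation, the scheduled entry point might look like the sketch below. Cloudflare module Workers receive cron events through a scheduled(controller, env, ctx) handler, and ctx.waitUntil keeps the invocation alive until the given promise settles. createWorker and the *Like types are hypothetical stand-ins, so the example runs without @cloudflare/workers-types or a D1 binding. The real workers/app.ts presumably exports the handler directly and may simply await the call instead.

```typescript
// Local stand-ins for types that @cloudflare/workers-types provides
// (ScheduledController, ExecutionContext, D1Database).
interface ScheduledControllerLike {
  cron: string
  scheduledTime: number
}
interface ExecutionContextLike {
  waitUntil(promise: Promise<unknown>): void
}
interface Env {
  DB: unknown // D1 binding; typed as D1Database in real code
}

// Hypothetical factory: runScheduledIngestions is injected so the handler
// can be exercised without the app code or a real D1 database.
function createWorker(runScheduledIngestions: (db: Env['DB']) => Promise<void>) {
  return {
    async scheduled(_controller: ScheduledControllerLike, env: Env, ctx: ExecutionContextLike): Promise<void> {
      // waitUntil keeps the invocation alive until every source has finished.
      ctx.waitUntil(runScheduledIngestions(env.DB))
    },
  }
}
```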
Scheduled runs start in production only after both of these are true:
- Remote D1 migrations are applied.
- The Worker is deployed.
Adding A Source
- Add the source to SOURCES in app/lib/constants.ts.
- Create app/lib/ingestions/sources/<source>.ts.
- Define source-local raw interfaces.
- Fetch raw rows and map them into IngestionPerson.
- Call ingestPersons(context, ...).
- Register the source in app/lib/ingestions/registry.ts.
- Run typecheck, tests, and a dry run where possible.
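The registration steps can be sketched in one file. Several pieces are hypothetical placeholders:
- the SOURCES entries and their handle and label strings, for app/lib/constants.ts
- the stub sources, for the sources/ folder
- the reduced IngestionSource type, for app/lib/ingestions/types.ts

assertUniqueHandles is an optional guard that the document does not mention. It is motivated by ingestions rows recording the source, presumably by handle.

```typescript
// Hypothetical constants; the real SOURCES object in app/lib/constants.ts
// defines its own handle and label strings.
const SOURCES = {
  PACIENTES_TERREMOTO_VENEZUELA: { handle: 'pacientes-terremoto-venezuela', label: 'Pacientes Terremoto' },
  EXAMPLE_HOSPITAL_LIST: { handle: 'example-hospital-list', label: 'Example hospital list' },
} as const

// Reduced IngestionSource; the real contract is in app/lib/ingestions/types.ts.
interface IngestionSource {
  handle: string
  label: string
  ingest(context: { cursor: string | null }): Promise<unknown>
}

// Stub sources standing in for files under app/lib/ingestions/sources/.
const pacientesTerremotoSource = {
  handle: SOURCES.PACIENTES_TERREMOTO_VENEZUELA.handle,
  label: SOURCES.PACIENTES_TERREMOTO_VENEZUELA.label,
  async ingest() {},
} satisfies IngestionSource

const exampleHospitalListSource = {
  handle: SOURCES.EXAMPLE_HOSPITAL_LIST.handle,
  label: SOURCES.EXAMPLE_HOSPITAL_LIST.label,
  async ingest() {},
} satisfies IngestionSource

// registry.ts: the list that the scheduled runner iterates over.
const ingestionSources: IngestionSource[] = [pacientesTerremotoSource, exampleHospitalListSource]

// Optional guard (not from the doc): fail fast if two sources share a handle.
function assertUniqueHandles(sources: IngestionSource[]): void {
  const seen = new Set<string>()
  for (const source of sources) {
    if (seen.has(source.handle)) throw new Error(`Duplicate ingestion source handle: ${source.handle}`)
    seen.add(source.handle)
  }
}

assertUniqueHandles(ingestionSources)
```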
Manual Checks
Local dry run for Pacientes Terremoto:
bun scripts/migrate-pacientes-terremoto.ts --dry-run
Remote migration status:
bunx wrangler d1 migrations list venezuela-te-busca-database --remote
Recent ingestion rows:
bunx wrangler d1 execute venezuela-te-busca-database --remote \
--command "SELECT source, status, fetched_count, created_count, updated_count, skipped_count, message, started_at, finished_at FROM ingestions ORDER BY created_at DESC LIMIT 10;"