mongodb¶
destino
grava documentos em uma collection mongodb. suporta insert simples, replace total da collection e upsert por chave de negócio.
casos de uso¶
- sincronizar dados processados de volta para uma collection operacional
- popular uma collection com dados enriquecidos de outras fontes
- manter uma collection de cache atualizada via pipeline agendado
configuração¶
| campo | tipo | obrigatório | descrição |
|---|---|---|---|
credential_id |
uuid | sim | id da conexão mongodb em conexões |
database |
string | sim | banco de dados de destino |
collection |
string | sim | collection de destino |
write_mode |
enum | não | insert, replace, upsert · padrão: insert |
upsert_key |
string | sim* | campo usado como chave de upsert |
upsert_keys |
lista | sim* | alternativa: lista de campos para chave composta |
*obrigatório quando write_mode = upsert.
modos de escrita¶
insert¶
insere todos os registros como novos documentos. usa insert_many com ordered=false.
{
"credential_id": "uuid",
"database": "app_producao",
"collection": "eventos",
"write_mode": "insert"
}
ordered=false
com ordered=false, a inserção continua mesmo se um documento falhar (ex: chave duplicada). os erros individuais são ignorados silenciosamente.
replace¶
apaga todos os documentos da collection (drop) e reinsere os dados do upstream.
{
"credential_id": "uuid",
"database": "app_producao",
"collection": "catalogo_produtos",
"write_mode": "replace"
}
replace apaga tudo
drop() é executado antes da reinserção. use apenas quando a collection representa um snapshot completo que será totalmente substituído.
upsert¶
atualiza documentos com chave coincidente via $set e insere os novos. usa bulk_write com UpdateOne e upsert=True.
{
"credential_id": "uuid",
"database": "app_producao",
"collection": "clientes",
"write_mode": "upsert",
"upsert_key": "cliente_id"
}
upsert com chave composta:
{
"credential_id": "uuid",
"database": "analytics",
"collection": "metricas_diarias",
"write_mode": "upsert",
"upsert_keys": ["data", "canal"]
}
fluxo de execução¶
duckdb (tabela upstream)
↓ SELECT * FROM upstream_table
dataframe pandas → list[dict]
↓ pymongo insert_many / bulk_write
collection mongodb
conversão de tipos¶
dados do duckdb são convertidos para tipos python antes de serem inseridos:
| duckdb | mongodb |
|---|---|
| string | string |
| int / float | number |
| bool | boolean |
| timestamp | datetime |
| null | null |
| struct / list | document aninhado / array |
campo _id
se os dados contêm um campo _id, ele é usado como identificador do documento. se não contêm, o mongo gera um ObjectId automaticamente.
branch developer¶
em pipelines no branch developer, este conector é bloqueado. as escritas são redirecionadas para o lakehouse interno de staging.