En las aplicaciones modernas basadas en datos, la conservación de documentos históricos es esencial para el cumplimiento de normativas, la auditoría y la optimización de costes. Sin embargo, conservar todos los datos indefinidamente en su base de datos operativa principal suele ser insostenible y caro.
En esta entrada del blog, te guiaré a través de la construcción de un canal de archivo totalmente sin servidor que traslada automáticamente documentos de Couchbase a Amazon S3utilizando Eventos de Couchbase, Puerta de enlace API de Amazon, SNSy AWS Lambda. La arquitectura demuestra cómo aprovechar desacoplamiento asíncrono para mejorar la resistencia, la escalabilidad y el rendimiento.
Al final de este tutorial, tendrás una solución robusta, de extremo a extremo, que reacciona a las mutaciones de documentos o vencimientos basados en TTL en Couchbase y los archiva de manera eficiente en S3-sin ninguna intervención manual.
Arquitectura general
Este es el aspecto de la arquitectura:

Flujo:
-
- Couchbase detecta una condición en un documento (como la expiración del TTL o un
archivo: true
bandera). - Una función de Couchbase Eventing dispara y envía el documento a una API Gateway.
- API Gateway reenvía el documento a un tema SNS.
- SNS invoca una función Lambda suscrita al tema.
- Lambda escribe el documento JSON completo en un bucket de S3 utilizando una estructura de carpetas basada en fechas.
- Couchbase detecta una condición en un documento (como la expiración del TTL o un
Esta configuración está desacoplada, es escalable y no requiere ningún sondeo.
¿Por qué Couchbase Eventing para archivo?
Eventos de Couchbase proporciona una forma nativa de activar la lógica de negocio en respuesta a las mutaciones de documentos (creaciones, actualizaciones, eliminaciones) o expiraciones.
Con Eventing, podemos:
-
- Supervisar tipos de documentos o campos específicos (como
archivo === true
y/otipo === registros
) - Reaccione en tiempo real a los vencimientos de TTL
- Envío de datos a servicios externos (como AWS) mediante llamadas HTTP
- Supervisar tipos de documentos o campos específicos (como
Escribiendo la función Couchbase Eventing
Este es un ejemplo simplificado de la función Couchbase Eventing que usamos para archivar documentos. La función implementa lógica para manejar dos escenarios principales:
-
- Archivo basado en TTL: Cuando un documento tiene un
caducidad
establecido, registramos un temporizador que se dispara 60 segundos antes del TTL. Una vez que el temporizador expira, elDocTimerCallback
que, a su vez, llama a la funciónpublicar(doc, meta)
para archivar el documento. - Archivo basado en indicadores: Alternativamente, si un documento incluye el campo
archivo: true
la función llama inmediatamente apublicar(doc, meta)
para archivar el documento.
- Archivo basado en TTL: Cuando un documento tiene un
En ambos casos, el documento se envía a una pasarela de API externa para su archivo. Si el estado de la respuesta de la API es 200
o 302
el documento se elimina explícitamente del bucket de origen, completando así el flujo de trabajo de archivado. Esto proporciona un mecanismo flexible para archivar documentos bajo demanda o mediante la automatización basada en TTL.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
función OnUpdate(doc, meta) { si (doc.archivo && doc.archivo === verdadero){ registro(Archivar documento con ID:, meta.id); var estado = publicar(doc, meta); si (estado === verdadero) { borrar src[meta.id]; } si no { registro('Publicación fallida, el documento no se borrará:', meta.id); } } si no si (meta.caducidad > 0){ var nMinsPrior = nuevo Fecha((meta.caducidad - 60) * 1000); var currentTime = nuevo Fecha().getTime(); registro(Diferencia de tiempo (ms): ', currentTime - nMinsPrior); si (currentTime > nMinsPrior) { registro('Dentro de 1 minuto de expiración TTL, archivar:', meta.id); var publishStatus = publicar(doc, meta); } si no { registro('Temporizador fijado para futuros archivos:', meta.id); crearTiempo(DocTimerCallback, nMinsPrior, meta.id, meta.id); } } si no { registro('No se cumplen las condiciones de archivo para:', meta.id); devolver; } } función DocTimerCallback(contexto) { var doc = src[contexto]; si (doc) { var meta = { id: contexto }; var publishStatus = publicar(doc, meta); } si no { registro(Error en la llamada al temporizador: documento no encontrado para:', contexto); } } función publicar(doc, meta) { pruebe { var solicitar = { ruta: "archivo, cabeceras: { Tipo de contenido: aplicación/json }, cuerpo: { ...doc, id: meta.id } }; registro("Enviando solicitud:", solicitar); var respuesta = rizo(POST, archivo2S3, solicitar); si (respuesta.estado === 200 || respuesta.estado === 302) { registro("Éxito de publicación para:", meta.id, " Cuerpo de la respuesta:", respuesta.cuerpo); devolver verdadero; } si no { registro("Publicación fallida con estado:", respuesta.estado, " Cuerpo de la solicitud:", solicitar); devolver falso; } } captura (e) { registro("Excepción durante la publicación:", e); devolver falso; } } |
Nota: Por razones de rendimiento, recomendamos comentar todos los caracteres
log()
que se muestran arriba. Estos registros se incluyeron principalmente con fines de depuración y desarrollo. El exceso de registros en entornos de producción puede afectar al rendimiento y aumentar los costes de almacenamiento de registros.
Así es como definimos la configuración y los enlaces, mientras creábamos la función de eventos.
Hit Siguiente
para crear enlaces. Aquí es donde enlazaremos el endpoint de nuestra API Gateway, a un alias archivo2S3
y también utiliza el cubo de origen como alias src
. Tenga en cuenta que hemos utilizado el permiso de Lectura/Escritura para nuestro cubo de origen, ya que queremos que los datos sean purgados de allí.
Hit Siguiente
de nuevo y copiar/pegar la función JS de arriba en la ventana y Guardar
. En este punto su función está guardada pero no desplegada. Pulse tres puntos y seleccione Despliegue
para ejecutar la función eventing. Así es como se vería una vez que la función se está ejecutando.
Creación de la función lambda para archivar en S3
La función Lambda consume el mensaje SNS y archiva el JSON completo en un bucket de S3, organizado por fecha.
Ejemplo de código Lambda
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
importar boto3 importar json de datetime importar datetime s3 = boto3.cliente('s3') nombre_cubo = 'tu-s3-archivo-bucket' def lambda_handler(evento, contexto): para registro en evento[Registros]: msg = json.cargas(registro[Sns][Mensaje]) doc_id = msg[id] contenido = msg ahora = datetime.utcnow() carpeta = f"{ahora.año}/{ahora.mes}/{ahora.día}" clave = f"{folder}/{doc_id}.json" s3.poner_objeto( Cubo=nombre_cubo, Clave=clave, Cuerpo=json.vuelca(contenido), Tipo de contenido=aplicación/json ) devolver { 'statusCode': 200, "cuerpo: Archivado correctamente. } |
Esto resulta en un objeto S3 como:
1 |
s3://your-s3-archive-bucket/2025/6/29/log123.json |
Utilizar el SNS para desacoplar la activación del archivo
SNS (Servicio simple de notificación) permite que varios servicios reciban el mismo mensaje. Aquí, pasa la solicitud de archivo a una función Lambda.
Pasos:
-
- Crear un tema, por ejemplo
ArchivoTriggerTopic
- Permitir que API Gateway publique en él a través de IAM
- Suscribir la función Lambda
- Crear un tema, por ejemplo

Fijación de permisos
Asegúrese de que se permite a API Gateway publicar en el tema SNS mediante una política de confianza y acceso:
Política de confianza para snsAccess
Papel:
1 2 3 4 5 6 7 8 |
{ "Efecto": "Permitir", "Principal": { "Servicio": "apigateway.amazonaws.com" }, "Acción": "sts:AssumeRole" } |
Política de acceso para el rol:
1 2 3 4 5 6 |
{ "Efecto": "Permitir", "Acción": "sns:Publicar, "Recurso": "arn:aws:sns:us-east-1:account-id:ArchiveTriggerTopic" } |
Configuración de API Gateway para aceptar solicitudes de archivo
API Gateway actúa como nuestro punto final público que recibe las solicitudes de archivo y las reenvía a SNS.
Pasos clave:
-
- Crear un API REST en API Gateway.
Seleccione una de las siguientes opciones API REST
ya que proporciona integración con el SNS
servicio.
Dar un Nombre API
y seleccione Tipo de punto final de la API
como Regional
. Hit Crear API
botón.
-
- Establezca un
POST /archivo
ruta.
- Establezca un
En la próxima Recursos
cree un recurso pulsando Crear recurso
del panel izquierdo.

Nombre del recurso
. Estoy llamando a mi nombre de recursos para ser archivo
.
Hit Crear recurso
botón . En la página siguiente, en la sección Métodos
golpe de panel Crear método
. Esto nos permitirá asignar un montón de configuraciones a nuestro método POST y detalles sobre nuestra SNS servicio como lo que Región AWS
se está ejecutando, ARN del Función IAM
que tiene el permiso necesario para publicar en el tema SNS.

TemaArn
. Además, mapearemos Mensaje
a method.request.body
que contendrá la carga útil completa de nuestro documento JSON.

Guardar
botón.
Enhorabuena, acaba de desplegar un Pasarela API que puede enviar su documento JSON a la base de datos SNS Temaque, en última instancia, desencadena el Lambda para escribirlo en S3.
Probar el flujo de extremo a extremo
Puede probar su canalización de dos maneras:
Pruebe primero el método POST de la API
-
- Pulsa el botón
Prueba
y enviar un simple JSON, conid
campo como debe.
- Pulsa el botón
Cuando pulse el botón Prueba
asegúrese de que la traza del registro no muestra ningún error y el estado de la respuesta es 200. En este punto tenemos nuestro endpoint API funcionando correctamente. A continuación probaremos este servicio desde curl.
Desde curl o Postman
1 2 3 4 5 6 7 8 9 |
rizo -X POST https://your-api-gateway-url/archive \ -H "Content-Type: application/json" \ -d '{ "id": "hotel::10025", "tipo": "Hotel", "mensaje": "Archivar vía curl", "archivo": true }' |
Tras la activación, compruebe que se ha creado el objeto correspondiente en su bucket de S3.
Desde Capella utilizando Query Workbench
Para probar la configuración desde el Capellainserte un documento con un Valor TTL de 120 segundos.
1 2 3 |
UPSERT EN a granel.datos.fuente (CLAVE, VALOR) VALORES ("test::001", {"tipo": "test", "campo": "valor"}, {"caducidad": 2*60}); |
Ejecute el comando SQL anterior desde el query workbench y espere a que aparezca el documento en el configurado Cubo S3 aproximadamente 60 segundos antes de su expiraciónya que la función Eventing establece un temporizador para activar un minuto antes del TTL.

Solución de problemas
He aquí algunos problemas comunes y cómo solucionarlos:
ValidationError: el mensaje no debe ser nulo
-
- Esto suele significar que el
Mensaje
enviado al SNS está vacío. - Asegúrese de que su Plantilla de asignación de API Gateway está extrayendo correctamente el cuerpo.
- Esto suele significar que el
API Gateway no tiene permiso para asumir el rol
-
- Confirme que su rol IAM tiene el política de confianza.
- El papel debe permitir
apigateway.amazonaws.com
servicio para asumirlo.
Content-Type incorrecto en la solicitud API
-
- API Gateway sólo aplica plantillas de asignación cuando el tipo de contenido es
application/json
. - Asegúrese de que la función Couchbase Eventing (o Postman) establece esta cabecera.
- API Gateway sólo aplica plantillas de asignación cuando el tipo de contenido es
SNS recibe JSON escapado o malformado
-
- Compruebe el uso de
$util.escapeJavaScript($input.body)
en la plantilla de asignación. - Un escape incorrecto puede causar problemas en el análisis posterior de Lambda.
- Compruebe el uso de
Registros de CloudWatch para inspeccionar Lambda
-
- Supervisar la traza de ejecución de la función Lambda para confirmar que todo se ha ejecutado como se esperaba.
Mejoras y buenas prácticas
-
- Utilice variables de entorno en Lambda para el nombre del bucket de S3 y la región.
- Activar Cifrado del servidor S3 (SSE-S3 o SSE-KMS) para su cumplimiento.
- Encender Versionado S3 para conservar copias históricas.
- Añadir Alarmas de CloudWatch para errores de Lambda o API Gateway 5XX.
- Utilice Abanico SNS para notificar a consumidores adicionales (por ejemplo, Kinesis, otras Lambdas).
- Considere la posibilidad de sustituir SNS por integración directa de Lambda si sólo tienes un consumidor y quieres permisos simplificados.
Conclusión
En esta entrada de blog, construimos una robusta canalización de archivo de documentos en tiempo real utilizando:
-
- Couchbase Eventing para detectar documentos archivables
- API Gateway para exponer un punto final público
- SNS para desvincular a productores y consumidores
- Lambda para procesar y guardar documentos en S3
Esta arquitectura no requiere ningún servidor, se escala sin esfuerzo y es una forma rentable de descargar datos históricos para su conservación, cumplimiento o análisis.
Recursos
Para ayudarle a profundizar y ampliar sus conocimientos sobre las tecnologías utilizadas en esta canalización, he aquí algunos recursos valiosos: