Mi experiencia en el mundo de Big Data - Parte I

David Emmanuel Reyes Núñez

Senior Data Engineer

Hace casi dos años (septiembre 2019), no sabía absolutamente nada de tecnologías Big Data, hoy sé que, aunque ya conozco y he interactuado con algunas de ellas, me falta mucho camino por recorrer y muchas cosas por aprender. Todo empieza confiando en ti y a veces necesitas el impulso de alguien que confíe en ti.

 Me ha gustado tanto este tipo de tecnologías que incluso en este par de años, he podido tomar un par de diplomados y concluir recientemente una Maestría en Análisis y Visualización de Datos.

En esta ocasión quiero compartir una de las experiencias que he tenido con estas tecnologías, la cual se trata de realizar ingestas de archivos hacia Google BigQuery  y Google Cloud Storage, utilizando Google Drive como repositorio fuente.

¿Qué necesitamos?

En este primer articulo haremos uso de Google Drive API y Python 2.6 o superior.

Necesitamos también tener proyecto de Google Cloud Platform con la API habilitada. Para crear un proyecto y habilitar una API, haz clic aquí

Debemos ver una pantalla similar a esta:

Seleccionamos el tipo de aplicación que necesitamos y se nos genera un archivo JSON, llamado credentials.json, que podemos descargar y ubicar en algún directorio que sea sencillo de identificar:

También necesitamos credenciales de autorización para una aplicación de escritorio. Para aprender a crear credenciales para una aplicación de escritorio, haz clic aquí.

Ejecutamos el siguiente comando en la consola para instalar las librerías que utilizaremos:

pip3 install google-api-python-client google-auth-httplib2 google-auth-oauthlib 

Una vez que tenemos configurado nuestro ambiente, comenzamos a crear los módulos de nuestra aplicación.

  • Archivo de Configuración

Para nuestro primer script de Python, necesitamos un archivo que guarde nuestra configuración de rutas y elementos, podemos llamarlo config.ini, y lo llenaremos como muestra el ejemplo que sigue:

[GENERAL_CONFIG] —secciónKey_file_location= path del archive json de credencialesscopes = https://www.googleapis.com/auth/drive https://www.googleapis.com/auth/cloud-platform (los scopes sirven para identificar las APIs que usaremos) CompressionLevel = 9ForwardX11 = yes [DRIVE_API] — en esta sección colocamos los id’s del Drive al que nos queremos conectar, se encuentra ubicado después del ultimo slash (/) de la URL:

drive_id = 1-jrMx9oDTOO9eN7ZGcU5tSZVTKtD
folder_2 = 1i70h0VBdL0Gzw9xR9FiO2gfBFxSQz 

Creamos el script de Python que leera nuestro archive config.ini mediante el siguiente código de Python:

import configparser
import datetime

def readConfig():
    #Obtenemos la fecha del sistema, que nos servirá para nombrar el archivo de Log
    fileDate = datetime.datetime.today().strftime('%Y-%m-%d')

    config = configparser.ConfigParser()
    config.read('./config/Config.ini') #path donde se ubica el archivo.ini
    general_config=config['GENERAL_CONFIG'] #referencia a la seccion
    drive_api=config['DRIVE_API']
    
    readConfig.conf_key_file=general_config['key_file_location']
    readConfig.scopes=general_config['scopes']
    
    readConfig.team_drive=drive_api['team_drive']
    
    #generamos el nombre para el archivo de log, con la fecha de sistema
    fileLogMain='MyLogFile.log'
    sfileLogMain =fileLogMain.split('.')
    fileLogMain=sfileLogMain[0]+'_'+fileDate+'.'+sfileLogMain[1]
 
  • Archivo de parámetros

Aquí es donde comenzamos a hablar de GCP.  Si queremos descargar archivos específicos de nuestro drive, tenemos que crear un archivo en donde le indiquemos el nombre, la extensión, el proyecto GCP de destino, el bucket de GS donde lo subiremos y el nombre de la tabla que creará en bigQuery.

Creamos un archivo parameters.csv con la siguiente estructura:

Nombre_Archivo|Proyecto_GCP|Bucket_GCP|DataSet_BQ|Tabla_BQ|Path_GS|Status

Archivo1.csv|mi_proyecto|mi_bucket|raw_data|mi_tabla_bq|path/gs|1
Archivo2.csv|mi_proyecto|mi_bucket2|raw_data|mi_tabla_bq|path2/gs|0
 

La última columna, indica el status para saber si se descargará o no el archivo (1=Activo, 0=Inactivo)

Nota: Debemos contar con permisos y credenciales para los proyectos a los que queramos subir el archivo, este tema lo iremos abordando en siguientes entregas. Ahora solo nos limitaremos a la parte de descargar un archivo desde Google Drive hacia nuestro disco local.

El script de Python para leer y descargar los archivos es el siguiente:

#Librerias para lectura del archivo, autenticación de servicios y lectura de archivos csv
from modules.readConfig import readConfig
from modules.auth import get_service
from modules.processDriveFiles import get_FoldersList
import logging
import csv
import os,glob

def readProperties():
    #Configuramos los niveles de logging
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(name)-8s %(levelname)-8s %(message)s',
                        datefmt='%m-%d %H:%M',
                        filename=readConfig.log_file_main,
                        filemode='a')
    logging.info('Inicio...')
    #Comienza la lectura del archivo de parámetros
   For row in reader:
    with open(readConfig.params_file, 'r', encoding='utf-8') as f:
   
        reader = csv.DictReader(f, delimiter='|')
        props = {}
        api_name='drive'
        api_version='v3'
        scopes=[readConfig.scopes]
        key_file_location=readConfig.conf_key_file 


        #Validamos que el archivo que queremos esté activo para poder descargarlo y guardamos los parámetros en un arreglo.
        if row['Status']==1:
                props ={'nombre_archivo':row['Nombre_Archivo],'proyecto':row['Proyecto_GCP'],'DataSet_BQ':row['DataSet_BQ'],'Tabla':row['Tabla_BQ'],'Bucket_GCP':row['Bucket_GCP'],'Path_GS':row['Path_GS'], 'Status':row['Status'

#Funciones para autenticación y obtener el listado de folders
creds = get_service(api_name,api_version,scopes,key_file_location,
                        props) 
        get_FoldersList(creds,props)
        f.close()



#funcion get_service (ubicada en el módulo auth.py) Esta función nos servirá para seleccionar el archivo de credenciales que usaremos para autenticarnos en Google drive
def get_service(api_name, api_version, scopes, key_file_location,props):
        key_file_location='./auth/driveCredentials.json'
        
credentialsDrive = service_account.Credentials.from_service_account_file(
            key_file_location, scopes=scopes)


#funcion get_FoldersList. Una vez autenticados, nos permitirá listar los folders de nuestro Google Drive de acuerdo con los parámetros que le enviemos. Esta contenida en el script processDriveFiles.py



def get_FoldersList(creds,props):

    # Obtiene un listado de los Google Drive folders
        done = True
        logging.info('Ejecutando consulta: ')
        query = "mimeType!='application/vnd.google-apps.folder' and name='"+props.get('archivo_origen')+"' and parents!='"+readConfig.success_drive+"' and parents!='"+readConfig.failure_drive+"' and starred=False and trashed=False"
        response = creds.get('service').files().list(
                q=query,            
                fields='*',  
                orderBy='folder',
                driveId=readConfig.team_drive,                
                corpora='drive',
                includeItemsFromAllDrives=True,
                supportsAllDrives=True).execute()

        items = response.get('files', [])

        if not items:
                logging.info('No se han encontrado archivos')
        else:
                for item in items:
                #descargamos el archivo
                    file_id = item['id']
                    fh = io.FileIO(item['name'],'w')
                    
                    request = creds.get('service').files().get_media(fileId=file_id)
                    fh = io.FileIO(item['name'],'w')
                    downloader = MediaIoBaseDownload(fh, request)
                    done = False
				
       statusdw=0 
       #Se realiza la descarga
       while (done is False and int(statusdw)<100):
              file_ext=''
              status, done = downloader.next_chunk()
              logging.info('status: ')
              statusdw=(int(status.progress())*100)  
              logging.info(int(statusdw))
              logging.info('Descargando: '+item['name']+" %d%%." % int(statusdw))                      
              logging.info('Descargando: '+item['name']+" %d%%." % int(status.progress() * 100))
 

En esta parte, ya debemos ver nuestro archive descargado en el directorio local de la aplicación.      

Esta es la primera entrega de nuestro administrador de archivos, el objetivo de este desarrollo es llevar archivos desde Google Drive hacia Google Cloud. En la siguiente entrega continuaremos con el código de la función processDriveFiles.py y crearemos los scripts para hacer la carga de archivos hacia Google Cloud.

¿Quieres saber más de lo que ofrecemos y ver otros casos de éxito?