Data Extraction

Data engineering often involves extracting data from a wide range of sources: relational and NoSQL databases, flat files, REST APIs, web pages, message queues, logs, cloud storage, and more. Below are examples of data extraction from different sources using Python. Keep in mind that each source may require installing specific libraries and handling authentication and permissions accordingly.

From Relational Databases

    From MySQL

pip install mysql-connector-python  
import mysql.connector

# Connection parameters
db_params = {
    'host': 'your_host',
    'database': 'your_database',
    'user': 'your_user',
    'password': 'your_password',
}

# Establishing a connection
conn = mysql.connector.connect(**db_params)
cursor = conn.cursor()

# Example query
query = "SELECT * FROM your_table;"

# Executing the query
cursor.execute(query)

# Fetching all rows
rows = cursor.fetchall()

# Closing the cursor and connection
cursor.close()
conn.close()

# Printing the results
for row in rows:
    print(row)

    From PostgreSQL


pip install psycopg2-binary
import psycopg2

# Connection parameters
db_params = {
    'host': 'your_host',
    'database': 'your_database',
    'user': 'your_user',
    'password': 'your_password',
}

# Establishing a connection
conn = psycopg2.connect(**db_params)
cursor = conn.cursor()

# Example query
query = "SELECT * FROM your_table;"

# Executing the query
cursor.execute(query)

# Fetching all rows
rows = cursor.fetchall()

# Closing the cursor and connection
cursor.close()
conn.close()

# Printing the results
for row in rows:
    print(row)

    From Microsoft SQL Server

pip install pyodbc

import pyodbc

# Connection parameters
server = 'your_server'
database = 'your_database'
username = 'your_username'
password = 'your_password'
driver = '{ODBC Driver 17 for SQL Server}'  # match the ODBC driver name installed on your system

# Establishing a connection
connection_string = f"DRIVER={driver};SERVER={server};DATABASE={database};UID={username};PWD={password}"
conn = pyodbc.connect(connection_string)
cursor = conn.cursor()

# Example query
query = "SELECT * FROM your_table;"

# Executing the query
cursor.execute(query)

# Fetching all rows
rows = cursor.fetchall()

# Closing the cursor and connection
cursor.close()
conn.close()

# Printing the results
for row in rows:
    print(row)

    From SQLite

# sqlite3 ships with the Python standard library; no installation needed

import sqlite3

# Connection parameters
db_path = 'your_database.db'

# Establishing a connection
conn = sqlite3.connect(db_path)
cursor = conn.cursor()

# Example query
query = "SELECT * FROM your_table;"

# Executing the query
cursor.execute(query)

# Fetching all rows
rows = cursor.fetchall()

# Closing the cursor and connection
cursor.close()
conn.close()

# Printing the results
for row in rows:
    print(row)

    From Oracle Database 

pip install cx_Oracle

import cx_Oracle

# Connection parameters
db_params = {
    'user': 'your_username',
    'password': 'your_password',
    'dsn': 'your_dsn',  # Oracle Database connection string
}

# Establishing a connection
conn = cx_Oracle.connect(**db_params)
cursor = conn.cursor()

# Example query
query = "SELECT * FROM your_table"

# Executing the query
cursor.execute(query)

# Fetching all rows
rows = cursor.fetchall()

# Closing the cursor and connection
cursor.close()
conn.close()

# Printing the results
for row in rows:
    print(row)
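
Whichever relational engine you are reading from, the cursor-based pattern above is the same; a common shortcut is to load the query result directly into a pandas DataFrame through SQLAlchemy. A minimal sketch, assuming a PostgreSQL source (the connection URL and table name are placeholders):

pip install pandas sqlalchemy
import pandas as pd
from sqlalchemy import create_engine

# Placeholder connection URL -- swap the dialect and credentials for your database
engine = create_engine('postgresql+psycopg2://your_user:your_password@your_host/your_database')

# Load the query result straight into a DataFrame
df = pd.read_sql('SELECT * FROM your_table', engine)
print(df.head())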

From NoSQL Databases

    From MongoDB


pip install pymongo
from pymongo import MongoClient

# Connection parameters
mongo_params = {
    'host': 'your_mongo_host',
    'port': 27017,  # default MongoDB port
    'username': 'your_username',
    'password': 'your_password',
    'authSource': 'your_authentication_database',
    'authMechanism': 'SCRAM-SHA-256'  # or 'SCRAM-SHA-1' on older MongoDB versions
}

# Establishing a connection
client = MongoClient(**mongo_params)

# Selecting the database and collection
db = client['your_database']
collection = db['your_collection']

# Example query (find all documents)
query = {}

# Executing the query
cursor = collection.find(query)

# Printing the results
for document in cursor:
    print(document)

# Closing the MongoDB connection
client.close()
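
If you only need a slice of each document, you can pass a filter and a projection to find(); a short sketch (the 'status' and 'name' fields are hypothetical):

from pymongo import MongoClient

client = MongoClient('your_mongo_host', 27017)
collection = client['your_database']['your_collection']

# Hypothetical filter and projection: only 'active' documents, selected fields
query = {'status': 'active'}
projection = {'_id': 0, 'name': 1, 'status': 1}

for document in collection.find(query, projection).limit(10):
    print(document)

client.close()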

    From Cassandra


pip install cassandra-driver
from cassandra.cluster import Cluster

# Connection parameters
cluster = Cluster(['your_cassandra_host'])
session = cluster.connect('your_keyspace')

# Example query
query = "SELECT * FROM your_table;"

# Executing the query
rows = session.execute(query)

# Printing the results
for row in rows:
    print(row)

# Closing the connection
cluster.shutdown()
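
For queries you run repeatedly, Cassandra prepared statements are parsed once by the cluster and bind values safely; a brief sketch (the id column is hypothetical):

from cassandra.cluster import Cluster

cluster = Cluster(['your_cassandra_host'])
session = cluster.connect('your_keyspace')

# Prepare once, execute many times with different bound values
prepared = session.prepare("SELECT * FROM your_table WHERE id = ?")
row = session.execute(prepared, ['some_id']).one()
print(row)

cluster.shutdown()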

    From CouchDB


pip install requests
import requests

# CouchDB server and database information
couchdb_url = 'http://your_couchdb_server:5984'
database_name = 'your_database'

# Example HTTP request to get all documents from the database
url = f'{couchdb_url}/{database_name}/_all_docs'
response = requests.get(url)

# Check if the request was successful (status code 200)
if response.status_code == 200:
    data = response.json()
    rows = data.get('rows', [])

    # Extracting and printing document IDs
    for row in rows:
        doc_id = row.get('id')
        print(f"Document ID: {doc_id}")

else:
    print(f"Error: {response.status_code} - {response.text}")

    From Redis


pip install redis
import redis

# Connection parameters
redis_host = 'your_redis_host'
redis_port = 6379  # default Redis port
redis_password = 'your_redis_password'

# Establishing a connection
redis_client = redis.Redis(  # StrictRedis is now just a legacy alias for Redis
    host=redis_host,
    port=redis_port,
    password=redis_password,
    decode_responses=True,  # Decode byte responses to strings
)

# Example: Fetching all keys (KEYS blocks the server; see the SCAN sketch below for large datasets)
keys = redis_client.keys('*')

# Printing the keys and their values
for key in keys:
    value = redis_client.get(key)
    print(f"{key}: {value}")

From Files and Formats

    From CSV (Comma-Separated Values)


import csv

# Example: Reading from a CSV file
csv_file_path = 'your_file.csv'

with open(csv_file_path, 'r', newline='') as file:  # newline='' lets the csv module handle line endings
    csv_reader = csv.reader(file)
    
    # Assuming the first row contains headers
    headers = next(csv_reader)
    
    # Iterate through rows
    for row in csv_reader:
        print(dict(zip(headers, row)))
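
csv.DictReader does the header-to-value zipping for you, yielding each row as a dictionary keyed by column name:

import csv

csv_file_path = 'your_file.csv'

# DictReader reads the header row and maps every subsequent row onto it
with open(csv_file_path, 'r', newline='') as file:
    for row in csv.DictReader(file):
        print(row)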

    From JSON (JavaScript Object Notation)

import json

# Example: Reading from a JSON file
json_file_path = 'your_file.json'

with open(json_file_path, 'r') as file:
    data = json.load(file)
    print(data)

    From Excel File

pip install pandas openpyxl
import pandas as pd

# Example: Reading from an Excel file

excel_file_path = 'your_file.xlsx'

df = pd.read_excel(excel_file_path)

print(df) 

    From Parquet 


pip install pandas pyarrow
import pandas as pd

# Example: Reading from a Parquet file
parquet_file_path = 'your_file.parquet'

df = pd.read_parquet(parquet_file_path)
print(df)
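
Because Parquet is a columnar format, reading only the columns you need skips the rest on disk entirely; a sketch with hypothetical column names:

import pandas as pd

# Only the listed columns are read from the file ('col_a' and 'col_b' are placeholders)
df = pd.read_parquet('your_file.parquet', columns=['col_a', 'col_b'])
print(df)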

    From Avro


pip install fastavro
from fastavro import reader

# Example: Reading from an Avro file
avro_file_path = 'your_file.avro'

with open(avro_file_path, 'rb') as file:
    avro_reader = reader(file)
    
    for record in avro_reader:
        print(record)

    From XML (eXtensible Markup Language)


import xml.etree.ElementTree as ET

# Example: Reading from an XML file
xml_file_path = 'your_file.xml'

tree = ET.parse(xml_file_path)
root = tree.getroot()

for element in root:
    print(element.tag, element.text)
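
ET.parse() loads the whole tree into memory; for large XML files, iterparse streams elements as they close. A minimal sketch:

import xml.etree.ElementTree as ET

xml_file_path = 'your_file.xml'

# Process each element as its closing tag is parsed, then free it
for event, element in ET.iterparse(xml_file_path, events=('end',)):
    print(element.tag, element.text)
    element.clear()  # release the element to keep memory usage flat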

From Web and APIs

    From a REST API


pip install requests
import requests

# API endpoint
api_url = 'https://api.example.com/data'

# Sending a GET request to the API
response = requests.get(api_url)

# Checking if the request was successful (status code 200)
if response.status_code == 200:
    # Parsing the JSON data from the response
    api_data = response.json()

    # Printing the extracted data
    print(api_data)
else:
    print(f"Error: {response.status_code}")

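Many REST APIs page their results rather than returning everything at once; the sketch below keeps requesting pages until one comes back empty. The page parameter and the items response key are assumptions -- check your API's pagination scheme:

import requests

api_url = 'https://api.example.com/data'  # hypothetical endpoint
page = 1
all_items = []

while True:
    response = requests.get(api_url, params={'page': page}, timeout=10)
    response.raise_for_status()
    items = response.json().get('items', [])  # hypothetical response shape
    if not items:
        break
    all_items.extend(items)
    page += 1

print(f"Fetched {len(all_items)} items")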

    From a Web Page


pip install requests beautifulsoup4
import requests
from bs4 import BeautifulSoup

# Web page URL
web_url = 'https://example.com'

# Sending a GET request to the web page
response = requests.get(web_url)

# Checking if the request was successful (status code 200)
if response.status_code == 200:
    # Parsing HTML content with BeautifulSoup
    soup = BeautifulSoup(response.text, 'html.parser')

    # Extracting specific data from the HTML
    # Example: Extracting all the links on the page
    links = soup.find_all('a')

    # Printing the extracted data
    for link in links:
        print(link.get('href'))
else:
    print(f"Error: {response.status_code}")

From Message Queues and Streaming Platforms

    From RabbitMQ


pip install pika
import pika

# Connection parameters
rabbitmq_params = pika.ConnectionParameters(
    host='your_rabbitmq_host',
    port=5672,  # default RabbitMQ port
)

# Establishing a connection
connection = pika.BlockingConnection(rabbitmq_params)
channel = connection.channel()

# Example: Consuming messages from a queue
queue_name = 'your_queue'
channel.queue_declare(queue=queue_name)

def callback(ch, method, properties, body):
    print(f"Received message: {body}")

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
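
start_consuming() blocks in a push-style loop; if you just want to pull whatever is currently queued, basic_get fetches one message at a time. A short sketch:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='your_rabbitmq_host'))
channel = connection.channel()

# basic_get returns (None, None, None) when the queue is empty
method, properties, body = channel.basic_get(queue='your_queue', auto_ack=True)
if body is not None:
    print(f"Received message: {body}")
else:
    print("Queue is empty")

connection.close()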


    From Apache Kafka


pip install confluent-kafka
from confluent_kafka import Consumer, KafkaError

# Consumer configuration
kafka_params = {
    'bootstrap.servers': 'your_kafka_bootstrap_servers',
    'group.id': 'your_consumer_group',
    'auto.offset.reset': 'earliest',
}

# Create Consumer instance
consumer = Consumer(kafka_params)

# Subscribe to a topic
topic = 'your_kafka_topic'
consumer.subscribe([topic])

# Example: Consuming messages from a Kafka topic
while True:
    msg = consumer.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break

    print('Received message: {}'.format(msg.value().decode('utf-8')))

# Close down consumer to commit final offsets.
consumer.close()


    From Amazon Kinesis


pip install boto3
import boto3
import time

# AWS credentials and Kinesis stream details
aws_access_key = 'your_access_key'
aws_secret_key = 'your_secret_key'
region_name = 'your_region'
stream_name = 'your_kinesis_stream_name'

# Creating Kinesis client
kinesis_client = boto3.client(
    'kinesis',
    aws_access_key_id=aws_access_key,
    aws_secret_access_key=aws_secret_key,
    region_name=region_name
)

# Get the shard iterator for the shard (assuming one shard for simplicity)
shard_iterator_response = kinesis_client.get_shard_iterator(
    StreamName=stream_name,
    ShardId='shardId-000000000000',  # replace with your actual shard ID
    ShardIteratorType='LATEST'
)

shard_iterator = shard_iterator_response['ShardIterator']

# Keep polling for new records
while True:
    # Get records using the shard iterator
    records_response = kinesis_client.get_records(
        ShardIterator=shard_iterator,
        Limit=10  # Adjust the limit as needed
    )

    # Process each record
    for record in records_response['Records']:
        print(f"Received record: {record['Data'].decode('utf-8')}")

    # Advance the iterator; a closed shard eventually stops returning NextShardIterator
    shard_iterator = records_response.get('NextShardIterator')
    if shard_iterator is None:
        break

    # Add some delay to avoid excessive API calls
    time.sleep(1)

From Logs and Events

    From Log Files


import re
from datetime import datetime

# Example log lines
log_lines = [
    "[2023-01-01 12:00:00] INFO: User 'Alice' logged in.",
    "[2023-01-01 12:05:00] ERROR: Connection error for user 'Bob'.",
]

# Define a regular expression pattern for log parsing
log_pattern = re.compile(r"\[(.*?)\] (\w+): (.*)")

# Extracting data from logs
for log_line in log_lines:
    match = log_pattern.match(log_line)
    if match:
        timestamp_str, log_level, message = match.groups()

        # Convert timestamp string to datetime object
        timestamp = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S")

        # Process or store the extracted data as needed
        print(f"Timestamp: {timestamp}, Level: {log_level}, Message: {message}")
    else:
        print(f"Failed to parse log line: {log_line}")

    From Event Streams - Apache Kafka

pip install confluent-kafka
from confluent_kafka import Consumer, KafkaError

# Consumer configuration
conf = {
    'bootstrap.servers': 'your_kafka_bootstrap_servers',
    'group.id': 'your_consumer_group_id',
    'auto.offset.reset': 'earliest'
}

# Create a Kafka consumer instance
consumer = Consumer(conf)

# Subscribe to topics
topics = ['your_topic']
consumer.subscribe(topics)

try:
    while True:
        # Poll for messages
        msg = consumer.poll(1.0)

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                print("Reached end of partition")
            else:
                print(msg.error())
        else:
            # Process the received message
            print(f"Received message: {msg.value().decode('utf-8')}")

except KeyboardInterrupt:
    pass
finally:
    # Close down consumer to commit final offsets
    consumer.close()


From Cloud Services

    From Amazon S3


pip install boto3
import boto3

# AWS credentials and S3 bucket details
aws_access_key_id = 'your_access_key_id'
aws_secret_access_key = 'your_secret_access_key'
s3_bucket_name = 'your_s3_bucket'

# Creating an S3 client
s3_client = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)

# Example: Listing all objects in a bucket
response = s3_client.list_objects(Bucket=s3_bucket_name)

# Extracting data from objects
for obj in response.get('Contents', []):
    key = obj['Key']
    # Example: Downloading the object content
    obj_content = s3_client.get_object(Bucket=s3_bucket_name, Key=key)['Body'].read().decode('utf-8')
    print(f"Object Key: {key}\nObject Content:\n{obj_content}\n")


    From Google Cloud Storage


pip install google-cloud-storage
from google.cloud import storage

# GCP credentials and GCS bucket details
gcp_credentials_path = 'path/to/your/credentials.json'
gcs_bucket_name = 'your_gcs_bucket'

# Creating a GCS client
storage_client = storage.Client.from_service_account_json(gcp_credentials_path)

# Example: Listing all objects in a bucket
bucket = storage_client.get_bucket(gcs_bucket_name)
blobs = bucket.list_blobs()

# Extracting data from objects
for blob in blobs:
    # Example: Downloading the object content
    obj_content = blob.download_as_text()
    print(f"Object Name: {blob.name}\nObject Content:\n{obj_content}\n")

    From Microsoft Azure (Blob Storage)


pip install azure-storage-blob
from azure.storage.blob import BlobServiceClient

# Connection parameters
account_name = 'your_storage_account_name'
account_key = 'your_storage_account_key'
container_name = 'your_container_name'

# Creating a BlobServiceClient
blob_service_client = BlobServiceClient(account_url=f"https://{account_name}.blob.core.windows.net", credential=account_key)

# Example: List all blobs in a container
container_client = blob_service_client.get_container_client(container_name)
blobs = container_client.list_blobs()

# Printing blob names
for blob in blobs:
    print(blob.name)
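
Listing gives you names; to extract the contents, get a blob client for a specific blob and download it. A sketch ('your_blob.txt' is a placeholder, and the blob is assumed to be UTF-8 text):

from azure.storage.blob import BlobServiceClient

account_name = 'your_storage_account_name'
account_key = 'your_storage_account_key'

blob_service_client = BlobServiceClient(
    account_url=f"https://{account_name}.blob.core.windows.net",
    credential=account_key,
)

# Download one blob's content and decode it as text
blob_client = blob_service_client.get_blob_client(container='your_container_name', blob='your_blob.txt')
content = blob_client.download_blob().readall().decode('utf-8')
print(content)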

From Sensors and IoT Devices

    This example assumes the use of Kafka for data streaming.


pip install kafka-python
import json
import time
import random
from datetime import datetime
from kafka import KafkaProducer

# Simulated sensor data generation
def generate_sensor_data():
    sensor_id = 1
    temperature = round(random.uniform(20.0, 30.0), 2)
    humidity = round(random.uniform(40.0, 60.0), 2)
    timestamp = datetime.utcnow().isoformat()

    return {"sensor_id": sensor_id, "temperature": temperature, "humidity": humidity, "timestamp": timestamp}


# Kafka producer configuration
kafka_bootstrap_servers = 'your_kafka_bootstrap_servers'
kafka_topic = 'sensor_data'

producer = KafkaProducer(bootstrap_servers=kafka_bootstrap_servers, value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# Simulate data streaming from sensors
while True:
    sensor_data = generate_sensor_data()
    producer.send(kafka_topic, value=sensor_data)
    print(f"Sent data: {sensor_data}")
    time.sleep(5)  # Simulate a delay between sensor readings
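
The extraction side is a consumer that reverses the serialization; a sketch using kafka-python's KafkaConsumer against the same sensor_data topic:

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'sensor_data',
    bootstrap_servers='your_kafka_bootstrap_servers',
    auto_offset_reset='earliest',
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
)

# message.value is already deserialized back into a dict by the deserializer above
for message in consumer:
    reading = message.value
    print(f"Sensor {reading['sensor_id']}: {reading['temperature']} at {reading['timestamp']}")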

From External Data Providers

    From Public APIs


pip install requests
import requests

# API endpoint and parameters
api_url = 'https://api.example.com/data'
api_params = {
    'param1': 'value1',
    'param2': 'value2',
}

# Optional: API key or authentication token
api_key = 'your_api_key'

# Adding API key to the headers if required
headers = {'Authorization': f'Bearer {api_key}'} if api_key else {}

# Making the API request
response = requests.get(api_url, params=api_params, headers=headers)

# Checking if the request was successful (status code 200)
if response.status_code == 200:
    # Parsing the response data (assuming it's JSON)
    data = response.json()

    # Processing the data (example: printing the results)
    for item in data['items']:
        print(item)
else:
    # Handling errors
    print(f"Error: {response.status_code}, {response.text}")

From Social Media and Streaming Services

    From Twitter


pip install tweepy
import tweepy

# Twitter API credentials
consumer_key = 'your_consumer_key'
consumer_secret = 'your_consumer_secret'
access_token = 'your_access_token'
access_token_secret = 'your_access_token_secret'

# Authenticate with the Twitter API
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

# Example: Extracting tweets from a user's timeline
user_timeline = api.user_timeline(screen_name='username', count=10)

# Printing tweet details
for tweet in user_timeline:
    print(f"{tweet.user.screen_name}: {tweet.text}")

    From Spotify

pip install spotipy
import spotipy
from spotipy.oauth2 import SpotifyOAuth

# Spotify API credentials
client_id = 'your_client_id'
client_secret = 'your_client_secret'
redirect_uri = 'your_redirect_uri'

# Authenticate with the Spotify API
sp = spotipy.Spotify(auth_manager=SpotifyOAuth(
    client_id=client_id,
    client_secret=client_secret,
    redirect_uri=redirect_uri,
    scope="playlist-read-private",
))

# Example: Extracting the user's playlists
user_playlists = sp.current_user_playlists()

# Printing playlist details
for playlist in user_playlists['items']:
    print(f"Playlist: {playlist['name']}, Owner: {playlist['owner']['display_name']}")