Data Extraction
Data engineering often involves extracting data from various sources, including relational and NoSQL databases, APIs, web pages, files, message queues, streaming platforms, cloud storage, and logs. Below are examples of data extraction from each kind of source using Python. Keep in mind that for each source you might need to install a specific client library and handle authentication and permissions accordingly.
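A note on credentials: the snippets below hardcode placeholder values for readability, but in practice secrets are usually read from the environment or a secrets manager rather than the source code. A minimal sketch, assuming hypothetical DB_HOST, DB_USER, DB_PASSWORD, and DB_NAME environment variables:

import os

# Read connection settings from environment variables instead of hardcoding them
# (DB_HOST, DB_USER, DB_PASSWORD, DB_NAME are hypothetical variable names)
db_params = {
    'host': os.environ['DB_HOST'],
    'user': os.environ['DB_USER'],
    'password': os.environ['DB_PASSWORD'],
    'database': os.environ.get('DB_NAME', 'your_database'),  # optional, with a fallback
}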
From Relational Databases
From MySQL
pip install mysql-connector-python
import mysql.connector
# Connection parameters
db_params = {
    'host': 'your_host',
    'database': 'your_database',
    'user': 'your_user',
    'password': 'your_password',
}

# Establishing a connection
conn = mysql.connector.connect(**db_params)
cursor = conn.cursor()

# Example query
query = "SELECT * FROM your_table;"

# Executing the query
cursor.execute(query)

# Fetching all rows
rows = cursor.fetchall()

# Closing the cursor and connection
cursor.close()
conn.close()

# Printing the results
for row in rows:
    print(row)
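One caution: when a query depends on user input, pass values as parameters instead of formatting them into the SQL string, so the driver escapes them safely. A minimal sketch using mysql-connector-python's %s placeholder syntax (the status column is hypothetical):

# Assumes an open connection and cursor as in the example above (before closing)
query = "SELECT * FROM your_table WHERE status = %s"
cursor.execute(query, ('active',))  # the driver escapes the value safely
rows = cursor.fetchall()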
From PostgreSQL
pip install psycopg2-binary

import psycopg2

# Connection parameters
db_params = {
    'host': 'your_host',
    'database': 'your_database',
    'user': 'your_user',
    'password': 'your_password',
}

# Establishing a connection
conn = psycopg2.connect(**db_params)
cursor = conn.cursor()

# Example query
query = "SELECT * FROM your_table;"

# Executing the query
cursor.execute(query)

# Fetching all rows
rows = cursor.fetchall()

# Closing the cursor and connection
cursor.close()
conn.close()

# Printing the results
for row in rows:
    print(row)
From Microsoft SQL Server
pip install pyodbc
import pyodbc

# Connection parameters
server = 'your_server'
database = 'your_database'
username = 'your_username'
password = 'your_password'
driver = '{SQL Server}'

# Establishing a connection
connection_string = f"DRIVER={driver};SERVER={server};DATABASE={database};UID={username};PWD={password}"
conn = pyodbc.connect(connection_string)
cursor = conn.cursor()

# Example query
query = "SELECT * FROM your_table;"

# Executing the query
cursor.execute(query)

# Fetching all rows
rows = cursor.fetchall()

# Closing the cursor and connection
cursor.close()
conn.close()

# Printing the results
for row in rows:
    print(row)
From SQLite
# sqlite3 ships with the Python standard library; no separate install needed
import sqlite3

# Connection parameters
db_path = 'your_database.db'

# Establishing a connection
conn = sqlite3.connect(db_path)
cursor = conn.cursor()

# Example query
query = "SELECT * FROM your_table;"

# Executing the query
cursor.execute(query)

# Fetching all rows
rows = cursor.fetchall()

# Closing the cursor and connection
cursor.close()
conn.close()

# Printing the results
for row in rows:
    print(row)
From Oracle Database
pip install cx_Oracle
import cx_Oracle

# Connection parameters
db_params = {
    'user': 'your_username',
    'password': 'your_password',
    'dsn': 'your_dsn',  # Oracle Database connection string
}

# Establishing a connection
conn = cx_Oracle.connect(**db_params)
cursor = conn.cursor()

# Example query
query = "SELECT * FROM your_table"

# Executing the query
cursor.execute(query)

# Fetching all rows
rows = cursor.fetchall()

# Closing the cursor and connection
cursor.close()
conn.close()

# Printing the results
for row in rows:
    print(row)
From NoSQL Databases
From MongoDB
pip install pymongo

from pymongo import MongoClient

# Connection parameters
mongo_params = {
    'host': 'your_mongo_host',
    'port': 27017,  # default MongoDB port
    'username': 'your_username',
    'password': 'your_password',
    'authSource': 'your_authentication_database',
    'authMechanism': 'SCRAM-SHA-256',  # or 'MONGODB-CR' for older versions
}

# Establishing a connection
client = MongoClient(**mongo_params)

# Selecting the database and collection
db = client['your_database']
collection = db['your_collection']

# Example query (find all documents)
query = {}

# Executing the query
cursor = collection.find(query)

# Printing the results
for document in cursor:
    print(document)

# Closing the MongoDB connection
client.close()
From Cassandra
pip install cassandra-driver

from cassandra.cluster import Cluster

# Connection parameters
cluster = Cluster(['your_cassandra_host'])
session = cluster.connect('your_keyspace')

# Example query
query = "SELECT * FROM your_table;"

# Executing the query
rows = session.execute(query)

# Printing the results
for row in rows:
    print(row)

# Closing the connection
cluster.shutdown()
From CouchDB
pip install requests

import requests

# CouchDB server and database information
couchdb_url = 'http://your_couchdb_server:5984'
database_name = 'your_database'

# Example HTTP request to get all documents from the database
url = f'{couchdb_url}/{database_name}/_all_docs'
response = requests.get(url)

# Check if the request was successful (status code 200)
if response.status_code == 200:
    data = response.json()
    rows = data.get('rows', [])

    # Extracting and printing document IDs
    for row in rows:
        doc_id = row.get('id')
        print(f"Document ID: {doc_id}")
else:
    print(f"Error: {response.status_code} - {response.text}")
From Redis
pip install redis

import redis

# Connection parameters
redis_host = 'your_redis_host'
redis_port = 6379  # default Redis port
redis_password = 'your_redis_password'

# Establishing a connection
redis_client = redis.StrictRedis(
    host=redis_host,
    port=redis_port,
    password=redis_password,
    decode_responses=True,  # Decode byte responses to strings
)

# Example: Fetching all keys
# Note: KEYS is O(N) and can block a busy server; see the SCAN sketch below
keys = redis_client.keys('*')

# Printing the keys and their values
# Note: get() only works for string values; other types (lists, hashes, sets)
# need their own commands (lrange, hgetall, smembers, ...)
for key in keys:
    value = redis_client.get(key)
    print(f"{key}: {value}")
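For large keyspaces, SCAN iterates incrementally without blocking the server the way KEYS can. A minimal sketch using the client from the example above:

# Iterate over keys incrementally with SCAN instead of KEYS
for key in redis_client.scan_iter(match='*'):
    print(key, redis_client.type(key))  # inspect the type before fetching the value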
From Files and Formats
From CSV (Comma-Separated Values)
import csv

# Example: Reading from a CSV file
csv_file_path = 'your_file.csv'

with open(csv_file_path, 'r') as file:
    csv_reader = csv.reader(file)

    # Assuming the first row contains headers
    headers = next(csv_reader)

    # Iterate through rows
    for row in csv_reader:
        print(dict(zip(headers, row)))
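If the data is tabular, the same file can also be loaded into a DataFrame with pandas, which is used for Excel and Parquet below; a minimal sketch, assuming pandas is installed (pip install pandas):

import pandas as pd

# Load the same CSV into a DataFrame (header row inferred by default)
df = pd.read_csv('your_file.csv')
print(df.head())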
From JSON (JavaScript Object Notation)

import json

# Example: Reading from a JSON file
json_file_path = 'your_file.json'

with open(json_file_path, 'r') as file:
    data = json.load(file)

print(data)
From Excel File
pip install pandas openpyxl

import pandas as pd
# Example: Reading from an Excel file
excel_file_path = 'your_file.xlsx'
df = pd.read_excel(excel_file_path)
print(df)
From Parquet
pip install pandas pyarrow

import pandas as pd

# Example: Reading from a Parquet file
parquet_file_path = 'your_file.parquet'
df = pd.read_parquet(parquet_file_path)
print(df)
From Avro
pip install fastavro

from fastavro import reader

# Example: Reading from an Avro file
avro_file_path = 'your_file.avro'

with open(avro_file_path, 'rb') as file:
    avro_reader = reader(file)
    for record in avro_reader:
        print(record)
From XML (eXtensible Markup Language)
import xml.etree.ElementTree as ET

# Example: Reading from an XML file
xml_file_path = 'your_file.xml'
tree = ET.parse(xml_file_path)
root = tree.getroot()

for element in root:
    print(element.tag, element.text)
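For XML files too large to load whole, ElementTree's iterparse streams elements as they are parsed; a minimal sketch over the same file:

import xml.etree.ElementTree as ET

# Stream elements as they complete instead of building the whole tree in memory
for event, element in ET.iterparse('your_file.xml', events=('end',)):
    print(element.tag, element.text)
    element.clear()  # free memory once the element has been processed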
From the Web and APIs
From a REST API
pip install requests

import requests

# API endpoint
api_url = 'https://api.example.com/data'

# Sending a GET request to the API
response = requests.get(api_url)

# Checking if the request was successful (status code 200)
if response.status_code == 200:
    # Parsing the JSON data from the response
    api_data = response.json()

    # Printing the extracted data
    print(api_data)
else:
    print(f"Error: {response.status_code}")
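Most APIs paginate large result sets. Pagination conventions vary by API, so treat this as a sketch: it assumes a hypothetical page query parameter and an empty response body marking the last page.

import requests

api_url = 'https://api.example.com/data'  # hypothetical paginated endpoint
page = 1
all_items = []

while True:
    response = requests.get(api_url, params={'page': page})
    response.raise_for_status()
    items = response.json()
    if not items:  # an empty page signals the end (assumed convention)
        break
    all_items.extend(items)
    page += 1

print(f"Fetched {len(all_items)} items")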
From a Web Page
pip install beautifulsoup4

import requests
from bs4 import BeautifulSoup

# Web page URL
web_url = 'https://example.com'

# Sending a GET request to the web page
response = requests.get(web_url)

# Checking if the request was successful (status code 200)
if response.status_code == 200:
    # Parsing HTML content with BeautifulSoup
    soup = BeautifulSoup(response.text, 'html.parser')

    # Extracting specific data from the HTML
    # Example: Extracting all the links on the page
    links = soup.find_all('a')

    # Printing the extracted data
    for link in links:
        print(link.get('href'))
else:
    print(f"Error: {response.status_code}")
From Message Queues and Streaming Platforms
From RabbitMQ
pip install pika

import pika

# Connection parameters
rabbitmq_params = pika.ConnectionParameters(
    host='your_rabbitmq_host',
    port=5672,  # default RabbitMQ port
)

# Establishing a connection
connection = pika.BlockingConnection(rabbitmq_params)
channel = connection.channel()

# Example: Consuming messages from a queue
queue_name = 'your_queue'
channel.queue_declare(queue=queue_name)

def callback(ch, method, properties, body):
    print(f"Received message: {body}")

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
From Apache Kafka
pip install confluent-kafka

from confluent_kafka import Consumer, KafkaError

# Consumer configuration
kafka_params = {
    'bootstrap.servers': 'your_kafka_bootstrap_servers',
    'group.id': 'your_consumer_group',
    'auto.offset.reset': 'earliest',
}

# Create Consumer instance
consumer = Consumer(kafka_params)

# Subscribe to a topic
topic = 'your_kafka_topic'
consumer.subscribe([topic])

# Example: Consuming messages from a Kafka topic
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break
    print('Received message: {}'.format(msg.value().decode('utf-8')))

# Close down consumer to commit final offsets.
consumer.close()
From Amazon Kinesis
pip install boto3

import boto3
import time

# AWS credentials and Kinesis stream details
aws_access_key = 'your_access_key'
aws_secret_key = 'your_secret_key'
region_name = 'your_region'
stream_name = 'your_kinesis_stream_name'

# Creating Kinesis client
kinesis_client = boto3.client(
    'kinesis',
    aws_access_key_id=aws_access_key,
    aws_secret_access_key=aws_secret_key,
    region_name=region_name,
)

# Get the shard iterator for the shard (assuming one shard for simplicity)
shard_iterator_response = kinesis_client.get_shard_iterator(
    StreamName=stream_name,
    ShardId='shardId-000000000000',  # replace with your actual shard ID
    ShardIteratorType='LATEST',
)
shard_iterator = shard_iterator_response['ShardIterator']

# Keep polling for new records
while True:
    # Get records using the shard iterator
    records_response = kinesis_client.get_records(
        ShardIterator=shard_iterator,
        Limit=10,  # Adjust the limit as needed
    )

    # Process each record
    for record in records_response['Records']:
        print(f"Received record: {record['Data'].decode('utf-8')}")

    # Update the shard iterator for the next set of records
    shard_iterator = records_response['NextShardIterator']

    # Add some delay to avoid excessive API calls
    time.sleep(1)
From Logs and Events
From Log Files
import re
from datetime import datetime

# Example log lines
log_lines = [
    "[2023-01-01 12:00:00] INFO: User 'Alice' logged in.",
    "[2023-01-01 12:05:00] ERROR: Connection error for user 'Bob'.",
]

# Define a regular expression pattern for log parsing
log_pattern = re.compile(r"\[(.*?)\] (\w+): (.*)")

# Extracting data from logs
for log_line in log_lines:
    match = log_pattern.match(log_line)
    if match:
        timestamp_str, log_level, message = match.groups()

        # Convert timestamp string to datetime object
        timestamp = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S")

        # Process or store the extracted data as needed
        print(f"Timestamp: {timestamp}, Level: {log_level}, Message: {message}")
    else:
        print(f"Failed to parse log line: {log_line}")
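In practice the lines come from a file rather than an in-memory list; a minimal sketch streaming a hypothetical app.log line by line with the same pattern:

import re

log_pattern = re.compile(r"\[(.*?)\] (\w+): (.*)")

# Stream the file line by line so large logs don't need to fit in memory
with open('app.log', 'r') as log_file:  # hypothetical path
    for log_line in log_file:
        match = log_pattern.match(log_line.rstrip('\n'))
        if match:
            timestamp_str, log_level, message = match.groups()
            print(f"{timestamp_str} [{log_level}] {message}")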
From Event Streams - Apache Kafka
from confluent_kafka import Consumer, KafkaError
# Consumer configuration
conf = {
    'bootstrap.servers': 'your_kafka_bootstrap_servers',
    'group.id': 'your_consumer_group_id',
    'auto.offset.reset': 'earliest',
}
# Create a Kafka consumer instance
consumer = Consumer(conf)
# Subscribe to topics
topics = ['your_topic']
consumer.subscribe(topics)
try:
    while True:
        # Poll for messages
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                print("Reached end of partition")
            else:
                print(msg.error())
        else:
            # Process the received message
            print(f"Received message: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
    pass
finally:
    # Close down consumer to commit final offsets.
    consumer.close()
From Cloud Services
From Amazon S3
pip install boto3

import boto3

# AWS credentials and S3 bucket details
aws_access_key_id = 'your_access_key_id'
aws_secret_access_key = 'your_secret_access_key'
s3_bucket_name = 'your_s3_bucket'

# Creating an S3 client
s3_client = boto3.client(
    's3',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

# Example: Listing all objects in a bucket
response = s3_client.list_objects(Bucket=s3_bucket_name)

# Extracting data from objects
for obj in response.get('Contents', []):
    key = obj['Key']

    # Example: Downloading the object content
    obj_content = s3_client.get_object(Bucket=s3_bucket_name, Key=key)['Body'].read().decode('utf-8')
    print(f"Object Key: {key}\nObject Content:\n{obj_content}\n")
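Note that list_objects returns at most 1,000 keys per call. For larger buckets, boto3's paginator follows continuation tokens automatically; a short sketch reusing the client above:

# Page through all keys in the bucket, 1,000 at a time
paginator = s3_client.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=s3_bucket_name):
    for obj in page.get('Contents', []):
        print(obj['Key'])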
From Google Cloud Storage
pip install google-cloud-storage

from google.cloud import storage

# GCP credentials and GCS bucket details
gcp_credentials_path = 'path/to/your/credentials.json'
gcs_bucket_name = 'your_gcs_bucket'

# Creating a GCS client
storage_client = storage.Client.from_service_account_json(gcp_credentials_path)

# Example: Listing all objects in a bucket
bucket = storage_client.get_bucket(gcs_bucket_name)
blobs = bucket.list_blobs()

# Extracting data from objects
for blob in blobs:
    # Example: Downloading the object content
    obj_content = blob.download_as_text()
    print(f"Object Name: {blob.name}\nObject Content:\n{obj_content}\n")
From Microsoft Azure (Blob Storage)
pip install azure-storage-blob

from azure.storage.blob import BlobServiceClient

# Connection parameters
account_name = 'your_storage_account_name'
account_key = 'your_storage_account_key'
container_name = 'your_container_name'

# Creating a BlobServiceClient
blob_service_client = BlobServiceClient(
    account_url=f"https://{account_name}.blob.core.windows.net",
    credential=account_key,
)

# Example: List all blobs in a container
container_client = blob_service_client.get_container_client(container_name)
blobs = container_client.list_blobs()

# Printing blob names
for blob in blobs:
    print(blob.name)
From Sensors and IoT Devices
This example assumes the use of Kafka for data streaming.
pip install kafka-python

import json
import time
import random
from datetime import datetime
from kafka import KafkaProducer

# Simulated sensor data generation
def generate_sensor_data():
    sensor_id = 1
    temperature = round(random.uniform(20.0, 30.0), 2)
    humidity = round(random.uniform(40.0, 60.0), 2)
    timestamp = datetime.utcnow().isoformat()
    return {
        "sensor_id": sensor_id,
        "temperature": temperature,
        "humidity": humidity,
        "timestamp": timestamp,
    }

# Kafka producer configuration
kafka_bootstrap_servers = 'your_kafka_bootstrap_servers'
kafka_topic = 'sensor_data'

producer = KafkaProducer(
    bootstrap_servers=kafka_bootstrap_servers,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

# Simulate data streaming from sensors
while True:
    sensor_data = generate_sensor_data()
    producer.send(kafka_topic, value=sensor_data)
    print(f"Sent data: {sensor_data}")
    time.sleep(5)  # Simulate a delay between sensor readings
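The producer above simulates the devices; on the extraction side, a consumer reads the same topic. A minimal sketch with kafka-python, assuming the sensor_data topic and bootstrap servers from the example:

import json
from kafka import KafkaConsumer

# Consume the simulated sensor readings from the same topic
consumer = KafkaConsumer(
    'sensor_data',
    bootstrap_servers='your_kafka_bootstrap_servers',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)

for message in consumer:
    reading = message.value
    print(f"Sensor {reading['sensor_id']}: {reading['temperature']} C, {reading['humidity']}% humidity")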
From External Data Providers
From Public APIs
pip install requests

import requests

# API endpoint and parameters
api_url = 'https://api.example.com/data'
api_params = {
    'param1': 'value1',
    'param2': 'value2',
}

# Optional: API key or authentication token
api_key = 'your_api_key'

# Adding API key to the headers if required
headers = {'Authorization': f'Bearer {api_key}'} if api_key else {}

# Making the API request
response = requests.get(api_url, params=api_params, headers=headers)

# Checking if the request was successful (status code 200)
if response.status_code == 200:
    # Parsing the response data (assuming it's JSON)
    data = response.json()

    # Processing the data (example: printing the results)
    for item in data['items']:
        print(item)
else:
    # Handling errors
    print(f"Error: {response.status_code}, {response.text}")
From Social Media and Streaming Services
From Twitter
pip install tweepy

import tweepy

# Twitter API credentials
consumer_key = 'your_consumer_key'
consumer_secret = 'your_consumer_secret'
access_token = 'your_access_token'
access_token_secret = 'your_access_token_secret'

# Authenticate with the Twitter API
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

# Example: Extracting tweets from a user's timeline
user_timeline = api.user_timeline(screen_name='username', count=10)

# Printing tweet details
for tweet in user_timeline:
    print(f"{tweet.user.screen_name}: {tweet.text}")
From Spotify
pip install spotipy
import spotipy
from spotipy.oauth2 import SpotifyOAuth
# Spotify API credentials
client_id = 'your_client_id'
client_secret = 'your_client_secret'
redirect_uri = 'your_redirect_uri'
# Authenticate with the Spotify API
sp = spotipy.Spotify(auth_manager=SpotifyOAuth(client_id=client_id, client_secret=client_secret, redirect_uri=redirect_uri, scope="playlist-read-private"))
# Example: Extracting user's playlists
user_playlists = sp.current_user_playlists()
# Printing playlist details
for playlist in user_playlists['items']:
    print(f"Playlist: {playlist['name']}, Owner: {playlist['owner']['display_name']}")