What is CSV and Why It Dominates Data Exchange
CSV (Comma-Separated Values) is the world's most widely used tabular data exchange format. Despite its simplicity — or precisely because of it — it remains the de facto standard for:
- Databases: export and import tables in MySQL, PostgreSQL, SQLite
- Spreadsheets: Excel and Google Sheets read and write CSV natively
- APIs and services: many APIs offer CSV results alongside JSON
- Machine Learning: scikit-learn, XGBoost and PyTorch accept CSV as input
- ETL pipelines: almost every data pipeline includes CSV steps
A CSV file is plain text: one row per record, fields separated by commas (or semicolons, tabs, etc.), with an optional header line.
Common Variants
| Dialect | Separator | Typical Use |
|---|---|---|
| Standard CSV (RFC 4180) | `,` | International |
| European CSV | `;` | Countries using `,` as decimal |
| TSV | `\t` (tab) | Bioinformatics, logs |
| PSV | `\|` | Data containing commas in values |
Python's Built-in csv Module
For medium-sized files without external dependencies:
import csv
from pathlib import Path
# ── Basic reading ──────────────────────────────────────────────────────────
def read_csv(path, delimiter=',', encoding='utf-8'):
    """
    Load every record of a delimited text file into a list of dicts.

    The first line of the file supplies the keys; each following line
    becomes one dictionary. The file is opened with newline='' so that
    line endings inside quoted fields are left to the csv parser.
    A one-line summary (row count and path) is printed before returning.
    """
    with open(path, newline='', encoding=encoding) as handle:
        records = [row for row in csv.DictReader(handle, delimiter=delimiter)]
    print(f"Read {len(records)} rows from {path}")
    return records
# ── Basic writing ──────────────────────────────────────────────────────────
def write_csv(data, path, fields=None, delimiter=',', encoding='utf-8'):
    """
    Serialize a list of dictionaries to a CSV file with a header row.

    Column order comes from `fields`; when `fields` is falsy the keys of
    data[0] are used instead. Keys of a record that are not among the
    chosen columns are skipped (extrasaction='ignore').

    An empty `data` prints a notice and returns without touching `path`.
    On success the path, row count and file size in KB are printed.
    """
    if not data:
        print("Empty list — nothing to write")
        return

    columns = fields if fields else list(data[0])

    with open(path, 'w', newline='', encoding=encoding) as out:
        sink = csv.DictWriter(
            out,
            fieldnames=columns,
            delimiter=delimiter,
            extrasaction='ignore',
        )
        sink.writeheader()
        for record in data:
            sink.writerow(record)

    # Measured after the file is closed so buffered data is included.
    kilobytes = Path(path).stat().st_size / 1024
    print(f"CSV written: {path} ({len(data)} rows, {kilobytes:.1f} KB)")
# ── Auto-detect dialect ────────────────────────────────────────────────────
def detect_dialect(path, sample_bytes=4096):
    """
    Guess the CSV dialect of a file from its leading characters.

    Reads up to `sample_bytes` characters (decoded as utf-8-sig, so a
    leading BOM is dropped) and hands them to csv.Sniffer, restricted to
    the candidate delimiters comma, semicolon, tab and pipe. The detected
    separator, the header guess and the quote character are printed.

    Returns the dialect produced by csv.Sniffer.sniff; errors raised by
    the sniffer are not caught here.
    """
    with open(path, newline='', encoding='utf-8-sig') as src:
        head = src.read(sample_bytes)

    sniffer = csv.Sniffer()
    dialect = sniffer.sniff(head, delimiters=',;\t|')
    has_header = sniffer.has_header(head)

    print(f"Detected separator: '{dialect.delimiter}'")
    print(f"Has header: {has_header}")
    print(f"Quote char: '{dialect.quotechar}'")
    return dialect
# Example
# Sample records: one dict per employee, keys in column order.
employees = [
    dict(name='Alice Johnson', dept='Engineering', salary=95000),
    dict(name='Bob Smith', dept='Marketing', salary=78000),
    dict(name='Carol Lee', dept='Design', salary=85000),
]

# Round trip: write the records to disk, then read them back.
write_csv(employees, 'employees.csv')
data = read_csv('employees.csv')
CSV with pandas (Analysis and Transformations)
import pandas as pd
import numpy as np
def read_csv_pandas(path, **kwargs):
    """
    Read a CSV into a DataFrame and print a short profile of it.

    Two defaults are applied unless overridden through **kwargs:
    encoding='utf-8-sig' (drops a leading BOM such as the one Excel
    writes) and on_bad_lines='warn'. Every other keyword argument is
    forwarded unchanged to pandas.read_csv.

    Prints the shape, the dtype of each column and the null count per
    column, then returns the DataFrame.
    """
    options = {'encoding': 'utf-8-sig', 'on_bad_lines': 'warn', **kwargs}
    frame = pd.read_csv(path, **options)

    n_rows, n_cols = frame.shape
    print(f"DataFrame: {n_rows:,} rows × {n_cols} columns")
    print(f"Types:\n{frame.dtypes.to_string()}")
    print(f"Nulls:\n{frame.isnull().sum().to_string()}")
    return frame
def clean_csv(input_path, output_path):
    """
    Run a small cleaning pipeline over a CSV and save the result.

    Steps, in order:
      1. drop rows in which every field is empty;
      2. normalize column names to snake_case (lowercased, trimmed,
         hyphens and whitespace runs replaced by underscores);
      3. strip leading/trailing whitespace from text values;
      4. drop duplicate rows;
      5. soft-convert object columns with DataFrame.infer_objects().

    Whitespace is stripped *before* de-duplication so rows that differ
    only by padding ("Alice" vs "Alice ") are recognised as duplicates.

    The before/after shapes and the number of removed duplicates are
    printed. The cleaned frame is written to `output_path` as UTF-8
    without the index, and is also returned.
    """
    import re

    df = pd.read_csv(input_path, encoding='utf-8-sig')
    print(f"Before: {df.shape}")

    # 1. Drop completely empty rows
    df = df.dropna(how='all')

    # 2. Normalize column names (snake_case)
    df.columns = [
        re.sub(r'\s+', '_', col.lower().strip().replace('-', '_'))
        for col in df.columns
    ]

    # 3. Strip whitespace from text values. Done element-wise and only on
    #    actual str objects: an object column may also hold bools, numbers
    #    or NaN, which the .str accessor would reject or turn into NaN.
    #    'string' is listed as well so columns using pandas' dedicated
    #    string dtype are covered too.
    text_cols = df.select_dtypes(include=['object', 'string']).columns
    for col in text_cols:
        df[col] = df[col].map(
            lambda value: value.strip() if isinstance(value, str) else value
        )

    # 4. Remove duplicates (after stripping, see docstring)
    n_before = len(df)
    df = df.drop_duplicates()
    n_dup = n_before - len(df)
    if n_dup:
        print(f" Duplicates removed: {n_dup}")

    # 5. Let pandas pick a better dtype for object columns whose values
    #    are already non-string objects; text is not parsed into numbers.
    df = df.infer_objects()

    print(f"After: {df.shape}")
    df.to_csv(output_path, index=False, encoding='utf-8')
    print(f"Clean CSV saved: {output_path}")
    return df
Converting CSV to Other Formats
def csv_to_json(csv_path, json_path, orient='records', indent=2):
    """
    Convert a CSV file into a JSON file via pandas.

    orient: layout passed to DataFrame.to_json —
            'records' (list), 'split', 'index', 'columns', 'values'.
    indent: indentation passed to DataFrame.to_json.

    Non-ASCII characters are written as-is (force_ascii=False). The
    output path and its size in KB are printed; nothing is returned.
    """
    frame = pd.read_csv(csv_path, encoding='utf-8-sig')
    frame.to_json(json_path, orient=orient, indent=indent, force_ascii=False)

    n_bytes = Path(json_path).stat().st_size
    print(f"CSV→JSON: {json_path} ({n_bytes / 1024:.1f} KB)")
def csv_to_excel(csv_path, excel_path, sheet_name='Data'):
    """
    Convert a CSV to an XLSX workbook with auto-fitted column widths.

    The data is written to a single sheet named `sheet_name` without the
    index. Each column is then sized to its longest value (or its header,
    whichever is longer) plus 2 characters of padding, capped at 40.

    A CSV that has a header but no data rows is handled: the width then
    falls back to the header length instead of becoming NaN.

    Requires openpyxl (used as the ExcelWriter engine). Prints the output
    path; nothing is returned.
    """
    # Imported once up front rather than on every loop iteration.
    from openpyxl.utils import get_column_letter

    df = pd.read_csv(csv_path, encoding='utf-8-sig')
    with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
        df.to_excel(writer, sheet_name=sheet_name, index=False)

        # Auto-fit column widths
        sheet = writer.sheets[sheet_name]
        for col_idx, col in enumerate(df.columns, 1):
            header_len = len(str(col))
            cell_lens = df[col].astype(str).map(len)
            # .max() of an empty Series is NaN, which would propagate
            # into the width; only consult it when there are rows.
            if len(cell_lens):
                longest = max(int(cell_lens.max()), header_len)
            else:
                longest = header_len
            letter = get_column_letter(col_idx)
            sheet.column_dimensions[letter].width = min(longest + 2, 40)

    print(f"CSV→Excel: {excel_path}")
def csv_to_parquet(csv_path, parquet_path, compression='snappy'):
    """
    Convert CSV to Parquet for big data analytics.

    Parquet is columnar, compressed and much faster than CSV for queries.

    The file is written without the index using the given `compression`
    codec. Both file sizes (KB) and the relative size change are printed.
    The change is signed: "-60%" means the Parquet file is 60% smaller,
    "+25%" means it is larger, which can happen for very small inputs
    (the previous hard-coded minus sign printed "--25%" in that case).

    Requires a parquet engine usable by DataFrame.to_parquet.
    """
    df = pd.read_csv(csv_path, encoding='utf-8-sig')
    df.to_parquet(parquet_path, compression=compression, index=False)

    size_csv = Path(csv_path).stat().st_size / 1024
    size_par = Path(parquet_path).stat().st_size / 1024
    # Negative when Parquet is smaller; '+' in the format keeps the sign
    # explicit in both directions.
    change_pct = (size_par / size_csv - 1) * 100
    print(f"CSV→Parquet: {parquet_path} "
          f"({size_csv:.0f} KB → {size_par:.0f} KB, {change_pct:+.0f}%)")
def csv_to_sqlite(csv_path, db_path, table_name=None):
    """
    Import a CSV directly into a SQLite table.

    table_name: target table; when None it is derived from the CSV file
                name (stem, lowercased, spaces replaced by underscores).

    An existing table of the same name is replaced. The DataFrame index
    is not stored. Prints the table name, row count and database path.

    The connection is closed explicitly: using a sqlite3 connection as a
    context manager only commits or rolls back the transaction, it does
    not close the connection.
    """
    import sqlite3
    from contextlib import closing

    df = pd.read_csv(csv_path, encoding='utf-8-sig')
    if table_name is None:
        table_name = Path(csv_path).stem.lower().replace(' ', '_')

    with closing(sqlite3.connect(db_path)) as conn:
        # Inner block: commit on success, roll back on error.
        with conn:
            df.to_sql(table_name, conn, if_exists='replace', index=False)

    print(f"CSV→SQLite: table '{table_name}' ({len(df):,} rows) in {db_path}")
# Conversions
# Run every converter against the same input file, in the order
# JSON, Excel, Parquet, SQLite.
for convert, target in (
    (csv_to_json, 'data.json'),
    (csv_to_excel, 'data.xlsx'),
    (csv_to_parquet, 'data.parquet'),
    (csv_to_sqlite, 'database.db'),
):
    convert('data.csv', target)
Processing Large CSV Files Without Loading into Memory
def process_large_csv(path, filter_func, output_path, chunk_size=50_000):
    """
    Process large CSVs in chunks without loading everything into memory.
    Ideal for multi-GB files.

    path:        input CSV (read as utf-8-sig).
    filter_func: callable that receives one DataFrame chunk and returns
                 the DataFrame of rows to keep.
    output_path: CSV that receives the kept rows. The header is written
                 once, with the first chunk that has any rows to keep;
                 later chunks are appended.
    chunk_size:  number of rows read per chunk.

    If no row is kept at all, `output_path` is still (re)written with a
    header-only file taken from the first empty filter result, so a file
    left over from an earlier run is not mistaken for this run's result.

    Progress and totals are printed; nothing is returned.
    """
    total_processed = 0
    total_filtered = 0
    first_write = True
    empty_result = None  # first empty filter result, kept for its columns

    for chunk in pd.read_csv(path, chunksize=chunk_size, encoding='utf-8-sig'):
        total_processed += len(chunk)
        filtered = filter_func(chunk)
        total_filtered += len(filtered)

        if not filtered.empty:
            filtered.to_csv(
                output_path,
                mode='w' if first_write else 'a',
                index=False,
                header=first_write,
            )
            first_write = False
        elif empty_result is None:
            empty_result = filtered

        print(f"\rProcessed: {total_processed:,} rows...", end='')

    # Nothing matched: overwrite the target with just the header.
    if first_write and empty_result is not None:
        empty_result.to_csv(output_path, mode='w', index=False, header=True)

    print(f"\nTotal: {total_processed:,} processed → {total_filtered:,} filtered")
    print(f"Result: {output_path}")
def merge_csvs(pattern_or_list, output, deduplicate=True):
    """
    Merge multiple CSV files into one.

    pattern_or_list: either a glob pattern (str), expanded relative to
                     the current directory and sorted, or an iterable of
                     file paths used in the given order.
    output:          path of the merged CSV (UTF-8, no index).
    deduplicate:     when True, drop rows that are identical in every
                     column except the added '_source' column; the first
                     occurrence is kept.

    Each input row gets a '_source' column holding the name of the file
    it came from. When a glob pattern is used, a file that resolves to
    `output` is skipped so that re-running the merge does not feed the
    previous result back into itself.

    Returns the merged DataFrame.
    Raises ValueError when there is no input file to merge.
    """
    if isinstance(pattern_or_list, str):
        # Only path-like outputs can collide with a globbed input.
        out_resolved = (
            Path(output).resolve() if isinstance(output, (str, Path)) else None
        )
        files = sorted(
            f for f in Path('.').glob(pattern_or_list)
            if f.resolve() != out_resolved
        )
    else:
        files = [Path(f) for f in pattern_or_list]

    if not files:
        # pd.concat would fail with "No objects to concatenate"; say why.
        raise ValueError(f"No CSV files to merge for {pattern_or_list!r}")

    dfs = []
    for file in files:
        df = pd.read_csv(file, encoding='utf-8-sig')
        df['_source'] = file.name
        dfs.append(df)
        print(f" Loaded: {file.name} ({len(df):,} rows)")

    combined = pd.concat(dfs, ignore_index=True)

    if deduplicate:
        n_before = len(combined)
        combined = combined.drop_duplicates(
            subset=[c for c in combined.columns if c != '_source']
        )
        print(f"Duplicates removed: {n_before - len(combined):,}")

    combined.to_csv(output, index=False, encoding='utf-8')
    print(f"Merged: {len(files)} files → {output} ({len(combined):,} rows)")
    return combined
CSV Data Validation
def validate_csv(path, schema):
    """
    Validate a CSV against a schema of rules.

    schema = {
        'column': {'type': 'int', 'min': 0, 'max': 100, 'required': True}
    }

    Supported rule keys (all optional):
      required: the column must exist and must not contain nulls.
      type:     'int' or 'float'. Non-null values that cannot be parsed
                as numbers are reported; for 'int', values with a
                fractional part are reported too. Other type names are
                ignored.
      min/max:  inclusive numeric bounds. Values that cannot be parsed as
                numbers are not counted by the range checks.

    Columns named in the schema but absent from the file are skipped
    unless they are required. A summary is printed.

    Returns the list of error messages (empty when the file is valid).
    """
    df = pd.read_csv(path, encoding='utf-8-sig')
    errors = []

    for column, rules in schema.items():
        if column not in df.columns:
            if rules.get('required'):
                errors.append(f"Required column missing: '{column}'")
            continue

        col = df[column]

        # Check nulls
        n_nulls = col.isnull().sum()
        if rules.get('required') and n_nulls > 0:
            errors.append(f"'{column}': {n_nulls} null values not allowed")

        expected = rules.get('type')
        lower = rules.get('min')
        upper = rules.get('max')
        check_type = expected in ('int', 'float')
        if not (check_type or lower is not None or upper is not None):
            continue

        # Coerce once; unparseable values become NaN and therefore never
        # satisfy the < / > comparisons below.
        numeric = pd.to_numeric(col, errors='coerce')

        # Check type
        if check_type:
            not_numeric = (numeric.isnull() & col.notnull()).sum()
            if not_numeric:
                errors.append(
                    f"'{column}': {not_numeric} values are not numeric "
                    f"(expected {expected})"
                )
            if expected == 'int':
                fractional = (numeric.dropna() % 1 != 0).sum()
                if fractional:
                    errors.append(
                        f"'{column}': {fractional} values are not whole "
                        f"numbers (expected int)"
                    )

        # Check range
        if lower is not None:
            low = (numeric < lower).sum()
            if low:
                errors.append(f"'{column}': {low} values below minimum {rules['min']}")
        if upper is not None:
            high = (numeric > upper).sum()
            if high:
                errors.append(f"'{column}': {high} values above maximum {rules['max']}")

    if errors:
        print(f"❌ Validation failed ({len(errors)} errors):")
        for e in errors:
            print(f" - {e}")
    else:
        print(f"✅ CSV valid: {path}")
    return errors
# Example schema
# Rules per column: every column is mandatory; age and salary are also
# range-checked.
employee_schema = dict(
    name=dict(required=True),
    age=dict(required=True, type='int', min=18, max=99),
    salary=dict(required=True, min=0, max=500_000),
)

# Validate the file written by the earlier example against these rules.
validate_csv('employees.csv', employee_schema)
Conclusion
CSV is the lowest common denominator of data exchange: human-readable, universally supported, and generated by any tool. Python offers three tiers for working with it:
- csv module — for small files with no dependencies
- pandas — for analysis, cleaning and transformations (up to ~2 GB in RAM)
- Polars or Dask — for tens of GB without loading into memory
The CSV → Parquet conversion is one of the most valuable transformations in data engineering: same data but 5-20× faster for queries and 60-80% less disk space.
Related conversions
Document conversions that follow this topic naturally: