about examining crime trends in your local area. You know that relevant data exists, and you have some basic analytical skills that you can use to explore this data. However, this data changes frequently, and you want to keep your analysis up to date with the most recent crime incidents without repeating your analysis. How can we automate this process?
Well, if you’ve stumbled upon this article, you’re in luck! Together, we’ll walk through how to create a data pipeline to extract local police log data, and connect this to a visualization platform to examine local crime trends over time. For this article, we’ll extract data on incidents reported to the Cambridge (MA) Police Department (CPD), and then visualize this data as a dashboard in Metabase.
Additionally, this article can serve as a general template for anyone looking to write ETL pipelines orchestrated in Prefect, and/or anyone who wants to connect Metabase to their data stores to create insightful analyses/reports.
Note: I have no affiliation with Metabase; we’ll simply use Metabase as an example platform to create our final dashboard. There are many other viable alternatives, which are described in this section.
Background Information
Before we dive into the pipeline, it’ll be helpful to review the following concepts, or keep these links as reference as you read.
Data of Interest
The data we’ll be working with contains a collection of police log entries, where each entry is a single incident reported to/by the CPD. Each entry contains comprehensive information describing the incident, including but not limited to:
- Date & time of the incident
- Type of incident that occurred
- The street where the incident occurred
- A plaintext description of what occurred

Check out the portal for more information about the data.
For tracking crime trends in Cambridge, MA, creating a data pipeline to extract this data is appropriate, as the data is updated daily (according to their website). If the data were updated less frequently (e.g. annually), then creating a data pipeline to automate this process wouldn’t save us much effort. We could simply revisit the data portal at the end of each year, download the .csv, and complete our analysis.
Now that we’ve found the appropriate dataset, let’s walk through the implementation.
ETL Pipeline
To go from raw CPD log data to a Metabase dashboard, our project will consist of the following major steps:
- Extracting the data using its corresponding API.
- Transforming it to prepare it for storage.
- Loading it into a PostgreSQL database.
- Visualizing the data in Metabase.
The data flow of our system will look like the following:

Our pipeline follows an ETL workflow, which means that we’ll transform the data before loading it into PostgreSQL. This requires loading data into memory while executing data transformations, which may be problematic for large datasets that are too big to fit in memory. In that case, we may consider an ELT workflow, where we transform the data in the same infrastructure where it’s stored. Since our dataset is small (<10k rows), this shouldn’t be a problem, and we’ll take advantage of the fact that pandas makes data transformation easy.
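To make the ETL/ELT distinction concrete, here is a minimal sketch of the ELT alternative, using an in-memory SQLite database from Python’s standard library as a stand-in for PostgreSQL. The table names (raw_incidents, incidents) and the sample rows are hypothetical, and this is not the approach the rest of this article takes: the raw data is loaded first, and the year/month split then happens inside the database in SQL rather than in pandas.

```python
import sqlite3

# Hypothetical raw rows, loaded untransformed (the "L" happens before the "T")
raw_rows = [
    ("2025-08-14T15:30:00", "1001", "INCIDENT"),
    ("2025-09-02T09:05:00", "1002", "INCIDENT"),
]

conn = sqlite3.connect(":memory:")  # stand-in for the real database
conn.execute("CREATE TABLE raw_incidents (date_time TEXT, id TEXT, type TEXT)")
conn.executemany("INSERT INTO raw_incidents VALUES (?, ?, ?)", raw_rows)

# Transform inside the database: cast the ID and derive year/month in SQL
conn.execute("""
    CREATE TABLE incidents AS
    SELECT
        CAST(id AS INTEGER) AS id,
        type,
        date_time,
        CAST(strftime('%Y', date_time) AS INTEGER) AS year,
        CAST(strftime('%m', date_time) AS INTEGER) AS month
    FROM raw_incidents
""")
conn.commit()

elt_rows = conn.execute("SELECT id, year, month FROM incidents ORDER BY id").fetchall()
conn.close()
```

With this layout, the Python process never needs to hold the full transformed dataset in memory; it only issues SQL.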
ETL: Extract
We’ll extract the CPD log data by making a request for the dataset to the Socrata Open Data API. We’ll use sodapy, a Python client for the API, to make the request.
We’ll encapsulate this extraction code in its own file: extract.py.
import pandas as pd
from sodapy import Socrata
from dotenv import load_dotenv
import os
from prefect import task

@task(retries=3, retry_delay_seconds=[10, 10, 10])  # retry API request in case of failure
def extract_data():
    '''
    Extract incident data reported to the Cambridge Police Department using the Socrata Open Data API.
    Return the incident data as a Pandas DataFrame.
    '''
    # fetch Socrata app token from .env
    # include this app token when interacting with the Socrata API to avoid request throttling, so we can fetch all the incidents
    load_dotenv()
    APP_TOKEN = os.getenv("SOCRATA_APP_TOKEN")
    # create Socrata client to interact with the Socrata API (https://github.com/afeld/sodapy)
    client = Socrata(
        "data.cambridgema.gov",
        APP_TOKEN,
        timeout=30  # increase timeout from 10s default - sometimes, it takes longer to fetch all the results
    )
    # fetch all data, paginating over results
    DATASET_ID = "3gki-wyrb"  # unique identifier for Cambridge Police Log data (https://data.cambridgema.gov/Public-Safety/Daily-Police-Log/3gki-wyrb/about_data)
    results = client.get_all(DATASET_ID)
    # convert to pandas DataFrame
    results_df = pd.DataFrame.from_records(results)
    return results_df
Notes about the code:
- Socrata throttles requests if you don’t include an app token that uniquely identifies your application. To fetch all the results, we’ll include this token in our request and put it in a .env file to keep it out of our source code.
- We’ll specify a 30 second timeout (instead of the 10 second default timeout) when making our request to the Socrata API. From experience using the API, fetching all the results could sometimes take longer than 10 seconds, and 30 seconds was typically enough to avoid timeout errors.
- We’ll load the fetched results into a pandas DataFrame, since we’ll validate and transform this data using pandas.
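The token handling described above can be made a little more defensive. The sketch below is a hypothetical helper (get_app_token is not part of sodapy or of the original code) that reads the same SOCRATA_APP_TOKEN environment variable and fails early with a clear message if it is missing, rather than silently falling back to throttled requests. It assumes load_dotenv() has already been called.

```python
import os

def get_app_token(env_var="SOCRATA_APP_TOKEN"):
    """Return the app token stored in `env_var`, or raise if it is unset or empty."""
    token = os.getenv(env_var)
    if not token:
        raise RuntimeError(
            f"{env_var} is not set; add it to your .env file to avoid request throttling."
        )
    return token
```

Inside extract_data, the os.getenv call could then be swapped for get_app_token() if failing fast is preferred.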
ETL: Validate
Now, we’ll do some basic data quality checks on the data.
The data is already fairly clean (which makes sense since it’s provided by the Cambridge Police Department). So, our data quality checks will act more as a “sanity check” that we didn’t ingest anything unexpected.
We’ll validate the next:
- All the expected columns (as specified here) are present.
- All IDs are numeric.
- Datetimes follow ISO 8601 format.
- There are no missing values in columns that should contain data. Specifically, each incident should have a Datetime, ID, Type, and Location.
We’ll put this validation code in its own file: validate.py.
from datetime import datetime
from collections import Counter
import pandas as pd
from prefect import task

### UTILITIES
def check_valid_schema(df):
    '''
    Check whether the DataFrame content contains the expected columns for the Cambridge Police dataset.
    Otherwise, raise an error.
    '''
    SCHEMA_COLS = ['date_time', 'id', 'type', 'subtype', 'location', 'last_updated', 'description']
    if Counter(df.columns) != Counter(SCHEMA_COLS):
        raise ValueError("Schema does not match with the expected schema.")

def check_numeric_id(df):
    '''
    Convert 'id' values to numeric.
    If any 'id' values are non-numeric, replace them with NaN, so they can be removed downstream in the data transformations.
    '''
    df['id'] = pd.to_numeric(df['id'], errors='coerce')
    return df

def verify_datetime(df):
    '''
    Verify 'date_time' values follow ISO 8601 format (https://www.iso.org/iso-8601-date-and-time-format.html).
    Raise a ValueError if any of the 'date_time' values are invalid.
    '''
    df.apply(lambda row: datetime.fromisoformat(row['date_time']), axis=1)

def check_missing_values(df):
    '''
    Check whether there are any missing values in columns that require data.
    For police logs, each incident should have a datetime, ID, incident type, and location.
    '''
    REQUIRED_COLS = ['date_time', 'id', 'type', 'location']
    for col in REQUIRED_COLS:
        if df[col].isnull().sum() > 0:
            raise ValueError(f"Missing values are present in the '{col}' attribute.")
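### VALIDATION LOGIC
@task
def validate_data(df):
    '''
    Check the data satisfies the following data quality checks:
    - schema is valid
    - no missing values in columns that require data
    - datetime follows ISO 8601 format
    - IDs are numeric (non-numeric IDs are replaced with NaN and removed downstream)
    '''
    check_valid_schema(df)
    # run the missing-value check before coercing IDs, so that IDs coerced to NaN
    # are removed in the transform step instead of raising an error here
    check_missing_values(df)
    verify_datetime(df)
    df = check_numeric_id(df)
    return df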
When implementing these data quality checks, it’s important to think about how to handle data quality checks that fail.
- Do we want our pipeline to fail loudly (e.g. raise an error/crash)?
- Should our pipeline handle failures silently? For instance, mark data known to be invalid so that it can be removed downstream?
We’ll raise an error if:
- The ingested data doesn’t follow the expected schema. It doesn’t make sense to process the data if it doesn’t contain what we expect.
- Datetime doesn’t follow ISO 8601 format. There’s no standard way to convert incorrect datetime values to their corresponding correct datetime format.
- The incident contains missing values for any one of datetime, ID, type, and location. Without these values, the incident can’t be described comprehensively.
For records that have non-numeric IDs, we’ll fill them with NaN placeholders and then remove them downstream in the transformation step. These records don’t break our analysis if we simply remove them.
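To see the two failure modes side by side, here is a small sketch on made-up values (the sample IDs and timestamps are hypothetical). pd.to_numeric with errors='coerce' quietly turns a bad ID into NaN, while datetime.fromisoformat raises a ValueError as soon as it meets a malformed timestamp.

```python
from datetime import datetime
import pandas as pd

# Silent handling: the non-numeric ID becomes NaN and can be dropped later
ids = pd.Series(["101", "abc", "103"])
numeric_ids = pd.to_numeric(ids, errors="coerce")
n_invalid_ids = int(numeric_ids.isna().sum())

# Loud handling: a malformed timestamp raises immediately
def is_iso_datetime(value):
    """Return True if `value` parses as an ISO 8601 datetime, else False."""
    try:
        datetime.fromisoformat(value)
        return True
    except ValueError:
        return False

good = is_iso_datetime("2025-08-14T15:30:00")
bad = is_iso_datetime("14/08/2025 3:30pm")
```

The helper is_iso_datetime only exists to make the exception visible as a boolean; in the pipeline itself, letting the ValueError propagate is what stops the run.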
ETL: Transform
Now, we’ll do some transformations on our data to prepare it for storage in PostgreSQL.
We’ll do the following transformations:
- Remove duplicate rows: we’ll use the ‘ID’ column to identify duplicates.
- Remove invalid rows: some of the rows that failed the data quality checks were marked with a NaN ‘ID’, so we’ll remove these.
- Split the datetime column into separate year, month, day, and time columns. In our final analysis, we may want to analyze crime trends by these different time intervals, so we’ll create these additional columns here to simplify our queries downstream.
We’ll put this transformation code in its own file: transform.py.
import pandas as pd
from prefect import task

### UTILITIES
def remove_duplicates(df):
    '''
    Remove duplicate rows from dataframe based on 'id' column. Keep the first occurrence.
    '''
    return df.drop_duplicates(subset=["id"], keep='first')

def remove_invalid_rows(df):
    '''
    Remove rows where the 'id' is NaN, as these IDs were identified as non-numeric.
    '''
    return df.dropna(subset=['id'])

def split_datetime(df):
    '''
    Split the date_time column into separate year, month, day, and time columns.
    '''
    # work on a copy so we don't modify a slice of the caller's DataFrame
    df = df.copy()
    # convert to datetime
    df['date_time'] = pd.to_datetime(df['date_time'])
    # extract year/month/day/time
    df['year'] = df['date_time'].dt.year
    df['month'] = df['date_time'].dt.month
    df['day'] = df['date_time'].dt.day
    df['hour'] = df['date_time'].dt.hour
    df['minute'] = df['date_time'].dt.minute
    df['second'] = df['date_time'].dt.second
    return df

### TRANSFORMATION LOGIC
@task
def transform_data(df):
    '''
    Apply the following transformations to the passed in dataframe:
    - deduplicate records (keep the first)
    - remove invalid rows
    - split datetime into year, month, day, and time columns
    '''
    df = remove_duplicates(df)
    df = remove_invalid_rows(df)
    df = split_datetime(df)
    return df
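As a quick check of what these transformations do, the sketch below applies the same pandas operations (drop_duplicates, dropna, and the .dt accessors) to a tiny hand-made DataFrame. The sample rows are hypothetical and only include the columns needed for the illustration.

```python
import pandas as pd

# Hypothetical sample: one duplicated ID, one row whose ID was coerced to NaN
sample = pd.DataFrame({
    "id": [1.0, 1.0, None, 2.0],
    "date_time": [
        "2025-08-14T15:30:00",
        "2025-08-14T15:30:00",
        "2025-08-15T10:00:00",
        "2025-09-02T09:05:07",
    ],
})

# Deduplicate on 'id', then drop the rows with a NaN 'id'
cleaned = sample.drop_duplicates(subset=["id"], keep="first")
cleaned = cleaned.dropna(subset=["id"]).copy()

# Parse the timestamp and derive a few of the date parts
cleaned["date_time"] = pd.to_datetime(cleaned["date_time"])
cleaned["year"] = cleaned["date_time"].dt.year
cleaned["month"] = cleaned["date_time"].dt.month
cleaned["hour"] = cleaned["date_time"].dt.hour
```

Of the four input rows, only the first row with ID 1 and the row with ID 2 should survive, each with its derived year, month, and hour.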
ETL: Load
Now our data is ready to import into PostgreSQL.
Before we can import our data, we need to create our PostgreSQL instance. We’ll create one locally using a compose file. This file allows us to define & configure all the services that our application needs.
services:
  postgres_cpd: # postgres instance for CPD ETL pipeline
    image: postgres:16
    container_name: postgres_cpd_dev
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: cpd_db
    ports:
      - "5433:5432" # Postgres is already on port 5432 on my local machine
    volumes:
      - pgdata_cpd:/var/lib/postgresql/data
    restart: unless-stopped
  pgadmin:
    image: dpage/pgadmin4
    container_name: pgadmin_dev
    environment:
      PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL}
      PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD}
    ports:
      - "8081:80"
    depends_on: # don't start pgadmin until our postgres instance is running
      - postgres_cpd
volumes:
  pgdata_cpd: # all data for our postgres_cpd service will be stored here
There are two main services defined here:
- postgres_cpd: This is our PostgreSQL instance where we’ll store our data.
- pgadmin: A DB admin platform that provides a GUI we can use to query data in our PostgreSQL database. Not functionally required, but useful for checking the data in our database. For more information on connecting to your PostgreSQL database in pgAdmin, click here.
Let’s highlight some important configuration for our postgres_cpd service:
- container_name: postgres_cpd_dev -> Our service will run in a container (i.e. an isolated process) named postgres_cpd_dev. Docker generates random container names if you don’t specify this, so assigning a name will make it more straightforward to interact with the container.
- environment: -> We create a Postgres user from credentials stored in our .env file. Additionally, we create a default database, cpd_db.
- ports: -> Our PostgreSQL service will listen on port 5432 within the container. However, we’ll map port 5433 on the host machine to port 5432 in the container, allowing us to connect to PostgreSQL from our host machine via port 5433.
- volumes: -> Our service will store all its data (e.g. configuration, data files) under the following directory within the container: /var/lib/postgresql/data. We’ll mount this container directory to a named Docker volume stored on our local machine, pgdata_cpd. This allows us to persist the database data beyond the lifetime of the container.
Now that we’ve created our PostgreSQL instance, we can execute queries against it. Importing our data into PostgreSQL requires executing two queries against the database:
- Creating the table that will store the data.
- Loading our transformed data into that table.
Each time we execute a query against our PostgreSQL instance, we need to do the following:
- Establish our connection to PostgreSQL.
- Execute the question.
- Commit the changes & close the connection.
from prefect import task
from sqlalchemy import create_engine
import psycopg2
from dotenv import load_dotenv
import os

# read content from .env, which contains our Postgres credentials
load_dotenv()

def create_postgres_table():
    '''
    Create the cpd_incidents table in Postgres DB (cpd_db) if it doesn't exist.
    '''
    # establish connection to DB
    conn = psycopg2.connect(
        host="localhost",
        port="5433",
        database="cpd_db",
        user=os.getenv("POSTGRES_USER"),
        password=os.getenv("POSTGRES_PASSWORD")
    )
    # create cursor object to execute SQL
    cur = conn.cursor()
    # execute query to create the table
    create_table_query = '''
        CREATE TABLE IF NOT EXISTS cpd_incidents (
            date_time TIMESTAMP,
            id INTEGER PRIMARY KEY,
            type TEXT,
            subtype TEXT,
            location TEXT,
            description TEXT,
            last_updated TIMESTAMP,
            year INTEGER,
            month INTEGER,
            day INTEGER,
            hour INTEGER,
            minute INTEGER,
            second INTEGER
        )
    '''
    cur.execute(create_table_query)
    # commit changes
    conn.commit()
    # close cursor and connection
    cur.close()
    conn.close()

@task
def load_into_postgres(df):
    '''
    Loads the transformed data passed in as a DataFrame
    into the 'cpd_incidents' table in our Postgres instance.
    '''
    # create table to insert data into as necessary
    create_postgres_table()
    # create Engine object to connect to DB
    # (single quotes inside the f-string keep this valid on Python versions before 3.12)
    engine = create_engine(f"postgresql://{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@localhost:5433/cpd_db")
    # insert data into Postgres DB into the 'cpd_incidents' table
    df.to_sql('cpd_incidents', engine, if_exists='replace')
Things to note about the code above:
- Similar to how we fetched our app token for extracting our data, we’ll fetch our Postgres credentials from a .env file.
- To load the DataFrame containing our transformed data into Postgres, we’ll use pandas.DataFrame.to_sql(). It’s a simple way to insert DataFrame data into any database supported by SQLAlchemy.
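One behavior of to_sql worth keeping in mind: with if_exists='replace', pandas drops the existing table and recreates it from the DataFrame, so column types and constraints defined in an earlier CREATE TABLE (such as a primary key) are not preserved. The sketch below illustrates this using an in-memory SQLite database from the standard library as a stand-in for PostgreSQL; the table name demo_incidents and the sample row are hypothetical.

```python
import sqlite3
import pandas as pd

def table_ddl(conn, table):
    """Return the CREATE TABLE statement SQLite has stored for `table`."""
    row = conn.execute(
        "SELECT sql FROM sqlite_master WHERE type = 'table' AND name = ?", (table,)
    ).fetchone()
    return row[0]

df = pd.DataFrame({"id": [1], "type": ["INCIDENT"]})
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo_incidents (id INTEGER PRIMARY KEY, type TEXT)")

# 'append' inserts into the existing table, so its definition is untouched
df.to_sql("demo_incidents", conn, if_exists="append", index=False)
ddl_after_append = table_ddl(conn, "demo_incidents")

# 'replace' drops the table and lets pandas recreate it from the DataFrame
df.to_sql("demo_incidents", conn, if_exists="replace", index=False)
ddl_after_replace = table_ddl(conn, "demo_incidents")

row_count = conn.execute("SELECT COUNT(*) FROM demo_incidents").fetchone()[0]
conn.close()
```

Whether 'replace' or 'append' fits better depends on the pipeline: 'replace' is convenient when every run reloads the full dataset, while 'append' keeps the hand-written schema but needs its own handling of rows that already exist.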
Defining the Data Pipeline
We’ve implemented the individual components of the ETL process. Now, we’re ready to encapsulate these components into a pipeline.
There are many tools available for orchestrating pipelines defined in Python. Two popular options are Apache Airflow and Prefect.
For its simplicity, we’ll proceed with defining our pipeline using Prefect. We need to do the following to get started:
- Install Prefect in our development environment.
- Get a Prefect API server. Since we don’t want to manage our own infrastructure to run Prefect, we’ll sign up for Prefect Cloud.
For more information on Prefect setup, check out the docs.
Next, we must add the following decorators to our code:
- @task -> Add this to each function that implements a component of our ETL pipeline (i.e. our extract, validate, transform, and load functions).
- @flow -> Add this decorator to the function that encapsulates the ETL components into an executable pipeline.
If you look back at our extract, validate, transform, and load code, you’ll see that we added the @task decorator to these functions.
Now, let’s define our ETL pipeline that executes these tasks. We’ll put the following in a separate file, etl_pipeline.py.
from extract import extract_data
from validate import validate_data
from transform import transform_data
from load import load_into_postgres
from prefect import flow

@flow(name="cpd_incident_etl", log_prints=True)  # Our pipeline will appear as 'cpd_incident_etl' in the Prefect UI. All print outputs will be displayed in Prefect.
def etl():
    '''
    Execute the ETL pipeline:
    - Extract CPD incident data from the Socrata API
    - Validate and transform the extracted data to prepare it for storage
    - Import the transformed data into Postgres
    '''
    print("Extracting data...")
    extracted_df = extract_data()
    print("Performing data quality checks...")
    validated_df = validate_data(extracted_df)
    print("Performing data transformations...")
    transformed_df = transform_data(validated_df)
    print("Importing data into Postgres...")
    load_into_postgres(transformed_df)
    print("ETL complete!")

if __name__ == "__main__":
    # CPD data is expected to be updated daily (https://data.cambridgema.gov/Public-Safety/Daily-Police-Log/3gki-wyrb/about_data)
    # Thus, we'll execute our pipeline daily (at midnight)
    etl.serve(name="cpd-pipeline-deployment", cron="0 0 * * *")
Things to note about the code:
- @flow(name="cpd_incident_etl", log_prints=True) -> this names our pipeline “cpd_incident_etl”, which will be reflected in the Prefect UI. The output of all our print statements will be logged in Prefect.
- etl.serve(name="cpd-pipeline-deployment", cron="0 0 * * *") -> this creates a deployment of our pipeline, named “cpd-pipeline-deployment”, that runs every day at midnight.


Now that we’ve created our pipeline to load our data into PostgreSQL, it’s time to visualize it.
There are many approaches we could take to visualize our data. Some notable options include:
Both are good options. Without going into too much detail behind each BI tool, we’ll use Metabase to make a simple dashboard.
- Metabase is an open-source BI and embedded analytics tool that makes data visualization and analysis simple.
- Connecting Metabase to our data sources and deploying it is straightforward, compared to other BI tools (ex: Apache Superset).
In the future, if we want to have more customization over our visuals/reports, we can consider using other tools. For now, Metabase will do for creating a POC.
Metabase lets you choose between using its cloud version or managing a self-hosted instance. Metabase Cloud offers several payment plans, but you can create a self-hosted instance of Metabase for free using Docker. We’ll define our Metabase instance in our compose file.
- Since we’re self-hosting, we also need to define the Metabase application database, which contains the metadata that Metabase needs to query your data sources (in our case, postgres_cpd).
services:
  postgres_cpd: # postgres instance for CPD ETL pipeline
    image: postgres:16
    container_name: postgres_cpd_dev
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: cpd_db
    ports:
      - "5433:5432" # Postgres is already on port 5432 on my local machine
    volumes:
      - pgdata_cpd:/var/lib/postgresql/data
    restart: unless-stopped
    networks:
      - metanet1
  pgadmin:
    image: dpage/pgadmin4
    container_name: pgadmin_dev
    environment:
      PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL}
      PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD}
    ports:
      - "8081:80"
    depends_on:
      - postgres_cpd
    networks:
      - metanet1
  metabase: # taken from https://www.metabase.com/docs/latest/installation-and-operation/running-metabase-on-docker
    image: metabase/metabase:latest
    container_name: metabase
    hostname: metabase
    volumes:
      - /dev/urandom:/dev/random:ro
    ports:
      - "3000:3000"
    environment:
      MB_DB_TYPE: postgres
      MB_DB_DBNAME: metabaseappdb
      MB_DB_PORT: 5432
      MB_DB_USER: ${METABASE_DB_USER}
      MB_DB_PASS: ${METABASE_DB_PASSWORD}
      MB_DB_HOST: postgres_metabase # must match container name of postgres_mb (Metabase Postgres instance)
    networks:
      - metanet1
    healthcheck:
      test: curl --fail -I http://localhost:3000/api/health || exit 1
      interval: 15s
      timeout: 5s
      retries: 5
  postgres_mb: # postgres instance for managing Metabase instance
    image: postgres:16
    container_name: postgres_metabase # other services must use this name to communicate with this container
    hostname: postgres_metabase # internal identifier, doesn't impact communication with other services (helpful for logs)
    environment:
      POSTGRES_USER: ${METABASE_DB_USER}
      POSTGRES_DB: metabaseappdb
      POSTGRES_PASSWORD: ${METABASE_DB_PASSWORD}
    ports:
      - "5434:5432"
    volumes:
      - pgdata_mb:/var/lib/postgresql/data
    networks:
      - metanet1
# Here, we'll define separate volumes to isolate DB configuration & data files for each Postgres database.
# Our Postgres DB for our application should store its config/data separately from the Postgres DB our Metabase service relies on.
volumes:
  pgdata_cpd:
  pgdata_mb:
# define the network over which all the services will communicate
networks:
  metanet1:
    driver: bridge # TO DO: 'bridge' is the default network driver - services will be able to communicate with each other using their service names
To create our Metabase instance, we made the following changes to our compose file:
- Added two services: metabase (our Metabase instance) and postgres_mb (our Metabase instance’s application database).
- Defined an additional volume, pgdata_mb. This will store the data for the Metabase application database (postgres_mb).
- Defined the network over which the services will communicate, metanet1.
Without going into too much detail, let’s break down the metabase and postgres_mb services.
Our Metabase instance (metabase):
- This service will be exposed on port 3000 on the host machine and within the container. If we’re running this service on our local machine, we’ll be able to access it at localhost:3000.
- We connect Metabase to its application database by ensuring that the MB_DB_HOST, MB_DB_PORT, and MB_DB_DBNAME environment variables match up with the container name, port, and database name listed under the postgres_mb service.
For more information on how to run Metabase in Docker, check out the docs.
After setting up Metabase, you’ll be prompted to connect Metabase to your data source.

After selecting a PostgreSQL data source, we can specify the following connection string to connect Metabase to our PostgreSQL instance, substituting your credentials as necessary:
postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@postgres_cpd:5432/cpd_db

After setting up the connection, we can create our dashboard. You can create a wide variety of visuals in Metabase, so we won’t go into the specifics here.
Let’s revisit the example dashboard that we displayed at the beginning of this post. This dashboard nicely summarizes recent and historical trends in reported CPD incidents.

From this dashboard, we can see the following:
- Most incidents are reported to the CPD in the mid-late afternoon.
- An overwhelming majority of reported incidents are of the “INCIDENT” type.
- The number of reported incidents peaked around August-October of 2025, and has been decreasing steadily ever since.
Luckily for us, Metabase will query our database whenever we load this dashboard, so we won’t have to worry about this dashboard displaying stale data.
Check out the Git repo here if you want to dive deeper into the implementation.
Wrap-up and Future Work
Thanks for reading! Let’s briefly recap what we built:
- We built a data pipeline to extract, transform, and load Cambridge Police Log data into a self-hosted PostgreSQL database.
- We deployed this pipeline using Prefect and scheduled it to run daily.
- We created a self-hosted instance of Metabase, connected it to our PostgreSQL database, and created a dashboard to visualize recent and historical crime trends in Cambridge, MA.
There are many ways to build upon this project, including but not limited to:
- Creating additional visualizations (e.g. a geospatial heatmap) to visualize crime frequencies in different areas within Cambridge. This would require transforming our street location data into latitude/longitude coordinates.
- Deploying our self-hosted pipeline and services off of our local machine.
- Joining this data with other datasets for insightful cross-domain analysis. For instance, perhaps we could join this dataset to demographic/census data (using street location) to see whether areas of different demographic makeup within Cambridge have different incident rates.
If you have any other ideas for how to extend this project, or you would’ve built things differently, I’d love to hear it in the comments!
The author has created all images in this article.
Sources & GitHub
Prefect:
Metabase:
Docker:
GitHub Repo:
CPD Daily Police Log Dataset:

