Modularization + Poetry + Docker (#213)

* reorg

* added configuration management; initial click cmds

* reset dirs completed

* major modularization effort

* prepping mbtiles

* first round of PR review updates

* round 2 of feedback review

* checkpoint

* habemus dockerfile 🎉

* updated docker-compose with long-running container

* census generation works

* logging working

* updated README

* updated README

* last small update to README

* added instructions for log visualization

* census etl update for reusable fips module

* ejscreen etl updated

* further modularization

* score modularization

* tmp cleanup
Jorge Escobar 2021-06-28 16:16:14 -04:00 committed by GitHub
commit 67c73dde2a
29 changed files with 2383 additions and 433 deletions
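
The squashed commits mention configuration management and initial click commands, but none of those files appear in the diffs below. As a minimal sketch, assuming hypothetical command and option names (they are not taken from this commit), a click entry point for the modularized pipeline might look like:

# Hypothetical click entry point; command and option names are assumptions,
# not taken from this commit's diffs.
from pathlib import Path

import click

from etl.sources.census.etl_utils import get_state_fips_codes


@click.group()
def cli():
    """Commands for the data pipeline."""


@cli.command()
@click.option("--data-path", default="data", help="Root data directory.")
def census(data_path):
    """Run the census ETL for every state/territory FIPS code."""
    for fips in get_state_fips_codes(Path(data_path)):
        click.echo(f"Processing FIPS code {fips}")


if __name__ == "__main__":
    cli()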

View file

@@ -12,11 +12,17 @@
"import csv\n",
"from pathlib import Path\n",
"import os\n",
"import sys\n",
"\n",
"module_path = os.path.abspath(os.path.join('..'))\n",
"if module_path not in sys.path:\n",
" sys.path.append(module_path)\n",
"\n",
"from etl.sources.census.etl_utils import get_state_fips_codes\n",
"\n",
"ACS_YEAR = 2019\n",
"\n",
"DATA_PATH = Path.cwd().parent / \"data\"\n",
"FIPS_CSV_PATH = DATA_PATH / \"fips_states_2010.csv\"\n",
"OUTPUT_PATH = DATA_PATH / \"dataset\" / f\"census_acs_{ACS_YEAR}\"\n",
"\n",
"GEOID_FIELD_NAME = \"GEOID10\"\n",
@@ -57,27 +63,19 @@
"\n",
"\n",
"dfs = []\n",
"with open(FIPS_CSV_PATH) as csv_file:\n",
" csv_reader = csv.reader(csv_file, delimiter=\",\")\n",
" line_count = 0\n",
"for fips in get_state_fips_codes(DATA_PATH):\n",
" print(f\"Downloading data for state/territory with FIPS code {fips}\")\n",
"\n",
" for row in csv_reader:\n",
" if line_count == 0:\n",
" line_count += 1\n",
" else:\n",
" fips = row[0].strip()\n",
" print(f\"Downloading data for state/territory with FIPS code {fips}\")\n",
"\n",
" dfs.append(\n",
" censusdata.download(\n",
" src=\"acs5\",\n",
" year=ACS_YEAR,\n",
" geo=censusdata.censusgeo(\n",
" [(\"state\", fips), (\"county\", \"*\"), (\"block group\", \"*\")]\n",
" ),\n",
" var=[\"B23025_005E\", \"B23025_003E\"],\n",
" )\n",
" )\n",
" dfs.append(\n",
" censusdata.download(\n",
" src=\"acs5\",\n",
" year=ACS_YEAR,\n",
" geo=censusdata.censusgeo(\n",
" [(\"state\", fips), (\"county\", \"*\"), (\"block group\", \"*\")]\n",
" ),\n",
" var=[\"B23025_005E\", \"B23025_003E\"],\n",
" )\n",
" )\n",
"\n",
"df = pd.concat(dfs)\n",
"\n",

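This hunk swaps the notebook's inline FIPS CSV-reader loop for the new shared helper. The helper's body is not part of this diff; a plausible reconstruction, based only on the inline loop it replaces (open fips_states_2010.csv, skip the header row, strip the first column), is:

# Assumed implementation of etl.sources.census.etl_utils.get_state_fips_codes,
# reconstructed from the inline loop it replaces; not shown in this diff.
import csv
from pathlib import Path


def get_state_fips_codes(data_path: Path) -> list:
    """Return the two-digit FIPS code of every state/territory."""
    fips_csv_path = data_path / "fips_states_2010.csv"
    fips_codes = []
    with open(fips_csv_path) as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=",")
        for line_count, row in enumerate(csv_reader):
            if line_count > 0:  # skip the header row
                fips_codes.append(row[0].strip())
    return fips_codes
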
View file

@@ -8,33 +8,25 @@
"outputs": [],
"source": [
"from pathlib import Path\n",
"import requests\n",
"import zipfile\n",
"import numpy as np\n",
"import pandas as pd\n",
"import csv\n",
"import sys\n",
"import os\n",
"\n",
"data_path = Path.cwd().parent / \"data\"\n",
"fips_csv_path = data_path / \"fips_states_2010.csv\"\n",
"csv_path = data_path / \"dataset\" / \"ejscreen_2020\""
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "67a58c24",
"metadata": {},
"outputs": [],
"source": [
"download = requests.get(\n",
" \"https://gaftp.epa.gov/EJSCREEN/2020/EJSCREEN_2020_StatePctile.csv.zip\",\n",
" verify=False,\n",
")\n",
"file_contents = download.content\n",
"zip_file_path = data_path / \"tmp\"\n",
"zip_file = open(zip_file_path / \"downloaded.zip\", \"wb\")\n",
"zip_file.write(file_contents)\n",
"zip_file.close()"
"module_path = os.path.abspath(os.path.join('..'))\n",
"if module_path not in sys.path:\n",
" sys.path.append(module_path)\n",
"\n",
"from etl.sources.census.etl_utils import get_state_fips_codes\n",
"from utils import unzip_file_from_url, remove_all_from_dir\n",
"\n",
"DATA_PATH = Path.cwd().parent / \"data\"\n",
"TMP_PATH = DATA_PATH / \"tmp\"\n",
"EJSCREEN_FTP_URL = \"https://gaftp.epa.gov/EJSCREEN/2020/EJSCREEN_2020_StatePctile.csv.zip\"\n",
"EJSCREEN_CSV = TMP_PATH / \"EJSCREEN_2020_StatePctile.csv\"\n",
"CSV_PATH = DATA_PATH / \"dataset\" / \"ejscreen_2020\"\n",
"print(DATA_PATH)"
]
},
{
@@ -44,9 +36,8 @@
"metadata": {},
"outputs": [],
"source": [
"with zipfile.ZipFile(zip_file_path / \"downloaded.zip\", \"r\") as zip_ref:\n",
" zip_ref.extractall(zip_file_path)\n",
"ejscreen_csv = data_path / \"tmp\" / \"EJSCREEN_2020_StatePctile.csv\""
"# download file from ejscreen ftp\n",
"unzip_file_from_url(EJSCREEN_FTP_URL, TMP_PATH, TMP_PATH)"
]
},
{
@@ -58,7 +49,7 @@
},
"outputs": [],
"source": [
"df = pd.read_csv(ejscreen_csv, dtype={\"ID\": \"string\"}, low_memory=False)"
"df = pd.read_csv(EJSCREEN_CSV, dtype={\"ID\": \"string\"}, low_memory=False)"
]
},
{
@@ -69,8 +60,8 @@
"outputs": [],
"source": [
"# write nationwide csv\n",
"csv_path.mkdir(parents=True, exist_ok=True)\n",
"df.to_csv(csv_path / f\"usa.csv\", index=False)"
"CSV_PATH.mkdir(parents=True, exist_ok=True)\n",
"df.to_csv(CSV_PATH / f\"usa.csv\", index=False)"
]
},
{
@@ -81,19 +72,11 @@
"outputs": [],
"source": [
"# write per state csvs\n",
"with open(fips_csv_path) as csv_file:\n",
" csv_reader = csv.reader(csv_file, delimiter=\",\")\n",
" line_count = 0\n",
"\n",
" for row in csv_reader:\n",
" if line_count == 0:\n",
" line_count += 1\n",
" else:\n",
" fips = row[0].strip()\n",
" print(f\"Generating data{fips} csv\")\n",
" df1 = df[df.ID.str[:2] == fips]\n",
" # we need to name the file data01.csv for ogr2ogr csv merge to work\n",
" df1.to_csv(csv_path / f\"data{fips}.csv\", index=False)"
"for fips in get_state_fips_codes(DATA_PATH):\n",
" print(f\"Generating data{fips} csv\")\n",
" df1 = df[df.ID.str[:2] == fips]\n",
" # we need to name the file data01.csv for ogr2ogr csv merge to work\n",
" df1.to_csv(CSV_PATH / f\"data{fips}.csv\", index=False)"
]
},
{
@@ -102,6 +85,17 @@
"id": "81b977f8",
"metadata": {},
"outputs": [],
"source": [
"# cleanup\n",
"remove_all_from_dir(TMP_PATH)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6d4f74d7",
"metadata": {},
"outputs": [],
"source": []
}
],

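The EJSCREEN notebook likewise trades its inline requests/zipfile download and cleanup code for two helpers imported from utils. Their bodies are not in this diff either; the sketch below is reconstructed from the code they replace, with signatures inferred from the call sites unzip_file_from_url(EJSCREEN_FTP_URL, TMP_PATH, TMP_PATH) and remove_all_from_dir(TMP_PATH):

# Assumed implementations of utils.unzip_file_from_url and
# utils.remove_all_from_dir, reconstructed from the inline code they replace.
import shutil
import zipfile
from pathlib import Path

import requests


def unzip_file_from_url(
    file_url: str, download_path: Path, unzipped_file_path: Path
) -> None:
    """Download a zip archive and extract it into unzipped_file_path."""
    download = requests.get(file_url, verify=False)
    download_path.mkdir(parents=True, exist_ok=True)
    zip_file_path = download_path / "downloaded.zip"
    with open(zip_file_path, "wb") as zip_file:
        zip_file.write(download.content)
    with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
        zip_ref.extractall(unzipped_file_path)


def remove_all_from_dir(files_path: Path) -> None:
    """Delete every file and subdirectory under files_path."""
    for file in files_path.iterdir():
        if file.is_dir():
            shutil.rmtree(file)
        else:
            file.unlink()
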
View file

@@ -10,15 +10,22 @@
"import pandas as pd\n",
"import censusdata\n",
"import csv\n",
"import requests\n",
"import zipfile\n",
"\n",
"from pathlib import Path\n",
"import os\n",
"import sys\n",
"\n",
"module_path = os.path.abspath(os.path.join('..'))\n",
"if module_path not in sys.path:\n",
" sys.path.append(module_path)\n",
"\n",
"from etl.sources.census.etl_utils import get_state_fips_codes\n",
"from utils import unzip_file_from_url, remove_all_from_dir\n",
"\n",
"ACS_YEAR = 2019\n",
"\n",
"DATA_PATH = Path.cwd().parent / \"data\"\n",
"FIPS_CSV_PATH = DATA_PATH / \"fips_states_2010.csv\"\n",
"TMP_PATH = DATA_PATH / \"tmp\"\n",
"HOUSING_FTP_URL = \"https://htaindex.cnt.org/download/download.php?focus=blkgrp&geoid=\"\n",
"OUTPUT_PATH = DATA_PATH / \"dataset\" / \"housing_and_transportation_index\"\n",
"\n",
"GEOID_FIELD_NAME = \"GEOID10\""
@@ -31,44 +38,18 @@
"metadata": {},
"outputs": [],
"source": [
"# https://htaindex.cnt.org/download/download.php?focus=blkgrp&geoid=01\n",
"\n",
"# Download each state / territory individually\n",
"dfs = []\n",
"with open(FIPS_CSV_PATH) as csv_file:\n",
" csv_reader = csv.reader(csv_file, delimiter=\",\")\n",
" line_count = 0\n",
"zip_file_dir = TMP_PATH / \"housing_and_transportation_index\"\n",
"for fips in get_state_fips_codes(DATA_PATH):\n",
" print(f\"Downloading housing data for state/territory with FIPS code {fips}\")\n",
" unzip_file_from_url(f\"{HOUSING_FTP_URL}{fips}\", TMP_PATH, zip_file_dir)\n",
" \n",
" # New file name:\n",
" tmp_csv_file_path = zip_file_dir / f\"htaindex_data_blkgrps_{fips}.csv\"\n",
" tmp_df = pd.read_csv(filepath_or_buffer=tmp_csv_file_path)\n",
"\n",
" for row in csv_reader:\n",
" if line_count == 0:\n",
" line_count += 1\n",
" else:\n",
" fips = row[0].strip()\n",
"\n",
" print(f\"Downloading data for state/territory with FIPS code {fips}\")\n",
"\n",
" download = requests.get(\n",
" f\"https://htaindex.cnt.org/download/download.php?focus=blkgrp&geoid={fips}\",\n",
" verify=False,\n",
" )\n",
" file_contents = download.content\n",
" zip_file_dir = DATA_PATH / \"tmp\" / \"housing_and_transportation_index\"\n",
"\n",
" # Make the directory if it doesn't exist\n",
" zip_file_dir.mkdir(parents=True, exist_ok=True)\n",
" zip_file_path = zip_file_dir / f\"{fips}-downloaded.zip\"\n",
" zip_file = open(zip_file_path, \"wb\")\n",
" zip_file.write(file_contents)\n",
" zip_file.close()\n",
"\n",
" with zipfile.ZipFile(zip_file_path, \"r\") as zip_ref:\n",
" zip_ref.extractall(zip_file_dir)\n",
"\n",
" # New file name:\n",
" tmp_csv_file_path = zip_file_dir / f\"htaindex_data_blkgrps_{fips}.csv\"\n",
" tmp_df = pd.read_csv(filepath_or_buffer=tmp_csv_file_path)\n",
"\n",
" dfs.append(tmp_df)\n",
" dfs.append(tmp_df)\n",
"\n",
"df = pd.concat(dfs)\n",
"\n",
@@ -105,6 +86,17 @@
"id": "ef5bb862",
"metadata": {},
"outputs": [],
"source": [
"# cleanup\n",
"remove_all_from_dir(TMP_PATH)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9269e497",
"metadata": {},
"outputs": [],
"source": []
}
],

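With those helpers in place, the housing and transportation notebook above and the score notebook below collapse to one shared pattern: iterate over the FIPS codes, download and unzip per state, concatenate, then write a nationwide usa.csv plus per-state data{fips}.csv files. A condensed, self-contained sketch of that pattern, with paths mirroring the notebooks and helper signatures as assumed above:

# Condensed sketch of the per-state ETL pattern the modularized notebooks share.
from pathlib import Path

import pandas as pd

from etl.sources.census.etl_utils import get_state_fips_codes
from utils import remove_all_from_dir, unzip_file_from_url

DATA_PATH = Path.cwd().parent / "data"
TMP_PATH = DATA_PATH / "tmp"
HOUSING_FTP_URL = "https://htaindex.cnt.org/download/download.php?focus=blkgrp&geoid="
OUTPUT_PATH = DATA_PATH / "dataset" / "housing_and_transportation_index"

dfs = []
zip_file_dir = TMP_PATH / "housing_and_transportation_index"
for fips in get_state_fips_codes(DATA_PATH):
    # Download, extract, and load one state/territory at a time
    unzip_file_from_url(f"{HOUSING_FTP_URL}{fips}", TMP_PATH, zip_file_dir)
    dfs.append(pd.read_csv(zip_file_dir / f"htaindex_data_blkgrps_{fips}.csv"))

df = pd.concat(dfs)

# nationwide csv, then cleanup of temporary downloads
OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
df.to_csv(OUTPUT_PATH / "usa.csv", index=False)
remove_all_from_dir(TMP_PATH)
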
View file

@@ -17,6 +17,14 @@
"from pathlib import Path\n",
"import pandas as pd\n",
"import csv\n",
"import os\n",
"import sys\n",
"\n",
"module_path = os.path.abspath(os.path.join('..'))\n",
"if module_path not in sys.path:\n",
" sys.path.append(module_path)\n",
"\n",
"from etl.sources.census.etl_utils import get_state_fips_codes\n",
"\n",
"# Define some global parameters\n",
"GEOID_FIELD_NAME = \"GEOID10\"\n",
@@ -37,9 +45,8 @@
"\n",
"PERCENTILE_FIELD_SUFFIX = \" (percentile)\"\n",
"\n",
"data_path = Path.cwd().parent / \"data\"\n",
"fips_csv_path = data_path / \"fips_states_2010.csv\"\n",
"score_csv_path = data_path / \"score\" / \"csv\"\n",
"DATA_PATH = Path.cwd().parent / \"data\"\n",
"SCORE_CSV_PATH = DATA_PATH / \"score\" / \"csv\"\n",
"\n",
"# Tell pandas to display all columns\n",
"pd.set_option(\"display.max_columns\", None)"
@@ -55,7 +62,7 @@
"outputs": [],
"source": [
"# EJSCreen csv Load\n",
"ejscreen_csv = data_path / \"dataset\" / \"ejscreen_2020\" / \"usa.csv\"\n",
"ejscreen_csv = DATA_PATH / \"dataset\" / \"ejscreen_2020\" / \"usa.csv\"\n",
"ejscreen_df = pd.read_csv(ejscreen_csv, dtype={\"ID\": \"string\"}, low_memory=False)\n",
"ejscreen_df.rename(columns={\"ID\": GEOID_FIELD_NAME}, inplace=True)\n",
"ejscreen_df.head()"
@@ -69,7 +76,7 @@
"outputs": [],
"source": [
"# Load census data\n",
"census_csv = data_path / \"dataset\" / \"census_acs_2019\" / \"usa.csv\"\n",
"census_csv = DATA_PATH / \"dataset\" / \"census_acs_2019\" / \"usa.csv\"\n",
"census_df = pd.read_csv(\n",
" census_csv, dtype={GEOID_FIELD_NAME: \"string\"}, low_memory=False\n",
")\n",
@@ -85,7 +92,7 @@
"source": [
"# Load housing and transportation data\n",
"housing_and_transportation_index_csv = (\n",
" data_path / \"dataset\" / \"housing_and_transportation_index\" / \"usa.csv\"\n",
" DATA_PATH / \"dataset\" / \"housing_and_transportation_index\" / \"usa.csv\"\n",
")\n",
"housing_and_transportation_df = pd.read_csv(\n",
" housing_and_transportation_index_csv,\n",
@@ -352,7 +359,7 @@
"outputs": [],
"source": [
"# write nationwide csv\n",
"df.to_csv(score_csv_path / f\"usa.csv\", index=False)"
"df.to_csv(SCORE_CSV_PATH / f\"usa.csv\", index=False)"
]
},
{
@@ -363,19 +370,11 @@
"outputs": [],
"source": [
"# write per state csvs\n",
"with open(fips_csv_path) as csv_file:\n",
" csv_reader = csv.reader(csv_file, delimiter=\",\")\n",
" line_count = 0\n",
"\n",
" for row in csv_reader:\n",
" if line_count == 0:\n",
" line_count += 1\n",
" else:\n",
" states_fips = row[0].strip()\n",
" print(f\"Generating data{states_fips} csv\")\n",
" df1 = df[df[\"GEOID10\"].str[:2] == states_fips]\n",
" # we need to name the file data01.csv for ogr2ogr csv merge to work\n",
" df1.to_csv(score_csv_path / f\"data{states_fips}.csv\", index=False)"
"for states_fips in get_state_fips_codes(DATA_PATH):\n",
" print(f\"Generating data{states_fips} csv\")\n",
" df1 = df[df[\"GEOID10\"].str[:2] == states_fips]\n",
" # we need to name the file data01.csv for ogr2ogr csv merge to work\n",
" df1.to_csv(SCORE_CSV_PATH / f\"data{states_fips}.csv\", index=False)"
]
},
{