Implementing a scalable geospatial operation in MongoDB
Summary
In this note I document an initial test implementation of a spatial join of 22 million points against nearly 16 thousand polygons using MongoDB, along with the steps needed to run the operation. The run took more time than I expected, more than 12 hours in total. My conclusion is that the approach can be made scalable if combined with other techniques, such as the simplification of polygons.
Intro
In this post, I am sharing an implementation of a spatial-join type of analysis at scale using MongoDB. MongoDB is a NoSQL database system, extensively used in industry to store large databases distributed over multiple (cloud) machines.
My case is the analysis of a large database of over 22 million geo-located tweets. My first objective is to implement a spatial-join kind of analysis that essentially counts tweets within census radios ("radios censales"), which are spatial polygons; in this case I have 15,700 of them. Such an operation is standardly implemented in geospatial packages such as ArcGIS or QGIS, and in Python, for example, using GeoPandas. But my ultimate objective is finding a solution that scales to large amounts of data. Here my implementation runs in a single-machine local environment (Intel(R) Core(TM) i7-7500U CPU @ 2.70GHz), with 16 GB of RAM and a hard disk drive.
The other database-based solution I am aware of is, notably, PostGIS. Related approaches include accelerating GeoPandas with parallel processing using Dask (see a post here), and combining these approaches with grids (such as the hexagonal grids of geohex or H3), which simplify spatial operations.
In order to implement this in MongoDB:
Each point (in my case, each tweet) should be stored in the database as a document, with a location key containing a GeoJSON object that specifies the "type" of spatial object and its "coordinates". https://docs.mongodb.com/manual/geospatial-queries/
Once stored, the location objects should be spatially indexed with the special-purpose Mongo spatial indexes.
Load or store the polygons as part of another Mongo collection.
Use the $geoWithin operator to perform the spatial join.
1. Storing documents in the database
The following is an example of how a point should be stored, in the document format required by MongoDB:
{ 'id': 999999999999,
  'created_at': 1388045287000,
  ...other keys here...
  'text': 'hi',
  'location': { 'type': 'Point', 'coordinates': [ -73.97, 40.77 ] }
}
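As a quick illustration, a document of this kind can be inserted from Python with pymongo. The following is only a sketch: the connection string and the database name twitterdb are assumptions for the example, while the tweets collection name matches the one used later in this post.
from pymongo import MongoClient

# assumed local MongoDB instance and hypothetical database name
client = MongoClient('mongodb://localhost:27017')
db = client['twitterdb']

# insert one example tweet with a GeoJSON Point in its location field
db.tweets.insert_one({
    'id': 999999999999,
    'created_at': 1388045287000,
    'text': 'hi',
    'location': {'type': 'Point', 'coordinates': [-73.97, 40.77]},
})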
In my specific case, I needed to reshape data that was already stored as part of a collection in Mongo. I have explained that procedure here.
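In essence, that reshaping amounts to adding a GeoJSON location field to documents that lack one. A rough sketch of the idea follows, where the lon and lat field names are hypothetical placeholders rather than the actual fields of my collection:
# add a GeoJSON 'location' field built from hypothetical 'lon'/'lat' fields
for tweet in db.tweets.find({'location': {'$exists': False}}):
    db.tweets.update_one(
        {'_id': tweet['_id']},
        {'$set': {'location': {'type': 'Point',
                               'coordinates': [tweet['lon'], tweet['lat']]}}}
    )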
2. Spatial indexing
There are two geospatial index types: "2dsphere" and "2d". A 2dsphere index supports queries that calculate geometries on an earth-like sphere, while a 2d index works on a flat, planar surface.
I usually create the indexes directly from MongoDB Compass: specify the location key as the field to be indexed and choose the "2dsphere" option.
Indexes can also be created from the MongoDB command line with createIndex.
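The index can likewise be created from Python with pymongo; a minimal sketch for the tweets collection:
import pymongo

# create a 2dsphere index on the location field of the tweets collection
db.tweets.create_index([('location', pymongo.GEOSPHERE)])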
3. Load or store polygons as part of another Mongo collection
My approach here is to store the polygons as part of a second Mongo collection.
Since I can easily load the polygons into memory using GeoPandas, this step is not strictly required, but I show the procedure here anyway for other applications. The tricky part is preparing the polygons to be stored as GeoJSON objects.
# loading the census radios with geopandas
import geopandas as gpd
radios = gpd.read_file("data/buenosairesradios.shp")
It is important for the data to be in the right spatial projection. As explained in the manual, for queries on GeoJSON objects MongoDB calculates geometry on a sphere using the WGS84 reference system, which is equivalent to EPSG:4326.
# reproject coordinates with to_crs()
radios = radios.to_crs(epsg=4326)
I now create a function that transforms the GeoDataFrame into a list of dicts with the required GeoJSON format:
# Transform geopandas to geojson
def geopandas_to_geojson(radios):
    """
    Transform a GeoDataFrame of census radios into a list of dicts
    with GeoJSON-formatted geometries, ready to insert into MongoDB.
    """
    listaderecords = []
    for index, radio in radios.iterrows():
        record = {}
        record['COD_2010_1'] = radio['COD_2010_1']
        record['pop_2010_1'] = radio['pop_2010_1']
        # The exterior coords of the radio geometry come as a sequence of tuples,
        # but GeoJSON needs a list of lists of coordinates, so I iterate over that
        # sequence and convert each coordinate tuple into a list.
        try:
            len(radio['geometry'])  # raises TypeError if the geometry is a single polygon
        except TypeError:
            radio_geometry = radio['geometry']
        else:
            radio_geometry = radio['geometry'][0]  # it is a multipolygon; I keep only the first polygon
        finally:
            coordenadas = [[coord for coord in tuplex] for tuplex in list(radio_geometry.exterior.coords)]
            record["geometry"] = {
                "type": "Polygon",
                "coordinates": [coordenadas]
            }
        listaderecords.append(record)
    return listaderecords
Populating the collection
listaderecords = geopandas_to_geojson(radios)
# Create the radios collection
db.radios.insert_many(listaderecords)
4. Use the $geoWithin operator to perform the spatial join
Finally, I iterate over the polygons to count the number of tweets in each radio.
from bson.objectid import ObjectId
# recall that the radios are stored in the radios collection
cursor = db.radios.find()
for radio in cursor:
    totalradiocount = db.tweets.count_documents({'location': {'$geoWithin': {'$geometry': radio['geometry']}}})
    # I additionally store the result back into the radios collection:
    recordid = radio['_id']
    db.radios.update_one({'_id': recordid}, {'$set': {"tweets.totalcount": totalradiocount}})
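To inspect the output afterwards, the stored counts can be read back into a pandas DataFrame; a quick sketch, assuming pandas is available:
import pandas as pd

# pull each radio code and its stored tweet count into a DataFrame
results = pd.DataFrame(
    [{'COD_2010_1': r['COD_2010_1'], 'totalcount': r['tweets']['totalcount']}
     for r in db.radios.find({}, {'COD_2010_1': 1, 'tweets.totalcount': 1})]
)
print(results.head())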
Results
The process took more than 12 hours to complete. For a first attempt, my impression is that the procedure is feasible, but it would not scale much further as is.
Reading and writing operations took 92% of the activity time, and memory usage averaged 81%. I was able to keep working on the laptop during the run, which is good, and working inside the database keeps the process safe in the sense that a power outage or similar would not have sent the process back to the beginning.
Some further tests of the spatial-join query reported an average time of 10 seconds per polygon, but with a median of 0.25 seconds; in other words, a highly non-normal distribution that is sensitive to outliers. I imagine this is due to the complexity of certain polygons, whose processing time may grow highly non-linearly. If that is the case, a scalable alternative would be feasible if simplified geometries were used instead, such as the hexagonal polygon approximations mentioned above.
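As a rough illustration of that direction, the polygons could be simplified with GeoPandas before loading them into Mongo. This is only a sketch: the tolerance value below is an arbitrary assumption (in degrees, since the layer is in EPSG:4326) and would need tuning against the loss of spatial precision.
# simplify the census radio geometries before converting them to GeoJSON
radios['geometry'] = radios['geometry'].simplify(tolerance=0.0005)
listaderecords = geopandas_to_geojson(radios)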