Quick Start#

This tutorial is intended as an introduction to working with PyMongoArrow. The reader is assumed to be familiar with basic PyMongo and MongoDB concepts.

Prerequisites#

Before we start, make sure that you have the PyMongoArrow distribution installed. In the Python shell, the following should run without raising an exception:

import pymongoarrow as pma

This tutorial also assumes that a MongoDB instance is running on the default host and port. Assuming you have downloaded and installed MongoDB, you can start it like so:

$ mongod

Extending PyMongo#

The pymongoarrow.monkey module provides an interface to patch PyMongo, in place, and add PyMongoArrow’s functionality directly to Collection instances:

from pymongoarrow.monkey import patch_all
patch_all()

After running patch_all(), new instances of Collection will have PyMongoArrow’s APIs, e.g. find_pandas_all().

Note

Users can also directly use any of PyMongoArrow’s APIs by importing them from pymongoarrow.api. The only difference in usage would be the need to manually pass the instance of Collection on which the operation is to be run as the first argument when directly using the API method.
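
For example, the following is equivalent to calling the patched collection method, with the Collection passed explicitly as the first argument (a minimal sketch, assuming a connected client as in the next section):

from pymongoarrow.api import find_pandas_all
# Equivalent to client.db.data.find_pandas_all(...) after patch_all();
# here the Collection is the explicit first argument.
df = find_pandas_all(client.db.data, {'amount': {'$gt': 0}})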

Test data#

Before we begin, we need to add some data to our cluster that we can query. We can do so using PyMongo:

from datetime import datetime
from pymongo import MongoClient
client = MongoClient()
client.db.data.insert_many([
  {'_id': 1, 'amount': 21, 'last_updated': datetime(2020, 12, 10, 1, 3, 1), 'account': {'name': 'Customer1', 'account_number': 1}, 'txns': ['A']},
  {'_id': 2, 'amount': 16, 'last_updated': datetime(2020, 7, 23, 6, 7, 11), 'account': {'name': 'Customer2', 'account_number': 2}, 'txns': ['A', 'B']},
  {'_id': 3, 'amount': 3,  'last_updated': datetime(2021, 3, 10, 18, 43, 9), 'account': {'name': 'Customer3', 'account_number': 3}, 'txns': ['A', 'B', 'C']},
  {'_id': 4, 'amount': 0,  'last_updated': datetime(2021, 2, 25, 3, 50, 31), 'account': {'name': 'Customer4', 'account_number': 4}, 'txns': ['A', 'B', 'C', 'D']}])
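
As a quick sanity check (plain PyMongo, nothing PyMongoArrow-specific), you can confirm that all four documents landed in the collection:

client.db.data.count_documents({})  # 4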

Defining the schema#

PyMongoArrow relies upon a data schema to marshal query result sets into tabular form. This schema can either be inferred automatically from the data or provided by the user. Users can define a schema by instantiating pymongoarrow.api.Schema with a mapping of field names to type-identifiers, e.g.:

from pymongoarrow.api import Schema
schema = Schema({'_id': int, 'amount': float, 'last_updated': datetime})

PyMongoArrow offers first-class support for nested data (embedded documents):

schema = Schema({'_id': int, 'amount': float, 'account': {'name': str, 'account_number': int}})

Lists (and nested lists) are also supported:

from pyarrow import list_, string
schema = Schema({'txns': list_(string())})
polars_df = client.db.data.find_polars_all({'amount': {'$gt': 0}}, schema=schema)

There are multiple permissible type-identifiers for each supported BSON type. For a full list of data types and associated type-identifiers, see Data Types.
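
For instance, the Python type-identifiers used above map to canonical Arrow types, so the following two schemas should be equivalent (one illustration of the mapping; Data Types is authoritative):

from pyarrow import int64, float64, timestamp
# int -> int64, float -> float64, datetime -> timestamp('ms')
# (BSON datetimes have millisecond resolution).
schema_py = Schema({'_id': int, 'amount': float, 'last_updated': datetime})
schema_arrow = Schema({'_id': int64(), 'amount': float64(), 'last_updated': timestamp('ms')})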

Find operations#

We are now ready to query our data. Let’s start by running a find operation to load all records with a non-zero amount as a pandas.DataFrame:

df = client.db.data.find_pandas_all({'amount': {'$gt': 0}}, schema=schema)

We can also load the same result set as a pyarrow.Table instance:

arrow_table = client.db.data.find_arrow_all({'amount': {'$gt': 0}}, schema=schema)

as a polars.DataFrame:

df = client.db.data.find_polars_all({'amount': {'$gt': 0}}, schema=schema)

or as NumPy arrays:

ndarrays = client.db.data.find_numpy_all({'amount': {'$gt': 0}}, schema=schema)

In the NumPy case, the return value is a dictionary where the keys are field names and values are corresponding numpy.ndarray instances.
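
Each value behaves like any other NumPy array; for example, assuming a schema that includes the amount field, the returned amounts can be totalled directly:

ndarrays['amount'].sum()  # 40.0 for the test data above (21 + 16 + 3)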

Note

For all of the examples above, the schema can be omitted like so:

arrow_table = client.db.data.find_arrow_all({'amount': {'$gt': 0}})

In this case, PyMongoArrow will try to automatically apply a schema based on the data contained in the first batch.
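
Printing the dtypes of the result is a quick way to see what was inferred (a minimal sketch reusing the query above):

df = client.db.data.find_pandas_all({'amount': {'$gt': 0}})
print(df.dtypes)  # the types chosen from the first batch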

Aggregate operations#

Running an aggregate operation is similar to a find operation, but it takes a pipeline, i.e. a sequence of aggregation stages, to perform. Here is a simple example of aggregate_pandas_all that outputs a new dataframe in which all _id values are grouped together and their amount values summed:

df = client.db.data.aggregate_pandas_all([{'$group': {'_id': None, 'total_amount': { '$sum': '$amount' }}}])
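
Given the four test documents inserted earlier, this yields a single row whose total_amount is 40 (21 + 16 + 3 + 0):

print(df['total_amount'][0])  # 40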

Nested data (embedded documents) and lists are also supported. In this more complex example, we unwind the values in the txns list field, count each distinct value, and return the results, sorted by count in descending order, as a dictionary of numpy.ndarray instances:

pipeline = [{'$unwind': '$txns'}, {'$group': {'_id': '$txns', 'count': {'$sum': 1}}}, {'$sort': {"count": -1}}]
ndarrays = client.db.data.aggregate_numpy_all(pipeline)

More information on aggregation pipelines can be found in the MongoDB server documentation.

Writing to MongoDB#

All of these result types, Arrow's Table, Pandas' DataFrame, Polars' DataFrame, and NumPy's ndarray, can be written to your MongoDB database using the write() function:

from pymongoarrow.api import write
from pymongo import MongoClient
coll = MongoClient().db.my_collection
write(coll, df)
write(coll, arrow_table)
write(coll, ndarrays)

(Keep in mind that NumPy arrays are specified as dict[str, ndarray].)
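
For example, a minimal sketch of writing a NumPy result set (the field name 'foo' is illustrative only):

import numpy as np
# Keys become field names in the inserted documents.
write(coll, {'foo': np.array([1, 2, 3], dtype=np.int64)})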

Writing to other formats#

Once result sets have been loaded, one can then write them to any format that the package supports.

For example, to write the table referenced by the variable arrow_table to a Parquet file example.parquet, run:

import pyarrow.parquet as pq
pq.write_table(arrow_table, 'example.parquet')
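
To verify the round trip, the file can be read back with the same module (standard pyarrow API):

pq.read_table('example.parquet')  # a pyarrow.Table matching arrow_table's contents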

Pandas also supports writing DataFrame instances to a variety of formats, including CSV and HDF5. To write the data frame referenced by the variable df to a CSV file out.csv, for example, run:

df.to_csv('out.csv', index=False)

The Polars API is a mix of the two: as in pandas, writing is a method on the DataFrame itself, but the methods carry pyarrow-style write_* names:

import polars as pl
df = pl.DataFrame({"foo": [1, 2, 3, 4, 5]})
df.write_parquet('example.parquet')

Note

Nested data is supported for parquet read/write but is not well supported by Arrow or Pandas for CSV read/write.