This documentation supports the latest PCIe host firmware, the latest OpenEphys.Onix1 package, and Bonsai 2.9+.

Using DataFrameWriter to Save Data

The DataFrameWriter operator provides an easy and efficient way to write ONIX data to disk using the Apache Arrow file format. Apache Arrow stores data in a column-oriented binary layout where values from each channel or signal are packed contiguously on disk. Each file also embeds a schema containing column names, data types, and structure, so no separate metadata file is needed to read it correctly. This tutorial explains how to use DataFrameWriter in an acquisition workflow, configure its properties (including optional compression), and efficiently load the resulting Arrow files in Python.

The first sections walk through how to save data using the DataFrameWriter operator and then how to load and plot the saved data using Python. More detailed explanations are covered in the Advanced Arrow Topics section at the end of this article.

Note

Arrow is supported by many scientific computing environments. For instance:

Using DataFrameWriter

Before starting this tutorial, complete the following steps from the Getting Started guide:

Setting up your workflow

To follow along with this tutorial, you need a workflow that contains at least one DataFrameWriter operator. DataFrameWriter is a data sink operator that accepts any data stream producing DataFrame or BufferedDataFrame elements, so it can be placed downstream of most data source nodes. You can use the example workflow below, which saves data from several data sources on the Breakout Board; the scripts on this page assume you are loading data from this workflow. Alternatively, you can add a DataFrameWriter node to your own workflow and adapt the scripts to your needs.

workflow for testing DataFrameWriter with Breakout Board data

Configuring DataFrameWriter

DataFrameWriter exposes the following properties in the Editor's property panel:

  • FileName: The path of the output file, including the .arrow extension (e.g., data/memory-monitor.arrow). Any intermediate directories in the path are created automatically if they do not already exist.

  • Suffix: A modifier inserted into the file name, just before the extension, each time the workflow starts. Choose one of three options:

    • None (default): No suffix is added. If a file at the specified path already exists and Overwrite is false, the workflow raises an error on startup.
    • FileCount: Appends the number of files in the same directory that already share the base name and extension (0, 1, 2, …). Use this to number successive recordings automatically without having to change FileName before each run. Ending the FileName with an underscore, just before the extension, separates the suffix from the name (e.g., data/memory-monitor_.arrow).
    • Timestamp: Appends an underscore followed by a high-resolution system timestamp (ISO 8601 format) taken when the file is created, guaranteeing a unique file name for every run.

  • Buffered (default: true): When true, incoming frames are placed in a memory queue and written to disk by a background thread, so disk I/O latency does not block the acquisition thread. Setting this to false writes synchronously on the acquisition thread, which is not recommended for high-bandwidth data streams or when low-latency feedback is required.

  • Overwrite (default: false): When true, an existing file at the resolved path is silently replaced when the workflow starts. When false, the workflow raises an error if the file already exists. This setting has no practical effect when Suffix is FileCount or Timestamp, because those modes always produce a unique file name.

  • EnableCompression (default: false): When true, data is compressed before being written to disk, reducing file size at the cost of CPU overhead. See Compression in the Advanced section for more information on compression and when to use it.
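When Suffix is FileCount and FileName ends with an underscore, successive runs produce files such as memory-monitor_0.arrow, memory-monitor_1.arrow, and so on. The sketch below is a hypothetical helper, latest_recording, that picks the highest-numbered file for a given base name so analysis scripts do not need the run number hard-coded. It assumes the suffix is a plain decimal integer placed directly before the extension; it is not part of the OpenEphys.Onix1 package.

```python
import re
from pathlib import Path
from typing import Optional


def latest_recording(directory: str, base: str, ext: str = ".arrow") -> Optional[Path]:
    """Return the FileCount-suffixed file with the largest index, or None if there is none.

    Hypothetical helper: assumes names of the form <base><N><ext>, e.g. base="memory-monitor_"
    matches memory-monitor_0.arrow, memory-monitor_1.arrow, ...
    """
    pattern = re.compile(re.escape(base) + r"(\d+)" + re.escape(ext))
    best_index, best_path = -1, None
    for path in Path(directory).iterdir():
        match = pattern.fullmatch(path.name)
        # Compare numerically so that _10 is newer than _9
        if match and int(match.group(1)) > best_index:
            best_index, best_path = int(match.group(1)), path
    return best_path
```

For example, latest_recording("data", "memory-monitor_") would return the path of the most recent memory monitor recording in the data directory.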

Running the workflow

Start the workflow. DataFrameWriter writes data to the file specified by FileName (with any Suffix applied) while the workflow runs, and stops writing and closes the file when the workflow stops.

Loading Arrow files in Python

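In Python, Arrow files can be read using PyArrow. PyArrow also interoperates with other analysis libraries: pandas can use it to read Arrow files, Polars can convert to and from PyArrow tables, and Arrow columns convert readily to NumPy arrays. This section demonstrates file loading using both PyArrow and pandas. Install both packages using your preferred method before proceeding (e.g. pip install pyarrow pandas). The load_arrow.py script below also imports the flatbuffers package, so install it as well (e.g. pip install flatbuffers).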

The following script provides a single public function, load_arrow_file, which loads data from an Arrow file efficiently using zero-copy memory mapping. The optional start, end, and columns parameters allow loading only the rows and columns you need without reading the full file into memory. Place it in the same directory as your other processing scripts.

load_arrow.py
from dataclasses import dataclass
from typing import List, Tuple, Optional, Union
from pathlib import Path

import struct
import pyarrow as pa
import io

@dataclass(frozen=True)
class _BatchInfo:
    index: int
    start: int
    end: int
    num_rows: int

def _read_batch_metadata(path: Union[str, Path], end: Optional[int]) -> List[_BatchInfo]:
    MAGIC = b"ARROW1"
    MAGIC_LEN = len(MAGIC)
    FOOTER_SIZE_LEN = 4
    CONTINUATION_MARKER = b"\xff\xff\xff\xff"

    from flatbuffers import encode
    import flatbuffers.number_types as N

    with pa.memory_map(str(path), 'r') as f:
        assert f.read(MAGIC_LEN) == MAGIC, "Not a valid Arrow IPC file"
        f.seek(-(FOOTER_SIZE_LEN + MAGIC_LEN), io.SEEK_END)
        footer_size = struct.unpack("<i", f.read(FOOTER_SIZE_LEN))[0]
        assert f.read(MAGIC_LEN) == MAGIC, "Invalid end magic"
        f.seek(-(FOOTER_SIZE_LEN + MAGIC_LEN + footer_size), io.SEEK_END)
        footer_buf = bytearray(f.read(footer_size))

    root_offset = encode.Get(N.UOffsetTFlags.packer_type, footer_buf, 0)
    tab_start = root_offset
    vtable_offset = tab_start - encode.Get(N.SOffsetTFlags.packer_type, footer_buf, tab_start)
    vtable_len = encode.Get(N.VOffsetTFlags.packer_type, footer_buf, vtable_offset)

    def field_offset(buf, field_index, vtab_len, vtab_off):
        slot = 4 + field_index * 2
        if slot >= vtab_len:
            return 0
        return encode.Get(N.VOffsetTFlags.packer_type, buf, vtab_off + slot)

    rb_field_offset = field_offset(footer_buf, 3, vtable_len, vtable_offset)
    if rb_field_offset == 0:
        return []

    vec_offset_pos = tab_start + rb_field_offset
    vec_offset = vec_offset_pos + encode.Get(N.UOffsetTFlags.packer_type, footer_buf, vec_offset_pos)
    num_blocks = encode.Get(N.UOffsetTFlags.packer_type, footer_buf, vec_offset)

    batch_info: List[_BatchInfo] = []
    row_offset = 0

    with open(path, "rb") as f:
        for i in range(num_blocks):
            block_start = vec_offset + 4 + i * 24
            block_offset = struct.unpack_from("<q", footer_buf, block_start)[0]
            f.seek(block_offset)
            marker = f.read(len(CONTINUATION_MARKER))
            meta_size = struct.unpack("<i", f.read(4) if marker == CONTINUATION_MARKER else marker)[0]
            msg_buf = bytearray(f.read(meta_size))

            msg_root = encode.Get(N.UOffsetTFlags.packer_type, msg_buf, 0)
            msg_vtab_off = msg_root - encode.Get(N.SOffsetTFlags.packer_type, msg_buf, msg_root)
            msg_vtab_len = encode.Get(N.VOffsetTFlags.packer_type, msg_buf, msg_vtab_off)

            header_off = field_offset(msg_buf, 2, msg_vtab_len, msg_vtab_off)
            if header_off == 0:
                raise ValueError("Could not find a valid header offset in Arrow file.")

            header_pos = msg_root + header_off
            rb_root = header_pos + encode.Get(N.UOffsetTFlags.packer_type, msg_buf, header_pos)
            rb_vtab_off = rb_root - encode.Get(N.SOffsetTFlags.packer_type, msg_buf, rb_root)
            rb_vtab_len = encode.Get(N.VOffsetTFlags.packer_type, msg_buf, rb_vtab_off)

            length_off = field_offset(msg_buf, 0, rb_vtab_len, rb_vtab_off)
            if length_off == 0:
                raise ValueError("Found batch with no rows. Verify file integrity.")

            num_rows = struct.unpack_from("<q", msg_buf, rb_root + length_off)[0]
            batch_info.append(_BatchInfo(index=i, start=row_offset, end=row_offset + num_rows - 1, num_rows=num_rows))
            row_offset += num_rows

    return batch_info

def _slice_indices(batch_info: List[_BatchInfo], start: int, end: int) -> Tuple[List[int], int, int]:
    if start < 0 or end > batch_info[-1].end or start > end:
        raise IndexError(f"Range [{start}, {end}] is out of bounds. Valid range: [0, {batch_info[-1].end}]")

    if start == 0 and end == batch_info[-1].end:
        return list(range(len(batch_info))), 0, batch_info[-1].num_rows

    batch_indices = []
    first_idx = last_idx = None

    for i, batch in enumerate(batch_info):
        if batch.end < start:
            continue
        batch_indices.append(batch.index)
        if first_idx is None:
            first_idx = i
        if batch.end >= end:
            last_idx = i
            break

    first_offset = 0 if first_idx is None else start - batch_info[first_idx].start
    # `end` is inclusive; add 1 to get the exclusive slice bound within the last batch
    last_offset = (batch_info[batch_indices[-1]].num_rows if last_idx is None
                   else end - batch_info[last_idx].start + 1)

    return batch_indices, first_offset, last_offset

def load_arrow_file(
    filepath: Union[str, Path],
    start: Optional[int] = None,
    end: Optional[int] = None,
    columns: Optional[List[str]] = None
) -> pa.Table:
    """
    Load rows [start, end] (end inclusive) from an Arrow IPC file as a PyArrow Table.

    Args:
        filepath: Path to the .arrow file.
        start:    First row index to include. Defaults to 0.
        end:      Last row index to include. Defaults to all rows.
        columns:  Column names to return. Defaults to all columns.

    Returns:
        PyArrow.Table containing the requested rows and columns.
    """
    filepath = Path(filepath)
    start = start or 0
    batches = _read_batch_metadata(filepath, end)

    if not batches:
        raise ValueError(f"No valid batches found in {filepath}")

    end = end if end is not None else batches[-1].end
    batch_indices, first_offset, last_offset = _slice_indices(batches, start, end)

    source = pa.memory_map(str(filepath), 'r')
    reader = pa.ipc.open_file(source)

    if not batch_indices:
        schema = reader.schema
        if columns:
            schema = pa.schema([f for f in schema if f.name in columns])
        # An empty table that still carries the (optionally filtered) schema
        return schema.empty_table()

    def _batches():
        for i, batch_idx in enumerate(batch_indices):
            batch = reader.get_batch(batch_idx)
            if columns is not None:
                batch = batch.select(columns)
            if i == 0 or i == len(batch_indices) - 1:
                s = first_offset if i == 0 else 0
                e = last_offset if i == len(batch_indices) - 1 else batch.num_rows
                batch = batch.slice(s, e - s)
            yield batch

    return pa.Table.from_batches(_batches())

Loading Arrow files

To load the full Arrow file, call load_arrow_file with the path to the file. The path can be absolute or relative, and can be given as a string or a pathlib.Path.

The optional start and end parameters are integers that specify the first and last row indices (0-based) to read from the file. The optional columns parameter is a list of strings that restricts loading to specific channels by name. Any combination of these parameters can be used together to avoid loading the full file into memory, which can reduce load time significantly for large recordings.

from load_arrow import load_arrow_file

table = load_arrow_file("memory-monitor_0.arrow", start=100, end=5000, columns=['Clock', 'PercentUsed'])

If start or end is outside the valid row index range, or start is greater than end, an IndexError is raised indicating the valid range. If any string in columns does not match a column name in the file, a KeyError is raised.

Note

Compressed and uncompressed files are loaded identically. PyArrow reads the compression metadata in each record batch header and decompresses the data automatically when necessary. Decompression adds CPU overhead, so compressed files can take longer to load. Because decompressed data must be stored in newly allocated memory, loading a compressed file is also not zero-copy.

Converting to other formats

PyArrow can convert Arrow tables into several other formats for use with different libraries and workflows.

Converting to a pandas DataFrame

If your analysis uses pandas, you can convert an Arrow Table to a DataFrame by calling .to_pandas():

from load_arrow import load_arrow_file

table = load_arrow_file("memory-monitor_0.arrow")
df = table.to_pandas()

Important

Calling .to_pandas(), or any other method that converts the table to a pandas DataFrame, might copy the entire dataset into RAM. In certain limited circumstances, pandas and PyArrow can avoid this copy and keep the data memory-mapped, but this is not guaranteed. Keep this in mind when working with long recordings on machines with limited memory.

Using pandas directly

It is also possible to load an Arrow file with pandas using pandas.read_feather(), which accepts Arrow IPC files because Feather v2 and Arrow IPC share the same binary format.

import pandas as pd

df = pd.read_feather("memory-monitor_0.arrow", dtype_backend="pyarrow")

This is convenient for short recordings, but it calls PyArrow internally and loads the entire file into RAM regardless of how large it is. It also does not expose sample-index access, so there is no way to read only a portion of the recording without first loading the whole file. For large or long recordings, the memory-mapped approach described previously is preferable.

Exporting Arrow data to NumPy

Individual columns can be extracted as NumPy arrays using .to_numpy(). For large recordings, pass start and end indices to load_arrow_file() (as shown previously) to avoid loading the entire file into RAM at once.

from load_arrow import load_arrow_file

table = load_arrow_file("memory-monitor_0.arrow")

percent_used = table["PercentUsed"].to_numpy()
clock = table["Clock"].to_numpy()

Plotting data from Arrow files

The examples in this section also require matplotlib (e.g. pip install matplotlib). To display figures interactively, matplotlib needs a GUI backend. In Jupyter notebooks this is handled automatically. In a plain Python script, plt.show() will open a figure window if a compatible GUI toolkit (e.g. Tk, Qt, or wxPython) is installed. See the matplotlib backend documentation if figures do not appear.

This section shows how to plot data saved from a MemoryMonitorData operator. The MemoryMonitor schema contains columns including Clock, PercentUsed, and BytesUsed. The Clock column records the acquisition clock counter value, which must be divided by the acquisition clock rate to convert it to seconds.

Reading the acquisition clock rate

The example workflow shown above writes acquisition metadata, including the clock rate, to a start-time_<suffix>.csv file each time it runs. Load it with NumPy before plotting:

import numpy as np

dt = {'names': ('time', 'acq_clk_hz', 'block_read_sz', 'block_write_sz'),
      'formats': ('datetime64[us]', 'u4', 'u4', 'u4')}
meta = np.genfromtxt("start-time_0.csv", delimiter=',', dtype=dt)
acq_clk_hz = meta['acq_clk_hz']

Plotting directly from a PyArrow table

PyArrow column arrays implement the NumPy array protocol (__array__), so most plotting libraries, including Matplotlib, can consume them directly without an explicit conversion step. Arithmetic is different: PyArrow arrays do not support Python arithmetic operators, so dividing the Clock column by the clock rate requires calling .to_numpy() first to produce a NumPy array.

import pyarrow as pa
import numpy as np
import matplotlib.pyplot as plt
from load_arrow import load_arrow_file

dt = {'names': ('time', 'acq_clk_hz', 'block_read_sz', 'block_write_sz'),
      'formats': ('datetime64[us]', 'u4', 'u4', 'u4')}
meta = np.genfromtxt("start-time_0.csv", delimiter=',', dtype=dt)
acq_clk_hz = meta['acq_clk_hz']

table = load_arrow_file("memory-monitor_0.arrow")

time_s = table["Clock"].to_numpy() / acq_clk_hz

plt.figure()
plt.plot(time_s, table["PercentUsed"])
plt.xlabel("Time (s)")
plt.ylabel("FIFO used (%)")
plt.title("Hardware Buffer Usage")
plt.show()
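Clock counts generally do not start at zero, so the time axis above begins at an arbitrary offset. The sketch below is a hypothetical helper, clock_to_seconds, that optionally references time to the first sample. Subtracting in integer arithmetic before dividing keeps the calculation exact for as long as possible, which preserves precision for large counter values. It assumes the Clock column increases monotonically.

```python
import numpy as np


def clock_to_seconds(clock, acq_clk_hz, relative=True):
    """Convert acquisition clock counts to seconds.

    Hypothetical helper. If relative is True, time is measured from the first sample.
    """
    counts = np.asarray(clock, dtype=np.uint64)
    if relative and counts.size:
        # Exact integer subtraction; assumes Clock never decreases (uint64 would wrap)
        counts = counts - counts[0]
    return counts / float(acq_clk_hz)
```

With the variables from the script above, time_s = clock_to_seconds(table["Clock"].to_numpy(), acq_clk_hz) produces a time axis that starts at 0 s.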

Plotting with pandas

Converting the Arrow table to a pandas DataFrame gives access to pandas's .plot() method and column-based indexing, which can make exploratory analysis more concise. The pandas workflow for the memory monitor data looks like this:

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from load_arrow import load_arrow_file

dt = {'names': ('time', 'acq_clk_hz', 'block_read_sz', 'block_write_sz'),
      'formats': ('datetime64[us]', 'u4', 'u4', 'u4')}
meta = np.genfromtxt("start-time_0.csv", delimiter=',', dtype=dt)
acq_clk_hz = meta['acq_clk_hz']

table = load_arrow_file("memory-monitor_0.arrow")
df = table.to_pandas()

df["time_s"] = df["Clock"] / acq_clk_hz

fig, axes = plt.subplots(2, 1, sharex=True)

df.plot(
  ax=axes[0],
  x="time_s",
  y="PercentUsed",
  ylabel="FIFO used (%)",
  legend=False)

df.plot(
  ax=axes[1],
  x="time_s",
  y="BytesUsed",
  xlabel="Time (s)",
  ylabel="Bytes used",
  legend=False)

plt.tight_layout()
plt.show()

Important

Calling .to_pandas() can copy the entire table into RAM, potentially exceeding the available memory. This workflow is convenient for short recordings, but for large files on memory-limited machines, prefer working directly with the PyArrow table as shown above.

Working with subsampled data

Some ONIX device data frames contain data from streams that are acquired at different rates. For example, NeuropixelsV1DataFrame combines AP-band spike data sampled at 30 kHz with LFP-band data sampled at 2.5 kHz. Because all columns share the row count of the faster AP-band stream, each LFP value is repeated across 12 consecutive rows in the loaded data (30 kHz / 2.5 kHz = 12). The following script extracts each LFP sample once, along with its corresponding Clock value.

import numpy as np
from load_arrow import load_arrow_file

table = load_arrow_file("npix-v1e-lfp_0.arrow")

data_cols = [name for name in table.schema.names if "LfpData" in name]
data = np.column_stack([table[col].to_numpy() for col in data_cols])
clock = table["Clock"].to_numpy()

stride = 12  # 30 kHz primary rate / 2.5 kHz LFP rate for NeuropixelsV1

mask = np.zeros(len(clock), dtype=bool)
mask[::stride] = True

data_unique = data[mask]    # shape: (num_unique_lfp_samples, num_channels)
clock_unique = clock[mask]  # acquisition clock counts at each unique LFP sample

Divide clock_unique by the acquisition clock rate to convert clock counts to seconds. See Reading the acquisition clock rate for how to load that value from the metadata CSV file.

See the Subsampled data section for a more detailed explanation.

Advanced Arrow Topics

What is the Apache Arrow file format?

Apache Arrow files organize data in a column-oriented layout optimized for operations typical in time-series analysis, such as filtering, grouping, and aggregation. Concretely, samples from a single data source, e.g. a single electrophysiology channel, are stored next to each other on disk. This means an analysis tool can read just the channels it needs without first rearranging or copying the contents of the file after it has been loaded into memory. Additionally, each file is self-describing: it opens with a schema that declares every column's name and data type, followed by a sequence of record batches. Each record batch is a group of rows in which each column's values are stored as a contiguous array. Unlike plain text data formats (e.g. CSV files produced by CsvWriter) or flat binary files (e.g. files produced by MatrixWriter), Arrow files contain data type information (e.g., 16-bit integers, 64-bit floating point numbers, etc.), and do not require a separate metadata file or prior knowledge of the data layout in order to be loaded correctly.

Note

To convert Arrow files to a format similar to the output of CsvWriter or MatrixWriter, see Converting to other formats.

DataFrameWriter Batching and Flush Behavior

DataFrameWriter accumulates incoming frames in memory and writes them to disk as Arrow record batches. For periodic data sources (e.g. Neuropixels or Intan Chips), each batch holds approximately one second of data. For aperiodic or bursty sources (e.g. Digital Inputs), batch sizes vary; a flush is triggered after at most five seconds regardless of how many samples have accumulated. This design balances read and write performance, file size, and worst-case data loss in the event of a catastrophic failure such as a power outage.

Compression

When EnableCompression is true, DataFrameWriter compresses each record batch using the Zstandard codec before writing it to disk. Zstandard is an open-source, general-purpose compression algorithm that balances compression ratio and speed. Because compression runs on the same machine that is acquiring data, it competes with the rest of the acquisition pipeline for CPU resources. If an experiment combines a high-bandwidth hardware configuration (e.g., multiple Neuropixels probes) or a computationally intensive workflow with long recordings or low-latency (e.g., sub-millisecond) closed-loop feedback, benchmark the system with compression enabled.

Tip
  • To evaluate the impact of compression on your specific setup, record MemoryMonitorData output once with EnableCompression set to true and once with it set to false, and compare the state of the hardware buffer in the two runs. If the PercentUsed value remains near zero in both cases, compression is unlikely to cause a buffer overflow and is probably not affecting real-time performance. Refer to the Tuning closed-loop performance tutorial for more information.
  • Keeping the Buffered property enabled (the default) offloads compression to a background thread. This does not reduce the total CPU cost of compression, but it keeps compression from blocking the acquisition thread.

Subsampled data

When DataFrameWriter writes frames that contain streams acquired at different rates, all columns in the resulting Arrow file share the same number of rows, set by the primary (faster) rate. Each subsampled value therefore occupies stride consecutive rows, where stride is the ratio of the primary rate to the subsampled rate. For instance, each 2.5 kHz LFP sample from a NeuropixelsV1DataFrame appears in 12 consecutive rows of the file to match the 30 kHz spike data sample rate. In effect, the subsampled stream is upsampled to the primary rate by holding each value, so calling .to_numpy() on a subsampled column returns an array in which every sample is repeated stride times. To recover each sample exactly once, the Working with subsampled data script applies a boolean mask with True at every stride-th index. This is an example of strided indexing, which selects only the rows that contain new samples. The sample rate for each device is listed on the relevant data frame API page (e.g., NeuropixelsV1DataFrame), from which the stride can be calculated.
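A small synthetic example makes the repetition and its removal concrete. The sketch computes the stride from the two NeuropixelsV1 sample rates, simulates a subsampled column, and shows that the boolean mask from the script above selects the same rows as a plain slice. Like that script, it assumes the first row of the file is the first row of a group of stride identical rows. It also shows why detecting value changes is not a substitute: two consecutive LFP samples can legitimately be equal.

```python
import numpy as np

primary_hz = 30_000    # AP-band rate for NeuropixelsV1
subsampled_hz = 2_500  # LFP-band rate for NeuropixelsV1
stride = primary_hz // subsampled_hz  # rows per LFP sample

# Simulate a subsampled column: each LFP sample is repeated `stride` times
lfp_samples = np.array([10, -4, 7, 7, 3], dtype=np.int16)
column = np.repeat(lfp_samples, stride)

# Boolean mask with True at every stride-th row, as in the script above
mask = np.zeros(len(column), dtype=bool)
mask[::stride] = True
recovered_mask = column[mask]

# Basic slicing selects the same rows and returns a view instead of a copy
recovered_slice = column[::stride]

# Keeping only rows where the value changes wrongly merges the two consecutive 7s
naive_unique = column[np.concatenate(([True], np.diff(column) != 0))]
```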

Manually Loading Arrow Files

The scripts in the loading section abstract the details of Arrow file access. This section covers lower-level Arrow API usage for cases where direct control is needed or the provided scripts do not meet specific requirements.

Memory-mapped loading

The most efficient way to read Arrow files in Python is to open the file as a memory map and pass it to pyarrow.ipc.open_file(). With this approach, PyArrow maps the file into the process's virtual address space and reads data on demand rather than copying the entire file into RAM upfront. For large recordings, this is significantly more memory-efficient than loading everything at once.

import pyarrow as pa

with pa.memory_map("memory-monitor_0.arrow", "r") as source:
    with pa.ipc.open_file(source) as reader:
        print(f"Schema: {reader.schema}")
        print(f"Number of record batches: {reader.num_record_batches}")
        table = reader.read_all()

Individual record batches can be read one at a time, which is useful when you only need a subset of the data or when the full recording does not fit in available RAM:

import pyarrow as pa

with pa.memory_map("memory-monitor_0.arrow", "r") as source:
    with pa.ipc.open_file(source) as reader:
        for i in range(reader.num_record_batches):
            batch = reader.get_batch(i)
            # Process individual batch

It is also possible to load a specific range of record batches into a table to analyze a fixed time window without loading the full recording. In the following snippet, the first 10 record batches are loaded.

import pyarrow as pa

with pa.memory_map("memory-monitor_0.arrow", "r") as source:
    with pa.ipc.open_file(source) as reader:
        indices = range(min(10, reader.num_record_batches))  # first 10 batches, or fewer if the file is shorter
        table = pa.Table.from_batches(reader.get_batch(i) for i in indices)

Loading and recovering corrupt files

If a power outage or other unforeseen event occurs during recording and leaves the file in a state where the scripts above cannot open it, the following scripts can recover the data from a file that was not closed properly.

Note

Data durability was a first-class requirement when selecting the Arrow file format. Worst-case data loss is a single record batch. For high-bandwidth sources like Neuropixels, DataFrameWriter produces record batches that are 1 second in duration. For low-bandwidth or aperiodic sources, it flushes its input buffer to disk at least every 5 seconds. Record batches written before an interruption (unhandled exception, out-of-memory condition, power outage, etc.) can be recovered with the scripts below.

If recording is interrupted, the file may be missing the footer that Arrow uses to index record batches, causing PyArrow to raise ArrowInvalid: Not an Arrow file when you try to open it. This script works around the missing footer by opening the file as an Arrow Stream rather than an Arrow File. The Stream format reads record batches sequentially without relying on the footer, so all batches written before the interruption can still be recovered. The recovered batches are then written to a new file, which automatically generates a valid footer on close.

import pyarrow as pa

input_path = r"./path/to/corrupted_file.arrow"
output_path = r"./path/to/fixed_file.arrow"

with pa.memory_map(input_path, 'r') as f:
    magic = f.read(8)
    if magic != b'ARROW1\x00\x00':
        raise ValueError('Not an Arrow file.')

    with pa.ipc.open_stream(f) as reader:
        schema = reader.schema
        num_batches = 0

        with pa.ipc.new_file(output_path, schema) as writer:
            while True:
                try:
                    batch = reader.read_next_batch()
                    writer.write_batch(batch)
                    num_batches += 1
                except StopIteration:
                    print(f"Read {num_batches} batches from corrupted file.")
                    break
                except (pa.ArrowInvalid, OSError) as e:
                    print(f"Stopped reading at batch {num_batches}: {e}")
                    break

    print(f"Recovered {num_batches} batches from {input_path}, saved to {output_path}")

Recovering an Arrow file with corrupted batches

This script reads an Arrow file with an intact footer that has been corrupted in some other way (invalid buffers, corrupted headers, etc.) and writes all valid batches to a new file. One error that indicates the buffers or headers have been corrupted is ArrowInvalid: Unexpected empty message in IPC file format.

Warning

This script discards any batch that raises an error, without attempting to repair it, which leaves gaps in the data. The Clock column can be inspected afterward to identify where batches were skipped.

import pyarrow as pa

input_path = r"./path/to/corrupted_file.arrow"
output_path = r"./path/to/fixed_file.arrow"

with pa.memory_map(input_path, 'r') as f:
    magic = f.read(8)
    if magic != b'ARROW1\x00\x00':
        raise ValueError('Not an Arrow file.')

    with pa.ipc.open_file(f) as reader:
        schema = reader.schema
        num_batches = 0

        with pa.ipc.new_file(output_path, schema) as writer:
            for i in range(reader.num_record_batches):
                try:
                    batch = reader.get_batch(i)
                    batch.validate(full=True)  # raises ArrowInvalid if the batch's buffers are inconsistent
                    writer.write_batch(batch)
                    num_batches += 1
                except (pa.ArrowInvalid, OSError) as e:
                    print(f"Skipped batch {i}: {e}")

        print(f"Recovered {num_batches} out of {reader.num_record_batches} batches from {input_path}, saved to {output_path}")
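As the warning above notes, skipped batches appear as gaps in the Clock column. The sketch below is a hypothetical gap finder for periodic data. It assumes you know the nominal number of clock counts between consecutive samples (for example, acq_clk_hz divided by the device sample rate) and reports every step larger than tolerance times that value.

```python
import numpy as np


def find_clock_gaps(clock, expected_step, tolerance=1.5):
    """Return (row_index, step) pairs where the Clock increment exceeds tolerance * expected_step.

    row_index is the last row before the gap. Intended for periodic sources only.
    """
    clock = np.asarray(clock, dtype=np.int64)  # signed, so a backwards step cannot wrap around
    steps = np.diff(clock)
    gap_rows = np.flatnonzero(steps > tolerance * expected_step)
    return [(int(i), int(steps[i])) for i in gap_rows]
```

For example, find_clock_gaps(table["Clock"].to_numpy(), acq_clk_hz / 30000) would flag missing stretches in a 30 kHz stream.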

Handling compressed data

If the corrupt data file was originally saved with compression and you want to re-save the recovered data with compression, pass an IpcWriteOptions object to pa.ipc.new_file(). The example below applies Zstandard compression, which is the same algorithm used by DataFrameWriter when EnableCompression is true. The change is the same for both recovery scripts above; this example modifies the missing-footer script:

import pyarrow as pa

input_path = r"./path/to/corrupted_file.arrow"
output_path = r"./path/to/fixed_file.arrow"

with pa.memory_map(input_path, 'r') as f:
    magic = f.read(8)
    if magic != b'ARROW1\x00\x00':
        raise ValueError('Not an Arrow file.')

    with pa.ipc.open_stream(f) as reader:
        schema = reader.schema
        num_batches = 0

        options = pa.ipc.IpcWriteOptions(compression="zstd")

        with pa.ipc.new_file(output_path, schema, options=options) as writer:
            while True:
                try:
                    batch = reader.read_next_batch()
                    writer.write_batch(batch)
                    num_batches += 1
                except StopIteration:
                    print(f"Read {num_batches} batches from corrupted file.")
                    break
                except (pa.ArrowInvalid, OSError) as e:
                    print(f"Stopped reading at batch {num_batches}: {e}")
                    break

    print(f"Recovered {num_batches} batches from {input_path}, saved to {output_path}")