Using DataFrameWriter to Save Data
The DataFrameWriter operator provides an
easy and efficient way to write ONIX data to disk using
the Apache Arrow file format.
Apache Arrow stores data in a column-oriented binary layout where values from
each channel or signal are packed contiguously on disk. Each file also embeds a
schema containing column names, data types, and structure, so no separate
metadata file is needed to read it correctly. This tutorial explains how to use
DataFrameWriter in an acquisition workflow, configure its properties
(including optional compression), and efficiently load the resulting Arrow files
in Python.
The first sections walk through how to save data using the DataFrameWriter
operator and then load and plot the saved data using Python. More
detailed explanations are covered in the Advanced
section at the end of this article.
Note
Arrow is supported by many scientific computing environments.
Using DataFrameWriter
Before starting this tutorial, complete the following steps from the Getting Started guide:
- Install or update the required Bonsai packages, including the latest OpenEphys.Onix1 package.
- Complete the data acquisition quick start guide for your hardware.
Setting up your workflow
To follow along with this tutorial, you need a workflow that contains at least
one DataFrameWriter operator. DataFrameWriter is a
data sink operator that accepts any data stream producing
DataFrame or BufferedDataFrame
elements, meaning it can be placed downstream of most data source
nodes. You can use the example workflow below, which saves data
from several data sources on the Breakout Board. The scripts on this page assume
you are loading data from this workflow. However, you can also add a
DataFrameWriter node to a workflow and adapt the scripts according to your
needs to follow along.
Configuring DataFrameWriter
DataFrameWriter exposes the following properties in the Editor's property
panel:
- FileName: The path of the output file, including the .arrow extension (e.g., data/memory-monitor.arrow). Any intermediate directories in the path are created automatically if they do not already exist.
- Suffix: A modifier inserted into the file name just before the extension each time the workflow starts. Choose one of three options:
  - None (default): No suffix is added. If a file at the specified path already exists and Overwrite is false, the workflow raises an error on startup.
  - FileCount: Appends the count of files already in the same directory with the same base name and extension (0, 1, 2, …). Use this to automatically number successive recordings without having to rename the node before each run. Adding an underscore at the end of the FileName, but before the extension, separates the suffix from the filename (e.g., data/memory-monitor_.arrow).
  - Timestamp: Appends an underscore followed by a high-resolution system timestamp at the moment the file is created (ISO 8601 format), guaranteeing a unique file name for every run.
- Buffered (default: true): When true, incoming frames are placed in a memory queue and written to disk by a background thread, preventing disk I/O latency from blocking the acquisition thread. Setting this to false writes synchronously on the acquisition thread, which is not recommended for high-bandwidth data streams or when low-latency feedback is required.
- Overwrite (default: false): When true, an existing file at the resolved path is silently replaced when the workflow starts. When false, the workflow raises an error if the file already exists. This setting has no practical effect when Suffix is FileCount or Timestamp, because those modes always produce a unique file name.
- EnableCompression (default: false): When true, data is compressed before being written to disk to reduce file size at the cost of CPU overhead. See Compression in the Advanced section for more information on compression and when to use it.
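When Suffix is FileCount, successive runs produce numbered files, so an analysis script often needs to locate the most recent one. The following is a minimal sketch, assuming the underscore naming shown above (memory-monitor_0.arrow, memory-monitor_1.arrow, …). The helper find_latest_recording is hypothetical and is not part of any ONIX or Bonsai package.

```python
import re
from pathlib import Path
from typing import Optional


def find_latest_recording(directory, base_name, extension=".arrow") -> Optional[Path]:
    """Return the file with the highest numeric suffix, or None if none match.

    Assumes names of the form <base_name><N><extension>, for example
    "memory-monitor_3.arrow" when base_name is "memory-monitor_".
    """
    pattern = re.compile(re.escape(base_name) + r"(\d+)" + re.escape(extension))
    candidates = []
    for path in Path(directory).iterdir():
        match = pattern.fullmatch(path.name)
        if match:
            # Compare suffixes numerically so that _10 sorts after _9.
            candidates.append((int(match.group(1)), path))
    if not candidates:
        return None
    return max(candidates, key=lambda item: item[0])[1]
```

For example, find_latest_recording("data", "memory-monitor_") would return the highest-numbered recording in the data directory. Timestamp suffixes are not handled by this sketch.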
Running the workflow
Start the workflow.
DataFrameWriter will write data to the file specified in FileName as the
workflow runs, and will stop when the workflow stops.
Loading Arrow files in Python
In Python, Arrow files can be read using
PyArrow. Several
scientific analysis libraries, such as pandas,
NumPy, and Polars, can exchange data with PyArrow.
This section demonstrates file loading using both PyArrow and
pandas. Install both packages using your preferred method before proceeding
(e.g. pip install pyarrow pandas).
The following script provides a single public function, load_arrow_file, which
loads data from an Arrow file efficiently using
zero-copy memory mapping. The
optional start, end, and columns parameters allow loading only the rows
and columns you need without reading the full file into memory. The script
parses the file footer with the flatbuffers package, so install it as well
(e.g. pip install flatbuffers). Place the script in the
same directory as your other processing scripts.
load_arrow.py
from dataclasses import dataclass
from typing import List, Tuple, Optional, Union
from pathlib import Path
import struct
import pyarrow as pa
import io


@dataclass(frozen=True)
class _BatchInfo:
    index: int     # position of the record batch in the file
    start: int     # index of the first row in the batch
    end: int       # index of the last row in the batch (inclusive)
    num_rows: int


def _read_batch_metadata(path: Union[str, Path], end: Optional[int]) -> List[_BatchInfo]:
    MAGIC = b"ARROW1"
    MAGIC_LEN = len(MAGIC)
    FOOTER_SIZE_LEN = 4
    CONTINUATION_MARKER = b"\xff\xff\xff\xff"
    from flatbuffers import encode
    import flatbuffers.number_types as N

    # Read the flatbuffer-encoded footer stored at the end of the file.
    with pa.memory_map(str(path), 'r') as f:
        assert f.read(MAGIC_LEN) == MAGIC, "Not a valid Arrow IPC file"
        f.seek(-(FOOTER_SIZE_LEN + MAGIC_LEN), io.SEEK_END)
        footer_size = struct.unpack("<i", f.read(FOOTER_SIZE_LEN))[0]
        assert f.read(MAGIC_LEN) == MAGIC, "Invalid end magic"
        f.seek(-(FOOTER_SIZE_LEN + MAGIC_LEN + footer_size), io.SEEK_END)
        footer_buf = bytearray(f.read(footer_size))

    # Locate the footer table and its vtable.
    root_offset = encode.Get(N.UOffsetTFlags.packer_type, footer_buf, 0)
    tab_start = root_offset
    vtable_offset = tab_start - encode.Get(N.SOffsetTFlags.packer_type, footer_buf, tab_start)
    vtable_len = encode.Get(N.VOffsetTFlags.packer_type, footer_buf, vtable_offset)

    def field_offset(buf, field_index, vtab_len, vtab_off):
        # Return the offset of a table field, or 0 if the field is absent.
        slot = 4 + field_index * 2
        if slot >= vtab_len:
            return 0
        return encode.Get(N.VOffsetTFlags.packer_type, buf, vtab_off + slot)

    # Field 3 of the footer is the vector of record batch blocks.
    rb_field_offset = field_offset(footer_buf, 3, vtable_len, vtable_offset)
    if rb_field_offset == 0:
        return []
    vec_offset_pos = tab_start + rb_field_offset
    vec_offset = vec_offset_pos + encode.Get(N.UOffsetTFlags.packer_type, footer_buf, vec_offset_pos)
    num_blocks = encode.Get(N.UOffsetTFlags.packer_type, footer_buf, vec_offset)

    batch_info: List[_BatchInfo] = []
    row_offset = 0
    with open(path, "rb") as f:
        for i in range(num_blocks):
            # Each block entry is 24 bytes and begins with the 8-byte file offset of its message.
            block_start = vec_offset + 4 + i * 24
            block_offset = struct.unpack_from("<q", footer_buf, block_start)[0]
            f.seek(block_offset)
            marker = f.read(len(CONTINUATION_MARKER))
            meta_size = struct.unpack("<i", f.read(4) if marker == CONTINUATION_MARKER else marker)[0]
            msg_buf = bytearray(f.read(meta_size))

            # Field 2 of the message table is the header (the record batch).
            msg_root = encode.Get(N.UOffsetTFlags.packer_type, msg_buf, 0)
            msg_vtab_off = msg_root - encode.Get(N.SOffsetTFlags.packer_type, msg_buf, msg_root)
            msg_vtab_len = encode.Get(N.VOffsetTFlags.packer_type, msg_buf, msg_vtab_off)
            header_off = field_offset(msg_buf, 2, msg_vtab_len, msg_vtab_off)
            if header_off == 0:
                raise ValueError("Could not find a valid header offset in Arrow file.")
            header_pos = msg_root + header_off

            # Field 0 of the record batch table is its row count.
            rb_root = header_pos + encode.Get(N.UOffsetTFlags.packer_type, msg_buf, header_pos)
            rb_vtab_off = rb_root - encode.Get(N.SOffsetTFlags.packer_type, msg_buf, rb_root)
            rb_vtab_len = encode.Get(N.VOffsetTFlags.packer_type, msg_buf, rb_vtab_off)
            length_off = field_offset(msg_buf, 0, rb_vtab_len, rb_vtab_off)
            if length_off == 0:
                raise ValueError("Found batch with no rows. Verify file integrity.")
            num_rows = struct.unpack_from("<q", msg_buf, rb_root + length_off)[0]
            batch_info.append(_BatchInfo(index=i, start=row_offset, end=row_offset + num_rows - 1, num_rows=num_rows))
            row_offset += num_rows
    return batch_info


def _slice_indices(batch_info: List[_BatchInfo], start: int, end: int) -> Tuple[List[int], int, int]:
    # Returns the batches that overlap rows [start, end] (inclusive), the offset of
    # `start` within the first batch, and the exclusive stop offset within the last batch.
    if start < 0 or end > batch_info[-1].end or start > end:
        raise IndexError(f"Range [{start}, {end}] is out of bounds. Valid range: [0, {batch_info[-1].end}]")
    if start == 0 and end == batch_info[-1].end:
        return list(range(len(batch_info))), 0, batch_info[-1].num_rows
    batch_indices = []
    first_idx = last_idx = None
    for i, batch in enumerate(batch_info):
        if batch.end < start:
            continue
        batch_indices.append(batch.index)
        if first_idx is None:
            first_idx = i
        if batch.end >= end:
            last_idx = i
            break
    first_offset = 0 if first_idx is None else start - batch_info[first_idx].start
    # `end` is inclusive, so the exclusive stop offset is one past it.
    last_offset = (batch_info[batch_indices[-1]].num_rows if last_idx is None
                   else end - batch_info[last_idx].start + 1)
    return batch_indices, first_offset, last_offset


def load_arrow_file(
    filepath: Union[str, Path],
    start: Optional[int] = None,
    end: Optional[int] = None,
    columns: Optional[List[str]] = None
) -> pa.Table:
    """
    Load rows [start, end] (inclusive) from an Arrow IPC file as a PyArrow Table.

    Args:
        filepath: Path to the .arrow file.
        start: First row index to include. Defaults to 0.
        end: Last row index to include. Defaults to the last row.
        columns: Column names to return. Defaults to all columns.

    Returns:
        PyArrow.Table containing the requested rows and columns.
    """
    filepath = Path(filepath)
    start = start or 0
    batches = _read_batch_metadata(filepath, end)
    if not batches:
        raise ValueError(f"No valid batches found in {filepath}")
    end = end if end is not None else batches[-1].end
    batch_indices, first_offset, last_offset = _slice_indices(batches, start, end)
    source = pa.memory_map(str(filepath), 'r')
    reader = pa.ipc.open_file(source)
    if not batch_indices:
        schema = reader.schema
        if columns:
            schema = pa.schema([f for f in schema if f.name in columns])
        return schema.empty_table()

    def _batches():
        for i, batch_idx in enumerate(batch_indices):
            batch = reader.get_batch(batch_idx)
            if columns is not None:
                batch = batch.select(columns)
            # Only the first and last batches need trimming.
            if i == 0 or i == len(batch_indices) - 1:
                s = first_offset if i == 0 else 0
                e = last_offset if i == len(batch_indices) - 1 else batch.num_rows
                batch = batch.slice(s, e - s)
            yield batch

    return pa.Table.from_batches(_batches())
Loading Arrow files
To load the full Arrow file, simply call load_arrow_file with a string
pointing to the file; this can be an absolute file path or a relative file path.
The optional start and end parameters are integers that specify the first
and last row indices (0-based) to read from the file. The optional columns
parameter is a list of strings that restricts loading to specific channels by
name. Any combination of these parameters can be used together to avoid loading
the full file into memory, which can reduce load time significantly for large
recordings.
from load_arrow import load_arrow_file
table = load_arrow_file("memory-monitor_0.arrow", start=100, end=5000, columns=['Clock', 'PercentUsed'])
If start or end is outside the valid row index range, an IndexError is
raised indicating what the valid range is. If any string in columns does not
match a column name in the file, a KeyError is raised.
Note
Compressed and uncompressed files are loaded identically. PyArrow reads the compression metadata in each record batch header and decompresses automatically when necessary. This decompression process can incur CPU overhead that extends the amount of time it takes to load a file.
Converting to other formats
PyArrow can convert Arrow tables into several other formats for use with different libraries and workflows.
Converting to a pandas DataFrame
If your analysis uses pandas, you can convert an Arrow Table to a DataFrame by
calling .to_pandas():
from load_arrow import load_arrow_file
table = load_arrow_file("memory-monitor_0.arrow")
df = table.to_pandas()
Important
Calling .to_pandas(), or any other method that converts the table to a
pandas DataFrame, might copy the entire dataset into RAM. Pandas and PyArrow
can, in certain limited circumstances, maintain memory mapping across
boundaries, but it is not guaranteed. Keep this in mind when working with long
recordings on machines with limited memory.
Using pandas directly
It is also possible to load an Arrow file with pandas using
pandas.read_feather(),
which accepts Arrow IPC files because Feather
v2 and
Arrow IPC share the same binary format.
import pandas as pd
df = pd.read_feather("memory-monitor_0.arrow", dtype_backend="pyarrow")
This is convenient for short recordings, but it calls PyArrow internally and loads the entire file into RAM regardless of how large it is. It also does not expose sample-index access, so there is no way to read only a portion of the recording without first loading the whole file. For large or long recordings, the memory-mapped approach described previously is preferable.
Exporting Arrow data to NumPy
Individual columns can be extracted as NumPy arrays using .to_numpy(). For
large recordings, pass start and end indices to load_arrow_file() (as shown
previously) to avoid loading the entire file into RAM at
once.
from load_arrow import load_arrow_file
table = load_arrow_file("memory-monitor_0.arrow")
percent_used = table["PercentUsed"].to_numpy()
clock = table["Clock"].to_numpy()
Plotting data from Arrow files
The examples in this section also require matplotlib (e.g. pip install matplotlib). To display figures interactively, matplotlib needs a GUI backend.
In Jupyter notebooks this is handled automatically. In a plain Python script,
plt.show() will open a figure window if a compatible GUI toolkit (e.g. Tk, Qt,
or wxPython) is installed. See the matplotlib backend
documentation
if figures do not appear.
This section shows how to plot data saved from a
MemoryMonitorData operator. The MemoryMonitor schema
contains columns including Clock, PercentUsed, and BytesUsed. The Clock
column records the acquisition clock counter value and must be divided by the
acquisition clock rate to convert its value to seconds.
Reading the acquisition clock rate
The example workflow shown above writes
acquisition metadata, including the clock rate, to a start-time_<suffix>.csv
file each time it runs. Load it with NumPy before plotting:
import numpy as np
dt = {'names': ('time', 'acq_clk_hz', 'block_read_sz', 'block_write_sz'),
      'formats': ('datetime64[us]', 'u4', 'u4', 'u4')}
meta = np.genfromtxt("start-time_0.csv", delimiter=',', dtype=dt)
acq_clk_hz = meta['acq_clk_hz']
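The conversion from clock counts to seconds can be sketched as a small helper. This is an illustrative example, not part of the provided scripts: clock_to_seconds is a hypothetical name, and the 250 MHz rate in the usage line is an assumed value chosen only to make the arithmetic easy to follow. Subtracting the first count before dividing yields time relative to the start of the loaded data.

```python
import numpy as np


def clock_to_seconds(clock, acq_clk_hz, relative=True):
    """Convert acquisition clock counts to seconds."""
    clock = np.asarray(clock, dtype=np.uint64)
    if relative and clock.size:
        # Elapsed counts since the first loaded sample.
        clock = clock - clock[0]
    # float() also accepts the 0-d array returned by np.genfromtxt for a single row.
    return clock.astype(np.float64) / float(acq_clk_hz)


# Assumed 250 MHz clock: counts 250,000,000 apart are one second apart.
time_s = clock_to_seconds([1_000, 250_001_000], acq_clk_hz=250_000_000)
```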
Plotting directly from a PyArrow table
PyArrow column arrays implement the Python array
protocol, so most plotting
libraries, including Matplotlib, can consume them directly without an explicit
conversion step. The Clock column is an exception: arithmetic operations such
as dividing by the clock rate require a call to .to_numpy() first to produce a
NumPy array.
import pyarrow as pa
import numpy as np
import matplotlib.pyplot as plt
from load_arrow import load_arrow_file
dt = {'names': ('time', 'acq_clk_hz', 'block_read_sz', 'block_write_sz'),
      'formats': ('datetime64[us]', 'u4', 'u4', 'u4')}
meta = np.genfromtxt("start-time_0.csv", delimiter=',', dtype=dt)
acq_clk_hz = meta['acq_clk_hz']
table = load_arrow_file("memory-monitor_0.arrow")
time_s = table["Clock"].to_numpy() / acq_clk_hz
plt.figure()
plt.plot(time_s, table["PercentUsed"])
plt.xlabel("Time (s)")
plt.ylabel("FIFO used (%)")
plt.title("Hardware Buffer Usage")
plt.show()
Plotting with pandas
Converting the Arrow table to a pandas DataFrame gives access to pandas's
.plot() method and column-based indexing, which can make exploratory analysis
more concise. The pandas workflow for the memory monitor data looks like this:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from load_arrow import load_arrow_file
dt = {'names': ('time', 'acq_clk_hz', 'block_read_sz', 'block_write_sz'),
      'formats': ('datetime64[us]', 'u4', 'u4', 'u4')}
meta = np.genfromtxt("start-time_0.csv", delimiter=',', dtype=dt)
acq_clk_hz = meta['acq_clk_hz']
table = load_arrow_file("memory-monitor_0.arrow")
df = table.to_pandas()
df["time_s"] = df["Clock"] / acq_clk_hz
fig, axes = plt.subplots(2, 1, sharex=True)
df.plot(
    ax=axes[0],
    x="time_s",
    y="PercentUsed",
    ylabel="FIFO used (%)",
    legend=False)
df.plot(
    ax=axes[1],
    x="time_s",
    y="BytesUsed",
    xlabel="Time (s)",
    ylabel="Bytes used",
    legend=False)
plt.tight_layout()
plt.show()
Important
Calling .to_pandas() can copy the entire table into RAM, potentially
exceeding memory usage limits. For short recordings this is a convenient
workflow, but for large files on memory-limited machines, prefer working
directly with the PyArrow table as shown above.
Working with subsampled data
Some ONIX device data frames contain data from streams that are acquired at
different rates. For example, NeuropixelsV1DataFrame
combines AP-band spike data sampled at 30 kHz and LFP-band data sampled at 2.5
kHz. Because all columns share the row count of the faster AP-band stream, each
LFP value is repeated across 12 consecutive rows in the loaded data. The following script extracts only
the unique LFP samples and their corresponding Clock values from the loaded
data.
import numpy as np
from load_arrow import load_arrow_file
table = load_arrow_file("npix-v1e-lfp_0.arrow")
data_cols = [name for name in table.schema.names if "LfpData" in name]
data = np.column_stack([table[col].to_numpy() for col in data_cols])
clock = table["Clock"].to_numpy()
stride = 12 # 30 kHz primary rate / 2.5 kHz LFP rate for NeuropixelsV1
mask = np.zeros(len(clock), dtype=bool)
mask[::stride] = True
data_unique = data[mask] # shape: (num_unique_lfp_samples, num_channels)
clock_unique = clock[mask] # acquisition clock counts at each unique LFP sample
Divide clock_unique by the acquisition clock rate to convert clock counts to
seconds. See Reading the acquisition clock
rate for how to load that value from the
metadata CSV file.
See the Subsampled data section for a more detailed explanation.
Advanced Arrow Topics
What is the Apache Arrow file format?
Apache Arrow files organize data in a column-oriented layout optimized for operations typical in time-series analysis, such as filtering, grouping, and aggregation. Concretely, samples from a single data source, e.g. a single electrophysiology channel, are stored next to each other on disk. This means an analysis tool can read just the channels it needs without first rearranging or copying the contents of the file after it has been loaded into memory. Additionally, each file is self-describing: it opens with a schema that declares every column's name and data type, followed by a sequence of record batches. Each record batch is a group of rows in which each column's values are stored as a contiguous array. Unlike plain text data formats (e.g. CSV files produced by CsvWriter) or flat binary files (e.g. files produced by MatrixWriter), Arrow files contain data type information (e.g., 16-bit integers, 64-bit floating point numbers, etc.), and do not require a separate metadata file or prior knowledge of the data layout in order to be loaded correctly.
Note
To convert from Arrow files to a format similar to the output of CsvWriter
or MatrixWriter, see the Converting to other formats section.
DataFrameWriter Batching and Flush Behavior
DataFrameWriter accumulates incoming frames in memory and writes them to disk as Arrow record batches. For periodic data sources (e.g. Neuropixels or Intan Chips), each batch holds approximately one second of data. For aperiodic or bursty sources (e.g. Digital Inputs), batch sizes vary; a flush is triggered after at most five seconds regardless of how many samples have accumulated. This design balances read and write performance, file size, and worst-case data loss in the event of a catastrophic failure such as a power outage.
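To get a feel for how large a one-second record batch can be, the arithmetic can be sketched as follows. The channel count and sample width in the usage line (384 channels of 16-bit samples at 30 kHz) are illustrative assumptions, and the estimate ignores the Clock column, other metadata columns, and Arrow framing overhead.

```python
def estimate_batch_bytes(sample_rate_hz, num_channels, bytes_per_sample, batch_seconds=1.0):
    """Rough size of the sample data in one record batch."""
    rows = int(sample_rate_hz * batch_seconds)
    return rows, rows * num_channels * bytes_per_sample


rows, size_bytes = estimate_batch_bytes(30_000, 384, 2)
print(f"{rows} rows, {size_bytes / 1e6:.2f} MB")  # 30000 rows, 23.04 MB
```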
Compression
When EnableCompression is true, DataFrameWriter compresses each record
batch using the Zstandard codec before
writing it to disk. Zstandard is an open-source general-purpose algorithm that
balances compression ratio and speed. Since compression runs on the same machine
that is acquiring data, it competes with the rest of the acquisition pipeline
for CPU resources. For experiments that combine high-bandwidth hardware
configurations (e.g., multiple Neuropixels probes) or computationally intensive
workflows with long recordings or low-latency (e.g., sub-millisecond)
closed-loop feedback, benchmark the system with compression enabled.
Tip
- To evaluate the impact of compression on your specific setup, use a MemoryMonitorData operator to compare the state of the hardware buffer when EnableCompression is set to true and when it is set to false. If the PercentUsed value remains near zero in both cases, compression does not risk buffer overflow and might not be impacting the real-time performance. Refer to the Tuning closed-loop performance tutorial for more information.
- Enabling the Buffered property can alleviate the CPU overhead by offloading compression to a background thread.
Subsampled data
When DataFrameWriter writes frames that contain streams acquired at different
rates, all columns in the resulting Arrow file share the same number of rows,
set by the primary (faster) rate. Each subsampled value therefore occupies
stride consecutive rows. For instance, each 2.5 kHz LFP sample from a
NeuropixelsV1DataFrame appears in 12 consecutive rows of the file to match the
30 kHz spike data sample rate. This is the expanded counterpart of run-length
encoding: each value is stored stride times rather than once with a repeat count. Calling
.to_numpy() on a subsampled column returns an array in which every distinct
value is repeated stride times. To recover only unique values, the Working
with subsampled data script applies a boolean
mask with True at every stride-th index. This is an example of strided
indexing, which selects only
the rows that contain new samples. The sample rate for each device is listed on
the relevant data frame API page (e.g.,
NeuropixelsV1DataFrame), from which the stride can be
calculated.
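Strided extraction only returns the right samples if the stride is correct and the first loaded row coincides with the start of a repeated block. The following is a minimal sketch that checks this assumption before discarding the repeats. The helper extract_unique_samples is hypothetical, and a slowly varying signal with equal neighboring samples could pass the check even with a wrong stride.

```python
import numpy as np


def extract_unique_samples(values, stride, check=True):
    """Keep one row out of every `stride` rows of a repeated column."""
    values = np.asarray(values)
    if check:
        num_blocks = len(values) // stride  # ignore a trailing partial block
        blocks = values[:num_blocks * stride].reshape(
            num_blocks, stride, *values.shape[1:])
        # Every row in a block should equal the first row of that block.
        if not (blocks == blocks[:, :1]).all():
            raise ValueError(
                "Values change within a stride block; check the stride and row alignment.")
    return values[::stride]


lfp = np.repeat([10, 20, 30], 12)          # three LFP samples, each held for 12 rows
unique = extract_unique_samples(lfp, 12)   # array([10, 20, 30])
```

The same function accepts the two-dimensional array produced by np.column_stack, in which case the check is applied to all channels at once.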
Manually Loading Arrow Files
The scripts in the loading section abstract the details of Arrow file access. This section covers lower-level Arrow API usage for cases where direct control is needed or the provided scripts do not meet specific requirements.
Memory-mapped loading
The most efficient way to read Arrow files in Python is to open the file as a
memory map and pass it to pyarrow.ipc.open_file(). With this approach, PyArrow
maps the file into the process's virtual address space and reads data on demand
rather than copying the entire file into RAM upfront. For large recordings, this
is significantly more memory-efficient than loading everything at once.
import pyarrow as pa

with pa.memory_map("memory-monitor_0.arrow", "r") as source:
    with pa.ipc.open_file(source) as reader:
        print(f"Schema: {reader.schema}")
        print(f"Number of record batches: {reader.num_record_batches}")
        table = reader.read_all()
Individual record batches can be read one at a time, which is useful when you only need a subset of the data or when the full recording does not fit in available RAM:
import pyarrow as pa

with pa.memory_map("memory-monitor_0.arrow", "r") as source:
    with pa.ipc.open_file(source) as reader:
        for i in range(reader.num_record_batches):
            batch = reader.get_batch(i)
            # Process individual batch
It is also possible to load a specific range of record batches into a table to analyze a fixed time window without loading the full recording. In the following snippet, the first 10 record batches are loaded.
import pyarrow as pa

with pa.memory_map("memory-monitor_0.arrow", "r") as source:
    with pa.ipc.open_file(source) as reader:
        indices = range(10)  # Make sure the range does not exceed reader.num_record_batches
        table = pa.Table.from_batches(reader.get_batch(i) for i in indices)
Loading and recovering corrupt files
If a power outage or other unforeseen event interrupts a recording and leaves the file in a state where it cannot be opened by the example scripts above, the following scripts can be used to recover the data that was written before the interruption.
Note
Data durability was a first class requirement when selecting the Arrow file
format. Worst case data loss is a single record batch. For high-bandwidth
sources like Neuropixels, DataFrameWriter produces record batches that are
1 second in duration. For low-bandwidth or aperiodic sources, it flushes its
input buffer to disk every 5 seconds. Data written before any interruption
(unhandled exception, out-of-memory condition, power outage, etc.) can always
be recovered.
Recovering an Arrow file with invalid footer
If recording is interrupted, the file may be missing the footer that Arrow uses
to index record batches, causing PyArrow to raise ArrowInvalid: Not an Arrow file when you try to open it. This script works around the missing footer by
opening the file as an Arrow Stream rather than an Arrow File. The Stream format
reads record batches sequentially without relying on the footer, so all batches
written before the interruption can still be recovered. The recovered batches
are then written to a new file, which automatically generates a valid footer on
close.
import pyarrow as pa

input_path = r"./path/to/corrupted_file.arrow"
output_path = r"./path/to/fixed_file.arrow"

with pa.memory_map(input_path, 'r') as f:
    magic = f.read(8)
    if magic != b'ARROW1\x00\x00':
        raise ValueError('Not an Arrow file.')
    with pa.ipc.open_stream(f) as reader:
        schema = reader.schema
        num_batches = 0
        with pa.ipc.new_file(output_path, schema) as writer:
            while True:
                try:
                    batch = reader.read_next_batch()
                    writer.write_batch(batch)
                    num_batches += 1
                except StopIteration:
                    print(f"Read {num_batches} batches from corrupted file.")
                    break
                except (pa.ArrowInvalid, OSError) as e:
                    print(f"Stopped reading at batch {num_batches}: {e}")
                    break

print(f"Recovered {num_batches} batches from {input_path}, saved to {output_path}")
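Before running a recovery, it can help to confirm that the footer is actually missing. An Arrow file begins and ends with the ARROW1 magic bytes, which is the same check the load_arrow.py script performs. The following standard-library sketch tests for both. The helper has_arrow_footer is hypothetical, and a True result only shows that the closing magic bytes are present, not that the footer contents are intact.

```python
import os

MAGIC = b"ARROW1"


def has_arrow_footer(path):
    """Return True if the file starts and ends with the Arrow magic bytes."""
    with open(path, "rb") as f:
        head = f.read(len(MAGIC))
        f.seek(0, os.SEEK_END)
        if f.tell() < 2 * len(MAGIC):
            return False  # too short to hold both magic strings
        f.seek(-len(MAGIC), os.SEEK_END)
        tail = f.read(len(MAGIC))
    return head == MAGIC and tail == MAGIC
```

If this returns False for a file that starts with the magic bytes, the recording was most likely interrupted before the footer was written.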
Recovering an Arrow file with corrupted batches
This script reads an Arrow file with an intact footer that has been corrupted in
some other way (invalid buffers, corrupted headers, etc.) and writes all valid
batches to a new file. One error that indicates the buffers or headers have been
corrupted is ArrowInvalid: Unexpected empty message in IPC file format.
Warning
This script will discard any batches that have an error without attempting to
correct the error, leading to skips in the data. The Clock column can be
inspected afterward to identify gaps where batches were skipped.
import pyarrow as pa

input_path = r"./path/to/corrupted_file.arrow"
output_path = r"./path/to/fixed_file.arrow"

with pa.memory_map(input_path, 'r') as f:
    magic = f.read(8)
    if magic != b'ARROW1\x00\x00':
        raise ValueError('Not an Arrow file.')
    with pa.ipc.open_file(f) as reader:
        schema = reader.schema
        num_batches = 0
        with pa.ipc.new_file(output_path, schema) as writer:
            for i in range(reader.num_record_batches):
                try:
                    batch = reader.get_batch(i)
                    writer.write_batch(batch)
                    num_batches += 1
                except (pa.ArrowInvalid, OSError) as e:
                    print(f"Skipped batch {i}: {e}")
        # Report while the reader is still open.
        print(f"Recovered {num_batches} out of {reader.num_record_batches} batches from {input_path}, saved to {output_path}")
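Inspecting the Clock column for gaps after a recovery can be sketched with NumPy. This example assumes a periodic source whose clock advances by a roughly constant step between rows, and it flags steps larger than a multiple of the median step. The helper find_clock_gaps and its tolerance default are hypothetical choices, not part of the provided scripts.

```python
import numpy as np


def find_clock_gaps(clock, tolerance=1.5):
    """Return indices i where clock[i + 1] - clock[i] exceeds tolerance x the median step."""
    clock = np.asarray(clock, dtype=np.uint64)
    steps = np.diff(clock)
    if steps.size == 0:
        return np.array([], dtype=np.int64)
    threshold = tolerance * np.median(steps)
    return np.flatnonzero(steps > threshold)


# Synthetic example: a batch is missing between counts 30 and 100.
gaps = find_clock_gaps([0, 10, 20, 30, 100, 110])
```

Each returned index marks the last row before a gap. With a recovered table, the input would be table["Clock"].to_numpy().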
Handling compressed data
If the corrupt data file was originally saved with compression and you want to
re-save the recovered data with compression, pass an IpcWriteOptions object to
pa.ipc.new_file(). The example below applies Zstandard compression, which is
the same algorithm used by DataFrameWriter when EnableCompression is true.
The change is the same for both recovery scripts above; this example uses the
invalid footer script:
import pyarrow as pa

input_path = r"./path/to/corrupted_file.arrow"
output_path = r"./path/to/fixed_file.arrow"

with pa.memory_map(input_path, 'r') as f:
    magic = f.read(8)
    if magic != b'ARROW1\x00\x00':
        raise ValueError('Not an Arrow file.')
    with pa.ipc.open_stream(f) as reader:
        schema = reader.schema
        num_batches = 0
        options = pa.ipc.IpcWriteOptions(compression="zstd")
        with pa.ipc.new_file(output_path, schema, options=options) as writer:
            while True:
                try:
                    batch = reader.read_next_batch()
                    writer.write_batch(batch)
                    num_batches += 1
                except StopIteration:
                    print(f"Read {num_batches} batches from corrupted file.")
                    break
                except (pa.ArrowInvalid, OSError) as e:
                    print(f"Stopped reading at batch {num_batches}: {e}")
                    break

print(f"Recovered {num_batches} batches from {input_path}, saved to {output_path}")