Pandas 内存优化

Pandas 是数据分析中常用的一个 Python 库。本文主要讲述 Pandas 内存优化。其中数据读入存储的部分参考资料 简单又实用的pandas技巧:如何将内存占用降低90%,该部分内容在网上有较多说明与介绍。本文更多篇幅放在 Pandas 的 DataFrame 类型相关测试与源码分析上。

问题描述

对数据分析师或算法工程师而言,pandas 是一个常用的 Python 库,经常用来进行数据读入,清洗等操作。因此,不可避免的,从文件中读入数据或将数据存入 DataFrame 类型的变量中进行处理。这里主要针对于 DataFrame 变量对象的优化,包含数据读入(即 DataFrame 类型变量初始化)及向 DataFrame 对象中添加新列或产生新变换后的 DataFrame。

解决思路

此处,优化内存使用了 pandas 库函数之外的 memory_profiler 库以监控 Python 进程的内存使用情况,可以分析到每一行代码增减的内存状况,也可以以时间为横轴监控 Python 进程的内存使用情况。此处主要以监控代码行。使用了 datetime 模块监控函数运行时间。

优化过程

该部分主要分为从文件中读入数据与数据的处理变换部分(即对应机器学习中的数据读入与数据预处理)。

数据读入

数据读入指将数据从文件读入内存的过程,一般载入内存存入到 DataFrame 类型变量中。因此,数据读入部分的内存优化主要为 DataFrame 存储方式的优化(原因可见 结论 && 分析 部分)。在读入时候传入数据对应类型,此处使用 DataFrame.info(memory_usage='deep') 方法打印与 DataFrame 类型相关的所有信息。通过该方法可以准确的获取其内存使用状况。

模拟数据

这里产生单列数据的函数有三个,分别对应 intfloatcategory 三种类型的特征。generate_data 函数最终产生的数据大小为 data_size,其中包含两列时间值,一列目标值(即特征维度至少为4),其余三种类型的特征约为 1:1:1 进行分配。最终产生 str_dfprecision_64_dfprecision_32_dfstr2category_df 四种 DataFrame 类型变量作为返回值,表示 DataFrame 中全部为 object 类型,数值型转为对应 64 位数值型,数值型降为对应类型最小存储单元,object 转为 category 类型依次降低其对内存的使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import numpy
import pandas
import time

def generate_int_feature(field_name, length=5000):
col_arr = numpy.random.randint(0, 10, size=[length])
return field_name, col_arr, "int64"

def generate_float_feature(field_name, length=5000):
col_arr = numpy.random.rand(length) * 100
return field_name, col_arr, "float64"

def generate_categorical_feature(field_name, length=5000, str_size=5):
category_names = ["a", "b", "c", "d", "e", "f", "g"]
category_names = [name*str_size for name in category_names]
category_nums = numpy.random.randint(2, 7)
col_arr = numpy.random.randint(0, category_nums, size=[length])
col_arr = [category_names[index] for index in col_arr]
return field_name, col_arr, "str"

def generate_data(data_size=(5000, 300), str_size=5):
"""
the normal data on business include times, numbers, categories...
:param data_size: the size of data
:str_size: category feature's size of content
"""
length, cols = data_size
# save the data description in order the analysis
data_describe = dict() # field_name: field_type
col_names = list()
col_arrs = list()
# generate data include 2-times, 1-target and other-features
# category features: number features = 1: 2
# generate the 2-times data
time_format = "%Y-%m-%d-%H" # Year-Month-Day-Hour

register_time = numpy.random.randint(
low=time.mktime((2000, 1, 1, 0, 0, 0, 0, 0, 0)),
high=time.mktime((2018, 1, 1, 0, 0, 0, 0, 0, 0)),
size=[length]).tolist()
register_time_str = [time.strftime(time_format, time.localtime(i)) for i in register_time]
col_names.append("register_time")
col_arrs.append(register_time_str)

birth_day = numpy.random.randint(
low=time.mktime((1900, 1, 1, 0, 0, 0, 0, 0, 0)),
high=time.mktime((2000, 1, 1, 0, 0, 0, 0, 0, 0)),
size=[length]).tolist()
birth_day_str = [time.strftime(time_format, time.localtime(i)) for i in birth_day]
col_names.append("birth_day")
col_arrs.append(birth_day_str)

# generate the 1-target data: 2-class
target_values = numpy.random.randint(0, 2, size=[length]).tolist()
col_names.append("target")
col_arrs.append(target_values)
data_describe["target"] = numpy.int64

# generate features: 1/3 category and 2/3 numbers
category_numbs = cols / 3
for i in range(int(category_numbs)):
field_name, field_value, field_type = generate_categorical_feature(
field_name="category_" + str(i), length=length, str_size=str_size)
col_names.append(field_name)
col_arrs.append(field_value)
data_describe[field_name] = field_type

numerical_numbers = cols - 3 - category_numbs
for i in range(int(numerical_numbers)):
if i % 2 == 0:
field_name, field_value, field_type = generate_int_feature(
field_name="int_" + str(i), length=length)
else:
field_name, field_value, field_type = generate_float_feature(
field_name="float_" + str(i), length=length)
col_names.append(field_name)
col_arrs.append(field_value)
data_describe[field_name] = field_type

str_df = pandas.DataFrame(data=dict(zip(col_names, col_arrs)),
dtype=str, columns=col_names)
precision_64_df = str_df.copy()
for field_name, field_type in data_describe.items():
precision_64_df[field_name] = precision_64_df[field_name].astype(
field_type)
precision_64_df["register_time"] = pandas.to_datetime(
precision_64_df["register_time"], format=time_format)
precision_64_df["birth_day"] = pandas.to_datetime(
precision_64_df["birth_day"], format=time_format)

precision_32_df = precision_64_df.copy()
int_names = precision_32_df.select_dtypes(include=['int']).columns.tolist()
precision_32_df.loc[:, int_names] = precision_32_df[int_names].apply(
pandas.to_numeric, downcast="unsigned")
float_names = precision_32_df.select_dtypes(include=["float"]).columns
precision_32_df.loc[:, float_names] = precision_32_df[float_names].apply(
pandas.to_numeric, downcast="float")

str2category_df = precision_32_df.copy()
str_names = str2category_df.select_dtypes(include=["object"]).columns.tolist()
for field_name in str_names:
if field_name in ("register_time", "birth_day"):
continue
str2category_df[field_name] = str2category_df[field_name].astype(
"category")
return str_df, precision_64_df, precision_32_df, str2category_df

测试验证

测试主要为内存使用大小的信息打印,打印 DataFrame 类型内存使用情况的函数与测试函数如下,其中 category 类型特征产生函数通过 str_size 控制产生的字符串长度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def memory_usage(df):
print(df.info(memory_usage='deep'))

def memory_test():
for i in range(1, 5):
print("==============================================================")
print("DataSize is %s * %s !" % (5000*i, 300*i))
result = generate_data(data_size=(5000*i, 300*i), str_size=20)
print("\nstr_df:")
memory_usage(result[0])
print("\nprecision_64_df:")
memory_usage(result[1])
print("\nprecision_32_df:")
memory_usage(result[2])
print("\nstr2category_df:")
memory_usage(result[3])
分别测试数据量为 (5000*300)、(10000*600)、(15000*900) 及 (20000*1200) 四种情况下字符串长度为 5, 10, 20 的内存使用情况。表 1 为对应尺寸下 DataFrame 类型变量内部存储情况,表 2 - 4 为内存使用大小。

表 1 DataFrame 类型变量内部存储情况
str_df precision_64_df precision_32_df str2category_df
(5000*300) object(300) datetime64[ns](2)
float64(98)
int64(100)
object(100)
datetime64[ns](2)
float32(98)
uint8(100)
object(100)
datetime64[ns](2)
float32(98)
uint8(100)
category(100)
(10000*600) object(600) datetime64[ns](2)
float64(198)
int64(200)
object(200)
datetime64[ns](2)
float32(198)
uint8(200)
object(200)
datetime64[ns](2)
float32(198)
uint8(200)
category(200)
(15000*900) object(900) datetime64[ns](2)
float64(298)
int64(300)
object(300)
datetime64[ns](2)
float32(298)
uint8(300)
object(300)
datetime64[ns](2)
float32(298)
uint8(300)
category(300)
(20000*1200) object(1200) datetime64[ns](2)
float64(398)
int64(400)
object(400)
datetime64[ns](2)
float32(398)
uint8(400)
object(400)
datetime64[ns](2)
float32(398)
uint8(400)
category(400)
表2 字符串长度为 5 的 DataFrame 类型变量内存使用情况
str_df precision_64_df precision_32_df str2category_df
(5000*300) 74.1 MB 48.6 MB 43.4 MB 2.9 MB
(10000*600) 294.7 MB 194.5 MB 173.6 MB 11.6 MB
(15000*900) 661.8 MB 437.7 MB 390.6 MB 26.0 MB
(20000*1200) 1.1 GB 778.2 MB 694.4 MB 46.1 MB
表3 字符串长度为 10 的 DataFrame 类型变量内存使用情况
str_df precision_64_df precision_32_df str2category_df
(5000*300) 86.1 MB 60.6 MB 55.4 MB 3.0 MB
(10000*600) 342.4 MB 242.2 MB 221.3 MB 11.6 MB
(15000*900) 769.1 MB 545.0 MB 497.9 MB 26.0 MB
(20000*1200) 1.3 GB 968.9 MB 885.2 MB 46.1 MB
表4 字符串长度为 20 的 DataFrame 类型变量内存使用情况
str_df precision_64_df precision_32_df str2category_df
(5000*300) 109.9 MB 84.4 MB 79.2 MB 3.0 MB
(10000*600) 437.8 MB 337.6 MB 316.7 MB 11.7 MB
(15000*900) 983.7 MB 759.6 MB 712.5 MB 26.1 MB
(20000*1200) 1.7 GB 1.3 GB 1.2 GB 46.2 MB

结论 && 分析

结论

由如上实验,我们可以得出如下结论:

  • 减少使用 objcet 类型而使用值类型,指定数据列类型如 int, float 等而不是使用 str 类型可以有效节省内存;
  • 根据数据精度要求,选择合适的类型存储,如 float64 -> float32int64 -> uint8object -> category
  • category 类型可以有效降低因字符串长度带来的内存增长。

因此,在数据读入时,根据先验知识填入对应字段的数据类型可以有效降低数据在内存中的大小,主要是 dtype (E.g. {'a': np.float64, 'b': np.int32} )及 date_parserparse_dates 三个参数。

pandas.read_csv 分析

参考 pandas 中的 read_csv 函数,位于 ”pandas/io/parsers.py“ 文件中。
在函数 _make_parser_function 中,先进行了参数的初步转换工作,如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def _make_parser_function(name, sep=','):

default_sep = sep

def parser_f(filepath_or_buffer,
sep=sep,
delimiter=None,
# ignore the others...
float_precision=None):

# Alias sep -> delimiter.
if delimiter is None:
delimiter = sep

if delim_whitespace and delimiter is not default_sep:
raise ValueError("Specified a delimiter with both sep and"
" delim_whitespace=True; you can only"
" specify one.")

if engine is not None:
engine_specified = True
else:
engine = 'c'
engine_specified = False

if skip_footer != 0:
warnings.warn("The 'skip_footer' argument has "
"been deprecated and will be removed "
"in a future version. Please use the "
"'skipfooter' argument instead.",
FutureWarning, stacklevel=2)

kwds = dict(delimiter=delimiter,
engine=engine,
dialect=dialect,
compression=compression,
engine_specified=engine_specified,
# ignore the others...
skip_blank_lines=skip_blank_lines)

return _read(filepath_or_buffer, kwds)

parser_f.__name__ = name
return parser_f

read_csv = _make_parser_function('read_csv', sep=',')
# 此处 Appender 与文档相关,对函数功能未产生影响
read_csv = Appender(_read_csv_doc)(read_csv)
_read 函数中,产生按行读文件的通用 reader,并进行读取文件操作,如下代码所示,最终进行文件读操作的为 TextFileReader。若以迭代方式分批读数据,直接返回该类型对象,否则调用 read 方法获取数据并返回。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def _read(filepath_or_buffer, kwds):
"""Generic reader of line files."""
encoding = kwds.get('encoding', None)
if encoding is not None:
encoding = re.sub('_', '-', encoding).lower()
kwds['encoding'] = encoding

# compression 参数确认使用的文件是否为压缩文件
# {‘infer’, ‘gzip’, ‘bz2’, ‘zip’, ‘xz’, None}, default ‘infer’
compression = kwds.get('compression')
compression = _infer_compression(filepath_or_buffer, compression)
filepath_or_buffer, _, compression = get_filepath_or_buffer(
filepath_or_buffer, encoding, compression)
kwds['compression'] = compression

if kwds.get('date_parser', None) is not None:
if isinstance(kwds['parse_dates'], bool):
kwds['parse_dates'] = True

# Extract some of the arguments (pass chunksize on).
iterator = kwds.get('iterator', False)
chunksize = _validate_integer('chunksize', kwds.get('chunksize', None), 1)
nrows = _validate_integer('nrows', kwds.get('nrows', None))

# Create the parser.
parser = TextFileReader(filepath_or_buffer, **kwds)

if chunksize or iterator:
return parser

try:
data = parser.read(nrows)
finally:
parser.close()
return data
TextFileReader 中数据读入如下所示。读入使用的分析引擎 engine 取值为 {'c', 'python'},C 引擎更快但 Python 引擎的功能更完备一些。此处发现对文件读取进行了更底层的封装,构造返回的 DataFrame 对象也是以 DataFrame(col_dict, columns=columns, index=index) 方式构造的。数据类型的转换操作发生在解析引擎的底层(若数据类型参数为 None 时会在底层进行尝试解析)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class TextFileReader(BaseIterator):
# ignore other methods...

def read(self, nrows=None):
if nrows is not None:
if self.options.get('skipfooter'):
raise ValueError('skipfooter not supported for iteration')

ret = self._engine.read(nrows)

if self.options.get('as_recarray'):
return ret

# May alter columns / col_dict
index, columns, col_dict = self._create_index(ret)

if index is None:
if col_dict:
# Any column is actually fine:
new_rows = len(compat.next(compat.itervalues(col_dict)))
index = RangeIndex(self._currow, self._currow + new_rows)
else:
new_rows = 0
else:
new_rows = len(index)

df = DataFrame(col_dict, columns=columns, index=index)
self._currow += new_rows
if self.squeeze and len(df.columns) == 1:
return df[df.columns[0]].copy()
return df

def __next__(self):
try:
return self.get_chunk()
except StopIteration:
self.close()
raise

def get_chunk(self, size=None):
if size is None:
size = self.chunksize
if self.nrows is not None:
if self._currow >= self.nrows:
raise StopIteration
size = min(size, self.nrows - self._currow)
return self.read(nrows=size)
如此处使用 cdef 定义的 TextReader 类中的 _convert_with_dtype 方法用来进行类型的转换。因此在调用该函数时直接传入对应的数据类型可以提升读入的性能。

因此,由上述代码可知,其初始化的 DataFrame 类型使用 dict 传入数据,因此分析 DataFrame 使用 dict 初始化的部分。

pandas.DataFrame 分析

参考 pandas 中的 DataFrame 类,位于 ”pandas/core/frame.py“ 文件中。
data 参数为 dict 类型时,其初始化函数可以简化如下。DataFrame 继承关系为:
DataFrame --> NDFrame --> PandasObject; SelectionMixin --> StringMixin --> Object; Object。在 NDFrame 中解释为 "N 维 DataFrame 的类似物,存储多维度的尺寸可变,有标记的数据结构"。其中 data 参数存储为 BlockManager 类型的数据(由上机器之心提到,BlockManager 类负责保留行列索引与实际块之间的映射关系,可以作为一个 API 提供了对底层数据的访问)。如下代码中,可以看到最终构建的为多个块对数据进行管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# pandas.core.frame.py
class DataFrame(NDFrame):
# ignore some methods for understanding better
def __init__(self, data=None, index=None, columns=None, dtype=None,
copy=False):
mgr = self._init_dict(data, index, columns, dtype=dtype)
NDFrame.__init__(self, mgr, fastpath=True)

def _init_dict(self, data, index, columns, dtype=None):
"""
Segregate Series based on type and coerce into matrices.
Needs to handle a lot of exceptional cases.
"""
if columns is not None:
columns = _ensure_index(columns)

# GH10856
# raise ValueError if only scalars in dict
if index is None:
extract_index(list(data.values()))

# prefilter if columns passed
data = dict((k, v) for k, v in compat.iteritems(data)
if k in columns)

if index is None:
index = extract_index(list(data.values()))
else:
index = _ensure_index(index)

arrays = []
data_names = []
for k in columns:
if k not in data:
# no obvious "empty" int column
if dtype is not None and issubclass(dtype.type,
np.integer):
continue

if dtype is None:
# 1783
v = np.empty(len(index), dtype=object)
elif np.issubdtype(dtype, np.flexible):
v = np.empty(len(index), dtype=object)
else:
v = np.empty(len(index), dtype=dtype)

v.fill(np.nan)
else:
v = data[k]
data_names.append(k)
arrays.append(v)
else:
keys = list(data.keys())
if not isinstance(data, OrderedDict):
keys = _try_sort(keys)
columns = data_names = Index(keys)
arrays = [data[k] for k in keys]

return _arrays_to_mgr(arrays, data_names, index, columns, dtype=dtype)

def _arrays_to_mgr(arrays, arr_names, index, columns, dtype=None):
"""
Segregate Series based on type and coerce into matrices.
Needs to handle a lot of exceptional cases.
"""
# figure out the index, if necessary
if index is None:
index = extract_index(arrays)
else:
index = _ensure_index(index)

# don't force copy because getting jammed in an ndarray anyway
arrays = _homogenize(arrays, index, dtype)

# from BlockManager perspective
axes = [_ensure_index(columns), _ensure_index(index)]

return create_block_manager_from_arrays(arrays, arr_names, axes)

# pandas.core.internals.py
def create_block_manager_from_arrays(arrays, names, axes):
try:
blocks = form_blocks(arrays, names, axes)
mgr = BlockManager(blocks, axes)
mgr._consolidate_inplace()
return mgr
except ValueError as e:
construction_error(len(arrays), arrays[0].shape, axes, e)

探究可知,DataFrame 在内存中存储为多个块的形式(每个块为 numpy.array)类型,通过 BlockManager 对块进行数据的增删改查操作。每种数据类型有其对应的类,如上 ObjectBlock 表示字符串块,FloatBlock 表示浮点数列块。这些块是连续存储的。因此通过降低单个数值存储的大小有效降低了其内存的使用。

数据预处理

数据分析中,将数据进行清洗变换得到一份新的数据以进行后续分析训练是常见且高频的操作,此处主要产生新数据的方式进行测试分析。

模拟预处理

此处代码主要模拟了两种预处理行为,一列变换为一列(如对数变换,归一化变换,异常值缺失值过滤等情况),一列变为多列(如 OneHot 编码,时间序列分解等)。为方便后续插入到 DataFrame 中,返回为 list 类型的列名与列变换后的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import numpy

def generate_one(field_name, length=5000):
col_arr = numpy.random.randint(0, 10, size=[length])
return [field_name], [col_arr]

def generate_more_than_one(field_name, length=5000):
col_nums = numpy.random.randint(2, 7)
col_arr = numpy.random.randint(0, 2, size=[length, col_nums])
result_names = list()
result_values = list()
for idx in range(col_nums):
result_values.append(col_arr[:, idx])
result_names.append("_".join([field_name, str(idx)]))
return result_names, result_values
在列数较多时,随机的方式测试效果更平均,接近于真实项目中的情况。

预处理方式

实现测试了两种存储预处理之后列数据的方式,分别为

  • 原有 DataFrame 变量中调用 reindex 方法加入到原有 DataFrame 变量中;
  • 原有 DataFrame 变量中逐列加入新列;
  • 保存预处理后的列名与列值,构建列名到列值的映射,调用 DataFrame 初始化方法构建新变量。

代码实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import numpy
import pandas

def analog_pre_process_reindex(data_size=(5000, 300)):
length, cols = data_size
processed_df = pandas.DataFrame()
processed_df["target"] = numpy.random.randint(0, 3, size=[length])
# generate the test data length * cols include the target
# but result maybe has more cols
# default OneHot:Standard=1:1
cur_field_names = processed_df.columns.tolist()
for field_index in range(1, cols):
field_name = str(field_index)
if field_index % 2 == 0:
processed_names, processed_values = generate_one(
field_name=field_name, length=length)
else:
processed_names, processed_values = generate_more_than_one(
field_name=field_name, length=length)

cur_field_names.extend(processed_names)
processed_df = processed_df.reindex(
columns=cur_field_names, copy=False)
processed_df[processed_names] = numpy.array(processed_values).T

return processed_df

def analog_pre_process_one_to_df(data_size=(5000, 300)):
length, cols = data_size
processed_df = pandas.DataFrame()
processed_df["target"] = numpy.random.randint(0, 3, size=[length])
# generate the test data length * cols include the target
# but result maybe has more cols
# default OneHot:Standard=1:1
for field_index in range(1, cols):
field_name = str(field_index)
if field_index % 2 == 0:
processed_names, processed_values = generate_one(
field_name=field_name, length=length)
else:
processed_names, processed_values = generate_more_than_one(
field_name=field_name, length=length)

for index in range(len(processed_names)):
one_field_name = processed_names[index]
one_field_values = processed_values[index]
processed_df[one_field_name] = one_field_values

return processed_df

def analog_pre_process_list(data_size=(5000, 300)):
length, cols = data_size
col_names = ["target"]
col_values = [numpy.random.randint(0, 3, size=[length])]
# generate the test data length * cols include the target
# but result maybe has more cols
# default OneHot:Standard=1:1
for field_index in range(1, cols):
field_name = str(field_index)
if field_index % 2 == 0:
processed_names, processed_values = generate_one(
field_name=field_name, length=length)
else:
processed_names, processed_values = generate_more_than_one(
field_name=field_name, length=length)

col_names.extend(processed_names)
col_values.extend(processed_values)

fields_num = len(col_names)
data_dict = {col_names[idx]: col_values[idx]
for idx in range(fields_num)}
return pandas.DataFrame(data_dict)
此处,特征数量中一列变换到一列与一列变换到多列的比例为1:1,原有 DataFrame 变量中只有目标分类列(由数据产生方式可看出为三分类数据)。

测试验证

测试包含时间与内存消耗两部分,时间测试函数与主函数模块如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from datetime import datetime

def record_time(func, data_size):
start_time = datetime.now()
res = func(data_size)
end_time = datetime.now()
return res, (end_time - start_time).total_seconds()

if __name__ == "__main__":
for i in range(1, 5):
print("==============================================================")
print("DataSize is %s * %s !" % (5000*i, 300*i))
df, cost_time = record_time(
func=analog_pre_process_one_to_df, data_size=(5000 * i, 300 * i))
# func=analog_pre_process_reindex, data_size=(5000 * i, 300 * i))
# func=analog_pre_process_list, data_size=(5000 * i, 300 * i))
del df
print("CostTime = %s seconds!" % cost_time)

为避免同时测试对性能的影响,此处对每种方式均单独测试运行,分别测试数据量为 (5000*300)、(10000*600)、(15000*900) 及 (20000*1200) 数据量的情况,时间代价如下(单位为秒)。

表 5 保存预处理结果时间消耗统计表
analog_pre_process_reindex analog_pre_process_one_to_df analog_pre_process_list
(5000*300) 5.89472 0.292583 0.069674
(10000*600) 48.151761 1.250182 0.215701
(15000*900) 160.599144 3.279289 0.462308
(20000*1200) 763.805195 6.772731 0.771077

内存测试使用 memory_profiler 库函数,对检测函数 func_name 进行内存分析的使用方式如下。此处修饰符 @profile(precision=6) 精度为 6 位小数。

1
2
3
4
5
6
from memory_profiler import profile

@profile(precision=6)
def func_name(param...):
pass

运行三次分别得到的结果对比如下图所示


此处时间略大于不进行内存测试时的时间,因为使用该模块进行内存监控会降低代码的运行速度,属于正常现象。

结论 && 分析

结论

由上述测试结果明显可以得出结论:

  • 将获取到的所有预处理数据统一初始化构造为 DataFrame 对象是比向 DataFrame 变量中插入数据具有更高效的处理方式
  • 在数据预处理中逐次添加一列或多列数据到 DataFrame 均会带来内存的增加与执行速度的降低。
分析

DataFrame 初始化中源码分析可知,在该类型变量底层是通过 numpy.array 申请的连续内存块存储数据的,因此添加新列需要重新申请内存并释放旧内存,在频繁加入新内存时即带来了内存的额外消耗和性能下降。

环境说明

此处实验的计算机硬件配置为

1
2
3
4
CPU: CPU Intel(R) Core(TM) i7-4770 CPU @ 3.40GHz
# 但未用到 GPU 加速等相关操作
GPU: VGA compatible controller: NVIDIA Corporation GM206 [GeForce GTX 960] (rev a1)
内存: 16GB
软件环境为
1
2
3
4
5
OS: Ubuntu-16.04
Python: 3.5.3
numpy: 1.13.1
pandas: 0.22.0
memory-profiler: 0.54.0

后记

最近在做 Python 机器学习项目有关内存优化的工作,本文主要是优化过程中与 Pandas 相关的优化,通过该方式已经有效降低了与此相关的内存使用,效果明显,因此记录下来以供大家参考。

坚持原创技术分享,您的支持将鼓励我继续创作!