起因:前段时间,我们把通过 happybase 向 HBase 写数据的 put() 操作换成了 batch(),结果发现性能并没有提升。
阅读代码后,我发现 put() 的实现本身使用的就是批量插入:
table.py
def put(self, row, data, timestamp=None, wal=True):
    """Store data in the table.

    This method stores the data in the `data` argument for the row
    specified by `row`. The `data` argument is dictionary that maps columns
    to values. Column names must include a family and qualifier part, e.g.
    `cf:col`, though the qualifier part may be the empty string, e.g.
    `cf:`.

    Note that, in many situations, :py:meth:`batch()` is a more appropriate
    method to manipulate data.

    .. versionadded:: 0.7
       `wal` argument

    :param str row: the row key
    :param dict data: the data to store
    :param int timestamp: timestamp (optional)
    :param bool wal: whether to write to the WAL (optional)
    """
    # A single put() is routed through a one-off batch created by
    # self.batch(), so it is already a batch operation under the hood.
    # NOTE(review): presumably the batch is sent when the `with` block
    # exits; the context-manager methods are not visible here -- confirm.
    with self.batch(timestamp=timestamp, wal=wal) as batch:
        batch.put(row, data)
batch.py
class Batch(object):
    """Batch mutation class.

    Buffers row mutations in memory and sends them to the server in one
    call from :py:meth:`send`.

    This class is not meant to be instantiated directly; use
    :py:meth:`Table.batch` instead.
    """

    def __init__(self, table, timestamp=None, batch_size=None,
                 transaction=False, wal=True):
        """Initialise a new Batch instance.

        :param table: table the mutations are sent to; :py:meth:`send`
            reads its `name` and `connection.client` attributes
        :param int timestamp: timestamp applied to the whole batch
            (optional); when given, :py:meth:`send` uses `mutateRowsTs`
        :param int batch_size: number of buffered mutations at which
            :py:meth:`put` sends the batch automatically (optional)
        :param bool transaction: stored on the instance; cannot be
            combined with `batch_size`
        :param bool wal: batch-wide default for writing to the WAL

        :raises TypeError: if `timestamp` is neither an integer nor None,
            or if `transaction` is used together with `batch_size`
        :raises ValueError: if `batch_size` is not greater than zero
        """
        # NOTE(review): the typographic quotes in the messages below look
        # like paste artefacts; they are kept verbatim -- confirm against
        # upstream before normalising them.
        if not (timestamp is None or isinstance(timestamp, Integral)):
            raise TypeError("‘timestamp‘ must be an integer or None")

        if batch_size is not None:
            if transaction:
                raise TypeError("‘transaction‘ cannot be used when "
                                "‘batch_size‘ is specified")
            if not batch_size > 0:
                raise ValueError("‘batch_size‘ must be > 0")

        self._table = table
        self._batch_size = batch_size
        self._timestamp = timestamp
        self._transaction = transaction
        self._wal = wal
        # Not used by any of the methods visible in this excerpt.
        self._families = None
        self._reset_mutations()

    def _reset_mutations(self):
        """Reset the internal mutation buffer."""
        # Maps row key -> list of pending mutations for that row.
        self._mutations = defaultdict(list)
        self._mutation_count = 0

    def send(self):
        """Send the batch to the server.

        Does nothing when no mutations are buffered. After a successful
        call the internal buffer is empty again.
        """
        # .items() instead of the Python 2-only .iteritems(), so this also
        # runs on Python 3.
        bms = [BatchMutation(row, m) for row, m in self._mutations.items()]
        if not bms:
            return

        logger.debug("Sending batch for ‘%s‘ (%d mutations on %d rows)",
                     self._table.name, self._mutation_count, len(bms))
        if self._timestamp is None:
            self._table.connection.client.mutateRows(self._table.name, bms, {})
        else:
            self._table.connection.client.mutateRowsTs(
                self._table.name, bms, self._timestamp, {})

        self._reset_mutations()

    #
    # Mutation methods
    #

    def put(self, row, data, wal=None):
        """Store data in the table.

        See :py:meth:`Table.put` for a description of the `row`, `data`,
        and `wal` arguments. The `wal` argument should normally not be
        used; its only use is to override the batch-wide value passed to
        :py:meth:`Table.batch`.

        The mutations are only buffered here; nothing is sent unless the
        automatic flush below triggers.
        """
        if wal is None:
            wal = self._wal

        # One Mutation per column, appended to that row's pending list.
        self._mutations[row].extend(
            Mutation(
                isDelete=False,
                column=column,
                value=value,
                writeToWAL=wal)
            for column, value in data.items())

        self._mutation_count += len(data)
        # Data is only actually sent from here when a batch_size was set
        # and the buffered mutation count has reached it; without a
        # batch_size, put() never sends anything by itself.
        if self._batch_size and self._mutation_count >= self._batch_size:
            self.send()
版权声明:本文为博主原创文章,未经博主允许不得转载。
原文地址:http://blog.csdn.net/woshiaotian/article/details/47083159