Apache Cassandra: работа с базами из Python и cassandra-driver

Автор: | 05/25/2015
 

Cassandra-LogoПредполагается, что Cassandra уже установлена и работает, и в ней есть база TestKeyspace и таблица users, созданные в статье Apache Cassandra: описание директорий, язык CQL, утилита cqlsh.

Установка драйвера

Для работы с Cassandra из Python требуется cassandra-driver, который можно установить из PIP.

Для установки PIP в Debian — выполняем:

# aptitude install python-pip

И устанавливаем драйвер:

# pip install cassandra-driver

Что бы проверить — запускаем консоль Python:

>>> from cassandra.cluster import Cluster
>>> cluster = Cluster()
>>> session = cluster.connect('testkeyspace')

Что бы подключиться к кластеру по IP — укажите один или несколько адресов:

>>> cluster = Cluster(['127.0.0.1', '192.168.1.0'])
>>> session = cluster.connect('testkeyspace')

Подробнее — cassandra.cluster.

Выполнение запросов к базе

Посмотреть содержимое таблицы users:

>>> session = cluster.connect('testkeyspace')
>>> result = session.execute("select * from users")
>>> print result
[]

Добавим запись:

>>> session.execute("INSERT INTO users (id, name, login, group) VALUES (1, 'User', 'setevoy', 'wheel')")
>>> result = session.execute("select * from users")[0]
>>> print result
Row(id=1, group=u'wheel', login=u'setevoy', name=u'User')

Вывести только отдельные поля:

>>> result = session.execute("select * from users")[0]
>>> print result.login, result.name
setevoy User

Или так:

>>> result = session.execute("select * from users")
>>> for x in result: print x.id
...
1

В запросы можно передавать параметры из переменных, например:

>>> name, login, group = 'newuser', 'newlogin', 'newgroup'
>>> session.execute("INSERT INTO users (id, name, login, group) VALUES (2, %s, %s, %s)", (name, login, group))
>>> session.execute("select * from users")
[Row(id=1, group=u'wheel', login=u'setevoy', name=u'User'), Row(id=2, group=u'newgroup', login=u'newlogin', name=u'newuser')]

Учтите, что в данном случае необходимо использовать %s для всех типов данных, а не только строк.

То же, но с помощью словаря:

>>> dict = {'name':'secondname','login':'secondlogin','group':'secondgroup',}
>>> dict.values()
['secondlogin', 'secondgroup', 'secondname']
>>> session.execute("""INSERT INTO users (id, name, login, group) VALUES (2, %(name)s, %(login)s, %(group)s)""", dict)
>>> session.execute("select * from users")
[Row(id=1, group=u'wheel', login=u'setevoy', name=u'User'), Row(id=2, group=u'secondgroup', login=u'secondlogin', name=u'secondname')]

Типы данных

Тип в Python Тип в CQL
None NULL
bool boolean
float
float
double
int
long
int
bigint
varint
counter
decimal.Decimal decimal
str
unicode
ascii
varchar
text
buffer
bytearray
blob
date date
datetime timestamp
time time
list
tuple
generator
list
set
frozenset
set
dict
OrderedDict
map
uuid.UUID
timeuuid
uuid

Асинхронные запросы

Драйвер так же поддерживает выполнение асинхронных запросов с помощью execute_async(). Вместо того, что бы ждать окончания запроса и вернуть строку результата по окончанию выполнения — этот метод практически сразу возвращает объект ResponseFuture.

Например:

>>> from cassandra import ReadTimeout
>>> query = "SELECT * FROM users"
>>> future = session.execute_async(query)
>>> try:
...   rows = future.result()
...   for user in rows:
...     print user.name, user.group
... except ReadTimeout:
...   print('ERROR: query timed out')
...
User wheel
secondname secondgroup

Так же можно подключать функции «обратной связи», с помощью методов add_callback(), add_errback() и add_callbacks():

>>> def handle_success(rows):
...   try:
...     for user in rows:
...       print user.name, user.group
...   except Exception as e:
...       print 'ERROR: %s' % e
...
>>> def handle_error(exception):
...   print('Failed to fetch user info: %s' % exception)
...
>>> future = session.execute_async(query)
>>> future.add_callbacks(handle_success, handle_error)
User wheel
secondname secondgroup

Изменение Consistency Level

Consistency Level — «уровень согласованности» в Cassandra, который определяет сколько нод будут опрошены для получения результата, который будет считаться успешно завершённым.

По умолчанию используется ConsistencyLevel.ONE. Если хотите изменить этот параметр — используйте для вашего запроса класс SimpleStatement:

>>> from cassandra import ConsistencyLevel
>>> from cassandra.query import SimpleStatement
>>> query = SimpleStatement(
...   "INSERT INTO users (id, name, login, group) VALUES (%s, %s, %s, %s)",
...   consistency_level=ConsistencyLevel.QUORUM)
>>> session.execute(query, (3, 'name3', 'login3', 'group3'))
>>> session.execute('select * from users')
[Row(id=1, group=u'wheel', login=u'setevoy', name=u'User'), Row(id=2, group=u'secondgroup', login=u'secondlogin', name=u'secondname'), Row(id=3, group=u'group3', login=u'login3', name=u'name3')]

Подготовленные запросы

Prepared statements передаются Cassandra и сохраняются в ней для дальнешего выполнения. Когда дайвер использует такой подготовленный запрос — ему остаётся только передать значения параметров. Это позволяет снизить трафик и нагрузку на CPU, так как Cassandra не будет каждый раз парсить весь запрос заново.

Что бы подготовить такой запрос — используйте Session.prepare():

>>> user_lookup_stmt = session.prepare("SELECT * FROM users WHERE id=?")
>>> users = []
>>> user_ids_to_query = range(1,4)
>>> for user_id in user_ids_to_query:
...   user = session.execute(user_lookup_stmt, [user_id])
...   users.append(user)
...
>>> print users
[[], [Row(id=1, group=u'wheel', login=u'setevoy', name=u'User')], [Row(id=2, group=u'secondgroup', login=u'secondlogin', name=u'secondname')], [Row(id=1, group=u'wheel', login=u'setevoy', name=u'User')], [Row(id=2, group=u'secondgroup', login=u'secondlogin', name=u'secondname')], [Row(id=3, group=u'group3', login=u'login3', name=u'name3')]]

Ссылки по теме:

http://datastax.github.io

http://pycon-2012-notes.readthedocs.org