标签:col 数据库 clean 数据库连接 ret import hash_set 方法 成功
import redis
class My(object):
try:
def __init__(self):
self.__host=‘xxx.xxx.xxx.xxx‘
self.__port=6379
self.__password=‘123456‘
self.r=redis.Redis(host=self.__host,port=self.__port,password=self.__password,db=15)
except Exception as e:
print(‘数据库连接失败‘)
def get(self,k):
res=self.r.get(k) #r连接数据库
if res:
return res.decode()
return None
def __close(self): #私有方法,出了类就使用不了
print(‘close‘)
def str_set(self,k,v):
self.r.set(k,v)
print(‘set成功‘)
def str_delete(self, k):
if self.get(k):
self.r.delete(k)
print(‘redies删除成功‘)
else:
print(‘KEY不存在‘)
def hash_get(self,k1,k2):
res = self.r.hget(k1,k2)
if res:
return res.decode()
return None
def hash_set(self,k1,k2,v):
self.r.hset(k1,k2,v)
print(‘hset成功‘)
def hash_getall(self,k):
res=self.r.hgetall(k)
if res:
new_res={}
for k,v in res.items():#字典变成decode
new_res[k.decode()]=v.decode()
print(new_res)
return None
def hash_del(self,k1,k2):
if self.hash_get(k1,k2):
self.r.hdel(k1,k2)
print(‘redies删除成功‘)
else:
print(‘KEY不存在‘)
def clean_redis(self):
for k in self.r.keys(): # 删除所有KEY
self.r.delete(k)
my=My()
my.str_set(‘xx‘,‘test‘)
# print(my.get(‘xx1‘))
# my.str_delete(‘xx6‘)
# my.hash_set(‘测试‘,‘测试1‘,‘10000‘)
# print(my.hash_get(‘测试‘,‘测试1‘))
# my.hash_getall(‘测试‘)
# my.hash_del(‘测试‘,‘测试1‘)
# my.clean_redis()
标签:col 数据库 clean 数据库连接 ret import hash_set 方法 成功
原文地址:https://www.cnblogs.com/jiadan/p/9090181.html