from django.db import models

class Book(models.Model):
    title = models.CharField(max_length=32)
    price = models.IntegerField()
    pub_date = models.DateField(null=True, blank=True)
    publish = models.ForeignKey("Publish", on_delete=models.CASCADE)
    authors = models.ManyToManyField("Author")

    def __str__(self):
        return self.title

class Publish(models.Model):
    name = models.CharField(max_length=32)
    email = models.EmailField()

    def __str__(self):
        return self.name

class Author(models.Model):
    name = models.CharField(max_length=32)
    age = models.IntegerField()

    def __str__(self):
        return self.name
The models above define books, publishers, and authors: Publish and Book have a one-to-many relationship, and Author and Book have a many-to-many relationship.
Taking the Book table as an example, here are create, delete, update, and query operations:
# Method 1
publish_obj = Publish.objects.get(name='天津出版社')
Book.objects.create(title='Python', price=12, pub_date='2017-12-10', publish=publish_obj)
# publish must be a Publish object; alternatively pass publish_id=2

# Method 2
Book.objects.create(**{'title': 'Python', 'price': 12, 'pub_date': '2017-12-10', 'publish': publish_obj})
# Method 1
book = Book(title='Python', price=12, pub_date='2017-12-10', publish=publish_obj)
# publish must be a Publish object; alternatively set publish_id=2
book.save()

# Method 2
book = Book()
book.title = 'Python'
book.price = 12
book.pub_date = '2017-12-10'
book.publish = publish_obj  # or book.publish_id = 2
book.save()
Book.objects.filter(id=1).delete()  # delete() returns the number of rows deleted, not a queryset
# Method 1
Book.objects.filter(id=2).update(title='Java')

# Method 2
book = Book.objects.get(id=2)
book.title = 'Java'
book.save()
Note: saving an object fetched with get() writes every field back to the database, which is less efficient, and get() can only return a single object, whereas filter() returns a QuerySet. For updates, prefer update().
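If you do go through get()/save(), Django's save() accepts update_fields to restrict which columns are written; a minimal sketch (the id below is arbitrary):

book = Book.objects.get(id=2)
book.title = 'Java'
book.save(update_fields=['title'])  # the UPDATE sets only title instead of every column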
Book.objects.all().values('title').distinct()  # de-duplicate on a single field
Book.objects.filter(title='Python').values('title', 'publish', 'pub_date')  # query by concrete conditions
Publish and Book have a one-to-many relationship, so let's use them to demonstrate create, delete, update, and query operations:
# Method 1: assign the foreign key column directly
Book.objects.create(title='linux', price=15, pub_date='2017-12-10', publish_id=2)

# Method 2: assign a model object
publish_obj = Publish.objects.filter(name='机械出版社')[0]
Book.objects.create(title='php', price=15, pub_date='2017-12-10', publish=publish_obj)
# From the FK-holding table to the parent: access the publish attribute on a Book object
Book.objects.filter(title='linux')[0].publish.delete()
# From the parent table to the FK-holding table: access book_set on a Publish object
Publish.objects.filter(id=1)[0].book_set.all().delete()
# From the FK-holding table to the parent
Book.objects.filter(publish__name='机械出版社').delete()
# From the parent table to the FK-holding table
Publish.objects.filter(book__title='python').delete()
# From the parent table to the FK-holding table
Publish.objects.filter(book__title='python').update(name='北京出版社')
# From the FK-holding table to the parent
Book.objects.filter(publish__name='北京出版社').update(title='python')
# From the FK-holding table to the parent
ret = Book.objects.filter(publish=Publish.objects.filter(name='北京出版社')[0]).values('title', 'price')
print(ret)
# From the parent table to the FK-holding table
ret = Publish.objects.filter(name='北京出版社')[0].book_set.values('title', 'price')
print('Book rows:', ret)
The reverse lookups above use book_set; the alternative is to set related_name='a' on the foreign key:
Publish.objects.filter(name='北京出版社').values('a')
Publish.objects.filter(name='北京出版社').values('a__title', 'a__price')  # reverse cross-table lookup
# From the FK-holding table to the parent
ret = Book.objects.filter(publish__name='北京出版社').values('title', 'price')
print(ret)
# From the parent table to the FK-holding table
ret = Publish.objects.filter(book__title='php').values('name', 'book__pub_date')
print(ret)
Book and Author have a many-to-many relationship, so Django generates a third (through) table that stores the relation between the two. Using these two tables as an example for create, delete, update, and query:
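The implicit through table can be inspected via the relation's through attribute; this is standard Django, though the printed row contents below are illustrative:

through = Book.authors.through           # the auto-generated through model
print(through._meta.db_table)            # e.g. 'app01_book_authors'
for row in through.objects.all():
    print(row.book_id, row.author_id)    # one row per (book, author) pair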
# Build rows in the through table
# From the table holding the ManyToMany field
author1 = Author.objects.get(id=1)
author2 = Author.objects.filter(id=2)[0]
book1 = Book.objects.get(id=2)
book1.authors.add(author1, author2)  # equivalent to book1.authors.add(*[author1, author2])

# Via the reverse manager on the other table
author1 = Author.objects.get(id=2)
book1 = Book.objects.get(id=3)
author1.book_set.add(book1)  # multiple objects can be added the same way as above
book1 = Book.objects.get(id=2)
book1.authors.add(2)        # authors is the ManyToMany field; 2 is an Author id
book1.authors.add(*[2, 3])
# From the table holding the ManyToMany field
author1 = Author.objects.get(id=1)
author2 = Author.objects.filter(id=2)[0]
book1 = Book.objects.get(id=2)
book1.authors.remove(author1, author2)  # equivalent to book1.authors.remove(*[author1, author2])

# Via the reverse manager
author1 = Author.objects.get(id=2)
book1 = Book.objects.get(id=3)
author1.book_set.remove(book1)
Deletion can also use the clear() method, which simply removes the rows in the through table:
book1.authors.clear()  # remove every row in the through table that involves book1
book1 = Book.objects.get(id=2)
author = Author.objects.filter(id__gt=2)[0]
book1.authors.clear()      # clear() first empties book1's rows in the through table
book1.authors.add(author)  # then this is really an add; one or more objects can be added
Updates can also use the set() method:
book.authors.set([2, 3, 4])
# Replaces the related set: relations already in the list are kept, missing ones
# are created, and relations not in the list are removed. New through-table rows
# get fresh autoincrement ids.
# From the child table to the parent
ret = Book.objects.filter(authors=Author.objects.filter(name='aaa')[0]).values('title')
print(ret)
# From the parent to the child table
ret = Author.objects.filter(name='aaa')[0].book_set.all().values('title')
print(ret)
# From the child table to the parent
ret = Book.objects.filter(authors__name='aaa').values('title')
# From the parent to the child table
ret = Author.objects.filter(book__title='python').values('name')
print(ret)
The many-to-many operations above use the following methods:
add()     # add relations; accepts objects or ids, and multiple values can be passed as a list
remove()  # delete; removes the through-table rows linking the given objects
clear()   # also deletes, but takes no arguments; empties every through-table row for the calling object
set()     # update; pass the list of ids the relation should end up with
Django ORM operations mostly revolve around the QuerySet type:
def filter(self, *args, **kwargs)
# Conditional query; conditions can be keyword arguments, dicts, or Q objects

def all(self)
# Return all data objects

def get(self, *args, **kwargs)
# Return the single matching object

def values(self, *fields)
# Return each row as a dict

def exclude(self, *args, **kwargs)
# Conditional query with the condition negated; conditions can be kwargs, dicts, or Q
models.Book.objects.all().exclude(id__gt=2)

# Ordering
models.Book.objects.all().order_by('-id')

# Check whether a queryset is ordered (ordered is a property, not a method)
order = models.Book.objects.all().ordered

# Reverse an ordered queryset
models.Book.objects.all().order_by('id').reverse()

# distinct de-duplication
models.Book.objects.values('title').distinct()  # select distinct title from app01_book
def values_list(self, *fields, **kwargs)
# Return each row as a tuple

def count(self)
# Return the number of objects in the queryset

def first(self)
# Return the first object

def last(self)
# Return the last object

def exists(self)
# Check whether the queryset contains any rows
# only(): load just the named fields
models.Book.objects.only('title', 'publish')
# or
models.Book.objects.filter(id__gt=4).only('title', 'publish')
# defer(): load everything except the named fields
models.Book.objects.defer('title', 'publish')
# or
models.Book.objects.filter(id__gt=4).defer('title', 'publish')
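Note that accessing a field excluded by only()/defer() still works, but costs an extra query per object; a minimal sketch:

book = models.Book.objects.only('title').first()  # one query, fetching title (and pk)
print(book.title)  # already loaded, no extra query
print(book.price)  # deferred field: Django issues a second query to fetch it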
def raw(self, raw_query, params=None, translations=None, using=None)
# Execute raw SQL
models.Book.objects.raw('select * from app01_book')
def none(self)
# Return an empty QuerySet
def dates(self, field_name, kind, order='ASC')
# De-duplicate and truncate a date field to the given granularity
# kind must be one of "year", "month", "day"
# order must be "ASC" or "DESC"
# Truncated values come back as:
#   year : YYYY-01-01
#   month: YYYY-MM-01
#   day  : YYYY-MM-DD
models.Book.objects.dates('pub_date', 'day', 'DESC')
datetimes() de-duplicates and truncates a datetime field in the same way, converting the values to the given timezone:

import pytz

def datetimes(self, field_name, kind, order='ASC', tzinfo=None)
# kind must be one of "year", "month", "day", "hour", "minute", "second"
# order must be "ASC" or "DESC"
# tzinfo is a timezone object
# (assumes a DateTimeField named ctime, which is not part of the models above)
models.Publish.objects.datetimes('ctime', 'hour', tzinfo=pytz.UTC)
models.Publish.objects.datetimes('ctime', 'hour', tzinfo=pytz.timezone('Asia/Shanghai'))
Use F queries when comparing one field against another:
from django.db.models import F, Value
from django.db.models.functions import Concat

# Compare two fields: collection count greater than like count
Goods.objects.filter(collection_num__gt=F('star_num'))

# F() supports +, -, *, / and % against constants: raise every price by 10
Book.objects.all().update(price=F('price') + 10)

# String concatenation
Book.objects.update(title=Concat(F("title"), Value("第一版")))
Use Q queries when the condition is an OR; by default, multiple filter() conditions are ANDed together:
from django.db.models import Q

Book.objects.all().filter(Q(title='python') | Q(price=24))
The Q query above uses field names as identifiers; what if the conditions arrive as strings ("title", "price") instead?
q = Q()                                 # instantiate a Q object
q.connector = Q.OR                      # the default connector is AND; switch to OR
q.children.append(("title", "python"))  # children holds (field, value) tuples
q.children.append(("price", 24))
Book.objects.filter(q)
This behaves the same as the field-name version, only built from strings.
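Q objects can also be mixed with plain keyword conditions, as long as the Q objects come first in the filter() call; a small sketch:

from django.db.models import Q

# Q conditions must precede keyword conditions in the same call
Book.objects.filter(Q(title='python') | Q(title='php'), price__gt=10)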
Subqueries: extra(self, select=None, where=None, params=None, tables=None, order_by=None, select_params=None)
Raw SQL sometimes involves fairly complex subqueries:
""" select id, title, (select count(1) from app01_publish) as n from app01_book """
In the ORM, this kind of subquery is expressed with the extra() method, which attaches the sub-statement to the QuerySet:
book_obj = models.Book.objects.all().extra(
    select={'n': "select count(1) from app01_publish WHERE id=%s or id=%s"},
    select_params=[1, 2],
)
# each result now carries id, title, and n (the subquery result)
extra() takes other parameters as well, e.g. for WHERE sub-clauses:
models.Book.objects.extra(where=['id in (1,3) OR title like "py%"', 'id>2'], order_by=['-id'])
Note: select and select_params form one pair, where and params form another, and tables sets which tables to select FROM.
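A sketch of the where/params pairing, which keeps the values out of the SQL string (the condition values are illustrative):

models.Book.objects.extra(
    where=['price > %s', 'title like %s'],
    params=[20, 'py%'],
)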
Executing raw SQL
from django.db import connection

cursor = connection.cursor()  # or: cursor = connections['default'].cursor()
cursor.execute("""SELECT * from app01_book where id = %s""", [1])
row = cursor.fetchone()
print(row)
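cursor.fetchone()/fetchall() return plain tuples; the Django docs suggest a small helper when dicts are more convenient (reproduced here as a sketch):

def dictfetchall(cursor):
    """Return all rows from a cursor as a list of dicts."""
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]

cursor.execute("SELECT id, title FROM app01_book")
print(dictfetchall(cursor))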
Aggregate queries: aggregate(*args, **kwargs)
aggregate() is a terminal clause on a QuerySet that returns a dictionary of key-value pairs: the keys identify the aggregates and the values are the computed results. The keys are generated automatically from the field name and the aggregate function.
from django.db.models import Avg

def test(request):
    average_price = models.Book.objects.all().aggregate(Avg("price"))
    print(average_price)  # {'price__avg': 27.0}
Of course, the key can also be renamed:
average_price = models.Book.objects.all().aggregate(avg_price=Avg("price"))
print(average_price)  # {'avg_price': 27.0}
aggregate() can also compute several aggregates at once; just pass additional arguments:
# Average, maximum, minimum, and total price across all books
from django.db.models import Avg, Max, Min, Sum

def test(request):
    average_price = models.Book.objects.all().aggregate(Avg("price"), Max("price"), Min("price"), Sum("price"))
    print(average_price)
    # {'price__avg': 27.0, 'price__max': 56, 'price__min': 12, 'price__sum': 135}
Note: fields inside an aggregate function may use '__' to span tables and aggregate over another table's columns.
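For instance, a cross-table aggregate through the reverse foreign key (a sketch against the models above; the result value is illustrative):

from django.db.models import Avg

# Average book price, reached from Publish through the reverse FK
models.Publish.objects.aggregate(Avg('book__price'))  # e.g. {'book__price__avg': 27.0}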
annotate() implements GROUP BY aggregation, generating an independent summary value for every object in the calling QuerySet. For example, to count how many authors each book has:
from django.db.models import Avg, Max, Min, Sum, Count

def test(request):
    # Grouped query
    book_list = models.Book.objects.all().annotate(authors_num=Count('authors'))
    print(book_list[0].authors_num)  # author count of the first book
    # or loop over every book's author count
    for book in book_list:
        print(book.authors_num)
The equivalent SQL is roughly:
SELECT app01_book.id, app01_book.title, COUNT(app01_book_authors.author_id) AS authors_num
FROM app01_book
LEFT OUTER JOIN app01_book_authors ON (app01_book.id = app01_book_authors.book_id)
GROUP BY app01_book.id;
Unlike the dict returned by aggregate(), annotate() returns a QuerySet, so it can be chained further.
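Because the result is still a QuerySet, the annotation can be filtered on or projected with values(); a sketch:

from django.db.models import Count

# Books with more than one author, as dicts
ret = (models.Book.objects
       .annotate(authors_num=Count('authors'))
       .filter(authors_num__gt=1)
       .values('title', 'authors_num'))
print(ret)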
Field lookups specify the content of the SQL WHERE clause. They are passed as keyword arguments to QuerySet methods such as filter(), exclude(), and get().
exact

# Exact match
models.Book.objects.get(id__exact=4)
# SQL equivalent:
SELECT ... WHERE id = 4;
iexact
# Case-insensitive exact match
models.Book.objects.filter(title__iexact='python')
# SQL equivalent:
SELECT ... WHERE title ILIKE 'python';
contains
# Case-sensitive containment match; the % wildcards make it a fuzzy match
models.Book.objects.filter(title__contains='python')
# SQL equivalent:
SELECT ... WHERE title LIKE '%python%';
icontains
# Case-insensitive containment match
models.Book.objects.filter(title__icontains='python')
# SQL equivalent:
SELECT ... WHERE title LIKE '%python%';
in
# Membership in a given iterable; usually a list, tuple, or queryset
models.Book.objects.filter(id__in=[1, 2])
models.Book.objects.filter(title__in=['a', 'b', 'c'])
# SQL equivalent:
SELECT ... WHERE id IN (1, 2);
SELECT ... WHERE title IN ('a', 'b', 'c');
gt
# Greater than
models.Book.objects.filter(id__gt=2)
# SQL equivalent:
SELECT ... WHERE id > 2;
gte
# Greater than or equal to
models.Book.objects.filter(id__gte=2)
# SQL equivalent:
SELECT ... WHERE id >= 2;
lt

# Less than
models.Book.objects.filter(id__lt=2)
# SQL equivalent:
SELECT ... WHERE id < 2;
lte

# Less than or equal to
models.Book.objects.filter(id__lte=2)
# SQL equivalent:
SELECT ... WHERE id <= 2;
startswith

# Case-sensitive starts-with
models.Book.objects.filter(title__startswith='py')
# SQL equivalent:
SELECT ... WHERE title LIKE 'py%';
Note: istartswith is the case-insensitive starts-with.
endswith

# Case-sensitive ends-with
models.Book.objects.filter(title__endswith='thon')
# SQL equivalent:
SELECT ... WHERE title LIKE '%thon';
Note: iendswith is the case-insensitive ends-with.
range

# Within a range, inclusive of both ends
import datetime
start_date = datetime.date(2015, 3, 1)
end_date = datetime.date(2015, 3, 23)
models.Book.objects.filter(pub_date__range=(start_date, end_date))
# SQL equivalent:
SELECT ... WHERE pub_date BETWEEN '2015-03-01' and '2015-03-23';
isnull

# Filter on whether a field's value is null
models.Book.objects.filter(pub_date__isnull=True)
# SQL equivalent:
SELECT ... WHERE pub_date IS NULL;
regex

# Case-sensitive regular-expression match; the syntax is that of Python's re module
models.Book.objects.filter(title__regex=r'^(An?|The) +')
# SQL equivalent:
SELECT ... WHERE title REGEXP BINARY '^(An?|The) +';
Note: iregex is the case-insensitive regular-expression match. It is recommended to pass the pattern as a raw string (e.g. r'foo' rather than 'foo').
date

# For datetime fields, casts the value to a date; allows chaining further lookups; takes a date value
# (assumes pub_date is a DateTimeField here)
models.Publish.objects.filter(pub_date__date=datetime.date(2005, 1, 1))
models.Publish.objects.filter(pub_date__date__gt=datetime.date(2005, 1, 1))
time

# For datetime fields, casts the value to a time; allows chaining further lookups; takes a datetime.time value
models.Publish.objects.filter(pub_date__time=datetime.time(14, 30))
models.Publish.objects.filter(pub_date__time__range=(datetime.time(8), datetime.time(17)))
For details see: https://docs.djangoproject.com/en/2.2/ref/models/querysets/#date
def bulk_create(self, objs, batch_size=None)
# Bulk insert
# batch_size is how many rows are inserted per query
objs = [
    models.Book(title='aaa'),
    models.Book(title='bbb'),
]
models.Book.objects.bulk_create(objs, 10)
def get_or_create(self, defaults=None, **kwargs)
# Fetch the object if it exists, otherwise create it
# defaults supplies the other field values used on creation
obj, created = models.Book.objects.get_or_create(title='aaa', defaults={'publish_id': 2})
def update_or_create(self, defaults=None, **kwargs)
# Update the object if it exists, otherwise create it
# defaults supplies the field values used on creation or update
obj, created = models.Book.objects.update_or_create(title='aaa', defaults={'publish_id': 2})
def in_bulk(self, id_list=None)
# Look up objects by primary key, returning a {pk: object} dict
id_list = [1, 2, 3]
models.Book.objects.in_bulk(id_list)
For one-to-one (OneToOneField) and many-to-one fields, a QuerySet can be optimized with select_related(). After calling select_related(), Django fetches the objects behind the foreign key up front, so later access does not hit the database again. In practice this is a JOIN across the tables that pulls in the related data in a single query.
def select_related(self, *fields)
# JOIN the related tables and fetch the related data in one query.
# Fields that are not named are not cached; accessing them makes Django issue another SQL query.
# Use double underscores "__" in field names to follow relations recursively.
models.Book.objects.select_related('publish').all()
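A sketch of the difference (query counts assume the models above):

# Without select_related: 1 query for the books plus 1 query per book for
# its publisher (the classic N+1 problem)
for book in models.Book.objects.all():
    print(book.publish.name)

# With select_related: a single JOINed query; book.publish is already populated
for book in models.Book.objects.select_related('publish'):
    print(book.publish.name)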
For many-to-many (ManyToManyField) and one-to-many fields, use prefetch_related() instead. prefetch_related() queries each table separately and then stitches the relations together in Python.
def prefetch_related(self, *lookups)
# Performance: JOINs across many tables can be slow, so this runs several SQL queries
# and performs the "join" in Python code instead.
models.Book.objects.prefetch_related('authors').all()  # authors is the ManyToMany field
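After prefetching, iterating the related managers costs no extra queries; a sketch:

# Two queries in total: one for the books, one for all of their authors
for book in models.Book.objects.prefetch_related('authors'):
    print(book.title, [a.name for a in book.authors.all()])  # served from the prefetch cache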
A Django queryset corresponds to a number of rows in the database, optionally narrowed by filters. For example, the following code selects all books titled 'Python':
book_set = models.Book.objects.filter(title="Python").all()
But the code above does not run any database query yet. The data is actually fetched only when the queryset is iterated; in other words, the SQL executes when the data is first needed:
book_set = models.Book.objects.filter(title="Python").all() for book in book_set: print(book.title)
When a queryset is iterated, all matching rows are fetched from the database and converted into Django model instances. These instances are kept in the queryset's built-in cache, so iterating the same queryset again does not re-run the query.
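A sketch of the caching behavior (the query count is checked via django.db.connection.queries, which requires DEBUG=True):

from django.db import connection

book_set = models.Book.objects.all()
list(book_set)                  # first evaluation: hits the database and fills the cache
list(book_set)                  # second evaluation: served from the cache, no new query
print(len(connection.queries))  # with DEBUG=True, only one query was logged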
Reading a large amount of data into memory at once wastes memory and can even crash the program. To iterate the data without building the queryset cache, use the iterator() method, which lets you process each row and then discard it.
book_set = Book.objects.all().iterator()
# iterator() fetches only a small number of rows from the database at a time, saving memory
for obj in book_set:
    print(obj.title)
def iterator(self, chunk_size=2000):
    """
    An iterator over the results from applying this QuerySet
    to the database.
    """
    if chunk_size <= 0:
        raise ValueError('Chunk size must be strictly positive.')
    use_chunked_fetch = not connections[self.db].settings_dict.get('DISABLE_SERVER_SIDE_CURSORS')
    return self._iterator(use_chunked_fetch, chunk_size)
iterator() has a default parameter chunk_size=2000, the number of results buffered at the database-driver level.
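The chunk size can be tuned per call; a one-line sketch:

for obj in Book.objects.all().iterator(chunk_size=500):  # buffer 500 rows at a time
    print(obj.title)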
All of the APIs above are defined on django.db.models.query.QuerySet. An abridged, reformatted excerpt of that class (Django 2.x) follows; see django/db/models/query.py for the full source:

class QuerySet:
    """Represent a lazy database lookup for a set of objects."""

    def __init__(self, model=None, query=None, using=None, hints=None):
        self.model = model
        self._db = using
        self._hints = hints or {}
        self.query = query or sql.Query(self.model)
        self._result_cache = None
        self._sticky_filter = False
        self._for_write = False
        self._prefetch_related_lookups = ()
        self._prefetch_done = False
        self._known_related_objects = {}  # {rel_field: {pk: rel_obj}}
        self._iterable_class = ModelIterable
        self._fields = None

    def __iter__(self):
        """
        The queryset iterator protocol uses three nested iterators in the
        default case:
        1. sql.compiler.execute_sql()
           - Returns 100 rows at time (constants.GET_ITERATOR_CHUNK_SIZE)
             using cursor.fetchmany(). This part is responsible for
             doing some column masking, and returning the rows in chunks.
        2. sql.compiler.results_iter()
           - Returns one row at time. At this point the rows are still just
             tuples. In some cases the return values are converted to
             Python values at this location.
        3. self.iterator()
           - Responsible for turning the rows into model objects.
        """
        self._fetch_all()
        return iter(self._result_cache)

    def iterator(self, chunk_size=2000):
        """An iterator over the results from applying this QuerySet to the database."""
        if chunk_size <= 0:
            raise ValueError('Chunk size must be strictly positive.')
        use_chunked_fetch = not connections[self.db].settings_dict.get('DISABLE_SERVER_SIDE_CURSORS')
        return self._iterator(use_chunked_fetch, chunk_size)

    def get(self, *args, **kwargs):
        """
        Perform the query and return a single object matching the given
        keyword arguments.
        """
        clone = self.filter(*args, **kwargs)
        if self.query.can_filter() and not self.query.distinct_fields:
            clone = clone.order_by()
        num = len(clone)
        if num == 1:
            return clone._result_cache[0]
        if not num:
            raise self.model.DoesNotExist(
                "%s matching query does not exist." % self.model._meta.object_name
            )
        raise self.model.MultipleObjectsReturned(
            "get() returned more than one %s -- it returned %s!"
            % (self.model._meta.object_name, num)
        )

    def create(self, **kwargs):
        """
        Create a new object with the given kwargs, saving it to the database
        and returning the created object.
        """
        obj = self.model(**kwargs)
        self._for_write = True
        obj.save(force_insert=True, using=self.db)
        return obj

    def all(self):
        """Return a new QuerySet that is a copy of the current one."""
        return self._chain()

    def filter(self, *args, **kwargs):
        """Return a new QuerySet instance with the args ANDed to the existing set."""
        return self._filter_or_exclude(False, *args, **kwargs)

    def exclude(self, *args, **kwargs):
        """Return a new QuerySet instance with NOT (args) ANDed to the existing set."""
        return self._filter_or_exclude(True, *args, **kwargs)

    def _fetch_all(self):
        if self._result_cache is None:
            self._result_cache = list(self._iterable_class(self))
        if self._prefetch_related_lookups and not self._prefetch_done:
            self._prefetch_related_objects()

    # ... (aggregate, count, bulk_create, get_or_create, update_or_create,
    # delete, update, values, values_list, dates, datetimes, none,
    # select_related, prefetch_related, annotate, order_by, distinct, extra,
    # reverse, defer, only, using, and the private helpers omitted for brevity)
References:
https://docs.djangoproject.com/en/2.2/ref/models/querysets/
https://www.cnblogs.com/yuanchenqi/articles/7570003.html
Original post: https://www.cnblogs.com/shenjianping/p/11526538.html