码迷,mamicode.com
首页 > 数据库 > 详细

ORM框架实现 & sqlalchemy

时间:2020-12-18 13:13:19      阅读:4      评论:0      收藏:0      [点我收藏+]

标签:erro   class   rom   style   ada   prim   exce   auto   ISE   

  

import pymysql
from pymysql.cursors import DictCursor


# class Field:
#     def __init__(self,name,column=None,chief=False,unique=False,index=False,nullable=True,default=None):
#         self.name=name
#         if column is None:
#             self.column=name
#         else:
#             self.column=column
#         self.chief=chief
#         self.unique=unique
#         self.index=index
#         self.nullable=nullable
#         self.default=default
#
#     def validate(self,value):
#         raise NotImplementedError
#
#     def __get__(self,instance,owner):
#         if instance is None:
#             return self
#         return instance.__dict__[self.name]
#
#     def __set__(self,instance,value):
#         self.validate(value)
#         instance.__dict__[self.name]=value
#
#     def __str__(self):
#         return <{} <{}>>.format(self.__class__.__name__,self.name)
#
#     __repr__=__str__
#
# class IntField(Field):
#     def __init__(self,name,column=None,chief=False,unique=False,index=False,nullable=True,default=None,
#                  auto_increment=False):
#         super(IntField,self).__init__(name,column,chief,unique,index,nullable,default)
#         self.auto_increment=auto_increment
#
#     def validate(self,value):
#         if value is None:
#             if self.chief:
#                 raise TypeError({} is primary key,yet value is None.format(self.name))
#             if not self.nullable:
#                 raise TypeError({} required.format(self.name))
#         else:
#             if not isinstance(value,int):
#                 raise GeneratorExit({} should be integer.format(value))
#
# class StringField(Field):
#     def __init__(self,name,column=None,chief=None,unique=None,index=None,nullable=True,default=None,length=None):
#         super(StringField,self).__init__(name,column,chief,unique,index,nullable,default)
#         self.length=length
#
#     def validate(self,value):
#         if value is None:
#             if self.chief:
#                 raise TypeError({} is primary key,yet value is None.format(self.name))
#             if not self.nullable:
#                 raise TypeError({} required.format(self.name))
#         else:
#             if not isinstance(value, str):
#                 raise GeneratorExit({} should be str.format(value))
#             if len(value) > self.length:
#                 raise ValueError({} too long.format(value))
#
# class S:
#     id=IntField(id,id,True,nullable=False,auto_increment=True)
#     name=StringField(name,nullable=False,length=64)
#     age=IntField(age)
#
#     def __init__(self,id,name,age):
#         self.id=id
#         self.name=name
#         self.age=age
#
#     def __str__(self):
#         return S({} {} {}).format(self.id,self.name,self.age)
#
#     __repr__=__str__
#
#     def save(self,conn:pymysql.connections.Connection):
#         sql=insert into s (id,name,age) values (%s,%s,%s)
#         cursor=conn.cursor(cursor=DictCursor)
#         cursor.execute(sql,args=(self.id,self.name,self.age))
#         cursor.close()

class Field:
    """Data descriptor describing one mapped column of a model class.

    The value itself lives in the owning instance's ``__dict__`` under
    ``self.name``; the descriptor only validates on assignment and
    carries column metadata.

    Args:
        name: Key used to store the value in the instance ``__dict__``.
        column: Database column name; falls back to ``name`` when omitted.
        chief: True when the column is (part of) the primary key.
        unique: True when the column carries a unique constraint.
        index: True when the column is indexed.
        nullable: False when ``None`` must be rejected by ``validate``.
        default: Default value metadata (stored, not applied here).
    """

    def __init__(self, name=None, column=None, chief=False, unique=False, index=False, nullable=True, default=None):
        self.name = name
        # The column name defaults to the attribute name.
        self.column = name if column is None else column
        self.chief = chief
        self.unique = unique
        self.index = index
        self.nullable = nullable
        self.default = default

    def validate(self, value):
        """Check ``value`` before it is stored; subclasses must override."""
        raise NotImplementedError()

    def __get__(self, instance, owner):
        # Class-level access returns the descriptor itself so that the
        # column metadata stays reachable.
        if instance is None:
            return self
        try:
            return instance.__dict__[self.name]
        except KeyError:
            # The descriptor protocol expects AttributeError for a missing
            # attribute; a KeyError would break hasattr()/getattr(default).
            raise AttributeError(
                "{!r} object has no value for field {!r}".format(type(instance).__name__, self.name)
            ) from None

    def __set__(self, instance, value):
        # Validate first so an invalid value never reaches the instance.
        self.validate(value)
        instance.__dict__[self.name] = value

    def __str__(self):
        return "<{} {}>".format(self.__class__.__name__, self.name)

    __repr__ = __str__


class IntField(Field):
    """Integer column descriptor.

    Accepts the same arguments as ``Field`` plus ``auto_increment``, which
    is stored as column metadata.
    """

    def __init__(self, name=None, column=None, chief=False, unique=False, index=False, nullable=True, default=None,
                 auto_increment=False):
        self.auto_increment = auto_increment
        super(IntField, self).__init__(name, column, chief, unique, index, nullable, default)

    def validate(self, value):
        """Reject values that cannot be stored in this column.

        Raises:
            TypeError: if ``value`` is ``None`` for a primary-key or
                non-nullable field, or if it is not an ``int``.
        """
        if value is None:
            if self.chief:
                raise TypeError("{} is primary key,yet value is None".format(self.name))
            if not self.nullable:
                raise TypeError("{} required".format(self.name))
        elif not isinstance(value, int):
            # TypeError rather than GeneratorExit: the latter is a
            # BaseException reserved for generator shutdown and would slip
            # past ``except Exception`` handlers.
            raise TypeError("{} should be integer".format(value))


class StringField(Field):
    """String column descriptor with a maximum length.

    Args:
        length: Maximum number of characters allowed; ``None`` disables
            the length check.

    The remaining arguments are forwarded unchanged to ``Field``.
    """

    def __init__(self, length, name=None, column=None, chief=False, unique=False, index=False, nullable=True,
                 default=None, ):
        self.length = length
        super(StringField, self).__init__(name, column, chief, unique, index, nullable, default)

    def validate(self, value):
        """Reject values that cannot be stored in this column.

        Raises:
            TypeError: if ``value`` is ``None`` for a primary-key or
                non-nullable field, or if it is not a ``str``.
            ValueError: if ``value`` is longer than ``self.length``.
        """
        if value is None:
            if self.chief:
                raise TypeError("{} is primary key,yet value is None".format(self.name))
            if not self.nullable:
                raise TypeError("{} required".format(self.name))
            return
        if not isinstance(value, str):
            # TypeError rather than GeneratorExit: the latter is a
            # BaseException reserved for generator shutdown.
            raise TypeError("{} should be str".format(value))
        # Guard against length=None so the comparison cannot itself fail.
        if self.length is not None and len(value) > self.length:
            raise ValueError("{} too long".format(value))


class Session:
    """Small transaction helper around a pymysql connection.

    It can be used in two ways:

    * As a context manager: ``__enter__`` opens one cursor, every
      ``execute`` inside the block shares it, and ``__exit__`` commits on
      success or rolls back when the block raised.
    * Standalone: calling ``execute`` outside a ``with`` block opens a
      cursor for that single statement, commits (or rolls back on error)
      and closes the cursor again.
    """

    def __init__(self, conn: pymysql.connections.Connection):
        self.conn = conn
        self.cursor = None
        # True only while a standalone execute() owns the cursor.
        self.mark = False

    def execute(self, sql, *args, **kwargs):
        """Run ``sql`` with either positional or named parameters.

        Args:
            sql: Statement using ``%s`` or ``%(name)s`` placeholders.
            *args: Positional parameter values.
            **kwargs: Named parameter values.

        Raises:
            TypeError: if positional and named parameters are mixed.
        """
        if args and kwargs:
            # The statement used to be executed twice in this case.
            raise TypeError("pass either positional or named SQL parameters, not both")

        standalone = self.cursor is None
        if standalone:
            self.mark = True
            self.cursor = self.conn.cursor(cursor=DictCursor)
        try:
            # A statement without parameters must still be executed.
            self.cursor.execute(sql, args or kwargs or None)
        except Exception:
            if standalone:
                self.conn.rollback()
            raise
        else:
            if standalone:
                self.conn.commit()
        finally:
            if standalone:
                # Release the one-shot cursor so the next standalone call
                # opens (and commits) its own.
                self.cursor.close()
                self.cursor = None
                self.mark = False

    def __enter__(self):
        self.cursor = self.conn.cursor(cursor=DictCursor)
        return self  # callers then use this session object's execute()

    def __exit__(self, exc_type, exc_value, exc_tb):
        try:
            if exc_type:
                self.conn.rollback()
            else:
                self.conn.commit()
        finally:
            # Always release the cursor and reset the state so the session
            # can be reused, standalone or in another ``with`` block.
            self.cursor.close()
            self.cursor = None


# class Session:
#     def __init__(self, conn: pymysql.connections.Connection):
#         self.conn = conn
#
#     def execute(self, sql, *args, **kwargs):
#         try:
#             cursor = self.conn.cursor(cursor=DictCursor)
#             with cursor:
#                 if args:
#                     cursor.execute(sql, args)
#                 if kwargs:
#                     cursor.execute(sql, kwargs)
#         except:
#             self.conn.rollback()
#         else:
#             self.conn.commit()


class ModelMeta(type):
    """Metaclass that collects the ``Field`` descriptors of a model class.

    For every class it creates it:

    * sets ``__tablename__`` to the lower-cased class name when the class
      body does not provide one (or provides ``None``);
    * fills in a missing ``name`` / ``column`` on each ``Field`` from the
      attribute name it is bound to;
    * stores ``__mapping__`` (attribute name -> Field) and
      ``__primary_key__`` (list of fields with ``chief`` set).
    """

    def __new__(cls, what: str, bases, attrs: dict):
        # Assign directly: setdefault() would leave an explicit
        # ``__tablename__ = None`` untouched.
        if attrs.get("__tablename__", None) is None:
            attrs["__tablename__"] = what.lower()

        mapping = {}
        primary_key = []

        for k, v in attrs.items():
            if not isinstance(v, (Field,)):
                continue
            if v.name is None:
                v.name = k  # the descriptor stores its value under this key
            if v.column is None:
                v.column = k
            mapping[k] = v
            if v.chief:
                primary_key.append(v)

        attrs.setdefault("__mapping__", mapping)
        attrs.setdefault("__primary_key__", primary_key)

        # The new class must be returned, otherwise the class statement
        # would bind its name to None.
        return super(ModelMeta, cls).__new__(cls, what, bases, attrs)


class Model(metaclass=ModelMeta):
    """Base class for mapped models; subclasses declare ``Field`` attributes."""

    def save(self, session: Session):
        """Insert this instance as a new row through ``session``.

        Only fields that currently hold a value on the instance are
        included in the statement. The insert runs inside ``with session``.

        Args:
            session: Session whose connection receives the INSERT.
        """
        columns = []
        values = []
        # The class-level mapping holds the descriptors (and thus the
        # column names); the instance __dict__ holds the actual values.
        for field in type(self).__mapping__.values():
            # Field.__set__ stores values under field.name, which may
            # differ from the attribute name the field is bound to.
            if field.name in self.__dict__:
                columns.append("`{}`".format(field.column))
                values.append(self.__dict__[field.name])

        sql = "insert into {} ({}) values ({})".format(
            self.__tablename__,
            ",".join(columns),
            ",".join(["%s"] * len(columns))
        )

        with session:
            session.execute(sql, *values)


class S(Model):
    """Example model mapped to table ``pp`` with columns id, name and age."""

    __tablename__ = "pp"
    id = IntField("id", column="id", chief=True, nullable=False, auto_increment=True)
    # The attribute is called ``sname`` but is stored in column ``name``.
    sname = StringField(64, "sname", column="name", nullable=False)
    age = IntField("age", chief=True)

    def __init__(self, id, name, age):
        # Each assignment goes through the field descriptor and is validated.
        self.id = id
        self.sname = name
        self.age = age

    def __str__(self):
        return "Student({},{},{})".format(self.id, self.sname, self.age)

    __repr__ = __str__

    # def save(self, conn: pymysql.connections.Connection):
    #     sql = insert into pp (id,name,age) values (%(id)s,%(name)s,%(age)s)
    #     try:
    #         cursor = conn.cursor(cursor=DictCursor)
    #         with cursor:
    #             cursor.execute(sql, args={id: self.id, name: self.name, age: self.age})
    #         conn.commit()
    #     except:
    #         conn.rollback()
    #
    # def save(self, session: Session):
    #     sql = insert into pp (id,name,age) values (%(id)s,%(name)s,%(age)s)
    #     session.execute(sql, id=self.id, name=self.name, age=self.age)
    #     # with session:
    #     #     session.execute(sql,id=self.id,name=self.name,age=self.age)
    #
    # def save(self, session: Session):
    #     sql = insert into pp (id,age,name) values (%(id)s,%(age)s,%(name)s)
    #     session.execute(sql, name=self.name, age=self.age, id=self.id)

class Engine:
    """Owns a pymysql connection and persists model instances through it."""

    def __init__(self, *args, **kwargs):
        # All arguments are forwarded unchanged to pymysql.connect().
        self.conn = pymysql.connect(*args, **kwargs)

    def save(self, instance: Model):
        """Insert ``instance`` as a new row and commit.

        Only fields that currently hold a value on the instance are
        written. On failure the error is printed and the transaction is
        rolled back; the exception is not re-raised.

        Args:
            instance: Model instance to persist.

        Returns:
            True when the row was inserted and committed, False when the
            insert failed and was rolled back.
        """
        columns = []
        values = []
        for field in instance.__mapping__.values():
            # Field.__set__ stores values under field.name, which may
            # differ from the attribute name the field is bound to.
            if field.name in instance.__dict__:
                columns.append("`{}`".format(field.column))
                values.append(instance.__dict__[field.name])

        query = "insert into {} ({}) values ({})".format(
            instance.__tablename__,
            ",".join(columns),
            ",".join(["%s"] * len(columns))
        )

        try:
            cursor = self.conn.cursor(cursor=DictCursor)
            with cursor:
                cursor.execute(query, args=values)
        except Exception as e:
            print(e.args, e.__context__)
            self.conn.rollback()
            return False
        else:
            self.conn.commit()
            return True

# Demo: build one S instance and insert it through Engine.
# Connection parameters are passed by keyword; PyMySQL 1.0+ no longer
# accepts them positionally.
conn = pymysql.connections.Connection(host="localhost", user="root", password="cruces", database="uranus")
s = S(444, "zxc", 88)
print(s.id, s.sname, s.age)
# s.save(Session(conn))
print(s.__dict__)  # values stored by the field descriptors
print(S.__dict__)  # descriptors plus the metadata added by ModelMeta
engine = Engine(host="localhost", user="root", password="cruces", database="uranus")
engine.save(s)
# for k,v in S.__dict__.items():
#     # print(v,v.__class__.__bases__,type(v.__class__))
#     if v.__class__.__bases__[0] == Field:
#         print(k)

 

import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column,Integer,String,Float,Enum,Date

# Declarative base; the Student model below inherits from it.
Base=declarative_base()

class Student(Base):
    """SQLAlchemy declarative model mapped to table ``Stu``."""

    __tablename__ = "Stu"
    id = Column(Integer, primary_key=True, nullable=False, autoincrement=True)
    name = Column(String(64), nullable=False)
    age = Column(Float, unique=True)

    def __repr__(self):
        return "<{} id:{}, name:{}, age:{}>".format(self.__class__.__name__, self.id, self.name, self.age)

    __str__ = __repr__

# SQLAlchemy engine for the `uranus` database on localhost:3306 using the
# pymysql driver. Note this rebinds the name `engine` used earlier in the file
# for the hand-written Engine instance.
# NOTE(review): echo=True presumably enables SQL statement logging - confirm
# against the SQLAlchemy docs.
engine=sqlalchemy.create_engine("mysql+pymysql://root:cruces@localhost:3306/uranus",echo=True)

# Issue CREATE TABLE for the models registered on Base (here: Student).
Base.metadata.create_all(engine)

 

ORM框架实现 & sqlalchemy

标签:erro   class   rom   style   ada   prim   exce   auto   ISE   

原文地址:https://www.cnblogs.com/dissipate/p/14131050.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!