使用 sqlalchemy 有3种方式: 方式1, 使用raw sql; 方式2, 使用SqlAlchemy的sql expression; 方式3, 使用ORM. 前两种方式可以统称为 core 方式.
创建数据模型 http://flask-sqlalchemy.pocoo.org/2.3/models/
添加数据 if __name__ == "__main__" : app = create_app(os.getenv('FLASK_CONFIG' ) or 'default' ) with app.app_context(): dev1 = Device('sn1' ,'nick1' ,'comment1' ) dev2 = Device('sn2' ,'nick2' ,'comment2' ) dev3 = Device('sn3' , 'nick3' , 'comment3' ) dev4 = Device('sn4' , 'nick4' , 'comment4' ) group1 = Group('g1' , 'comment1' ) group2 = Group('g2' , 'comment2' ) db.session.add_all([group1,group2]) db.session.commit() print (User.query.all ()) print (User.query.filter_by(name='admin' ).first())
使用 flask-sqlalchemy 操作数据库 mongodb pip install Flask-MongoAlchemy SQLALCHEMY_DATABASE_URI = 'mongodb://localhost/stockServiceData'
使用 flask-sqlalchemy 操作数据库mysql. 数据库配置:注意这里面的使用+pymysql SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://gisServer:adminGisServer@localhost:3306/gisServer' SQLALCHEMY_COMMIT_ON_TEARDOWN = True SQLALCHEMY_TRACK_MODIFICATIONS = False 数据库的相关操作 from GisService import dbfrom model import Useradmin = User('admin' , 'admin@example.com' ,'123456' ) db.create_all() db.session.add(admin) db.session.commit() db.session.add_all([User('admin1' , 'admin1@example.com' ,'123456' ),User('admin2' , 'admin2@example.com' ,'123456' )]) db.session.commit() admin = Role.query.filter_by(name='Admin' ).first() admin.name='Administrator' db.session.commit() mod = User.query.filter_by(name='Admin2' ).first() db.session.delete(mod) db.session.commit() User.query.all () ```sql userCount = Staff.query.count()
User.query.filter_by(username='admin' ).first() User.query.filter_by(username='admin' ).all () User.query.filter_by(username='admin' , email='admin2@example.com' ).all () User.query.filter_by(username='admin' ).count() User.query.count() User.query.with_entities(func.sum (User.id )).all () User.query.with_entities(func.avg(User.role_id)).all () User.query.order_by(User.role_id).all () User.query.order_by(User.role_id.desc()).all () User.query.group_by(User.role_id).all () User.query.limit(1 ).all () User.query.limit(1 ).offset(2 ).all () db.drop_all()
模糊查询 try: if request.values.has_key('name_search') or request.values.has_key('phone_search'): name_search = request.values.get('name_search') queryString = "%{}%".format(name_search) users = Staff.query.filter(or_(Staff.name.like(queryString),Staff.phone.like(queryString))).all() else: users = Staff.query.all() return jsonify({'total': len(users), 'rows': [i.serialize for i in users]}) except: import traceback traceback.print_exc() errMsg = traceback.format_exc() print(errMsg) return jsonify({'code': 100, 'msg': '后台处理出现异常情况,请稍后在试'})
sql语句查询 try : sql = '''SELECT xxx ;''' result = db.engine.execute(sql) names = [] children = [] row = {} group = "" for row in result: simId = row[2 ] ... return jsonify(names) except : import traceback traceback.print_exc() errMsg = traceback.format_exc() print (errMsg) return jsonify({'code' : 100 , 'msg' : '后台处理出现异常情况,请稍后在试' })
参数化查询 sql = '''SELECT f_speed,f_longitude,f_latitude, f_datetime,f_direction FROM t_position201802 WHERE f_devId = ? and f_datetime >= ? and f_datetime <= ? ;''' result = db.engine.execute(sql,(data["devId" ],data["startTime" ],data["endTime" ])) names = [] for row in result: simId = row[2 ] ... return jsonify(names)
数据库迁移 export FLASK_APP=run.py 创建一个空数据库 在model文件里面设置好数据对象 flask db init 每次改动过model之后都需要执行一次下面的循环 flask db migrate -m "initial migrateion" flask db upgrade 执行上面的操作后,数据库会生成对应的表 然后就是每次改动过model之后都需要执行一次migrate,upgrade的循环 获取帮助 flask db --help