python中bottle使用实例代码

时间:2024-05-01 01:53:26 来源:网络 浏览:48次

模仿学习同事的代码来写的,主要是搞懂python中如何来组织包,如何调用包,如何读取配置文件,连接数据库,设置路由,路由分组。(注:使用的是python3.6)

整体目录设计如下:

python中bottle使用实例代码

根据调用层级从上往下来说:

首先项目根目录下的main.py是整个程序的入口,主要作用启动http服务器,调用分组路由。

main.py

import bottlefrom confg.conf import CONFfrom api.user import User db_url = CONF.db.url default_app = bottle.default_app() #相当于分组路由default_app.mount("/user", User(db_url, "").app) app = default_app if __name__ == ’__main__’: bottle.run(app=app, host="localhost", port="8000")

接着是controller层,就是api目录。api目录包括service文件夹和api下的文件。(注:一般来说controller层,service层是同级的,本项目其实api下的非service文件都是属于controller层,所以还是同一层的,因为要遵守调用顺序,不然可能会发生循环调用)。

python中bottle使用实例代码

/api/user.py文件

import logging from bottle import request#db数据库引擎from common.base import DB#调用service层from api.service.user import UserService logger = logging.getLogger("arview") class User(DB, UserService): def __init__(self, *args, **kwargs): print(">>> User init begin") logging.debug(’>>> User init begin’) super(User, self).__init__(*args, **kwargs) self.dispatch() logger.debug(’>>> User init end’) def create(self, db=None): create_body = request.json create_data = self.create_user(create_body, db) return create_data def delete(self, db=None): delete_body = request.json delete_data = self.delete_user(delete_body, db) return delete_data def list(self, db=None): list_data = self.list_user(db) return list_data #相当于分组路由 def dispatch(self): self.app.route(’/listUser’, method=’post’)(self.list) self.app.route(’/createUser’, method=’post’)(self.create) self.app.route(’/deleteUser’, method=’post’)(self.delete)

/service/user.py

import time#model层from db.models.user import UserModel class UserService(object): def list_user(self, db): user_info_list = db.query(UserModel).all() for item in user_info_list: print(item.username) return user_info_list def create_user(self, create_body, db): user_model = UserModel( username=create_body.get("username"), password=create_body.get("password"), role=create_body.get("role"), create_time=time.time() ) db.add(user_model) db.commit() return "success" def delete_user(self, delete_body, db): db.query(UserModel).filter(UserModel.id == (delete_body["id"])).delete() db.commit() return delete_body

然后是dao层也就是数据库操作层(但是明显虽然有dao层但是数据库操作的逻辑已经在service层里了)

最后是读取配置文件和创建数据库引擎。

读取配置文件使用的包是oslo_config。

conf.py

# coding:utf8# from __future__ import print_functionfrom oslo_config import cfg DEFAULT_ARVIEW_DB_NAME = ’ginblog’DEFAULT_ARVIEW_DB_USER = ’root’DEFAULT_ARVIEW_DB_USER_PASSWORD = ’33demysql’DEFAULT_ARVIEW_DB_HOST = ’81.68.179.136’DEFAULT_ARVIEW_DB_PORT = 3306DEFAULT_ARVIEW_DB_URL_TEMPLATE = ’mysql+mysqlconnector://{}:{}@’ ’{}:{}/{}?charset=utf8’DEFAULT_ARVIEW_DB_URL = DEFAULT_ARVIEW_DB_URL_TEMPLATE.format( DEFAULT_ARVIEW_DB_USER, DEFAULT_ARVIEW_DB_USER_PASSWORD, DEFAULT_ARVIEW_DB_HOST, DEFAULT_ARVIEW_DB_PORT, DEFAULT_ARVIEW_DB_NAME) # 声明参数选项opt_group = cfg.OptGroup(’keystone_authtoken’)mysql_opt_group = cfg.OptGroup(’db’) auth_opts = [ cfg.StrOpt(’memcached_servers’, default=’localhost:11211’, choices=("localhost:11211", "0.0.0.0:11211"), help=(’localhost local’, ’0.0.0.0 So listen’) ), cfg.StrOpt(’signing_dir’, default=’/var/cache/cinder’, choices=("/var/cache/cinder", "/var/cache/cinder"), ),] # mysqlmysql_opts = [ cfg.StrOpt(’url’, default=DEFAULT_ARVIEW_DB_URL), cfg.StrOpt(’Db’, default=’3mysql’), cfg.StrOpt(’DbHost’, default=’381.68.179.136’), cfg.StrOpt(’DbPort’, default=’33306’), cfg.StrOpt(’DbUser’, default=’3DbUser’), cfg.StrOpt(’DbPassWord’, default=’3DbPassWord’), cfg.StrOpt(’DbName’, default=’3DbName’), cfg.BoolOpt(’create’, default=False), cfg.BoolOpt(’commit’, default=True), cfg.BoolOpt(’echo’, default=True, help=’是否显示回显’), cfg.BoolOpt(’echo_pool’, default=False, help=’数据库连接池是否记录 checkouts/checkins操作’), cfg.IntOpt(’pool_size’, default=1000, help=’数据库连接池中保持打开的连接数量’), cfg.IntOpt(’pool_recycle’, default=600, help=’数据库连接池在连接被创建多久(单位秒)以后回收连接’)] token_opts = [ cfg.StrOpt(’project_domain_name’), cfg.StrOpt(’project_name’),] CINDER_OPTS = (auth_opts + token_opts)MYSQLCINDER_OPTS = (mysql_opts) # 注册参数选项CONF = cfg.CONF# 注册组CONF.register_group(opt_group)CONF.register_group(mysql_opt_group) # 将各个选项注册进组里CONF.register_opts(CINDER_OPTS, group=opt_group)CONF.register_opts(MYSQLCINDER_OPTS, group=mysql_opt_group) if __name__ == "__main__": # 要读取哪个配置文件 CONF(default_config_files=[’cinder.conf’]) print(’mysql Db配置组为%s’ % (CONF.db.Db)) print(’mysql DbHost%s’ % (CONF.db.DbHost)) print(’mysql DbPort配置组为%s’ % (CONF.db.DbPort)) print(’mysql DbUser%s’ % (CONF.db.DbUser))

配置文件cinder.conf

[db]Db = mysqlDbHost = 81.68.179.136DbPort = 3306DbUser = rootDbPassWord = 33demysqlDbName = ginblogcreate = falsecommit = trueecho = falseecho_pool = falsepool_size = 1000pool_recycle =600

它的使用方法是,先声明参数选项就是(相当于声明组)

mysql_opt_group = cfg.OptGroup(’db’),

然后声明组内的选项,

mysql_opts = [ cfg.StrOpt(’url’, default=DEFAULT_ARVIEW_DB_URL), cfg.StrOpt(’Db’, default=’3mysql’), cfg.StrOpt(’DbHost’, default=’381.68.179.136’), cfg.StrOpt(’DbPort’, default=’33306’), cfg.StrOpt(’DbUser’, default=’3DbUser’), cfg.StrOpt(’DbPassWord’, default=’3DbPassWord’), cfg.StrOpt(’DbName’, default=’3DbName’), cfg.BoolOpt(’create’, default=False), cfg.BoolOpt(’commit’, default=True), cfg.BoolOpt(’echo’, default=True, help=’是否显示回显’), cfg.BoolOpt(’echo_pool’, default=False, help=’数据库连接池是否记录 checkouts/checkins操作’), cfg.IntOpt(’pool_size’, default=1000, help=’数据库连接池中保持打开的连接数量’), cfg.IntOpt(’pool_recycle’, default=600, help=’数据库连接池在连接被创建多久(单位秒)以后回收连接’)]

拼接选项

MYSQLCINDER_OPTS = (mysql_opts)

接着注册组,

CONF.register_group(mysql_opt_group)

最后将选项注册进组。

CONF.register_opts(MYSQLCINDER_OPTS, group=mysql_opt_group)

当然最重要的注册参数选项,我的理解就是暴露句柄。

# 注册参数选项
CONF = cfg.CONF

然后创建数据库引擎

common/utils/sqlalchemy_util.py

import loggingfrom json import loads as json_loads from sqlalchemy.engine import create_enginefrom sqlalchemy.pool import QueuePoolfrom confg import CONF SQLALCHEMY_ENGINE_CONTAINER = {} logger = logging.getLogger("arview") def json_deserializer(s, **kw): if isinstance(s, bytes): return json_loads(s.decode(’utf-8’), **kw) else: return json_loads(s, **kw) def get_sqlalchemy_engine(db_url): if db_url not in SQLALCHEMY_ENGINE_CONTAINER: engine = create_engine(db_url, echo=CONF.db.echo, # pool_pre_ping如果值为True,那么每次从连接池中拿连接的时候,都会向数据库发送一个类似 # select 1的测试查询语句来判断服务器是否正常运行。当该连接出现disconnect的情况时, # 该连接连同pool中的其它连接都会被回收 pool_pre_ping=True, echo_pool=CONF.db.echo_pool, pool_size=CONF.db.pool_size, pool_recycle=CONF.db.pool_recycle, json_deserializer=json_deserializer, poolclass=QueuePool) logger.info(’Create sqlalchemy engine %s’, engine) SQLALCHEMY_ENGINE_CONTAINER[db_url] = engine return SQLALCHEMY_ENGINE_CONTAINER[db_url]

这里引用配置文件的数据,直接引入CONF

from confg import CONF

然后使用

CONF.db.echo_pool

创建句柄,

 与我之前使用的方法不同的是,这里的数据库引擎不需要在使用的地方引入了,会在main里注册路由分组时,通过plugin插件自动将数据库引擎导入。这也是我有点搞不懂的地方,虽然更方便,但是不知道就很难知道了,问了同事才知道是怎么回事。

bottle源码

def install(self, plugin): ’’’ Add a plugin to the list of plugins and prepare it for being applied to all routes of this application. A plugin may be a simple decorator or an object that implements the :class:`Plugin` API. ’’’

plugin就是相当与golang的中间件,不过作用范围是全部路由。

这里创建数据库句柄并使用是一个比较绕的过程。总体思路:

1.写一个bottle plugin,创建数据库句柄,然后install安装这个plugin。就可以在所有的路由中自动引入这个插件(就是不用在包里在导入db句柄了,bottle会自动导入)。

/common/base.py 创建plugin并安装

import loggingfrom bottle import Bottlefrom confg.conf import CONFfrom sqlalchemy import create_enginefrom sqlalchemy.orm import sessionmaker, scoped_sessionfrom db.models.base import Base as ApiModelBasefrom common.utils.sqlalchemy_util import get_sqlalchemy_enginefrom bottle_sqlalchemy import SQLAlchemyPlugin logger = logging.getLogger("arview")base = ApiModelBase # sqlalchemy orm base class class Plugins: SQLALCHEMY_PLUGIN = None # sqlalchemy plugin, global only one instance APSCHEDULER_PLUGIN = None # apsechduler plugin. global only one instance class Base(object): def __init__(self, *args, **kwargs): logger.debug(’>>>> Base init begin’) self.app = Bottle() # self.app.install(SwaggerPlugin(self._type)) logger.debug(’>>>> Base init end’) class DB(Base): def __init__(self, db_url, create=None, commit=None, *args, **kwargs): print(’db_url:’, db_url) super(DB, self).__init__(*args, **kwargs) if create is None: create = CONF.db.create if commit is None: commit = CONF.db.commit if Plugins.SQLALCHEMY_PLUGIN is None: Plugins.SQLALCHEMY_PLUGIN = _create_sqlalchemy_plugin(db_url, create=create, commit=commit) self.app.install(Plugins.SQLALCHEMY_PLUGIN) logger.debug("Install plugin: sqlalchemy.") # if CONF.api.enable_request_interval_plugin: # self.app.install(RequestTimeIntervalPlugin()) logger.debug(’>>>> DB init end’) class CommonBase(object): def __init__(self): self._db = None @property def db(self): if not self._db: DBURL = "mysql+mysqlconnector://{}:{}@{}:{}/{}?charset=utf8".format(CONF.mysql.DbUser, CONF.mysql.DbPassWord, CONF.mysql.DbHost, CONF.mysql.DbPort, CONF.mysql.DbName) engine = create_engine(DBURL, echo=False) self._db = sessionmaker()(bind=engine) return self._db @db.deleter def db(self): if self._db: self._db.commit() self._db.close() self._db = None def _create_sqlalchemy_plugin(db_url, create, commit): """ 创建sqlalchemy插件 :param db_url: :param echo: :param create: :param commit: :return: """ logger.debug(’>>>> create sqlalchemy plugin begin’) engine = get_sqlalchemy_engine(db_url) plugin = SQLAlchemyPlugin(engine, metadata=ApiModelBase.metadata, create=create, commit=commit, use_kwargs=True) logger.debug(’>>>> create sqlalchemy plugin %s’ % plugin) return plugin

最后使用

/api/user.py

import logging from bottle import request from common.base import DBfrom api.service.user import UserService logger = logging.getLogger("arview") class User(DB, UserService): def __init__(self, *args, **kwargs): print(">>> User init begin") logging.debug(’>>> User init begin’) super(User, self).__init__(*args, **kwargs) self.dispatch() logger.debug(’>>> User init end’) def create(self, db=None): create_body = request.json create_data = self.create_user(create_body, db) return create_data def delete(self, db=None): delete_body = request.json delete_data = self.delete_user(delete_body, db) return delete_data def list(self, db=None): list_data = self.list_user(db) return list_data def dispatch(self): self.app.route(’/listUser’, method=’post’)(self.list) self.app.route(’/createUser’, method=’post’)(self.create) self.app.route(’/deleteUser’, method=’post’)(self.delete)

这里的db就不需要导入了,可以直接使用。

db层
主要是模型层 /db/model/user.py

from sqlalchemy import Column, String, Enum, TIMESTAMP, Boolean, Integer, BIGINT, DATETIME from db.models.base import Base class UserModel(Base): __tablename__ = "user" id = Column("id", BIGINT, primary_key=True, comment="用户id") created_at = Column("created_at", DATETIME, comment="创建时间") updated_at = Column("updated_at", DATETIME, comment="更新时间") deleted_at = Column("deleted_at", DATETIME, comment="删除时间") username = Column("username", String(20), comment="用户名") password = Column("password", String(500), comment="密码") role = Column("role", BIGINT, comment="角色") def __init__(self, id, created_at, updated_at, deleted_at, username, password, role): self.id = id self.created_at = created_at self.updated_at = updated_at self.deleted_at = deleted_at self.username = username self.password = password self.role = role

/db/model/base.py

from datetime import datetime from sqlalchemy import Column, TIMESTAMPfrom sqlalchemy.ext.declarative import declarative_base # sqlalchemy orm base classBase = declarative_base() class TimestampMixin(object): """为ORM提供时间戳基类""" created_at = Column(’created_at’, TIMESTAMP(True), default=datetime.now, comment=u"创建时间") updated_at = Column(’updated_at’, TIMESTAMP(True), default=datetime.now, onupdate=datetime.now, comment=u"更新时间")

到此这篇关于python bottle使用实例的文章就介绍到这了,更多相关python bottle使用内容请搜索ABC学习网以前的文章或继续浏览下面的相关文章希望大家以后多多支持ABC学习网!

(window.slotbydup = window.slotbydup || []).push({id: "u6915441",container: "_5rmj5io5v3i",async: true});
评论
评论
发 布