时间:2023-03-16 00:34:02 | 来源:电子商务
时间:2023-03-16 00:34:02 来源:电子商务
# ETL script: aggregate the latest day's orders by date / product / region and
# export the result to the data warehouse as the dw_customer_order table.

# Import the required libraries
import numpy as np
import pandas as pd
import pymysql

# Registers PyMySQL under the MySQLdb name so the "mysql://" URLs below can
# load a driver.
pymysql.install_as_MySQLdb()
from sqlalchemy import create_engine
import datetime

# Connect to the databases (source data layer "ods" and data warehouse "dw")
AdventureOds = create_engine("mysql://UserName_1:Password_1@IP_Address_1/ods?charset=gbk")
AdventureDw = create_engine("mysql://UserName_2:Password_2@IP_Address_2/dw?charset=gbk")

# Read ods_sales_orders (order detail table); only the rows whose create_date
# equals the most recent create_date found in dim_date_df are selected.
SQLquery1 = """select sales_order_key, create_date, customer_key, english_product_name, cpzl_zw, cplb_zw, unit_price from ods_sales_orders where create_date = ( select create_date from dim_date_df order by create_date desc limit 1) """
OdsSaleOrder = pd.read_sql_query(SQLquery1, con=AdventureOds)

# Read ods_customer (daily new customer table)
SQLquery2 = """select customer_key, chinese_territory, chinese_province, chinese_city from ods_customer"""
OdsCustomer = pd.read_sql_query(SQLquery2, con=AdventureOds)

# Read dim_date_df (date dimension table)
SQLquery3 = """select create_date, is_current_year, is_last_year, is_yesterday, is_today, is_current_month, is_current_quarter from dim_date_df"""
OdsDimDate = pd.read_sql_query(SQLquery3, con=AdventureOds)

# Data processing: attach the customer's region columns to every order row.
SaleOrderCustomer = pd.merge(OdsSaleOrder, OdsCustomer, how='left', on='customer_key')

# Aggregate per (date, product, region): distinct orders, distinct customers
# and the summed unit price.
SaleOrderCustomerP = SaleOrderCustomer.pivot_table(
    index=['create_date', 'english_product_name', 'cpzl_zw', 'cplb_zw',
           'chinese_territory', 'chinese_province', 'chinese_city'],
    values=['sales_order_key', 'customer_key', 'unit_price'],
    aggfunc={'sales_order_key': pd.Series.nunique,
             'customer_key': pd.Series.nunique,
             'unit_price': sum},
)
SaleOrderCustomerP = SaleOrderCustomerP.reset_index()
SaleOrderCustomerP.rename(
    columns={'customer_key': 'SumCustomer',
             'sales_order_key': 'SumOrder',
             'unit_price': 'SumAmount'},
    inplace=True,
)

# Attach the date-dimension flags (is_today, is_current_month, ...) to each row.
dw_customer_order = pd.merge(SaleOrderCustomerP, OdsDimDate, how='left', on='create_date')

# Export the finished dw_customer_order table (date / region / product
# aggregate) to the data warehouse.
# NOTE(review): TRUNCATE returns no result set, so read_sql_query may raise
# here even when the statement itself was executed; and if_exists='replace'
# below replaces the table anyway -- confirm whether this step is still needed.
try:
    pd.read_sql_query('Truncate table dw_customer_order', con=AdventureDw)
except Exception as e:
    print('旧表删除Error: %s' %e)
dw_customer_order.to_sql('dw_customer_order',
                         con=AdventureDw, if_exists='replace', index=False)
# Before creating the date index: query the order details for '2019-03-01'
select * from ods_sales_orders where create_date='2019-03-01';
给ods_sales_orders表创建日期索引后,查询耗时 0.168 ms,查询速度提高 21.65 倍。可见,给表追加索引可以大大提高查询速度,提升ETL效率。
# 创建日期索引后,查询日期为‘2019-03-01’订单明细
create index date_index on ods_sales_orders(create_date(7));
select * from ods_sales_orders where create_date='2019-03-01';
索引的工作原理:表中存在索引时,查询语句不再遍历列的所有元素,而是先遍历索引,再遍历前缀与索引相同的元素,遍历复杂度降低,从而提高查询速度。从explain结果可以看出,创建日期索引date_index后,查询首先遍历索引。
# 使用explain展示查询过程
explain select * from ods_sales_orders where create_date='2019-03-01';
# 3.2 Adding multiprocessing
import multiprocessing  # multiprocessing library


def runtask():
    """Placeholder for the work each pool worker runs.

    The article lists the ETL scripts intended to be executed here:
    dw_order_by_day.py, dw_amount_diff.py and dw_customer_order.py.
    The body is intentionally empty, so the function returns None.
    """
    # dw_order_by_day.py
    # dw_amount_diff.py
    # dw_customer_order.py


def callBackTask(arg):
    """Print a message when a pool task finishes.

    The callback must accept exactly one positional parameter (the task's
    return value); otherwise the pool raises when invoking it.
    """
    print("执行回调函数",arg)


if __name__ == "__main__":
    pool = multiprocessing.Pool(5)  # maximum number of worker processes running at once
    for index in range(20):
        # Asynchronous (parallel) submission, with a completion callback.
        pool.apply_async(func=runtask,callback=callBackTask)
        # Blocking (serial) alternative without a callback:
        # pool.apply(func=runtask,)
    pool.close()  # close the pool: no new tasks can be submitted afterwards
    # close() must be called before join(), otherwise join() raises.
    # join() then waits for all worker processes to finish.
    pool.join()
# Pool in the multiprocessing library is a process pool: it manages a fixed
# number of processes and hands tasks to whichever process is idle until all
# tasks have been completed.
import schedule  # scheduled-execution module
import time
import datetime
import os  # shell command interaction module
import requests


def job1():
    """Run the dw_order_by_day job (daily period-over-period table).

    Launches dw_order_by_day.py through the shell in the background, appending
    its stdout and stderr to dw_order_by_day_schedule.log, and prints start and
    end timestamps around a fixed 20-second wait.
    """
    print('Job1:每天8:00执行一次')
    print('Job1-startTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    # The trailing "&" backgrounds the script, so os.system returns immediately.
    os.system(
        "/home/anaconda3/bin/python3 /home/******/adventure/dw_order_by_day.py >> /home/******/adventure/logs/dw_order_by_day_schedule.log 2>&1 &")
    time.sleep(20)
    print('Job1-endTime:%s' % (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    print('------------------------------------------------------------------------')


if __name__ == '__main__':
    # Register job1 to run every day at 08:00, then poll the scheduler forever.
    schedule.every().day.at('08:00').do(job1)
    while True:
        schedule.run_pending()
        time.sleep(10)
        print("wait", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
将这个文件挂在 Linux 系统的后台运行,每天早上 8 点自动执行、定时更新;同时将代码运行情况写入 dw_order_by_day_schedule.log 文件。
关键词:数据,分析,销售