fork 引起的一些问题
multiprocess 建立在 fork 基础上, 所以fork 会引发很多问题,比如 多个进程使用相同的 fd。
如果只是 fd 的话可能冲突概率还不大。 但是 数据库 连接 , 尤其 消息队列的 心跳 连接,肯定会出现问题,
两种解决方法
先 释放 conn ,再 创建
- 在 fork 或者 multiprocess 之前 先去,dispose 释放 所有连接。
- 再在 数据库 或者 消息队列中实现, 对应的 retry 方法 当 执行前 自行创建 新的连接。
先 创建 proc ,再释放
不推荐
解决思路
当前很多 数据库连接中 以及消息队列连接 中 都有 自带的 dispose 方法
去在 进程拷贝之前 dispose 掉 现有的 连接对象。
dispose 和 close 的差别,
close 以后 ,还可以open 得到 同属性的连接,
但是 dispose 之后 ,会将 对象 彻底释放,下次使用必须 重新 create 连接对象。如此一来 就不会产生进程间的 连接冲突问题。
一些实际工程中的 demo
py3 oslo_service fork 启动子进程
python3.7/site-packages/oslo_service/service.py
_start_child 之前 invoke 了 _child_process
- 关闭了写管道
- 重置了随机种子
- 翻译不好 直接把 代码留下吧
def _child_process_handle_signal(self): # Setup child signal handlers differently 。。。。
完整代码
class ProcessLauncher(object): ......省略 def _child_process(self, service): self._child_process_handle_signal() # Reopen the eventlet hub to make sure we don't share an epoll # fd with parent and/or siblings, which would be bad eventlet.hubs.use_hub() # Close write to ensure only parent has it open os.close(self.writepipe) # Create greenthread to watch for parent to close pipe eventlet.spawn_n(self._pipe_watcher) # Reseed random number generator random.seed() launcher = Launcher(self.conf, restart_method=self.restart_method) launcher.launch_service(service) return launcher def _start_child(self, wrap): if len(wrap.forktimes) > wrap.workers: # Limit ourselves to one process a second (over the period of # number of workers * 1 second). This will allow workers to # start up quickly but ensure we don't fork off children that # die instantly too quickly. if time.time() - wrap.forktimes[0] < wrap.workers: LOG.info('Forking too fast, sleeping') time.sleep(1) wrap.forktimes.pop(0) wrap.forktimes.append(time.time()) pid = os.fork() if pid == 0: self.launcher = self._child_process(wrap.service) while True: self._child_process_handle_signal() status, signo = self._child_wait_for_exit_or_signal( self.launcher) if not _is_sighup_and_daemon(signo): self.launcher.wait() break self.launcher.restart() os._exit(status) LOG.debug('Started child %d', pid) wrap.children.add(pid) self.children[pid] = wrap return pid