运维开发网

进程队列补充、socket实现服务器并发、线程完结

运维开发网 https://www.qedev.com 2020-07-25 16:07 出处:网络 作者:运维开发网整理
目录 1.队列补充 2.关于python并发与并行的补充 3.TCP服务端实现并发 4.GIL全局解释器锁 5.验证多线程的作用 对结论的验证: 6.死锁现象 7.递归锁 8.信号量(了解) 9.线程队列 1.队列补充 队列内部是管道+锁(数据在队列中是阻塞的) 2.关于python并发与并行的补充 解释型语言单个进程下多个线程不可以并行,但是向C语言等其他语言中在多核情况下是可以实现并行的,所有

目录

  • 1.队列补充
  • 2.关于python并发与并行的补充
  • 3.TCP服务端实现并发
  • 4.GIL全局解释器锁
  • 5.验证多线程的作用
    • 对结论的验证:
  • 6.死锁现象
  • 7.递归锁
  • 8.信号量(了解)
  • 9.线程队列

1.队列补充

队列内部是管道+锁(数据在队列中是阻塞的)

2.关于python并发与并行的补充

解释型语言单个进程下多个线程不可以并行,但是向C语言等其他语言中在多核情况下是可以实现并行的,所有语言在单核下都是无法实现并行的,只能并发。

3.TCP服务端实现并发

#服务端
import socket
from threading import Thread



server = socket.socket()
server.bind(('127.0.0.1',6666))
server.listen(5)
def serv(conn,addr):
    while True:
        try:
            print(addr)
            rec_data = conn.recv(1024).decode('utf-8')
            print(rec_data)
            send_data = rec_data.upper()
            conn.send(send_data.encode('utf-8'))
        except Exception as e:
            print(e)
            break

while True:
    conn,addr = server.accept()
    t = Thread(target=serv,args=(conn,addr))
    t.start()
#客户端
import socket
import time

client = socket.socket()
client.connect(('127.0.0.1',6666))
while True:
    client.send(b'hello')
    data = client.recv(1024)
    print(data)
    time.sleep(1)

4.GIL全局解释器锁

在CPython中,全局解释器锁(即GIL)是一个互斥锁,可以防止一个进程中的多个线程同时(并行)执行。 锁定是必要的,主要是因为CPython的内存管理不是线程安全的。GIL的存在就是为了保证线程安全。

注意:多个线程过来执行,一旦遇到IO操作就会立马释放GIL解释器锁,交个下一个先进来的线程。

在纯计算程序中GIL锁起到锁定python解释器的作用,就是一个线程抢到解释器后不会再有其他线程抢到解释器的使用权(直到这个程序遇到IO操作,这时GIL会释放解释器的使用权)。(一个线程只有运行结束才会被内存管理机制回收)

import time
from threading import Thread,current_thread


number = 100

def task():
    global number
    number2 = number
    number = number2 - 1
    print(number,current_thread().name)

for line in range(100):
    t = Thread(target=task)
    t.start()
    
99 Thread-1
98 Thread-2
97 Thread-3
96 Thread-4
95 Thread-5
94 Thread-6
93 Thread-7
92 Thread-8
91 Thread-9
90 Thread-10
。
。
。

给这个程序加上IO操作看下打印结果:

import time
from threading import Thread,current_thread


number = 100

def task():
    global number
    number2 = number
    #print(number)
    time.sleep(1)
    number = number2 - 1
    #time.sleep(1)#如果sleep放在这里,则会打印结果都是0,这是因为线程间数据是共享的
    print(number,current_thread().name)

for line in range(100):
    t = Thread(target=task)
    t.start()
    
99 Thread-24
99 Thread-23
99 Thread-22
99 Thread-20
99 Thread-18
99 Thread-17
99 Thread-15
99 Thread-21
99 Thread-14
.
.
.

5.验证多线程的作用

什么时候使用多线程,什么时候使用多进程,多线程和多进程各有什么优缺点?

结论:在计算密集型程序中使用多进程,这时能够充分发挥计算机多核的优势;在IO密集型的程序中使用多线程,这时能够充分发挥多线程对CPU高校利用率的优势。高效执行多个进程,内有多个IO密集型程序,要使用多进程+多线程。

对结论的验证:

运算密集型操作验证:

from threading import Thread
from multiprocessing import Process
import os,time

#计算密集型多进程下运行
def work1():
    number = 0
    for line in range(50000000):
        number += 1


if __name__ == '__main__':
    #测试计算密集型
    print(os.cpu_count())#打印CPU有几个内核
    start_time = time.time()
    list1 = []
    for line in range(4):
        p = Process(target=work1)
        list1.append(p)
        p.start()
        #p.join()#join如果放在这里比在列表里执行速度慢,是因为在列表里是等所有的进程都起来之后再告诉系统要加join,而在第一个for循环里面则是每起一个进程都会告诉系统一次,这个过程需要时间。

    for p in list1:
        p.join()
    end_time = time.time()
    print(f'程序执行的时间{end_time - start_time}')
#12.24461030960083


#多线程下运行
from threading import Thread
from multiprocessing import Process
import os,time

#计算密集型
def work1():
    number = 0
    for line in range(50000000):
        number += 1
#IO密集型
def work2():
    time.sleep(1)

if __name__ == '__main__':
    #测试计算密集型
    print(os.cpu_count())#打印CPU有几个内核
    start_time = time.time()
    list1 = []
    for line in range(4):
        p = Thread(target=work1)
        list1.append(p)
        p.start()
        #p.join()#4.407487869262695

    for p in list1:
        p.join()
    end_time = time.time()
    print(f'程序执行的时间{end_time - start_time}')
#16.305360794067383

这里本人的测试结果是在计算的数据比较大时开启多进程才会有优势,如果运算数据比较小,开启多线程运算速度反而比开启多进程快得多。

IO密集型操作验证:

#多进程测试

from threading import Thread
from multiprocessing import Process
import os,time


#IO密集型
def work2():
    time.sleep(1)

if __name__ == '__main__':
    #测试IO密集型
    print(os.cpu_count())#打印CPU有几个内核
    start_time = time.time()
    list1 = []
    for line in range(4):
        p = Process(target=work2)
        list1.append(p)
        p.start()
        #p.join()#4.407487869262695

    for p in list1:
        p.join()
    end_time = time.time()
    print(f'程序执行的时间{end_time - start_time}')
#2.642327070236206


#多线程测试
from threading import Thread
from multiprocessing import Process
import os,time


#IO密集型
def work2():
    time.sleep(1)

if __name__ == '__main__':
    #测试IO密集型
    print(os.cpu_count())#打印CPU有几个内核
    start_time = time.time()
    list1 = []
    for line in range(4):
        p = Thread(target=work2)
        list1.append(p)
        p.start()
        #p.join()#4.407487869262695

    for p in list1:
        p.join()
    end_time = time.time()
    print(f'程序执行的时间{end_time - start_time}')
#1.0189073085784912

可以看出在IO密集型操作中,开相同数量的程要比开相同数量的线程执行速度慢

6.死锁现象

所谓死锁:是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁。

from threading import Lock,Thread,current_thread
import time


mutex_a = Lock()#实例化一把锁
mutex_b = Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutex_a.acquire()
        print(f'用户{self.name}抢到了锁a')
        mutex_b.acquire()
        print(f'用户{self.name}抢到了锁b')
        mutex_b.release()
        print(f'用户{self.name}释放了锁b')
        mutex_a.release()
        print(f'用户{self.name}释放了锁a')

    def func2(self):
        mutex_b.acquire()
        print(f'用户{self.name}抢到了锁b')
        time.sleep(1)
        mutex_a.acquire()
        print(f'用户{self.name}抢到了锁a')
        mutex_a.release()
        print(f'用户{self.name}释放了锁a')
        mutex_b.release()
        print(f'用户{self.name}释放锁b')

for line in range(10):
    t = MyThread()
    t.start()
用户Thread-1抢到了锁a
用户Thread-1抢到了锁b
用户Thread-1释放了锁b
用户Thread-1释放了锁a
用户Thread-1抢到了锁b
用户Thread-2抢到了锁a

当线程1抢到了b而线程2抢到了a时,线程1要执行强a的任务,而线程2要执行抢b的任务,而两把锁都没有释放手中的锁,所以就造成了死锁的现象。

7.递归锁

递归锁用于解决死锁问题,递归锁可以被多个线程使用,当第一个线程使用时,遇到几把锁,它的引用计数就为几,只有当它的引用计数为零时才会给第二个线程使用。这样拥有递归锁的那个线程就可以将自己的锁里的内容执行完然后,其他使用递归锁的线程才可以执行。也就是但是第一个使用这把锁的线程会对这把锁加一个引用计数,只有引用计数为零时才能真正释放该锁。

from threading import Lock,Thread,RLock
import time


mutex_a = mutex_b = RLock()#实例化一把递归锁
class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutex_a.acquire()
        print(f'用户{self.name}抢到了锁a')
        mutex_b.acquire()
        print(f'用户{self.name}抢到了锁b')
        mutex_b.release()
        print(f'用户{self.name}释放了锁b')
        mutex_a.release()
        print(f'用户{self.name}释放了锁a')

    def func2(self):
        mutex_b.acquire()
        print(f'用户{self.name}抢到了锁b')
        time.sleep(0.1)
        mutex_a.acquire()
        print(f'用户{self.name}抢到了锁a')
        mutex_a.release()
        print(f'用户{self.name}释放了锁a')
        mutex_b.release()
        print(f'用户{self.name}释放锁b')

for line in range(10):
    t = MyThread()
    t.start()

8.信号量(了解)

互斥锁比喻成一个家用马桶,同一时间只能一个人用;信号量比喻成一个公厕,同一时间可以有多个人用。

from threading import Semaphore,Lock,current_thread,Thread
import time

sm = Semaphore(5)#5个马桶
#mutex = Lock()#一个马桶
def task():
    sm.acquire()
    print(f'{current_thread().name}执行任务')
    time.sleep(1)
    sm.release()

for line in range(20):
    t = Thread(target=task)
    t.start()
#这段代码的功能是每次让五个线程并发执行

9.线程队列

线程Q:线程队列 FIFO(先进先出)就是队列,面试会问。

queue 是python解释器自带的模块,进程中的Queue是python解释器自带的模块multiprocessing里面的一个类。

普通队列(FIFO):先进先出

特殊队列(LIFO):后进先出

import queue
q = queue.Queue()#先进先出
q.put(1)
q.put(2)
print('q',q.get())

q1 = queue.LifoQueue()#先进后出
q1.put(1)
q1.put(2)
print('q1',q1.get())

q 1
q1 2

优先级队列

优先级根据数字来判断,数字为几优先级就为几,1的优先级最高

若参数中传的是元组,优先级以第一个元素的数字大小为准,如果元组的第一个元素都为字母则以字母的ASCII码为准,如果第一个元素的优先级相同就判断第二个元素的顺序(汉字也会判断顺序)但是如果同一列的元素既有数字又有字母会报错。

import queue
q2 = queue.PriorityQueue()#优先级队列

q2.put((1,2,3))
q2.put((2,2,3))
q2.put((3,2,3))
print(q2.get())

(1, 2, 3)
0

精彩评论

暂无评论...
验证码 换一张
取 消