Nachos Lab03 同步机制

第一部分

Exercise1 调研

调研Linux中实现的同步机制。具体内容见课堂要求。

Linux中的同步机制有:

  • 中断屏蔽。由于内核的进程调度等操作依赖中断实现,因此可以避免抢占进程间的并发。仅适用于单CPU,不能解决多CPU引发的竞争。
  • 原子操作。原子操作在内核中主要保护某个共享变量,防止该变量被同时访问造成数据不同步问题。其定义的数据操作中间不会被中断。
  • 信号量。阻塞式等待的同步互斥机制,保证进程能够正确合理的使用公共资源。
  • 自旋锁。使用忙等的方法,进程不会阻塞。适用于保持时间较短、可抢占的内核线程、多处理器间共享数据的应用场合。
  • 读写锁、读写自旋锁、读写信号量。分别针对读、写操作做的处理。写操作时仅允许一个进程进入临界区,读操作允许多个进程进入临界区。适用于读远比写操作频繁的场合。
  • RCU机制。随意读,但是在更新数据的时候,先复制一份副本,在副本上完成修改再一次性地更新旧数据。同样是针对读多写少的场合。
  • 内存和优化屏障。让程序在处理完屏障之前的代码之前,不会处理屏障之后的代码。
  • 顺序锁。更偏向写者的读写锁。
  • 大内核锁。与普通的锁类似,但是其存在自动释放的特性,进程会因为主动放弃CPU时自动释放锁,换入CPU时重新获得锁。

Exercise2 源代码阅读

阅读下列源代码,理解Nachos现有的同步机制。

  • code/threads/synch.h和code/threads/synch.cc

  • code/threads/synchlist.h和code/threads/synchlist.cc

code/threads/synch.h**和code/threads/synch.cc:Nachos提供了三种同步机制信号量、锁和条件变量机制。

  • 信号量:仅包含P,V两种操作,以及非负的信号量值和等待队列。当信号量值大于0时,P操作将信号量值减少;当信号量值等于0时,P操作将当前线程放入等待队列并睡眠。而V操作则从等待队列取出线程放入就绪队列,然后增加信号量值。

    (P操作中的while不能换成if,因为当线程1被唤醒时仍要判断一次value的值,否则可能存在线程刚被唤醒就被换出了CPU。然后新的线程2执行P操作获得了资源,但还未执行V操作时,线程1重新换入CPU,此时value的值是0,但是由于此时没有再次判断value的值,就会导致线程1也会进入临界区。)

  • 锁:一个锁有两种状态BUSY和FREE。当锁处于FREE,线程可以取得锁进入临界区,执行完后,仅能由该线程释放锁。当锁处理BUSY时,申请锁的线程会进入阻塞状态,直到锁被释放变为FREE才被唤醒。

  • 条件变量机制:该机制不包含任何值,或者说可以将其看作一个二值的机制。它需要与锁一同使用。所有的操作仅当线程获得了锁并且是同一个锁的时候,能够执行线程等待条件变量(Wait),唤醒一个等待该条件变量的线程(Signal),唤醒所有等待该条件变量的线程(BroadCast

code/threads/synchlist.h和code/threads/synchlist.cc:这里实现了用于同步访问的列表,该列表利用了锁和条件变量两个机制。该列表用于1.当线程需要从中删除元素时,如果没有元素则会让线程等待,直到列表有一个元素;2. 同一时刻仅有一个线程可以访问该列表。

关于线程阻塞后中断的问题

首先观察线程Thread类下的Yield函数。Yield函数会先关中断;然后在Yield函数的末尾,会直接执行调度器的Run方法去执行下一个线程,此时下一个线程是处于关中断状态的。但是,当执行Yield的线程被重新换入CPU的时候,会从Run后面继续执行,也就会进行开中断。也就是说,上一个线程在Yield中关中断,切换到下一个线程,然后在下一个线程的Yield中去开中断。

Yield函数
1
2
3
4
5
IntStatus oldLevel = interrupt->SetLevel(IntOff);
...
scheduler->ReadyToRun(this);
scheduler->Run(nextThread);
(void) interrupt->SetLevel(oldLevel);

这里会存在一个新的问题。

如果新换上的线程是刚刚进过Fork创建的线程,而不是执行Yield换出过的线程,那么新线程又是如何开中断的呢?

我们进一步观察Fork函数的实现。可以看到在其内部调用了StackAllocate函数来创建线程栈,并且初始化了一些机器状态,其中就有StartupPCState状态指向了开中断函数。这样上述问题就解决了,新线程会在首次执行的时候开中断一次。

1
2
3
4
5
machineState[PCState] = (int*)ThreadRoot;
machineState[StartupPCState] = (int*)InterruptEnable; // 开中断
machineState[InitialPCState] = (int*)func;
machineState[InitialArgState] = arg;
machineState[WhenDonePCState] = (int*)ThreadFinish;

然后再将视角转向信号量的P操作。在P操作中,线程会先关中断,然后因为请求的资源不足(即value == 0)而将自己睡眠,直到重新被唤醒的时候才开中断。而在Sleep的实现中,并没有去开中断,甚至断言当前的中断状态是关。但是Yield部分和P操作说明了新换入的线程该如何去开中断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/*
* threads/synch.cc
*/
void
Semaphore::P()
{
IntStatus oldLevel = interrupt->SetLevel(IntOff); // disable interrupts

while (value == 0) { // semaphore not available
queue->Append((void *)currentThread); // so go to sleep
currentThread->Sleep();
}
value--; // semaphore available,
// consume its value

(void) interrupt->SetLevel(oldLevel); // re-enable interrupts
}

/*
* threads/thread.cc
*/
void
Thread::Sleep ()
{
Thread *nextThread;

ASSERT(this == currentThread);
ASSERT(interrupt->getLevel() == IntOff);

DEBUG('t', "Sleeping thread \"%s\"\n", getName());

status = BLOCKED;
while ((nextThread = scheduler->FindNextToRun()) == NULL)
interrupt->Idle(); // no one to run, wait for an interrupt

scheduler->Run(nextThread); // returns when we've been signalled
}

将上面描述的信息总结起来可以得出结论。由Sleep陷入阻塞的线程或者由Yield切换线程前所关闭的中断,其开启的时机:

  1. 被换入的是线程 - 通过StartupPCState中存储的函数指针来开中断
  2. 被换入的是曾经由Yield换出的线程 - 执行Yield末尾的开中断
  3. 被换入的是从阻塞中被唤醒的线程 - 继续执行其末尾的开中断(如P操作的末尾的开中断)

Exercise3 实现锁和条件变量

可以使用sleep和wakeup两个原语操作(注意屏蔽系统中断),也可以使用Semaphore作为唯一同步原语(不必自己编写开关中断的代码)。

思路

:可以借助信号量来实现锁所需的机制。在锁内放置一个信号量lock,尝试获取锁就间接变成了P(lock)操作;而释放锁也就等同于V(lock)操作。此外为了保证锁的特性“仅有持有锁的进程能够释放锁”,增设一个属性用于记录是哪个线程获取到了锁。

条件变量:条件变量很重要的一点就是内部需要有等待队列以提供阻塞和唤醒功能,因此条件变量内部仅需要维护这样一个等待队列。

当然上述二者的操作均需保证其执行的原子性(对于Nachos保证原子性的方式较为单一,即通过关中断来保证)。

实现

首先是关于锁的实现。锁的实现机制是利用信号量机制,在锁中新增两个私有成员owner和lock。owner变量用于记录持有锁的线程,lock变量用于上锁和释放锁的实现。上锁和释放锁的操作均利用关中断来保证操作的原子性。对于上锁Acquire操作,线程先对锁进行P操作,如果锁已经被占用,则该线程会阻塞;反之,则获得锁并设置锁的owner。对于释放锁Release操作,则会先断言释放该锁的线程必须是持有锁的线程,然后V操作释放锁资源并将持有者owner置空。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Lock::Lock(char* debugName) {
name = debugName;
lock = new Semaphore("Called in Lock", 1);
owner = NULL;
}

Lock::~Lock() {
delete lock;
}

void Lock::Acquire() {
IntStatus oldLevel = interrupt->SetLevel(IntOff);
lock->P();
owner = currentThread;
(void) interrupt->SetLevel(oldLevel);
}

void Lock::Release() {
IntStatus oldLevel = interrupt->SetLevel(IntOff);
ASSERT(isHeldByCurrentThread()); // 仅有持有锁的线程才能释放锁
lock->V();
owner = NULL;
(void) interrupt->SetLevel(oldLevel);
}

bool Lock::isHeldByCurrentThread() {
return currentThread == owner;
}

对于条件变量的实现,则新增了一个等待队列queue,用于记录调用Wait而阻塞的线程。对于等待Wait的实现,则同样先断言获得到锁的线程才能操作条件变量,然后释放锁,阻塞当前线程并将其加入到等待队列;直到线程被唤醒时获取锁,然后进入临界区操作。对于唤醒Signal的实现,则是通过判断等待队列是否为空,如果不为空,则取出一个线程加入到就绪队列。同理Broadcast函数的实现,则是对等待队列的所有线程执行Signal操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
Condition::Condition(char* debugName) {
name = debugName;
queue = new List;
}

Condition::~Condition() {
delete queue;
}

void Condition::Wait(Lock* conditionLock) {
// ASSERT(FALSE);
IntStatus oldLevel = interrupt->SetLevel(IntOff);

ASSERT(conditionLock->isHeldByCurrentThread());
// 因为无法获得锁的线程会阻塞,因此请求锁的线程仍在执行就说明是有问题的
conditionLock->Release(); // 睡眠之前释放锁
queue->Append(currentThread);
currentThread->Sleep();
conditionLock->Acquire(); // 被唤醒时重新请求锁,进入临界区

(void) interrupt->SetLevel(oldLevel);
}
void Condition::Signal(Lock* conditionLock) {
IntStatus oldLevel = interrupt->SetLevel(IntOff);
// 取出一个线程加入到就绪队列
if (!queue->IsEmpty()) {
Thread* nextThread = queue->Remove();
scheduler->ReadyToRun(nextThread);
}

(void) interrupt->SetLevel(oldLevel);
}
void Condition::Broadcast(Lock* conditionLock) {
IntStatus oldLevel = interrupt->SetLevel(IntOff);

while (!queue->IsEmpty()) {
Signal(conditionLock);
}

(void) interrupt->SetLevel(oldLevel);
}

Exercise4 实现同步互斥实例

基于Nachos中的信号量、锁和条件变量,采用两种方式实现同步和互斥机制应用(其中使用条件变量实现同步互斥机制为必选题目)。具体可选择“生产者-消费者问题”、“读者-写者问题”、“哲学家就餐问题”、“睡眠理发师问题”等。(也可选择其他经典的同步互斥问题)

生产者-消费者问题(信号量实现)

设置三个信号量,缓冲区互斥信号量mutex、缓冲区剩余空间信号量empty、缓冲区已有产品信号量full。

然后编写生产者函数Producer,共生产10个产品,每次生产前判断缓冲区是否还有空间,然后再获取互斥变量mutex,接着进入临界区执行生产任务(往缓冲区的空位置写1),最后再释放这些信号量。对于消费者Consumer则与之类似,每次消费前判断缓冲区是否有产品,然后再获取mutex,接着进入临界区执行消费任务(将缓冲区一个为1的位置改为-1),最后同样释放信号量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 信号量
Semaphore * mutex = new Semaphore("Producer-Consumer-mutex", 1);
Semaphore * full = new Semaphore("Producer-Consumer-full", 0);
Semaphore * empty = new Semaphore("Producer-Consumer-empty", 5);
// 互斥缓冲区
int buff[5] = {0};

void printBuff(char * name) {
printf("===========%s===========\n", name);
for (int i = 0; i < 5; i++)
printf("%d ", buff[i]);
printf("\n===========End===========\n");
}

void Producer(int which) {
int num = 10;
while (num--) {
printf("Thread %d is trying to produce\n", which);
empty->P();
printf("Thread %d is trying to get mutex\n", which);
mutex->P();

printf("Thread %d is producing\n", which);
buff[num % 5] = 1;
printBuff("Producer");

full->V();
mutex->V();
}
}

void Consumer(int which) {
int num = 10;
while (num--) {
printf("Thread %d is trying to consume\n", which);
full->P();
printf("Thread %d is trying to get mutex\n", which);
mutex->P();

printf("Thread %d is comsuming\n", which);
buff[num % 5] = -1;
printBuff("Consumer");

empty->V();
mutex->V();
}
}

void
ThreadTest5() {
DEBUG('t', "Entering ThreadTest5\n");

Thread* prod = new Thread("Producer");
Thread* cons = new Thread("Consumer");

prod->Fork(Producer, (void*)1);
cons->Fork(Consumer, (void*)2);
}

测试结果如下:

Shell输出结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
root@b5500d28dd66:/nachos/nachos-3.4/code/threads# ./nachos -q 5
No test specified.
Thread 1 is trying to produce
Thread 1 is trying to get mutex
Thread 2 is trying to consume
Thread 1 is producing
===========Producer===========
0 0 0 0 1
===========End===========
Thread 1 is trying to produce
Thread 1 is trying to get mutex
Thread 1 is producing
===========Producer===========
0 0 0 1 1
===========End===========
Thread 2 is trying to get mutex
Thread 2 is comsuming
===========Consumer===========
0 0 0 1 -1
===========End===========
Thread 1 is trying to produce
Thread 1 is trying to get mutex
Thread 1 is producing
===========Producer===========
0 0 1 1 -1
===========End===========
Thread 1 is trying to produce
Thread 1 is trying to get mutex
Thread 1 is producing
===========Producer===========
0 1 1 1 -1
===========End===========
Thread 1 is trying to produce
Thread 1 is trying to get mutex
Thread 1 is producing
===========Producer===========
1 1 1 1 -1
===========End===========
Thread 1 is trying to produce
Thread 1 is trying to get mutex
Thread 1 is producing
===========Producer===========
1 1 1 1 1
===========End===========
Thread 2 is trying to consume
Thread 2 is trying to get mutex
Thread 1 is trying to produce
Thread 2 is comsuming
===========Consumer===========
1 1 1 -1 1
===========End===========
Thread 2 is trying to consume
Thread 2 is trying to get mutex
Thread 2 is comsuming
===========Consumer===========
1 1 -1 -1 1
===========End===========
Thread 1 is trying to get mutex
Thread 2 is trying to consume
Thread 2 is trying to get mutex
Thread 2 is comsuming
===========Consumer===========
1 -1 -1 -1 1
===========End===========
Thread 2 is trying to consume
Thread 2 is trying to get mutex
Thread 2 is comsuming
===========Consumer===========
-1 -1 -1 -1 1
===========End===========
Thread 2 is trying to consume
Thread 2 is trying to get mutex
Thread 2 is comsuming
===========Consumer===========
-1 -1 -1 -1 -1
===========End===========
Thread 2 is trying to consume
Thread 1 is producing
===========Producer===========
-1 -1 -1 1 -1
===========End===========
Thread 1 is trying to produce
Thread 1 is trying to get mutex
Thread 1 is producing
===========Producer===========
-1 -1 1 1 -1
===========End===========
Thread 1 is trying to produce
Thread 1 is trying to get mutex
Thread 1 is producing
===========Producer===========
-1 1 1 1 -1
===========End===========
Thread 1 is trying to produce
Thread 1 is trying to get mutex
Thread 2 is trying to get mutex
Thread 1 is producing
===========Producer===========
1 1 1 1 -1
===========End===========
Thread 2 is comsuming
===========Consumer===========
1 1 1 -1 -1
===========End===========
Thread 2 is trying to consume
Thread 2 is trying to get mutex
Thread 2 is comsuming
===========Consumer===========
1 1 -1 -1 -1
===========End===========
Thread 2 is trying to consume
Thread 2 is trying to get mutex
Thread 2 is comsuming
===========Consumer===========
1 -1 -1 -1 -1
===========End===========
Thread 2 is trying to consume
Thread 2 is trying to get mutex
Thread 2 is comsuming
===========Consumer===========
-1 -1 -1 -1 -1
===========End===========
No threads ready or runnable, and no pending interrupts.
Assuming the program completed.
Machine halting!

Ticks: total 940, idle 20, system 920, user 0
Disk I/O: reads 0, writes 0
Console I/O: reads 0, writes 0
Paging: faults 0
Network I/O: packets received 0, sent 0

Cleaning up...

生产者-消费者问题(条件变量实现)

设置一个互斥锁CondLock_PC_mutex来实现对条件变量以及临界区的互斥操作。因此对生产者和消费者分别设置一个条件变量用于记录各自的阻塞队列。最后用两个变量分别标记缓冲区被使用的空间大小,以及缓冲区的最大限度。

同样,生产者和消费者的实现上比较类似,因此以生产者举例。设置了共10个生产任务及5个缓冲区大小。生产者首先获取互斥锁来访问缓冲区,然后用while语句判断是否有空闲空间以供生产。如果没有则将自己置入生产者条件变量的等待队列;如果有则生产一个产品,最后唤醒消费者并释放锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 条件变量
Lock * CondLock_PC_mutex = new Lock("CondLock_PC_mutex"); // 互斥锁
Condition * Cond_PC_p = new Condition("Cond_PC_p"); // 生产者条件变量
Condition * Cond_PC_c = new Condition("Cond_PC_c"); // 消费者条件变量
int buffSize = 0; // 互斥缓冲区
int maxBuffSize = 5;

void Producer(int which) {
int num = 10;
while (num--) {
printf("Thread %d is trying to get lock\n", which);
CondLock_PC_mutex->Acquire();
while (buffSize >= maxBuffSize) {
printf("Thread %d is waiting\n", which);
Cond_PC_p->Wait(CondLock_PC_mutex);
}
printf("Thread %d is producing\n", which);
buffSize++;
Cond_PC_c->Signal(CondLock_PC_mutex);
CondLock_PC_mutex->Release();
}
}

void Consumer(int which) {
int num = 10;
while (num--) {
printf("Thread %d is trying to get lock\n", which);
CondLock_PC_mutex->Acquire();
while (buffSize <= 0) {
printf("Thread %d is waiting\n", which);
Cond_PC_c->Wait(CondLock_PC_mutex);
}
printf("Thread %d is consuming\n", which);
buffSize--;
Cond_PC_p->Signal(CondLock_PC_mutex);
CondLock_PC_mutex->Release();
}
}

void
ThreadTest6() {
DEBUG('t', "Entering ThreadTest5\n");

Thread* prod = new Thread("Producer");
Thread* cons = new Thread("Consumer");

prod->Fork(Producer, (void*)1);
cons->Fork(Consumer, (void*)2);
}

测试结果如下:

Shell输出结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
root@b5500d28dd66:/nachos/nachos-3.4/code/threads# ./nachos -q 5 
No test specified.
Thread 1 is trying to get lock
Thread 1 is producing
Thread 2 is trying to get lock
Thread 1 is trying to get lock
Thread 1 is producing
Thread 1 is trying to get lock
Thread 1 is producing
Thread 1 is trying to get lock
Thread 1 is producing
Thread 1 is trying to get lock
Thread 1 is producing
Thread 1 is trying to get lock
Thread 1 is waiting
Thread 2 is consuming
Thread 1 is producing
Thread 1 is trying to get lock
Thread 1 is waiting
Thread 2 is trying to get lock
Thread 2 is consuming
Thread 2 is trying to get lock
Thread 2 is consuming
Thread 2 is trying to get lock
Thread 2 is consuming
Thread 2 is trying to get lock
Thread 2 is consuming
Thread 2 is trying to get lock
Thread 2 is consuming
Thread 2 is trying to get lock
Thread 2 is waiting
Thread 1 is producing
Thread 1 is trying to get lock
Thread 1 is producing
Thread 1 is trying to get lock
Thread 1 is producing
Thread 1 is trying to get lock
Thread 1 is producing
Thread 2 is consuming
Thread 2 is trying to get lock
Thread 2 is consuming
Thread 2 is trying to get lock
Thread 2 is consuming
Thread 2 is trying to get lock
Thread 2 is consuming
No threads ready or runnable, and no pending interrupts.
Assuming the program completed.
Machine halting!

Ticks: total 760, idle 20, system 740, user 0
Disk I/O: reads 0, writes 0
Console I/O: reads 0, writes 0
Paging: faults 0
Network I/O: packets received 0, sent 0

Cleaning up...

Challenge 1 实现barrier

可以使用Nachos 提供的同步互斥机制(如条件变量)来实现barrier,使得当且仅当若干个线程同时到达某一点时方可继续执行。

思路

若要满足“当且仅当若干个线程同时到达某一点时方可继续执行”,那么可以考虑用到条件变量中的Broadcast方法。然后增设一个变量来统计进入到条件变量等待队列中的线程数,当所有线程抵达barrier所设的点时,也即所有线程都进入到了等待队列中,此时将所有线程同时唤醒。

实现

采取新建一个Barrier类的方式来实现。首先在threads/synch.h中创建Barrier类的基本结构如下。Barrier主要包含了4个属性,锁lock用于条件变量的操作,条件变量用于记录不满足条件时等待的线程以及后续的唤醒操作,size用于记录满足Barrier要求的线程数,cur_size记录当前抵达Barrier时阻塞的线程数。

Barrier除了基本的构造/析构函数外,仅有一个操作方法Set用于在某个点设置barrier(即在需要放置barrier的点调用Set函数即可)。Set的实现方式,首先获取锁保证临界区的互斥操作,然后将抵达该点的线程计数器增加,接着判断是否满足了释放所有线程的条件size。如果满足则调用条件变量的Broadcast方法唤醒所有的线程;如果不满足则将当前线程放入条件变量的阻塞队列中。这样就实现了Barrier的基本功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/*
* threads/synch.h
*/
class Barrier {
public:
Barrier(char* debugName, int initSize);
~Barrier();
char* getName() { return (name); }
void Set();

private:
Lock* lock;
Condition* condition;
char* name;
int size;
int cur_size;
};

/*
* threads/synch.cc
*/
Barrier::Barrier(char *debugName, int initSize) {
size = initSize;
lock = new Lock("Barrier Lock");
name = debugName;
condition = new Condition("Barrier Condition");
cur_size = 0;
}

Barrier::~Barrier() {
delete lock;
delete condition;
}

void Barrier::Set() {
lock->Acquire();
cur_size++;
if (cur_size >= size) {
printf("Thread %s, current number %d, needed number %d, do Broadcast\n",
currentThread->getName(), cur_size, size);
condition->Broadcast(lock);
lock->Release();
} else {
printf("Thread %s, current number %d, needed number %d, do Wait\n",
currentThread->getName(), cur_size, size);
condition->Wait(lock);
lock->Release();
}

printf("Thread %s continue run\n",
currentThread->getName(), cur_size, size);
}

然后编写测试函数,在两个线程中分别对Barrier_x和Barrier_y赋值,然后进行计算。这就意味着仅当两个线程同时赋值结束之后才能获得正确的运算结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
int Barrier_x = 0, Barrier_y = 0;
Barrier* barrier = new Barrier("Barrier", 2);

void BarrierFunc_1(int which) {
Barrier_x = 10;
barrier->Set();
printf("BarrierFunc_1 do \'x * y\', get %d\n", Barrier_x * Barrier_y);
}

void BarrierFunc_2(int which) {
Barrier_y = 3;
barrier->Set();
printf("BarrierFunc_2 do \'x + y\', get %d\n", Barrier_x + Barrier_y);
}

void
ThreadTest7() {
DEBUG('t', "Entering ThreadTest5\n");

Thread* thread_1 = new Thread("1");
Thread* thread_2 = new Thread("2");

thread_1->Fork(BarrierFunc_1, (void*)1);
thread_2->Fork(BarrierFunc_2, (void*)2);
}

测试结果如下:

Shell输出结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
root@b5500d28dd66:/nachos/nachos-3.4/code/threads# ./nachos -q 7 
Thread 1, current number 1, needed number 2, do Wait
Thread 2, current number 2, needed number 2, do Broadcast
Thread 2 continue run
BarrierFunc_2 do 'x + y', get 13
Thread 1 continue run
BarrierFunc_1 do 'x * y', get 30
No threads ready or runnable, and no pending interrupts.
Assuming the program completed.
Machine halting!

Ticks: total 140, idle 10, system 130, user 0
Disk I/O: reads 0, writes 0
Console I/O: reads 0, writes 0
Paging: faults 0
Network I/O: packets received 0, sent 0

Cleaning up...

Challenge 2 实现read/write lock

基于Nachos提供的lock(synch.h和synch.cc),实现read/write lock。使得若干线程可以同时读取某共享数据区内的数据,但是在某一特定的时刻,只有一个线程可以向该共享数据区写入数据。

思路

读写锁是计算机程序的并发控制的一种同步机制,也称“共享-互斥锁”、多读者-单写者锁。 用于解决读写问题。读操作可并发重入,写操作是互斥的。

互斥原则:

  • 读-读能共存,
  • 读-写不能共存,
  • 写-写不能共存。

实现

利用条件变量实现读写锁。使用一个计数器refCount来标记当前锁的状态,当该值为-1时表示有1个写者正在写;当该值为0时表示锁未使用;当该值为正数时表示有n个读者正在读。为保证写写互斥,refCount负值仅能为-1。readCond条件变量记录当有读者正在读时,写者的等待队列;writeCond条件变量记录当有写者正在写时,读者的等待队列。最后两个变量readWaiters和writeWaiters分别记录读写者等待的数量,以及用于在释放锁时使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class ReadWriteLock {
public:
ReadWriteLock(char* debugName);
~ReadWriteLock();
void ReadLock();
void WriteLock();
void Unlock();

private:
char* name;
int refCount; // -1表示有写者,0表示无人加锁,正数表示读者个数
Lock* rwLock;
Condition* readCond;
Condition* writeCond;

int readWaiters;
int writeWaiters;
};

对于读锁的实现主要是根据refCount的值判断是否有写者正在写,如果有则进入等待队列;如果没有则将refCount的值增加,且获取锁。因此读锁的可重入就是通过refCount的判断来实现的,读者仅会因为有写者正在写而阻塞。

这里要注意的是,while不能用if来判断,理由与P操作中的while类似。首先,Wait操作会释放锁rwLock,然后等待readCond执行Signal,最后重新操作条件变量时,再对rwLock加锁。这样就会存在一种情况,在当前读线程从被Signal唤醒之后,还没来得及对rwLock进行加锁时,另外一个线程被换上CPU来获取写锁,此时refCount为0,因此导致refcount变成了-1。如果不重新对refcount进行判断就会导致读者写者同时获取锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void ReadWriteLock::ReadLock() {
rwLock->Acquire();
printf("Thread %s is tring to get ReadLock, readerWaiters %d, writeWaiters %d, refCount %d\n",
currentThread->getName(), readWaiters, writeWaiters, refCount);
while (refCount < 0) {
readWaiters++;
readCond->Wait(rwLock);
readWaiters--;
}
refCount++;
printf("Thread %s get ReadLock, readerWaiters %d, writeWaiters %d, refCount %d\n",
currentThread->getName(), readWaiters, writeWaiters, refCount);
rwLock->Release();
}

写锁的实现方式与读锁类似。主要区别在于为了保证写写互斥、读写互斥,要求refCount的值为0,即仅当没有任何线程获取到锁时,才能加写锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void ReadWriteLock::WriteLock() {
rwLock->Acquire();
printf("Thread %s is tring to get WriteLock, readerWaiters %d, writeWaiters %d, refCount %d\n",
currentThread->getName(), readWaiters, writeWaiters, refCount);
while (refCount != 0) {
writeWaiters++;
writeCond->Wait(rwLock);
writeWaiters--;
}
refCount = -1;
printf("Thread %s get WriteLock, readerWaiters %d, writeWaiters %d, refCount %d\n",
currentThread->getName(), readWaiters, writeWaiters, refCount);
rwLock->Release();
}

最后是解锁Unlock的实现。如果是写者释放锁,则直接将refCount置0;如果是读者释放锁,则将refCount减1。改动完了refCount之后,再进一步判断是否所有线程都释放了锁。当存在等待的写者时,则用Signal唤醒一个写者;当存在等待的读者时,则用Broadcast唤醒所有读者(因为读锁是可重入的)。

在该实现下,如果有写者进入等待队列,但在这之后运行新的读者获得锁,因此这是一种读优先的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void ReadWriteLock::Unlock() {
ASSERT(refCount != 0);

rwLock->Acquire();
if (refCount == -1) {
refCount = 0;
} else {
refCount--;
}

if (refCount == 0) {
if (writeWaiters > 0) {
writeCond->Signal(rwLock);
} else if (readWaiters > 0) {
readCond->Broadcast(rwLock);
}
}
printf("Thread %s unlocked, readerWaiters %d, writeWaiters %d, refCount %d\n",
currentThread->getName(), readWaiters, writeWaiters, refCount);
rwLock->Release();
}

最后编写测试函数如下。编写相应的测试函数。使用一个整数来模拟共享文件。读/写函数类似,读操作仅获取sharedFile的值,写操作会对该值+1。为了能见到读写锁的互斥以及写优先策略的效果,在释放锁前和释放锁后均让线程执行Yield放弃CPU。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
ReadWriteLock* readWriteLock = new ReadWriteLock("ReadWriteLock Test");
int sharedFile = 0;

void do_read(int which) {
int num = 2;
while (num--) {
readWriteLock->ReadLock();
printf("Thread %s is reading, sharedFile value %d\n", currentThread->getName(), sharedFile);
currentThread->Yield();
readWriteLock->Unlock();
currentThread->Yield();
}
}

void do_write(int which) {
int num = 3;
while (num--) {
readWriteLock->WriteLock();
sharedFile++;
printf("Thread %s is writing, sharedFile value %d\n", currentThread->getName(), sharedFile);
currentThread->Yield();
readWriteLock->Unlock();
currentThread->Yield();
}
}

void
ThreadTest8() {
DEBUG('t', "Entering ThreadTest5\n");

Thread* read_1 = new Thread("Read_1");
Thread* read_2 = new Thread("Read_2");
Thread* read_3 = new Thread("Read_3");
Thread* write_1 = new Thread("Write_1");
Thread* write_2 = new Thread("Write_2");

write_1->Fork(do_write, (void*)0);
read_1->Fork(do_read, (void*)0);
read_2->Fork(do_read, (void*)0);
write_2->Fork(do_write, (void*)0);
read_3->Fork(do_read, (void*)0);
}

然后分别创建了3个读线程,2个写线程进行测试,结果如下。可以看到当有写着在写时,其他写者和读者加入到等待队列。然后该写者写完时,新的写者优先获得锁。直到等待队列中没有写者时,读者才开始读。多个线程同时获得了读锁。

Shell执行结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
root@b5500d28dd66:/nachos/nachos-3.4/code/threads# ./nachos -q 8
Thread Write_1 is tring to get WriteLock, readerWaiters 0, writeWaiters 0, refCount 0
Thread Write_1 get WriteLock, readerWaiters 0, writeWaiters 0, refCount -1
Thread Write_1 is writing, sharedFile value 1
Thread Read_1 is tring to get ReadLock, readerWaiters 0, writeWaiters 0, refCount -1
Thread Read_2 is tring to get ReadLock, readerWaiters 1, writeWaiters 0, refCount -1
Thread Write_2 is tring to get WriteLock, readerWaiters 2, writeWaiters 0, refCount -1
Thread Read_3 is tring to get ReadLock, readerWaiters 2, writeWaiters 1, refCount -1
Thread Write_1 unlocked, readerWaiters 3, writeWaiters 1, refCount 0
Thread Write_2 get WriteLock, readerWaiters 3, writeWaiters 0, refCount -1
Thread Write_2 is writing, sharedFile value 2
Thread Write_1 is tring to get WriteLock, readerWaiters 3, writeWaiters 0, refCount -1
Thread Write_2 unlocked, readerWaiters 3, writeWaiters 1, refCount 0
Thread Write_1 get WriteLock, readerWaiters 3, writeWaiters 0, refCount -1
Thread Write_1 is writing, sharedFile value 3
Thread Write_2 is tring to get WriteLock, readerWaiters 3, writeWaiters 0, refCount -1
Thread Write_1 unlocked, readerWaiters 3, writeWaiters 1, refCount 0
Thread Write_2 get WriteLock, readerWaiters 3, writeWaiters 0, refCount -1
Thread Write_2 is writing, sharedFile value 4
Thread Write_1 is tring to get WriteLock, readerWaiters 3, writeWaiters 0, refCount -1
Thread Write_2 unlocked, readerWaiters 3, writeWaiters 1, refCount 0
Thread Write_1 get WriteLock, readerWaiters 3, writeWaiters 0, refCount -1
Thread Write_1 is writing, sharedFile value 5
Thread Write_2 is tring to get WriteLock, readerWaiters 3, writeWaiters 0, refCount -1
Thread Write_1 unlocked, readerWaiters 3, writeWaiters 1, refCount 0
Thread Write_2 get WriteLock, readerWaiters 3, writeWaiters 0, refCount -1
Thread Write_2 is writing, sharedFile value 6
Thread Write_2 unlocked, readerWaiters 3, writeWaiters 0, refCount 0
Thread Read_1 get ReadLock, readerWaiters 2, writeWaiters 0, refCount 1
Thread Read_2 get ReadLock, readerWaiters 1, writeWaiters 0, refCount 2
Thread Read_3 get ReadLock, readerWaiters 0, writeWaiters 0, refCount 3
Thread Read_1 is reading, sharedFile value 6
Thread Read_2 is reading, sharedFile value 6
Thread Read_3 is reading, sharedFile value 6
Thread Read_1 unlocked, readerWaiters 0, writeWaiters 0, refCount 2
Thread Read_2 unlocked, readerWaiters 0, writeWaiters 0, refCount 1
Thread Read_3 unlocked, readerWaiters 0, writeWaiters 0, refCount 0
Thread Read_1 is tring to get ReadLock, readerWaiters 0, writeWaiters 0, refCount 0
Thread Read_1 get ReadLock, readerWaiters 0, writeWaiters 0, refCount 1
Thread Read_1 is reading, sharedFile value 6
Thread Read_2 is tring to get ReadLock, readerWaiters 0, writeWaiters 0, refCount 1
Thread Read_2 get ReadLock, readerWaiters 0, writeWaiters 0, refCount 2
Thread Read_3 is tring to get ReadLock, readerWaiters 0, writeWaiters 0, refCount 2
Thread Read_3 get ReadLock, readerWaiters 0, writeWaiters 0, refCount 3
Thread Read_3 is reading, sharedFile value 6
Thread Read_2 is reading, sharedFile value 6
Thread Read_3 unlocked, readerWaiters 0, writeWaiters 0, refCount 2
Thread Read_2 unlocked, readerWaiters 0, writeWaiters 0, refCount 1
Thread Read_1 unlocked, readerWaiters 0, writeWaiters 0, refCount 0
No threads ready or runnable, and no pending interrupts.
Assuming the program completed.
Machine halting!

Ticks: total 1100, idle 10, system 1090, user 0
Disk I/O: reads 0, writes 0
Console I/O: reads 0, writes 0
Paging: faults 0
Network I/O: packets received 0, sent 0

Cleaning up...

Challenge 3 研究Linux的kfifo机制是否可以移植到Nachos上作为一个新的同步模块

思路

kfifo是一个Linux内核使用的环形缓冲区。该缓冲区要求一次仅能有一个写进程和一个读进程在同时工作。其缓冲区如下图,in表示写者写入的位置,out表示读者开始读的位置。因为这种类似与生产者消费者的工作模式(即只要生产了数据,即可立即消费数据),同时缓冲区的数据满足FIFO的特性,所以可以使用一个队列来实现。

从Linux内核中kfifo的实现方式可以看到,需要有一个存储数据的共享缓冲区、该缓冲区的大小(以2的幂次较好,in % size 可以转化为 in & (size – 1))、in添加数据的起始游标、out输出数据的起始游标、以及一个自旋锁(用于防止重入)。

Linux中kfifo的实现
1
2
3
4
5
6
7
struct kfifo {
unsigned char *buffer; /* the buffer holding the data */
unsigned int size; /* the size of the allocated buffer */
unsigned int in; /* data is added at offset (in % size) */
unsigned int out; /* data is extracted from off. (out % size) */
spinlock_t *lock; /* protects concurrent modifications */
};

kfifo的巧妙之处在于in和out定义为无符号类型,在put和get时,in和out都是增加,当达到最大值时,产生溢出,使得从0开始,进行循环使用。如下图,当in溢出后,in的位置比out低,但是缓冲区数据的长度仍是in - out

另外kfifo还使用了内存屏障(smp_mb、smp_wmb、smp_rmb),用于来处理kfifo中读/写和out/in指针更新之间可能存在的内存乱序访问的情况。但是在单CPU的情况下,多线程执行不存在运行时内存乱序访问,因此对于只支持单CPU的Nachos来说,不需要考虑实现内存屏障。

实现

首先在machine/sysdep.h中声明Memcpy的函数,然后在machine/sysdep.cc中实现。该函数用于kfifo中缓冲区的写入和读取。

1
2
3
4
5
6
7
8
9
10
11
12
/*
* machine/sysdep.h
*/
// 内存拷贝函数
extern void* Memcpy(void *str1, const void *str2, unsigned int n);

/*
* machine/sysdep.cc
*/
void* Memcpy(void *str1, const void *str2, unsigned int n) {
return memcpy(str1, str2, n);
}

然后KFifo类的声明如下,包含基本的缓冲区buffer,缓冲区大小size,输入游标in,输出游标out,输入函数Put,输出函数Get,以及缓冲区使用空间查询函数BufferUsedSize

为了简便,这里删去了Linux中用到的自旋锁,自旋锁是为了保证同一时刻最多有一个读者和一个写者在操作KFifo。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/*
* threads/synch.h
*/
class KFifo {
public:
KFifo(char * debugname, unsigned int bufsize);
~KFifo();
unsigned int Put(char * inBuff, int len);
unsigned int Get(char * oufBuff, int len);
unsigned int BufferUsedSize();
private:
char * name;
unsigned char * buffer;
unsigned int size; // 缓冲区大小
unsigned int in; // 输入的位置
unsigned int out; // 输出的位置
};

接下来是其主要功能的实现,位于threads/synch.cc。对于构造函数,在这里除了基本的初始化之外,最重要的是将缓冲区的大小向上取整,为了后续计算的方便。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
KFifo::KFifo(char *debugname, unsigned int bufSize) {
name = debugname;
ASSERT(bufSize <= (1 << 31)); // 无法表示 1 << 32 的整数
unsigned int upSize = 1;
while (upSize < bufSize) { upSize = upSize << 1; }
size = upSize; // 大小向上取2的倍数,好处为对size的取模运算可以转化为与运算
buffer = new char[size];
in = 0;
out = 0;
}

KFifo::~KFifo() {
delete buffer;
}

Put函数的实现。首先是计算可以访问的空间,将传入数据的长度len和缓冲区的剩余空间size - in + out取较小值,即为缓冲区能够写入的数据大小。为了模拟循环队列,先尝试将输入从in一直写入到buffer(buffer自身是线性存储)的末尾;如果仍有数据要写入,则继续从buffer的头部开始写入。

1
2
3
4
5
6
7
8
9
10
11
12
unsigned int KFifo::Put(char *inBuff, int len) {
unsigned int l; // 从in到缓冲区末尾的长度

len = min(len, size - in + out); // 空闲空间
// 先将数据从in写到buffer的末尾
l = min(len, size - (in & (size - 1)));
Memcpy(buffer + (in & (size - 1)), inBuff, l);
// 然后将剩余部分写在buffer的头部
Memcpy(buffer, buffer + l, len - l);
in += len;
return len;
}

Get函数的实现与Put类似。首先是计算可以读取的数据长度,将传入待读取长度len和缓冲区的数据长度in - out取较小值,即为缓冲区能够写入的数据大小。同样为了模拟循环队列,先尝试将输出从out一直读取到buffer(buffer自身是线性存储)的末尾;如果仍有数据要读取,则考虑继续从buffer的头部开始读取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
unsigned int KFifo::Get(char *oufBuff, int len) {
unsigned int l; // 从in到缓冲区末尾的长度

len = min(len, in - out); // 可读的数据
// 先读取从out到buffer末尾的部分
l = min(len, size - (out & (size - 1)));
Memcpy(oufBuff, buffer + (out & (size - 1)), l);
// 然后继续读取位于buffer头部的部分
Memcpy(oufBuff + l, buffer, len - l);
out += len;

if (in == out) { in = 0; out = 0; } // 二者相等时buffer为空,所以置0
return len;
}

最后编写测试函数。写者每次向缓冲区写入随机长度的字符,读者则每次从缓冲区读取随机长度的字符并打印。仅同时存在一个读者和一个写者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
KFifo* kFifo = new KFifo("KFifo Test", 256);

void BuffWriter(int which) {
int num = 5;
while (num--) {
char * inBuff = "abcdefghijklmnopqrstuvwxyz";
int len = Random() % 26;
int r = kFifo->Put(inBuff, len);
printf("Thread %s try to put %d chars in kFifo, finally %d in\n", currentThread->getName(), len, r);
currentThread->Yield();
}
}

void BuffReader(int which) {
int num = 5;
while (num--) {
char outBuff[256];
int len = Random() % 26;
int r = kFifo->Get(outBuff, len);
printf("Thread %s try to get %d chars from kFifo, finally %d out\n", currentThread->getName(), len, r);
outBuff[r] = '\0';
printf("Read Info => %s == now buffer unread size %d\n", outBuff, kFifo->BufferUsedSize());
currentThread->Yield();
}
}

void
ThreadTest9() {
DEBUG('t', "Entering ThreadTest5\n");

Thread* read = new Thread("Read");
Thread* write = new Thread("Write");

write->Fork(BuffWriter, (void*)0);
read->Fork(BuffReader, (void*)0);
}

测试结果如下。

Shell执行结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
root@b5500d28dd66:/nachos/nachos-3.4/code/threads# ./nachos -q 9
Thread Write try to put 13 chars in kFifo, finally 13 in
Thread Read try to get 22 chars from kFifo, finally 13 out
Read Info => abcdefghijklm == now buffer unread size 0
Thread Read try to get 11 chars from kFifo, finally 0 out
Read Info => == now buffer unread size 0
Thread Write try to put 17 chars in kFifo, finally 17 in
Thread Read try to get 1 chars from kFifo, finally 1 out
Read Info => a == now buffer unread size 16
Thread Write try to put 1 chars in kFifo, finally 1 in
Thread Read try to get 12 chars from kFifo, finally 12 out
Read Info => bcdefghijklm == now buffer unread size 5
Thread Write try to put 16 chars in kFifo, finally 16 in
Thread Read try to get 1 chars from kFifo, finally 1 out
Read Info => n == now buffer unread size 20
Thread Write try to put 7 chars in kFifo, finally 7 in
No threads ready or runnable, and no pending interrupts.
Assuming the program completed.
Machine halting!

Ticks: total 180, idle 10, system 170, user 0
Disk I/O: reads 0, writes 0
Console I/O: reads 0, writes 0
Paging: faults 0
Network I/O: packets received 0, sent 0

Cleaning up...

遇到的困难以及解决方法

参考文献

[1] 荒野之萍. Nachos-Lab3-同步与互斥机制模块实现[EB/OL]. https://icoty.github.io/2019/05/14/nachos-3-4-Lab3/

[2] Github. Nachos中文教程.pdf[EB/OL]. https://github.com/zhanglizeyi/CSE120/blob/master/Nachos%E4%B8%AD%E6%96%87%E6%95%99%E7%A8%8B.pdf

[3] 维基百科. 读写锁[EB/OL]. https://zh.wikipedia.org/wiki/%E8%AF%BB%E5%86%99%E9%94%81

[4] 博客园. 一步一步实现读写锁[EB/OL]. https://www.cnblogs.com/myd620/p/6129112.html

[5] 知乎. 深入理解 Linux 的 RCU 机制[EB/OL]. https://zhuanlan.zhihu.com/p/30583695

[6] 博客园. Linux 下的同步机制[EB/OL]. https://www.cnblogs.com/ck1020/p/6532985.html

[7] CSDN. Linux各种同步机制的比较[EB/OL]. https://blog.csdn.net/q921374795/article/details/88814272?utm_medium=distribute.pc_relevant.none-task-blog-baidulandingword-3&spm=1001.2101.3001.4242

[8] CSDN. linux内核同步机制中的概念介绍和方法[EB/OL]. https://blog.csdn.net/wealoong/article/details/7957385?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2.control

[9] CSDN. 神奇的大内核锁[EB/OL]. https://blog.csdn.net/DLUTBruceZhang/article/details/11037159

[10] 博客园. linux内核数据结构之kfifo[EB/OL]. https://www.cnblogs.com/anker/p/3481373.html

Author

Chaos Chen

Posted on

2020-11-22

Updated on

2023-06-30

Licensed under

Commentaires