用C和Java实现简单多线程

简单的多线程——Linux下的pthread和Java下的fork-join

本项目分别用Linux下的pthread和Java的fork-join系统,用快速排序和归并排序两种方法实现了多线程的排序算法。重要步骤及心得体会记录在下:

一.用pthread实现多线程

1.pthread的基本思路和框架

首先pthread不是一个具体实现而是一个有关多线程的框架,我们这边使用它在Linux系统下的接口就好。
声明线程:

1
pthread_t tid;

为线程赋予属性:

1
2
pthread_attr_t attr;
pthread_attr_init(&attr);

这里就直接用attr的默认参数了,以上可以认为是一些前摇和初始化操作。

创建线程:

1
pthread_create(&tid,&attr,function,param);

注意一下这个函数的参数:第三个参数是一个函数,也是子线程要做的动作,这个函数的参数写法有点清奇,那么param就是它对应的参数了。param是一个指针,如果传多个参数的话我的意见是使用结构体,有没有其他方法不知道@_@。
将子线程合并到父线程:

1
pthread_join(tid,NULL);

2.排序算法实现思路

我们如果要用多线程加速排序算法的话,需要把对一个个数组的排序划分成多个子问题(没错分治法)我这里用两个线程,一个排序前面,一个排序后面。我用的是快排,当然你用冒泡啥的都一样。这里就有一点归并排序的味道了,但是我只是用了两个线程来处理各一半而已,并没有分到底。当然最后我又用了一个线程,在O(n)的时间里把两个已经排序好的数组归并到一起。
我们着重来看一下任务函数的写法:

1
2
3
4
5
void *sort(void *r)//function to be put in a thread
{
quicksort(array,((param*)r)->start,((param*)r)->end);
}

我们都知道一个正常的快排函数需要三个参数:一个待排的数组,两个定位参数,但是我们的任务函数只能有一个void*类型的指针,因此这里我们把参数写成了一个结构体,里面装定位下标。数组就设成全局变量就好。再看看另一个任务函数:

1
2
3
4
5
6
7
8
9
void *merge(void *r)
{
int start1=((params*)r)->param1->start;
int end1=((params*)r)->param1->end;
int start2=((params*)r)->param2->start;
int end2=((params*)r)->param2->end;
...
}

这里要传两组定界标志,我们又用了一个结构体来装:

1
2
3
4
5
6
7
8
9
10
typedef struct{
int start;
int end;
}param;

typedef struct{
param* param1;
param* param2;
}params;

3. 完整代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
int *array;
int *result;

int divide(int *a,int low,int high)
{
int k=a[low];
do{
while(low<high&&a[high]>=k)--high;
if(low<high){a[low]=a[high];++low;}
while(low<high&&a[low]<=k)++low;
if(low<high){a[high]=a[low];--high;}
}while(low!=high);
a[low]=k;
return low;
}
void quicksort(int *a,int low,int high)
{
int mid;
if(low>=high)return;
mid = divide(a,low,high);
quicksort(a,low,mid-1);
quicksort(a,mid+1,high);
return;
}

typedef struct{
int start;
int end;
}param;

typedef struct{
param* param1;
param* param2;
}params;

void *sort(void *r)//function to be put in a thread
{
quicksort(array,((param*)r)->start,((param*)r)->end);
}
void *merge(void *r)
{
int start1=((params*)r)->param1->start;
int end1=((params*)r)->param1->end;
int start2=((params*)r)->param2->start;
int end2=((params*)r)->param2->end;
int index1=start1;
int index2=start2;
int index=0;
while(index1<=end1&&index2<=end2)//not at the end
{
if(array[index1]<array[index2])
{
result[index]=array[index1];
++index1;
++index;
}
else
{
result[index]=array[index2];
++index2;
++index;
}
}
if(index1<=end1)//array not over
{
while(index1<=end1)
{
result[index]=array[index1];
++index1;
++index;
}
}
else
{
while(index2<=end2)
{
result[index]=array[index2];
++index2;
++index;
}
}
}

int main()
{
int n;
printf("please enter the number of the integers\n");
scanf("%d",&n);
array = (int*)malloc(sizeof(int)*n);
result = (int*)malloc(sizeof(int)*n);

for(int i=0;i<n;i++)
scanf("%d",&array[i]);
//now we have implemented the input of data
//it's time to deal with the thread


pthread_t tid1,tid2,tid3;
pthread_attr_t attr1,attr2,attr3;
pthread_attr_init(&attr1);
pthread_attr_init(&attr2);
pthread_attr_init(&attr3);
//test

params* my_param;
my_param=(params*)malloc(sizeof(params));
my_param->param1=(param*)malloc(sizeof(param));
my_param->param2=(param*)malloc(sizeof(param));
my_param->param1->start=0;
my_param->param1->end=n/2;
my_param->param2->start=n/2+1;
my_param->param2->end=n-1;

pthread_create(&tid1,&attr1,sort,my_param->param1);
pthread_create(&tid2,&attr2,sort,my_param->param2);
pthread_join(tid1,NULL);
pthread_join(tid2,NULL);
pthread_create(&tid3,&attr3,merge,my_param);
pthread_join(tid3,NULL);


//output
printf("after sort the result is\n");
for(int i=0;i<n;i++)
{
printf("%d",result[i]);
printf("\n");

}

return 0;

}

二.用Java的fork-join系统实现多线程

1.Java多线程的框架:

这里就不谈Java的那些基本语法了,跟C++可像了。我们知道它是面向对象的,因此需要一个类,这个类这里从RecursiveAction派生,这样我们只要重写其中的compute()函数,就能为子线程指派任务。
前摇:

1
2
ForkJoinPool mypool=new ForkJoinPool();
mypool.invoke(myquicksort);

这里我们把一个任务类对象放进去啦。我们在compute()函数中创建子线程:

1
2
3
4
5
6
7
8
9
quicksort leftquick = new quicksort(mid-this.low, this.low, mid-1, array);
quicksort rightquick = new quicksort(this.high-mid,mid+1,this.high,array);

leftquick.fork();
rightquick.fork();
//test
//System.out.println("a new thread is created\n");
leftquick.join();
rightquick.join();

你可以看到的是,这里的多线程其实就相当于我们在快速排序里的递归函数了,当然我们要为这个递归设置一个终止条件:

1
2
3
4
if(this.n<=1)
{
;
}

当元素个数小于等于一时,什么也不干,相当于直接return。
当然,更节约资源的方法是,当还要排序的元素个数小于一定数量(比如10个)时,不再创建新的线程,直接用比较或冒泡排序法做。

2.完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import java.util.Scanner;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class quicksort extends RecursiveAction {
public int n;
public int[] array;
public int high;
public int low;
public quicksort(int num,int start, int end,int array[])
{
n=num;
high=end;
low = start;
this.array=array;
}
public int divide(int low,int high)
{
int k=this.array[low];

do{
while(low<high&&this.array[high]>=k)--high;
if(low<high){this.array[low]=this.array[high];++low;}
while(low<high&&this.array[low]<=k)++low;
if(low<high){this.array[high]=this.array[low];--high;}
}while(low!=high);
this.array[low]=k;
return low;

}
protected void compute()
{
if(this.n<=1)
{
;//相当于递归终止条件了。
//当然,更节约资源的方法是,当还要排序的元素个数小于一定数量(比如10个)时,不再创建新的线程,直接用比较或冒泡排序法做
}
else{ int mid=this.divide(this.low,this.high);
quicksort leftquick = new quicksort(mid-this.low, this.low, mid-1, array);
quicksort rightquick = new quicksort(this.high-mid,mid+1,this.high,array);

leftquick.fork();
rightquick.fork();
//test
//System.out.println("a new thread is created\n");
leftquick.join();
rightquick.join();
}

}
public static void main(String[] args) throws Exception {
System.out.println("水遁大坝谁修哈\n");
Scanner sc = new Scanner(System.in);
System.out.println("please input the number of Integers");
int ele_num=sc.nextInt();
int[] a=new int[ele_num];
for(int i=0;i<ele_num;i++)
{
a[i]=sc.nextInt();
}
quicksort myquicksort = new quicksort(ele_num,0,ele_num-1,a);

sc.close();
ForkJoinPool mypool=new ForkJoinPool();
mypool.invoke(myquicksort);

System.out.println("水遁大鲛弹\n");
for(int i=1;i<ele_num;i++)
{
System.out.println(myquicksort.array[i]);
}

}
}


     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import java.util.concurrent.RecursiveAction;
import java.util.Scanner;
import java.util.concurrent.ForkJoinPool;
public class mergesort extends RecursiveAction {
public int []array;
public int start;
public int end;
public mergesort(int a[],int start,int end)
{
this.array=a;
this.start=start;
this.end=end;
}

public void compute()
{
if(this.end<=this.start){;}//只有一个元素
else{int mid = (this.start+this.end)/2;
mergesort mergeleft = new mergesort(this.array,this.start,mid);
mergesort mergeright = new mergesort(this.array,mid+1,this.end);

mergeleft.fork();
mergeright.fork();

mergeleft.join();
mergeright.join();

//分割完毕,开始归并
int []result = new int[this.end-this.start+1];
int result_index=0;
int index1=this.start;
int index2=mid+1;
while(index1<=mid&&index2<=this.end)//两个归并都没有完
{
if(this.array[index1]<this.array[index2])
{
result[result_index]=this.array[index1];
++index1;
++result_index;
}
else{
result[result_index]=this.array[index2];
++index2;
++result_index;
}
}
if(index1<=mid){
while(index1<=mid)
{
result[result_index]=this.array[index1];
++index1;
++result_index;
}
}
else{
while(index2<=this.end)
{
result[result_index]=this.array[index2];
++index2;
++result_index;
}
}
for(int i=this.start;i<=this.end;i++)
{
this.array[i]=result[i-this.start];
}
}
}
public static void main(String[] args) throws Exception {
Scanner sc = new Scanner(System.in);
System.out.println("please input the number of Integers");
int ele_num=sc.nextInt();
int[] a=new int[ele_num];
for(int i=0;i<ele_num;i++)
{
a[i]=sc.nextInt();
}

mergesort myMergesort = new mergesort(a, 0, ele_num-1);

sc.close();
ForkJoinPool mypool=new ForkJoinPool();
mypool.invoke(myMergesort);

System.out.println("水遁大鲛弹\n");
for(int i=0;i<ele_num;i++)
{
System.out.println(myMergesort.array[i]);
}

}
}


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!