Go红队开发—并发编程
目录[*]并发编程
[*]go协程
[*]chan通道
[*]无缓冲通道
[*]有缓冲通道
[*]创建⽆缓冲和缓冲通道
[*]等协程
[*]sync.WaitGroup同步
[*]Runtime包
[*]Gosched()
[*]Goexit()
[*]区别
[*]同步变量
[*]sync.Mutex互斥锁
[*]atomic原子变量
[*]Select
[*]Ticker定时器
[*]控制并发数量
[*]核心机制
[*]并发编程阶段练习
[*]重要的细节
[*]端口扫描
[*]股票监控
并发编程
go协程
chan通道
必要条件:
从通道取元素的时候要先关闭通道,程序才知道你不往通道输入了,才能取出元素来进行操作,否则会由于通道没有被关闭,range 操作会一直等待新的数据到来,导致程序陷入死锁状态。
close关闭通道不是必要的,但是如果你不关闭,还一直读取里面的东西的话, 你要保证你有源源不断的内容进入到通道中不能让他处于死锁状态。
//在go中的通道var 变量名 = make(chan 类型)var c = make(chan int)//通道可以给很多数据,取出来的时候可以一个个的pop一样,每次取一个就pop一个c <- 123c <- 456close(c) //你要取出内容的时候要先关闭通道,程序才知道你不要往通道输入了,要打印了。否则打印失败,因为一直处于等待状态会死锁。for v := range c{ fmt.Println(v)//这里会依次打印123,456}个人理解:
0.go协程感觉可以理解为比线程更加细的,同时也更快的一种东西,它能够实现多线程操作(其实应该说多协程操作,就是能同时干很多事情)1.go的协程main运行结束前他能够做抢占运行权的能力,但是一旦main运行结束他也就寄寄了。2.`通道(Channels)`:这个很重要,在协程中如果你没有用缓冲通道,就代表你使用go协程的话,能够进到协程里面运行的概率比较小,因为main一直抢占执行权,如果很快就执行完main的话就没go协程啥事情了。3.缓冲区:当你在使用Channels的时候且使用⽆缓冲通道,如果造成了阻塞->那么就是你要取通道的东西,但是需要等通道有东西取的时候才能往下走程序。可如果你make有缓冲通道的时候,就不会造成阻塞的状态,前提是拿缓冲通道东西的时候一定要有东西,否则他会一直等待,就像上面的for循环,他就是一直等待导致了死锁,所以报错,不是因为他代码错误哈!那么同时,如果你缓冲区有东西,就算进入了go协程也会很有可能被main程序抢占,`但如果缓冲区为空,那么main在取出数据的时候会因没有数据而等待某个协程给数据到通道然后取出来。`对通道与协程之间的关系详细解释:(核心是能够让goroutine协程能够互相通信)
Go 提供了⼀种称为通道的机制,⽤于在 goroutine 之间共享数据。Channels 通道可以共享内置、命名、结构和引⽤类型的值和 指针。两种类型的通道:⽆缓冲通道和缓冲通道。⽆缓冲通道⽤于执⾏goroutine之间的同步通信,⽽缓冲通道⽤于执⾏异步通信。⽆缓冲通道保证在发送和接收发⽣的瞬间执⾏两个 goroutine 之间的交换。缓冲通道没有这样的保证。 协程之间⽆法直接通信,但可以使⽤通道来使协程之间互相通信通道分为有缓冲和⽆缓冲,有缓冲,容量设置后,可以异步读取数据,不会堵塞,⽆缓冲的话, 放⼀个数据就必须取出来,否则就会阻塞无缓冲通道
无缓冲通道的特性发送:发送操作会阻塞,直到有 Goroutine 接收该值。接收:接收操作会阻塞,直到有 Goroutine 发送一个值。举例子:`假设你要在main中进行发送数据且读取数据是行不通的`,一定要开一个go协程发送数据,这样的话你的main就不会阻塞,因为go和main是互不干扰的,但是go协程可能执行权限轮不到他,main会抢占,所以这时候就能解释为什么要用sleep来休眠,等待go协程去执行发送通道了。(当然sleep这种方式只是演示,实际中不会用,有其他方法解决)有缓冲通道
在带缓冲的通道中,只要缓冲区未满,发送不会阻塞`只要缓冲区未空,接收不会阻塞`创建⽆缓冲和缓冲通道
make(chan int) //整型⽆缓冲通道,这里就是没有缓冲make(chan int ,1) //整型有缓冲通道,1也代表有一个缓冲//注意甄别:缓冲和通道是不一样的,通道是通道,你缓冲是拿来缓冲的,懂吧hhh?示例代码:
package mainimport ( "fmt" "math/rand" "time")var ch = make(chan int, 10)func Ion() { rand.Seed(time.Now().UnixNano()) intn := rand.Intn(100) fmt.Println("随机数为:", intn) ch <- intn ch <- 2//对比数据 ch <- 55//对比数据}func main() { defer close(ch) //main关闭后,关闭通道 //ch <- 2如果这里注释打开,就不会进入下面的go协程,因为主程序很快抢占回来执行结束,那么go的协程在go 的main运行完就无法运行了 go Ion() fmt.Println("Wait...") data := <-ch fmt.Println("值为:", data)//最终取出来的是随机数 fmt.Println("Done")}等协程
sync.WaitGroup同步
在上述中学到了,通道取出来数据是需要等待通道中有数据的,不然会阻塞,那么这当然是一种很好的同步方式,但是当我们go协程开启后,与main运行不相干的时候,main是不管协程的。main:"抢得过我再说"。
三个核心
var wg sync.WaitGroupwg.Add(1)wg.Done()//这个函数里面其实是执行wg.Add(-1)wg.Wait()单单看上面三个函数其实就能理解了
当我们需要go协程与主函数同步执行的时候,可以用add添加,然后wait等待,那么怎么判断是否需要继续wait呢,那么就是用done来判断了,如果一直减到0的时候就不用wait了示例代码:
//这样的话就不用sleep函数了,sleep我们也不清楚要多少秒才能让协程完成,所以肯定是需要这种同步机制来操作func wq() { var wg sync.WaitGroup //比如你每次进入go协程之前都add一下 for i := 0; i < 10; i++ { fmt.Println("进入go协程之前add一下") wg.Add(1) go func(n int) { fmt.Println("我是协程:", n) defer wg.Done() //通知一下go协程OK了,然后defer是为了防止中途报错无法释放。defer闭包释放好一点。 }(i) } //这里需要等待wait一下 wg.Wait() fmt.Println("等待结束")}Runtime包
这个也可以让go协程同步
Gosched()
这个函数是用来等待协程的,如果还有协程在执行,main就会让出执行权利,实现同步
func runtime_go_Gosched() { fmt.Println("演示Gosched") go func() { fmt.Println("go协程执行中...") runtime.Goexit() fmt.Println("go协程还能执行吗...") }() runtime.Gosched() //等待go协程执行完 fmt.Println("演示Gosched结束")}Goexit()
exit顾名思义退出,当你go协程执行的时候可以用这个退出,然后回到外层函数继续执行
func runtime_go_Goexit() { fmt.Println("演示Goexit") go func() { fmt.Println("go协程执行中...") for i := 0; i < 10; i++ { if i > 7 { fmt.Println("go协程退出:", i) runtime.Goexit() } } }() runtime.Gosched() //等待go协程执行完 fmt.Println("演示Goexit结束")}区别
sync.WaitGroup的作用一般是用于等待一组 goroutine 完成任务,done之后就不会阻塞了,即使你goroutine任务done完的代码后面还有代码任务,那么这时候goroutine就看情况抢占调度了,运气不好或者你main也没啥可执行的了就会直接结束,结束了你的go携程里面就算有代码可能抢不过main调度。runtime就是用于等待或结束一个goruntine的,可以用多个,每一个go携程中都能够同步变量
sync.Mutex互斥锁
不管是在以前学的线程还是go中的协程,开了多线程后都会面临着几个程序抢占同一个变量的事情,可能会产生死锁问题。
两个函数,加锁和解锁
var Lock sync.MutexLock.Lock()Lock.Unlock()简单的示例代码,使用加减法队同一个变量操作,你就会发现不会发生死锁问题
这里采用了x=10,然后循环加减法50次,你会发现基本不会发生一个加分或者减法一直执行的事情发生。
func Mutex_lock() { var ( x int = 10 wg sync.WaitGroup Lock sync.Mutex ) var Add = func() { defer wg.Done() Lock.Lock() x += 1 fmt.Println("x++:", x) Lock.Unlock() } var Sub = func() { defer wg.Done() Lock.Lock() x -= 1 fmt.Println("x--:", x) Lock.Unlock() } for i := 0; i < 50; i++ {//循环50次 wg.Add(1) go Add() wg.Add(1) go Sub() } wg.Wait() fmt.Println("结束", x)}atomic原子变量
除了Mutex互斥锁能够实现变量的同步操作外,go中还提供了原子变量
因为原子操作不能被中断,所以它需要足够简单,并且要求快速。因此,操作系统层面只对二进制位或整数的原子操作提供了支持。`原子操作只支持有限的数据类型`,所以多数场景下,往往互斥锁更合适。这里用同样的一个操作,加减法来实现一个变量的同步。
简单了解一下几个函数(以int类型为例子,其他类型一样的)
//CAS 是 Compare-And-Swap(⽐较并交换)的缩写//作用是:⽐较内存中的某个值是否等于预期值,如果相等则将其更新为新值atomic.CompareAndSwapInt32(&i, 100, 200)//Load(读)//读取过程中相当于我们Metux中的加锁解锁操作,不会发生死锁、变量值不同步问题atomic.LoadInt32(&i)//Store(写)atomic.StoreInt32(&i, 200)//高版本的go可以直接使用:atomic.Load(var_p)atomic.Store(var_p)示例代码:
func myAtomic() { //实现域Metux一样的操作 var ( xint32 = 10 wg sync.WaitGroup ) var Add = func() { wg.Done() atomic.AddInt32(&x, 3) fmt.Println("Add 1") } var Sub = func() { wg.Done() atomic.AddInt32(&x, -2) fmt.Println("Sub 1") } for i := 0; i < 5; i++ { wg.Add(1) go Add() wg.Add(1) go Sub() } wg.Wait() fmt.Println("结束", x)}Select
是Go中的⼀个控制结构,类似于switch语句,select会监听case语句中channel的读写操作,当case可读写的时候,就执行case,进行读写操作以及case里面写的语句,当然select是随机选择的。
default是当case当中都没有可读写的channel了,就会执行default中的语句
注意事项:
select中的case语句必须是⼀个channel操作多个case都,select会随机选出⼀个执⾏`没有可运⾏的case语句,且没有default语句,select将会阻塞,直到某个case通信可以运⾏`需求:
创建不同类型的chan通道,写入数据后通过case对他进行分类别读取。示例代码理解最快:
func select_case() { var ( wg sync.WaitGroup d_int = make(chan int) d_string= make(chan string) d_float64 = make(chan float64) ) go func() { wg.Add(1) //进入一个go协程+1 d_int <- 123 d_int <- 456 d_string <- "string1" d_string <- "string2" d_float64 <- 123.456 //养成好习惯,如果你不用通道了及时关闭,否则后面遍历处理数据的时候就会出错 close(d_int) close(d_string) close(d_float64) }() wg.Wait() //等待 for { select { case r := <-d_int: fmt.Println("int:", r) case r := <-d_string: fmt.Println("string:", r) case r := <-d_float64: fmt.Println("float64:", r) default: fmt.Println("default") } time.Sleep(1 * time.Second) }}Ticker定时器
创建定时器后,会向定时器的管道中按照指定的周期时间,定期写入事件,所以当你在周期到了的时候读取数据就不会阻塞,你读取完之后,还没到周期时间执行就会一直阻塞。
go的定时器记住三个函数
time.NewTicker(time.Second * 1) //创建一个周期性定时器ticker.C //ticker通道,Ticker对外仅暴露一个channelticker.Stop()//停止ticker示例代码
func myTicker() { ticker := time.NewTicker(1 * time.Second) count := 1 for _ = range ticker.C { fmt.Println("执行了:", count) count++ if count >= 10 { ticker.Stop() break } }}控制并发数量
核心机制
这里只是教你如何控制并发数量的例子,并不一定要按照我的方法来。
[*]sem <- data 的作用
[*]sem 是一个带缓冲的通道,缓冲区大小为 workers,即 sem := make(chan 类型, workers)。
[*]当通道的缓冲区已满时,发送操作(sem <-)会阻塞,等待通道有空间释放后再继续。
[*]如果通道没有满,发送会立即成功。
[*]<-sem 的作用
[*]defer func() { <-sem }() 会在协程完成任务后,从通道中取出一个值,释放空间。
[*]释放的空间让之前阻塞的发送操作可以继续执行,从而不会导致程序崩溃或死锁。
//这里是端口扫描的代码,提前搬上来了解一下控制并发数量func start_WaitGroup_scan_port() { var ( wg sync.WaitGroup ch = make(chan int, 1024) // 增加缓冲区,减少阻塞 count int workers = 100 // 控制并发数 ) var scanPort = func(hostname string, port int) { defer wg.Done() address := fmt.Sprintf("%s:%d", hostname, port) conn, err := net.DialTimeout("tcp", address, 2*time.Second) if err == nil { conn.Close() ch <- port } } // 控制并发数,很关键的理解 sem := make(chan int, workers) for i := 0; i < 65536; i++ { wg.Add(1) sem <- 1 go func(port int) { defer func() { <-sem }() scanPort("127.0.0.1", port) }(i) } go func() { wg.Wait() close(ch) }() for port := range ch { fmt.Printf("open: %d\n", port) count++ } fmt.Printf("------------Open ports: %d-------------\n", count) fmt.Println("------------Scan——Done------------")}并发编程阶段练习
重要的细节
wg.Wait() 移到一个匿名协程中处理,并在 wg.Wait() 之后关闭通道,保证了以下两点:
[*]通道关闭时,没有协程向通道写数据
[*]主协程从通道 rep 中读取数据,不负责关闭通道。
[*]匿名协程负责等待所有任务完成并关闭通道,确保通道关闭与写入互相隔离。
[*]避免主协程被抢占的死锁风险
[*]如果你直接在主协程中调用 wg.Wait() 和 close(rep),主协程在执行时可能与其他协程抢占资源,导致协程无法按时完成。
[*]匿名协程中执行 wg.Wait() 独立于主协程,不受读取数据或其他逻辑的影响。
可以看到下面中
go func() { wg.Wait() close(rep)}()这里原本我是直接放在主函数使用,开一个匿名协程去执行的,但是gpt解释了一下
就是:不能够在主函数中直接等待与关闭,而是需要在go协程中进行,因为直接在主函数wait后就close的话会导致go协程中还有正在执行的,这就很矛盾了,我们的wait命名是等待,但是不要忘记了他其实是Add函数去判断是否还需要等待,如果你提前Done了,主函数会立即抢占,那么这时候你的go协程其实还有很多代码没有执行完成就被抢占回去了,然后后面就是你要进行close,那么后续的go协程write相关操作就会报错了
func start_WaitGroup_scan_port() { var ( wg sync.WaitGroup count int = 0 rep = make(chan bool) ) var WaitGroup_scan_port = func(hostname string, port int) { address := fmt.Sprintf("%s:%d", hostname, port) //ip端口 conn, err := net.DialTimeout("tcp", address, 2*time.Second) //2秒 if err != nil { wg.Done() rep <- false } else { conn.Close() fmt.Printf("open: %d\n", port) wg.Done() rep <- true } } for i := 0; i <= 65535; i++ { wg.Add(1) go WaitGroup_scan_port("127.0.0.1", i) } go func() { wg.Wait() close(rep) }() for v := range rep { if v { count++ } } fmt.Printf("------------Open ports: %d-------------\n", count) fmt.Println("------------Scan——Done------------")}端口扫描
1.使用WaitGroup进行并发,需要控制并发数量,可能会因为资源不够而导致结果不准确、不完整。
func start_WaitGroup_scan_port() { var ( wg sync.WaitGroup ch = make(chan int, 1024) // 增加缓冲区,减少阻塞 count int workers = 100 // 控制并发数 ) var scanPort = func(hostname string, port int) { defer wg.Done() address := fmt.Sprintf("%s:%d", hostname, port) conn, err := net.DialTimeout("tcp", address, 2*time.Second) if err == nil { conn.Close() ch <- port } } // 控制并发数 sem := make(chan int, workers) for i := 0; i < 65536; i++ { wg.Add(1) sem <- 1 go func(port int) { defer func() { <-sem }() scanPort("127.0.0.1", port) }(i) } go func() { wg.Wait() close(ch) }() for port := range ch { fmt.Printf("open: %d\n", port) count++ } fmt.Printf("------------Open ports: %d-------------\n", count) fmt.Println("------------Scan——Done------------")}2.使用通道,即单协程进行端口扫描
func start_Ch_scan_port() { var ( ch = make(chan int) count int = 0 ) var Ch_scan_port = func(hostname string) { defer close(ch) for i := 0; i < 65535; i++ { //扫描到开放端口放到ch通道中即可,主函数一直等待读取通道信息 address := fmt.Sprintf("%s:%d", hostname, i) //ip端口 conn, err := net.DialTimeout("tcp", address, 2*time.Second) //2秒 if err == nil { conn.Close() ch <- i } } runtime.Goexit() } go Ch_scan_port("127.0.0.1") runtime.Gosched() //等待go协程执行完 for i := range ch { fmt.Printf("open: %d\n", i) count++ } fmt.Printf("------------Open ports: %d-------------\n", count) fmt.Println("------------Scan——Done------------")}股票监控
这一题中能够领悟到挺多东西。
1.管道的读取数据,一般是不同协程之间进行读写操作,所以我们需要对读或者写分开协程操作2.go协程之间需要不断对一个数据进行操作的话,一般要开一个通道然后通过对通道进行读写操作最终效果
示例代码:
package mainimport ( "fmt" "math/rand" "sync/atomic" "time")var price atomic.Value //股票价格var priceCh = make(chan float64) //股票通道,读写之间的一个通道var stopCh = make(chan bool) //开关监控// 模拟股票涨跌(随机)func randGP() float64 { return rand.Float64()*10 + 100 //控制在110之间}// ticker 定时获取股票价格。模拟监控股票,且变更十次func myTicker() { ticker := time.NewTicker(time.Second * 1) for { select { case <-ticker.C: priceCh <- randGP() case <-stopCh: fmt.Println("结束监控。") return } }}func update_g() { for i := range priceCh { price.Store(i) fmt.Printf("股票更新为:%.2f\n", i) }}func main() { go myTicker() go update_g() time.Sleep(5 * time.Second) // 监控5秒 close(stopCh) // 结束监控,close后stopCh会等于0,case能够读取 //获取最终股票价格 fmt.Printf("最终股票价格为:%.2f\n", price.Load())}
页:
[1]