[关闭]
@mSolo 2015-05-07T09:04:14.000000Z 字数 9537 阅读 2079

Golang 自学之路(三):并发与网络

Go Golang


高级 Go

Goroutines

An experiential rule of thumb seems to be that for n cores setting GOMAXPROCS to n-1 yields the best performance, and the following should also be followed:
number of goroutines > 1 + GOMAXPROCS > 1

  1. var numCores = flag.Int(“n”, 2, number of CPU cores to use”)
  2. runtime.GOMAXPROCS(*numCores)
  1. func main() {
  2. go longWait()
  3. go shortWait()
  4. fmt.Println(“About to sleep in main()”)
  5. time.Sleep(10 * 1e9)
  6. fmt.Println(“At the end of main()”)
  7. }
  8. func longWait() {
  9. fmt.Println(“Beginning longWait()”)
  10. time.Sleep(5 * 1e9) // sleep for 5 seconds
  11. fmt.Println(“End of longWait()”)
  12. }
  13. func shortWait() {
  14. fmt.Println(“Beginning shortWait()”)
  15. time.Sleep(2 * 1e9) // sleep for 2 seconds
  16. fmt.Println(“End of shortWait()”)
  17. }

using recover with goroutines

  1. func server(workChan <-chan *Work) {
  2. for work := range workChan {
  3. go safelyDo(work)
  4. }
  5. }
  6. func safelyDo(work *Work) {
  7. defer func() {
  8. if err := recover(); err != nil {
  9. log.Printf(“work failed with %s in %v:”, err, work)
  10. }
  11. }()
  12. do(work)
  13. }

benchmarking goroutines

  1. func main() {
  2. fmt.Println(“sync”, testing.Benchmark(BenchmarkChannelSync).String())
  3. fmt.Println(“buffered”, testing.Benchmark(BenchmarkChannelBuffered).String())
  4. }
  5. func BenchmarkChannelSync(b *testing.B) {
  6. ch := make(chan int)
  7. go func() {
  8. for i := 0; i < b.N; i++ {
  9. ch <- i
  10. }
  11. close(ch)
  12. }()
  13. for _ = range ch {
  14. }
  15. }
  16. func BenchmarkChannelBuffered(b *testing.B) {
  17. ch := make(chan int, 128)
  18. go func() {
  19. for i := 0; i < b.N; i++ {
  20. ch <- i
  21. }
  22. close(ch)
  23. }()
  24. for _ = range ch {
  25. }
  26. }
  27. /* Output:
  28. Windows: N Time 1 op Operations per sec
  29. sync 1000000 2443 ns/op --> 409 332 / s
  30. buffered 1000000 4850 ns/op --> 810 477 / s

Channels

  1. var ch1 chan string
  2. ch1 = make(chan string)
  3. ch1 := make(chan string)
  4. buf := 100
  5. ch1 := make(chan string, buf)
  6. chanOfChans := make(chan chan int)
  7. funcChan := chan func()
  1. func main() {
  2. ch := make(chan string)
  3. go sendData(ch)
  4. go getData(ch)
  5. time.Sleep(1e9)
  6. }
  7. func sendData(ch chan string) {
  8. ch <- Washington
  9. ch <- Tripoli
  10. ch <- London
  11. }
  12. func getData(ch chan string) {
  13. var input string
  14. for { input = <-ch; fmt.Printf(“%s “, input) }
  15. }

Semaphore pattern

  1. type Empty interface {}
  2. var empty Empty
  3. ...
  4. data := make([]float64, N)
  5. res := make([]float64, N)
  6. sem := make(chan Empty, N) // semaphore
  7. ...
  8. for i, xi := range data {
  9. go func (i int, xi float64) {
  10. res[i] = doSomething(i,xi)
  11. sem <- empty
  12. } (i, xi)
  13. }
  14. for i := 0; i < N; i++ { // wait for goroutines to finish
  15. <-sem
  16. }

Channel Factory pattern

  1. func main() {
  2. stream := pump()
  3. go suck(stream) // shortened : go suck( pump() )
  4. time.Sleep(1e9)
  5. }
  6. func pump() chan int {
  7. ch := make(chan int)
  8. go func() {
  9. for i := 0; ; i++ {
  10. ch <- i
  11. }
  12. }()
  13. return ch
  14. }
  15. func suck(ch chan int) {
  16. for {
  17. fmt.Println(<-ch)
  18. }
  19. }
  20. func suck(ch chan int) {
  21. go func() {
  22. for v := range ch {
  23. fmt.Println(v)
  24. }
  25. }()
  26. }

Channel directionality

  1. var c = make(chan int) // bidirectional
  2. go source(c)
  3. go sink(c)
  4. func source(ch chan<- int) {
  5. for { ch <- 1 }
  6. }
  7. func sink(ch <-chan int) {
  8. for { <-ch }
  9. }

closing a channel

  1. func sendData(ch chan string) {
  2. ch <- Washington
  3. ch <- Tripoli
  4. ch <- London
  5. ch <- Beijing
  6. ch <- Tokio
  7. close(ch)
  8. }
  9. func getData(ch chan string) {
  10. for {
  11. input, open := <-ch
  12. if !open {
  13. break
  14. }
  15. fmt.Printf(“%s “, input)
  16. }
  17. }

Switching between goroutines with select

  1. select {
  2. case u:= <- ch1:
  3. ...
  4. case v:= <- ch2:
  5. ...
  6. default: // no value ready to be received
  7. ...
  8. }

channels with timeouts and tickers

  1. import time
  2. rate_per_sec := 10
  3. var dur Duration = 1e8 // rate_per_sec
  4. chRate := time.Tick(dur) // every 1/10th of a second
  5. for req := range requests {
  6. <- chRate // rate limit our Service.Method RPC calls
  7. go client.Call(“Service.Method”, req, ...)
  8. }
  1. func main() {
  2. tick := time.Tick(1e8)
  3. boom := time.After(5e8)
  4. for {
  5. select {
  6. case <-tick:
  7. fmt.Println(“tick.”)
  8. case <-boom:
  9. fmt.Println(“BOOM!”)
  10. return
  11. default:
  12. fmt.Println(“ .”)
  13. time.Sleep(5e7)
  14. }
  15. }
  16. }

Tasks and Worker Processes

  1. type Pool struct {
  2. Mu sync.Mutex
  3. Tasks []Task
  4. }
  5. func Worker(pool *Pool) {
  6. for {
  7. pool.Mu.Lock()
  8. // begin critical section:
  9. task := pool.Tasks[0] // take the first task
  10. pool.Tasks = pool.Tasks[1:] // update the pool
  11. // end critical section
  12. pool.Mu.Unlock()
  13. process(task)
  14. }
  15. }
  16. func main() {
  17. pending, done := make(chan *Task), make(chan *Task)
  18. go sendWork(pending) // put tasks with work
  19. for i := 0; i < N; i++ { // start N goroutines to do
  20. go Worker(pending, done)
  21. }
  22. consumeWork(done)
  23. }
  24. func Worker(in, out chan *Task) {
  25. for {
  26. t := <-in
  27. process(t)
  28. out <- t
  29. }
  30. }

lazy generator

  1. var resume chan int
  2. func integers() chan int {
  3. yield := make (chan int)
  4. count := 0
  5. go func () {
  6. for {
  7. yield <- count
  8. count++
  9. }
  10. } ()
  11. return yield
  12. }
  13. func generateInteger() int {
  14. return <-resume
  15. }
  16. func main() {
  17. resume = integers()
  18. fmt.Println(generateInteger()) //=> 0
  19. fmt.Println(generateInteger()) //=> 1
  20. }

implement a mutex

  1. /* mutexes */
  2. func (s semaphore) Lock() {
  3. s.P(1)
  4. }
  5. func (s semaphore) Unlock() {
  6. s.V(1)
  7. }
  8. /* signal-wait */
  9. func (s semaphore) Wait(n int) {
  10. s.P(n)
  11. }
  12. func (s semaphore) Signal() {
  13. s.V(1)
  14. }

A tcp server

  1. import (
  2. fmt
  3. net
  4. )
  5. func main() {
  6. fmt.Println(“Starting the server ...”)
  7. listener, err := net.Listen(“tcp”, localhost:50000”)
  8. if err != nil {
  9. fmt.Println(“Error listening”, err.Error())
  10. return // terminate program
  11. }
  12. for {
  13. conn, err := listener.Accept()
  14. if err != nil {
  15. fmt.Println(“Error accepting”, err.Error())
  16. return
  17. }
  18. go doServerStuff(conn)
  19. }
  20. }
  21. func doServerStuff(conn net.Conn) {
  22. for {
  23. buf := make([]byte, 512)
  24. _, err := conn.Read(buf)
  25. if err != nil {
  26. fmt.Println(“Error reading”, err.Error())
  27. return
  28. }
  29. fmt.Printf(“Received data: %v”, string(buf))
  30. }
  31. }
  1. func main() {
  2. conn, err := net.Dial(“tcp”, localhost:50000”)
  3. if err != nil {
  4. fmt.Println(“Error dialing”, err.Error())
  5. return
  6. }
  7. inputReader := bufio.NewReader(os.Stdin)
  8. fmt.Println(“First, what is your name?”)
  9. clientName, _ := inputReader.ReadString(‘\n’)
  10. // fmt.Printf(“CLIENTNAME %s”,clientName)
  11. trimmedClient := strings.Trim(clientName, \r\n”)
  12. for {
  13. fmt.Println(“What to send to the server? Type Q to quit.”)
  14. input, _ := inputReader.ReadString(‘\n’)
  15. trimmedInput := strings.Trim(input, \r\n”)
  16. if trimmedInput == Q {
  17. return
  18. }
  19. _, err = conn.Write([]byte(trimmedClient + says: + trimmedInput))
  20. }
  21. }
  1. func main() {
  2. var (
  3. host = www.apache.org
  4. port = 80
  5. remote = host + “:” + port
  6. msg string = GET / \n
  7. data = make([]uint8, 4096)
  8. read = true
  9. count = 0
  10. )
  11. con, err := net.Dial(“tcp”, remote)
  12. io.WriteString(con, msg)
  13. for read {
  14. count, err = con.Read(data)
  15. read = (err == nil)
  16. fmt.Printf(string(data[0:count]))
  17. }
  18. con.Close()
  19. }
  1. func initServer(hostAndPort string) *net.TCPListener {
  2. serverAddr, err := net.ResolveTCPAddr(“tcp”, hostAndPort)
  3. checkError(err, Resolving address:port failed: `” + hostAndPort + “’”)
  4. listener, err := net.ListenTCP(“tcp”, serverAddr)
  5. checkError(err, “ListenTCP: “)
  6. println(“Listening to: “, listener.Addr().String())
  7. return listener
  8. }
  9. func connectionHandler(conn net.Conn) {
  10. connFrom := conn.RemoteAddr().String()
  11. println(“Connection from: “, connFrom)
  12. sayHello(conn)
  13. for {
  14. var ibuf []byte = make([]byte, maxRead + 1)
  15. length, err := conn.Read(ibuf[0:maxRead])
  16. ibuf[maxRead] = 0 // to prevent overflow
  17. switch err {
  18. case nil:
  19. handleMsg(length, err, ibuf)
  20. case os.EAGAIN: // try again
  21. continue
  22. default:
  23. goto DISCONNECT
  24. }
  25. }
  26. DISCONNECT:
  27. err := conn.Close()
  28. println(“Closed connection: “, connFrom)
  29. checkError(err, “Close: “)
  30. }

a simple webserver

http.URL resp, err := http.Head(url)
http.Request resp.Status
request.ParseForm(); req.FormValue(“var1”)
var1, found := request.Form[“var1”]
http.Response http.ResponseWriter
http.StatusContinue = 100 http.StatusUnauthorized = 401
http.StatusOK = 200 http.StatusForbidden = 403
http.StatusFound = 302 http.StatusNotFound = 404
http.StatusBadRequest = 400 http.StatusInternalServerError=500

- http.Redirect(w ResponseWriter, r *Request, url string, code int)
- http.NotFound(w ResponseWriter, r *Request)
- `http.Error(w ResponseWriter, error string, code int)

  1. package main
  2. import (
  3. fmt
  4. net/http
  5. log
  6. )
  7. func HelloServer(w http.ResponseWriter, req *http.Request) {
  8. fmt.Println(“Inside HelloServer handler”)
  9. fmt.Fprint(w, Hello,” + req.URL.Path[1:])
  10. // fmt.Fprintf(w, “<h1>%s</h1><div>%s</div>”, title, body)
  11. } // w.Header().Set(“Content-Type”, “../..”)
  12. func main() {
  13. http.HandleFunc(“/”,HelloServer)
  14. err := http.ListenAndServe(“localhost:8080”, nil)
  15. // http.ListenAndServe(“:8080”, http.HandlerFunc(HelloServer)
  16. // http.ListenAndServeTLS()
  17. if err != nil {
  18. log.Fatal(“ListenAndServe: “, err.Error())
  19. }
  20. }
  21. func main() {
  22. xlsUrl := "http://market.finance.sina.com.cn/downxls.php?"
  23. xlsUrl += "date=2014-04-25&symbol=sz000002"
  24. res, err := http.Get(xlsUrl)
  25. CheckError(err)
  26. data, err := ioutil.ReadAll(res.Body)
  27. CheckError(err)
  28. fmt.Printf("%s", string(data))
  29. }
  30. func CheckError(err error) {
  31. if err != nil {
  32. log.Fatalf("Get: %v", err)
  33. }
  34. }

twitter_status.go

  1. type Status struct {
  2. Text string
  3. }
  4. type User struct {
  5. XMLName xml.Name
  6. Status Status
  7. }
  8. func main() {
  9. response, _ := http.Get(“http://twitter.com/users/Googland.xml”)
  10. user := User{xml.Name{“”, user”}, Status{“”}}
  11. xml.Unmarshal(response.Body, &user)
  12. fmt.Printf(“status: %s”, user.Status.Text)
  13. }

Making a web application robust

  1. type HandleFnc func(http.ResponseWriter, *http.Request)
  2. // ...
  3. func main() {
  4. http.HandleFunc(“/test1”, logPanics(SimpleServer))
  5. http.HandleFunc(“/test2”, logPanics(FormServer))
  6. if err := http.ListenAndServe(“:8088”, nil); err != nil {
  7. panic(err)
  8. }
  9. }
  10. func logPanics(function HandleFnc) HandleFnc {
  11. return func(writer http.ResponseWriter, request *http.Request) {
  12. defer func() {
  13. if x := recover(); x != nil {
  14. log.Printf(“[%v] caught panic: %v”, request.RemoteAddr, x)
  15. }
  16. }()
  17. function(writer, request)
  18. }
  19. }

Sending mails with smtp

  1. package main
  2. import (
  3. log
  4. smtp
  5. )
  6. func main() {
  7. auth := smtp.PlainAuth(
  8. “”,
  9. user@example.com”,
  10. password”,
  11. mail.example.com”,
  12. )
  13. err := smtp.SendMail(
  14. mail.example.com:25”,
  15. auth,
  16. sender@example.org”,
  17. []string{“recipient@example.net”},
  18. []byte(“This is the email body.”),
  19. )
  20. if err != nil {
  21. log.Fatal(err)
  22. }
  23. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注