sonumb

Go - conn.Read() 를 비동기 처리하기 - 데드라인과 채널 본문

개발자 이야기/Go

Go - conn.Read() 를 비동기 처리하기 - 데드라인과 채널

sonumb 2021. 11. 2. 18:11

개요

다수의 클라이언트 접속과 요청을 각각의 고루틴으로 처리하는 서버가 있다고 하자.

이 서버에서 고루틴이 정상적으로 종료하는 경우는 두 가지 정도가 있다.

1. 클라이언트 접속 종료

2. 관리자가 서버 애플리케이션 종료

 

방안 1

이 두가지를 동시에 처리하기 위해서, 서버 애플리케이션은 클라이언트 접속을 확인하는 동작과 특정 변수를 통해 애플리케이션 종료가 진행되고 있는지 확인하는 동작이 순차적으로 행해져야 한다.

서버 애플리케이션 종료는 chan 혹은 context로 제어될 수 있으며, 클라이언트 접속 여부를 확인하기 위해서 net.Conn 객체의 Read() 함수를 호출해야 한다.  다만, 이 함수는 기본적으로 동기함수로 처리되므로, 데이터가 수신되어 리턴되기 전까지 context.Done()을 읽는 행위를 하지 못한다. 허나, 이를 net.Conn의 데드라인을 이용해 Read()를 비동기로 처리할 수 있다.

순서를 정리하자면,

  1. conn의 데드라인 지정
  2. Read() 실행: 읽을 데이터가 없어도 Read()함수 리턴
  3. context.Done()이 세팅되었다면 종료
  4. 아니라면, 1번 실행 

와 같다.

 

방안 2

Read() 를 실행하는 고루틴을 새로 생성하고, 읽은 데이터가 발생하면, chan을 통해 알린다.

이 채널을 기존 select 루프에서 확인하는 것이다.

또한, context.Done()이 호출되면, 서버의 모든 net.Conn을 Close()하여 클라이언트와의 접속을 끊는다.

순서를 정리하자면,

 

  1. go루틴 생성( 데이터 read chan과 연결 종료를 알리는 chan이 필요)
  2. for 문으로 아래 내용 실행
    1. 데이터를 수신했다면, 출력
    2. ctx.Done()이 세팅되었다면 종료
    3. 데이터를 수신하는 고루틴이 종료했다면 종료

이다.

✅ 추가:
 리스너의 Accept() 함수는 데드라인이 없다. 따라서 위와 동일하게 context나 사용자 생성한 채널에 신호를 받아, 리스너의 Close()를 호출하는 방식으로 처리한다.

코드로 간단하게 기술하면 아래와 같다.
var quitCh chan bool

func ListenerWithCancel(parentCtx context.Context) {
  ctx, cancel := context.WithCancel(parentCtx)
  
  errCh := make(chan error)
  defer close(errch)

quitCh = make(chan bool)
  defer close(quitCh)

  go func () { 
    conn, err := l.Accept()
    if err != nil {
      str := err.Error()
      if strings.Contains(str, "use of closed network connection") {
        // 리스너 닫힘
        return
      } else {
        errCh <- err
        return
      }
    }

    go WorkerThread(conn)
    
  }()

  for {
    select {
    case err := < errCh
      println("비정상종료:", err.Error())
      return
      
    case <-quitCh:
      println("정상종료")
      cancel()

    case <- ctx.Done():
      l.Close()
      return
    } // select
  } // for
}

 

소스코드

⛔️ 주의
아래 소스코드는 자원해제 같은 것들은 전혀 신경쓰지 않았다.
실제 업무용으로 적용하려면, 후처리를 신경써야 한다.

방안1

package main

import (
  "context"
  "sync"
  "fmt"
  "syscall"
  "time"
  "net"
  "os"
  "log"
  "io"
  "runtime"
)

func Gettid_thread_selfid() (tid int) {
  r0, _, _ := syscall.RawSyscall(syscall.SYS_THREAD_SELFID, 0, 0, 0)
  tid = int(r0)
  return
}

var clientCancel context.CancelFunc

func clientGoroutine(ctx context.Context, wg *sync.WaitGroup) {
  runtime.LockOSThread()
  defer runtime.UnlockOSThread()

  wg.Add(1)
  defer wg.Done()

  t := time.NewTicker(200 * time.Millisecond)
  ctx1, cancel := context.WithCancel(ctx)
  clientCancel = cancel

  conn, _ := net.Dial("tcp", ":5030")
  defer conn.Close()
  conn.Write([]byte("abc"))

  for {
    select {
    case <-quitCh:
      fmt.Println("quit client routine",Gettid_thread_selfid(), ": Done")
      return

    case <-ctx1.Done():
      fmt.Println("c1:",Gettid_thread_selfid(), ": Done")
      return

    case <-t.C:
      // do nothing
    }
  }
}

var quitCh chan bool

func serverGoroutine(ctx context.Context, wg *sync.WaitGroup) {
  runtime.LockOSThread()
  defer runtime.UnlockOSThread()

  wg.Add(1)
  defer wg.Done()

  buf := make([]byte, syscall.Getpagesize())
  t := time.NewTicker(200 * time.Millisecond)
  ctx1, _ := context.WithCancel(ctx)

  l, _ := net.Listen("tcp", ":5030")
  conn, _ := l.Accept()

  for {
    conn.SetReadDeadline(time.Now().Add(300 * time.Millisecond))
    n, err := conn.Read(buf)
    if err == io.EOF { /* EOF */
      println("End-of-connection")
      break
    }

    if err == nil {
      s := string(buf[:n])
      println("Server: client data [",s,"]")
    } else {
      if os.IsTimeout(err) == true {
        // ok: timeout
      } else {
        log.Panicln(err.Error())
        return
      }
    }

    select {
    case <-ctx1.Done():
      fmt.Println("c2:",Gettid_thread_selfid(), ": Done")
      return

    case <-t.C:
      break
    }
  }
}

func main() {
  runtime.GOMAXPROCS(runtime.NumCPU())

  quitCh = make(chan bool)

  ctx, cancelParent := context.WithCancel(context.Background())

  wg := &sync.WaitGroup{}

  go serverGoroutine(ctx, wg)
  time.Sleep(100 * time.Millisecond)

  go clientGoroutine(ctx, wg)

  time.Sleep(3 * time.Second)

  clientCancel()

  for i := 0 ; i < 3 ; i++ {
    time.Sleep(1 * time.Second)
    println("main sleep")
  }

  //quitCh <- true

  for i := 0 ; i < 3 ; i++ {
    time.Sleep(1 * time.Second)
  }

  cancelParent()

  wg.Wait()
}

 

실행 및 결과는 다음과 같다.

$ go build ex1.go ;./ex1 
Server: client data [ abc ]
c1: 214348 : Done
End-of-connection
main sleep
main sleep
main sleep

$

 

 

방안2

package main

import (
  "context"
  "sync"
  "fmt"
  "syscall"
  "time"
  "net"
  "io"
  "runtime"
)

func Gettid_thread_selfid() (tid int) {
  r0, _, _ := syscall.RawSyscall(syscall.SYS_THREAD_SELFID, 0, 0, 0)
  tid = int(r0)
  return
}

var clientCancel context.CancelFunc

func clientGoroutine(ctx context.Context, wg *sync.WaitGroup) {
  runtime.LockOSThread()
  defer runtime.UnlockOSThread()

  wg.Add(1)
  defer wg.Done()

  t := time.NewTicker(1 * time.Second)
  ctx1, cancel := context.WithCancel(ctx)
  clientCancel = cancel

  conn, _ := net.Dial("tcp", ":5030")
  defer conn.Close()

  for {
    select {
    case <-quitCh:
      fmt.Println("quit client routine",Gettid_thread_selfid(), ": Done")
      return

    case <-ctx1.Done():
      fmt.Println("Client:",Gettid_thread_selfid(), ": Done")
      return

    case <-t.C:
      _, err := conn.Write([]byte("TESTDATA"))
      if err != nil {
        println("Client: Write fail:", err.Error() )
        return
      }
    }
  }
}

var quitCh chan bool

var serverCancel context.CancelFunc

func serverGoroutine(ctx context.Context, wg *sync.WaitGroup) {
  runtime.LockOSThread()
  defer runtime.UnlockOSThread()

  wg.Add(1)
  defer wg.Done()

  buf := make([]byte, syscall.Getpagesize())
  //t := time.NewTicker(200 * time.Millisecond)
  ctx1, cancel := context.WithCancel(ctx)
  serverCancel = cancel

  l, _ := net.Listen("tcp", ":5030")
  conn, _ := l.Accept()

  isReadDataCh := make(chan int)
  errCh        := make(chan error)

  go func() {
    for {
      n, err := conn.Read(buf)
      if err == nil {
        isReadDataCh <- n
      } else {
        if err == io.EOF { /* EOF */
          println("Server: End-of-connection")
        }

        errCh <- err
        conn.Close()
        return
      }
    }
  }()

  for {
    select {
    case nbyte := <-isReadDataCh:
      s := string(buf[:nbyte])
      println("Server: client data [",s,"]")

    case err := <-errCh:
      fmt.Println("Server:", err.Error(),": Close")
      return

    case <-ctx1.Done():
      //  실제 구현상으로, 클라이언트에게 접속 종료 프로토콜을 실행해야 한다. (graceful Shutdown)
      conn.Close()
      fmt.Println("Server:",Gettid_thread_selfid(), ": Close")
      return
    //case <-t.C:
      // do nothing
    }
  }
}

func main() {
  runtime.GOMAXPROCS(runtime.NumCPU())

  quitCh = make(chan bool)

  ctx, cancelParent := context.WithCancel(context.Background())

  wg := &sync.WaitGroup{}

  go serverGoroutine(ctx, wg)
  time.Sleep(100 * time.Millisecond)

  go clientGoroutine(ctx, wg)

  for i := 0 ; i < 3 ; i++ {
    time.Sleep(1 * time.Second)
    println("main: sleep")
  }

  println("main: clientCancel()")
  //serverCancel()
  clientCancel()

  for i := 0 ; i < 3 ; i++ {
    time.Sleep(1 * time.Second)
    println("main: sleep")
  }

  for i := 0 ; i < 3 ; i++ {
    time.Sleep(1 * time.Second)
  }

  cancelParent()

  wg.Wait()
}

실행 및 결과는 다음과 같다.

$ go build ex2.go; ./ex2
main: sleep
Server: client data [ TESTDATA ]
main: sleep
Server: client data [ TESTDATA ]
main: sleep
main: clientCancel()
Client: 54472 : Done
Server: client data [ TESTDATA ]
Server: End-of-connection
Server: EOF : Close
main: sleep
main: sleep
main: sleep

$
반응형