반응형
Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- 전처리기
- Pointer
- 포인터
- DBMS 개발
- getopts
- bash
- Windows via c/c++
- Preprocessor
- 컴퓨터 강좌
- UNIX Internals
- 함수포인터
- 구조와 원리
- FreeBSD
- 한빛미디어
- OS 커널
- 긴옵션
- Programming
- SQLite
- Golang
- newSQL
- go
- UNIX
- kernel
- TiKV
- DBMS
- 커널
- Symbol
- 약어
- TiDB
- 포인터변수
Archives
- Today
- Total
sonumb
Go - conn.Read() 를 비동기 처리하기 - 데드라인과 채널 본문
개요
다수의 클라이언트 접속과 요청을 각각의 고루틴으로 처리하는 서버가 있다고 하자.
이 서버에서 고루틴이 정상적으로 종료하는 경우는 두 가지 정도가 있다.
1. 클라이언트 접속 종료
2. 관리자가 서버 애플리케이션 종료
방안 1
이 두가지를 동시에 처리하기 위해서, 서버 애플리케이션은 클라이언트 접속을 확인하는 동작과 특정 변수를 통해 애플리케이션 종료가 진행되고 있는지 확인하는 동작이 순차적으로 행해져야 한다.
서버 애플리케이션 종료는 chan 혹은 context로 제어될 수 있으며, 클라이언트 접속 여부를 확인하기 위해서 net.Conn 객체의 Read() 함수를 호출해야 한다. 다만, 이 함수는 기본적으로 동기함수로 처리되므로, 데이터가 수신되어 리턴되기 전까지 context.Done()을 읽는 행위를 하지 못한다. 허나, 이를 net.Conn의 데드라인을 이용해 Read()를 비동기로 처리할 수 있다.
순서를 정리하자면,
- conn의 데드라인 지정
- Read() 실행: 읽을 데이터가 없어도 Read()함수 리턴
- context.Done()이 세팅되었다면 종료
- 아니라면, 1번 실행
와 같다.
방안 2
Read() 를 실행하는 고루틴을 새로 생성하고, 읽은 데이터가 발생하면, chan을 통해 알린다.
이 채널을 기존 select 루프에서 확인하는 것이다.
또한, context.Done()이 호출되면, 서버의 모든 net.Conn을 Close()하여 클라이언트와의 접속을 끊는다.
순서를 정리하자면,
- go루틴 생성( 데이터 read chan과 연결 종료를 알리는 chan이 필요)
- for 문으로 아래 내용 실행
- 데이터를 수신했다면, 출력
- ctx.Done()이 세팅되었다면 종료
- 데이터를 수신하는 고루틴이 종료했다면 종료
이다.
✅ 추가: 리스너의 Accept() 함수는 데드라인이 없다. 따라서 위와 동일하게 context나 사용자 생성한 채널에 신호를 받아, 리스너의 Close()를 호출하는 방식으로 처리한다. 코드로 간단하게 기술하면 아래와 같다. |
var quitCh chan bool
func ListenerWithCancel(parentCtx context.Context) {
ctx, cancel := context.WithCancel(parentCtx)
errCh := make(chan error)
defer close(errch)
quitCh = make(chan bool)
defer close(quitCh)
go func () {
conn, err := l.Accept()
if err != nil {
str := err.Error()
if strings.Contains(str, "use of closed network connection") {
// 리스너 닫힘
return
} else {
errCh <- err
return
}
}
go WorkerThread(conn)
}()
for {
select {
case err := < errCh
println("비정상종료:", err.Error())
return
case <-quitCh:
println("정상종료")
cancel()
case <- ctx.Done():
l.Close()
return
} // select
} // for
}
소스코드
⛔️ 주의 아래 소스코드는 자원해제 같은 것들은 전혀 신경쓰지 않았다. 실제 업무용으로 적용하려면, 후처리를 신경써야 한다. |
방안1
package main
import (
"context"
"sync"
"fmt"
"syscall"
"time"
"net"
"os"
"log"
"io"
"runtime"
)
func Gettid_thread_selfid() (tid int) {
r0, _, _ := syscall.RawSyscall(syscall.SYS_THREAD_SELFID, 0, 0, 0)
tid = int(r0)
return
}
var clientCancel context.CancelFunc
func clientGoroutine(ctx context.Context, wg *sync.WaitGroup) {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
wg.Add(1)
defer wg.Done()
t := time.NewTicker(200 * time.Millisecond)
ctx1, cancel := context.WithCancel(ctx)
clientCancel = cancel
conn, _ := net.Dial("tcp", ":5030")
defer conn.Close()
conn.Write([]byte("abc"))
for {
select {
case <-quitCh:
fmt.Println("quit client routine",Gettid_thread_selfid(), ": Done")
return
case <-ctx1.Done():
fmt.Println("c1:",Gettid_thread_selfid(), ": Done")
return
case <-t.C:
// do nothing
}
}
}
var quitCh chan bool
func serverGoroutine(ctx context.Context, wg *sync.WaitGroup) {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
wg.Add(1)
defer wg.Done()
buf := make([]byte, syscall.Getpagesize())
t := time.NewTicker(200 * time.Millisecond)
ctx1, _ := context.WithCancel(ctx)
l, _ := net.Listen("tcp", ":5030")
conn, _ := l.Accept()
for {
conn.SetReadDeadline(time.Now().Add(300 * time.Millisecond))
n, err := conn.Read(buf)
if err == io.EOF { /* EOF */
println("End-of-connection")
break
}
if err == nil {
s := string(buf[:n])
println("Server: client data [",s,"]")
} else {
if os.IsTimeout(err) == true {
// ok: timeout
} else {
log.Panicln(err.Error())
return
}
}
select {
case <-ctx1.Done():
fmt.Println("c2:",Gettid_thread_selfid(), ": Done")
return
case <-t.C:
break
}
}
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
quitCh = make(chan bool)
ctx, cancelParent := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
go serverGoroutine(ctx, wg)
time.Sleep(100 * time.Millisecond)
go clientGoroutine(ctx, wg)
time.Sleep(3 * time.Second)
clientCancel()
for i := 0 ; i < 3 ; i++ {
time.Sleep(1 * time.Second)
println("main sleep")
}
//quitCh <- true
for i := 0 ; i < 3 ; i++ {
time.Sleep(1 * time.Second)
}
cancelParent()
wg.Wait()
}
실행 및 결과는 다음과 같다.
$ go build ex1.go ;./ex1
Server: client data [ abc ]
c1: 214348 : Done
End-of-connection
main sleep
main sleep
main sleep
$
방안2
package main
import (
"context"
"sync"
"fmt"
"syscall"
"time"
"net"
"io"
"runtime"
)
func Gettid_thread_selfid() (tid int) {
r0, _, _ := syscall.RawSyscall(syscall.SYS_THREAD_SELFID, 0, 0, 0)
tid = int(r0)
return
}
var clientCancel context.CancelFunc
func clientGoroutine(ctx context.Context, wg *sync.WaitGroup) {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
wg.Add(1)
defer wg.Done()
t := time.NewTicker(1 * time.Second)
ctx1, cancel := context.WithCancel(ctx)
clientCancel = cancel
conn, _ := net.Dial("tcp", ":5030")
defer conn.Close()
for {
select {
case <-quitCh:
fmt.Println("quit client routine",Gettid_thread_selfid(), ": Done")
return
case <-ctx1.Done():
fmt.Println("Client:",Gettid_thread_selfid(), ": Done")
return
case <-t.C:
_, err := conn.Write([]byte("TESTDATA"))
if err != nil {
println("Client: Write fail:", err.Error() )
return
}
}
}
}
var quitCh chan bool
var serverCancel context.CancelFunc
func serverGoroutine(ctx context.Context, wg *sync.WaitGroup) {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
wg.Add(1)
defer wg.Done()
buf := make([]byte, syscall.Getpagesize())
//t := time.NewTicker(200 * time.Millisecond)
ctx1, cancel := context.WithCancel(ctx)
serverCancel = cancel
l, _ := net.Listen("tcp", ":5030")
conn, _ := l.Accept()
isReadDataCh := make(chan int)
errCh := make(chan error)
go func() {
for {
n, err := conn.Read(buf)
if err == nil {
isReadDataCh <- n
} else {
if err == io.EOF { /* EOF */
println("Server: End-of-connection")
}
errCh <- err
conn.Close()
return
}
}
}()
for {
select {
case nbyte := <-isReadDataCh:
s := string(buf[:nbyte])
println("Server: client data [",s,"]")
case err := <-errCh:
fmt.Println("Server:", err.Error(),": Close")
return
case <-ctx1.Done():
// 실제 구현상으로, 클라이언트에게 접속 종료 프로토콜을 실행해야 한다. (graceful Shutdown)
conn.Close()
fmt.Println("Server:",Gettid_thread_selfid(), ": Close")
return
//case <-t.C:
// do nothing
}
}
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
quitCh = make(chan bool)
ctx, cancelParent := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
go serverGoroutine(ctx, wg)
time.Sleep(100 * time.Millisecond)
go clientGoroutine(ctx, wg)
for i := 0 ; i < 3 ; i++ {
time.Sleep(1 * time.Second)
println("main: sleep")
}
println("main: clientCancel()")
//serverCancel()
clientCancel()
for i := 0 ; i < 3 ; i++ {
time.Sleep(1 * time.Second)
println("main: sleep")
}
for i := 0 ; i < 3 ; i++ {
time.Sleep(1 * time.Second)
}
cancelParent()
wg.Wait()
}
실행 및 결과는 다음과 같다.
$ go build ex2.go; ./ex2
main: sleep
Server: client data [ TESTDATA ]
main: sleep
Server: client data [ TESTDATA ]
main: sleep
main: clientCancel()
Client: 54472 : Done
Server: client data [ TESTDATA ]
Server: End-of-connection
Server: EOF : Close
main: sleep
main: sleep
main: sleep
$
반응형