Golang Socket

套接字(socket)是一个抽象层,应用程序可以通过它发送或接收数据,可对其进行像对文件一样的打开、读写和关闭等操作。套接字允许应用程序将I/O插入到网络中,并与网络中的其他应用程序进行通信。网络套接字是IP地址与端口的组合。

TCP

传输控制协议(TCP,Transmission Control Protocol)是一种面向连接的、可靠的、基于字节流的传输层通信协议,由IETF的RFC 793定义。
因为TCP默认是有序的并且是流式传输,当我们连续发送大量的数据时,就会产生粘包或者半包现象

简单示例

简单的实现服务端和客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package main

import (
"fmt"
"net"
"os"
)

func handler(conn net.Conn) {
defer func() {
fmt.Println(conn.RemoteAddr(), "\t已断开连接...")
_ = conn.Close()
}()

buff := make([]byte, 4096)
for {
// 读取数据
n, err := conn.Read(buff)
if err != nil {
fmt.Println(err.Error())
return
}
fmt.Println(string(buff[:n]))

// 选择是否写入数据返回
_, _ = conn.Write([]byte("ok"))
}
}

func server() {
addr := "127.0.0.1:8000"
fmt.Println("监听:", addr)
listener, err := net.Listen("tcp", addr)
if err != nil {
fmt.Println(err)
return
}
for {
conn, err := listener.Accept()
if err != nil {
fmt.Println(err)
return
}
fmt.Println(conn.RemoteAddr(), "\t新的连接...")
// 开启goroutine 异步处理
go handler(conn)

}
}

func client() {
conn, err := net.Dial("tcp", "127.0.0.1:8000")
if err != nil {
fmt.Println(err.Error())
return
}
for i := 0; i < 10; i++ {
_, err = conn.Write([]byte(fmt.Sprintf("from client:%d", i)))
if err != nil {
fmt.Println(err)
return
}
buff := make([]byte, 4096)
n, err := conn.Read(buff)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(string(buff[:n]))
}

}

func main() {
fmt.Print("请输入 1 or 2 选择 客户端 or 服务端:")
var res int
_, _ = fmt.Scan(&res)
if res == 1 {
client()
} else if res == 2 {
server()
} else {
fmt.Println("输入错误")
os.Exit(-1)
}

}

添加超时机制

通过 context.WithTimeout 给服务端添加超时机制,连接超过一定时长自动断开连接.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package main

import (
"context"
"fmt"
"gopkg.in/ini.v1"
"net"
"os"
"time"
)

var (
serverPort string
serverIP string
timeOutValue int
)

// init 初始化加载配置文件,加载启动参数
func init() {
cfg, err := ini.Load("./config.ini")
if err != nil {
fmt.Println("加载配置文件失败")
os.Exit(-1)

}

serverPort = cfg.Section("server").Key("port").String()
serverIP = cfg.Section("server").Key("ip").String()
timeOutValue, _ = cfg.Section("server").Key("conn_time_out").Int()

}

// receiveMessage 收取socket消息
func receiveMessage(conn net.Conn, buff []byte, m chan int) {
defer func() {
m <- 1
}()
n, err := conn.Read(buff)
if err != nil {
m <- 2
return
}
fmt.Print(conn.RemoteAddr(), "\t\t的消息:", string(buff[:n]))
_, err = conn.Write([]byte("1\n"))
if err != nil {
fmt.Println(err)
m <- 2
return
}

}

// handler 使用ctx控制goroutine,这里为控制函数
// 异步处理提交的数据
func handler(ctx context.Context, cancel context.CancelFunc, conn net.Conn) {

defer func() {
fmt.Println(conn.RemoteAddr(), "\t已断开连接...")
_ = conn.Close()
cancel()
}()

buff := make([]byte, 1024)
var m = make(chan int, 0)
go receiveMessage(conn, buff, m)
for {
select {
case <-ctx.Done():
fmt.Println(conn.RemoteAddr(), "\t\t超时退出...")
return
case r := <-m:
if r == 2 {
return
} else if r == 1 {
go receiveMessage(conn, buff, m)
}
}
}
}

// main
func main() {
addr := serverIP + ":" + serverPort
fmt.Println("监听:", addr)
listener, err := net.Listen("tcp", addr)
if err != nil {
fmt.Println(err)
return
}
for {
conn, err := listener.Accept()
if err != nil {
fmt.Println(err)
return
}
fmt.Println(conn.RemoteAddr(), "\t新的连接... 连接最长持续 ", timeOutValue, " 秒.")
// 创建 Context WithTimeout 设定超时时间
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeOutValue))
go handler(ctx, cancel, conn)

}

}

解决粘包

通过自定义传输报文,在报文中设置数据的长度来解决问题
下面的示例:

  • 使用自己定义数据头标识,此示例中使用”=-=”
  • 使用十六进制四个字节来标识数据长度
  • 使用大端模式构建数据包

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
"io"
"fmt"
"net"
)

func process(conn net.Conn) {
defer func() {
_ = conn.Close()
}()
for {
msg, err := decode(conn)
if err == io.EOF {
return
}
if err != nil {
fmt.Println("decode msg failed, err:", err)
return
}
fmt.Println("收到发来的数据:", string(msg))
}
}

func main() {

listen, err := net.Listen("tcp", "127.0.0.1:9999")
if err != nil {
fmt.Println("listen failed, err:", err)
return
}
defer func() {
_ = listen.Close()
}()
for {
conn, err := listen.Accept()
if err != nil {
fmt.Println("accept failed, err:", err)
continue
}
go process(conn)
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main


import (
"fmt"
"net"
)

func main() {
conn, err := net.Dial("tcp", "127.0.0.1:9999")
if err != nil {
fmt.Println("dial failed, err", err)
return
}
defer func() {
_ = conn.Close()
}()
for i := 0; i < 20; i++ {
data, err := encode([]byte("hello server"))
if err != nil {
fmt.Println("encode msg failed, err:", err)
return
}
_, _ = conn.Write(data)
}
}

消息编码/解码

提供编码和解码两个方法,方法如下,参考注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package main

import (
"bytes"
"encoding/binary"
"errors"
"net"
"strconv"
"strings"
"encoding/hex"
)

// encode 编码数据
func encode(d []byte) ([]byte, error) {
var pack = new(bytes.Buffer)
var err error

// 写入消息头
headerTag := []byte("=-=")

// 十进制数据长度
l := len(d)
// 十进制数据长度转十六进制
lHexStr := strconv.FormatInt(int64(l), 16)
// 十六进制 数据补足位数
fullHexStr := strings.Repeat("0", 8-len(lHexStr)) + lHexStr
msgLength, err := hex.DecodeString(fullHexStr)
if err != nil {
return nil, errors.New("decode hex to string error:" + err.Error())
}

err = binary.Write(pack, binary.BigEndian, headerTag)
if err != nil {
return nil, errors.New("write header tag error::" + err.Error())
}

err = binary.Write(pack, binary.BigEndian, msgLength)
if err != nil {
return nil, errors.New("write message length error:" + err.Error())
}
// 写入数据
err = binary.Write(pack, binary.BigEndian, d)
if err != nil {
return nil, errors.New("write data error:" + err.Error())
}
return pack.Bytes(), nil
}

// decode 解码数据
func decode(conn net.Conn) ([]byte, error) {
// 接收数据头
var tmpHeaderTag = make([]byte, 3)
n, err := conn.Read(tmpHeaderTag)
if err != nil {
return nil, err
}
// 转换数据头
var headerTag = make([]byte, 3) // 此处必须给定长度,长度与协定的一致
err = binary.Read(bytes.NewBuffer(tmpHeaderTag[:n]), binary.BigEndian, headerTag)
if err != nil {
return nil, err
}
// 判断数据头是否为合法的数据头
if string(headerTag) != "=-=" {
return nil, errors.New("error header tag")
}

// 接收数据长度
var tmpDataLength = make([]byte, 4)
n, err = conn.Read(tmpDataLength)
if err != nil {
return nil, err
}
// 转换数据长度
var dataLength = make([]byte, 4)
err = binary.Read(bytes.NewBuffer(tmpDataLength[:n]), binary.BigEndian, dataLength)
if err != nil {
return nil, err
}

// 解析长度数据为16进制字符串
dataLengthHexStr := hex.EncodeToString(dataLength)
// 转换十六进制字符串为十进制整数
var length int64
length, err = strconv.ParseInt(dataLengthHexStr, 16, 64)
if err != nil {
return nil, err
}

// 读取全部数据
totalLength := int(length)
received := 0
var pack []byte
var buff = make([]byte, 4096)
for {
// 接收数据之前判断,已接收数据大小
// 如果余量不足buff的长度,则重新创建余量大小的buff去接收数据
if received >= totalLength {
break
}
r := totalLength - received
if r <= 4096 {
buff = make([]byte, r)
}
// 接收数据
n, err := conn.Read(buff)
if err != nil {
return nil, err
}
// 拼接所有接收到的数据包
pack = append(pack, buff[:n]...)
received += n
}
if pack == nil {
return nil, errors.New("empty data")
}

// 转换接收到的数据
var result = make([]byte, totalLength)
err = binary.Read(bytes.NewBuffer(pack), binary.BigEndian, result)
if err != nil {
return nil, err
}
return result[:], nil
}

TCP数据转发

简单实现tcp转发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package main

import (
"net"
"log"
"time"
"encoding/hex"
)

var targetConn net.Conn
var client net.Conn
var exitSignal = make(chan int, 1)
var reloadSignal = make(chan int, 0)

// 交换管理,当连接异常退出时,这里接收信号并处理
func switchMgr(c1, c2 net.Conn) {
defer func() {
targetConn = nil
client = nil
log.Println("[switchMgr] 连接中断... 退出交换")
}()
go switchData(c1, c2, "server -> term")
go switchData(c2, c1, "term -> server")
<-exitSignal
_ = c1.Close()
_ = c2.Close()
<-exitSignal
return
}

// 交换两个tcp连接的数据
func switchData(c1, c2 net.Conn, tag string) {
defer func() {
log.Printf("[switchData] TAG: %s 退出...", tag)
}()
var buff = make([]byte, 1024)
for {
n, err := c1.Read(buff)
if err != nil {
exitSignal <- 1
log.Printf("[switchData] TAG: %s 读取数据失败: %s", tag, err.Error())
return
}

_, err = c2.Write(buff[:n])
if err != nil {
exitSignal <- 1
log.Printf("[switchData] TAG: %s 写入数据失败: %s", tag, err.Error())
return
}
log.Printf("[switchData] TAG: %s 数据: %s", tag, hex.EncodeToString(buff[:n]))
}
}

// 转发数据目标主机,尝试建立连接
func connectTargetHost() {
// 尝试建立TCP连接到转发目标主机
var n = 1
serverAddr := "192.168.100.224"
serverPort := "8901"
log.Printf("[target] 尝试连接 ... Addr:%s Port:%s", serverAddr, serverPort)
for {
conn, err := net.DialTimeout("tcp", serverAddr+":"+serverPort, time.Second*time.Duration(5))
if err != nil {
if n == 3 {
log.Println("[target] 尝试连接达到最大次数,退出程序")
return
}
log.Printf("[target] 第 %d 次尝试连接目标主机失败:%s 10秒后重试...", n, err.Error())
n++
time.Sleep(time.Duration(10) * time.Second)
continue
}

targetConn = conn
log.Printf("[target] 连接目标主机成功:%s ", targetConn.RemoteAddr().String())
break
}

}

// 主函数,当一方连接异常时,关闭全部连接重新建立
func run() {
defer func() {

if targetConn != nil {
_ = targetConn.Close()
}
if client != nil {
_ = client.Close()
}

reloadSignal <- 1
log.Println("[main] 结束运行...\n--------------------------------------------------------------")
}()
log.Println("[main] TCP 转发程序启动... 监听端口: 8901")
listener, err := net.Listen("tcp", "0.0.0.0:8901")
if err != nil {
log.Fatal("[main] 监听端口失败:", err.Error())
}
defer func() {
_ = listener.Close()
}()

connectTargetHost()

log.Println("[main] 等待客户端接入...")

conn, err := listener.Accept()
if err != nil {
log.Println("[main] 连接接受失败:", err.Error())
return
}
log.Printf("[main] 新客户端接入: %s", conn.RemoteAddr().String())

client = conn
log.Printf("[main] 开始数据交换: 服务端: %s 客户端: %s", targetConn.RemoteAddr().String(), client.RemoteAddr().String())
switchMgr(targetConn, client)

return

}

func main() {
go run()
for {
select {
case <-reloadSignal:
time.Sleep(time.Second * time.Duration(3))
go run()
default:
time.Sleep(time.Second)
}
}
}

UDP

一种不可靠的协议,面向无连接,不保证数据的完整性顺序性,但是传输效率很高

特别场景下很有用处,如果使用udp传输重要数据,需要自行实现分包,校验,重传

简单的局域网广播示例

本示例可用于局域网发现设备案例使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package main

import (
"log"
"net"
"sync"
"time"
)

// 接收本地9000端口的消息
func receive() {
lAddr := &net.UDPAddr{
IP: net.IPv4(0, 0, 0, 0),
Port: 9000,
}
serverConn, err := net.ListenUDP("udp", lAddr)
if err != nil {
log.Println(err.Error())
return
}
defer func() {
_ = serverConn.Close()
}()

buff := make([]byte, 4096)
for {
n, rAddr, err := serverConn.ReadFromUDP(buff)
if err != nil {
log.Println(err.Error())
return
}
log.Printf("地址: %s 数据: %s", rAddr.String(), string(buff[:n]))
}
}

// 使用本机网卡向全网发送广播,端口为9000
func send(w *sync.WaitGroup) {
rAddr := &net.UDPAddr{
IP: net.IPv4(255, 255, 255, 255),
Port: 9000,
}
conn, err := net.DialUDP("udp", nil, rAddr)
if err != nil {
log.Println(err.Error())
return
}
defer func() {
w.Done()
_ = conn.Close()
}()
n := 0
for {
if n > 10 {
break
}
log.Println("发送第", n, "次")
_, _ = conn.Write([]byte("hello"))
time.Sleep(time.Second * 1)
n++
}
}

func main() {
wg := sync.WaitGroup{}
wg.Add(1)
go receive()
go send(&wg)
wg.Wait()
}