Whamcloud - gitweb
LU-10973 lnet: initial LUTF C infrastructure
[fs/lustre-release.git] / lustre / tests / lutf / src / liblutf_connect.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <sys/types.h>
4 #include <sys/socket.h>
5 #include <netinet/in.h>
6 #include <netinet/tcp.h>
7 #include <arpa/inet.h>
8 #include <pthread.h>
9 #include <errno.h>
10 #include <unistd.h>
11 #include <getopt.h>
12 #include <fcntl.h>
13 #include <string.h>
14 #include <strings.h>
15 #include <sys/time.h>
16 #include "lutf.h"
17 #include "lutf_message.h"
18
19 static lutf_rc_t doNonBlockingConnect(int iSockFd, struct sockaddr *psSA,
20                                       int iSAlen, int iNsec)
21 {
22         int iN, iError = 0;
23         int iLen;
24         fd_set rset, wset;
25         struct timeval tval;
26
27         if ((iN = connect(iSockFd, (struct sockaddr *)psSA, iSAlen)) < 0) {
28                 if (errno != EINPROGRESS) {
29                         PERROR("Connect Failed: %s:%d", strerror(errno), errno);
30                         return EN_LUTF_RC_FAIL;
31                 }
32         }
33
34         if (iN != 0) {
35                 FD_ZERO(&rset);
36                 FD_SET(iSockFd, &rset);
37                 wset = rset;
38                 tval.tv_sec = iNsec;
39                 tval.tv_usec = 0;
40
41                 if ((iN = select(iSockFd+1, &rset, &wset, NULL,
42                                  iNsec ? &tval : NULL)) == 0) {
43                         errno = ETIMEDOUT;
44                         PERROR("Select timed out");
45                         return EN_LUTF_RC_FAIL;
46                 }
47
48                 if (iN < 0)
49                         return EN_LUTF_RC_FAIL;
50
51                 if (FD_ISSET(iSockFd, &rset) || FD_ISSET(iSockFd, &wset)) {
52                         iLen = sizeof(iError);
53                         if (getsockopt(iSockFd, SOL_SOCKET, SO_ERROR, &iError, (socklen_t *)&iLen) < 0) {
54                                 PERROR("getsockopt failed indicating connect failure, errno= %d", errno);
55                                 return EN_LUTF_RC_FAIL;
56                         }
57                 } else {
58                         PERROR("select error: sockfd not set");
59                         return EN_LUTF_RC_FAIL;
60                 }
61         }
62
63         /* There was some error when connecting */
64         if (iError) {
65                 errno = iError;
66                 PERROR("Error on connect. errno = %s", strerror(errno));
67                 return EN_LUTF_RC_FAIL;
68         }
69
70         return EN_LUTF_RC_OK;
71 }
72
73 int establishTCPConnection(unsigned long uiAddress,
74                            int iPort,
75                            bool b_non_block,
76                            bool endian)
77 {
78         int iOption = 1, iFlags;
79         int rsocket;
80         struct sockaddr_in tm_addr;
81         lutf_rc_t eRc = EN_LUTF_RC_OK;
82
83         /* Create TCP socket */
84         if ((rsocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP))
85              == -1)
86                 return EN_LUTF_RC_FAIL;
87
88         /* Turn off Nagle's algorithm for this TCP socket. */
89         setsockopt(rsocket, IPPROTO_TCP, TCP_NODELAY, (void *)&iOption,
90                    sizeof(iOption));
91
92         iFlags = 1;
93         if (setsockopt(rsocket, SOL_SOCKET, SO_REUSEADDR, (void *)&iFlags,
94                        sizeof(iFlags)) < 0) {
95                 /*  Cannot change the socket options.  */
96                 close(rsocket);
97                 return EN_LUTF_RC_FAIL;
98         }
99
100         iFlags = fcntl(rsocket, F_GETFL, 0);
101         if (b_non_block)
102                 fcntl(rsocket, F_SETFL, iFlags | O_NONBLOCK);
103         else
104                 fcntl(rsocket, F_SETFL, iFlags & (~O_NONBLOCK));
105
106         /* Set address parameters for TCP connection */
107         bzero((char *) &tm_addr, sizeof(tm_addr));
108         tm_addr.sin_addr.s_addr = (endian) ? htonl(uiAddress) : uiAddress;
109         tm_addr.sin_port = (endian) ? htons(iPort) : iPort;
110         tm_addr.sin_family = AF_INET;
111
112         if ((eRc = doNonBlockingConnect(rsocket,
113                                         (struct sockaddr *)&tm_addr,
114                                         sizeof(tm_addr),
115                                         SOCKET_CONN_TIMEOUT_SEC))
116             != EN_LUTF_RC_OK) {
117                 close(rsocket);
118                 return eRc;
119         }
120
121         return rsocket;
122 }
123
124 lutf_rc_t closeTcpConnection(int iTcpSocket)
125 {
126         int rc;
127
128         PDEBUG("closing socket %d", iTcpSocket);
129         rc = close(iTcpSocket);
130         if (!rc && errno != EINPROGRESS && errno != ECONNRESET) {
131                 PERROR("failed to close %d:%d\n", iTcpSocket, errno);
132                 return EN_LUTF_RC_FAIL;
133         }
134
135         return EN_LUTF_RC_OK;
136 }
137
138 /*
139  * sendTcpMessage
140  *   Send a TCP message to the specified TCP socket.
141  *
142  * Parameters:      iTcpSocket - Socket file descriptor
143  *                  pcBody - TCP message to send.
144  *                  iBodySize - size of the body
145  *
146  */
147 lutf_rc_t sendTcpMessage(int iTcpSocket, char *pcBody, int iBodySize)
148 {
149         size_t tNleft;
150         ssize_t tNwritten;
151         char *pcCur;
152
153         if (iTcpSocket == INVALID_TCP_SOCKET)
154                 return(EN_LUTF_RC_FAIL);
155
156         /* Start writing bytes to the socket and keep writing until we have
157          * the requested number of bytes sent.
158          */
159         pcCur = (char *)pcBody;
160         tNleft = iBodySize;
161
162         while (tNleft > 0) {
163                 /*  Send as many bytes, up to current maximum, as we can.  */
164                 tNwritten = write(iTcpSocket, pcCur, tNleft);
165
166                 if (tNwritten < 0) {
167                         if (errno == EINTR) {
168                                 /* We were interrupted, but this is not an
169                                  * error condition.
170                                  */
171                                 tNwritten = 0;
172                         } else {
173                                 /* System error has occurred.  */
174                                 PERROR("Failed to send message (%d, %p, %d, %u)  %s:%d",
175                                        iTcpSocket, pcBody, iBodySize, tNwritten,
176                                        strerror(errno), errno);
177                                 return EN_LUTF_RC_SYS_ERR;
178                         }
179                 }
180
181                 tNleft -= tNwritten;
182                 pcCur += tNwritten;
183         }
184
185         return EN_LUTF_RC_OK;
186 }
187
188 /*
189  * populateMsgHdr
190  *      populate the LUTF message header with the passed in information.
191  *
192  * Parameters:      rsocket - Socket file descriptor
193  *                  msg_hdr - pointer to the message header.
194  *                  msg_type - type of message
195  *                  msg_size - message size
196  *                  lutf_version_number - version number
197  *
198  */
199 lutf_rc_t populateMsgHdr(int rsocket, char *msg_hdr,
200                          int msg_type, int msg_size,
201                          int lutf_version_number)
202 {
203         lutf_message_hdr_t *hdr = NULL;
204         struct sockaddr_in sock;
205         int len = sizeof(sock);
206         int rc;
207
208         if (rsocket == INVALID_TCP_SOCKET ||
209             msg_hdr == NULL) {
210                 PERROR("bad parameter: hdr = %p, socket = %d",
211                        msg_hdr, rsocket);
212                 return EN_LUTF_RC_FAIL;
213         }
214
215         hdr = (lutf_message_hdr_t *)msg_hdr;
216
217         /* get the local IP address we are connected on */
218         rc = getsockname(rsocket,
219                         (struct sockaddr *)&sock,
220                         (socklen_t *)&len);
221         if (rc) {
222                 PERROR("getsockname failure %s:%s:%d",
223                        strerror(errno), strerror(rc), rc);
224                 return EN_LUTF_RC_FAIL;
225         }
226
227         hdr->type = htonl(msg_type);
228         hdr->len = htonl(msg_size);
229         hdr->ip.s_addr = sock.sin_addr.s_addr;
230         hdr->version = htonl(lutf_version_number);
231
232         return EN_LUTF_RC_OK;
233 }
234
235 lutf_rc_t readTcpMessage(int iFd, char *pcBuffer,
236                          int iBufferSize, int iTimeout)
237 {
238         size_t tNleft;
239         ssize_t tNread;
240         char *pcCur;
241         struct timeval sTimeout;
242         int iFlags;
243
244         /* Grab a copy of the client's file descriptor
245          * (and make sure it isn't -1).
246          */
247         if (iFd == -1)
248                 return EN_LUTF_RC_CLIENT_CLOSED;
249
250         /* set the timeout */
251         if (iTimeout) {
252                 sTimeout.tv_sec = iTimeout;
253                 sTimeout.tv_usec = 0;
254                 setsockopt(iFd, SOL_SOCKET, SO_RCVTIMEO, (void *)&sTimeout,
255                                 sizeof(sTimeout));
256                 setsockopt(iFd, SOL_SOCKET, SO_SNDTIMEO, (void *)&sTimeout,
257                                 sizeof(sTimeout));
258
259                 iFlags = fcntl(iFd, F_GETFL, 0);
260                 fcntl(iFd, F_SETFL, iFlags & (~O_NONBLOCK));
261         } else {
262                 /* if no timeout specified do a non blocking read */
263                 iFlags = fcntl(iFd, F_GETFL, 0);
264                 fcntl(iFd, F_SETFL, iFlags | O_NONBLOCK);
265         }
266
267         /* Start reading in bytes from the socket and keep reading until we have
268          * the requested number of bytes or EOF occurs.
269          */
270         pcCur = pcBuffer;
271         tNleft = iBufferSize;
272         while (tNleft > 0) {
273                 /*  Get as many bytes, up to current maximum as we can.  */
274                 tNread = read(iFd, pcCur, tNleft);
275
276                 if (tNread < 0) {
277                         if (errno == EINTR) {
278                                 /*  We were interrupted, but this is not an error condition.  */
279                                 tNread = 0;
280                         } else if ((errno == EAGAIN) && (!iTimeout)) {
281                                 return EN_LUTF_RC_SOCKET_FAIL;
282                         } else {
283                                 /*  System error has occurred. */
284                                 return EN_LUTF_RC_SOCKET_FAIL;
285                         }
286                 } else {
287                         if (tNread == 0) {
288                                 /* End of file encountered. This is most
289                                  * likely the client closing their end of the
290                                  * socket.
291                                  */
292                                 return EN_LUTF_RC_SOCKET_FAIL;
293                         }
294                 }
295
296                 tNleft -= tNread;
297                 pcCur += tNread;
298         }
299
300         return EN_LUTF_RC_OK;
301 }
302
303 lutf_rc_t lutf_send_msg(int fd, char *msg, size_t msg_size,
304                         lutf_msg_type_t type)
305 {
306         lutf_rc_t rc = EN_LUTF_RC_RPC_FAIL;
307         lutf_message_hdr_t hdr;
308
309         rc = populateMsgHdr(fd, (char *)&hdr, type,
310                             msg_size, LUTF_VERSION_NUMBER);
311         if (rc != EN_LUTF_RC_OK) {
312                 PERROR("Failed to populate message header");
313                 return rc;
314         }
315
316         rc = sendTcpMessage(fd, (char *)&hdr, sizeof(hdr));
317         if (rc != EN_LUTF_RC_OK) {
318                 PERROR("Failed to send msg header");
319                 return rc;
320         }
321
322         if (msg_size) {
323                 rc = sendTcpMessage(fd, msg, msg_size);
324                 if (rc != EN_LUTF_RC_OK) {
325                         PERROR("Failed to send msg body");
326                         return rc;
327                 }
328         }
329
330         return rc;
331 }