Whamcloud - gitweb
LU-10973 lnet: LUTF infrastructure updates
[fs/lustre-release.git] / lustre / tests / lutf / src / liblutf_connect.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <sys/types.h>
4 #include <sys/socket.h>
5 #include <netinet/in.h>
6 #include <netinet/tcp.h>
7 #include <arpa/inet.h>
8 #include <pthread.h>
9 #include <errno.h>
10 #include <unistd.h>
11 #include <getopt.h>
12 #include <fcntl.h>
13 #include <string.h>
14 #include <strings.h>
15 #include <sys/time.h>
16 #include "lutf.h"
17 #include "lutf_message.h"
18
19 static lutf_rc_t doNonBlockingConnect(int iSockFd, struct sockaddr *psSA,
20                                       int iSAlen, int iNsec)
21 {
22         int iN, iError = 0;
23         int iLen;
24         fd_set rset, wset;
25         struct timeval tval;
26
27         iN = connect(iSockFd, (struct sockaddr *)psSA, iSAlen);
28         if (iN < 0) {
29                 if (errno != EINPROGRESS) {
30                         PERROR("Connect Failed: %s:%d", strerror(errno), errno);
31                         return EN_LUTF_RC_FAIL;
32                 }
33         }
34
35         if (iN != 0) {
36                 FD_ZERO(&rset);
37                 FD_SET(iSockFd, &rset);
38                 wset = rset;
39                 tval.tv_sec = iNsec;
40                 tval.tv_usec = 0;
41
42                 iN = select(iSockFd+1, &rset, &wset, NULL,
43                             iNsec ? &tval : NULL);
44                 if (iN == 0) {
45                         errno = ETIMEDOUT;
46                         PERROR("Select timed out");
47                         return EN_LUTF_RC_FAIL;
48                 }
49
50                 if (iN < 0)
51                         return EN_LUTF_RC_FAIL;
52
53                 if (FD_ISSET(iSockFd, &rset) || FD_ISSET(iSockFd, &wset)) {
54                         iLen = sizeof(iError);
55                         if (getsockopt(iSockFd, SOL_SOCKET, SO_ERROR, &iError,
56                                        (socklen_t *)&iLen) < 0) {
57                                 PERROR("getsockopt failed indicating connect failure, errno= %d",
58                                        errno);
59                                 return EN_LUTF_RC_FAIL;
60                         }
61                 } else {
62                         PERROR("select error: sockfd not set");
63                         return EN_LUTF_RC_FAIL;
64                 }
65         }
66
67         /* There was some error when connecting */
68         if (iError) {
69                 errno = iError;
70                 PERROR("Error on connect. errno = %s", strerror(errno));
71                 return EN_LUTF_RC_FAIL;
72         }
73
74         return EN_LUTF_RC_OK;
75 }
76
77 int establishTCPConnection(unsigned long uiAddress,
78                            int iPort,
79                            bool b_non_block,
80                            bool endian)
81 {
82         int iOption = 1, iFlags;
83         int rsocket;
84         struct sockaddr_in tm_addr;
85         lutf_rc_t eRc = EN_LUTF_RC_OK;
86
87         /* Create TCP socket */
88         rsocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
89         if (rsocket == -1)
90                 return EN_LUTF_RC_FAIL;
91
92         /* Turn off Nagle's algorithm for this TCP socket. */
93         setsockopt(rsocket, IPPROTO_TCP, TCP_NODELAY, (void *)&iOption,
94                    sizeof(iOption));
95
96         iFlags = 1;
97         if (setsockopt(rsocket, SOL_SOCKET, SO_REUSEADDR, (void *)&iFlags,
98                        sizeof(iFlags)) < 0) {
99                 /*  Cannot change the socket options.  */
100                 close(rsocket);
101                 return EN_LUTF_RC_FAIL;
102         }
103
104         iFlags = fcntl(rsocket, F_GETFL, 0);
105         if (b_non_block)
106                 fcntl(rsocket, F_SETFL, iFlags | O_NONBLOCK);
107         else
108                 fcntl(rsocket, F_SETFL, iFlags & (~O_NONBLOCK));
109
110         /* Set address parameters for TCP connection */
111         bzero((char *) &tm_addr, sizeof(tm_addr));
112         tm_addr.sin_addr.s_addr = (endian) ? htonl(uiAddress) : uiAddress;
113         tm_addr.sin_port = (endian) ? htons(iPort) : iPort;
114         tm_addr.sin_family = AF_INET;
115
116         eRc = doNonBlockingConnect(rsocket, (struct sockaddr *)&tm_addr,
117                                    sizeof(tm_addr), SOCKET_CONN_TIMEOUT_SEC);
118         if (eRc != EN_LUTF_RC_OK) {
119                 close(rsocket);
120                 return eRc;
121         }
122
123         return rsocket;
124 }
125
126 lutf_rc_t closeTcpConnection(int iTcpSocket)
127 {
128         int rc;
129
130         PDEBUG("closing socket %d", iTcpSocket);
131         rc = close(iTcpSocket);
132         if (rc && errno != EINPROGRESS && errno != ECONNRESET) {
133                 PERROR("failed to close %d:%d\n", iTcpSocket, errno);
134                 return EN_LUTF_RC_FAIL;
135         }
136
137         return EN_LUTF_RC_OK;
138 }
139
140 /*
141  * sendTcpMessage
142  *   Send a TCP message to the specified TCP socket.
143  *
144  * Parameters:      iTcpSocket - Socket file descriptor
145  *                  pcBody - TCP message to send.
146  *                  iBodySize - size of the body
147  *
148  */
149 lutf_rc_t sendTcpMessage(int iTcpSocket, char *pcBody, int iBodySize)
150 {
151         size_t tNleft;
152         ssize_t tNwritten;
153         char *pcCur;
154
155         if (iTcpSocket == INVALID_TCP_SOCKET)
156                 return(EN_LUTF_RC_FAIL);
157
158         /* Start writing bytes to the socket and keep writing until we have
159          * the requested number of bytes sent.
160          */
161         pcCur = (char *)pcBody;
162         tNleft = iBodySize;
163
164         while (tNleft > 0) {
165                 /*  Send as many bytes, up to current maximum, as we can.  */
166                 tNwritten = write(iTcpSocket, pcCur, tNleft);
167
168                 if (tNwritten < 0) {
169                         if (errno == EINTR) {
170                                 /* We were interrupted, but this is not an
171                                  * error condition.
172                                  */
173                                 tNwritten = 0;
174                         } else {
175                                 /* System error has occurred.  */
176                                 PERROR("Failed to send message (%d, %p, %d, %u)  %s:%d",
177                                        iTcpSocket, pcBody, iBodySize, tNwritten,
178                                        strerror(errno), errno);
179                                 return EN_LUTF_RC_SYS_ERR;
180                         }
181                 }
182
183                 tNleft -= tNwritten;
184                 pcCur += tNwritten;
185         }
186
187         return EN_LUTF_RC_OK;
188 }
189
190 /*
191  * populateMsgHdr
192  *      populate the LUTF message header with the passed in information.
193  *
194  * Parameters:      rsocket - Socket file descriptor
195  *                  msg_hdr - pointer to the message header.
196  *                  msg_type - type of message
197  *                  msg_size - message size
198  *                  lutf_version_number - version number
199  *
200  */
201 lutf_rc_t populateMsgHdr(int rsocket, char *msg_hdr,
202                          int msg_type, int msg_size,
203                          int lutf_version_number)
204 {
205         lutf_message_hdr_t *hdr = NULL;
206         struct sockaddr_in sock;
207         int len = sizeof(sock);
208         int rc;
209
210         if (rsocket == INVALID_TCP_SOCKET ||
211             msg_hdr == NULL) {
212                 PERROR("bad parameter: hdr = %p, socket = %d",
213                        msg_hdr, rsocket);
214                 return EN_LUTF_RC_FAIL;
215         }
216
217         hdr = (lutf_message_hdr_t *)msg_hdr;
218
219         /* get the local IP address we are connected on */
220         rc = getsockname(rsocket,
221                         (struct sockaddr *)&sock,
222                         (socklen_t *)&len);
223         if (rc) {
224                 PERROR("getsockname failure %s:%s:%d",
225                        strerror(errno), strerror(rc), rc);
226                 return EN_LUTF_RC_FAIL;
227         }
228
229         hdr->type = htonl(msg_type);
230         hdr->len = htonl(msg_size);
231         hdr->ip.s_addr = sock.sin_addr.s_addr;
232         hdr->version = htonl(lutf_version_number);
233
234         return EN_LUTF_RC_OK;
235 }
236
237 lutf_rc_t readTcpMessage(int iFd, char *pcBuffer,
238                          int iBufferSize, int iTimeout)
239 {
240         size_t tNleft;
241         ssize_t tNread;
242         char *pcCur;
243         struct timeval sTimeout;
244         int iFlags;
245
246         /* Grab a copy of the client's file descriptor
247          * (and make sure it isn't -1).
248          */
249         if (iFd == -1)
250                 return EN_LUTF_RC_CLIENT_CLOSED;
251
252         /* set the timeout */
253         if (iTimeout) {
254                 sTimeout.tv_sec = iTimeout;
255                 sTimeout.tv_usec = 0;
256                 setsockopt(iFd, SOL_SOCKET, SO_RCVTIMEO, (void *)&sTimeout,
257                                 sizeof(sTimeout));
258                 setsockopt(iFd, SOL_SOCKET, SO_SNDTIMEO, (void *)&sTimeout,
259                                 sizeof(sTimeout));
260
261                 iFlags = fcntl(iFd, F_GETFL, 0);
262                 fcntl(iFd, F_SETFL, iFlags & (~O_NONBLOCK));
263         } else {
264                 /* if no timeout specified do a non blocking read */
265                 iFlags = fcntl(iFd, F_GETFL, 0);
266                 fcntl(iFd, F_SETFL, iFlags | O_NONBLOCK);
267         }
268
269         /* Start reading in bytes from the socket and keep reading until we have
270          * the requested number of bytes or EOF occurs.
271          */
272         pcCur = pcBuffer;
273         tNleft = iBufferSize;
274         while (tNleft > 0) {
275                 /*  Get as many bytes, up to current maximum as we can.  */
276                 tNread = read(iFd, pcCur, tNleft);
277
278                 if (tNread < 0) {
279                         if (errno == EINTR) {
280                                 /* We were interrupted, but this is not an
281                                  * error condition.
282                                  */
283                                 tNread = 0;
284                         } else if ((errno == EAGAIN) && (!iTimeout)) {
285                                 return EN_LUTF_RC_SOCKET_FAIL;
286                         } else {
287                                 /*  System error has occurred. */
288                                 return EN_LUTF_RC_SOCKET_FAIL;
289                         }
290                 } else {
291                         if (tNread == 0) {
292                                 /* End of file encountered. This is most
293                                  * likely the client closing their end of the
294                                  * socket.
295                                  */
296                                 return EN_LUTF_RC_SOCKET_FAIL;
297                         }
298                 }
299
300                 tNleft -= tNread;
301                 pcCur += tNread;
302         }
303
304         return EN_LUTF_RC_OK;
305 }
306
307 lutf_rc_t lutf_send_msg(int fd, char *msg, size_t msg_size,
308                         lutf_msg_type_t type)
309 {
310         lutf_rc_t rc = EN_LUTF_RC_RPC_FAIL;
311         lutf_message_hdr_t hdr;
312
313         rc = populateMsgHdr(fd, (char *)&hdr, type,
314                             msg_size, LUTF_VERSION_NUMBER);
315         if (rc != EN_LUTF_RC_OK) {
316                 PERROR("Failed to populate message header");
317                 return rc;
318         }
319
320         rc = sendTcpMessage(fd, (char *)&hdr, sizeof(hdr));
321         if (rc != EN_LUTF_RC_OK) {
322                 PERROR("Failed to send msg header");
323                 return rc;
324         }
325
326         if (msg_size) {
327                 rc = sendTcpMessage(fd, msg, msg_size);
328                 if (rc != EN_LUTF_RC_OK) {
329                         PERROR("Failed to send msg body");
330                         return rc;
331                 }
332         }
333
334         return rc;
335 }