Whamcloud - gitweb
b=15316
[fs/lustre-release.git] / lnet / ulnds / socklnd / usocklnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Maxim Patlasov <maxim@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  */
11
12 #include "usocklnd.h"
13
14 static int
15 usocklnd_send_tx_immediately(usock_conn_t *conn, usock_tx_t *tx)
16 {
17         int           rc;
18         int           rc2;
19         int           partial_send = 0;
20         usock_peer_t *peer         = conn->uc_peer;
21
22         LASSERT (peer != NULL);
23
24         /* usocklnd_enqueue_tx() turned it on for us */
25         LASSERT(conn->uc_sending);
26
27         //counter_imm_start++;
28         rc = usocklnd_send_tx(conn, tx);
29         if (rc == 0) { /* partial send or connection closed */
30                 pthread_mutex_lock(&conn->uc_lock);
31                 list_add(&tx->tx_list, &conn->uc_tx_list);
32                 conn->uc_sending = 0;
33                 pthread_mutex_unlock(&conn->uc_lock);
34                 partial_send = 1;
35         } else {                
36                 usocklnd_destroy_tx(peer->up_ni, tx);
37                 /* NB: lnetmsg was finalized, so we *must* return 0 */
38
39                 if (rc < 0) { /* real error */                       
40                         usocklnd_conn_kill(conn);
41                         return 0;
42                 }
43                 
44                 /* rc == 1: tx was sent completely */
45                 rc = 0; /* let's say to caller 'Ok' */
46                 //counter_imm_complete++;
47         }
48
49         pthread_mutex_lock(&conn->uc_lock);
50         conn->uc_sending = 0;               
51
52         /* schedule write handler */
53         if (partial_send ||
54             (conn->uc_state == UC_READY &&
55              (!list_empty(&conn->uc_tx_list) ||
56               !list_empty(&conn->uc_zcack_list)))) {
57                 conn->uc_tx_deadline = 
58                         cfs_time_shift(usock_tuns.ut_timeout);
59                 conn->uc_tx_flag = 1;
60                 rc2 = usocklnd_add_pollrequest(conn, POLL_TX_SET_REQUEST, POLLOUT);
61                 if (rc2 != 0)
62                         usocklnd_conn_kill_locked(conn);
63                 else
64                         usocklnd_wakeup_pollthread(conn->uc_pt_idx);
65         }
66
67         pthread_mutex_unlock(&conn->uc_lock);
68
69         return rc;
70 }
71
72 int
73 usocklnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
74 {
75         usock_tx_t       *tx;
76         lnet_process_id_t target = lntmsg->msg_target;
77         usock_peer_t     *peer;
78         int               type;
79         int               rc;
80         usock_conn_t     *conn;
81         int               send_immediately;
82         
83         tx = usocklnd_create_tx(lntmsg);
84         if (tx == NULL)
85                 return -ENOMEM;
86         
87         rc = usocklnd_find_or_create_peer(ni, target, &peer);
88         if (rc) {
89                 LIBCFS_FREE (tx, tx->tx_size);
90                 return rc;
91         }                
92         /* peer cannot disappear now because its refcount was incremented */
93
94         type = usocklnd_get_conn_type(lntmsg);
95         rc = usocklnd_find_or_create_conn(peer, type, &conn, tx, NULL,
96                                           &send_immediately);
97         if (rc != 0) {
98                 usocklnd_peer_decref(peer);
99                 usocklnd_check_peer_stale(ni, target);
100                 LIBCFS_FREE (tx, tx->tx_size);
101                 return rc;
102         }
103         /* conn cannot disappear now because its refcount was incremented */
104
105         if (send_immediately)
106                 rc = usocklnd_send_tx_immediately(conn, tx);
107
108         usocklnd_conn_decref(conn);
109         usocklnd_peer_decref(peer);
110         return rc;
111 }
112
113 int
114 usocklnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
115               unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
116               unsigned int offset, unsigned int mlen, unsigned int rlen)
117 {
118         int           rc   = 0;
119         usock_conn_t *conn = (usock_conn_t *)private;
120
121         /* I don't think that we'll win much concurrency moving lock()
122          * call below lnet_extract_iov() */
123         pthread_mutex_lock(&conn->uc_lock);
124         
125         conn->uc_rx_lnetmsg = msg;
126         conn->uc_rx_nob_wanted = mlen;
127         conn->uc_rx_nob_left = rlen;
128         conn->uc_rx_iov = conn->uc_rx_iova;
129         conn->uc_rx_niov =
130                 lnet_extract_iov(LNET_MAX_IOV, conn->uc_rx_iov,
131                                  niov, iov, offset, mlen);
132
133         /* the gap between lnet_parse() and usocklnd_recv() happened? */
134         if (conn->uc_rx_state == UC_RX_PARSE_WAIT) {
135                 conn->uc_rx_flag = 1; /* waiting for incoming lnet payload */
136                 conn->uc_rx_deadline = 
137                         cfs_time_shift(usock_tuns.ut_timeout);                
138                 rc = usocklnd_add_pollrequest(conn, POLL_RX_SET_REQUEST, POLLIN);
139                 if (rc != 0) {
140                         usocklnd_conn_kill_locked(conn);
141                         goto recv_out;
142                 }
143                 usocklnd_wakeup_pollthread(conn->uc_pt_idx);
144         }
145
146         conn->uc_rx_state = UC_RX_LNET_PAYLOAD;
147   recv_out:        
148         pthread_mutex_unlock(&conn->uc_lock);
149         usocklnd_conn_decref(conn);
150         return rc;
151 }
152
153 int
154 usocklnd_accept(lnet_ni_t *ni, int sock_fd)
155 {
156         int           rc;
157         usock_conn_t *conn;
158         
159         rc = usocklnd_create_passive_conn(ni, sock_fd, &conn);
160         if (rc)
161                 return rc;
162         LASSERT(conn != NULL);
163         
164         /* disable shutdown event temporarily */
165         lnet_ni_addref(ni);
166
167         rc = usocklnd_add_pollrequest(conn, POLL_ADD_REQUEST, POLLIN);
168         if (rc == 0)
169                 usocklnd_wakeup_pollthread(conn->uc_pt_idx);
170
171         /* NB: conn reference counter was incremented while adding
172          * poll request if rc == 0 */
173         
174         usocklnd_conn_decref(conn); /* should destroy conn if rc != 0 */
175         return rc;
176 }