Whamcloud - gitweb
- added a 'dying' head to fix very bad bug in yesterday's request code
[fs/lustre-release.git] / lustre / ptlrpc / client.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define EXPORT_SYMTAB
24
25 #define DEBUG_SUBSYSTEM S_RPC
26
27 #include <linux/lustre_ha.h>
28
29 void ptlrpc_init_client(struct recovd_obd *recovd, 
30                         void (*recover)(struct ptlrpc_client *recover),
31                         int req_portal,
32                         int rep_portal, struct ptlrpc_client *cl)
33 {
34         memset(cl, 0, sizeof(*cl));
35         cl->cli_recovd = recovd;
36         cl->cli_recover = recover;
37         if (recovd)
38                 recovd_cli_manage(recovd, cl);
39         cl->cli_obd = NULL;
40         cl->cli_request_portal = req_portal;
41         cl->cli_reply_portal = rep_portal;
42         INIT_LIST_HEAD(&cl->cli_sending_head);
43         INIT_LIST_HEAD(&cl->cli_sent_head);
44         INIT_LIST_HEAD(&cl->cli_replied_head);
45         INIT_LIST_HEAD(&cl->cli_replay_head);
46         INIT_LIST_HEAD(&cl->cli_dying_head);
47         spin_lock_init(&cl->cli_lock);
48         sema_init(&cl->cli_rpc_sem, 32);
49 }
50
51 __u8 *ptlrpc_req_to_uuid(struct ptlrpc_request *req)
52 {
53         return req->rq_connection->c_remote_uuid;
54 }
55
56 struct ptlrpc_connection *ptlrpc_uuid_to_connection(char *uuid)
57 {
58         struct ptlrpc_connection *c;
59         struct lustre_peer peer;
60         int err;
61
62         err = kportal_uuid_to_peer(uuid, &peer);
63         if (err != 0) {
64                 CERROR("cannot find peer %s!\n", uuid);
65                 return NULL;
66         }
67
68         c = ptlrpc_get_connection(&peer);
69         if (c)
70                 c->c_epoch++;
71
72         return c;
73 }
74
75 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *conn)
76 {
77         struct ptlrpc_bulk_desc *bulk;
78
79         OBD_ALLOC(bulk, sizeof(*bulk));
80         if (bulk != NULL) {
81                 bulk->b_connection = ptlrpc_connection_addref(conn);
82                 init_waitqueue_head(&bulk->b_waitq);
83         }
84
85         return bulk;
86 }
87
88 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk)
89 {
90         ENTRY;
91         if (bulk == NULL) {
92                 EXIT;
93                 return;
94         }
95
96         ptlrpc_put_connection(bulk->b_connection);
97
98         OBD_FREE(bulk, sizeof(*bulk));
99         EXIT;
100 }
101
102 struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
103                                        struct ptlrpc_connection *conn,
104                                        int opcode, int count, int *lengths,
105                                        char **bufs)
106 {
107         struct ptlrpc_request *request;
108         int rc;
109         ENTRY;
110
111         OBD_ALLOC(request, sizeof(*request));
112         if (!request) {
113                 CERROR("request allocation out of memory\n");
114                 RETURN(NULL);
115         }
116
117         rc = lustre_pack_msg(count, lengths, bufs,
118                              &request->rq_reqlen, &request->rq_reqmsg);
119         if (rc) {
120                 CERROR("cannot pack request %d\n", rc);
121                 RETURN(NULL);
122         }
123
124         request->rq_type = PTL_RPC_TYPE_REQUEST;
125         request->rq_connection = ptlrpc_connection_addref(conn);
126
127         request->rq_reqmsg->conn = (__u64)(unsigned long)conn->c_remote_conn;
128         request->rq_reqmsg->token = conn->c_remote_token;
129         request->rq_reqmsg->opc = HTON__u32(opcode);
130         request->rq_reqmsg->type = HTON__u32(PTL_RPC_MSG_REQUEST);
131         INIT_LIST_HEAD(&request->rq_list);
132
133         /* this will be dec()d once in req_finished, once in free_committed */
134         atomic_set(&request->rq_refcount, 2);
135
136         spin_lock(&conn->c_lock);
137         request->rq_reqmsg->xid = HTON__u32(++conn->c_xid_out);
138         spin_unlock(&conn->c_lock);
139
140         request->rq_client = cl;
141
142         RETURN(request);
143 }
144
145 void ptlrpc_req_finished(struct ptlrpc_request *request)
146 {
147         if (request == NULL)
148                 return;
149
150         if (request->rq_repmsg != NULL) { 
151                 OBD_FREE(request->rq_repmsg, request->rq_replen);
152                 request->rq_repmsg = NULL;
153         }
154
155         if (atomic_dec_and_test(&request->rq_refcount))
156                 ptlrpc_free_req(request);
157 }
158
159 void ptlrpc_free_req(struct ptlrpc_request *request)
160 {
161         if (request == NULL)
162                 return;
163
164         if (request->rq_repmsg != NULL)
165                 OBD_FREE(request->rq_repmsg, request->rq_replen);
166         if (request->rq_reqmsg != NULL)
167                 OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
168
169         if (request->rq_client) {
170                 spin_lock(&request->rq_client->cli_lock);
171                 list_del(&request->rq_list);
172                 spin_unlock(&request->rq_client->cli_lock);
173         }
174
175         ptlrpc_put_connection(request->rq_connection);
176
177         OBD_FREE(request, sizeof(*request));
178 }
179
180 static int ptlrpc_check_reply(struct ptlrpc_request *req)
181 {
182         int rc = 0;
183
184         if (req->rq_repmsg != NULL) {
185                 req->rq_transno = NTOH__u64(req->rq_repmsg->transno);
186                 req->rq_flags |= PTL_RPC_FL_REPLY;
187                 GOTO(out, rc = 1);
188         }
189
190         if (req->rq_flags & PTL_RPC_FL_RESEND) { 
191                 CERROR("-- RESEND --\n");
192                 req->rq_status = -EAGAIN;
193                 GOTO(out, rc = 1);
194         }
195
196         if (CURRENT_TIME - req->rq_time >= req->rq_timeout) {
197                 CERROR("-- REQ TIMEOUT --\n");
198                 /* clear the timeout */
199                 req->rq_timeout = 0;
200                 req->rq_flags |= PTL_RPC_FL_TIMEOUT;
201                 if (req->rq_client && req->rq_client->cli_recovd)
202                         recovd_cli_fail(req->rq_client);
203                 GOTO(out, rc = 0);
204         }
205
206         if (req->rq_timeout) { 
207                 schedule_timeout(req->rq_timeout * HZ);
208         }
209
210         if (sigismember(&(current->pending.signal), SIGKILL) ||
211             sigismember(&(current->pending.signal), SIGTERM) ||
212             sigismember(&(current->pending.signal), SIGINT)) {
213                 req->rq_flags |= PTL_RPC_FL_INTR;
214                 GOTO(out, rc = 1);
215         }
216
217  out:
218         return rc;
219 }
220
221 int ptlrpc_check_status(struct ptlrpc_request *req, int err)
222 {
223         ENTRY;
224
225         if (err != 0) {
226                 CERROR("err is %d\n", err);
227                 RETURN(err);
228         }
229
230         if (req == NULL) {
231                 CERROR("req == NULL\n");
232                 RETURN(-ENOMEM);
233         }
234
235         if (req->rq_repmsg == NULL) {
236                 CERROR("req->rq_repmsg == NULL\n");
237                 RETURN(-ENOMEM);
238         }
239
240         if (req->rq_repmsg->type == NTOH__u32(PTL_RPC_MSG_ERR)) {
241                 CERROR("req->rq_repmsg->type == PTL_RPC_MSG_ERR\n");
242                 RETURN(-EINVAL);
243         }
244
245         if (req->rq_repmsg->status != 0) {
246                 CERROR("req->rq_repmsg->status is %d\n",
247                        req->rq_repmsg->status);
248                 /* XXX: translate this error from net to host */
249                 RETURN(req->rq_repmsg->status);
250         }
251
252         RETURN(0);
253 }
254
255 static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request)
256 {
257         OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
258         request->rq_reqmsg = NULL;
259         request->rq_reqlen = 0;
260 }
261
262 /* Abort this request and cleanup any resources associated with it. */
263 static int ptlrpc_abort(struct ptlrpc_request *request)
264 {
265         /* First remove the ME for the reply; in theory, this means
266          * that we can tear down the buffer safely. */
267         PtlMEUnlink(request->rq_reply_me_h);
268         OBD_FREE(request->rq_reply_md.start, request->rq_replen);
269         request->rq_repmsg = NULL;
270         request->rq_replen = 0;
271         return 0;
272 }
273
274 /* caller must lock cli */
275 void ptlrpc_free_committed(struct ptlrpc_client *cli)
276 {
277         struct list_head *tmp, *saved;
278         struct ptlrpc_request *req;
279
280         list_for_each_safe(tmp, saved, &cli->cli_replied_head) {
281                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
282
283                 /* not yet committed */ 
284                 if (req->rq_transno > cli->cli_last_committed)
285                         break; 
286
287                 /* retain for replay if flagged */
288                 list_del(&req->rq_list);
289                 if (req->rq_flags & PTL_RPC_FL_RETAIN) {
290                         list_add(&req->rq_list, &cli->cli_replay_head);
291                 } else {
292                         CDEBUG(D_INFO, "Marking request %p as committed ("
293                                "transno=%Lu, last_committed=%Lu\n", req,
294                                req->rq_transno, cli->cli_last_committed);
295                         if (atomic_dec_and_test(&req->rq_refcount))
296                                 ptlrpc_free_req(req);
297                         else
298                                 list_add(&req->rq_list, &cli->cli_dying_head);
299                 }
300         }
301
302         EXIT;
303         return;
304 }
305
306 void ptlrpc_cleanup_client(struct ptlrpc_client *cli)
307 {
308         struct list_head *tmp, *saved;
309         struct ptlrpc_request *req;
310         ENTRY;
311
312         spin_lock(&cli->cli_lock);
313         list_for_each_safe(tmp, saved, &cli->cli_replied_head) {
314                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
315                 /* We do this to prevent ptlrpc_free_req from taking cli_lock */
316                 CDEBUG(D_INFO, "Cleaning req %p from replied list.\n", req);
317                 list_del(&req->rq_list);
318                 req->rq_client = NULL;
319                 ptlrpc_free_req(req); 
320         }
321         list_for_each_safe(tmp, saved, &cli->cli_sent_head) {
322                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
323                 CDEBUG(D_INFO, "Cleaning req %p from sent list.\n", req);
324                 list_del(&req->rq_list);
325                 req->rq_client = NULL;
326                 ptlrpc_free_req(req); 
327         }
328         list_for_each_safe(tmp, saved, &cli->cli_replay_head) {
329                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
330                 CERROR("Request %p is on the replay list at cleanup!\n", req);
331                 list_del(&req->rq_list);
332                 req->rq_client = NULL;
333                 ptlrpc_free_req(req); 
334         }
335         list_for_each_safe(tmp, saved, &cli->cli_sending_head) {
336                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
337                 CDEBUG(D_INFO, "Cleaning req %p from sending list.\n", req);
338                 list_del(&req->rq_list);
339                 req->rq_client = NULL;
340                 ptlrpc_free_req(req); 
341         }
342         list_for_each_safe(tmp, saved, &cli->cli_dying_head) {
343                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
344                 CERROR("Request %p is on the dying list at cleanup!\n", req);
345                 list_del(&req->rq_list);
346                 req->rq_client = NULL;
347                 ptlrpc_free_req(req); 
348         }
349         spin_unlock(&cli->cli_lock);
350         EXIT;
351         return;
352 }
353
354 int ptlrpc_queue_wait(struct ptlrpc_request *req)
355 {
356         int rc = 0;
357         ENTRY;
358
359         init_waitqueue_head(&req->rq_wait_for_rep);
360  resend:
361         req->rq_time = CURRENT_TIME;
362         req->rq_timeout = 3;
363         rc = ptl_send_rpc(req);
364         if (rc) {
365                 CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
366                 ptlrpc_cleanup_request_buf(req);
367                 up(&req->rq_client->cli_rpc_sem);
368                 RETURN(-rc);
369         }
370
371         CDEBUG(D_OTHER, "-- sleeping\n");
372         wait_event_interruptible(req->rq_wait_for_rep, ptlrpc_check_reply(req));
373         CDEBUG(D_OTHER, "-- done\n");
374
375         if (req->rq_flags & PTL_RPC_FL_RESEND) {
376                 req->rq_flags &= ~PTL_RPC_FL_RESEND;
377                 goto resend;
378         }
379
380         //ptlrpc_cleanup_request_buf(req);
381         up(&req->rq_client->cli_rpc_sem);
382         if (req->rq_flags & PTL_RPC_FL_INTR) {
383                 /* Clean up the dangling reply buffers */
384                 ptlrpc_abort(req);
385                 GOTO(out, rc = -EINTR);
386         }
387
388         if (! (req->rq_flags & PTL_RPC_FL_REPLY)) {
389                 CERROR("Unknown reason for wakeup\n");
390                 /* XXX Phil - I end up here when I kill obdctl */
391                 ptlrpc_abort(req);
392                 GOTO(out, rc = -EINTR);
393         }
394
395         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
396         if (rc) {
397                 CERROR("unpack_rep failed: %d\n", rc);
398                 GOTO(out, rc);
399         }
400         CDEBUG(D_NET, "got rep %d\n", req->rq_repmsg->xid);
401         if (req->rq_repmsg->status == 0)
402                 CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg,
403                        req->rq_replen, req->rq_repmsg->status);
404
405         spin_lock(&req->rq_client->cli_lock);
406         /* add to the tail of the replied head */
407         list_del(&req->rq_list);
408         list_add(&req->rq_list, req->rq_client->cli_replied_head.prev); 
409
410         req->rq_client->cli_last_rcvd = req->rq_repmsg->last_rcvd;
411         req->rq_client->cli_last_committed = req->rq_repmsg->last_committed;
412         ptlrpc_free_committed(req->rq_client); 
413         spin_unlock(&req->rq_client->cli_lock);
414
415         EXIT;
416  out:
417         return rc;
418 }