/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (C) 2002 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#define DEBUG_SUBSYSTEM S_RPC

#include <linux/obd_support.h>
#include <linux/lustre_lib.h>
#include <linux/lustre_ha.h>

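/* Initialize a ptlrpc_client: zero it, record the request/reply portals and
 * the recovery daemon (registering with recovd when one is given), set up the
 * delayed/sending/dying request lists, and initialize the RPC semaphore with
 * a count of 32, which apparently bounds the number of RPCs in flight per
 * client. */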
void ptlrpc_init_client(struct recovd_obd *recovd,
                        int (*recover)(struct ptlrpc_client *recover),
                        int req_portal,
                        int rep_portal, struct ptlrpc_client *cl)
{
        memset(cl, 0, sizeof(*cl));
        cl->cli_recovd = recovd;
        cl->cli_recover = recover;
        if (recovd)
                recovd_cli_manage(recovd, cl);
        cl->cli_obd = NULL;
        cl->cli_request_portal = req_portal;
        cl->cli_reply_portal = rep_portal;
        INIT_LIST_HEAD(&cl->cli_delayed_head);
        INIT_LIST_HEAD(&cl->cli_sending_head);
        INIT_LIST_HEAD(&cl->cli_dying_head);
        spin_lock_init(&cl->cli_lock);
        sema_init(&cl->cli_rpc_sem, 32);
}

__u8 *ptlrpc_req_to_uuid(struct ptlrpc_request *req)
{
        return req->rq_connection->c_remote_uuid;
}

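/* Resolve a peer UUID to a ptlrpc_connection, recording the remote UUID and
 * bumping the connection's epoch on success.  Returns NULL if the UUID is
 * not known to the portals layer. */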
struct ptlrpc_connection *ptlrpc_uuid_to_connection(char *uuid)
{
        struct ptlrpc_connection *c;
        struct lustre_peer peer;
        int err;

        err = kportal_uuid_to_peer(uuid, &peer);
        if (err != 0) {
                CERROR("cannot find peer %s!\n", uuid);
                return NULL;
        }

        c = ptlrpc_get_connection(&peer);
        if (c) {
                memcpy(c->c_remote_uuid, uuid, sizeof(c->c_remote_uuid));
                c->c_epoch++;
        }

        return c;
}

void ptlrpc_readdress_connection(struct ptlrpc_connection *conn, char *uuid)
{
        struct lustre_peer peer;
        int err;

        err = kportal_uuid_to_peer(uuid, &peer);
        if (err != 0) {
                CERROR("cannot find peer %s!\n", uuid);
                return;
        }

        memcpy(&conn->c_peer, &peer, sizeof(peer));
        return;
}

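/* Allocate a bulk descriptor tied to a connection (taking a connection
 * reference) and, below, the per-page entries that hang off it.  Pages are
 * kept on b_page_list and counted in both b_page_count and the atomic
 * b_pages_remaining. */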
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *conn)
{
        struct ptlrpc_bulk_desc *desc;

        OBD_ALLOC(desc, sizeof(*desc));
        if (desc != NULL) {
                desc->b_connection = ptlrpc_connection_addref(conn);
                atomic_set(&desc->b_pages_remaining, 0);
                atomic_set(&desc->b_refcount, 1);
                init_waitqueue_head(&desc->b_waitq);
                INIT_LIST_HEAD(&desc->b_page_list);
        }

        return desc;
}

struct ptlrpc_bulk_page *ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc)
{
        struct ptlrpc_bulk_page *bulk;

        OBD_ALLOC(bulk, sizeof(*bulk));
        if (bulk != NULL) {
                bulk->b_desc = desc;
                ptl_set_inv_handle(&bulk->b_md_h);
                ptl_set_inv_handle(&bulk->b_me_h);
                list_add_tail(&bulk->b_link, &desc->b_page_list);
                desc->b_page_count++;
                atomic_inc(&desc->b_pages_remaining);
        }
        return bulk;
}

void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
{
        struct list_head *tmp, *next;
        ENTRY;
        if (desc == NULL) {
                EXIT;
                return;
        }

        list_for_each_safe(tmp, next, &desc->b_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
                ptlrpc_free_bulk_page(bulk);
        }

        ptlrpc_put_connection(desc->b_connection);

        OBD_FREE(desc, sizeof(*desc));
        EXIT;
}

void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *bulk)
{
        ENTRY;
        if (bulk == NULL) {
                EXIT;
                return;
        }

        list_del(&bulk->b_link);
        bulk->b_desc->b_page_count--;
        OBD_FREE(bulk, sizeof(*bulk));
        EXIT;
}

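/* Allocate a request for the given client/connection, pack the request
 * buffers, assign the next XID under the connection lock, and fill in the
 * message header.  The request starts with a refcount of 2: one reference is
 * dropped in ptlrpc_req_finished(), the other in ptlrpc_free_committed(). */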
struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
                                       struct ptlrpc_connection *conn,
                                       int opcode, int count, int *lengths,
                                       char **bufs)
{
        struct ptlrpc_request *request;
        int rc;
        ENTRY;

        OBD_ALLOC(request, sizeof(*request));
        if (!request) {
                CERROR("request allocation out of memory\n");
                RETURN(NULL);
        }

        rc = lustre_pack_msg(count, lengths, bufs,
                             &request->rq_reqlen, &request->rq_reqmsg);
        if (rc) {
                CERROR("cannot pack request %d\n", rc);
                OBD_FREE(request, sizeof(*request));
                RETURN(NULL);
        }

        request->rq_level = LUSTRE_CONN_FULL;
        request->rq_type = PTL_RPC_TYPE_REQUEST;
        request->rq_client = cl;
        request->rq_connection = ptlrpc_connection_addref(conn);

        INIT_LIST_HEAD(&request->rq_list);
        INIT_LIST_HEAD(&request->rq_multi);
        /* this will be dec()d once in req_finished, once in free_committed */
        atomic_set(&request->rq_refcount, 2);

        spin_lock(&conn->c_lock);
        request->rq_xid = HTON__u32(++conn->c_xid_out);
        spin_unlock(&conn->c_lock);

        request->rq_reqmsg->magic = PTLRPC_MSG_MAGIC;
        request->rq_reqmsg->version = PTLRPC_MSG_VERSION;
        request->rq_reqmsg->opc = HTON__u32(opcode);
        request->rq_reqmsg->type = HTON__u32(PTL_RPC_MSG_REQUEST);

        RETURN(request);
}

struct ptlrpc_request *ptlrpc_prep_req2(struct ptlrpc_client *cl,
                                        struct ptlrpc_connection *conn,
                                        struct lustre_handle *handle,
                                        int opcode, int count, int *lengths,
                                        char **bufs)
{
        struct ptlrpc_request *req;
        req = ptlrpc_prep_req(cl, conn, opcode, count, lengths, bufs);
        if (req == NULL)
                return NULL;
        ptlrpc_hdl2req(req, handle);
        return req;
}

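/* Release one reference on a request, freeing any reply buffer first; when
 * the last reference goes, ptlrpc_free_req() below tears down the request
 * buffers, unlinks it from its client's lists and drops the connection
 * reference. */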
void ptlrpc_req_finished(struct ptlrpc_request *request)
{
        if (request == NULL)
                return;

        if (request->rq_repmsg != NULL) {
                OBD_FREE(request->rq_repmsg, request->rq_replen);
                request->rq_repmsg = NULL;
                request->rq_reply_md.start = NULL;
        }

        if (atomic_dec_and_test(&request->rq_refcount))
                ptlrpc_free_req(request);
}

void ptlrpc_free_req(struct ptlrpc_request *request)
{
        ENTRY;
        if (request == NULL) {
                EXIT;
                return;
        }

        if (request->rq_repmsg != NULL)
                OBD_FREE(request->rq_repmsg, request->rq_replen);
        if (request->rq_reqmsg != NULL)
                OBD_FREE(request->rq_reqmsg, request->rq_reqlen);

        if (request->rq_client) {
                spin_lock(&request->rq_client->cli_lock);
                list_del_init(&request->rq_list);
                spin_unlock(&request->rq_client->cli_lock);
        }

        ptlrpc_put_connection(request->rq_connection);
        list_del(&request->rq_multi);
        OBD_FREE(request, sizeof(*request));
        EXIT;
}

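/* Wake-up condition for ptlrpc_queue_wait()/ptlrpc_replay_req(): returns 1
 * when a reply has arrived, a resend or restart has been requested, the
 * request has timed out (which also kicks the recovery daemon), or the
 * caller was interrupted; returns 0 to keep waiting. */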
static int ptlrpc_check_reply(struct ptlrpc_request *req)
{
        int rc = 0;

        if (req->rq_repmsg != NULL) {
                req->rq_transno = NTOH__u64(req->rq_repmsg->transno);
                req->rq_flags |= PTL_RPC_FL_REPLIED;
                GOTO(out, rc = 1);
        }

        if (req->rq_flags & PTL_RPC_FL_RESEND) {
                CERROR("-- RESEND --\n");
                GOTO(out, rc = 1);
        }

        if (req->rq_flags & PTL_RPC_FL_RECOVERY) {
                CERROR("-- RESTART --\n");
                GOTO(out, rc = 1);
        }

        if (CURRENT_TIME - req->rq_time >= req->rq_timeout) {
                CERROR("-- REQ TIMEOUT ON CONNID %d XID %Ld --\n",
                       req->rq_connid, (unsigned long long)req->rq_xid);
                /* clear the timeout */
                req->rq_timeout = 0;
                req->rq_connection->c_level = LUSTRE_CONN_RECOVD;
                req->rq_flags |= PTL_RPC_FL_TIMEOUT;
                if (req->rq_client && req->rq_client->cli_recovd)
                        recovd_cli_fail(req->rq_client);
                if (req->rq_level < LUSTRE_CONN_FULL)
                        rc = 1;
                else
                        rc = 0;
                GOTO(out, rc);
        }

        if (req->rq_timeout) {
                schedule_timeout(req->rq_timeout * HZ);
        }

        if (l_killable_pending(current)) {
                req->rq_flags |= PTL_RPC_FL_INTR;
                GOTO(out, rc = 1);
        }

 out:
        CDEBUG(D_NET, "req = %p, rc = %d\n", req, rc);
        return rc;
}

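/* Translate an RPC outcome into an errno: a local send error or a missing
 * reply wins, an error-type reply message becomes -EINVAL, and otherwise the
 * server-supplied status field of the reply is returned as-is. */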
int ptlrpc_check_status(struct ptlrpc_request *req, int err)
{
        ENTRY;

        if (err != 0) {
                CERROR("err is %d\n", err);
                RETURN(err);
        }

        if (req == NULL) {
                CERROR("req == NULL\n");
                RETURN(-ENOMEM);
        }

        if (req->rq_repmsg == NULL) {
                CERROR("req->rq_repmsg == NULL\n");
                RETURN(-ENOMEM);
        }

        if (req->rq_repmsg->type == NTOH__u32(PTL_RPC_MSG_ERR)) {
                CERROR("req->rq_repmsg->type == PTL_RPC_MSG_ERR\n");
                RETURN(-EINVAL);
        }

        if (req->rq_repmsg->status != 0) {
                if (req->rq_repmsg->status < 0)
                        CERROR("req->rq_repmsg->status is %d\n",
                               req->rq_repmsg->status);
                else
                        CDEBUG(D_INFO, "req->rq_repmsg->status is %d\n",
                               req->rq_repmsg->status);
                /* XXX: translate this error from net to host */
                RETURN(req->rq_repmsg->status);
        }

        RETURN(0);
}

static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request)
{
        OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
        request->rq_reqmsg = NULL;
        request->rq_reqlen = 0;
}

/* Abort this request and cleanup any resources associated with it. */
static int ptlrpc_abort(struct ptlrpc_request *request)
{
        /* First remove the ME for the reply; in theory, this means
         * that we can tear down the buffer safely. */
        PtlMEUnlink(request->rq_reply_me_h);
        OBD_FREE(request->rq_reply_md.start, request->rq_replen);
        request->rq_repmsg = NULL;
        request->rq_replen = 0;
        return 0;
}

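/* Walk the client's sending list and retire requests whose transno the
 * server has committed: requests marked for replay are kept, requests at or
 * below cli_last_committed lose their "free_committed" reference and are
 * freed if that was the last one, or parked on the dying list otherwise. */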
/* caller must lock cli */
void ptlrpc_free_committed(struct ptlrpc_client *cli)
{
        struct list_head *tmp, *saved;
        struct ptlrpc_request *req;

        list_for_each_safe(tmp, saved, &cli->cli_sending_head) {
                req = list_entry(tmp, struct ptlrpc_request, rq_list);

                if ( (req->rq_flags & PTL_RPC_FL_REPLAY) ) {
                        CDEBUG(D_INFO, "Retaining request %Ld for replay\n",
                               req->rq_xid);
                        continue;
                }

                /* not yet committed */
                if (req->rq_transno > cli->cli_last_committed)
                        break;

                CDEBUG(D_INFO, "Marking request %Ld as committed ("
                       "transno=%Lu, last_committed=%Lu)\n",
                       req->rq_xid, req->rq_transno,
                       cli->cli_last_committed);
                if (atomic_dec_and_test(&req->rq_refcount)) {
                        /* we do this to prevent free_req deadlock */
                        list_del_init(&req->rq_list);
                        req->rq_client = NULL;
                        ptlrpc_free_req(req);
                } else {
                        list_del_init(&req->rq_list);
                        list_add(&req->rq_list, &cli->cli_dying_head);
                }
        }

        EXIT;
        return;
}

void ptlrpc_cleanup_client(struct ptlrpc_client *cli)
{
        struct list_head *tmp, *saved;
        struct ptlrpc_request *req;
        ENTRY;

        spin_lock(&cli->cli_lock);
        list_for_each_safe(tmp, saved, &cli->cli_sending_head) {
                req = list_entry(tmp, struct ptlrpc_request, rq_list);
                CDEBUG(D_INFO, "Cleaning req %p from sending list.\n", req);
                list_del_init(&req->rq_list);
                req->rq_client = NULL;
                ptlrpc_free_req(req);
        }
        list_for_each_safe(tmp, saved, &cli->cli_dying_head) {
                req = list_entry(tmp, struct ptlrpc_request, rq_list);
                CERROR("Request %p is on the dying list at cleanup!\n", req);
                list_del_init(&req->rq_list);
                req->rq_client = NULL;
                ptlrpc_free_req(req);
        }
        spin_unlock(&cli->cli_lock);

        EXIT;
        return;
}

void ptlrpc_continue_req(struct ptlrpc_request *req)
{
        ENTRY;
        CDEBUG(D_INODE, "continue delayed request %Ld opc %d\n",
               req->rq_xid, req->rq_reqmsg->opc);
        wake_up(&req->rq_wait_for_rep);
        EXIT;
}

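/* Recovery helpers: mark a request for resend (dropping it to recovery
 * level) or for a full restart after it completed, clearing any timeout
 * flag and waking the waiter in ptlrpc_queue_wait(). */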
void ptlrpc_resend_req(struct ptlrpc_request *req)
{
        ENTRY;
        CDEBUG(D_INODE, "resend request %Ld, opc %d\n",
               req->rq_xid, req->rq_reqmsg->opc);
        req->rq_status = -EAGAIN;
        req->rq_level = LUSTRE_CONN_RECOVD;
        req->rq_flags |= PTL_RPC_FL_RESEND;
        req->rq_flags &= ~PTL_RPC_FL_TIMEOUT;
        wake_up(&req->rq_wait_for_rep);
        EXIT;
}

void ptlrpc_restart_req(struct ptlrpc_request *req)
{
        ENTRY;
        CDEBUG(D_INODE, "restart completed request %Ld, opc %d\n",
               req->rq_xid, req->rq_reqmsg->opc);
        req->rq_status = -ERESTARTSYS;
        req->rq_flags |= PTL_RPC_FL_RECOVERY;
        req->rq_flags &= ~PTL_RPC_FL_TIMEOUT;
        wake_up(&req->rq_wait_for_rep);
        EXIT;
}

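/* Synchronous send path: if the request's level exceeds the connection's,
 * park it on the delayed list until recovery catches up; then send it, move
 * it to the sending list, and sleep in ptlrpc_check_reply() until a reply,
 * resend, timeout or signal.  On success the reply is unpacked and
 * last_rcvd/last_committed are used to retire committed requests.  A caller
 * would typically pair this with ptlrpc_prep_req(), ptlrpc_check_status()
 * and ptlrpc_req_finished(). */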
int ptlrpc_queue_wait(struct ptlrpc_request *req)
{
        int rc = 0;
        struct ptlrpc_client *cli = req->rq_client;
        ENTRY;

        init_waitqueue_head(&req->rq_wait_for_rep);
        CDEBUG(D_NET, "subsys: %s req %Ld opc %d level %d, conn level %d\n",
               cli->cli_name, req->rq_xid, req->rq_reqmsg->opc, req->rq_level,
               req->rq_connection->c_level);

        /* XXX probably both an import and connection level are needed */
        if (req->rq_level > req->rq_connection->c_level) {
                CERROR("process %d waiting for recovery (%d > %d)\n",
                       current->pid, req->rq_level, req->rq_connection->c_level);
                spin_lock(&cli->cli_lock);
                list_del_init(&req->rq_list);
                list_add(&req->rq_list, cli->cli_delayed_head.prev);
                spin_unlock(&cli->cli_lock);
                l_wait_event_killable
                        (req->rq_wait_for_rep,
                         req->rq_level <= req->rq_connection->c_level);
                spin_lock(&cli->cli_lock);
                list_del_init(&req->rq_list);
                spin_unlock(&cli->cli_lock);
                CERROR("process %d resumed\n", current->pid);
        }
 resend:
        req->rq_time = CURRENT_TIME;
        req->rq_timeout = 100;
        rc = ptl_send_rpc(req);
        if (rc) {
                CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
                if ( rc > 0 )
                        rc = -rc;
                ptlrpc_cleanup_request_buf(req);
                up(&cli->cli_rpc_sem);
                RETURN(-rc);
        }

        spin_lock(&cli->cli_lock);
        list_del_init(&req->rq_list);
        list_add_tail(&req->rq_list, &cli->cli_sending_head);
        spin_unlock(&cli->cli_lock);

        CDEBUG(D_OTHER, "-- sleeping\n");
        l_wait_event_killable(req->rq_wait_for_rep, ptlrpc_check_reply(req));
        CDEBUG(D_OTHER, "-- done\n");

        if (req->rq_flags & PTL_RPC_FL_RESEND) {
                req->rq_flags &= ~PTL_RPC_FL_RESEND;
                goto resend;
        }

        up(&cli->cli_rpc_sem);
        if (req->rq_flags & PTL_RPC_FL_TIMEOUT)
                GOTO(out, rc = -ETIMEDOUT);

        if (req->rq_flags & PTL_RPC_FL_INTR) {
                /* Clean up the dangling reply buffers */
                ptlrpc_abort(req);
                GOTO(out, rc = -EINTR);
        }

        if (!(req->rq_flags & PTL_RPC_FL_REPLIED))
                GOTO(out, rc = req->rq_status);

        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
        if (rc) {
                CERROR("unpack_rep failed: %d\n", rc);
                GOTO(out, rc);
        }
        CDEBUG(D_NET, "got rep %Ld\n", req->rq_xid);
        if (req->rq_repmsg->status == 0)
                CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg,
                       req->rq_replen, req->rq_repmsg->status);

        spin_lock(&cli->cli_lock);
        cli->cli_last_rcvd = req->rq_repmsg->last_rcvd;
        cli->cli_last_committed = req->rq_repmsg->last_committed;
        ptlrpc_free_committed(cli);
        spin_unlock(&cli->cli_lock);

        EXIT;
 out:
        return rc;
}

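/* Resend a request preserved for replay during recovery: send it again and
 * wait for the reply, LBUG()ing if the server returns a non-zero status,
 * since a failed replay means recovery itself has failed. */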
int ptlrpc_replay_req(struct ptlrpc_request *req)
{
        int rc = 0;
        struct ptlrpc_client *cli = req->rq_client;
        ENTRY;

        init_waitqueue_head(&req->rq_wait_for_rep);
        CDEBUG(D_NET, "req %Ld opc %d level %d, conn level %d\n",
               req->rq_xid, req->rq_reqmsg->opc, req->rq_level,
               req->rq_connection->c_level);

        req->rq_time = CURRENT_TIME;
        req->rq_timeout = 100;
        rc = ptl_send_rpc(req);
        if (rc) {
                CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
                ptlrpc_cleanup_request_buf(req);
                up(&cli->cli_rpc_sem);
                RETURN(-rc);
        }

        CDEBUG(D_OTHER, "-- sleeping\n");
        l_wait_event_killable(req->rq_wait_for_rep, ptlrpc_check_reply(req));
        CDEBUG(D_OTHER, "-- done\n");

        up(&cli->cli_rpc_sem);

        if (!(req->rq_flags & PTL_RPC_FL_REPLIED)) {
                CERROR("Unknown reason for wakeup\n");
                /* XXX Phil - I end up here when I kill obdctl */
                ptlrpc_abort(req);
                GOTO(out, rc = -EINTR);
        }

        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
        if (rc) {
                CERROR("unpack_rep failed: %d\n", rc);
                GOTO(out, rc);
        }

        CDEBUG(D_NET, "got rep %Ld\n", req->rq_xid);
        if (req->rq_repmsg->status == 0)
                CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg,
                       req->rq_replen, req->rq_repmsg->status);
        else {
                CERROR("recovery failed: ");
                CERROR("req %Ld opc %d level %d, conn level %d\n",
                       req->rq_xid, req->rq_reqmsg->opc, req->rq_level,
                       req->rq_connection->c_level);
                LBUG();
        }

 out:
        RETURN(rc);
}