Whamcloud - gitweb
b135afe0b1d871789d31825e580b1979acb78e9d
[fs/lustre-release.git] / lustre / ptlrpc / client.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2002 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24
25 #include <linux/obd_support.h>
26 #include <linux/lustre_lib.h>
27 #include <linux/lustre_ha.h>
28
29 void ptlrpc_init_client(struct recovd_obd *recovd, 
30                         int (*recover)(struct ptlrpc_client *recover),
31                         int req_portal,
32                         int rep_portal, struct ptlrpc_client *cl)
33 {
34         memset(cl, 0, sizeof(*cl));
35         cl->cli_recovd = recovd;
36         cl->cli_recover = recover;
37         if (recovd)
38                 recovd_cli_manage(recovd, cl);
39         cl->cli_obd = NULL;
40         cl->cli_request_portal = req_portal;
41         cl->cli_reply_portal = rep_portal;
42         INIT_LIST_HEAD(&cl->cli_delayed_head);
43         INIT_LIST_HEAD(&cl->cli_sending_head);
44         INIT_LIST_HEAD(&cl->cli_dying_head);
45         spin_lock_init(&cl->cli_lock);
46         sema_init(&cl->cli_rpc_sem, 32);
47 }
48
49 __u8 *ptlrpc_req_to_uuid(struct ptlrpc_request *req)
50 {
51         return req->rq_connection->c_remote_uuid;
52 }
53
54 struct ptlrpc_connection *ptlrpc_uuid_to_connection(char *uuid)
55 {
56         struct ptlrpc_connection *c;
57         struct lustre_peer peer;
58         int err;
59
60         err = kportal_uuid_to_peer(uuid, &peer);
61         if (err != 0) {
62                 CERROR("cannot find peer %s!\n", uuid);
63                 return NULL;
64         }
65
66         c = ptlrpc_get_connection(&peer);
67         if (c) { 
68                 memcpy(c->c_remote_uuid, uuid, sizeof(c->c_remote_uuid));
69                 c->c_epoch++;
70         }
71
72         return c;
73 }
74
75 void ptlrpc_readdress_connection(struct ptlrpc_connection *conn, char *uuid)
76 {
77         struct lustre_peer peer;
78         int err;
79
80         err = kportal_uuid_to_peer(uuid, &peer);
81         if (err != 0) {
82                 CERROR("cannot find peer %s!\n", uuid);
83                 return;
84         }
85         
86         memcpy(&conn->c_peer, &peer, sizeof(peer)); 
87         return;
88 }
89
90 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *conn)
91 {
92         struct ptlrpc_bulk_desc *desc;
93
94         OBD_ALLOC(desc, sizeof(*desc));
95         if (desc != NULL) {
96                 desc->b_connection = ptlrpc_connection_addref(conn);
97                 atomic_set(&desc->b_pages_remaining, 0);
98                 atomic_set(&desc->b_refcount, 1);
99                 init_waitqueue_head(&desc->b_waitq);
100                 INIT_LIST_HEAD(&desc->b_page_list);
101         }
102
103         return desc;
104 }
105
106 struct ptlrpc_bulk_page *ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc)
107 {
108         struct ptlrpc_bulk_page *bulk;
109
110         OBD_ALLOC(bulk, sizeof(*bulk));
111         if (bulk != NULL) {
112                 bulk->b_desc = desc;
113                 ptl_set_inv_handle(&bulk->b_md_h);
114                 ptl_set_inv_handle(&bulk->b_me_h);
115                 list_add_tail(&bulk->b_link, &desc->b_page_list);
116                 desc->b_page_count++;
117                 atomic_inc(&desc->b_pages_remaining);
118         }
119         return bulk;
120 }
121
122 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
123 {
124         struct list_head *tmp, *next;
125         ENTRY;
126         if (desc == NULL) {
127                 EXIT;
128                 return;
129         }
130
131         list_for_each_safe(tmp, next, &desc->b_page_list) {
132                 struct ptlrpc_bulk_page *bulk;
133                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
134                 ptlrpc_free_bulk_page(bulk);
135         }
136
137         ptlrpc_put_connection(desc->b_connection);
138
139         OBD_FREE(desc, sizeof(*desc));
140         EXIT;
141 }
142
143 void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *bulk)
144 {
145         ENTRY;
146         if (bulk == NULL) {
147                 EXIT;
148                 return;
149         }
150
151         list_del(&bulk->b_link);
152         bulk->b_desc->b_page_count--;
153         OBD_FREE(bulk, sizeof(*bulk));
154         EXIT;
155 }
156
157 struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
158                                        struct ptlrpc_connection *conn,
159                                        int opcode, int count, int *lengths,
160                                        char **bufs)
161 {
162         struct ptlrpc_request *request;
163         int rc;
164         ENTRY;
165
166         OBD_ALLOC(request, sizeof(*request));
167         if (!request) {
168                 CERROR("request allocation out of memory\n");
169                 RETURN(NULL);
170         }
171
172         rc = lustre_pack_msg(count, lengths, bufs,
173                              &request->rq_reqlen, &request->rq_reqmsg);
174         if (rc) {
175                 CERROR("cannot pack request %d\n", rc);
176                 OBD_FREE(request, sizeof(*request));
177                 RETURN(NULL);
178         }
179
180         request->rq_level = LUSTRE_CONN_FULL;
181         request->rq_type = PTL_RPC_TYPE_REQUEST;
182         request->rq_client = cl;
183         request->rq_connection = ptlrpc_connection_addref(conn);
184
185         INIT_LIST_HEAD(&request->rq_list);
186         INIT_LIST_HEAD(&request->rq_multi);
187         /* this will be dec()d once in req_finished, once in free_committed */
188         atomic_set(&request->rq_refcount, 2);
189
190         spin_lock(&conn->c_lock);
191         request->rq_xid = HTON__u32(++conn->c_xid_out);
192         spin_unlock(&conn->c_lock);
193
194         request->rq_reqmsg->magic = PTLRPC_MSG_MAGIC; 
195         request->rq_reqmsg->version = PTLRPC_MSG_VERSION;
196         request->rq_reqmsg->opc = HTON__u32(opcode);
197         request->rq_reqmsg->type = HTON__u32(PTL_RPC_MSG_REQUEST);
198
199         RETURN(request);
200 }
201 struct ptlrpc_request *ptlrpc_prep_req2(struct ptlrpc_client *cl,
202                                         struct ptlrpc_connection *conn,
203                                         struct lustre_handle *handle, 
204                                        int opcode, int count, int *lengths,
205                                        char **bufs)
206 {
207         struct ptlrpc_request *req;
208         req = ptlrpc_prep_req(cl, conn, opcode, count, lengths, bufs);
209         ptlrpc_hdl2req(req, handle);
210         return req;
211 }
212
213 void ptlrpc_req_finished(struct ptlrpc_request *request)
214 {
215         if (request == NULL)
216                 return;
217
218         if (request->rq_repmsg != NULL) { 
219                 OBD_FREE(request->rq_repmsg, request->rq_replen);
220                 request->rq_repmsg = NULL;
221                 request->rq_reply_md.start = NULL; 
222         }
223
224         if (atomic_dec_and_test(&request->rq_refcount))
225                 ptlrpc_free_req(request);
226 }
227
228 void ptlrpc_free_req(struct ptlrpc_request *request)
229 {
230         ENTRY;
231         if (request == NULL) {
232                 EXIT;
233                 return;
234         }
235
236         if (request->rq_repmsg != NULL)
237                 OBD_FREE(request->rq_repmsg, request->rq_replen);
238         if (request->rq_reqmsg != NULL)
239                 OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
240
241         if (request->rq_client) {
242                 spin_lock(&request->rq_client->cli_lock);
243                 list_del_init(&request->rq_list);
244                 spin_unlock(&request->rq_client->cli_lock);
245         }
246
247         ptlrpc_put_connection(request->rq_connection);
248         list_del(&request->rq_multi);
249         OBD_FREE(request, sizeof(*request));
250         EXIT;
251 }
252
253 static int ptlrpc_check_reply(struct ptlrpc_request *req)
254 {
255         int rc = 0;
256
257         if (req->rq_repmsg != NULL) {
258                 req->rq_transno = NTOH__u64(req->rq_repmsg->transno);
259                 req->rq_flags |= PTL_RPC_FL_REPLIED;
260                 GOTO(out, rc = 1);
261         }
262
263         if (req->rq_flags & PTL_RPC_FL_RESEND) { 
264                 if (l_killable_pending(current)) {
265                         CERROR("-- INTR --\n");
266                         req->rq_flags |= PTL_RPC_FL_INTR;
267                         GOTO(out, rc = 1);
268                 }
269                 CERROR("-- RESEND --\n");
270                 GOTO(out, rc = 1);
271         }
272
273         if (req->rq_flags & PTL_RPC_FL_RECOVERY) { 
274                 CERROR("-- RESTART --\n");
275                 GOTO(out, rc = 1);
276         }
277
278         if (req->rq_flags & PTL_RPC_FL_TIMEOUT && l_killable_pending(current)) {
279                 req->rq_flags |= PTL_RPC_FL_INTR;
280                 GOTO(out, rc = 1);
281         }
282
283         if (req->rq_timeout &&
284             (CURRENT_TIME - req->rq_time >= req->rq_timeout)) {
285                 CERROR("-- REQ TIMEOUT ON CONNID %d XID %Ld --\n",
286                        req->rq_connid, (unsigned long long)req->rq_xid);
287                 /* clear the timeout */
288                 req->rq_timeout = 0;
289                 req->rq_connection->c_level = LUSTRE_CONN_RECOVD;
290                 req->rq_flags |= PTL_RPC_FL_TIMEOUT;
291                 if (req->rq_client && req->rq_client->cli_recovd)
292                         recovd_cli_fail(req->rq_client);
293                 if (req->rq_level < LUSTRE_CONN_FULL) {
294                         rc = 1;
295                 } else if (l_killable_pending(current)) {
296                         req->rq_flags |= PTL_RPC_FL_INTR;
297                         rc = 1;
298                 } else {
299                         rc = 0;
300                 }
301                 GOTO(out, rc);
302         }
303
304  out:
305         CDEBUG(D_NET, "req = %p, rc = %d\n", req, rc);
306         return rc;
307 }
308
309 int ptlrpc_check_status(struct ptlrpc_request *req, int err)
310 {
311         ENTRY;
312
313         if (err != 0) {
314                 CERROR("err is %d\n", err);
315                 RETURN(err);
316         }
317
318         if (req == NULL) {
319                 CERROR("req == NULL\n");
320                 RETURN(-ENOMEM);
321         }
322
323         if (req->rq_repmsg == NULL) {
324                 CERROR("req->rq_repmsg == NULL\n");
325                 RETURN(-ENOMEM);
326         }
327
328         if (req->rq_repmsg->type == NTOH__u32(PTL_RPC_MSG_ERR)) {
329                 CERROR("req->rq_repmsg->type == PTL_RPC_MSG_ERR\n");
330                 RETURN(-EINVAL);
331         }
332
333         if (req->rq_repmsg->status != 0) {
334                 if (req->rq_repmsg->status < 0)
335                         CERROR("req->rq_repmsg->status is %d\n",
336                                req->rq_repmsg->status);
337                 else
338                         CDEBUG(D_INFO, "req->rq_repmsg->status is %d\n",
339                                req->rq_repmsg->status);
340                 /* XXX: translate this error from net to host */
341                 RETURN(req->rq_repmsg->status);
342         }
343
344         RETURN(0);
345 }
346
347 static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request)
348 {
349         OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
350         request->rq_reqmsg = NULL;
351         request->rq_reqlen = 0;
352 }
353
354 /* Abort this request and cleanup any resources associated with it. */
355 static int ptlrpc_abort(struct ptlrpc_request *request)
356 {
357         /* First remove the ME for the reply; in theory, this means
358          * that we can tear down the buffer safely. */
359         PtlMEUnlink(request->rq_reply_me_h);
360         OBD_FREE(request->rq_reply_md.start, request->rq_replen);
361         request->rq_repmsg = NULL;
362         request->rq_replen = 0;
363         return 0;
364 }
365
366 /* caller must lock cli */
367 void ptlrpc_free_committed(struct ptlrpc_client *cli)
368 {
369         struct list_head *tmp, *saved;
370         struct ptlrpc_request *req;
371
372         list_for_each_safe(tmp, saved, &cli->cli_sending_head) {
373                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
374
375                 if ( (req->rq_flags & PTL_RPC_FL_REPLAY) ) { 
376                         CDEBUG(D_INFO, "Retaining request %Ld for replay\n",
377                                req->rq_xid);
378                         continue;
379                 }
380
381                 /* not yet committed */
382                 if (req->rq_transno > cli->cli_last_committed)
383                         break;
384
385                 CDEBUG(D_INFO, "Marking request %Ld as committed ("
386                        "transno=%Lu, last_committed=%Lu\n", 
387                        req->rq_xid, req->rq_transno, 
388                        cli->cli_last_committed);
389                 if (atomic_dec_and_test(&req->rq_refcount)) {
390                         /* we do this to prevent free_req deadlock */
391                         list_del_init(&req->rq_list); 
392                         req->rq_client = NULL;
393                         ptlrpc_free_req(req);
394                 } else {
395                         list_del_init(&req->rq_list);
396                         list_add(&req->rq_list, &cli->cli_dying_head);
397                 }
398         }
399
400         EXIT;
401         return;
402 }
403
404 void ptlrpc_cleanup_client(struct ptlrpc_client *cli)
405 {
406         struct list_head *tmp, *saved;
407         struct ptlrpc_request *req;
408         ENTRY;
409
410         spin_lock(&cli->cli_lock);
411         list_for_each_safe(tmp, saved, &cli->cli_sending_head) {
412                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
413                 CDEBUG(D_INFO, "Cleaning req %p from sending list.\n", req);
414                 list_del_init(&req->rq_list);
415                 req->rq_client = NULL;
416                 ptlrpc_free_req(req); 
417         }
418         list_for_each_safe(tmp, saved, &cli->cli_dying_head) {
419                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
420                 CERROR("Request %p is on the dying list at cleanup!\n", req);
421                 list_del_init(&req->rq_list);
422                 req->rq_client = NULL;
423                 ptlrpc_free_req(req); 
424         }
425         spin_unlock(&cli->cli_lock);
426
427         EXIT;
428         return;
429 }
430
431 void ptlrpc_continue_req(struct ptlrpc_request *req)
432 {
433         ENTRY;
434         CDEBUG(D_INODE, "continue delayed request %Ld opc %d\n", 
435                req->rq_xid, req->rq_reqmsg->opc); 
436         wake_up(&req->rq_wait_for_rep); 
437         EXIT;
438 }
439
440 void ptlrpc_resend_req(struct ptlrpc_request *req)
441 {
442         ENTRY;
443         CDEBUG(D_INODE, "resend request %Ld, opc %d\n", 
444                req->rq_xid, req->rq_reqmsg->opc);
445         req->rq_status = -EAGAIN;
446         req->rq_level = LUSTRE_CONN_RECOVD;
447         req->rq_flags |= PTL_RPC_FL_RESEND;
448         req->rq_flags &= ~PTL_RPC_FL_TIMEOUT;
449         wake_up(&req->rq_wait_for_rep);
450         EXIT;
451 }
452
453 void ptlrpc_restart_req(struct ptlrpc_request *req)
454 {
455         ENTRY;
456         CDEBUG(D_INODE, "restart completed request %Ld, opc %d\n", 
457                req->rq_xid, req->rq_reqmsg->opc);
458         req->rq_status = -ERESTARTSYS;
459         req->rq_flags |= PTL_RPC_FL_RECOVERY;
460         req->rq_flags &= ~PTL_RPC_FL_TIMEOUT;
461         wake_up(&req->rq_wait_for_rep);
462         EXIT;
463 }
464
465 int ptlrpc_queue_wait(struct ptlrpc_request *req)
466 {
467         int rc = 0, timeout;
468         struct ptlrpc_client *cli = req->rq_client;
469         ENTRY;
470
471         init_waitqueue_head(&req->rq_wait_for_rep);
472         CDEBUG(D_NET, "subsys: %s req %Ld opc %d level %d, conn level %d\n",
473                cli->cli_name, req->rq_xid, req->rq_reqmsg->opc, req->rq_level,
474                req->rq_connection->c_level);
475
476         /* XXX probably both an import and connection level are needed */
477         if (req->rq_level > req->rq_connection->c_level) { 
478                 CERROR("process %d waiting for recovery (%d > %d)\n", 
479                        current->pid, req->rq_level, req->rq_connection->c_level);
480                 spin_lock(&cli->cli_lock);
481                 list_del_init(&req->rq_list);
482                 list_add(&req->rq_list, cli->cli_delayed_head.prev); 
483                 spin_unlock(&cli->cli_lock);
484                 l_wait_event_killable
485                         (req->rq_wait_for_rep, 
486                          req->rq_level <= req->rq_connection->c_level);
487                 spin_lock(&cli->cli_lock);
488                 list_del_init(&req->rq_list);
489                 spin_unlock(&cli->cli_lock);
490                 CERROR("process %d resumed\n", current->pid);
491         }
492  resend:
493         req->rq_time = CURRENT_TIME;
494         req->rq_timeout = 100;
495         rc = ptl_send_rpc(req);
496         if (rc) {
497                 CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
498                 if ( rc > 0 ) 
499                         rc = -rc;
500                 ptlrpc_cleanup_request_buf(req);
501                 up(&cli->cli_rpc_sem);
502                 RETURN(-rc);
503         }
504
505         spin_lock(&cli->cli_lock);
506         list_del_init(&req->rq_list);
507         list_add_tail(&req->rq_list, &cli->cli_sending_head);
508         spin_unlock(&cli->cli_lock);
509
510         CDEBUG(D_OTHER, "-- sleeping\n");
511         /*
512          * req->rq_timeout gets reset in the timeout case, and
513          * l_wait_event_timeout is a macro, so save the timeout value here.
514          */
515         timeout = req->rq_timeout * HZ;
516         l_wait_event_timeout(req->rq_wait_for_rep, ptlrpc_check_reply(req),
517                              timeout);
518         CDEBUG(D_OTHER, "-- done\n");
519
520         if (req->rq_flags & PTL_RPC_FL_RESEND) {
521                 req->rq_flags &= ~PTL_RPC_FL_RESEND;
522                 goto resend;
523         }
524
525         up(&cli->cli_rpc_sem);
526         if (req->rq_flags & PTL_RPC_FL_INTR) {
527                 /* Clean up the dangling reply buffers */
528                 ptlrpc_abort(req);
529                 GOTO(out, rc = -EINTR);
530         }
531
532         if (req->rq_flags & PTL_RPC_FL_TIMEOUT)
533                 GOTO(out, rc = -ETIMEDOUT);
534
535         if (!(req->rq_flags & PTL_RPC_FL_REPLIED))
536                 GOTO(out, rc = req->rq_status);
537
538         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
539         if (rc) {
540                 CERROR("unpack_rep failed: %d\n", rc);
541                 GOTO(out, rc);
542         }
543         CDEBUG(D_NET, "got rep %Ld\n", req->rq_xid);
544         if (req->rq_repmsg->status == 0)
545                 CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg,
546                        req->rq_replen, req->rq_repmsg->status);
547
548         spin_lock(&cli->cli_lock);
549         cli->cli_last_rcvd = req->rq_repmsg->last_rcvd;
550         cli->cli_last_committed = req->rq_repmsg->last_committed;
551         ptlrpc_free_committed(cli); 
552         spin_unlock(&cli->cli_lock);
553
554         EXIT;
555  out:
556         return rc;
557 }
558
559 int ptlrpc_replay_req(struct ptlrpc_request *req)
560 {
561         int rc = 0;
562         struct ptlrpc_client *cli = req->rq_client;
563         ENTRY;
564
565         init_waitqueue_head(&req->rq_wait_for_rep);
566         CDEBUG(D_NET, "req %Ld opc %d level %d, conn level %d\n",
567                req->rq_xid, req->rq_reqmsg->opc, req->rq_level,
568                req->rq_connection->c_level);
569
570         req->rq_time = CURRENT_TIME;
571         req->rq_timeout = 100;
572         rc = ptl_send_rpc(req);
573         if (rc) {
574                 CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
575                 ptlrpc_cleanup_request_buf(req);
576                 up(&cli->cli_rpc_sem);
577                 RETURN(-rc);
578         }
579
580         CDEBUG(D_OTHER, "-- sleeping\n");
581         l_wait_event_killable(req->rq_wait_for_rep, ptlrpc_check_reply(req));
582         CDEBUG(D_OTHER, "-- done\n");
583
584         up(&cli->cli_rpc_sem);
585
586         if (!(req->rq_flags & PTL_RPC_FL_REPLIED)) {
587                 CERROR("Unknown reason for wakeup\n");
588                 /* XXX Phil - I end up here when I kill obdctl */
589                 ptlrpc_abort(req);
590                 GOTO(out, rc = -EINTR);
591         }
592
593         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
594         if (rc) {
595                 CERROR("unpack_rep failed: %d\n", rc);
596                 GOTO(out, rc);
597         }
598
599         CDEBUG(D_NET, "got rep %Ld\n", req->rq_xid);
600         if (req->rq_repmsg->status == 0)
601                 CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg,
602                        req->rq_replen, req->rq_repmsg->status);
603         else {
604                 CERROR("recovery failed: "); 
605                 CERROR("req %Ld opc %d level %d, conn level %d\n", 
606                        req->rq_xid, req->rq_reqmsg->opc, req->rq_level,
607                        req->rq_connection->c_level);
608                 LBUG();
609         }
610
611  out:
612         RETURN(rc);
613 }