/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (C) 2002 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#define DEBUG_SUBSYSTEM S_RPC

#include <linux/obd_support.h>
#include <linux/lustre_lib.h>
#include <linux/lustre_ha.h>

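/* Initialize a client structure: zero it, record the recovery daemon (if
 * any) and its recovery callback, set the request/reply portals, and set up
 * the request lists, lock and the semaphore limiting RPCs in flight. */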
void ptlrpc_init_client(struct recovd_obd *recovd,
                        int (*recover)(struct ptlrpc_client *recover),
                        int req_portal,
                        int rep_portal, struct ptlrpc_client *cl)
{
        memset(cl, 0, sizeof(*cl));
        cl->cli_recovd = recovd;
        cl->cli_recover = recover;
        if (recovd)
                recovd_cli_manage(recovd, cl);
        cl->cli_obd = NULL;
        cl->cli_request_portal = req_portal;
        cl->cli_reply_portal = rep_portal;
        INIT_LIST_HEAD(&cl->cli_delayed_head);
        INIT_LIST_HEAD(&cl->cli_sending_head);
        INIT_LIST_HEAD(&cl->cli_dying_head);
        spin_lock_init(&cl->cli_lock);
        sema_init(&cl->cli_rpc_sem, 32);
}

__u8 *ptlrpc_req_to_uuid(struct ptlrpc_request *req)
{
        return req->rq_connection->c_remote_uuid;
}

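/* Resolve a UUID to a peer and return a referenced connection to it,
 * recording the remote UUID and bumping the connection epoch. */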
struct ptlrpc_connection *ptlrpc_uuid_to_connection(char *uuid)
{
        struct ptlrpc_connection *c;
        struct lustre_peer peer;
        int err;

        err = kportal_uuid_to_peer(uuid, &peer);
        if (err != 0) {
                CERROR("cannot find peer %s!\n", uuid);
                return NULL;
        }

        c = ptlrpc_get_connection(&peer);
        if (c) {
                memcpy(c->c_remote_uuid, uuid, sizeof(c->c_remote_uuid));
                c->c_epoch++;
        }

        return c;
}

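/* Re-resolve the peer address of an existing connection, e.g. when the
 * remote end has moved; the connection structure itself is left intact. */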
void ptlrpc_readdress_connection(struct ptlrpc_connection *conn, char *uuid)
{
        struct lustre_peer peer;
        int err;

        err = kportal_uuid_to_peer(uuid, &peer);
        if (err != 0) {
                CERROR("cannot find peer %s!\n", uuid);
                return;
        }

        memcpy(&conn->c_peer, &peer, sizeof(peer));
        return;
}

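/* Allocate a bulk descriptor for the given connection and initialize its
 * page list, waitqueue and outstanding-page counter. */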
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *conn)
{
        struct ptlrpc_bulk_desc *bulk;

        OBD_ALLOC(bulk, sizeof(*bulk));
        if (bulk != NULL) {
                bulk->b_connection = ptlrpc_connection_addref(conn);
                atomic_set(&bulk->b_pages_remaining, 0);
                init_waitqueue_head(&bulk->b_waitq);
                INIT_LIST_HEAD(&bulk->b_page_list);
        }

        return bulk;
}

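/* Allocate a bulk page, attach it to the descriptor's page list and update
 * the descriptor's page counts; the MD and ME handles start out invalid. */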
struct ptlrpc_bulk_page *ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc)
{
        struct ptlrpc_bulk_page *bulk;

        OBD_ALLOC(bulk, sizeof(*bulk));
        if (bulk != NULL) {
                bulk->b_desc = desc;
                ptl_set_inv_handle(&bulk->b_md_h);
                ptl_set_inv_handle(&bulk->b_me_h);
                list_add_tail(&bulk->b_link, &desc->b_page_list);
                desc->b_page_count++;
                atomic_inc(&desc->b_pages_remaining);
        }
        return bulk;
}

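/* Free a bulk descriptor: release every page still on its list, drop the
 * connection reference and free the descriptor itself. */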
void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
{
        struct list_head *tmp, *next;
        ENTRY;
        if (desc == NULL) {
                EXIT;
                return;
        }

        list_for_each_safe(tmp, next, &desc->b_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
                ptlrpc_free_bulk_page(bulk);
        }

        ptlrpc_put_connection(desc->b_connection);

        OBD_FREE(desc, sizeof(*desc));
        EXIT;
}

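/* Remove a single bulk page from its descriptor and free it. */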
void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *bulk)
{
        ENTRY;
        if (bulk == NULL) {
                EXIT;
                return;
        }

        list_del(&bulk->b_link);
        bulk->b_desc->b_page_count--;
        OBD_FREE(bulk, sizeof(*bulk));
        EXIT;
}

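/* Allocate and initialize a request: pack the request message from the
 * given buffers, take a reference on the connection, assign the next XID
 * and fill in the message header.  The refcount starts at 2: one reference
 * is dropped in ptlrpc_req_finished(), the other in ptlrpc_free_committed(). */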
struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl,
                                       struct ptlrpc_connection *conn,
                                       int opcode, int count, int *lengths,
                                       char **bufs)
{
        struct ptlrpc_request *request;
        int rc;
        ENTRY;

        OBD_ALLOC(request, sizeof(*request));
        if (!request) {
                CERROR("request allocation out of memory\n");
                RETURN(NULL);
        }

        rc = lustre_pack_msg(count, lengths, bufs,
                             &request->rq_reqlen, &request->rq_reqmsg);
        if (rc) {
                CERROR("cannot pack request %d\n", rc);
                OBD_FREE(request, sizeof(*request));
                RETURN(NULL);
        }

        request->rq_level = LUSTRE_CONN_FULL;
        request->rq_type = PTL_RPC_TYPE_REQUEST;
        request->rq_client = cl;
        request->rq_connection = ptlrpc_connection_addref(conn);

        INIT_LIST_HEAD(&request->rq_list);
        INIT_LIST_HEAD(&request->rq_multi);
        /* this will be dec()d once in req_finished, once in free_committed */
        atomic_set(&request->rq_refcount, 2);

        spin_lock(&conn->c_lock);
        request->rq_xid = HTON__u32(++conn->c_xid_out);
        spin_unlock(&conn->c_lock);

        request->rq_reqmsg->magic = PTLRPC_MSG_MAGIC;
        request->rq_reqmsg->version = PTLRPC_MSG_VERSION;
        request->rq_reqmsg->opc = HTON__u32(opcode);
        request->rq_reqmsg->type = HTON__u32(PTL_RPC_MSG_REQUEST);

        RETURN(request);
}

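/* Same as ptlrpc_prep_req(), but additionally stuff the given lustre_handle
 * into the request so the server can find the object it refers to. */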
struct ptlrpc_request *ptlrpc_prep_req2(struct ptlrpc_client *cl,
                                        struct ptlrpc_connection *conn,
                                        struct lustre_handle *handle,
                                        int opcode, int count, int *lengths,
                                        char **bufs)
{
        struct ptlrpc_request *req;
        req = ptlrpc_prep_req(cl, conn, opcode, count, lengths, bufs);
        if (req == NULL)
                return NULL;
        ptlrpc_hdl2req(req, handle);
        return req;
}

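/* Drop the caller's reference on a request.  The reply buffer is released
 * here; the request itself is freed once the last reference is gone. */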
void ptlrpc_req_finished(struct ptlrpc_request *request)
{
        if (request == NULL)
                return;

        if (request->rq_repmsg != NULL) {
                OBD_FREE(request->rq_repmsg, request->rq_replen);
                request->rq_repmsg = NULL;
                request->rq_reply_md.start = NULL;
        }

        if (atomic_dec_and_test(&request->rq_refcount))
                ptlrpc_free_req(request);
}

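/* Actually free a request: release its request and reply buffers, unlink it
 * from its client's lists and drop the connection reference. */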
void ptlrpc_free_req(struct ptlrpc_request *request)
{
        ENTRY;
        if (request == NULL) {
                EXIT;
                return;
        }

        if (request->rq_repmsg != NULL)
                OBD_FREE(request->rq_repmsg, request->rq_replen);
        if (request->rq_reqmsg != NULL)
                OBD_FREE(request->rq_reqmsg, request->rq_reqlen);

        if (request->rq_client) {
                spin_lock(&request->rq_client->cli_lock);
                list_del_init(&request->rq_list);
                spin_unlock(&request->rq_client->cli_lock);
        }

        ptlrpc_put_connection(request->rq_connection);
        list_del(&request->rq_multi);
        OBD_FREE(request, sizeof(*request));
        EXIT;
}

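/* Wait condition used by ptlrpc_queue_wait() and ptlrpc_replay_req():
 * returns 1 once a reply has arrived, a resend or restart was requested,
 * the request timed out, or the waiting process was interrupted. */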
static int ptlrpc_check_reply(struct ptlrpc_request *req)
{
        int rc = 0;

        if (req->rq_repmsg != NULL) {
                req->rq_transno = NTOH__u64(req->rq_repmsg->transno);
                req->rq_flags |= PTL_RPC_FL_REPLIED;
                GOTO(out, rc = 1);
        }

        if (req->rq_flags & PTL_RPC_FL_RESEND) {
                CERROR("-- RESEND --\n");
                GOTO(out, rc = 1);
        }

        if (req->rq_flags & PTL_RPC_FL_RECOVERY) {
                CERROR("-- RESTART --\n");
                GOTO(out, rc = 1);
        }

        if (CURRENT_TIME - req->rq_time >= req->rq_timeout) {
                CERROR("-- REQ TIMEOUT ON CONNID %d XID %Ld --\n",
                       req->rq_connid, (unsigned long long)req->rq_xid);
                /* clear the timeout */
                req->rq_timeout = 0;
                req->rq_connection->c_level = LUSTRE_CONN_RECOVD;
                req->rq_flags |= PTL_RPC_FL_TIMEOUT;
                if (req->rq_client && req->rq_client->cli_recovd)
                        recovd_cli_fail(req->rq_client);
                if (req->rq_level < LUSTRE_CONN_FULL)
                        rc = 1;
                else
                        rc = 0;
                GOTO(out, rc);
        }

        if (req->rq_timeout) {
                schedule_timeout(req->rq_timeout * HZ);
        }

        if (l_killable_pending(current)) {
                req->rq_flags |= PTL_RPC_FL_INTR;
                GOTO(out, rc = 1);
        }

 out:
        CDEBUG(D_NET, "req = %p, rc = %d\n", req, rc);
        return rc;
}

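/* Sanity-check a completed RPC: report a transport error, a missing reply,
 * an error message type, or a non-zero status returned by the server. */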
int ptlrpc_check_status(struct ptlrpc_request *req, int err)
{
        ENTRY;

        if (err != 0) {
                CERROR("err is %d\n", err);
                RETURN(err);
        }

        if (req == NULL) {
                CERROR("req == NULL\n");
                RETURN(-ENOMEM);
        }

        if (req->rq_repmsg == NULL) {
                CERROR("req->rq_repmsg == NULL\n");
                RETURN(-ENOMEM);
        }

        if (req->rq_repmsg->type == NTOH__u32(PTL_RPC_MSG_ERR)) {
                CERROR("req->rq_repmsg->type == PTL_RPC_MSG_ERR\n");
                RETURN(-EINVAL);
        }

        if (req->rq_repmsg->status != 0) {
                if (req->rq_repmsg->status < 0)
                        CERROR("req->rq_repmsg->status is %d\n",
                               req->rq_repmsg->status);
                else
                        CDEBUG(D_INFO, "req->rq_repmsg->status is %d\n",
                               req->rq_repmsg->status);
                /* XXX: translate this error from net to host */
                RETURN(req->rq_repmsg->status);
        }

        RETURN(0);
}

static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request)
{
        OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
        request->rq_reqmsg = NULL;
        request->rq_reqlen = 0;
}

/* Abort this request and cleanup any resources associated with it. */
static int ptlrpc_abort(struct ptlrpc_request *request)
{
        /* First remove the ME for the reply; in theory, this means
         * that we can tear down the buffer safely. */
        PtlMEUnlink(request->rq_reply_me_h);
        OBD_FREE(request->rq_reply_md.start, request->rq_replen);
        request->rq_repmsg = NULL;
        request->rq_replen = 0;
        return 0;
}

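/* Walk the sending list and drop the extra reference on every request whose
 * transaction number has been committed on the server; requests marked for
 * replay are retained.  Requests still referenced elsewhere are moved to the
 * dying list until their last reference is dropped. */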
/* caller must lock cli */
void ptlrpc_free_committed(struct ptlrpc_client *cli)
{
        struct list_head *tmp, *saved;
        struct ptlrpc_request *req;

        list_for_each_safe(tmp, saved, &cli->cli_sending_head) {
                req = list_entry(tmp, struct ptlrpc_request, rq_list);

                if (req->rq_flags & PTL_RPC_FL_REPLAY) {
                        CDEBUG(D_INFO, "Retaining request %Ld for replay\n",
                               req->rq_xid);
                        continue;
                }

                /* not yet committed */
                if (req->rq_transno > cli->cli_last_committed)
                        break;

                CDEBUG(D_INFO, "Marking request %Ld as committed ("
                       "transno=%Lu, last_committed=%Lu)\n",
                       req->rq_xid, req->rq_transno,
                       cli->cli_last_committed);
                if (atomic_dec_and_test(&req->rq_refcount)) {
                        /* we do this to prevent free_req deadlock */
                        list_del_init(&req->rq_list);
                        req->rq_client = NULL;
                        ptlrpc_free_req(req);
                } else {
                        list_del_init(&req->rq_list);
                        list_add(&req->rq_list, &cli->cli_dying_head);
                }
        }

        EXIT;
        return;
}

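/* Tear down all of a client's outstanding requests, both those still on the
 * sending list and any stragglers left on the dying list. */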
void ptlrpc_cleanup_client(struct ptlrpc_client *cli)
{
        struct list_head *tmp, *saved;
        struct ptlrpc_request *req;
        ENTRY;

        spin_lock(&cli->cli_lock);
        list_for_each_safe(tmp, saved, &cli->cli_sending_head) {
                req = list_entry(tmp, struct ptlrpc_request, rq_list);
                CDEBUG(D_INFO, "Cleaning req %p from sending list.\n", req);
                list_del_init(&req->rq_list);
                req->rq_client = NULL;
                ptlrpc_free_req(req);
        }
        list_for_each_safe(tmp, saved, &cli->cli_dying_head) {
                req = list_entry(tmp, struct ptlrpc_request, rq_list);
                CERROR("Request %p is on the dying list at cleanup!\n", req);
                list_del_init(&req->rq_list);
                req->rq_client = NULL;
                ptlrpc_free_req(req);
        }
        spin_unlock(&cli->cli_lock);

        EXIT;
        return;
}

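/* Wake a request that was delayed waiting for recovery so it can proceed. */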
void ptlrpc_continue_req(struct ptlrpc_request *req)
{
        ENTRY;
        CDEBUG(D_INODE, "continue delayed request %Ld opc %d\n",
               req->rq_xid, req->rq_reqmsg->opc);
        wake_up(&req->rq_wait_for_rep);
        EXIT;
}

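/* Flag a request for resend and wake its waiter; ptlrpc_queue_wait() will
 * notice the flag and send the request again. */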
void ptlrpc_resend_req(struct ptlrpc_request *req)
{
        ENTRY;
        CDEBUG(D_INODE, "resend request %Ld, opc %d\n",
               req->rq_xid, req->rq_reqmsg->opc);
        req->rq_status = -EAGAIN;
        req->rq_level = LUSTRE_CONN_RECOVD;
        req->rq_flags |= PTL_RPC_FL_RESEND;
        req->rq_flags &= ~PTL_RPC_FL_TIMEOUT;
        wake_up(&req->rq_wait_for_rep);
        EXIT;
}

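/* Flag a completed request for restart from scratch and wake its waiter. */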
void ptlrpc_restart_req(struct ptlrpc_request *req)
{
        ENTRY;
        CDEBUG(D_INODE, "restart completed request %Ld, opc %d\n",
               req->rq_xid, req->rq_reqmsg->opc);
        req->rq_status = -ERESTARTSYS;
        req->rq_flags |= PTL_RPC_FL_RECOVERY;
        req->rq_flags &= ~PTL_RPC_FL_TIMEOUT;
        wake_up(&req->rq_wait_for_rep);
        EXIT;
}

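/* Send a request and wait for the reply.  If the connection is still in
 * recovery the request is parked on the delayed list first.  On success the
 * reply is unpacked and the client's last_rcvd/last_committed numbers are
 * updated, which may allow committed requests to be freed. */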
int ptlrpc_queue_wait(struct ptlrpc_request *req)
{
        int rc = 0;
        struct ptlrpc_client *cli = req->rq_client;
        ENTRY;

        init_waitqueue_head(&req->rq_wait_for_rep);
        CDEBUG(D_NET, "subsys: %s req %Ld opc %d level %d, conn level %d\n",
               cli->cli_name, req->rq_xid, req->rq_reqmsg->opc, req->rq_level,
               req->rq_connection->c_level);

        /* XXX probably both an import and connection level are needed */
        if (req->rq_level > req->rq_connection->c_level) {
                CERROR("process %d waiting for recovery (%d > %d)\n",
                       current->pid, req->rq_level, req->rq_connection->c_level);
                spin_lock(&cli->cli_lock);
                list_del_init(&req->rq_list);
                list_add(&req->rq_list, cli->cli_delayed_head.prev);
                spin_unlock(&cli->cli_lock);
                l_wait_event_killable
                        (req->rq_wait_for_rep,
                         req->rq_level <= req->rq_connection->c_level);
                spin_lock(&cli->cli_lock);
                list_del_init(&req->rq_list);
                spin_unlock(&cli->cli_lock);
                CERROR("process %d resumed\n", current->pid);
        }
 resend:
        req->rq_time = CURRENT_TIME;
        req->rq_timeout = 100;
        rc = ptl_send_rpc(req);
        if (rc) {
                CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
                if (rc > 0)
                        rc = -rc;
                ptlrpc_cleanup_request_buf(req);
                up(&cli->cli_rpc_sem);
                /* return a negative errno to the caller */
                RETURN(rc);
        }

        spin_lock(&cli->cli_lock);
        list_del_init(&req->rq_list);
        list_add_tail(&req->rq_list, &cli->cli_sending_head);
        spin_unlock(&cli->cli_lock);

        CDEBUG(D_OTHER, "-- sleeping\n");
        l_wait_event_killable(req->rq_wait_for_rep, ptlrpc_check_reply(req));
        CDEBUG(D_OTHER, "-- done\n");

        if (req->rq_flags & PTL_RPC_FL_RESEND) {
                req->rq_flags &= ~PTL_RPC_FL_RESEND;
                goto resend;
        }

        up(&cli->cli_rpc_sem);
        if (req->rq_flags & PTL_RPC_FL_TIMEOUT)
                GOTO(out, rc = -ETIMEDOUT);

        if (req->rq_flags & PTL_RPC_FL_INTR) {
                /* Clean up the dangling reply buffers */
                ptlrpc_abort(req);
                GOTO(out, rc = -EINTR);
        }

        if (!(req->rq_flags & PTL_RPC_FL_REPLIED))
                GOTO(out, rc = req->rq_status);

        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
        if (rc) {
                CERROR("unpack_rep failed: %d\n", rc);
                GOTO(out, rc);
        }
        CDEBUG(D_NET, "got rep %Ld\n", req->rq_xid);
        if (req->rq_repmsg->status == 0)
                CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg,
                       req->rq_replen, req->rq_repmsg->status);

        spin_lock(&cli->cli_lock);
        cli->cli_last_rcvd = req->rq_repmsg->last_rcvd;
        cli->cli_last_committed = req->rq_repmsg->last_committed;
        ptlrpc_free_committed(cli);
        spin_unlock(&cli->cli_lock);

        EXIT;
 out:
        return rc;
}

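/* Resend a preserved request during recovery and wait for its reply; a reply
 * carrying a non-zero status triggers an LBUG(), since replay must not fail. */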
int ptlrpc_replay_req(struct ptlrpc_request *req)
{
        int rc = 0;
        struct ptlrpc_client *cli = req->rq_client;
        ENTRY;

        init_waitqueue_head(&req->rq_wait_for_rep);
        CDEBUG(D_NET, "req %Ld opc %d level %d, conn level %d\n",
               req->rq_xid, req->rq_reqmsg->opc, req->rq_level,
               req->rq_connection->c_level);

        req->rq_time = CURRENT_TIME;
        req->rq_timeout = 100;
        rc = ptl_send_rpc(req);
        if (rc) {
                CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
                ptlrpc_cleanup_request_buf(req);
                up(&cli->cli_rpc_sem);
                RETURN(-rc);
        }

        CDEBUG(D_OTHER, "-- sleeping\n");
        l_wait_event_killable(req->rq_wait_for_rep, ptlrpc_check_reply(req));
        CDEBUG(D_OTHER, "-- done\n");

        up(&cli->cli_rpc_sem);

        if (!(req->rq_flags & PTL_RPC_FL_REPLIED)) {
                CERROR("Unknown reason for wakeup\n");
                /* XXX Phil - I end up here when I kill obdctl */
                ptlrpc_abort(req);
                GOTO(out, rc = -EINTR);
        }

        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
        if (rc) {
                CERROR("unpack_rep failed: %d\n", rc);
                GOTO(out, rc);
        }

        CDEBUG(D_NET, "got rep %Ld\n", req->rq_xid);
        if (req->rq_repmsg->status == 0)
                CDEBUG(D_NET, "--> buf %p len %d status %d\n", req->rq_repmsg,
                       req->rq_replen, req->rq_repmsg->status);
        else {
                CERROR("recovery failed: ");
                CERROR("req %Ld opc %d level %d, conn level %d\n",
                       req->rq_xid, req->rq_reqmsg->opc, req->rq_level,
                       req->rq_connection->c_level);
                LBUG();
        }

 out:
        RETURN(rc);
}