Whamcloud - gitweb
land 0.5.20.3 b_devel onto HEAD (b_devel will remain)
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.sf.net/projects/lustre/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #define EXPORT_SYMTAB
23 #define DEBUG_SUBSYSTEM S_MDC
24
25 #include <linux/module.h>
26 #include <linux/pagemap.h>
27 #include <linux/miscdevice.h>
28 #include <linux/lustre_mds.h>
29 #include <linux/lustre_lite.h>
30 #include <linux/lustre_dlm.h>
31 #include <linux/init.h>
32 #include <linux/lprocfs_status.h>
33
34 #define REQUEST_MINOR 244
35
36 extern int mds_queue_req(struct ptlrpc_request *);
37 struct mdc_rpc_lock mdc_rpc_lock;
38 struct mdc_rpc_lock mdc_setattr_lock;
39 EXPORT_SYMBOL(mdc_rpc_lock);
40
41 /* Helper that implements most of mdc_getstatus and signal_completed_replay. */
42 static int send_getstatus(struct obd_import *imp, struct ll_fid *rootfid,
43                           int level, int msg_flags)
44 {
45         struct ptlrpc_request *req;
46         struct mds_body *body;
47         int rc, size = sizeof(*body);
48         ENTRY;
49
50         req = ptlrpc_prep_req(imp, MDS_GETSTATUS, 1, &size, NULL);
51         if (!req)
52                 GOTO(out, rc = -ENOMEM);
53
54         body = lustre_msg_buf(req->rq_reqmsg, 0);
55         req->rq_level = level;
56         req->rq_replen = lustre_msg_size(1, &size);
57
58         mds_pack_req_body(req);
59         req->rq_reqmsg->flags |= msg_flags;
60         mdc_get_rpc_lock(&mdc_rpc_lock, NULL);
61         rc = ptlrpc_queue_wait(req);
62         mdc_put_rpc_lock(&mdc_rpc_lock, NULL);
63
64         if (!rc) {
65                 body = lustre_msg_buf(req->rq_repmsg, 0);
66                 mds_unpack_body(body);
67                 memcpy(rootfid, &body->fid1, sizeof(*rootfid));
68
69                 CDEBUG(D_NET, "root ino="LPU64", last_committed="LPU64
70                        ", last_xid="LPU64"\n",
71                        rootfid->id, req->rq_repmsg->last_committed,
72                        req->rq_repmsg->last_xid);
73         }
74
75         EXIT;
76  out:
77         ptlrpc_req_finished(req);
78         return rc;
79 }
80
81 /* should become mdc_getinfo() */
82 int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid)
83 {
84         return send_getstatus(class_conn2cliimp(conn), rootfid, LUSTRE_CONN_CON,
85                               0);
86 }
87
88 int mdc_getlovinfo(struct obd_device *obd, struct lustre_handle *mdc_connh,
89                    struct ptlrpc_request **request)
90 {
91         struct ptlrpc_request *req;
92         struct mds_status_req *streq;
93         int rc, size[2] = {sizeof(*streq)};
94         ENTRY;
95
96         req = ptlrpc_prep_req(class_conn2cliimp(mdc_connh), MDS_GETLOVINFO, 1,
97                               size, NULL);
98         if (!req)
99                 GOTO(out, rc = -ENOMEM);
100
101         *request = req;
102         streq = lustre_msg_buf(req->rq_reqmsg, 0);
103         streq->flags = HTON__u32(MDS_STATUS_LOV);
104         streq->repbuf = HTON__u32(8192);
105
106         /* prepare for reply */
107         req->rq_level = LUSTRE_CONN_CON;
108         size[0] = 512;
109         size[1] = 8192;
110         req->rq_replen = lustre_msg_size(2, size);
111         mdc_get_rpc_lock(&mdc_rpc_lock, NULL);
112         rc = ptlrpc_queue_wait(req);
113         mdc_put_rpc_lock(&mdc_rpc_lock, NULL);
114
115  out:
116         RETURN(rc);
117 }
118
119 int mdc_getattr(struct lustre_handle *conn,
120                 obd_id ino, int type, unsigned long valid, unsigned int ea_size,
121                 struct ptlrpc_request **request)
122 {
123         struct ptlrpc_request *req;
124         struct mds_body *body;
125         int rc, size[2] = {sizeof(*body), 0}, bufcount = 1;
126         ENTRY;
127
128         /* XXX do we need to make another request here?  We just did a getattr
129          *     to do the lookup in the first place.
130          */
131         req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_GETATTR, 1, size,
132                               NULL);
133         if (!req)
134                 GOTO(out, rc = -ENOMEM);
135
136         body = lustre_msg_buf(req->rq_reqmsg, 0);
137         ll_ino2fid(&body->fid1, ino, 0, type);
138         body->valid = valid;
139
140         if (ea_size) {
141                 size[bufcount] = ea_size;
142                 bufcount++;
143                 body->size = ea_size;
144                 CDEBUG(D_INODE, "reserved %u bytes for MD/symlink in packet\n",
145                        ea_size);
146         }
147         req->rq_replen = lustre_msg_size(bufcount, size);
148         mds_pack_req_body(req);
149
150         mdc_get_rpc_lock(&mdc_rpc_lock, NULL);
151         rc = ptlrpc_queue_wait(req);
152         mdc_put_rpc_lock(&mdc_rpc_lock, NULL);
153         if (!rc) {
154                 body = lustre_msg_buf(req->rq_repmsg, 0);
155                 mds_unpack_body(body);
156                 CDEBUG(D_NET, "mode: %o\n", body->mode);
157         }
158
159         GOTO(out, rc);
160  out:
161         *request = req;
162         return rc;
163 }
164
165 int mdc_getattr_name(struct lustre_handle *conn, struct inode *parent,
166                      char *filename, int namelen, unsigned long valid,
167                      unsigned int ea_size, struct ptlrpc_request **request)
168 {
169         struct ptlrpc_request *req;
170         struct mds_body *body;
171         int rc, size[2] = {sizeof(*body), namelen}, bufcount = 1;
172         ENTRY;
173
174         req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_GETATTR_NAME, 2,
175                               size, NULL);
176         if (!req)
177                 GOTO(out, rc = -ENOMEM);
178
179         body = lustre_msg_buf(req->rq_reqmsg, 0);
180         ll_inode2fid(&body->fid1, parent);
181         body->valid = valid;
182         memcpy(lustre_msg_buf(req->rq_reqmsg, 1), filename, namelen);
183
184         if (ea_size) {
185                 size[1] = ea_size;
186                 bufcount++;
187                 body->size = ea_size;
188                 CDEBUG(D_INODE, "reserved %u bytes for MD/symlink in packet\n",
189                        ea_size);
190                 valid |= OBD_MD_FLEASIZE;
191         }
192
193         req->rq_replen = lustre_msg_size(bufcount, size);
194         mds_pack_req_body(req);
195
196         mdc_get_rpc_lock(&mdc_rpc_lock, NULL);
197         rc = ptlrpc_queue_wait(req);
198         mdc_put_rpc_lock(&mdc_rpc_lock, NULL);
199         if (!rc) {
200                 body = lustre_msg_buf(req->rq_repmsg, 0);
201                 mds_unpack_body(body);
202         }
203
204         EXIT;
205  out:
206         *request = req;
207         return rc;
208 }
209
210 /* This should be called with both the request and the reply still packed. */
211 void mdc_store_inode_generation(struct ptlrpc_request *req, int reqoff,
212                                 int repoff)
213 {
214         struct mds_rec_create *rec = lustre_msg_buf(req->rq_reqmsg, reqoff);
215         struct mds_body *body = lustre_msg_buf(req->rq_repmsg, repoff);
216
217         memcpy(&rec->cr_replayfid, &body->fid1, sizeof rec->cr_replayfid);
218         DEBUG_REQ(D_HA, req, "storing generation %x for ino "LPD64,
219                   rec->cr_replayfid.generation, rec->cr_replayfid.id);
220 }
221
222 static int mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
223                             void *data, int flag)
224 {
225         int rc;
226         struct lustre_handle lockh;
227         ENTRY;
228
229
230         switch (flag) {
231         case LDLM_CB_BLOCKING:
232                 ldlm_lock2handle(lock, &lockh);
233                 rc = ldlm_cli_cancel(&lockh);
234                 if (rc < 0) {
235                         CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
236                         RETURN(rc);
237                 }
238                 break;
239         case LDLM_CB_CANCELING: {
240                 /* Invalidate all dentries associated with this inode */
241                 struct inode *inode = lock->l_data;
242
243                 LASSERT(data != NULL);
244
245                 /* XXX what tells us that 'data' is a valid inode at all?
246                  *     we should probably validate the lock handle first?
247                  */
248                 inode = igrab(inode);
249
250                 if (inode == NULL) /* inode->i_state & I_FREEING */
251                         break;
252
253                 if (S_ISDIR(inode->i_mode)) {
254                         CDEBUG(D_INODE, "invalidating inode %lu\n",
255                                inode->i_ino);
256
257                         ll_invalidate_inode_pages(inode);
258                 }
259
260                 if (inode->i_sb->s_root && 
261                     inode != inode->i_sb->s_root->d_inode)
262                         d_unhash_aliases(inode);
263
264                 iput(inode);
265                 break;
266         }
267         default:
268                 LBUG();
269         }
270
271         RETURN(0);
272 }
273
274 /* We always reserve enough space in the reply packet for a stripe MD, because
275  * we don't know in advance the file type.
276  *
277  * XXX we could get that from ext2_dir_entry_2 file_type
278  */
279 int mdc_enqueue(struct lustre_handle *conn, int lock_type,
280                 struct lookup_intent *it, int lock_mode, struct inode *dir,
281                 struct dentry *de, struct lustre_handle *lockh,
282                 char *tgt, int tgtlen, void *data, int datalen)
283 {
284         struct ptlrpc_request *req;
285         struct obd_device *obddev = class_conn2obd(conn);
286         struct ldlm_res_id res_id =
287                 { .name = {dir->i_ino, dir->i_generation} };
288         int size[6] = {sizeof(struct ldlm_request), sizeof(struct ldlm_intent)};
289         int rc, flags = LDLM_FL_HAS_INTENT;
290         int repsize[3] = {sizeof(struct ldlm_reply),
291                           sizeof(struct mds_body),
292                           obddev->u.cli.cl_max_mds_easize};
293         struct mdc_unlink_data *d = data;
294         struct ldlm_reply *dlm_rep;
295         struct ldlm_intent *lit;
296         struct ldlm_request *lockreq;
297         ENTRY;
298
299         LDLM_DEBUG_NOLOCK("mdsintent %s parent dir %lu",
300                           ldlm_it2str(it->it_op), dir->i_ino);
301
302         if (it->it_op & IT_OPEN) {
303                 it->it_mode |= S_IFREG;
304                 it->it_mode &= ~current->fs->umask;
305
306                 size[2] = sizeof(struct mds_rec_create);
307                 size[3] = de->d_name.len + 1;
308                 req = ptlrpc_prep_req(class_conn2cliimp(conn), LDLM_ENQUEUE, 4,
309                                       size, NULL);
310                 if (!req)
311                         RETURN(-ENOMEM);
312
313                 req->rq_flags |= PTL_RPC_FL_REPLAY;
314
315                 /* pack the intent */
316                 lit = lustre_msg_buf(req->rq_reqmsg, 1);
317                 lit->opc = NTOH__u64((__u64)it->it_op);
318
319                 /* pack the intended request */
320 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
321                 mds_open_pack(req, 2, dir, it->it_mode, 0, current->fsuid,
322                               current->fsgid, CURRENT_TIME, it->it_flags,
323                               de->d_name.name, de->d_name.len, tgt, tgtlen);
324 #else
325                 mds_open_pack(req, 2, dir, it->it_mode, 0, current->fsuid,
326                               current->fsgid, CURRENT_TIME.tv_sec, it->it_flags,
327                               de->d_name.name, de->d_name.len, tgt, tgtlen);
328 #endif
329                 req->rq_replen = lustre_msg_size(3, repsize);
330         } else if (it->it_op & IT_UNLINK) {
331                 size[2] = sizeof(struct mds_rec_unlink);
332                 size[3] = d->unl_len + 1;
333                 req = ptlrpc_prep_req(class_conn2cliimp(conn), LDLM_ENQUEUE, 4,
334                                       size, NULL);
335                 if (!req)
336                         RETURN(-ENOMEM);
337
338                 /* pack the intent */
339                 lit = lustre_msg_buf(req->rq_reqmsg, 1);
340                 lit->opc = NTOH__u64((__u64)it->it_op);
341
342                 /* pack the intended request */
343                 mds_unlink_pack(req, 2, d->unl_dir, 
344                                 d->unl_de, d->unl_mode,
345                                 d->unl_name, d->unl_len);
346                 req->rq_replen = lustre_msg_size(3, repsize);
347         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
348                 int valid = OBD_MD_FLNOTOBD | OBD_MD_FLEASIZE;
349                 size[2] = sizeof(struct mds_body);
350                 size[3] = de->d_name.len + 1;
351
352                 req = ptlrpc_prep_req(class_conn2cliimp(conn), LDLM_ENQUEUE, 4,
353                                       size, NULL);
354                 if (!req)
355                         RETURN(-ENOMEM);
356
357                 /* pack the intent */
358                 lit = lustre_msg_buf(req->rq_reqmsg, 1);
359                 lit->opc = NTOH__u64((__u64)it->it_op);
360
361                 /* pack the intended request */
362                 mds_getattr_pack(req, valid, 2, it->it_flags,  dir,
363                                  de->d_name.name, de->d_name.len);
364                 /* get ready for the reply */
365                 req->rq_replen = lustre_msg_size(3, repsize);
366         } else if (it->it_op == IT_READDIR) {
367                 req = ptlrpc_prep_req(class_conn2cliimp(conn), LDLM_ENQUEUE, 1,
368                                       size, NULL);
369                 if (!req)
370                         RETURN(-ENOMEM);
371
372                 /* get ready for the reply */
373                 req->rq_replen = lustre_msg_size(1, repsize);
374         }  else {
375                 LBUG();
376                 RETURN(-EINVAL);
377         }
378
379         mdc_get_rpc_lock(&mdc_rpc_lock, it);
380         rc = ldlm_cli_enqueue(conn, req, obddev->obd_namespace, NULL, res_id,
381                               lock_type, NULL, 0, lock_mode, &flags,
382                               ldlm_completion_ast, mdc_blocking_ast, dir, NULL,
383                               lockh);
384         mdc_put_rpc_lock(&mdc_rpc_lock, it);
385
386         /* If we successfully created, mark the request so that replay will
387          * do the right thing */
388         if (req->rq_transno) {
389                 struct mds_rec_create *rec = lustre_msg_buf(req->rq_reqmsg, 2);
390                 rec->cr_opcode |= REINT_REPLAYING;
391         }
392         /* Similarly, if we're going to replay this request, we don't want to
393          * actually get a lock, just perform the intent. */
394         if (req->rq_transno || (req->rq_flags & PTL_RPC_FL_REPLAY)) {
395                 lockreq = lustre_msg_buf(req->rq_reqmsg, 0);
396                 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
397         }
398
399         /* This can go when we're sure that this can never happen */
400         LASSERT(rc != -ENOENT);
401         if (rc == ELDLM_LOCK_ABORTED) {
402                 lock_mode = 0;
403                 memset(lockh, 0, sizeof(*lockh));
404         } else if (rc != 0) {
405                 CERROR("ldlm_cli_enqueue: %d\n", rc);
406                 RETURN(rc);
407         } else { /* rc = 0 */
408                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
409                 struct lustre_handle lockh2;
410                 LASSERT(lock);
411
412                 /* If the server gave us back a different lock mode, we should
413                  * fix up our variables. */
414                 if (lock->l_req_mode != lock_mode) {
415                         ldlm_lock_addref(lockh, lock->l_req_mode);
416                         ldlm_lock_decref(lockh, lock_mode);
417                         lock_mode = lock->l_req_mode;
418                 }
419
420                 /* The server almost certainly gave us a lock other than the
421                  * one that we asked for.  If we already have a matching lock,
422                  * then cancel this one--we don't need two. */
423                 LDLM_DEBUG(lock, "matching against this");
424
425                 memcpy(&lockh2, lockh, sizeof(lockh2));
426                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
427                                     LDLM_PLAIN, NULL, 0, LCK_NL, &lockh2)) {
428                         /* We already have a lock; cancel the new one */
429                         ldlm_lock_decref_and_cancel(lockh, lock_mode);
430                         memcpy(lockh, &lockh2, sizeof(lockh2));
431                 }
432                 LDLM_LOCK_PUT(lock);
433         }
434
435         dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
436         it->it_disposition = (int) dlm_rep->lock_policy_res1;
437         it->it_status = (int) dlm_rep->lock_policy_res2;
438         it->it_lock_mode = lock_mode;
439         it->it_data = req;
440
441         RETURN(rc);
442 }
443
444 void mdc_lock_set_inode(struct lustre_handle *lockh, struct inode *inode)
445 {
446         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
447         ENTRY;
448
449         LASSERT(lock != NULL);
450         lock->l_data = inode;
451         LDLM_LOCK_PUT(lock);
452         EXIT;
453 }
454
455 int mdc_cancel_unused(struct lustre_handle *conn, struct inode *inode,
456                       int flags)
457 {
458         struct ldlm_res_id res_id =
459                 { .name = {inode->i_ino, inode->i_generation} };
460         struct obd_device *obddev = class_conn2obd(conn);
461         ENTRY;
462         RETURN(ldlm_cli_cancel_unused(obddev->obd_namespace, &res_id, flags));
463 }
464
465 static void mdc_replay_open(struct ptlrpc_request *req)
466 {
467         struct lustre_handle old, *file_fh = req->rq_replay_data;
468         struct list_head *tmp;
469         struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 0);
470
471         mds_unpack_body(body);
472         memcpy(&old, file_fh, sizeof(old));
473         CDEBUG(D_HA, "updating from "LPD64"/"LPD64" to "LPD64"/"LPD64"\n",
474                file_fh->addr, file_fh->cookie, body->handle.addr,
475                body->handle.cookie);
476         memcpy(file_fh, &body->handle, sizeof(body->handle));
477
478         /* A few frames up, ptlrpc_replay holds the lock, so this is safe. */
479         list_for_each(tmp, &req->rq_import->imp_sending_list) {
480                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
481                 if (req->rq_reqmsg->opc != MDS_CLOSE)
482                         continue;
483                 body = lustre_msg_buf(req->rq_reqmsg, 0);
484                 if (memcmp(&body->handle, &old, sizeof(old)))
485                         continue;
486
487                 DEBUG_REQ(D_HA, req, "updating close body with new fh");
488                 memcpy(&body->handle, file_fh, sizeof(*file_fh));
489         }
490 }
491
492 void mdc_set_open_replay_data(struct ll_file_data *fd)
493 {
494         struct ptlrpc_request *req = fd->fd_req;
495         struct mds_rec_create *rec = lustre_msg_buf(req->rq_reqmsg, 2);
496         struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1);
497
498         memcpy(&rec->cr_replayfid, &body->fid1, sizeof rec->cr_replayfid);
499         fd->fd_req->rq_replay_cb = mdc_replay_open;
500         fd->fd_req->rq_replay_data = &fd->fd_mdshandle;
501 }
502
503 int mdc_close(struct lustre_handle *conn, obd_id ino, int type,
504               struct lustre_handle *fh, struct ptlrpc_request **request)
505 {
506         struct mds_body *body;
507         int rc, size = sizeof(*body);
508         struct ptlrpc_request *req;
509         ENTRY;
510
511         req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_CLOSE, 1, &size,
512                               NULL);
513         if (!req)
514                 GOTO(out, rc = -ENOMEM);
515
516         body = lustre_msg_buf(req->rq_reqmsg, 0);
517         ll_ino2fid(&body->fid1, ino, 0, type);
518         memcpy(&body->handle, fh, sizeof(body->handle));
519
520         req->rq_replen = lustre_msg_size(0, NULL);
521
522         mdc_get_rpc_lock(&mdc_rpc_lock, NULL);
523         rc = ptlrpc_queue_wait(req);
524         mdc_put_rpc_lock(&mdc_rpc_lock, NULL);
525
526         EXIT;
527  out:
528         *request = req;
529         return rc;
530 }
531
532 int mdc_readpage(struct lustre_handle *conn, obd_id ino, int type, __u64 offset,
533                  char *addr, struct ptlrpc_request **request)
534 {
535         struct obd_import *imp = class_conn2cliimp(conn);
536         struct ptlrpc_connection *connection =
537                 client_conn2cli(conn)->cl_import.imp_connection;
538         struct ptlrpc_request *req = NULL;
539         struct ptlrpc_bulk_desc *desc = NULL;
540         struct ptlrpc_bulk_page *bulk = NULL;
541         struct mds_body *body;
542         int rc, size = sizeof(*body);
543         ENTRY;
544
545         CDEBUG(D_INODE, "inode: %ld\n", (long)ino);
546
547         desc = ptlrpc_prep_bulk(connection);
548         if (desc == NULL)
549                 GOTO(out, rc = -ENOMEM);
550
551         req = ptlrpc_prep_req(imp, MDS_READPAGE, 1, &size, NULL);
552         if (!req)
553                 GOTO(out2, rc = -ENOMEM);
554
555         /* XXX FIXME bug 249 */
556         req->rq_request_portal = MDS_READPAGE_PORTAL;
557
558         bulk = ptlrpc_prep_bulk_page(desc);
559         if (bulk == NULL)
560                 GOTO(out2, rc = -ENOMEM);
561
562         bulk->bp_xid = ptlrpc_next_xid();
563         bulk->bp_buflen = PAGE_CACHE_SIZE;
564         bulk->bp_buf = addr;
565
566         desc->bd_ptl_ev_hdlr = NULL;
567         desc->bd_portal = MDS_BULK_PORTAL;
568
569         rc = ptlrpc_register_bulk_put(desc);
570         if (rc) {
571                 CERROR("couldn't setup bulk sink: error %d.\n", rc);
572                 GOTO(out2, rc);
573         }
574
575         mds_readdir_pack(req, offset, ino, type, bulk->bp_xid);
576
577         req->rq_replen = lustre_msg_size(1, &size);
578         rc = ptlrpc_queue_wait(req);
579         if (rc) {
580                 ptlrpc_abort_bulk(desc);
581                 GOTO(out2, rc);
582         } else {
583                 body = lustre_msg_buf(req->rq_repmsg, 0);
584                 mds_unpack_body(body);
585         }
586
587         EXIT;
588  out2:
589         ptlrpc_bulk_decref(desc);
590  out:
591         *request = req;
592         return rc;
593 }
594
595 static int mdc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
596 {
597         struct ptlrpc_request *req;
598         int rc, size = sizeof(*osfs);
599         ENTRY;
600
601         req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_STATFS, 0, NULL,
602                               NULL);
603         if (!req)
604                 RETURN(-ENOMEM);
605
606         req->rq_replen = lustre_msg_size(1, &size);
607
608         mdc_get_rpc_lock(&mdc_rpc_lock, NULL);
609         rc = ptlrpc_queue_wait(req);
610         mdc_put_rpc_lock(&mdc_rpc_lock, NULL);
611
612         if (rc)
613                 GOTO(out, rc);
614
615         obd_statfs_unpack(osfs, lustre_msg_buf(req->rq_repmsg, 0));
616
617         EXIT;
618 out:
619         ptlrpc_req_finished(req);
620
621         return rc;
622 }
623
624 static int mdc_attach(struct obd_device *dev, obd_count len, void *data)
625 {
626         struct lprocfs_static_vars lvars;
627
628         lprocfs_init_vars(&lvars);
629         return lprocfs_obd_attach(dev, lvars.obd_vars);
630 }
631
/* o_detach method: hands @dev to lprocfs_obd_detach() and returns its
 * result.
 */
static int mdc_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
636
637 /* Send a mostly-dummy GETSTATUS request and indicate that we're done replay. */
638 static int signal_completed_replay(struct obd_import *imp)
639 {
640         struct ll_fid fid;
641
642         return send_getstatus(imp, &fid, LUSTRE_CONN_RECOVD, MSG_LAST_REPLAY);
643 }
644
645 static int mdc_recover(struct obd_import *imp, int phase)
646 {
647         int rc;
648         unsigned long flags;
649         struct ptlrpc_request *req;
650         struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
651         ENTRY;
652
653         switch(phase) {
654             case PTLRPC_RECOVD_PHASE_PREPARE:
655                 ldlm_cli_cancel_unused(ns, NULL, LDLM_FL_LOCAL_ONLY);
656                 RETURN(0);
657
658             case PTLRPC_RECOVD_PHASE_NOTCONN:
659                 ldlm_namespace_cleanup(ns, 1);
660                 ptlrpc_abort_inflight(imp, 0);
661                 /* FALL THROUGH */
662             case PTLRPC_RECOVD_PHASE_RECOVER:
663         reconnect:
664                 rc = ptlrpc_reconnect_import(imp, MDS_CONNECT, &req);
665
666                 flags = req->rq_repmsg
667                         ? lustre_msg_get_op_flags(req->rq_repmsg)
668                         : 0;
669
670                 if (rc == -EBUSY && (flags & MSG_CONNECT_RECOVERING))
671                         CERROR("reconnect denied by recovery; should retry\n");
672
673                 if (rc) {
674                         if (phase != PTLRPC_RECOVD_PHASE_NOTCONN) {
675                                 CERROR("can't reconnect, invalidating\n");
676                                 ldlm_namespace_cleanup(ns, 1);
677                                 ptlrpc_abort_inflight(imp, 0);
678                         }
679                         ptlrpc_req_finished(req);
680                         RETURN(rc);
681                 }
682
683                 if (flags & MSG_CONNECT_RECOVERING) {
684                         /* Replay if they want it. */
685                         DEBUG_REQ(D_HA, req, "MDS wants replay");
686                         rc = ptlrpc_replay(imp);
687                         if (rc)
688                                 GOTO(check_rc, rc);
689
690                         rc = ldlm_replay_locks(imp);
691                         if (rc)
692                                 GOTO(check_rc, rc);
693
694                         rc = signal_completed_replay(imp);
695                         if (rc)
696                                 GOTO(check_rc, rc);
697                 } else if (flags & MSG_CONNECT_RECONNECT) {
698                         DEBUG_REQ(D_HA, req, "reconnecting to MDS");
699                         /* Nothing else to do here. */
700                 } else {
701                         DEBUG_REQ(D_HA, req, "evicted: invalidating");
702                         /* Otherwise, clean everything up. */
703                         ldlm_namespace_cleanup(ns, 1);
704                         ptlrpc_abort_inflight(imp, 0);
705                 }
706
707                 ptlrpc_req_finished(req);
708                 spin_lock_irqsave(&imp->imp_lock, flags);
709                 imp->imp_level = LUSTRE_CONN_FULL;
710                 spin_unlock_irqrestore(&imp->imp_lock, flags);
711
712                 ptlrpc_wake_delayed(imp);
713
714                 rc = ptlrpc_resend(imp);
715                 if (rc)
716                         GOTO(check_rc, rc);
717
718                 RETURN(0);
719         check_rc:
720                 /* If we get disconnected in the middle, recovery has probably
721                  * failed.  Reconnect and find out.
722                  */
723                 if (rc == -ENOTCONN)
724                         goto reconnect;
725                 RETURN(rc);
726
727             default:
728                 RETURN(-EINVAL);
729         }
730 }
731
732 static int mdc_connect(struct lustre_handle *conn, struct obd_device *obd,
733                        struct obd_uuid *cluuid, struct recovd_obd *recovd,
734                        ptlrpc_recovery_cb_t recover)
735 {
736         struct obd_import *imp = &obd->u.cli.cl_import;
737         imp->imp_recover = mdc_recover;
738         return client_obd_connect(conn, obd, cluuid, recovd, recover);
739 }
740
741 struct obd_ops mdc_obd_ops = {
742         o_owner:       THIS_MODULE,
743         o_attach:      mdc_attach,
744         o_detach:      mdc_detach,
745         o_setup:       client_obd_setup,
746         o_cleanup:     client_obd_cleanup,
747         o_connect:     mdc_connect,
748         o_disconnect:  client_obd_disconnect,
749         o_statfs:      mdc_statfs
750 };
751
752 static int __init ptlrpc_request_init(void)
753 {
754         struct lprocfs_static_vars lvars;
755         mdc_init_rpc_lock(&mdc_rpc_lock);
756         mdc_init_rpc_lock(&mdc_setattr_lock);
757         lprocfs_init_vars(&lvars);
758         return class_register_type(&mdc_obd_ops, lvars.module_vars,
759                                    LUSTRE_MDC_NAME);
760 }
761
762 static void __exit ptlrpc_request_exit(void)
763 {
764         class_unregister_type(LUSTRE_MDC_NAME);
765 }
766
767 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
768 MODULE_DESCRIPTION("Lustre Metadata Client");
769 MODULE_LICENSE("GPL");
770
771 EXPORT_SYMBOL(mdc_getstatus);
772 EXPORT_SYMBOL(mdc_getlovinfo);
773 EXPORT_SYMBOL(mdc_enqueue);
774 EXPORT_SYMBOL(mdc_cancel_unused);
775 EXPORT_SYMBOL(mdc_getattr);
776 EXPORT_SYMBOL(mdc_getattr_name);
777 EXPORT_SYMBOL(mdc_create);
778 EXPORT_SYMBOL(mdc_unlink);
779 EXPORT_SYMBOL(mdc_rename);
780 EXPORT_SYMBOL(mdc_link);
781 EXPORT_SYMBOL(mdc_readpage);
782 EXPORT_SYMBOL(mdc_setattr);
783 EXPORT_SYMBOL(mdc_close);
784 EXPORT_SYMBOL(mdc_lock_set_inode);
785 EXPORT_SYMBOL(mdc_set_open_replay_data);
786
787 EXPORT_SYMBOL(mdc_store_inode_generation);
788
789 module_init(ptlrpc_request_init);
790 module_exit(ptlrpc_request_exit);