1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 # define EXPORT_SYMTAB
25 #define DEBUG_SUBSYSTEM S_LMV
27 #include <linux/slab.h>
28 #include <linux/module.h>
29 #include <linux/init.h>
30 #include <linux/slab.h>
31 #include <linux/pagemap.h>
32 #include <asm/div64.h>
34 #include <liblustre.h>
37 #include <linux/obd_support.h>
38 #include <linux/lustre_lib.h>
39 #include <linux/lustre_net.h>
40 #include <linux/lustre_idl.h>
41 #include <linux/lustre_dlm.h>
42 #include <linux/lustre_mds.h>
43 #include <linux/obd_class.h>
44 #include <linux/obd_ost.h>
45 #include <linux/seq_file.h>
46 #include <linux/lprocfs_status.h>
47 #include <linux/lustre_fsfilt.h>
48 #include <linux/obd_lmv.h>
49 #include "lmv_internal.h"
/*
 * Handle a reply in which the MDS flags the inode as remote
 * (OBD_MD_MDS set in the reply body), i.e. the real attributes have to
 * be requested from another MDS.
 *
 * The reply body is taken from buffer 1 of (*reqp)->rq_repmsg and is
 * asserted to be non-NULL.  In the remote case a second
 * md_intent_lock() is sent to lmv->tgts[nfid.mds].exp; an IT_LOOKUP
 * intent is switched to IT_GETATTR for it.  The lock handle and mode
 * already stored in @it are saved in plock/pmode beforehand.  Afterwards
 * the code either drops the newly obtained lock and restores the saved
 * handle/mode into @it, or drops the saved lock.  The original *reqp is
 * finished with ptlrpc_req_finished().
 *
 * NOTE(review): presumably *reqp is then pointed at the new request and
 * the md_intent_lock() result is returned -- confirm.
 */
52 int lmv_handle_remote_inode(struct obd_export *exp, struct ll_uctxt *uctxt,
53 void *lmm, int lmmsize,
54 struct lookup_intent *it, int flags,
55 struct ptlrpc_request **reqp,
56 ldlm_blocking_callback cb_blocking)
58 struct obd_device *obd = exp->exp_obd;
59 struct lmv_obd *lmv = &obd->u.lmv;
60 struct mds_body *body = NULL;
/* the mds_body is read from buffer 1 of the reply message */
64 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
65 LASSERT(body != NULL);
67 if (body->valid & OBD_MD_MDS) {
68 /* oh, MDS reports that this is remote inode case
69 * i.e. we have to ask for real attrs on another MDS */
70 struct ptlrpc_request *req;
72 struct lustre_handle plock;
75 if (it->it_op == IT_LOOKUP) {
76 /* unfortunately, we have to lie to MDC/MDS to
77 * retrieve attributes llite needs */
78 it->it_op = IT_GETATTR;
81 /* we got LOOKUP lock, but we really need attrs */
82 pmode = it->d.lustre.it_lock_mode;
84 memcpy(&plock, &it->d.lustre.it_lock_handle,
86 it->d.lustre.it_lock_mode = 0;
/* clear DISP_ENQ_COMPLETE before the intent is reused for the
 * request to the other MDS */
90 it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
91 rc = md_intent_lock(lmv->tgts[nfid.mds].exp, uctxt, &nfid,
92 NULL, 0, lmm, lmmsize, NULL, it, flags,
95 /* llite needs LOOKUP lock to track dentry revocation in
96 * order to maintain dcache consistency. thus drop UPDATE
97 * lock here and put LOOKUP in request */
99 LASSERT(it->d.lustre.it_lock_mode != 0);
100 ldlm_lock_decref((void *)&it->d.lustre.it_lock_handle,
101 it->d.lustre.it_lock_mode);
102 memcpy(&it->d.lustre.it_lock_handle, &plock,
104 it->d.lustre.it_lock_mode = pmode;
/* drop the lock that was saved in plock/pmode */
107 ldlm_lock_decref(&plock, pmode);
/* the first request is no longer needed */
109 ptlrpc_req_finished(*reqp);
/*
 * IT_OPEN intent handler.
 *
 * When an lmv object exists for the parent fid, the target MDS index is
 * derived from the name with raw_name2idx() and the request is sent
 * with the matching slave fid (obj->objs[mds].fid) instead of @pfid.
 * If md_intent_lock() returns -ERESTART the local object is refreshed
 * with lmv_get_mea_and_update_object() and the reply is finished.
 * NOTE(review): presumably the request is then repeated -- confirm.
 *
 * On success the reply goes through lmv_handle_remote_inode().  If the
 * child has no lmv object yet but the reply describes a split
 * directory, the object is created from the reply attributes; for a
 * split directory lmv_revalidate_slaves() is then called for @cfid.
 */
115 int lmv_intent_open(struct obd_export *exp, struct ll_uctxt *uctxt,
116 struct ll_fid *pfid, const char *name, int len,
117 void *lmm, int lmmsize, struct ll_fid *cfid,
118 struct lookup_intent *it, int flags,
119 struct ptlrpc_request **reqp,
120 ldlm_blocking_callback cb_blocking)
122 struct obd_device *obd = exp->exp_obd;
123 struct lmv_obd *lmv = &obd->u.lmv;
124 struct mds_body *body = NULL;
/* rpfid starts as a copy of the parent fid and may be replaced by a
 * slave fid below */
125 struct ll_fid rpfid = *pfid;
131 /* IT_OPEN is intended to open (and create, possible) an object.
132 * parent (pfid) may be splitted dir */
136 obj = lmv_grab_obj(obd, &rpfid, 0);
138 /* directory is already splitted, so we have to forward
139 * request to the right MDS */
140 mds = raw_name2idx(obj->objcount, name, len);
141 rpfid = obj->objs[mds].fid;
142 CDEBUG(D_OTHER, "forward to MDS #%u\n", mds);
145 rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name,
146 len, lmm, lmmsize, cfid, it, flags, reqp,
149 if (rc == -ERESTART) {
150 /* directory got splitted. time to update local object
151 * and repeat the request with proper MDS */
152 LASSERT(fid_equal(pfid, &rpfid));
153 rc = lmv_get_mea_and_update_object(exp, &rpfid);
155 ptlrpc_req_finished(*reqp);
162 /* okay, MDS has returned success. probably name has been
163 * resolved in remote inode */
164 rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags,
171 /* caller may use attrs MDS returns on IT_OPEN lock request
172 * so, we have to update them for splitted dir */
173 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
174 LASSERT(body != NULL);
/* look for an lmv object of the child; if there is none and the reply
 * carries a mea, create the object from the reply attributes */
176 obj = lmv_grab_obj(obd, cfid, 0);
177 if (rc == 0 && !obj && (mea = is_body_of_splitted_dir(*reqp, 1))) {
178 /* wow! this is splitted dir, we'd like to handle it */
179 rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea);
181 obj = lmv_grab_obj(obd, cfid, 0);
183 /* this is splitted dir and we'd want to get attrs */
184 CDEBUG(D_OTHER, "attrs from slaves for %lu/%lu/%lu\n",
185 (unsigned long) cfid->mds,
186 (unsigned long) cfid->id,
187 (unsigned long) cfid->generation);
188 rc = lmv_revalidate_slaves(exp, reqp, cfid,
190 } else if (S_ISDIR(body->mode)) {
191 /*CWARN("hmmm, %lu/%lu/%lu has not lmv obj?!\n",
192 (unsigned long) cfid->mds,
193 (unsigned long) cfid->id,
194 (unsigned long) cfid->generation);*/
/*
 * IT_GETATTR intent handler.  Two paths are visible below:
 *
 *  - attribute revalidation of @cfid: the intent is sent with @cfid as
 *    child fid, and if an lmv object exists for @cfid and rc >= 0 the
 *    slaves are revalidated through lmv_revalidate_slaves(), passing rc
 *    on as its fifth argument;
 *
 *  - getattr by name under @pfid: for a split parent the MDS index is
 *    computed with raw_name2idx() and the slave fid is used; the reply
 *    goes through lmv_handle_remote_inode(), and if the child is a
 *    split directory its lmv object is created when missing and the
 *    slaves are revalidated.
 *
 * NOTE(review): the condition choosing between the two paths is
 * presumably whether @cfid is set -- confirm.
 * NOTE(review): "%*s" in the CDEBUG below uses len as a minimum field
 * width only; if name may lack a terminating NUL, "%.*s" would be
 * needed -- confirm.
 */
200 int lmv_intent_getattr(struct obd_export *exp, struct ll_uctxt *uctxt,
201 struct ll_fid *pfid, const char *name, int len,
202 void *lmm, int lmmsize, struct ll_fid *cfid,
203 struct lookup_intent *it, int flags,
204 struct ptlrpc_request **reqp,
205 ldlm_blocking_callback cb_blocking)
207 struct obd_device *obd = exp->exp_obd;
208 struct lmv_obd *lmv = &obd->u.lmv;
209 struct mds_body *body = NULL;
210 struct ll_fid rpfid = *pfid;
/* obj is used for the fid being revalidated and for the parent, obj2
 * for the child on the by-name path */
211 struct lmv_obj *obj, *obj2;
217 /* caller wants to revalidate attrs of obj
218 * we have to revalidate slaves if requested
219 * object is splitted directory */
220 CDEBUG(D_OTHER, "revalidate attrs for %lu/%lu/%lu\n",
221 (unsigned long) cfid->mds,
222 (unsigned long) cfid->id,
223 (unsigned long) cfid->generation);
225 obj = lmv_grab_obj(obd, cfid, 0);
227 /* in fact, we need not this with current
228 * _intent_lock(), but it may change some day */
229 rpfid = obj->objs[mds].fid;
231 rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name,
232 len, lmm, lmmsize, cfid, it, flags, reqp,
234 if (obj && rc >= 0) {
235 /* this is splitted dir. in order to optimize things
236 * a bit, we consider obj valid updating missing
237 * parts. FIXME: do we need to return any lock here?
238 * it would be fine if we don't. this means that
239 * nobody should use UPDATE lock to notify about
242 "revalidate slaves for %lu/%lu/%lu, rc %d\n",
243 (unsigned long) cfid->mds,
244 (unsigned long) cfid->id,
245 (unsigned long) cfid->generation, rc);
246 rc = lmv_revalidate_slaves(exp, reqp, cfid, it, rc,
252 CDEBUG(D_OTHER, "INTENT getattr for %*s on %lu/%lu/%lu\n",
253 len, name, (unsigned long) pfid->mds,
254 (unsigned long) pfid->id,
255 (unsigned long) pfid->generation);
258 obj = lmv_grab_obj(obd, pfid, 0);
260 /* directory is already splitted. calculate mds */
261 mds = raw_name2idx(obj->objcount, (char *) name, len);
262 rpfid = obj->objs[mds].fid;
263 CDEBUG(D_OTHER, "forward to MDS #%u (slave %lu/%lu/%lu)\n",
264 mds, (unsigned long) rpfid.mds,
265 (unsigned long) rpfid.id,
266 (unsigned long) rpfid.generation);
/* by-name request: no child fid is passed */
268 rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name,
269 len, lmm, lmmsize, NULL, it, flags, reqp,
275 /* okay, MDS has returned success. probably name has been
276 * resolved in remote inode */
277 rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags,
282 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
283 LASSERT(body != NULL);
285 obj2 = lmv_grab_obj(obd, cfid, 0);
287 if (rc == 0 && !obj2 && (mea = is_body_of_splitted_dir(*reqp, 1))) {
288 /* wow! this is splitted dir, we'd like to handle it */
289 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
290 LASSERT(body != NULL);
291 rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea);
292 obj2 = lmv_grab_obj(obd, cfid, 0);
296 /* this is splitted dir and we'd want to get attrs */
298 "attrs from slaves for %lu/%lu/%lu, rc %d\n",
299 (unsigned long) cfid->mds,
300 (unsigned long) cfid->id,
301 (unsigned long) cfid->generation, rc);
302 rc = lmv_revalidate_slaves(exp, reqp, cfid,
/*
 * Accumulate attributes cached for one sub-object into a reply body:
 * the sub-object's size is added to body->size.
 */
308 void lmv_update_body_from_obj(struct mds_body *body, struct lmv_inode *obj)
311 body->size += obj->size;
/*
 * Refresh the slave objects of the split directory identified by fid1
 * in the reply body of *reqp, and accumulate their attributes into that
 * body.
 *
 * The lmv object is looked up by body->fid1.  For each entry of
 * obj->objs[] other than the master (the entry whose fid equals
 * obj->fid) an IT_GETATTR intent is sent to lmv->tgts[fid.mds].exp with
 * lmv_dirobj_blocking_ast as blocking callback: first with the fid also
 * given as child fid and, per the comments below, again with a NULL
 * child fid when no lock was found.  After a fresh reply the lock's
 * l_ast_data is set to obj, obj->count is incremented, and
 * obj->objs[i].size is taken from the reply.  Each entry is then added
 * to the body with lmv_update_body_from_obj(), and the lock reference
 * is dropped when it_lock_mode is non-zero.
 */
319 int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
321 struct obd_device *obd = exp->exp_obd;
322 struct lmv_obd *lmv = &obd->u.lmv;
323 struct mds_body *body = NULL;
324 struct lustre_handle *lockh;
325 struct ldlm_lock *lock;
326 struct mds_body *body2;
327 struct ll_uctxt uctxt;
335 /* master is locked. we'd like to take locks on slaves
336 * and update attributes to be returned from the slaves
337 * it's important that lookup is called in two cases:
338 * - for first time (dcache has no such a resolving yet
339 * - ->d_revalidate() returned false
340 * last case possible only if all the objs (master and
341 * all slaves aren't valid */
343 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
344 LASSERT(body != NULL);
346 obj = lmv_grab_obj(obd, &body->fid1, 0);
349 CDEBUG(D_OTHER, "lookup slaves for %lu/%lu/%lu\n",
350 (unsigned long) body->fid1.mds,
351 (unsigned long) body->fid1.id,
352 (unsigned long) body->fid1.generation);
356 for (i = 0; i < obj->objcount; i++) {
357 struct ll_fid fid = obj->objs[i].fid;
358 struct ptlrpc_request *req = NULL;
359 struct lookup_intent it;
361 if (fid_equal(&fid, &obj->fid)) {
362 /* skip master obj */
366 CDEBUG(D_OTHER, "lookup slave %lu/%lu/%lu\n",
367 (unsigned long) fid.mds,
368 (unsigned long) fid.id,
369 (unsigned long) fid.generation);
/* first attempt: the slave fid is passed as child fid as well */
372 memset(&it, 0, sizeof(it));
373 it.it_op = IT_GETATTR;
374 rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid,
375 NULL, 0, NULL, 0, &fid, &it, 0, &req,
376 lmv_dirobj_blocking_ast);
377 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
379 /* nice, this slave is valid */
380 LASSERT(req == NULL);
381 CDEBUG(D_OTHER, "cached\n");
386 /* error during revalidation */
390 /* rc == 0, this means we have no such a lock and can't
391 * think obj is still valid. lookup it again */
392 LASSERT(req == NULL);
/* second attempt: fresh intent, no child fid */
394 memset(&it, 0, sizeof(it));
395 it.it_op = IT_GETATTR;
396 rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid,
397 NULL, 0, NULL, 0, NULL, &it, 0, &req,
398 lmv_dirobj_blocking_ast);
399 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
402 /* error during lookup */
/* attach obj to the lock as its AST data and bump obj->count */
406 lock = ldlm_handle2lock(lockh);
408 lock->l_ast_data = obj;
409 atomic_inc(&obj->count);
/* cache the size reported in the fresh reply */
411 body2 = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body2));
414 obj->objs[i].size = body2->size;
415 CDEBUG(D_OTHER, "fresh: %lu\n",
416 (unsigned long) obj->objs[i].size);
421 ptlrpc_req_finished(req);
/* fold this entry into the body returned to the caller, then drop the
 * lock reference if one is held */
423 lmv_update_body_from_obj(body, obj->objs + i);
424 if (it.d.lustre.it_lock_mode)
425 ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
/*
 * IT_LOOKUP intent handler: either resolves name -> fid (lookup) or
 * confirms that an existing resolution is still valid (revalidation;
 * per the comment below this is selected by cfid != NULL).
 *
 * In both paths, when an lmv object exists for @pfid the MDS index is
 * computed from the name with raw_name2idx() and rpfid is set to the
 * matching slave fid.
 *
 * Lookup path: the intent is sent with &rpfid and a NULL child fid.  A
 * warning is logged when rc == 0 and no reply came back.  On -ERESTART
 * a warning is logged, obj is asserted to be NULL and an lmv object is
 * created for rpfid with lmv_create_obj_from_attrs().  On success the
 * reply goes through lmv_handle_remote_inode(); if the reply describes
 * a split directory, its lmv object is looked up by body->fid1 and
 * created from the reply attributes.
 */
431 int lmv_intent_lookup(struct obd_export *exp, struct ll_uctxt *uctxt,
432 struct ll_fid *pfid, const char *name, int len,
433 void *lmm, int lmmsize, struct ll_fid *cfid,
434 struct lookup_intent *it, int flags,
435 struct ptlrpc_request **reqp,
436 ldlm_blocking_callback cb_blocking)
438 struct obd_device *obd = exp->exp_obd;
439 struct lmv_obd *lmv = &obd->u.lmv;
440 struct mds_body *body = NULL;
441 struct ll_fid rpfid = *pfid;
447 /* IT_LOOKUP is intended to produce name -> fid resolving
448 * (let's call this lookup below) or to confirm requested
449 * resolving is still valid (let's call this revalidation)
450 * cfid != NULL specifies revalidation */
453 /* this is revalidation: we have to check is LOOKUP
454 * lock still valid for given fid. very important
455 * part is that we have to choose right mds because
456 * namespace is per mds */
458 obj = lmv_grab_obj(obd, pfid, 0);
460 mds = raw_name2idx(obj->objcount, (char *) name, len);
461 rpfid = obj->objs[mds].fid;
465 CDEBUG(D_OTHER, "revalidate lookup for %lu/%lu/%lu to %d MDS\n",
466 (unsigned long) cfid->mds,
467 (unsigned long) cfid->id,
468 (unsigned long) cfid->generation, mds);
/* NOTE(review): rpfid is computed above for a split parent, but pfid
 * is what gets passed to md_intent_lock() here -- confirm this is
 * intended */
469 rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, pfid, name,
470 len, lmm, lmmsize, cfid, it, flags,
477 /* this is lookup. during lookup we have to update all the
478 * attributes, because returned values will be put in struct
481 obj = lmv_grab_obj(obd, pfid, 0);
483 /* directory is already splitted. calculate mds */
484 mds = raw_name2idx(obj->objcount, (char *) name, len);
485 rpfid = obj->objs[mds].fid;
489 rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name,
490 len, lmm, lmmsize, NULL, it, flags, reqp,
493 /* very interesting. it seems object is still valid
494 * but for some reason llite calls lookup, not revalidate */
495 CWARN("lookup for %lu/%lu/%lu and data should be uptodate\n",
496 (unsigned long) rpfid.mds,
497 (unsigned long) rpfid.id,
498 (unsigned long) rpfid.generation);
499 LASSERT(*reqp == NULL);
503 if (rc == 0 && *reqp == NULL) {
504 /* once again, we're asked for lookup, not revalidate */
505 CWARN("lookup for %lu/%lu/%lu and data should be uptodate\n",
506 (unsigned long) rpfid.mds,
507 (unsigned long) rpfid.id,
508 (unsigned long) rpfid.generation);
512 if (rc == -ERESTART) {
513 /* directory got splitted since last update. this shouldn't
514 * be because splitting causes lock revocation, so revalidate
515 * had to fail and lookup on dir had to return mea */
516 CWARN("we haven't knew about directory splitting!\n");
517 LASSERT(obj == NULL);
518 rc = lmv_create_obj_from_attrs(exp, &rpfid, NULL);
527 /* okay, MDS has returned success. probably name has been
528 * resolved in remote inode */
529 rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags,
532 if (rc == 0 && (mea = is_body_of_splitted_dir(*reqp, 1))) {
533 /* wow! this is splitted dir, we'd like to handle it */
534 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
535 LASSERT(body != NULL);
536 obj = lmv_grab_obj(obd, &body->fid1, 0);
538 rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea);
/*
 * Entry point for intent locking on LMV: logs the request and
 * dispatches on it->it_op, forwarding all arguments unchanged.
 *
 *   it_op == IT_LOOKUP   -> lmv_intent_lookup()
 *   it_op &  IT_OPEN     -> lmv_intent_open()
 *   it_op == IT_GETATTR  -> lmv_intent_getattr()
 *
 * The rc of the selected handler is stored in rc.
 * NOTE(review): handling of any other it_op value is not evident from
 * the code below -- confirm.
 */
545 int lmv_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt,
546 struct ll_fid *pfid, const char *name, int len,
547 void *lmm, int lmmsize, struct ll_fid *cfid,
548 struct lookup_intent *it, int flags,
549 struct ptlrpc_request **reqp,
550 ldlm_blocking_callback cb_blocking)
552 struct obd_device *obd = exp->exp_obd;
/* NOTE(review): "%*s" takes len as a minimum field width, not as an
 * upper bound on the characters read; if name may lack a terminating
 * NUL, "%.*s" would be needed -- confirm */
559 CDEBUG(D_OTHER, "INTENT LOCK '%s' for '%*s' on %lu/%lu -> %u\n",
560 LL_IT2STR(it), len, name, (unsigned long) pfid->id,
561 (unsigned long) pfid->generation, pfid->mds);
/* IT_OPEN is tested as a bit so combined open intents match too;
 * the other two ops are compared for equality */
564 if (it->it_op == IT_LOOKUP)
565 rc = lmv_intent_lookup(exp, uctxt, pfid, name, len, lmm,
566 lmmsize, cfid, it, flags, reqp,
568 else if (it->it_op & IT_OPEN)
569 rc = lmv_intent_open(exp, uctxt, pfid, name, len, lmm,
570 lmmsize, cfid, it, flags, reqp,
572 else if (it->it_op == IT_GETATTR)
573 rc = lmv_intent_getattr(exp, uctxt, pfid, name, len, lmm,
574 lmmsize, cfid, it, flags, reqp,
/*
 * Walk all sub-objects of the split directory @mfid, refresh the ones
 * whose attributes are not known to be valid, and sum their sizes.
 *
 * For each entry of obj->objs[] an IT_GETATTR intent is sent to
 * lmv->tgts[fid.mds].exp with lmv_dirobj_blocking_ast as blocking
 * callback: first with the fid also given as child fid and, per the
 * comments below, again with a NULL child fid when no lock was found.
 * The master entry (fid equal to obj->fid) is treated specially: its
 * attributes may be taken from the caller's reply (*reqp), and when a
 * lock on the master is obtained here (asserted to happen only with
 * master_valid == 0) the handle and mode are saved in
 * master_lockh/master_lock_mode.  For slaves the lock's l_ast_data is
 * set to obj and obj->count is incremented.
 *
 * When attributes were refreshed, the body in (*reqp)->rq_repmsg is
 * updated; in the case described below as "caller has no reply" only
 * OBD_MD_FLSIZE is marked valid and body->mds is set to obj->fid.mds.
 * With master_valid == 0 the saved master lock is handed back through
 * @oit.
 *
 * NOTE(review): the end of this function is not visible here;
 * presumably the summed size is stored into the body and rc is
 * returned -- confirm.
 */
581 int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
582 struct ll_fid *mfid, struct lookup_intent *oit,
583 int master_valid, ldlm_blocking_callback cb_blocking)
585 struct obd_device *obd = exp->exp_obd;
586 struct ptlrpc_request *mreq = *reqp;
587 struct lmv_obd *lmv = &obd->u.lmv;
588 struct lustre_handle master_lockh;
/* running total of the sub-object sizes */
589 unsigned long size = 0;
590 struct ldlm_lock *lock;
591 struct mds_body *body;
592 struct ll_uctxt uctxt;
594 int master_lock_mode;
598 /* we have to loop over the subobjects, check validity and update
599 * them from MDSs if needed. it's very useful that we need not to
600 * update all the fields. say, common fields (that are equal on
601 * all the subobjects need not to be update, another fields (i_size,
602 * for example) are cached all the time */
603 obj = lmv_grab_obj(obd, mfid, 0);
606 master_lock_mode = 0;
609 for (i = 0; i < obj->objcount; i++) {
610 struct ll_fid fid = obj->objs[i].fid;
611 struct lustre_handle *lockh = NULL;
612 struct ptlrpc_request *req = NULL;
613 ldlm_blocking_callback cb;
614 struct lookup_intent it;
617 CDEBUG(D_OTHER, "revalidate subobj %lu/%lu/%lu\n",
618 (unsigned long) fid.mds,
619 (unsigned long) fid.id,
620 (unsigned long) fid.generation);
622 memset(&it, 0, sizeof(it));
623 it.it_op = IT_GETATTR;
624 cb = lmv_dirobj_blocking_ast;
/* the master entry is the one whose fid equals the object's own fid */
626 if (fid_equal(&fid, &obj->fid)) {
628 /* lmv_intent_getattr() already checked
629 * validness and took the lock */
631 /* it even got the reply
632 * refresh attrs from that reply */
633 body = lustre_msg_buf(mreq->rq_repmsg,
635 LASSERT(body != NULL);
638 /* take already cached attrs into account */
640 "master is locked and cached\n");
/* first attempt: the fid is passed as child fid as well */
648 rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid,
649 NULL, 0, NULL, 0, &fid, &it, 0, &req, cb);
650 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
652 /* nice, this slave is valid */
653 LASSERT(req == NULL);
654 CDEBUG(D_OTHER, "cached\n");
659 /* error during revalidation */
663 /* rc == 0, this means we have no such a lock and can't
664 * think obj is still valid. lookup it again */
665 LASSERT(req == NULL);
/* second attempt: fresh intent, no child fid */
667 memset(&it, 0, sizeof(it));
668 it.it_op = IT_GETATTR;
669 rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid,
670 NULL, 0, NULL, 0, NULL, &it, 0, &req, cb);
671 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
674 /* error during lookup */
679 LASSERT(master_valid == 0);
680 /* save lock on master to be returned to the caller */
681 CDEBUG(D_OTHER, "no lock on master yet\n");
682 memcpy(&master_lockh, lockh, sizeof(master_lockh));
683 master_lock_mode = it.d.lustre.it_lock_mode;
/* zero the mode so the decref at the bottom of the loop skips the
 * master lock */
684 it.d.lustre.it_lock_mode = 0;
686 /* this is slave. we want to control it */
687 lock = ldlm_handle2lock(lockh);
689 lock->l_ast_data = obj;
690 atomic_inc(&obj->count);
695 /* this is first reply, we'll use it to return
696 * updated data back to the caller */
698 ptlrpc_request_addref(req);
/* cache the size reported in the fresh reply */
703 body = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body));
707 obj->objs[i].size = body->size;
708 CDEBUG(D_OTHER, "fresh: %lu\n",
709 (unsigned long) obj->objs[i].size);
712 ptlrpc_req_finished(req);
/* add this sub-object's cached size to the total, then drop the lock
 * reference if one is held */
714 size += obj->objs[i].size;
715 if (it.d.lustre.it_lock_mode)
716 ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
720 /* some attrs got refreshed, we have reply and it's time
721 * to put fresh attrs to it */
722 CDEBUG(D_OTHER, "return refreshed attrs: size = %lu\n",
723 (unsigned long) size);
724 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
726 /* FIXME: what about another attributes? */
729 /* very important to maintain lli->mds the same
730 * because of revalidation. mreq == NULL means
731 * that caller has no reply and the only attr
732 * we can return is size */
733 body->valid = OBD_MD_FLSIZE;
734 body->mds = obj->fid.mds;
/* hand the master lock obtained above back to the caller's intent */
736 if (master_valid == 0) {
737 memcpy(&oit->d.lustre.it_lock_handle,
738 &master_lockh, sizeof(master_lockh));
739 oit->d.lustre.it_lock_mode = master_lock_mode;
743 /* it seems all the attrs are fresh and we did no request */
744 CDEBUG(D_OTHER, "all the attrs were fresh\n");
745 if (master_valid == 0)
746 oit->d.lustre.it_lock_mode = master_lock_mode;