1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
/* NOTE(review): presumably requests symbol export for this module and is
 * wrapped in an #ifndef guard in the complete file -- TODO confirm. */
23 # define EXPORT_SYMTAB
/* NOTE(review): presumably the subsystem tag picked up by the
 * CDEBUG()/CWARN() calls used throughout this file -- TODO confirm. */
25 #define DEBUG_SUBSYSTEM S_LMV
27 #include <linux/slab.h>
28 #include <linux/module.h>
29 #include <linux/init.h>
30 #include <linux/slab.h>
31 #include <linux/pagemap.h>
32 #include <asm/div64.h>
34 #include <liblustre.h>
37 #include <linux/obd_support.h>
38 #include <linux/lustre_lib.h>
39 #include <linux/lustre_net.h>
40 #include <linux/lustre_idl.h>
41 #include <linux/lustre_dlm.h>
42 #include <linux/lustre_mds.h>
43 #include <linux/obd_class.h>
44 #include <linux/obd_ost.h>
45 #include <linux/seq_file.h>
46 #include <linux/lprocfs_status.h>
47 #include <linux/lustre_fsfilt.h>
48 #include <linux/obd_lmv.h>
49 #include "lmv_internal.h"
/*
 * Follow a "remote inode" reply to the MDS that really holds the inode.
 *
 * The mds_body is read from buffer 1 of the reply in *reqp. When it carries
 * OBD_MD_MDS, the intent is re-issued with md_intent_lock() against the
 * target export lmv->tgts[nfid.mds].exp; an IT_LOOKUP intent is turned into
 * IT_GETATTR first. The lock handle and mode already stored in the intent
 * are parked in plock/pmode across that call, and the first request is
 * released with ptlrpc_req_finished().
 *
 * NOTE(review): how nfid is filled in, which condition guards each of the
 * two ldlm_lock_decref() calls, and what is returned could not be confirmed
 * while documenting -- verify before relying on this description.
 */
52 int lmv_handle_remote_inode(struct obd_export *exp, struct ll_uctxt *uctxt,
53 void *lmm, int lmmsize,
54 struct lookup_intent *it, int flags,
55 struct ptlrpc_request **reqp,
56 ldlm_blocking_callback cb_blocking)
58 struct obd_device *obd = exp->exp_obd;
59 struct lmv_obd *lmv = &obd->u.lmv;
60 struct mds_body *body = NULL;
/* the mds_body is taken from buffer 1 of the reply message */
64 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
65 LASSERT(body != NULL);
67 if (body->valid & OBD_MD_MDS) {
68 /* the MDS reports that this is the remote inode case,
69 * i.e. we have to ask for the real attrs on another MDS */
70 struct ptlrpc_request *req;
72 struct lustre_handle plock;
75 if (it->it_op == IT_LOOKUP) {
76 /* unfortunately, we have to lie to MDC/MDS to
77 * retrieve the attributes llite needs */
78 it->it_op = IT_GETATTR;
81 /* we got the LOOKUP lock, but we really need attrs */
82 pmode = it->d.lustre.it_lock_mode;
84 memcpy(&plock, &it->d.lustre.it_lock_handle,
86 it->d.lustre.it_lock_mode = 0;
/* clear the enqueue-complete flag before the intent is sent again */
90 it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
91 rc = md_intent_lock(lmv->tgts[nfid.mds].exp, uctxt, &nfid,
92 NULL, 0, lmm, lmmsize, NULL, it, flags,
95 /* llite needs the LOOKUP lock to track dentry revocation in
96 * order to maintain dcache consistency. thus drop the UPDATE
97 * lock here and put the LOOKUP lock back in the intent */
99 LASSERT(it->d.lustre.it_lock_mode != 0);
100 ldlm_lock_decref((void *)&it->d.lustre.it_lock_handle,
101 it->d.lustre.it_lock_mode);
102 memcpy(&it->d.lustre.it_lock_handle, &plock,
104 it->d.lustre.it_lock_mode = pmode;
/* NOTE(review): presumably the failure branch, where the parked lock is
 * dropped instead of restored -- confirm */
107 ldlm_lock_decref(&plock, pmode);
/* release the request that carried the first (remote) reply */
109 ptlrpc_req_finished(*reqp);
/*
 * Handle an IT_OPEN intent (open, possibly with create) for "name" under
 * the parent pfid.
 *
 * When an lmv object is found for pfid, raw_name2idx() maps the name to a
 * slave index; that slave's fid and target receive the md_intent_lock()
 * call. The reply is then passed through lmv_handle_remote_inode(). If the
 * reply describes a split directory for which no lmv object exists yet,
 * one is built with lmv_create_obj_from_attrs(), and attributes are
 * refreshed through lmv_revalidate_slaves().
 *
 * NOTE(review): the initial value of mds, the error checks between the
 * calls and the release of objects obtained from lmv_grab_obj() could not
 * be confirmed while documenting -- verify.
 */
115 int lmv_intent_open(struct obd_export *exp, struct ll_uctxt *uctxt,
116 struct ll_fid *pfid, const char *name, int len,
117 void *lmm, int lmmsize, struct ll_fid *cfid,
118 struct lookup_intent *it, int flags,
119 struct ptlrpc_request **reqp,
120 ldlm_blocking_callback cb_blocking)
122 struct obd_device *obd = exp->exp_obd;
123 struct lmv_obd *lmv = &obd->u.lmv;
124 struct mds_body *body = NULL;
/* start from the parent fid; replaced by a slave fid below when the
 * parent turns out to be split */
125 struct ll_fid rpfid = *pfid;
131 /* IT_OPEN is intended to open (and possibly create) an object.
132 * the parent (pfid) may be a split dir */
135 obj = lmv_grab_obj(obd, pfid, 0);
137 /* directory is already split, so we have to forward
138 * the request to the right MDS */
139 mds = raw_name2idx(obj->objcount, name, len);
140 rpfid = obj->objs[mds].fid;
141 CDEBUG(D_OTHER, "forward to MDS #%u\n", mds);
144 rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name, len,
145 lmm, lmmsize, cfid, it, flags, reqp, cb_blocking);
151 /* okay, MDS has returned success. probably the name has been
152 * resolved in a remote inode */
153 rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags,
160 /* the caller may use attrs the MDS returns on an IT_OPEN lock
161 * request, so we have to update them for a split dir */
162 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
163 LASSERT(body != NULL);
165 obj = lmv_grab_obj(obd, cfid, 0);
166 if (rc == 0 && !obj && (mea = is_body_of_splitted_dir(*reqp, 1))) {
167 /* wow! this is a split dir, we'd like to handle it */
168 rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea);
/* look the object up again now that it has been created */
170 obj = lmv_grab_obj(obd, cfid, 0);
172 /* this is a split dir and we want to get attrs */
173 CDEBUG(D_OTHER, "attrs from slaves for %lu/%lu/%lu\n",
174 (unsigned long) cfid->mds,
175 (unsigned long) cfid->id,
176 (unsigned long) cfid->generation);
177 rc = lmv_revalidate_slaves(exp, reqp, cfid,
179 } else if (S_ISDIR(body->mode)) {
180 /*CWARN("hmmm, %lu/%lu/%lu has not lmv obj?!\n",
181 (unsigned long) cfid->mds,
182 (unsigned long) cfid->id,
183 (unsigned long) cfid->generation);*/
/*
 * Handle an IT_GETATTR intent.
 *
 * Two paths are visible:
 *  - revalidation of cfid: the lmv object for cfid is looked up,
 *    md_intent_lock() is sent with cfid, and when an lmv object exists and
 *    rc >= 0 the slaves are refreshed by lmv_revalidate_slaves(), which
 *    receives rc as its master_valid argument;
 *  - getattr by name under pfid: the name is mapped to a slave with
 *    raw_name2idx() when pfid is split, md_intent_lock() is sent with a
 *    NULL cfid, the reply goes through lmv_handle_remote_inode(), and a
 *    split directory in the reply gets an lmv object via
 *    lmv_create_obj_from_attrs() followed by lmv_revalidate_slaves().
 *
 * NOTE(review): the test that selects between the two paths (presumably
 * cfid != NULL), the initial value of mds and the release of grabbed
 * objects could not be confirmed while documenting -- verify.
 *
 * NOTE(review): the "INTENT getattr" message formats name with '%*s', so
 * len is a minimum field width rather than a length limit; '%.*s' is
 * likely what was meant if name is not guaranteed to be NUL-terminated --
 * confirm.
 */
189 int lmv_intent_getattr(struct obd_export *exp, struct ll_uctxt *uctxt,
190 struct ll_fid *pfid, const char *name, int len,
191 void *lmm, int lmmsize, struct ll_fid *cfid,
192 struct lookup_intent *it, int flags,
193 struct ptlrpc_request **reqp,
194 ldlm_blocking_callback cb_blocking)
196 struct obd_device *obd = exp->exp_obd;
197 struct lmv_obd *lmv = &obd->u.lmv;
198 struct mds_body *body = NULL;
199 struct ll_fid rpfid = *pfid;
/* obj is looked up by cfid or pfid; obj2 only by cfid after the reply */
200 struct lmv_obj *obj, *obj2;
206 /* the caller wants to revalidate attrs of obj.
207 * we have to revalidate slaves if the requested
208 * object is a split directory */
209 CDEBUG(D_OTHER, "revalidate attrs for %lu/%lu/%lu\n",
210 (unsigned long) cfid->mds,
211 (unsigned long) cfid->id,
212 (unsigned long) cfid->generation);
214 obj = lmv_grab_obj(obd, cfid, 0);
216 /* in fact, we do not need this with the current
217 * _intent_lock(), but it may change some day */
218 rpfid = obj->objs[mds].fid;
220 rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name,
221 len, lmm, lmmsize, cfid, it, flags, reqp,
223 if (obj && rc >= 0) {
224 /* this is splitted dir. in order to optimize things
225 * a bit, we consider obj valid updating missing
226 * parts. FIXME: do we need to return any lock here?
227 * it would be fine if we don't. this means that
228 * nobody should use UPDATE lock to notify about
231 "revalidate slaves for %lu/%lu/%lu, rc %d\n",
232 (unsigned long) cfid->mds,
233 (unsigned long) cfid->id,
234 (unsigned long) cfid->generation, rc);
235 rc = lmv_revalidate_slaves(exp, reqp, cfid, it, rc,
241 CDEBUG(D_OTHER, "INTENT getattr for %*s on %lu/%lu/%lu\n",
242 len, name, (unsigned long) pfid->mds,
243 (unsigned long) pfid->id,
244 (unsigned long) pfid->generation);
247 obj = lmv_grab_obj(obd, pfid, 0);
249 /* directory is already splitted. calculate mds */
250 mds = raw_name2idx(obj->objcount, (char *) name, len);
251 rpfid = obj->objs[mds].fid;
252 CDEBUG(D_OTHER, "forward to MDS #%u (slave %lu/%lu/%lu)\n",
253 mds, (unsigned long) rpfid.mds,
254 (unsigned long) rpfid.id,
255 (unsigned long) rpfid.generation);
257 rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name,
258 len, lmm, lmmsize, NULL, it, flags, reqp,
264 /* okay, MDS has returned success. probably the name has been
265 * resolved in a remote inode */
266 rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags,
271 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
272 LASSERT(body != NULL);
274 obj2 = lmv_grab_obj(obd, cfid, 0);
276 if (rc == 0 && !obj2 && (mea = is_body_of_splitted_dir(*reqp, 1))) {
277 /* wow! this is a split dir, we'd like to handle it */
278 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
279 LASSERT(body != NULL);
280 rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea);
/* look the object up again now that it has been created */
281 obj2 = lmv_grab_obj(obd, cfid, 0);
285 /* this is a split dir and we want to get attrs */
287 "attrs from slaves for %lu/%lu/%lu, rc %d\n",
288 (unsigned long) cfid->mds,
289 (unsigned long) cfid->id,
290 (unsigned long) cfid->generation, rc);
291 rc = lmv_revalidate_slaves(exp, reqp, cfid,
/*
 * Fold one sub-object's cached attributes into a reply body.
 *
 * The only update shown here adds obj->size to body->size, so the body
 * accumulates a total across calls.
 */
297 void lmv_update_body_from_obj(struct mds_body *body, struct lmv_inode *obj)
300 body->size += obj->size;
/*
 * After a lookup on the master of a split directory, visit every slave
 * object and fold its size into the master's reply body.
 *
 * The lmv object is found through body->fid1 of the reply in *reqp. For
 * each sub-object other than the master, an IT_GETATTR intent is first
 * sent with the slave fid as cfid; a second one with a NULL cfid follows
 * when the first did not confirm a lock. On a fresh reply the lock's
 * l_ast_data is pointed at the lmv object (with obj->count incremented)
 * and the cached size is replaced by the one from the reply.
 * lmv_update_body_from_obj() then adds the slave to the body and the
 * slave lock reference is dropped.
 *
 * NOTE(review): initialisation of uctxt, the rc tests that separate the
 * "cached", "error" and "look up again" cases, and the release of obj
 * could not be confirmed while documenting -- verify.
 */
308 int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
310 struct obd_device *obd = exp->exp_obd;
311 struct lmv_obd *lmv = &obd->u.lmv;
312 struct mds_body *body = NULL;
313 struct lustre_handle *lockh;
314 struct ldlm_lock *lock;
315 struct mds_body *body2;
316 struct ll_uctxt uctxt;
324 /* master is locked. we'd like to take locks on the slaves
325 * and update the attributes to be returned from the slaves.
326 * it's important that lookup is called in two cases:
327 * - for the first time (dcache has no such resolution yet)
328 * - ->d_revalidate() returned false
329 * the last case is possible only if all the objs (master and
330 * all slaves) aren't valid */
332 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
333 LASSERT(body != NULL);
335 obj = lmv_grab_obj(obd, &body->fid1, 0);
338 CDEBUG(D_OTHER, "lookup slaves for %lu/%lu/%lu\n",
339 (unsigned long) body->fid1.mds,
340 (unsigned long) body->fid1.id,
341 (unsigned long) body->fid1.generation);
345 for (i = 0; i < obj->objcount; i++) {
346 struct ll_fid fid = obj->objs[i].fid;
347 struct ptlrpc_request *req = NULL;
348 struct lookup_intent it;
350 if (fid_equal(&fid, &obj->fid)) {
351 /* skip master obj */
355 CDEBUG(D_OTHER, "lookup slave %lu/%lu/%lu\n",
356 (unsigned long) fid.mds,
357 (unsigned long) fid.id,
358 (unsigned long) fid.generation);
/* first attempt: pass the slave fid as cfid as well */
361 memset(&it, 0, sizeof(it));
362 it.it_op = IT_GETATTR;
363 rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid,
364 NULL, 0, NULL, 0, &fid, &it, 0, &req,
365 lmv_dirobj_blocking_ast);
366 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
368 /* nice, this slave is valid */
369 LASSERT(req == NULL);
370 CDEBUG(D_OTHER, "cached\n");
375 /* error during revalidation */
379 /* rc == 0, this means we have no such lock and can't
380 * assume obj is still valid. look it up again */
381 LASSERT(req == NULL);
/* second attempt: same intent, but with a NULL cfid */
383 memset(&it, 0, sizeof(it));
384 it.it_op = IT_GETATTR;
385 rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid,
386 NULL, 0, NULL, 0, NULL, &it, 0, &req,
387 lmv_dirobj_blocking_ast);
388 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
391 /* error during lookup */
/* attach the lmv object to the new lock and account for that reference */
395 lock = ldlm_handle2lock(lockh);
397 lock->l_ast_data = obj;
398 atomic_inc(&obj->count);
400 body2 = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body2));
/* remember the size reported for this slave */
403 obj->objs[i].size = body2->size;
404 CDEBUG(D_OTHER, "fresh: %lu\n",
405 (unsigned long) obj->objs[i].size);
410 ptlrpc_req_finished(req);
/* add this slave's cached attributes to the master's reply body */
412 lmv_update_body_from_obj(body, obj->objs + i);
413 if (it.d.lustre.it_lock_mode)
414 ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
/*
 * Handle an IT_LOOKUP intent: resolve name -> fid under pfid ("lookup"),
 * or confirm that an earlier resolution is still valid ("revalidation",
 * requested by a non-NULL cfid according to the comment below).
 *
 * In both cases, when pfid has an lmv object, raw_name2idx() maps the
 * name to a slave index and rpfid is set to that slave's fid before
 * md_intent_lock() is called. After a lookup the reply is passed through
 * lmv_handle_remote_inode(), and a split directory in the reply gets an
 * lmv object via lmv_create_obj_from_attrs().
 *
 * NOTE(review): the initial value of mds, the conditions around the CWARN
 * branches and the release of grabbed objects could not be confirmed
 * while documenting -- verify.
 */
420 int lmv_intent_lookup(struct obd_export *exp, struct ll_uctxt *uctxt,
421 struct ll_fid *pfid, const char *name, int len,
422 void *lmm, int lmmsize, struct ll_fid *cfid,
423 struct lookup_intent *it, int flags,
424 struct ptlrpc_request **reqp,
425 ldlm_blocking_callback cb_blocking)
427 struct obd_device *obd = exp->exp_obd;
428 struct lmv_obd *lmv = &obd->u.lmv;
429 struct mds_body *body = NULL;
430 struct ll_fid rpfid = *pfid;
436 /* IT_LOOKUP is intended to produce a name -> fid resolution
437 * (let's call this lookup below) or to confirm the requested
438 * resolution is still valid (let's call this revalidation).
439 * cfid != NULL specifies revalidation */
442 /* this is revalidation: we have to check whether the LOOKUP
443 * lock is still valid for the given fid. a very important
444 * part is that we have to choose the right mds because
445 * the namespace is per mds */
447 obj = lmv_grab_obj(obd, pfid, 0);
449 mds = raw_name2idx(obj->objcount, (char *) name, len);
450 rpfid = obj->objs[mds].fid;
454 CDEBUG(D_OTHER, "revalidate lookup for %lu/%lu/%lu to %d MDS\n",
455 (unsigned long) cfid->mds,
456 (unsigned long) cfid->id,
457 (unsigned long) cfid->generation, mds);
/* NOTE(review): rpfid was just computed for the split-directory case, yet
 * pfid is what gets passed here, unlike the lookup path further down which
 * passes &rpfid -- confirm whether this is intentional */
458 rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, pfid, name,
459 len, lmm, lmmsize, cfid, it, flags,
466 /* this is lookup. during lookup we have to update all the
467 * attributes, because returned values will be put in struct
470 obj = lmv_grab_obj(obd, pfid, 0);
472 /* directory is already splitted. calculate mds */
473 mds = raw_name2idx(obj->objcount, (char *) name, len);
474 rpfid = obj->objs[mds].fid;
478 rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name,
479 len, lmm, lmmsize, NULL, it, flags, reqp,
482 /* very interesting. it seems the object is still valid
483 * but for some reason llite calls lookup, not revalidate */
484 CWARN("lookup for %lu/%lu/%lu and data should be uptodate\n",
485 (unsigned long) rpfid.mds,
486 (unsigned long) rpfid.id,
487 (unsigned long) rpfid.generation);
488 LASSERT(*reqp == NULL);
492 if (rc == 0 && *reqp == NULL) {
493 /* once again, we're asked for lookup, not revalidate */
494 CWARN("lookup for %lu/%lu/%lu and data should be uptodate\n",
495 (unsigned long) rpfid.mds,
496 (unsigned long) rpfid.id,
497 (unsigned long) rpfid.generation);
502 /* the directory got split since the last update. this shouldn't
503 * happen because splitting causes lock revocation, so revalidate
504 * had to fail and lookup on the dir had to return mea */
505 CWARN("we haven't knew about directory splitting!\n");
506 LASSERT(obj == NULL);
507 rc = lmv_create_obj_from_attrs(exp, &rpfid, NULL);
516 /* okay, MDS has returned success. probably the name has been
517 * resolved in a remote inode */
518 rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags,
521 if (rc == 0 && (mea = is_body_of_splitted_dir(*reqp, 1))) {
522 /* wow! this is a split dir, we'd like to handle it */
523 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
524 LASSERT(body != NULL);
525 obj = lmv_grab_obj(obd, &body->fid1, 0);
527 rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea);
/*
 * Entry point for intent locks on the LMV layer: logs the request and
 * dispatches on it->it_op.
 *
 *   it_op == IT_LOOKUP   -> lmv_intent_lookup()
 *   it_op &  IT_OPEN     -> lmv_intent_open()
 *   it_op == IT_GETATTR  -> lmv_intent_getattr()
 *
 * All arguments are forwarded unchanged to the selected handler, whose
 * result is stored in rc.
 *
 * NOTE(review): what happens for any other it_op, and the final return,
 * could not be confirmed while documenting -- verify.
 */
534 int lmv_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt,
535 struct ll_fid *pfid, const char *name, int len,
536 void *lmm, int lmmsize, struct ll_fid *cfid,
537 struct lookup_intent *it, int flags,
538 struct ptlrpc_request **reqp,
539 ldlm_blocking_callback cb_blocking)
541 struct obd_device *obd = exp->exp_obd;
/* NOTE(review): '%*s' uses len as a minimum field width, not as a length
 * limit; '%.*s' is likely what was meant if name is not guaranteed to be
 * NUL-terminated -- confirm. The fid is printed as id/generation -> mds. */
548 CDEBUG(D_OTHER, "INTENT LOCK '%s' for '%*s' on %lu/%lu -> %u\n",
549 LL_IT2STR(it), len, name, (unsigned long) pfid->id,
550 (unsigned long) pfid->generation, pfid->mds);
/* IT_OPEN is tested as a bit, the other two by equality */
553 if (it->it_op == IT_LOOKUP)
554 rc = lmv_intent_lookup(exp, uctxt, pfid, name, len, lmm,
555 lmmsize, cfid, it, flags, reqp,
557 else if (it->it_op & IT_OPEN)
558 rc = lmv_intent_open(exp, uctxt, pfid, name, len, lmm,
559 lmmsize, cfid, it, flags, reqp,
561 else if (it->it_op == IT_GETATTR)
562 rc = lmv_intent_getattr(exp, uctxt, pfid, name, len, lmm,
563 lmmsize, cfid, it, flags, reqp,
/*
 * Revalidate every sub-object (master included) of the split directory
 * mfid and report the summed size back through the reply in *reqp.
 *
 * For each sub-object an IT_GETATTR intent is sent with the sub-object's
 * own fid as cfid; if that does not confirm a lock, a second intent with
 * a NULL cfid follows. A lock obtained on the master is kept in
 * master_lockh/master_lock_mode so it can be handed to the caller through
 * oit; a lock obtained on a slave gets l_ast_data pointed at the lmv
 * object (with obj->count incremented) and is released after use. Sizes
 * are cached in obj->objs[i].size and accumulated in the local "size".
 *
 * master_valid is asserted to be 0 where the master lock is saved, and
 * is tested again when the lock is stored into oit.
 *
 * NOTE(review): initialisation of uctxt, the rc tests separating the
 * "cached", "error" and "look up again" cases, the handling of *reqp for
 * the first reply, and the end of the function could not be confirmed
 * while documenting -- verify.
 */
570 int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
571 struct ll_fid *mfid, struct lookup_intent *oit,
572 int master_valid, ldlm_blocking_callback cb_blocking)
574 struct obd_device *obd = exp->exp_obd;
575 struct ptlrpc_request *mreq = *reqp;
576 struct lmv_obd *lmv = &obd->u.lmv;
577 struct lustre_handle master_lockh;
/* NOTE(review): running total of sub-object sizes; if those sizes are
 * 64-bit, an unsigned long truncates on 32-bit builds -- confirm */
578 unsigned long size = 0;
579 struct ldlm_lock *lock;
580 struct mds_body *body;
581 struct ll_uctxt uctxt;
583 int master_lock_mode;
587 /* we have to loop over the subobjects, check validity and update
588 * them from the MDSs if needed. it's very useful that we need not
589 * update all the fields. say, common fields (that are equal on
590 * all the subobjects) need not be updated, other fields (i_size,
591 * for example) are cached all the time */
592 obj = lmv_grab_obj(obd, mfid, 0);
595 master_lock_mode = 0;
598 for (i = 0; i < obj->objcount; i++) {
599 struct ll_fid fid = obj->objs[i].fid;
600 struct lustre_handle *lockh = NULL;
601 struct ptlrpc_request *req = NULL;
602 ldlm_blocking_callback cb;
603 struct lookup_intent it;
606 CDEBUG(D_OTHER, "revalidate subobj %lu/%lu/%lu\n",
607 (unsigned long) fid.mds,
608 (unsigned long) fid.id,
609 (unsigned long) fid.generation);
611 memset(&it, 0, sizeof(it));
612 it.it_op = IT_GETATTR;
/* default callback for sub-objects */
613 cb = lmv_dirobj_blocking_ast;
/* this sub-object is the master itself */
615 if (fid_equal(&fid, &obj->fid)) {
617 /* lmv_intent_getattr() already checked
618 * validity and took the lock */
620 /* it even got the reply;
621 * refresh attrs from that reply */
622 body = lustre_msg_buf(mreq->rq_repmsg,
624 LASSERT(body != NULL);
627 /* take already cached attrs into account */
629 "master is locked and cached\n");
/* first attempt: pass the sub-object's fid as cfid as well */
637 rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid,
638 NULL, 0, NULL, 0, &fid, &it, 0, &req, cb);
639 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
641 /* nice, this slave is valid */
642 LASSERT(req == NULL);
643 CDEBUG(D_OTHER, "cached\n");
648 /* error during revalidation */
652 /* rc == 0, this means we have no such lock and can't
653 * assume obj is still valid. look it up again */
654 LASSERT(req == NULL);
/* second attempt: same intent, but with a NULL cfid */
656 memset(&it, 0, sizeof(it));
657 it.it_op = IT_GETATTR;
658 rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid,
659 NULL, 0, NULL, 0, NULL, &it, 0, &req, cb);
660 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
663 /* error during lookup */
668 LASSERT(master_valid == 0);
669 /* save the lock on the master to be returned to the caller */
670 CDEBUG(D_OTHER, "no lock on master yet\n");
671 memcpy(&master_lockh, lockh, sizeof(master_lockh));
672 master_lock_mode = it.d.lustre.it_lock_mode;
/* zero the mode so the decref at the bottom of the loop skips this lock */
673 it.d.lustre.it_lock_mode = 0;
675 /* this is a slave. we want to control it */
676 lock = ldlm_handle2lock(lockh);
678 lock->l_ast_data = obj;
679 atomic_inc(&obj->count);
684 /* this is the first reply, we'll use it to return
685 * updated data back to the caller */
687 ptlrpc_request_addref(req);
692 body = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body));
/* remember the size reported for this sub-object */
696 obj->objs[i].size = body->size;
697 CDEBUG(D_OTHER, "fresh: %lu\n",
698 (unsigned long) obj->objs[i].size);
701 ptlrpc_req_finished(req);
703 size += obj->objs[i].size;
704 if (it.d.lustre.it_lock_mode)
705 ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
709 /* some attrs got refreshed, we have a reply and it's time
710 * to put the fresh attrs into it */
711 CDEBUG(D_OTHER, "return refreshed attrs: size = %lu\n",
712 (unsigned long) size);
713 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
715 /* FIXME: what about the other attributes? */
718 /* very important to keep lli->mds the same
719 * because of revalidation. mreq == NULL means
720 * that the caller has no reply and the only attr
721 * we can return is size */
722 body->valid = OBD_MD_FLSIZE;
723 body->mds = obj->fid.mds;
/* hand the saved master lock (handle and mode) to the caller's intent */
725 if (master_valid == 0) {
726 memcpy(&oit->d.lustre.it_lock_handle,
727 &master_lockh, sizeof(master_lockh));
728 oit->d.lustre.it_lock_mode = master_lock_mode;
732 /* it seems all the attrs are fresh and we did no request */
733 CDEBUG(D_OTHER, "all the attrs were fresh\n");
/* NOTE(review): unlike the branch above, only the mode is stored here and
 * master_lockh is not copied into oit -- confirm this is intentional */
734 if (master_valid == 0)
735 oit->d.lustre.it_lock_mode = master_lock_mode;