1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002, 2003, 2004, 2005, 2006 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 # define EXPORT_SYMTAB
25 #define DEBUG_SUBSYSTEM S_LMV
27 #include <linux/slab.h>
28 #include <linux/module.h>
29 #include <linux/init.h>
30 #include <linux/slab.h>
31 #include <linux/pagemap.h>
32 #include <asm/div64.h>
33 #include <linux/seq_file.h>
34 #include <linux/namei.h>
36 #include <liblustre.h>
39 #include <lustre/lustre_idl.h>
40 #include <obd_support.h>
41 #include <lustre_lib.h>
42 #include <lustre_net.h>
43 #include <lustre_dlm.h>
44 #include <obd_class.h>
45 #include <lprocfs_status.h>
46 #include "lmv_internal.h"
/*
 * Drop the lock reference recorded in an intent.
 *
 * When it->d.lustre.it_lock_mode is non-zero, ldlm_lock_decref() is called
 * on it->d.lustre.it_lock_handle with that mode; a zero mode means there is
 * nothing to drop.
 */
48 static inline void lmv_drop_intent_lock(struct lookup_intent *it)
50 if (it->d.lustre.it_lock_mode != 0)
51 ldlm_lock_decref((void *)&it->d.lustre.it_lock_handle,
52 it->d.lustre.it_lock_mode);
/*
 * Follow up on a reply that points at an object on another MDS.
 *
 * The mdt_body of *reqp is checked for OBD_MD_MDS, which the comment below
 * describes as the "remote inode" case. An IT_LOOKUP intent is rewritten to
 * IT_GETATTR, the lock mode and handle currently stored in the intent are
 * saved in pmode/plock, and a second md_intent_lock() is sent through the
 * export that lmv_get_export() returns for body->fid1, using a freshly
 * allocated op_data whose fid1 is body->fid1. Afterwards the lock obtained
 * by the second request is dropped with lmv_drop_intent_lock() and the
 * saved handle and mode are copied back into the intent. The request that
 * was passed in via *reqp is released with ptlrpc_req_finished().
 *
 * NOTE(review): RETURN(PTR_ERR(tgt_exp)) leaves the function after
 * OBD_ALLOC_PTR(op_data) and without reaching OBD_FREE_PTR(op_data) --
 * looks like op_data is leaked on that path; confirm and consider jumping
 * to the cleanup code instead.
 * NOTE(review): the guards around the plock/pmode save and restore and
 * around the final ldlm_lock_decref(&plock, pmode) are not visible here --
 * confirm plock is only used when pmode is non-zero.
 */
55 int lmv_intent_remote(struct obd_export *exp, void *lmm,
56 int lmmsize, struct lookup_intent *it,
57 int flags, struct ptlrpc_request **reqp,
58 ldlm_blocking_callback cb_blocking,
61 struct obd_device *obd = exp->exp_obd;
62 struct lmv_obd *lmv = &obd->u.lmv;
63 struct ptlrpc_request *req = NULL;
64 struct mdt_body *body = NULL;
65 struct lustre_handle plock;
66 struct md_op_data *op_data;
67 struct obd_export *tgt_exp;
71 body = lustre_msg_buf((*reqp)->rq_repmsg,
72 DLM_REPLY_REC_OFF, sizeof(*body));
73 LASSERT(body != NULL);
75 if (!(body->valid & OBD_MD_MDS))
79 * oh, MDS reports that this is remote inode case i.e. we have to ask
80 * for real attrs on another MDS.
82 if (it->it_op & IT_LOOKUP) {
84 * unfortunately, we have to lie to MDC/MDS to retrieve
85 * attributes llite needs.
87 it->it_op = IT_GETATTR;
90 /* we got LOOKUP lock, but we really need attrs */
91 pmode = it->d.lustre.it_lock_mode;
93 memcpy(&plock, &it->d.lustre.it_lock_handle,
95 it->d.lustre.it_lock_mode = 0;
96 it->d.lustre.it_data = 0;
99 LASSERT(fid_is_sane(&body->fid1));
101 it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
103 OBD_ALLOC_PTR(op_data);
105 GOTO(out, rc = -ENOMEM);
107 op_data->fid1 = body->fid1;
109 tgt_exp = lmv_get_export(lmv, &body->fid1);
111 RETURN(PTR_ERR(tgt_exp));
113 rc = md_intent_lock(tgt_exp, op_data, lmm, lmmsize, it, flags,
114 &req, cb_blocking, extra_lock_flags);
117 * llite needs LOOKUP lock to track dentry revocation in order to
118 * maintain dcache consistency. Thus drop UPDATE lock here and put
122 lmv_drop_intent_lock(it);
123 memcpy(&it->d.lustre.it_lock_handle, &plock,
125 it->d.lustre.it_lock_mode = pmode;
128 OBD_FREE_PTR(op_data);
132 ldlm_lock_decref(&plock, pmode);
134 ptlrpc_req_finished(*reqp);
140 * IT_OPEN is intended to open (and create, possible) an object. Parent (pid)
/*
 * Handle an IT_OPEN intent.
 *
 * A private copy of *op_data is kept in sop_data (allocated with
 * OBD_ALLOC_PTR(), released with OBD_FREE_PTR() on the way out) so the
 * request can be repeated. The MDS index for the parent fid (rpid) is found
 * with lmv_fld_lookup(); when an lmv object is found for rpid the slave is
 * picked by raw_name2idx() on op_data->name and its fid is looked up again.
 * The request is sent with md_intent_lock() using sop_data, whose fid1 is
 * set to the chosen fid.
 *
 * -ERESTART from md_intent_lock() is treated as "directory got split":
 * lmv_handle_split() is called and the stale reply is released. Otherwise
 * the reply goes through lmv_intent_remote(). A negative lookup that did
 * not create or open anything, or a reply without OBD_MD_FLID, finishes
 * with rc = 0. For a split directory an lmv object is grabbed (or created
 * from the mea in the reply) and lmv_revalidate_slaves() refreshes the
 * attributes.
 *
 * NOTE(review): LASSERT(++loop <= 2) advances the retry counter inside the
 * assertion argument; if LASSERT() can ever be compiled out the counter
 * would stop advancing -- confirm.
 */
143 int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data,
144 void *lmm, int lmmsize, struct lookup_intent *it,
145 int flags, struct ptlrpc_request **reqp,
146 ldlm_blocking_callback cb_blocking,
147 int extra_lock_flags)
149 struct obd_device *obd = exp->exp_obd;
150 struct lu_fid rpid = op_data->fid1;
151 struct lmv_obd *lmv = &obd->u.lmv;
152 struct mdt_body *body = NULL;
153 struct md_op_data *sop_data;
154 struct lmv_stripe_md *mea;
160 OBD_ALLOC_PTR(sop_data);
161 if (sop_data == NULL)
164 /* save op_data for repeat case */
165 *sop_data = *op_data;
168 LASSERT(++loop <= 2);
169 rc = lmv_fld_lookup(lmv, &rpid, &mds);
171 GOTO(out_free_sop_data, rc);
172 obj = lmv_obj_grab(obd, &rpid);
175 * Directory is already split, so we have to forward request to
178 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
179 (char *)op_data->name, op_data->namelen);
181 rpid = obj->lo_inodes[mds].li_fid;
182 rc = lmv_fld_lookup(lmv, &rpid, &mds);
185 GOTO(out_free_sop_data, rc);
187 CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n",
191 sop_data->fid1 = rpid;
193 rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data,
194 lmm, lmmsize, it, flags, reqp,
195 cb_blocking, extra_lock_flags);
196 if (rc == -ERESTART) {
198 * Directory got split. Time to update local object and repeat
199 * the request with proper MDS.
201 LASSERT(lu_fid_eq(&op_data->fid1, &rpid));
202 rc = lmv_handle_split(exp, &rpid);
204 ptlrpc_req_finished(*reqp);
209 GOTO(out_free_sop_data, rc);
212 * Okay, MDS has returned success. Probably name has been resolved in
215 rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp,
216 cb_blocking, extra_lock_flags);
221 * This is possible, that some userspace application will try to
222 * open file as directory and we will have -ENOTDIR here. As
223 * this is "usual" situation, we should not print error here,
226 CDEBUG(D_OTHER, "can't handle remote %s: dir "DFID"("DFID"):"
227 "%*s: %d\n", LL_IT2STR(it), PFID(&op_data->fid2),
228 PFID(&rpid), op_data->namelen, op_data->name, rc);
229 GOTO(out_free_sop_data, rc);
233 * nothing is found, do not access body->fid1 as it is zero and thus
236 if ((it->d.lustre.it_disposition & DISP_LOOKUP_NEG) &&
237 !(it->d.lustre.it_disposition & DISP_OPEN_CREATE) &&
238 !(it->d.lustre.it_disposition & DISP_OPEN_OPEN))
239 GOTO(out_free_sop_data, rc = 0);
241 /* caller may use attrs MDS returns on IT_OPEN lock request so, we have
242 * to update them for split dir */
243 body = lustre_msg_buf((*reqp)->rq_repmsg,
244 DLM_REPLY_REC_OFF, sizeof(*body));
245 LASSERT(body != NULL);
247 /* could not find object, FID is not present in response. */
248 if (!(body->valid & OBD_MD_FLID))
249 GOTO(out_free_sop_data, rc = 0);
251 obj = lmv_obj_grab(obd, &body->fid1);
252 if (!obj && (mea = lmv_get_mea(*reqp, DLM_REPLY_REC_OFF))) {
253 /* wow! this is split dir, we'd like to handle it */
254 obj = lmv_obj_create(exp, &body->fid1, mea);
256 GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj));
260 /* this is split dir and we'd want to get attrs */
261 CDEBUG(D_OTHER, "attrs from slaves for "DFID"\n",
264 rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1,
265 cb_blocking, extra_lock_flags);
266 } else if (S_ISDIR(body->mode)) {
267 CDEBUG(D_OTHER, "object "DFID" has not lmv obj?\n",
276 OBD_FREE_PTR(sop_data);
/*
 * Handle an IT_GETATTR intent.
 *
 * fid_is_sane(&op_data->fid2) selects between two cases:
 *  - fid2 is sane: attributes of that object are revalidated and the MDS
 *    index is resolved from fid2 with lmv_fld_lookup();
 *  - otherwise: getattr by name under fid1; when an lmv object exists for
 *    fid1 and a name is given, the slave is picked with raw_name2idx().
 * The request is sent with md_intent_lock() on lmv->tgts[mds].ltd_exp using
 * sop_data, a private copy of op_data whose fid1 is set to rpid.
 * lmv_revalidate_slaves() refreshes the slaves of a split directory and
 * lmv_intent_remote() post-processes the reply. A reply with
 * DISP_LOOKUP_NEG set, or without OBD_MD_FLID in body->valid, finishes with
 * rc = 0. sop_data is released with OBD_FREE_PTR() on the way out.
 *
 * NOTE(review): in the revalidate case lo_inodes[] is indexed with mds,
 * which at that point appears to hold the result of lmv_fld_lookup() on
 * fid2, whereas the by-name path indexes lo_inodes[] with the
 * raw_name2idx() result -- confirm this is intended.
 */
280 int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data,
281 void *lmm, int lmmsize, struct lookup_intent *it,
282 int flags, struct ptlrpc_request **reqp,
283 ldlm_blocking_callback cb_blocking,
284 int extra_lock_flags)
286 struct lmv_obj *obj = NULL, *obj2 = NULL;
287 struct obd_device *obd = exp->exp_obd;
288 struct lu_fid rpid = op_data->fid1;
289 struct lmv_obd *lmv = &obd->u.lmv;
290 struct mdt_body *body = NULL;
291 struct md_op_data *sop_data;
292 struct lmv_stripe_md *mea;
297 OBD_ALLOC_PTR(sop_data);
298 if (sop_data == NULL)
301 /* save op_data for repeat case */
302 *sop_data = *op_data;
304 if (fid_is_sane(&op_data->fid2)) {
306 * Caller wants to revalidate attrs of obj we have to revalidate
307 * slaves if requested object is split directory.
309 CDEBUG(D_OTHER, "revalidate attrs for "DFID"\n",
310 PFID(&op_data->fid2));
312 rc = lmv_fld_lookup(lmv, &op_data->fid2, &mds);
314 GOTO(out_free_sop_data, rc);
317 * In fact, we do not need this with current intent_lock(), but
318 * it may change some day.
320 obj = lmv_obj_grab(obd, &op_data->fid2);
322 if (!lu_fid_eq(&op_data->fid1, &op_data->fid2)){
323 rpid = obj->lo_inodes[mds].li_fid;
324 rc = lmv_fld_lookup(lmv, &rpid, &mds);
327 GOTO(out_free_sop_data, rc);
334 CDEBUG(D_OTHER, "INTENT getattr for %*s on "DFID"\n",
335 op_data->namelen, op_data->name,
336 PFID(&op_data->fid1));
338 rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds);
340 GOTO(out_free_sop_data, rc);
341 obj = lmv_obj_grab(obd, &op_data->fid1);
342 if (obj && op_data->namelen) {
343 /* directory is already split. calculate mds */
344 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
345 (char *)op_data->name,
348 rpid = obj->lo_inodes[mds].li_fid;
349 rc = lmv_fld_lookup(lmv, &rpid, &mds);
352 GOTO(out_free_sop_data, rc);
356 CDEBUG(D_OTHER, "forward to MDS #"LPU64" (slave "DFID")\n",
361 sop_data->fid1 = rpid;
363 rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data, lmm,
364 lmmsize, it, flags, reqp, cb_blocking,
367 GOTO(out_free_sop_data, rc);
371 * This is split dir. In order to optimize things a bit, we
372 * consider obj valid updating missing parts.
374 * FIXME: do we need to return any lock here? It would be fine
375 * if we don't. This means that nobody should use UPDATE lock to
376 * notify about object * removal.
379 "revalidate slaves for "DFID", rc %d\n",
380 PFID(&op_data->fid2), rc);
382 LASSERT(fid_is_sane(&op_data->fid2));
383 rc = lmv_revalidate_slaves(exp, reqp, &op_data->fid2, it, rc,
384 cb_blocking, extra_lock_flags);
385 GOTO(out_free_sop_data, rc);
389 GOTO(out_free_sop_data, rc);
392 * okay, MDS has returned success. Probably name has been resolved in
395 rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags,
396 reqp, cb_blocking, extra_lock_flags);
398 GOTO(out_free_sop_data, rc);
401 * Nothing is found, do not access body->fid1 as it is zero and thus
404 if (it->d.lustre.it_disposition & DISP_LOOKUP_NEG)
405 GOTO(out_free_sop_data, rc = 0);
408 LASSERT((*reqp)->rq_repmsg);
409 body = lustre_msg_buf((*reqp)->rq_repmsg,
410 DLM_REPLY_REC_OFF, sizeof(*body));
411 LASSERT(body != NULL);
413 /* could not find object, FID is not present in response. */
414 if (!(body->valid & OBD_MD_FLID))
415 GOTO(out_free_sop_data, rc = 0);
417 obj2 = lmv_obj_grab(obd, &body->fid1);
419 if (!obj2 && (mea = lmv_get_mea(*reqp, DLM_REPLY_REC_OFF))) {
420 /* wow! this is split dir, we'd like to handle it. */
421 obj2 = lmv_obj_create(exp, &body->fid1, mea);
423 GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj2));
427 /* this is split dir and we'd want to get attrs */
428 CDEBUG(D_OTHER, "attrs from slaves for "DFID", rc %d\n",
429 PFID(&body->fid1), rc);
431 rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1,
432 cb_blocking, extra_lock_flags);
438 OBD_FREE_PTR(sop_data);
/*
 * Account one lmv inode in a reply body: lino->li_size is added to
 * body->size.
 */
442 void lmv_update_body(struct mdt_body *body, struct lmv_inode *lino)
445 body->size += lino->li_size;
448 /* this is not used currently */
/*
 * Take locks on the slaves of a split directory and refresh their sizes.
 *
 * The mdt_body of *reqp must carry OBD_MD_FLID and an lmv object must exist
 * for body->fid1 (both are LASSERTed). The loop walks obj->lo_inodes[];
 * the entry whose fid equals obj->lo_fid is the master and is skipped. For
 * the others an IT_GETATTR intent is sent with md_intent_lock() through the
 * export from lmv_get_export(), with lmv_blocking_ast as blocking callback.
 * The lock found via ldlm_handle2lock() gets an object reference in
 * l_ast_data, the size from the slave reply (body2->size) is stored in
 * lo_inodes[i].li_size, and lmv_update_body() adds it to the master body.
 * An intent lock that was taken is released with ldlm_lock_decref(), and
 * op_data is released with OBD_FREE_PTR().
 */
449 int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
451 struct obd_device *obd = exp->exp_obd;
452 struct lmv_obd *lmv = &obd->u.lmv;
453 struct mdt_body *body = NULL;
454 struct lustre_handle *lockh;
455 struct md_op_data *op_data;
456 struct ldlm_lock *lock;
457 struct mdt_body *body2;
465 /* master is locked. we'd like to take locks on slaves and update
466 * attributes to be returned from the slaves it's important that lookup
467 * is called in two cases:
469 * - for first time (dcache has no such a resolving yet). -
470 * ->d_revalidate() returned false.
472 * last case possible only if all the objs (master and all slaves aren't
475 body = lustre_msg_buf((*reqp)->rq_repmsg,
476 DLM_REPLY_REC_OFF, sizeof(*body));
477 LASSERT(body != NULL);
478 LASSERT((body->valid & OBD_MD_FLID) != 0);
480 obj = lmv_obj_grab(obd, &body->fid1);
481 LASSERT(obj != NULL);
483 CDEBUG(D_OTHER, "lookup slaves for "DFID"\n",
486 OBD_ALLOC_PTR(op_data);
492 for (i = 0; i < obj->lo_objcount; i++) {
493 struct lu_fid fid = obj->lo_inodes[i].li_fid;
494 struct ptlrpc_request *req = NULL;
495 struct obd_export *tgt_exp;
496 struct lookup_intent it;
498 if (lu_fid_eq(&fid, &obj->lo_fid))
499 /* skip master obj */
502 CDEBUG(D_OTHER, "lookup slave "DFID"\n", PFID(&fid));
505 memset(&it, 0, sizeof(it));
506 it.it_op = IT_GETATTR;
508 memset(op_data, 0, sizeof(*op_data));
512 tgt_exp = lmv_get_export(lmv, &fid);
514 GOTO(cleanup, rc = PTR_ERR(tgt_exp));
516 rc = md_intent_lock(tgt_exp, op_data, NULL, 0, &it, 0, &req,
517 lmv_blocking_ast, 0);
519 lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle;
520 if (rc > 0 && req == NULL) {
521 /* nice, this slave is valid */
522 LASSERT(req == NULL);
523 CDEBUG(D_OTHER, "cached\n");
528 /* error during lookup */
531 lock = ldlm_handle2lock(lockh);
534 lock->l_ast_data = lmv_obj_get(obj);
536 body2 = lustre_msg_buf(req->rq_repmsg,
537 DLM_REPLY_REC_OFF, sizeof(*body2));
540 obj->lo_inodes[i].li_size = body2->size;
542 CDEBUG(D_OTHER, "fresh: %lu\n",
543 (unsigned long)obj->lo_inodes[i].li_size);
548 ptlrpc_req_finished(req);
550 lmv_update_body(body, obj->lo_inodes + i);
552 if (it.d.lustre.it_lock_mode)
553 ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
558 OBD_FREE_PTR(op_data);
/*
 * Handle an IT_LOOKUP intent.
 *
 * fid_is_sane(&op_data->fid2) selects between revalidation of an existing
 * name -> fid resolution and a fresh lookup. In both cases the target MDS
 * index comes from lmv_fld_lookup(), with raw_name2idx() picking the slave
 * of a split directory from op_data->name. The request is sent with
 * md_intent_lock() on lmv->tgts[mds].ltd_exp using sop_data, a private copy
 * of op_data whose fid1 is set to rpid.
 *
 * On -ERESTART (directory split the client did not know about) an lmv
 * object is created for rpid with lmv_obj_create(). Otherwise
 * lmv_intent_remote() post-processes the reply, and when that succeeds and
 * the reply carries a mea (lmv_get_mea()), an lmv object for body->fid1 is
 * grabbed or created. sop_data is released with OBD_FREE_PTR() on the way
 * out.
 *
 * NOTE(review): RETURN((int)PTR_ERR(obj)) after lmv_obj_create() in the
 * -ERESTART handling leaves without going through out_free_sop_data like
 * the other error paths do -- looks like sop_data is leaked there; confirm.
 * NOTE(review): LASSERT(++loop <= 2) advances the retry counter inside the
 * assertion argument; if LASSERT() can ever be compiled out the counter
 * would stop advancing -- confirm.
 */
564 int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
565 void *lmm, int lmmsize, struct lookup_intent *it,
566 int flags, struct ptlrpc_request **reqp,
567 ldlm_blocking_callback cb_blocking,
568 int extra_lock_flags)
570 struct obd_device *obd = exp->exp_obd;
571 struct lu_fid rpid = op_data->fid1;
572 struct lmv_obd *lmv = &obd->u.lmv;
573 struct mdt_body *body = NULL;
574 struct md_op_data *sop_data;
575 struct lmv_stripe_md *mea;
581 OBD_ALLOC_PTR(sop_data);
582 if (sop_data == NULL)
585 /* save op_data for repeat case */
586 *sop_data = *op_data;
589 * IT_LOOKUP is intended to produce name -> fid resolving (let's call
590 * this lookup below) or to confirm requested resolving is still valid
591 * (let's call this revalidation) fid_is_sane(&sop_data->fid2) specifies
594 if (fid_is_sane(&op_data->fid2)) {
596 * This is revalidate: we have to check is LOOKUP lock still
597 * valid for given fid. Very important part is that we have to
598 * choose right mds because namespace is per mds.
600 rpid = op_data->fid1;
601 obj = lmv_obj_grab(obd, &rpid);
603 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
604 (char *)op_data->name,
606 rpid = obj->lo_inodes[mds].li_fid;
609 rc = lmv_fld_lookup(lmv, &rpid, &mds);
611 GOTO(out_free_sop_data, rc);
613 CDEBUG(D_OTHER, "revalidate lookup for "DFID" to #"LPU64" MDS\n",
614 PFID(&op_data->fid2), mds);
616 rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds);
618 GOTO(out_free_sop_data, rc);
620 LASSERT(++loop <= 2);
623 * This is lookup. During lookup we have to update all the
624 * attributes, because returned values will be put in struct
627 obj = lmv_obj_grab(obd, &op_data->fid1);
629 if (op_data->namelen) {
630 /* directory is already split. calculate mds */
631 mds = raw_name2idx(obj->lo_hashtype,
633 (char *)op_data->name,
635 rpid = obj->lo_inodes[mds].li_fid;
636 rc = lmv_fld_lookup(lmv, &rpid, &mds);
639 GOTO(out_free_sop_data, rc);
644 fid_zero(&op_data->fid2);
647 sop_data->fid1 = rpid;
649 rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data, lmm, lmmsize,
650 it, flags, reqp, cb_blocking, extra_lock_flags);
652 LASSERT(fid_is_sane(&op_data->fid2));
653 GOTO(out_free_sop_data, rc);
657 * Very interesting. it seems object is still valid but for some
658 * reason llite calls lookup, not revalidate.
660 CDEBUG(D_OTHER, "lookup for "DFID" and data should be uptodate\n",
662 LASSERT(*reqp == NULL);
663 GOTO(out_free_sop_data, rc);
666 if (rc == 0 && *reqp == NULL) {
667 /* once again, we're asked for lookup, not revalidate */
668 CDEBUG(D_OTHER, "lookup for "DFID" and data should be uptodate\n",
670 GOTO(out_free_sop_data, rc);
673 if (rc == -ERESTART) {
675 * Directory got split since last update. This shouldn't be
676 * becasue splitting causes lock revocation, so revalidate had
677 * to fail and lookup on dir had to return mea.
679 CWARN("we haven't knew about directory splitting!\n");
680 LASSERT(obj == NULL);
682 obj = lmv_obj_create(exp, &rpid, NULL);
684 RETURN((int)PTR_ERR(obj));
690 GOTO(out_free_sop_data, rc);
693 * Okay, MDS has returned success. Probably name has been resolved in
696 rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp,
697 cb_blocking, extra_lock_flags);
699 if (rc == 0 && (mea = lmv_get_mea(*reqp, DLM_REPLY_REC_OFF))) {
700 /* wow! this is split dir, we'd like to handle it */
701 body = lustre_msg_buf((*reqp)->rq_repmsg,
702 DLM_REPLY_REC_OFF, sizeof(*body));
703 LASSERT(body != NULL);
704 LASSERT((body->valid & OBD_MD_FLID) != 0);
706 obj = lmv_obj_grab(obd, &body->fid1);
708 obj = lmv_obj_create(exp, &body->fid1, mea);
710 GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj));
717 OBD_FREE_PTR(sop_data);
/*
 * Entry point for intent locking in LMV.
 *
 * Asserts that op_data->fid1 is sane, logs the request, calls
 * lmv_check_connect() on the device and then dispatches on it->it_op.
 * The bits are tested in this order: IT_LOOKUP -> lmv_intent_lookup(),
 * IT_OPEN -> lmv_intent_open(), IT_GETATTR -> lmv_intent_getattr(). The
 * caller's arguments are forwarded to the selected handler and its result
 * is stored in rc.
 *
 * NOTE(review): how a failure of lmv_check_connect() and an it_op with none
 * of the three bits set are handled is not visible here -- confirm.
 */
721 int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
722 void *lmm, int lmmsize, struct lookup_intent *it,
723 int flags, struct ptlrpc_request **reqp,
724 ldlm_blocking_callback cb_blocking,
725 int extra_lock_flags)
727 struct obd_device *obd = exp->exp_obd;
732 LASSERT(fid_is_sane(&op_data->fid1));
734 CDEBUG(D_OTHER, "INTENT LOCK '%s' for '%*s' on "DFID"\n",
735 LL_IT2STR(it), op_data->namelen, op_data->name,
736 PFID(&op_data->fid1));
738 rc = lmv_check_connect(obd);
742 if (it->it_op & IT_LOOKUP)
743 rc = lmv_intent_lookup(exp, op_data, lmm, lmmsize, it,
744 flags, reqp, cb_blocking,
746 else if (it->it_op & IT_OPEN)
747 rc = lmv_intent_open(exp, op_data, lmm, lmmsize, it,
748 flags, reqp, cb_blocking,
750 else if (it->it_op & IT_GETATTR)
751 rc = lmv_intent_getattr(exp, op_data,lmm, lmmsize, it,
752 flags, reqp, cb_blocking,
/*
 * Revalidate the sub-objects of a split directory and return an
 * aggregated size.
 *
 * exp           - LMV export; its obd device holds the lmv state.
 * reqp          - in: reply for the master, if any (kept in mreq);
 *                 the refreshed size is written into the body of *reqp.
 * mid           - fid of the master object; an lmv object must exist for
 *                 it (LASSERTed).
 * oit           - caller's intent; when master_valid == 0 it receives the
 *                 lock mode (and, on the refreshed path, the handle) saved
 *                 for the master.
 * master_valid  - non-zero when the caller already holds a valid lock on
 *                 the master (see the LASSERT(master_valid == 0) where the
 *                 master lock is saved).
 * cb_blocking, extra_lock_flags - forwarded for the lock request;
 *                 presumably used for the master entry only, since cb
 *                 defaults to lmv_blocking_ast -- confirm.
 *
 * The loop walks obj->lo_inodes[] and sends an IT_GETATTR intent with
 * md_intent_lock() for each entry through the export returned by
 * lmv_get_export(). For a slave, the lock found via ldlm_handle2lock() gets
 * an object reference in l_ast_data. Each entry's li_size is added to the
 * running total in size, and intent locks still recorded in the local
 * intent are dropped with ldlm_lock_decref(). op_data is scratch storage
 * that is zeroed per iteration and released with OBD_FREE_PTR().
 */
759 int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
760 const struct lu_fid *mid, struct lookup_intent *oit,
761 int master_valid, ldlm_blocking_callback cb_blocking,
762 int extra_lock_flags)
764 struct obd_device *obd = exp->exp_obd;
765 struct ptlrpc_request *mreq = *reqp;
766 struct lmv_obd *lmv = &obd->u.lmv;
767 struct lustre_handle master_lockh;
768 struct obd_export *tgt_exp;
769 struct md_op_data *op_data;
770 struct ldlm_lock *lock;
771 unsigned long size = 0;
772 struct mdt_body *body;
774 int master_lock_mode;
778 OBD_ALLOC_PTR(op_data);
782 /* we have to loop over the subobjects, check validity and update them
783 * from MDSs if needed. it's very useful that we need not to update all
784 * the fields. say, common fields (that are equal on all the subojects
785 * need not to be update, another fields (i_size, for example) are
786 * cached all the time */
787 obj = lmv_obj_grab(obd, mid);
788 LASSERT(obj != NULL);
790 master_lock_mode = 0;
794 for (i = 0; i < obj->lo_objcount; i++) {
795 struct lu_fid fid = obj->lo_inodes[i].li_fid;
796 struct lustre_handle *lockh = NULL;
797 struct ptlrpc_request *req = NULL;
798 ldlm_blocking_callback cb;
799 struct lookup_intent it;
802 CDEBUG(D_OTHER, "revalidate subobj "DFID"\n",
805 memset(op_data, 0, sizeof(*op_data));
806 memset(&it, 0, sizeof(it));
807 it.it_op = IT_GETATTR;
809 cb = lmv_blocking_ast;
811 if (lu_fid_eq(&fid, &obj->lo_fid)) {
813 /* lmv_intent_getattr() already checked
814 * validness and took the lock */
816 /* it even got the reply refresh attrs
818 body = lustre_msg_buf(mreq->rq_repmsg,
821 LASSERT(body != NULL);
824 /* take already cached attrs into account */
826 "master is locked and cached\n");
837 tgt_exp = lmv_get_export(lmv, &fid);
839 GOTO(out_free_op_data, rc = PTR_ERR(tgt_exp));
841 rc = md_intent_lock(tgt_exp, op_data, NULL, 0, &it, 0, &req, cb,
844 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
845 if (rc > 0 && req == NULL) {
846 /* nice, this slave is valid */
847 LASSERT(req == NULL);
848 CDEBUG(D_OTHER, "cached\n");
853 /* error during revalidation */
857 LASSERT(master_valid == 0);
858 /* save lock on master to be returned to the caller */
859 CDEBUG(D_OTHER, "no lock on master yet\n");
860 memcpy(&master_lockh, lockh, sizeof(master_lockh));
861 master_lock_mode = it.d.lustre.it_lock_mode;
862 it.d.lustre.it_lock_mode = 0;
864 /* this is slave. we want to control it */
865 lock = ldlm_handle2lock(lockh);
867 lock->l_ast_data = lmv_obj_get(obj);
872 /* this is first reply, we'll use it to return updated
873 * data back to the caller */
875 ptlrpc_request_addref(req);
880 body = lustre_msg_buf(req->rq_repmsg,
881 DLM_REPLY_REC_OFF, sizeof(*body));
885 obj->lo_inodes[i].li_size = (MAX_HASH_SIZE/obj->lo_objcount) *
888 CDEBUG(D_OTHER, "fresh: %lu\n",
889 (unsigned long)obj->lo_inodes[i].li_size);
892 ptlrpc_req_finished(req);
894 size += obj->lo_inodes[i].li_size;
896 if (it.d.lustre.it_lock_mode)
897 ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
901 /* some attrs got refreshed, we have reply and it's time to put
902 * fresh attrs to it */
903 CDEBUG(D_OTHER, "return refreshed attrs: size = %lu\n",
904 (unsigned long)size);
906 body = lustre_msg_buf((*reqp)->rq_repmsg,
907 DLM_REPLY_REC_OFF, sizeof(*body));
914 * very important to maintain mds num the same because
915 * of revalidation. mreq == NULL means that caller has
916 * no reply and the only attr we can return is size.
918 body->valid = OBD_MD_FLSIZE;
921 rc = lmv_fld_lookup(lmv, &obj->lo_fid, &body->mds);
926 if (master_valid == 0) {
927 memcpy(&oit->d.lustre.it_lock_handle,
928 &master_lockh, sizeof(master_lockh));
929 oit->d.lustre.it_lock_mode = master_lock_mode;
933 /* it seems all the attrs are fresh and we did no request */
934 CDEBUG(D_OTHER, "all the attrs were fresh\n");
935 if (master_valid == 0)
936 oit->d.lustre.it_lock_mode = master_lock_mode;
945 OBD_FREE_PTR(op_data);