1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 # define EXPORT_SYMTAB
25 #define DEBUG_SUBSYSTEM S_LMV
27 #include <linux/slab.h>
28 #include <linux/module.h>
29 #include <linux/init.h>
30 #include <linux/slab.h>
31 #include <linux/pagemap.h>
32 #include <asm/div64.h>
33 #include <linux/seq_file.h>
34 #include <linux/namei.h>
36 #include <liblustre.h>
39 #include <lustre/lustre_idl.h>
40 #include <obd_support.h>
41 #include <lustre_lib.h>
42 #include <lustre_net.h>
43 #include <lustre_dlm.h>
44 #include <lustre_mds.h>
45 #include <obd_class.h>
46 #include <lprocfs_status.h>
47 #include "lmv_internal.h"
/*
 * Release the lock reference recorded in an intent, if there is one.
 *
 * When it->d.lustre.it_lock_mode is non-zero, the reference held on
 * it->d.lustre.it_lock_handle in that mode is dropped through
 * ldlm_lock_decref().  The mode and handle fields of the intent are not
 * modified here; lmv_intent_remote() overwrites them itself right after
 * calling this helper.
 */
49 static inline void lmv_drop_intent_lock(struct lookup_intent *it)
51 if (it->d.lustre.it_lock_mode != 0)
52 ldlm_lock_decref((void *)&it->d.lustre.it_lock_handle,
53 it->d.lustre.it_lock_mode);
/*
 * Follow up a reply that refers to an inode living on another MDS.
 *
 * The reply already stored in *reqp is inspected.  As the in-line comments
 * explain, OBD_MD_MDS in body->valid marks the remote inode case, in which
 * the real attributes have to be requested from a second target.  The
 * visible steps for that case are:
 *   - an IT_LOOKUP intent is rewritten to IT_GETATTR;
 *   - the lock currently recorded in the intent is saved in plock/pmode,
 *     and the it_lock_mode and it_data fields of the intent are cleared;
 *   - DISP_ENQ_COMPLETE is cleared from the intent disposition;
 *   - a temporary md_op_data is allocated and md_intent_lock() is issued on
 *     lmv->tgts[i].ltd_exp, with i taken from lmv_fld_lookup();
 *   - the lock obtained by that second request is dropped with
 *     lmv_drop_intent_lock() and the saved plock/pmode pair is put back
 *     into the intent;
 *   - op_data is freed and the original request is released with
 *     ptlrpc_req_finished().
 *
 * lmm, lmmsize, it, flags, cb_blocking and extra_lock_flags are handed to
 * md_intent_lock() as received; exp is only used to reach the LMV device.
 *
 * rc is set to -ENOMEM when op_data cannot be allocated and otherwise
 * receives the md_intent_lock() result.
 *
 * NOTE(review): several guarding conditions, the out label, the final return
 * and the declarations of i, rc, pmode and nid are not visible in this
 * listing; the description above follows the visible statements only.
 */
56 int lmv_intent_remote(struct obd_export *exp, void *lmm,
57 int lmmsize, struct lookup_intent *it,
58 int flags, struct ptlrpc_request **reqp,
59 ldlm_blocking_callback cb_blocking,
62 struct obd_device *obd = exp->exp_obd;
63 struct lmv_obd *lmv = &obd->u.lmv;
64 struct ptlrpc_request *req = NULL;
65 struct mdt_body *body = NULL;
66 struct lustre_handle plock;
67 struct md_op_data *op_data;
/* Body of the reply the caller already holds in *reqp (buffer 1). */
72 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
73 LASSERT(body != NULL);
75 if (!(body->valid & OBD_MD_MDS))
79 * oh, MDS reports that this is remote inode case i.e. we have to ask
80 * for real attrs on another MDS.
82 if (it->it_op & IT_LOOKUP/* || it->it_op & IT_CHDIR*/) {
84 * unfortunately, we have to lie to MDC/MDS to retrieve
85 * attributes llite needs.
87 it->it_op = IT_GETATTR;
90 /* we got LOOKUP lock, but we really need attrs */
91 pmode = it->d.lustre.it_lock_mode;
93 memcpy(&plock, &it->d.lustre.it_lock_handle,
95 it->d.lustre.it_lock_mode = 0;
96 it->d.lustre.it_data = 0;
99 LASSERT(fid_is_sane(&body->fid1));
102 it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
/* op_data is a scratch descriptor for the second request; freed below. */
104 OBD_ALLOC_PTR(op_data);
106 GOTO(out, rc = -ENOMEM);
/* NOTE(review): nid is not declared in the visible lines; presumably it holds body->fid1 - confirm. */
109 i = lmv_fld_lookup(obd, &nid);
112 rc = md_intent_lock(lmv->tgts[i].ltd_exp, op_data,
113 lmm, lmmsize, it, flags, &req,
114 cb_blocking, extra_lock_flags);
117 * llite needs LOOKUP lock to track dentry revocation in order to
118 * maintain dcache consistency. Thus drop UPDATE lock here and put
122 lmv_drop_intent_lock(it);
123 memcpy(&it->d.lustre.it_lock_handle, &plock,
125 it->d.lustre.it_lock_mode = pmode;
128 OBD_FREE_PTR(op_data);
/* NOTE(review): the guard for this decref is not visible; it releases the saved plock/pmode reference. */
132 ldlm_lock_decref(&plock, pmode);
/* Drop the original request; presumably *reqp is then pointed at req - not visible here. */
134 ptlrpc_req_finished(*reqp);
/*
 * Handle an IT_OPEN intent for name (len bytes) under the parent pid.
 *
 * A temporary md_op_data is allocated and filled with the parent FID
 * (fid1), *cid (fid2) and the name.  The target MDS index comes from
 * lmv_fld_lookup() on the parent.  When the parent directory is already
 * split, raw_name2idx() picks the stripe from the name and the FID of that
 * stripe (obj->lo_inodes[mds].li_fid) replaces the parent FID in rpid.
 *
 * The request is sent with md_intent_lock() to lmv->tgts[mds].ltd_exp.  On
 * -ERESTART (the directory got split meanwhile) lmv_handle_split()
 * refreshes the local object, the request is finished and op_data is
 * zeroed so the request can be repeated; LASSERT(++loop <= 2) bounds the
 * number of passes.
 *
 * After a successful request lmv_intent_remote() deals with a name that
 * resolved to an inode on another MDS.  A negative lookup without
 * DISP_OPEN_CREATE and DISP_OPEN_OPEN, or a reply without OBD_MD_FLID, ends
 * with rc = 0.  Otherwise an lmv object is looked up for cid and, when none
 * exists but lmv_get_mea() finds a stripe EA in the reply, one is created
 * with lmv_obj_create().  For a split directory lmv_revalidate_slaves() is
 * then called with master_valid = 1 to collect attributes from the slaves.
 *
 * Every GOTO shown targets out_free_op_data, and op_data is released with
 * OBD_FREE_PTR() at the end of the function.
 *
 * NOTE(review): several conditions, the retry jump, the labels and the
 * declaration of obj are not visible in this listing; the description
 * follows the visible statements and the in-line comments.
 */
139 int lmv_intent_open(struct obd_export *exp, struct lu_fid *pid,
140 const char *name, int len, void *lmm, int lmmsize,
141 struct lu_fid *cid, struct lookup_intent *it,
142 int flags, struct ptlrpc_request **reqp,
143 ldlm_blocking_callback cb_blocking,
144 int extra_lock_flags)
146 struct obd_device *obd = exp->exp_obd;
147 struct lmv_obd *lmv = &obd->u.lmv;
148 struct mdt_body *body = NULL;
149 struct md_op_data *op_data;
150 struct lmv_stripe_md *mea;
151 struct lu_fid rpid = *pid;
152 int rc, mds, loop = 0;
156 OBD_ALLOC_PTR(op_data);
160 /* IT_OPEN is intended to open (and create, possible) an object. Parent
161 * (pid) may be splitted dir */
/* The pass counter must never exceed two. */
164 LASSERT(++loop <= 2);
165 mds = lmv_fld_lookup(obd, &rpid);
167 GOTO(out_free_op_data, rc = mds);
168 obj = lmv_obj_grab(obd, &rpid);
170 /* directory is already splitted, so we have to forward
171 * request to the right MDS */
172 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
175 CDEBUG(D_OTHER, "forward to MDS #%u ("DFID")\n",
177 rpid = obj->lo_inodes[mds].li_fid;
/* fid1 is the parent, or the stripe selected above for a split parent. */
181 op_data->fid1 = rpid;
184 op_data->fid2 = *cid;
185 op_data->name = name;
186 op_data->namelen = len;
188 //mds = lmv_fld_lookup(obd, &rpid);
189 rc = md_intent_lock(lmv->tgts[mds].ltd_exp, op_data,
190 lmm, lmmsize, it, flags, reqp,
191 cb_blocking, extra_lock_flags);
192 if (rc == -ERESTART) {
193 /* directory got splitted. time to update local object and
194 * repeat the request with proper MDS */
195 LASSERT(lu_fid_eq(pid, &rpid));
196 rc = lmv_handle_split(exp, &rpid);
198 ptlrpc_req_finished(*reqp);
199 memset(op_data, 0, sizeof(*op_data));
204 GOTO(out_free_op_data, rc);
206 /* okay, MDS has returned success. Probably name has been resolved in
208 rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp,
209 cb_blocking, extra_lock_flags);
214 * this is possible, that some userspace application will try to
215 * open file as directory and we will have -ENOTDIR here. As
216 * this is "usual" situation, we should not print error here,
219 CDEBUG(D_OTHER, "can't handle remote %s: dir "DFID"("DFID"):"
220 "%*s: %d\n", LL_IT2STR(it), PFID(pid), PFID(&rpid),
222 GOTO(out_free_op_data, rc);
226 * nothing is found, do not access body->fid1 as it is zero and thus
229 if ((it->d.lustre.it_disposition & DISP_LOOKUP_NEG) &&
230 !(it->d.lustre.it_disposition & DISP_OPEN_CREATE) &&
231 !(it->d.lustre.it_disposition & DISP_OPEN_OPEN))
232 GOTO(out_free_op_data, rc = 0);
234 /* caller may use attrs MDS returns on IT_OPEN lock request so, we have
235 * to update them for splitted dir */
236 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
237 LASSERT(body != NULL);
239 /* could not find object, FID is not present in response. */
240 if (!(body->valid & OBD_MD_FLID))
241 GOTO(out_free_op_data, rc = 0);
/* Reuse an existing lmv object for the child, or build one from the returned stripe EA. */
244 obj = lmv_obj_grab(obd, cid);
245 if (!obj && (mea = lmv_get_mea(*reqp, 1))) {
246 /* wow! this is splitted dir, we'd like to handle it */
247 obj = lmv_obj_create(exp, &body->fid1, mea);
249 GOTO(out_free_op_data, rc = (int)PTR_ERR(obj));
253 /* this is splitted dir and we'd want to get attrs */
254 CDEBUG(D_OTHER, "attrs from slaves for "DFID"\n",
257 rc = lmv_revalidate_slaves(exp, reqp, cid, it, 1,
258 cb_blocking, extra_lock_flags);
259 } else if (S_ISDIR(body->mode)) {
260 CDEBUG(D_OTHER, "object "DFID" has not lmv obj?\n",
269 OBD_FREE_PTR(op_data);
/*
 * Handle an IT_GETATTR intent.
 *
 * Two ways of choosing the target MDS are visible:
 *   - revalidation of cid: the index comes from lmv_fld_lookup() on cid;
 *     when pid and cid differ, the FID stored in obj->lo_inodes[mds] of the
 *     lmv object grabbed for cid is looked up instead.  op_data->fid2 is
 *     set to *cid on this path;
 *   - getattr by name: the index comes from lmv_fld_lookup() on pid; when
 *     pid is an already split directory, raw_name2idx() selects the stripe
 *     from the name and the FID of that stripe is looked up.
 * In both cases rpid ends up in op_data->fid1 together with name and len,
 * and the request goes to lmv->tgts[mds].ltd_exp through md_intent_lock().
 *
 * For a split directory being revalidated, lmv_revalidate_slaves() is
 * called with the md_intent_lock() result passed as master_valid, and the
 * function finishes with its return code.  Otherwise lmv_intent_remote()
 * follows a name that resolved to an inode on another MDS.  A negative
 * lookup (DISP_LOOKUP_NEG) or a reply without OBD_MD_FLID ends with rc = 0.
 * Finally an lmv object is looked up for cid or created from the stripe EA
 * found by lmv_get_mea(), and lmv_revalidate_slaves() is called with
 * master_valid = 1.
 *
 * Every GOTO shown targets out_free_op_data, and op_data is released with
 * OBD_FREE_PTR() at the end of the function.
 *
 * NOTE(review): the conditions selecting between the paths above, the
 * labels and the declarations of rc and mds are not visible in this
 * listing; the description follows the visible statements and comments.
 */
273 int lmv_intent_getattr(struct obd_export *exp, struct lu_fid *pid,
274 const char *name, int len, void *lmm, int lmmsize,
275 struct lu_fid *cid, struct lookup_intent *it,
276 int flags, struct ptlrpc_request **reqp,
277 ldlm_blocking_callback cb_blocking,
278 int extra_lock_flags)
280 struct lmv_obj *obj = NULL, *obj2 = NULL;
281 struct obd_device *obd = exp->exp_obd;
282 struct lmv_obd *lmv = &obd->u.lmv;
283 struct mdt_body *body = NULL;
284 struct md_op_data *op_data;
285 struct lu_fid rpid = *pid;
286 struct lmv_stripe_md *mea;
290 OBD_ALLOC_PTR(op_data);
295 /* caller wants to revalidate attrs of obj we have to revalidate
296 * slaves if requested object is splitted directory */
297 CDEBUG(D_OTHER, "revalidate attrs for "DFID"\n", PFID(cid));
298 mds = lmv_fld_lookup(obd, cid);
300 GOTO(out_free_op_data, rc = mds);
302 obj = lmv_obj_grab(obd, cid);
304 /* in fact, we need not this with current intent_lock(),
305 * but it may change some day */
306 if (!lu_fid_eq(pid, cid)){
307 rpid = obj->lo_inodes[mds].li_fid;
308 mds = lmv_fld_lookup(obd, &rpid);
310 GOTO(out_free_op_data, rc = mds);
315 op_data->fid2 = *cid;
317 CDEBUG(D_OTHER, "INTENT getattr for %*s on "DFID"\n",
318 len, name, PFID(pid));
319 mds = lmv_fld_lookup(obd, pid);
321 GOTO(out_free_op_data, rc = mds);
322 obj = lmv_obj_grab(obd, pid);
324 /* directory is already splitted. calculate mds */
325 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
327 rpid = obj->lo_inodes[mds].li_fid;
328 mds = lmv_fld_lookup(obd, &rpid);
331 CDEBUG(D_OTHER, "forward to MDS #%u (slave "DFID")\n",
/* fid1 is the parent, or the FID selected above on either path. */
336 op_data->fid1 = rpid;
337 op_data->name = name;
338 op_data->namelen = len;
340 /* the same about fid returning. */
341 rc = md_intent_lock(lmv->tgts[mds].ltd_exp, op_data, lmm,
342 lmmsize, it, flags, reqp, cb_blocking,
345 GOTO(out_free_op_data, rc);
349 * this is splitted dir. In order to optimize things a bit, we
350 * consider obj valid updating missing parts.
352 * FIXME: do we need to return any lock here? It would be fine
353 * if we don't. this means that nobody should use UPDATE lock to
354 * notify about object * removal.
357 "revalidate slaves for "DFID", rc %d\n",
361 rc = lmv_revalidate_slaves(exp, reqp, cid, it, rc,
362 cb_blocking, extra_lock_flags);
363 GOTO(out_free_op_data, rc);
367 GOTO(out_free_op_data, rc);
370 * okay, MDS has returned success. probably name has been resolved in
373 rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags,
374 reqp, cb_blocking, extra_lock_flags);
376 GOTO(out_free_op_data, rc);
379 * nothing is found, do not access body->fid1 as it is zero and thus
382 if (it->d.lustre.it_disposition & DISP_LOOKUP_NEG)
383 GOTO(out_free_op_data, rc = 0);
385 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
386 LASSERT(body != NULL);
388 /* could not find object, FID is not present in response. */
389 if (!(body->valid & OBD_MD_FLID))
390 GOTO(out_free_op_data, rc = 0);
393 obj2 = lmv_obj_grab(obd, cid);
395 if (!obj2 && (mea = lmv_get_mea(*reqp, 1))) {
396 /* wow! this is splitted dir, we'd like to handle it. */
397 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
398 LASSERT(body != NULL);
400 obj2 = lmv_obj_create(exp, &body->fid1, mea);
402 GOTO(out_free_op_data, rc = (int)PTR_ERR(obj2));
406 /* this is splitted dir and we'd want to get attrs */
407 CDEBUG(D_OTHER, "attrs from slaves for "DFID", rc %d\n",
410 rc = lmv_revalidate_slaves(exp, reqp, cid, it, 1,
411 cb_blocking, extra_lock_flags);
417 OBD_FREE_PTR(op_data);
/*
 * Fold one sub-object into an aggregated reply body: the size cached in
 * lino->li_size is added to body->size.  No other field of body is touched
 * by the visible code.
 */
421 void lmv_update_body(struct mdt_body *body, struct lmv_inode *lino)
424 body->size += lino->li_size;
/*
 * Refresh the cached sizes of the slave objects of a split directory and
 * fold them into the reply held in *reqp.
 *
 * The master is identified by body->fid1 of that reply; the reply must
 * carry OBD_MD_FLID and an lmv object must exist for the FID (both are
 * asserted).  For every entry of obj->lo_inodes other than the master, an
 * IT_GETATTR intent is sent with md_intent_lock() to the MDS returned by
 * lmv_fld_lookup(), using lmv_blocking_ast as the blocking callback:
 *   - rc > 0 with no request means the cached slave data is still valid;
 *   - otherwise the slave lock gets the lmv object as l_ast_data, the size
 *     from the slave reply is stored in lo_inodes[i].li_size and the slave
 *     request is finished.
 * lmv_update_body() then adds the slave size to the master body, and a lock
 * mode left in the intent is released with ldlm_lock_decref().
 *
 * op_data is a scratch descriptor, zeroed for each slave and freed at the
 * end of the function.
 *
 * NOTE(review): error handling, the cleanup label and the declarations of
 * obj, i, rc and mds are not visible in this listing.
 */
427 /* this is not used currently */
428 int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
430 struct obd_device *obd = exp->exp_obd;
431 struct lmv_obd *lmv = &obd->u.lmv;
432 struct mdt_body *body = NULL;
433 struct lustre_handle *lockh;
434 struct md_op_data *op_data;
435 struct ldlm_lock *lock;
436 struct mdt_body *body2;
444 /* master is locked. we'd like to take locks on slaves and update
445 * attributes to be returned from the slaves it's important that lookup
446 * is called in two cases:
448 * - for first time (dcache has no such a resolving yet).
449 * - ->d_revalidate() returned false.
451 * last case possible only if all the objs (master and all slaves aren't
454 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
455 LASSERT(body != NULL);
456 LASSERT((body->valid & OBD_MD_FLID) != 0);
458 obj = lmv_obj_grab(obd, &body->fid1);
459 LASSERT(obj != NULL);
461 CDEBUG(D_OTHER, "lookup slaves for "DFID"\n",
464 OBD_ALLOC_PTR(op_data);
470 for (i = 0; i < obj->lo_objcount; i++) {
471 struct lu_fid fid = obj->lo_inodes[i].li_fid;
472 struct ptlrpc_request *req = NULL;
473 struct lookup_intent it;
476 if (lu_fid_eq(&fid, &obj->lo_fid))
477 /* skip master obj */
480 CDEBUG(D_OTHER, "lookup slave "DFID"\n", PFID(&fid));
/* Start every slave from a clean intent and a clean op_data. */
483 memset(&it, 0, sizeof(it));
484 it.it_op = IT_GETATTR;
486 memset(op_data, 0, sizeof(*op_data));
490 mds = lmv_fld_lookup(obd, &fid);
492 GOTO(cleanup, rc = mds);
493 rc = md_intent_lock(lmv->tgts[mds].ltd_exp, op_data,
494 NULL, 0, &it, 0, &req,
495 lmv_blocking_ast, 0);
497 lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle;
498 if (rc > 0 && req == NULL) {
499 /* nice, this slave is valid */
500 LASSERT(req == NULL);
501 CDEBUG(D_OTHER, "cached\n");
506 /* error during lookup */
/* Tie the slave lock to the lmv object; lmv_obj_get() presumably takes a reference - confirm. */
509 lock = ldlm_handle2lock(lockh);
512 lock->l_ast_data = lmv_obj_get(obj);
514 body2 = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body2));
517 obj->lo_inodes[i].li_size = body2->size;
519 CDEBUG(D_OTHER, "fresh: %lu\n",
520 (unsigned long)obj->lo_inodes[i].li_size);
525 ptlrpc_req_finished(req);
527 lmv_update_body(body, obj->lo_inodes + i);
529 if (it.d.lustre.it_lock_mode)
530 ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
535 OBD_FREE_PTR(op_data);
/*
 * Handle an IT_LOOKUP intent: either resolve name under pid to a FID
 * (lookup) or confirm that a known resolution is still valid
 * (revalidation).  Per the in-line comment, cid != NULL selects
 * revalidation.
 *
 * Revalidation: when pid is a split directory, raw_name2idx() selects the
 * stripe from the name and its FID replaces pid in rpid; the MDS index is
 * taken from lmv_fld_lookup() on rpid and op_data->fid2 is set to *cid.
 *
 * Lookup: the MDS index is taken from lmv_fld_lookup() on pid; for a split
 * directory the stripe FID is selected the same way and looked up again.
 * op_data->fid2 is zeroed.  LASSERT(++loop <= 2) bounds the number of
 * passes through this path.
 *
 * In both cases op_data->fid1 = rpid plus name and len are sent with
 * md_intent_lock() to lmv->tgts[mds].ltd_exp.  The visible outcomes are:
 *   - rc == 0 with *reqp == NULL: nothing more to do, rc is returned via
 *     out_free_op_data;
 *   - rc == -ERESTART: the directory was split without this client knowing;
 *     a warning is printed, an lmv object is created for rpid with
 *     lmv_obj_create(), and op_data is zeroed so the request can be
 *     repeated;
 *   - success: lmv_intent_remote() follows a name that resolved to an inode
 *     on another MDS.  If that returns 0 and lmv_get_mea() finds a stripe
 *     EA in the reply, an lmv object for body->fid1 is grabbed or created.
 *
 * Every GOTO shown targets out_free_op_data, and op_data is released with
 * OBD_FREE_PTR() at the end of the function.
 *
 * NOTE(review): several conditions, the retry jump, the labels and the
 * declaration of obj are not visible in this listing; the description
 * follows the visible statements and the in-line comments.
 */
541 int lmv_intent_lookup(struct obd_export *exp, struct lu_fid *pid,
542 const char *name, int len, void *lmm, int lmmsize,
543 struct lu_fid *cid, struct lookup_intent *it,
544 int flags, struct ptlrpc_request **reqp,
545 ldlm_blocking_callback cb_blocking,
546 int extra_lock_flags)
548 struct obd_device *obd = exp->exp_obd;
549 struct lmv_obd *lmv = &obd->u.lmv;
550 struct mdt_body *body = NULL;
551 struct lu_fid rpid = *pid;
552 struct md_op_data *op_data;
553 struct lmv_stripe_md *mea;
554 int rc, mds, loop = 0;
558 OBD_ALLOC_PTR(op_data);
563 * IT_LOOKUP is intended to produce name -> fid resolving (let's call
564 * this lookup below) or to confirm requested resolving is still valid
565 * (let's call this revalidation) cid != NULL specifies revalidation.
569 * this is revalidation: we have to check is LOOKUP lock still
570 * valid for given fid. Very important part is that we have to
571 * choose right mds because namespace is per mds.
574 obj = lmv_obj_grab(obd, pid);
576 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
578 rpid = obj->lo_inodes[mds].li_fid;
581 mds = lmv_fld_lookup(obd, &rpid);
583 GOTO(out_free_op_data, rc = mds);
585 CDEBUG(D_OTHER, "revalidate lookup for "DFID" to %d MDS\n",
588 op_data->fid2 = *cid;
590 mds = lmv_fld_lookup(obd, pid);
592 GOTO(out_free_op_data, rc = mds);
594 LASSERT(++loop <= 2);
597 * this is lookup. during lookup we have to update all the
598 * attributes, because returned values will be put in struct
601 obj = lmv_obj_grab(obd, pid);
604 /* directory is already splitted. calculate mds */
605 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
607 rpid = obj->lo_inodes[mds].li_fid;
608 mds = lmv_fld_lookup(obd, &rpid);
610 GOTO(out_free_op_data, rc = mds);
614 memset(&op_data->fid2, 0, sizeof(op_data->fid2));
/* fid1 is the parent, or the stripe selected above for a split parent. */
617 op_data->fid1 = rpid;
618 op_data->name = name;
619 op_data->namelen = len;
621 rc = md_intent_lock(lmv->tgts[mds].ltd_exp, op_data, lmm, lmmsize,
622 it, flags, reqp, cb_blocking, extra_lock_flags);
625 GOTO(out_free_op_data, rc);
629 * very interesting. it seems object is still valid but for some
630 * reason llite calls lookup, not revalidate.
632 CDEBUG(D_OTHER, "lookup for "DFID" and data should be uptodate\n",
634 LASSERT(*reqp == NULL);
635 GOTO(out_free_op_data, rc);
638 if (rc == 0 && *reqp == NULL) {
639 /* once again, we're asked for lookup, not revalidate */
640 CDEBUG(D_OTHER, "lookup for "DFID" and data should be uptodate\n",
642 GOTO(out_free_op_data, rc);
645 if (rc == -ERESTART) {
646 /* directory got splitted since last update. this shouldn't be
647 * becasue splitting causes lock revocation, so revalidate had
648 * to fail and lookup on dir had to return mea */
649 CWARN("we haven't knew about directory splitting!\n");
650 LASSERT(obj == NULL);
652 obj = lmv_obj_create(exp, &rpid, NULL);
654 GOTO(out_free_op_data, rc = (int)PTR_ERR(obj));
656 memset(op_data, 0, sizeof(*op_data));
661 GOTO(out_free_op_data, rc);
663 /* okay, MDS has returned success. Probably name has been resolved in
665 rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp,
666 cb_blocking, extra_lock_flags);
668 if (rc == 0 && (mea = lmv_get_mea(*reqp, 1))) {
669 /* wow! this is splitted dir, we'd like to handle it */
670 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
671 LASSERT(body != NULL);
672 LASSERT((body->valid & OBD_MD_FLID) != 0);
674 obj = lmv_obj_grab(obd, &body->fid1);
676 obj = lmv_obj_create(exp, &body->fid1, mea);
678 GOTO(out_free_op_data, rc = (int)PTR_ERR(obj));
685 OBD_FREE_PTR(op_data);
/*
 * Dispatch an intent lock request to the handler matching it->it_op.
 *
 * name and len are taken from op_data.  pid and cid point at
 * op_data->fid1 and op_data->fid2 when fid_is_sane() accepts them and are
 * NULL otherwise.  The index returned by lmv_fld_lookup() for pid is only
 * used in the debug message in the visible lines.
 *
 * After lmv_check_connect() the intent is routed, in this order of
 * precedence, to lmv_intent_lookup() (IT_LOOKUP), lmv_intent_open()
 * (IT_OPEN) or lmv_intent_getattr() (IT_GETATTR); lmm, lmmsize, flags,
 * reqp, cb_blocking and extra_lock_flags are forwarded unchanged and rc
 * receives the handler result.
 *
 * NOTE(review): pid can be NULL here, yet it is passed to lmv_fld_lookup()
 * and PFID() without a visible check - confirm that fid1 is always sane on
 * entry.  The handling of the lmv_check_connect() result, of other it_op
 * values and the declarations of rc and i are not visible in this listing.
 */
689 int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
690 void *lmm, int lmmsize, struct lookup_intent *it,
691 int flags, struct ptlrpc_request **reqp,
692 ldlm_blocking_callback cb_blocking,
693 int extra_lock_flags)
695 struct obd_device *obd = exp->exp_obd;
696 const char *name = op_data->name;
697 int len = op_data->namelen;
698 struct lu_fid *pid, *cid;
/* A FID that fails fid_is_sane() is represented by a NULL pointer. */
704 pid = fid_is_sane(&op_data->fid1) ? &op_data->fid1 : NULL;
705 cid = fid_is_sane(&op_data->fid2) ? &op_data->fid2 : NULL;
707 i = lmv_fld_lookup(obd, pid);
710 CDEBUG(D_OTHER, "INTENT LOCK '%s' for '%*s' on "DFID" -> %d\n",
711 LL_IT2STR(it), len, name, PFID(pid), i);
713 rc = lmv_check_connect(obd);
/* The first matching bit wins: LOOKUP, then OPEN, then GETATTR. */
717 if (it->it_op & IT_LOOKUP)
718 rc = lmv_intent_lookup(exp, pid, name, len, lmm,
719 lmmsize, cid, it, flags, reqp,
720 cb_blocking, extra_lock_flags);
721 else if (it->it_op & IT_OPEN)
722 rc = lmv_intent_open(exp, pid, name, len, lmm,
723 lmmsize, cid, it, flags, reqp,
724 cb_blocking, extra_lock_flags);
725 else if (it->it_op & IT_GETATTR/* || it->it_op & IT_CHDIR*/)
726 rc = lmv_intent_getattr(exp, pid, name, len, lmm,
727 lmmsize, cid, it, flags, reqp,
728 cb_blocking, extra_lock_flags);
/*
 * Revalidate every sub-object of the split directory mid and return the
 * aggregated size to the caller.
 *
 * An lmv object must already exist for mid (asserted).  For each entry of
 * obj->lo_inodes an IT_GETATTR intent is prepared with lmv_blocking_ast as
 * the blocking callback:
 *   - for the master entry (its FID equals obj->lo_fid) the in-line
 *     comments describe a shortcut where the caller already checked and
 *     locked it, using the reply in mreq when there is one;
 *   - otherwise md_intent_lock() is sent to the MDS returned by
 *     lmv_fld_lookup().  rc > 0 with no request means the cached data is
 *     valid.  For a fresh reply on the master, which requires
 *     master_valid == 0, the lock is saved in master_lockh and
 *     master_lock_mode and removed from the local intent.  For a slave the
 *     lock gets the lmv object as l_ast_data.  The first reply is kept with
 *     an extra reference so it can be returned to the caller, and
 *     body->size of the reply is cached in lo_inodes[i].li_size.
 * The size of the sub-object is added to the running total and a lock mode
 * left in the local intent is released with ldlm_lock_decref().
 *
 * After the loop, when attributes were refreshed, the body of the reply in
 * *reqp is updated; per the in-line comment, body->valid is reduced to
 * OBD_MD_FLSIZE when the caller had no reply of its own (mreq == NULL).
 * With master_valid == 0 the saved master lock handle and mode are copied
 * into oit.  When everything was already fresh, only the lock mode is
 * copied into oit, again for master_valid == 0.
 *
 * master_valid: non-zero when the caller already holds a valid lock on the
 * master, so none is obtained or returned here.
 *
 * op_data is a scratch descriptor, zeroed for each sub-object and freed at
 * the end of the function; lookup failures jump to out_free_op_data with
 * rc set to the lmv_fld_lookup() result.
 *
 * NOTE(review): many conditions, labels, the final return and the
 * declarations of obj, i, rc and mds are not visible in this listing; the
 * description follows the visible statements and the in-line comments.
 */
734 int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
735 struct lu_fid *mid, struct lookup_intent *oit,
736 int master_valid, ldlm_blocking_callback cb_blocking,
737 int extra_lock_flags)
739 struct obd_device *obd = exp->exp_obd;
740 struct ptlrpc_request *mreq = *reqp;
741 struct lmv_obd *lmv = &obd->u.lmv;
742 struct lustre_handle master_lockh;
743 struct md_op_data *op_data;
744 struct ldlm_lock *lock;
745 unsigned long size = 0;
746 struct mdt_body *body;
748 int master_lock_mode;
752 OBD_ALLOC_PTR(op_data);
756 /* we have to loop over the subobjects, check validity and update them
757 * from MDSs if needed. it's very useful that we need not to update all
758 * the fields. say, common fields (that are equal on all the subojects
759 * need not to be update, another fields (i_size, for example) are
760 * cached all the time */
761 obj = lmv_obj_grab(obd, mid);
762 LASSERT(obj != NULL);
764 master_lock_mode = 0;
768 for (i = 0; i < obj->lo_objcount; i++) {
769 struct lu_fid fid = obj->lo_inodes[i].li_fid;
770 struct lustre_handle *lockh = NULL;
771 struct ptlrpc_request *req = NULL;
772 ldlm_blocking_callback cb;
773 struct lookup_intent it;
776 CDEBUG(D_OTHER, "revalidate subobj "DFID"\n",
/* Start every sub-object from a clean op_data and a clean intent. */
779 memset(op_data, 0, sizeof(*op_data));
780 memset(&it, 0, sizeof(it));
781 it.it_op = IT_GETATTR;
783 cb = lmv_blocking_ast;
785 if (lu_fid_eq(&fid, &obj->lo_fid)) {
787 /* lmv_intent_getattr() already checked
788 * validness and took the lock */
790 /* it even got the reply refresh attrs
792 body = lustre_msg_buf(mreq->rq_repmsg,
794 LASSERT(body != NULL);
797 /* take already cached attrs into account */
799 "master is locked and cached\n");
/* Ask the MDS that holds this sub-object for its attributes. */
810 mds = lmv_fld_lookup(obd, &fid);
812 GOTO(out_free_op_data, rc = mds);
813 rc = md_intent_lock(lmv->tgts[mds].ltd_exp, op_data,
814 NULL, 0, &it, 0, &req, cb, extra_lock_flags);
815 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
816 if (rc > 0 && req == NULL) {
817 /* nice, this slave is valid */
818 LASSERT(req == NULL);
819 CDEBUG(D_OTHER, "cached\n");
824 /* error during revalidation */
828 LASSERT(master_valid == 0);
829 /* save lock on master to be returned to the caller */
830 CDEBUG(D_OTHER, "no lock on master yet\n");
831 memcpy(&master_lockh, lockh, sizeof(master_lockh));
832 master_lock_mode = it.d.lustre.it_lock_mode;
833 it.d.lustre.it_lock_mode = 0;
835 /* this is slave. we want to control it */
836 lock = ldlm_handle2lock(lockh);
838 lock->l_ast_data = lmv_obj_get(obj);
843 /* this is first reply, we'll use it to return updated
844 * data back to the caller */
846 ptlrpc_request_addref(req);
851 body = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body));
855 obj->lo_inodes[i].li_size = body->size;
857 CDEBUG(D_OTHER, "fresh: %lu\n",
858 (unsigned long)obj->lo_inodes[i].li_size);
861 ptlrpc_req_finished(req);
/* Add this sub-object to the running total. */
863 size += obj->lo_inodes[i].li_size;
865 if (it.d.lustre.it_lock_mode)
866 ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
870 /* some attrs got refreshed, we have reply and it's time to put
871 * fresh attrs to it */
872 CDEBUG(D_OTHER, "return refreshed attrs: size = %lu\n",
873 (unsigned long)size);
875 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
878 /* FIXME: what about other attributes? */
883 * very important to maintain mds num the same because
884 * of revalidation. mreq == NULL means that caller has
885 * no reply and the only attr we can return is size.
887 body->valid = OBD_MD_FLSIZE;
888 // body->mds = lmv_fld_lookup(obd, &obj->lo_fid);
890 if (master_valid == 0) {
891 memcpy(&oit->d.lustre.it_lock_handle,
892 &master_lockh, sizeof(master_lockh));
893 oit->d.lustre.it_lock_mode = master_lock_mode;
897 /* it seems all the attrs are fresh and we did no request */
898 CDEBUG(D_OTHER, "all the attrs were fresh\n");
899 if (master_valid == 0)
900 oit->d.lustre.it_lock_mode = master_lock_mode;
909 OBD_FREE_PTR(op_data);