Whamcloud - gitweb
LU-1818 quota: en/disable quota enforcement via conf_param
[fs/lustre-release.git] / lustre / lmv / lmv_intent.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LMV
38 #ifdef __KERNEL__
39 #include <linux/slab.h>
40 #include <linux/module.h>
41 #include <linux/init.h>
42 #include <linux/slab.h>
43 #include <linux/pagemap.h>
44 #include <asm/div64.h>
45 #include <linux/seq_file.h>
46 #include <linux/namei.h>
47 #include <linux/lustre_intent.h>
48 #else
49 #include <liblustre.h>
50 #endif
51
52 #include <obd_support.h>
53 #include <lustre/lustre_idl.h>
54 #include <lustre_lib.h>
55 #include <lustre_net.h>
56 #include <lustre_dlm.h>
57 #include <obd_class.h>
58 #include <lprocfs_status.h>
59 #include "lmv_internal.h"
60
61 int lmv_intent_remote(struct obd_export *exp, void *lmm,
62                       int lmmsize, struct lookup_intent *it,
63                       int flags, struct ptlrpc_request **reqp,
64                       ldlm_blocking_callback cb_blocking,
65                       int extra_lock_flags)
66 {
67         struct obd_device      *obd = exp->exp_obd;
68         struct lmv_obd         *lmv = &obd->u.lmv;
69         struct ptlrpc_request  *req = NULL;
70         struct lustre_handle    plock;
71         struct md_op_data      *op_data;
72         struct lmv_tgt_desc    *tgt;
73         struct mdt_body        *body;
74         int                     pmode;
75         int                     rc = 0;
76         ENTRY;
77
78         body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
79         if (body == NULL)
80                 RETURN(-EPROTO);
81
82         /*
83          * Not cross-ref case, just get out of here.
84          */
85         if (!(body->valid & OBD_MD_MDS))
86                 RETURN(0);
87
88         /*
89          * Unfortunately, we have to lie to MDC/MDS to retrieve
90          * attributes llite needs and provideproper locking.
91          */
92         if (it->it_op & IT_LOOKUP)
93                 it->it_op = IT_GETATTR;
94
95         /* 
96          * We got LOOKUP lock, but we really need attrs. 
97          */
98         pmode = it->d.lustre.it_lock_mode;
99         if (pmode) {
100                 plock.cookie = it->d.lustre.it_lock_handle;
101                 it->d.lustre.it_lock_mode = 0;
102                 it->d.lustre.it_data = NULL;
103         }
104
105         LASSERT(fid_is_sane(&body->fid1));
106
107         tgt = lmv_find_target(lmv, &body->fid1);
108         if (IS_ERR(tgt))
109                 GOTO(out, rc = PTR_ERR(tgt));
110
111         OBD_ALLOC_PTR(op_data);
112         if (op_data == NULL)
113                 GOTO(out, rc = -ENOMEM);
114
115         op_data->op_fid1 = body->fid1;
116         op_data->op_bias = MDS_CROSS_REF;
117         
118         CDEBUG(D_INODE, 
119                "REMOTE_INTENT with fid="DFID" -> mds #%d\n", 
120                PFID(&body->fid1), tgt->ltd_idx);
121
122         it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
123         rc = md_intent_lock(tgt->ltd_exp, op_data, lmm, lmmsize, it,
124                             flags, &req, cb_blocking, extra_lock_flags);
125         if (rc)
126                 GOTO(out_free_op_data, rc);
127
128         /*
129          * LLite needs LOOKUP lock to track dentry revocation in order to
130          * maintain dcache consistency. Thus drop UPDATE lock here and put
131          * LOOKUP in request.
132          */
133         if (it->d.lustre.it_lock_mode != 0) {
134                 ldlm_lock_decref((void *)&it->d.lustre.it_lock_handle,
135                                  it->d.lustre.it_lock_mode);
136                 it->d.lustre.it_lock_mode = 0;
137         }
138         it->d.lustre.it_lock_handle = plock.cookie;
139         it->d.lustre.it_lock_mode = pmode;
140
141         EXIT;
142 out_free_op_data:
143         OBD_FREE_PTR(op_data);
144 out:
145         if (rc && pmode)
146                 ldlm_lock_decref(&plock, pmode);
147
148         ptlrpc_req_finished(*reqp);
149         *reqp = req;
150         return rc;
151 }
152
153 /*
154  * IT_OPEN is intended to open (and create, possible) an object. Parent (pid)
155  * may be split dir.
156  */
157 int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data,
158                     void *lmm, int lmmsize, struct lookup_intent *it,
159                     int flags, struct ptlrpc_request **reqp,
160                     ldlm_blocking_callback cb_blocking,
161                     int extra_lock_flags)
162 {
163         struct obd_device     *obd = exp->exp_obd;
164         struct lu_fid          rpid = op_data->op_fid1;
165         struct lmv_obd        *lmv = &obd->u.lmv;
166         struct md_op_data     *sop_data;
167         struct lmv_stripe_md  *mea;
168         struct lmv_tgt_desc   *tgt;
169         struct mdt_body       *body;
170         struct lmv_object     *obj;
171         int                    rc;
172         int                    loop = 0;
173         int                    sidx;
174         ENTRY;
175
176         OBD_ALLOC_PTR(sop_data);
177         if (sop_data == NULL)
178                 RETURN(-ENOMEM);
179
180         /* save op_data fro repeat case */
181         *sop_data = *op_data;
182
183 repeat:
184
185         ++loop;
186         LASSERT(loop <= 2);
187         obj = lmv_object_find(obd, &rpid);
188         if (obj) {
189                 /*
190                  * Directory is already split, so we have to forward request to
191                  * the right MDS.
192                  */
193                 sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
194                                        (char *)op_data->op_name,
195                                        op_data->op_namelen);
196
197                 rpid = obj->lo_stripes[sidx].ls_fid;
198
199                 sop_data->op_mds = obj->lo_stripes[sidx].ls_mds;
200                 tgt = lmv_get_target(lmv, sop_data->op_mds);
201                 sop_data->op_bias &= ~MDS_CHECK_SPLIT;
202                 lmv_object_put(obj);
203
204                 CDEBUG(D_INODE,
205                        "Choose slave dir ("DFID") -> mds #%d\n", 
206                        PFID(&rpid), tgt->ltd_idx);
207         } else {
208                 sop_data->op_bias |= MDS_CHECK_SPLIT;
209                 tgt = lmv_find_target(lmv, &rpid);
210                 sop_data->op_mds = tgt->ltd_idx;
211         }
212         if (IS_ERR(tgt))
213                 GOTO(out_free_sop_data, rc = PTR_ERR(tgt));
214
215         sop_data->op_fid1 = rpid;
216
217         if (it->it_op & IT_CREAT) {
218                 /*
219                  * For open with IT_CREATE and for IT_CREATE cases allocate new
220                  * fid and setup FLD for it.
221                  */
222                 sop_data->op_fid3 = sop_data->op_fid2;
223                 rc = lmv_fid_alloc(exp, &sop_data->op_fid2, sop_data);
224                 if (rc)
225                         GOTO(out_free_sop_data, rc);
226
227                 if (rc == -ERESTART)
228                         goto repeat;
229                 else if (rc)
230                         GOTO(out_free_sop_data, rc);
231         }
232
233         CDEBUG(D_INODE, 
234                "OPEN_INTENT with fid1="DFID", fid2="DFID", name='%s' -> mds #%d\n", 
235                PFID(&sop_data->op_fid1), PFID(&sop_data->op_fid2), 
236                sop_data->op_name, tgt->ltd_idx);
237
238         rc = md_intent_lock(tgt->ltd_exp, sop_data, lmm, lmmsize, it, flags,
239                             reqp, cb_blocking, extra_lock_flags);
240
241         if (rc == -ERESTART) {
242                 LASSERT(*reqp != NULL);
243                 DEBUG_REQ(D_WARNING|D_RPCTRACE, *reqp,
244                           "Got -ERESTART during open!\n");
245                 ptlrpc_req_finished(*reqp);
246                 *reqp = NULL;
247                 it->d.lustre.it_data = NULL;
248
249                 /*
250                  * Directory got split. Time to update local object and repeat
251                  * the request with proper MDS.
252                  */
253                 LASSERT(lu_fid_eq(&op_data->op_fid1, &rpid));
254                 rc = lmv_handle_split(exp, &rpid);
255                 if (rc == 0) {
256                         /* We should reallocate child FID. */
257                         rc = lmv_allocate_slaves(obd, &rpid, op_data,
258                                                  &sop_data->op_fid2);
259                         if (rc == 0)
260                                 goto repeat;
261                 }
262         }
263
264         if (rc != 0)
265                 GOTO(out_free_sop_data, rc);
266
267         /*
268          * Nothing is found, do not access body->fid1 as it is zero and thus
269          * pointless.
270          */
271         if ((it->d.lustre.it_disposition & DISP_LOOKUP_NEG) &&
272             !(it->d.lustre.it_disposition & DISP_OPEN_CREATE) &&
273             !(it->d.lustre.it_disposition & DISP_OPEN_OPEN))
274                 GOTO(out_free_sop_data, rc = 0);
275
276         /*
277          * Okay, MDS has returned success. Probably name has been resolved in
278          * remote inode.
279          */
280         rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp,
281                                cb_blocking, extra_lock_flags);
282         if (rc != 0) {
283                 LASSERT(rc < 0);
284                 /*
285                  * This is possible, that some userspace application will try to
286                  * open file as directory and we will have -ENOTDIR here. As
287                  * this is normal situation, we should not print error here,
288                  * only debug info.
289                  */
290                 CDEBUG(D_INODE, "Can't handle remote %s: dir "DFID"("DFID"):"
291                        "%*s: %d\n", LL_IT2STR(it), PFID(&op_data->op_fid2),
292                        PFID(&rpid), op_data->op_namelen, op_data->op_name, rc);
293                 GOTO(out_free_sop_data, rc);
294         }
295
296         /* 
297          * Caller may use attrs MDS returns on IT_OPEN lock request so, we have
298          * to update them for split dir. 
299          */
300         body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
301         LASSERT(body != NULL);
302         
303         /* 
304          * Could not find object, FID is not present in response. 
305          */
306         if (!(body->valid & OBD_MD_FLID))
307                 GOTO(out_free_sop_data, rc = 0);
308
309         obj = lmv_object_find(obd, &body->fid1);
310         if (obj == NULL) {
311                 /* 
312                  * XXX: Capability for remote call! 
313                  */
314                 mea = lmv_get_mea(*reqp);
315                 if (mea != NULL) {
316                         obj = lmv_object_create(exp, &body->fid1, mea);
317                         if (IS_ERR(obj))
318                                 GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj));
319                 }
320         }
321
322         if (obj) {
323                 /* 
324                  * This is split dir and we'd want to get attrs. 
325                  */
326                 CDEBUG(D_INODE, "Slave attributes for "DFID"\n",
327                        PFID(&body->fid1));
328
329                 rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1,
330                                            cb_blocking, extra_lock_flags);
331                 lmv_object_put(obj);
332         }
333         EXIT;
334 out_free_sop_data:
335         OBD_FREE_PTR(sop_data);
336         return rc;
337 }
338
339 /*
340  * Handler for: getattr, lookup and revalidate cases.
341  */
342 int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
343                       void *lmm, int lmmsize, struct lookup_intent *it,
344                       int flags, struct ptlrpc_request **reqp,
345                       ldlm_blocking_callback cb_blocking,
346                       int extra_lock_flags)
347 {
348         struct obd_device      *obd = exp->exp_obd;
349         struct lu_fid           rpid = op_data->op_fid1;
350         struct lmv_obd         *lmv = &obd->u.lmv;
351         struct lmv_object      *obj = NULL;
352         struct md_op_data      *sop_data;
353         struct lmv_stripe_md   *mea;
354         struct lmv_tgt_desc    *tgt = NULL;
355         struct mdt_body        *body;
356         int                     sidx;
357         int                     loop = 0;
358         int                     rc = 0;
359         ENTRY;
360
361         OBD_ALLOC_PTR(sop_data);
362         if (sop_data == NULL)
363                 RETURN(-ENOMEM);
364
365         *sop_data = *op_data;
366
367 repeat:
368         ++loop;
369         LASSERT(loop <= 2);
370
371         obj = lmv_object_find(obd, &op_data->op_fid1);
372         if (obj && op_data->op_namelen) {
373                 sidx = raw_name2idx(obj->lo_hashtype,
374                                        obj->lo_objcount,
375                                        (char *)op_data->op_name,
376                                        op_data->op_namelen);
377                 rpid = obj->lo_stripes[sidx].ls_fid;
378                 tgt = lmv_get_target(lmv, 
379                                      obj->lo_stripes[sidx].ls_mds);
380                 CDEBUG(D_INODE,
381                        "Choose slave dir ("DFID") -> mds #%d\n", 
382                        PFID(&rpid), tgt->ltd_idx);
383                 sop_data->op_bias &= ~MDS_CHECK_SPLIT;
384         } else {
385                 tgt = lmv_find_target(lmv, &op_data->op_fid1);
386                 sop_data->op_bias |= MDS_CHECK_SPLIT;
387         }
388         if (obj)
389                 lmv_object_put(obj);
390         
391         if (IS_ERR(tgt))
392                 GOTO(out_free_sop_data, rc = PTR_ERR(tgt));
393         
394         if (!fid_is_sane(&sop_data->op_fid2))
395                 fid_zero(&sop_data->op_fid2);
396         
397         CDEBUG(D_INODE, 
398                "LOOKUP_INTENT with fid1="DFID", fid2="DFID
399                ", name='%s' -> mds #%d\n",
400                PFID(&sop_data->op_fid1), PFID(&sop_data->op_fid2), 
401                sop_data->op_name ? sop_data->op_name : "<NULL>", 
402                tgt->ltd_idx);
403
404         sop_data->op_bias &= ~MDS_CROSS_REF;
405         sop_data->op_fid1 = rpid;
406
407         rc = md_intent_lock(tgt->ltd_exp, sop_data, lmm, lmmsize, it, 
408                             flags, reqp, cb_blocking, extra_lock_flags);
409
410         if (rc == -ERESTART) {
411                 LASSERT(*reqp != NULL);
412                 DEBUG_REQ(D_WARNING|D_RPCTRACE, *reqp,
413                           "Got -ERESTART during lookup!\n");
414                 ptlrpc_req_finished(*reqp);
415                 *reqp = NULL;
416                 it->d.lustre.it_data = 0;
417
418                 /*
419                  * Directory got split since last update. This shouldn't be
420                  * because splitting causes lock revocation, so revalidate had
421                  * to fail and lookup on dir had to return mea.
422                  */
423                 LASSERT(obj == NULL);
424
425                 obj = lmv_object_create(exp, &rpid, NULL);
426                 if (IS_ERR(obj))
427                         GOTO(out_free_sop_data, rc = PTR_ERR(obj));
428                 lmv_object_put(obj);
429                 goto repeat;
430         }
431         
432         if (rc < 0)
433                 GOTO(out_free_sop_data, rc);
434
435         if (obj && rc > 0) {
436                 /*
437                  * This is split dir. In order to optimize things a bit, we
438                  * consider obj valid updating missing parts.
439                  */
440                 CDEBUG(D_INODE,
441                        "Revalidate slaves for "DFID", rc %d\n",
442                        PFID(&op_data->op_fid1), rc);
443
444                 LASSERT(fid_is_sane(&op_data->op_fid2));
445                 rc = lmv_revalidate_slaves(exp, reqp, &op_data->op_fid1, it, rc,
446                                            cb_blocking, extra_lock_flags);
447                 GOTO(out_free_sop_data, rc);
448         }
449
450         if (*reqp == NULL)
451                 GOTO(out_free_sop_data, rc);
452
453         /*
454          * MDS has returned success. Probably name has been resolved in
455          * remote inode. Let's check this.
456          */
457         rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags,
458                                reqp, cb_blocking, extra_lock_flags);
459         if (rc < 0)
460                 GOTO(out_free_sop_data, rc);
461
462         /*
463          * Nothing is found, do not access body->fid1 as it is zero and thus
464          * pointless.
465          */
466         if (it->d.lustre.it_disposition & DISP_LOOKUP_NEG)
467                 GOTO(out_free_sop_data, rc = 0);
468
469         LASSERT(*reqp != NULL);
470         LASSERT((*reqp)->rq_repmsg != NULL);
471         body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
472         LASSERT(body != NULL);
473
474         /* 
475          * Could not find object, FID is not present in response. 
476          */
477         if (!(body->valid & OBD_MD_FLID))
478                 GOTO(out_free_sop_data, rc = 0);
479
480         obj = lmv_object_find(obd, &body->fid1);
481         if (obj == NULL) {
482                 /* 
483                  * XXX: Remote capability is not handled.
484                  */
485                 mea = lmv_get_mea(*reqp);
486                 if (mea != NULL) {
487                         obj = lmv_object_create(exp, &body->fid1, mea);
488                         if (IS_ERR(obj))
489                                 GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj));
490                 }
491         } else {
492                 CDEBUG(D_INODE, "Slave attributes for "DFID", rc %d\n",
493                        PFID(&body->fid1), rc);
494
495                 rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1,
496                                            cb_blocking, extra_lock_flags);
497                 lmv_object_put(obj);
498         }
499
500         EXIT;
501 out_free_sop_data:
502         OBD_FREE_PTR(sop_data);
503         return rc;
504 }
505
506 int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
507                     void *lmm, int lmmsize, struct lookup_intent *it,
508                     int flags, struct ptlrpc_request **reqp,
509                     ldlm_blocking_callback cb_blocking,
510                     int extra_lock_flags)
511 {
512         struct obd_device *obd = exp->exp_obd;
513         int                rc;
514         ENTRY;
515
516         LASSERT(it != NULL);
517         LASSERT(fid_is_sane(&op_data->op_fid1));
518
519         CDEBUG(D_INODE, "INTENT LOCK '%s' for '%*s' on "DFID"\n",
520                LL_IT2STR(it), op_data->op_namelen, op_data->op_name,
521                PFID(&op_data->op_fid1));
522
523         rc = lmv_check_connect(obd);
524         if (rc)
525                 RETURN(rc);
526
527         if (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))
528                 rc = lmv_intent_lookup(exp, op_data, lmm, lmmsize, it,
529                                        flags, reqp, cb_blocking,
530                                        extra_lock_flags);
531         else if (it->it_op & IT_OPEN)
532                 rc = lmv_intent_open(exp, op_data, lmm, lmmsize, it,
533                                      flags, reqp, cb_blocking,
534                                      extra_lock_flags);
535         else
536                 LBUG();
537         RETURN(rc);
538 }
539
540 int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
541                           const struct lu_fid *mid, struct lookup_intent *oit,
542                           int master_valid, ldlm_blocking_callback cb_blocking,
543                           int extra_lock_flags)
544 {
545         struct obd_device      *obd = exp->exp_obd;
546         struct lmv_obd         *lmv = &obd->u.lmv;
547         int                     master_lockm = 0;
548         struct lustre_handle   *lockh = NULL;
549         struct ptlrpc_request  *mreq = *reqp;
550         struct lustre_handle    master_lockh = { 0 };
551         struct md_op_data      *op_data;
552         struct ldlm_lock       *lock;
553         unsigned long           size = 0;
554         struct mdt_body        *body;
555         struct lmv_object      *obj;
556         int                     i;
557         int                     rc = 0;
558         struct lu_fid           fid;
559         struct ptlrpc_request  *req;
560         ldlm_blocking_callback  cb;
561         struct lookup_intent    it;
562         struct lmv_tgt_desc    *tgt;
563         int                     master;
564         ENTRY;
565
566         CDEBUG(D_INODE, "Revalidate master obj "DFID"\n", PFID(mid));
567
568         OBD_ALLOC_PTR(op_data);
569         if (op_data == NULL)
570                 RETURN(-ENOMEM);
571
572         /*
573          * We have to loop over the subobjects, check validity and update them
574          * from MDS if needed. It's very useful that we need not to update all
575          * the fields. Say, common fields (that are equal on all the subojects
576          * need not to be update, another fields (i_size, for example) are
577          * cached all the time.
578          */
579         obj = lmv_object_find_lock(obd, mid);
580         if (obj == NULL)
581                 RETURN(-EALREADY);
582
583         for (i = 0; i < obj->lo_objcount; i++) {
584                 fid = obj->lo_stripes[i].ls_fid;
585                 master = lu_fid_eq(&fid, &obj->lo_fid);
586                 cb = master ? cb_blocking : lmv_blocking_ast;
587
588                 /*
589                  * We need i_size and we would like to check possible cached locks, 
590                  * so this is is IT_GETATTR intent.
591                  */
592                 memset(&it, 0, sizeof(it));
593                 it.it_op = IT_GETATTR;
594
595                 if (master && master_valid) {
596                         /*
597                          * lmv_intent_lookup() already checked
598                          * validness and took the lock.
599                          */
600                         if (mreq != NULL) {
601                                 body = req_capsule_server_get(&mreq->rq_pill,
602                                                               &RMF_MDT_BODY);
603                                 LASSERT(body != NULL);
604                                 goto update;
605                         }
606                         /* 
607                          * Take already cached attrs into account.
608                          */
609                         CDEBUG(D_INODE,
610                                "Master "DFID"is locked and cached\n",
611                                PFID(mid));
612                         goto release_lock;
613                 }
614
615                 /*
616                  * Prepare op_data for revalidating. Note that @fid2 shuld be
617                  * defined otherwise it will go to server and take new lock
618                  * which is what we reall not need here.
619                  */
620                 memset(op_data, 0, sizeof(*op_data));
621                 op_data->op_bias = MDS_CROSS_REF;
622                 op_data->op_fid1 = fid;
623                 op_data->op_fid2 = fid;
624                 req = NULL;
625
626                 tgt = lmv_get_target(lmv, obj->lo_stripes[i].ls_mds);
627                 if (IS_ERR(tgt))
628                         GOTO(cleanup, rc = PTR_ERR(tgt));
629
630                 CDEBUG(D_INODE, "Revalidate slave obj "DFID" -> mds #%d\n", 
631                        PFID(&fid), tgt->ltd_idx);
632
633                 rc = md_intent_lock(tgt->ltd_exp, op_data, NULL, 0, &it, 0, 
634                                     &req, cb, extra_lock_flags);
635
636                 lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle;
637                 if (rc > 0 && req == NULL) {
638                         /* 
639                          * Nice, this slave is valid.
640                          */
641                         CDEBUG(D_INODE, "Cached slave "DFID"\n", PFID(&fid));
642                         goto release_lock;
643                 }
644
645                 if (rc < 0)
646                         GOTO(cleanup, rc);
647
648                 if (master) {
649                         /* 
650                          * Save lock on master to be returned to the caller. 
651                          */
652                         CDEBUG(D_INODE, "No lock on master "DFID" yet\n", 
653                                PFID(mid));
654                         memcpy(&master_lockh, lockh, sizeof(master_lockh));
655                         master_lockm = it.d.lustre.it_lock_mode;
656                         it.d.lustre.it_lock_mode = 0;
657                 } else {
658                         /* 
659                          * This is slave. We want to control it. 
660                          */
661                         lock = ldlm_handle2lock(lockh);
662                         LASSERT(lock != NULL);
663                         lock->l_ast_data = lmv_object_get(obj);
664                         LDLM_LOCK_PUT(lock);
665                 }
666
667                 if (*reqp == NULL) {
668                         /*
669                          * This is first reply, we'll use it to return updated
670                          * data back to the caller.
671                          */
672                         LASSERT(req != NULL);
673                         ptlrpc_request_addref(req);
674                         *reqp = req;
675                 }
676
677                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
678                 LASSERT(body != NULL);
679
680 update:
681                 obj->lo_stripes[i].ls_size = body->size;
682
683                 CDEBUG(D_INODE, "Fresh size %lu from "DFID"\n",
684                        (unsigned long)obj->lo_stripes[i].ls_size, PFID(&fid));
685
686                 if (req)
687                         ptlrpc_req_finished(req);
688 release_lock:
689                 size += obj->lo_stripes[i].ls_size;
690
691                 if (it.d.lustre.it_lock_mode && lockh) {
692                         ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
693                         it.d.lustre.it_lock_mode = 0;
694                 }
695         }
696
697         if (*reqp) {
698                 /*
699                  * Some attrs got refreshed, we have reply and it's time to put
700                  * fresh attrs to it.
701                  */
702                 CDEBUG(D_INODE, "Return refreshed attrs: size = %lu for "DFID"\n",
703                        (unsigned long)size, PFID(mid));
704
705                 body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
706                 LASSERT(body != NULL);
707                 body->size = size;
708
709                 if (mreq == NULL) {
710                         /*
711                          * Very important to maintain mds num the same because
712                          * of revalidation. mreq == NULL means that caller has
713                          * no reply and the only attr we can return is size.
714                          */
715                         body->valid = OBD_MD_FLSIZE;
716                 }
717                 if (master_valid == 0) {
718                         oit->d.lustre.it_lock_handle = master_lockh.cookie;
719                         oit->d.lustre.it_lock_mode = master_lockm;
720                 }
721                 rc = 0;
722         } else {
723                 /* 
724                  * It seems all the attrs are fresh and we did no request. 
725                  */
726                 CDEBUG(D_INODE, "All the attrs were fresh on "DFID"\n", 
727                        PFID(mid));
728                 if (master_valid == 0)
729                         oit->d.lustre.it_lock_mode = master_lockm;
730                 rc = 1;
731         }
732
733         EXIT;
734 cleanup:
735         OBD_FREE_PTR(op_data);
736         lmv_object_put_unlock(obj);
737         return rc;
738 }
739
740 int lmv_allocate_slaves(struct obd_device *obd, struct lu_fid *pid,
741                         struct md_op_data *op, struct lu_fid *fid)
742 {
743         struct lmv_obd          *lmv = &obd->u.lmv;
744         struct lmv_object       *obj;
745         mdsno_t                  mds;
746         int                      sidx;
747         int                      rc;
748         ENTRY;
749
750         obj = lmv_object_find(obd, pid);
751         if (obj == NULL)
752                 RETURN(-EALREADY);
753
754         sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
755                             (char *)op->op_name, op->op_namelen);
756         mds = obj->lo_stripes[sidx].ls_mds;
757         lmv_object_put(obj);
758
759         rc = __lmv_fid_alloc(lmv, fid, mds);
760         if (rc) {
761                 CERROR("Can't allocate fid, rc %d\n", rc);
762                 RETURN(rc);
763         }
764
765         CDEBUG(D_INODE, "Allocate new fid "DFID" for slave "
766                "obj -> mds #%x\n", PFID(fid), mds);
767
768         RETURN(rc);
769 }