Whamcloud - gitweb
LU-1303 lod: introduce lod device
[fs/lustre-release.git] / lustre / lmv / lmv_intent.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LMV
38 #ifdef __KERNEL__
39 #include <linux/slab.h>
40 #include <linux/module.h>
41 #include <linux/init.h>
42 #include <linux/slab.h>
43 #include <linux/pagemap.h>
44 #include <asm/div64.h>
45 #include <linux/seq_file.h>
46 #include <linux/namei.h>
47 #include <linux/lustre_intent.h>
48 #else
49 #include <liblustre.h>
50 #endif
51
52 #include <obd_support.h>
53 #include <lustre/lustre_idl.h>
54 #include <lustre_lib.h>
55 #include <lustre_net.h>
56 #include <lustre_dlm.h>
57 #include <obd_class.h>
58 #include <lprocfs_status.h>
59 #include "lmv_internal.h"
60
61 int lmv_intent_remote(struct obd_export *exp, void *lmm,
62                       int lmmsize, struct lookup_intent *it,
63                       int flags, struct ptlrpc_request **reqp,
64                       ldlm_blocking_callback cb_blocking,
65                       int extra_lock_flags)
66 {
67         struct obd_device      *obd = exp->exp_obd;
68         struct lmv_obd         *lmv = &obd->u.lmv;
69         struct ptlrpc_request  *req = NULL;
70         struct lustre_handle    plock;
71         struct md_op_data      *op_data;
72         struct lmv_tgt_desc    *tgt;
73         struct mdt_body        *body;
74         int                     pmode;
75         int                     rc = 0;
76         ENTRY;
77
78         body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
79         if (body == NULL)
80                 RETURN(-EPROTO);
81
82         /*
83          * Not cross-ref case, just get out of here.
84          */
85         if (!(body->valid & OBD_MD_MDS))
86                 RETURN(0);
87
88         /*
89          * Unfortunately, we have to lie to MDC/MDS to retrieve
90          * attributes llite needs and provideproper locking.
91          */
92         if (it->it_op & IT_LOOKUP)
93                 it->it_op = IT_GETATTR;
94
95         /* 
96          * We got LOOKUP lock, but we really need attrs. 
97          */
98         pmode = it->d.lustre.it_lock_mode;
99         if (pmode) {
100                 plock.cookie = it->d.lustre.it_lock_handle;
101                 it->d.lustre.it_lock_mode = 0;
102                 it->d.lustre.it_data = NULL;
103         }
104
105         LASSERT(fid_is_sane(&body->fid1));
106
107         tgt = lmv_find_target(lmv, &body->fid1);
108         if (IS_ERR(tgt))
109                 GOTO(out, rc = PTR_ERR(tgt));
110
111         OBD_ALLOC_PTR(op_data);
112         if (op_data == NULL)
113                 GOTO(out, rc = -ENOMEM);
114
115         op_data->op_fid1 = body->fid1;
116         op_data->op_bias = MDS_CROSS_REF;
117         
118         CDEBUG(D_INODE, 
119                "REMOTE_INTENT with fid="DFID" -> mds #%d\n", 
120                PFID(&body->fid1), tgt->ltd_idx);
121
122         it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
123         rc = md_intent_lock(tgt->ltd_exp, op_data, lmm, lmmsize, it,
124                             flags, &req, cb_blocking, extra_lock_flags);
125         if (rc)
126                 GOTO(out_free_op_data, rc);
127
128         /*
129          * LLite needs LOOKUP lock to track dentry revocation in order to
130          * maintain dcache consistency. Thus drop UPDATE lock here and put
131          * LOOKUP in request.
132          */
133         if (it->d.lustre.it_lock_mode != 0) {
134                 ldlm_lock_decref((void *)&it->d.lustre.it_lock_handle,
135                                  it->d.lustre.it_lock_mode);
136                 it->d.lustre.it_lock_mode = 0;
137         }
138         it->d.lustre.it_lock_handle = plock.cookie;
139         it->d.lustre.it_lock_mode = pmode;
140
141         EXIT;
142 out_free_op_data:
143         OBD_FREE_PTR(op_data);
144 out:
145         if (rc && pmode)
146                 ldlm_lock_decref(&plock, pmode);
147
148         ptlrpc_req_finished(*reqp);
149         *reqp = req;
150         return rc;
151 }
152
153 /*
154  * IT_OPEN is intended to open (and create, possible) an object. Parent (pid)
155  * may be split dir.
156  */
157 int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data,
158                     void *lmm, int lmmsize, struct lookup_intent *it,
159                     int flags, struct ptlrpc_request **reqp,
160                     ldlm_blocking_callback cb_blocking,
161                     int extra_lock_flags)
162 {
163         struct obd_device     *obd = exp->exp_obd;
164         struct lu_fid          rpid = op_data->op_fid1;
165         struct lmv_obd        *lmv = &obd->u.lmv;
166         struct md_op_data     *sop_data;
167         struct lmv_stripe_md  *mea;
168         struct lmv_tgt_desc   *tgt;
169         struct mdt_body       *body;
170         struct lmv_object     *obj;
171         int                    rc;
172         int                    loop = 0;
173         int                    sidx;
174         ENTRY;
175
176         OBD_ALLOC_PTR(sop_data);
177         if (sop_data == NULL)
178                 RETURN(-ENOMEM);
179
180         /* save op_data fro repeat case */
181         *sop_data = *op_data;
182
183 repeat:
184
185         ++loop;
186         LASSERT(loop <= 2);
187         obj = lmv_object_find(obd, &rpid);
188         if (obj) {
189                 /*
190                  * Directory is already split, so we have to forward request to
191                  * the right MDS.
192                  */
193                 sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
194                                        (char *)op_data->op_name,
195                                        op_data->op_namelen);
196
197                 rpid = obj->lo_stripes[sidx].ls_fid;
198
199                 sop_data->op_mds = obj->lo_stripes[sidx].ls_mds;
200                 tgt = lmv_get_target(lmv, sop_data->op_mds);
201                 sop_data->op_bias &= ~MDS_CHECK_SPLIT;
202                 lmv_object_put(obj);
203
204                 CDEBUG(D_INODE,
205                        "Choose slave dir ("DFID") -> mds #%d\n", 
206                        PFID(&rpid), tgt->ltd_idx);
207         } else {
208                 sop_data->op_bias |= MDS_CHECK_SPLIT;
209                 tgt = lmv_find_target(lmv, &rpid);
210                 sop_data->op_mds = tgt->ltd_idx;
211         }
212         if (IS_ERR(tgt))
213                 GOTO(out_free_sop_data, rc = PTR_ERR(tgt));
214
215         sop_data->op_fid1 = rpid;
216
217         if (it->it_op & IT_CREAT) {
218                 /*
219                  * For open with IT_CREATE and for IT_CREATE cases allocate new
220                  * fid and setup FLD for it.
221                  */
222                 sop_data->op_fid3 = sop_data->op_fid2;
223                 rc = lmv_fid_alloc(exp, &sop_data->op_fid2, sop_data);
224                 if (rc)
225                         GOTO(out_free_sop_data, rc);
226
227                 if (rc == -ERESTART)
228                         goto repeat;
229                 else if (rc)
230                         GOTO(out_free_sop_data, rc);
231         }
232
233         CDEBUG(D_INODE, 
234                "OPEN_INTENT with fid1="DFID", fid2="DFID", name='%s' -> mds #%d\n", 
235                PFID(&sop_data->op_fid1), PFID(&sop_data->op_fid2), 
236                sop_data->op_name, tgt->ltd_idx);
237
238         rc = md_intent_lock(tgt->ltd_exp, sop_data, lmm, lmmsize, it, flags,
239                             reqp, cb_blocking, extra_lock_flags);
240
241         if (rc == -ERESTART) {
242                 LASSERT(*reqp != NULL);
243                 DEBUG_REQ(D_WARNING|D_RPCTRACE, *reqp,
244                           "Got -ERESTART during open!\n");
245                 ptlrpc_req_finished(*reqp);
246                 *reqp = NULL;
247                 it->d.lustre.it_data = NULL;
248
249                 /*
250                  * Directory got split. Time to update local object and repeat
251                  * the request with proper MDS.
252                  */
253                 LASSERT(lu_fid_eq(&op_data->op_fid1, &rpid));
254                 rc = lmv_handle_split(exp, &rpid);
255                 if (rc == 0) {
256                         /* We should reallocate child FID. */
257                         rc = lmv_allocate_slaves(obd, &rpid, op_data,
258                                                  &sop_data->op_fid2);
259                         if (rc == 0)
260                                 goto repeat;
261                 }
262         }
263
264         if (rc != 0)
265                 GOTO(out_free_sop_data, rc);
266
267         /*
268          * Nothing is found, do not access body->fid1 as it is zero and thus
269          * pointless.
270          */
271         if ((it->d.lustre.it_disposition & DISP_LOOKUP_NEG) &&
272             !(it->d.lustre.it_disposition & DISP_OPEN_CREATE) &&
273             !(it->d.lustre.it_disposition & DISP_OPEN_OPEN))
274                 GOTO(out_free_sop_data, rc = 0);
275
276         /*
277          * Okay, MDS has returned success. Probably name has been resolved in
278          * remote inode.
279          */
280         rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp,
281                                cb_blocking, extra_lock_flags);
282         if (rc != 0) {
283                 LASSERT(rc < 0);
284                 /*
285                  * This is possible, that some userspace application will try to
286                  * open file as directory and we will have -ENOTDIR here. As
287                  * this is normal situation, we should not print error here,
288                  * only debug info.
289                  */
290                 CDEBUG(D_INODE, "Can't handle remote %s: dir "DFID"("DFID"):"
291                        "%*s: %d\n", LL_IT2STR(it), PFID(&op_data->op_fid2),
292                        PFID(&rpid), op_data->op_namelen, op_data->op_name, rc);
293                 GOTO(out_free_sop_data, rc);
294         }
295
296         /* 
297          * Caller may use attrs MDS returns on IT_OPEN lock request so, we have
298          * to update them for split dir. 
299          */
300         body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
301         LASSERT(body != NULL);
302         
303         /* 
304          * Could not find object, FID is not present in response. 
305          */
306         if (!(body->valid & OBD_MD_FLID))
307                 GOTO(out_free_sop_data, rc = 0);
308
309         obj = lmv_object_find(obd, &body->fid1);
310         if (obj == NULL) {
311                 /* 
312                  * XXX: Capability for remote call! 
313                  */
314                 mea = lmv_get_mea(*reqp);
315                 if (mea != NULL) {
316                         obj = lmv_object_create(exp, &body->fid1, mea);
317                         if (IS_ERR(obj))
318                                 GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj));
319                 }
320         }
321
322         if (obj) {
323                 /* 
324                  * This is split dir and we'd want to get attrs. 
325                  */
326                 CDEBUG(D_INODE, "Slave attributes for "DFID"\n",
327                        PFID(&body->fid1));
328
329                 rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1,
330                                            cb_blocking, extra_lock_flags);
331                 lmv_object_put(obj);
332         }
333         EXIT;
334 out_free_sop_data:
335         OBD_FREE_PTR(sop_data);
336         return rc;
337 }
338
339 /*
340  * Handler for: getattr, lookup and revalidate cases.
341  */
342 int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
343                       void *lmm, int lmmsize, struct lookup_intent *it,
344                       int flags, struct ptlrpc_request **reqp,
345                       ldlm_blocking_callback cb_blocking,
346                       int extra_lock_flags)
347 {
348         struct obd_device      *obd = exp->exp_obd;
349         struct lu_fid           rpid = op_data->op_fid1;
350         struct lmv_obd         *lmv = &obd->u.lmv;
351         struct lmv_object      *obj = NULL;
352         struct md_op_data      *sop_data;
353         struct lmv_stripe_md   *mea;
354         struct lmv_tgt_desc    *tgt = NULL;
355         struct mdt_body        *body;
356         int                     sidx;
357         int                     loop = 0;
358         int                     rc = 0;
359         ENTRY;
360
361         OBD_ALLOC_PTR(sop_data);
362         if (sop_data == NULL)
363                 RETURN(-ENOMEM);
364
365         *sop_data = *op_data;
366
367 repeat:
368         ++loop;
369         LASSERT(loop <= 2);
370
371         obj = lmv_object_find(obd, &op_data->op_fid1);
372         if (obj && op_data->op_namelen) {
373                 sidx = raw_name2idx(obj->lo_hashtype,
374                                        obj->lo_objcount,
375                                        (char *)op_data->op_name,
376                                        op_data->op_namelen);
377                 rpid = obj->lo_stripes[sidx].ls_fid;
378                 tgt = lmv_get_target(lmv, 
379                                      obj->lo_stripes[sidx].ls_mds);
380                 CDEBUG(D_INODE,
381                        "Choose slave dir ("DFID") -> mds #%d\n", 
382                        PFID(&rpid), tgt->ltd_idx);
383                 sop_data->op_bias &= ~MDS_CHECK_SPLIT;
384         } else {
385                 tgt = lmv_find_target(lmv, &op_data->op_fid1);
386                 sop_data->op_bias |= MDS_CHECK_SPLIT;
387         }
388         if (obj)
389                 lmv_object_put(obj);
390         
391         if (IS_ERR(tgt))
392                 GOTO(out_free_sop_data, rc = PTR_ERR(tgt));
393         
394         if (!fid_is_sane(&sop_data->op_fid2))
395                 fid_zero(&sop_data->op_fid2);
396         
397         CDEBUG(D_INODE, 
398                "LOOKUP_INTENT with fid1="DFID", fid2="DFID
399                ", name='%s' -> mds #%d\n",
400                PFID(&sop_data->op_fid1), PFID(&sop_data->op_fid2), 
401                sop_data->op_name ? sop_data->op_name : "<NULL>", 
402                tgt->ltd_idx);
403
404         sop_data->op_bias &= ~MDS_CROSS_REF;
405         sop_data->op_fid1 = rpid;
406
407         rc = md_intent_lock(tgt->ltd_exp, sop_data, lmm, lmmsize, it, 
408                             flags, reqp, cb_blocking, extra_lock_flags);
409
410         if (rc == -ERESTART) {
411                 LASSERT(*reqp != NULL);
412                 DEBUG_REQ(D_WARNING|D_RPCTRACE, *reqp,
413                           "Got -ERESTART during lookup!\n");
414                 ptlrpc_req_finished(*reqp);
415                 *reqp = NULL;
416                 it->d.lustre.it_data = 0;
417
418                 /*
419                  * Directory got split since last update. This shouldn't be
420                  * because splitting causes lock revocation, so revalidate had
421                  * to fail and lookup on dir had to return mea.
422                  */
423                 LASSERT(obj == NULL);
424
425                 obj = lmv_object_create(exp, &rpid, NULL);
426                 if (IS_ERR(obj))
427                         GOTO(out_free_sop_data, rc = PTR_ERR(obj));
428                 lmv_object_put(obj);
429                 goto repeat;
430         }
431         
432         if (rc < 0)
433                 GOTO(out_free_sop_data, rc);
434
435         if (obj && rc > 0) {
436                 /*
437                  * This is split dir. In order to optimize things a bit, we
438                  * consider obj valid updating missing parts.
439                  */
440                 CDEBUG(D_INODE,
441                        "Revalidate slaves for "DFID", rc %d\n",
442                        PFID(&op_data->op_fid1), rc);
443
444                 LASSERT(fid_is_sane(&op_data->op_fid2));
445                 rc = lmv_revalidate_slaves(exp, reqp, &op_data->op_fid1, it, rc,
446                                            cb_blocking, extra_lock_flags);
447                 GOTO(out_free_sop_data, rc);
448         }
449
450         if (*reqp == NULL)
451                 GOTO(out_free_sop_data, rc);
452
453         /*
454          * MDS has returned success. Probably name has been resolved in
455          * remote inode. Let's check this.
456          */
457         rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags,
458                                reqp, cb_blocking, extra_lock_flags);
459         if (rc < 0)
460                 GOTO(out_free_sop_data, rc);
461
462         /*
463          * Nothing is found, do not access body->fid1 as it is zero and thus
464          * pointless.
465          */
466         if (it->d.lustre.it_disposition & DISP_LOOKUP_NEG)
467                 GOTO(out_free_sop_data, rc = 0);
468
469         LASSERT(*reqp != NULL);
470         LASSERT((*reqp)->rq_repmsg != NULL);
471         body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
472         LASSERT(body != NULL);
473
474         /* 
475          * Could not find object, FID is not present in response. 
476          */
477         if (!(body->valid & OBD_MD_FLID))
478                 GOTO(out_free_sop_data, rc = 0);
479
480         obj = lmv_object_find(obd, &body->fid1);
481         if (obj == NULL) {
482                 /* 
483                  * XXX: Remote capability is not handled.
484                  */
485                 mea = lmv_get_mea(*reqp);
486                 if (mea != NULL) {
487                         obj = lmv_object_create(exp, &body->fid1, mea);
488                         if (IS_ERR(obj))
489                                 GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj));
490                 }
491         } else {
492                 CDEBUG(D_INODE, "Slave attributes for "DFID", rc %d\n",
493                        PFID(&body->fid1), rc);
494
495                 rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1,
496                                            cb_blocking, extra_lock_flags);
497                 lmv_object_put(obj);
498         }
499
500         EXIT;
501 out_free_sop_data:
502         OBD_FREE_PTR(sop_data);
503         return rc;
504 }
505
506 int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
507                     void *lmm, int lmmsize, struct lookup_intent *it,
508                     int flags, struct ptlrpc_request **reqp,
509                     ldlm_blocking_callback cb_blocking,
510                     int extra_lock_flags)
511 {
512         struct obd_device *obd = exp->exp_obd;
513         int                rc;
514         ENTRY;
515
516         LASSERT(it != NULL);
517         LASSERT(fid_is_sane(&op_data->op_fid1));
518
519         CDEBUG(D_INODE, "INTENT LOCK '%s' for '%*s' on "DFID"\n",
520                LL_IT2STR(it), op_data->op_namelen, op_data->op_name,
521                PFID(&op_data->op_fid1));
522
523         rc = lmv_check_connect(obd);
524         if (rc)
525                 RETURN(rc);
526
527         if (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))
528                 rc = lmv_intent_lookup(exp, op_data, lmm, lmmsize, it,
529                                        flags, reqp, cb_blocking,
530                                        extra_lock_flags);
531         else if (it->it_op & IT_OPEN)
532                 rc = lmv_intent_open(exp, op_data, lmm, lmmsize, it,
533                                      flags, reqp, cb_blocking,
534                                      extra_lock_flags);
535         else
536                 LBUG();
537         RETURN(rc);
538 }
539
540 int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
541                           const struct lu_fid *mid, struct lookup_intent *oit,
542                           int master_valid, ldlm_blocking_callback cb_blocking,
543                           int extra_lock_flags)
544 {
545         struct obd_device      *obd = exp->exp_obd;
546         struct lmv_obd         *lmv = &obd->u.lmv;
547         int                     master_lockm = 0;
548         struct lustre_handle   *lockh = NULL;
549         struct ptlrpc_request  *mreq = *reqp;
550         struct lustre_handle    master_lockh = { 0 };
551         struct md_op_data      *op_data;
552         struct ldlm_lock       *lock;
553         unsigned long           size = 0;
554         struct mdt_body        *body;
555         struct lmv_object      *obj;
556         int                     i;
557         int                     rc = 0;
558         struct lu_fid           fid;
559         struct ptlrpc_request  *req;
560         ldlm_blocking_callback  cb;
561         struct lookup_intent    it;
562         struct lmv_tgt_desc    *tgt;
563         int                     master;
564         ENTRY;
565
566         CDEBUG(D_INODE, "Revalidate master obj "DFID"\n", PFID(mid));
567
568         OBD_ALLOC_PTR(op_data);
569         if (op_data == NULL)
570                 RETURN(-ENOMEM);
571
572         /*
573          * We have to loop over the subobjects, check validity and update them
574          * from MDS if needed. It's very useful that we need not to update all
575          * the fields. Say, common fields (that are equal on all the subojects
576          * need not to be update, another fields (i_size, for example) are
577          * cached all the time.
578          */
579         obj = lmv_object_find_lock(obd, mid);
580         if (obj == NULL) {
581                 OBD_FREE_PTR(op_data);
582                 RETURN(-EALREADY);
583         }
584
585         for (i = 0; i < obj->lo_objcount; i++) {
586                 fid = obj->lo_stripes[i].ls_fid;
587                 master = lu_fid_eq(&fid, &obj->lo_fid);
588                 cb = master ? cb_blocking : lmv_blocking_ast;
589
590                 /*
591                  * We need i_size and we would like to check possible cached locks, 
592                  * so this is is IT_GETATTR intent.
593                  */
594                 memset(&it, 0, sizeof(it));
595                 it.it_op = IT_GETATTR;
596
597                 if (master && master_valid) {
598                         /*
599                          * lmv_intent_lookup() already checked
600                          * validness and took the lock.
601                          */
602                         if (mreq != NULL) {
603                                 body = req_capsule_server_get(&mreq->rq_pill,
604                                                               &RMF_MDT_BODY);
605                                 LASSERT(body != NULL);
606                                 goto update;
607                         }
608                         /* 
609                          * Take already cached attrs into account.
610                          */
611                         CDEBUG(D_INODE,
612                                "Master "DFID"is locked and cached\n",
613                                PFID(mid));
614                         goto release_lock;
615                 }
616
617                 /*
618                  * Prepare op_data for revalidating. Note that @fid2 shuld be
619                  * defined otherwise it will go to server and take new lock
620                  * which is what we reall not need here.
621                  */
622                 memset(op_data, 0, sizeof(*op_data));
623                 op_data->op_bias = MDS_CROSS_REF;
624                 op_data->op_fid1 = fid;
625                 op_data->op_fid2 = fid;
626                 req = NULL;
627
628                 tgt = lmv_get_target(lmv, obj->lo_stripes[i].ls_mds);
629                 if (IS_ERR(tgt))
630                         GOTO(cleanup, rc = PTR_ERR(tgt));
631
632                 CDEBUG(D_INODE, "Revalidate slave obj "DFID" -> mds #%d\n", 
633                        PFID(&fid), tgt->ltd_idx);
634
635                 rc = md_intent_lock(tgt->ltd_exp, op_data, NULL, 0, &it, 0, 
636                                     &req, cb, extra_lock_flags);
637
638                 lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle;
639                 if (rc > 0 && req == NULL) {
640                         /* 
641                          * Nice, this slave is valid.
642                          */
643                         CDEBUG(D_INODE, "Cached slave "DFID"\n", PFID(&fid));
644                         goto release_lock;
645                 }
646
647                 if (rc < 0)
648                         GOTO(cleanup, rc);
649
650                 if (master) {
651                         /* 
652                          * Save lock on master to be returned to the caller. 
653                          */
654                         CDEBUG(D_INODE, "No lock on master "DFID" yet\n", 
655                                PFID(mid));
656                         memcpy(&master_lockh, lockh, sizeof(master_lockh));
657                         master_lockm = it.d.lustre.it_lock_mode;
658                         it.d.lustre.it_lock_mode = 0;
659                 } else {
660                         /* 
661                          * This is slave. We want to control it. 
662                          */
663                         lock = ldlm_handle2lock(lockh);
664                         LASSERT(lock != NULL);
665                         lock->l_ast_data = lmv_object_get(obj);
666                         LDLM_LOCK_PUT(lock);
667                 }
668
669                 if (*reqp == NULL) {
670                         /*
671                          * This is first reply, we'll use it to return updated
672                          * data back to the caller.
673                          */
674                         LASSERT(req != NULL);
675                         ptlrpc_request_addref(req);
676                         *reqp = req;
677                 }
678
679                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
680                 LASSERT(body != NULL);
681
682 update:
683                 obj->lo_stripes[i].ls_size = body->size;
684
685                 CDEBUG(D_INODE, "Fresh size %lu from "DFID"\n",
686                        (unsigned long)obj->lo_stripes[i].ls_size, PFID(&fid));
687
688                 if (req)
689                         ptlrpc_req_finished(req);
690 release_lock:
691                 size += obj->lo_stripes[i].ls_size;
692
693                 if (it.d.lustre.it_lock_mode && lockh) {
694                         ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
695                         it.d.lustre.it_lock_mode = 0;
696                 }
697         }
698
699         if (*reqp) {
700                 /*
701                  * Some attrs got refreshed, we have reply and it's time to put
702                  * fresh attrs to it.
703                  */
704                 CDEBUG(D_INODE, "Return refreshed attrs: size = %lu for "DFID"\n",
705                        (unsigned long)size, PFID(mid));
706
707                 body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
708                 LASSERT(body != NULL);
709                 body->size = size;
710
711                 if (mreq == NULL) {
712                         /*
713                          * Very important to maintain mds num the same because
714                          * of revalidation. mreq == NULL means that caller has
715                          * no reply and the only attr we can return is size.
716                          */
717                         body->valid = OBD_MD_FLSIZE;
718                 }
719                 if (master_valid == 0) {
720                         oit->d.lustre.it_lock_handle = master_lockh.cookie;
721                         oit->d.lustre.it_lock_mode = master_lockm;
722                 }
723                 rc = 0;
724         } else {
725                 /* 
726                  * It seems all the attrs are fresh and we did no request. 
727                  */
728                 CDEBUG(D_INODE, "All the attrs were fresh on "DFID"\n", 
729                        PFID(mid));
730                 if (master_valid == 0)
731                         oit->d.lustre.it_lock_mode = master_lockm;
732                 rc = 1;
733         }
734
735         EXIT;
736 cleanup:
737         OBD_FREE_PTR(op_data);
738         lmv_object_put_unlock(obj);
739         return rc;
740 }
741
742 int lmv_allocate_slaves(struct obd_device *obd, struct lu_fid *pid,
743                         struct md_op_data *op, struct lu_fid *fid)
744 {
745         struct lmv_obd          *lmv = &obd->u.lmv;
746         struct lmv_object       *obj;
747         mdsno_t                  mds;
748         int                      sidx;
749         int                      rc;
750         ENTRY;
751
752         obj = lmv_object_find(obd, pid);
753         if (obj == NULL)
754                 RETURN(-EALREADY);
755
756         sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
757                             (char *)op->op_name, op->op_namelen);
758         mds = obj->lo_stripes[sidx].ls_mds;
759         lmv_object_put(obj);
760
761         rc = __lmv_fid_alloc(lmv, fid, mds);
762         if (rc) {
763                 CERROR("Can't allocate fid, rc %d\n", rc);
764                 RETURN(rc);
765         }
766
767         CDEBUG(D_INODE, "Allocate new fid "DFID" for slave "
768                "obj -> mds #%x\n", PFID(fid), mds);
769
770         RETURN(rc);
771 }