Whamcloud - gitweb
b=20583 Do not create more than pre-created objects for sanity_27y
[fs/lustre-release.git] / lustre / lmv / lmv_intent.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_LMV
41 #ifdef __KERNEL__
42 #include <linux/slab.h>
43 #include <linux/module.h>
44 #include <linux/init.h>
45 #include <linux/slab.h>
46 #include <linux/pagemap.h>
47 #include <asm/div64.h>
48 #include <linux/seq_file.h>
49 #include <linux/namei.h>
50 # ifndef HAVE_VFS_INTENT_PATCHES
51 # include <linux/lustre_intent.h>
52 # endif
53 #else
54 #include <liblustre.h>
55 #endif
56
57 #include <lustre/lustre_idl.h>
58 #include <obd_support.h>
59 #include <lustre_lib.h>
60 #include <lustre_net.h>
61 #include <lustre_dlm.h>
62 #include <obd_class.h>
63 #include <lprocfs_status.h>
64 #include "lmv_internal.h"
65
66 int lmv_intent_remote(struct obd_export *exp, void *lmm,
67                       int lmmsize, struct lookup_intent *it,
68                       int flags, struct ptlrpc_request **reqp,
69                       ldlm_blocking_callback cb_blocking,
70                       int extra_lock_flags)
71 {
72         struct obd_device      *obd = exp->exp_obd;
73         struct lmv_obd         *lmv = &obd->u.lmv;
74         struct ptlrpc_request  *req = NULL;
75         struct lustre_handle    plock;
76         struct md_op_data      *op_data;
77         struct lmv_tgt_desc    *tgt;
78         struct mdt_body        *body;
79         int                     pmode;
80         int                     rc = 0;
81         ENTRY;
82
83         body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
84         if (body == NULL)
85                 RETURN(-EPROTO);
86
87         /*
88          * Not cross-ref case, just get out of here.
89          */
90         if (!(body->valid & OBD_MD_MDS))
91                 RETURN(0);
92
93         /*
94          * Unfortunately, we have to lie to MDC/MDS to retrieve
95          * attributes llite needs and provideproper locking.
96          */
97         if (it->it_op & IT_LOOKUP)
98                 it->it_op = IT_GETATTR;
99
100         /* 
101          * We got LOOKUP lock, but we really need attrs. 
102          */
103         pmode = it->d.lustre.it_lock_mode;
104         if (pmode) {
105                 plock.cookie = it->d.lustre.it_lock_handle;
106                 it->d.lustre.it_lock_mode = 0;
107                 it->d.lustre.it_data = NULL;
108         }
109
110         LASSERT(fid_is_sane(&body->fid1));
111
112         tgt = lmv_find_target(lmv, &body->fid1);
113         if (IS_ERR(tgt))
114                 GOTO(out, rc = PTR_ERR(tgt));
115
116         OBD_ALLOC_PTR(op_data);
117         if (op_data == NULL)
118                 GOTO(out, rc = -ENOMEM);
119
120         op_data->op_fid1 = body->fid1;
121         op_data->op_bias = MDS_CROSS_REF;
122         
123         CDEBUG(D_INODE, 
124                "REMOTE_INTENT with fid="DFID" -> mds #%d\n", 
125                PFID(&body->fid1), tgt->ltd_idx);
126
127         it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
128         rc = md_intent_lock(tgt->ltd_exp, op_data, lmm, lmmsize, it,
129                             flags, &req, cb_blocking, extra_lock_flags);
130         if (rc)
131                 GOTO(out_free_op_data, rc);
132
133         /*
134          * LLite needs LOOKUP lock to track dentry revocation in order to
135          * maintain dcache consistency. Thus drop UPDATE lock here and put
136          * LOOKUP in request.
137          */
138         if (it->d.lustre.it_lock_mode != 0) {
139                 ldlm_lock_decref((void *)&it->d.lustre.it_lock_handle,
140                                  it->d.lustre.it_lock_mode);
141                 it->d.lustre.it_lock_mode = 0;
142         }
143         it->d.lustre.it_lock_handle = plock.cookie;
144         it->d.lustre.it_lock_mode = pmode;
145
146         EXIT;
147 out_free_op_data:
148         OBD_FREE_PTR(op_data);
149 out:
150         if (rc && pmode)
151                 ldlm_lock_decref(&plock, pmode);
152
153         ptlrpc_req_finished(*reqp);
154         *reqp = req;
155         return rc;
156 }
157
158 /*
159  * IT_OPEN is intended to open (and create, possible) an object. Parent (pid)
160  * may be split dir.
161  */
162 int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data,
163                     void *lmm, int lmmsize, struct lookup_intent *it,
164                     int flags, struct ptlrpc_request **reqp,
165                     ldlm_blocking_callback cb_blocking,
166                     int extra_lock_flags)
167 {
168         struct obd_device     *obd = exp->exp_obd;
169         struct lu_fid          rpid = op_data->op_fid1;
170         struct lmv_obd        *lmv = &obd->u.lmv;
171         struct md_op_data     *sop_data;
172         struct lmv_stripe_md  *mea;
173         struct lmv_tgt_desc   *tgt;
174         struct mdt_body       *body;
175         struct lmv_object     *obj;
176         int                    rc;
177         int                    loop = 0;
178         int                    sidx;
179         ENTRY;
180
181         OBD_ALLOC_PTR(sop_data);
182         if (sop_data == NULL)
183                 RETURN(-ENOMEM);
184
185         /* save op_data fro repeat case */
186         *sop_data = *op_data;
187
188 repeat:
189
190         ++loop;
191         LASSERT(loop <= 2);
192         obj = lmv_object_find(obd, &rpid);
193         if (obj) {
194                 /*
195                  * Directory is already split, so we have to forward request to
196                  * the right MDS.
197                  */
198                 sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
199                                        (char *)op_data->op_name,
200                                        op_data->op_namelen);
201
202                 rpid = obj->lo_stripes[sidx].ls_fid;
203
204                 sop_data->op_mds = obj->lo_stripes[sidx].ls_mds;
205                 tgt = lmv_get_target(lmv, sop_data->op_mds);
206                 sop_data->op_bias &= ~MDS_CHECK_SPLIT;
207                 lmv_object_put(obj);
208
209                 CDEBUG(D_INODE,
210                        "Choose slave dir ("DFID") -> mds #%d\n", 
211                        PFID(&rpid), tgt->ltd_idx);
212         } else {
213                 sop_data->op_bias |= MDS_CHECK_SPLIT;
214                 tgt = lmv_find_target(lmv, &rpid);
215                 sop_data->op_mds = tgt->ltd_idx;
216         }
217         if (IS_ERR(tgt))
218                 GOTO(out_free_sop_data, rc = PTR_ERR(tgt));
219
220         sop_data->op_fid1 = rpid;
221
222         if (it->it_op & IT_CREAT) {
223                 /*
224                  * For open with IT_CREATE and for IT_CREATE cases allocate new
225                  * fid and setup FLD for it.
226                  */
227                 sop_data->op_fid3 = sop_data->op_fid2;
228                 rc = lmv_fid_alloc(exp, &sop_data->op_fid2, sop_data);
229                 if (rc)
230                         GOTO(out_free_sop_data, rc);
231
232                 if (rc == -ERESTART)
233                         goto repeat;
234                 else if (rc)
235                         GOTO(out_free_sop_data, rc);
236         }
237
238         CDEBUG(D_INODE, 
239                "OPEN_INTENT with fid1="DFID", fid2="DFID", name='%s' -> mds #%d\n", 
240                PFID(&sop_data->op_fid1), PFID(&sop_data->op_fid2), 
241                sop_data->op_name, tgt->ltd_idx);
242
243         rc = md_intent_lock(tgt->ltd_exp, sop_data, lmm, lmmsize, it, flags,
244                             reqp, cb_blocking, extra_lock_flags);
245
246         if (rc == -ERESTART) {
247                 LASSERT(*reqp != NULL);
248                 DEBUG_REQ(D_WARNING|D_RPCTRACE, *reqp,
249                           "Got -ERESTART during open!\n");
250                 ptlrpc_req_finished(*reqp);
251                 *reqp = NULL;
252                 it->d.lustre.it_data = NULL;
253
254                 /*
255                  * Directory got split. Time to update local object and repeat
256                  * the request with proper MDS.
257                  */
258                 LASSERT(lu_fid_eq(&op_data->op_fid1, &rpid));
259                 rc = lmv_handle_split(exp, &rpid);
260                 if (rc == 0) {
261                         /* We should reallocate child FID. */
262                         rc = lmv_allocate_slaves(obd, &rpid, op_data,
263                                                  &sop_data->op_fid2);
264                         if (rc == 0)
265                                 goto repeat;
266                 }
267         }
268
269         if (rc != 0)
270                 GOTO(out_free_sop_data, rc);
271
272         /*
273          * Nothing is found, do not access body->fid1 as it is zero and thus
274          * pointless.
275          */
276         if ((it->d.lustre.it_disposition & DISP_LOOKUP_NEG) &&
277             !(it->d.lustre.it_disposition & DISP_OPEN_CREATE) &&
278             !(it->d.lustre.it_disposition & DISP_OPEN_OPEN))
279                 GOTO(out_free_sop_data, rc = 0);
280
281         /*
282          * Okay, MDS has returned success. Probably name has been resolved in
283          * remote inode.
284          */
285         rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp,
286                                cb_blocking, extra_lock_flags);
287         if (rc != 0) {
288                 LASSERT(rc < 0);
289                 /*
290                  * This is possible, that some userspace application will try to
291                  * open file as directory and we will have -ENOTDIR here. As
292                  * this is normal situation, we should not print error here,
293                  * only debug info.
294                  */
295                 CDEBUG(D_INODE, "Can't handle remote %s: dir "DFID"("DFID"):"
296                        "%*s: %d\n", LL_IT2STR(it), PFID(&op_data->op_fid2),
297                        PFID(&rpid), op_data->op_namelen, op_data->op_name, rc);
298                 GOTO(out_free_sop_data, rc);
299         }
300
301         /* 
302          * Caller may use attrs MDS returns on IT_OPEN lock request so, we have
303          * to update them for split dir. 
304          */
305         body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
306         LASSERT(body != NULL);
307         
308         /* 
309          * Could not find object, FID is not present in response. 
310          */
311         if (!(body->valid & OBD_MD_FLID))
312                 GOTO(out_free_sop_data, rc = 0);
313
314         obj = lmv_object_find(obd, &body->fid1);
315         if (obj == NULL) {
316                 /* 
317                  * XXX: Capability for remote call! 
318                  */
319                 mea = lmv_get_mea(*reqp);
320                 if (mea != NULL) {
321                         obj = lmv_object_create(exp, &body->fid1, mea);
322                         if (IS_ERR(obj))
323                                 GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj));
324                 }
325         }
326
327         if (obj) {
328                 /* 
329                  * This is split dir and we'd want to get attrs. 
330                  */
331                 CDEBUG(D_INODE, "Slave attributes for "DFID"\n",
332                        PFID(&body->fid1));
333
334                 rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1,
335                                            cb_blocking, extra_lock_flags);
336                 lmv_object_put(obj);
337         }
338         EXIT;
339 out_free_sop_data:
340         OBD_FREE_PTR(sop_data);
341         return rc;
342 }
343
344 /*
345  * Handler for: getattr, lookup and revalidate cases.
346  */
347 int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
348                       void *lmm, int lmmsize, struct lookup_intent *it,
349                       int flags, struct ptlrpc_request **reqp,
350                       ldlm_blocking_callback cb_blocking,
351                       int extra_lock_flags)
352 {
353         struct obd_device      *obd = exp->exp_obd;
354         struct lu_fid           rpid = op_data->op_fid1;
355         struct lmv_obd         *lmv = &obd->u.lmv;
356         struct lmv_object      *obj = NULL;
357         struct md_op_data      *sop_data;
358         struct lmv_stripe_md   *mea;
359         struct lmv_tgt_desc    *tgt = NULL;
360         struct mdt_body        *body;
361         int                     sidx;
362         int                     loop = 0;
363         int                     rc = 0;
364         ENTRY;
365
366         OBD_ALLOC_PTR(sop_data);
367         if (sop_data == NULL)
368                 RETURN(-ENOMEM);
369
370         *sop_data = *op_data;
371
372 repeat:
373         ++loop;
374         LASSERT(loop <= 2);
375
376         obj = lmv_object_find(obd, &op_data->op_fid1);
377         if (obj && op_data->op_namelen) {
378                 sidx = raw_name2idx(obj->lo_hashtype,
379                                        obj->lo_objcount,
380                                        (char *)op_data->op_name,
381                                        op_data->op_namelen);
382                 rpid = obj->lo_stripes[sidx].ls_fid;
383                 tgt = lmv_get_target(lmv, 
384                                      obj->lo_stripes[sidx].ls_mds);
385                 CDEBUG(D_INODE,
386                        "Choose slave dir ("DFID") -> mds #%d\n", 
387                        PFID(&rpid), tgt->ltd_idx);
388                 sop_data->op_bias &= ~MDS_CHECK_SPLIT;
389         } else {
390                 tgt = lmv_find_target(lmv, &op_data->op_fid1);
391                 sop_data->op_bias |= MDS_CHECK_SPLIT;
392         }
393         if (obj)
394                 lmv_object_put(obj);
395         
396         if (IS_ERR(tgt))
397                 GOTO(out_free_sop_data, rc = PTR_ERR(tgt));
398         
399         if (!fid_is_sane(&sop_data->op_fid2))
400                 fid_zero(&sop_data->op_fid2);
401         
402         CDEBUG(D_INODE, 
403                "LOOKUP_INTENT with fid1="DFID", fid2="DFID
404                ", name='%s' -> mds #%d\n",
405                PFID(&sop_data->op_fid1), PFID(&sop_data->op_fid2), 
406                sop_data->op_name ? sop_data->op_name : "<NULL>", 
407                tgt->ltd_idx);
408
409         sop_data->op_bias &= ~MDS_CROSS_REF;
410         sop_data->op_fid1 = rpid;
411
412         rc = md_intent_lock(tgt->ltd_exp, sop_data, lmm, lmmsize, it, 
413                             flags, reqp, cb_blocking, extra_lock_flags);
414
415         if (rc == -ERESTART) {
416                 LASSERT(*reqp != NULL);
417                 DEBUG_REQ(D_WARNING|D_RPCTRACE, *reqp,
418                           "Got -ERESTART during lookup!\n");
419                 ptlrpc_req_finished(*reqp);
420                 *reqp = NULL;
421                 it->d.lustre.it_data = 0;
422
423                 /*
424                  * Directory got split since last update. This shouldn't be
425                  * because splitting causes lock revocation, so revalidate had
426                  * to fail and lookup on dir had to return mea.
427                  */
428                 LASSERT(obj == NULL);
429
430                 obj = lmv_object_create(exp, &rpid, NULL);
431                 if (IS_ERR(obj))
432                         GOTO(out_free_sop_data, rc = PTR_ERR(obj));
433                 lmv_object_put(obj);
434                 goto repeat;
435         }
436         
437         if (rc < 0)
438                 GOTO(out_free_sop_data, rc);
439
440         if (obj && rc > 0) {
441                 /*
442                  * This is split dir. In order to optimize things a bit, we
443                  * consider obj valid updating missing parts.
444                  */
445                 CDEBUG(D_INODE,
446                        "Revalidate slaves for "DFID", rc %d\n",
447                        PFID(&op_data->op_fid1), rc);
448
449                 LASSERT(fid_is_sane(&op_data->op_fid2));
450                 rc = lmv_revalidate_slaves(exp, reqp, &op_data->op_fid1, it, rc,
451                                            cb_blocking, extra_lock_flags);
452                 GOTO(out_free_sop_data, rc);
453         }
454
455         if (*reqp == NULL)
456                 GOTO(out_free_sop_data, rc);
457
458         /*
459          * MDS has returned success. Probably name has been resolved in
460          * remote inode. Let's check this.
461          */
462         rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags,
463                                reqp, cb_blocking, extra_lock_flags);
464         if (rc < 0)
465                 GOTO(out_free_sop_data, rc);
466
467         /*
468          * Nothing is found, do not access body->fid1 as it is zero and thus
469          * pointless.
470          */
471         if (it->d.lustre.it_disposition & DISP_LOOKUP_NEG)
472                 GOTO(out_free_sop_data, rc = 0);
473
474         LASSERT(*reqp != NULL);
475         LASSERT((*reqp)->rq_repmsg != NULL);
476         body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
477         LASSERT(body != NULL);
478
479         /* 
480          * Could not find object, FID is not present in response. 
481          */
482         if (!(body->valid & OBD_MD_FLID))
483                 GOTO(out_free_sop_data, rc = 0);
484
485         obj = lmv_object_find(obd, &body->fid1);
486         if (obj == NULL) {
487                 /* 
488                  * XXX: Remote capability is not handled.
489                  */
490                 mea = lmv_get_mea(*reqp);
491                 if (mea != NULL) {
492                         obj = lmv_object_create(exp, &body->fid1, mea);
493                         if (IS_ERR(obj))
494                                 GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj));
495                 }
496         } else {
497                 CDEBUG(D_INODE, "Slave attributes for "DFID", rc %d\n",
498                        PFID(&body->fid1), rc);
499
500                 rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1,
501                                            cb_blocking, extra_lock_flags);
502                 lmv_object_put(obj);
503         }
504
505         EXIT;
506 out_free_sop_data:
507         OBD_FREE_PTR(sop_data);
508         return rc;
509 }
510
511 int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
512                     void *lmm, int lmmsize, struct lookup_intent *it,
513                     int flags, struct ptlrpc_request **reqp,
514                     ldlm_blocking_callback cb_blocking,
515                     int extra_lock_flags)
516 {
517         struct obd_device *obd = exp->exp_obd;
518         int                rc;
519         ENTRY;
520
521         LASSERT(it != NULL);
522         LASSERT(fid_is_sane(&op_data->op_fid1));
523
524         CDEBUG(D_INODE, "INTENT LOCK '%s' for '%*s' on "DFID"\n",
525                LL_IT2STR(it), op_data->op_namelen, op_data->op_name,
526                PFID(&op_data->op_fid1));
527
528         rc = lmv_check_connect(obd);
529         if (rc)
530                 RETURN(rc);
531
532         if (it->it_op & (IT_LOOKUP | IT_GETATTR))
533                 rc = lmv_intent_lookup(exp, op_data, lmm, lmmsize, it,
534                                        flags, reqp, cb_blocking,
535                                        extra_lock_flags);
536         else if (it->it_op & IT_OPEN)
537                 rc = lmv_intent_open(exp, op_data, lmm, lmmsize, it,
538                                      flags, reqp, cb_blocking,
539                                      extra_lock_flags);
540         else
541                 LBUG();
542         RETURN(rc);
543 }
544
545 int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
546                           const struct lu_fid *mid, struct lookup_intent *oit,
547                           int master_valid, ldlm_blocking_callback cb_blocking,
548                           int extra_lock_flags)
549 {
550         struct obd_device      *obd = exp->exp_obd;
551         struct lmv_obd         *lmv = &obd->u.lmv;
552         int                     master_lockm = 0;
553         struct lustre_handle   *lockh = NULL;
554         struct ptlrpc_request  *mreq = *reqp;
555         struct lustre_handle    master_lockh;
556         struct md_op_data      *op_data;
557         struct ldlm_lock       *lock;
558         unsigned long           size = 0;
559         struct mdt_body        *body;
560         struct lmv_object      *obj;
561         int                     i;
562         int                     rc = 0;
563         struct lu_fid           fid;
564         struct ptlrpc_request  *req;
565         ldlm_blocking_callback  cb;
566         struct lookup_intent    it;
567         struct lmv_tgt_desc    *tgt;
568         int                     master;
569         ENTRY;
570
571         CDEBUG(D_INODE, "Revalidate master obj "DFID"\n", PFID(mid));
572
573         OBD_ALLOC_PTR(op_data);
574         if (op_data == NULL)
575                 RETURN(-ENOMEM);
576
577         /*
578          * We have to loop over the subobjects, check validity and update them
579          * from MDS if needed. It's very useful that we need not to update all
580          * the fields. Say, common fields (that are equal on all the subojects
581          * need not to be update, another fields (i_size, for example) are
582          * cached all the time.
583          */
584         obj = lmv_object_find_lock(obd, mid);
585         if (obj == NULL)
586                 RETURN(-EALREADY);
587
588         for (i = 0; i < obj->lo_objcount; i++) {
589                 fid = obj->lo_stripes[i].ls_fid;
590                 master = lu_fid_eq(&fid, &obj->lo_fid);
591                 cb = master ? cb_blocking : lmv_blocking_ast;
592
593                 /*
594                  * We need i_size and we would like to check possible cached locks, 
595                  * so this is is IT_GETATTR intent.
596                  */
597                 memset(&it, 0, sizeof(it));
598                 it.it_op = IT_GETATTR;
599
600                 if (master && master_valid) {
601                         /*
602                          * lmv_intent_lookup() already checked
603                          * validness and took the lock.
604                          */
605                         if (mreq != NULL) {
606                                 body = req_capsule_server_get(&mreq->rq_pill,
607                                                               &RMF_MDT_BODY);
608                                 LASSERT(body != NULL);
609                                 goto update;
610                         }
611                         /* 
612                          * Take already cached attrs into account.
613                          */
614                         CDEBUG(D_INODE,
615                                "Master "DFID"is locked and cached\n",
616                                PFID(mid));
617                         goto release_lock;
618                 }
619
620                 /*
621                  * Prepare op_data for revalidating. Note that @fid2 shuld be
622                  * defined otherwise it will go to server and take new lock
623                  * which is what we reall not need here.
624                  */
625                 memset(op_data, 0, sizeof(*op_data));
626                 op_data->op_bias = MDS_CROSS_REF;
627                 op_data->op_fid1 = fid;
628                 op_data->op_fid2 = fid;
629                 req = NULL;
630
631                 tgt = lmv_get_target(lmv, obj->lo_stripes[i].ls_mds);
632                 if (IS_ERR(tgt))
633                         GOTO(cleanup, rc = PTR_ERR(tgt));
634
635                 CDEBUG(D_INODE, "Revalidate slave obj "DFID" -> mds #%d\n", 
636                        PFID(&fid), tgt->ltd_idx);
637
638                 rc = md_intent_lock(tgt->ltd_exp, op_data, NULL, 0, &it, 0, 
639                                     &req, cb, extra_lock_flags);
640
641                 lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle;
642                 if (rc > 0 && req == NULL) {
643                         /* 
644                          * Nice, this slave is valid.
645                          */
646                         CDEBUG(D_INODE, "Cached slave "DFID"\n", PFID(&fid));
647                         goto release_lock;
648                 }
649
650                 if (rc < 0)
651                         GOTO(cleanup, rc);
652
653                 if (master) {
654                         /* 
655                          * Save lock on master to be returned to the caller. 
656                          */
657                         CDEBUG(D_INODE, "No lock on master "DFID" yet\n", 
658                                PFID(mid));
659                         memcpy(&master_lockh, lockh, sizeof(master_lockh));
660                         master_lockm = it.d.lustre.it_lock_mode;
661                         it.d.lustre.it_lock_mode = 0;
662                 } else {
663                         /* 
664                          * This is slave. We want to control it. 
665                          */
666                         lock = ldlm_handle2lock(lockh);
667                         LASSERT(lock != NULL);
668                         lock->l_ast_data = lmv_object_get(obj);
669                         LDLM_LOCK_PUT(lock);
670                 }
671
672                 if (*reqp == NULL) {
673                         /*
674                          * This is first reply, we'll use it to return updated
675                          * data back to the caller.
676                          */
677                         LASSERT(req != NULL);
678                         ptlrpc_request_addref(req);
679                         *reqp = req;
680                 }
681
682                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
683                 LASSERT(body != NULL);
684
685 update:
686                 obj->lo_stripes[i].ls_size = body->size;
687
688                 CDEBUG(D_INODE, "Fresh size %lu from "DFID"\n",
689                        (unsigned long)obj->lo_stripes[i].ls_size, PFID(&fid));
690
691                 if (req)
692                         ptlrpc_req_finished(req);
693 release_lock:
694                 size += obj->lo_stripes[i].ls_size;
695
696                 if (it.d.lustre.it_lock_mode && lockh) {
697                         ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
698                         it.d.lustre.it_lock_mode = 0;
699                 }
700         }
701
702         if (*reqp) {
703                 /*
704                  * Some attrs got refreshed, we have reply and it's time to put
705                  * fresh attrs to it.
706                  */
707                 CDEBUG(D_INODE, "Return refreshed attrs: size = %lu for "DFID"\n",
708                        (unsigned long)size, PFID(mid));
709
710                 body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
711                 LASSERT(body != NULL);
712                 body->size = size;
713
714                 if (mreq == NULL) {
715                         /*
716                          * Very important to maintain mds num the same because
717                          * of revalidation. mreq == NULL means that caller has
718                          * no reply and the only attr we can return is size.
719                          */
720                         body->valid = OBD_MD_FLSIZE;
721                 }
722                 if (master_valid == 0) {
723                         oit->d.lustre.it_lock_handle = master_lockh.cookie;
724                         oit->d.lustre.it_lock_mode = master_lockm;
725                 }
726                 rc = 0;
727         } else {
728                 /* 
729                  * It seems all the attrs are fresh and we did no request. 
730                  */
731                 CDEBUG(D_INODE, "All the attrs were fresh on "DFID"\n", 
732                        PFID(mid));
733                 if (master_valid == 0)
734                         oit->d.lustre.it_lock_mode = master_lockm;
735                 rc = 1;
736         }
737
738         EXIT;
739 cleanup:
740         OBD_FREE_PTR(op_data);
741         lmv_object_put_unlock(obj);
742         return rc;
743 }
744
745 int lmv_allocate_slaves(struct obd_device *obd, struct lu_fid *pid,
746                         struct md_op_data *op, struct lu_fid *fid)
747 {
748         struct lmv_obd          *lmv = &obd->u.lmv;
749         struct lmv_object       *obj;
750         mdsno_t                  mds;
751         int                      sidx;
752         int                      rc;
753         ENTRY;
754
755         obj = lmv_object_find(obd, pid);
756         if (obj == NULL)
757                 RETURN(-EALREADY);
758
759         sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
760                             (char *)op->op_name, op->op_namelen);
761         mds = obj->lo_stripes[sidx].ls_mds;
762         lmv_object_put(obj);
763
764         rc = __lmv_fid_alloc(lmv, fid, mds);
765         if (rc) {
766                 CERROR("Can't allocate fid, rc %d\n", rc);
767                 RETURN(rc);
768         }
769
770         CDEBUG(D_INODE, "Allocate new fid "DFID" for slave "
771                "obj -> mds #%x\n", PFID(fid), mds);
772
773         RETURN(rc);
774 }