Whamcloud - gitweb
- raw_name2idx declaration to avoid warnings
[fs/lustre-release.git] / lustre / lmv / lmv_intent.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LMV
26 #ifdef __KERNEL__
27 #include <linux/slab.h>
28 #include <linux/module.h>
29 #include <linux/init.h>
30 #include <linux/slab.h>
31 #include <linux/pagemap.h>
32 #include <asm/div64.h>
33 #else
34 #include <liblustre.h>
35 #endif
36
37 #include <linux/obd_support.h>
38 #include <linux/lustre_lib.h>
39 #include <linux/lustre_net.h>
40 #include <linux/lustre_idl.h>
41 #include <linux/lustre_dlm.h>
42 #include <linux/lustre_mds.h>
43 #include <linux/obd_class.h>
44 #include <linux/obd_ost.h>
45 #include <linux/seq_file.h>
46 #include <linux/lprocfs_status.h>
47 #include <linux/lustre_fsfilt.h>
48 #include <linux/obd_lmv.h>
49 #include "lmv_internal.h"
50
51
52 int lmv_handle_remote_inode(struct obd_export *exp, struct ll_uctxt *uctxt,
53                             void *lmm, int lmmsize, 
54                             struct lookup_intent *it, int flags,
55                             struct ptlrpc_request **reqp,
56                             ldlm_blocking_callback cb_blocking)
57 {
58         struct obd_device *obd = exp->exp_obd;
59         struct lmv_obd *lmv = &obd->u.lmv;
60         struct mds_body *body = NULL;
61         int rc = 0;
62         ENTRY;
63
64         body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
65         LASSERT(body != NULL);
66
67         if (body->valid & OBD_MD_MDS) {
68                 /* oh, MDS reports that this is remote inode case
69                  * i.e. we have to ask for real attrs on another MDS */
70                 struct ptlrpc_request *req;
71                 struct ll_fid nfid;
72                 struct lustre_handle plock;
73                 int pmode;
74
75                 if (it->it_op == IT_LOOKUP) {
76                         /* unfortunately, we have to lie to MDC/MDS to
77                          * retrieve attributes llite needs */
78                         it->it_op = IT_GETATTR;
79                 }
80
81                 /* we got LOOKUP lock, but we really need attrs */
82                 pmode = it->d.lustre.it_lock_mode;
83                 if (pmode) {
84                         memcpy(&plock, &it->d.lustre.it_lock_handle,
85                                         sizeof(plock));
86                         it->d.lustre.it_lock_mode = 0;
87                 }
88
89                 nfid = body->fid1;
90                 it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
91                 rc = md_intent_lock(lmv->tgts[nfid.mds].exp, uctxt, &nfid,
92                                     NULL, 0, lmm, lmmsize, NULL, it, flags,
93                                     &req, cb_blocking);
94
95                 /* llite needs LOOKUP lock to track dentry revocation in 
96                  * order to maintain dcache consistency. thus drop UPDATE
97                  * lock here and put LOOKUP in request */
98                 if (rc == 0) {
99                         LASSERT(it->d.lustre.it_lock_mode != 0);
100                         ldlm_lock_decref((void *)&it->d.lustre.it_lock_handle,
101                                          it->d.lustre.it_lock_mode);
102                         memcpy(&it->d.lustre.it_lock_handle, &plock,
103                                         sizeof(plock));
104                         it->d.lustre.it_lock_mode = pmode;
105                         
106                 } else if (pmode)
107                         ldlm_lock_decref(&plock, pmode);
108
109                 ptlrpc_req_finished(*reqp);
110                 *reqp = req;
111         }
112         RETURN(rc);
113 }
114
115 int lmv_intent_open(struct obd_export *exp, struct ll_uctxt *uctxt,
116                     struct ll_fid *pfid, const char *name, int len,
117                     void *lmm, int lmmsize, struct ll_fid *cfid,
118                     struct lookup_intent *it, int flags,
119                     struct ptlrpc_request **reqp,
120                     ldlm_blocking_callback cb_blocking)
121 {
122         struct obd_device *obd = exp->exp_obd;
123         struct lmv_obd *lmv = &obd->u.lmv;
124         struct mds_body *body = NULL;
125         struct ll_fid rpfid = *pfid;
126         struct lmv_obj *obj;
127         struct mea *mea;
128         int rc, mds;
129         ENTRY;
130
131         /* IT_OPEN is intended to open (and create, possible) an object.
132          * parent (pfid) may be splitted dir */
133
134         mds = pfid->mds;
135         obj = lmv_grab_obj(obd, pfid, 0);
136         if (obj) {
137                 /* directory is already splitted, so we have to forward
138                  * request to the right MDS */
139                 mds = raw_name2idx(obj->objcount, name, len);
140                 rpfid = obj->objs[mds].fid;
141                 CDEBUG(D_OTHER, "forward to MDS #%u\n", mds);
142         }
143
144         rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name, len,
145                             lmm, lmmsize, cfid, it, flags, reqp, cb_blocking);
146        
147         lmv_put_obj(obj);
148         if (rc != 0)
149                 RETURN(rc);
150
151         /* okay, MDS has returned success. probably name has been
152          * resolved in remote inode */
153         rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags,
154                                      reqp, cb_blocking);
155         if (rc != 0) {
156                 LASSERT(rc < 0);
157                 RETURN(rc);
158         }
159
160         /* caller may use attrs MDS returns on IT_OPEN lock request
161          * so, we have to update them for splitted dir */
162         body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
163         LASSERT(body != NULL);
164         cfid = &body->fid1;
165         obj = lmv_grab_obj(obd, cfid, 0);
166         if (rc == 0 && !obj && (mea = is_body_of_splitted_dir(*reqp, 1))) {
167                 /* wow! this is splitted dir, we'd like to handle it */
168                 rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea);
169         }
170         obj = lmv_grab_obj(obd, cfid, 0);
171         if (obj) {
172                 /* this is splitted dir and we'd want to get attrs */
173                 CDEBUG(D_OTHER, "attrs from slaves for %lu/%lu/%lu\n",
174                                 (unsigned long) cfid->mds,
175                                 (unsigned long) cfid->id,
176                                 (unsigned long) cfid->generation);
177                 rc = lmv_revalidate_slaves(exp, reqp, cfid,
178                                 it, 1, cb_blocking);
179         } else if (S_ISDIR(body->mode)) {
180                 /*CWARN("hmmm, %lu/%lu/%lu has not lmv obj?!\n",
181                                 (unsigned long) cfid->mds,
182                                 (unsigned long) cfid->id,
183                                 (unsigned long) cfid->generation);*/
184         }
185         lmv_put_obj(obj);
186         RETURN(rc);
187 }
188
189 int lmv_intent_getattr(struct obd_export *exp, struct ll_uctxt *uctxt,
190                        struct ll_fid *pfid, const char *name, int len,
191                        void *lmm, int lmmsize, struct ll_fid *cfid,
192                        struct lookup_intent *it, int flags,
193                        struct ptlrpc_request **reqp,
194                        ldlm_blocking_callback cb_blocking)
195 {
196         struct obd_device *obd = exp->exp_obd;
197         struct lmv_obd *lmv = &obd->u.lmv;
198         struct mds_body *body = NULL;
199         struct ll_fid rpfid = *pfid;
200         struct lmv_obj *obj, *obj2;
201         struct mea *mea;
202         int rc = 0, mds;
203         ENTRY;
204
205         if (cfid) {
206                 /* caller wants to revalidate attrs of obj
207                  * we have to revalidate slaves if requested
208                  * object is splitted directory */
209                 CDEBUG(D_OTHER, "revalidate attrs for %lu/%lu/%lu\n",
210                        (unsigned long) cfid->mds,
211                        (unsigned long) cfid->id,
212                        (unsigned long) cfid->generation);
213                 mds = cfid->mds;
214                 obj = lmv_grab_obj(obd, cfid, 0);
215                 if (obj) {
216                         /* in fact, we need not this with current
217                          * _intent_lock(), but it may change some day */
218                         rpfid = obj->objs[mds].fid;
219                 }
220                 rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name,
221                                     len, lmm, lmmsize, cfid, it, flags, reqp,
222                                     cb_blocking);
223                 if (obj && rc >= 0) {
224                         /* this is splitted dir. in order to optimize things
225                          * a bit, we consider obj valid updating missing
226                          * parts. FIXME: do we need to return any lock here?
227                          * it would be fine if we don't. this means that
228                          * nobody should use UPDATE lock to notify about
229                          * object removal */
230                         CDEBUG(D_OTHER,
231                                "revalidate slaves for %lu/%lu/%lu, rc %d\n",
232                                (unsigned long) cfid->mds,
233                                (unsigned long) cfid->id,
234                                (unsigned long) cfid->generation, rc);
235                         rc = lmv_revalidate_slaves(exp, reqp, cfid, it, rc,
236                                                    cb_blocking);
237                 }
238                 RETURN(rc);                
239         }
240
241         CDEBUG(D_OTHER, "INTENT getattr for %*s on %lu/%lu/%lu\n",
242                len, name, (unsigned long) pfid->mds,
243                (unsigned long) pfid->id,
244                (unsigned long) pfid->generation);
245
246         mds = pfid->mds;
247         obj = lmv_grab_obj(obd, pfid, 0);
248         if (obj && len) {
249                 /* directory is already splitted. calculate mds */
250                 mds = raw_name2idx(obj->objcount, (char *) name, len);
251                 rpfid = obj->objs[mds].fid;
252                 CDEBUG(D_OTHER, "forward to MDS #%u (slave %lu/%lu/%lu)\n",
253                        mds, (unsigned long) rpfid.mds,
254                        (unsigned long) rpfid.id,
255                        (unsigned long) rpfid.generation);
256         }
257         rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name,
258                             len, lmm, lmmsize, NULL, it, flags, reqp,
259                             cb_blocking);
260         if (rc < 0)
261                 RETURN(rc);
262         LASSERT(rc == 0);
263
264         /* okay, MDS has returned success. probably name has been
265          * resolved in remote inode */
266         rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags,
267                                      reqp, cb_blocking);
268         if (rc < 0)
269                 RETURN(rc);
270
271         body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
272         LASSERT(body != NULL);
273         cfid = &body->fid1;
274         obj2 = lmv_grab_obj(obd, cfid, 0);
275
276         if (rc == 0 && !obj2 && (mea = is_body_of_splitted_dir(*reqp, 1))) {
277                 /* wow! this is splitted dir, we'd like to handle it */
278                 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
279                 LASSERT(body != NULL);
280                 rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea);
281                 obj2 = lmv_grab_obj(obd, cfid, 0);
282         }
283
284         if (obj2) {
285                 /* this is splitted dir and we'd want to get attrs */
286                 CDEBUG(D_OTHER,
287                                 "attrs from slaves for %lu/%lu/%lu, rc %d\n",
288                                 (unsigned long) cfid->mds,
289                                 (unsigned long) cfid->id,
290                                 (unsigned long) cfid->generation, rc);
291                 rc = lmv_revalidate_slaves(exp, reqp, cfid,
292                                 it, 1, cb_blocking);
293         }
294         RETURN(rc);
295 }
296
297 void lmv_update_body_from_obj(struct mds_body *body, struct lmv_inode *obj)
298 {
299         /* update size */
300         body->size += obj->size;
301
302         /* update atime */
303         /* update ctime */
304         /* update mtime */
305         /* update nlink */
306 }
307
308 int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
309 {
310         struct obd_device *obd = exp->exp_obd;
311         struct lmv_obd *lmv = &obd->u.lmv;
312         struct mds_body *body = NULL;
313         struct lustre_handle *lockh;
314         struct ldlm_lock *lock;
315         struct mds_body *body2;
316         struct ll_uctxt uctxt;
317         struct lmv_obj *obj;
318         int i, rc = 0;
319         ENTRY;
320
321         LASSERT(reqp);
322         LASSERT(*reqp);
323
324         /* master is locked. we'd like to take locks on slaves
325          * and update attributes to be returned from the slaves
326          * it's important that lookup is called in two cases:
327          *  - for first time (dcache has no such a resolving yet
328          *  - ->d_revalidate() returned false
329          * last case possible only if all the objs (master and
330          * all slaves aren't valid */
331
332         body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
333         LASSERT(body != NULL);
334
335         obj = lmv_grab_obj(obd, &body->fid1, 0);
336         LASSERT(obj);
337
338         CDEBUG(D_OTHER, "lookup slaves for %lu/%lu/%lu\n",
339                (unsigned long) body->fid1.mds,
340                (unsigned long) body->fid1.id,
341                (unsigned long) body->fid1.generation);
342
343         uctxt.gid1 = 0;
344         uctxt.gid2 = 0;
345         for (i = 0; i < obj->objcount; i++) {
346                 struct ll_fid fid = obj->objs[i].fid;
347                 struct ptlrpc_request *req = NULL;
348                 struct lookup_intent it;
349
350                 if (fid_equal(&fid, &obj->fid)) {
351                         /* skip master obj */
352                         continue;
353                 }
354
355                 CDEBUG(D_OTHER, "lookup slave %lu/%lu/%lu\n",
356                        (unsigned long) fid.mds,
357                        (unsigned long) fid.id,
358                        (unsigned long) fid.generation);
359
360                 /* is obj valid? */
361                 memset(&it, 0, sizeof(it));
362                 it.it_op = IT_GETATTR;
363                 rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid,
364                                     NULL, 0, NULL, 0, &fid, &it, 0, &req,
365                                     lmv_dirobj_blocking_ast);
366                 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
367                 if (rc > 0) {
368                         /* nice, this slave is valid */
369                         LASSERT(req == NULL);
370                         CDEBUG(D_OTHER, "cached\n");
371                         goto release_lock;
372                 }
373
374                 if (rc < 0) {
375                         /* error during revalidation */
376                         GOTO(cleanup, rc);
377                 }
378
379                 /* rc == 0, this means we have no such a lock and can't
380                  * think obj is still valid. lookup it again */
381                 LASSERT(req == NULL);
382                 req = NULL;
383                 memset(&it, 0, sizeof(it));
384                 it.it_op = IT_GETATTR;
385                 rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid,
386                                     NULL, 0, NULL, 0, NULL, &it, 0, &req,
387                                     lmv_dirobj_blocking_ast);
388                 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
389                 LASSERT(rc <= 0);
390                 if (rc < 0) {
391                         /* error during lookup */
392                         GOTO(cleanup, rc);
393                 }
394                 
395                 lock = ldlm_handle2lock(lockh);
396                 LASSERT(lock);
397                 lock->l_ast_data = obj;
398                 atomic_inc(&obj->count);
399
400                 body2 = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body2));
401                 LASSERT(body2);
402
403                 obj->objs[i].size = body2->size;
404                 CDEBUG(D_OTHER, "fresh: %lu\n",
405                                 (unsigned long) obj->objs[i].size);
406
407                 LDLM_LOCK_PUT(lock);
408
409                 if (req)
410                         ptlrpc_req_finished(req);
411 release_lock:
412                 lmv_update_body_from_obj(body, obj->objs + i);
413                 if (it.d.lustre.it_lock_mode)
414                         ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
415         }
416 cleanup:
417         RETURN(rc);
418 }
419
420 int lmv_intent_lookup(struct obd_export *exp, struct ll_uctxt *uctxt,
421                 struct ll_fid *pfid, const char *name, int len,
422                 void *lmm, int lmmsize, struct ll_fid *cfid,
423                 struct lookup_intent *it, int flags,
424                 struct ptlrpc_request **reqp,
425                 ldlm_blocking_callback cb_blocking)
426 {
427         struct obd_device *obd = exp->exp_obd;
428         struct lmv_obd *lmv = &obd->u.lmv;
429         struct mds_body *body = NULL;
430         struct ll_fid rpfid = *pfid;
431         struct lmv_obj *obj;
432         struct mea *mea;
433         int rc, mds;
434         ENTRY;
435
436         /* IT_LOOKUP is intended to produce name -> fid resolving
437          * (let's call this lookup below) or to confirm requested
438          * resolving is still valid (let's call this revalidation)
439          * cfid != NULL specifies revalidation */
440
441         if (cfid) {
442                 /* this is revalidation: we have to check is LOOKUP
443                  * lock still valid for given fid. very important
444                  * part is that we have to choose right mds because
445                  * namespace is per mds */
446                 rpfid = *pfid;
447                 obj = lmv_grab_obj(obd, pfid, 0);
448                 if (obj) {
449                         mds = raw_name2idx(obj->objcount, (char *) name, len);
450                         rpfid = obj->objs[mds].fid;
451                         lmv_put_obj(obj);
452                 }
453                 mds = rpfid.mds;
454                 CDEBUG(D_OTHER, "revalidate lookup for %lu/%lu/%lu to %d MDS\n",
455                        (unsigned long) cfid->mds,
456                        (unsigned long) cfid->id,
457                        (unsigned long) cfid->generation, mds);
458                 rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, pfid, name,
459                                     len, lmm, lmmsize, cfid, it, flags,
460                                     reqp, cb_blocking);
461                 RETURN(rc);
462         }
463
464         mds = pfid->mds;
465 repeat:
466         /* this is lookup. during lookup we have to update all the
467          * attributes, because returned values will be put in struct
468          * inode */
469
470         obj = lmv_grab_obj(obd, pfid, 0);
471         if (obj && len) {
472                 /* directory is already splitted. calculate mds */
473                 mds = raw_name2idx(obj->objcount, (char *) name, len);
474                 rpfid = obj->objs[mds].fid;
475                 lmv_put_obj(obj);
476         }
477
478         rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name,
479                             len, lmm, lmmsize, NULL, it, flags, reqp,
480                             cb_blocking);
481         if (rc > 0) {
482                 /* very interesting. it seems object is still valid
483                  * but for some reason llite calls lookup, not revalidate */
484                 CWARN("lookup for %lu/%lu/%lu and data should be uptodate\n",
485                        (unsigned long) rpfid.mds,
486                        (unsigned long) rpfid.id,
487                        (unsigned long) rpfid.generation);
488                 LASSERT(*reqp == NULL);
489                 RETURN(rc);
490         }
491
492         if (rc == 0 && *reqp == NULL) {
493                 /* once again, we're asked for lookup, not revalidate */
494                 CWARN("lookup for %lu/%lu/%lu and data should be uptodate\n",
495                        (unsigned long) rpfid.mds,
496                        (unsigned long) rpfid.id,
497                        (unsigned long) rpfid.generation);
498                 RETURN(rc);
499         }
500        
501         if (rc == -ESTALE) {
502                 /* directory got splitted since last update. this shouldn't
503                  * be becasue splitting causes lock revocation, so revalidate
504                  * had to fail and lookup on dir had to return mea */
505                 CWARN("we haven't knew about directory splitting!\n");
506                 LASSERT(obj == NULL);
507                 rc = lmv_create_obj_from_attrs(exp, &rpfid, NULL);
508                 if (rc)
509                         RETURN(rc);
510                 goto repeat;
511         }
512
513         if (rc < 0)
514                 RETURN(rc);
515
516         /* okay, MDS has returned success. probably name has been
517          * resolved in remote inode */
518         rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags,
519                                      reqp, cb_blocking);
520
521         if (rc == 0 && (mea = is_body_of_splitted_dir(*reqp, 1))) {
522                 /* wow! this is splitted dir, we'd like to handle it */
523                 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
524                 LASSERT(body != NULL);
525                 obj = lmv_grab_obj(obd, &body->fid1, 0);
526                 if (!obj)
527                         rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea);
528                 lmv_put_obj(obj);
529         }
530
531         RETURN(rc);
532 }
533
534 int lmv_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt,
535                 struct ll_fid *pfid, const char *name, int len,
536                 void *lmm, int lmmsize, struct ll_fid *cfid,
537                 struct lookup_intent *it, int flags,
538                 struct ptlrpc_request **reqp,
539                 ldlm_blocking_callback cb_blocking)
540 {
541         struct obd_device *obd = exp->exp_obd;
542         int rc = 0;
543         ENTRY;
544
545         LASSERT(it);
546         LASSERT(pfid);
547
548         CDEBUG(D_OTHER, "INTENT LOCK '%s' for '%*s' on %lu/%lu -> %u\n",
549                         LL_IT2STR(it), len, name, (unsigned long) pfid->id,
550                         (unsigned long) pfid->generation, pfid->mds);
551
552         lmv_connect(obd);
553         if (it->it_op == IT_LOOKUP)
554                 rc = lmv_intent_lookup(exp, uctxt, pfid, name, len, lmm,
555                                        lmmsize, cfid, it, flags, reqp,
556                                        cb_blocking);
557         else if (it->it_op & IT_OPEN)
558                 rc = lmv_intent_open(exp, uctxt, pfid, name, len, lmm,
559                                      lmmsize, cfid, it, flags, reqp,
560                                      cb_blocking);
561         else if (it->it_op == IT_GETATTR)
562                 rc = lmv_intent_getattr(exp, uctxt, pfid, name, len, lmm,
563                                         lmmsize, cfid, it, flags, reqp,
564                                         cb_blocking);
565         else
566                 LBUG();
567         RETURN(rc);
568 }
569
570 int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
571                           struct ll_fid *mfid, struct lookup_intent *oit,
572                           int master_valid, ldlm_blocking_callback cb_blocking)
573 {
574         struct obd_device *obd = exp->exp_obd;
575         struct ptlrpc_request *mreq = *reqp;
576         struct lmv_obd *lmv = &obd->u.lmv;
577         struct lustre_handle master_lockh;
578         unsigned long size = 0;
579         struct ldlm_lock *lock;
580         struct mds_body *body;
581         struct ll_uctxt uctxt;
582         struct lmv_obj *obj;
583         int master_lock_mode;
584         int i, rc = 0;
585         ENTRY;
586
587         /* we have to loop over the subobjects, check validity and update
588          * them from MDSs if needed. it's very useful that we need not to
589          * update all the fields. say, common fields (that are equal on 
590          * all the subojects need not to be update, another fields (i_size,
591          * for example) are cached all the time */
592         obj = lmv_grab_obj(obd, mfid, 0);
593         LASSERT(obj);
594
595         master_lock_mode = 0;
596         uctxt.gid1 = 0;
597         uctxt.gid2 = 0;
598         for (i = 0; i < obj->objcount; i++) {
599                 struct ll_fid fid = obj->objs[i].fid;
600                 struct lustre_handle *lockh = NULL;
601                 struct ptlrpc_request *req = NULL;
602                 ldlm_blocking_callback cb;
603                 struct lookup_intent it;
604                 int master = 0;
605
606                 CDEBUG(D_OTHER, "revalidate subobj %lu/%lu/%lu\n",
607                        (unsigned long) fid.mds,
608                        (unsigned long) fid.id,
609                        (unsigned long) fid.generation);
610
611                 memset(&it, 0, sizeof(it));
612                 it.it_op = IT_GETATTR;
613                 cb = lmv_dirobj_blocking_ast;
614
615                 if (fid_equal(&fid, &obj->fid)) {
616                         if (master_valid) {
617                                 /* lmv_intent_getattr() already checked
618                                  * validness and took the lock */
619                                 if (mreq) {
620                                         /* it even got the reply
621                                          * refresh attrs from that reply */
622                                         body = lustre_msg_buf(mreq->rq_repmsg,
623                                                               1,sizeof(*body));
624                                         LASSERT(body != NULL);
625                                         goto update; 
626                                 }
627                                 /* take already cached attrs into account */
628                                 CDEBUG(D_OTHER,
629                                        "master is locked and cached\n");
630                                 goto release_lock;
631                         }
632                         master = 1;
633                         cb = cb_blocking;
634                 }
635
636                 /* is obj valid? */
637                 rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid,
638                                     NULL, 0, NULL, 0, &fid, &it, 0, &req, cb);
639                 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
640                 if (rc > 0) {
641                         /* nice, this slave is valid */
642                         LASSERT(req == NULL);
643                         CDEBUG(D_OTHER, "cached\n");
644                         goto release_lock;
645                 }
646
647                 if (rc < 0) {
648                         /* error during revalidation */
649                         GOTO(cleanup, rc);
650                 }
651
652                 /* rc == 0, this means we have no such a lock and can't
653                  * think obj is still valid. lookup it again */
654                 LASSERT(req == NULL);
655                 req = NULL;
656                 memset(&it, 0, sizeof(it));
657                 it.it_op = IT_GETATTR;
658                 rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid,
659                                     NULL, 0, NULL, 0, NULL, &it, 0, &req, cb);
660                 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
661                 LASSERT(rc <= 0);
662                 if (rc < 0) {
663                         /* error during lookup */
664                         GOTO(cleanup, rc);
665                 }
666               
667                 if (master) {
668                         LASSERT(master_valid == 0);
669                         /* save lock on master to be returned to the caller */
670                         CDEBUG(D_OTHER, "no lock on master yet\n");
671                         memcpy(&master_lockh, lockh, sizeof(master_lockh));
672                         master_lock_mode = it.d.lustre.it_lock_mode;
673                         it.d.lustre.it_lock_mode = 0;
674                 } else {
675                         /* this is slave. we want to control it */
676                         lock = ldlm_handle2lock(lockh);
677                         LASSERT(lock);
678                         lock->l_ast_data = obj;
679                         atomic_inc(&obj->count);
680                         LDLM_LOCK_PUT(lock);
681                 }
682
683                 if (*reqp == NULL) {
684                         /* this is first reply, we'll use it to return
685                          * updated data back to the caller */
686                         LASSERT(req);
687                         ptlrpc_request_addref(req);
688                         *reqp = req;
689
690                 }
691
692                 body = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body));
693                 LASSERT(body);
694                 
695 update:
696                 obj->objs[i].size = body->size;
697                 CDEBUG(D_OTHER, "fresh: %lu\n",
698                        (unsigned long) obj->objs[i].size);
699
700                 if (req)
701                         ptlrpc_req_finished(req);
702 release_lock:
703                 size += obj->objs[i].size;
704                 if (it.d.lustre.it_lock_mode)
705                         ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
706         }
707
708         if (*reqp) {
709                 /* some attrs got refreshed, we have reply and it's time
710                  * to put fresh attrs to it */
711                 CDEBUG(D_OTHER, "return refreshed attrs: size = %lu\n",
712                        (unsigned long) size);
713                 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
714                 LASSERT(body);
715                 /* FIXME: what about another attributes? */
716                 body->size = size;
717                 if (mreq == NULL) {
718                         /* very important to maintain lli->mds the same
719                          * because of revalidation. mreq == NULL means
720                          * that caller has no reply and the only attr
721                          * we can return is size */
722                         body->valid = OBD_MD_FLSIZE;
723                         body->mds = obj->fid.mds;
724                 }
725                 if (master_valid == 0) {
726                         memcpy(&oit->d.lustre.it_lock_handle,
727                                &master_lockh, sizeof(master_lockh));
728                         oit->d.lustre.it_lock_mode = master_lock_mode;
729                 }
730                 rc = 0;
731         } else {
732                 /* it seems all the attrs are fresh and we did no request */
733                 CDEBUG(D_OTHER, "all the attrs were fresh\n");
734                 if (master_valid == 0)
735                         oit->d.lustre.it_lock_mode = master_lock_mode;
736                 rc = 1;
737         }
738 cleanup:
739         RETURN(rc);
740 }
741