Whamcloud - gitweb
- lmv should accept IT_CHDIR intent
[fs/lustre-release.git] / lustre / lmv / lmv_intent.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LMV
26 #ifdef __KERNEL__
27 #include <linux/slab.h>
28 #include <linux/module.h>
29 #include <linux/init.h>
30 #include <linux/slab.h>
31 #include <linux/pagemap.h>
32 #include <asm/div64.h>
33 #include <linux/seq_file.h>
34 #else
35 #include <liblustre.h>
36 #endif
37
38 #include <linux/obd_support.h>
39 #include <linux/lustre_lib.h>
40 #include <linux/lustre_net.h>
41 #include <linux/lustre_idl.h>
42 #include <linux/lustre_dlm.h>
43 #include <linux/lustre_mds.h>
44 #include <linux/obd_class.h>
45 #include <linux/obd_ost.h>
46 #include <linux/lprocfs_status.h>
47 #include <linux/lustre_fsfilt.h>
48 #include <linux/obd_lmv.h>
49 #include "lmv_internal.h"
50
51
52 int lmv_handle_remote_inode(struct obd_export *exp, struct ll_uctxt *uctxt,
53                             void *lmm, int lmmsize, 
54                             struct lookup_intent *it, int flags,
55                             struct ptlrpc_request **reqp,
56                             ldlm_blocking_callback cb_blocking)
57 {
58         struct obd_device *obd = exp->exp_obd;
59         struct lmv_obd *lmv = &obd->u.lmv;
60         struct mds_body *body = NULL;
61         int rc = 0;
62         ENTRY;
63
64         body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
65         LASSERT(body != NULL);
66
67         if (body->valid & OBD_MD_MDS) {
68                 /* oh, MDS reports that this is remote inode case
69                  * i.e. we have to ask for real attrs on another MDS */
70                 struct ptlrpc_request *req;
71                 struct ll_fid nfid;
72                 struct lustre_handle plock;
73                 int pmode;
74
75                 if (it->it_op == IT_LOOKUP) {
76                         /* unfortunately, we have to lie to MDC/MDS to
77                          * retrieve attributes llite needs */
78                         it->it_op = IT_GETATTR;
79                 }
80
81                 /* we got LOOKUP lock, but we really need attrs */
82                 pmode = it->d.lustre.it_lock_mode;
83                 if (pmode) {
84                         memcpy(&plock, &it->d.lustre.it_lock_handle,
85                                         sizeof(plock));
86                         it->d.lustre.it_lock_mode = 0;
87                 }
88
89                 nfid = body->fid1;
90                 it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
91                 rc = md_intent_lock(lmv->tgts[nfid.mds].exp, uctxt, &nfid,
92                                     NULL, 0, lmm, lmmsize, NULL, it, flags,
93                                     &req, cb_blocking);
94
95                 /* llite needs LOOKUP lock to track dentry revocation in 
96                  * order to maintain dcache consistency. thus drop UPDATE
97                  * lock here and put LOOKUP in request */
98                 if (rc == 0) {
99                         LASSERT(it->d.lustre.it_lock_mode != 0);
100                         ldlm_lock_decref((void *)&it->d.lustre.it_lock_handle,
101                                          it->d.lustre.it_lock_mode);
102                         memcpy(&it->d.lustre.it_lock_handle, &plock,
103                                         sizeof(plock));
104                         it->d.lustre.it_lock_mode = pmode;
105                         
106                 } else if (pmode)
107                         ldlm_lock_decref(&plock, pmode);
108
109                 ptlrpc_req_finished(*reqp);
110                 *reqp = req;
111         }
112         RETURN(rc);
113 }
114
115 int lmv_intent_open(struct obd_export *exp, struct ll_uctxt *uctxt,
116                     struct ll_fid *pfid, const char *name, int len,
117                     void *lmm, int lmmsize, struct ll_fid *cfid,
118                     struct lookup_intent *it, int flags,
119                     struct ptlrpc_request **reqp,
120                     ldlm_blocking_callback cb_blocking)
121 {
122         struct obd_device *obd = exp->exp_obd;
123         struct lmv_obd *lmv = &obd->u.lmv;
124         struct mds_body *body = NULL;
125         struct ll_fid rpfid = *pfid;
126         struct lmv_obj *obj;
127         struct mea *mea;
128         int rc, mds;
129         ENTRY;
130
131         /* IT_OPEN is intended to open (and create, possible) an object.
132          * parent (pfid) may be splitted dir */
133
134 repeat:
135         mds = rpfid.mds;
136         obj = lmv_grab_obj(obd, &rpfid, 0);
137         if (obj) {
138                 /* directory is already splitted, so we have to forward
139                  * request to the right MDS */
140                 mds = raw_name2idx(obj->objcount, (char *)name, len);
141                 rpfid = obj->objs[mds].fid;
142                 CDEBUG(D_OTHER, "forward to MDS #%u\n", mds);
143         }
144
145         rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name,
146                             len, lmm, lmmsize, cfid, it, flags, reqp,
147                             cb_blocking);
148         lmv_put_obj(obj);
149         if (rc == -ERESTART) {
150                 /* directory got splitted. time to update local object
151                  * and repeat the request with proper MDS */
152                 LASSERT(fid_equal(pfid, &rpfid));
153                 rc = lmv_get_mea_and_update_object(exp, &rpfid);
154                 if (rc == 0) {
155                         ptlrpc_req_finished(*reqp);
156                         goto repeat;
157                 }
158         }
159         if (rc != 0)
160                 RETURN(rc);
161
162         /* okay, MDS has returned success. probably name has been
163          * resolved in remote inode */
164         rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags,
165                                      reqp, cb_blocking);
166         if (rc != 0) {
167                 LASSERT(rc < 0);
168                 RETURN(rc);
169         }
170
171         /* caller may use attrs MDS returns on IT_OPEN lock request
172          * so, we have to update them for splitted dir */
173         body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
174         LASSERT(body != NULL);
175         cfid = &body->fid1;
176         obj = lmv_grab_obj(obd, cfid, 0);
177         if (rc == 0 && !obj && (mea = is_body_of_splitted_dir(*reqp, 1))) {
178                 /* wow! this is splitted dir, we'd like to handle it */
179                 rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea);
180         }
181         obj = lmv_grab_obj(obd, cfid, 0);
182         if (obj) {
183                 /* this is splitted dir and we'd want to get attrs */
184                 CDEBUG(D_OTHER, "attrs from slaves for %lu/%lu/%lu\n",
185                                 (unsigned long) cfid->mds,
186                                 (unsigned long) cfid->id,
187                                 (unsigned long) cfid->generation);
188                 rc = lmv_revalidate_slaves(exp, reqp, cfid,
189                                 it, 1, cb_blocking);
190         } else if (S_ISDIR(body->mode)) {
191                 /*CWARN("hmmm, %lu/%lu/%lu has not lmv obj?!\n",
192                                 (unsigned long) cfid->mds,
193                                 (unsigned long) cfid->id,
194                                 (unsigned long) cfid->generation);*/
195         }
196         lmv_put_obj(obj);
197         RETURN(rc);
198 }
199
200 int lmv_intent_getattr(struct obd_export *exp, struct ll_uctxt *uctxt,
201                        struct ll_fid *pfid, const char *name, int len,
202                        void *lmm, int lmmsize, struct ll_fid *cfid,
203                        struct lookup_intent *it, int flags,
204                        struct ptlrpc_request **reqp,
205                        ldlm_blocking_callback cb_blocking)
206 {
207         struct obd_device *obd = exp->exp_obd;
208         struct lmv_obd *lmv = &obd->u.lmv;
209         struct mds_body *body = NULL;
210         struct ll_fid rpfid = *pfid;
211         struct lmv_obj *obj, *obj2;
212         struct mea *mea;
213         int rc = 0, mds;
214         ENTRY;
215
216         if (cfid) {
217                 /* caller wants to revalidate attrs of obj
218                  * we have to revalidate slaves if requested
219                  * object is splitted directory */
220                 CDEBUG(D_OTHER, "revalidate attrs for %lu/%lu/%lu\n",
221                        (unsigned long) cfid->mds,
222                        (unsigned long) cfid->id,
223                        (unsigned long) cfid->generation);
224                 mds = cfid->mds;
225                 obj = lmv_grab_obj(obd, cfid, 0);
226                 if (obj) {
227                         /* in fact, we need not this with current
228                          * _intent_lock(), but it may change some day */
229                         rpfid = obj->objs[mds].fid;
230                 }
231                 rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name,
232                                     len, lmm, lmmsize, cfid, it, flags, reqp,
233                                     cb_blocking);
234                 if (obj && rc >= 0) {
235                         /* this is splitted dir. in order to optimize things
236                          * a bit, we consider obj valid updating missing
237                          * parts. FIXME: do we need to return any lock here?
238                          * it would be fine if we don't. this means that
239                          * nobody should use UPDATE lock to notify about
240                          * object removal */
241                         CDEBUG(D_OTHER,
242                                "revalidate slaves for %lu/%lu/%lu, rc %d\n",
243                                (unsigned long) cfid->mds,
244                                (unsigned long) cfid->id,
245                                (unsigned long) cfid->generation, rc);
246                         rc = lmv_revalidate_slaves(exp, reqp, cfid, it, rc,
247                                                    cb_blocking);
248                 }
249                 RETURN(rc);                
250         }
251
252         CDEBUG(D_OTHER, "INTENT getattr for %*s on %lu/%lu/%lu\n",
253                len, name, (unsigned long) pfid->mds,
254                (unsigned long) pfid->id,
255                (unsigned long) pfid->generation);
256
257         mds = pfid->mds;
258         obj = lmv_grab_obj(obd, pfid, 0);
259         if (obj && len) {
260                 /* directory is already splitted. calculate mds */
261                 mds = raw_name2idx(obj->objcount, (char *) name, len);
262                 rpfid = obj->objs[mds].fid;
263                 CDEBUG(D_OTHER, "forward to MDS #%u (slave %lu/%lu/%lu)\n",
264                        mds, (unsigned long) rpfid.mds,
265                        (unsigned long) rpfid.id,
266                        (unsigned long) rpfid.generation);
267         }
268         rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name,
269                             len, lmm, lmmsize, NULL, it, flags, reqp,
270                             cb_blocking);
271         if (rc < 0)
272                 RETURN(rc);
273         LASSERT(rc == 0);
274
275         /* okay, MDS has returned success. probably name has been
276          * resolved in remote inode */
277         rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags,
278                                      reqp, cb_blocking);
279         if (rc < 0)
280                 RETURN(rc);
281
282         body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
283         LASSERT(body != NULL);
284         cfid = &body->fid1;
285         obj2 = lmv_grab_obj(obd, cfid, 0);
286
287         if (rc == 0 && !obj2 && (mea = is_body_of_splitted_dir(*reqp, 1))) {
288                 /* wow! this is splitted dir, we'd like to handle it */
289                 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
290                 LASSERT(body != NULL);
291                 rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea);
292                 obj2 = lmv_grab_obj(obd, cfid, 0);
293         }
294
295         if (obj2) {
296                 /* this is splitted dir and we'd want to get attrs */
297                 CDEBUG(D_OTHER,
298                                 "attrs from slaves for %lu/%lu/%lu, rc %d\n",
299                                 (unsigned long) cfid->mds,
300                                 (unsigned long) cfid->id,
301                                 (unsigned long) cfid->generation, rc);
302                 rc = lmv_revalidate_slaves(exp, reqp, cfid,
303                                 it, 1, cb_blocking);
304         }
305         RETURN(rc);
306 }
307
308 void lmv_update_body_from_obj(struct mds_body *body, struct lmv_inode *obj)
309 {
310         /* update size */
311         body->size += obj->size;
312
313         /* update atime */
314         /* update ctime */
315         /* update mtime */
316         /* update nlink */
317 }
318
319 int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
320 {
321         struct obd_device *obd = exp->exp_obd;
322         struct lmv_obd *lmv = &obd->u.lmv;
323         struct mds_body *body = NULL;
324         struct lustre_handle *lockh;
325         struct ldlm_lock *lock;
326         struct mds_body *body2;
327         struct ll_uctxt uctxt;
328         struct lmv_obj *obj;
329         int i, rc = 0;
330         ENTRY;
331
332         LASSERT(reqp);
333         LASSERT(*reqp);
334
335         /* master is locked. we'd like to take locks on slaves
336          * and update attributes to be returned from the slaves
337          * it's important that lookup is called in two cases:
338          *  - for first time (dcache has no such a resolving yet
339          *  - ->d_revalidate() returned false
340          * last case possible only if all the objs (master and
341          * all slaves aren't valid */
342
343         body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
344         LASSERT(body != NULL);
345
346         obj = lmv_grab_obj(obd, &body->fid1, 0);
347         LASSERT(obj);
348
349         CDEBUG(D_OTHER, "lookup slaves for %lu/%lu/%lu\n",
350                (unsigned long) body->fid1.mds,
351                (unsigned long) body->fid1.id,
352                (unsigned long) body->fid1.generation);
353
354         uctxt.gid1 = 0;
355         uctxt.gid2 = 0;
356         for (i = 0; i < obj->objcount; i++) {
357                 struct ll_fid fid = obj->objs[i].fid;
358                 struct ptlrpc_request *req = NULL;
359                 struct lookup_intent it;
360
361                 if (fid_equal(&fid, &obj->fid)) {
362                         /* skip master obj */
363                         continue;
364                 }
365
366                 CDEBUG(D_OTHER, "lookup slave %lu/%lu/%lu\n",
367                        (unsigned long) fid.mds,
368                        (unsigned long) fid.id,
369                        (unsigned long) fid.generation);
370
371                 /* is obj valid? */
372                 memset(&it, 0, sizeof(it));
373                 it.it_op = IT_GETATTR;
374                 rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid,
375                                     NULL, 0, NULL, 0, &fid, &it, 0, &req,
376                                     lmv_dirobj_blocking_ast);
377                 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
378                 if (rc > 0) {
379                         /* nice, this slave is valid */
380                         LASSERT(req == NULL);
381                         CDEBUG(D_OTHER, "cached\n");
382                         goto release_lock;
383                 }
384
385                 if (rc < 0) {
386                         /* error during revalidation */
387                         GOTO(cleanup, rc);
388                 }
389
390                 /* rc == 0, this means we have no such a lock and can't
391                  * think obj is still valid. lookup it again */
392                 LASSERT(req == NULL);
393                 req = NULL;
394                 memset(&it, 0, sizeof(it));
395                 it.it_op = IT_GETATTR;
396                 rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid,
397                                     NULL, 0, NULL, 0, NULL, &it, 0, &req,
398                                     lmv_dirobj_blocking_ast);
399                 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
400                 LASSERT(rc <= 0);
401                 if (rc < 0) {
402                         /* error during lookup */
403                         GOTO(cleanup, rc);
404                 }
405                 
406                 lock = ldlm_handle2lock(lockh);
407                 LASSERT(lock);
408                 lock->l_ast_data = obj;
409                 atomic_inc(&obj->count);
410
411                 body2 = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body2));
412                 LASSERT(body2);
413
414                 obj->objs[i].size = body2->size;
415                 CDEBUG(D_OTHER, "fresh: %lu\n",
416                                 (unsigned long) obj->objs[i].size);
417
418                 LDLM_LOCK_PUT(lock);
419
420                 if (req)
421                         ptlrpc_req_finished(req);
422 release_lock:
423                 lmv_update_body_from_obj(body, obj->objs + i);
424                 if (it.d.lustre.it_lock_mode)
425                         ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
426         }
427 cleanup:
428         RETURN(rc);
429 }
430
431 int lmv_intent_lookup(struct obd_export *exp, struct ll_uctxt *uctxt,
432                 struct ll_fid *pfid, const char *name, int len,
433                 void *lmm, int lmmsize, struct ll_fid *cfid,
434                 struct lookup_intent *it, int flags,
435                 struct ptlrpc_request **reqp,
436                 ldlm_blocking_callback cb_blocking)
437 {
438         struct obd_device *obd = exp->exp_obd;
439         struct lmv_obd *lmv = &obd->u.lmv;
440         struct mds_body *body = NULL;
441         struct ll_fid rpfid = *pfid;
442         struct lmv_obj *obj;
443         struct mea *mea;
444         int rc, mds;
445         ENTRY;
446
447         /* IT_LOOKUP is intended to produce name -> fid resolving
448          * (let's call this lookup below) or to confirm requested
449          * resolving is still valid (let's call this revalidation)
450          * cfid != NULL specifies revalidation */
451
452         if (cfid) {
453                 /* this is revalidation: we have to check is LOOKUP
454                  * lock still valid for given fid. very important
455                  * part is that we have to choose right mds because
456                  * namespace is per mds */
457                 rpfid = *pfid;
458                 obj = lmv_grab_obj(obd, pfid, 0);
459                 if (obj) {
460                         mds = raw_name2idx(obj->objcount, (char *) name, len);
461                         rpfid = obj->objs[mds].fid;
462                         lmv_put_obj(obj);
463                 }
464                 mds = rpfid.mds;
465                 CDEBUG(D_OTHER, "revalidate lookup for %lu/%lu/%lu to %d MDS\n",
466                        (unsigned long) cfid->mds,
467                        (unsigned long) cfid->id,
468                        (unsigned long) cfid->generation, mds);
469                 rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, pfid, name,
470                                     len, lmm, lmmsize, cfid, it, flags,
471                                     reqp, cb_blocking);
472                 RETURN(rc);
473         }
474
475         mds = pfid->mds;
476 repeat:
477         /* this is lookup. during lookup we have to update all the
478          * attributes, because returned values will be put in struct
479          * inode */
480
481         obj = lmv_grab_obj(obd, pfid, 0);
482         if (obj && len) {
483                 /* directory is already splitted. calculate mds */
484                 mds = raw_name2idx(obj->objcount, (char *) name, len);
485                 rpfid = obj->objs[mds].fid;
486                 lmv_put_obj(obj);
487         }
488
489         rc = md_intent_lock(lmv->tgts[mds].exp, uctxt, &rpfid, name,
490                             len, lmm, lmmsize, NULL, it, flags, reqp,
491                             cb_blocking);
492         if (rc > 0) {
493                 /* very interesting. it seems object is still valid
494                  * but for some reason llite calls lookup, not revalidate */
495                 CWARN("lookup for %lu/%lu/%lu and data should be uptodate\n",
496                        (unsigned long) rpfid.mds,
497                        (unsigned long) rpfid.id,
498                        (unsigned long) rpfid.generation);
499                 LASSERT(*reqp == NULL);
500                 RETURN(rc);
501         }
502
503         if (rc == 0 && *reqp == NULL) {
504                 /* once again, we're asked for lookup, not revalidate */
505                 CWARN("lookup for %lu/%lu/%lu and data should be uptodate\n",
506                        (unsigned long) rpfid.mds,
507                        (unsigned long) rpfid.id,
508                        (unsigned long) rpfid.generation);
509                 RETURN(rc);
510         }
511        
512         if (rc == -ERESTART) {
513                 /* directory got splitted since last update. this shouldn't
514                  * be becasue splitting causes lock revocation, so revalidate
515                  * had to fail and lookup on dir had to return mea */
516                 CWARN("we haven't knew about directory splitting!\n");
517                 LASSERT(obj == NULL);
518                 rc = lmv_create_obj_from_attrs(exp, &rpfid, NULL);
519                 if (rc)
520                         RETURN(rc);
521                 goto repeat;
522         }
523
524         if (rc < 0)
525                 RETURN(rc);
526
527         /* okay, MDS has returned success. probably name has been
528          * resolved in remote inode */
529         rc = lmv_handle_remote_inode(exp, uctxt, lmm, lmmsize, it, flags,
530                                      reqp, cb_blocking);
531
532         if (rc == 0 && (mea = is_body_of_splitted_dir(*reqp, 1))) {
533                 /* wow! this is splitted dir, we'd like to handle it */
534                 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
535                 LASSERT(body != NULL);
536                 obj = lmv_grab_obj(obd, &body->fid1, 0);
537                 if (!obj)
538                         rc = lmv_create_obj_from_attrs(exp, &body->fid1, mea);
539                 lmv_put_obj(obj);
540         }
541
542         RETURN(rc);
543 }
544
545 int lmv_intent_lock(struct obd_export *exp, struct ll_uctxt *uctxt,
546                 struct ll_fid *pfid, const char *name, int len,
547                 void *lmm, int lmmsize, struct ll_fid *cfid,
548                 struct lookup_intent *it, int flags,
549                 struct ptlrpc_request **reqp,
550                 ldlm_blocking_callback cb_blocking)
551 {
552         struct obd_device *obd = exp->exp_obd;
553         int rc = 0;
554         ENTRY;
555
556         LASSERT(it);
557         LASSERT(pfid);
558
559         CDEBUG(D_OTHER, "INTENT LOCK '%s' for '%*s' on %lu/%lu -> %u\n",
560                         LL_IT2STR(it), len, name, (unsigned long) pfid->id,
561                         (unsigned long) pfid->generation, pfid->mds);
562
563         lmv_connect(obd);
564         if (it->it_op == IT_LOOKUP)
565                 rc = lmv_intent_lookup(exp, uctxt, pfid, name, len, lmm,
566                                        lmmsize, cfid, it, flags, reqp,
567                                        cb_blocking);
568         else if (it->it_op & IT_OPEN)
569                 rc = lmv_intent_open(exp, uctxt, pfid, name, len, lmm,
570                                      lmmsize, cfid, it, flags, reqp,
571                                      cb_blocking);
572         else if (it->it_op == IT_GETATTR || it->it_op == IT_CHDIR)
573                 rc = lmv_intent_getattr(exp, uctxt, pfid, name, len, lmm,
574                                         lmmsize, cfid, it, flags, reqp,
575                                         cb_blocking);
576         else
577                 LBUG();
578         RETURN(rc);
579 }
580
581 int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
582                           struct ll_fid *mfid, struct lookup_intent *oit,
583                           int master_valid, ldlm_blocking_callback cb_blocking)
584 {
585         struct obd_device *obd = exp->exp_obd;
586         struct ptlrpc_request *mreq = *reqp;
587         struct lmv_obd *lmv = &obd->u.lmv;
588         struct lustre_handle master_lockh;
589         unsigned long size = 0;
590         struct ldlm_lock *lock;
591         struct mds_body *body;
592         struct ll_uctxt uctxt;
593         struct lmv_obj *obj;
594         int master_lock_mode;
595         int i, rc = 0;
596         ENTRY;
597
598         /* we have to loop over the subobjects, check validity and update
599          * them from MDSs if needed. it's very useful that we need not to
600          * update all the fields. say, common fields (that are equal on 
601          * all the subojects need not to be update, another fields (i_size,
602          * for example) are cached all the time */
603         obj = lmv_grab_obj(obd, mfid, 0);
604         LASSERT(obj);
605
606         master_lock_mode = 0;
607         uctxt.gid1 = 0;
608         uctxt.gid2 = 0;
609         for (i = 0; i < obj->objcount; i++) {
610                 struct ll_fid fid = obj->objs[i].fid;
611                 struct lustre_handle *lockh = NULL;
612                 struct ptlrpc_request *req = NULL;
613                 ldlm_blocking_callback cb;
614                 struct lookup_intent it;
615                 int master = 0;
616
617                 CDEBUG(D_OTHER, "revalidate subobj %lu/%lu/%lu\n",
618                        (unsigned long) fid.mds,
619                        (unsigned long) fid.id,
620                        (unsigned long) fid.generation);
621
622                 memset(&it, 0, sizeof(it));
623                 it.it_op = IT_GETATTR;
624                 cb = lmv_dirobj_blocking_ast;
625
626                 if (fid_equal(&fid, &obj->fid)) {
627                         if (master_valid) {
628                                 /* lmv_intent_getattr() already checked
629                                  * validness and took the lock */
630                                 if (mreq) {
631                                         /* it even got the reply
632                                          * refresh attrs from that reply */
633                                         body = lustre_msg_buf(mreq->rq_repmsg,
634                                                               1,sizeof(*body));
635                                         LASSERT(body != NULL);
636                                         goto update; 
637                                 }
638                                 /* take already cached attrs into account */
639                                 CDEBUG(D_OTHER,
640                                        "master is locked and cached\n");
641                                 goto release_lock;
642                         }
643                         master = 1;
644                         cb = cb_blocking;
645                 }
646
647                 /* is obj valid? */
648                 rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid,
649                                     NULL, 0, NULL, 0, &fid, &it, 0, &req, cb);
650                 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
651                 if (rc > 0) {
652                         /* nice, this slave is valid */
653                         LASSERT(req == NULL);
654                         CDEBUG(D_OTHER, "cached\n");
655                         goto release_lock;
656                 }
657
658                 if (rc < 0) {
659                         /* error during revalidation */
660                         GOTO(cleanup, rc);
661                 }
662
663                 /* rc == 0, this means we have no such a lock and can't
664                  * think obj is still valid. lookup it again */
665                 LASSERT(req == NULL);
666                 req = NULL;
667                 memset(&it, 0, sizeof(it));
668                 it.it_op = IT_GETATTR;
669                 rc = md_intent_lock(lmv->tgts[fid.mds].exp, &uctxt, &fid,
670                                     NULL, 0, NULL, 0, NULL, &it, 0, &req, cb);
671                 lockh = (struct lustre_handle *) &it.d.lustre.it_lock_handle;
672                 LASSERT(rc <= 0);
673                 if (rc < 0) {
674                         /* error during lookup */
675                         GOTO(cleanup, rc);
676                 }
677               
678                 if (master) {
679                         LASSERT(master_valid == 0);
680                         /* save lock on master to be returned to the caller */
681                         CDEBUG(D_OTHER, "no lock on master yet\n");
682                         memcpy(&master_lockh, lockh, sizeof(master_lockh));
683                         master_lock_mode = it.d.lustre.it_lock_mode;
684                         it.d.lustre.it_lock_mode = 0;
685                 } else {
686                         /* this is slave. we want to control it */
687                         lock = ldlm_handle2lock(lockh);
688                         LASSERT(lock);
689                         lock->l_ast_data = obj;
690                         atomic_inc(&obj->count);
691                         LDLM_LOCK_PUT(lock);
692                 }
693
694                 if (*reqp == NULL) {
695                         /* this is first reply, we'll use it to return
696                          * updated data back to the caller */
697                         LASSERT(req);
698                         ptlrpc_request_addref(req);
699                         *reqp = req;
700
701                 }
702
703                 body = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*body));
704                 LASSERT(body);
705                 
706 update:
707                 obj->objs[i].size = body->size;
708                 CDEBUG(D_OTHER, "fresh: %lu\n",
709                        (unsigned long) obj->objs[i].size);
710
711                 if (req)
712                         ptlrpc_req_finished(req);
713 release_lock:
714                 size += obj->objs[i].size;
715                 if (it.d.lustre.it_lock_mode)
716                         ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
717         }
718
719         if (*reqp) {
720                 /* some attrs got refreshed, we have reply and it's time
721                  * to put fresh attrs to it */
722                 CDEBUG(D_OTHER, "return refreshed attrs: size = %lu\n",
723                        (unsigned long) size);
724                 body = lustre_msg_buf((*reqp)->rq_repmsg, 1, sizeof(*body));
725                 LASSERT(body);
726                 /* FIXME: what about another attributes? */
727                 body->size = size;
728                 if (mreq == NULL) {
729                         /* very important to maintain lli->mds the same
730                          * because of revalidation. mreq == NULL means
731                          * that caller has no reply and the only attr
732                          * we can return is size */
733                         body->valid = OBD_MD_FLSIZE;
734                         body->mds = obj->fid.mds;
735                 }
736                 if (master_valid == 0) {
737                         memcpy(&oit->d.lustre.it_lock_handle,
738                                &master_lockh, sizeof(master_lockh));
739                         oit->d.lustre.it_lock_mode = master_lock_mode;
740                 }
741                 rc = 0;
742         } else {
743                 /* it seems all the attrs are fresh and we did no request */
744                 CDEBUG(D_OTHER, "all the attrs were fresh\n");
745                 if (master_valid == 0)
746                         oit->d.lustre.it_lock_mode = master_lock_mode;
747                 rc = 1;
748         }
749 cleanup:
750         RETURN(rc);
751 }
752