Whamcloud - gitweb
- fixes about using lmv_get_export(), fixes possible memory leaks;
[fs/lustre-release.git] / lustre / lmv / lmv_intent.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003, 2004, 2005, 2006 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LMV
26 #ifdef __KERNEL__
27 #include <linux/slab.h>
28 #include <linux/module.h>
29 #include <linux/init.h>
30 #include <linux/slab.h>
31 #include <linux/pagemap.h>
32 #include <asm/div64.h>
33 #include <linux/seq_file.h>
34 #include <linux/namei.h>
35 #else
36 #include <liblustre.h>
37 #endif
38
39 #include <lustre/lustre_idl.h>
40 #include <obd_support.h>
41 #include <lustre_lib.h>
42 #include <lustre_net.h>
43 #include <lustre_dlm.h>
44 #include <obd_class.h>
45 #include <lprocfs_status.h>
46 #include "lmv_internal.h"
47
48 static inline void lmv_drop_intent_lock(struct lookup_intent *it)
49 {
50         if (it->d.lustre.it_lock_mode != 0) {
51                 ldlm_lock_decref((void *)&it->d.lustre.it_lock_handle,
52                                  it->d.lustre.it_lock_mode);
53                 it->d.lustre.it_lock_mode = 0;
54         }
55 }
56
57 int lmv_intent_remote(struct obd_export *exp, void *lmm,
58                       int lmmsize, struct lookup_intent *it,
59                       int flags, struct ptlrpc_request **reqp,
60                       ldlm_blocking_callback cb_blocking,
61                       int extra_lock_flags)
62 {
63         struct obd_device *obd = exp->exp_obd;
64         struct lmv_obd *lmv = &obd->u.lmv;
65         struct ptlrpc_request *req = NULL;
66         struct mdt_body *body = NULL;
67         struct lustre_handle plock;
68         struct md_op_data *op_data;
69         struct obd_export *tgt_exp;
70         int pmode, rc = 0;
71         ENTRY;
72
73         body = lustre_msg_buf((*reqp)->rq_repmsg,
74                               DLM_REPLY_REC_OFF, sizeof(*body));
75         LASSERT(body != NULL);
76
77         if (!(body->valid & OBD_MD_MDS))
78                 RETURN(0);
79
80         tgt_exp = lmv_get_export(lmv, &body->fid1);
81         if (IS_ERR(tgt_exp))
82                 RETURN(PTR_ERR(tgt_exp));
83
84         /*
85          * oh, MDS reports that this is remote inode case i.e. we have to ask
86          * for real attrs on another MDS.
87          */
88         if (it->it_op & IT_LOOKUP) {
89                 /*
90                  * unfortunately, we have to lie to MDC/MDS to retrieve
91                  * attributes llite needs.
92                  */
93                 it->it_op = IT_GETATTR;
94         }
95
96         /* we got LOOKUP lock, but we really need attrs */
97         pmode = it->d.lustre.it_lock_mode;
98         if (pmode) {
99                 plock.cookie = it->d.lustre.it_lock_handle;
100                 it->d.lustre.it_lock_mode = 0;
101                 it->d.lustre.it_data = 0;
102         }
103
104         LASSERT(fid_is_sane(&body->fid1));
105
106         it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
107
108         OBD_ALLOC_PTR(op_data);
109         if (op_data == NULL)
110                 GOTO(out, rc = -ENOMEM);
111
112         op_data->fid1 = body->fid1;
113
114         rc = md_intent_lock(tgt_exp, op_data, lmm, lmmsize, it, flags,
115                             &req, cb_blocking, extra_lock_flags);
116
117         /*
118          * llite needs LOOKUP lock to track dentry revocation in order to
119          * maintain dcache consistency. Thus drop UPDATE lock here and put
120          * LOOKUP in request.
121          */
122         if (rc == 0) {
123                 lmv_drop_intent_lock(it);
124                 it->d.lustre.it_lock_handle = plock.cookie;
125                 it->d.lustre.it_lock_mode = pmode;
126         }
127
128         OBD_FREE_PTR(op_data);
129         EXIT;
130 out:
131         if (rc && pmode)
132                 ldlm_lock_decref(&plock, pmode);
133
134         ptlrpc_req_finished(*reqp);
135         *reqp = req;
136         return rc;
137 }
138
139 int lmv_alloc_fid_for_split(struct obd_device *obd, struct lu_fid *pid,
140                             struct md_op_data *op, struct lu_fid *fid)
141 {
142         struct lmv_obd *lmv = &obd->u.lmv;
143         struct lmv_obj *obj;
144         struct lu_fid *rpid;
145         mdsno_t mds;
146         int rc;
147         ENTRY;
148
149         obj = lmv_obj_grab(obd, pid);
150         if (!obj)
151                RETURN(0);
152         mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
153                            (char *)op->name, op->namelen);
154         rpid = &obj->lo_inodes[mds].li_fid;
155         rc = lmv_fld_lookup(lmv, rpid, &mds);
156         if (rc)
157                 GOTO(cleanup, rc);
158
159         rc = obd_fid_alloc(lmv->tgts[mds].ltd_exp, fid, NULL);
160         if (rc < 0)
161                 GOTO(cleanup, rc);
162         if (rc > 0) {
163                 LASSERT(fid_is_sane(fid));
164                 rc = fld_client_create(&lmv->lmv_fld,
165                                        fid_seq(fid), mds, NULL);
166                 if (rc) {
167                         CERROR("can't create fld rc%d\n", rc);
168                         GOTO(cleanup, rc);
169                 }
170         }
171         CDEBUG(D_INFO, "Allocate new fid"DFID"for split obj\n",PFID(fid));
172 cleanup:
173         lmv_obj_put(obj);
174         RETURN(rc);
175 }
176
177 /*
178  * IT_OPEN is intended to open (and create, possible) an object. Parent (pid)
179  * may be split dir.
180  */
181 int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data,
182                     void *lmm, int lmmsize, struct lookup_intent *it,
183                     int flags, struct ptlrpc_request **reqp,
184                     ldlm_blocking_callback cb_blocking,
185                     int extra_lock_flags)
186 {
187         struct obd_device *obd = exp->exp_obd;
188         struct lu_fid rpid = op_data->fid1;
189         struct lmv_obd *lmv = &obd->u.lmv;
190         struct mdt_body *body = NULL;
191         struct md_op_data *sop_data;
192         struct lmv_stripe_md *mea;
193         struct lmv_obj *obj;
194         int rc, loop = 0;
195         mdsno_t mds;
196         ENTRY;
197
198         OBD_ALLOC_PTR(sop_data);
199         if (sop_data == NULL)
200                 RETURN(-ENOMEM);
201
202         /* save op_data fro repeat case */
203         *sop_data = *op_data;
204
205 repeat:
206         LASSERT(++loop <= 2);
207         rc = lmv_fld_lookup(lmv, &rpid, &mds);
208         if (rc)
209                 GOTO(out_free_sop_data, rc);
210
211         obj = lmv_obj_grab(obd, &rpid);
212         if (obj) {
213                 /*
214                  * Directory is already split, so we have to forward request to
215                  * the right MDS.
216                  */
217                 mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
218                                    (char *)op_data->name, op_data->namelen);
219
220                 rpid = obj->lo_inodes[mds].li_fid;
221                 rc = lmv_fld_lookup(lmv, &rpid, &mds);
222                 lmv_obj_put(obj);
223                 if (rc)
224                         GOTO(out_free_sop_data, rc);
225
226                 CDEBUG(D_OTHER, "forward to MDS #"LPU64" ("DFID")\n",
227                        mds, PFID(&rpid));
228         }
229
230         sop_data->fid1 = rpid;
231
232         rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data,
233                             lmm, lmmsize, it, flags, reqp,
234                             cb_blocking, extra_lock_flags);
235
236         if (rc == -ERESTART) {
237                 /*
238                  * Directory got split. Time to update local object and repeat
239                  * the request with proper MDS.
240                  */
241                 LASSERT(lu_fid_eq(&op_data->fid1, &rpid));
242                 rc = lmv_handle_split(exp, &rpid);
243                 if (rc == 0) {
244                         ptlrpc_req_finished(*reqp);
245
246                         /* 
247                          * Zero out reqp to not confuse client. In many cases it
248                          * tries to free req even if error is returned.
249                          */
250                         it->d.lustre.it_data = 0;
251                         *reqp = NULL;
252
253                        /* We shoudld reallocate the FID for the object */
254                         rc = lmv_alloc_fid_for_split(obd, &rpid, op_data,
255                                                      &sop_data->fid2);
256                         if (rc)
257                                 GOTO(out_free_sop_data, rc);
258                         goto repeat;
259                 }
260         }
261         
262         if (rc != 0)
263                 GOTO(out_free_sop_data, rc);
264
265         /*
266          * Okay, MDS has returned success. Probably name has been resolved in
267          * remote inode.
268          */
269         rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp,
270                                cb_blocking, extra_lock_flags);
271         if (rc != 0) {
272                 LASSERT(rc < 0);
273
274                 /*
275                  * This is possible, that some userspace application will try to
276                  * open file as directory and we will have -ENOTDIR here. As
277                  * this is "usual" situation, we should not print error here,
278                  * only debug info.
279                  */
280                 CDEBUG(D_OTHER, "can't handle remote %s: dir "DFID"("DFID"):"
281                        "%*s: %d\n", LL_IT2STR(it), PFID(&op_data->fid2),
282                        PFID(&rpid), op_data->namelen, op_data->name, rc);
283                 GOTO(out_free_sop_data, rc);
284         }
285
286         /*
287          * nothing is found, do not access body->fid1 as it is zero and thus
288          * pointless.
289          */
290         if ((it->d.lustre.it_disposition & DISP_LOOKUP_NEG) &&
291             !(it->d.lustre.it_disposition & DISP_OPEN_CREATE) &&
292             !(it->d.lustre.it_disposition & DISP_OPEN_OPEN))
293                 GOTO(out_free_sop_data, rc = 0);
294
295         /* caller may use attrs MDS returns on IT_OPEN lock request so, we have
296          * to update them for split dir */
297         body = lustre_msg_buf((*reqp)->rq_repmsg,
298                               DLM_REPLY_REC_OFF, sizeof(*body));
299         LASSERT(body != NULL);
300
301         /* could not find object, FID is not present in response. */
302         if (!(body->valid & OBD_MD_FLID))
303                 GOTO(out_free_sop_data, rc = 0);
304
305         obj = lmv_obj_grab(obd, &body->fid1);
306         if (!obj && (mea = lmv_get_mea(*reqp, DLM_REPLY_REC_OFF))) {
307
308                 /* FIXME: capability for remote! */
309                 /* wow! this is split dir, we'd like to handle it */
310                 obj = lmv_obj_create(exp, &body->fid1, mea);
311                 if (IS_ERR(obj))
312                         GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj));
313         }
314
315         if (obj) {
316                 /* This is split dir and we'd want to get attrs. */
317                 CDEBUG(D_OTHER, "attrs from slaves for "DFID"\n",
318                        PFID(&body->fid1));
319
320                 rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1,
321                                            cb_blocking, extra_lock_flags);
322         } else if (S_ISDIR(body->mode)) {
323                 CDEBUG(D_OTHER, "object "DFID" has not lmv obj?\n",
324                        PFID(&body->fid1));
325         }
326
327         if (obj)
328                 lmv_obj_put(obj);
329
330         EXIT;
331 out_free_sop_data:
332         OBD_FREE_PTR(sop_data);
333         return rc;
334 }
335
336 int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data,
337                        void *lmm, int lmmsize, struct lookup_intent *it,
338                        int flags, struct ptlrpc_request **reqp,
339                        ldlm_blocking_callback cb_blocking,
340                        int extra_lock_flags)
341 {
342         struct lmv_obj *obj = NULL, *obj2 = NULL;
343         struct obd_device *obd = exp->exp_obd;
344         struct lu_fid rpid = op_data->fid1;
345         struct lmv_obd *lmv = &obd->u.lmv;
346         struct mdt_body *body = NULL;
347         struct md_op_data *sop_data;
348         struct lmv_stripe_md *mea;
349         mdsno_t mds;
350         int rc = 0;
351         ENTRY;
352
353         OBD_ALLOC_PTR(sop_data);
354         if (sop_data == NULL)
355                 RETURN(-ENOMEM);
356
357         /* save op_data fro repeat case */
358         *sop_data = *op_data;
359
360         if (fid_is_sane(&op_data->fid2)) {
361                 /*
362                  * Caller wants to revalidate attrs of obj we have to revalidate
363                  * slaves if requested object is split directory.
364                  */
365                 CDEBUG(D_OTHER, "revalidate attrs for "DFID"\n",
366                        PFID(&op_data->fid2));
367
368                 rc = lmv_fld_lookup(lmv, &op_data->fid2, &mds);
369                 if (rc)
370                         GOTO(out_free_sop_data, rc);
371 #if 0
372                 /*
373                  * In fact, we do not need this with current intent_lock(), but
374                  * it may change some day.
375                  */
376                 obj = lmv_obj_grab(obd, &op_data->fid2);
377                 if (obj) {
378                         if (!lu_fid_eq(&op_data->fid1, &op_data->fid2)){
379                                 rpid = obj->lo_inodes[mds].li_fid;
380                                 rc = lmv_fld_lookup(lmv, &rpid, &mds);
381                                 if (rc) {
382                                         lmv_obj_put(obj);
383                                         GOTO(out_free_sop_data, rc);
384                                 }
385                         }
386                         lmv_obj_put(obj);
387                 }
388 #endif
389         } else {
390                 CDEBUG(D_OTHER, "INTENT getattr for %*s on "DFID"\n",
391                        op_data->namelen, op_data->name,
392                        PFID(&op_data->fid1));
393
394                 rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds);
395                 if (rc)
396                         GOTO(out_free_sop_data, rc);
397                 obj = lmv_obj_grab(obd, &op_data->fid1);
398                 if (obj && op_data->namelen) {
399                         /* directory is already split. calculate mds */
400                         mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
401                                            (char *)op_data->name,
402                                            op_data->namelen);
403
404                         rpid = obj->lo_inodes[mds].li_fid;
405                         rc = lmv_fld_lookup(lmv, &rpid, &mds);
406                         if (rc) {
407                                 lmv_obj_put(obj);
408                                 GOTO(out_free_sop_data, rc);
409                         }
410                         lmv_obj_put(obj);
411
412                         CDEBUG(D_OTHER, "forward to MDS #"LPU64" (slave "DFID")\n",
413                                mds, PFID(&rpid));
414                 }
415         }
416
417         sop_data->fid1 = rpid;
418
419         rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data, lmm,
420                             lmmsize, it, flags, reqp, cb_blocking,
421                             extra_lock_flags);
422         if (rc < 0)
423                 GOTO(out_free_sop_data, rc);
424
425         if (obj && rc > 0) {
426                 /*
427                  * This is split dir. In order to optimize things a bit, we
428                  * consider obj valid updating missing parts.
429
430                  * FIXME: do we need to return any lock here? It would be fine
431                  * if we don't. This means that nobody should use UPDATE lock to
432                  * notify about object * removal.
433                  */
434                 CDEBUG(D_OTHER,
435                        "revalidate slaves for "DFID", rc %d\n",
436                        PFID(&op_data->fid2), rc);
437
438                 LASSERT(fid_is_sane(&op_data->fid2));
439                 rc = lmv_revalidate_slaves(exp, reqp, &op_data->fid2, it, rc,
440                                            cb_blocking, extra_lock_flags);
441                 GOTO(out_free_sop_data, rc);
442         }
443
444         if (*reqp == NULL)
445                 GOTO(out_free_sop_data, rc);
446
447         /*
448          * okay, MDS has returned success. Probably name has been resolved in
449          * remote inode.
450          */
451         rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags,
452                                reqp, cb_blocking, extra_lock_flags);
453         if (rc < 0)
454                 GOTO(out_free_sop_data, rc);
455
456         /*
457          * Nothing is found, do not access body->fid1 as it is zero and thus
458          * pointless.
459          */
460         if (it->d.lustre.it_disposition & DISP_LOOKUP_NEG)
461                 GOTO(out_free_sop_data, rc = 0);
462
463         LASSERT(*reqp);
464         LASSERT((*reqp)->rq_repmsg);
465         body = lustre_msg_buf((*reqp)->rq_repmsg,
466                               DLM_REPLY_REC_OFF, sizeof(*body));
467         LASSERT(body != NULL);
468
469         /* could not find object, FID is not present in response. */
470         if (!(body->valid & OBD_MD_FLID))
471                 GOTO(out_free_sop_data, rc = 0);
472
473         obj2 = lmv_obj_grab(obd, &body->fid1);
474
475         if (!obj2 && (mea = lmv_get_mea(*reqp, DLM_REPLY_REC_OFF))) {
476
477                 /* FIXME remote capability! */
478                 /* wow! this is split dir, we'd like to handle it. */
479                 obj2 = lmv_obj_create(exp, &body->fid1, mea);
480                 if (IS_ERR(obj2))
481                         GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj2));
482         }
483
484         if (obj2) {
485                 /* this is split dir and we'd want to get attrs */
486                 CDEBUG(D_OTHER, "attrs from slaves for "DFID", rc %d\n",
487                        PFID(&body->fid1), rc);
488
489                 rc = lmv_revalidate_slaves(exp, reqp, &body->fid1, it, 1,
490                                            cb_blocking, extra_lock_flags);
491                 lmv_obj_put(obj2);
492         }
493
494         EXIT;
495 out_free_sop_data:
496         OBD_FREE_PTR(sop_data);
497         return rc;
498 }
499
500 void lmv_update_body(struct mdt_body *body, struct lmv_inode *lino)
501 {
502         /* update size */
503         body->size += lino->li_size;
504 }
505
506 /* this is not used currently */
507 int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
508 {
509         struct obd_device *obd = exp->exp_obd;
510         struct lmv_obd *lmv = &obd->u.lmv;
511         struct mdt_body *body = NULL;
512         struct lustre_handle *lockh;
513         struct md_op_data *op_data;
514         struct ldlm_lock *lock;
515         struct mdt_body *body2;
516         struct lmv_obj *obj;
517         int i, rc = 0;
518         ENTRY;
519
520         LASSERT(reqp);
521         LASSERT(*reqp);
522
523         /* master is locked. we'd like to take locks on slaves and update
524          * attributes to be returned from the slaves it's important that lookup
525          * is called in two cases:
526
527          *  - for first time (dcache has no such a resolving yet).  -
528          *  ->d_revalidate() returned false.
529
530          * last case possible only if all the objs (master and all slaves aren't
531          * valid */
532
533         body = lustre_msg_buf((*reqp)->rq_repmsg,
534                               DLM_REPLY_REC_OFF, sizeof(*body));
535         LASSERT(body != NULL);
536         LASSERT((body->valid & OBD_MD_FLID) != 0);
537
538         obj = lmv_obj_grab(obd, &body->fid1);
539         LASSERT(obj != NULL);
540
541         CDEBUG(D_OTHER, "lookup slaves for "DFID"\n",
542                PFID(&body->fid1));
543
544         OBD_ALLOC_PTR(op_data);
545         if (op_data == NULL)
546                 RETURN(-ENOMEM);
547
548         lmv_obj_lock(obj);
549
550         for (i = 0; i < obj->lo_objcount; i++) {
551                 struct lu_fid fid = obj->lo_inodes[i].li_fid;
552                 struct ptlrpc_request *req = NULL;
553                 struct obd_export *tgt_exp;
554                 struct lookup_intent it;
555
556                 if (lu_fid_eq(&fid, &obj->lo_fid))
557                         /* skip master obj */
558                         continue;
559
560                 CDEBUG(D_OTHER, "lookup slave "DFID"\n", PFID(&fid));
561
562                 /* is obj valid? */
563                 memset(&it, 0, sizeof(it));
564                 it.it_op = IT_GETATTR;
565
566                 memset(op_data, 0, sizeof(*op_data));
567                 op_data->fid1 = fid;
568                 op_data->fid2 = fid;
569
570                 tgt_exp = lmv_get_export(lmv, &fid);
571                 if (IS_ERR(tgt_exp))
572                         GOTO(cleanup, rc = PTR_ERR(tgt_exp));
573
574                 rc = md_intent_lock(tgt_exp, op_data, NULL, 0, &it, 0, &req,
575                                     lmv_blocking_ast, 0);
576
577                 lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle;
578                 if (rc > 0 && req == NULL) {
579                         /* nice, this slave is valid */
580                         LASSERT(req == NULL);
581                         CDEBUG(D_OTHER, "cached\n");
582                         goto release_lock;
583                 }
584
585                 if (rc < 0) {
586                         /* error during lookup */
587                         GOTO(cleanup, rc);
588                 }
589                 lock = ldlm_handle2lock(lockh);
590                 LASSERT(lock);
591
592                 lock->l_ast_data = lmv_obj_get(obj);
593
594                 body2 = lustre_msg_buf(req->rq_repmsg,
595                                        DLM_REPLY_REC_OFF, sizeof(*body2));
596                 LASSERT(body2);
597
598                 obj->lo_inodes[i].li_size = body2->size;
599
600                 CDEBUG(D_OTHER, "fresh: %lu\n",
601                        (unsigned long)obj->lo_inodes[i].li_size);
602
603                 LDLM_LOCK_PUT(lock);
604
605                 if (req)
606                         ptlrpc_req_finished(req);
607 release_lock:
608                 lmv_update_body(body, obj->lo_inodes + i);
609
610                 if (it.d.lustre.it_lock_mode) {
611                         ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
612                         it.d.lustre.it_lock_mode = 0;
613                 }
614         }
615
616         EXIT;
617 cleanup:
618         OBD_FREE_PTR(op_data);
619         lmv_obj_unlock(obj);
620         lmv_obj_put(obj);
621         return rc;
622 }
623
624 int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
625                       void *lmm, int lmmsize, struct lookup_intent *it,
626                       int flags, struct ptlrpc_request **reqp,
627                       ldlm_blocking_callback cb_blocking,
628                       int extra_lock_flags)
629 {
630         struct obd_device *obd = exp->exp_obd;
631         struct lu_fid rpid = op_data->fid1;
632         struct lmv_obd *lmv = &obd->u.lmv;
633         struct mdt_body *body = NULL;
634         struct md_op_data *sop_data;
635         struct lmv_stripe_md *mea;
636         struct lmv_obj *obj;
637         int rc, loop = 0;
638         mdsno_t mds;
639         ENTRY;
640
641         OBD_ALLOC_PTR(sop_data);
642         if (sop_data == NULL)
643                 RETURN(-ENOMEM);
644
645         /* save op_data fro repeat case */
646         *sop_data = *op_data;
647
648         /*
649          * IT_LOOKUP is intended to produce name -> fid resolving (let's call
650          * this lookup below) or to confirm requested resolving is still valid
651          * (let's call this revalidation) fid_is_sane(&sop_data->fid2) specifies
652          * revalidation.
653          */
654         if (fid_is_sane(&op_data->fid2)) {
655                 /*
656                  * This is revalidate: we have to check is LOOKUP lock still
657                  * valid for given fid. Very important part is that we have to
658                  * choose right mds because namespace is per mds.
659                  */
660                 rpid = op_data->fid1;
661                 obj = lmv_obj_grab(obd, &rpid);
662                 if (obj) {
663                         mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
664                                            (char *)op_data->name,
665                                            op_data->namelen);
666                         rpid = obj->lo_inodes[mds].li_fid;
667                         lmv_obj_put(obj);
668                 }
669                 rc = lmv_fld_lookup(lmv, &rpid, &mds);
670                 if (rc)
671                         GOTO(out_free_sop_data, rc);
672
673                 CDEBUG(D_OTHER, "revalidate lookup for "DFID" to #"LPU64" MDS\n",
674                        PFID(&op_data->fid2), mds);
675         } else {
676                 rc = lmv_fld_lookup(lmv, &op_data->fid1, &mds);
677                 if (rc)
678                         GOTO(out_free_sop_data, rc);
679 repeat:
680                 LASSERT(++loop <= 2);
681
682                 /*
683                  * This is lookup. During lookup we have to update all the
684                  * attributes, because returned values will be put in struct
685                  * inode.
686                  */
687                 obj = lmv_obj_grab(obd, &op_data->fid1);
688                 if (obj) {
689                         if (op_data->namelen) {
690                                 /* directory is already split. calculate mds */
691                                 mds = raw_name2idx(obj->lo_hashtype,
692                                                    obj->lo_objcount,
693                                                    (char *)op_data->name,
694                                                    op_data->namelen);
695                                 rpid = obj->lo_inodes[mds].li_fid;
696                                 rc = lmv_fld_lookup(lmv, &rpid, &mds);
697                                 if (rc) {
698                                         lmv_obj_put(obj);
699                                         GOTO(out_free_sop_data, rc);
700                                 }
701                         }
702                         lmv_obj_put(obj);
703                 }
704                 fid_zero(&op_data->fid2);
705         }
706
707         sop_data->fid1 = rpid;
708
709         rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data, lmm, lmmsize,
710                             it, flags, reqp, cb_blocking, extra_lock_flags);
711         if (rc > 0) {
712                 LASSERT(fid_is_sane(&op_data->fid2));
713                 GOTO(out_free_sop_data, rc);
714         }
715         if (rc > 0) {
716                 /*
717                  * Very interesting. it seems object is still valid but for some
718                  * reason llite calls lookup, not revalidate.
719                  */
720                 CDEBUG(D_OTHER, "lookup for "DFID" and data should be uptodate\n",
721                        PFID(&rpid));
722                 LASSERT(*reqp == NULL);
723                 GOTO(out_free_sop_data, rc);
724         }
725
726         if (rc == 0 && *reqp == NULL) {
727                 /* once again, we're asked for lookup, not revalidate */
728                 CDEBUG(D_OTHER, "lookup for "DFID" and data should be uptodate\n",
729                        PFID(&rpid));
730                 GOTO(out_free_sop_data, rc);
731         }
732
733         if (rc == -ERESTART) {
734                 /*
735                  * Directory got split since last update. This shouldn't be
736                  * becasue splitting causes lock revocation, so revalidate had
737                  * to fail and lookup on dir had to return mea.
738                  */
739                 CWARN("we haven't knew about directory splitting!\n");
740                 LASSERT(obj == NULL);
741
742                 obj = lmv_obj_create(exp, &rpid, NULL);
743                 if (IS_ERR(obj))
744                         RETURN((int)PTR_ERR(obj));
745                 lmv_obj_put(obj);
746                 goto repeat;
747         }
748
749         if (rc < 0)
750                 GOTO(out_free_sop_data, rc);
751
752         /*
753          * Okay, MDS has returned success. Probably name has been resolved in
754          * remote inode.
755          */
756         rc = lmv_intent_remote(exp, lmm, lmmsize, it, flags, reqp,
757                                cb_blocking, extra_lock_flags);
758
759         if (rc == 0 && (mea = lmv_get_mea(*reqp, DLM_REPLY_REC_OFF))) {
760                 /* wow! this is split dir, we'd like to handle it */
761                 body = lustre_msg_buf((*reqp)->rq_repmsg,
762                                       DLM_REPLY_REC_OFF, sizeof(*body));
763                 LASSERT(body != NULL);
764                 LASSERT((body->valid & OBD_MD_FLID) != 0);
765
766                 body = lustre_msg_buf((*reqp)->rq_repmsg,
767                                       DLM_REPLY_REC_OFF, sizeof(*body));
768                 LASSERT(body != NULL);
769                 LASSERT((body->valid & OBD_MD_FLID) != 0);
770
771                 obj = lmv_obj_grab(obd, &body->fid1);
772                 if (!obj) {
773                         /* FIXME: remote capability */
774                         obj = lmv_obj_create(exp, &body->fid1, mea);
775                         if (IS_ERR(obj))
776                                 GOTO(out_free_sop_data, rc = (int)PTR_ERR(obj));
777                 }
778                 lmv_obj_put(obj);
779         }
780
781         EXIT;
782 out_free_sop_data:
783         OBD_FREE_PTR(sop_data);
784         return rc;
785 }
786
787 int lmv_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
788                     void *lmm, int lmmsize, struct lookup_intent *it,
789                     int flags, struct ptlrpc_request **reqp,
790                     ldlm_blocking_callback cb_blocking,
791                     int extra_lock_flags)
792 {
793         struct obd_device *obd = exp->exp_obd;
794         int rc;
795         ENTRY;
796
797         LASSERT(it != NULL);
798         LASSERT(fid_is_sane(&op_data->fid1));
799
800         CDEBUG(D_OTHER, "INTENT LOCK '%s' for '%*s' on "DFID"\n",
801                LL_IT2STR(it), op_data->namelen, op_data->name,
802                PFID(&op_data->fid1));
803
804         rc = lmv_check_connect(obd);
805         if (rc)
806                 RETURN(rc);
807
808         if (it->it_op & IT_LOOKUP)
809                 rc = lmv_intent_lookup(exp, op_data, lmm, lmmsize, it,
810                                        flags, reqp, cb_blocking,
811                                        extra_lock_flags);
812         else if (it->it_op & IT_OPEN)
813                 rc = lmv_intent_open(exp, op_data, lmm, lmmsize, it,
814                                      flags, reqp, cb_blocking,
815                                      extra_lock_flags);
816         else if (it->it_op & IT_GETATTR)
817                 rc = lmv_intent_getattr(exp, op_data,lmm, lmmsize, it,
818                                         flags, reqp, cb_blocking,
819                                         extra_lock_flags);
820         else
821                 LBUG();
822         RETURN(rc);
823 }
824
825 int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
826                           const struct lu_fid *mid, struct lookup_intent *oit,
827                           int master_valid, ldlm_blocking_callback cb_blocking,
828                           int extra_lock_flags)
829 {
830         struct obd_device *obd = exp->exp_obd;
831         struct ptlrpc_request *mreq = *reqp;
832         struct lmv_obd *lmv = &obd->u.lmv;
833         struct lustre_handle master_lockh;
834         struct obd_export *tgt_exp;
835         struct md_op_data *op_data;
836         struct ldlm_lock *lock;
837         unsigned long size = 0;
838         struct mdt_body *body;
839         struct lmv_obj *obj;
840         int master_lock_mode;
841         int i, rc = 0;
842         ENTRY;
843
844         OBD_ALLOC_PTR(op_data);
845         if (op_data == NULL)
846                 RETURN(-ENOMEM);
847
848         /*
849          * We have to loop over the subobjects, check validity and update them
850          * from MDSs if needed. it's very useful that we need not to update all
851          * the fields. say, common fields (that are equal on all the subojects
852          * need not to be update, another fields (i_size, for example) are
853          * cached all the time.
854          */
855         obj = lmv_obj_grab(obd, mid);
856         LASSERT(obj != NULL);
857
858         master_lock_mode = 0;
859
860         lmv_obj_lock(obj);
861
862         for (i = 0; i < obj->lo_objcount; i++) {
863                 struct lu_fid fid = obj->lo_inodes[i].li_fid;
864                 struct lustre_handle *lockh = NULL;
865                 struct ptlrpc_request *req = NULL;
866                 ldlm_blocking_callback cb;
867                 struct lookup_intent it;
868                 int master = 0;
869
870                 CDEBUG(D_OTHER, "revalidate subobj "DFID"\n",
871                        PFID(&fid));
872
873                 memset(op_data, 0, sizeof(*op_data));
874                 memset(&it, 0, sizeof(it));
875                 it.it_op = IT_GETATTR;
876
877                 cb = lmv_blocking_ast;
878
879                 if (lu_fid_eq(&fid, &obj->lo_fid)) {
880                         if (master_valid) {
881                                 /* lmv_intent_getattr() already checked
882                                  * validness and took the lock */
883                                 if (mreq) {
884                                         /* it even got the reply refresh attrs
885                                          * from that reply */
886                                         body = lustre_msg_buf(mreq->rq_repmsg,
887                                                               DLM_REPLY_REC_OFF,
888                                                               sizeof(*body));
889                                         LASSERT(body != NULL);
890                                         goto update;
891                                 }
892                                 /* take already cached attrs into account */
893                                 CDEBUG(D_OTHER,
894                                        "master is locked and cached\n");
895                                 goto release_lock;
896                         }
897                         master = 1;
898                         cb = cb_blocking;
899                 }
900
901                 op_data->fid1 = fid;
902                 op_data->fid2 = fid;
903
904                 /* is obj valid? */
905                 tgt_exp = lmv_get_export(lmv, &fid);
906                 if (IS_ERR(tgt_exp))
907                         GOTO(out_free_op_data, rc = PTR_ERR(tgt_exp));
908
909                 rc = md_intent_lock(tgt_exp, op_data, NULL, 0, &it, 0, &req, cb,
910                                     extra_lock_flags);
911
912                 lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle;
913                 if (rc > 0 && req == NULL) {
914                         /* nice, this slave is valid */
915                         LASSERT(req == NULL);
916                         CDEBUG(D_OTHER, "cached\n");
917                         goto release_lock;
918                 }
919
920                 if (rc < 0)
921                         GOTO(cleanup, rc);
922                 
923                 if (master) {
924                         LASSERT(master_valid == 0);
925                         /* save lock on master to be returned to the caller */
926                         CDEBUG(D_OTHER, "no lock on master yet\n");
927                         memcpy(&master_lockh, lockh, sizeof(master_lockh));
928                         master_lock_mode = it.d.lustre.it_lock_mode;
929                         it.d.lustre.it_lock_mode = 0;
930                 } else {
931                         /* this is slave. we want to control it */
932                         lock = ldlm_handle2lock(lockh);
933                         LASSERT(lock);
934                         lock->l_ast_data = lmv_obj_get(obj);
935                         LDLM_LOCK_PUT(lock);
936                 }
937
938                 if (*reqp == NULL) {
939                         /* this is first reply, we'll use it to return updated
940                          * data back to the caller */
941                         LASSERT(req);
942                         ptlrpc_request_addref(req);
943                         *reqp = req;
944
945                 }
946
947                 body = lustre_msg_buf(req->rq_repmsg,
948                                       DLM_REPLY_REC_OFF, sizeof(*body));
949                 LASSERT(body);
950
951 update:
952                 obj->lo_inodes[i].li_size = body->size;
953
954                 CDEBUG(D_OTHER, "fresh: %lu\n",
955                        (unsigned long)obj->lo_inodes[i].li_size);
956
957                 if (req)
958                         ptlrpc_req_finished(req);
959 release_lock:
960                 size += obj->lo_inodes[i].li_size;
961
962                 if (it.d.lustre.it_lock_mode) {
963                         ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
964                         it.d.lustre.it_lock_mode = 0;
965                 }
966         }
967
968         if (*reqp) {
969                 /* some attrs got refreshed, we have reply and it's time to put
970                  * fresh attrs to it */
971                 CDEBUG(D_OTHER, "return refreshed attrs: size = %lu\n",
972                        (unsigned long)size);
973
974                 body = lustre_msg_buf((*reqp)->rq_repmsg,
975                                       DLM_REPLY_REC_OFF, sizeof(*body));
976                 LASSERT(body);
977
978                 body->size = size;
979
980                 if (mreq == NULL) {
981                         /*
982                          * Very important to maintain mds num the same because
983                          * of revalidation. mreq == NULL means that caller has
984                          * no reply and the only attr we can return is size.
985                          */
986                         body->valid = OBD_MD_FLSIZE;
987
988 #if 0
989                         rc = lmv_fld_lookup(lmv, &obj->lo_fid, &body->mds);
990                         if (rc)
991                                 GOTO(cleanup, rc);
992 #endif
993                 }
994                 if (master_valid == 0) {
995                         memcpy(&oit->d.lustre.it_lock_handle,
996                                &master_lockh, sizeof(master_lockh));
997                         oit->d.lustre.it_lock_mode = master_lock_mode;
998                 }
999                 rc = 0;
1000         } else {
1001                 /* it seems all the attrs are fresh and we did no request */
1002                 CDEBUG(D_OTHER, "all the attrs were fresh\n");
1003                 if (master_valid == 0)
1004                         oit->d.lustre.it_lock_mode = master_lock_mode;
1005                 rc = 1;
1006         }
1007
1008         EXIT;
1009 cleanup:
1010         lmv_obj_unlock(obj);
1011         lmv_obj_put(obj);
1012 out_free_op_data:
1013         OBD_FREE_PTR(op_data);
1014         return rc;
1015 }