Whamcloud - gitweb
Revert "LU-2459 osd: add LMA incompat flag check"
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #ifdef __KERNEL__
40 # include <linux/module.h>
41 # include <linux/pagemap.h>
42 # include <linux/miscdevice.h>
43 # include <linux/init.h>
44 #else
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_acl.h>
49 #include <obd_class.h>
50 #include <lustre_dlm.h>
51 /* fid_res_name_eq() */
52 #include <lustre_fid.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
55
/*
 * Triple of pointers tying an export to the metadata-level and LDLM-level
 * enqueue descriptors of one getattr request.
 * NOTE(review): presumably handed to an asynchronous getattr completion
 * callback; no user is visible in this part of the file -- confirm.
 */
struct mdc_getattr_args {
        struct obd_export *ga_exp;              /* export the request uses */
        struct md_enqueue_info *ga_minfo;       /* metadata enqueue info */
        struct ldlm_enqueue_info *ga_einfo;     /* LDLM enqueue info */
};
61
62 int it_disposition(struct lookup_intent *it, int flag)
63 {
64         return it->d.lustre.it_disposition & flag;
65 }
66 EXPORT_SYMBOL(it_disposition);
67
68 void it_set_disposition(struct lookup_intent *it, int flag)
69 {
70         it->d.lustre.it_disposition |= flag;
71 }
72 EXPORT_SYMBOL(it_set_disposition);
73
74 void it_clear_disposition(struct lookup_intent *it, int flag)
75 {
76         it->d.lustre.it_disposition &= ~flag;
77 }
78 EXPORT_SYMBOL(it_clear_disposition);
79
80 int it_open_error(int phase, struct lookup_intent *it)
81 {
82         if (it_disposition(it, DISP_OPEN_OPEN)) {
83                 if (phase >= DISP_OPEN_OPEN)
84                         return it->d.lustre.it_status;
85                 else
86                         return 0;
87         }
88
89         if (it_disposition(it, DISP_OPEN_CREATE)) {
90                 if (phase >= DISP_OPEN_CREATE)
91                         return it->d.lustre.it_status;
92                 else
93                         return 0;
94         }
95
96         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
97                 if (phase >= DISP_LOOKUP_EXECD)
98                         return it->d.lustre.it_status;
99                 else
100                         return 0;
101         }
102
103         if (it_disposition(it, DISP_IT_EXECD)) {
104                 if (phase >= DISP_IT_EXECD)
105                         return it->d.lustre.it_status;
106                 else
107                         return 0;
108         }
109         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
110                it->d.lustre.it_status);
111         LBUG();
112         return 0;
113 }
114 EXPORT_SYMBOL(it_open_error);
115
116 /* this must be called on a lockh that is known to have a referenced lock */
117 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
118                       __u64 *bits)
119 {
120         struct ldlm_lock *lock;
121         struct inode *new_inode = data;
122         ENTRY;
123
124         if(bits)
125                 *bits = 0;
126
127         if (!*lockh)
128                 RETURN(0);
129
130         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
131
132         LASSERT(lock != NULL);
133         lock_res_and_lock(lock);
134 #ifdef __KERNEL__
135         if (lock->l_resource->lr_lvb_inode &&
136             lock->l_resource->lr_lvb_inode != data) {
137                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
138                 LASSERTF(old_inode->i_state & I_FREEING,
139                          "Found existing inode %p/%lu/%u state %lu in lock: "
140                          "setting data to %p/%lu/%u\n", old_inode,
141                          old_inode->i_ino, old_inode->i_generation,
142                          old_inode->i_state,
143                          new_inode, new_inode->i_ino, new_inode->i_generation);
144         }
145 #endif
146         lock->l_resource->lr_lvb_inode = new_inode;
147         if (bits)
148                 *bits = lock->l_policy_data.l_inodebits.bits;
149
150         unlock_res_and_lock(lock);
151         LDLM_LOCK_PUT(lock);
152
153         RETURN(0);
154 }
155
156 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
157                            const struct lu_fid *fid, ldlm_type_t type,
158                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
159                            struct lustre_handle *lockh)
160 {
161         struct ldlm_res_id res_id;
162         ldlm_mode_t rc;
163         ENTRY;
164
165         fid_build_reg_res_name(fid, &res_id);
166         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
167                              &res_id, type, policy, mode, lockh, 0);
168         RETURN(rc);
169 }
170
171 int mdc_cancel_unused(struct obd_export *exp,
172                       const struct lu_fid *fid,
173                       ldlm_policy_data_t *policy,
174                       ldlm_mode_t mode,
175                       ldlm_cancel_flags_t flags,
176                       void *opaque)
177 {
178         struct ldlm_res_id res_id;
179         struct obd_device *obd = class_exp2obd(exp);
180         int rc;
181
182         ENTRY;
183
184         fid_build_reg_res_name(fid, &res_id);
185         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
186                                              policy, mode, flags, opaque);
187         RETURN(rc);
188 }
189
190 int mdc_null_inode(struct obd_export *exp,
191                    const struct lu_fid *fid)
192 {
193         struct ldlm_res_id res_id;
194         struct ldlm_resource *res;
195         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
196         ENTRY;
197
198         LASSERTF(ns != NULL, "no namespace passed\n");
199
200         fid_build_reg_res_name(fid, &res_id);
201
202         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
203         if(res == NULL)
204                 RETURN(0);
205
206         lock_res(res);
207         res->lr_lvb_inode = NULL;
208         unlock_res(res);
209
210         ldlm_resource_putref(res);
211         RETURN(0);
212 }
213
214 /* find any ldlm lock of the inode in mdc
215  * return 0    not find
216  *        1    find one
217  *      < 0    error */
218 int mdc_find_cbdata(struct obd_export *exp,
219                     const struct lu_fid *fid,
220                     ldlm_iterator_t it, void *data)
221 {
222         struct ldlm_res_id res_id;
223         int rc = 0;
224         ENTRY;
225
226         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
227         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
228                                    it, data);
229         if (rc == LDLM_ITER_STOP)
230                 RETURN(1);
231         else if (rc == LDLM_ITER_CONTINUE)
232                 RETURN(0);
233         RETURN(rc);
234 }
235
236 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
237 {
238         /* Don't hold error requests for replay. */
239         if (req->rq_replay) {
240                 spin_lock(&req->rq_lock);
241                 req->rq_replay = 0;
242                 spin_unlock(&req->rq_lock);
243         }
244         if (rc && req->rq_transno != 0) {
245                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
246                 LBUG();
247         }
248 }
249
250 /* Save a large LOV EA into the request buffer so that it is available
251  * for replay.  We don't do this in the initial request because the
252  * original request doesn't need this buffer (at most it sends just the
253  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
254  * buffer and may also be difficult to allocate and save a very large
255  * request buffer for each open. (bug 5707)
256  *
257  * OOM here may cause recovery failure if lmm is needed (only for the
258  * original open if the MDS crashed just when this client also OOM'd)
259  * but this is incredibly unlikely, and questionable whether the client
260  * could do MDS recovery under OOM anyways... */
261 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
262                                 struct mdt_body *body)
263 {
264         int     rc;
265
266         /* FIXME: remove this explicit offset. */
267         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
268                                         body->eadatasize);
269         if (rc) {
270                 CERROR("Can't enlarge segment %d size to %d\n",
271                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
272                 body->valid &= ~OBD_MD_FLEASIZE;
273                 body->eadatasize = 0;
274         }
275 }
276
277 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
278                                                    struct lookup_intent *it,
279                                                    struct md_op_data *op_data,
280                                                    void *lmm, int lmmsize,
281                                                    void *cb_data)
282 {
283         struct ptlrpc_request *req;
284         struct obd_device     *obddev = class_exp2obd(exp);
285         struct ldlm_intent    *lit;
286         CFS_LIST_HEAD(cancels);
287         int                    count = 0;
288         int                    mode;
289         int                    rc;
290         ENTRY;
291
292         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
293
294         /* XXX: openlock is not cancelled for cross-refs. */
295         /* If inode is known, cancel conflicting OPEN locks. */
296         if (fid_is_sane(&op_data->op_fid2)) {
297                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
298                         mode = LCK_CW;
299 #ifdef FMODE_EXEC
300                 else if (it->it_flags & FMODE_EXEC)
301                         mode = LCK_PR;
302 #endif
303                 else
304                         mode = LCK_CR;
305                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
306                                                 &cancels, mode,
307                                                 MDS_INODELOCK_OPEN);
308         }
309
310         /* If CREATE, cancel parent's UPDATE lock. */
311         if (it->it_op & IT_CREAT)
312                 mode = LCK_EX;
313         else
314                 mode = LCK_CR;
315         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
316                                          &cancels, mode,
317                                          MDS_INODELOCK_UPDATE);
318
319         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
320                                    &RQF_LDLM_INTENT_OPEN);
321         if (req == NULL) {
322                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
323                 RETURN(ERR_PTR(-ENOMEM));
324         }
325
326         /* parent capability */
327         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
328         /* child capability, reserve the size according to parent capa, it will
329          * be filled after we get the reply */
330         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
331
332         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
333                              op_data->op_namelen + 1);
334         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
335                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
336
337         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
338         if (rc) {
339                 ptlrpc_request_free(req);
340                 return NULL;
341         }
342
343         spin_lock(&req->rq_lock);
344         req->rq_replay = req->rq_import->imp_replayable;
345         spin_unlock(&req->rq_lock);
346
347         /* pack the intent */
348         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
349         lit->opc = (__u64)it->it_op;
350
351         /* pack the intended request */
352         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
353                       lmmsize);
354
355         /* for remote client, fetch remote perm for current user */
356         if (client_is_remote(exp))
357                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
358                                      sizeof(struct mdt_remote_perm));
359         ptlrpc_request_set_replen(req);
360         return req;
361 }
362
363 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
364                                                      struct lookup_intent *it,
365                                                      struct md_op_data *op_data)
366 {
367         struct ptlrpc_request *req;
368         struct obd_device     *obddev = class_exp2obd(exp);
369         struct ldlm_intent    *lit;
370         int                    rc;
371         ENTRY;
372
373         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
374                                    &RQF_LDLM_INTENT_UNLINK);
375         if (req == NULL)
376                 RETURN(ERR_PTR(-ENOMEM));
377
378         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
379         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
380                              op_data->op_namelen + 1);
381
382         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
383         if (rc) {
384                 ptlrpc_request_free(req);
385                 RETURN(ERR_PTR(rc));
386         }
387
388         /* pack the intent */
389         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
390         lit->opc = (__u64)it->it_op;
391
392         /* pack the intended request */
393         mdc_unlink_pack(req, op_data);
394
395         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
396                              obddev->u.cli.cl_max_mds_easize);
397         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
398                              obddev->u.cli.cl_max_mds_cookiesize);
399         ptlrpc_request_set_replen(req);
400         RETURN(req);
401 }
402
403 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
404                                                       struct lookup_intent *it,
405                                                       struct md_op_data *op_data)
406 {
407         struct ptlrpc_request *req;
408         struct obd_device     *obddev = class_exp2obd(exp);
409         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
410                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
411                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
412                                        (client_is_remote(exp) ?
413                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
414         struct ldlm_intent    *lit;
415         int                    rc;
416         ENTRY;
417
418         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
419                                    &RQF_LDLM_INTENT_GETATTR);
420         if (req == NULL)
421                 RETURN(ERR_PTR(-ENOMEM));
422
423         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
424         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
425                              op_data->op_namelen + 1);
426
427         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
428         if (rc) {
429                 ptlrpc_request_free(req);
430                 RETURN(ERR_PTR(rc));
431         }
432
433         /* pack the intent */
434         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
435         lit->opc = (__u64)it->it_op;
436
437         /* pack the intended request */
438         mdc_getattr_pack(req, valid, it->it_flags, op_data,
439                          obddev->u.cli.cl_max_mds_easize);
440
441         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
442                              obddev->u.cli.cl_max_mds_easize);
443         if (client_is_remote(exp))
444                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
445                                      sizeof(struct mdt_remote_perm));
446         ptlrpc_request_set_replen(req);
447         RETURN(req);
448 }
449
450 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
451                                                      struct lookup_intent *it,
452                                                      struct md_op_data *unused)
453 {
454         struct obd_device     *obd = class_exp2obd(exp);
455         struct ptlrpc_request *req;
456         struct ldlm_intent    *lit;
457         struct layout_intent  *layout;
458         int rc;
459         ENTRY;
460
461         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
462                                 &RQF_LDLM_INTENT_LAYOUT);
463         if (req == NULL)
464                 RETURN(ERR_PTR(-ENOMEM));
465
466         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
467         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
468         if (rc) {
469                 ptlrpc_request_free(req);
470                 RETURN(ERR_PTR(rc));
471         }
472
473         /* pack the intent */
474         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
475         lit->opc = (__u64)it->it_op;
476
477         /* pack the layout intent request */
478         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
479         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
480          * set for replication */
481         layout->li_opc = LAYOUT_INTENT_ACCESS;
482
483         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
484                         obd->u.cli.cl_max_mds_easize);
485         ptlrpc_request_set_replen(req);
486         RETURN(req);
487 }
488
489 static struct ptlrpc_request *
490 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
491 {
492         struct ptlrpc_request *req;
493         int rc;
494         ENTRY;
495
496         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
497         if (req == NULL)
498                 RETURN(ERR_PTR(-ENOMEM));
499
500         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
501         if (rc) {
502                 ptlrpc_request_free(req);
503                 RETURN(ERR_PTR(rc));
504         }
505
506         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
507         ptlrpc_request_set_replen(req);
508         RETURN(req);
509 }
510
511 static int mdc_finish_enqueue(struct obd_export *exp,
512                               struct ptlrpc_request *req,
513                               struct ldlm_enqueue_info *einfo,
514                               struct lookup_intent *it,
515                               struct lustre_handle *lockh,
516                               int rc)
517 {
518         struct req_capsule  *pill = &req->rq_pill;
519         struct ldlm_request *lockreq;
520         struct ldlm_reply   *lockrep;
521         struct lustre_intent_data *intent = &it->d.lustre;
522         struct ldlm_lock    *lock;
523         void                *lvb_data = NULL;
524         int                  lvb_len = 0;
525         ENTRY;
526
527         LASSERT(rc >= 0);
528         /* Similarly, if we're going to replay this request, we don't want to
529          * actually get a lock, just perform the intent. */
530         if (req->rq_transno || req->rq_replay) {
531                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
532                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
533         }
534
535         if (rc == ELDLM_LOCK_ABORTED) {
536                 einfo->ei_mode = 0;
537                 memset(lockh, 0, sizeof(*lockh));
538                 rc = 0;
539         } else { /* rc = 0 */
540                 lock = ldlm_handle2lock(lockh);
541                 LASSERT(lock != NULL);
542
543                 /* If the server gave us back a different lock mode, we should
544                  * fix up our variables. */
545                 if (lock->l_req_mode != einfo->ei_mode) {
546                         ldlm_lock_addref(lockh, lock->l_req_mode);
547                         ldlm_lock_decref(lockh, einfo->ei_mode);
548                         einfo->ei_mode = lock->l_req_mode;
549                 }
550                 LDLM_LOCK_PUT(lock);
551         }
552
553         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
554         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
555
556         intent->it_disposition = (int)lockrep->lock_policy_res1;
557         intent->it_status = (int)lockrep->lock_policy_res2;
558         intent->it_lock_mode = einfo->ei_mode;
559         intent->it_lock_handle = lockh->cookie;
560         intent->it_data = req;
561
562         /* Technically speaking rq_transno must already be zero if
563          * it_status is in error, so the check is a bit redundant */
564         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
565                 mdc_clear_replay_flag(req, intent->it_status);
566
567         /* If we're doing an IT_OPEN which did not result in an actual
568          * successful open, then we need to remove the bit which saves
569          * this request for unconditional replay.
570          *
571          * It's important that we do this first!  Otherwise we might exit the
572          * function without doing so, and try to replay a failed create
573          * (bug 3440) */
574         if (it->it_op & IT_OPEN && req->rq_replay &&
575             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
576                 mdc_clear_replay_flag(req, intent->it_status);
577
578         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
579                   it->it_op, intent->it_disposition, intent->it_status);
580
581         /* We know what to expect, so we do any byte flipping required here */
582         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
583                 struct mdt_body *body;
584
585                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
586                 if (body == NULL) {
587                         CERROR ("Can't swab mdt_body\n");
588                         RETURN (-EPROTO);
589                 }
590
591                 if (it_disposition(it, DISP_OPEN_OPEN) &&
592                     !it_open_error(DISP_OPEN_OPEN, it)) {
593                         /*
594                          * If this is a successful OPEN request, we need to set
595                          * replay handler and data early, so that if replay
596                          * happens immediately after swabbing below, new reply
597                          * is swabbed by that handler correctly.
598                          */
599                         mdc_set_open_replay_data(NULL, NULL, req);
600                 }
601
602                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
603                         void *eadata;
604
605                         mdc_update_max_ea_from_body(exp, body);
606
607                         /*
608                          * The eadata is opaque; just check that it is there.
609                          * Eventually, obd_unpackmd() will check the contents.
610                          */
611                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
612                                                               body->eadatasize);
613                         if (eadata == NULL)
614                                 RETURN(-EPROTO);
615
616                         /* save lvb data and length in case this is for layout
617                          * lock */
618                         lvb_data = eadata;
619                         lvb_len = body->eadatasize;
620
621                         /*
622                          * We save the reply LOV EA in case we have to replay a
623                          * create for recovery.  If we didn't allocate a large
624                          * enough request buffer above we need to reallocate it
625                          * here to hold the actual LOV EA.
626                          *
627                          * To not save LOV EA if request is not going to replay
628                          * (for example error one).
629                          */
630                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
631                                 void *lmm;
632                                 if (req_capsule_get_size(pill, &RMF_EADATA,
633                                                          RCL_CLIENT) <
634                                     body->eadatasize)
635                                         mdc_realloc_openmsg(req, body);
636                                 else
637                                         req_capsule_shrink(pill, &RMF_EADATA,
638                                                            body->eadatasize,
639                                                            RCL_CLIENT);
640
641                                 req_capsule_set_size(pill, &RMF_EADATA,
642                                                      RCL_CLIENT,
643                                                      body->eadatasize);
644
645                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
646                                 if (lmm)
647                                         memcpy(lmm, eadata, body->eadatasize);
648                         }
649                 }
650
651                 if (body->valid & OBD_MD_FLRMTPERM) {
652                         struct mdt_remote_perm *perm;
653
654                         LASSERT(client_is_remote(exp));
655                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
656                                                 lustre_swab_mdt_remote_perm);
657                         if (perm == NULL)
658                                 RETURN(-EPROTO);
659                 }
660                 if (body->valid & OBD_MD_FLMDSCAPA) {
661                         struct lustre_capa *capa, *p;
662
663                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
664                         if (capa == NULL)
665                                 RETURN(-EPROTO);
666
667                         if (it->it_op & IT_OPEN) {
668                                 /* client fid capa will be checked in replay */
669                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
670                                 LASSERT(p);
671                                 *p = *capa;
672                         }
673                 }
674                 if (body->valid & OBD_MD_FLOSSCAPA) {
675                         struct lustre_capa *capa;
676
677                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
678                         if (capa == NULL)
679                                 RETURN(-EPROTO);
680                 }
681         } else if (it->it_op & IT_LAYOUT) {
682                 /* maybe the lock was granted right away and layout
683                  * is packed into RMF_DLM_LVB of req */
684                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
685                 if (lvb_len > 0) {
686                         lvb_data = req_capsule_server_sized_get(pill,
687                                                         &RMF_DLM_LVB, lvb_len);
688                         if (lvb_data == NULL)
689                                 RETURN(-EPROTO);
690                 }
691         }
692
693         /* fill in stripe data for layout lock */
694         lock = ldlm_handle2lock(lockh);
695         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
696                 void *lmm;
697
698                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
699                         ldlm_it2str(it->it_op), lvb_len);
700
701                 OBD_ALLOC_LARGE(lmm, lvb_len);
702                 if (lmm == NULL) {
703                         LDLM_LOCK_PUT(lock);
704                         RETURN(-ENOMEM);
705                 }
706                 memcpy(lmm, lvb_data, lvb_len);
707
708                 /* install lvb_data */
709                 lock_res_and_lock(lock);
710                 if (lock->l_lvb_data == NULL) {
711                         lock->l_lvb_data = lmm;
712                         lock->l_lvb_len = lvb_len;
713                         lmm = NULL;
714                 }
715                 unlock_res_and_lock(lock);
716                 if (lmm != NULL)
717                         OBD_FREE_LARGE(lmm, lvb_len);
718         }
719         if (lock != NULL)
720                 LDLM_LOCK_PUT(lock);
721
722         RETURN(rc);
723 }
724
725 /* We always reserve enough space in the reply packet for a stripe MD, because
726  * we don't know in advance the file type. */
727 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
728                 struct lookup_intent *it, struct md_op_data *op_data,
729                 struct lustre_handle *lockh, void *lmm, int lmmsize,
730                 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
731 {
732         struct obd_device     *obddev = class_exp2obd(exp);
733         struct ptlrpc_request *req = NULL;
734         __u64                  flags, saved_flags = extra_lock_flags;
735         int                    rc;
736         struct ldlm_res_id res_id;
737         static const ldlm_policy_data_t lookup_policy =
738                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
739         static const ldlm_policy_data_t update_policy =
740                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
741         static const ldlm_policy_data_t layout_policy =
742                             { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
743         ldlm_policy_data_t const *policy = &lookup_policy;
744         int                    generation, resends = 0;
745         struct ldlm_reply     *lockrep;
746         enum lvb_type          lvb_type = 0;
747         ENTRY;
748
749         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
750                  einfo->ei_type);
751
752         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
753
754         if (it) {
755                 saved_flags |= LDLM_FL_HAS_INTENT;
756                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
757                         policy = &update_policy;
758                 else if (it->it_op & IT_LAYOUT)
759                         policy = &layout_policy;
760         }
761
762         LASSERT(reqp == NULL);
763
764         generation = obddev->u.cli.cl_import->imp_generation;
765 resend:
766         flags = saved_flags;
767         if (!it) {
768                 /* The only way right now is FLOCK, in this case we hide flock
769                    policy as lmm, but lmmsize is 0 */
770                 LASSERT(lmm && lmmsize == 0);
771                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
772                          einfo->ei_type);
773                 policy = (ldlm_policy_data_t *)lmm;
774                 res_id.name[3] = LDLM_FLOCK;
775         } else if (it->it_op & IT_OPEN) {
776                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
777                                            einfo->ei_cbdata);
778                 policy = &update_policy;
779                 einfo->ei_cbdata = NULL;
780                 lmm = NULL;
781         } else if (it->it_op & IT_UNLINK) {
782                 req = mdc_intent_unlink_pack(exp, it, op_data);
783         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
784                 req = mdc_intent_getattr_pack(exp, it, op_data);
785         } else if (it->it_op & IT_READDIR) {
786                 req = mdc_enqueue_pack(exp, 0);
787         } else if (it->it_op & IT_LAYOUT) {
788                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
789                         RETURN(-EOPNOTSUPP);
790
791                 req = mdc_intent_layout_pack(exp, it, op_data);
792                 lvb_type = LVB_T_LAYOUT;
793         } else {
794                 LBUG();
795                 RETURN(-EINVAL);
796         }
797
798         if (IS_ERR(req))
799                 RETURN(PTR_ERR(req));
800
801         if (req != NULL && it && it->it_op & IT_CREAT)
802                 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
803                  * retry logic */
804                 req->rq_no_retry_einprogress = 1;
805
806         if (resends) {
807                 req->rq_generation_set = 1;
808                 req->rq_import_generation = generation;
809                 req->rq_sent = cfs_time_current_sec() + resends;
810         }
811
812         /* It is important to obtain rpc_lock first (if applicable), so that
813          * threads that are serialised with rpc_lock are not polluting our
814          * rpcs in flight counter. We do not do flock request limiting, though*/
815         if (it) {
816                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
817                 rc = mdc_enter_request(&obddev->u.cli);
818                 if (rc != 0) {
819                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
820                         mdc_clear_replay_flag(req, 0);
821                         ptlrpc_req_finished(req);
822                         RETURN(rc);
823                 }
824         }
825
826         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
827                               0, lvb_type, lockh, 0);
828         if (!it) {
829                 /* For flock requests we immediatelly return without further
830                    delay and let caller deal with the rest, since rest of
831                    this function metadata processing makes no sense for flock
832                    requests anyway */
833                 RETURN(rc);
834         }
835
836         mdc_exit_request(&obddev->u.cli);
837         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
838
839         if (rc < 0) {
840                 CERROR("ldlm_cli_enqueue: %d\n", rc);
841                 mdc_clear_replay_flag(req, rc);
842                 ptlrpc_req_finished(req);
843                 RETURN(rc);
844         }
845
846         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
847         LASSERT(lockrep != NULL);
848
849         /* Retry the create infinitely when we get -EINPROGRESS from
850          * server. This is required by the new quota design. */
851         if (it && it->it_op & IT_CREAT &&
852             (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
853                 mdc_clear_replay_flag(req, rc);
854                 ptlrpc_req_finished(req);
855                 resends++;
856
857                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
858                        obddev->obd_name, resends, it->it_op,
859                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
860
861                 if (generation == obddev->u.cli.cl_import->imp_generation) {
862                         goto resend;
863                 } else {
864                         CDEBUG(D_HA, "resend cross eviction\n");
865                         RETURN(-EIO);
866                 }
867         }
868
869         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
870         if (rc < 0) {
871                 if (lustre_handle_is_used(lockh)) {
872                         ldlm_lock_decref(lockh, einfo->ei_mode);
873                         memset(lockh, 0, sizeof(*lockh));
874                 }
875                 ptlrpc_req_finished(req);
876         }
877         RETURN(rc);
878 }
879
880 static int mdc_finish_intent_lock(struct obd_export *exp,
881                                   struct ptlrpc_request *request,
882                                   struct md_op_data *op_data,
883                                   struct lookup_intent *it,
884                                   struct lustre_handle *lockh)
885 {
886         struct lustre_handle old_lock;
887         struct mdt_body *mdt_body;
888         struct ldlm_lock *lock;
889         int rc;
890
891
892         LASSERT(request != NULL);
893         LASSERT(request != LP_POISON);
894         LASSERT(request->rq_repmsg != LP_POISON);
895
896         if (!it_disposition(it, DISP_IT_EXECD)) {
897                 /* The server failed before it even started executing the
898                  * intent, i.e. because it couldn't unpack the request. */
899                 LASSERT(it->d.lustre.it_status != 0);
900                 RETURN(it->d.lustre.it_status);
901         }
902         rc = it_open_error(DISP_IT_EXECD, it);
903         if (rc)
904                 RETURN(rc);
905
906         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
907         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
908
909         /* If we were revalidating a fid/name pair, mark the intent in
910          * case we fail and get called again from lookup */
911         if (fid_is_sane(&op_data->op_fid2) &&
912             it->it_create_mode & M_CHECK_STALE &&
913             it->it_op != IT_GETATTR) {
914                 it_set_disposition(it, DISP_ENQ_COMPLETE);
915
916                 /* Also: did we find the same inode? */
917                 /* sever can return one of two fids:
918                  * op_fid2 - new allocated fid - if file is created.
919                  * op_fid3 - existent fid - if file only open.
920                  * op_fid3 is saved in lmv_intent_open */
921                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
922                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
923                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
924                                "\n", PFID(&op_data->op_fid2),
925                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
926                         RETURN(-ESTALE);
927                 }
928         }
929
930         rc = it_open_error(DISP_LOOKUP_EXECD, it);
931         if (rc)
932                 RETURN(rc);
933
934         /* keep requests around for the multiple phases of the call
935          * this shows the DISP_XX must guarantee we make it into the call
936          */
937         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
938             it_disposition(it, DISP_OPEN_CREATE) &&
939             !it_open_error(DISP_OPEN_CREATE, it)) {
940                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
941                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
942         }
943         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
944             it_disposition(it, DISP_OPEN_OPEN) &&
945             !it_open_error(DISP_OPEN_OPEN, it)) {
946                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
947                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
948                 /* BUG 11546 - eviction in the middle of open rpc processing */
949                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
950         }
951
952         if (it->it_op & IT_CREAT) {
953                 /* XXX this belongs in ll_create_it */
954         } else if (it->it_op == IT_OPEN) {
955                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
956         } else {
957                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
958         }
959
960         /* If we already have a matching lock, then cancel the new
961          * one.  We have to set the data here instead of in
962          * mdc_enqueue, because we need to use the child's inode as
963          * the l_ast_data to match, and that's not available until
964          * intent_finish has performed the iget().) */
965         lock = ldlm_handle2lock(lockh);
966         if (lock) {
967                 ldlm_policy_data_t policy = lock->l_policy_data;
968                 LDLM_DEBUG(lock, "matching against this");
969
970                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
971                                          &lock->l_resource->lr_name),
972                          "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
973                          (unsigned long)lock->l_resource->lr_name.name[0],
974                          (unsigned long)lock->l_resource->lr_name.name[1],
975                          (unsigned long)lock->l_resource->lr_name.name[2],
976                          (unsigned long)fid_seq(&mdt_body->fid1),
977                          (unsigned long)fid_oid(&mdt_body->fid1),
978                          (unsigned long)fid_ver(&mdt_body->fid1));
979                 LDLM_LOCK_PUT(lock);
980
981                 memcpy(&old_lock, lockh, sizeof(*lockh));
982                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
983                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
984                         ldlm_lock_decref_and_cancel(lockh,
985                                                     it->d.lustre.it_lock_mode);
986                         memcpy(lockh, &old_lock, sizeof(old_lock));
987                         it->d.lustre.it_lock_handle = lockh->cookie;
988                 }
989         }
990         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
991                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
992                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
993         RETURN(rc);
994 }
995
996 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
997                         struct lu_fid *fid, __u64 *bits)
998 {
999         /* We could just return 1 immediately, but since we should only
1000          * be called in revalidate_it if we already have a lock, let's
1001          * verify that. */
1002         struct ldlm_res_id res_id;
1003         struct lustre_handle lockh;
1004         ldlm_policy_data_t policy;
1005         ldlm_mode_t mode;
1006         ENTRY;
1007
1008         if (it->d.lustre.it_lock_handle) {
1009                 lockh.cookie = it->d.lustre.it_lock_handle;
1010                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1011         } else {
1012                 fid_build_reg_res_name(fid, &res_id);
1013                 switch (it->it_op) {
1014                 case IT_GETATTR:
1015                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1016                         break;
1017                 case IT_LAYOUT:
1018                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1019                         break;
1020                 default:
1021                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1022                         break;
1023                 }
1024                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1025                                        LDLM_FL_BLOCK_GRANTED, &res_id,
1026                                        LDLM_IBITS, &policy,
1027                                        LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1028         }
1029
1030         if (mode) {
1031                 it->d.lustre.it_lock_handle = lockh.cookie;
1032                 it->d.lustre.it_lock_mode = mode;
1033         } else {
1034                 it->d.lustre.it_lock_handle = 0;
1035                 it->d.lustre.it_lock_mode = 0;
1036         }
1037
1038         RETURN(!!mode);
1039 }
1040
1041 /*
1042  * This long block is all about fixing up the lock and request state
1043  * so that it is correct as of the moment _before_ the operation was
1044  * applied; that way, the VFS will think that everything is normal and
1045  * call Lustre's regular VFS methods.
1046  *
1047  * If we're performing a creation, that means that unless the creation
1048  * failed with EEXIST, we should fake up a negative dentry.
1049  *
1050  * For everything else, we want to lookup to succeed.
1051  *
1052  * One additional note: if CREATE or OPEN succeeded, we add an extra
1053  * reference to the request because we need to keep it around until
1054  * ll_create/ll_open gets called.
1055  *
1056  * The server will return to us, in it_disposition, an indication of
1057  * exactly what d.lustre.it_status refers to.
1058  *
1059  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1060  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1061  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1062  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1063  * was successful.
1064  *
1065  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1066  * child lookup.
1067  */
1068 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1069                     void *lmm, int lmmsize, struct lookup_intent *it,
1070                     int lookup_flags, struct ptlrpc_request **reqp,
1071                     ldlm_blocking_callback cb_blocking,
1072                     __u64 extra_lock_flags)
1073 {
1074         struct lustre_handle lockh;
1075         int rc = 0;
1076         ENTRY;
1077         LASSERT(it);
1078
1079         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1080                ", intent: %s flags %#o\n", op_data->op_namelen,
1081                op_data->op_name, PFID(&op_data->op_fid2),
1082                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1083                it->it_flags);
1084
1085         lockh.cookie = 0;
1086         if (fid_is_sane(&op_data->op_fid2) &&
1087             (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1088                 /* We could just return 1 immediately, but since we should only
1089                  * be called in revalidate_it if we already have a lock, let's
1090                  * verify that. */
1091                 it->d.lustre.it_lock_handle = 0;
1092                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1093                 /* Only return failure if it was not GETATTR by cfid
1094                    (from inode_revalidate) */
1095                 if (rc || op_data->op_namelen != 0)
1096                         RETURN(rc);
1097         }
1098
1099         /* lookup_it may be called only after revalidate_it has run, because
1100          * revalidate_it cannot return errors, only zero.  Returning zero causes
1101          * this call to lookup, which *can* return an error.
1102          *
1103          * We only want to execute the request associated with the intent one
1104          * time, however, so don't send the request again.  Instead, skip past
1105          * this and use the request from revalidate.  In this case, revalidate
1106          * never dropped its reference, so the refcounts are all OK */
1107         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
1108                 struct ldlm_enqueue_info einfo =
1109                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
1110                           ldlm_completion_ast, NULL, NULL, NULL };
1111
1112                 /* For case if upper layer did not alloc fid, do it now. */
1113                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1114                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1115                         if (rc < 0) {
1116                                 CERROR("Can't alloc new fid, rc %d\n", rc);
1117                                 RETURN(rc);
1118                         }
1119                 }
1120                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1121                                  lmm, lmmsize, NULL, extra_lock_flags);
1122                 if (rc < 0)
1123                         RETURN(rc);
1124         } else if (!fid_is_sane(&op_data->op_fid2) ||
1125                    !(it->it_create_mode & M_CHECK_STALE)) {
1126                 /* DISP_ENQ_COMPLETE set means there is extra reference on
1127                  * request referenced from this intent, saved for subsequent
1128                  * lookup.  This path is executed when we proceed to this
1129                  * lookup, so we clear DISP_ENQ_COMPLETE */
1130                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
1131         }
1132         *reqp = it->d.lustre.it_data;
1133         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1134         RETURN(rc);
1135 }
1136
1137 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1138                                               struct ptlrpc_request *req,
1139                                               void *args, int rc)
1140 {
1141         struct mdc_getattr_args  *ga = args;
1142         struct obd_export        *exp = ga->ga_exp;
1143         struct md_enqueue_info   *minfo = ga->ga_minfo;
1144         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1145         struct lookup_intent     *it;
1146         struct lustre_handle     *lockh;
1147         struct obd_device        *obddev;
1148         __u64                     flags = LDLM_FL_HAS_INTENT;
1149         ENTRY;
1150
1151         it    = &minfo->mi_it;
1152         lockh = &minfo->mi_lockh;
1153
1154         obddev = class_exp2obd(exp);
1155
1156         mdc_exit_request(&obddev->u.cli);
1157         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1158                 rc = -ETIMEDOUT;
1159
1160         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1161                                    &flags, NULL, 0, lockh, rc);
1162         if (rc < 0) {
1163                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1164                 mdc_clear_replay_flag(req, rc);
1165                 GOTO(out, rc);
1166         }
1167
1168         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1169         if (rc)
1170                 GOTO(out, rc);
1171
1172         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1173         EXIT;
1174
1175 out:
1176         OBD_FREE_PTR(einfo);
1177         minfo->mi_cb(req, minfo, rc);
1178         return 0;
1179 }
1180
1181 int mdc_intent_getattr_async(struct obd_export *exp,
1182                              struct md_enqueue_info *minfo,
1183                              struct ldlm_enqueue_info *einfo)
1184 {
1185         struct md_op_data       *op_data = &minfo->mi_data;
1186         struct lookup_intent    *it = &minfo->mi_it;
1187         struct ptlrpc_request   *req;
1188         struct mdc_getattr_args *ga;
1189         struct obd_device       *obddev = class_exp2obd(exp);
1190         struct ldlm_res_id       res_id;
1191         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1192          *     for statahead currently. Consider CMD in future, such two bits
1193          *     maybe managed by different MDS, should be adjusted then. */
1194         ldlm_policy_data_t       policy = {
1195                                         .l_inodebits = { MDS_INODELOCK_LOOKUP | 
1196                                                          MDS_INODELOCK_UPDATE }
1197                                  };
1198         int                      rc = 0;
1199         __u64                    flags = LDLM_FL_HAS_INTENT;
1200         ENTRY;
1201
1202         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1203                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1204                ldlm_it2str(it->it_op), it->it_flags);
1205
1206         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1207         req = mdc_intent_getattr_pack(exp, it, op_data);
1208         if (!req)
1209                 RETURN(-ENOMEM);
1210
1211         rc = mdc_enter_request(&obddev->u.cli);
1212         if (rc != 0) {
1213                 ptlrpc_req_finished(req);
1214                 RETURN(rc);
1215         }
1216
1217         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1218                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1219         if (rc < 0) {
1220                 mdc_exit_request(&obddev->u.cli);
1221                 ptlrpc_req_finished(req);
1222                 RETURN(rc);
1223         }
1224
1225         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1226         ga = ptlrpc_req_async_args(req);
1227         ga->ga_exp = exp;
1228         ga->ga_minfo = minfo;
1229         ga->ga_einfo = einfo;
1230
1231         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1232         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1233
1234         RETURN(0);
1235 }