Whamcloud - gitweb
LU-2901 ldlm: fix resource/fid check, use DLDLMRES
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #ifdef __KERNEL__
40 # include <linux/module.h>
41 # include <linux/pagemap.h>
42 # include <linux/miscdevice.h>
43 # include <linux/init.h>
44 #else
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_acl.h>
49 #include <obd_class.h>
50 #include <lustre_dlm.h>
51 /* fid_res_name_eq() */
52 #include <lustre_fid.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
55
/*
 * Bundle of three pointers describing one getattr enqueue.
 * NOTE(review): presumably handed to an asynchronous completion callback;
 * the users of this structure are not visible in this part of the file.
 */
struct mdc_getattr_args {
        /* export the request is issued on */
        struct obd_export *ga_exp;
        /* metadata enqueue description */
        struct md_enqueue_info *ga_minfo;
        /* LDLM enqueue description */
        struct ldlm_enqueue_info *ga_einfo;
};
61
62 int it_disposition(struct lookup_intent *it, int flag)
63 {
64         return it->d.lustre.it_disposition & flag;
65 }
66 EXPORT_SYMBOL(it_disposition);
67
68 void it_set_disposition(struct lookup_intent *it, int flag)
69 {
70         it->d.lustre.it_disposition |= flag;
71 }
72 EXPORT_SYMBOL(it_set_disposition);
73
74 void it_clear_disposition(struct lookup_intent *it, int flag)
75 {
76         it->d.lustre.it_disposition &= ~flag;
77 }
78 EXPORT_SYMBOL(it_clear_disposition);
79
80 int it_open_error(int phase, struct lookup_intent *it)
81 {
82         if (it_disposition(it, DISP_OPEN_LEASE)) {
83                 if (phase >= DISP_OPEN_LEASE)
84                         return it->d.lustre.it_status;
85                 else
86                         return 0;
87         }
88         if (it_disposition(it, DISP_OPEN_OPEN)) {
89                 if (phase >= DISP_OPEN_OPEN)
90                         return it->d.lustre.it_status;
91                 else
92                         return 0;
93         }
94
95         if (it_disposition(it, DISP_OPEN_CREATE)) {
96                 if (phase >= DISP_OPEN_CREATE)
97                         return it->d.lustre.it_status;
98                 else
99                         return 0;
100         }
101
102         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
103                 if (phase >= DISP_LOOKUP_EXECD)
104                         return it->d.lustre.it_status;
105                 else
106                         return 0;
107         }
108
109         if (it_disposition(it, DISP_IT_EXECD)) {
110                 if (phase >= DISP_IT_EXECD)
111                         return it->d.lustre.it_status;
112                 else
113                         return 0;
114         }
115         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
116                it->d.lustre.it_status);
117         LBUG();
118         return 0;
119 }
120 EXPORT_SYMBOL(it_open_error);
121
122 /* this must be called on a lockh that is known to have a referenced lock */
123 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
124                       __u64 *bits)
125 {
126         struct ldlm_lock *lock;
127         struct inode *new_inode = data;
128         ENTRY;
129
130         if(bits)
131                 *bits = 0;
132
133         if (!*lockh)
134                 RETURN(0);
135
136         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
137
138         LASSERT(lock != NULL);
139         lock_res_and_lock(lock);
140 #ifdef __KERNEL__
141         if (lock->l_resource->lr_lvb_inode &&
142             lock->l_resource->lr_lvb_inode != data) {
143                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
144                 LASSERTF(old_inode->i_state & I_FREEING,
145                          "Found existing inode %p/%lu/%u state %lu in lock: "
146                          "setting data to %p/%lu/%u\n", old_inode,
147                          old_inode->i_ino, old_inode->i_generation,
148                          old_inode->i_state,
149                          new_inode, new_inode->i_ino, new_inode->i_generation);
150         }
151 #endif
152         lock->l_resource->lr_lvb_inode = new_inode;
153         if (bits)
154                 *bits = lock->l_policy_data.l_inodebits.bits;
155
156         unlock_res_and_lock(lock);
157         LDLM_LOCK_PUT(lock);
158
159         RETURN(0);
160 }
161
162 ldlm_mode_t mdc_lock_match(struct obd_export *exp, __u64 flags,
163                            const struct lu_fid *fid, ldlm_type_t type,
164                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
165                            struct lustre_handle *lockh)
166 {
167         struct ldlm_res_id res_id;
168         ldlm_mode_t rc;
169         ENTRY;
170
171         fid_build_reg_res_name(fid, &res_id);
172         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
173                              &res_id, type, policy, mode, lockh, 0);
174         RETURN(rc);
175 }
176
177 int mdc_cancel_unused(struct obd_export *exp,
178                       const struct lu_fid *fid,
179                       ldlm_policy_data_t *policy,
180                       ldlm_mode_t mode,
181                       ldlm_cancel_flags_t flags,
182                       void *opaque)
183 {
184         struct ldlm_res_id res_id;
185         struct obd_device *obd = class_exp2obd(exp);
186         int rc;
187
188         ENTRY;
189
190         fid_build_reg_res_name(fid, &res_id);
191         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
192                                              policy, mode, flags, opaque);
193         RETURN(rc);
194 }
195
196 int mdc_null_inode(struct obd_export *exp,
197                    const struct lu_fid *fid)
198 {
199         struct ldlm_res_id res_id;
200         struct ldlm_resource *res;
201         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
202         ENTRY;
203
204         LASSERTF(ns != NULL, "no namespace passed\n");
205
206         fid_build_reg_res_name(fid, &res_id);
207
208         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
209         if(res == NULL)
210                 RETURN(0);
211
212         lock_res(res);
213         res->lr_lvb_inode = NULL;
214         unlock_res(res);
215
216         ldlm_resource_putref(res);
217         RETURN(0);
218 }
219
220 /* find any ldlm lock of the inode in mdc
221  * return 0    not find
222  *        1    find one
223  *      < 0    error */
224 int mdc_find_cbdata(struct obd_export *exp,
225                     const struct lu_fid *fid,
226                     ldlm_iterator_t it, void *data)
227 {
228         struct ldlm_res_id res_id;
229         int rc = 0;
230         ENTRY;
231
232         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
233         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
234                                    it, data);
235         if (rc == LDLM_ITER_STOP)
236                 RETURN(1);
237         else if (rc == LDLM_ITER_CONTINUE)
238                 RETURN(0);
239         RETURN(rc);
240 }
241
242 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
243 {
244         /* Don't hold error requests for replay. */
245         if (req->rq_replay) {
246                 spin_lock(&req->rq_lock);
247                 req->rq_replay = 0;
248                 spin_unlock(&req->rq_lock);
249         }
250         if (rc && req->rq_transno != 0) {
251                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
252                 LBUG();
253         }
254 }
255
256 /* Save a large LOV EA into the request buffer so that it is available
257  * for replay.  We don't do this in the initial request because the
258  * original request doesn't need this buffer (at most it sends just the
259  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
260  * buffer and may also be difficult to allocate and save a very large
261  * request buffer for each open. (bug 5707)
262  *
263  * OOM here may cause recovery failure if lmm is needed (only for the
264  * original open if the MDS crashed just when this client also OOM'd)
265  * but this is incredibly unlikely, and questionable whether the client
266  * could do MDS recovery under OOM anyways... */
267 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
268                                 struct mdt_body *body)
269 {
270         int     rc;
271
272         /* FIXME: remove this explicit offset. */
273         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
274                                         body->eadatasize);
275         if (rc) {
276                 CERROR("Can't enlarge segment %d size to %d\n",
277                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
278                 body->valid &= ~OBD_MD_FLEASIZE;
279                 body->eadatasize = 0;
280         }
281 }
282
283 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
284                                                    struct lookup_intent *it,
285                                                    struct md_op_data *op_data,
286                                                    void *lmm, int lmmsize,
287                                                    void *cb_data)
288 {
289         struct ptlrpc_request *req;
290         struct obd_device     *obddev = class_exp2obd(exp);
291         struct ldlm_intent    *lit;
292         CFS_LIST_HEAD(cancels);
293         int                    count = 0;
294         int                    mode;
295         int                    rc;
296         ENTRY;
297
298         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
299
300         /* XXX: openlock is not cancelled for cross-refs. */
301         /* If inode is known, cancel conflicting OPEN locks. */
302         if (fid_is_sane(&op_data->op_fid2)) {
303                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
304                         if (it->it_flags & FMODE_WRITE)
305                                 mode = LCK_EX;
306                         else
307                                 mode = LCK_PR;
308                 } else {
309                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
310                                 mode = LCK_CW;
311 #ifdef FMODE_EXEC
312                         else if (it->it_flags & FMODE_EXEC)
313                                 mode = LCK_PR;
314 #endif
315                         else
316                                 mode = LCK_CR;
317                 }
318                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
319                                                 &cancels, mode,
320                                                 MDS_INODELOCK_OPEN);
321         }
322
323         /* If CREATE, cancel parent's UPDATE lock. */
324         if (it->it_op & IT_CREAT)
325                 mode = LCK_EX;
326         else
327                 mode = LCK_CR;
328         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
329                                          &cancels, mode,
330                                          MDS_INODELOCK_UPDATE);
331
332         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
333                                    &RQF_LDLM_INTENT_OPEN);
334         if (req == NULL) {
335                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
336                 RETURN(ERR_PTR(-ENOMEM));
337         }
338
339         /* parent capability */
340         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
341         /* child capability, reserve the size according to parent capa, it will
342          * be filled after we get the reply */
343         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
344
345         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
346                              op_data->op_namelen + 1);
347         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
348                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
349
350         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
351         if (rc) {
352                 ptlrpc_request_free(req);
353                 return NULL;
354         }
355
356         spin_lock(&req->rq_lock);
357         req->rq_replay = req->rq_import->imp_replayable;
358         spin_unlock(&req->rq_lock);
359
360         /* pack the intent */
361         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
362         lit->opc = (__u64)it->it_op;
363
364         /* pack the intended request */
365         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
366                       lmmsize);
367
368         /* for remote client, fetch remote perm for current user */
369         if (client_is_remote(exp))
370                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
371                                      sizeof(struct mdt_remote_perm));
372         ptlrpc_request_set_replen(req);
373         return req;
374 }
375
376 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
377                                                      struct lookup_intent *it,
378                                                      struct md_op_data *op_data)
379 {
380         struct ptlrpc_request *req;
381         struct obd_device     *obddev = class_exp2obd(exp);
382         struct ldlm_intent    *lit;
383         int                    rc;
384         ENTRY;
385
386         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
387                                    &RQF_LDLM_INTENT_UNLINK);
388         if (req == NULL)
389                 RETURN(ERR_PTR(-ENOMEM));
390
391         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
392         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
393                              op_data->op_namelen + 1);
394
395         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
396         if (rc) {
397                 ptlrpc_request_free(req);
398                 RETURN(ERR_PTR(rc));
399         }
400
401         /* pack the intent */
402         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
403         lit->opc = (__u64)it->it_op;
404
405         /* pack the intended request */
406         mdc_unlink_pack(req, op_data);
407
408         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
409                              obddev->u.cli.cl_max_mds_easize);
410         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
411                              obddev->u.cli.cl_max_mds_cookiesize);
412         ptlrpc_request_set_replen(req);
413         RETURN(req);
414 }
415
416 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
417                                                       struct lookup_intent *it,
418                                                       struct md_op_data *op_data)
419 {
420         struct ptlrpc_request *req;
421         struct obd_device     *obddev = class_exp2obd(exp);
422         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
423                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
424                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
425                                        (client_is_remote(exp) ?
426                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
427         struct ldlm_intent    *lit;
428         int                    rc;
429         ENTRY;
430
431         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
432                                    &RQF_LDLM_INTENT_GETATTR);
433         if (req == NULL)
434                 RETURN(ERR_PTR(-ENOMEM));
435
436         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
437         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
438                              op_data->op_namelen + 1);
439
440         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
441         if (rc) {
442                 ptlrpc_request_free(req);
443                 RETURN(ERR_PTR(rc));
444         }
445
446         /* pack the intent */
447         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
448         lit->opc = (__u64)it->it_op;
449
450         /* pack the intended request */
451         mdc_getattr_pack(req, valid, it->it_flags, op_data,
452                          obddev->u.cli.cl_max_mds_easize);
453
454         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
455                              obddev->u.cli.cl_max_mds_easize);
456         if (client_is_remote(exp))
457                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
458                                      sizeof(struct mdt_remote_perm));
459         ptlrpc_request_set_replen(req);
460         RETURN(req);
461 }
462
463 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
464                                                      struct lookup_intent *it,
465                                                      struct md_op_data *unused)
466 {
467         struct obd_device     *obd = class_exp2obd(exp);
468         struct ptlrpc_request *req;
469         struct ldlm_intent    *lit;
470         struct layout_intent  *layout;
471         int rc;
472         ENTRY;
473
474         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
475                                 &RQF_LDLM_INTENT_LAYOUT);
476         if (req == NULL)
477                 RETURN(ERR_PTR(-ENOMEM));
478
479         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
480         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
481         if (rc) {
482                 ptlrpc_request_free(req);
483                 RETURN(ERR_PTR(rc));
484         }
485
486         /* pack the intent */
487         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
488         lit->opc = (__u64)it->it_op;
489
490         /* pack the layout intent request */
491         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
492         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
493          * set for replication */
494         layout->li_opc = LAYOUT_INTENT_ACCESS;
495
496         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
497                         obd->u.cli.cl_max_mds_easize);
498         ptlrpc_request_set_replen(req);
499         RETURN(req);
500 }
501
502 static struct ptlrpc_request *
503 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
504 {
505         struct ptlrpc_request *req;
506         int rc;
507         ENTRY;
508
509         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
510         if (req == NULL)
511                 RETURN(ERR_PTR(-ENOMEM));
512
513         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
514         if (rc) {
515                 ptlrpc_request_free(req);
516                 RETURN(ERR_PTR(rc));
517         }
518
519         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
520         ptlrpc_request_set_replen(req);
521         RETURN(req);
522 }
523
524 static int mdc_finish_enqueue(struct obd_export *exp,
525                               struct ptlrpc_request *req,
526                               struct ldlm_enqueue_info *einfo,
527                               struct lookup_intent *it,
528                               struct lustre_handle *lockh,
529                               int rc)
530 {
531         struct req_capsule  *pill = &req->rq_pill;
532         struct ldlm_request *lockreq;
533         struct ldlm_reply   *lockrep;
534         struct lustre_intent_data *intent = &it->d.lustre;
535         struct ldlm_lock    *lock;
536         void                *lvb_data = NULL;
537         int                  lvb_len = 0;
538         ENTRY;
539
540         LASSERT(rc >= 0);
541         /* Similarly, if we're going to replay this request, we don't want to
542          * actually get a lock, just perform the intent. */
543         if (req->rq_transno || req->rq_replay) {
544                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
545                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
546         }
547
548         if (rc == ELDLM_LOCK_ABORTED) {
549                 einfo->ei_mode = 0;
550                 memset(lockh, 0, sizeof(*lockh));
551                 rc = 0;
552         } else { /* rc = 0 */
553                 lock = ldlm_handle2lock(lockh);
554                 LASSERT(lock != NULL);
555
556                 /* If the server gave us back a different lock mode, we should
557                  * fix up our variables. */
558                 if (lock->l_req_mode != einfo->ei_mode) {
559                         ldlm_lock_addref(lockh, lock->l_req_mode);
560                         ldlm_lock_decref(lockh, einfo->ei_mode);
561                         einfo->ei_mode = lock->l_req_mode;
562                 }
563                 LDLM_LOCK_PUT(lock);
564         }
565
566         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
567         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
568
569         intent->it_disposition = (int)lockrep->lock_policy_res1;
570         intent->it_status = (int)lockrep->lock_policy_res2;
571         intent->it_lock_mode = einfo->ei_mode;
572         intent->it_lock_handle = lockh->cookie;
573         intent->it_data = req;
574
575         /* Technically speaking rq_transno must already be zero if
576          * it_status is in error, so the check is a bit redundant */
577         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
578                 mdc_clear_replay_flag(req, intent->it_status);
579
580         /* If we're doing an IT_OPEN which did not result in an actual
581          * successful open, then we need to remove the bit which saves
582          * this request for unconditional replay.
583          *
584          * It's important that we do this first!  Otherwise we might exit the
585          * function without doing so, and try to replay a failed create
586          * (bug 3440) */
587         if (it->it_op & IT_OPEN && req->rq_replay &&
588             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
589                 mdc_clear_replay_flag(req, intent->it_status);
590
591         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
592                   it->it_op, intent->it_disposition, intent->it_status);
593
594         /* We know what to expect, so we do any byte flipping required here */
595         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
596                 struct mdt_body *body;
597
598                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
599                 if (body == NULL) {
600                         CERROR ("Can't swab mdt_body\n");
601                         RETURN (-EPROTO);
602                 }
603
604                 if (it_disposition(it, DISP_OPEN_OPEN) &&
605                     !it_open_error(DISP_OPEN_OPEN, it)) {
606                         /*
607                          * If this is a successful OPEN request, we need to set
608                          * replay handler and data early, so that if replay
609                          * happens immediately after swabbing below, new reply
610                          * is swabbed by that handler correctly.
611                          */
612                         mdc_set_open_replay_data(NULL, NULL, req);
613                 }
614
615                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
616                         void *eadata;
617
618                         mdc_update_max_ea_from_body(exp, body);
619
620                         /*
621                          * The eadata is opaque; just check that it is there.
622                          * Eventually, obd_unpackmd() will check the contents.
623                          */
624                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
625                                                               body->eadatasize);
626                         if (eadata == NULL)
627                                 RETURN(-EPROTO);
628
629                         /* save lvb data and length in case this is for layout
630                          * lock */
631                         lvb_data = eadata;
632                         lvb_len = body->eadatasize;
633
634                         /*
635                          * We save the reply LOV EA in case we have to replay a
636                          * create for recovery.  If we didn't allocate a large
637                          * enough request buffer above we need to reallocate it
638                          * here to hold the actual LOV EA.
639                          *
640                          * To not save LOV EA if request is not going to replay
641                          * (for example error one).
642                          */
643                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
644                                 void *lmm;
645                                 if (req_capsule_get_size(pill, &RMF_EADATA,
646                                                          RCL_CLIENT) <
647                                     body->eadatasize)
648                                         mdc_realloc_openmsg(req, body);
649                                 else
650                                         req_capsule_shrink(pill, &RMF_EADATA,
651                                                            body->eadatasize,
652                                                            RCL_CLIENT);
653
654                                 req_capsule_set_size(pill, &RMF_EADATA,
655                                                      RCL_CLIENT,
656                                                      body->eadatasize);
657
658                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
659                                 if (lmm)
660                                         memcpy(lmm, eadata, body->eadatasize);
661                         }
662                 }
663
664                 if (body->valid & OBD_MD_FLRMTPERM) {
665                         struct mdt_remote_perm *perm;
666
667                         LASSERT(client_is_remote(exp));
668                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
669                                                 lustre_swab_mdt_remote_perm);
670                         if (perm == NULL)
671                                 RETURN(-EPROTO);
672                 }
673                 if (body->valid & OBD_MD_FLMDSCAPA) {
674                         struct lustre_capa *capa, *p;
675
676                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
677                         if (capa == NULL)
678                                 RETURN(-EPROTO);
679
680                         if (it->it_op & IT_OPEN) {
681                                 /* client fid capa will be checked in replay */
682                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
683                                 LASSERT(p);
684                                 *p = *capa;
685                         }
686                 }
687                 if (body->valid & OBD_MD_FLOSSCAPA) {
688                         struct lustre_capa *capa;
689
690                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
691                         if (capa == NULL)
692                                 RETURN(-EPROTO);
693                 }
694         } else if (it->it_op & IT_LAYOUT) {
695                 /* maybe the lock was granted right away and layout
696                  * is packed into RMF_DLM_LVB of req */
697                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
698                 if (lvb_len > 0) {
699                         lvb_data = req_capsule_server_sized_get(pill,
700                                                         &RMF_DLM_LVB, lvb_len);
701                         if (lvb_data == NULL)
702                                 RETURN(-EPROTO);
703                 }
704         }
705
706         /* fill in stripe data for layout lock */
707         lock = ldlm_handle2lock(lockh);
708         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL) {
709                 void *lmm;
710
711                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
712                         ldlm_it2str(it->it_op), lvb_len);
713
714                 OBD_ALLOC_LARGE(lmm, lvb_len);
715                 if (lmm == NULL) {
716                         LDLM_LOCK_PUT(lock);
717                         RETURN(-ENOMEM);
718                 }
719                 memcpy(lmm, lvb_data, lvb_len);
720
721                 /* install lvb_data */
722                 lock_res_and_lock(lock);
723                 if (lock->l_lvb_data == NULL) {
724                         lock->l_lvb_data = lmm;
725                         lock->l_lvb_len = lvb_len;
726                         lmm = NULL;
727                 }
728                 unlock_res_and_lock(lock);
729                 if (lmm != NULL)
730                         OBD_FREE_LARGE(lmm, lvb_len);
731         }
732         if (lock != NULL)
733                 LDLM_LOCK_PUT(lock);
734
735         RETURN(rc);
736 }
737
738 /* We always reserve enough space in the reply packet for a stripe MD, because
739  * we don't know in advance the file type. */
740 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
741                 struct lookup_intent *it, struct md_op_data *op_data,
742                 struct lustre_handle *lockh, void *lmm, int lmmsize,
743                 struct ptlrpc_request **reqp, __u64 extra_lock_flags)
744 {
745         struct obd_device     *obddev = class_exp2obd(exp);
746         struct ptlrpc_request *req = NULL;
747         __u64                  flags, saved_flags = extra_lock_flags;
748         int                    rc;
749         struct ldlm_res_id res_id;
750         static const ldlm_policy_data_t lookup_policy =
751                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
752         static const ldlm_policy_data_t update_policy =
753                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
754         static const ldlm_policy_data_t layout_policy =
755                             { .l_inodebits = { MDS_INODELOCK_LAYOUT } };
756         ldlm_policy_data_t const *policy = &lookup_policy;
757         int                    generation, resends = 0;
758         struct ldlm_reply     *lockrep;
759         enum lvb_type          lvb_type = 0;
760         ENTRY;
761
762         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
763                  einfo->ei_type);
764
765         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
766
767         if (it) {
768                 saved_flags |= LDLM_FL_HAS_INTENT;
769                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
770                         policy = &update_policy;
771                 else if (it->it_op & IT_LAYOUT)
772                         policy = &layout_policy;
773         }
774
775         LASSERT(reqp == NULL);
776
777         generation = obddev->u.cli.cl_import->imp_generation;
778 resend:
779         flags = saved_flags;
780         if (!it) {
781                 /* The only way right now is FLOCK, in this case we hide flock
782                    policy as lmm, but lmmsize is 0 */
783                 LASSERT(lmm && lmmsize == 0);
784                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
785                          einfo->ei_type);
786                 policy = (ldlm_policy_data_t *)lmm;
787                 res_id.name[3] = LDLM_FLOCK;
788         } else if (it->it_op & IT_OPEN) {
789                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
790                                            einfo->ei_cbdata);
791                 policy = &update_policy;
792                 einfo->ei_cbdata = NULL;
793                 lmm = NULL;
794         } else if (it->it_op & IT_UNLINK) {
795                 req = mdc_intent_unlink_pack(exp, it, op_data);
796         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
797                 req = mdc_intent_getattr_pack(exp, it, op_data);
798         } else if (it->it_op & IT_READDIR) {
799                 req = mdc_enqueue_pack(exp, 0);
800         } else if (it->it_op & IT_LAYOUT) {
801                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
802                         RETURN(-EOPNOTSUPP);
803
804                 req = mdc_intent_layout_pack(exp, it, op_data);
805                 lvb_type = LVB_T_LAYOUT;
806         } else {
807                 LBUG();
808                 RETURN(-EINVAL);
809         }
810
811         if (IS_ERR(req))
812                 RETURN(PTR_ERR(req));
813
814         if (req != NULL && it && it->it_op & IT_CREAT)
815                 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
816                  * retry logic */
817                 req->rq_no_retry_einprogress = 1;
818
819         if (resends) {
820                 req->rq_generation_set = 1;
821                 req->rq_import_generation = generation;
822                 req->rq_sent = cfs_time_current_sec() + resends;
823         }
824
825         /* It is important to obtain rpc_lock first (if applicable), so that
826          * threads that are serialised with rpc_lock are not polluting our
827          * rpcs in flight counter. We do not do flock request limiting, though*/
828         if (it) {
829                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
830                 rc = mdc_enter_request(&obddev->u.cli);
831                 if (rc != 0) {
832                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
833                         mdc_clear_replay_flag(req, 0);
834                         ptlrpc_req_finished(req);
835                         RETURN(rc);
836                 }
837         }
838
839         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
840                               0, lvb_type, lockh, 0);
841         if (!it) {
842                 /* For flock requests we immediatelly return without further
843                    delay and let caller deal with the rest, since rest of
844                    this function metadata processing makes no sense for flock
845                    requests anyway. But in case of problem during comms with
846                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
847                    can not rely on caller and this mainly for F_UNLCKs
848                    (explicits or automatically generated by Kernel to clean
849                    current FLocks upon exit) that can't be trashed */
850                 if ((rc == -EINTR) || (rc == -ETIMEDOUT))
851                         goto resend;
852                 RETURN(rc);
853         }
854
855         mdc_exit_request(&obddev->u.cli);
856         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
857
858         if (rc < 0) {
859                 CERROR("ldlm_cli_enqueue: %d\n", rc);
860                 mdc_clear_replay_flag(req, rc);
861                 ptlrpc_req_finished(req);
862                 RETURN(rc);
863         }
864
865         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
866         LASSERT(lockrep != NULL);
867
868         lockrep->lock_policy_res2 =
869                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
870
871         /* Retry the create infinitely when we get -EINPROGRESS from
872          * server. This is required by the new quota design. */
873         if (it && it->it_op & IT_CREAT &&
874             (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
875                 mdc_clear_replay_flag(req, rc);
876                 ptlrpc_req_finished(req);
877                 resends++;
878
879                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
880                        obddev->obd_name, resends, it->it_op,
881                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
882
883                 if (generation == obddev->u.cli.cl_import->imp_generation) {
884                         goto resend;
885                 } else {
886                         CDEBUG(D_HA, "resend cross eviction\n");
887                         RETURN(-EIO);
888                 }
889         }
890
891         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
892         if (rc < 0) {
893                 if (lustre_handle_is_used(lockh)) {
894                         ldlm_lock_decref(lockh, einfo->ei_mode);
895                         memset(lockh, 0, sizeof(*lockh));
896                 }
897                 ptlrpc_req_finished(req);
898         }
899         RETURN(rc);
900 }
901
902 static int mdc_finish_intent_lock(struct obd_export *exp,
903                                   struct ptlrpc_request *request,
904                                   struct md_op_data *op_data,
905                                   struct lookup_intent *it,
906                                   struct lustre_handle *lockh)
907 {
908         struct lustre_handle old_lock;
909         struct mdt_body *mdt_body;
910         struct ldlm_lock *lock;
911         int rc;
912         ENTRY;
913
914         LASSERT(request != NULL);
915         LASSERT(request != LP_POISON);
916         LASSERT(request->rq_repmsg != LP_POISON);
917
918         if (!it_disposition(it, DISP_IT_EXECD)) {
919                 /* The server failed before it even started executing the
920                  * intent, i.e. because it couldn't unpack the request. */
921                 LASSERT(it->d.lustre.it_status != 0);
922                 RETURN(it->d.lustre.it_status);
923         }
924         rc = it_open_error(DISP_IT_EXECD, it);
925         if (rc)
926                 RETURN(rc);
927
928         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
929         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
930
931         /* If we were revalidating a fid/name pair, mark the intent in
932          * case we fail and get called again from lookup */
933         if (fid_is_sane(&op_data->op_fid2) &&
934             it->it_create_mode & M_CHECK_STALE &&
935             it->it_op != IT_GETATTR) {
936                 it_set_disposition(it, DISP_ENQ_COMPLETE);
937
938                 /* Also: did we find the same inode? */
939                 /* sever can return one of two fids:
940                  * op_fid2 - new allocated fid - if file is created.
941                  * op_fid3 - existent fid - if file only open.
942                  * op_fid3 is saved in lmv_intent_open */
943                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
944                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
945                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
946                                "\n", PFID(&op_data->op_fid2),
947                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
948                         RETURN(-ESTALE);
949                 }
950         }
951
952         rc = it_open_error(DISP_LOOKUP_EXECD, it);
953         if (rc)
954                 RETURN(rc);
955
956         /* keep requests around for the multiple phases of the call
957          * this shows the DISP_XX must guarantee we make it into the call
958          */
959         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
960             it_disposition(it, DISP_OPEN_CREATE) &&
961             !it_open_error(DISP_OPEN_CREATE, it)) {
962                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
963                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
964         }
965         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
966             it_disposition(it, DISP_OPEN_OPEN) &&
967             !it_open_error(DISP_OPEN_OPEN, it)) {
968                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
969                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
970                 /* BUG 11546 - eviction in the middle of open rpc processing */
971                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
972         }
973
974         if (it->it_op & IT_CREAT) {
975                 /* XXX this belongs in ll_create_it */
976         } else if (it->it_op == IT_OPEN) {
977                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
978         } else {
979                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
980         }
981
982         /* If we already have a matching lock, then cancel the new
983          * one.  We have to set the data here instead of in
984          * mdc_enqueue, because we need to use the child's inode as
985          * the l_ast_data to match, and that's not available until
986          * intent_finish has performed the iget().) */
987         lock = ldlm_handle2lock(lockh);
988         if (lock) {
989                 ldlm_policy_data_t policy = lock->l_policy_data;
990                 LDLM_DEBUG(lock, "matching against this");
991
992                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
993                                          &lock->l_resource->lr_name),
994                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
995                          PLDLMRES(lock->l_resource), PFID(&mdt_body->fid1));
996                 LDLM_LOCK_PUT(lock);
997
998                 memcpy(&old_lock, lockh, sizeof(*lockh));
999                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1000                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1001                         ldlm_lock_decref_and_cancel(lockh,
1002                                                     it->d.lustre.it_lock_mode);
1003                         memcpy(lockh, &old_lock, sizeof(old_lock));
1004                         it->d.lustre.it_lock_handle = lockh->cookie;
1005                 }
1006         }
1007         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1008                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
1009                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
1010         RETURN(rc);
1011 }
1012
1013 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1014                         struct lu_fid *fid, __u64 *bits)
1015 {
1016         /* We could just return 1 immediately, but since we should only
1017          * be called in revalidate_it if we already have a lock, let's
1018          * verify that. */
1019         struct ldlm_res_id res_id;
1020         struct lustre_handle lockh;
1021         ldlm_policy_data_t policy;
1022         ldlm_mode_t mode;
1023         ENTRY;
1024
1025         if (it->d.lustre.it_lock_handle) {
1026                 lockh.cookie = it->d.lustre.it_lock_handle;
1027                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1028         } else {
1029                 fid_build_reg_res_name(fid, &res_id);
1030                 switch (it->it_op) {
1031                 case IT_GETATTR:
1032                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1033                         break;
1034                 case IT_LAYOUT:
1035                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1036                         break;
1037                 default:
1038                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1039                         break;
1040                 }
1041                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
1042                                        LDLM_FL_BLOCK_GRANTED, &res_id,
1043                                        LDLM_IBITS, &policy,
1044                                        LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
1045         }
1046
1047         if (mode) {
1048                 it->d.lustre.it_lock_handle = lockh.cookie;
1049                 it->d.lustre.it_lock_mode = mode;
1050         } else {
1051                 it->d.lustre.it_lock_handle = 0;
1052                 it->d.lustre.it_lock_mode = 0;
1053         }
1054
1055         RETURN(!!mode);
1056 }
1057
1058 /*
1059  * This long block is all about fixing up the lock and request state
1060  * so that it is correct as of the moment _before_ the operation was
1061  * applied; that way, the VFS will think that everything is normal and
1062  * call Lustre's regular VFS methods.
1063  *
1064  * If we're performing a creation, that means that unless the creation
1065  * failed with EEXIST, we should fake up a negative dentry.
1066  *
1067  * For everything else, we want to lookup to succeed.
1068  *
1069  * One additional note: if CREATE or OPEN succeeded, we add an extra
1070  * reference to the request because we need to keep it around until
1071  * ll_create/ll_open gets called.
1072  *
1073  * The server will return to us, in it_disposition, an indication of
1074  * exactly what d.lustre.it_status refers to.
1075  *
1076  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1077  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1078  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1079  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1080  * was successful.
1081  *
1082  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1083  * child lookup.
1084  */
1085 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1086                     void *lmm, int lmmsize, struct lookup_intent *it,
1087                     int lookup_flags, struct ptlrpc_request **reqp,
1088                     ldlm_blocking_callback cb_blocking,
1089                     __u64 extra_lock_flags)
1090 {
1091         struct lustre_handle lockh;
1092         int rc = 0;
1093         ENTRY;
1094         LASSERT(it);
1095
1096         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1097                 ", intent: %s flags %#Lo\n", op_data->op_namelen,
1098                 op_data->op_name, PFID(&op_data->op_fid2),
1099                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1100                 it->it_flags);
1101
1102         lockh.cookie = 0;
1103         if (fid_is_sane(&op_data->op_fid2) &&
1104             (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
1105                 /* We could just return 1 immediately, but since we should only
1106                  * be called in revalidate_it if we already have a lock, let's
1107                  * verify that. */
1108                 it->d.lustre.it_lock_handle = 0;
1109                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1110                 /* Only return failure if it was not GETATTR by cfid
1111                    (from inode_revalidate) */
1112                 if (rc || op_data->op_namelen != 0)
1113                         RETURN(rc);
1114         }
1115
1116         /* lookup_it may be called only after revalidate_it has run, because
1117          * revalidate_it cannot return errors, only zero.  Returning zero causes
1118          * this call to lookup, which *can* return an error.
1119          *
1120          * We only want to execute the request associated with the intent one
1121          * time, however, so don't send the request again.  Instead, skip past
1122          * this and use the request from revalidate.  In this case, revalidate
1123          * never dropped its reference, so the refcounts are all OK */
1124         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
1125                 struct ldlm_enqueue_info einfo = {
1126                         .ei_type        = LDLM_IBITS,
1127                         .ei_mode        = it_to_lock_mode(it),
1128                         .ei_cb_bl       = cb_blocking,
1129                         .ei_cb_cp       = ldlm_completion_ast,
1130                 };
1131
1132                 /* For case if upper layer did not alloc fid, do it now. */
1133                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1134                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
1135                         if (rc < 0) {
1136                                 CERROR("Can't alloc new fid, rc %d\n", rc);
1137                                 RETURN(rc);
1138                         }
1139                 }
1140                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
1141                                  lmm, lmmsize, NULL, extra_lock_flags);
1142                 if (rc < 0)
1143                         RETURN(rc);
1144         } else if (!fid_is_sane(&op_data->op_fid2) ||
1145                    !(it->it_create_mode & M_CHECK_STALE)) {
1146                 /* DISP_ENQ_COMPLETE set means there is extra reference on
1147                  * request referenced from this intent, saved for subsequent
1148                  * lookup.  This path is executed when we proceed to this
1149                  * lookup, so we clear DISP_ENQ_COMPLETE */
1150                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
1151         }
1152         *reqp = it->d.lustre.it_data;
1153         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1154         RETURN(rc);
1155 }
1156
1157 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1158                                               struct ptlrpc_request *req,
1159                                               void *args, int rc)
1160 {
1161         struct mdc_getattr_args  *ga = args;
1162         struct obd_export        *exp = ga->ga_exp;
1163         struct md_enqueue_info   *minfo = ga->ga_minfo;
1164         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1165         struct lookup_intent     *it;
1166         struct lustre_handle     *lockh;
1167         struct obd_device        *obddev;
1168         struct ldlm_reply        *lockrep;
1169         __u64                     flags = LDLM_FL_HAS_INTENT;
1170         ENTRY;
1171
1172         it    = &minfo->mi_it;
1173         lockh = &minfo->mi_lockh;
1174
1175         obddev = class_exp2obd(exp);
1176
1177         mdc_exit_request(&obddev->u.cli);
1178         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1179                 rc = -ETIMEDOUT;
1180
1181         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1182                                    &flags, NULL, 0, lockh, rc);
1183         if (rc < 0) {
1184                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1185                 mdc_clear_replay_flag(req, rc);
1186                 GOTO(out, rc);
1187         }
1188
1189         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1190         LASSERT(lockrep != NULL);
1191
1192         lockrep->lock_policy_res2 =
1193                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1194
1195         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1196         if (rc)
1197                 GOTO(out, rc);
1198
1199         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1200         EXIT;
1201
1202 out:
1203         OBD_FREE_PTR(einfo);
1204         minfo->mi_cb(req, minfo, rc);
1205         return 0;
1206 }
1207
1208 int mdc_intent_getattr_async(struct obd_export *exp,
1209                              struct md_enqueue_info *minfo,
1210                              struct ldlm_enqueue_info *einfo)
1211 {
1212         struct md_op_data       *op_data = &minfo->mi_data;
1213         struct lookup_intent    *it = &minfo->mi_it;
1214         struct ptlrpc_request   *req;
1215         struct mdc_getattr_args *ga;
1216         struct obd_device       *obddev = class_exp2obd(exp);
1217         struct ldlm_res_id       res_id;
1218         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1219          *     for statahead currently. Consider CMD in future, such two bits
1220          *     maybe managed by different MDS, should be adjusted then. */
1221         ldlm_policy_data_t       policy = {
1222                                         .l_inodebits = { MDS_INODELOCK_LOOKUP | 
1223                                                          MDS_INODELOCK_UPDATE }
1224                                  };
1225         int                      rc = 0;
1226         __u64                    flags = LDLM_FL_HAS_INTENT;
1227         ENTRY;
1228
1229         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#Lo\n",
1230                 op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1231                 ldlm_it2str(it->it_op), it->it_flags);
1232
1233         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1234         req = mdc_intent_getattr_pack(exp, it, op_data);
1235         if (!req)
1236                 RETURN(-ENOMEM);
1237
1238         rc = mdc_enter_request(&obddev->u.cli);
1239         if (rc != 0) {
1240                 ptlrpc_req_finished(req);
1241                 RETURN(rc);
1242         }
1243
1244         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1245                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1246         if (rc < 0) {
1247                 mdc_exit_request(&obddev->u.cli);
1248                 ptlrpc_req_finished(req);
1249                 RETURN(rc);
1250         }
1251
1252         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1253         ga = ptlrpc_req_async_args(req);
1254         ga->ga_exp = exp;
1255         ga->ga_minfo = minfo;
1256         ga->ga_einfo = einfo;
1257
1258         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1259         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1260
1261         RETURN(0);
1262 }