Whamcloud - gitweb
LU-4705 mdc: improve mdc_enqueue() error message
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 #include <linux/module.h>
36
37 #include <obd.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46
47 #include "mdc_internal.h"
48
/*
 * Pairs an export with a metadata enqueue descriptor.
 * NOTE(review): presumably the argument bundle handed to an asynchronous
 * intent-getattr completion handler; the consumer is not visible in this
 * part of the file -- confirm against the user of this struct.
 */
struct mdc_getattr_args {
        struct obd_export *ga_exp;
        struct md_enqueue_info *ga_minfo;
};
53
54 int it_open_error(int phase, struct lookup_intent *it)
55 {
56         if (it_disposition(it, DISP_OPEN_LEASE)) {
57                 if (phase >= DISP_OPEN_LEASE)
58                         return it->it_status;
59                 else
60                         return 0;
61         }
62         if (it_disposition(it, DISP_OPEN_OPEN)) {
63                 if (phase >= DISP_OPEN_OPEN)
64                         return it->it_status;
65                 else
66                         return 0;
67         }
68
69         if (it_disposition(it, DISP_OPEN_CREATE)) {
70                 if (phase >= DISP_OPEN_CREATE)
71                         return it->it_status;
72                 else
73                         return 0;
74         }
75
76         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
77                 if (phase >= DISP_LOOKUP_EXECD)
78                         return it->it_status;
79                 else
80                         return 0;
81         }
82
83         if (it_disposition(it, DISP_IT_EXECD)) {
84                 if (phase >= DISP_IT_EXECD)
85                         return it->it_status;
86                 else
87                         return 0;
88         }
89
90         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
91         LBUG();
92
93         return 0;
94 }
95 EXPORT_SYMBOL(it_open_error);
96
97 /* this must be called on a lockh that is known to have a referenced lock */
98 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
99                       void *data, __u64 *bits)
100 {
101         struct ldlm_lock *lock;
102         struct inode *new_inode = data;
103         ENTRY;
104
105         if(bits)
106                 *bits = 0;
107
108         if (!lustre_handle_is_used(lockh))
109                 RETURN(0);
110
111         lock = ldlm_handle2lock(lockh);
112
113         LASSERT(lock != NULL);
114         lock_res_and_lock(lock);
115         if (lock->l_resource->lr_lvb_inode &&
116             lock->l_resource->lr_lvb_inode != data) {
117                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
118                 LASSERTF(old_inode->i_state & I_FREEING,
119                          "Found existing inode %p/%lu/%u state %lu in lock: "
120                          "setting data to %p/%lu/%u\n", old_inode,
121                          old_inode->i_ino, old_inode->i_generation,
122                          old_inode->i_state,
123                          new_inode, new_inode->i_ino, new_inode->i_generation);
124         }
125         lock->l_resource->lr_lvb_inode = new_inode;
126         if (bits)
127                 *bits = lock->l_policy_data.l_inodebits.bits;
128
129         unlock_res_and_lock(lock);
130         LDLM_LOCK_PUT(lock);
131
132         RETURN(0);
133 }
134
135 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
136                               const struct lu_fid *fid, enum ldlm_type type,
137                               union ldlm_policy_data *policy,
138                               enum ldlm_mode mode, struct lustre_handle *lockh)
139 {
140         struct ldlm_res_id res_id;
141         enum ldlm_mode rc;
142         ENTRY;
143
144         fid_build_reg_res_name(fid, &res_id);
145         /* LU-4405: Clear bits not supported by server */
146         policy->l_inodebits.bits &= exp_connect_ibits(exp);
147         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
148                              &res_id, type, policy, mode, lockh, 0);
149         RETURN(rc);
150 }
151
152 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
153                       union ldlm_policy_data *policy, enum ldlm_mode mode,
154                       enum ldlm_cancel_flags flags, void *opaque)
155 {
156         struct obd_device *obd = class_exp2obd(exp);
157         struct ldlm_res_id res_id;
158         int rc;
159
160         ENTRY;
161
162         fid_build_reg_res_name(fid, &res_id);
163         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
164                                              policy, mode, flags, opaque);
165         RETURN(rc);
166 }
167
168 int mdc_null_inode(struct obd_export *exp,
169                    const struct lu_fid *fid)
170 {
171         struct ldlm_res_id res_id;
172         struct ldlm_resource *res;
173         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
174         ENTRY;
175
176         LASSERTF(ns != NULL, "no namespace passed\n");
177
178         fid_build_reg_res_name(fid, &res_id);
179
180         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
181         if (IS_ERR(res))
182                 RETURN(0);
183
184         lock_res(res);
185         res->lr_lvb_inode = NULL;
186         unlock_res(res);
187
188         ldlm_resource_putref(res);
189         RETURN(0);
190 }
191
192 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
193 {
194         /* Don't hold error requests for replay. */
195         if (req->rq_replay) {
196                 spin_lock(&req->rq_lock);
197                 req->rq_replay = 0;
198                 spin_unlock(&req->rq_lock);
199         }
200         if (rc && req->rq_transno != 0) {
201                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
202                 LBUG();
203         }
204 }
205
206 /* Save a large LOV EA into the request buffer so that it is available
207  * for replay.  We don't do this in the initial request because the
208  * original request doesn't need this buffer (at most it sends just the
209  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
210  * buffer and may also be difficult to allocate and save a very large
211  * request buffer for each open. (bug 5707)
212  *
213  * OOM here may cause recovery failure if lmm is needed (only for the
214  * original open if the MDS crashed just when this client also OOM'd)
215  * but this is incredibly unlikely, and questionable whether the client
216  * could do MDS recovery under OOM anyways... */
217 int mdc_save_lovea(struct ptlrpc_request *req,
218                    const struct req_msg_field *field,
219                    void *data, u32 size)
220 {
221         struct req_capsule *pill = &req->rq_pill;
222         void *lmm;
223         int rc = 0;
224
225         if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
226                 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
227                 if (rc) {
228                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
229                                req->rq_export->exp_obd->obd_name,
230                                size, rc);
231                         return rc;
232                 }
233         } else {
234                 req_capsule_shrink(pill, field, size, RCL_CLIENT);
235         }
236
237         req_capsule_set_size(pill, field, RCL_CLIENT, size);
238         lmm = req_capsule_client_get(pill, field);
239         if (lmm)
240                 memcpy(lmm, data, size);
241
242         return rc;
243 }
244
245 static struct ptlrpc_request *
246 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
247                      struct md_op_data *op_data)
248 {
249         struct ptlrpc_request   *req;
250         struct obd_device       *obddev = class_exp2obd(exp);
251         struct ldlm_intent      *lit;
252         const void              *lmm = op_data->op_data;
253         __u32                    lmmsize = op_data->op_data_size;
254         struct list_head         cancels = LIST_HEAD_INIT(cancels);
255         int                      count = 0;
256         enum ldlm_mode           mode;
257         int                      rc;
258         ENTRY;
259
260         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
261
262         /* XXX: openlock is not cancelled for cross-refs. */
263         /* If inode is known, cancel conflicting OPEN locks. */
264         if (fid_is_sane(&op_data->op_fid2)) {
265                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
266                         if (it->it_flags & FMODE_WRITE)
267                                 mode = LCK_EX;
268                         else
269                                 mode = LCK_PR;
270                 } else {
271                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
272                                 mode = LCK_CW;
273 #ifdef FMODE_EXEC
274                         else if (it->it_flags & FMODE_EXEC)
275                                 mode = LCK_PR;
276 #endif
277                         else
278                                 mode = LCK_CR;
279                 }
280                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
281                                                 &cancels, mode,
282                                                 MDS_INODELOCK_OPEN);
283         }
284
285         /* If CREATE, cancel parent's UPDATE lock. */
286         if (it->it_op & IT_CREAT)
287                 mode = LCK_EX;
288         else
289                 mode = LCK_CR;
290         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
291                                          &cancels, mode,
292                                          MDS_INODELOCK_UPDATE);
293
294         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
295                                    &RQF_LDLM_INTENT_OPEN);
296         if (req == NULL) {
297                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
298                 RETURN(ERR_PTR(-ENOMEM));
299         }
300
301         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
302                              op_data->op_namelen + 1);
303         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
304                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
305
306         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
307                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
308                              strlen(op_data->op_file_secctx_name) + 1 : 0);
309
310         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
311                              op_data->op_file_secctx_size);
312
313         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
314         if (rc < 0) {
315                 ptlrpc_request_free(req);
316                 RETURN(ERR_PTR(rc));
317         }
318
319         spin_lock(&req->rq_lock);
320         req->rq_replay = req->rq_import->imp_replayable;
321         spin_unlock(&req->rq_lock);
322
323         /* pack the intent */
324         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
325         lit->opc = (__u64)it->it_op;
326
327         /* pack the intended request */
328         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
329                       lmmsize);
330
331         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
332                              obddev->u.cli.cl_max_mds_easize);
333         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
334                              req->rq_import->imp_connect_data.ocd_max_easize);
335         ptlrpc_request_set_replen(req);
336         return req;
337 }
338
/*
 * Defaults used by mdc_intent_getxattr_pack() to size the reply buffers:
 * names are reserved as NAME_LEN * NUM bytes, values as VAL_LEN * NUM bytes,
 * and the value-length array as NUM entries of __u32.
 */
#define GA_DEFAULT_EA_NAME_LEN  20
#define GA_DEFAULT_EA_VAL_LEN   250
#define GA_DEFAULT_EA_NUM       10
342
343 static struct ptlrpc_request *
344 mdc_intent_getxattr_pack(struct obd_export *exp,
345                          struct lookup_intent *it,
346                          struct md_op_data *op_data)
347 {
348         struct ptlrpc_request   *req;
349         struct ldlm_intent      *lit;
350         int                     rc, count = 0;
351         struct list_head        cancels = LIST_HEAD_INIT(cancels);
352
353         ENTRY;
354
355         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
356                                         &RQF_LDLM_INTENT_GETXATTR);
357         if (req == NULL)
358                 RETURN(ERR_PTR(-ENOMEM));
359
360         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
361         if (rc) {
362                 ptlrpc_request_free(req);
363                 RETURN(ERR_PTR(rc));
364         }
365
366         /* pack the intent */
367         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
368         lit->opc = IT_GETXATTR;
369
370         /* pack the intended request */
371         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
372                       GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM,
373                       -1, 0);
374
375         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
376                              GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
377
378         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
379                              GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM);
380
381         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
382                              sizeof(__u32) * GA_DEFAULT_EA_NUM);
383
384         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
385
386         ptlrpc_request_set_replen(req);
387
388         RETURN(req);
389 }
390
391 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
392                                                      struct lookup_intent *it,
393                                                      struct md_op_data *op_data)
394 {
395         struct ptlrpc_request *req;
396         struct obd_device     *obddev = class_exp2obd(exp);
397         struct ldlm_intent    *lit;
398         int                    rc;
399         ENTRY;
400
401         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
402                                    &RQF_LDLM_INTENT_UNLINK);
403         if (req == NULL)
404                 RETURN(ERR_PTR(-ENOMEM));
405
406         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
407                              op_data->op_namelen + 1);
408
409         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
410         if (rc) {
411                 ptlrpc_request_free(req);
412                 RETURN(ERR_PTR(rc));
413         }
414
415         /* pack the intent */
416         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
417         lit->opc = (__u64)it->it_op;
418
419         /* pack the intended request */
420         mdc_unlink_pack(req, op_data);
421
422         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
423                              obddev->u.cli.cl_default_mds_easize);
424         ptlrpc_request_set_replen(req);
425         RETURN(req);
426 }
427
428 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
429                                                       struct lookup_intent *it,
430                                                       struct md_op_data *op_data)
431 {
432         struct ptlrpc_request   *req;
433         struct obd_device       *obddev = class_exp2obd(exp);
434         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
435                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
436                                          OBD_MD_MEA | OBD_MD_FLACL;
437         struct ldlm_intent      *lit;
438         int                      rc;
439         __u32                    easize;
440         ENTRY;
441
442         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
443                                    &RQF_LDLM_INTENT_GETATTR);
444         if (req == NULL)
445                 RETURN(ERR_PTR(-ENOMEM));
446
447         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
448                              op_data->op_namelen + 1);
449
450         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
451         if (rc) {
452                 ptlrpc_request_free(req);
453                 RETURN(ERR_PTR(rc));
454         }
455
456         /* pack the intent */
457         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
458         lit->opc = (__u64)it->it_op;
459
460         if (obddev->u.cli.cl_default_mds_easize > 0)
461                 easize = obddev->u.cli.cl_default_mds_easize;
462         else
463                 easize = obddev->u.cli.cl_max_mds_easize;
464
465         /* pack the intended request */
466         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
467
468         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
469         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
470                              req->rq_import->imp_connect_data.ocd_max_easize);
471         ptlrpc_request_set_replen(req);
472         RETURN(req);
473 }
474
475 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
476                                                      struct lookup_intent *it,
477                                                      struct md_op_data *op_data)
478 {
479         struct obd_device     *obd = class_exp2obd(exp);
480         struct ptlrpc_request *req;
481         struct ldlm_intent    *lit;
482         struct layout_intent  *layout;
483         int rc;
484         ENTRY;
485
486         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
487                                 &RQF_LDLM_INTENT_LAYOUT);
488         if (req == NULL)
489                 RETURN(ERR_PTR(-ENOMEM));
490
491         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
492         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
493         if (rc) {
494                 ptlrpc_request_free(req);
495                 RETURN(ERR_PTR(rc));
496         }
497
498         /* pack the intent */
499         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
500         lit->opc = (__u64)it->it_op;
501
502         /* pack the layout intent request */
503         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
504         LASSERT(op_data->op_data != NULL);
505         LASSERT(op_data->op_data_size == sizeof(*layout));
506         memcpy(layout, op_data->op_data, sizeof(*layout));
507
508         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
509                              obd->u.cli.cl_default_mds_easize);
510         ptlrpc_request_set_replen(req);
511         RETURN(req);
512 }
513
514 static struct ptlrpc_request *
515 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
516 {
517         struct ptlrpc_request *req;
518         int rc;
519         ENTRY;
520
521         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
522         if (req == NULL)
523                 RETURN(ERR_PTR(-ENOMEM));
524
525         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
526         if (rc) {
527                 ptlrpc_request_free(req);
528                 RETURN(ERR_PTR(rc));
529         }
530
531         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
532         ptlrpc_request_set_replen(req);
533         RETURN(req);
534 }
535
536 static int mdc_finish_enqueue(struct obd_export *exp,
537                               struct ptlrpc_request *req,
538                               struct ldlm_enqueue_info *einfo,
539                               struct lookup_intent *it,
540                               struct lustre_handle *lockh,
541                               int rc)
542 {
543         struct req_capsule  *pill = &req->rq_pill;
544         struct ldlm_request *lockreq;
545         struct ldlm_reply   *lockrep;
546         struct ldlm_lock    *lock;
547         void                *lvb_data = NULL;
548         __u32                lvb_len = 0;
549         ENTRY;
550
551         LASSERT(rc >= 0);
552         /* Similarly, if we're going to replay this request, we don't want to
553          * actually get a lock, just perform the intent. */
554         if (req->rq_transno || req->rq_replay) {
555                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
556                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
557         }
558
559         if (rc == ELDLM_LOCK_ABORTED) {
560                 einfo->ei_mode = 0;
561                 memset(lockh, 0, sizeof(*lockh));
562                 rc = 0;
563         } else { /* rc = 0 */
564                 lock = ldlm_handle2lock(lockh);
565                 LASSERT(lock != NULL);
566
567                 /* If the server gave us back a different lock mode, we should
568                  * fix up our variables. */
569                 if (lock->l_req_mode != einfo->ei_mode) {
570                         ldlm_lock_addref(lockh, lock->l_req_mode);
571                         ldlm_lock_decref(lockh, einfo->ei_mode);
572                         einfo->ei_mode = lock->l_req_mode;
573                 }
574                 LDLM_LOCK_PUT(lock);
575         }
576
577         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
578         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
579
580         it->it_disposition = (int)lockrep->lock_policy_res1;
581         it->it_status = (int)lockrep->lock_policy_res2;
582         it->it_lock_mode = einfo->ei_mode;
583         it->it_lock_handle = lockh->cookie;
584         it->it_request = req;
585
586         /* Technically speaking rq_transno must already be zero if
587          * it_status is in error, so the check is a bit redundant */
588         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
589                 mdc_clear_replay_flag(req, it->it_status);
590
591         /* If we're doing an IT_OPEN which did not result in an actual
592          * successful open, then we need to remove the bit which saves
593          * this request for unconditional replay.
594          *
595          * It's important that we do this first!  Otherwise we might exit the
596          * function without doing so, and try to replay a failed create
597          * (bug 3440) */
598         if (it->it_op & IT_OPEN && req->rq_replay &&
599             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
600                 mdc_clear_replay_flag(req, it->it_status);
601
602         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
603                   it->it_op, it->it_disposition, it->it_status);
604
605         /* We know what to expect, so we do any byte flipping required here */
606         if (it_has_reply_body(it)) {
607                 struct mdt_body *body;
608
609                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
610                 if (body == NULL) {
611                         CERROR ("Can't swab mdt_body\n");
612                         RETURN (-EPROTO);
613                 }
614
615                 if (it_disposition(it, DISP_OPEN_OPEN) &&
616                     !it_open_error(DISP_OPEN_OPEN, it)) {
617                         /*
618                          * If this is a successful OPEN request, we need to set
619                          * replay handler and data early, so that if replay
620                          * happens immediately after swabbing below, new reply
621                          * is swabbed by that handler correctly.
622                          */
623                         mdc_set_open_replay_data(NULL, NULL, it);
624                 }
625
626                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
627                         void *eadata;
628
629                         mdc_update_max_ea_from_body(exp, body);
630
631                         /*
632                          * The eadata is opaque; just check that it is there.
633                          * Eventually, obd_unpackmd() will check the contents.
634                          */
635                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
636                                                         body->mbo_eadatasize);
637                         if (eadata == NULL)
638                                 RETURN(-EPROTO);
639
640                         /* save lvb data and length in case this is for layout
641                          * lock */
642                         lvb_data = eadata;
643                         lvb_len = body->mbo_eadatasize;
644
645                         /*
646                          * We save the reply LOV EA in case we have to replay a
647                          * create for recovery.  If we didn't allocate a large
648                          * enough request buffer above we need to reallocate it
649                          * here to hold the actual LOV EA.
650                          *
651                          * To not save LOV EA if request is not going to replay
652                          * (for example error one).
653                          */
654                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
655                                 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
656                                                     body->mbo_eadatasize);
657                                 if (rc) {
658                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
659                                         body->mbo_eadatasize = 0;
660                                         rc = 0;
661                                 }
662                         }
663                 }
664         } else if (it->it_op & IT_LAYOUT) {
665                 /* maybe the lock was granted right away and layout
666                  * is packed into RMF_DLM_LVB of req */
667                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
668                 if (lvb_len > 0) {
669                         lvb_data = req_capsule_server_sized_get(pill,
670                                                         &RMF_DLM_LVB, lvb_len);
671                         if (lvb_data == NULL)
672                                 RETURN(-EPROTO);
673
674                         /**
675                          * save replied layout data to the request buffer for
676                          * recovery consideration (lest MDS reinitialize
677                          * another set of OST objects).
678                          */
679                         if (req->rq_transno)
680                                 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
681                                                      lvb_len);
682                 }
683         }
684
685         /* fill in stripe data for layout lock.
686          * LU-6581: trust layout data only if layout lock is granted. The MDT
687          * has stopped sending layout unless the layout lock is granted. The
688          * client still does this checking in case it's talking with an old
689          * server. - Jinshan */
690         lock = ldlm_handle2lock(lockh);
691         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
692             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
693                 void *lmm;
694
695                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
696                         ldlm_it2str(it->it_op), lvb_len);
697
698                 OBD_ALLOC_LARGE(lmm, lvb_len);
699                 if (lmm == NULL) {
700                         LDLM_LOCK_PUT(lock);
701                         RETURN(-ENOMEM);
702                 }
703                 memcpy(lmm, lvb_data, lvb_len);
704
705                 /* install lvb_data */
706                 lock_res_and_lock(lock);
707                 if (lock->l_lvb_data == NULL) {
708                         lock->l_lvb_type = LVB_T_LAYOUT;
709                         lock->l_lvb_data = lmm;
710                         lock->l_lvb_len = lvb_len;
711                         lmm = NULL;
712                 }
713                 unlock_res_and_lock(lock);
714                 if (lmm != NULL)
715                         OBD_FREE_LARGE(lmm, lvb_len);
716         }
717         if (lock != NULL)
718                 LDLM_LOCK_PUT(lock);
719
720         RETURN(rc);
721 }
722
/*
 * Enqueue an LDLM lock on the resource built from op_data->op_fid1, either
 * as a plain flock request (it == NULL) or together with a metadata intent.
 *
 * We always reserve enough space in the reply packet for a stripe MD, because
 * we don't know in advance the file type.
 *
 * With an intent the caller must pass policy == NULL; the inodebits policy is
 * then chosen here from it->it_op and LDLM_FL_HAS_INTENT is added to the
 * enqueue flags.  Without an intent einfo->ei_type must be LDLM_FLOCK and the
 * caller-supplied policy is passed through unchanged.
 *
 * Return value:
 *  - flock path: whatever ldlm_cli_enqueue() returned (after retrying
 *    -EINTR/-ETIMEDOUT for LCK_NL requests);
 *  - intent path: a negative errno if packing, slot acquisition or the
 *    enqueue failed, -EOPNOTSUPP for IT_LAYOUT when imp_connect_lvb_type()
 *    reports false, -EIO if the import generation changed while retrying
 *    an -EINPROGRESS reply, otherwise the result of mdc_finish_enqueue().
 *
 * When mdc_finish_enqueue() fails, the lock reference held through lockh is
 * dropped, lockh is zeroed and the lock/request fields of the intent are
 * cleared.
 */
static int mdc_enqueue_base(struct obd_export *exp,
                            struct ldlm_enqueue_info *einfo,
                            const union ldlm_policy_data *policy,
                            struct lookup_intent *it,
                            struct md_op_data *op_data,
                            struct lustre_handle *lockh,
                            __u64 extra_lock_flags)
{
        struct obd_device *obddev = class_exp2obd(exp);
        struct ptlrpc_request *req = NULL;
        __u64 flags, saved_flags = extra_lock_flags;
        struct ldlm_res_id res_id;
        static const union ldlm_policy_data lookup_policy = {
                                  .l_inodebits = { MDS_INODELOCK_LOOKUP } };
        static const union ldlm_policy_data update_policy = {
                                  .l_inodebits = { MDS_INODELOCK_UPDATE } };
        static const union ldlm_policy_data layout_policy = {
                                  .l_inodebits = { MDS_INODELOCK_LAYOUT } };
        static const union ldlm_policy_data getxattr_policy = {
                                  .l_inodebits = { MDS_INODELOCK_XATTR } };
        int generation, resends = 0;
        struct ldlm_reply *lockrep;
        enum lvb_type lvb_type = 0;
        int rc;
        ENTRY;

        /* An intent is only ever sent with an inodebits lock. */
        LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
                 einfo->ei_type);
        fid_build_reg_res_name(&op_data->op_fid1, &res_id);

        if (it != NULL) {
                /* With an intent the policy is derived from the operation,
                 * so the caller must not supply one. */
                LASSERT(policy == NULL);

                saved_flags |= LDLM_FL_HAS_INTENT;
                if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
                        policy = &update_policy;
                else if (it->it_op & IT_LAYOUT)
                        policy = &layout_policy;
                else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
                        policy = &getxattr_policy;
                else
                        policy = &lookup_policy;
        }

        /* Remember the import generation at entry; it is stamped on resent
         * requests and compared again before each -EINPROGRESS retry. */
        generation = obddev->u.cli.cl_import->imp_generation;
resend:
        /* flags is handed to ldlm_cli_enqueue() by pointer, so every attempt
         * starts again from the saved value. */
        flags = saved_flags;
        if (it == NULL) {
                /* The only way right now is FLOCK. */
                LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
                         einfo->ei_type);
                res_id.name[3] = LDLM_FLOCK;
        } else if (it->it_op & IT_OPEN) {
                req = mdc_intent_open_pack(exp, it, op_data);
        } else if (it->it_op & IT_UNLINK) {
                req = mdc_intent_unlink_pack(exp, it, op_data);
        } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
                req = mdc_intent_getattr_pack(exp, it, op_data);
        } else if (it->it_op & IT_READDIR) {
                req = mdc_enqueue_pack(exp, 0);
        } else if (it->it_op & IT_LAYOUT) {
                if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
                        RETURN(-EOPNOTSUPP);
                req = mdc_intent_layout_pack(exp, it, op_data);
                lvb_type = LVB_T_LAYOUT;
        } else if (it->it_op & IT_GETXATTR) {
                req = mdc_intent_getxattr_pack(exp, it, op_data);
        } else {
                LBUG();
                RETURN(-EINVAL);
        }

        /* The pack helpers report failure as an ERR_PTR; for flock req is
         * not assigned above, so this check does not trigger on the first
         * pass. */
        if (IS_ERR(req))
                RETURN(PTR_ERR(req));

        if (resends) {
                /* Retry: tie the request to the generation sampled at entry
                 * and set rq_sent further into the future on each retry
                 * (presumably delaying the send -- confirm against ptlrpc). */
                req->rq_generation_set = 1;
                req->rq_import_generation = generation;
                req->rq_sent = ktime_get_real_seconds() + resends;
        }

        /* It is important to obtain modify RPC slot first (if applicable), so
         * that threads that are waiting for a modify RPC slot are not polluting
         * our rpcs in flight counter.
         * We do not do flock request limiting, though */
        if (it) {
                mdc_get_mod_rpc_slot(req, it);
                rc = obd_get_request_slot(&obddev->u.cli);
                if (rc != 0) {
                        /* Undo in reverse order and drop the request. */
                        mdc_put_mod_rpc_slot(req, it);
                        mdc_clear_replay_flag(req, 0);
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
                              0, lvb_type, lockh, 0);
        if (!it) {
                /* For flock requests we immediately return without further
                   delay and let caller deal with the rest, since rest of
                   this function metadata processing makes no sense for flock
                   requests anyway. But in case of problem during comms with
                   Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
                   can not rely on caller and this mainly for F_UNLCKs
                   (explicit or automatically generated by Kernel to clean
                   current FLocks upon exit) that can't be trashed */
                if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
                    (einfo->ei_type == LDLM_FLOCK) &&
                    (einfo->ei_mode == LCK_NL))
                        goto resend;
                RETURN(rc);
        }

        /* Intent path only from here on: release both slots taken above
         * before looking at the result. */
        obd_put_request_slot(&obddev->u.cli);
        mdc_put_mod_rpc_slot(req, it);

        if (rc < 0) {
                CDEBUG(D_INFO,
                      "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
                      obddev->obd_name, PFID(&op_data->op_fid1),
                      PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);

                mdc_clear_replay_flag(req, rc);
                ptlrpc_req_finished(req);
                RETURN(rc);
        }

        lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
        LASSERT(lockrep != NULL);

        /* Convert the intent status carried in the reply from wire to host
         * representation, in place. */
        lockrep->lock_policy_res2 =
                ptlrpc_status_ntoh(lockrep->lock_policy_res2);

        /* Retry infinitely when the server returns -EINPROGRESS for the
         * intent operation, when server returns -EINPROGRESS for acquiring
         * intent lock, we'll retry in after_reply(). */
        if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
                mdc_clear_replay_flag(req, rc);
                ptlrpc_req_finished(req);
                resends++;

                CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
                       obddev->obd_name, resends, it->it_op,
                       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));

                /* Only retry while the import generation is unchanged;
                 * otherwise give up with -EIO. */
                if (generation == obddev->u.cli.cl_import->imp_generation) {
                        goto resend;
                } else {
                        CDEBUG(D_HA, "resend cross eviction\n");
                        RETURN(-EIO);
                }
        }

        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
        if (rc < 0) {
                /* Drop the lock reference (if a lock was obtained) and the
                 * request, and leave the intent without stale pointers. */
                if (lustre_handle_is_used(lockh)) {
                        ldlm_lock_decref(lockh, einfo->ei_mode);
                        memset(lockh, 0, sizeof(*lockh));
                }
                ptlrpc_req_finished(req);

                it->it_lock_handle = 0;
                it->it_lock_mode = 0;
                it->it_request = NULL;
        }

        RETURN(rc);
}
894
895 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
896                 const union ldlm_policy_data *policy,
897                 struct md_op_data *op_data,
898                 struct lustre_handle *lockh, __u64 extra_lock_flags)
899 {
900         return mdc_enqueue_base(exp, einfo, policy, NULL,
901                                 op_data, lockh, extra_lock_flags);
902 }
903
904 static int mdc_finish_intent_lock(struct obd_export *exp,
905                                   struct ptlrpc_request *request,
906                                   struct md_op_data *op_data,
907                                   struct lookup_intent *it,
908                                   struct lustre_handle *lockh)
909 {
910         struct lustre_handle old_lock;
911         struct ldlm_lock *lock;
912         int rc = 0;
913         ENTRY;
914
915         LASSERT(request != NULL);
916         LASSERT(request != LP_POISON);
917         LASSERT(request->rq_repmsg != LP_POISON);
918
919         if (it->it_op & IT_READDIR)
920                 RETURN(0);
921
922         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
923                 if (it->it_status != 0)
924                         GOTO(out, rc = it->it_status);
925         } else {
926                 if (!it_disposition(it, DISP_IT_EXECD)) {
927                         /* The server failed before it even started executing
928                          * the intent, i.e. because it couldn't unpack the
929                          * request.
930                          */
931                         LASSERT(it->it_status != 0);
932                         GOTO(out, rc = it->it_status);
933                 }
934                 rc = it_open_error(DISP_IT_EXECD, it);
935                 if (rc)
936                         GOTO(out, rc);
937
938                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
939                 if (rc)
940                         GOTO(out, rc);
941
942                 /* keep requests around for the multiple phases of the call
943                  * this shows the DISP_XX must guarantee we make it into the
944                  * call
945                  */
946                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
947                     it_disposition(it, DISP_OPEN_CREATE) &&
948                     !it_open_error(DISP_OPEN_CREATE, it)) {
949                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
950                         /* balanced in ll_create_node */
951                         ptlrpc_request_addref(request);
952                 }
953                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
954                     it_disposition(it, DISP_OPEN_OPEN) &&
955                     !it_open_error(DISP_OPEN_OPEN, it)) {
956                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
957                         /* balanced in ll_file_open */
958                         ptlrpc_request_addref(request);
959                         /* BUG 11546 - eviction in the middle of open rpc
960                          * processing
961                          */
962                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
963                                          obd_timeout);
964                 }
965
966                 if (it->it_op & IT_CREAT) {
967                         /* XXX this belongs in ll_create_it */
968                 } else if (it->it_op == IT_OPEN) {
969                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
970                 } else {
971                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
972                 }
973         }
974
975         /* If we already have a matching lock, then cancel the new
976          * one.  We have to set the data here instead of in
977          * mdc_enqueue, because we need to use the child's inode as
978          * the l_ast_data to match, and that's not available until
979          * intent_finish has performed the iget().) */
980         lock = ldlm_handle2lock(lockh);
981         if (lock) {
982                 union ldlm_policy_data policy = lock->l_policy_data;
983                 LDLM_DEBUG(lock, "matching against this");
984
985                 if (it_has_reply_body(it)) {
986                         struct mdt_body *body;
987
988                         body = req_capsule_server_get(&request->rq_pill,
989                                                       &RMF_MDT_BODY);
990                         /* mdc_enqueue checked */
991                         LASSERT(body != NULL);
992                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
993                                                  &lock->l_resource->lr_name),
994                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
995                                  PLDLMRES(lock->l_resource),
996                                  PFID(&body->mbo_fid1));
997                 }
998                 LDLM_LOCK_PUT(lock);
999
1000                 memcpy(&old_lock, lockh, sizeof(*lockh));
1001                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1002                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1003                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1004                         memcpy(lockh, &old_lock, sizeof(old_lock));
1005                         it->it_lock_handle = lockh->cookie;
1006                 }
1007         }
1008
1009         EXIT;
1010 out:
1011         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1012                 (int)op_data->op_namelen, op_data->op_name,
1013                 ldlm_it2str(it->it_op), it->it_status,
1014                 it->it_disposition, rc);
1015         return rc;
1016 }
1017
1018 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1019                         struct lu_fid *fid, __u64 *bits)
1020 {
1021         /* We could just return 1 immediately, but since we should only
1022          * be called in revalidate_it if we already have a lock, let's
1023          * verify that. */
1024         struct ldlm_res_id res_id;
1025         struct lustre_handle lockh;
1026         union ldlm_policy_data policy;
1027         enum ldlm_mode mode;
1028         ENTRY;
1029
1030         if (it->it_lock_handle) {
1031                 lockh.cookie = it->it_lock_handle;
1032                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1033         } else {
1034                 fid_build_reg_res_name(fid, &res_id);
1035                 switch (it->it_op) {
1036                 case IT_GETATTR:
1037                         /* File attributes are held under multiple bits:
1038                          * nlink is under lookup lock, size and times are
1039                          * under UPDATE lock and recently we've also got
1040                          * a separate permissions lock for owner/group/acl that
1041                          * were protected by lookup lock before.
1042                          * Getattr must provide all of that information,
1043                          * so we need to ensure we have all of those locks.
1044                          * Unfortunately, if the bits are split across multiple
1045                          * locks, there's no easy way to match all of them here,
1046                          * so an extra RPC would be performed to fetch all
1047                          * of those bits at once for now. */
1048                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1049                          * but for old MDTs (< 2.4), permission is covered
1050                          * by LOOKUP lock, so it needs to match all bits here.*/
1051                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1052                                                   MDS_INODELOCK_LOOKUP |
1053                                                   MDS_INODELOCK_PERM;
1054                         break;
1055                 case IT_READDIR:
1056                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1057                         break;
1058                 case IT_LAYOUT:
1059                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1060                         break;
1061                 default:
1062                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1063                         break;
1064                 }
1065
1066                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1067                                       LDLM_IBITS, &policy,
1068                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1069                                       &lockh);
1070         }
1071
1072         if (mode) {
1073                 it->it_lock_handle = lockh.cookie;
1074                 it->it_lock_mode = mode;
1075         } else {
1076                 it->it_lock_handle = 0;
1077                 it->it_lock_mode = 0;
1078         }
1079
1080         RETURN(!!mode);
1081 }
1082
1083 /*
1084  * This long block is all about fixing up the lock and request state
1085  * so that it is correct as of the moment _before_ the operation was
1086  * applied; that way, the VFS will think that everything is normal and
1087  * call Lustre's regular VFS methods.
1088  *
1089  * If we're performing a creation, that means that unless the creation
1090  * failed with EEXIST, we should fake up a negative dentry.
1091  *
1092  * For everything else, we want to lookup to succeed.
1093  *
1094  * One additional note: if CREATE or OPEN succeeded, we add an extra
1095  * reference to the request because we need to keep it around until
1096  * ll_create/ll_open gets called.
1097  *
1098  * The server will return to us, in it_disposition, an indication of
1099  * exactly what it_status refers to.
1100  *
1101  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1102  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1103  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1104  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1105  * was successful.
1106  *
1107  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1108  * child lookup.
1109  */
1110 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1111                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1112                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1113 {
1114         struct ldlm_enqueue_info einfo = {
1115                 .ei_type        = LDLM_IBITS,
1116                 .ei_mode        = it_to_lock_mode(it),
1117                 .ei_cb_bl       = cb_blocking,
1118                 .ei_cb_cp       = ldlm_completion_ast,
1119         };
1120         struct lustre_handle lockh;
1121         int rc = 0;
1122         ENTRY;
1123         LASSERT(it);
1124
1125         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1126                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1127                 op_data->op_name, PFID(&op_data->op_fid2),
1128                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1129                 it->it_flags);
1130
1131         lockh.cookie = 0;
1132         if (fid_is_sane(&op_data->op_fid2) &&
1133             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1134                 /* We could just return 1 immediately, but since we should only
1135                  * be called in revalidate_it if we already have a lock, let's
1136                  * verify that. */
1137                 it->it_lock_handle = 0;
1138                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1139                 /* Only return failure if it was not GETATTR by cfid
1140                    (from inode_revalidate) */
1141                 if (rc || op_data->op_namelen != 0)
1142                         RETURN(rc);
1143         }
1144
1145         /* For case if upper layer did not alloc fid, do it now. */
1146         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1147                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1148                 if (rc < 0) {
1149                         CERROR("Can't alloc new fid, rc %d\n", rc);
1150                         RETURN(rc);
1151                 }
1152         }
1153
1154         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1155                               extra_lock_flags);
1156         if (rc < 0)
1157                 RETURN(rc);
1158
1159         *reqp = it->it_request;
1160         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1161         RETURN(rc);
1162 }
1163
1164 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1165                                               struct ptlrpc_request *req,
1166                                               void *args, int rc)
1167 {
1168         struct mdc_getattr_args  *ga = args;
1169         struct obd_export        *exp = ga->ga_exp;
1170         struct md_enqueue_info   *minfo = ga->ga_minfo;
1171         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1172         struct lookup_intent     *it;
1173         struct lustre_handle     *lockh;
1174         struct obd_device        *obddev;
1175         struct ldlm_reply        *lockrep;
1176         __u64                     flags = LDLM_FL_HAS_INTENT;
1177         ENTRY;
1178
1179         it    = &minfo->mi_it;
1180         lockh = &minfo->mi_lockh;
1181
1182         obddev = class_exp2obd(exp);
1183
1184         obd_put_request_slot(&obddev->u.cli);
1185         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1186                 rc = -ETIMEDOUT;
1187
1188         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1189                                    &flags, NULL, 0, lockh, rc);
1190         if (rc < 0) {
1191                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1192                 mdc_clear_replay_flag(req, rc);
1193                 GOTO(out, rc);
1194         }
1195
1196         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1197         LASSERT(lockrep != NULL);
1198
1199         lockrep->lock_policy_res2 =
1200                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1201
1202         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1203         if (rc)
1204                 GOTO(out, rc);
1205
1206         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1207         EXIT;
1208
1209 out:
1210         minfo->mi_cb(req, minfo, rc);
1211         return 0;
1212 }
1213
1214 int mdc_intent_getattr_async(struct obd_export *exp,
1215                              struct md_enqueue_info *minfo)
1216 {
1217         struct md_op_data       *op_data = &minfo->mi_data;
1218         struct lookup_intent    *it = &minfo->mi_it;
1219         struct ptlrpc_request   *req;
1220         struct mdc_getattr_args *ga;
1221         struct obd_device       *obddev = class_exp2obd(exp);
1222         struct ldlm_res_id       res_id;
1223         union ldlm_policy_data policy = {
1224                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1225                                                  MDS_INODELOCK_UPDATE } };
1226         int                      rc = 0;
1227         __u64                    flags = LDLM_FL_HAS_INTENT;
1228         ENTRY;
1229
1230         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1231                 (int)op_data->op_namelen, op_data->op_name,
1232                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1233
1234         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1235         req = mdc_intent_getattr_pack(exp, it, op_data);
1236         if (IS_ERR(req))
1237                 RETURN(PTR_ERR(req));
1238
1239         rc = obd_get_request_slot(&obddev->u.cli);
1240         if (rc != 0) {
1241                 ptlrpc_req_finished(req);
1242                 RETURN(rc);
1243         }
1244
1245         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1246                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1247         if (rc < 0) {
1248                 obd_put_request_slot(&obddev->u.cli);
1249                 ptlrpc_req_finished(req);
1250                 RETURN(rc);
1251         }
1252
1253         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1254         ga = ptlrpc_req_async_args(req);
1255         ga->ga_exp = exp;
1256         ga->ga_minfo = minfo;
1257
1258         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1259         ptlrpcd_add_req(req);
1260
1261         RETURN(0);
1262 }