Whamcloud - gitweb
aaa677133d4b1aaf7f2a94c08739d86df07cf6b2
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #include <linux/module.h>
40 #include <obd.h>
41 #include <obd_class.h>
42 #include <lustre_dlm.h>
43 #include <lustre_fid.h> /* fid_res_name_eq() */
44 #include <lustre_intent.h>
45 #include <lustre_mdc.h>
46 #include <lustre_net.h>
47 #include <lustre_req_layout.h>
48 #include "mdc_internal.h"
49
/*
 * Pairs an export with an md_enqueue_info.
 * NOTE(review): presumably the argument bundle carried by an asynchronous
 * getattr enqueue; its users are outside this part of the file - confirm.
 */
struct mdc_getattr_args {
	struct obd_export	*ga_exp;	/* export the request belongs to */
	struct md_enqueue_info	*ga_minfo;	/* enqueue info for the request */
};
54
55 int it_open_error(int phase, struct lookup_intent *it)
56 {
57         if (it_disposition(it, DISP_OPEN_LEASE)) {
58                 if (phase >= DISP_OPEN_LEASE)
59                         return it->d.lustre.it_status;
60                 else
61                         return 0;
62         }
63         if (it_disposition(it, DISP_OPEN_OPEN)) {
64                 if (phase >= DISP_OPEN_OPEN)
65                         return it->d.lustre.it_status;
66                 else
67                         return 0;
68         }
69
70         if (it_disposition(it, DISP_OPEN_CREATE)) {
71                 if (phase >= DISP_OPEN_CREATE)
72                         return it->d.lustre.it_status;
73                 else
74                         return 0;
75         }
76
77         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78                 if (phase >= DISP_LOOKUP_EXECD)
79                         return it->d.lustre.it_status;
80                 else
81                         return 0;
82         }
83
84         if (it_disposition(it, DISP_IT_EXECD)) {
85                 if (phase >= DISP_IT_EXECD)
86                         return it->d.lustre.it_status;
87                 else
88                         return 0;
89         }
90         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
91                it->d.lustre.it_status);
92         LBUG();
93         return 0;
94 }
95 EXPORT_SYMBOL(it_open_error);
96
97 /* this must be called on a lockh that is known to have a referenced lock */
98 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
99                       __u64 *bits)
100 {
101         struct ldlm_lock *lock;
102         struct inode *new_inode = data;
103         ENTRY;
104
105         if(bits)
106                 *bits = 0;
107
108         if (!*lockh)
109                 RETURN(0);
110
111         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
112
113         LASSERT(lock != NULL);
114         lock_res_and_lock(lock);
115         if (lock->l_resource->lr_lvb_inode &&
116             lock->l_resource->lr_lvb_inode != data) {
117                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
118                 LASSERTF(old_inode->i_state & I_FREEING,
119                          "Found existing inode %p/%lu/%u state %lu in lock: "
120                          "setting data to %p/%lu/%u\n", old_inode,
121                          old_inode->i_ino, old_inode->i_generation,
122                          old_inode->i_state,
123                          new_inode, new_inode->i_ino, new_inode->i_generation);
124         }
125         lock->l_resource->lr_lvb_inode = new_inode;
126         if (bits)
127                 *bits = lock->l_policy_data.l_inodebits.bits;
128
129         unlock_res_and_lock(lock);
130         LDLM_LOCK_PUT(lock);
131
132         RETURN(0);
133 }
134
135 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
136                               const struct lu_fid *fid, enum ldlm_type type,
137                               union ldlm_policy_data *policy,
138                               enum ldlm_mode mode, struct lustre_handle *lockh)
139 {
140         struct ldlm_res_id res_id;
141         enum ldlm_mode rc;
142         ENTRY;
143
144         fid_build_reg_res_name(fid, &res_id);
145         /* LU-4405: Clear bits not supported by server */
146         policy->l_inodebits.bits &= exp_connect_ibits(exp);
147         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
148                              &res_id, type, policy, mode, lockh, 0);
149         RETURN(rc);
150 }
151
152 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
153                       union ldlm_policy_data *policy, enum ldlm_mode mode,
154                       enum ldlm_cancel_flags flags, void *opaque)
155 {
156         struct obd_device *obd = class_exp2obd(exp);
157         struct ldlm_res_id res_id;
158         int rc;
159
160         ENTRY;
161
162         fid_build_reg_res_name(fid, &res_id);
163         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
164                                              policy, mode, flags, opaque);
165         RETURN(rc);
166 }
167
168 int mdc_null_inode(struct obd_export *exp,
169                    const struct lu_fid *fid)
170 {
171         struct ldlm_res_id res_id;
172         struct ldlm_resource *res;
173         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
174         ENTRY;
175
176         LASSERTF(ns != NULL, "no namespace passed\n");
177
178         fid_build_reg_res_name(fid, &res_id);
179
180         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
181         if (IS_ERR(res))
182                 RETURN(0);
183
184         lock_res(res);
185         res->lr_lvb_inode = NULL;
186         unlock_res(res);
187
188         ldlm_resource_putref(res);
189         RETURN(0);
190 }
191
192 /* find any ldlm lock of the inode in mdc
193  * return 0    not find
194  *        1    find one
195  *      < 0    error */
196 int mdc_find_cbdata(struct obd_export *exp,
197                     const struct lu_fid *fid,
198                     ldlm_iterator_t it, void *data)
199 {
200         struct ldlm_res_id res_id;
201         int rc = 0;
202         ENTRY;
203
204         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
205         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
206                                    it, data);
207         if (rc == LDLM_ITER_STOP)
208                 RETURN(1);
209         else if (rc == LDLM_ITER_CONTINUE)
210                 RETURN(0);
211         RETURN(rc);
212 }
213
214 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
215 {
216         /* Don't hold error requests for replay. */
217         if (req->rq_replay) {
218                 spin_lock(&req->rq_lock);
219                 req->rq_replay = 0;
220                 spin_unlock(&req->rq_lock);
221         }
222         if (rc && req->rq_transno != 0) {
223                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
224                 LBUG();
225         }
226 }
227
228 /* Save a large LOV EA into the request buffer so that it is available
229  * for replay.  We don't do this in the initial request because the
230  * original request doesn't need this buffer (at most it sends just the
231  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
232  * buffer and may also be difficult to allocate and save a very large
233  * request buffer for each open. (bug 5707)
234  *
235  * OOM here may cause recovery failure if lmm is needed (only for the
236  * original open if the MDS crashed just when this client also OOM'd)
237  * but this is incredibly unlikely, and questionable whether the client
238  * could do MDS recovery under OOM anyways... */
239 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
240                                 struct mdt_body *body)
241 {
242         int     rc;
243
244         /* FIXME: remove this explicit offset. */
245         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
246                                         body->mbo_eadatasize);
247         if (rc) {
248                 CERROR("Can't enlarge segment %d size to %d\n",
249                        DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
250                 body->mbo_valid &= ~OBD_MD_FLEASIZE;
251                 body->mbo_eadatasize = 0;
252         }
253 }
254
255 static struct ptlrpc_request *
256 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
257                      struct md_op_data *op_data)
258 {
259         struct ptlrpc_request   *req;
260         struct obd_device       *obddev = class_exp2obd(exp);
261         struct ldlm_intent      *lit;
262         const void              *lmm = op_data->op_data;
263         __u32                    lmmsize = op_data->op_data_size;
264         struct list_head         cancels = LIST_HEAD_INIT(cancels);
265         int                      count = 0;
266         enum ldlm_mode           mode;
267         int                      rc;
268         ENTRY;
269
270         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
271
272         /* XXX: openlock is not cancelled for cross-refs. */
273         /* If inode is known, cancel conflicting OPEN locks. */
274         if (fid_is_sane(&op_data->op_fid2)) {
275                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
276                         if (it->it_flags & FMODE_WRITE)
277                                 mode = LCK_EX;
278                         else
279                                 mode = LCK_PR;
280                 } else {
281                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
282                                 mode = LCK_CW;
283 #ifdef FMODE_EXEC
284                         else if (it->it_flags & FMODE_EXEC)
285                                 mode = LCK_PR;
286 #endif
287                         else
288                                 mode = LCK_CR;
289                 }
290                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
291                                                 &cancels, mode,
292                                                 MDS_INODELOCK_OPEN);
293         }
294
295         /* If CREATE, cancel parent's UPDATE lock. */
296         if (it->it_op & IT_CREAT)
297                 mode = LCK_EX;
298         else
299                 mode = LCK_CR;
300         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
301                                          &cancels, mode,
302                                          MDS_INODELOCK_UPDATE);
303
304         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
305                                    &RQF_LDLM_INTENT_OPEN);
306         if (req == NULL) {
307                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
308                 RETURN(ERR_PTR(-ENOMEM));
309         }
310
311         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
312                              op_data->op_namelen + 1);
313         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
314                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
315
316         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
317         if (rc < 0) {
318                 ptlrpc_request_free(req);
319                 RETURN(ERR_PTR(rc));
320         }
321
322         spin_lock(&req->rq_lock);
323         req->rq_replay = req->rq_import->imp_replayable;
324         spin_unlock(&req->rq_lock);
325
326         /* pack the intent */
327         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
328         lit->opc = (__u64)it->it_op;
329
330         /* pack the intended request */
331         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
332                       lmmsize);
333
334         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
335                              obddev->u.cli.cl_max_mds_easize);
336
337         /* for remote client, fetch remote perm for current user */
338         if (client_is_remote(exp))
339                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
340                                      sizeof(struct mdt_remote_perm));
341         ptlrpc_request_set_replen(req);
342         return req;
343 }
344
345 static struct ptlrpc_request *
346 mdc_intent_getxattr_pack(struct obd_export *exp,
347                          struct lookup_intent *it,
348                          struct md_op_data *op_data)
349 {
350         struct ptlrpc_request   *req;
351         struct ldlm_intent      *lit;
352         int                     rc, count = 0;
353         __u32                   maxdata;
354         struct list_head        cancels = LIST_HEAD_INIT(cancels);
355
356         ENTRY;
357
358         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
359                                         &RQF_LDLM_INTENT_GETXATTR);
360         if (req == NULL)
361                 RETURN(ERR_PTR(-ENOMEM));
362
363         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
364         if (rc) {
365                 ptlrpc_request_free(req);
366                 RETURN(ERR_PTR(rc));
367         }
368
369         /* pack the intent */
370         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
371         lit->opc = IT_GETXATTR;
372
373         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
374
375         /* pack the intended request */
376         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
377                       0);
378
379         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
380                                 RCL_SERVER, maxdata);
381
382         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
383                                 RCL_SERVER, maxdata);
384
385         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
386                                 RCL_SERVER, maxdata);
387
388         ptlrpc_request_set_replen(req);
389
390         RETURN(req);
391 }
392
393 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
394                                                      struct lookup_intent *it,
395                                                      struct md_op_data *op_data)
396 {
397         struct ptlrpc_request *req;
398         struct obd_device     *obddev = class_exp2obd(exp);
399         struct ldlm_intent    *lit;
400         int                    rc;
401         ENTRY;
402
403         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
404                                    &RQF_LDLM_INTENT_UNLINK);
405         if (req == NULL)
406                 RETURN(ERR_PTR(-ENOMEM));
407
408         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
409                              op_data->op_namelen + 1);
410
411         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
412         if (rc) {
413                 ptlrpc_request_free(req);
414                 RETURN(ERR_PTR(rc));
415         }
416
417         /* pack the intent */
418         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
419         lit->opc = (__u64)it->it_op;
420
421         /* pack the intended request */
422         mdc_unlink_pack(req, op_data);
423
424         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
425                              obddev->u.cli.cl_default_mds_easize);
426         ptlrpc_request_set_replen(req);
427         RETURN(req);
428 }
429
430 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
431                                                       struct lookup_intent *it,
432                                                       struct md_op_data *op_data)
433 {
434         struct ptlrpc_request   *req;
435         struct obd_device       *obddev = class_exp2obd(exp);
436         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
437                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
438                                          OBD_MD_MEA |
439                                          (client_is_remote(exp) ?
440                                           OBD_MD_FLRMTPERM : OBD_MD_FLACL);
441         struct ldlm_intent      *lit;
442         int                      rc;
443         __u32                    easize;
444         ENTRY;
445
446         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
447                                    &RQF_LDLM_INTENT_GETATTR);
448         if (req == NULL)
449                 RETURN(ERR_PTR(-ENOMEM));
450
451         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
452                              op_data->op_namelen + 1);
453
454         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
455         if (rc) {
456                 ptlrpc_request_free(req);
457                 RETURN(ERR_PTR(rc));
458         }
459
460         /* pack the intent */
461         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
462         lit->opc = (__u64)it->it_op;
463
464         if (obddev->u.cli.cl_default_mds_easize > 0)
465                 easize = obddev->u.cli.cl_default_mds_easize;
466         else
467                 easize = obddev->u.cli.cl_max_mds_easize;
468
469         /* pack the intended request */
470         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
471
472         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
473         if (client_is_remote(exp))
474                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
475                                      sizeof(struct mdt_remote_perm));
476         ptlrpc_request_set_replen(req);
477         RETURN(req);
478 }
479
480 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
481                                                      struct lookup_intent *it,
482                                                      struct md_op_data *unused)
483 {
484         struct obd_device     *obd = class_exp2obd(exp);
485         struct ptlrpc_request *req;
486         struct ldlm_intent    *lit;
487         struct layout_intent  *layout;
488         int rc;
489         ENTRY;
490
491         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
492                                 &RQF_LDLM_INTENT_LAYOUT);
493         if (req == NULL)
494                 RETURN(ERR_PTR(-ENOMEM));
495
496         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
497         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
498         if (rc) {
499                 ptlrpc_request_free(req);
500                 RETURN(ERR_PTR(rc));
501         }
502
503         /* pack the intent */
504         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
505         lit->opc = (__u64)it->it_op;
506
507         /* pack the layout intent request */
508         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
509         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
510          * set for replication */
511         layout->li_opc = LAYOUT_INTENT_ACCESS;
512
513         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
514                              obd->u.cli.cl_default_mds_easize);
515         ptlrpc_request_set_replen(req);
516         RETURN(req);
517 }
518
519 static struct ptlrpc_request *
520 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
521 {
522         struct ptlrpc_request *req;
523         int rc;
524         ENTRY;
525
526         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
527         if (req == NULL)
528                 RETURN(ERR_PTR(-ENOMEM));
529
530         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
531         if (rc) {
532                 ptlrpc_request_free(req);
533                 RETURN(ERR_PTR(rc));
534         }
535
536         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
537         ptlrpc_request_set_replen(req);
538         RETURN(req);
539 }
540
541 static int mdc_finish_enqueue(struct obd_export *exp,
542                               struct ptlrpc_request *req,
543                               struct ldlm_enqueue_info *einfo,
544                               struct lookup_intent *it,
545                               struct lustre_handle *lockh,
546                               int rc)
547 {
548         struct req_capsule  *pill = &req->rq_pill;
549         struct ldlm_request *lockreq;
550         struct ldlm_reply   *lockrep;
551         struct lustre_intent_data *intent = &it->d.lustre;
552         struct ldlm_lock    *lock;
553         void                *lvb_data = NULL;
554         __u32                lvb_len = 0;
555         ENTRY;
556
557         LASSERT(rc >= 0);
558         /* Similarly, if we're going to replay this request, we don't want to
559          * actually get a lock, just perform the intent. */
560         if (req->rq_transno || req->rq_replay) {
561                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
562                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
563         }
564
565         if (rc == ELDLM_LOCK_ABORTED) {
566                 einfo->ei_mode = 0;
567                 memset(lockh, 0, sizeof(*lockh));
568                 rc = 0;
569         } else { /* rc = 0 */
570                 lock = ldlm_handle2lock(lockh);
571                 LASSERT(lock != NULL);
572
573                 /* If the server gave us back a different lock mode, we should
574                  * fix up our variables. */
575                 if (lock->l_req_mode != einfo->ei_mode) {
576                         ldlm_lock_addref(lockh, lock->l_req_mode);
577                         ldlm_lock_decref(lockh, einfo->ei_mode);
578                         einfo->ei_mode = lock->l_req_mode;
579                 }
580                 LDLM_LOCK_PUT(lock);
581         }
582
583         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
584         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
585
586         intent->it_disposition = (int)lockrep->lock_policy_res1;
587         intent->it_status = (int)lockrep->lock_policy_res2;
588         intent->it_lock_mode = einfo->ei_mode;
589         intent->it_lock_handle = lockh->cookie;
590         intent->it_data = req;
591
592         /* Technically speaking rq_transno must already be zero if
593          * it_status is in error, so the check is a bit redundant */
594         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
595                 mdc_clear_replay_flag(req, intent->it_status);
596
597         /* If we're doing an IT_OPEN which did not result in an actual
598          * successful open, then we need to remove the bit which saves
599          * this request for unconditional replay.
600          *
601          * It's important that we do this first!  Otherwise we might exit the
602          * function without doing so, and try to replay a failed create
603          * (bug 3440) */
604         if (it->it_op & IT_OPEN && req->rq_replay &&
605             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
606                 mdc_clear_replay_flag(req, intent->it_status);
607
608         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
609                   it->it_op, intent->it_disposition, intent->it_status);
610
611         /* We know what to expect, so we do any byte flipping required here */
612         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
613                 struct mdt_body *body;
614
615                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
616                 if (body == NULL) {
617                         CERROR ("Can't swab mdt_body\n");
618                         RETURN (-EPROTO);
619                 }
620
621                 if (it_disposition(it, DISP_OPEN_OPEN) &&
622                     !it_open_error(DISP_OPEN_OPEN, it)) {
623                         /*
624                          * If this is a successful OPEN request, we need to set
625                          * replay handler and data early, so that if replay
626                          * happens immediately after swabbing below, new reply
627                          * is swabbed by that handler correctly.
628                          */
629                         mdc_set_open_replay_data(NULL, NULL, it);
630                 }
631
632                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
633                         void *eadata;
634
635                         mdc_update_max_ea_from_body(exp, body);
636
637                         /*
638                          * The eadata is opaque; just check that it is there.
639                          * Eventually, obd_unpackmd() will check the contents.
640                          */
641                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
642                                                         body->mbo_eadatasize);
643                         if (eadata == NULL)
644                                 RETURN(-EPROTO);
645
646                         /* save lvb data and length in case this is for layout
647                          * lock */
648                         lvb_data = eadata;
649                         lvb_len = body->mbo_eadatasize;
650
651                         /*
652                          * We save the reply LOV EA in case we have to replay a
653                          * create for recovery.  If we didn't allocate a large
654                          * enough request buffer above we need to reallocate it
655                          * here to hold the actual LOV EA.
656                          *
657                          * To not save LOV EA if request is not going to replay
658                          * (for example error one).
659                          */
660                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
661                                 void *lmm;
662                                 if (req_capsule_get_size(pill, &RMF_EADATA,
663                                                          RCL_CLIENT) <
664                                     body->mbo_eadatasize)
665                                         mdc_realloc_openmsg(req, body);
666                                 else
667                                         req_capsule_shrink(pill, &RMF_EADATA,
668                                                            body->mbo_eadatasize,
669                                                            RCL_CLIENT);
670
671                                 req_capsule_set_size(pill, &RMF_EADATA,
672                                                      RCL_CLIENT,
673                                                      body->mbo_eadatasize);
674
675                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
676                                 if (lmm)
677                                         memcpy(lmm, eadata,
678                                                body->mbo_eadatasize);
679                         }
680                 }
681
682                 if (body->mbo_valid & OBD_MD_FLRMTPERM) {
683                         struct mdt_remote_perm *perm;
684
685                         LASSERT(client_is_remote(exp));
686                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
687                                                 lustre_swab_mdt_remote_perm);
688                         if (perm == NULL)
689                                 RETURN(-EPROTO);
690                 }
691         } else if (it->it_op & IT_LAYOUT) {
692                 /* maybe the lock was granted right away and layout
693                  * is packed into RMF_DLM_LVB of req */
694                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
695                 if (lvb_len > 0) {
696                         lvb_data = req_capsule_server_sized_get(pill,
697                                                         &RMF_DLM_LVB, lvb_len);
698                         if (lvb_data == NULL)
699                                 RETURN(-EPROTO);
700                 }
701         }
702
703         /* fill in stripe data for layout lock.
704          * LU-6581: trust layout data only if layout lock is granted. The MDT
705          * has stopped sending layout unless the layout lock is granted. The
706          * client still does this checking in case it's talking with an old
707          * server. - Jinshan */
708         lock = ldlm_handle2lock(lockh);
709         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
710             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
711                 void *lmm;
712
713                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
714                         ldlm_it2str(it->it_op), lvb_len);
715
716                 OBD_ALLOC_LARGE(lmm, lvb_len);
717                 if (lmm == NULL) {
718                         LDLM_LOCK_PUT(lock);
719                         RETURN(-ENOMEM);
720                 }
721                 memcpy(lmm, lvb_data, lvb_len);
722
723                 /* install lvb_data */
724                 lock_res_and_lock(lock);
725                 if (lock->l_lvb_data == NULL) {
726                         lock->l_lvb_type = LVB_T_LAYOUT;
727                         lock->l_lvb_data = lmm;
728                         lock->l_lvb_len = lvb_len;
729                         lmm = NULL;
730                 }
731                 unlock_res_and_lock(lock);
732                 if (lmm != NULL)
733                         OBD_FREE_LARGE(lmm, lvb_len);
734         }
735         if (lock != NULL)
736                 LDLM_LOCK_PUT(lock);
737
738         RETURN(rc);
739 }
740
741 /* We always reserve enough space in the reply packet for a stripe MD, because
742  * we don't know in advance the file type. */
743 int mdc_enqueue(struct obd_export *exp,
744                 struct ldlm_enqueue_info *einfo,
745                 const union ldlm_policy_data *policy,
746                 struct lookup_intent *it, struct md_op_data *op_data,
747                 struct lustre_handle *lockh, __u64 extra_lock_flags)
748 {
749         struct obd_device *obddev = class_exp2obd(exp);
750         struct ptlrpc_request *req = NULL;
751         __u64 flags, saved_flags = extra_lock_flags;
752         struct ldlm_res_id res_id;
753         static const union ldlm_policy_data lookup_policy = {
754                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
755         static const union ldlm_policy_data update_policy = {
756                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
757         static const union ldlm_policy_data layout_policy = {
758                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
759         static const union ldlm_policy_data getxattr_policy = {
760                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
761         int generation, resends = 0;
762         struct ldlm_reply *lockrep;
763         enum lvb_type lvb_type = 0;
764         int rc;
765         ENTRY;
766
767         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
768                  einfo->ei_type);
769         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
770
771         if (it != NULL) {
772                 LASSERT(policy == NULL);
773
774                 saved_flags |= LDLM_FL_HAS_INTENT;
775                 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
776                         policy = &update_policy;
777                 else if (it->it_op & IT_LAYOUT)
778                         policy = &layout_policy;
779                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
780                         policy = &getxattr_policy;
781                 else
782                         policy = &lookup_policy;
783         }
784
785         generation = obddev->u.cli.cl_import->imp_generation;
786 resend:
787         flags = saved_flags;
788         if (it == NULL) {
789                 /* The only way right now is FLOCK. */
790                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
791                          einfo->ei_type);
792                 res_id.name[3] = LDLM_FLOCK;
793         } else if (it->it_op & IT_OPEN) {
794                 req = mdc_intent_open_pack(exp, it, op_data);
795         } else if (it->it_op & IT_UNLINK) {
796                 req = mdc_intent_unlink_pack(exp, it, op_data);
797         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
798                 req = mdc_intent_getattr_pack(exp, it, op_data);
799         } else if (it->it_op & IT_READDIR) {
800                 req = mdc_enqueue_pack(exp, 0);
801         } else if (it->it_op & IT_LAYOUT) {
802                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
803                         RETURN(-EOPNOTSUPP);
804                 req = mdc_intent_layout_pack(exp, it, op_data);
805                 lvb_type = LVB_T_LAYOUT;
806         } else if (it->it_op & IT_GETXATTR) {
807                 req = mdc_intent_getxattr_pack(exp, it, op_data);
808         } else {
809                 LBUG();
810                 RETURN(-EINVAL);
811         }
812
813         if (IS_ERR(req))
814                 RETURN(PTR_ERR(req));
815
816         if (resends) {
817                 req->rq_generation_set = 1;
818                 req->rq_import_generation = generation;
819                 req->rq_sent = cfs_time_current_sec() + resends;
820         }
821
822         /* It is important to obtain modify RPC slot first (if applicable), so
823          * that threads that are waiting for a modify RPC slot are not polluting
824          * our rpcs in flight counter.
825          * We do not do flock request limiting, though */
826         if (it) {
827                 mdc_get_mod_rpc_slot(req, it);
828                 rc = obd_get_request_slot(&obddev->u.cli);
829                 if (rc != 0) {
830                         mdc_put_mod_rpc_slot(req, it);
831                         mdc_clear_replay_flag(req, 0);
832                         ptlrpc_req_finished(req);
833                         RETURN(rc);
834                 }
835         }
836
837         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
838                               0, lvb_type, lockh, 0);
839         if (!it) {
840                 /* For flock requests we immediatelly return without further
841                    delay and let caller deal with the rest, since rest of
842                    this function metadata processing makes no sense for flock
843                    requests anyway. But in case of problem during comms with
844                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
845                    can not rely on caller and this mainly for F_UNLCKs
846                    (explicits or automatically generated by Kernel to clean
847                    current FLocks upon exit) that can't be trashed */
848                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
849                     (einfo->ei_type == LDLM_FLOCK) &&
850                     (einfo->ei_mode == LCK_NL))
851                         goto resend;
852                 RETURN(rc);
853         }
854
855         obd_put_request_slot(&obddev->u.cli);
856         mdc_put_mod_rpc_slot(req, it);
857
858         if (rc < 0) {
859                 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
860                        obddev->obd_name, rc);
861
862                 mdc_clear_replay_flag(req, rc);
863                 ptlrpc_req_finished(req);
864                 RETURN(rc);
865         }
866
867         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
868         LASSERT(lockrep != NULL);
869
870         lockrep->lock_policy_res2 =
871                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
872
873         /* Retry infinitely when the server returns -EINPROGRESS for the
874          * intent operation, when server returns -EINPROGRESS for acquiring
875          * intent lock, we'll retry in after_reply(). */
876         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
877                 mdc_clear_replay_flag(req, rc);
878                 ptlrpc_req_finished(req);
879                 resends++;
880
881                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
882                        obddev->obd_name, resends, it->it_op,
883                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
884
885                 if (generation == obddev->u.cli.cl_import->imp_generation) {
886                         goto resend;
887                 } else {
888                         CDEBUG(D_HA, "resend cross eviction\n");
889                         RETURN(-EIO);
890                 }
891         }
892
893         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
894         if (rc < 0) {
895                 if (lustre_handle_is_used(lockh)) {
896                         ldlm_lock_decref(lockh, einfo->ei_mode);
897                         memset(lockh, 0, sizeof(*lockh));
898                 }
899                 ptlrpc_req_finished(req);
900
901                 it->d.lustre.it_lock_handle = 0;
902                 it->d.lustre.it_lock_mode = 0;
903                 it->d.lustre.it_data = NULL;
904         }
905
906         RETURN(rc);
907 }
908
909 static int mdc_finish_intent_lock(struct obd_export *exp,
910                                   struct ptlrpc_request *request,
911                                   struct md_op_data *op_data,
912                                   struct lookup_intent *it,
913                                   struct lustre_handle *lockh)
914 {
915         struct lustre_handle old_lock;
916         struct mdt_body *mdt_body;
917         struct ldlm_lock *lock;
918         int rc;
919         ENTRY;
920
921         LASSERT(request != NULL);
922         LASSERT(request != LP_POISON);
923         LASSERT(request->rq_repmsg != LP_POISON);
924
925         if (it->it_op & IT_READDIR)
926                 RETURN(0);
927
928         if (!it_disposition(it, DISP_IT_EXECD)) {
929                 /* The server failed before it even started executing the
930                  * intent, i.e. because it couldn't unpack the request. */
931                 LASSERT(it->d.lustre.it_status != 0);
932                 RETURN(it->d.lustre.it_status);
933         }
934         rc = it_open_error(DISP_IT_EXECD, it);
935         if (rc)
936                 RETURN(rc);
937
938         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
939         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
940
941         rc = it_open_error(DISP_LOOKUP_EXECD, it);
942         if (rc)
943                 RETURN(rc);
944
945         /* keep requests around for the multiple phases of the call
946          * this shows the DISP_XX must guarantee we make it into the call
947          */
948         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
949             it_disposition(it, DISP_OPEN_CREATE) &&
950             !it_open_error(DISP_OPEN_CREATE, it)) {
951                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
952                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
953         }
954         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
955             it_disposition(it, DISP_OPEN_OPEN) &&
956             !it_open_error(DISP_OPEN_OPEN, it)) {
957                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
958                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
959                 /* BUG 11546 - eviction in the middle of open rpc processing */
960                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
961         }
962
963         if (it->it_op & IT_CREAT) {
964                 /* XXX this belongs in ll_create_it */
965         } else if (it->it_op == IT_OPEN) {
966                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
967         } else {
968                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
969         }
970
971         /* If we already have a matching lock, then cancel the new
972          * one.  We have to set the data here instead of in
973          * mdc_enqueue, because we need to use the child's inode as
974          * the l_ast_data to match, and that's not available until
975          * intent_finish has performed the iget().) */
976         lock = ldlm_handle2lock(lockh);
977         if (lock) {
978                 union ldlm_policy_data policy = lock->l_policy_data;
979                 LDLM_DEBUG(lock, "matching against this");
980
981                 LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
982                                          &lock->l_resource->lr_name),
983                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
984                          PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
985                 LDLM_LOCK_PUT(lock);
986
987                 memcpy(&old_lock, lockh, sizeof(*lockh));
988                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
989                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
990                         ldlm_lock_decref_and_cancel(lockh,
991                                                     it->d.lustre.it_lock_mode);
992                         memcpy(lockh, &old_lock, sizeof(old_lock));
993                         it->d.lustre.it_lock_handle = lockh->cookie;
994                 }
995         }
996         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
997                 (int)op_data->op_namelen, op_data->op_name,
998                 ldlm_it2str(it->it_op), it->d.lustre.it_status,
999                 it->d.lustre.it_disposition, rc);
1000         RETURN(rc);
1001 }
1002
1003 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1004                         struct lu_fid *fid, __u64 *bits)
1005 {
1006         /* We could just return 1 immediately, but since we should only
1007          * be called in revalidate_it if we already have a lock, let's
1008          * verify that. */
1009         struct ldlm_res_id res_id;
1010         struct lustre_handle lockh;
1011         union ldlm_policy_data policy;
1012         enum ldlm_mode mode;
1013         ENTRY;
1014
1015         if (it->d.lustre.it_lock_handle) {
1016                 lockh.cookie = it->d.lustre.it_lock_handle;
1017                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1018         } else {
1019                 fid_build_reg_res_name(fid, &res_id);
1020                 switch (it->it_op) {
1021                 case IT_GETATTR:
1022                         /* File attributes are held under multiple bits:
1023                          * nlink is under lookup lock, size and times are
1024                          * under UPDATE lock and recently we've also got
1025                          * a separate permissions lock for owner/group/acl that
1026                          * were protected by lookup lock before.
1027                          * Getattr must provide all of that information,
1028                          * so we need to ensure we have all of those locks.
1029                          * Unfortunately, if the bits are split across multiple
1030                          * locks, there's no easy way to match all of them here,
1031                          * so an extra RPC would be performed to fetch all
1032                          * of those bits at once for now. */
1033                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1034                          * but for old MDTs (< 2.4), permission is covered
1035                          * by LOOKUP lock, so it needs to match all bits here.*/
1036                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1037                                                   MDS_INODELOCK_LOOKUP |
1038                                                   MDS_INODELOCK_PERM;
1039                         break;
1040                 case IT_READDIR:
1041                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1042                         break;
1043                 case IT_LAYOUT:
1044                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1045                         break;
1046                 default:
1047                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1048                         break;
1049                 }
1050
1051                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1052                                       LDLM_IBITS, &policy,
1053                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1054                                       &lockh);
1055         }
1056
1057         if (mode) {
1058                 it->d.lustre.it_lock_handle = lockh.cookie;
1059                 it->d.lustre.it_lock_mode = mode;
1060         } else {
1061                 it->d.lustre.it_lock_handle = 0;
1062                 it->d.lustre.it_lock_mode = 0;
1063         }
1064
1065         RETURN(!!mode);
1066 }
1067
1068 /*
1069  * This long block is all about fixing up the lock and request state
1070  * so that it is correct as of the moment _before_ the operation was
1071  * applied; that way, the VFS will think that everything is normal and
1072  * call Lustre's regular VFS methods.
1073  *
1074  * If we're performing a creation, that means that unless the creation
1075  * failed with EEXIST, we should fake up a negative dentry.
1076  *
1077  * For everything else, we want to lookup to succeed.
1078  *
1079  * One additional note: if CREATE or OPEN succeeded, we add an extra
1080  * reference to the request because we need to keep it around until
1081  * ll_create/ll_open gets called.
1082  *
1083  * The server will return to us, in it_disposition, an indication of
1084  * exactly what d.lustre.it_status refers to.
1085  *
1086  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1087  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1088  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1089  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1090  * was successful.
1091  *
1092  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1093  * child lookup.
1094  */
1095 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1096                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1097                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1098 {
1099         struct ldlm_enqueue_info einfo = {
1100                 .ei_type        = LDLM_IBITS,
1101                 .ei_mode        = it_to_lock_mode(it),
1102                 .ei_cb_bl       = cb_blocking,
1103                 .ei_cb_cp       = ldlm_completion_ast,
1104         };
1105         struct lustre_handle lockh;
1106         int rc = 0;
1107         ENTRY;
1108         LASSERT(it);
1109
1110         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1111                 ", intent: %s flags %#"LPF64"o\n", (int)op_data->op_namelen,
1112                 op_data->op_name, PFID(&op_data->op_fid2),
1113                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1114                 it->it_flags);
1115
1116         lockh.cookie = 0;
1117         if (fid_is_sane(&op_data->op_fid2) &&
1118             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1119                 /* We could just return 1 immediately, but since we should only
1120                  * be called in revalidate_it if we already have a lock, let's
1121                  * verify that. */
1122                 it->d.lustre.it_lock_handle = 0;
1123                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1124                 /* Only return failure if it was not GETATTR by cfid
1125                    (from inode_revalidate) */
1126                 if (rc || op_data->op_namelen != 0)
1127                         RETURN(rc);
1128         }
1129
1130         /* For case if upper layer did not alloc fid, do it now. */
1131         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1132                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1133                 if (rc < 0) {
1134                         CERROR("Can't alloc new fid, rc %d\n", rc);
1135                         RETURN(rc);
1136                 }
1137         }
1138
1139         rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
1140                          extra_lock_flags);
1141         if (rc < 0)
1142                 RETURN(rc);
1143
1144         *reqp = it->d.lustre.it_data;
1145         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1146         RETURN(rc);
1147 }
1148
1149 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1150                                               struct ptlrpc_request *req,
1151                                               void *args, int rc)
1152 {
1153         struct mdc_getattr_args  *ga = args;
1154         struct obd_export        *exp = ga->ga_exp;
1155         struct md_enqueue_info   *minfo = ga->ga_minfo;
1156         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1157         struct lookup_intent     *it;
1158         struct lustre_handle     *lockh;
1159         struct obd_device        *obddev;
1160         struct ldlm_reply        *lockrep;
1161         __u64                     flags = LDLM_FL_HAS_INTENT;
1162         ENTRY;
1163
1164         it    = &minfo->mi_it;
1165         lockh = &minfo->mi_lockh;
1166
1167         obddev = class_exp2obd(exp);
1168
1169         obd_put_request_slot(&obddev->u.cli);
1170         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1171                 rc = -ETIMEDOUT;
1172
1173         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1174                                    &flags, NULL, 0, lockh, rc);
1175         if (rc < 0) {
1176                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1177                 mdc_clear_replay_flag(req, rc);
1178                 GOTO(out, rc);
1179         }
1180
1181         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1182         LASSERT(lockrep != NULL);
1183
1184         lockrep->lock_policy_res2 =
1185                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1186
1187         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1188         if (rc)
1189                 GOTO(out, rc);
1190
1191         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1192         EXIT;
1193
1194 out:
1195         minfo->mi_cb(req, minfo, rc);
1196         return 0;
1197 }
1198
1199 int mdc_intent_getattr_async(struct obd_export *exp,
1200                              struct md_enqueue_info *minfo)
1201 {
1202         struct md_op_data       *op_data = &minfo->mi_data;
1203         struct lookup_intent    *it = &minfo->mi_it;
1204         struct ptlrpc_request   *req;
1205         struct mdc_getattr_args *ga;
1206         struct obd_device       *obddev = class_exp2obd(exp);
1207         struct ldlm_res_id       res_id;
1208         union ldlm_policy_data policy = {
1209                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1210                                                  MDS_INODELOCK_UPDATE } };
1211         int                      rc = 0;
1212         __u64                    flags = LDLM_FL_HAS_INTENT;
1213         ENTRY;
1214
1215         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#"
1216                 LPF64"o\n",
1217                 (int)op_data->op_namelen, op_data->op_name,
1218                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1219
1220         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1221         req = mdc_intent_getattr_pack(exp, it, op_data);
1222         if (IS_ERR(req))
1223                 RETURN(PTR_ERR(req));
1224
1225         rc = obd_get_request_slot(&obddev->u.cli);
1226         if (rc != 0) {
1227                 ptlrpc_req_finished(req);
1228                 RETURN(rc);
1229         }
1230
1231         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1232                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1233         if (rc < 0) {
1234                 obd_put_request_slot(&obddev->u.cli);
1235                 ptlrpc_req_finished(req);
1236                 RETURN(rc);
1237         }
1238
1239         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1240         ga = ptlrpc_req_async_args(req);
1241         ga->ga_exp = exp;
1242         ga->ga_minfo = minfo;
1243
1244         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1245         ptlrpcd_add_req(req);
1246
1247         RETURN(0);
1248 }