Whamcloud - gitweb
LU-6401 headers: move swab functions to new header files
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #include <linux/module.h>
40
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <lustre_dlm.h>
44 #include <lustre_fid.h>
45 #include <lustre_intent.h>
46 #include <lustre_mdc.h>
47 #include <lustre_net.h>
48 #include <lustre_req_layout.h>
49 #include <lustre_swab.h>
50
51 #include "mdc_internal.h"
52
53 struct mdc_getattr_args {
54         struct obd_export               *ga_exp;
55         struct md_enqueue_info          *ga_minfo;
56 };
57
58 int it_open_error(int phase, struct lookup_intent *it)
59 {
60         if (it_disposition(it, DISP_OPEN_LEASE)) {
61                 if (phase >= DISP_OPEN_LEASE)
62                         return it->d.lustre.it_status;
63                 else
64                         return 0;
65         }
66         if (it_disposition(it, DISP_OPEN_OPEN)) {
67                 if (phase >= DISP_OPEN_OPEN)
68                         return it->d.lustre.it_status;
69                 else
70                         return 0;
71         }
72
73         if (it_disposition(it, DISP_OPEN_CREATE)) {
74                 if (phase >= DISP_OPEN_CREATE)
75                         return it->d.lustre.it_status;
76                 else
77                         return 0;
78         }
79
80         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
81                 if (phase >= DISP_LOOKUP_EXECD)
82                         return it->d.lustre.it_status;
83                 else
84                         return 0;
85         }
86
87         if (it_disposition(it, DISP_IT_EXECD)) {
88                 if (phase >= DISP_IT_EXECD)
89                         return it->d.lustre.it_status;
90                 else
91                         return 0;
92         }
93         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
94                it->d.lustre.it_status);
95         LBUG();
96         return 0;
97 }
98 EXPORT_SYMBOL(it_open_error);
99
100 /* this must be called on a lockh that is known to have a referenced lock */
101 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
102                       __u64 *bits)
103 {
104         struct ldlm_lock *lock;
105         struct inode *new_inode = data;
106         ENTRY;
107
108         if(bits)
109                 *bits = 0;
110
111         if (!*lockh)
112                 RETURN(0);
113
114         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
115
116         LASSERT(lock != NULL);
117         lock_res_and_lock(lock);
118         if (lock->l_resource->lr_lvb_inode &&
119             lock->l_resource->lr_lvb_inode != data) {
120                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
121                 LASSERTF(old_inode->i_state & I_FREEING,
122                          "Found existing inode %p/%lu/%u state %lu in lock: "
123                          "setting data to %p/%lu/%u\n", old_inode,
124                          old_inode->i_ino, old_inode->i_generation,
125                          old_inode->i_state,
126                          new_inode, new_inode->i_ino, new_inode->i_generation);
127         }
128         lock->l_resource->lr_lvb_inode = new_inode;
129         if (bits)
130                 *bits = lock->l_policy_data.l_inodebits.bits;
131
132         unlock_res_and_lock(lock);
133         LDLM_LOCK_PUT(lock);
134
135         RETURN(0);
136 }
137
138 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
139                               const struct lu_fid *fid, enum ldlm_type type,
140                               union ldlm_policy_data *policy,
141                               enum ldlm_mode mode, struct lustre_handle *lockh)
142 {
143         struct ldlm_res_id res_id;
144         enum ldlm_mode rc;
145         ENTRY;
146
147         fid_build_reg_res_name(fid, &res_id);
148         /* LU-4405: Clear bits not supported by server */
149         policy->l_inodebits.bits &= exp_connect_ibits(exp);
150         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
151                              &res_id, type, policy, mode, lockh, 0);
152         RETURN(rc);
153 }
154
155 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
156                       union ldlm_policy_data *policy, enum ldlm_mode mode,
157                       enum ldlm_cancel_flags flags, void *opaque)
158 {
159         struct obd_device *obd = class_exp2obd(exp);
160         struct ldlm_res_id res_id;
161         int rc;
162
163         ENTRY;
164
165         fid_build_reg_res_name(fid, &res_id);
166         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
167                                              policy, mode, flags, opaque);
168         RETURN(rc);
169 }
170
171 int mdc_null_inode(struct obd_export *exp,
172                    const struct lu_fid *fid)
173 {
174         struct ldlm_res_id res_id;
175         struct ldlm_resource *res;
176         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
177         ENTRY;
178
179         LASSERTF(ns != NULL, "no namespace passed\n");
180
181         fid_build_reg_res_name(fid, &res_id);
182
183         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
184         if (IS_ERR(res))
185                 RETURN(0);
186
187         lock_res(res);
188         res->lr_lvb_inode = NULL;
189         unlock_res(res);
190
191         ldlm_resource_putref(res);
192         RETURN(0);
193 }
194
195 /* find any ldlm lock of the inode in mdc
196  * return 0    not find
197  *        1    find one
198  *      < 0    error */
199 int mdc_find_cbdata(struct obd_export *exp,
200                     const struct lu_fid *fid,
201                     ldlm_iterator_t it, void *data)
202 {
203         struct ldlm_res_id res_id;
204         int rc = 0;
205         ENTRY;
206
207         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
208         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
209                                    it, data);
210         if (rc == LDLM_ITER_STOP)
211                 RETURN(1);
212         else if (rc == LDLM_ITER_CONTINUE)
213                 RETURN(0);
214         RETURN(rc);
215 }
216
217 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
218 {
219         /* Don't hold error requests for replay. */
220         if (req->rq_replay) {
221                 spin_lock(&req->rq_lock);
222                 req->rq_replay = 0;
223                 spin_unlock(&req->rq_lock);
224         }
225         if (rc && req->rq_transno != 0) {
226                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
227                 LBUG();
228         }
229 }
230
231 /* Save a large LOV EA into the request buffer so that it is available
232  * for replay.  We don't do this in the initial request because the
233  * original request doesn't need this buffer (at most it sends just the
234  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
235  * buffer and may also be difficult to allocate and save a very large
236  * request buffer for each open. (bug 5707)
237  *
238  * OOM here may cause recovery failure if lmm is needed (only for the
239  * original open if the MDS crashed just when this client also OOM'd)
240  * but this is incredibly unlikely, and questionable whether the client
241  * could do MDS recovery under OOM anyways... */
242 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
243                                 struct mdt_body *body)
244 {
245         int     rc;
246
247         /* FIXME: remove this explicit offset. */
248         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
249                                         body->mbo_eadatasize);
250         if (rc) {
251                 CERROR("Can't enlarge segment %d size to %d\n",
252                        DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
253                 body->mbo_valid &= ~OBD_MD_FLEASIZE;
254                 body->mbo_eadatasize = 0;
255         }
256 }
257
258 static struct ptlrpc_request *
259 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
260                      struct md_op_data *op_data)
261 {
262         struct ptlrpc_request   *req;
263         struct obd_device       *obddev = class_exp2obd(exp);
264         struct ldlm_intent      *lit;
265         const void              *lmm = op_data->op_data;
266         __u32                    lmmsize = op_data->op_data_size;
267         struct list_head         cancels = LIST_HEAD_INIT(cancels);
268         int                      count = 0;
269         enum ldlm_mode           mode;
270         int                      rc;
271         ENTRY;
272
273         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
274
275         /* XXX: openlock is not cancelled for cross-refs. */
276         /* If inode is known, cancel conflicting OPEN locks. */
277         if (fid_is_sane(&op_data->op_fid2)) {
278                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
279                         if (it->it_flags & FMODE_WRITE)
280                                 mode = LCK_EX;
281                         else
282                                 mode = LCK_PR;
283                 } else {
284                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
285                                 mode = LCK_CW;
286 #ifdef FMODE_EXEC
287                         else if (it->it_flags & FMODE_EXEC)
288                                 mode = LCK_PR;
289 #endif
290                         else
291                                 mode = LCK_CR;
292                 }
293                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
294                                                 &cancels, mode,
295                                                 MDS_INODELOCK_OPEN);
296         }
297
298         /* If CREATE, cancel parent's UPDATE lock. */
299         if (it->it_op & IT_CREAT)
300                 mode = LCK_EX;
301         else
302                 mode = LCK_CR;
303         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
304                                          &cancels, mode,
305                                          MDS_INODELOCK_UPDATE);
306
307         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
308                                    &RQF_LDLM_INTENT_OPEN);
309         if (req == NULL) {
310                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
311                 RETURN(ERR_PTR(-ENOMEM));
312         }
313
314         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
315                              op_data->op_namelen + 1);
316         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
317                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
318
319         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
320         if (rc < 0) {
321                 ptlrpc_request_free(req);
322                 RETURN(ERR_PTR(rc));
323         }
324
325         spin_lock(&req->rq_lock);
326         req->rq_replay = req->rq_import->imp_replayable;
327         spin_unlock(&req->rq_lock);
328
329         /* pack the intent */
330         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
331         lit->opc = (__u64)it->it_op;
332
333         /* pack the intended request */
334         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
335                       lmmsize);
336
337         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
338                              obddev->u.cli.cl_max_mds_easize);
339
340         /* for remote client, fetch remote perm for current user */
341         if (client_is_remote(exp))
342                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
343                                      sizeof(struct mdt_remote_perm));
344         ptlrpc_request_set_replen(req);
345         return req;
346 }
347
348 static struct ptlrpc_request *
349 mdc_intent_getxattr_pack(struct obd_export *exp,
350                          struct lookup_intent *it,
351                          struct md_op_data *op_data)
352 {
353         struct ptlrpc_request   *req;
354         struct ldlm_intent      *lit;
355         int                     rc, count = 0;
356         __u32                   maxdata;
357         struct list_head        cancels = LIST_HEAD_INIT(cancels);
358
359         ENTRY;
360
361         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
362                                         &RQF_LDLM_INTENT_GETXATTR);
363         if (req == NULL)
364                 RETURN(ERR_PTR(-ENOMEM));
365
366         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
367         if (rc) {
368                 ptlrpc_request_free(req);
369                 RETURN(ERR_PTR(rc));
370         }
371
372         /* pack the intent */
373         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
374         lit->opc = IT_GETXATTR;
375
376         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
377
378         /* pack the intended request */
379         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
380                       0);
381
382         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
383                                 RCL_SERVER, maxdata);
384
385         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
386                                 RCL_SERVER, maxdata);
387
388         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
389                                 RCL_SERVER, maxdata);
390
391         ptlrpc_request_set_replen(req);
392
393         RETURN(req);
394 }
395
396 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
397                                                      struct lookup_intent *it,
398                                                      struct md_op_data *op_data)
399 {
400         struct ptlrpc_request *req;
401         struct obd_device     *obddev = class_exp2obd(exp);
402         struct ldlm_intent    *lit;
403         int                    rc;
404         ENTRY;
405
406         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
407                                    &RQF_LDLM_INTENT_UNLINK);
408         if (req == NULL)
409                 RETURN(ERR_PTR(-ENOMEM));
410
411         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
412                              op_data->op_namelen + 1);
413
414         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
415         if (rc) {
416                 ptlrpc_request_free(req);
417                 RETURN(ERR_PTR(rc));
418         }
419
420         /* pack the intent */
421         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
422         lit->opc = (__u64)it->it_op;
423
424         /* pack the intended request */
425         mdc_unlink_pack(req, op_data);
426
427         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
428                              obddev->u.cli.cl_default_mds_easize);
429         ptlrpc_request_set_replen(req);
430         RETURN(req);
431 }
432
433 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
434                                                       struct lookup_intent *it,
435                                                       struct md_op_data *op_data)
436 {
437         struct ptlrpc_request   *req;
438         struct obd_device       *obddev = class_exp2obd(exp);
439         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
440                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
441                                          OBD_MD_MEA |
442                                          (client_is_remote(exp) ?
443                                           OBD_MD_FLRMTPERM : OBD_MD_FLACL);
444         struct ldlm_intent      *lit;
445         int                      rc;
446         __u32                    easize;
447         ENTRY;
448
449         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
450                                    &RQF_LDLM_INTENT_GETATTR);
451         if (req == NULL)
452                 RETURN(ERR_PTR(-ENOMEM));
453
454         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
455                              op_data->op_namelen + 1);
456
457         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
458         if (rc) {
459                 ptlrpc_request_free(req);
460                 RETURN(ERR_PTR(rc));
461         }
462
463         /* pack the intent */
464         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
465         lit->opc = (__u64)it->it_op;
466
467         if (obddev->u.cli.cl_default_mds_easize > 0)
468                 easize = obddev->u.cli.cl_default_mds_easize;
469         else
470                 easize = obddev->u.cli.cl_max_mds_easize;
471
472         /* pack the intended request */
473         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
474
475         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
476         if (client_is_remote(exp))
477                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
478                                      sizeof(struct mdt_remote_perm));
479         ptlrpc_request_set_replen(req);
480         RETURN(req);
481 }
482
483 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
484                                                      struct lookup_intent *it,
485                                                      struct md_op_data *unused)
486 {
487         struct obd_device     *obd = class_exp2obd(exp);
488         struct ptlrpc_request *req;
489         struct ldlm_intent    *lit;
490         struct layout_intent  *layout;
491         int rc;
492         ENTRY;
493
494         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
495                                 &RQF_LDLM_INTENT_LAYOUT);
496         if (req == NULL)
497                 RETURN(ERR_PTR(-ENOMEM));
498
499         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
500         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
501         if (rc) {
502                 ptlrpc_request_free(req);
503                 RETURN(ERR_PTR(rc));
504         }
505
506         /* pack the intent */
507         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
508         lit->opc = (__u64)it->it_op;
509
510         /* pack the layout intent request */
511         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
512         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
513          * set for replication */
514         layout->li_opc = LAYOUT_INTENT_ACCESS;
515
516         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
517                              obd->u.cli.cl_default_mds_easize);
518         ptlrpc_request_set_replen(req);
519         RETURN(req);
520 }
521
522 static struct ptlrpc_request *
523 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
524 {
525         struct ptlrpc_request *req;
526         int rc;
527         ENTRY;
528
529         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
530         if (req == NULL)
531                 RETURN(ERR_PTR(-ENOMEM));
532
533         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
534         if (rc) {
535                 ptlrpc_request_free(req);
536                 RETURN(ERR_PTR(rc));
537         }
538
539         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
540         ptlrpc_request_set_replen(req);
541         RETURN(req);
542 }
543
544 static int mdc_finish_enqueue(struct obd_export *exp,
545                               struct ptlrpc_request *req,
546                               struct ldlm_enqueue_info *einfo,
547                               struct lookup_intent *it,
548                               struct lustre_handle *lockh,
549                               int rc)
550 {
551         struct req_capsule  *pill = &req->rq_pill;
552         struct ldlm_request *lockreq;
553         struct ldlm_reply   *lockrep;
554         struct lustre_intent_data *intent = &it->d.lustre;
555         struct ldlm_lock    *lock;
556         void                *lvb_data = NULL;
557         __u32                lvb_len = 0;
558         ENTRY;
559
560         LASSERT(rc >= 0);
561         /* Similarly, if we're going to replay this request, we don't want to
562          * actually get a lock, just perform the intent. */
563         if (req->rq_transno || req->rq_replay) {
564                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
565                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
566         }
567
568         if (rc == ELDLM_LOCK_ABORTED) {
569                 einfo->ei_mode = 0;
570                 memset(lockh, 0, sizeof(*lockh));
571                 rc = 0;
572         } else { /* rc = 0 */
573                 lock = ldlm_handle2lock(lockh);
574                 LASSERT(lock != NULL);
575
576                 /* If the server gave us back a different lock mode, we should
577                  * fix up our variables. */
578                 if (lock->l_req_mode != einfo->ei_mode) {
579                         ldlm_lock_addref(lockh, lock->l_req_mode);
580                         ldlm_lock_decref(lockh, einfo->ei_mode);
581                         einfo->ei_mode = lock->l_req_mode;
582                 }
583                 LDLM_LOCK_PUT(lock);
584         }
585
586         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
587         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
588
589         intent->it_disposition = (int)lockrep->lock_policy_res1;
590         intent->it_status = (int)lockrep->lock_policy_res2;
591         intent->it_lock_mode = einfo->ei_mode;
592         intent->it_lock_handle = lockh->cookie;
593         intent->it_data = req;
594
595         /* Technically speaking rq_transno must already be zero if
596          * it_status is in error, so the check is a bit redundant */
597         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
598                 mdc_clear_replay_flag(req, intent->it_status);
599
600         /* If we're doing an IT_OPEN which did not result in an actual
601          * successful open, then we need to remove the bit which saves
602          * this request for unconditional replay.
603          *
604          * It's important that we do this first!  Otherwise we might exit the
605          * function without doing so, and try to replay a failed create
606          * (bug 3440) */
607         if (it->it_op & IT_OPEN && req->rq_replay &&
608             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
609                 mdc_clear_replay_flag(req, intent->it_status);
610
611         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
612                   it->it_op, intent->it_disposition, intent->it_status);
613
614         /* We know what to expect, so we do any byte flipping required here */
615         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
616                 struct mdt_body *body;
617
618                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
619                 if (body == NULL) {
620                         CERROR ("Can't swab mdt_body\n");
621                         RETURN (-EPROTO);
622                 }
623
624                 if (it_disposition(it, DISP_OPEN_OPEN) &&
625                     !it_open_error(DISP_OPEN_OPEN, it)) {
626                         /*
627                          * If this is a successful OPEN request, we need to set
628                          * replay handler and data early, so that if replay
629                          * happens immediately after swabbing below, new reply
630                          * is swabbed by that handler correctly.
631                          */
632                         mdc_set_open_replay_data(NULL, NULL, it);
633                 }
634
635                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
636                         void *eadata;
637
638                         mdc_update_max_ea_from_body(exp, body);
639
640                         /*
641                          * The eadata is opaque; just check that it is there.
642                          * Eventually, obd_unpackmd() will check the contents.
643                          */
644                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
645                                                         body->mbo_eadatasize);
646                         if (eadata == NULL)
647                                 RETURN(-EPROTO);
648
649                         /* save lvb data and length in case this is for layout
650                          * lock */
651                         lvb_data = eadata;
652                         lvb_len = body->mbo_eadatasize;
653
654                         /*
655                          * We save the reply LOV EA in case we have to replay a
656                          * create for recovery.  If we didn't allocate a large
657                          * enough request buffer above we need to reallocate it
658                          * here to hold the actual LOV EA.
659                          *
660                          * To not save LOV EA if request is not going to replay
661                          * (for example error one).
662                          */
663                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
664                                 void *lmm;
665                                 if (req_capsule_get_size(pill, &RMF_EADATA,
666                                                          RCL_CLIENT) <
667                                     body->mbo_eadatasize)
668                                         mdc_realloc_openmsg(req, body);
669                                 else
670                                         req_capsule_shrink(pill, &RMF_EADATA,
671                                                            body->mbo_eadatasize,
672                                                            RCL_CLIENT);
673
674                                 req_capsule_set_size(pill, &RMF_EADATA,
675                                                      RCL_CLIENT,
676                                                      body->mbo_eadatasize);
677
678                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
679                                 if (lmm)
680                                         memcpy(lmm, eadata,
681                                                body->mbo_eadatasize);
682                         }
683                 }
684
685                 if (body->mbo_valid & OBD_MD_FLRMTPERM) {
686                         struct mdt_remote_perm *perm;
687
688                         LASSERT(client_is_remote(exp));
689                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
690                                                 lustre_swab_mdt_remote_perm);
691                         if (perm == NULL)
692                                 RETURN(-EPROTO);
693                 }
694         } else if (it->it_op & IT_LAYOUT) {
695                 /* maybe the lock was granted right away and layout
696                  * is packed into RMF_DLM_LVB of req */
697                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
698                 if (lvb_len > 0) {
699                         lvb_data = req_capsule_server_sized_get(pill,
700                                                         &RMF_DLM_LVB, lvb_len);
701                         if (lvb_data == NULL)
702                                 RETURN(-EPROTO);
703                 }
704         }
705
706         /* fill in stripe data for layout lock.
707          * LU-6581: trust layout data only if layout lock is granted. The MDT
708          * has stopped sending layout unless the layout lock is granted. The
709          * client still does this checking in case it's talking with an old
710          * server. - Jinshan */
711         lock = ldlm_handle2lock(lockh);
712         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
713             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
714                 void *lmm;
715
716                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
717                         ldlm_it2str(it->it_op), lvb_len);
718
719                 OBD_ALLOC_LARGE(lmm, lvb_len);
720                 if (lmm == NULL) {
721                         LDLM_LOCK_PUT(lock);
722                         RETURN(-ENOMEM);
723                 }
724                 memcpy(lmm, lvb_data, lvb_len);
725
726                 /* install lvb_data */
727                 lock_res_and_lock(lock);
728                 if (lock->l_lvb_data == NULL) {
729                         lock->l_lvb_type = LVB_T_LAYOUT;
730                         lock->l_lvb_data = lmm;
731                         lock->l_lvb_len = lvb_len;
732                         lmm = NULL;
733                 }
734                 unlock_res_and_lock(lock);
735                 if (lmm != NULL)
736                         OBD_FREE_LARGE(lmm, lvb_len);
737         }
738         if (lock != NULL)
739                 LDLM_LOCK_PUT(lock);
740
741         RETURN(rc);
742 }
743
744 /* We always reserve enough space in the reply packet for a stripe MD, because
745  * we don't know in advance the file type. */
746 int mdc_enqueue(struct obd_export *exp,
747                 struct ldlm_enqueue_info *einfo,
748                 const union ldlm_policy_data *policy,
749                 struct lookup_intent *it, struct md_op_data *op_data,
750                 struct lustre_handle *lockh, __u64 extra_lock_flags)
751 {
752         struct obd_device *obddev = class_exp2obd(exp);
753         struct ptlrpc_request *req = NULL;
754         __u64 flags, saved_flags = extra_lock_flags;
755         struct ldlm_res_id res_id;
756         static const union ldlm_policy_data lookup_policy = {
757                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
758         static const union ldlm_policy_data update_policy = {
759                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
760         static const union ldlm_policy_data layout_policy = {
761                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
762         static const union ldlm_policy_data getxattr_policy = {
763                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
764         int generation, resends = 0;
765         struct ldlm_reply *lockrep;
766         enum lvb_type lvb_type = 0;
767         int rc;
768         ENTRY;
769
770         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
771                  einfo->ei_type);
772         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
773
774         if (it != NULL) {
775                 LASSERT(policy == NULL);
776
777                 saved_flags |= LDLM_FL_HAS_INTENT;
778                 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
779                         policy = &update_policy;
780                 else if (it->it_op & IT_LAYOUT)
781                         policy = &layout_policy;
782                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
783                         policy = &getxattr_policy;
784                 else
785                         policy = &lookup_policy;
786         }
787
788         generation = obddev->u.cli.cl_import->imp_generation;
789 resend:
790         flags = saved_flags;
791         if (it == NULL) {
792                 /* The only way right now is FLOCK. */
793                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
794                          einfo->ei_type);
795                 res_id.name[3] = LDLM_FLOCK;
796         } else if (it->it_op & IT_OPEN) {
797                 req = mdc_intent_open_pack(exp, it, op_data);
798         } else if (it->it_op & IT_UNLINK) {
799                 req = mdc_intent_unlink_pack(exp, it, op_data);
800         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
801                 req = mdc_intent_getattr_pack(exp, it, op_data);
802         } else if (it->it_op & IT_READDIR) {
803                 req = mdc_enqueue_pack(exp, 0);
804         } else if (it->it_op & IT_LAYOUT) {
805                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
806                         RETURN(-EOPNOTSUPP);
807                 req = mdc_intent_layout_pack(exp, it, op_data);
808                 lvb_type = LVB_T_LAYOUT;
809         } else if (it->it_op & IT_GETXATTR) {
810                 req = mdc_intent_getxattr_pack(exp, it, op_data);
811         } else {
812                 LBUG();
813                 RETURN(-EINVAL);
814         }
815
816         if (IS_ERR(req))
817                 RETURN(PTR_ERR(req));
818
819         if (resends) {
820                 req->rq_generation_set = 1;
821                 req->rq_import_generation = generation;
822                 req->rq_sent = cfs_time_current_sec() + resends;
823         }
824
825         /* It is important to obtain modify RPC slot first (if applicable), so
826          * that threads that are waiting for a modify RPC slot are not polluting
827          * our rpcs in flight counter.
828          * We do not do flock request limiting, though */
829         if (it) {
830                 mdc_get_mod_rpc_slot(req, it);
831                 rc = obd_get_request_slot(&obddev->u.cli);
832                 if (rc != 0) {
833                         mdc_put_mod_rpc_slot(req, it);
834                         mdc_clear_replay_flag(req, 0);
835                         ptlrpc_req_finished(req);
836                         RETURN(rc);
837                 }
838         }
839
840         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
841                               0, lvb_type, lockh, 0);
842         if (!it) {
843                 /* For flock requests we immediatelly return without further
844                    delay and let caller deal with the rest, since rest of
845                    this function metadata processing makes no sense for flock
846                    requests anyway. But in case of problem during comms with
847                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
848                    can not rely on caller and this mainly for F_UNLCKs
849                    (explicits or automatically generated by Kernel to clean
850                    current FLocks upon exit) that can't be trashed */
851                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
852                     (einfo->ei_type == LDLM_FLOCK) &&
853                     (einfo->ei_mode == LCK_NL))
854                         goto resend;
855                 RETURN(rc);
856         }
857
858         obd_put_request_slot(&obddev->u.cli);
859         mdc_put_mod_rpc_slot(req, it);
860
861         if (rc < 0) {
862                 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
863                        obddev->obd_name, rc);
864
865                 mdc_clear_replay_flag(req, rc);
866                 ptlrpc_req_finished(req);
867                 RETURN(rc);
868         }
869
870         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
871         LASSERT(lockrep != NULL);
872
873         lockrep->lock_policy_res2 =
874                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
875
876         /* Retry infinitely when the server returns -EINPROGRESS for the
877          * intent operation, when server returns -EINPROGRESS for acquiring
878          * intent lock, we'll retry in after_reply(). */
879         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
880                 mdc_clear_replay_flag(req, rc);
881                 ptlrpc_req_finished(req);
882                 resends++;
883
884                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
885                        obddev->obd_name, resends, it->it_op,
886                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
887
888                 if (generation == obddev->u.cli.cl_import->imp_generation) {
889                         goto resend;
890                 } else {
891                         CDEBUG(D_HA, "resend cross eviction\n");
892                         RETURN(-EIO);
893                 }
894         }
895
896         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
897         if (rc < 0) {
898                 if (lustre_handle_is_used(lockh)) {
899                         ldlm_lock_decref(lockh, einfo->ei_mode);
900                         memset(lockh, 0, sizeof(*lockh));
901                 }
902                 ptlrpc_req_finished(req);
903
904                 it->d.lustre.it_lock_handle = 0;
905                 it->d.lustre.it_lock_mode = 0;
906                 it->d.lustre.it_data = NULL;
907         }
908
909         RETURN(rc);
910 }
911
912 static int mdc_finish_intent_lock(struct obd_export *exp,
913                                   struct ptlrpc_request *request,
914                                   struct md_op_data *op_data,
915                                   struct lookup_intent *it,
916                                   struct lustre_handle *lockh)
917 {
918         struct lustre_handle old_lock;
919         struct mdt_body *mdt_body;
920         struct ldlm_lock *lock;
921         int rc;
922         ENTRY;
923
924         LASSERT(request != NULL);
925         LASSERT(request != LP_POISON);
926         LASSERT(request->rq_repmsg != LP_POISON);
927
928         if (it->it_op & IT_READDIR)
929                 RETURN(0);
930
931         if (!it_disposition(it, DISP_IT_EXECD)) {
932                 /* The server failed before it even started executing the
933                  * intent, i.e. because it couldn't unpack the request. */
934                 LASSERT(it->d.lustre.it_status != 0);
935                 RETURN(it->d.lustre.it_status);
936         }
937         rc = it_open_error(DISP_IT_EXECD, it);
938         if (rc)
939                 RETURN(rc);
940
941         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
942         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
943
944         rc = it_open_error(DISP_LOOKUP_EXECD, it);
945         if (rc)
946                 RETURN(rc);
947
948         /* keep requests around for the multiple phases of the call
949          * this shows the DISP_XX must guarantee we make it into the call
950          */
951         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
952             it_disposition(it, DISP_OPEN_CREATE) &&
953             !it_open_error(DISP_OPEN_CREATE, it)) {
954                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
955                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
956         }
957         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
958             it_disposition(it, DISP_OPEN_OPEN) &&
959             !it_open_error(DISP_OPEN_OPEN, it)) {
960                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
961                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
962                 /* BUG 11546 - eviction in the middle of open rpc processing */
963                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
964         }
965
966         if (it->it_op & IT_CREAT) {
967                 /* XXX this belongs in ll_create_it */
968         } else if (it->it_op == IT_OPEN) {
969                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
970         } else {
971                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
972         }
973
974         /* If we already have a matching lock, then cancel the new
975          * one.  We have to set the data here instead of in
976          * mdc_enqueue, because we need to use the child's inode as
977          * the l_ast_data to match, and that's not available until
978          * intent_finish has performed the iget().) */
979         lock = ldlm_handle2lock(lockh);
980         if (lock) {
981                 union ldlm_policy_data policy = lock->l_policy_data;
982                 LDLM_DEBUG(lock, "matching against this");
983
984                 LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
985                                          &lock->l_resource->lr_name),
986                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
987                          PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
988                 LDLM_LOCK_PUT(lock);
989
990                 memcpy(&old_lock, lockh, sizeof(*lockh));
991                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
992                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
993                         ldlm_lock_decref_and_cancel(lockh,
994                                                     it->d.lustre.it_lock_mode);
995                         memcpy(lockh, &old_lock, sizeof(old_lock));
996                         it->d.lustre.it_lock_handle = lockh->cookie;
997                 }
998         }
999         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1000                 (int)op_data->op_namelen, op_data->op_name,
1001                 ldlm_it2str(it->it_op), it->d.lustre.it_status,
1002                 it->d.lustre.it_disposition, rc);
1003         RETURN(rc);
1004 }
1005
1006 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1007                         struct lu_fid *fid, __u64 *bits)
1008 {
1009         /* We could just return 1 immediately, but since we should only
1010          * be called in revalidate_it if we already have a lock, let's
1011          * verify that. */
1012         struct ldlm_res_id res_id;
1013         struct lustre_handle lockh;
1014         union ldlm_policy_data policy;
1015         enum ldlm_mode mode;
1016         ENTRY;
1017
1018         if (it->d.lustre.it_lock_handle) {
1019                 lockh.cookie = it->d.lustre.it_lock_handle;
1020                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1021         } else {
1022                 fid_build_reg_res_name(fid, &res_id);
1023                 switch (it->it_op) {
1024                 case IT_GETATTR:
1025                         /* File attributes are held under multiple bits:
1026                          * nlink is under lookup lock, size and times are
1027                          * under UPDATE lock and recently we've also got
1028                          * a separate permissions lock for owner/group/acl that
1029                          * were protected by lookup lock before.
1030                          * Getattr must provide all of that information,
1031                          * so we need to ensure we have all of those locks.
1032                          * Unfortunately, if the bits are split across multiple
1033                          * locks, there's no easy way to match all of them here,
1034                          * so an extra RPC would be performed to fetch all
1035                          * of those bits at once for now. */
1036                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1037                          * but for old MDTs (< 2.4), permission is covered
1038                          * by LOOKUP lock, so it needs to match all bits here.*/
1039                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1040                                                   MDS_INODELOCK_LOOKUP |
1041                                                   MDS_INODELOCK_PERM;
1042                         break;
1043                 case IT_READDIR:
1044                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1045                         break;
1046                 case IT_LAYOUT:
1047                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1048                         break;
1049                 default:
1050                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1051                         break;
1052                 }
1053
1054                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1055                                       LDLM_IBITS, &policy,
1056                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1057                                       &lockh);
1058         }
1059
1060         if (mode) {
1061                 it->d.lustre.it_lock_handle = lockh.cookie;
1062                 it->d.lustre.it_lock_mode = mode;
1063         } else {
1064                 it->d.lustre.it_lock_handle = 0;
1065                 it->d.lustre.it_lock_mode = 0;
1066         }
1067
1068         RETURN(!!mode);
1069 }
1070
1071 /*
1072  * This long block is all about fixing up the lock and request state
1073  * so that it is correct as of the moment _before_ the operation was
1074  * applied; that way, the VFS will think that everything is normal and
1075  * call Lustre's regular VFS methods.
1076  *
1077  * If we're performing a creation, that means that unless the creation
1078  * failed with EEXIST, we should fake up a negative dentry.
1079  *
1080  * For everything else, we want to lookup to succeed.
1081  *
1082  * One additional note: if CREATE or OPEN succeeded, we add an extra
1083  * reference to the request because we need to keep it around until
1084  * ll_create/ll_open gets called.
1085  *
1086  * The server will return to us, in it_disposition, an indication of
1087  * exactly what d.lustre.it_status refers to.
1088  *
1089  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1090  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1091  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1092  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1093  * was successful.
1094  *
1095  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1096  * child lookup.
1097  */
1098 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1099                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1100                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1101 {
1102         struct ldlm_enqueue_info einfo = {
1103                 .ei_type        = LDLM_IBITS,
1104                 .ei_mode        = it_to_lock_mode(it),
1105                 .ei_cb_bl       = cb_blocking,
1106                 .ei_cb_cp       = ldlm_completion_ast,
1107         };
1108         struct lustre_handle lockh;
1109         int rc = 0;
1110         ENTRY;
1111         LASSERT(it);
1112
1113         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1114                 ", intent: %s flags %#"LPF64"o\n", (int)op_data->op_namelen,
1115                 op_data->op_name, PFID(&op_data->op_fid2),
1116                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1117                 it->it_flags);
1118
1119         lockh.cookie = 0;
1120         if (fid_is_sane(&op_data->op_fid2) &&
1121             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1122                 /* We could just return 1 immediately, but since we should only
1123                  * be called in revalidate_it if we already have a lock, let's
1124                  * verify that. */
1125                 it->d.lustre.it_lock_handle = 0;
1126                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1127                 /* Only return failure if it was not GETATTR by cfid
1128                    (from inode_revalidate) */
1129                 if (rc || op_data->op_namelen != 0)
1130                         RETURN(rc);
1131         }
1132
1133         /* For case if upper layer did not alloc fid, do it now. */
1134         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1135                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1136                 if (rc < 0) {
1137                         CERROR("Can't alloc new fid, rc %d\n", rc);
1138                         RETURN(rc);
1139                 }
1140         }
1141
1142         rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
1143                          extra_lock_flags);
1144         if (rc < 0)
1145                 RETURN(rc);
1146
1147         *reqp = it->d.lustre.it_data;
1148         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1149         RETURN(rc);
1150 }
1151
1152 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1153                                               struct ptlrpc_request *req,
1154                                               void *args, int rc)
1155 {
1156         struct mdc_getattr_args  *ga = args;
1157         struct obd_export        *exp = ga->ga_exp;
1158         struct md_enqueue_info   *minfo = ga->ga_minfo;
1159         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1160         struct lookup_intent     *it;
1161         struct lustre_handle     *lockh;
1162         struct obd_device        *obddev;
1163         struct ldlm_reply        *lockrep;
1164         __u64                     flags = LDLM_FL_HAS_INTENT;
1165         ENTRY;
1166
1167         it    = &minfo->mi_it;
1168         lockh = &minfo->mi_lockh;
1169
1170         obddev = class_exp2obd(exp);
1171
1172         obd_put_request_slot(&obddev->u.cli);
1173         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1174                 rc = -ETIMEDOUT;
1175
1176         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1177                                    &flags, NULL, 0, lockh, rc);
1178         if (rc < 0) {
1179                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1180                 mdc_clear_replay_flag(req, rc);
1181                 GOTO(out, rc);
1182         }
1183
1184         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1185         LASSERT(lockrep != NULL);
1186
1187         lockrep->lock_policy_res2 =
1188                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1189
1190         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1191         if (rc)
1192                 GOTO(out, rc);
1193
1194         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1195         EXIT;
1196
1197 out:
1198         minfo->mi_cb(req, minfo, rc);
1199         return 0;
1200 }
1201
1202 int mdc_intent_getattr_async(struct obd_export *exp,
1203                              struct md_enqueue_info *minfo)
1204 {
1205         struct md_op_data       *op_data = &minfo->mi_data;
1206         struct lookup_intent    *it = &minfo->mi_it;
1207         struct ptlrpc_request   *req;
1208         struct mdc_getattr_args *ga;
1209         struct obd_device       *obddev = class_exp2obd(exp);
1210         struct ldlm_res_id       res_id;
1211         union ldlm_policy_data policy = {
1212                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1213                                                  MDS_INODELOCK_UPDATE } };
1214         int                      rc = 0;
1215         __u64                    flags = LDLM_FL_HAS_INTENT;
1216         ENTRY;
1217
1218         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#"
1219                 LPF64"o\n",
1220                 (int)op_data->op_namelen, op_data->op_name,
1221                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1222
1223         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1224         req = mdc_intent_getattr_pack(exp, it, op_data);
1225         if (IS_ERR(req))
1226                 RETURN(PTR_ERR(req));
1227
1228         rc = obd_get_request_slot(&obddev->u.cli);
1229         if (rc != 0) {
1230                 ptlrpc_req_finished(req);
1231                 RETURN(rc);
1232         }
1233
1234         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1235                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1236         if (rc < 0) {
1237                 obd_put_request_slot(&obddev->u.cli);
1238                 ptlrpc_req_finished(req);
1239                 RETURN(rc);
1240         }
1241
1242         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1243         ga = ptlrpc_req_async_args(req);
1244         ga->ga_exp = exp;
1245         ga->ga_minfo = minfo;
1246
1247         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1248         ptlrpcd_add_req(req);
1249
1250         RETURN(0);
1251 }