Whamcloud - gitweb
LU-6142 ldlm: remove ldlm typedef usage from code
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #include <linux/module.h>
40 #include <obd.h>
41 #include <obd_class.h>
42 #include <lustre_dlm.h>
43 #include <lustre_fid.h> /* fid_res_name_eq() */
44 #include <lustre_intent.h>
45 #include <lustre_mdc.h>
46 #include <lustre_net.h>
47 #include <lustre_req_layout.h>
48 #include "mdc_internal.h"
49
/*
 * Three pointers bundled together for a getattr operation: the export,
 * the metadata enqueue info and the LDLM enqueue info.
 * NOTE(review): the consumer of this structure is outside this excerpt;
 * presumably it is the argument block of an asynchronous intent-getattr
 * completion callback -- confirm against the user.
 */
struct mdc_getattr_args {
	struct obd_export		*ga_exp;
	struct md_enqueue_info		*ga_minfo;
	struct ldlm_enqueue_info	*ga_einfo;
};
55
56 int it_open_error(int phase, struct lookup_intent *it)
57 {
58         if (it_disposition(it, DISP_OPEN_LEASE)) {
59                 if (phase >= DISP_OPEN_LEASE)
60                         return it->d.lustre.it_status;
61                 else
62                         return 0;
63         }
64         if (it_disposition(it, DISP_OPEN_OPEN)) {
65                 if (phase >= DISP_OPEN_OPEN)
66                         return it->d.lustre.it_status;
67                 else
68                         return 0;
69         }
70
71         if (it_disposition(it, DISP_OPEN_CREATE)) {
72                 if (phase >= DISP_OPEN_CREATE)
73                         return it->d.lustre.it_status;
74                 else
75                         return 0;
76         }
77
78         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
79                 if (phase >= DISP_LOOKUP_EXECD)
80                         return it->d.lustre.it_status;
81                 else
82                         return 0;
83         }
84
85         if (it_disposition(it, DISP_IT_EXECD)) {
86                 if (phase >= DISP_IT_EXECD)
87                         return it->d.lustre.it_status;
88                 else
89                         return 0;
90         }
91         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
92                it->d.lustre.it_status);
93         LBUG();
94         return 0;
95 }
96 EXPORT_SYMBOL(it_open_error);
97
98 /* this must be called on a lockh that is known to have a referenced lock */
99 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
100                       __u64 *bits)
101 {
102         struct ldlm_lock *lock;
103         struct inode *new_inode = data;
104         ENTRY;
105
106         if(bits)
107                 *bits = 0;
108
109         if (!*lockh)
110                 RETURN(0);
111
112         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
113
114         LASSERT(lock != NULL);
115         lock_res_and_lock(lock);
116         if (lock->l_resource->lr_lvb_inode &&
117             lock->l_resource->lr_lvb_inode != data) {
118                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119                 LASSERTF(old_inode->i_state & I_FREEING,
120                          "Found existing inode %p/%lu/%u state %lu in lock: "
121                          "setting data to %p/%lu/%u\n", old_inode,
122                          old_inode->i_ino, old_inode->i_generation,
123                          old_inode->i_state,
124                          new_inode, new_inode->i_ino, new_inode->i_generation);
125         }
126         lock->l_resource->lr_lvb_inode = new_inode;
127         if (bits)
128                 *bits = lock->l_policy_data.l_inodebits.bits;
129
130         unlock_res_and_lock(lock);
131         LDLM_LOCK_PUT(lock);
132
133         RETURN(0);
134 }
135
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137                               const struct lu_fid *fid, enum ldlm_type type,
138                               union ldlm_policy_data *policy,
139                               enum ldlm_mode mode, struct lustre_handle *lockh)
140 {
141         struct ldlm_res_id res_id;
142         enum ldlm_mode rc;
143         ENTRY;
144
145         fid_build_reg_res_name(fid, &res_id);
146         /* LU-4405: Clear bits not supported by server */
147         policy->l_inodebits.bits &= exp_connect_ibits(exp);
148         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149                              &res_id, type, policy, mode, lockh, 0);
150         RETURN(rc);
151 }
152
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154                       union ldlm_policy_data *policy, enum ldlm_mode mode,
155                       enum ldlm_cancel_flags flags, void *opaque)
156 {
157         struct obd_device *obd = class_exp2obd(exp);
158         struct ldlm_res_id res_id;
159         int rc;
160
161         ENTRY;
162
163         fid_build_reg_res_name(fid, &res_id);
164         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165                                              policy, mode, flags, opaque);
166         RETURN(rc);
167 }
168
169 int mdc_null_inode(struct obd_export *exp,
170                    const struct lu_fid *fid)
171 {
172         struct ldlm_res_id res_id;
173         struct ldlm_resource *res;
174         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
175         ENTRY;
176
177         LASSERTF(ns != NULL, "no namespace passed\n");
178
179         fid_build_reg_res_name(fid, &res_id);
180
181         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
182         if (IS_ERR(res))
183                 RETURN(0);
184
185         lock_res(res);
186         res->lr_lvb_inode = NULL;
187         unlock_res(res);
188
189         ldlm_resource_putref(res);
190         RETURN(0);
191 }
192
193 /* find any ldlm lock of the inode in mdc
194  * return 0    not find
195  *        1    find one
196  *      < 0    error */
197 int mdc_find_cbdata(struct obd_export *exp,
198                     const struct lu_fid *fid,
199                     ldlm_iterator_t it, void *data)
200 {
201         struct ldlm_res_id res_id;
202         int rc = 0;
203         ENTRY;
204
205         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
206         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
207                                    it, data);
208         if (rc == LDLM_ITER_STOP)
209                 RETURN(1);
210         else if (rc == LDLM_ITER_CONTINUE)
211                 RETURN(0);
212         RETURN(rc);
213 }
214
215 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
216 {
217         /* Don't hold error requests for replay. */
218         if (req->rq_replay) {
219                 spin_lock(&req->rq_lock);
220                 req->rq_replay = 0;
221                 spin_unlock(&req->rq_lock);
222         }
223         if (rc && req->rq_transno != 0) {
224                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
225                 LBUG();
226         }
227 }
228
229 /* Save a large LOV EA into the request buffer so that it is available
230  * for replay.  We don't do this in the initial request because the
231  * original request doesn't need this buffer (at most it sends just the
232  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
233  * buffer and may also be difficult to allocate and save a very large
234  * request buffer for each open. (bug 5707)
235  *
236  * OOM here may cause recovery failure if lmm is needed (only for the
237  * original open if the MDS crashed just when this client also OOM'd)
238  * but this is incredibly unlikely, and questionable whether the client
239  * could do MDS recovery under OOM anyways... */
240 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
241                                 struct mdt_body *body)
242 {
243         int     rc;
244
245         /* FIXME: remove this explicit offset. */
246         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
247                                         body->mbo_eadatasize);
248         if (rc) {
249                 CERROR("Can't enlarge segment %d size to %d\n",
250                        DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
251                 body->mbo_valid &= ~OBD_MD_FLEASIZE;
252                 body->mbo_eadatasize = 0;
253         }
254 }
255
256 static struct ptlrpc_request *
257 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
258                      struct md_op_data *op_data)
259 {
260         struct ptlrpc_request   *req;
261         struct obd_device       *obddev = class_exp2obd(exp);
262         struct ldlm_intent      *lit;
263         const void              *lmm = op_data->op_data;
264         __u32                    lmmsize = op_data->op_data_size;
265         struct list_head         cancels = LIST_HEAD_INIT(cancels);
266         int                      count = 0;
267         enum ldlm_mode           mode;
268         int                      rc;
269         ENTRY;
270
271         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
272
273         /* XXX: openlock is not cancelled for cross-refs. */
274         /* If inode is known, cancel conflicting OPEN locks. */
275         if (fid_is_sane(&op_data->op_fid2)) {
276                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
277                         if (it->it_flags & FMODE_WRITE)
278                                 mode = LCK_EX;
279                         else
280                                 mode = LCK_PR;
281                 } else {
282                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
283                                 mode = LCK_CW;
284 #ifdef FMODE_EXEC
285                         else if (it->it_flags & FMODE_EXEC)
286                                 mode = LCK_PR;
287 #endif
288                         else
289                                 mode = LCK_CR;
290                 }
291                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
292                                                 &cancels, mode,
293                                                 MDS_INODELOCK_OPEN);
294         }
295
296         /* If CREATE, cancel parent's UPDATE lock. */
297         if (it->it_op & IT_CREAT)
298                 mode = LCK_EX;
299         else
300                 mode = LCK_CR;
301         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
302                                          &cancels, mode,
303                                          MDS_INODELOCK_UPDATE);
304
305         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
306                                    &RQF_LDLM_INTENT_OPEN);
307         if (req == NULL) {
308                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
309                 RETURN(ERR_PTR(-ENOMEM));
310         }
311
312         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
313                              op_data->op_namelen + 1);
314         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
315                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
316
317         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
318         if (rc < 0) {
319                 ptlrpc_request_free(req);
320                 RETURN(ERR_PTR(rc));
321         }
322
323         spin_lock(&req->rq_lock);
324         req->rq_replay = req->rq_import->imp_replayable;
325         spin_unlock(&req->rq_lock);
326
327         /* pack the intent */
328         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
329         lit->opc = (__u64)it->it_op;
330
331         /* pack the intended request */
332         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
333                       lmmsize);
334
335         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
336                              obddev->u.cli.cl_max_mds_easize);
337
338         /* for remote client, fetch remote perm for current user */
339         if (client_is_remote(exp))
340                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
341                                      sizeof(struct mdt_remote_perm));
342         ptlrpc_request_set_replen(req);
343         return req;
344 }
345
346 static struct ptlrpc_request *
347 mdc_intent_getxattr_pack(struct obd_export *exp,
348                          struct lookup_intent *it,
349                          struct md_op_data *op_data)
350 {
351         struct ptlrpc_request   *req;
352         struct ldlm_intent      *lit;
353         int                     rc, count = 0;
354         __u32                   maxdata;
355         struct list_head        cancels = LIST_HEAD_INIT(cancels);
356
357         ENTRY;
358
359         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
360                                         &RQF_LDLM_INTENT_GETXATTR);
361         if (req == NULL)
362                 RETURN(ERR_PTR(-ENOMEM));
363
364         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
365         if (rc) {
366                 ptlrpc_request_free(req);
367                 RETURN(ERR_PTR(rc));
368         }
369
370         /* pack the intent */
371         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
372         lit->opc = IT_GETXATTR;
373
374         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
375
376         /* pack the intended request */
377         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
378                       0);
379
380         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
381                                 RCL_SERVER, maxdata);
382
383         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
384                                 RCL_SERVER, maxdata);
385
386         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
387                                 RCL_SERVER, maxdata);
388
389         ptlrpc_request_set_replen(req);
390
391         RETURN(req);
392 }
393
394 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
395                                                      struct lookup_intent *it,
396                                                      struct md_op_data *op_data)
397 {
398         struct ptlrpc_request *req;
399         struct obd_device     *obddev = class_exp2obd(exp);
400         struct ldlm_intent    *lit;
401         int                    rc;
402         ENTRY;
403
404         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
405                                    &RQF_LDLM_INTENT_UNLINK);
406         if (req == NULL)
407                 RETURN(ERR_PTR(-ENOMEM));
408
409         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
410                              op_data->op_namelen + 1);
411
412         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
413         if (rc) {
414                 ptlrpc_request_free(req);
415                 RETURN(ERR_PTR(rc));
416         }
417
418         /* pack the intent */
419         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
420         lit->opc = (__u64)it->it_op;
421
422         /* pack the intended request */
423         mdc_unlink_pack(req, op_data);
424
425         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
426                              obddev->u.cli.cl_default_mds_easize);
427         ptlrpc_request_set_replen(req);
428         RETURN(req);
429 }
430
431 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
432                                                       struct lookup_intent *it,
433                                                       struct md_op_data *op_data)
434 {
435         struct ptlrpc_request   *req;
436         struct obd_device       *obddev = class_exp2obd(exp);
437         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
438                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
439                                          OBD_MD_MEA |
440                                          (client_is_remote(exp) ?
441                                           OBD_MD_FLRMTPERM : OBD_MD_FLACL);
442         struct ldlm_intent      *lit;
443         int                      rc;
444         __u32                    easize;
445         ENTRY;
446
447         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
448                                    &RQF_LDLM_INTENT_GETATTR);
449         if (req == NULL)
450                 RETURN(ERR_PTR(-ENOMEM));
451
452         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
453                              op_data->op_namelen + 1);
454
455         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
456         if (rc) {
457                 ptlrpc_request_free(req);
458                 RETURN(ERR_PTR(rc));
459         }
460
461         /* pack the intent */
462         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
463         lit->opc = (__u64)it->it_op;
464
465         if (obddev->u.cli.cl_default_mds_easize > 0)
466                 easize = obddev->u.cli.cl_default_mds_easize;
467         else
468                 easize = obddev->u.cli.cl_max_mds_easize;
469
470         /* pack the intended request */
471         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
472
473         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
474         if (client_is_remote(exp))
475                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
476                                      sizeof(struct mdt_remote_perm));
477         ptlrpc_request_set_replen(req);
478         RETURN(req);
479 }
480
481 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
482                                                      struct lookup_intent *it,
483                                                      struct md_op_data *unused)
484 {
485         struct obd_device     *obd = class_exp2obd(exp);
486         struct ptlrpc_request *req;
487         struct ldlm_intent    *lit;
488         struct layout_intent  *layout;
489         int rc;
490         ENTRY;
491
492         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
493                                 &RQF_LDLM_INTENT_LAYOUT);
494         if (req == NULL)
495                 RETURN(ERR_PTR(-ENOMEM));
496
497         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
498         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
499         if (rc) {
500                 ptlrpc_request_free(req);
501                 RETURN(ERR_PTR(rc));
502         }
503
504         /* pack the intent */
505         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
506         lit->opc = (__u64)it->it_op;
507
508         /* pack the layout intent request */
509         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
510         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
511          * set for replication */
512         layout->li_opc = LAYOUT_INTENT_ACCESS;
513
514         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
515                              obd->u.cli.cl_default_mds_easize);
516         ptlrpc_request_set_replen(req);
517         RETURN(req);
518 }
519
520 static struct ptlrpc_request *
521 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
522 {
523         struct ptlrpc_request *req;
524         int rc;
525         ENTRY;
526
527         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
528         if (req == NULL)
529                 RETURN(ERR_PTR(-ENOMEM));
530
531         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
532         if (rc) {
533                 ptlrpc_request_free(req);
534                 RETURN(ERR_PTR(rc));
535         }
536
537         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
538         ptlrpc_request_set_replen(req);
539         RETURN(req);
540 }
541
542 static int mdc_finish_enqueue(struct obd_export *exp,
543                               struct ptlrpc_request *req,
544                               struct ldlm_enqueue_info *einfo,
545                               struct lookup_intent *it,
546                               struct lustre_handle *lockh,
547                               int rc)
548 {
549         struct req_capsule  *pill = &req->rq_pill;
550         struct ldlm_request *lockreq;
551         struct ldlm_reply   *lockrep;
552         struct lustre_intent_data *intent = &it->d.lustre;
553         struct ldlm_lock    *lock;
554         void                *lvb_data = NULL;
555         __u32                lvb_len = 0;
556         ENTRY;
557
558         LASSERT(rc >= 0);
559         /* Similarly, if we're going to replay this request, we don't want to
560          * actually get a lock, just perform the intent. */
561         if (req->rq_transno || req->rq_replay) {
562                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
563                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
564         }
565
566         if (rc == ELDLM_LOCK_ABORTED) {
567                 einfo->ei_mode = 0;
568                 memset(lockh, 0, sizeof(*lockh));
569                 rc = 0;
570         } else { /* rc = 0 */
571                 lock = ldlm_handle2lock(lockh);
572                 LASSERT(lock != NULL);
573
574                 /* If the server gave us back a different lock mode, we should
575                  * fix up our variables. */
576                 if (lock->l_req_mode != einfo->ei_mode) {
577                         ldlm_lock_addref(lockh, lock->l_req_mode);
578                         ldlm_lock_decref(lockh, einfo->ei_mode);
579                         einfo->ei_mode = lock->l_req_mode;
580                 }
581                 LDLM_LOCK_PUT(lock);
582         }
583
584         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
585         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
586
587         intent->it_disposition = (int)lockrep->lock_policy_res1;
588         intent->it_status = (int)lockrep->lock_policy_res2;
589         intent->it_lock_mode = einfo->ei_mode;
590         intent->it_lock_handle = lockh->cookie;
591         intent->it_data = req;
592
593         /* Technically speaking rq_transno must already be zero if
594          * it_status is in error, so the check is a bit redundant */
595         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
596                 mdc_clear_replay_flag(req, intent->it_status);
597
598         /* If we're doing an IT_OPEN which did not result in an actual
599          * successful open, then we need to remove the bit which saves
600          * this request for unconditional replay.
601          *
602          * It's important that we do this first!  Otherwise we might exit the
603          * function without doing so, and try to replay a failed create
604          * (bug 3440) */
605         if (it->it_op & IT_OPEN && req->rq_replay &&
606             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
607                 mdc_clear_replay_flag(req, intent->it_status);
608
609         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
610                   it->it_op, intent->it_disposition, intent->it_status);
611
612         /* We know what to expect, so we do any byte flipping required here */
613         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
614                 struct mdt_body *body;
615
616                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
617                 if (body == NULL) {
618                         CERROR ("Can't swab mdt_body\n");
619                         RETURN (-EPROTO);
620                 }
621
622                 if (it_disposition(it, DISP_OPEN_OPEN) &&
623                     !it_open_error(DISP_OPEN_OPEN, it)) {
624                         /*
625                          * If this is a successful OPEN request, we need to set
626                          * replay handler and data early, so that if replay
627                          * happens immediately after swabbing below, new reply
628                          * is swabbed by that handler correctly.
629                          */
630                         mdc_set_open_replay_data(NULL, NULL, it);
631                 }
632
633                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
634                         void *eadata;
635
636                         mdc_update_max_ea_from_body(exp, body);
637
638                         /*
639                          * The eadata is opaque; just check that it is there.
640                          * Eventually, obd_unpackmd() will check the contents.
641                          */
642                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
643                                                         body->mbo_eadatasize);
644                         if (eadata == NULL)
645                                 RETURN(-EPROTO);
646
647                         /* save lvb data and length in case this is for layout
648                          * lock */
649                         lvb_data = eadata;
650                         lvb_len = body->mbo_eadatasize;
651
652                         /*
653                          * We save the reply LOV EA in case we have to replay a
654                          * create for recovery.  If we didn't allocate a large
655                          * enough request buffer above we need to reallocate it
656                          * here to hold the actual LOV EA.
657                          *
658                          * To not save LOV EA if request is not going to replay
659                          * (for example error one).
660                          */
661                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
662                                 void *lmm;
663                                 if (req_capsule_get_size(pill, &RMF_EADATA,
664                                                          RCL_CLIENT) <
665                                     body->mbo_eadatasize)
666                                         mdc_realloc_openmsg(req, body);
667                                 else
668                                         req_capsule_shrink(pill, &RMF_EADATA,
669                                                            body->mbo_eadatasize,
670                                                            RCL_CLIENT);
671
672                                 req_capsule_set_size(pill, &RMF_EADATA,
673                                                      RCL_CLIENT,
674                                                      body->mbo_eadatasize);
675
676                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
677                                 if (lmm)
678                                         memcpy(lmm, eadata,
679                                                body->mbo_eadatasize);
680                         }
681                 }
682
683                 if (body->mbo_valid & OBD_MD_FLRMTPERM) {
684                         struct mdt_remote_perm *perm;
685
686                         LASSERT(client_is_remote(exp));
687                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
688                                                 lustre_swab_mdt_remote_perm);
689                         if (perm == NULL)
690                                 RETURN(-EPROTO);
691                 }
692         } else if (it->it_op & IT_LAYOUT) {
693                 /* maybe the lock was granted right away and layout
694                  * is packed into RMF_DLM_LVB of req */
695                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
696                 if (lvb_len > 0) {
697                         lvb_data = req_capsule_server_sized_get(pill,
698                                                         &RMF_DLM_LVB, lvb_len);
699                         if (lvb_data == NULL)
700                                 RETURN(-EPROTO);
701                 }
702         }
703
704         /* fill in stripe data for layout lock.
705          * LU-6581: trust layout data only if layout lock is granted. The MDT
706          * has stopped sending layout unless the layout lock is granted. The
707          * client still does this checking in case it's talking with an old
708          * server. - Jinshan */
709         lock = ldlm_handle2lock(lockh);
710         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
711             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
712                 void *lmm;
713
714                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
715                         ldlm_it2str(it->it_op), lvb_len);
716
717                 OBD_ALLOC_LARGE(lmm, lvb_len);
718                 if (lmm == NULL) {
719                         LDLM_LOCK_PUT(lock);
720                         RETURN(-ENOMEM);
721                 }
722                 memcpy(lmm, lvb_data, lvb_len);
723
724                 /* install lvb_data */
725                 lock_res_and_lock(lock);
726                 if (lock->l_lvb_data == NULL) {
727                         lock->l_lvb_type = LVB_T_LAYOUT;
728                         lock->l_lvb_data = lmm;
729                         lock->l_lvb_len = lvb_len;
730                         lmm = NULL;
731                 }
732                 unlock_res_and_lock(lock);
733                 if (lmm != NULL)
734                         OBD_FREE_LARGE(lmm, lvb_len);
735         }
736         if (lock != NULL)
737                 LDLM_LOCK_PUT(lock);
738
739         RETURN(rc);
740 }
741
742 /* We always reserve enough space in the reply packet for a stripe MD, because
743  * we don't know in advance the file type. */
744 int mdc_enqueue(struct obd_export *exp,
745                 struct ldlm_enqueue_info *einfo,
746                 const union ldlm_policy_data *policy,
747                 struct lookup_intent *it, struct md_op_data *op_data,
748                 struct lustre_handle *lockh, __u64 extra_lock_flags)
749 {
750         struct obd_device *obddev = class_exp2obd(exp);
751         struct ptlrpc_request *req = NULL;
752         __u64 flags, saved_flags = extra_lock_flags;
753         struct ldlm_res_id res_id;
754         static const union ldlm_policy_data lookup_policy = {
755                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
756         static const union ldlm_policy_data update_policy = {
757                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
758         static const union ldlm_policy_data layout_policy = {
759                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
760         static const union ldlm_policy_data getxattr_policy = {
761                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
762         int generation, resends = 0;
763         struct ldlm_reply *lockrep;
764         enum lvb_type lvb_type = 0;
765         int rc;
766         ENTRY;
767
768         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
769                  einfo->ei_type);
770         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
771
772         if (it != NULL) {
773                 LASSERT(policy == NULL);
774
775                 saved_flags |= LDLM_FL_HAS_INTENT;
776                 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
777                         policy = &update_policy;
778                 else if (it->it_op & IT_LAYOUT)
779                         policy = &layout_policy;
780                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
781                         policy = &getxattr_policy;
782                 else
783                         policy = &lookup_policy;
784         }
785
786         generation = obddev->u.cli.cl_import->imp_generation;
787 resend:
788         flags = saved_flags;
789         if (it == NULL) {
790                 /* The only way right now is FLOCK. */
791                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
792                          einfo->ei_type);
793                 res_id.name[3] = LDLM_FLOCK;
794         } else if (it->it_op & IT_OPEN) {
795                 req = mdc_intent_open_pack(exp, it, op_data);
796         } else if (it->it_op & IT_UNLINK) {
797                 req = mdc_intent_unlink_pack(exp, it, op_data);
798         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
799                 req = mdc_intent_getattr_pack(exp, it, op_data);
800         } else if (it->it_op & IT_READDIR) {
801                 req = mdc_enqueue_pack(exp, 0);
802         } else if (it->it_op & IT_LAYOUT) {
803                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
804                         RETURN(-EOPNOTSUPP);
805                 req = mdc_intent_layout_pack(exp, it, op_data);
806                 lvb_type = LVB_T_LAYOUT;
807         } else if (it->it_op & IT_GETXATTR) {
808                 req = mdc_intent_getxattr_pack(exp, it, op_data);
809         } else {
810                 LBUG();
811                 RETURN(-EINVAL);
812         }
813
814         if (IS_ERR(req))
815                 RETURN(PTR_ERR(req));
816
817         if (resends) {
818                 req->rq_generation_set = 1;
819                 req->rq_import_generation = generation;
820                 req->rq_sent = cfs_time_current_sec() + resends;
821         }
822
823         /* It is important to obtain modify RPC slot first (if applicable), so
824          * that threads that are waiting for a modify RPC slot are not polluting
825          * our rpcs in flight counter.
826          * We do not do flock request limiting, though */
827         if (it) {
828                 mdc_get_mod_rpc_slot(req, it);
829                 rc = obd_get_request_slot(&obddev->u.cli);
830                 if (rc != 0) {
831                         mdc_put_mod_rpc_slot(req, it);
832                         mdc_clear_replay_flag(req, 0);
833                         ptlrpc_req_finished(req);
834                         RETURN(rc);
835                 }
836         }
837
838         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
839                               0, lvb_type, lockh, 0);
840         if (!it) {
841                 /* For flock requests we immediatelly return without further
842                    delay and let caller deal with the rest, since rest of
843                    this function metadata processing makes no sense for flock
844                    requests anyway. But in case of problem during comms with
845                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
846                    can not rely on caller and this mainly for F_UNLCKs
847                    (explicits or automatically generated by Kernel to clean
848                    current FLocks upon exit) that can't be trashed */
849                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
850                     (einfo->ei_type == LDLM_FLOCK) &&
851                     (einfo->ei_mode == LCK_NL))
852                         goto resend;
853                 RETURN(rc);
854         }
855
856         obd_put_request_slot(&obddev->u.cli);
857         mdc_put_mod_rpc_slot(req, it);
858
859         if (rc < 0) {
860                 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
861                        obddev->obd_name, rc);
862
863                 mdc_clear_replay_flag(req, rc);
864                 ptlrpc_req_finished(req);
865                 RETURN(rc);
866         }
867
868         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
869         LASSERT(lockrep != NULL);
870
871         lockrep->lock_policy_res2 =
872                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
873
874         /* Retry infinitely when the server returns -EINPROGRESS for the
875          * intent operation, when server returns -EINPROGRESS for acquiring
876          * intent lock, we'll retry in after_reply(). */
877         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
878                 mdc_clear_replay_flag(req, rc);
879                 ptlrpc_req_finished(req);
880                 resends++;
881
882                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
883                        obddev->obd_name, resends, it->it_op,
884                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
885
886                 if (generation == obddev->u.cli.cl_import->imp_generation) {
887                         goto resend;
888                 } else {
889                         CDEBUG(D_HA, "resend cross eviction\n");
890                         RETURN(-EIO);
891                 }
892         }
893
894         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
895         if (rc < 0) {
896                 if (lustre_handle_is_used(lockh)) {
897                         ldlm_lock_decref(lockh, einfo->ei_mode);
898                         memset(lockh, 0, sizeof(*lockh));
899                 }
900                 ptlrpc_req_finished(req);
901
902                 it->d.lustre.it_lock_handle = 0;
903                 it->d.lustre.it_lock_mode = 0;
904                 it->d.lustre.it_data = NULL;
905         }
906
907         RETURN(rc);
908 }
909
910 static int mdc_finish_intent_lock(struct obd_export *exp,
911                                   struct ptlrpc_request *request,
912                                   struct md_op_data *op_data,
913                                   struct lookup_intent *it,
914                                   struct lustre_handle *lockh)
915 {
916         struct lustre_handle old_lock;
917         struct mdt_body *mdt_body;
918         struct ldlm_lock *lock;
919         int rc;
920         ENTRY;
921
922         LASSERT(request != NULL);
923         LASSERT(request != LP_POISON);
924         LASSERT(request->rq_repmsg != LP_POISON);
925
926         if (it->it_op & IT_READDIR)
927                 RETURN(0);
928
929         if (!it_disposition(it, DISP_IT_EXECD)) {
930                 /* The server failed before it even started executing the
931                  * intent, i.e. because it couldn't unpack the request. */
932                 LASSERT(it->d.lustre.it_status != 0);
933                 RETURN(it->d.lustre.it_status);
934         }
935         rc = it_open_error(DISP_IT_EXECD, it);
936         if (rc)
937                 RETURN(rc);
938
939         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
940         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
941
942         rc = it_open_error(DISP_LOOKUP_EXECD, it);
943         if (rc)
944                 RETURN(rc);
945
946         /* keep requests around for the multiple phases of the call
947          * this shows the DISP_XX must guarantee we make it into the call
948          */
949         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
950             it_disposition(it, DISP_OPEN_CREATE) &&
951             !it_open_error(DISP_OPEN_CREATE, it)) {
952                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
953                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
954         }
955         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
956             it_disposition(it, DISP_OPEN_OPEN) &&
957             !it_open_error(DISP_OPEN_OPEN, it)) {
958                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
959                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
960                 /* BUG 11546 - eviction in the middle of open rpc processing */
961                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
962         }
963
964         if (it->it_op & IT_CREAT) {
965                 /* XXX this belongs in ll_create_it */
966         } else if (it->it_op == IT_OPEN) {
967                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
968         } else {
969                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
970         }
971
972         /* If we already have a matching lock, then cancel the new
973          * one.  We have to set the data here instead of in
974          * mdc_enqueue, because we need to use the child's inode as
975          * the l_ast_data to match, and that's not available until
976          * intent_finish has performed the iget().) */
977         lock = ldlm_handle2lock(lockh);
978         if (lock) {
979                 union ldlm_policy_data policy = lock->l_policy_data;
980                 LDLM_DEBUG(lock, "matching against this");
981
982                 LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
983                                          &lock->l_resource->lr_name),
984                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
985                          PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
986                 LDLM_LOCK_PUT(lock);
987
988                 memcpy(&old_lock, lockh, sizeof(*lockh));
989                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
990                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
991                         ldlm_lock_decref_and_cancel(lockh,
992                                                     it->d.lustre.it_lock_mode);
993                         memcpy(lockh, &old_lock, sizeof(old_lock));
994                         it->d.lustre.it_lock_handle = lockh->cookie;
995                 }
996         }
997         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
998                 (int)op_data->op_namelen, op_data->op_name,
999                 ldlm_it2str(it->it_op), it->d.lustre.it_status,
1000                 it->d.lustre.it_disposition, rc);
1001         RETURN(rc);
1002 }
1003
1004 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1005                         struct lu_fid *fid, __u64 *bits)
1006 {
1007         /* We could just return 1 immediately, but since we should only
1008          * be called in revalidate_it if we already have a lock, let's
1009          * verify that. */
1010         struct ldlm_res_id res_id;
1011         struct lustre_handle lockh;
1012         union ldlm_policy_data policy;
1013         enum ldlm_mode mode;
1014         ENTRY;
1015
1016         if (it->d.lustre.it_lock_handle) {
1017                 lockh.cookie = it->d.lustre.it_lock_handle;
1018                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1019         } else {
1020                 fid_build_reg_res_name(fid, &res_id);
1021                 switch (it->it_op) {
1022                 case IT_GETATTR:
1023                         /* File attributes are held under multiple bits:
1024                          * nlink is under lookup lock, size and times are
1025                          * under UPDATE lock and recently we've also got
1026                          * a separate permissions lock for owner/group/acl that
1027                          * were protected by lookup lock before.
1028                          * Getattr must provide all of that information,
1029                          * so we need to ensure we have all of those locks.
1030                          * Unfortunately, if the bits are split across multiple
1031                          * locks, there's no easy way to match all of them here,
1032                          * so an extra RPC would be performed to fetch all
1033                          * of those bits at once for now. */
1034                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1035                          * but for old MDTs (< 2.4), permission is covered
1036                          * by LOOKUP lock, so it needs to match all bits here.*/
1037                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1038                                                   MDS_INODELOCK_LOOKUP |
1039                                                   MDS_INODELOCK_PERM;
1040                         break;
1041                 case IT_READDIR:
1042                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1043                         break;
1044                 case IT_LAYOUT:
1045                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1046                         break;
1047                 default:
1048                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1049                         break;
1050                 }
1051
1052                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1053                                       LDLM_IBITS, &policy,
1054                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1055                                       &lockh);
1056         }
1057
1058         if (mode) {
1059                 it->d.lustre.it_lock_handle = lockh.cookie;
1060                 it->d.lustre.it_lock_mode = mode;
1061         } else {
1062                 it->d.lustre.it_lock_handle = 0;
1063                 it->d.lustre.it_lock_mode = 0;
1064         }
1065
1066         RETURN(!!mode);
1067 }
1068
1069 /*
1070  * This long block is all about fixing up the lock and request state
1071  * so that it is correct as of the moment _before_ the operation was
1072  * applied; that way, the VFS will think that everything is normal and
1073  * call Lustre's regular VFS methods.
1074  *
1075  * If we're performing a creation, that means that unless the creation
1076  * failed with EEXIST, we should fake up a negative dentry.
1077  *
1078  * For everything else, we want to lookup to succeed.
1079  *
1080  * One additional note: if CREATE or OPEN succeeded, we add an extra
1081  * reference to the request because we need to keep it around until
1082  * ll_create/ll_open gets called.
1083  *
1084  * The server will return to us, in it_disposition, an indication of
1085  * exactly what d.lustre.it_status refers to.
1086  *
1087  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1088  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1089  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1090  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1091  * was successful.
1092  *
1093  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1094  * child lookup.
1095  */
1096 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1097                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1098                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1099 {
1100         struct ldlm_enqueue_info einfo = {
1101                 .ei_type        = LDLM_IBITS,
1102                 .ei_mode        = it_to_lock_mode(it),
1103                 .ei_cb_bl       = cb_blocking,
1104                 .ei_cb_cp       = ldlm_completion_ast,
1105         };
1106         struct lustre_handle lockh;
1107         int rc = 0;
1108         ENTRY;
1109         LASSERT(it);
1110
1111         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1112                 ", intent: %s flags %#"LPF64"o\n", (int)op_data->op_namelen,
1113                 op_data->op_name, PFID(&op_data->op_fid2),
1114                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1115                 it->it_flags);
1116
1117         lockh.cookie = 0;
1118         if (fid_is_sane(&op_data->op_fid2) &&
1119             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1120                 /* We could just return 1 immediately, but since we should only
1121                  * be called in revalidate_it if we already have a lock, let's
1122                  * verify that. */
1123                 it->d.lustre.it_lock_handle = 0;
1124                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1125                 /* Only return failure if it was not GETATTR by cfid
1126                    (from inode_revalidate) */
1127                 if (rc || op_data->op_namelen != 0)
1128                         RETURN(rc);
1129         }
1130
1131         /* For case if upper layer did not alloc fid, do it now. */
1132         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1133                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1134                 if (rc < 0) {
1135                         CERROR("Can't alloc new fid, rc %d\n", rc);
1136                         RETURN(rc);
1137                 }
1138         }
1139
1140         rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
1141                          extra_lock_flags);
1142         if (rc < 0)
1143                 RETURN(rc);
1144
1145         *reqp = it->d.lustre.it_data;
1146         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1147         RETURN(rc);
1148 }
1149
1150 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1151                                               struct ptlrpc_request *req,
1152                                               void *args, int rc)
1153 {
1154         struct mdc_getattr_args  *ga = args;
1155         struct obd_export        *exp = ga->ga_exp;
1156         struct md_enqueue_info   *minfo = ga->ga_minfo;
1157         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1158         struct lookup_intent     *it;
1159         struct lustre_handle     *lockh;
1160         struct obd_device        *obddev;
1161         struct ldlm_reply        *lockrep;
1162         __u64                     flags = LDLM_FL_HAS_INTENT;
1163         ENTRY;
1164
1165         it    = &minfo->mi_it;
1166         lockh = &minfo->mi_lockh;
1167
1168         obddev = class_exp2obd(exp);
1169
1170         obd_put_request_slot(&obddev->u.cli);
1171         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1172                 rc = -ETIMEDOUT;
1173
1174         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1175                                    &flags, NULL, 0, lockh, rc);
1176         if (rc < 0) {
1177                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1178                 mdc_clear_replay_flag(req, rc);
1179                 GOTO(out, rc);
1180         }
1181
1182         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1183         LASSERT(lockrep != NULL);
1184
1185         lockrep->lock_policy_res2 =
1186                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1187
1188         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1189         if (rc)
1190                 GOTO(out, rc);
1191
1192         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1193         EXIT;
1194
1195 out:
1196         OBD_FREE_PTR(einfo);
1197         minfo->mi_cb(req, minfo, rc);
1198         return 0;
1199 }
1200
1201 int mdc_intent_getattr_async(struct obd_export *exp,
1202                              struct md_enqueue_info *minfo,
1203                              struct ldlm_enqueue_info *einfo)
1204 {
1205         struct md_op_data       *op_data = &minfo->mi_data;
1206         struct lookup_intent    *it = &minfo->mi_it;
1207         struct ptlrpc_request   *req;
1208         struct mdc_getattr_args *ga;
1209         struct obd_device       *obddev = class_exp2obd(exp);
1210         struct ldlm_res_id       res_id;
1211         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1212          *     for statahead currently. Consider CMD in future, such two bits
1213          *     maybe managed by different MDS, should be adjusted then. */
1214         union ldlm_policy_data policy = {
1215                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1216                                                  MDS_INODELOCK_UPDATE } };
1217         int                      rc = 0;
1218         __u64                    flags = LDLM_FL_HAS_INTENT;
1219         ENTRY;
1220
1221         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#"
1222                 LPF64"o\n",
1223                 (int)op_data->op_namelen, op_data->op_name,
1224                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1225
1226         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1227         req = mdc_intent_getattr_pack(exp, it, op_data);
1228         if (IS_ERR(req))
1229                 RETURN(PTR_ERR(req));
1230
1231         rc = obd_get_request_slot(&obddev->u.cli);
1232         if (rc != 0) {
1233                 ptlrpc_req_finished(req);
1234                 RETURN(rc);
1235         }
1236
1237         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1238                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1239         if (rc < 0) {
1240                 obd_put_request_slot(&obddev->u.cli);
1241                 ptlrpc_req_finished(req);
1242                 RETURN(rc);
1243         }
1244
1245         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1246         ga = ptlrpc_req_async_args(req);
1247         ga->ga_exp = exp;
1248         ga->ga_minfo = minfo;
1249         ga->ga_einfo = einfo;
1250
1251         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1252         ptlrpcd_add_req(req);
1253
1254         RETURN(0);
1255 }