Whamcloud - gitweb
LU-11074 mdc: set correct body eadatasize for getxattr()
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 #include <linux/module.h>
36
37 #include <obd.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
47
48 #include "mdc_internal.h"
49
50 struct mdc_getattr_args {
51         struct obd_export               *ga_exp;
52         struct md_enqueue_info          *ga_minfo;
53 };
54
55 int it_open_error(int phase, struct lookup_intent *it)
56 {
57         if (it_disposition(it, DISP_OPEN_LEASE)) {
58                 if (phase >= DISP_OPEN_LEASE)
59                         return it->it_status;
60                 else
61                         return 0;
62         }
63         if (it_disposition(it, DISP_OPEN_OPEN)) {
64                 if (phase >= DISP_OPEN_OPEN)
65                         return it->it_status;
66                 else
67                         return 0;
68         }
69
70         if (it_disposition(it, DISP_OPEN_CREATE)) {
71                 if (phase >= DISP_OPEN_CREATE)
72                         return it->it_status;
73                 else
74                         return 0;
75         }
76
77         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78                 if (phase >= DISP_LOOKUP_EXECD)
79                         return it->it_status;
80                 else
81                         return 0;
82         }
83
84         if (it_disposition(it, DISP_IT_EXECD)) {
85                 if (phase >= DISP_IT_EXECD)
86                         return it->it_status;
87                 else
88                         return 0;
89         }
90
91         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
92         LBUG();
93
94         return 0;
95 }
96 EXPORT_SYMBOL(it_open_error);
97
98 /* this must be called on a lockh that is known to have a referenced lock */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100                       void *data, __u64 *bits)
101 {
102         struct ldlm_lock *lock;
103         struct inode *new_inode = data;
104         ENTRY;
105
106         if(bits)
107                 *bits = 0;
108
109         if (!lustre_handle_is_used(lockh))
110                 RETURN(0);
111
112         lock = ldlm_handle2lock(lockh);
113
114         LASSERT(lock != NULL);
115         lock_res_and_lock(lock);
116         if (lock->l_resource->lr_lvb_inode &&
117             lock->l_resource->lr_lvb_inode != data) {
118                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119                 LASSERTF(old_inode->i_state & I_FREEING,
120                          "Found existing inode %p/%lu/%u state %lu in lock: "
121                          "setting data to %p/%lu/%u\n", old_inode,
122                          old_inode->i_ino, old_inode->i_generation,
123                          old_inode->i_state,
124                          new_inode, new_inode->i_ino, new_inode->i_generation);
125         }
126         lock->l_resource->lr_lvb_inode = new_inode;
127         if (bits)
128                 *bits = lock->l_policy_data.l_inodebits.bits;
129
130         unlock_res_and_lock(lock);
131         LDLM_LOCK_PUT(lock);
132
133         RETURN(0);
134 }
135
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137                               const struct lu_fid *fid, enum ldlm_type type,
138                               union ldlm_policy_data *policy,
139                               enum ldlm_mode mode, struct lustre_handle *lockh)
140 {
141         struct ldlm_res_id res_id;
142         enum ldlm_mode rc;
143         ENTRY;
144
145         fid_build_reg_res_name(fid, &res_id);
146         /* LU-4405: Clear bits not supported by server */
147         policy->l_inodebits.bits &= exp_connect_ibits(exp);
148         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149                              &res_id, type, policy, mode, lockh, 0);
150         RETURN(rc);
151 }
152
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154                       union ldlm_policy_data *policy, enum ldlm_mode mode,
155                       enum ldlm_cancel_flags flags, void *opaque)
156 {
157         struct obd_device *obd = class_exp2obd(exp);
158         struct ldlm_res_id res_id;
159         int rc;
160
161         ENTRY;
162
163         fid_build_reg_res_name(fid, &res_id);
164         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165                                              policy, mode, flags, opaque);
166         RETURN(rc);
167 }
168
169 int mdc_null_inode(struct obd_export *exp,
170                    const struct lu_fid *fid)
171 {
172         struct ldlm_res_id res_id;
173         struct ldlm_resource *res;
174         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
175         ENTRY;
176
177         LASSERTF(ns != NULL, "no namespace passed\n");
178
179         fid_build_reg_res_name(fid, &res_id);
180
181         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
182         if (IS_ERR(res))
183                 RETURN(0);
184
185         lock_res(res);
186         res->lr_lvb_inode = NULL;
187         unlock_res(res);
188
189         ldlm_resource_putref(res);
190         RETURN(0);
191 }
192
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
194 {
195         /* Don't hold error requests for replay. */
196         if (req->rq_replay) {
197                 spin_lock(&req->rq_lock);
198                 req->rq_replay = 0;
199                 spin_unlock(&req->rq_lock);
200         }
201         if (rc && req->rq_transno != 0) {
202                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
203                 LBUG();
204         }
205 }
206
207 /* Save a large LOV EA into the request buffer so that it is available
208  * for replay.  We don't do this in the initial request because the
209  * original request doesn't need this buffer (at most it sends just the
210  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211  * buffer and may also be difficult to allocate and save a very large
212  * request buffer for each open. (bug 5707)
213  *
214  * OOM here may cause recovery failure if lmm is needed (only for the
215  * original open if the MDS crashed just when this client also OOM'd)
216  * but this is incredibly unlikely, and questionable whether the client
217  * could do MDS recovery under OOM anyways... */
218 int mdc_save_lovea(struct ptlrpc_request *req,
219                    const struct req_msg_field *field,
220                    void *data, u32 size)
221 {
222         struct req_capsule *pill = &req->rq_pill;
223         void *lmm;
224         int rc = 0;
225
226         if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
227                 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
228                 if (rc) {
229                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230                                req->rq_export->exp_obd->obd_name,
231                                size, rc);
232                         return rc;
233                 }
234         } else {
235                 req_capsule_shrink(pill, field, size, RCL_CLIENT);
236         }
237
238         req_capsule_set_size(pill, field, RCL_CLIENT, size);
239         lmm = req_capsule_client_get(pill, field);
240         if (lmm)
241                 memcpy(lmm, data, size);
242
243         return rc;
244 }
245
246 static struct ptlrpc_request *
247 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
248                      struct md_op_data *op_data, __u32 acl_bufsize)
249 {
250         struct ptlrpc_request   *req;
251         struct obd_device       *obddev = class_exp2obd(exp);
252         struct ldlm_intent      *lit;
253         const void              *lmm = op_data->op_data;
254         __u32                    lmmsize = op_data->op_data_size;
255         struct list_head         cancels = LIST_HEAD_INIT(cancels);
256         int                      count = 0;
257         enum ldlm_mode           mode;
258         int                      rc;
259         ENTRY;
260
261         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
262
263         /* XXX: openlock is not cancelled for cross-refs. */
264         /* If inode is known, cancel conflicting OPEN locks. */
265         if (fid_is_sane(&op_data->op_fid2)) {
266                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
267                         if (it->it_flags & MDS_FMODE_WRITE)
268                                 mode = LCK_EX;
269                         else
270                                 mode = LCK_PR;
271                 } else {
272                         if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
273                                 mode = LCK_CW;
274 #ifdef FMODE_EXEC
275                         else if (it->it_flags & FMODE_EXEC)
276                                 mode = LCK_PR;
277 #endif
278                         else
279                                 mode = LCK_CR;
280                 }
281                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
282                                                 &cancels, mode,
283                                                 MDS_INODELOCK_OPEN);
284         }
285
286         /* If CREATE, cancel parent's UPDATE lock. */
287         if (it->it_op & IT_CREAT)
288                 mode = LCK_EX;
289         else
290                 mode = LCK_CR;
291         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
292                                          &cancels, mode,
293                                          MDS_INODELOCK_UPDATE);
294
295         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
296                                    &RQF_LDLM_INTENT_OPEN);
297         if (req == NULL) {
298                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
299                 RETURN(ERR_PTR(-ENOMEM));
300         }
301
302         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
303                              op_data->op_namelen + 1);
304         if (cl_is_lov_delay_create(it->it_flags)) {
305                 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
306                 LASSERT(lmmsize == 0);
307                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
308         } else {
309                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
310                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
311         }
312
313         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
314                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
315                              strlen(op_data->op_file_secctx_name) + 1 : 0);
316
317         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
318                              op_data->op_file_secctx_size);
319
320         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
321         if (rc < 0) {
322                 ptlrpc_request_free(req);
323                 RETURN(ERR_PTR(rc));
324         }
325
326         spin_lock(&req->rq_lock);
327         req->rq_replay = req->rq_import->imp_replayable;
328         spin_unlock(&req->rq_lock);
329
330         /* pack the intent */
331         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
332         lit->opc = (__u64)it->it_op;
333
334         /* pack the intended request */
335         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
336                       lmmsize);
337
338         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
339                              obddev->u.cli.cl_max_mds_easize);
340         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
341         ptlrpc_request_set_replen(req);
342         return req;
343 }
344
345 #define GA_DEFAULT_EA_NAME_LEN 20
346 #define GA_DEFAULT_EA_VAL_LEN  250
347 #define GA_DEFAULT_EA_NUM      10
348
349 static struct ptlrpc_request *
350 mdc_intent_getxattr_pack(struct obd_export *exp,
351                          struct lookup_intent *it,
352                          struct md_op_data *op_data)
353 {
354         struct ptlrpc_request   *req;
355         struct ldlm_intent      *lit;
356         int                     rc, count = 0;
357         struct list_head        cancels = LIST_HEAD_INIT(cancels);
358         u32 min_buf_size = 0;
359
360         ENTRY;
361
362         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
363                                         &RQF_LDLM_INTENT_GETXATTR);
364         if (req == NULL)
365                 RETURN(ERR_PTR(-ENOMEM));
366
367         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
368         if (rc) {
369                 ptlrpc_request_free(req);
370                 RETURN(ERR_PTR(rc));
371         }
372
373         /* pack the intent */
374         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
375         lit->opc = IT_GETXATTR;
376
377 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
378         /* If the supplied buffer is too small then the server will
379          * return -ERANGE and llite will fallback to using non cached
380          * xattr operations. On servers before 2.10.1 a (non-cached)
381          * listxattr RPC for an orphan or dead file causes an oops. So
382          * let's try to avoid sending too small a buffer to too old a
383          * server. This is effectively undoing the memory conservation
384          * of LU-9417 when it would be *more* likely to crash the
385          * server. See LU-9856. */
386         if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
387                 min_buf_size = exp->exp_connect_data.ocd_max_easize;
388 #endif
389
390         /* pack the intended request */
391         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
392                       max_t(u32, min_buf_size,
393                             GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM),
394                       -1, 0);
395
396         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
397                              max_t(u32, min_buf_size,
398                                  GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM));
399
400         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
401                              max_t(u32, min_buf_size,
402                                  GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM));
403
404         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
405                              max_t(u32, min_buf_size,
406                                  sizeof(__u32) * GA_DEFAULT_EA_NUM));
407
408         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
409
410         ptlrpc_request_set_replen(req);
411
412         RETURN(req);
413 }
414
415 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
416                                                      struct lookup_intent *it,
417                                                      struct md_op_data *op_data)
418 {
419         struct ptlrpc_request *req;
420         struct obd_device     *obddev = class_exp2obd(exp);
421         struct ldlm_intent    *lit;
422         int                    rc;
423         ENTRY;
424
425         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
426                                    &RQF_LDLM_INTENT_UNLINK);
427         if (req == NULL)
428                 RETURN(ERR_PTR(-ENOMEM));
429
430         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
431                              op_data->op_namelen + 1);
432
433         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
434         if (rc) {
435                 ptlrpc_request_free(req);
436                 RETURN(ERR_PTR(rc));
437         }
438
439         /* pack the intent */
440         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
441         lit->opc = (__u64)it->it_op;
442
443         /* pack the intended request */
444         mdc_unlink_pack(req, op_data);
445
446         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
447                              obddev->u.cli.cl_default_mds_easize);
448         ptlrpc_request_set_replen(req);
449         RETURN(req);
450 }
451
452 static struct ptlrpc_request *
453 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
454                         struct md_op_data *op_data, __u32 acl_bufsize)
455 {
456         struct ptlrpc_request   *req;
457         struct obd_device       *obddev = class_exp2obd(exp);
458         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
459                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
460                                          OBD_MD_MEA | OBD_MD_FLACL;
461         struct ldlm_intent      *lit;
462         int                      rc;
463         __u32                    easize;
464         ENTRY;
465
466         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
467                                    &RQF_LDLM_INTENT_GETATTR);
468         if (req == NULL)
469                 RETURN(ERR_PTR(-ENOMEM));
470
471         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
472                              op_data->op_namelen + 1);
473
474         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
475         if (rc) {
476                 ptlrpc_request_free(req);
477                 RETURN(ERR_PTR(rc));
478         }
479
480         /* pack the intent */
481         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
482         lit->opc = (__u64)it->it_op;
483
484         if (obddev->u.cli.cl_default_mds_easize > 0)
485                 easize = obddev->u.cli.cl_default_mds_easize;
486         else
487                 easize = obddev->u.cli.cl_max_mds_easize;
488
489         /* pack the intended request */
490         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
491
492         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
493         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
494         ptlrpc_request_set_replen(req);
495         RETURN(req);
496 }
497
498 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
499                                                      struct lookup_intent *it,
500                                                      struct md_op_data *op_data)
501 {
502         struct obd_device     *obd = class_exp2obd(exp);
503         struct ptlrpc_request *req;
504         struct ldlm_intent    *lit;
505         struct layout_intent  *layout;
506         int rc;
507         ENTRY;
508
509         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
510                                 &RQF_LDLM_INTENT_LAYOUT);
511         if (req == NULL)
512                 RETURN(ERR_PTR(-ENOMEM));
513
514         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
515         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
516         if (rc) {
517                 ptlrpc_request_free(req);
518                 RETURN(ERR_PTR(rc));
519         }
520
521         /* pack the intent */
522         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
523         lit->opc = (__u64)it->it_op;
524
525         /* pack the layout intent request */
526         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
527         LASSERT(op_data->op_data != NULL);
528         LASSERT(op_data->op_data_size == sizeof(*layout));
529         memcpy(layout, op_data->op_data, sizeof(*layout));
530
531         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
532                              obd->u.cli.cl_default_mds_easize);
533         ptlrpc_request_set_replen(req);
534         RETURN(req);
535 }
536
537 static struct ptlrpc_request *
538 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
539 {
540         struct ptlrpc_request *req;
541         int rc;
542         ENTRY;
543
544         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
545         if (req == NULL)
546                 RETURN(ERR_PTR(-ENOMEM));
547
548         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
549         if (rc) {
550                 ptlrpc_request_free(req);
551                 RETURN(ERR_PTR(rc));
552         }
553
554         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
555         ptlrpc_request_set_replen(req);
556         RETURN(req);
557 }
558
559 static int mdc_finish_enqueue(struct obd_export *exp,
560                               struct ptlrpc_request *req,
561                               struct ldlm_enqueue_info *einfo,
562                               struct lookup_intent *it,
563                               struct lustre_handle *lockh,
564                               int rc)
565 {
566         struct req_capsule  *pill = &req->rq_pill;
567         struct ldlm_request *lockreq;
568         struct ldlm_reply   *lockrep;
569         struct ldlm_lock    *lock;
570         struct mdt_body     *body = NULL;
571         void                *lvb_data = NULL;
572         __u32                lvb_len = 0;
573
574         ENTRY;
575
576         LASSERT(rc >= 0);
577         /* Similarly, if we're going to replay this request, we don't want to
578          * actually get a lock, just perform the intent. */
579         if (req->rq_transno || req->rq_replay) {
580                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
581                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
582         }
583
584         if (rc == ELDLM_LOCK_ABORTED) {
585                 einfo->ei_mode = 0;
586                 memset(lockh, 0, sizeof(*lockh));
587                 rc = 0;
588         } else { /* rc = 0 */
589                 lock = ldlm_handle2lock(lockh);
590                 LASSERT(lock != NULL);
591
592                 /* If the server gave us back a different lock mode, we should
593                  * fix up our variables. */
594                 if (lock->l_req_mode != einfo->ei_mode) {
595                         ldlm_lock_addref(lockh, lock->l_req_mode);
596                         ldlm_lock_decref(lockh, einfo->ei_mode);
597                         einfo->ei_mode = lock->l_req_mode;
598                 }
599                 LDLM_LOCK_PUT(lock);
600         }
601
602         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
603         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
604
605         it->it_disposition = (int)lockrep->lock_policy_res1;
606         it->it_status = (int)lockrep->lock_policy_res2;
607         it->it_lock_mode = einfo->ei_mode;
608         it->it_lock_handle = lockh->cookie;
609         it->it_request = req;
610
611         /* Technically speaking rq_transno must already be zero if
612          * it_status is in error, so the check is a bit redundant */
613         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
614                 mdc_clear_replay_flag(req, it->it_status);
615
616         /* If we're doing an IT_OPEN which did not result in an actual
617          * successful open, then we need to remove the bit which saves
618          * this request for unconditional replay.
619          *
620          * It's important that we do this first!  Otherwise we might exit the
621          * function without doing so, and try to replay a failed create
622          * (bug 3440) */
623         if (it->it_op & IT_OPEN && req->rq_replay &&
624             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
625                 mdc_clear_replay_flag(req, it->it_status);
626
627         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
628                   it->it_op, it->it_disposition, it->it_status);
629
630         /* We know what to expect, so we do any byte flipping required here */
631         if (it_has_reply_body(it)) {
632                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
633                 if (body == NULL) {
634                         CERROR ("Can't swab mdt_body\n");
635                         RETURN (-EPROTO);
636                 }
637
638                 if (it_disposition(it, DISP_OPEN_OPEN) &&
639                     !it_open_error(DISP_OPEN_OPEN, it)) {
640                         /*
641                          * If this is a successful OPEN request, we need to set
642                          * replay handler and data early, so that if replay
643                          * happens immediately after swabbing below, new reply
644                          * is swabbed by that handler correctly.
645                          */
646                         mdc_set_open_replay_data(NULL, NULL, it);
647                 }
648
649                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
650                         void *eadata;
651
652                         mdc_update_max_ea_from_body(exp, body);
653
654                         /*
655                          * The eadata is opaque; just check that it is there.
656                          * Eventually, obd_unpackmd() will check the contents.
657                          */
658                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
659                                                         body->mbo_eadatasize);
660                         if (eadata == NULL)
661                                 RETURN(-EPROTO);
662
663                         /* save lvb data and length in case this is for layout
664                          * lock */
665                         lvb_data = eadata;
666                         lvb_len = body->mbo_eadatasize;
667
668                         /*
669                          * We save the reply LOV EA in case we have to replay a
670                          * create for recovery.  If we didn't allocate a large
671                          * enough request buffer above we need to reallocate it
672                          * here to hold the actual LOV EA.
673                          *
674                          * To not save LOV EA if request is not going to replay
675                          * (for example error one).
676                          */
677                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
678                                 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
679                                                     body->mbo_eadatasize);
680                                 if (rc) {
681                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
682                                         body->mbo_eadatasize = 0;
683                                         rc = 0;
684                                 }
685                         }
686                 }
687         } else if (it->it_op & IT_LAYOUT) {
688                 /* maybe the lock was granted right away and layout
689                  * is packed into RMF_DLM_LVB of req */
690                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
691                 if (lvb_len > 0) {
692                         lvb_data = req_capsule_server_sized_get(pill,
693                                                         &RMF_DLM_LVB, lvb_len);
694                         if (lvb_data == NULL)
695                                 RETURN(-EPROTO);
696
697                         /**
698                          * save replied layout data to the request buffer for
699                          * recovery consideration (lest MDS reinitialize
700                          * another set of OST objects).
701                          */
702                         if (req->rq_transno)
703                                 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
704                                                      lvb_len);
705                 }
706         }
707
708         /* fill in stripe data for layout lock.
709          * LU-6581: trust layout data only if layout lock is granted. The MDT
710          * has stopped sending layout unless the layout lock is granted. The
711          * client still does this checking in case it's talking with an old
712          * server. - Jinshan */
713         lock = ldlm_handle2lock(lockh);
714         if (lock == NULL)
715                 RETURN(rc);
716
717         if (ldlm_has_layout(lock) && lvb_data != NULL &&
718             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
719                 void *lmm;
720
721                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
722                         ldlm_it2str(it->it_op), lvb_len);
723
724                 OBD_ALLOC_LARGE(lmm, lvb_len);
725                 if (lmm == NULL)
726                         GOTO(out_lock, rc = -ENOMEM);
727
728                 memcpy(lmm, lvb_data, lvb_len);
729
730                 /* install lvb_data */
731                 lock_res_and_lock(lock);
732                 if (lock->l_lvb_data == NULL) {
733                         lock->l_lvb_type = LVB_T_LAYOUT;
734                         lock->l_lvb_data = lmm;
735                         lock->l_lvb_len = lvb_len;
736                         lmm = NULL;
737                 }
738                 unlock_res_and_lock(lock);
739                 if (lmm != NULL)
740                         OBD_FREE_LARGE(lmm, lvb_len);
741         }
742
743         if (ldlm_has_dom(lock)) {
744                 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
745
746                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
747                 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
748                         LDLM_ERROR(lock, "%s: DoM lock without size.\n",
749                                    exp->exp_obd->obd_name);
750                         GOTO(out_lock, rc = -EPROTO);
751                 }
752
753                 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
754                            ldlm_it2str(it->it_op), body->mbo_dom_size);
755
756                 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
757         }
758 out_lock:
759         LDLM_LOCK_PUT(lock);
760
761         RETURN(rc);
762 }
763
764 /* We always reserve enough space in the reply packet for a stripe MD, because
765  * we don't know in advance the file type. */
766 static int mdc_enqueue_base(struct obd_export *exp,
767                             struct ldlm_enqueue_info *einfo,
768                             const union ldlm_policy_data *policy,
769                             struct lookup_intent *it,
770                             struct md_op_data *op_data,
771                             struct lustre_handle *lockh,
772                             __u64 extra_lock_flags)
773 {
774         struct obd_device *obddev = class_exp2obd(exp);
775         struct ptlrpc_request *req = NULL;
776         __u64 flags, saved_flags = extra_lock_flags;
777         struct ldlm_res_id res_id;
778         static const union ldlm_policy_data lookup_policy = {
779                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
780         static const union ldlm_policy_data update_policy = {
781                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
782         static const union ldlm_policy_data layout_policy = {
783                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
784         static const union ldlm_policy_data getxattr_policy = {
785                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
786         int generation, resends = 0;
787         struct ldlm_reply *lockrep;
788         struct obd_import *imp = class_exp2cliimp(exp);
789         __u32 acl_bufsize;
790         enum lvb_type lvb_type = 0;
791         int rc;
792         ENTRY;
793
794         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
795                  einfo->ei_type);
796         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
797
798         if (it != NULL) {
799                 LASSERT(policy == NULL);
800
801                 saved_flags |= LDLM_FL_HAS_INTENT;
802                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
803                         policy = &update_policy;
804                 else if (it->it_op & IT_LAYOUT)
805                         policy = &layout_policy;
806                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
807                         policy = &getxattr_policy;
808                 else
809                         policy = &lookup_policy;
810         }
811
812         generation = obddev->u.cli.cl_import->imp_generation;
813         if (!it || (it->it_op & (IT_CREAT | IT_OPEN_CREAT)))
814                 acl_bufsize = imp->imp_connect_data.ocd_max_easize;
815         else
816                 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
817
818 resend:
819         flags = saved_flags;
820         if (it == NULL) {
821                 /* The only way right now is FLOCK. */
822                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
823                          einfo->ei_type);
824                 res_id.name[3] = LDLM_FLOCK;
825         } else if (it->it_op & IT_OPEN) {
826                 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
827         } else if (it->it_op & IT_UNLINK) {
828                 req = mdc_intent_unlink_pack(exp, it, op_data);
829         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
830                 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
831         } else if (it->it_op & IT_READDIR) {
832                 req = mdc_enqueue_pack(exp, 0);
833         } else if (it->it_op & IT_LAYOUT) {
834                 if (!imp_connect_lvb_type(imp))
835                         RETURN(-EOPNOTSUPP);
836                 req = mdc_intent_layout_pack(exp, it, op_data);
837                 lvb_type = LVB_T_LAYOUT;
838         } else if (it->it_op & IT_GETXATTR) {
839                 req = mdc_intent_getxattr_pack(exp, it, op_data);
840         } else {
841                 LBUG();
842                 RETURN(-EINVAL);
843         }
844
845         if (IS_ERR(req))
846                 RETURN(PTR_ERR(req));
847
848         if (resends) {
849                 req->rq_generation_set = 1;
850                 req->rq_import_generation = generation;
851                 req->rq_sent = ktime_get_real_seconds() + resends;
852         }
853
854         /* It is important to obtain modify RPC slot first (if applicable), so
855          * that threads that are waiting for a modify RPC slot are not polluting
856          * our rpcs in flight counter.
857          * We do not do flock request limiting, though */
858         if (it) {
859                 mdc_get_mod_rpc_slot(req, it);
860                 rc = obd_get_request_slot(&obddev->u.cli);
861                 if (rc != 0) {
862                         mdc_put_mod_rpc_slot(req, it);
863                         mdc_clear_replay_flag(req, 0);
864                         ptlrpc_req_finished(req);
865                         RETURN(rc);
866                 }
867         }
868
869         /* With Data-on-MDT the glimpse callback is needed too.
870          * It is set here in advance but not in mdc_finish_enqueue()
871          * to avoid possible races. It is safe to have glimpse handler
872          * for non-DOM locks and costs nothing.*/
873         if (einfo->ei_cb_gl == NULL)
874                 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
875
876         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
877                               0, lvb_type, lockh, 0);
878         if (!it) {
879                 /* For flock requests we immediatelly return without further
880                    delay and let caller deal with the rest, since rest of
881                    this function metadata processing makes no sense for flock
882                    requests anyway. But in case of problem during comms with
883                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
884                    can not rely on caller and this mainly for F_UNLCKs
885                    (explicits or automatically generated by Kernel to clean
886                    current FLocks upon exit) that can't be trashed */
887                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
888                     (einfo->ei_type == LDLM_FLOCK) &&
889                     (einfo->ei_mode == LCK_NL))
890                         goto resend;
891                 RETURN(rc);
892         }
893
894         obd_put_request_slot(&obddev->u.cli);
895         mdc_put_mod_rpc_slot(req, it);
896
897         if (rc < 0) {
898                 CDEBUG(D_INFO,
899                       "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
900                       obddev->obd_name, PFID(&op_data->op_fid1),
901                       PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
902
903                 mdc_clear_replay_flag(req, rc);
904                 ptlrpc_req_finished(req);
905                 RETURN(rc);
906         }
907
908         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
909         LASSERT(lockrep != NULL);
910
911         lockrep->lock_policy_res2 =
912                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
913
914         /* Retry infinitely when the server returns -EINPROGRESS for the
915          * intent operation, when server returns -EINPROGRESS for acquiring
916          * intent lock, we'll retry in after_reply(). */
917         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
918                 mdc_clear_replay_flag(req, rc);
919                 ptlrpc_req_finished(req);
920                 if (generation == obddev->u.cli.cl_import->imp_generation) {
921                         if (signal_pending(current))
922                                 RETURN(-EINTR);
923
924                         resends++;
925                         CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
926                                obddev->obd_name, resends, it->it_op,
927                                PFID(&op_data->op_fid1),
928                                PFID(&op_data->op_fid2));
929                         goto resend;
930                 } else {
931                         CDEBUG(D_HA, "resend cross eviction\n");
932                         RETURN(-EIO);
933                 }
934         }
935
936         if ((int)lockrep->lock_policy_res2 == -ERANGE &&
937             it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
938             acl_bufsize != imp->imp_connect_data.ocd_max_easize) {
939                 mdc_clear_replay_flag(req, -ERANGE);
940                 ptlrpc_req_finished(req);
941                 acl_bufsize = imp->imp_connect_data.ocd_max_easize;
942                 goto resend;
943         }
944
945         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
946         if (rc < 0) {
947                 if (lustre_handle_is_used(lockh)) {
948                         ldlm_lock_decref(lockh, einfo->ei_mode);
949                         memset(lockh, 0, sizeof(*lockh));
950                 }
951                 ptlrpc_req_finished(req);
952
953                 it->it_lock_handle = 0;
954                 it->it_lock_mode = 0;
955                 it->it_request = NULL;
956         }
957
958         RETURN(rc);
959 }
960
961 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
962                 const union ldlm_policy_data *policy,
963                 struct md_op_data *op_data,
964                 struct lustre_handle *lockh, __u64 extra_lock_flags)
965 {
966         return mdc_enqueue_base(exp, einfo, policy, NULL,
967                                 op_data, lockh, extra_lock_flags);
968 }
969
970 static int mdc_finish_intent_lock(struct obd_export *exp,
971                                   struct ptlrpc_request *request,
972                                   struct md_op_data *op_data,
973                                   struct lookup_intent *it,
974                                   struct lustre_handle *lockh)
975 {
976         struct lustre_handle old_lock;
977         struct ldlm_lock *lock;
978         int rc = 0;
979         ENTRY;
980
981         LASSERT(request != NULL);
982         LASSERT(request != LP_POISON);
983         LASSERT(request->rq_repmsg != LP_POISON);
984
985         if (it->it_op & IT_READDIR)
986                 RETURN(0);
987
988         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
989                 if (it->it_status != 0)
990                         GOTO(out, rc = it->it_status);
991         } else {
992                 if (!it_disposition(it, DISP_IT_EXECD)) {
993                         /* The server failed before it even started executing
994                          * the intent, i.e. because it couldn't unpack the
995                          * request.
996                          */
997                         LASSERT(it->it_status != 0);
998                         GOTO(out, rc = it->it_status);
999                 }
1000                 rc = it_open_error(DISP_IT_EXECD, it);
1001                 if (rc)
1002                         GOTO(out, rc);
1003
1004                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1005                 if (rc)
1006                         GOTO(out, rc);
1007
1008                 /* keep requests around for the multiple phases of the call
1009                  * this shows the DISP_XX must guarantee we make it into the
1010                  * call
1011                  */
1012                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1013                     it_disposition(it, DISP_OPEN_CREATE) &&
1014                     !it_open_error(DISP_OPEN_CREATE, it)) {
1015                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
1016                         /* balanced in ll_create_node */
1017                         ptlrpc_request_addref(request);
1018                 }
1019                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1020                     it_disposition(it, DISP_OPEN_OPEN) &&
1021                     !it_open_error(DISP_OPEN_OPEN, it)) {
1022                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
1023                         /* balanced in ll_file_open */
1024                         ptlrpc_request_addref(request);
1025                         /* BUG 11546 - eviction in the middle of open rpc
1026                          * processing
1027                          */
1028                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1029                                          obd_timeout);
1030                 }
1031
1032                 if (it->it_op & IT_CREAT) {
1033                         /* XXX this belongs in ll_create_it */
1034                 } else if (it->it_op == IT_OPEN) {
1035                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1036                 } else {
1037                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1038                 }
1039         }
1040
1041         /* If we already have a matching lock, then cancel the new
1042          * one.  We have to set the data here instead of in
1043          * mdc_enqueue, because we need to use the child's inode as
1044          * the l_ast_data to match, and that's not available until
1045          * intent_finish has performed the iget().) */
1046         lock = ldlm_handle2lock(lockh);
1047         if (lock) {
1048                 union ldlm_policy_data policy = lock->l_policy_data;
1049                 LDLM_DEBUG(lock, "matching against this");
1050
1051                 if (it_has_reply_body(it)) {
1052                         struct mdt_body *body;
1053
1054                         body = req_capsule_server_get(&request->rq_pill,
1055                                                       &RMF_MDT_BODY);
1056                         /* mdc_enqueue checked */
1057                         LASSERT(body != NULL);
1058                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1059                                                  &lock->l_resource->lr_name),
1060                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1061                                  PLDLMRES(lock->l_resource),
1062                                  PFID(&body->mbo_fid1));
1063                 }
1064                 LDLM_LOCK_PUT(lock);
1065
1066                 memcpy(&old_lock, lockh, sizeof(*lockh));
1067                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1068                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1069                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1070                         memcpy(lockh, &old_lock, sizeof(old_lock));
1071                         it->it_lock_handle = lockh->cookie;
1072                 }
1073         }
1074
1075         EXIT;
1076 out:
1077         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1078                 (int)op_data->op_namelen, op_data->op_name,
1079                 ldlm_it2str(it->it_op), it->it_status,
1080                 it->it_disposition, rc);
1081         return rc;
1082 }
1083
1084 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1085                         struct lu_fid *fid, __u64 *bits)
1086 {
1087         /* We could just return 1 immediately, but since we should only
1088          * be called in revalidate_it if we already have a lock, let's
1089          * verify that. */
1090         struct ldlm_res_id res_id;
1091         struct lustre_handle lockh;
1092         union ldlm_policy_data policy;
1093         enum ldlm_mode mode;
1094         ENTRY;
1095
1096         if (it->it_lock_handle) {
1097                 lockh.cookie = it->it_lock_handle;
1098                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1099         } else {
1100                 fid_build_reg_res_name(fid, &res_id);
1101                 switch (it->it_op) {
1102                 case IT_GETATTR:
1103                         /* File attributes are held under multiple bits:
1104                          * nlink is under lookup lock, size and times are
1105                          * under UPDATE lock and recently we've also got
1106                          * a separate permissions lock for owner/group/acl that
1107                          * were protected by lookup lock before.
1108                          * Getattr must provide all of that information,
1109                          * so we need to ensure we have all of those locks.
1110                          * Unfortunately, if the bits are split across multiple
1111                          * locks, there's no easy way to match all of them here,
1112                          * so an extra RPC would be performed to fetch all
1113                          * of those bits at once for now. */
1114                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1115                          * but for old MDTs (< 2.4), permission is covered
1116                          * by LOOKUP lock, so it needs to match all bits here.*/
1117                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1118                                                   MDS_INODELOCK_LOOKUP |
1119                                                   MDS_INODELOCK_PERM;
1120                         break;
1121                 case IT_READDIR:
1122                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1123                         break;
1124                 case IT_LAYOUT:
1125                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1126                         break;
1127                 default:
1128                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1129                         break;
1130                 }
1131
1132                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1133                                       LDLM_IBITS, &policy,
1134                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1135                                       &lockh);
1136         }
1137
1138         if (mode) {
1139                 it->it_lock_handle = lockh.cookie;
1140                 it->it_lock_mode = mode;
1141         } else {
1142                 it->it_lock_handle = 0;
1143                 it->it_lock_mode = 0;
1144         }
1145
1146         RETURN(!!mode);
1147 }
1148
1149 /*
1150  * This long block is all about fixing up the lock and request state
1151  * so that it is correct as of the moment _before_ the operation was
1152  * applied; that way, the VFS will think that everything is normal and
1153  * call Lustre's regular VFS methods.
1154  *
1155  * If we're performing a creation, that means that unless the creation
1156  * failed with EEXIST, we should fake up a negative dentry.
1157  *
1158  * For everything else, we want to lookup to succeed.
1159  *
1160  * One additional note: if CREATE or OPEN succeeded, we add an extra
1161  * reference to the request because we need to keep it around until
1162  * ll_create/ll_open gets called.
1163  *
1164  * The server will return to us, in it_disposition, an indication of
1165  * exactly what it_status refers to.
1166  *
1167  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1168  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1169  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1170  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1171  * was successful.
1172  *
1173  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1174  * child lookup.
1175  */
1176 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1177                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1178                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1179 {
1180         struct ldlm_enqueue_info einfo = {
1181                 .ei_type        = LDLM_IBITS,
1182                 .ei_mode        = it_to_lock_mode(it),
1183                 .ei_cb_bl       = cb_blocking,
1184                 .ei_cb_cp       = ldlm_completion_ast,
1185                 .ei_cb_gl       = mdc_ldlm_glimpse_ast,
1186         };
1187         struct lustre_handle lockh;
1188         int rc = 0;
1189         ENTRY;
1190         LASSERT(it);
1191
1192         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1193                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1194                 op_data->op_name, PFID(&op_data->op_fid2),
1195                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1196                 it->it_flags);
1197
1198         lockh.cookie = 0;
1199         if (fid_is_sane(&op_data->op_fid2) &&
1200             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1201                 /* We could just return 1 immediately, but since we should only
1202                  * be called in revalidate_it if we already have a lock, let's
1203                  * verify that. */
1204                 it->it_lock_handle = 0;
1205                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1206                 /* Only return failure if it was not GETATTR by cfid
1207                    (from inode_revalidate) */
1208                 if (rc || op_data->op_namelen != 0)
1209                         RETURN(rc);
1210         }
1211
1212         /* For case if upper layer did not alloc fid, do it now. */
1213         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1214                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1215                 if (rc < 0) {
1216                         CERROR("Can't alloc new fid, rc %d\n", rc);
1217                         RETURN(rc);
1218                 }
1219         }
1220
1221         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1222                               extra_lock_flags);
1223         if (rc < 0)
1224                 RETURN(rc);
1225
1226         *reqp = it->it_request;
1227         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1228         RETURN(rc);
1229 }
1230
1231 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1232                                               struct ptlrpc_request *req,
1233                                               void *args, int rc)
1234 {
1235         struct mdc_getattr_args  *ga = args;
1236         struct obd_export        *exp = ga->ga_exp;
1237         struct md_enqueue_info   *minfo = ga->ga_minfo;
1238         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1239         struct lookup_intent     *it;
1240         struct lustre_handle     *lockh;
1241         struct obd_device        *obddev;
1242         struct ldlm_reply        *lockrep;
1243         __u64                     flags = LDLM_FL_HAS_INTENT;
1244         ENTRY;
1245
1246         it    = &minfo->mi_it;
1247         lockh = &minfo->mi_lockh;
1248
1249         obddev = class_exp2obd(exp);
1250
1251         obd_put_request_slot(&obddev->u.cli);
1252         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1253                 rc = -ETIMEDOUT;
1254
1255         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1256                                    &flags, NULL, 0, lockh, rc);
1257         if (rc < 0) {
1258                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1259                 mdc_clear_replay_flag(req, rc);
1260                 GOTO(out, rc);
1261         }
1262
1263         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1264         LASSERT(lockrep != NULL);
1265
1266         lockrep->lock_policy_res2 =
1267                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1268
1269         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1270         if (rc)
1271                 GOTO(out, rc);
1272
1273         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1274         EXIT;
1275
1276 out:
1277         minfo->mi_cb(req, minfo, rc);
1278         return 0;
1279 }
1280
1281 int mdc_intent_getattr_async(struct obd_export *exp,
1282                              struct md_enqueue_info *minfo)
1283 {
1284         struct md_op_data       *op_data = &minfo->mi_data;
1285         struct lookup_intent    *it = &minfo->mi_it;
1286         struct ptlrpc_request   *req;
1287         struct mdc_getattr_args *ga;
1288         struct obd_device       *obddev = class_exp2obd(exp);
1289         struct ldlm_res_id       res_id;
1290         union ldlm_policy_data policy = {
1291                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1292                                                  MDS_INODELOCK_UPDATE } };
1293         int                      rc = 0;
1294         __u64                    flags = LDLM_FL_HAS_INTENT;
1295         ENTRY;
1296
1297         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1298                 (int)op_data->op_namelen, op_data->op_name,
1299                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1300
1301         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1302         /* If the MDT return -ERANGE because of large ACL, then the sponsor
1303          * of the async getattr RPC will handle that by itself. */
1304         req = mdc_intent_getattr_pack(exp, it, op_data,
1305                                       LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1306         if (IS_ERR(req))
1307                 RETURN(PTR_ERR(req));
1308
1309         rc = obd_get_request_slot(&obddev->u.cli);
1310         if (rc != 0) {
1311                 ptlrpc_req_finished(req);
1312                 RETURN(rc);
1313         }
1314
1315         /* With Data-on-MDT the glimpse callback is needed too.
1316          * It is set here in advance but not in mdc_finish_enqueue()
1317          * to avoid possible races. It is safe to have glimpse handler
1318          * for non-DOM locks and costs nothing.*/
1319         if (minfo->mi_einfo.ei_cb_gl == NULL)
1320                 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1321
1322         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1323                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1324         if (rc < 0) {
1325                 obd_put_request_slot(&obddev->u.cli);
1326                 ptlrpc_req_finished(req);
1327                 RETURN(rc);
1328         }
1329
1330         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1331         ga = ptlrpc_req_async_args(req);
1332         ga->ga_exp = exp;
1333         ga->ga_minfo = minfo;
1334
1335         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1336         ptlrpcd_add_req(req);
1337
1338         RETURN(0);
1339 }