Whamcloud - gitweb
32587cfa60df4a4e7be3b3e878fbf32787fa4494
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 #include <linux/module.h>
36
37 #include <obd.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
47
48 #include "mdc_internal.h"
49
50 struct mdc_getattr_args {
51         struct obd_export               *ga_exp;
52         struct md_enqueue_info          *ga_minfo;
53 };
54
55 int it_open_error(int phase, struct lookup_intent *it)
56 {
57         if (it_disposition(it, DISP_OPEN_LEASE)) {
58                 if (phase >= DISP_OPEN_LEASE)
59                         return it->it_status;
60                 else
61                         return 0;
62         }
63         if (it_disposition(it, DISP_OPEN_OPEN)) {
64                 if (phase >= DISP_OPEN_OPEN)
65                         return it->it_status;
66                 else
67                         return 0;
68         }
69
70         if (it_disposition(it, DISP_OPEN_CREATE)) {
71                 if (phase >= DISP_OPEN_CREATE)
72                         return it->it_status;
73                 else
74                         return 0;
75         }
76
77         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78                 if (phase >= DISP_LOOKUP_EXECD)
79                         return it->it_status;
80                 else
81                         return 0;
82         }
83
84         if (it_disposition(it, DISP_IT_EXECD)) {
85                 if (phase >= DISP_IT_EXECD)
86                         return it->it_status;
87                 else
88                         return 0;
89         }
90
91         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
92         LBUG();
93
94         return 0;
95 }
96 EXPORT_SYMBOL(it_open_error);
97
98 /* this must be called on a lockh that is known to have a referenced lock */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100                       void *data, __u64 *bits)
101 {
102         struct ldlm_lock *lock;
103         struct inode *new_inode = data;
104         ENTRY;
105
106         if(bits)
107                 *bits = 0;
108
109         if (!lustre_handle_is_used(lockh))
110                 RETURN(0);
111
112         lock = ldlm_handle2lock(lockh);
113
114         LASSERT(lock != NULL);
115         lock_res_and_lock(lock);
116         if (lock->l_resource->lr_lvb_inode &&
117             lock->l_resource->lr_lvb_inode != data) {
118                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119                 LASSERTF(old_inode->i_state & I_FREEING,
120                          "Found existing inode %p/%lu/%u state %lu in lock: "
121                          "setting data to %p/%lu/%u\n", old_inode,
122                          old_inode->i_ino, old_inode->i_generation,
123                          old_inode->i_state,
124                          new_inode, new_inode->i_ino, new_inode->i_generation);
125         }
126         lock->l_resource->lr_lvb_inode = new_inode;
127         if (bits)
128                 *bits = lock->l_policy_data.l_inodebits.bits;
129
130         unlock_res_and_lock(lock);
131         LDLM_LOCK_PUT(lock);
132
133         RETURN(0);
134 }
135
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137                               const struct lu_fid *fid, enum ldlm_type type,
138                               union ldlm_policy_data *policy,
139                               enum ldlm_mode mode, struct lustre_handle *lockh)
140 {
141         struct ldlm_res_id res_id;
142         enum ldlm_mode rc;
143         ENTRY;
144
145         fid_build_reg_res_name(fid, &res_id);
146         /* LU-4405: Clear bits not supported by server */
147         policy->l_inodebits.bits &= exp_connect_ibits(exp);
148         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149                              &res_id, type, policy, mode, lockh, 0);
150         RETURN(rc);
151 }
152
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154                       union ldlm_policy_data *policy, enum ldlm_mode mode,
155                       enum ldlm_cancel_flags flags, void *opaque)
156 {
157         struct obd_device *obd = class_exp2obd(exp);
158         struct ldlm_res_id res_id;
159         int rc;
160
161         ENTRY;
162
163         fid_build_reg_res_name(fid, &res_id);
164         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165                                              policy, mode, flags, opaque);
166         RETURN(rc);
167 }
168
169 int mdc_null_inode(struct obd_export *exp,
170                    const struct lu_fid *fid)
171 {
172         struct ldlm_res_id res_id;
173         struct ldlm_resource *res;
174         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
175         ENTRY;
176
177         LASSERTF(ns != NULL, "no namespace passed\n");
178
179         fid_build_reg_res_name(fid, &res_id);
180
181         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
182         if (IS_ERR(res))
183                 RETURN(0);
184
185         lock_res(res);
186         res->lr_lvb_inode = NULL;
187         unlock_res(res);
188
189         ldlm_resource_putref(res);
190         RETURN(0);
191 }
192
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
194 {
195         /* Don't hold error requests for replay. */
196         if (req->rq_replay) {
197                 spin_lock(&req->rq_lock);
198                 req->rq_replay = 0;
199                 spin_unlock(&req->rq_lock);
200         }
201         if (rc && req->rq_transno != 0) {
202                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
203                 LBUG();
204         }
205 }
206
207 /* Save a large LOV EA into the request buffer so that it is available
208  * for replay.  We don't do this in the initial request because the
209  * original request doesn't need this buffer (at most it sends just the
210  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211  * buffer and may also be difficult to allocate and save a very large
212  * request buffer for each open. (bug 5707)
213  *
214  * OOM here may cause recovery failure if lmm is needed (only for the
215  * original open if the MDS crashed just when this client also OOM'd)
216  * but this is incredibly unlikely, and questionable whether the client
217  * could do MDS recovery under OOM anyways... */
218 int mdc_save_lovea(struct ptlrpc_request *req,
219                    const struct req_msg_field *field,
220                    void *data, u32 size)
221 {
222         struct req_capsule *pill = &req->rq_pill;
223         void *lmm;
224         int rc = 0;
225
226         if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
227                 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
228                 if (rc) {
229                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230                                req->rq_export->exp_obd->obd_name,
231                                size, rc);
232                         return rc;
233                 }
234         } else {
235                 req_capsule_shrink(pill, field, size, RCL_CLIENT);
236         }
237
238         req_capsule_set_size(pill, field, RCL_CLIENT, size);
239         lmm = req_capsule_client_get(pill, field);
240         if (lmm)
241                 memcpy(lmm, data, size);
242
243         return rc;
244 }
245
246 static struct ptlrpc_request *
247 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
248                      struct md_op_data *op_data, __u32 acl_bufsize)
249 {
250         struct ptlrpc_request   *req;
251         struct obd_device       *obddev = class_exp2obd(exp);
252         struct ldlm_intent      *lit;
253         const void              *lmm = op_data->op_data;
254         __u32                    lmmsize = op_data->op_data_size;
255         struct list_head         cancels = LIST_HEAD_INIT(cancels);
256         int                      count = 0;
257         enum ldlm_mode           mode;
258         int                      rc;
259         int repsize;
260
261         ENTRY;
262
263         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
264
265         /* XXX: openlock is not cancelled for cross-refs. */
266         /* If inode is known, cancel conflicting OPEN locks. */
267         if (fid_is_sane(&op_data->op_fid2)) {
268                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
269                         if (it->it_flags & MDS_FMODE_WRITE)
270                                 mode = LCK_EX;
271                         else
272                                 mode = LCK_PR;
273                 } else {
274                         if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
275                                 mode = LCK_CW;
276 #ifdef FMODE_EXEC
277                         else if (it->it_flags & FMODE_EXEC)
278                                 mode = LCK_PR;
279 #endif
280                         else
281                                 mode = LCK_CR;
282                 }
283                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
284                                                 &cancels, mode,
285                                                 MDS_INODELOCK_OPEN);
286         }
287
288         /* If CREATE, cancel parent's UPDATE lock. */
289         if (it->it_op & IT_CREAT)
290                 mode = LCK_EX;
291         else
292                 mode = LCK_CR;
293         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
294                                          &cancels, mode,
295                                          MDS_INODELOCK_UPDATE);
296
297         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
298                                    &RQF_LDLM_INTENT_OPEN);
299         if (req == NULL) {
300                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
301                 RETURN(ERR_PTR(-ENOMEM));
302         }
303
304         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
305                              op_data->op_namelen + 1);
306         if (cl_is_lov_delay_create(it->it_flags)) {
307                 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
308                 LASSERT(lmmsize == 0);
309                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
310         } else {
311                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
312                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
313         }
314
315         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
316                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
317                              strlen(op_data->op_file_secctx_name) + 1 : 0);
318
319         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
320                              op_data->op_file_secctx_size);
321
322         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
323         if (rc < 0) {
324                 ptlrpc_request_free(req);
325                 RETURN(ERR_PTR(rc));
326         }
327
328         spin_lock(&req->rq_lock);
329         req->rq_replay = req->rq_import->imp_replayable;
330         spin_unlock(&req->rq_lock);
331
332         /* pack the intent */
333         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
334         lit->opc = (__u64)it->it_op;
335
336         /* pack the intended request */
337         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
338                       lmmsize);
339
340         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
341                              obddev->u.cli.cl_max_mds_easize);
342         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
343
344         /**
345          * Inline buffer for possible data from Data-on-MDT files.
346          */
347         req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
348                              sizeof(struct niobuf_remote));
349         ptlrpc_request_set_replen(req);
350
351         /* Get real repbuf allocated size as rounded up power of 2 */
352         repsize = size_roundup_power2(req->rq_replen +
353                                       lustre_msg_early_size());
354
355         /* Estimate free space for DoM files in repbuf */
356         repsize -= req->rq_replen - obddev->u.cli.cl_max_mds_easize +
357                    sizeof(struct lov_comp_md_v1) +
358                    sizeof(struct lov_comp_md_entry_v1) +
359                    lov_mds_md_size(0, LOV_MAGIC_V3);
360
361         if (repsize < obddev->u.cli.cl_dom_min_inline_repsize) {
362                 repsize = obddev->u.cli.cl_dom_min_inline_repsize - repsize;
363                 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
364                                      RCL_SERVER,
365                                      sizeof(struct niobuf_remote) + repsize);
366                 ptlrpc_request_set_replen(req);
367                 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
368                        repsize, req->rq_replen);
369         }
370         return req;
371 }
372
373 #define GA_DEFAULT_EA_NAME_LEN 20
374 #define GA_DEFAULT_EA_VAL_LEN  250
375 #define GA_DEFAULT_EA_NUM      10
376
377 static struct ptlrpc_request *
378 mdc_intent_getxattr_pack(struct obd_export *exp,
379                          struct lookup_intent *it,
380                          struct md_op_data *op_data)
381 {
382         struct ptlrpc_request   *req;
383         struct ldlm_intent      *lit;
384         int                     rc, count = 0;
385         struct list_head        cancels = LIST_HEAD_INIT(cancels);
386         u32 min_buf_size = 0;
387
388         ENTRY;
389
390         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
391                                         &RQF_LDLM_INTENT_GETXATTR);
392         if (req == NULL)
393                 RETURN(ERR_PTR(-ENOMEM));
394
395         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
396         if (rc) {
397                 ptlrpc_request_free(req);
398                 RETURN(ERR_PTR(rc));
399         }
400
401         /* pack the intent */
402         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
403         lit->opc = IT_GETXATTR;
404
405 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
406         /* If the supplied buffer is too small then the server will
407          * return -ERANGE and llite will fallback to using non cached
408          * xattr operations. On servers before 2.10.1 a (non-cached)
409          * listxattr RPC for an orphan or dead file causes an oops. So
410          * let's try to avoid sending too small a buffer to too old a
411          * server. This is effectively undoing the memory conservation
412          * of LU-9417 when it would be *more* likely to crash the
413          * server. See LU-9856. */
414         if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
415                 min_buf_size = exp->exp_connect_data.ocd_max_easize;
416 #endif
417
418         /* pack the intended request */
419         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
420                       max_t(u32, min_buf_size,
421                             GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM),
422                       -1, 0);
423
424         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
425                              max_t(u32, min_buf_size,
426                                  GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM));
427
428         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
429                              max_t(u32, min_buf_size,
430                                  GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM));
431
432         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
433                              max_t(u32, min_buf_size,
434                                  sizeof(__u32) * GA_DEFAULT_EA_NUM));
435
436         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
437
438         ptlrpc_request_set_replen(req);
439
440         RETURN(req);
441 }
442
443 static struct ptlrpc_request *
444 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
445                         struct md_op_data *op_data, __u32 acl_bufsize)
446 {
447         struct ptlrpc_request   *req;
448         struct obd_device       *obddev = class_exp2obd(exp);
449         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
450                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
451                                          OBD_MD_MEA | OBD_MD_FLACL;
452         struct ldlm_intent      *lit;
453         int                      rc;
454         __u32                    easize;
455         ENTRY;
456
457         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
458                                    &RQF_LDLM_INTENT_GETATTR);
459         if (req == NULL)
460                 RETURN(ERR_PTR(-ENOMEM));
461
462         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
463                              op_data->op_namelen + 1);
464
465         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
466         if (rc) {
467                 ptlrpc_request_free(req);
468                 RETURN(ERR_PTR(rc));
469         }
470
471         /* pack the intent */
472         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
473         lit->opc = (__u64)it->it_op;
474
475         if (obddev->u.cli.cl_default_mds_easize > 0)
476                 easize = obddev->u.cli.cl_default_mds_easize;
477         else
478                 easize = obddev->u.cli.cl_max_mds_easize;
479
480         /* pack the intended request */
481         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
482
483         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
484         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
485         ptlrpc_request_set_replen(req);
486         RETURN(req);
487 }
488
489 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
490                                                      struct lookup_intent *it,
491                                                      struct md_op_data *op_data)
492 {
493         struct obd_device     *obd = class_exp2obd(exp);
494         struct ptlrpc_request *req;
495         struct ldlm_intent    *lit;
496         struct layout_intent  *layout;
497         int rc;
498         ENTRY;
499
500         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
501                                 &RQF_LDLM_INTENT_LAYOUT);
502         if (req == NULL)
503                 RETURN(ERR_PTR(-ENOMEM));
504
505         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
506         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
507         if (rc) {
508                 ptlrpc_request_free(req);
509                 RETURN(ERR_PTR(rc));
510         }
511
512         /* pack the intent */
513         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
514         lit->opc = (__u64)it->it_op;
515
516         /* pack the layout intent request */
517         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
518         LASSERT(op_data->op_data != NULL);
519         LASSERT(op_data->op_data_size == sizeof(*layout));
520         memcpy(layout, op_data->op_data, sizeof(*layout));
521
522         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
523                              obd->u.cli.cl_default_mds_easize);
524         ptlrpc_request_set_replen(req);
525         RETURN(req);
526 }
527
528 static struct ptlrpc_request *
529 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
530 {
531         struct ptlrpc_request *req;
532         int rc;
533         ENTRY;
534
535         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
536         if (req == NULL)
537                 RETURN(ERR_PTR(-ENOMEM));
538
539         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
540         if (rc) {
541                 ptlrpc_request_free(req);
542                 RETURN(ERR_PTR(rc));
543         }
544
545         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
546         ptlrpc_request_set_replen(req);
547         RETURN(req);
548 }
549
550 static int mdc_finish_enqueue(struct obd_export *exp,
551                               struct ptlrpc_request *req,
552                               struct ldlm_enqueue_info *einfo,
553                               struct lookup_intent *it,
554                               struct lustre_handle *lockh,
555                               int rc)
556 {
557         struct req_capsule  *pill = &req->rq_pill;
558         struct ldlm_request *lockreq;
559         struct ldlm_reply   *lockrep;
560         struct ldlm_lock    *lock;
561         struct mdt_body     *body = NULL;
562         void                *lvb_data = NULL;
563         __u32                lvb_len = 0;
564
565         ENTRY;
566
567         LASSERT(rc >= 0);
568         /* Similarly, if we're going to replay this request, we don't want to
569          * actually get a lock, just perform the intent. */
570         if (req->rq_transno || req->rq_replay) {
571                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
572                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
573         }
574
575         if (rc == ELDLM_LOCK_ABORTED) {
576                 einfo->ei_mode = 0;
577                 memset(lockh, 0, sizeof(*lockh));
578                 rc = 0;
579         } else { /* rc = 0 */
580                 lock = ldlm_handle2lock(lockh);
581                 LASSERT(lock != NULL);
582
583                 /* If the server gave us back a different lock mode, we should
584                  * fix up our variables. */
585                 if (lock->l_req_mode != einfo->ei_mode) {
586                         ldlm_lock_addref(lockh, lock->l_req_mode);
587                         ldlm_lock_decref(lockh, einfo->ei_mode);
588                         einfo->ei_mode = lock->l_req_mode;
589                 }
590                 LDLM_LOCK_PUT(lock);
591         }
592
593         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
594         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
595
596         it->it_disposition = (int)lockrep->lock_policy_res1;
597         it->it_status = (int)lockrep->lock_policy_res2;
598         it->it_lock_mode = einfo->ei_mode;
599         it->it_lock_handle = lockh->cookie;
600         it->it_request = req;
601
602         /* Technically speaking rq_transno must already be zero if
603          * it_status is in error, so the check is a bit redundant */
604         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
605                 mdc_clear_replay_flag(req, it->it_status);
606
607         /* If we're doing an IT_OPEN which did not result in an actual
608          * successful open, then we need to remove the bit which saves
609          * this request for unconditional replay.
610          *
611          * It's important that we do this first!  Otherwise we might exit the
612          * function without doing so, and try to replay a failed create
613          * (bug 3440) */
614         if (it->it_op & IT_OPEN && req->rq_replay &&
615             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
616                 mdc_clear_replay_flag(req, it->it_status);
617
618         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
619                   it->it_op, it->it_disposition, it->it_status);
620
621         /* We know what to expect, so we do any byte flipping required here */
622         if (it_has_reply_body(it)) {
623                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
624                 if (body == NULL) {
625                         CERROR ("Can't swab mdt_body\n");
626                         RETURN (-EPROTO);
627                 }
628
629                 if (it_disposition(it, DISP_OPEN_OPEN) &&
630                     !it_open_error(DISP_OPEN_OPEN, it)) {
631                         /*
632                          * If this is a successful OPEN request, we need to set
633                          * replay handler and data early, so that if replay
634                          * happens immediately after swabbing below, new reply
635                          * is swabbed by that handler correctly.
636                          */
637                         mdc_set_open_replay_data(NULL, NULL, it);
638                 }
639
640                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
641                         void *eadata;
642
643                         mdc_update_max_ea_from_body(exp, body);
644
645                         /*
646                          * The eadata is opaque; just check that it is there.
647                          * Eventually, obd_unpackmd() will check the contents.
648                          */
649                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
650                                                         body->mbo_eadatasize);
651                         if (eadata == NULL)
652                                 RETURN(-EPROTO);
653
654                         /* save lvb data and length in case this is for layout
655                          * lock */
656                         lvb_data = eadata;
657                         lvb_len = body->mbo_eadatasize;
658
659                         /*
660                          * We save the reply LOV EA in case we have to replay a
661                          * create for recovery.  If we didn't allocate a large
662                          * enough request buffer above we need to reallocate it
663                          * here to hold the actual LOV EA.
664                          *
665                          * To not save LOV EA if request is not going to replay
666                          * (for example error one).
667                          */
668                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
669                                 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
670                                                     body->mbo_eadatasize);
671                                 if (rc) {
672                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
673                                         body->mbo_eadatasize = 0;
674                                         rc = 0;
675                                 }
676                         }
677                 }
678         } else if (it->it_op & IT_LAYOUT) {
679                 /* maybe the lock was granted right away and layout
680                  * is packed into RMF_DLM_LVB of req */
681                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
682                 if (lvb_len > 0) {
683                         lvb_data = req_capsule_server_sized_get(pill,
684                                                         &RMF_DLM_LVB, lvb_len);
685                         if (lvb_data == NULL)
686                                 RETURN(-EPROTO);
687
688                         /**
689                          * save replied layout data to the request buffer for
690                          * recovery consideration (lest MDS reinitialize
691                          * another set of OST objects).
692                          */
693                         if (req->rq_transno)
694                                 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
695                                                      lvb_len);
696                 }
697         }
698
699         /* fill in stripe data for layout lock.
700          * LU-6581: trust layout data only if layout lock is granted. The MDT
701          * has stopped sending layout unless the layout lock is granted. The
702          * client still does this checking in case it's talking with an old
703          * server. - Jinshan */
704         lock = ldlm_handle2lock(lockh);
705         if (lock == NULL)
706                 RETURN(rc);
707
708         if (ldlm_has_layout(lock) && lvb_data != NULL &&
709             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
710                 void *lmm;
711
712                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
713                         ldlm_it2str(it->it_op), lvb_len);
714
715                 OBD_ALLOC_LARGE(lmm, lvb_len);
716                 if (lmm == NULL)
717                         GOTO(out_lock, rc = -ENOMEM);
718
719                 memcpy(lmm, lvb_data, lvb_len);
720
721                 /* install lvb_data */
722                 lock_res_and_lock(lock);
723                 if (lock->l_lvb_data == NULL) {
724                         lock->l_lvb_type = LVB_T_LAYOUT;
725                         lock->l_lvb_data = lmm;
726                         lock->l_lvb_len = lvb_len;
727                         lmm = NULL;
728                 }
729                 unlock_res_and_lock(lock);
730                 if (lmm != NULL)
731                         OBD_FREE_LARGE(lmm, lvb_len);
732         }
733
734         if (ldlm_has_dom(lock)) {
735                 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
736
737                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
738                 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
739                         LDLM_ERROR(lock, "%s: DoM lock without size.\n",
740                                    exp->exp_obd->obd_name);
741                         GOTO(out_lock, rc = -EPROTO);
742                 }
743
744                 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
745                            ldlm_it2str(it->it_op), body->mbo_dom_size);
746
747                 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
748         }
749 out_lock:
750         LDLM_LOCK_PUT(lock);
751
752         RETURN(rc);
753 }
754
755 /* We always reserve enough space in the reply packet for a stripe MD, because
756  * we don't know in advance the file type. */
757 static int mdc_enqueue_base(struct obd_export *exp,
758                             struct ldlm_enqueue_info *einfo,
759                             const union ldlm_policy_data *policy,
760                             struct lookup_intent *it,
761                             struct md_op_data *op_data,
762                             struct lustre_handle *lockh,
763                             __u64 extra_lock_flags)
764 {
765         struct obd_device *obddev = class_exp2obd(exp);
766         struct ptlrpc_request *req = NULL;
767         __u64 flags, saved_flags = extra_lock_flags;
768         struct ldlm_res_id res_id;
769         static const union ldlm_policy_data lookup_policy = {
770                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
771         static const union ldlm_policy_data update_policy = {
772                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
773         static const union ldlm_policy_data layout_policy = {
774                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
775         static const union ldlm_policy_data getxattr_policy = {
776                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
777         int generation, resends = 0;
778         struct ldlm_reply *lockrep;
779         struct obd_import *imp = class_exp2cliimp(exp);
780         __u32 acl_bufsize;
781         enum lvb_type lvb_type = 0;
782         int rc;
783         ENTRY;
784
785         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
786                  einfo->ei_type);
787         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
788
789         if (it != NULL) {
790                 LASSERT(policy == NULL);
791
792                 saved_flags |= LDLM_FL_HAS_INTENT;
793                 if (it->it_op & (IT_GETATTR | IT_READDIR))
794                         policy = &update_policy;
795                 else if (it->it_op & IT_LAYOUT)
796                         policy = &layout_policy;
797                 else if (it->it_op & IT_GETXATTR)
798                         policy = &getxattr_policy;
799                 else
800                         policy = &lookup_policy;
801         }
802
803         generation = obddev->u.cli.cl_import->imp_generation;
804         if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
805                 acl_bufsize = imp->imp_connect_data.ocd_max_easize;
806         else
807                 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
808
809 resend:
810         flags = saved_flags;
811         if (it == NULL) {
812                 /* The only way right now is FLOCK. */
813                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
814                          einfo->ei_type);
815                 res_id.name[3] = LDLM_FLOCK;
816         } else if (it->it_op & IT_OPEN) {
817                 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
818         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
819                 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
820         } else if (it->it_op & IT_READDIR) {
821                 req = mdc_enqueue_pack(exp, 0);
822         } else if (it->it_op & IT_LAYOUT) {
823                 if (!imp_connect_lvb_type(imp))
824                         RETURN(-EOPNOTSUPP);
825                 req = mdc_intent_layout_pack(exp, it, op_data);
826                 lvb_type = LVB_T_LAYOUT;
827         } else if (it->it_op & IT_GETXATTR) {
828                 req = mdc_intent_getxattr_pack(exp, it, op_data);
829         } else {
830                 LBUG();
831                 RETURN(-EINVAL);
832         }
833
834         if (IS_ERR(req))
835                 RETURN(PTR_ERR(req));
836
837         if (resends) {
838                 req->rq_generation_set = 1;
839                 req->rq_import_generation = generation;
840                 req->rq_sent = ktime_get_real_seconds() + resends;
841         }
842
843         /* It is important to obtain modify RPC slot first (if applicable), so
844          * that threads that are waiting for a modify RPC slot are not polluting
845          * our rpcs in flight counter.
846          * We do not do flock request limiting, though */
847         if (it) {
848                 mdc_get_mod_rpc_slot(req, it);
849                 rc = obd_get_request_slot(&obddev->u.cli);
850                 if (rc != 0) {
851                         mdc_put_mod_rpc_slot(req, it);
852                         mdc_clear_replay_flag(req, 0);
853                         ptlrpc_req_finished(req);
854                         RETURN(rc);
855                 }
856         }
857
858         /* With Data-on-MDT the glimpse callback is needed too.
859          * It is set here in advance but not in mdc_finish_enqueue()
860          * to avoid possible races. It is safe to have glimpse handler
861          * for non-DOM locks and costs nothing.*/
862         if (einfo->ei_cb_gl == NULL)
863                 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
864
865         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
866                               0, lvb_type, lockh, 0);
867         if (!it) {
868                 /* For flock requests we immediatelly return without further
869                    delay and let caller deal with the rest, since rest of
870                    this function metadata processing makes no sense for flock
871                    requests anyway. But in case of problem during comms with
872                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
873                    can not rely on caller and this mainly for F_UNLCKs
874                    (explicits or automatically generated by Kernel to clean
875                    current FLocks upon exit) that can't be trashed */
876                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
877                     (einfo->ei_type == LDLM_FLOCK) &&
878                     (einfo->ei_mode == LCK_NL))
879                         goto resend;
880                 RETURN(rc);
881         }
882
883         obd_put_request_slot(&obddev->u.cli);
884         mdc_put_mod_rpc_slot(req, it);
885
886         if (rc < 0) {
887                 CDEBUG(D_INFO,
888                       "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
889                       obddev->obd_name, PFID(&op_data->op_fid1),
890                       PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
891
892                 mdc_clear_replay_flag(req, rc);
893                 ptlrpc_req_finished(req);
894                 RETURN(rc);
895         }
896
897         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
898         LASSERT(lockrep != NULL);
899
900         lockrep->lock_policy_res2 =
901                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
902
903         /* Retry infinitely when the server returns -EINPROGRESS for the
904          * intent operation, when server returns -EINPROGRESS for acquiring
905          * intent lock, we'll retry in after_reply(). */
906         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
907                 mdc_clear_replay_flag(req, rc);
908                 ptlrpc_req_finished(req);
909                 if (generation == obddev->u.cli.cl_import->imp_generation) {
910                         if (signal_pending(current))
911                                 RETURN(-EINTR);
912
913                         resends++;
914                         CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
915                                obddev->obd_name, resends, it->it_op,
916                                PFID(&op_data->op_fid1),
917                                PFID(&op_data->op_fid2));
918                         goto resend;
919                 } else {
920                         CDEBUG(D_HA, "resend cross eviction\n");
921                         RETURN(-EIO);
922                 }
923         }
924
925         if ((int)lockrep->lock_policy_res2 == -ERANGE &&
926             it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
927             acl_bufsize != imp->imp_connect_data.ocd_max_easize) {
928                 mdc_clear_replay_flag(req, -ERANGE);
929                 ptlrpc_req_finished(req);
930                 acl_bufsize = imp->imp_connect_data.ocd_max_easize;
931                 goto resend;
932         }
933
934         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
935         if (rc < 0) {
936                 if (lustre_handle_is_used(lockh)) {
937                         ldlm_lock_decref(lockh, einfo->ei_mode);
938                         memset(lockh, 0, sizeof(*lockh));
939                 }
940                 ptlrpc_req_finished(req);
941
942                 it->it_lock_handle = 0;
943                 it->it_lock_mode = 0;
944                 it->it_request = NULL;
945         }
946
947         RETURN(rc);
948 }
949
950 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
951                 const union ldlm_policy_data *policy,
952                 struct md_op_data *op_data,
953                 struct lustre_handle *lockh, __u64 extra_lock_flags)
954 {
955         return mdc_enqueue_base(exp, einfo, policy, NULL,
956                                 op_data, lockh, extra_lock_flags);
957 }
958
959 static int mdc_finish_intent_lock(struct obd_export *exp,
960                                   struct ptlrpc_request *request,
961                                   struct md_op_data *op_data,
962                                   struct lookup_intent *it,
963                                   struct lustre_handle *lockh)
964 {
965         struct lustre_handle old_lock;
966         struct ldlm_lock *lock;
967         int rc = 0;
968         ENTRY;
969
970         LASSERT(request != NULL);
971         LASSERT(request != LP_POISON);
972         LASSERT(request->rq_repmsg != LP_POISON);
973
974         if (it->it_op & IT_READDIR)
975                 RETURN(0);
976
977         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
978                 if (it->it_status != 0)
979                         GOTO(out, rc = it->it_status);
980         } else {
981                 if (!it_disposition(it, DISP_IT_EXECD)) {
982                         /* The server failed before it even started executing
983                          * the intent, i.e. because it couldn't unpack the
984                          * request.
985                          */
986                         LASSERT(it->it_status != 0);
987                         GOTO(out, rc = it->it_status);
988                 }
989                 rc = it_open_error(DISP_IT_EXECD, it);
990                 if (rc)
991                         GOTO(out, rc);
992
993                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
994                 if (rc)
995                         GOTO(out, rc);
996
997                 /* keep requests around for the multiple phases of the call
998                  * this shows the DISP_XX must guarantee we make it into the
999                  * call
1000                  */
1001                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1002                     it_disposition(it, DISP_OPEN_CREATE) &&
1003                     !it_open_error(DISP_OPEN_CREATE, it)) {
1004                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
1005                         /* balanced in ll_create_node */
1006                         ptlrpc_request_addref(request);
1007                 }
1008                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1009                     it_disposition(it, DISP_OPEN_OPEN) &&
1010                     !it_open_error(DISP_OPEN_OPEN, it)) {
1011                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
1012                         /* balanced in ll_file_open */
1013                         ptlrpc_request_addref(request);
1014                         /* BUG 11546 - eviction in the middle of open rpc
1015                          * processing
1016                          */
1017                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1018                                          obd_timeout);
1019                 }
1020
1021                 if (it->it_op & IT_CREAT) {
1022                         /* XXX this belongs in ll_create_it */
1023                 } else if (it->it_op == IT_OPEN) {
1024                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1025                 } else {
1026                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1027                 }
1028         }
1029
1030         /* If we already have a matching lock, then cancel the new
1031          * one.  We have to set the data here instead of in
1032          * mdc_enqueue, because we need to use the child's inode as
1033          * the l_ast_data to match, and that's not available until
1034          * intent_finish has performed the iget().) */
1035         lock = ldlm_handle2lock(lockh);
1036         if (lock) {
1037                 union ldlm_policy_data policy = lock->l_policy_data;
1038                 LDLM_DEBUG(lock, "matching against this");
1039
1040                 if (it_has_reply_body(it)) {
1041                         struct mdt_body *body;
1042
1043                         body = req_capsule_server_get(&request->rq_pill,
1044                                                       &RMF_MDT_BODY);
1045                         /* mdc_enqueue checked */
1046                         LASSERT(body != NULL);
1047                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1048                                                  &lock->l_resource->lr_name),
1049                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1050                                  PLDLMRES(lock->l_resource),
1051                                  PFID(&body->mbo_fid1));
1052                 }
1053                 LDLM_LOCK_PUT(lock);
1054
1055                 memcpy(&old_lock, lockh, sizeof(*lockh));
1056                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1057                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1058                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1059                         memcpy(lockh, &old_lock, sizeof(old_lock));
1060                         it->it_lock_handle = lockh->cookie;
1061                 }
1062         }
1063
1064         EXIT;
1065 out:
1066         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1067                 (int)op_data->op_namelen, op_data->op_name,
1068                 ldlm_it2str(it->it_op), it->it_status,
1069                 it->it_disposition, rc);
1070         return rc;
1071 }
1072
1073 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1074                         struct lu_fid *fid, __u64 *bits)
1075 {
1076         /* We could just return 1 immediately, but since we should only
1077          * be called in revalidate_it if we already have a lock, let's
1078          * verify that. */
1079         struct ldlm_res_id res_id;
1080         struct lustre_handle lockh;
1081         union ldlm_policy_data policy;
1082         enum ldlm_mode mode;
1083         ENTRY;
1084
1085         if (it->it_lock_handle) {
1086                 lockh.cookie = it->it_lock_handle;
1087                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1088         } else {
1089                 fid_build_reg_res_name(fid, &res_id);
1090                 switch (it->it_op) {
1091                 case IT_GETATTR:
1092                         /* File attributes are held under multiple bits:
1093                          * nlink is under lookup lock, size and times are
1094                          * under UPDATE lock and recently we've also got
1095                          * a separate permissions lock for owner/group/acl that
1096                          * were protected by lookup lock before.
1097                          * Getattr must provide all of that information,
1098                          * so we need to ensure we have all of those locks.
1099                          * Unfortunately, if the bits are split across multiple
1100                          * locks, there's no easy way to match all of them here,
1101                          * so an extra RPC would be performed to fetch all
1102                          * of those bits at once for now. */
1103                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1104                          * but for old MDTs (< 2.4), permission is covered
1105                          * by LOOKUP lock, so it needs to match all bits here.*/
1106                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1107                                                   MDS_INODELOCK_LOOKUP |
1108                                                   MDS_INODELOCK_PERM;
1109                         break;
1110                 case IT_READDIR:
1111                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1112                         break;
1113                 case IT_LAYOUT:
1114                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1115                         break;
1116                 default:
1117                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1118                         break;
1119                 }
1120
1121                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1122                                       LDLM_IBITS, &policy,
1123                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1124                                       &lockh);
1125         }
1126
1127         if (mode) {
1128                 it->it_lock_handle = lockh.cookie;
1129                 it->it_lock_mode = mode;
1130         } else {
1131                 it->it_lock_handle = 0;
1132                 it->it_lock_mode = 0;
1133         }
1134
1135         RETURN(!!mode);
1136 }
1137
1138 /*
1139  * This long block is all about fixing up the lock and request state
1140  * so that it is correct as of the moment _before_ the operation was
1141  * applied; that way, the VFS will think that everything is normal and
1142  * call Lustre's regular VFS methods.
1143  *
1144  * If we're performing a creation, that means that unless the creation
1145  * failed with EEXIST, we should fake up a negative dentry.
1146  *
1147  * For everything else, we want to lookup to succeed.
1148  *
1149  * One additional note: if CREATE or OPEN succeeded, we add an extra
1150  * reference to the request because we need to keep it around until
1151  * ll_create/ll_open gets called.
1152  *
1153  * The server will return to us, in it_disposition, an indication of
1154  * exactly what it_status refers to.
1155  *
1156  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1157  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1158  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1159  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1160  * was successful.
1161  *
1162  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1163  * child lookup.
1164  */
1165 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1166                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1167                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1168 {
1169         struct ldlm_enqueue_info einfo = {
1170                 .ei_type        = LDLM_IBITS,
1171                 .ei_mode        = it_to_lock_mode(it),
1172                 .ei_cb_bl       = cb_blocking,
1173                 .ei_cb_cp       = ldlm_completion_ast,
1174                 .ei_cb_gl       = mdc_ldlm_glimpse_ast,
1175         };
1176         struct lustre_handle lockh;
1177         int rc = 0;
1178         ENTRY;
1179         LASSERT(it);
1180
1181         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1182                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1183                 op_data->op_name, PFID(&op_data->op_fid2),
1184                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1185                 it->it_flags);
1186
1187         lockh.cookie = 0;
1188         if (fid_is_sane(&op_data->op_fid2) &&
1189             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1190                 /* We could just return 1 immediately, but since we should only
1191                  * be called in revalidate_it if we already have a lock, let's
1192                  * verify that. */
1193                 it->it_lock_handle = 0;
1194                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1195                 /* Only return failure if it was not GETATTR by cfid
1196                    (from inode_revalidate) */
1197                 if (rc || op_data->op_namelen != 0)
1198                         RETURN(rc);
1199         }
1200
1201         /* For case if upper layer did not alloc fid, do it now. */
1202         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1203                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1204                 if (rc < 0) {
1205                         CERROR("Can't alloc new fid, rc %d\n", rc);
1206                         RETURN(rc);
1207                 }
1208         }
1209
1210         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1211                               extra_lock_flags);
1212         if (rc < 0)
1213                 RETURN(rc);
1214
1215         *reqp = it->it_request;
1216         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1217         RETURN(rc);
1218 }
1219
1220 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1221                                               struct ptlrpc_request *req,
1222                                               void *args, int rc)
1223 {
1224         struct mdc_getattr_args  *ga = args;
1225         struct obd_export        *exp = ga->ga_exp;
1226         struct md_enqueue_info   *minfo = ga->ga_minfo;
1227         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1228         struct lookup_intent     *it;
1229         struct lustre_handle     *lockh;
1230         struct obd_device        *obddev;
1231         struct ldlm_reply        *lockrep;
1232         __u64                     flags = LDLM_FL_HAS_INTENT;
1233         ENTRY;
1234
1235         it    = &minfo->mi_it;
1236         lockh = &minfo->mi_lockh;
1237
1238         obddev = class_exp2obd(exp);
1239
1240         obd_put_request_slot(&obddev->u.cli);
1241         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1242                 rc = -ETIMEDOUT;
1243
1244         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1245                                    &flags, NULL, 0, lockh, rc);
1246         if (rc < 0) {
1247                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1248                 mdc_clear_replay_flag(req, rc);
1249                 GOTO(out, rc);
1250         }
1251
1252         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1253         LASSERT(lockrep != NULL);
1254
1255         lockrep->lock_policy_res2 =
1256                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1257
1258         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1259         if (rc)
1260                 GOTO(out, rc);
1261
1262         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1263         EXIT;
1264
1265 out:
1266         minfo->mi_cb(req, minfo, rc);
1267         return 0;
1268 }
1269
1270 int mdc_intent_getattr_async(struct obd_export *exp,
1271                              struct md_enqueue_info *minfo)
1272 {
1273         struct md_op_data       *op_data = &minfo->mi_data;
1274         struct lookup_intent    *it = &minfo->mi_it;
1275         struct ptlrpc_request   *req;
1276         struct mdc_getattr_args *ga;
1277         struct obd_device       *obddev = class_exp2obd(exp);
1278         struct ldlm_res_id       res_id;
1279         union ldlm_policy_data policy = {
1280                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1281                                                  MDS_INODELOCK_UPDATE } };
1282         int                      rc = 0;
1283         __u64                    flags = LDLM_FL_HAS_INTENT;
1284         ENTRY;
1285
1286         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1287                 (int)op_data->op_namelen, op_data->op_name,
1288                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1289
1290         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1291         /* If the MDT return -ERANGE because of large ACL, then the sponsor
1292          * of the async getattr RPC will handle that by itself. */
1293         req = mdc_intent_getattr_pack(exp, it, op_data,
1294                                       LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1295         if (IS_ERR(req))
1296                 RETURN(PTR_ERR(req));
1297
1298         rc = obd_get_request_slot(&obddev->u.cli);
1299         if (rc != 0) {
1300                 ptlrpc_req_finished(req);
1301                 RETURN(rc);
1302         }
1303
1304         /* With Data-on-MDT the glimpse callback is needed too.
1305          * It is set here in advance but not in mdc_finish_enqueue()
1306          * to avoid possible races. It is safe to have glimpse handler
1307          * for non-DOM locks and costs nothing.*/
1308         if (minfo->mi_einfo.ei_cb_gl == NULL)
1309                 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1310
1311         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1312                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1313         if (rc < 0) {
1314                 obd_put_request_slot(&obddev->u.cli);
1315                 ptlrpc_req_finished(req);
1316                 RETURN(rc);
1317         }
1318
1319         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1320         ga = ptlrpc_req_async_args(req);
1321         ga->ga_exp = exp;
1322         ga->ga_minfo = minfo;
1323
1324         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1325         ptlrpcd_add_req(req);
1326
1327         RETURN(0);
1328 }