Whamcloud - gitweb
57144eaa566933b03a3e9cd7be03733e495d8190
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 #include <linux/module.h>
36
37 #include <obd.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
47
48 #include "mdc_internal.h"
49
50 struct mdc_getattr_args {
51         struct obd_export               *ga_exp;
52         struct md_enqueue_info          *ga_minfo;
53 };
54
55 int it_open_error(int phase, struct lookup_intent *it)
56 {
57         if (it_disposition(it, DISP_OPEN_LEASE)) {
58                 if (phase >= DISP_OPEN_LEASE)
59                         return it->it_status;
60                 else
61                         return 0;
62         }
63         if (it_disposition(it, DISP_OPEN_OPEN)) {
64                 if (phase >= DISP_OPEN_OPEN)
65                         return it->it_status;
66                 else
67                         return 0;
68         }
69
70         if (it_disposition(it, DISP_OPEN_CREATE)) {
71                 if (phase >= DISP_OPEN_CREATE)
72                         return it->it_status;
73                 else
74                         return 0;
75         }
76
77         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78                 if (phase >= DISP_LOOKUP_EXECD)
79                         return it->it_status;
80                 else
81                         return 0;
82         }
83
84         if (it_disposition(it, DISP_IT_EXECD)) {
85                 if (phase >= DISP_IT_EXECD)
86                         return it->it_status;
87                 else
88                         return 0;
89         }
90
91         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
92         LBUG();
93
94         return 0;
95 }
96 EXPORT_SYMBOL(it_open_error);
97
98 /* this must be called on a lockh that is known to have a referenced lock */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100                       void *data, __u64 *bits)
101 {
102         struct ldlm_lock *lock;
103         struct inode *new_inode = data;
104         ENTRY;
105
106         if(bits)
107                 *bits = 0;
108
109         if (!lustre_handle_is_used(lockh))
110                 RETURN(0);
111
112         lock = ldlm_handle2lock(lockh);
113
114         LASSERT(lock != NULL);
115         lock_res_and_lock(lock);
116         if (lock->l_resource->lr_lvb_inode &&
117             lock->l_resource->lr_lvb_inode != data) {
118                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119                 LASSERTF(old_inode->i_state & I_FREEING,
120                          "Found existing inode %p/%lu/%u state %lu in lock: "
121                          "setting data to %p/%lu/%u\n", old_inode,
122                          old_inode->i_ino, old_inode->i_generation,
123                          old_inode->i_state,
124                          new_inode, new_inode->i_ino, new_inode->i_generation);
125         }
126         lock->l_resource->lr_lvb_inode = new_inode;
127         if (bits)
128                 *bits = lock->l_policy_data.l_inodebits.bits;
129
130         unlock_res_and_lock(lock);
131         LDLM_LOCK_PUT(lock);
132
133         RETURN(0);
134 }
135
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137                               const struct lu_fid *fid, enum ldlm_type type,
138                               union ldlm_policy_data *policy,
139                               enum ldlm_mode mode, struct lustre_handle *lockh)
140 {
141         struct ldlm_res_id res_id;
142         enum ldlm_mode rc;
143         ENTRY;
144
145         fid_build_reg_res_name(fid, &res_id);
146         /* LU-4405: Clear bits not supported by server */
147         policy->l_inodebits.bits &= exp_connect_ibits(exp);
148         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149                              &res_id, type, policy, mode, lockh, 0);
150         RETURN(rc);
151 }
152
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154                       union ldlm_policy_data *policy, enum ldlm_mode mode,
155                       enum ldlm_cancel_flags flags, void *opaque)
156 {
157         struct obd_device *obd = class_exp2obd(exp);
158         struct ldlm_res_id res_id;
159         int rc;
160
161         ENTRY;
162
163         fid_build_reg_res_name(fid, &res_id);
164         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165                                              policy, mode, flags, opaque);
166         RETURN(rc);
167 }
168
169 int mdc_null_inode(struct obd_export *exp,
170                    const struct lu_fid *fid)
171 {
172         struct ldlm_res_id res_id;
173         struct ldlm_resource *res;
174         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
175         ENTRY;
176
177         LASSERTF(ns != NULL, "no namespace passed\n");
178
179         fid_build_reg_res_name(fid, &res_id);
180
181         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
182         if (IS_ERR(res))
183                 RETURN(0);
184
185         lock_res(res);
186         res->lr_lvb_inode = NULL;
187         unlock_res(res);
188
189         ldlm_resource_putref(res);
190         RETURN(0);
191 }
192
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
194 {
195         /* Don't hold error requests for replay. */
196         if (req->rq_replay) {
197                 spin_lock(&req->rq_lock);
198                 req->rq_replay = 0;
199                 spin_unlock(&req->rq_lock);
200         }
201         if (rc && req->rq_transno != 0) {
202                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
203                 LBUG();
204         }
205 }
206
207 /* Save a large LOV EA into the request buffer so that it is available
208  * for replay.  We don't do this in the initial request because the
209  * original request doesn't need this buffer (at most it sends just the
210  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211  * buffer and may also be difficult to allocate and save a very large
212  * request buffer for each open. (bug 5707)
213  *
214  * OOM here may cause recovery failure if lmm is needed (only for the
215  * original open if the MDS crashed just when this client also OOM'd)
216  * but this is incredibly unlikely, and questionable whether the client
217  * could do MDS recovery under OOM anyways... */
218 int mdc_save_lovea(struct ptlrpc_request *req,
219                    const struct req_msg_field *field,
220                    void *data, u32 size)
221 {
222         struct req_capsule *pill = &req->rq_pill;
223         void *lmm;
224         int rc = 0;
225
226         if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
227                 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
228                 if (rc) {
229                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230                                req->rq_export->exp_obd->obd_name,
231                                size, rc);
232                         return rc;
233                 }
234         } else {
235                 req_capsule_shrink(pill, field, size, RCL_CLIENT);
236         }
237
238         req_capsule_set_size(pill, field, RCL_CLIENT, size);
239         lmm = req_capsule_client_get(pill, field);
240         if (lmm)
241                 memcpy(lmm, data, size);
242
243         return rc;
244 }
245
246 static struct ptlrpc_request *
247 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
248                      struct md_op_data *op_data, __u32 acl_bufsize)
249 {
250         struct ptlrpc_request   *req;
251         struct obd_device       *obddev = class_exp2obd(exp);
252         struct ldlm_intent      *lit;
253         const void              *lmm = op_data->op_data;
254         __u32                    lmmsize = op_data->op_data_size;
255         struct list_head         cancels = LIST_HEAD_INIT(cancels);
256         int                      count = 0;
257         enum ldlm_mode           mode;
258         int                      rc;
259         int repsize;
260
261         ENTRY;
262
263         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
264
265         /* XXX: openlock is not cancelled for cross-refs. */
266         /* If inode is known, cancel conflicting OPEN locks. */
267         if (fid_is_sane(&op_data->op_fid2)) {
268                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
269                         if (it->it_flags & MDS_FMODE_WRITE)
270                                 mode = LCK_EX;
271                         else
272                                 mode = LCK_PR;
273                 } else {
274                         if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
275                                 mode = LCK_CW;
276 #ifdef FMODE_EXEC
277                         else if (it->it_flags & FMODE_EXEC)
278                                 mode = LCK_PR;
279 #endif
280                         else
281                                 mode = LCK_CR;
282                 }
283                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
284                                                 &cancels, mode,
285                                                 MDS_INODELOCK_OPEN);
286         }
287
288         /* If CREATE, cancel parent's UPDATE lock. */
289         if (it->it_op & IT_CREAT)
290                 mode = LCK_EX;
291         else
292                 mode = LCK_CR;
293         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
294                                          &cancels, mode,
295                                          MDS_INODELOCK_UPDATE);
296
297         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
298                                    &RQF_LDLM_INTENT_OPEN);
299         if (req == NULL) {
300                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
301                 RETURN(ERR_PTR(-ENOMEM));
302         }
303
304         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
305                              op_data->op_namelen + 1);
306         if (cl_is_lov_delay_create(it->it_flags)) {
307                 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
308                 LASSERT(lmmsize == 0);
309                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
310         } else {
311                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
312                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
313         }
314
315         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
316                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
317                              strlen(op_data->op_file_secctx_name) + 1 : 0);
318
319         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
320                              op_data->op_file_secctx_size);
321
322         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
323         if (rc < 0) {
324                 ptlrpc_request_free(req);
325                 RETURN(ERR_PTR(rc));
326         }
327
328         spin_lock(&req->rq_lock);
329         req->rq_replay = req->rq_import->imp_replayable;
330         spin_unlock(&req->rq_lock);
331
332         /* pack the intent */
333         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
334         lit->opc = (__u64)it->it_op;
335
336         /* pack the intended request */
337         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
338                       lmmsize);
339
340         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
341                              obddev->u.cli.cl_max_mds_easize);
342         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
343
344         /**
345          * Inline buffer for possible data from Data-on-MDT files.
346          */
347         req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
348                              sizeof(struct niobuf_remote));
349         ptlrpc_request_set_replen(req);
350
351         /* Get real repbuf allocated size as rounded up power of 2 */
352         repsize = size_roundup_power2(req->rq_replen +
353                                       lustre_msg_early_size());
354
355         /* Estimate free space for DoM files in repbuf */
356         repsize -= req->rq_replen - obddev->u.cli.cl_max_mds_easize +
357                    sizeof(struct lov_comp_md_v1) +
358                    sizeof(struct lov_comp_md_entry_v1) +
359                    lov_mds_md_size(0, LOV_MAGIC_V3);
360
361         if (repsize < obddev->u.cli.cl_dom_min_inline_repsize) {
362                 repsize = obddev->u.cli.cl_dom_min_inline_repsize - repsize;
363                 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
364                                      RCL_SERVER,
365                                      sizeof(struct niobuf_remote) + repsize);
366                 ptlrpc_request_set_replen(req);
367                 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
368                        repsize, req->rq_replen);
369         }
370         return req;
371 }
372
373 #define GA_DEFAULT_EA_NAME_LEN 20
374 #define GA_DEFAULT_EA_VAL_LEN  250
375 #define GA_DEFAULT_EA_NUM      10
376
377 static struct ptlrpc_request *
378 mdc_intent_getxattr_pack(struct obd_export *exp,
379                          struct lookup_intent *it,
380                          struct md_op_data *op_data)
381 {
382         struct ptlrpc_request   *req;
383         struct ldlm_intent      *lit;
384         int                     rc, count = 0;
385         struct list_head        cancels = LIST_HEAD_INIT(cancels);
386         u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
387
388         ENTRY;
389
390         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
391                                         &RQF_LDLM_INTENT_GETXATTR);
392         if (req == NULL)
393                 RETURN(ERR_PTR(-ENOMEM));
394
395         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
396         if (rc) {
397                 ptlrpc_request_free(req);
398                 RETURN(ERR_PTR(rc));
399         }
400
401         /* pack the intent */
402         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
403         lit->opc = IT_GETXATTR;
404
405 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
406         /* If the supplied buffer is too small then the server will
407          * return -ERANGE and llite will fallback to using non cached
408          * xattr operations. On servers before 2.10.1 a (non-cached)
409          * listxattr RPC for an orphan or dead file causes an oops. So
410          * let's try to avoid sending too small a buffer to too old a
411          * server. This is effectively undoing the memory conservation
412          * of LU-9417 when it would be *more* likely to crash the
413          * server. See LU-9856. */
414         if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
415                 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
416                                          exp->exp_connect_data.ocd_max_easize);
417 #endif
418
419         /* pack the intended request */
420         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
421                       ea_vals_buf_size, -1, 0);
422
423         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
424                              GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
425
426         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
427                              ea_vals_buf_size);
428
429         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
430                              sizeof(u32) * GA_DEFAULT_EA_NUM);
431
432         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
433
434         ptlrpc_request_set_replen(req);
435
436         RETURN(req);
437 }
438
439 static struct ptlrpc_request *
440 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
441                         struct md_op_data *op_data, __u32 acl_bufsize)
442 {
443         struct ptlrpc_request   *req;
444         struct obd_device       *obddev = class_exp2obd(exp);
445         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
446                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
447                                          OBD_MD_MEA | OBD_MD_FLACL;
448         struct ldlm_intent      *lit;
449         int                      rc;
450         __u32                    easize;
451         ENTRY;
452
453         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
454                                    &RQF_LDLM_INTENT_GETATTR);
455         if (req == NULL)
456                 RETURN(ERR_PTR(-ENOMEM));
457
458         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
459                              op_data->op_namelen + 1);
460
461         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
462         if (rc) {
463                 ptlrpc_request_free(req);
464                 RETURN(ERR_PTR(rc));
465         }
466
467         /* pack the intent */
468         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
469         lit->opc = (__u64)it->it_op;
470
471         if (obddev->u.cli.cl_default_mds_easize > 0)
472                 easize = obddev->u.cli.cl_default_mds_easize;
473         else
474                 easize = obddev->u.cli.cl_max_mds_easize;
475
476         /* pack the intended request */
477         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
478
479         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
480         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
481         ptlrpc_request_set_replen(req);
482         RETURN(req);
483 }
484
485 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
486                                                      struct lookup_intent *it,
487                                                      struct md_op_data *op_data)
488 {
489         struct obd_device     *obd = class_exp2obd(exp);
490         struct ptlrpc_request *req;
491         struct ldlm_intent    *lit;
492         struct layout_intent  *layout;
493         int rc;
494         ENTRY;
495
496         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
497                                 &RQF_LDLM_INTENT_LAYOUT);
498         if (req == NULL)
499                 RETURN(ERR_PTR(-ENOMEM));
500
501         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
502         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
503         if (rc) {
504                 ptlrpc_request_free(req);
505                 RETURN(ERR_PTR(rc));
506         }
507
508         /* pack the intent */
509         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
510         lit->opc = (__u64)it->it_op;
511
512         /* pack the layout intent request */
513         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
514         LASSERT(op_data->op_data != NULL);
515         LASSERT(op_data->op_data_size == sizeof(*layout));
516         memcpy(layout, op_data->op_data, sizeof(*layout));
517
518         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
519                              obd->u.cli.cl_default_mds_easize);
520         ptlrpc_request_set_replen(req);
521         RETURN(req);
522 }
523
524 static struct ptlrpc_request *
525 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
526 {
527         struct ptlrpc_request *req;
528         int rc;
529         ENTRY;
530
531         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
532         if (req == NULL)
533                 RETURN(ERR_PTR(-ENOMEM));
534
535         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
536         if (rc) {
537                 ptlrpc_request_free(req);
538                 RETURN(ERR_PTR(rc));
539         }
540
541         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
542         ptlrpc_request_set_replen(req);
543         RETURN(req);
544 }
545
546 static int mdc_finish_enqueue(struct obd_export *exp,
547                               struct ptlrpc_request *req,
548                               struct ldlm_enqueue_info *einfo,
549                               struct lookup_intent *it,
550                               struct lustre_handle *lockh,
551                               int rc)
552 {
553         struct req_capsule  *pill = &req->rq_pill;
554         struct ldlm_request *lockreq;
555         struct ldlm_reply   *lockrep;
556         struct ldlm_lock    *lock;
557         struct mdt_body     *body = NULL;
558         void                *lvb_data = NULL;
559         __u32                lvb_len = 0;
560
561         ENTRY;
562
563         LASSERT(rc >= 0);
564         /* Similarly, if we're going to replay this request, we don't want to
565          * actually get a lock, just perform the intent. */
566         if (req->rq_transno || req->rq_replay) {
567                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
568                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
569         }
570
571         if (rc == ELDLM_LOCK_ABORTED) {
572                 einfo->ei_mode = 0;
573                 memset(lockh, 0, sizeof(*lockh));
574                 rc = 0;
575         } else { /* rc = 0 */
576                 lock = ldlm_handle2lock(lockh);
577                 LASSERT(lock != NULL);
578
579                 /* If the server gave us back a different lock mode, we should
580                  * fix up our variables. */
581                 if (lock->l_req_mode != einfo->ei_mode) {
582                         ldlm_lock_addref(lockh, lock->l_req_mode);
583                         ldlm_lock_decref(lockh, einfo->ei_mode);
584                         einfo->ei_mode = lock->l_req_mode;
585                 }
586                 LDLM_LOCK_PUT(lock);
587         }
588
589         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
590         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
591
592         it->it_disposition = (int)lockrep->lock_policy_res1;
593         it->it_status = (int)lockrep->lock_policy_res2;
594         it->it_lock_mode = einfo->ei_mode;
595         it->it_lock_handle = lockh->cookie;
596         it->it_request = req;
597
598         /* Technically speaking rq_transno must already be zero if
599          * it_status is in error, so the check is a bit redundant */
600         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
601                 mdc_clear_replay_flag(req, it->it_status);
602
603         /* If we're doing an IT_OPEN which did not result in an actual
604          * successful open, then we need to remove the bit which saves
605          * this request for unconditional replay.
606          *
607          * It's important that we do this first!  Otherwise we might exit the
608          * function without doing so, and try to replay a failed create
609          * (bug 3440) */
610         if (it->it_op & IT_OPEN && req->rq_replay &&
611             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
612                 mdc_clear_replay_flag(req, it->it_status);
613
614         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
615                   it->it_op, it->it_disposition, it->it_status);
616
617         /* We know what to expect, so we do any byte flipping required here */
618         if (it_has_reply_body(it)) {
619                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
620                 if (body == NULL) {
621                         CERROR ("Can't swab mdt_body\n");
622                         RETURN (-EPROTO);
623                 }
624
625                 if (it_disposition(it, DISP_OPEN_OPEN) &&
626                     !it_open_error(DISP_OPEN_OPEN, it)) {
627                         /*
628                          * If this is a successful OPEN request, we need to set
629                          * replay handler and data early, so that if replay
630                          * happens immediately after swabbing below, new reply
631                          * is swabbed by that handler correctly.
632                          */
633                         mdc_set_open_replay_data(NULL, NULL, it);
634                 }
635
636                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
637                         void *eadata;
638
639                         mdc_update_max_ea_from_body(exp, body);
640
641                         /*
642                          * The eadata is opaque; just check that it is there.
643                          * Eventually, obd_unpackmd() will check the contents.
644                          */
645                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
646                                                         body->mbo_eadatasize);
647                         if (eadata == NULL)
648                                 RETURN(-EPROTO);
649
650                         /* save lvb data and length in case this is for layout
651                          * lock */
652                         lvb_data = eadata;
653                         lvb_len = body->mbo_eadatasize;
654
655                         /*
656                          * We save the reply LOV EA in case we have to replay a
657                          * create for recovery.  If we didn't allocate a large
658                          * enough request buffer above we need to reallocate it
659                          * here to hold the actual LOV EA.
660                          *
661                          * To not save LOV EA if request is not going to replay
662                          * (for example error one).
663                          */
664                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
665                                 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
666                                                     body->mbo_eadatasize);
667                                 if (rc) {
668                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
669                                         body->mbo_eadatasize = 0;
670                                         rc = 0;
671                                 }
672                         }
673                 }
674         } else if (it->it_op & IT_LAYOUT) {
675                 /* maybe the lock was granted right away and layout
676                  * is packed into RMF_DLM_LVB of req */
677                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
678                 if (lvb_len > 0) {
679                         lvb_data = req_capsule_server_sized_get(pill,
680                                                         &RMF_DLM_LVB, lvb_len);
681                         if (lvb_data == NULL)
682                                 RETURN(-EPROTO);
683
684                         /**
685                          * save replied layout data to the request buffer for
686                          * recovery consideration (lest MDS reinitialize
687                          * another set of OST objects).
688                          */
689                         if (req->rq_transno)
690                                 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
691                                                      lvb_len);
692                 }
693         }
694
695         /* fill in stripe data for layout lock.
696          * LU-6581: trust layout data only if layout lock is granted. The MDT
697          * has stopped sending layout unless the layout lock is granted. The
698          * client still does this checking in case it's talking with an old
699          * server. - Jinshan */
700         lock = ldlm_handle2lock(lockh);
701         if (lock == NULL)
702                 RETURN(rc);
703
704         if (ldlm_has_layout(lock) && lvb_data != NULL &&
705             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
706                 void *lmm;
707
708                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
709                         ldlm_it2str(it->it_op), lvb_len);
710
711                 OBD_ALLOC_LARGE(lmm, lvb_len);
712                 if (lmm == NULL)
713                         GOTO(out_lock, rc = -ENOMEM);
714
715                 memcpy(lmm, lvb_data, lvb_len);
716
717                 /* install lvb_data */
718                 lock_res_and_lock(lock);
719                 if (lock->l_lvb_data == NULL) {
720                         lock->l_lvb_type = LVB_T_LAYOUT;
721                         lock->l_lvb_data = lmm;
722                         lock->l_lvb_len = lvb_len;
723                         lmm = NULL;
724                 }
725                 unlock_res_and_lock(lock);
726                 if (lmm != NULL)
727                         OBD_FREE_LARGE(lmm, lvb_len);
728         }
729
730         if (ldlm_has_dom(lock)) {
731                 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
732
733                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
734                 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
735                         LDLM_ERROR(lock, "%s: DoM lock without size.\n",
736                                    exp->exp_obd->obd_name);
737                         GOTO(out_lock, rc = -EPROTO);
738                 }
739
740                 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
741                            ldlm_it2str(it->it_op), body->mbo_dom_size);
742
743                 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
744         }
745 out_lock:
746         LDLM_LOCK_PUT(lock);
747
748         RETURN(rc);
749 }
750
751 /* We always reserve enough space in the reply packet for a stripe MD, because
752  * we don't know in advance the file type. */
753 static int mdc_enqueue_base(struct obd_export *exp,
754                             struct ldlm_enqueue_info *einfo,
755                             const union ldlm_policy_data *policy,
756                             struct lookup_intent *it,
757                             struct md_op_data *op_data,
758                             struct lustre_handle *lockh,
759                             __u64 extra_lock_flags)
760 {
761         struct obd_device *obddev = class_exp2obd(exp);
762         struct ptlrpc_request *req = NULL;
763         __u64 flags, saved_flags = extra_lock_flags;
764         struct ldlm_res_id res_id;
765         static const union ldlm_policy_data lookup_policy = {
766                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
767         static const union ldlm_policy_data update_policy = {
768                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
769         static const union ldlm_policy_data layout_policy = {
770                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
771         static const union ldlm_policy_data getxattr_policy = {
772                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
773         int generation, resends = 0;
774         struct ldlm_reply *lockrep;
775         struct obd_import *imp = class_exp2cliimp(exp);
776         __u32 acl_bufsize;
777         enum lvb_type lvb_type = 0;
778         int rc;
779         ENTRY;
780
781         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
782                  einfo->ei_type);
783         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
784
785         if (it != NULL) {
786                 LASSERT(policy == NULL);
787
788                 saved_flags |= LDLM_FL_HAS_INTENT;
789                 if (it->it_op & (IT_GETATTR | IT_READDIR))
790                         policy = &update_policy;
791                 else if (it->it_op & IT_LAYOUT)
792                         policy = &layout_policy;
793                 else if (it->it_op & IT_GETXATTR)
794                         policy = &getxattr_policy;
795                 else
796                         policy = &lookup_policy;
797         }
798
799         generation = obddev->u.cli.cl_import->imp_generation;
800         if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
801                 acl_bufsize = imp->imp_connect_data.ocd_max_easize;
802         else
803                 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
804
805 resend:
806         flags = saved_flags;
807         if (it == NULL) {
808                 /* The only way right now is FLOCK. */
809                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
810                          einfo->ei_type);
811                 res_id.name[3] = LDLM_FLOCK;
812         } else if (it->it_op & IT_OPEN) {
813                 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
814         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
815                 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
816         } else if (it->it_op & IT_READDIR) {
817                 req = mdc_enqueue_pack(exp, 0);
818         } else if (it->it_op & IT_LAYOUT) {
819                 if (!imp_connect_lvb_type(imp))
820                         RETURN(-EOPNOTSUPP);
821                 req = mdc_intent_layout_pack(exp, it, op_data);
822                 lvb_type = LVB_T_LAYOUT;
823         } else if (it->it_op & IT_GETXATTR) {
824                 req = mdc_intent_getxattr_pack(exp, it, op_data);
825         } else {
826                 LBUG();
827                 RETURN(-EINVAL);
828         }
829
830         if (IS_ERR(req))
831                 RETURN(PTR_ERR(req));
832
833         if (resends) {
834                 req->rq_generation_set = 1;
835                 req->rq_import_generation = generation;
836                 req->rq_sent = ktime_get_real_seconds() + resends;
837         }
838
839         /* It is important to obtain modify RPC slot first (if applicable), so
840          * that threads that are waiting for a modify RPC slot are not polluting
841          * our rpcs in flight counter.
842          * We do not do flock request limiting, though */
843         if (it) {
844                 mdc_get_mod_rpc_slot(req, it);
845                 rc = obd_get_request_slot(&obddev->u.cli);
846                 if (rc != 0) {
847                         mdc_put_mod_rpc_slot(req, it);
848                         mdc_clear_replay_flag(req, 0);
849                         ptlrpc_req_finished(req);
850                         RETURN(rc);
851                 }
852         }
853
854         /* With Data-on-MDT the glimpse callback is needed too.
855          * It is set here in advance but not in mdc_finish_enqueue()
856          * to avoid possible races. It is safe to have glimpse handler
857          * for non-DOM locks and costs nothing.*/
858         if (einfo->ei_cb_gl == NULL)
859                 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
860
861         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
862                               0, lvb_type, lockh, 0);
863         if (!it) {
864                 /* For flock requests we immediatelly return without further
865                    delay and let caller deal with the rest, since rest of
866                    this function metadata processing makes no sense for flock
867                    requests anyway. But in case of problem during comms with
868                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
869                    can not rely on caller and this mainly for F_UNLCKs
870                    (explicits or automatically generated by Kernel to clean
871                    current FLocks upon exit) that can't be trashed */
872                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
873                     (einfo->ei_type == LDLM_FLOCK) &&
874                     (einfo->ei_mode == LCK_NL))
875                         goto resend;
876                 RETURN(rc);
877         }
878
879         obd_put_request_slot(&obddev->u.cli);
880         mdc_put_mod_rpc_slot(req, it);
881
882         if (rc < 0) {
883                 CDEBUG(D_INFO,
884                       "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
885                       obddev->obd_name, PFID(&op_data->op_fid1),
886                       PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
887
888                 mdc_clear_replay_flag(req, rc);
889                 ptlrpc_req_finished(req);
890                 RETURN(rc);
891         }
892
893         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
894         LASSERT(lockrep != NULL);
895
896         lockrep->lock_policy_res2 =
897                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
898
899         /* Retry infinitely when the server returns -EINPROGRESS for the
900          * intent operation, when server returns -EINPROGRESS for acquiring
901          * intent lock, we'll retry in after_reply(). */
902         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
903                 mdc_clear_replay_flag(req, rc);
904                 ptlrpc_req_finished(req);
905                 if (generation == obddev->u.cli.cl_import->imp_generation) {
906                         if (signal_pending(current))
907                                 RETURN(-EINTR);
908
909                         resends++;
910                         CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
911                                obddev->obd_name, resends, it->it_op,
912                                PFID(&op_data->op_fid1),
913                                PFID(&op_data->op_fid2));
914                         goto resend;
915                 } else {
916                         CDEBUG(D_HA, "resend cross eviction\n");
917                         RETURN(-EIO);
918                 }
919         }
920
921         if ((int)lockrep->lock_policy_res2 == -ERANGE &&
922             it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
923             acl_bufsize != imp->imp_connect_data.ocd_max_easize) {
924                 mdc_clear_replay_flag(req, -ERANGE);
925                 ptlrpc_req_finished(req);
926                 acl_bufsize = imp->imp_connect_data.ocd_max_easize;
927                 goto resend;
928         }
929
930         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
931         if (rc < 0) {
932                 if (lustre_handle_is_used(lockh)) {
933                         ldlm_lock_decref(lockh, einfo->ei_mode);
934                         memset(lockh, 0, sizeof(*lockh));
935                 }
936                 ptlrpc_req_finished(req);
937
938                 it->it_lock_handle = 0;
939                 it->it_lock_mode = 0;
940                 it->it_request = NULL;
941         }
942
943         RETURN(rc);
944 }
945
946 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
947                 const union ldlm_policy_data *policy,
948                 struct md_op_data *op_data,
949                 struct lustre_handle *lockh, __u64 extra_lock_flags)
950 {
951         return mdc_enqueue_base(exp, einfo, policy, NULL,
952                                 op_data, lockh, extra_lock_flags);
953 }
954
955 static int mdc_finish_intent_lock(struct obd_export *exp,
956                                   struct ptlrpc_request *request,
957                                   struct md_op_data *op_data,
958                                   struct lookup_intent *it,
959                                   struct lustre_handle *lockh)
960 {
961         struct lustre_handle old_lock;
962         struct ldlm_lock *lock;
963         int rc = 0;
964         ENTRY;
965
966         LASSERT(request != NULL);
967         LASSERT(request != LP_POISON);
968         LASSERT(request->rq_repmsg != LP_POISON);
969
970         if (it->it_op & IT_READDIR)
971                 RETURN(0);
972
973         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
974                 if (it->it_status != 0)
975                         GOTO(out, rc = it->it_status);
976         } else {
977                 if (!it_disposition(it, DISP_IT_EXECD)) {
978                         /* The server failed before it even started executing
979                          * the intent, i.e. because it couldn't unpack the
980                          * request.
981                          */
982                         LASSERT(it->it_status != 0);
983                         GOTO(out, rc = it->it_status);
984                 }
985                 rc = it_open_error(DISP_IT_EXECD, it);
986                 if (rc)
987                         GOTO(out, rc);
988
989                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
990                 if (rc)
991                         GOTO(out, rc);
992
993                 /* keep requests around for the multiple phases of the call
994                  * this shows the DISP_XX must guarantee we make it into the
995                  * call
996                  */
997                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
998                     it_disposition(it, DISP_OPEN_CREATE) &&
999                     !it_open_error(DISP_OPEN_CREATE, it)) {
1000                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
1001                         /* balanced in ll_create_node */
1002                         ptlrpc_request_addref(request);
1003                 }
1004                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1005                     it_disposition(it, DISP_OPEN_OPEN) &&
1006                     !it_open_error(DISP_OPEN_OPEN, it)) {
1007                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
1008                         /* balanced in ll_file_open */
1009                         ptlrpc_request_addref(request);
1010                         /* BUG 11546 - eviction in the middle of open rpc
1011                          * processing
1012                          */
1013                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1014                                          obd_timeout);
1015                 }
1016
1017                 if (it->it_op & IT_CREAT) {
1018                         /* XXX this belongs in ll_create_it */
1019                 } else if (it->it_op == IT_OPEN) {
1020                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1021                 } else {
1022                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1023                 }
1024         }
1025
1026         /* If we already have a matching lock, then cancel the new
1027          * one.  We have to set the data here instead of in
1028          * mdc_enqueue, because we need to use the child's inode as
1029          * the l_ast_data to match, and that's not available until
1030          * intent_finish has performed the iget().) */
1031         lock = ldlm_handle2lock(lockh);
1032         if (lock) {
1033                 union ldlm_policy_data policy = lock->l_policy_data;
1034                 LDLM_DEBUG(lock, "matching against this");
1035
1036                 if (it_has_reply_body(it)) {
1037                         struct mdt_body *body;
1038
1039                         body = req_capsule_server_get(&request->rq_pill,
1040                                                       &RMF_MDT_BODY);
1041                         /* mdc_enqueue checked */
1042                         LASSERT(body != NULL);
1043                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1044                                                  &lock->l_resource->lr_name),
1045                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1046                                  PLDLMRES(lock->l_resource),
1047                                  PFID(&body->mbo_fid1));
1048                 }
1049                 LDLM_LOCK_PUT(lock);
1050
1051                 memcpy(&old_lock, lockh, sizeof(*lockh));
1052                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1053                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1054                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1055                         memcpy(lockh, &old_lock, sizeof(old_lock));
1056                         it->it_lock_handle = lockh->cookie;
1057                 }
1058         }
1059
1060         EXIT;
1061 out:
1062         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1063                 (int)op_data->op_namelen, op_data->op_name,
1064                 ldlm_it2str(it->it_op), it->it_status,
1065                 it->it_disposition, rc);
1066         return rc;
1067 }
1068
1069 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1070                         struct lu_fid *fid, __u64 *bits)
1071 {
1072         /* We could just return 1 immediately, but since we should only
1073          * be called in revalidate_it if we already have a lock, let's
1074          * verify that. */
1075         struct ldlm_res_id res_id;
1076         struct lustre_handle lockh;
1077         union ldlm_policy_data policy;
1078         enum ldlm_mode mode;
1079         ENTRY;
1080
1081         if (it->it_lock_handle) {
1082                 lockh.cookie = it->it_lock_handle;
1083                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1084         } else {
1085                 fid_build_reg_res_name(fid, &res_id);
1086                 switch (it->it_op) {
1087                 case IT_GETATTR:
1088                         /* File attributes are held under multiple bits:
1089                          * nlink is under lookup lock, size and times are
1090                          * under UPDATE lock and recently we've also got
1091                          * a separate permissions lock for owner/group/acl that
1092                          * were protected by lookup lock before.
1093                          * Getattr must provide all of that information,
1094                          * so we need to ensure we have all of those locks.
1095                          * Unfortunately, if the bits are split across multiple
1096                          * locks, there's no easy way to match all of them here,
1097                          * so an extra RPC would be performed to fetch all
1098                          * of those bits at once for now. */
1099                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1100                          * but for old MDTs (< 2.4), permission is covered
1101                          * by LOOKUP lock, so it needs to match all bits here.*/
1102                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1103                                                   MDS_INODELOCK_LOOKUP |
1104                                                   MDS_INODELOCK_PERM;
1105                         break;
1106                 case IT_READDIR:
1107                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1108                         break;
1109                 case IT_LAYOUT:
1110                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1111                         break;
1112                 default:
1113                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1114                         break;
1115                 }
1116
1117                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1118                                       LDLM_IBITS, &policy,
1119                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1120                                       &lockh);
1121         }
1122
1123         if (mode) {
1124                 it->it_lock_handle = lockh.cookie;
1125                 it->it_lock_mode = mode;
1126         } else {
1127                 it->it_lock_handle = 0;
1128                 it->it_lock_mode = 0;
1129         }
1130
1131         RETURN(!!mode);
1132 }
1133
1134 /*
1135  * This long block is all about fixing up the lock and request state
1136  * so that it is correct as of the moment _before_ the operation was
1137  * applied; that way, the VFS will think that everything is normal and
1138  * call Lustre's regular VFS methods.
1139  *
1140  * If we're performing a creation, that means that unless the creation
1141  * failed with EEXIST, we should fake up a negative dentry.
1142  *
1143  * For everything else, we want to lookup to succeed.
1144  *
1145  * One additional note: if CREATE or OPEN succeeded, we add an extra
1146  * reference to the request because we need to keep it around until
1147  * ll_create/ll_open gets called.
1148  *
1149  * The server will return to us, in it_disposition, an indication of
1150  * exactly what it_status refers to.
1151  *
1152  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1153  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1154  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1155  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1156  * was successful.
1157  *
1158  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1159  * child lookup.
1160  */
1161 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1162                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1163                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1164 {
1165         struct ldlm_enqueue_info einfo = {
1166                 .ei_type        = LDLM_IBITS,
1167                 .ei_mode        = it_to_lock_mode(it),
1168                 .ei_cb_bl       = cb_blocking,
1169                 .ei_cb_cp       = ldlm_completion_ast,
1170                 .ei_cb_gl       = mdc_ldlm_glimpse_ast,
1171         };
1172         struct lustre_handle lockh;
1173         int rc = 0;
1174         ENTRY;
1175         LASSERT(it);
1176
1177         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1178                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1179                 op_data->op_name, PFID(&op_data->op_fid2),
1180                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1181                 it->it_flags);
1182
1183         lockh.cookie = 0;
1184         if (fid_is_sane(&op_data->op_fid2) &&
1185             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1186                 /* We could just return 1 immediately, but since we should only
1187                  * be called in revalidate_it if we already have a lock, let's
1188                  * verify that. */
1189                 it->it_lock_handle = 0;
1190                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1191                 /* Only return failure if it was not GETATTR by cfid
1192                    (from inode_revalidate) */
1193                 if (rc || op_data->op_namelen != 0)
1194                         RETURN(rc);
1195         }
1196
1197         /* For case if upper layer did not alloc fid, do it now. */
1198         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1199                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1200                 if (rc < 0) {
1201                         CERROR("Can't alloc new fid, rc %d\n", rc);
1202                         RETURN(rc);
1203                 }
1204         }
1205
1206         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1207                               extra_lock_flags);
1208         if (rc < 0)
1209                 RETURN(rc);
1210
1211         *reqp = it->it_request;
1212         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1213         RETURN(rc);
1214 }
1215
1216 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1217                                               struct ptlrpc_request *req,
1218                                               void *args, int rc)
1219 {
1220         struct mdc_getattr_args  *ga = args;
1221         struct obd_export        *exp = ga->ga_exp;
1222         struct md_enqueue_info   *minfo = ga->ga_minfo;
1223         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1224         struct lookup_intent     *it;
1225         struct lustre_handle     *lockh;
1226         struct obd_device        *obddev;
1227         struct ldlm_reply        *lockrep;
1228         __u64                     flags = LDLM_FL_HAS_INTENT;
1229         ENTRY;
1230
1231         it    = &minfo->mi_it;
1232         lockh = &minfo->mi_lockh;
1233
1234         obddev = class_exp2obd(exp);
1235
1236         obd_put_request_slot(&obddev->u.cli);
1237         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1238                 rc = -ETIMEDOUT;
1239
1240         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1241                                    &flags, NULL, 0, lockh, rc);
1242         if (rc < 0) {
1243                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1244                 mdc_clear_replay_flag(req, rc);
1245                 GOTO(out, rc);
1246         }
1247
1248         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1249         LASSERT(lockrep != NULL);
1250
1251         lockrep->lock_policy_res2 =
1252                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1253
1254         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1255         if (rc)
1256                 GOTO(out, rc);
1257
1258         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1259         EXIT;
1260
1261 out:
1262         minfo->mi_cb(req, minfo, rc);
1263         return 0;
1264 }
1265
1266 int mdc_intent_getattr_async(struct obd_export *exp,
1267                              struct md_enqueue_info *minfo)
1268 {
1269         struct md_op_data       *op_data = &minfo->mi_data;
1270         struct lookup_intent    *it = &minfo->mi_it;
1271         struct ptlrpc_request   *req;
1272         struct mdc_getattr_args *ga;
1273         struct obd_device       *obddev = class_exp2obd(exp);
1274         struct ldlm_res_id       res_id;
1275         union ldlm_policy_data policy = {
1276                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1277                                                  MDS_INODELOCK_UPDATE } };
1278         int                      rc = 0;
1279         __u64                    flags = LDLM_FL_HAS_INTENT;
1280         ENTRY;
1281
1282         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1283                 (int)op_data->op_namelen, op_data->op_name,
1284                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1285
1286         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1287         /* If the MDT return -ERANGE because of large ACL, then the sponsor
1288          * of the async getattr RPC will handle that by itself. */
1289         req = mdc_intent_getattr_pack(exp, it, op_data,
1290                                       LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1291         if (IS_ERR(req))
1292                 RETURN(PTR_ERR(req));
1293
1294         rc = obd_get_request_slot(&obddev->u.cli);
1295         if (rc != 0) {
1296                 ptlrpc_req_finished(req);
1297                 RETURN(rc);
1298         }
1299
1300         /* With Data-on-MDT the glimpse callback is needed too.
1301          * It is set here in advance but not in mdc_finish_enqueue()
1302          * to avoid possible races. It is safe to have glimpse handler
1303          * for non-DOM locks and costs nothing.*/
1304         if (minfo->mi_einfo.ei_cb_gl == NULL)
1305                 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1306
1307         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1308                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1309         if (rc < 0) {
1310                 obd_put_request_slot(&obddev->u.cli);
1311                 ptlrpc_req_finished(req);
1312                 RETURN(rc);
1313         }
1314
1315         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1316         ga = ptlrpc_req_async_args(req);
1317         ga->ga_exp = exp;
1318         ga->ga_minfo = minfo;
1319
1320         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1321         ptlrpcd_add_req(req);
1322
1323         RETURN(0);
1324 }