Whamcloud - gitweb
d2c6d15e871db3ed75d3c565bdf1dd29a1fae0d1
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  */
31
32 #define DEBUG_SUBSYSTEM S_MDC
33
34 #include <linux/module.h>
35
36 #include <obd.h>
37 #include <obd_class.h>
38 #include <lustre_dlm.h>
39 #include <lustre_fid.h>
40 #include <lustre_intent.h>
41 #include <lustre_mdc.h>
42 #include <lustre_net.h>
43 #include <lustre_req_layout.h>
44 #include <lustre_swab.h>
45 #include <lustre_acl.h>
46
47 #include "mdc_internal.h"
48
/*
 * Bundle of the two objects a getattr enqueue needs to carry around together.
 * NOTE(review): the users of this structure are outside the visible part of
 * the file; presumably it is used as per-request callback arguments - confirm.
 */
struct mdc_getattr_args {
	/* export the request was issued on */
	struct obd_export *ga_exp;
	/* enqueue descriptor associated with the request */
	struct md_enqueue_info *ga_minfo;
};
53
54 int it_open_error(int phase, struct lookup_intent *it)
55 {
56         if (it_disposition(it, DISP_OPEN_LEASE)) {
57                 if (phase >= DISP_OPEN_LEASE)
58                         return it->it_status;
59                 else
60                         return 0;
61         }
62         if (it_disposition(it, DISP_OPEN_OPEN)) {
63                 if (phase >= DISP_OPEN_OPEN)
64                         return it->it_status;
65                 else
66                         return 0;
67         }
68
69         if (it_disposition(it, DISP_OPEN_CREATE)) {
70                 if (phase >= DISP_OPEN_CREATE)
71                         return it->it_status;
72                 else
73                         return 0;
74         }
75
76         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
77                 if (phase >= DISP_LOOKUP_EXECD)
78                         return it->it_status;
79                 else
80                         return 0;
81         }
82
83         if (it_disposition(it, DISP_IT_EXECD)) {
84                 if (phase >= DISP_IT_EXECD)
85                         return it->it_status;
86                 else
87                         return 0;
88         }
89
90         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
91         LBUG();
92
93         return 0;
94 }
95 EXPORT_SYMBOL(it_open_error);
96
97 /* this must be called on a lockh that is known to have a referenced lock */
98 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
99                       void *data, __u64 *bits)
100 {
101         struct ldlm_lock *lock;
102         struct inode *new_inode = data;
103
104         ENTRY;
105         if (bits)
106                 *bits = 0;
107
108         if (!lustre_handle_is_used(lockh))
109                 RETURN(0);
110
111         lock = ldlm_handle2lock(lockh);
112
113         LASSERT(lock != NULL);
114         lock_res_and_lock(lock);
115         if (lock->l_resource->lr_lvb_inode &&
116             lock->l_resource->lr_lvb_inode != data) {
117                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
118
119                 LASSERTF(old_inode->i_state & I_FREEING,
120                          "Found existing inode %p/%lu/%u state %lu in lock: setting data to %p/%lu/%u\n",
121                          old_inode, old_inode->i_ino, old_inode->i_generation,
122                          old_inode->i_state,
123                          new_inode, new_inode->i_ino, new_inode->i_generation);
124         }
125         lock->l_resource->lr_lvb_inode = new_inode;
126         if (bits)
127                 *bits = lock->l_policy_data.l_inodebits.bits;
128
129         unlock_res_and_lock(lock);
130         LDLM_LOCK_PUT(lock);
131
132         RETURN(0);
133 }
134
135 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
136                               const struct lu_fid *fid, enum ldlm_type type,
137                               union ldlm_policy_data *policy,
138                               enum ldlm_mode mode, struct lustre_handle *lockh)
139 {
140         struct ldlm_res_id res_id;
141         enum ldlm_mode rc;
142
143         ENTRY;
144         fid_build_reg_res_name(fid, &res_id);
145         /* LU-4405: Clear bits not supported by server */
146         policy->l_inodebits.bits &= exp_connect_ibits(exp);
147         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
148                              &res_id, type, policy, mode, lockh);
149         RETURN(rc);
150 }
151
152 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
153                       union ldlm_policy_data *policy, enum ldlm_mode mode,
154                       enum ldlm_cancel_flags flags, void *opaque)
155 {
156         struct obd_device *obd = class_exp2obd(exp);
157         struct ldlm_res_id res_id;
158         int rc;
159
160         ENTRY;
161         fid_build_reg_res_name(fid, &res_id);
162         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
163                                              policy, mode, flags, opaque);
164         RETURN(rc);
165 }
166
167 int mdc_null_inode(struct obd_export *exp,
168                    const struct lu_fid *fid)
169 {
170         struct ldlm_res_id res_id;
171         struct ldlm_resource *res;
172         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
173
174         ENTRY;
175         LASSERTF(ns != NULL, "no namespace passed\n");
176
177         fid_build_reg_res_name(fid, &res_id);
178
179         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
180         if (IS_ERR(res))
181                 RETURN(0);
182
183         lock_res(res);
184         res->lr_lvb_inode = NULL;
185         unlock_res(res);
186
187         ldlm_resource_putref(res);
188         RETURN(0);
189 }
190
191 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
192 {
193         /* Don't hold error requests for replay. */
194         if (req->rq_replay) {
195                 spin_lock(&req->rq_lock);
196                 req->rq_replay = 0;
197                 spin_unlock(&req->rq_lock);
198         }
199         if (rc && req->rq_transno != 0) {
200                 DEBUG_REQ(D_ERROR, req, "transno returned on error: rc = %d",
201                           rc);
202                 LBUG();
203         }
204 }
205
206 /**
207  * Save a large LOV EA into the request buffer so that it is available
208  * for replay.  We don't do this in the initial request because the
209  * original request doesn't need this buffer (at most it sends just the
210  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211  * buffer and may also be difficult to allocate and save a very large
212  * request buffer for each open. (b=5707)
213  *
214  * OOM here may cause recovery failure if lmm is needed (only for the
215  * original open if the MDS crashed just when this client also OOM'd)
216  * but this is incredibly unlikely, and questionable whether the client
217  * could do MDS recovery under OOM anyways...
218  */
219 static int mdc_save_lovea(struct ptlrpc_request *req, void *data, u32 size)
220 {
221         struct req_capsule *pill = &req->rq_pill;
222         void *lovea;
223         int rc = 0;
224
225         if (req_capsule_get_size(pill, &RMF_EADATA, RCL_CLIENT) < size) {
226                 rc = sptlrpc_cli_enlarge_reqbuf(req, &RMF_EADATA, size);
227                 if (rc) {
228                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
229                                req->rq_export->exp_obd->obd_name,
230                                size, rc);
231                         return rc;
232                 }
233         } else {
234                 req_capsule_shrink(pill, &RMF_EADATA, size, RCL_CLIENT);
235         }
236
237         req_capsule_set_size(pill, &RMF_EADATA, RCL_CLIENT, size);
238         lovea = req_capsule_client_get(pill, &RMF_EADATA);
239         if (lovea) {
240                 memcpy(lovea, data, size);
241                 lov_fix_ea_for_replay(lovea);
242         }
243
244         return rc;
245 }
246
247 static struct ptlrpc_request *
248 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
249                      struct md_op_data *op_data, __u32 acl_bufsize)
250 {
251         struct ptlrpc_request *req;
252         struct obd_device *obd = class_exp2obd(exp);
253         struct ldlm_intent *lit;
254         const void *lmm = op_data->op_data;
255         __u32 lmmsize = op_data->op_data_size;
256         __u32  mdt_md_capsule_size;
257         LIST_HEAD(cancels);
258         int count = 0;
259         enum ldlm_mode mode;
260         int repsize, repsize_estimate;
261         int rc;
262
263         ENTRY;
264
265         mdt_md_capsule_size = obd->u.cli.cl_default_mds_easize;
266
267         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
268
269         /* XXX: openlock is not cancelled for cross-refs. */
270         /* If inode is known, cancel conflicting OPEN locks. */
271         if (fid_is_sane(&op_data->op_fid2)) {
272                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
273                         if (it->it_flags & MDS_FMODE_WRITE)
274                                 mode = LCK_EX;
275                         else
276                                 mode = LCK_PR;
277                 } else {
278                         if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
279                                 mode = LCK_CW;
280 #ifdef FMODE_EXEC
281                         else if (it->it_flags & FMODE_EXEC)
282                                 mode = LCK_PR;
283 #endif
284                         else
285                                 mode = LCK_CR;
286                 }
287                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
288                                                 &cancels, mode,
289                                                 MDS_INODELOCK_OPEN);
290         }
291
292         /* If CREATE, cancel parent's UPDATE lock. */
293         if (it->it_op & IT_CREAT)
294                 mode = LCK_EX;
295         else
296                 mode = LCK_CR;
297         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
298                                          &cancels, mode,
299                                          MDS_INODELOCK_UPDATE);
300
301         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
302                                    &RQF_LDLM_INTENT_OPEN);
303         if (req == NULL) {
304                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
305                 RETURN(ERR_PTR(-ENOMEM));
306         }
307
308         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
309                              op_data->op_namelen + 1);
310         if (cl_is_lov_delay_create(it->it_flags)) {
311                 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
312                 LASSERT(lmmsize == 0);
313                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
314         } else {
315                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
316                              max(lmmsize, obd->u.cli.cl_default_mds_easize));
317         }
318
319         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
320                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
321                              op_data->op_file_secctx_name_size : 0);
322
323         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
324                              op_data->op_file_secctx_size);
325
326         req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX, RCL_CLIENT,
327                              op_data->op_file_encctx_size);
328
329         /* get SELinux policy info if any */
330         rc = sptlrpc_get_sepol(req);
331         if (rc < 0) {
332                 ptlrpc_request_free(req);
333                 RETURN(ERR_PTR(rc));
334         }
335         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
336                              strlen(req->rq_sepol) ?
337                              strlen(req->rq_sepol) + 1 : 0);
338
339         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
340         if (rc < 0) {
341                 ptlrpc_request_free(req);
342                 RETURN(ERR_PTR(rc));
343         }
344
345         spin_lock(&req->rq_lock);
346         req->rq_replay = req->rq_import->imp_replayable;
347         spin_unlock(&req->rq_lock);
348
349         /* pack the intent */
350         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
351         lit->opc = (__u64)it->it_op;
352
353         /* pack the intended request */
354         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
355                       lmmsize);
356
357         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
358                              mdt_md_capsule_size);
359         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
360
361         if (!(it->it_op & IT_CREAT) && it->it_op & IT_OPEN &&
362             req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
363                                   RCL_CLIENT) &&
364             op_data->op_file_secctx_name_size > 0 &&
365             op_data->op_file_secctx_name != NULL) {
366                 char *secctx_name;
367
368                 secctx_name = req_capsule_client_get(&req->rq_pill,
369                                                      &RMF_FILE_SECCTX_NAME);
370                 memcpy(secctx_name, op_data->op_file_secctx_name,
371                        op_data->op_file_secctx_name_size);
372                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
373                                      RCL_SERVER,
374                                      obd->u.cli.cl_max_mds_easize);
375
376                 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
377                        op_data->op_file_secctx_name_size,
378                        op_data->op_file_secctx_name);
379
380         } else {
381                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
382                                      RCL_SERVER, 0);
383         }
384
385         if (exp_connect_encrypt(exp) && !(it->it_op & IT_CREAT) &&
386             it->it_op & IT_OPEN)
387                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
388                                      RCL_SERVER,
389                                      obd->u.cli.cl_max_mds_easize);
390         else
391                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
392                                      RCL_SERVER, 0);
393
394         /**
395          * Inline buffer for possible data from Data-on-MDT files.
396          */
397         req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
398                              sizeof(struct niobuf_remote));
399         ptlrpc_request_set_replen(req);
400
401         /* Get real repbuf allocated size as rounded up power of 2 */
402         repsize = size_roundup_power2(req->rq_replen +
403                                       lustre_msg_early_size());
404         /* Estimate free space for DoM files in repbuf */
405         repsize_estimate = repsize - (req->rq_replen -
406                            mdt_md_capsule_size +
407                            sizeof(struct lov_comp_md_v1) +
408                            sizeof(struct lov_comp_md_entry_v1) +
409                            lov_mds_md_size(0, LOV_MAGIC_V3));
410
411         if (repsize_estimate < obd->u.cli.cl_dom_min_inline_repsize) {
412                 repsize = obd->u.cli.cl_dom_min_inline_repsize -
413                           repsize_estimate + sizeof(struct niobuf_remote);
414                 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
415                                      RCL_SERVER,
416                                      sizeof(struct niobuf_remote) + repsize);
417                 ptlrpc_request_set_replen(req);
418                 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
419                        repsize, req->rq_replen);
420                 repsize = size_roundup_power2(req->rq_replen +
421                                               lustre_msg_early_size());
422         }
423         /* The only way to report real allocated repbuf size to the server
424          * is the lm_repsize but it must be set prior buffer allocation itself
425          * due to security reasons - it is part of buffer used in signature
426          * calculation (see LU-11414). Therefore the saved size is predicted
427          * value as rq_replen rounded to the next higher power of 2.
428          * Such estimation is safe. Though the final allocated buffer might
429          * be even larger, it is not possible to know that at this point.
430          */
431         req->rq_reqmsg->lm_repsize = repsize;
432         RETURN(req);
433 }
434
435 #define GA_DEFAULT_EA_NAME_LEN   20
436 #define GA_DEFAULT_EA_VAL_LEN   250
437 #define GA_DEFAULT_EA_NUM        10
438
439 static struct ptlrpc_request *
440 mdc_intent_getxattr_pack(struct obd_export *exp, struct lookup_intent *it,
441                          struct md_op_data *op_data)
442 {
443         struct ptlrpc_request *req;
444         struct ldlm_intent *lit;
445         int rc, count = 0;
446         LIST_HEAD(cancels);
447         u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
448
449         ENTRY;
450         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
451                                         &RQF_LDLM_INTENT_GETXATTR);
452         if (req == NULL)
453                 RETURN(ERR_PTR(-ENOMEM));
454
455         /* get SELinux policy info if any */
456         rc = sptlrpc_get_sepol(req);
457         if (rc < 0) {
458                 ptlrpc_request_free(req);
459                 RETURN(ERR_PTR(rc));
460         }
461         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
462                              strlen(req->rq_sepol) ?
463                              strlen(req->rq_sepol) + 1 : 0);
464
465         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
466         if (rc) {
467                 ptlrpc_request_free(req);
468                 RETURN(ERR_PTR(rc));
469         }
470
471         /* pack the intent */
472         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
473         lit->opc = IT_GETXATTR;
474         /* Message below is checked in sanity-selinux test_20d
475          * and sanity-sec test_49
476          */
477         CDEBUG(D_INFO, "%s: get xattrs for "DFID"\n",
478                exp->exp_obd->obd_name, PFID(&op_data->op_fid1));
479
480 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
481         /* If the supplied buffer is too small then the server will return
482          * -ERANGE and llite will fallback to using non cached xattr
483          * operations. On servers before 2.10.1 a (non-cached) listxattr RPC
484          * for an orphan or dead file causes an oops. So let's try to avoid
485          * sending too small a buffer to too old a server. This is effectively
486          * undoing the memory conservation of LU-9417 when it would be *more*
487          * likely to crash the server. See LU-9856.
488          */
489         if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
490                 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
491                                          exp->exp_connect_data.ocd_max_easize);
492 #endif
493
494         /* pack the intended request */
495         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
496                       ea_vals_buf_size, -1, 0);
497
498         /* get SELinux policy info if any */
499         mdc_file_sepol_pack(req);
500
501         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
502                              GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
503
504         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
505                              ea_vals_buf_size);
506
507         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
508                              sizeof(u32) * GA_DEFAULT_EA_NUM);
509
510         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
511
512         ptlrpc_request_set_replen(req);
513
514         RETURN(req);
515 }
516
517 static struct ptlrpc_request *
518 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
519                         struct md_op_data *op_data, __u32 acl_bufsize)
520 {
521         struct ptlrpc_request *req;
522         struct obd_device *obd = class_exp2obd(exp);
523         u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE |
524                     OBD_MD_FLDIREA | OBD_MD_MEA | OBD_MD_FLACL |
525                     OBD_MD_DEFAULT_MEA;
526         struct ldlm_intent *lit;
527         __u32 easize;
528         bool have_secctx = false;
529         int rc;
530
531         ENTRY;
532         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
533                                    &RQF_LDLM_INTENT_GETATTR);
534         if (req == NULL)
535                 RETURN(ERR_PTR(-ENOMEM));
536
537         /* send name of security xattr to get upon intent */
538         if (it->it_op & (IT_LOOKUP | IT_GETATTR) &&
539             req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
540                                   RCL_CLIENT) &&
541             op_data->op_file_secctx_name_size > 0 &&
542             op_data->op_file_secctx_name != NULL) {
543                 have_secctx = true;
544                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
545                                      RCL_CLIENT,
546                                      op_data->op_file_secctx_name_size);
547         }
548
549         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
550                              op_data->op_namelen + 1);
551
552         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
553         if (rc) {
554                 ptlrpc_request_free(req);
555                 RETURN(ERR_PTR(rc));
556         }
557
558         /* pack the intent */
559         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
560         lit->opc = (__u64)it->it_op;
561
562         if (obd->u.cli.cl_default_mds_easize > 0)
563                 easize = obd->u.cli.cl_default_mds_easize;
564         else
565                 easize = obd->u.cli.cl_max_mds_easize;
566
567         /* pack the intended request */
568         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
569
570         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
571         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
572         req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
573                              sizeof(struct lmv_user_md));
574
575         if (have_secctx) {
576                 char *secctx_name;
577
578                 secctx_name = req_capsule_client_get(&req->rq_pill,
579                                                      &RMF_FILE_SECCTX_NAME);
580                 memcpy(secctx_name, op_data->op_file_secctx_name,
581                        op_data->op_file_secctx_name_size);
582
583                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
584                                      RCL_SERVER, easize);
585
586                 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
587                        op_data->op_file_secctx_name_size,
588                        op_data->op_file_secctx_name);
589         } else {
590                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
591                                      RCL_SERVER, 0);
592         }
593
594         if (exp_connect_encrypt(exp) && it->it_op & (IT_LOOKUP | IT_GETATTR))
595                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
596                                      RCL_SERVER, easize);
597         else
598                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
599                                      RCL_SERVER, 0);
600
601         ptlrpc_request_set_replen(req);
602         RETURN(req);
603 }
604
605 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
606                                                      struct lookup_intent *it,
607                                                      struct md_op_data *op_data)
608 {
609         struct obd_device *obd = class_exp2obd(exp);
610         struct ptlrpc_request *req;
611         struct ldlm_intent *lit;
612         struct layout_intent *layout;
613         LIST_HEAD(cancels);
614         int count = 0, rc;
615
616         ENTRY;
617         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
618                                 &RQF_LDLM_INTENT_LAYOUT);
619         if (req == NULL)
620                 RETURN(ERR_PTR(-ENOMEM));
621
622         if (fid_is_sane(&op_data->op_fid2) && (it->it_op & IT_LAYOUT) &&
623             (it->it_flags & FMODE_WRITE)) {
624                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
625                                                 &cancels, LCK_EX,
626                                                 MDS_INODELOCK_LAYOUT);
627         }
628
629         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
630         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
631         if (rc) {
632                 ptlrpc_request_free(req);
633                 RETURN(ERR_PTR(rc));
634         }
635
636         /* pack the intent */
637         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
638         lit->opc = (__u64)it->it_op;
639
640         /* pack the layout intent request */
641         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
642         LASSERT(op_data->op_data != NULL);
643         LASSERT(op_data->op_data_size == sizeof(*layout));
644         memcpy(layout, op_data->op_data, sizeof(*layout));
645
646         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
647                              obd->u.cli.cl_default_mds_easize);
648         ptlrpc_request_set_replen(req);
649         RETURN(req);
650 }
651
652 static struct ptlrpc_request *mdc_enqueue_pack(struct obd_export *exp,
653                                                int lvb_len)
654 {
655         struct ptlrpc_request *req;
656         int rc;
657
658         ENTRY;
659         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
660         if (req == NULL)
661                 RETURN(ERR_PTR(-ENOMEM));
662
663         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
664         if (rc) {
665                 ptlrpc_request_free(req);
666                 RETURN(ERR_PTR(rc));
667         }
668
669         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
670         ptlrpc_request_set_replen(req);
671         RETURN(req);
672 }
673
674 static int mdc_finish_enqueue(struct obd_export *exp,
675                               struct ptlrpc_request *req,
676                               struct ldlm_enqueue_info *einfo,
677                               struct lookup_intent *it,
678                               struct lustre_handle *lockh, int rc)
679 {
680         struct req_capsule *pill = &req->rq_pill;
681         struct ldlm_request *lockreq;
682         struct ldlm_reply *lockrep;
683         struct ldlm_lock *lock;
684         struct mdt_body *body = NULL;
685         void *lvb_data = NULL;
686         __u32 lvb_len = 0;
687
688         ENTRY;
689         LASSERT(rc >= 0);
690         /* Similarly, if we're going to replay this request, we don't want to
691          * actually get a lock, just perform the intent.
692          */
693         if (req->rq_transno || req->rq_replay) {
694                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
695                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
696         }
697
698         if (rc == ELDLM_LOCK_ABORTED) {
699                 einfo->ei_mode = 0;
700                 memset(lockh, 0, sizeof(*lockh));
701                 rc = 0;
702         } else { /* rc = 0 */
703                 lock = ldlm_handle2lock(lockh);
704                 LASSERT(lock != NULL);
705
706                 /* If server returned a different lock mode, fix up variables */
707                 if (lock->l_req_mode != einfo->ei_mode) {
708                         ldlm_lock_addref(lockh, lock->l_req_mode);
709                         ldlm_lock_decref(lockh, einfo->ei_mode);
710                         einfo->ei_mode = lock->l_req_mode;
711                 }
712                 LDLM_LOCK_PUT(lock);
713         }
714
715         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
716         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
717
718         it->it_disposition = (int)lockrep->lock_policy_res1;
719         it->it_status = (int)lockrep->lock_policy_res2;
720         it->it_lock_mode = einfo->ei_mode;
721         it->it_lock_handle = lockh->cookie;
722         it->it_request = req;
723
724         /* Technically speaking rq_transno must already be zero if
725          * it_status is in error, so the check is a bit redundant.
726          */
727         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
728                 mdc_clear_replay_flag(req, it->it_status);
729
730         /* If we're doing an IT_OPEN which did not result in an actual
731          * successful open, then we need to remove the bit which saves
732          * this request for unconditional replay.
733          *
734          * It's important that we do this first!  Otherwise we might exit the
735          * function without doing so, and try to replay a failed create.
736          * (b=3440)
737          */
738         if (it->it_op & IT_OPEN && req->rq_replay &&
739             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
740                 mdc_clear_replay_flag(req, it->it_status);
741
742         DEBUG_REQ(D_RPCTRACE, req, "op=%x disposition=%x, status=%d",
743                   it->it_op, it->it_disposition, it->it_status);
744
745         /* We know what to expect, so we do any byte flipping required here */
746         if (it_has_reply_body(it)) {
747                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
748                 if (body == NULL) {
749                         rc = -EPROTO;
750                         CERROR("%s: cannot swab mdt_body: rc = %d\n",
751                                exp->exp_obd->obd_name, rc);
752                         RETURN(rc);
753                 }
754
755                 if (it_disposition(it, DISP_OPEN_OPEN) &&
756                     !it_open_error(DISP_OPEN_OPEN, it)) {
757                         /*
758                          * If this is a successful OPEN request, we need to set
759                          * replay handler and data early, so that if replay
760                          * happens immediately after swabbing below, new reply
761                          * is swabbed by that handler correctly.
762                          */
763                         mdc_set_open_replay_data(NULL, NULL, it);
764                 }
765
766                 if (it_disposition(it, DISP_OPEN_CREATE) &&
767                     !it_open_error(DISP_OPEN_CREATE, it)) {
768                         lprocfs_counter_incr(exp->exp_obd->obd_md_stats,
769                                              LPROC_MD_CREATE);
770                 }
771
772                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
773                         void *eadata;
774
775                         mdc_update_max_ea_from_body(exp, body);
776
777                         /*
778                          * The eadata is opaque; just check that it is there.
779                          * Eventually, obd_unpackmd() will check the contents.
780                          */
781                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
782                                                         body->mbo_eadatasize);
783                         if (eadata == NULL)
784                                 RETURN(-EPROTO);
785
786                         /* save LVB data and length if for layout lock */
787                         lvb_data = eadata;
788                         lvb_len = body->mbo_eadatasize;
789
790                         /*
791                          * We save the reply LOV EA in case we have to replay a
792                          * create for recovery.  If we didn't allocate a large
793                          * enough request buffer above we need to reallocate it
794                          * here to hold the actual LOV EA.
795                          *
796                          * To not save LOV EA if request is not going to replay
797                          * (for example error one).
798                          */
799                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
800                                 rc = mdc_save_lovea(req, eadata,
801                                                     body->mbo_eadatasize);
802                                 if (rc) {
803                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
804                                         body->mbo_eadatasize = 0;
805                                         rc = 0;
806                                 }
807                         }
808                 }
809         } else if (it->it_op & IT_LAYOUT) {
810                 /* maybe the lock was granted right away and layout
811                  * is packed into RMF_DLM_LVB of req
812                  */
813                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
814                 CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
815                        class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
816                 if (lvb_len > 0) {
817                         lvb_data = req_capsule_server_sized_get(pill,
818                                                         &RMF_DLM_LVB, lvb_len);
819                         if (lvb_data == NULL)
820                                 RETURN(-EPROTO);
821
822                         /**
823                          * save replied layout data to the request buffer for
824                          * recovery consideration (lest MDS reinitialize
825                          * another set of OST objects).
826                          */
827                         if (req->rq_transno)
828                                 (void)mdc_save_lovea(req, lvb_data, lvb_len);
829                 }
830         }
831
832         /* fill in stripe data for layout lock.
833          * LU-6581: trust layout data only if layout lock is granted. The MDT
834          * has stopped sending layout unless the layout lock is granted. The
835          * client still does this checking in case it's talking with an old
836          * server. - Jinshan
837          */
838         lock = ldlm_handle2lock(lockh);
839         if (lock == NULL)
840                 RETURN(rc);
841
842         if (ldlm_has_layout(lock) && lvb_data != NULL &&
843             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
844                 void *lmm;
845
846                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
847                         ldlm_it2str(it->it_op), lvb_len);
848
849                 OBD_ALLOC_LARGE(lmm, lvb_len);
850                 if (lmm == NULL)
851                         GOTO(out_lock, rc = -ENOMEM);
852
853                 memcpy(lmm, lvb_data, lvb_len);
854
855                 /* install lvb_data */
856                 lock_res_and_lock(lock);
857                 if (lock->l_lvb_data == NULL) {
858                         lock->l_lvb_type = LVB_T_LAYOUT;
859                         lock->l_lvb_data = lmm;
860                         lock->l_lvb_len = lvb_len;
861                         lmm = NULL;
862                 }
863                 unlock_res_and_lock(lock);
864                 if (lmm != NULL)
865                         OBD_FREE_LARGE(lmm, lvb_len);
866         }
867
868         if (ldlm_has_dom(lock)) {
869                 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
870
871                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
872                 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
873                         LDLM_ERROR(lock, "%s: DoM lock without size.",
874                                    exp->exp_obd->obd_name);
875                         GOTO(out_lock, rc = -EPROTO);
876                 }
877
878                 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
879                            ldlm_it2str(it->it_op), body->mbo_dom_size);
880
881                 lock_res_and_lock(lock);
882                 mdc_body2lvb(body, &lock->l_ost_lvb);
883                 ldlm_lock_allow_match_locked(lock);
884                 unlock_res_and_lock(lock);
885         }
886 out_lock:
887         LDLM_LOCK_PUT(lock);
888
889         RETURN(rc);
890 }
891
892 static inline bool mdc_skip_mod_rpc_slot(const struct lookup_intent *it)
893 {
894         if (it != NULL &&
895             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
896              it->it_op == IT_READDIR ||
897              (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
898                 return true;
899         return false;
900 }
901
902 /* We always reserve enough space in the reply packet for a stripe MD, because
903  * we don't know in advance the file type.
904  */
905 static int mdc_enqueue_base(struct obd_export *exp,
906                             struct ldlm_enqueue_info *einfo,
907                             const union ldlm_policy_data *policy,
908                             struct lookup_intent *it,
909                             struct md_op_data *op_data,
910                             struct lustre_handle *lockh,
911                             __u64 extra_lock_flags)
912 {
913         struct obd_device *obd = class_exp2obd(exp);
914         struct ptlrpc_request *req;
915         __u64 flags, saved_flags = extra_lock_flags;
916         struct ldlm_res_id res_id;
917         static const union ldlm_policy_data lookup_policy = {
918                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
919         static const union ldlm_policy_data update_policy = {
920                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
921         static const union ldlm_policy_data layout_policy = {
922                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
923         static const union ldlm_policy_data getxattr_policy = {
924                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
925         int generation, resends = 0;
926         struct ldlm_reply *lockrep;
927         struct obd_import *imp = class_exp2cliimp(exp);
928         __u32 acl_bufsize;
929         enum lvb_type lvb_type = 0;
930         int rc;
931
932         ENTRY;
933         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
934                  einfo->ei_type);
935         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
936
937         if (it != NULL) {
938                 LASSERT(policy == NULL);
939
940                 saved_flags |= LDLM_FL_HAS_INTENT;
941                 if (it->it_op & (IT_GETATTR | IT_READDIR))
942                         policy = &update_policy;
943                 else if (it->it_op & IT_LAYOUT)
944                         policy = &layout_policy;
945                 else if (it->it_op & IT_GETXATTR)
946                         policy = &getxattr_policy;
947                 else
948                         policy = &lookup_policy;
949         }
950
951         generation = obd->u.cli.cl_import->imp_generation;
952         if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
953                 acl_bufsize = min_t(__u32, imp->imp_connect_data.ocd_max_easize,
954                                     XATTR_SIZE_MAX);
955         else
956                 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
957
958 resend:
959         flags = saved_flags;
960         if (it == NULL) {
961                 /* The only way right now is FLOCK. */
962                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
963                          einfo->ei_type);
964                 res_id.name[3] = LDLM_FLOCK;
965                 req = ldlm_enqueue_pack(exp, 0);
966         } else if (it->it_op & IT_OPEN) {
967                 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
968         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
969                 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
970         } else if (it->it_op & IT_READDIR) {
971                 req = mdc_enqueue_pack(exp, 0);
972         } else if (it->it_op & IT_LAYOUT) {
973                 if (!imp_connect_lvb_type(imp))
974                         RETURN(-EOPNOTSUPP);
975                 req = mdc_intent_layout_pack(exp, it, op_data);
976                 lvb_type = LVB_T_LAYOUT;
977         } else if (it->it_op & IT_GETXATTR) {
978                 req = mdc_intent_getxattr_pack(exp, it, op_data);
979         } else {
980                 LBUG();
981                 RETURN(-EINVAL);
982         }
983
984         if (IS_ERR(req))
985                 RETURN(PTR_ERR(req));
986
987         if (resends) {
988                 req->rq_generation_set = 1;
989                 req->rq_import_generation = generation;
990                 req->rq_sent = ktime_get_real_seconds() + resends;
991         }
992
993         einfo->ei_enq_slot = !mdc_skip_mod_rpc_slot(it);
994
995         /* With Data-on-MDT the glimpse callback is needed too.
996          * It is set here in advance but not in mdc_finish_enqueue()
997          * to avoid possible races. It is safe to have glimpse handler
998          * for non-DOM locks and costs nothing.
999          */
1000         if (einfo->ei_cb_gl == NULL)
1001                 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
1002
1003         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
1004                               0, lvb_type, lockh, 0);
1005
1006         if (!it) {
1007                 /* For flock requests we immediatelly return without further
1008                  * delay and let caller deal with the rest, since rest of
1009                  * this function metadata processing makes no sense for flock
1010                  * requests anyway. But in case of problem during comms with
1011                  * server (-ETIMEDOUT) or any signal/kill attempt (-EINTR),
1012                  * we cannot rely on caller and this mainly for F_UNLCKs
1013                  * (explicits or automatically generated by kernel to clean
1014                  * current flocks upon exit) that can't be trashed.
1015                  */
1016                 ptlrpc_req_finished(req);
1017                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
1018                     (einfo->ei_type == LDLM_FLOCK) &&
1019                     (einfo->ei_mode == LCK_NL))
1020                         goto resend;
1021                 RETURN(rc);
1022         }
1023
1024         if (rc < 0) {
1025                 CDEBUG(D_INFO,
1026                       "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
1027                       obd->obd_name, PFID(&op_data->op_fid1),
1028                       PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
1029
1030                 mdc_clear_replay_flag(req, rc);
1031                 ptlrpc_req_finished(req);
1032                 RETURN(rc);
1033         }
1034
1035         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1036         LASSERT(lockrep != NULL);
1037
1038         lockrep->lock_policy_res2 =
1039                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1040
1041         /* Retry infinitely when the server returns -EINPROGRESS for the
1042          * intent operation, when server returns -EINPROGRESS for acquiring
1043          * intent lock, we'll retry in after_reply().
1044          */
1045         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
1046                 mdc_clear_replay_flag(req, rc);
1047                 ptlrpc_req_finished(req);
1048                 if (generation == obd->u.cli.cl_import->imp_generation) {
1049                         if (signal_pending(current))
1050                                 RETURN(-EINTR);
1051
1052                         resends++;
1053                         CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
1054                                obd->obd_name, resends, it->it_op,
1055                                PFID(&op_data->op_fid1),
1056                                PFID(&op_data->op_fid2));
1057                         goto resend;
1058                 } else {
1059                         CDEBUG(D_HA, "resend cross eviction\n");
1060                         RETURN(-EIO);
1061                 }
1062         }
1063
1064         if ((int)lockrep->lock_policy_res2 == -ERANGE &&
1065             it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
1066             acl_bufsize == LUSTRE_POSIX_ACL_MAX_SIZE_OLD) {
1067                 mdc_clear_replay_flag(req, -ERANGE);
1068                 ptlrpc_req_finished(req);
1069                 acl_bufsize = min_t(__u32, imp->imp_connect_data.ocd_max_easize,
1070                                     XATTR_SIZE_MAX);
1071                 goto resend;
1072         }
1073
1074         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1075         if (rc < 0) {
1076                 if (lustre_handle_is_used(lockh)) {
1077                         ldlm_lock_decref(lockh, einfo->ei_mode);
1078                         memset(lockh, 0, sizeof(*lockh));
1079                 }
1080                 ptlrpc_req_finished(req);
1081
1082                 it->it_lock_handle = 0;
1083                 it->it_lock_mode = 0;
1084                 it->it_request = NULL;
1085         }
1086
1087         RETURN(rc);
1088 }
1089
1090 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1091                 const union ldlm_policy_data *policy,
1092                 struct md_op_data *op_data,
1093                 struct lustre_handle *lockh, __u64 extra_lock_flags)
1094 {
1095         return mdc_enqueue_base(exp, einfo, policy, NULL,
1096                                 op_data, lockh, extra_lock_flags);
1097 }
1098
1099 static int mdc_finish_intent_lock(struct obd_export *exp,
1100                                   struct ptlrpc_request *request,
1101                                   struct md_op_data *op_data,
1102                                   struct lookup_intent *it,
1103                                   struct lustre_handle *lockh)
1104 {
1105         struct lustre_handle old_lock;
1106         struct ldlm_lock *lock;
1107         int rc = 0;
1108
1109         ENTRY;
1110         LASSERT(request != NULL);
1111         LASSERT(request != LP_POISON);
1112         LASSERT(request->rq_repmsg != LP_POISON);
1113
1114         if (it->it_op & IT_READDIR)
1115                 RETURN(0);
1116
1117         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
1118                 if (it->it_status != 0)
1119                         GOTO(out, rc = it->it_status);
1120         } else {
1121                 if (!it_disposition(it, DISP_IT_EXECD)) {
1122                         /* The server failed before it even started executing
1123                          * the intent, i.e. because it couldn't unpack the
1124                          * request.
1125                          */
1126                         LASSERT(it->it_status != 0);
1127                         GOTO(out, rc = it->it_status);
1128                 }
1129                 rc = it_open_error(DISP_IT_EXECD, it);
1130                 if (rc)
1131                         GOTO(out, rc);
1132
1133                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1134                 if (rc)
1135                         GOTO(out, rc);
1136
1137                 /* keep requests around for the multiple phases of the call
1138                  * this shows the DISP_XX must guarantee we make it into the
1139                  * call
1140                  */
1141                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1142                     it_disposition(it, DISP_OPEN_CREATE) &&
1143                     !it_open_error(DISP_OPEN_CREATE, it)) {
1144                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
1145                         /* balanced in ll_create_node */
1146                         ptlrpc_request_addref(request);
1147                 }
1148                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1149                     it_disposition(it, DISP_OPEN_OPEN) &&
1150                     !it_open_error(DISP_OPEN_OPEN, it)) {
1151                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
1152                         /* balanced in ll_file_open */
1153                         ptlrpc_request_addref(request);
1154                         /* eviction in middle of open RPC processing b=11546 */
1155                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1156                                          obd_timeout);
1157                 }
1158
1159                 if (it->it_op & IT_CREAT) {
1160                         /* XXX this belongs in ll_create_it */
1161                 } else if (it->it_op == IT_OPEN) {
1162                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1163                 } else {
1164                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1165                 }
1166         }
1167
1168         /* If we already have a matching lock, then cancel the new
1169          * one.  We have to set the data here instead of in
1170          * mdc_enqueue, because we need to use the child's inode as
1171          * the l_ast_data to match, and that's not available until
1172          * intent_finish has performed the iget().
1173          */
1174         lock = ldlm_handle2lock(lockh);
1175         if (lock) {
1176                 union ldlm_policy_data policy = lock->l_policy_data;
1177
1178                 LDLM_DEBUG(lock, "matching against this");
1179
1180                 if (it_has_reply_body(it)) {
1181                         struct mdt_body *body;
1182
1183                         body = req_capsule_server_get(&request->rq_pill,
1184                                                       &RMF_MDT_BODY);
1185                         /* mdc_enqueue checked */
1186                         LASSERT(body != NULL);
1187                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1188                                                  &lock->l_resource->lr_name),
1189                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1190                                  PLDLMRES(lock->l_resource),
1191                                  PFID(&body->mbo_fid1));
1192                 }
1193                 LDLM_LOCK_PUT(lock);
1194
1195                 memcpy(&old_lock, lockh, sizeof(*lockh));
1196                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1197                                    LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
1198                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1199                         memcpy(lockh, &old_lock, sizeof(old_lock));
1200                         it->it_lock_handle = lockh->cookie;
1201                 }
1202         }
1203
1204         EXIT;
1205 out:
1206         CDEBUG(D_DENTRY,
1207                "D_IT dentry=%.*s intent=%s status=%d disp=%x: rc = %d\n",
1208                 (int)op_data->op_namelen, op_data->op_name,
1209                 ldlm_it2str(it->it_op), it->it_status, it->it_disposition, rc);
1210
1211         return rc;
1212 }
1213
1214 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1215                         struct lu_fid *fid, __u64 *bits)
1216 {
1217         /* We could just return 1 immediately, but as we should only be called
1218          * in revalidate_it if we already have a lock, let's verify that.
1219          */
1220         struct ldlm_res_id res_id;
1221         struct lustre_handle lockh;
1222         union ldlm_policy_data policy;
1223         enum ldlm_mode mode;
1224
1225         ENTRY;
1226         if (it->it_lock_handle) {
1227                 lockh.cookie = it->it_lock_handle;
1228                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1229         } else {
1230                 fid_build_reg_res_name(fid, &res_id);
1231                 switch (it->it_op) {
1232                 case IT_GETATTR:
1233                         /* File attributes are held under multiple bits:
1234                          * nlink is under lookup lock, size and times are
1235                          * under UPDATE lock and recently we've also got
1236                          * a separate permissions lock for owner/group/acl that
1237                          * were protected by lookup lock before.
1238                          * Getattr must provide all of that information,
1239                          * so we need to ensure we have all of those locks.
1240                          * Unfortunately, if the bits are split across multiple
1241                          * locks, there's no easy way to match all of them here,
1242                          * so an extra RPC would be performed to fetch all
1243                          * of those bits at once for now.
1244                          */
1245                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1246                          * but for old MDTs (< 2.4), permission is covered
1247                          * by LOOKUP lock, so it needs to match all bits here.
1248                          */
1249                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1250                                                   MDS_INODELOCK_PERM;
1251                         break;
1252                 case IT_READDIR:
1253                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1254                         break;
1255                 case IT_LAYOUT:
1256                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1257                         break;
1258                 default:
1259                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1260                         break;
1261                 }
1262
1263                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1264                                       LDLM_IBITS, &policy,
1265                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1266                                       &lockh);
1267         }
1268
1269         if (mode) {
1270                 it->it_lock_handle = lockh.cookie;
1271                 it->it_lock_mode = mode;
1272         } else {
1273                 it->it_lock_handle = 0;
1274                 it->it_lock_mode = 0;
1275         }
1276
1277         RETURN(!!mode);
1278 }
1279
1280 /*
1281  * This long block is all about fixing up the lock and request state
1282  * so that it is correct as of the moment _before_ the operation was
1283  * applied; that way, the VFS will think that everything is normal and
1284  * call Lustre's regular VFS methods.
1285  *
1286  * If we're performing a creation, that means that unless the creation
1287  * failed with EEXIST, we should fake up a negative dentry.
1288  *
1289  * For everything else, we want to lookup to succeed.
1290  *
1291  * One additional note: if CREATE or OPEN succeeded, we add an extra
1292  * reference to the request because we need to keep it around until
1293  * ll_create/ll_open gets called.
1294  *
1295  * The server will return to us, in it_disposition, an indication of
1296  * exactly what it_status refers to.
1297  *
1298  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1299  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1300  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1301  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1302  * was successful.
1303  *
1304  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1305  * child lookup.
1306  */
1307 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1308                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1309                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1310 {
1311         struct ldlm_enqueue_info einfo = {
1312                 .ei_type        = LDLM_IBITS,
1313                 .ei_mode        = it_to_lock_mode(it),
1314                 .ei_cb_bl       = cb_blocking,
1315                 .ei_cb_cp       = ldlm_completion_ast,
1316                 .ei_cb_gl       = mdc_ldlm_glimpse_ast,
1317         };
1318         struct lustre_handle lockh;
1319         int rc = 0;
1320
1321         ENTRY;
1322         LASSERT(it);
1323         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1324                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1325                 op_data->op_name, PFID(&op_data->op_fid2),
1326                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1327                 it->it_flags);
1328
1329         lockh.cookie = 0;
1330         if (fid_is_sane(&op_data->op_fid2) &&
1331             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1332                 /* We could just return 1 immediately, but since we should only
1333                  * be called in revalidate_it if we already have a lock, let's
1334                  * verify that.
1335                  */
1336                 it->it_lock_handle = 0;
1337                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1338                 /* Only return failure if it was not GETATTR by cfid
1339                  * (from inode_revalidate()).
1340                  */
1341                 if (rc || op_data->op_namelen != 0)
1342                         RETURN(rc);
1343         }
1344
1345         /* For case if upper layer did not alloc fid, do it now. */
1346         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1347                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1348                 if (rc < 0) {
1349                         CERROR("%s: cannot allocate new FID: rc=%d\n",
1350                                exp->exp_obd->obd_name, rc);
1351                         RETURN(rc);
1352                 }
1353         }
1354
1355         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1356                               extra_lock_flags);
1357         if (rc < 0)
1358                 RETURN(rc);
1359
1360         *reqp = it->it_request;
1361         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1362         RETURN(rc);
1363 }
1364
1365 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1366                                               struct ptlrpc_request *req,
1367                                               void *args, int rc)
1368 {
1369         struct mdc_getattr_args *ga = args;
1370         struct obd_export *exp = ga->ga_exp;
1371         struct md_enqueue_info *minfo = ga->ga_minfo;
1372         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1373         struct lookup_intent *it = &minfo->mi_it;
1374         struct lustre_handle *lockh = &minfo->mi_lockh;
1375         struct ldlm_reply *lockrep;
1376         __u64 flags = LDLM_FL_HAS_INTENT;
1377
1378         ENTRY;
1379         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1380                 rc = -ETIMEDOUT;
1381
1382         rc = ldlm_cli_enqueue_fini(exp, req, einfo, 1, &flags, NULL, 0,
1383                                    lockh, rc);
1384         if (rc < 0) {
1385                 CERROR("%s: ldlm_cli_enqueue_fini() failed: rc = %d\n",
1386                        exp->exp_obd->obd_name, rc);
1387                 mdc_clear_replay_flag(req, rc);
1388                 GOTO(out, rc);
1389         }
1390
1391         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1392         LASSERT(lockrep != NULL);
1393
1394         lockrep->lock_policy_res2 =
1395                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1396
1397         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1398         if (rc)
1399                 GOTO(out, rc);
1400
1401         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1402         EXIT;
1403
1404 out:
1405         minfo->mi_cb(req, minfo, rc);
1406         return 0;
1407 }
1408
1409 int mdc_intent_getattr_async(struct obd_export *exp,
1410                              struct md_enqueue_info *minfo)
1411 {
1412         struct md_op_data *op_data = &minfo->mi_data;
1413         struct lookup_intent *it = &minfo->mi_it;
1414         struct ptlrpc_request *req;
1415         struct mdc_getattr_args *ga;
1416         struct ldlm_res_id res_id;
1417         union ldlm_policy_data policy = {
1418                 .l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE }
1419         };
1420         __u64 flags = LDLM_FL_HAS_INTENT;
1421         int rc = 0;
1422
1423         ENTRY;
1424         CDEBUG(D_DLMTRACE,
1425                "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1426                (int)op_data->op_namelen, op_data->op_name,
1427                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1428
1429         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1430         /* If the MDT return -ERANGE because of large ACL, then the sponsor
1431          * of the async getattr RPC will handle that by itself.
1432          */
1433         req = mdc_intent_getattr_pack(exp, it, op_data,
1434                                       LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1435         if (IS_ERR(req))
1436                 RETURN(PTR_ERR(req));
1437
1438         /* With Data-on-MDT the glimpse callback is needed too.
1439          * It is set here in advance but not in mdc_finish_enqueue()
1440          * to avoid possible races. It is safe to have glimpse handler
1441          * for non-DOM locks and costs nothing.
1442          */
1443         if (minfo->mi_einfo.ei_cb_gl == NULL)
1444                 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1445
1446         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1447                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1448         if (rc < 0) {
1449                 ptlrpc_req_finished(req);
1450                 RETURN(rc);
1451         }
1452
1453         ga = ptlrpc_req_async_args(ga, req);
1454         ga->ga_exp = exp;
1455         ga->ga_minfo = minfo;
1456
1457         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1458         ptlrpcd_add_req(req);
1459
1460         RETURN(0);
1461 }