Whamcloud - gitweb
LU-6142 mdc: minor function cleanups.
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 #include <linux/module.h>
36
37 #include <obd.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
47
48 #include "mdc_internal.h"
49
/*
 * Pairs an export with an enqueue-info descriptor.
 * NOTE(review): the consumer of this structure is not visible in this part
 * of the file; presumably it is the argument block of an asynchronous
 * getattr completion callback - confirm against the users further down.
 */
struct mdc_getattr_args {
        struct obd_export               *ga_exp;        /* export involved */
        struct md_enqueue_info          *ga_minfo;      /* enqueue info */
};
54
55 int it_open_error(int phase, struct lookup_intent *it)
56 {
57         if (it_disposition(it, DISP_OPEN_LEASE)) {
58                 if (phase >= DISP_OPEN_LEASE)
59                         return it->it_status;
60                 else
61                         return 0;
62         }
63         if (it_disposition(it, DISP_OPEN_OPEN)) {
64                 if (phase >= DISP_OPEN_OPEN)
65                         return it->it_status;
66                 else
67                         return 0;
68         }
69
70         if (it_disposition(it, DISP_OPEN_CREATE)) {
71                 if (phase >= DISP_OPEN_CREATE)
72                         return it->it_status;
73                 else
74                         return 0;
75         }
76
77         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78                 if (phase >= DISP_LOOKUP_EXECD)
79                         return it->it_status;
80                 else
81                         return 0;
82         }
83
84         if (it_disposition(it, DISP_IT_EXECD)) {
85                 if (phase >= DISP_IT_EXECD)
86                         return it->it_status;
87                 else
88                         return 0;
89         }
90
91         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
92         LBUG();
93
94         return 0;
95 }
96 EXPORT_SYMBOL(it_open_error);
97
98 /* this must be called on a lockh that is known to have a referenced lock */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100                       void *data, __u64 *bits)
101 {
102         struct ldlm_lock *lock;
103         struct inode *new_inode = data;
104
105         ENTRY;
106         if (bits)
107                 *bits = 0;
108
109         if (!lustre_handle_is_used(lockh))
110                 RETURN(0);
111
112         lock = ldlm_handle2lock(lockh);
113
114         LASSERT(lock != NULL);
115         lock_res_and_lock(lock);
116         if (lock->l_resource->lr_lvb_inode &&
117             lock->l_resource->lr_lvb_inode != data) {
118                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119
120                 LASSERTF(old_inode->i_state & I_FREEING,
121                          "Found existing inode %p/%lu/%u state %lu in lock: setting data to %p/%lu/%u\n",
122                          old_inode, old_inode->i_ino, old_inode->i_generation,
123                          old_inode->i_state,
124                          new_inode, new_inode->i_ino, new_inode->i_generation);
125         }
126         lock->l_resource->lr_lvb_inode = new_inode;
127         if (bits)
128                 *bits = lock->l_policy_data.l_inodebits.bits;
129
130         unlock_res_and_lock(lock);
131         LDLM_LOCK_PUT(lock);
132
133         RETURN(0);
134 }
135
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137                               const struct lu_fid *fid, enum ldlm_type type,
138                               union ldlm_policy_data *policy,
139                               enum ldlm_mode mode, struct lustre_handle *lockh)
140 {
141         struct ldlm_res_id res_id;
142         enum ldlm_mode rc;
143
144         ENTRY;
145         fid_build_reg_res_name(fid, &res_id);
146         /* LU-4405: Clear bits not supported by server */
147         policy->l_inodebits.bits &= exp_connect_ibits(exp);
148         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149                              &res_id, type, policy, mode, lockh);
150         RETURN(rc);
151 }
152
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154                       union ldlm_policy_data *policy, enum ldlm_mode mode,
155                       enum ldlm_cancel_flags flags, void *opaque)
156 {
157         struct obd_device *obd = class_exp2obd(exp);
158         struct ldlm_res_id res_id;
159         int rc;
160
161         ENTRY;
162         fid_build_reg_res_name(fid, &res_id);
163         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
164                                              policy, mode, flags, opaque);
165         RETURN(rc);
166 }
167
168 int mdc_null_inode(struct obd_export *exp,
169                    const struct lu_fid *fid)
170 {
171         struct ldlm_res_id res_id;
172         struct ldlm_resource *res;
173         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
174
175         ENTRY;
176         LASSERTF(ns != NULL, "no namespace passed\n");
177
178         fid_build_reg_res_name(fid, &res_id);
179
180         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
181         if (IS_ERR(res))
182                 RETURN(0);
183
184         lock_res(res);
185         res->lr_lvb_inode = NULL;
186         unlock_res(res);
187
188         ldlm_resource_putref(res);
189         RETURN(0);
190 }
191
192 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
193 {
194         /* Don't hold error requests for replay. */
195         if (req->rq_replay) {
196                 spin_lock(&req->rq_lock);
197                 req->rq_replay = 0;
198                 spin_unlock(&req->rq_lock);
199         }
200         if (rc && req->rq_transno != 0) {
201                 DEBUG_REQ(D_ERROR, req, "transno returned on error: rc = %d",
202                           rc);
203                 LBUG();
204         }
205 }
206
207 /**
208  * Save a large LOV EA into the request buffer so that it is available
209  * for replay.  We don't do this in the initial request because the
210  * original request doesn't need this buffer (at most it sends just the
211  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
212  * buffer and may also be difficult to allocate and save a very large
213  * request buffer for each open. (b=5707)
214  *
215  * OOM here may cause recovery failure if lmm is needed (only for the
216  * original open if the MDS crashed just when this client also OOM'd)
217  * but this is incredibly unlikely, and questionable whether the client
218  * could do MDS recovery under OOM anyways...
219  */
220 static int mdc_save_lovea(struct ptlrpc_request *req, void *data, u32 size)
221 {
222         struct req_capsule *pill = &req->rq_pill;
223         void *lovea;
224         int rc = 0;
225
226         if (req_capsule_get_size(pill, &RMF_EADATA, RCL_CLIENT) < size) {
227                 rc = sptlrpc_cli_enlarge_reqbuf(req, &RMF_EADATA, size);
228                 if (rc) {
229                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230                                req->rq_export->exp_obd->obd_name,
231                                size, rc);
232                         return rc;
233                 }
234         } else {
235                 req_capsule_shrink(pill, &RMF_EADATA, size, RCL_CLIENT);
236         }
237
238         req_capsule_set_size(pill, &RMF_EADATA, RCL_CLIENT, size);
239         lovea = req_capsule_client_get(pill, &RMF_EADATA);
240         if (lovea) {
241                 memcpy(lovea, data, size);
242                 lov_fix_ea_for_replay(lovea);
243         }
244
245         return rc;
246 }
247
248 static struct ptlrpc_request *
249 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
250                      struct md_op_data *op_data, __u32 acl_bufsize)
251 {
252         struct ptlrpc_request *req;
253         struct obd_device *obd = class_exp2obd(exp);
254         struct ldlm_intent *lit;
255         const void *lmm = op_data->op_data;
256         __u32 lmmsize = op_data->op_data_size;
257         __u32  mdt_md_capsule_size;
258         LIST_HEAD(cancels);
259         int count = 0;
260         enum ldlm_mode mode;
261         int repsize, repsize_estimate;
262         int rc;
263
264         ENTRY;
265
266         mdt_md_capsule_size = obd->u.cli.cl_default_mds_easize;
267
268         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
269
270         /* XXX: openlock is not cancelled for cross-refs. */
271         /* If inode is known, cancel conflicting OPEN locks. */
272         if (fid_is_sane(&op_data->op_fid2)) {
273                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
274                         if (it->it_flags & MDS_FMODE_WRITE)
275                                 mode = LCK_EX;
276                         else
277                                 mode = LCK_PR;
278                 } else {
279                         if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
280                                 mode = LCK_CW;
281 #ifdef FMODE_EXEC
282                         else if (it->it_flags & FMODE_EXEC)
283                                 mode = LCK_PR;
284 #endif
285                         else
286                                 mode = LCK_CR;
287                 }
288                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
289                                                 &cancels, mode,
290                                                 MDS_INODELOCK_OPEN);
291         }
292
293         /* If CREATE, cancel parent's UPDATE lock. */
294         if (it->it_op & IT_CREAT)
295                 mode = LCK_EX;
296         else
297                 mode = LCK_CR;
298         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
299                                          &cancels, mode,
300                                          MDS_INODELOCK_UPDATE);
301
302         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
303                                    &RQF_LDLM_INTENT_OPEN);
304         if (req == NULL) {
305                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
306                 RETURN(ERR_PTR(-ENOMEM));
307         }
308
309         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
310                              op_data->op_namelen + 1);
311         if (cl_is_lov_delay_create(it->it_flags)) {
312                 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
313                 LASSERT(lmmsize == 0);
314                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
315         } else {
316                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
317                              max(lmmsize, obd->u.cli.cl_default_mds_easize));
318         }
319
320         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
321                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
322                              op_data->op_file_secctx_name_size : 0);
323
324         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
325                              op_data->op_file_secctx_size);
326
327         req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX, RCL_CLIENT,
328                              op_data->op_file_encctx_size);
329
330         /* get SELinux policy info if any */
331         rc = sptlrpc_get_sepol(req);
332         if (rc < 0) {
333                 ptlrpc_request_free(req);
334                 RETURN(ERR_PTR(rc));
335         }
336         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
337                              strlen(req->rq_sepol) ?
338                              strlen(req->rq_sepol) + 1 : 0);
339
340         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
341         if (rc < 0) {
342                 ptlrpc_request_free(req);
343                 RETURN(ERR_PTR(rc));
344         }
345
346         spin_lock(&req->rq_lock);
347         req->rq_replay = req->rq_import->imp_replayable;
348         spin_unlock(&req->rq_lock);
349
350         /* pack the intent */
351         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
352         lit->opc = (__u64)it->it_op;
353
354         /* pack the intended request */
355         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
356                       lmmsize);
357
358         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
359                              mdt_md_capsule_size);
360         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
361
362         if (!(it->it_op & IT_CREAT) && it->it_op & IT_OPEN &&
363             req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
364                                   RCL_CLIENT) &&
365             op_data->op_file_secctx_name_size > 0 &&
366             op_data->op_file_secctx_name != NULL) {
367                 char *secctx_name;
368
369                 secctx_name = req_capsule_client_get(&req->rq_pill,
370                                                      &RMF_FILE_SECCTX_NAME);
371                 memcpy(secctx_name, op_data->op_file_secctx_name,
372                        op_data->op_file_secctx_name_size);
373                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
374                                      RCL_SERVER,
375                                      obd->u.cli.cl_max_mds_easize);
376
377                 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
378                        op_data->op_file_secctx_name_size,
379                        op_data->op_file_secctx_name);
380
381         } else {
382                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
383                                      RCL_SERVER, 0);
384         }
385
386         if (exp_connect_encrypt(exp) && !(it->it_op & IT_CREAT) &&
387             it->it_op & IT_OPEN)
388                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
389                                      RCL_SERVER,
390                                      obd->u.cli.cl_max_mds_easize);
391         else
392                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
393                                      RCL_SERVER, 0);
394
395         /**
396          * Inline buffer for possible data from Data-on-MDT files.
397          */
398         req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
399                              sizeof(struct niobuf_remote));
400         ptlrpc_request_set_replen(req);
401
402         /* Get real repbuf allocated size as rounded up power of 2 */
403         repsize = size_roundup_power2(req->rq_replen +
404                                       lustre_msg_early_size());
405         /* Estimate free space for DoM files in repbuf */
406         repsize_estimate = repsize - (req->rq_replen -
407                            mdt_md_capsule_size +
408                            sizeof(struct lov_comp_md_v1) +
409                            sizeof(struct lov_comp_md_entry_v1) +
410                            lov_mds_md_size(0, LOV_MAGIC_V3));
411
412         if (repsize_estimate < obd->u.cli.cl_dom_min_inline_repsize) {
413                 repsize = obd->u.cli.cl_dom_min_inline_repsize -
414                           repsize_estimate + sizeof(struct niobuf_remote);
415                 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
416                                      RCL_SERVER,
417                                      sizeof(struct niobuf_remote) + repsize);
418                 ptlrpc_request_set_replen(req);
419                 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
420                        repsize, req->rq_replen);
421                 repsize = size_roundup_power2(req->rq_replen +
422                                               lustre_msg_early_size());
423         }
424         /* The only way to report real allocated repbuf size to the server
425          * is the lm_repsize but it must be set prior buffer allocation itself
426          * due to security reasons - it is part of buffer used in signature
427          * calculation (see LU-11414). Therefore the saved size is predicted
428          * value as rq_replen rounded to the next higher power of 2.
429          * Such estimation is safe. Though the final allocated buffer might
430          * be even larger, it is not possible to know that at this point.
431          */
432         req->rq_reqmsg->lm_repsize = repsize;
433         RETURN(req);
434 }
435
436 #define GA_DEFAULT_EA_NAME_LEN   20
437 #define GA_DEFAULT_EA_VAL_LEN   250
438 #define GA_DEFAULT_EA_NUM        10
439
440 static struct ptlrpc_request *
441 mdc_intent_getxattr_pack(struct obd_export *exp, struct lookup_intent *it,
442                          struct md_op_data *op_data)
443 {
444         struct ptlrpc_request *req;
445         struct ldlm_intent *lit;
446         int rc, count = 0;
447         LIST_HEAD(cancels);
448         u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
449
450         ENTRY;
451         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
452                                         &RQF_LDLM_INTENT_GETXATTR);
453         if (req == NULL)
454                 RETURN(ERR_PTR(-ENOMEM));
455
456         /* get SELinux policy info if any */
457         rc = sptlrpc_get_sepol(req);
458         if (rc < 0) {
459                 ptlrpc_request_free(req);
460                 RETURN(ERR_PTR(rc));
461         }
462         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
463                              strlen(req->rq_sepol) ?
464                              strlen(req->rq_sepol) + 1 : 0);
465
466         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
467         if (rc) {
468                 ptlrpc_request_free(req);
469                 RETURN(ERR_PTR(rc));
470         }
471
472         /* pack the intent */
473         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
474         lit->opc = IT_GETXATTR;
475         /* Message below is checked in sanity-selinux test_20d
476          * and sanity-sec test_49
477          */
478         CDEBUG(D_INFO, "%s: get xattrs for "DFID"\n",
479                exp->exp_obd->obd_name, PFID(&op_data->op_fid1));
480
481 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
482         /* If the supplied buffer is too small then the server will return
483          * -ERANGE and llite will fallback to using non cached xattr
484          * operations. On servers before 2.10.1 a (non-cached) listxattr RPC
485          * for an orphan or dead file causes an oops. So let's try to avoid
486          * sending too small a buffer to too old a server. This is effectively
487          * undoing the memory conservation of LU-9417 when it would be *more*
488          * likely to crash the server. See LU-9856.
489          */
490         if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
491                 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
492                                          exp->exp_connect_data.ocd_max_easize);
493 #endif
494
495         /* pack the intended request */
496         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
497                       ea_vals_buf_size, -1, 0);
498
499         /* get SELinux policy info if any */
500         mdc_file_sepol_pack(req);
501
502         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
503                              GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
504
505         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
506                              ea_vals_buf_size);
507
508         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
509                              sizeof(u32) * GA_DEFAULT_EA_NUM);
510
511         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
512
513         ptlrpc_request_set_replen(req);
514
515         RETURN(req);
516 }
517
518 static struct ptlrpc_request *
519 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
520                         struct md_op_data *op_data, __u32 acl_bufsize)
521 {
522         struct ptlrpc_request *req;
523         struct obd_device *obd = class_exp2obd(exp);
524         u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE |
525                     OBD_MD_FLDIREA | OBD_MD_MEA | OBD_MD_FLACL |
526                     OBD_MD_DEFAULT_MEA;
527         struct ldlm_intent *lit;
528         __u32 easize;
529         bool have_secctx = false;
530         int rc;
531
532         ENTRY;
533         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
534                                    &RQF_LDLM_INTENT_GETATTR);
535         if (req == NULL)
536                 RETURN(ERR_PTR(-ENOMEM));
537
538         /* send name of security xattr to get upon intent */
539         if (it->it_op & (IT_LOOKUP | IT_GETATTR) &&
540             req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
541                                   RCL_CLIENT) &&
542             op_data->op_file_secctx_name_size > 0 &&
543             op_data->op_file_secctx_name != NULL) {
544                 have_secctx = true;
545                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
546                                      RCL_CLIENT,
547                                      op_data->op_file_secctx_name_size);
548         }
549
550         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
551                              op_data->op_namelen + 1);
552
553         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
554         if (rc) {
555                 ptlrpc_request_free(req);
556                 RETURN(ERR_PTR(rc));
557         }
558
559         /* pack the intent */
560         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
561         lit->opc = (__u64)it->it_op;
562
563         if (obd->u.cli.cl_default_mds_easize > 0)
564                 easize = obd->u.cli.cl_default_mds_easize;
565         else
566                 easize = obd->u.cli.cl_max_mds_easize;
567
568         /* pack the intended request */
569         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
570
571         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
572         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
573         req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
574                              sizeof(struct lmv_user_md));
575
576         if (have_secctx) {
577                 char *secctx_name;
578
579                 secctx_name = req_capsule_client_get(&req->rq_pill,
580                                                      &RMF_FILE_SECCTX_NAME);
581                 memcpy(secctx_name, op_data->op_file_secctx_name,
582                        op_data->op_file_secctx_name_size);
583
584                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
585                                      RCL_SERVER, easize);
586
587                 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
588                        op_data->op_file_secctx_name_size,
589                        op_data->op_file_secctx_name);
590         } else {
591                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
592                                      RCL_SERVER, 0);
593         }
594
595         if (exp_connect_encrypt(exp) && it->it_op & (IT_LOOKUP | IT_GETATTR))
596                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
597                                      RCL_SERVER, easize);
598         else
599                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
600                                      RCL_SERVER, 0);
601
602         ptlrpc_request_set_replen(req);
603         RETURN(req);
604 }
605
606 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
607                                                      struct lookup_intent *it,
608                                                      struct md_op_data *op_data)
609 {
610         struct obd_device *obd = class_exp2obd(exp);
611         struct ptlrpc_request *req;
612         struct ldlm_intent *lit;
613         struct layout_intent *layout;
614         LIST_HEAD(cancels);
615         int count = 0, rc;
616
617         ENTRY;
618         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
619                                 &RQF_LDLM_INTENT_LAYOUT);
620         if (req == NULL)
621                 RETURN(ERR_PTR(-ENOMEM));
622
623         if (fid_is_sane(&op_data->op_fid2) && (it->it_op & IT_LAYOUT) &&
624             (it->it_flags & FMODE_WRITE)) {
625                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
626                                                 &cancels, LCK_EX,
627                                                 MDS_INODELOCK_LAYOUT);
628         }
629
630         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
631         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
632         if (rc) {
633                 ptlrpc_request_free(req);
634                 RETURN(ERR_PTR(rc));
635         }
636
637         /* pack the intent */
638         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
639         lit->opc = (__u64)it->it_op;
640
641         /* pack the layout intent request */
642         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
643         LASSERT(op_data->op_data != NULL);
644         LASSERT(op_data->op_data_size == sizeof(*layout));
645         memcpy(layout, op_data->op_data, sizeof(*layout));
646
647         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
648                              obd->u.cli.cl_default_mds_easize);
649         ptlrpc_request_set_replen(req);
650         RETURN(req);
651 }
652
653 static struct ptlrpc_request *mdc_enqueue_pack(struct obd_export *exp,
654                                                int lvb_len)
655 {
656         struct ptlrpc_request *req;
657         int rc;
658
659         ENTRY;
660         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
661         if (req == NULL)
662                 RETURN(ERR_PTR(-ENOMEM));
663
664         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
665         if (rc) {
666                 ptlrpc_request_free(req);
667                 RETURN(ERR_PTR(rc));
668         }
669
670         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
671         ptlrpc_request_set_replen(req);
672         RETURN(req);
673 }
674
675 static int mdc_finish_enqueue(struct obd_export *exp,
676                               struct ptlrpc_request *req,
677                               struct ldlm_enqueue_info *einfo,
678                               struct lookup_intent *it,
679                               struct lustre_handle *lockh, int rc)
680 {
681         struct req_capsule *pill = &req->rq_pill;
682         struct ldlm_request *lockreq;
683         struct ldlm_reply *lockrep;
684         struct ldlm_lock *lock;
685         struct mdt_body *body = NULL;
686         void *lvb_data = NULL;
687         __u32 lvb_len = 0;
688
689         ENTRY;
690         LASSERT(rc >= 0);
691         /* Similarly, if we're going to replay this request, we don't want to
692          * actually get a lock, just perform the intent.
693          */
694         if (req->rq_transno || req->rq_replay) {
695                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
696                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
697         }
698
699         if (rc == ELDLM_LOCK_ABORTED) {
700                 einfo->ei_mode = 0;
701                 memset(lockh, 0, sizeof(*lockh));
702                 rc = 0;
703         } else { /* rc = 0 */
704                 lock = ldlm_handle2lock(lockh);
705                 LASSERT(lock != NULL);
706
707                 /* If server returned a different lock mode, fix up variables */
708                 if (lock->l_req_mode != einfo->ei_mode) {
709                         ldlm_lock_addref(lockh, lock->l_req_mode);
710                         ldlm_lock_decref(lockh, einfo->ei_mode);
711                         einfo->ei_mode = lock->l_req_mode;
712                 }
713                 LDLM_LOCK_PUT(lock);
714         }
715
716         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
717         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
718
719         it->it_disposition = (int)lockrep->lock_policy_res1;
720         it->it_status = (int)lockrep->lock_policy_res2;
721         it->it_lock_mode = einfo->ei_mode;
722         it->it_lock_handle = lockh->cookie;
723         it->it_request = req;
724
725         /* Technically speaking rq_transno must already be zero if
726          * it_status is in error, so the check is a bit redundant.
727          */
728         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
729                 mdc_clear_replay_flag(req, it->it_status);
730
731         /* If we're doing an IT_OPEN which did not result in an actual
732          * successful open, then we need to remove the bit which saves
733          * this request for unconditional replay.
734          *
735          * It's important that we do this first!  Otherwise we might exit the
736          * function without doing so, and try to replay a failed create.
737          * (b=3440)
738          */
739         if (it->it_op & IT_OPEN && req->rq_replay &&
740             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
741                 mdc_clear_replay_flag(req, it->it_status);
742
743         DEBUG_REQ(D_RPCTRACE, req, "op=%x disposition=%x, status=%d",
744                   it->it_op, it->it_disposition, it->it_status);
745
746         /* We know what to expect, so we do any byte flipping required here */
747         if (it_has_reply_body(it)) {
748                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
749                 if (body == NULL) {
750                         rc = -EPROTO;
751                         CERROR("%s: cannot swab mdt_body: rc = %d\n",
752                                exp->exp_obd->obd_name, rc);
753                         RETURN(rc);
754                 }
755
756                 if (it_disposition(it, DISP_OPEN_OPEN) &&
757                     !it_open_error(DISP_OPEN_OPEN, it)) {
758                         /*
759                          * If this is a successful OPEN request, we need to set
760                          * replay handler and data early, so that if replay
761                          * happens immediately after swabbing below, new reply
762                          * is swabbed by that handler correctly.
763                          */
764                         mdc_set_open_replay_data(NULL, NULL, it);
765                 }
766
767                 if (it_disposition(it, DISP_OPEN_CREATE) &&
768                     !it_open_error(DISP_OPEN_CREATE, it)) {
769                         lprocfs_counter_incr(exp->exp_obd->obd_md_stats,
770                                              LPROC_MD_CREATE);
771                 }
772
773                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
774                         void *eadata;
775
776                         mdc_update_max_ea_from_body(exp, body);
777
778                         /*
779                          * The eadata is opaque; just check that it is there.
780                          * Eventually, obd_unpackmd() will check the contents.
781                          */
782                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
783                                                         body->mbo_eadatasize);
784                         if (eadata == NULL)
785                                 RETURN(-EPROTO);
786
787                         /* save LVB data and length if for layout lock */
788                         lvb_data = eadata;
789                         lvb_len = body->mbo_eadatasize;
790
791                         /*
792                          * We save the reply LOV EA in case we have to replay a
793                          * create for recovery.  If we didn't allocate a large
794                          * enough request buffer above we need to reallocate it
795                          * here to hold the actual LOV EA.
796                          *
797                          * To not save LOV EA if request is not going to replay
798                          * (for example error one).
799                          */
800                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
801                                 rc = mdc_save_lovea(req, eadata,
802                                                     body->mbo_eadatasize);
803                                 if (rc) {
804                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
805                                         body->mbo_eadatasize = 0;
806                                         rc = 0;
807                                 }
808                         }
809                 }
810         } else if (it->it_op & IT_LAYOUT) {
811                 /* maybe the lock was granted right away and layout
812                  * is packed into RMF_DLM_LVB of req
813                  */
814                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
815                 CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
816                        class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
817                 if (lvb_len > 0) {
818                         lvb_data = req_capsule_server_sized_get(pill,
819                                                         &RMF_DLM_LVB, lvb_len);
820                         if (lvb_data == NULL)
821                                 RETURN(-EPROTO);
822
823                         /**
824                          * save replied layout data to the request buffer for
825                          * recovery consideration (lest MDS reinitialize
826                          * another set of OST objects).
827                          */
828                         if (req->rq_transno)
829                                 (void)mdc_save_lovea(req, lvb_data, lvb_len);
830                 }
831         }
832
833         /* fill in stripe data for layout lock.
834          * LU-6581: trust layout data only if layout lock is granted. The MDT
835          * has stopped sending layout unless the layout lock is granted. The
836          * client still does this checking in case it's talking with an old
837          * server. - Jinshan
838          */
839         lock = ldlm_handle2lock(lockh);
840         if (lock == NULL)
841                 RETURN(rc);
842
843         if (ldlm_has_layout(lock) && lvb_data != NULL &&
844             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
845                 void *lmm;
846
847                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
848                         ldlm_it2str(it->it_op), lvb_len);
849
850                 OBD_ALLOC_LARGE(lmm, lvb_len);
851                 if (lmm == NULL)
852                         GOTO(out_lock, rc = -ENOMEM);
853
854                 memcpy(lmm, lvb_data, lvb_len);
855
856                 /* install lvb_data */
857                 lock_res_and_lock(lock);
858                 if (lock->l_lvb_data == NULL) {
859                         lock->l_lvb_type = LVB_T_LAYOUT;
860                         lock->l_lvb_data = lmm;
861                         lock->l_lvb_len = lvb_len;
862                         lmm = NULL;
863                 }
864                 unlock_res_and_lock(lock);
865                 if (lmm != NULL)
866                         OBD_FREE_LARGE(lmm, lvb_len);
867         }
868
869         if (ldlm_has_dom(lock)) {
870                 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
871
872                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
873                 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
874                         LDLM_ERROR(lock, "%s: DoM lock without size.",
875                                    exp->exp_obd->obd_name);
876                         GOTO(out_lock, rc = -EPROTO);
877                 }
878
879                 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
880                            ldlm_it2str(it->it_op), body->mbo_dom_size);
881
882                 lock_res_and_lock(lock);
883                 mdc_body2lvb(body, &lock->l_ost_lvb);
884                 ldlm_lock_allow_match_locked(lock);
885                 unlock_res_and_lock(lock);
886         }
887 out_lock:
888         LDLM_LOCK_PUT(lock);
889
890         RETURN(rc);
891 }
892
893 static inline bool mdc_skip_mod_rpc_slot(const struct lookup_intent *it)
894 {
895         if (it != NULL &&
896             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
897              it->it_op == IT_READDIR ||
898              (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
899                 return true;
900         return false;
901 }
902
903 /* We always reserve enough space in the reply packet for a stripe MD, because
904  * we don't know in advance the file type.
905  */
906 static int mdc_enqueue_base(struct obd_export *exp,
907                             struct ldlm_enqueue_info *einfo,
908                             const union ldlm_policy_data *policy,
909                             struct lookup_intent *it,
910                             struct md_op_data *op_data,
911                             struct lustre_handle *lockh,
912                             __u64 extra_lock_flags)
913 {
914         struct obd_device *obd = class_exp2obd(exp);
915         struct ptlrpc_request *req;
916         __u64 flags, saved_flags = extra_lock_flags;
917         struct ldlm_res_id res_id;
918         static const union ldlm_policy_data lookup_policy = {
919                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
920         static const union ldlm_policy_data update_policy = {
921                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
922         static const union ldlm_policy_data layout_policy = {
923                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
924         static const union ldlm_policy_data getxattr_policy = {
925                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
926         int generation, resends = 0;
927         struct ldlm_reply *lockrep;
928         struct obd_import *imp = class_exp2cliimp(exp);
929         __u32 acl_bufsize;
930         enum lvb_type lvb_type = 0;
931         int rc;
932
933         ENTRY;
934         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
935                  einfo->ei_type);
936         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
937
938         if (it != NULL) {
939                 LASSERT(policy == NULL);
940
941                 saved_flags |= LDLM_FL_HAS_INTENT;
942                 if (it->it_op & (IT_GETATTR | IT_READDIR))
943                         policy = &update_policy;
944                 else if (it->it_op & IT_LAYOUT)
945                         policy = &layout_policy;
946                 else if (it->it_op & IT_GETXATTR)
947                         policy = &getxattr_policy;
948                 else
949                         policy = &lookup_policy;
950         }
951
952         generation = obd->u.cli.cl_import->imp_generation;
953         if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
954                 acl_bufsize = min_t(__u32, imp->imp_connect_data.ocd_max_easize,
955                                     XATTR_SIZE_MAX);
956         else
957                 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
958
959 resend:
960         flags = saved_flags;
961         if (it == NULL) {
962                 /* The only way right now is FLOCK. */
963                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
964                          einfo->ei_type);
965                 res_id.name[3] = LDLM_FLOCK;
966                 req = ldlm_enqueue_pack(exp, 0);
967         } else if (it->it_op & IT_OPEN) {
968                 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
969         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
970                 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
971         } else if (it->it_op & IT_READDIR) {
972                 req = mdc_enqueue_pack(exp, 0);
973         } else if (it->it_op & IT_LAYOUT) {
974                 if (!imp_connect_lvb_type(imp))
975                         RETURN(-EOPNOTSUPP);
976                 req = mdc_intent_layout_pack(exp, it, op_data);
977                 lvb_type = LVB_T_LAYOUT;
978         } else if (it->it_op & IT_GETXATTR) {
979                 req = mdc_intent_getxattr_pack(exp, it, op_data);
980         } else {
981                 LBUG();
982                 RETURN(-EINVAL);
983         }
984
985         if (IS_ERR(req))
986                 RETURN(PTR_ERR(req));
987
988         if (resends) {
989                 req->rq_generation_set = 1;
990                 req->rq_import_generation = generation;
991                 req->rq_sent = ktime_get_real_seconds() + resends;
992         }
993
994         einfo->ei_enq_slot = !mdc_skip_mod_rpc_slot(it);
995
996         /* With Data-on-MDT the glimpse callback is needed too.
997          * It is set here in advance but not in mdc_finish_enqueue()
998          * to avoid possible races. It is safe to have glimpse handler
999          * for non-DOM locks and costs nothing.
1000          */
1001         if (einfo->ei_cb_gl == NULL)
1002                 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
1003
1004         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
1005                               0, lvb_type, lockh, 0);
1006
1007         if (!it) {
1008                 /* For flock requests we immediatelly return without further
1009                  * delay and let caller deal with the rest, since rest of
1010                  * this function metadata processing makes no sense for flock
1011                  * requests anyway. But in case of problem during comms with
1012                  * server (-ETIMEDOUT) or any signal/kill attempt (-EINTR),
1013                  * we cannot rely on caller and this mainly for F_UNLCKs
1014                  * (explicits or automatically generated by kernel to clean
1015                  * current flocks upon exit) that can't be trashed.
1016                  */
1017                 ptlrpc_req_finished(req);
1018                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
1019                     (einfo->ei_type == LDLM_FLOCK) &&
1020                     (einfo->ei_mode == LCK_NL))
1021                         goto resend;
1022                 RETURN(rc);
1023         }
1024
1025         if (rc < 0) {
1026                 CDEBUG(D_INFO,
1027                       "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
1028                       obd->obd_name, PFID(&op_data->op_fid1),
1029                       PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
1030
1031                 mdc_clear_replay_flag(req, rc);
1032                 ptlrpc_req_finished(req);
1033                 RETURN(rc);
1034         }
1035
1036         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1037         LASSERT(lockrep != NULL);
1038
1039         lockrep->lock_policy_res2 =
1040                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1041
1042         /* Retry infinitely when the server returns -EINPROGRESS for the
1043          * intent operation, when server returns -EINPROGRESS for acquiring
1044          * intent lock, we'll retry in after_reply().
1045          */
1046         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
1047                 mdc_clear_replay_flag(req, rc);
1048                 ptlrpc_req_finished(req);
1049                 if (generation == obd->u.cli.cl_import->imp_generation) {
1050                         if (signal_pending(current))
1051                                 RETURN(-EINTR);
1052
1053                         resends++;
1054                         CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
1055                                obd->obd_name, resends, it->it_op,
1056                                PFID(&op_data->op_fid1),
1057                                PFID(&op_data->op_fid2));
1058                         goto resend;
1059                 } else {
1060                         CDEBUG(D_HA, "resend cross eviction\n");
1061                         RETURN(-EIO);
1062                 }
1063         }
1064
1065         if ((int)lockrep->lock_policy_res2 == -ERANGE &&
1066             it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
1067             acl_bufsize == LUSTRE_POSIX_ACL_MAX_SIZE_OLD) {
1068                 mdc_clear_replay_flag(req, -ERANGE);
1069                 ptlrpc_req_finished(req);
1070                 acl_bufsize = min_t(__u32, imp->imp_connect_data.ocd_max_easize,
1071                                     XATTR_SIZE_MAX);
1072                 goto resend;
1073         }
1074
1075         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1076         if (rc < 0) {
1077                 if (lustre_handle_is_used(lockh)) {
1078                         ldlm_lock_decref(lockh, einfo->ei_mode);
1079                         memset(lockh, 0, sizeof(*lockh));
1080                 }
1081                 ptlrpc_req_finished(req);
1082
1083                 it->it_lock_handle = 0;
1084                 it->it_lock_mode = 0;
1085                 it->it_request = NULL;
1086         }
1087
1088         RETURN(rc);
1089 }
1090
1091 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1092                 const union ldlm_policy_data *policy,
1093                 struct md_op_data *op_data,
1094                 struct lustre_handle *lockh, __u64 extra_lock_flags)
1095 {
1096         return mdc_enqueue_base(exp, einfo, policy, NULL,
1097                                 op_data, lockh, extra_lock_flags);
1098 }
1099
1100 static int mdc_finish_intent_lock(struct obd_export *exp,
1101                                   struct ptlrpc_request *request,
1102                                   struct md_op_data *op_data,
1103                                   struct lookup_intent *it,
1104                                   struct lustre_handle *lockh)
1105 {
1106         struct lustre_handle old_lock;
1107         struct ldlm_lock *lock;
1108         int rc = 0;
1109
1110         ENTRY;
1111         LASSERT(request != NULL);
1112         LASSERT(request != LP_POISON);
1113         LASSERT(request->rq_repmsg != LP_POISON);
1114
1115         if (it->it_op & IT_READDIR)
1116                 RETURN(0);
1117
1118         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
1119                 if (it->it_status != 0)
1120                         GOTO(out, rc = it->it_status);
1121         } else {
1122                 if (!it_disposition(it, DISP_IT_EXECD)) {
1123                         /* The server failed before it even started executing
1124                          * the intent, i.e. because it couldn't unpack the
1125                          * request.
1126                          */
1127                         LASSERT(it->it_status != 0);
1128                         GOTO(out, rc = it->it_status);
1129                 }
1130                 rc = it_open_error(DISP_IT_EXECD, it);
1131                 if (rc)
1132                         GOTO(out, rc);
1133
1134                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1135                 if (rc)
1136                         GOTO(out, rc);
1137
1138                 /* keep requests around for the multiple phases of the call
1139                  * this shows the DISP_XX must guarantee we make it into the
1140                  * call
1141                  */
1142                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1143                     it_disposition(it, DISP_OPEN_CREATE) &&
1144                     !it_open_error(DISP_OPEN_CREATE, it)) {
1145                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
1146                         /* balanced in ll_create_node */
1147                         ptlrpc_request_addref(request);
1148                 }
1149                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1150                     it_disposition(it, DISP_OPEN_OPEN) &&
1151                     !it_open_error(DISP_OPEN_OPEN, it)) {
1152                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
1153                         /* balanced in ll_file_open */
1154                         ptlrpc_request_addref(request);
1155                         /* eviction in middle of open RPC processing b=11546 */
1156                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1157                                          obd_timeout);
1158                 }
1159
1160                 if (it->it_op & IT_CREAT) {
1161                         /* XXX this belongs in ll_create_it */
1162                 } else if (it->it_op == IT_OPEN) {
1163                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1164                 } else {
1165                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1166                 }
1167         }
1168
1169         /* If we already have a matching lock, then cancel the new
1170          * one.  We have to set the data here instead of in
1171          * mdc_enqueue, because we need to use the child's inode as
1172          * the l_ast_data to match, and that's not available until
1173          * intent_finish has performed the iget().
1174          */
1175         lock = ldlm_handle2lock(lockh);
1176         if (lock) {
1177                 union ldlm_policy_data policy = lock->l_policy_data;
1178
1179                 LDLM_DEBUG(lock, "matching against this");
1180
1181                 if (it_has_reply_body(it)) {
1182                         struct mdt_body *body;
1183
1184                         body = req_capsule_server_get(&request->rq_pill,
1185                                                       &RMF_MDT_BODY);
1186                         /* mdc_enqueue checked */
1187                         LASSERT(body != NULL);
1188                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1189                                                  &lock->l_resource->lr_name),
1190                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1191                                  PLDLMRES(lock->l_resource),
1192                                  PFID(&body->mbo_fid1));
1193                 }
1194                 LDLM_LOCK_PUT(lock);
1195
1196                 memcpy(&old_lock, lockh, sizeof(*lockh));
1197                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1198                                    LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
1199                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1200                         memcpy(lockh, &old_lock, sizeof(old_lock));
1201                         it->it_lock_handle = lockh->cookie;
1202                 }
1203         }
1204
1205         EXIT;
1206 out:
1207         CDEBUG(D_DENTRY,
1208                "D_IT dentry=%.*s intent=%s status=%d disp=%x: rc = %d\n",
1209                 (int)op_data->op_namelen, op_data->op_name,
1210                 ldlm_it2str(it->it_op), it->it_status, it->it_disposition, rc);
1211
1212         return rc;
1213 }
1214
1215 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1216                         struct lu_fid *fid, __u64 *bits)
1217 {
1218         /* We could just return 1 immediately, but as we should only be called
1219          * in revalidate_it if we already have a lock, let's verify that.
1220          */
1221         struct ldlm_res_id res_id;
1222         struct lustre_handle lockh;
1223         union ldlm_policy_data policy;
1224         enum ldlm_mode mode;
1225
1226         ENTRY;
1227         if (it->it_lock_handle) {
1228                 lockh.cookie = it->it_lock_handle;
1229                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1230         } else {
1231                 fid_build_reg_res_name(fid, &res_id);
1232                 switch (it->it_op) {
1233                 case IT_GETATTR:
1234                         /* File attributes are held under multiple bits:
1235                          * nlink is under lookup lock, size and times are
1236                          * under UPDATE lock and recently we've also got
1237                          * a separate permissions lock for owner/group/acl that
1238                          * were protected by lookup lock before.
1239                          * Getattr must provide all of that information,
1240                          * so we need to ensure we have all of those locks.
1241                          * Unfortunately, if the bits are split across multiple
1242                          * locks, there's no easy way to match all of them here,
1243                          * so an extra RPC would be performed to fetch all
1244                          * of those bits at once for now.
1245                          */
1246                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1247                          * but for old MDTs (< 2.4), permission is covered
1248                          * by LOOKUP lock, so it needs to match all bits here.
1249                          */
1250                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1251                                                   MDS_INODELOCK_PERM;
1252                         break;
1253                 case IT_READDIR:
1254                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1255                         break;
1256                 case IT_LAYOUT:
1257                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1258                         break;
1259                 default:
1260                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1261                         break;
1262                 }
1263
1264                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1265                                       LDLM_IBITS, &policy,
1266                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1267                                       &lockh);
1268         }
1269
1270         if (mode) {
1271                 it->it_lock_handle = lockh.cookie;
1272                 it->it_lock_mode = mode;
1273         } else {
1274                 it->it_lock_handle = 0;
1275                 it->it_lock_mode = 0;
1276         }
1277
1278         RETURN(!!mode);
1279 }
1280
1281 /*
1282  * This long block is all about fixing up the lock and request state
1283  * so that it is correct as of the moment _before_ the operation was
1284  * applied; that way, the VFS will think that everything is normal and
1285  * call Lustre's regular VFS methods.
1286  *
1287  * If we're performing a creation, that means that unless the creation
1288  * failed with EEXIST, we should fake up a negative dentry.
1289  *
1290  * For everything else, we want to lookup to succeed.
1291  *
1292  * One additional note: if CREATE or OPEN succeeded, we add an extra
1293  * reference to the request because we need to keep it around until
1294  * ll_create/ll_open gets called.
1295  *
1296  * The server will return to us, in it_disposition, an indication of
1297  * exactly what it_status refers to.
1298  *
1299  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1300  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1301  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1302  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1303  * was successful.
1304  *
1305  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1306  * child lookup.
1307  */
1308 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1309                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1310                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1311 {
1312         struct ldlm_enqueue_info einfo = {
1313                 .ei_type        = LDLM_IBITS,
1314                 .ei_mode        = it_to_lock_mode(it),
1315                 .ei_cb_bl       = cb_blocking,
1316                 .ei_cb_cp       = ldlm_completion_ast,
1317                 .ei_cb_gl       = mdc_ldlm_glimpse_ast,
1318         };
1319         struct lustre_handle lockh;
1320         int rc = 0;
1321
1322         ENTRY;
1323         LASSERT(it);
1324         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1325                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1326                 op_data->op_name, PFID(&op_data->op_fid2),
1327                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1328                 it->it_flags);
1329
1330         lockh.cookie = 0;
1331         if (fid_is_sane(&op_data->op_fid2) &&
1332             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1333                 /* We could just return 1 immediately, but since we should only
1334                  * be called in revalidate_it if we already have a lock, let's
1335                  * verify that.
1336                  */
1337                 it->it_lock_handle = 0;
1338                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1339                 /* Only return failure if it was not GETATTR by cfid
1340                  * (from inode_revalidate()).
1341                  */
1342                 if (rc || op_data->op_namelen != 0)
1343                         RETURN(rc);
1344         }
1345
1346         /* For case if upper layer did not alloc fid, do it now. */
1347         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1348                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1349                 if (rc < 0) {
1350                         CERROR("%s: cannot allocate new FID: rc=%d\n",
1351                                exp->exp_obd->obd_name, rc);
1352                         RETURN(rc);
1353                 }
1354         }
1355
1356         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1357                               extra_lock_flags);
1358         if (rc < 0)
1359                 RETURN(rc);
1360
1361         *reqp = it->it_request;
1362         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1363         RETURN(rc);
1364 }
1365
1366 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1367                                               struct ptlrpc_request *req,
1368                                               void *args, int rc)
1369 {
1370         struct mdc_getattr_args *ga = args;
1371         struct obd_export *exp = ga->ga_exp;
1372         struct md_enqueue_info *minfo = ga->ga_minfo;
1373         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1374         struct lookup_intent *it = &minfo->mi_it;
1375         struct lustre_handle *lockh = &minfo->mi_lockh;
1376         struct ldlm_reply *lockrep;
1377         __u64 flags = LDLM_FL_HAS_INTENT;
1378
1379         ENTRY;
1380         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1381                 rc = -ETIMEDOUT;
1382
1383         rc = ldlm_cli_enqueue_fini(exp, req, einfo, 1, &flags, NULL, 0,
1384                                    lockh, rc);
1385         if (rc < 0) {
1386                 CERROR("%s: ldlm_cli_enqueue_fini() failed: rc = %d\n",
1387                        exp->exp_obd->obd_name, rc);
1388                 mdc_clear_replay_flag(req, rc);
1389                 GOTO(out, rc);
1390         }
1391
1392         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1393         LASSERT(lockrep != NULL);
1394
1395         lockrep->lock_policy_res2 =
1396                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1397
1398         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1399         if (rc)
1400                 GOTO(out, rc);
1401
1402         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1403         EXIT;
1404
1405 out:
1406         minfo->mi_cb(req, minfo, rc);
1407         return 0;
1408 }
1409
1410 int mdc_intent_getattr_async(struct obd_export *exp,
1411                              struct md_enqueue_info *minfo)
1412 {
1413         struct md_op_data *op_data = &minfo->mi_data;
1414         struct lookup_intent *it = &minfo->mi_it;
1415         struct ptlrpc_request *req;
1416         struct mdc_getattr_args *ga;
1417         struct ldlm_res_id res_id;
1418         union ldlm_policy_data policy = {
1419                 .l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE }
1420         };
1421         __u64 flags = LDLM_FL_HAS_INTENT;
1422         int rc = 0;
1423
1424         ENTRY;
1425         CDEBUG(D_DLMTRACE,
1426                "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1427                (int)op_data->op_namelen, op_data->op_name,
1428                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1429
1430         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1431         /* If the MDT return -ERANGE because of large ACL, then the sponsor
1432          * of the async getattr RPC will handle that by itself.
1433          */
1434         req = mdc_intent_getattr_pack(exp, it, op_data,
1435                                       LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1436         if (IS_ERR(req))
1437                 RETURN(PTR_ERR(req));
1438
1439         /* With Data-on-MDT the glimpse callback is needed too.
1440          * It is set here in advance but not in mdc_finish_enqueue()
1441          * to avoid possible races. It is safe to have glimpse handler
1442          * for non-DOM locks and costs nothing.
1443          */
1444         if (minfo->mi_einfo.ei_cb_gl == NULL)
1445                 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1446
1447         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1448                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1449         if (rc < 0) {
1450                 ptlrpc_req_finished(req);
1451                 RETURN(rc);
1452         }
1453
1454         ga = ptlrpc_req_async_args(ga, req);
1455         ga->ga_exp = exp;
1456         ga->ga_minfo = minfo;
1457
1458         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1459         ptlrpcd_add_req(req);
1460
1461         RETURN(0);
1462 }