Whamcloud - gitweb
LU-9193 security: return security context for metadata ops
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 #include <linux/module.h>
36
37 #include <obd.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
47
48 #include "mdc_internal.h"
49
/*
 * Pairs an export with an enqueue-info descriptor.
 * NOTE(review): the users of this structure are outside this excerpt;
 * presumably it carries state for an asynchronous getattr intent - confirm.
 */
struct mdc_getattr_args {
	struct obd_export	*ga_exp;	/* export the request goes through */
	struct md_enqueue_info	*ga_minfo;	/* enqueue info for the request */
};
54
55 int it_open_error(int phase, struct lookup_intent *it)
56 {
57         if (it_disposition(it, DISP_OPEN_LEASE)) {
58                 if (phase >= DISP_OPEN_LEASE)
59                         return it->it_status;
60                 else
61                         return 0;
62         }
63         if (it_disposition(it, DISP_OPEN_OPEN)) {
64                 if (phase >= DISP_OPEN_OPEN)
65                         return it->it_status;
66                 else
67                         return 0;
68         }
69
70         if (it_disposition(it, DISP_OPEN_CREATE)) {
71                 if (phase >= DISP_OPEN_CREATE)
72                         return it->it_status;
73                 else
74                         return 0;
75         }
76
77         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78                 if (phase >= DISP_LOOKUP_EXECD)
79                         return it->it_status;
80                 else
81                         return 0;
82         }
83
84         if (it_disposition(it, DISP_IT_EXECD)) {
85                 if (phase >= DISP_IT_EXECD)
86                         return it->it_status;
87                 else
88                         return 0;
89         }
90
91         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
92         LBUG();
93
94         return 0;
95 }
96 EXPORT_SYMBOL(it_open_error);
97
98 /* this must be called on a lockh that is known to have a referenced lock */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100                       void *data, __u64 *bits)
101 {
102         struct ldlm_lock *lock;
103         struct inode *new_inode = data;
104         ENTRY;
105
106         if(bits)
107                 *bits = 0;
108
109         if (!lustre_handle_is_used(lockh))
110                 RETURN(0);
111
112         lock = ldlm_handle2lock(lockh);
113
114         LASSERT(lock != NULL);
115         lock_res_and_lock(lock);
116         if (lock->l_resource->lr_lvb_inode &&
117             lock->l_resource->lr_lvb_inode != data) {
118                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119                 LASSERTF(old_inode->i_state & I_FREEING,
120                          "Found existing inode %p/%lu/%u state %lu in lock: "
121                          "setting data to %p/%lu/%u\n", old_inode,
122                          old_inode->i_ino, old_inode->i_generation,
123                          old_inode->i_state,
124                          new_inode, new_inode->i_ino, new_inode->i_generation);
125         }
126         lock->l_resource->lr_lvb_inode = new_inode;
127         if (bits)
128                 *bits = lock->l_policy_data.l_inodebits.bits;
129
130         unlock_res_and_lock(lock);
131         LDLM_LOCK_PUT(lock);
132
133         RETURN(0);
134 }
135
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137                               const struct lu_fid *fid, enum ldlm_type type,
138                               union ldlm_policy_data *policy,
139                               enum ldlm_mode mode, struct lustre_handle *lockh)
140 {
141         struct ldlm_res_id res_id;
142         enum ldlm_mode rc;
143         ENTRY;
144
145         fid_build_reg_res_name(fid, &res_id);
146         /* LU-4405: Clear bits not supported by server */
147         policy->l_inodebits.bits &= exp_connect_ibits(exp);
148         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149                              &res_id, type, policy, mode, lockh, 0);
150         RETURN(rc);
151 }
152
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154                       union ldlm_policy_data *policy, enum ldlm_mode mode,
155                       enum ldlm_cancel_flags flags, void *opaque)
156 {
157         struct obd_device *obd = class_exp2obd(exp);
158         struct ldlm_res_id res_id;
159         int rc;
160
161         ENTRY;
162
163         fid_build_reg_res_name(fid, &res_id);
164         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165                                              policy, mode, flags, opaque);
166         RETURN(rc);
167 }
168
169 int mdc_null_inode(struct obd_export *exp,
170                    const struct lu_fid *fid)
171 {
172         struct ldlm_res_id res_id;
173         struct ldlm_resource *res;
174         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
175         ENTRY;
176
177         LASSERTF(ns != NULL, "no namespace passed\n");
178
179         fid_build_reg_res_name(fid, &res_id);
180
181         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
182         if (IS_ERR(res))
183                 RETURN(0);
184
185         lock_res(res);
186         res->lr_lvb_inode = NULL;
187         unlock_res(res);
188
189         ldlm_resource_putref(res);
190         RETURN(0);
191 }
192
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
194 {
195         /* Don't hold error requests for replay. */
196         if (req->rq_replay) {
197                 spin_lock(&req->rq_lock);
198                 req->rq_replay = 0;
199                 spin_unlock(&req->rq_lock);
200         }
201         if (rc && req->rq_transno != 0) {
202                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
203                 LBUG();
204         }
205 }
206
207 /* Save a large LOV EA into the request buffer so that it is available
208  * for replay.  We don't do this in the initial request because the
209  * original request doesn't need this buffer (at most it sends just the
210  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211  * buffer and may also be difficult to allocate and save a very large
212  * request buffer for each open. (bug 5707)
213  *
214  * OOM here may cause recovery failure if lmm is needed (only for the
215  * original open if the MDS crashed just when this client also OOM'd)
216  * but this is incredibly unlikely, and questionable whether the client
217  * could do MDS recovery under OOM anyways... */
218 int mdc_save_lovea(struct ptlrpc_request *req,
219                    const struct req_msg_field *field,
220                    void *data, u32 size)
221 {
222         struct req_capsule *pill = &req->rq_pill;
223         void *lmm;
224         int rc = 0;
225
226         if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
227                 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
228                 if (rc) {
229                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230                                req->rq_export->exp_obd->obd_name,
231                                size, rc);
232                         return rc;
233                 }
234         } else {
235                 req_capsule_shrink(pill, field, size, RCL_CLIENT);
236         }
237
238         req_capsule_set_size(pill, field, RCL_CLIENT, size);
239         lmm = req_capsule_client_get(pill, field);
240         if (lmm)
241                 memcpy(lmm, data, size);
242
243         return rc;
244 }
245
246 static struct ptlrpc_request *
247 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
248                      struct md_op_data *op_data, __u32 acl_bufsize)
249 {
250         struct ptlrpc_request   *req;
251         struct obd_device       *obddev = class_exp2obd(exp);
252         struct ldlm_intent      *lit;
253         const void              *lmm = op_data->op_data;
254         __u32                    lmmsize = op_data->op_data_size;
255         struct list_head         cancels = LIST_HEAD_INIT(cancels);
256         int                      count = 0;
257         enum ldlm_mode           mode;
258         int                      rc;
259         int repsize, repsize_estimate;
260
261         ENTRY;
262
263         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
264
265         /* XXX: openlock is not cancelled for cross-refs. */
266         /* If inode is known, cancel conflicting OPEN locks. */
267         if (fid_is_sane(&op_data->op_fid2)) {
268                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
269                         if (it->it_flags & MDS_FMODE_WRITE)
270                                 mode = LCK_EX;
271                         else
272                                 mode = LCK_PR;
273                 } else {
274                         if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
275                                 mode = LCK_CW;
276 #ifdef FMODE_EXEC
277                         else if (it->it_flags & FMODE_EXEC)
278                                 mode = LCK_PR;
279 #endif
280                         else
281                                 mode = LCK_CR;
282                 }
283                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
284                                                 &cancels, mode,
285                                                 MDS_INODELOCK_OPEN);
286         }
287
288         /* If CREATE, cancel parent's UPDATE lock. */
289         if (it->it_op & IT_CREAT)
290                 mode = LCK_EX;
291         else
292                 mode = LCK_CR;
293         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
294                                          &cancels, mode,
295                                          MDS_INODELOCK_UPDATE);
296
297         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
298                                    &RQF_LDLM_INTENT_OPEN);
299         if (req == NULL) {
300                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
301                 RETURN(ERR_PTR(-ENOMEM));
302         }
303
304         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
305                              op_data->op_namelen + 1);
306         if (cl_is_lov_delay_create(it->it_flags)) {
307                 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
308                 LASSERT(lmmsize == 0);
309                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
310         } else {
311                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
312                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
313         }
314
315         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
316                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
317                              op_data->op_file_secctx_name_size : 0);
318
319         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
320                              op_data->op_file_secctx_size);
321
322         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
323         if (rc < 0) {
324                 ptlrpc_request_free(req);
325                 RETURN(ERR_PTR(rc));
326         }
327
328         spin_lock(&req->rq_lock);
329         req->rq_replay = req->rq_import->imp_replayable;
330         spin_unlock(&req->rq_lock);
331
332         /* pack the intent */
333         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
334         lit->opc = (__u64)it->it_op;
335
336         /* pack the intended request */
337         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
338                       lmmsize);
339
340         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
341                              obddev->u.cli.cl_max_mds_easize);
342         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
343
344         if (!(it->it_op & IT_CREAT) && it->it_op & IT_OPEN &&
345             req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
346                                   RCL_CLIENT) &&
347             op_data->op_file_secctx_name_size > 0 &&
348             op_data->op_file_secctx_name != NULL) {
349                 char *secctx_name;
350
351                 secctx_name = req_capsule_client_get(&req->rq_pill,
352                                                      &RMF_FILE_SECCTX_NAME);
353                 memcpy(secctx_name, op_data->op_file_secctx_name,
354                        op_data->op_file_secctx_name_size);
355                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
356                                      RCL_SERVER,
357                                      obddev->u.cli.cl_max_mds_easize);
358
359                 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
360                        op_data->op_file_secctx_name_size,
361                        op_data->op_file_secctx_name);
362
363         } else {
364                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
365                                      RCL_SERVER, 0);
366         }
367
368         /**
369          * Inline buffer for possible data from Data-on-MDT files.
370          */
371         req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
372                              sizeof(struct niobuf_remote));
373         ptlrpc_request_set_replen(req);
374
375         /* Get real repbuf allocated size as rounded up power of 2 */
376         repsize = size_roundup_power2(req->rq_replen +
377                                       lustre_msg_early_size());
378         /* Estimate free space for DoM files in repbuf */
379         repsize_estimate = repsize - (req->rq_replen -
380                            obddev->u.cli.cl_max_mds_easize +
381                            sizeof(struct lov_comp_md_v1) +
382                            sizeof(struct lov_comp_md_entry_v1) +
383                            lov_mds_md_size(0, LOV_MAGIC_V3));
384
385         if (repsize_estimate < obddev->u.cli.cl_dom_min_inline_repsize) {
386                 repsize = obddev->u.cli.cl_dom_min_inline_repsize -
387                           repsize_estimate + sizeof(struct niobuf_remote);
388                 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
389                                      RCL_SERVER,
390                                      sizeof(struct niobuf_remote) + repsize);
391                 ptlrpc_request_set_replen(req);
392                 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
393                        repsize, req->rq_replen);
394                 repsize = size_roundup_power2(req->rq_replen +
395                                               lustre_msg_early_size());
396         }
397         /* The only way to report real allocated repbuf size to the server
398          * is the lm_repsize but it must be set prior buffer allocation itself
399          * due to security reasons - it is part of buffer used in signature
400          * calculation (see LU-11414). Therefore the saved size is predicted
401          * value as rq_replen rounded to the next higher power of 2.
402          * Such estimation is safe. Though the final allocated buffer might
403          * be even larger, it is not possible to know that at this point.
404          */
405         req->rq_reqmsg->lm_repsize = repsize;
406         RETURN(req);
407 }
408
409 #define GA_DEFAULT_EA_NAME_LEN 20
410 #define GA_DEFAULT_EA_VAL_LEN  250
411 #define GA_DEFAULT_EA_NUM      10
412
413 static struct ptlrpc_request *
414 mdc_intent_getxattr_pack(struct obd_export *exp,
415                          struct lookup_intent *it,
416                          struct md_op_data *op_data)
417 {
418         struct ptlrpc_request   *req;
419         struct ldlm_intent      *lit;
420         int                     rc, count = 0;
421         struct list_head        cancels = LIST_HEAD_INIT(cancels);
422         u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
423
424         ENTRY;
425
426         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
427                                         &RQF_LDLM_INTENT_GETXATTR);
428         if (req == NULL)
429                 RETURN(ERR_PTR(-ENOMEM));
430
431         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
432         if (rc) {
433                 ptlrpc_request_free(req);
434                 RETURN(ERR_PTR(rc));
435         }
436
437         /* pack the intent */
438         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
439         lit->opc = IT_GETXATTR;
440         CDEBUG(D_INFO, "%s: get xattrs for "DFID"\n",
441                exp->exp_obd->obd_name, PFID(&op_data->op_fid1));
442
443 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
444         /* If the supplied buffer is too small then the server will
445          * return -ERANGE and llite will fallback to using non cached
446          * xattr operations. On servers before 2.10.1 a (non-cached)
447          * listxattr RPC for an orphan or dead file causes an oops. So
448          * let's try to avoid sending too small a buffer to too old a
449          * server. This is effectively undoing the memory conservation
450          * of LU-9417 when it would be *more* likely to crash the
451          * server. See LU-9856. */
452         if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
453                 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
454                                          exp->exp_connect_data.ocd_max_easize);
455 #endif
456
457         /* pack the intended request */
458         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
459                       ea_vals_buf_size, -1, 0);
460
461         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
462                              GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
463
464         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
465                              ea_vals_buf_size);
466
467         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
468                              sizeof(u32) * GA_DEFAULT_EA_NUM);
469
470         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
471
472         ptlrpc_request_set_replen(req);
473
474         RETURN(req);
475 }
476
477 static struct ptlrpc_request *
478 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
479                         struct md_op_data *op_data, __u32 acl_bufsize)
480 {
481         struct ptlrpc_request   *req;
482         struct obd_device       *obddev = class_exp2obd(exp);
483         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
484                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
485                                          OBD_MD_MEA | OBD_MD_FLACL;
486         struct ldlm_intent      *lit;
487         int                      rc;
488         __u32                    easize;
489         bool                     have_secctx = false;
490         ENTRY;
491
492         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
493                                    &RQF_LDLM_INTENT_GETATTR);
494         if (req == NULL)
495                 RETURN(ERR_PTR(-ENOMEM));
496
497         /* send name of security xattr to get upon intent */
498         if (it->it_op & (IT_LOOKUP | IT_GETATTR) &&
499             req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
500                                   RCL_CLIENT) &&
501             op_data->op_file_secctx_name_size > 0 &&
502             op_data->op_file_secctx_name != NULL) {
503                 have_secctx = true;
504                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
505                                      RCL_CLIENT,
506                                      op_data->op_file_secctx_name_size);
507         }
508
509         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
510                              op_data->op_namelen + 1);
511
512         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
513         if (rc) {
514                 ptlrpc_request_free(req);
515                 RETURN(ERR_PTR(rc));
516         }
517
518         /* pack the intent */
519         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
520         lit->opc = (__u64)it->it_op;
521
522         if (obddev->u.cli.cl_default_mds_easize > 0)
523                 easize = obddev->u.cli.cl_default_mds_easize;
524         else
525                 easize = obddev->u.cli.cl_max_mds_easize;
526
527         /* pack the intended request */
528         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
529
530         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
531         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
532
533         if (have_secctx) {
534                 char *secctx_name;
535
536                 secctx_name = req_capsule_client_get(&req->rq_pill,
537                                                      &RMF_FILE_SECCTX_NAME);
538                 memcpy(secctx_name, op_data->op_file_secctx_name,
539                        op_data->op_file_secctx_name_size);
540
541                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
542                                      RCL_SERVER, easize);
543
544                 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
545                        op_data->op_file_secctx_name_size,
546                        op_data->op_file_secctx_name);
547         } else {
548                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
549                                      RCL_SERVER, 0);
550         }
551
552         ptlrpc_request_set_replen(req);
553         RETURN(req);
554 }
555
556 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
557                                                      struct lookup_intent *it,
558                                                      struct md_op_data *op_data)
559 {
560         struct obd_device     *obd = class_exp2obd(exp);
561         struct ptlrpc_request *req;
562         struct ldlm_intent    *lit;
563         struct layout_intent  *layout;
564         int rc;
565         ENTRY;
566
567         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
568                                 &RQF_LDLM_INTENT_LAYOUT);
569         if (req == NULL)
570                 RETURN(ERR_PTR(-ENOMEM));
571
572         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
573         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
574         if (rc) {
575                 ptlrpc_request_free(req);
576                 RETURN(ERR_PTR(rc));
577         }
578
579         /* pack the intent */
580         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
581         lit->opc = (__u64)it->it_op;
582
583         /* pack the layout intent request */
584         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
585         LASSERT(op_data->op_data != NULL);
586         LASSERT(op_data->op_data_size == sizeof(*layout));
587         memcpy(layout, op_data->op_data, sizeof(*layout));
588
589         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
590                              obd->u.cli.cl_default_mds_easize);
591         ptlrpc_request_set_replen(req);
592         RETURN(req);
593 }
594
595 static struct ptlrpc_request *
596 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
597 {
598         struct ptlrpc_request *req;
599         int rc;
600         ENTRY;
601
602         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
603         if (req == NULL)
604                 RETURN(ERR_PTR(-ENOMEM));
605
606         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
607         if (rc) {
608                 ptlrpc_request_free(req);
609                 RETURN(ERR_PTR(rc));
610         }
611
612         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
613         ptlrpc_request_set_replen(req);
614         RETURN(req);
615 }
616
617 static int mdc_finish_enqueue(struct obd_export *exp,
618                               struct ptlrpc_request *req,
619                               struct ldlm_enqueue_info *einfo,
620                               struct lookup_intent *it,
621                               struct lustre_handle *lockh,
622                               int rc)
623 {
624         struct req_capsule  *pill = &req->rq_pill;
625         struct ldlm_request *lockreq;
626         struct ldlm_reply   *lockrep;
627         struct ldlm_lock    *lock;
628         struct mdt_body     *body = NULL;
629         void                *lvb_data = NULL;
630         __u32                lvb_len = 0;
631
632         ENTRY;
633
634         LASSERT(rc >= 0);
635         /* Similarly, if we're going to replay this request, we don't want to
636          * actually get a lock, just perform the intent. */
637         if (req->rq_transno || req->rq_replay) {
638                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
639                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
640         }
641
642         if (rc == ELDLM_LOCK_ABORTED) {
643                 einfo->ei_mode = 0;
644                 memset(lockh, 0, sizeof(*lockh));
645                 rc = 0;
646         } else { /* rc = 0 */
647                 lock = ldlm_handle2lock(lockh);
648                 LASSERT(lock != NULL);
649
650                 /* If the server gave us back a different lock mode, we should
651                  * fix up our variables. */
652                 if (lock->l_req_mode != einfo->ei_mode) {
653                         ldlm_lock_addref(lockh, lock->l_req_mode);
654                         ldlm_lock_decref(lockh, einfo->ei_mode);
655                         einfo->ei_mode = lock->l_req_mode;
656                 }
657                 LDLM_LOCK_PUT(lock);
658         }
659
660         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
661         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
662
663         it->it_disposition = (int)lockrep->lock_policy_res1;
664         it->it_status = (int)lockrep->lock_policy_res2;
665         it->it_lock_mode = einfo->ei_mode;
666         it->it_lock_handle = lockh->cookie;
667         it->it_request = req;
668
669         /* Technically speaking rq_transno must already be zero if
670          * it_status is in error, so the check is a bit redundant */
671         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
672                 mdc_clear_replay_flag(req, it->it_status);
673
674         /* If we're doing an IT_OPEN which did not result in an actual
675          * successful open, then we need to remove the bit which saves
676          * this request for unconditional replay.
677          *
678          * It's important that we do this first!  Otherwise we might exit the
679          * function without doing so, and try to replay a failed create
680          * (bug 3440) */
681         if (it->it_op & IT_OPEN && req->rq_replay &&
682             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
683                 mdc_clear_replay_flag(req, it->it_status);
684
685         DEBUG_REQ(D_RPCTRACE, req, "op: %x disposition: %x, status: %d",
686                   it->it_op, it->it_disposition, it->it_status);
687
688         /* We know what to expect, so we do any byte flipping required here */
689         if (it_has_reply_body(it)) {
690                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
691                 if (body == NULL) {
692                         CERROR ("Can't swab mdt_body\n");
693                         RETURN (-EPROTO);
694                 }
695
696                 if (it_disposition(it, DISP_OPEN_OPEN) &&
697                     !it_open_error(DISP_OPEN_OPEN, it)) {
698                         /*
699                          * If this is a successful OPEN request, we need to set
700                          * replay handler and data early, so that if replay
701                          * happens immediately after swabbing below, new reply
702                          * is swabbed by that handler correctly.
703                          */
704                         mdc_set_open_replay_data(NULL, NULL, it);
705                 }
706
707                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
708                         void *eadata;
709
710                         mdc_update_max_ea_from_body(exp, body);
711
712                         /*
713                          * The eadata is opaque; just check that it is there.
714                          * Eventually, obd_unpackmd() will check the contents.
715                          */
716                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
717                                                         body->mbo_eadatasize);
718                         if (eadata == NULL)
719                                 RETURN(-EPROTO);
720
721                         /* save lvb data and length in case this is for layout
722                          * lock */
723                         lvb_data = eadata;
724                         lvb_len = body->mbo_eadatasize;
725
726                         /*
727                          * We save the reply LOV EA in case we have to replay a
728                          * create for recovery.  If we didn't allocate a large
729                          * enough request buffer above we need to reallocate it
730                          * here to hold the actual LOV EA.
731                          *
732                          * To not save LOV EA if request is not going to replay
733                          * (for example error one).
734                          */
735                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
736                                 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
737                                                     body->mbo_eadatasize);
738                                 if (rc) {
739                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
740                                         body->mbo_eadatasize = 0;
741                                         rc = 0;
742                                 }
743                         }
744                 }
745         } else if (it->it_op & IT_LAYOUT) {
746                 /* maybe the lock was granted right away and layout
747                  * is packed into RMF_DLM_LVB of req */
748                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
749                 CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
750                        class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
751                 if (lvb_len > 0) {
752                         lvb_data = req_capsule_server_sized_get(pill,
753                                                         &RMF_DLM_LVB, lvb_len);
754                         if (lvb_data == NULL)
755                                 RETURN(-EPROTO);
756
757                         /**
758                          * save replied layout data to the request buffer for
759                          * recovery consideration (lest MDS reinitialize
760                          * another set of OST objects).
761                          */
762                         if (req->rq_transno)
763                                 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
764                                                      lvb_len);
765                 }
766         }
767
768         /* fill in stripe data for layout lock.
769          * LU-6581: trust layout data only if layout lock is granted. The MDT
770          * has stopped sending layout unless the layout lock is granted. The
771          * client still does this checking in case it's talking with an old
772          * server. - Jinshan */
773         lock = ldlm_handle2lock(lockh);
774         if (lock == NULL)
775                 RETURN(rc);
776
777         if (ldlm_has_layout(lock) && lvb_data != NULL &&
778             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
779                 void *lmm;
780
781                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
782                         ldlm_it2str(it->it_op), lvb_len);
783
784                 OBD_ALLOC_LARGE(lmm, lvb_len);
785                 if (lmm == NULL)
786                         GOTO(out_lock, rc = -ENOMEM);
787
788                 memcpy(lmm, lvb_data, lvb_len);
789
790                 /* install lvb_data */
791                 lock_res_and_lock(lock);
792                 if (lock->l_lvb_data == NULL) {
793                         lock->l_lvb_type = LVB_T_LAYOUT;
794                         lock->l_lvb_data = lmm;
795                         lock->l_lvb_len = lvb_len;
796                         lmm = NULL;
797                 }
798                 unlock_res_and_lock(lock);
799                 if (lmm != NULL)
800                         OBD_FREE_LARGE(lmm, lvb_len);
801         }
802
803         if (ldlm_has_dom(lock)) {
804                 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
805
806                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
807                 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
808                         LDLM_ERROR(lock, "%s: DoM lock without size.",
809                                    exp->exp_obd->obd_name);
810                         GOTO(out_lock, rc = -EPROTO);
811                 }
812
813                 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
814                            ldlm_it2str(it->it_op), body->mbo_dom_size);
815
816                 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
817         }
818 out_lock:
819         LDLM_LOCK_PUT(lock);
820
821         RETURN(rc);
822 }
823
824 /* We always reserve enough space in the reply packet for a stripe MD, because
825  * we don't know in advance the file type. */
826 static int mdc_enqueue_base(struct obd_export *exp,
827                             struct ldlm_enqueue_info *einfo,
828                             const union ldlm_policy_data *policy,
829                             struct lookup_intent *it,
830                             struct md_op_data *op_data,
831                             struct lustre_handle *lockh,
832                             __u64 extra_lock_flags)
833 {
834         struct obd_device *obddev = class_exp2obd(exp);
835         struct ptlrpc_request *req = NULL;
836         __u64 flags, saved_flags = extra_lock_flags;
837         struct ldlm_res_id res_id;
838         static const union ldlm_policy_data lookup_policy = {
839                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
840         static const union ldlm_policy_data update_policy = {
841                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
842         static const union ldlm_policy_data layout_policy = {
843                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
844         static const union ldlm_policy_data getxattr_policy = {
845                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
846         int generation, resends = 0;
847         struct ldlm_reply *lockrep;
848         struct obd_import *imp = class_exp2cliimp(exp);
849         __u32 acl_bufsize;
850         enum lvb_type lvb_type = 0;
851         int rc;
852         ENTRY;
853
854         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
855                  einfo->ei_type);
856         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
857
858         if (it != NULL) {
859                 LASSERT(policy == NULL);
860
861                 saved_flags |= LDLM_FL_HAS_INTENT;
862                 if (it->it_op & (IT_GETATTR | IT_READDIR))
863                         policy = &update_policy;
864                 else if (it->it_op & IT_LAYOUT)
865                         policy = &layout_policy;
866                 else if (it->it_op & IT_GETXATTR)
867                         policy = &getxattr_policy;
868                 else
869                         policy = &lookup_policy;
870         }
871
872         generation = obddev->u.cli.cl_import->imp_generation;
873         if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
874                 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
875                                   XATTR_SIZE_MAX);
876         else
877                 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
878
879 resend:
880         flags = saved_flags;
881         if (it == NULL) {
882                 /* The only way right now is FLOCK. */
883                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
884                          einfo->ei_type);
885                 res_id.name[3] = LDLM_FLOCK;
886         } else if (it->it_op & IT_OPEN) {
887                 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
888         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
889                 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
890         } else if (it->it_op & IT_READDIR) {
891                 req = mdc_enqueue_pack(exp, 0);
892         } else if (it->it_op & IT_LAYOUT) {
893                 if (!imp_connect_lvb_type(imp))
894                         RETURN(-EOPNOTSUPP);
895                 req = mdc_intent_layout_pack(exp, it, op_data);
896                 lvb_type = LVB_T_LAYOUT;
897         } else if (it->it_op & IT_GETXATTR) {
898                 req = mdc_intent_getxattr_pack(exp, it, op_data);
899         } else {
900                 LBUG();
901                 RETURN(-EINVAL);
902         }
903
904         if (IS_ERR(req))
905                 RETURN(PTR_ERR(req));
906
907         if (resends) {
908                 req->rq_generation_set = 1;
909                 req->rq_import_generation = generation;
910                 req->rq_sent = ktime_get_real_seconds() + resends;
911         }
912
913         /* It is important to obtain modify RPC slot first (if applicable), so
914          * that threads that are waiting for a modify RPC slot are not polluting
915          * our rpcs in flight counter.
916          * We do not do flock request limiting, though */
917         if (it) {
918                 mdc_get_mod_rpc_slot(req, it);
919                 rc = obd_get_request_slot(&obddev->u.cli);
920                 if (rc != 0) {
921                         mdc_put_mod_rpc_slot(req, it);
922                         mdc_clear_replay_flag(req, 0);
923                         ptlrpc_req_finished(req);
924                         RETURN(rc);
925                 }
926         }
927
928         /* With Data-on-MDT the glimpse callback is needed too.
929          * It is set here in advance but not in mdc_finish_enqueue()
930          * to avoid possible races. It is safe to have glimpse handler
931          * for non-DOM locks and costs nothing.*/
932         if (einfo->ei_cb_gl == NULL)
933                 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
934
935         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
936                               0, lvb_type, lockh, 0);
937         if (!it) {
938                 /* For flock requests we immediatelly return without further
939                    delay and let caller deal with the rest, since rest of
940                    this function metadata processing makes no sense for flock
941                    requests anyway. But in case of problem during comms with
942                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
943                    can not rely on caller and this mainly for F_UNLCKs
944                    (explicits or automatically generated by Kernel to clean
945                    current FLocks upon exit) that can't be trashed */
946                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
947                     (einfo->ei_type == LDLM_FLOCK) &&
948                     (einfo->ei_mode == LCK_NL))
949                         goto resend;
950                 RETURN(rc);
951         }
952
953         obd_put_request_slot(&obddev->u.cli);
954         mdc_put_mod_rpc_slot(req, it);
955
956         if (rc < 0) {
957                 CDEBUG(D_INFO,
958                       "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
959                       obddev->obd_name, PFID(&op_data->op_fid1),
960                       PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
961
962                 mdc_clear_replay_flag(req, rc);
963                 ptlrpc_req_finished(req);
964                 RETURN(rc);
965         }
966
967         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
968         LASSERT(lockrep != NULL);
969
970         lockrep->lock_policy_res2 =
971                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
972
973         /* Retry infinitely when the server returns -EINPROGRESS for the
974          * intent operation, when server returns -EINPROGRESS for acquiring
975          * intent lock, we'll retry in after_reply(). */
976         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
977                 mdc_clear_replay_flag(req, rc);
978                 ptlrpc_req_finished(req);
979                 if (generation == obddev->u.cli.cl_import->imp_generation) {
980                         if (signal_pending(current))
981                                 RETURN(-EINTR);
982
983                         resends++;
984                         CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
985                                obddev->obd_name, resends, it->it_op,
986                                PFID(&op_data->op_fid1),
987                                PFID(&op_data->op_fid2));
988                         goto resend;
989                 } else {
990                         CDEBUG(D_HA, "resend cross eviction\n");
991                         RETURN(-EIO);
992                 }
993         }
994
995         if ((int)lockrep->lock_policy_res2 == -ERANGE &&
996             it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
997             acl_bufsize == LUSTRE_POSIX_ACL_MAX_SIZE_OLD) {
998                 mdc_clear_replay_flag(req, -ERANGE);
999                 ptlrpc_req_finished(req);
1000                 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
1001                                   XATTR_SIZE_MAX);
1002                 goto resend;
1003         }
1004
1005         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1006         if (rc < 0) {
1007                 if (lustre_handle_is_used(lockh)) {
1008                         ldlm_lock_decref(lockh, einfo->ei_mode);
1009                         memset(lockh, 0, sizeof(*lockh));
1010                 }
1011                 ptlrpc_req_finished(req);
1012
1013                 it->it_lock_handle = 0;
1014                 it->it_lock_mode = 0;
1015                 it->it_request = NULL;
1016         }
1017
1018         RETURN(rc);
1019 }
1020
1021 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1022                 const union ldlm_policy_data *policy,
1023                 struct md_op_data *op_data,
1024                 struct lustre_handle *lockh, __u64 extra_lock_flags)
1025 {
1026         return mdc_enqueue_base(exp, einfo, policy, NULL,
1027                                 op_data, lockh, extra_lock_flags);
1028 }
1029
1030 static int mdc_finish_intent_lock(struct obd_export *exp,
1031                                   struct ptlrpc_request *request,
1032                                   struct md_op_data *op_data,
1033                                   struct lookup_intent *it,
1034                                   struct lustre_handle *lockh)
1035 {
1036         struct lustre_handle old_lock;
1037         struct ldlm_lock *lock;
1038         int rc = 0;
1039         ENTRY;
1040
1041         LASSERT(request != NULL);
1042         LASSERT(request != LP_POISON);
1043         LASSERT(request->rq_repmsg != LP_POISON);
1044
1045         if (it->it_op & IT_READDIR)
1046                 RETURN(0);
1047
1048         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
1049                 if (it->it_status != 0)
1050                         GOTO(out, rc = it->it_status);
1051         } else {
1052                 if (!it_disposition(it, DISP_IT_EXECD)) {
1053                         /* The server failed before it even started executing
1054                          * the intent, i.e. because it couldn't unpack the
1055                          * request.
1056                          */
1057                         LASSERT(it->it_status != 0);
1058                         GOTO(out, rc = it->it_status);
1059                 }
1060                 rc = it_open_error(DISP_IT_EXECD, it);
1061                 if (rc)
1062                         GOTO(out, rc);
1063
1064                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1065                 if (rc)
1066                         GOTO(out, rc);
1067
1068                 /* keep requests around for the multiple phases of the call
1069                  * this shows the DISP_XX must guarantee we make it into the
1070                  * call
1071                  */
1072                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1073                     it_disposition(it, DISP_OPEN_CREATE) &&
1074                     !it_open_error(DISP_OPEN_CREATE, it)) {
1075                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
1076                         /* balanced in ll_create_node */
1077                         ptlrpc_request_addref(request);
1078                 }
1079                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1080                     it_disposition(it, DISP_OPEN_OPEN) &&
1081                     !it_open_error(DISP_OPEN_OPEN, it)) {
1082                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
1083                         /* balanced in ll_file_open */
1084                         ptlrpc_request_addref(request);
1085                         /* BUG 11546 - eviction in the middle of open rpc
1086                          * processing
1087                          */
1088                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1089                                          obd_timeout);
1090                 }
1091
1092                 if (it->it_op & IT_CREAT) {
1093                         /* XXX this belongs in ll_create_it */
1094                 } else if (it->it_op == IT_OPEN) {
1095                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1096                 } else {
1097                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1098                 }
1099         }
1100
1101         /* If we already have a matching lock, then cancel the new
1102          * one.  We have to set the data here instead of in
1103          * mdc_enqueue, because we need to use the child's inode as
1104          * the l_ast_data to match, and that's not available until
1105          * intent_finish has performed the iget().) */
1106         lock = ldlm_handle2lock(lockh);
1107         if (lock) {
1108                 union ldlm_policy_data policy = lock->l_policy_data;
1109                 LDLM_DEBUG(lock, "matching against this");
1110
1111                 if (it_has_reply_body(it)) {
1112                         struct mdt_body *body;
1113
1114                         body = req_capsule_server_get(&request->rq_pill,
1115                                                       &RMF_MDT_BODY);
1116                         /* mdc_enqueue checked */
1117                         LASSERT(body != NULL);
1118                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1119                                                  &lock->l_resource->lr_name),
1120                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1121                                  PLDLMRES(lock->l_resource),
1122                                  PFID(&body->mbo_fid1));
1123                 }
1124                 LDLM_LOCK_PUT(lock);
1125
1126                 memcpy(&old_lock, lockh, sizeof(*lockh));
1127                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1128                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1129                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1130                         memcpy(lockh, &old_lock, sizeof(old_lock));
1131                         it->it_lock_handle = lockh->cookie;
1132                 }
1133         }
1134
1135         EXIT;
1136 out:
1137         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1138                 (int)op_data->op_namelen, op_data->op_name,
1139                 ldlm_it2str(it->it_op), it->it_status,
1140                 it->it_disposition, rc);
1141         return rc;
1142 }
1143
1144 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1145                         struct lu_fid *fid, __u64 *bits)
1146 {
1147         /* We could just return 1 immediately, but since we should only
1148          * be called in revalidate_it if we already have a lock, let's
1149          * verify that. */
1150         struct ldlm_res_id res_id;
1151         struct lustre_handle lockh;
1152         union ldlm_policy_data policy;
1153         enum ldlm_mode mode;
1154         ENTRY;
1155
1156         if (it->it_lock_handle) {
1157                 lockh.cookie = it->it_lock_handle;
1158                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1159         } else {
1160                 fid_build_reg_res_name(fid, &res_id);
1161                 switch (it->it_op) {
1162                 case IT_GETATTR:
1163                         /* File attributes are held under multiple bits:
1164                          * nlink is under lookup lock, size and times are
1165                          * under UPDATE lock and recently we've also got
1166                          * a separate permissions lock for owner/group/acl that
1167                          * were protected by lookup lock before.
1168                          * Getattr must provide all of that information,
1169                          * so we need to ensure we have all of those locks.
1170                          * Unfortunately, if the bits are split across multiple
1171                          * locks, there's no easy way to match all of them here,
1172                          * so an extra RPC would be performed to fetch all
1173                          * of those bits at once for now. */
1174                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1175                          * but for old MDTs (< 2.4), permission is covered
1176                          * by LOOKUP lock, so it needs to match all bits here.*/
1177                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1178                                                   MDS_INODELOCK_LOOKUP |
1179                                                   MDS_INODELOCK_PERM;
1180                         break;
1181                 case IT_READDIR:
1182                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1183                         break;
1184                 case IT_LAYOUT:
1185                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1186                         break;
1187                 default:
1188                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1189                         break;
1190                 }
1191
1192                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1193                                       LDLM_IBITS, &policy,
1194                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1195                                       &lockh);
1196         }
1197
1198         if (mode) {
1199                 it->it_lock_handle = lockh.cookie;
1200                 it->it_lock_mode = mode;
1201         } else {
1202                 it->it_lock_handle = 0;
1203                 it->it_lock_mode = 0;
1204         }
1205
1206         RETURN(!!mode);
1207 }
1208
1209 /*
1210  * This long block is all about fixing up the lock and request state
1211  * so that it is correct as of the moment _before_ the operation was
1212  * applied; that way, the VFS will think that everything is normal and
1213  * call Lustre's regular VFS methods.
1214  *
1215  * If we're performing a creation, that means that unless the creation
1216  * failed with EEXIST, we should fake up a negative dentry.
1217  *
1218  * For everything else, we want to lookup to succeed.
1219  *
1220  * One additional note: if CREATE or OPEN succeeded, we add an extra
1221  * reference to the request because we need to keep it around until
1222  * ll_create/ll_open gets called.
1223  *
1224  * The server will return to us, in it_disposition, an indication of
1225  * exactly what it_status refers to.
1226  *
1227  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1228  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1229  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1230  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1231  * was successful.
1232  *
1233  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1234  * child lookup.
1235  */
1236 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1237                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1238                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1239 {
1240         struct ldlm_enqueue_info einfo = {
1241                 .ei_type        = LDLM_IBITS,
1242                 .ei_mode        = it_to_lock_mode(it),
1243                 .ei_cb_bl       = cb_blocking,
1244                 .ei_cb_cp       = ldlm_completion_ast,
1245                 .ei_cb_gl       = mdc_ldlm_glimpse_ast,
1246         };
1247         struct lustre_handle lockh;
1248         int rc = 0;
1249         ENTRY;
1250         LASSERT(it);
1251
1252         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1253                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1254                 op_data->op_name, PFID(&op_data->op_fid2),
1255                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1256                 it->it_flags);
1257
1258         lockh.cookie = 0;
1259         if (fid_is_sane(&op_data->op_fid2) &&
1260             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1261                 /* We could just return 1 immediately, but since we should only
1262                  * be called in revalidate_it if we already have a lock, let's
1263                  * verify that. */
1264                 it->it_lock_handle = 0;
1265                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1266                 /* Only return failure if it was not GETATTR by cfid
1267                    (from inode_revalidate) */
1268                 if (rc || op_data->op_namelen != 0)
1269                         RETURN(rc);
1270         }
1271
1272         /* For case if upper layer did not alloc fid, do it now. */
1273         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1274                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1275                 if (rc < 0) {
1276                         CERROR("Can't alloc new fid, rc %d\n", rc);
1277                         RETURN(rc);
1278                 }
1279         }
1280
1281         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1282                               extra_lock_flags);
1283         if (rc < 0)
1284                 RETURN(rc);
1285
1286         *reqp = it->it_request;
1287         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1288         RETURN(rc);
1289 }
1290
1291 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1292                                               struct ptlrpc_request *req,
1293                                               void *args, int rc)
1294 {
1295         struct mdc_getattr_args  *ga = args;
1296         struct obd_export *exp = ga->ga_exp;
1297         struct md_enqueue_info *minfo = ga->ga_minfo;
1298         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1299         struct lookup_intent *it;
1300         struct lustre_handle *lockh;
1301         struct obd_device *obddev;
1302         struct ldlm_reply *lockrep;
1303         __u64 flags = LDLM_FL_HAS_INTENT;
1304         ENTRY;
1305
1306         it    = &minfo->mi_it;
1307         lockh = &minfo->mi_lockh;
1308
1309         obddev = class_exp2obd(exp);
1310
1311         obd_put_request_slot(&obddev->u.cli);
1312         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1313                 rc = -ETIMEDOUT;
1314
1315         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1316                                    &flags, NULL, 0, lockh, rc);
1317         if (rc < 0) {
1318                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1319                 mdc_clear_replay_flag(req, rc);
1320                 GOTO(out, rc);
1321         }
1322
1323         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1324         LASSERT(lockrep != NULL);
1325
1326         lockrep->lock_policy_res2 =
1327                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1328
1329         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1330         if (rc)
1331                 GOTO(out, rc);
1332
1333         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1334         EXIT;
1335
1336 out:
1337         minfo->mi_cb(req, minfo, rc);
1338         return 0;
1339 }
1340
1341 int mdc_intent_getattr_async(struct obd_export *exp,
1342                              struct md_enqueue_info *minfo)
1343 {
1344         struct md_op_data       *op_data = &minfo->mi_data;
1345         struct lookup_intent    *it = &minfo->mi_it;
1346         struct ptlrpc_request   *req;
1347         struct mdc_getattr_args *ga;
1348         struct obd_device       *obddev = class_exp2obd(exp);
1349         struct ldlm_res_id       res_id;
1350         union ldlm_policy_data policy = {
1351                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1352                                                  MDS_INODELOCK_UPDATE } };
1353         int                      rc = 0;
1354         __u64                    flags = LDLM_FL_HAS_INTENT;
1355         ENTRY;
1356
1357         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1358                 (int)op_data->op_namelen, op_data->op_name,
1359                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1360
1361         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1362         /* If the MDT return -ERANGE because of large ACL, then the sponsor
1363          * of the async getattr RPC will handle that by itself. */
1364         req = mdc_intent_getattr_pack(exp, it, op_data,
1365                                       LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1366         if (IS_ERR(req))
1367                 RETURN(PTR_ERR(req));
1368
1369         rc = obd_get_request_slot(&obddev->u.cli);
1370         if (rc != 0) {
1371                 ptlrpc_req_finished(req);
1372                 RETURN(rc);
1373         }
1374
1375         /* With Data-on-MDT the glimpse callback is needed too.
1376          * It is set here in advance but not in mdc_finish_enqueue()
1377          * to avoid possible races. It is safe to have glimpse handler
1378          * for non-DOM locks and costs nothing.*/
1379         if (minfo->mi_einfo.ei_cb_gl == NULL)
1380                 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1381
1382         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1383                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1384         if (rc < 0) {
1385                 obd_put_request_slot(&obddev->u.cli);
1386                 ptlrpc_req_finished(req);
1387                 RETURN(rc);
1388         }
1389
1390         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1391         ga = ptlrpc_req_async_args(req);
1392         ga->ga_exp = exp;
1393         ga->ga_minfo = minfo;
1394
1395         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1396         ptlrpcd_add_req(req);
1397
1398         RETURN(0);
1399 }