Whamcloud - gitweb
7923428aff564f93e7a9053c812daa0c70973c2f
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 #include <linux/module.h>
36
37 #include <obd.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
47
48 #include "mdc_internal.h"
49
50 struct mdc_getattr_args {
51         struct obd_export               *ga_exp;
52         struct md_enqueue_info          *ga_minfo;
53 };
54
55 int it_open_error(int phase, struct lookup_intent *it)
56 {
57         if (it_disposition(it, DISP_OPEN_LEASE)) {
58                 if (phase >= DISP_OPEN_LEASE)
59                         return it->it_status;
60                 else
61                         return 0;
62         }
63         if (it_disposition(it, DISP_OPEN_OPEN)) {
64                 if (phase >= DISP_OPEN_OPEN)
65                         return it->it_status;
66                 else
67                         return 0;
68         }
69
70         if (it_disposition(it, DISP_OPEN_CREATE)) {
71                 if (phase >= DISP_OPEN_CREATE)
72                         return it->it_status;
73                 else
74                         return 0;
75         }
76
77         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78                 if (phase >= DISP_LOOKUP_EXECD)
79                         return it->it_status;
80                 else
81                         return 0;
82         }
83
84         if (it_disposition(it, DISP_IT_EXECD)) {
85                 if (phase >= DISP_IT_EXECD)
86                         return it->it_status;
87                 else
88                         return 0;
89         }
90
91         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
92         LBUG();
93
94         return 0;
95 }
96 EXPORT_SYMBOL(it_open_error);
97
98 /* this must be called on a lockh that is known to have a referenced lock */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100                       void *data, __u64 *bits)
101 {
102         struct ldlm_lock *lock;
103         struct inode *new_inode = data;
104         ENTRY;
105
106         if(bits)
107                 *bits = 0;
108
109         if (!lustre_handle_is_used(lockh))
110                 RETURN(0);
111
112         lock = ldlm_handle2lock(lockh);
113
114         LASSERT(lock != NULL);
115         lock_res_and_lock(lock);
116         if (lock->l_resource->lr_lvb_inode &&
117             lock->l_resource->lr_lvb_inode != data) {
118                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119                 LASSERTF(old_inode->i_state & I_FREEING,
120                          "Found existing inode %p/%lu/%u state %lu in lock: "
121                          "setting data to %p/%lu/%u\n", old_inode,
122                          old_inode->i_ino, old_inode->i_generation,
123                          old_inode->i_state,
124                          new_inode, new_inode->i_ino, new_inode->i_generation);
125         }
126         lock->l_resource->lr_lvb_inode = new_inode;
127         if (bits)
128                 *bits = lock->l_policy_data.l_inodebits.bits;
129
130         unlock_res_and_lock(lock);
131         LDLM_LOCK_PUT(lock);
132
133         RETURN(0);
134 }
135
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137                               const struct lu_fid *fid, enum ldlm_type type,
138                               union ldlm_policy_data *policy,
139                               enum ldlm_mode mode, struct lustre_handle *lockh)
140 {
141         struct ldlm_res_id res_id;
142         enum ldlm_mode rc;
143         ENTRY;
144
145         fid_build_reg_res_name(fid, &res_id);
146         /* LU-4405: Clear bits not supported by server */
147         policy->l_inodebits.bits &= exp_connect_ibits(exp);
148         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149                              &res_id, type, policy, mode, lockh, 0);
150         RETURN(rc);
151 }
152
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154                       union ldlm_policy_data *policy, enum ldlm_mode mode,
155                       enum ldlm_cancel_flags flags, void *opaque)
156 {
157         struct obd_device *obd = class_exp2obd(exp);
158         struct ldlm_res_id res_id;
159         int rc;
160
161         ENTRY;
162
163         fid_build_reg_res_name(fid, &res_id);
164         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165                                              policy, mode, flags, opaque);
166         RETURN(rc);
167 }
168
169 int mdc_null_inode(struct obd_export *exp,
170                    const struct lu_fid *fid)
171 {
172         struct ldlm_res_id res_id;
173         struct ldlm_resource *res;
174         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
175         ENTRY;
176
177         LASSERTF(ns != NULL, "no namespace passed\n");
178
179         fid_build_reg_res_name(fid, &res_id);
180
181         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
182         if (IS_ERR(res))
183                 RETURN(0);
184
185         lock_res(res);
186         res->lr_lvb_inode = NULL;
187         unlock_res(res);
188
189         ldlm_resource_putref(res);
190         RETURN(0);
191 }
192
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
194 {
195         /* Don't hold error requests for replay. */
196         if (req->rq_replay) {
197                 spin_lock(&req->rq_lock);
198                 req->rq_replay = 0;
199                 spin_unlock(&req->rq_lock);
200         }
201         if (rc && req->rq_transno != 0) {
202                 DEBUG_REQ(D_ERROR, req, "transno returned on error: rc = %d",
203                           rc);
204                 LBUG();
205         }
206 }
207
208 /* Save a large LOV EA into the request buffer so that it is available
209  * for replay.  We don't do this in the initial request because the
210  * original request doesn't need this buffer (at most it sends just the
211  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
212  * buffer and may also be difficult to allocate and save a very large
213  * request buffer for each open. (bug 5707)
214  *
215  * OOM here may cause recovery failure if lmm is needed (only for the
216  * original open if the MDS crashed just when this client also OOM'd)
217  * but this is incredibly unlikely, and questionable whether the client
218  * could do MDS recovery under OOM anyways... */
219 int mdc_save_lovea(struct ptlrpc_request *req,
220                    const struct req_msg_field *field,
221                    void *data, u32 size)
222 {
223         struct req_capsule *pill = &req->rq_pill;
224         struct lov_user_md *lmm;
225         int rc = 0;
226
227         if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
228                 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
229                 if (rc) {
230                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
231                                req->rq_export->exp_obd->obd_name,
232                                size, rc);
233                         return rc;
234                 }
235         } else {
236                 req_capsule_shrink(pill, field, size, RCL_CLIENT);
237         }
238
239         req_capsule_set_size(pill, field, RCL_CLIENT, size);
240         lmm = req_capsule_client_get(pill, field);
241         if (lmm) {
242                 memcpy(lmm, data, size);
243                 /* overwrite layout generation returned from the MDS */
244                 lmm->lmm_stripe_offset =
245                   (typeof(lmm->lmm_stripe_offset))LOV_OFFSET_DEFAULT;
246         }
247
248         return rc;
249 }
250
251 static struct ptlrpc_request *
252 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
253                      struct md_op_data *op_data, __u32 acl_bufsize)
254 {
255         struct ptlrpc_request   *req;
256         struct obd_device       *obddev = class_exp2obd(exp);
257         struct ldlm_intent      *lit;
258         const void              *lmm = op_data->op_data;
259         __u32                    lmmsize = op_data->op_data_size;
260         __u32                    mdt_md_capsule_size;
261         LIST_HEAD(cancels);
262         int                      count = 0;
263         enum ldlm_mode           mode;
264         int                      rc;
265         int repsize, repsize_estimate;
266
267         ENTRY;
268
269         mdt_md_capsule_size = obddev->u.cli.cl_default_mds_easize;
270
271         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
272
273         /* XXX: openlock is not cancelled for cross-refs. */
274         /* If inode is known, cancel conflicting OPEN locks. */
275         if (fid_is_sane(&op_data->op_fid2)) {
276                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
277                         if (it->it_flags & MDS_FMODE_WRITE)
278                                 mode = LCK_EX;
279                         else
280                                 mode = LCK_PR;
281                 } else {
282                         if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
283                                 mode = LCK_CW;
284 #ifdef FMODE_EXEC
285                         else if (it->it_flags & FMODE_EXEC)
286                                 mode = LCK_PR;
287 #endif
288                         else
289                                 mode = LCK_CR;
290                 }
291                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
292                                                 &cancels, mode,
293                                                 MDS_INODELOCK_OPEN);
294         }
295
296         /* If CREATE, cancel parent's UPDATE lock. */
297         if (it->it_op & IT_CREAT)
298                 mode = LCK_EX;
299         else
300                 mode = LCK_CR;
301         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
302                                          &cancels, mode,
303                                          MDS_INODELOCK_UPDATE);
304
305         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
306                                    &RQF_LDLM_INTENT_OPEN);
307         if (req == NULL) {
308                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
309                 RETURN(ERR_PTR(-ENOMEM));
310         }
311
312         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
313                              op_data->op_namelen + 1);
314         if (cl_is_lov_delay_create(it->it_flags)) {
315                 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
316                 LASSERT(lmmsize == 0);
317                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
318         } else {
319                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
320                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
321         }
322
323         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
324                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
325                              op_data->op_file_secctx_name_size : 0);
326
327         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
328                              op_data->op_file_secctx_size);
329
330         /* get SELinux policy info if any */
331         rc = sptlrpc_get_sepol(req);
332         if (rc < 0) {
333                 ptlrpc_request_free(req);
334                 RETURN(ERR_PTR(rc));
335         }
336         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
337                              strlen(req->rq_sepol) ?
338                              strlen(req->rq_sepol) + 1 : 0);
339
340         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
341         if (rc < 0) {
342                 ptlrpc_request_free(req);
343                 RETURN(ERR_PTR(rc));
344         }
345
346         spin_lock(&req->rq_lock);
347         req->rq_replay = req->rq_import->imp_replayable;
348         spin_unlock(&req->rq_lock);
349
350         /* pack the intent */
351         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
352         lit->opc = (__u64)it->it_op;
353
354         /* pack the intended request */
355         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
356                       lmmsize);
357
358         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
359                              mdt_md_capsule_size);
360         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
361
362         if (!(it->it_op & IT_CREAT) && it->it_op & IT_OPEN &&
363             req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
364                                   RCL_CLIENT) &&
365             op_data->op_file_secctx_name_size > 0 &&
366             op_data->op_file_secctx_name != NULL) {
367                 char *secctx_name;
368
369                 secctx_name = req_capsule_client_get(&req->rq_pill,
370                                                      &RMF_FILE_SECCTX_NAME);
371                 memcpy(secctx_name, op_data->op_file_secctx_name,
372                        op_data->op_file_secctx_name_size);
373                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
374                                      RCL_SERVER,
375                                      obddev->u.cli.cl_max_mds_easize);
376
377                 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
378                        op_data->op_file_secctx_name_size,
379                        op_data->op_file_secctx_name);
380
381         } else {
382                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
383                                      RCL_SERVER, 0);
384         }
385
386         /**
387          * Inline buffer for possible data from Data-on-MDT files.
388          */
389         req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
390                              sizeof(struct niobuf_remote));
391         ptlrpc_request_set_replen(req);
392
393         /* Get real repbuf allocated size as rounded up power of 2 */
394         repsize = size_roundup_power2(req->rq_replen +
395                                       lustre_msg_early_size());
396         /* Estimate free space for DoM files in repbuf */
397         repsize_estimate = repsize - (req->rq_replen -
398                            mdt_md_capsule_size +
399                            sizeof(struct lov_comp_md_v1) +
400                            sizeof(struct lov_comp_md_entry_v1) +
401                            lov_mds_md_size(0, LOV_MAGIC_V3));
402
403         if (repsize_estimate < obddev->u.cli.cl_dom_min_inline_repsize) {
404                 repsize = obddev->u.cli.cl_dom_min_inline_repsize -
405                           repsize_estimate + sizeof(struct niobuf_remote);
406                 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
407                                      RCL_SERVER,
408                                      sizeof(struct niobuf_remote) + repsize);
409                 ptlrpc_request_set_replen(req);
410                 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
411                        repsize, req->rq_replen);
412                 repsize = size_roundup_power2(req->rq_replen +
413                                               lustre_msg_early_size());
414         }
415         /* The only way to report real allocated repbuf size to the server
416          * is the lm_repsize but it must be set prior buffer allocation itself
417          * due to security reasons - it is part of buffer used in signature
418          * calculation (see LU-11414). Therefore the saved size is predicted
419          * value as rq_replen rounded to the next higher power of 2.
420          * Such estimation is safe. Though the final allocated buffer might
421          * be even larger, it is not possible to know that at this point.
422          */
423         req->rq_reqmsg->lm_repsize = repsize;
424         RETURN(req);
425 }
426
427 #define GA_DEFAULT_EA_NAME_LEN 20
428 #define GA_DEFAULT_EA_VAL_LEN  250
429 #define GA_DEFAULT_EA_NUM      10
430
431 static struct ptlrpc_request *
432 mdc_intent_getxattr_pack(struct obd_export *exp,
433                          struct lookup_intent *it,
434                          struct md_op_data *op_data)
435 {
436         struct ptlrpc_request   *req;
437         struct ldlm_intent      *lit;
438         int                     rc, count = 0;
439         LIST_HEAD(cancels);
440         u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
441
442         ENTRY;
443
444         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
445                                         &RQF_LDLM_INTENT_GETXATTR);
446         if (req == NULL)
447                 RETURN(ERR_PTR(-ENOMEM));
448
449         /* get SELinux policy info if any */
450         rc = sptlrpc_get_sepol(req);
451         if (rc < 0) {
452                 ptlrpc_request_free(req);
453                 RETURN(ERR_PTR(rc));
454         }
455         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
456                              strlen(req->rq_sepol) ?
457                              strlen(req->rq_sepol) + 1 : 0);
458
459         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
460         if (rc) {
461                 ptlrpc_request_free(req);
462                 RETURN(ERR_PTR(rc));
463         }
464
465         /* pack the intent */
466         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
467         lit->opc = IT_GETXATTR;
468         CDEBUG(D_INFO, "%s: get xattrs for "DFID"\n",
469                exp->exp_obd->obd_name, PFID(&op_data->op_fid1));
470
471 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
472         /* If the supplied buffer is too small then the server will
473          * return -ERANGE and llite will fallback to using non cached
474          * xattr operations. On servers before 2.10.1 a (non-cached)
475          * listxattr RPC for an orphan or dead file causes an oops. So
476          * let's try to avoid sending too small a buffer to too old a
477          * server. This is effectively undoing the memory conservation
478          * of LU-9417 when it would be *more* likely to crash the
479          * server. See LU-9856. */
480         if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
481                 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
482                                          exp->exp_connect_data.ocd_max_easize);
483 #endif
484
485         /* pack the intended request */
486         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
487                       ea_vals_buf_size, -1, 0);
488
489         /* get SELinux policy info if any */
490         mdc_file_sepol_pack(req);
491
492         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
493                              GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
494
495         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
496                              ea_vals_buf_size);
497
498         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
499                              sizeof(u32) * GA_DEFAULT_EA_NUM);
500
501         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
502
503         ptlrpc_request_set_replen(req);
504
505         RETURN(req);
506 }
507
508 static struct ptlrpc_request *
509 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
510                         struct md_op_data *op_data, __u32 acl_bufsize)
511 {
512         struct ptlrpc_request *req;
513         struct obd_device *obddev = class_exp2obd(exp);
514         u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE |
515                     OBD_MD_FLDIREA | OBD_MD_MEA | OBD_MD_FLACL |
516                     OBD_MD_DEFAULT_MEA;
517         struct ldlm_intent *lit;
518         __u32 easize;
519         bool have_secctx = false;
520         int rc;
521
522         ENTRY;
523
524         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
525                                    &RQF_LDLM_INTENT_GETATTR);
526         if (req == NULL)
527                 RETURN(ERR_PTR(-ENOMEM));
528
529         /* send name of security xattr to get upon intent */
530         if (it->it_op & (IT_LOOKUP | IT_GETATTR) &&
531             req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
532                                   RCL_CLIENT) &&
533             op_data->op_file_secctx_name_size > 0 &&
534             op_data->op_file_secctx_name != NULL) {
535                 have_secctx = true;
536                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
537                                      RCL_CLIENT,
538                                      op_data->op_file_secctx_name_size);
539         }
540
541         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
542                              op_data->op_namelen + 1);
543
544         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
545         if (rc) {
546                 ptlrpc_request_free(req);
547                 RETURN(ERR_PTR(rc));
548         }
549
550         /* pack the intent */
551         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
552         lit->opc = (__u64)it->it_op;
553
554         easize = obddev->u.cli.cl_default_mds_easize;
555
556         /* pack the intended request */
557         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
558
559         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
560         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
561         req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
562                              sizeof(struct lmv_user_md));
563
564         if (have_secctx) {
565                 char *secctx_name;
566
567                 secctx_name = req_capsule_client_get(&req->rq_pill,
568                                                      &RMF_FILE_SECCTX_NAME);
569                 memcpy(secctx_name, op_data->op_file_secctx_name,
570                        op_data->op_file_secctx_name_size);
571
572                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
573                                      RCL_SERVER, easize);
574
575                 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
576                        op_data->op_file_secctx_name_size,
577                        op_data->op_file_secctx_name);
578         } else {
579                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
580                                      RCL_SERVER, 0);
581         }
582
583         ptlrpc_request_set_replen(req);
584         RETURN(req);
585 }
586
587 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
588                                                      struct lookup_intent *it,
589                                                      struct md_op_data *op_data)
590 {
591         struct obd_device     *obd = class_exp2obd(exp);
592         LIST_HEAD(cancels);
593         struct ptlrpc_request *req;
594         struct ldlm_intent    *lit;
595         struct layout_intent  *layout;
596         int count = 0, rc;
597         ENTRY;
598
599         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
600                                 &RQF_LDLM_INTENT_LAYOUT);
601         if (req == NULL)
602                 RETURN(ERR_PTR(-ENOMEM));
603
604         if (fid_is_sane(&op_data->op_fid2) && (it->it_op & IT_LAYOUT) &&
605             (it->it_flags & FMODE_WRITE)) {
606                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
607                                                 &cancels, LCK_EX,
608                                                 MDS_INODELOCK_LAYOUT);
609         }
610
611         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
612         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
613         if (rc) {
614                 ptlrpc_request_free(req);
615                 RETURN(ERR_PTR(rc));
616         }
617
618         /* pack the intent */
619         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
620         lit->opc = (__u64)it->it_op;
621
622         /* pack the layout intent request */
623         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
624         LASSERT(op_data->op_data != NULL);
625         LASSERT(op_data->op_data_size == sizeof(*layout));
626         memcpy(layout, op_data->op_data, sizeof(*layout));
627
628         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
629                              obd->u.cli.cl_default_mds_easize);
630         ptlrpc_request_set_replen(req);
631         RETURN(req);
632 }
633
634 static struct ptlrpc_request *
635 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
636 {
637         struct ptlrpc_request *req;
638         int rc;
639         ENTRY;
640
641         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
642         if (req == NULL)
643                 RETURN(ERR_PTR(-ENOMEM));
644
645         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
646         if (rc) {
647                 ptlrpc_request_free(req);
648                 RETURN(ERR_PTR(rc));
649         }
650
651         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
652         ptlrpc_request_set_replen(req);
653         RETURN(req);
654 }
655
656 static int mdc_finish_enqueue(struct obd_export *exp,
657                               struct ptlrpc_request *req,
658                               struct ldlm_enqueue_info *einfo,
659                               struct lookup_intent *it,
660                               struct lustre_handle *lockh,
661                               int rc)
662 {
663         struct req_capsule  *pill = &req->rq_pill;
664         struct ldlm_request *lockreq;
665         struct ldlm_reply   *lockrep;
666         struct ldlm_lock    *lock;
667         struct mdt_body     *body = NULL;
668         void                *lvb_data = NULL;
669         __u32                lvb_len = 0;
670
671         ENTRY;
672
673         LASSERT(rc >= 0);
674         /* Similarly, if we're going to replay this request, we don't want to
675          * actually get a lock, just perform the intent. */
676         if (req->rq_transno || req->rq_replay) {
677                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
678                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
679         }
680
681         if (rc == ELDLM_LOCK_ABORTED) {
682                 einfo->ei_mode = 0;
683                 memset(lockh, 0, sizeof(*lockh));
684                 rc = 0;
685         } else { /* rc = 0 */
686                 lock = ldlm_handle2lock(lockh);
687                 LASSERT(lock != NULL);
688
689                 /* If the server gave us back a different lock mode, we should
690                  * fix up our variables. */
691                 if (lock->l_req_mode != einfo->ei_mode) {
692                         ldlm_lock_addref(lockh, lock->l_req_mode);
693                         ldlm_lock_decref(lockh, einfo->ei_mode);
694                         einfo->ei_mode = lock->l_req_mode;
695                 }
696                 LDLM_LOCK_PUT(lock);
697         }
698
699         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
700         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
701
702         it->it_disposition = (int)lockrep->lock_policy_res1;
703         it->it_status = (int)lockrep->lock_policy_res2;
704         it->it_lock_mode = einfo->ei_mode;
705         it->it_lock_handle = lockh->cookie;
706         it->it_request = req;
707
708         /* Technically speaking rq_transno must already be zero if
709          * it_status is in error, so the check is a bit redundant */
710         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
711                 mdc_clear_replay_flag(req, it->it_status);
712
713         /* If we're doing an IT_OPEN which did not result in an actual
714          * successful open, then we need to remove the bit which saves
715          * this request for unconditional replay.
716          *
717          * It's important that we do this first!  Otherwise we might exit the
718          * function without doing so, and try to replay a failed create
719          * (bug 3440) */
720         if (it->it_op & IT_OPEN && req->rq_replay &&
721             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
722                 mdc_clear_replay_flag(req, it->it_status);
723
724         DEBUG_REQ(D_RPCTRACE, req, "op=%x disposition=%x, status=%d",
725                   it->it_op, it->it_disposition, it->it_status);
726
727         /* We know what to expect, so we do any byte flipping required here */
728         if (it_has_reply_body(it)) {
729                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
730                 if (body == NULL) {
731                         CERROR ("Can't swab mdt_body\n");
732                         RETURN (-EPROTO);
733                 }
734
735                 if (it_disposition(it, DISP_OPEN_OPEN) &&
736                     !it_open_error(DISP_OPEN_OPEN, it)) {
737                         /*
738                          * If this is a successful OPEN request, we need to set
739                          * replay handler and data early, so that if replay
740                          * happens immediately after swabbing below, new reply
741                          * is swabbed by that handler correctly.
742                          */
743                         mdc_set_open_replay_data(NULL, NULL, it);
744                 }
745
746                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
747                         void *eadata;
748
749                         mdc_update_max_ea_from_body(exp, body);
750
751                         /*
752                          * The eadata is opaque; just check that it is there.
753                          * Eventually, obd_unpackmd() will check the contents.
754                          */
755                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
756                                                         body->mbo_eadatasize);
757                         if (eadata == NULL)
758                                 RETURN(-EPROTO);
759
760                         /* save lvb data and length in case this is for layout
761                          * lock */
762                         lvb_data = eadata;
763                         lvb_len = body->mbo_eadatasize;
764
765                         /*
766                          * We save the reply LOV EA in case we have to replay a
767                          * create for recovery.  If we didn't allocate a large
768                          * enough request buffer above we need to reallocate it
769                          * here to hold the actual LOV EA.
770                          *
771                          * To not save LOV EA if request is not going to replay
772                          * (for example error one).
773                          */
774                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
775                                 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
776                                                     body->mbo_eadatasize);
777                                 if (rc) {
778                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
779                                         body->mbo_eadatasize = 0;
780                                         rc = 0;
781                                 }
782                         }
783                 }
784         } else if (it->it_op & IT_LAYOUT) {
785                 /* maybe the lock was granted right away and layout
786                  * is packed into RMF_DLM_LVB of req */
787                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
788                 CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
789                        class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
790                 if (lvb_len > 0) {
791                         lvb_data = req_capsule_server_sized_get(pill,
792                                                         &RMF_DLM_LVB, lvb_len);
793                         if (lvb_data == NULL)
794                                 RETURN(-EPROTO);
795
796                         /**
797                          * save replied layout data to the request buffer for
798                          * recovery consideration (lest MDS reinitialize
799                          * another set of OST objects).
800                          */
801                         if (req->rq_transno)
802                                 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
803                                                      lvb_len);
804                 }
805         }
806
807         /* fill in stripe data for layout lock.
808          * LU-6581: trust layout data only if layout lock is granted. The MDT
809          * has stopped sending layout unless the layout lock is granted. The
810          * client still does this checking in case it's talking with an old
811          * server. - Jinshan */
812         lock = ldlm_handle2lock(lockh);
813         if (lock == NULL)
814                 RETURN(rc);
815
816         if (ldlm_has_layout(lock) && lvb_data != NULL &&
817             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
818                 void *lmm;
819
820                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
821                         ldlm_it2str(it->it_op), lvb_len);
822
823                 OBD_ALLOC_LARGE(lmm, lvb_len);
824                 if (lmm == NULL)
825                         GOTO(out_lock, rc = -ENOMEM);
826
827                 memcpy(lmm, lvb_data, lvb_len);
828
829                 /* install lvb_data */
830                 lock_res_and_lock(lock);
831                 if (lock->l_lvb_data == NULL) {
832                         lock->l_lvb_type = LVB_T_LAYOUT;
833                         lock->l_lvb_data = lmm;
834                         lock->l_lvb_len = lvb_len;
835                         lmm = NULL;
836                 }
837                 unlock_res_and_lock(lock);
838                 if (lmm != NULL)
839                         OBD_FREE_LARGE(lmm, lvb_len);
840         }
841
842         if (ldlm_has_dom(lock)) {
843                 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
844
845                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
846                 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
847                         LDLM_ERROR(lock, "%s: DoM lock without size.",
848                                    exp->exp_obd->obd_name);
849                         GOTO(out_lock, rc = -EPROTO);
850                 }
851
852                 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
853                            ldlm_it2str(it->it_op), body->mbo_dom_size);
854
855                 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
856         }
857 out_lock:
858         LDLM_LOCK_PUT(lock);
859
860         RETURN(rc);
861 }
862
863 /* We always reserve enough space in the reply packet for a stripe MD, because
864  * we don't know in advance the file type. */
865 static int mdc_enqueue_base(struct obd_export *exp,
866                             struct ldlm_enqueue_info *einfo,
867                             const union ldlm_policy_data *policy,
868                             struct lookup_intent *it,
869                             struct md_op_data *op_data,
870                             struct lustre_handle *lockh,
871                             __u64 extra_lock_flags)
872 {
873         struct obd_device *obddev = class_exp2obd(exp);
874         struct ptlrpc_request *req = NULL;
875         __u64 flags, saved_flags = extra_lock_flags;
876         struct ldlm_res_id res_id;
877         static const union ldlm_policy_data lookup_policy = {
878                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
879         static const union ldlm_policy_data update_policy = {
880                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
881         static const union ldlm_policy_data layout_policy = {
882                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
883         static const union ldlm_policy_data getxattr_policy = {
884                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
885         int generation, resends = 0;
886         struct ldlm_reply *lockrep;
887         struct obd_import *imp = class_exp2cliimp(exp);
888         __u32 acl_bufsize;
889         enum lvb_type lvb_type = 0;
890         int rc;
891         ENTRY;
892
893         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
894                  einfo->ei_type);
895         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
896
897         if (it != NULL) {
898                 LASSERT(policy == NULL);
899
900                 saved_flags |= LDLM_FL_HAS_INTENT;
901                 if (it->it_op & (IT_GETATTR | IT_READDIR))
902                         policy = &update_policy;
903                 else if (it->it_op & IT_LAYOUT)
904                         policy = &layout_policy;
905                 else if (it->it_op & IT_GETXATTR)
906                         policy = &getxattr_policy;
907                 else
908                         policy = &lookup_policy;
909         }
910
911         generation = obddev->u.cli.cl_import->imp_generation;
912         if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
913                 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
914                                   XATTR_SIZE_MAX);
915         else
916                 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
917
918 resend:
919         flags = saved_flags;
920         if (it == NULL) {
921                 /* The only way right now is FLOCK. */
922                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
923                          einfo->ei_type);
924                 res_id.name[3] = LDLM_FLOCK;
925         } else if (it->it_op & IT_OPEN) {
926                 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
927         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
928                 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
929         } else if (it->it_op & IT_READDIR) {
930                 req = mdc_enqueue_pack(exp, 0);
931         } else if (it->it_op & IT_LAYOUT) {
932                 if (!imp_connect_lvb_type(imp))
933                         RETURN(-EOPNOTSUPP);
934                 req = mdc_intent_layout_pack(exp, it, op_data);
935                 lvb_type = LVB_T_LAYOUT;
936         } else if (it->it_op & IT_GETXATTR) {
937                 req = mdc_intent_getxattr_pack(exp, it, op_data);
938         } else {
939                 LBUG();
940                 RETURN(-EINVAL);
941         }
942
943         if (IS_ERR(req))
944                 RETURN(PTR_ERR(req));
945
946         if (resends) {
947                 req->rq_generation_set = 1;
948                 req->rq_import_generation = generation;
949                 req->rq_sent = ktime_get_real_seconds() + resends;
950         }
951
952         /* It is important to obtain modify RPC slot first (if applicable), so
953          * that threads that are waiting for a modify RPC slot are not polluting
954          * our rpcs in flight counter.
955          * We do not do flock request limiting, though */
956         if (it) {
957                 mdc_get_mod_rpc_slot(req, it);
958                 rc = obd_get_request_slot(&obddev->u.cli);
959                 if (rc != 0) {
960                         mdc_put_mod_rpc_slot(req, it);
961                         mdc_clear_replay_flag(req, 0);
962                         ptlrpc_req_finished(req);
963                         RETURN(rc);
964                 }
965         }
966
967         /* With Data-on-MDT the glimpse callback is needed too.
968          * It is set here in advance but not in mdc_finish_enqueue()
969          * to avoid possible races. It is safe to have glimpse handler
970          * for non-DOM locks and costs nothing.*/
971         if (einfo->ei_cb_gl == NULL)
972                 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
973
974         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
975                               0, lvb_type, lockh, 0);
976         if (!it) {
977                 /* For flock requests we immediatelly return without further
978                    delay and let caller deal with the rest, since rest of
979                    this function metadata processing makes no sense for flock
980                    requests anyway. But in case of problem during comms with
981                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
982                    can not rely on caller and this mainly for F_UNLCKs
983                    (explicits or automatically generated by Kernel to clean
984                    current FLocks upon exit) that can't be trashed */
985                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
986                     (einfo->ei_type == LDLM_FLOCK) &&
987                     (einfo->ei_mode == LCK_NL))
988                         goto resend;
989                 RETURN(rc);
990         }
991
992         obd_put_request_slot(&obddev->u.cli);
993         mdc_put_mod_rpc_slot(req, it);
994
995         if (rc < 0) {
996                 CDEBUG(D_INFO,
997                       "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
998                       obddev->obd_name, PFID(&op_data->op_fid1),
999                       PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
1000
1001                 mdc_clear_replay_flag(req, rc);
1002                 ptlrpc_req_finished(req);
1003                 RETURN(rc);
1004         }
1005
1006         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1007         LASSERT(lockrep != NULL);
1008
1009         lockrep->lock_policy_res2 =
1010                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1011
1012         /* Retry infinitely when the server returns -EINPROGRESS for the
1013          * intent operation, when server returns -EINPROGRESS for acquiring
1014          * intent lock, we'll retry in after_reply(). */
1015         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
1016                 mdc_clear_replay_flag(req, rc);
1017                 ptlrpc_req_finished(req);
1018                 if (generation == obddev->u.cli.cl_import->imp_generation) {
1019                         if (signal_pending(current))
1020                                 RETURN(-EINTR);
1021
1022                         resends++;
1023                         CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
1024                                obddev->obd_name, resends, it->it_op,
1025                                PFID(&op_data->op_fid1),
1026                                PFID(&op_data->op_fid2));
1027                         goto resend;
1028                 } else {
1029                         CDEBUG(D_HA, "resend cross eviction\n");
1030                         RETURN(-EIO);
1031                 }
1032         }
1033
1034         if ((int)lockrep->lock_policy_res2 == -ERANGE &&
1035             it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
1036             acl_bufsize == LUSTRE_POSIX_ACL_MAX_SIZE_OLD) {
1037                 mdc_clear_replay_flag(req, -ERANGE);
1038                 ptlrpc_req_finished(req);
1039                 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
1040                                   XATTR_SIZE_MAX);
1041                 goto resend;
1042         }
1043
1044         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1045         if (rc < 0) {
1046                 if (lustre_handle_is_used(lockh)) {
1047                         ldlm_lock_decref(lockh, einfo->ei_mode);
1048                         memset(lockh, 0, sizeof(*lockh));
1049                 }
1050                 ptlrpc_req_finished(req);
1051
1052                 it->it_lock_handle = 0;
1053                 it->it_lock_mode = 0;
1054                 it->it_request = NULL;
1055         }
1056
1057         RETURN(rc);
1058 }
1059
1060 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1061                 const union ldlm_policy_data *policy,
1062                 struct md_op_data *op_data,
1063                 struct lustre_handle *lockh, __u64 extra_lock_flags)
1064 {
1065         return mdc_enqueue_base(exp, einfo, policy, NULL,
1066                                 op_data, lockh, extra_lock_flags);
1067 }
1068
1069 static int mdc_finish_intent_lock(struct obd_export *exp,
1070                                   struct ptlrpc_request *request,
1071                                   struct md_op_data *op_data,
1072                                   struct lookup_intent *it,
1073                                   struct lustre_handle *lockh)
1074 {
1075         struct lustre_handle old_lock;
1076         struct ldlm_lock *lock;
1077         int rc = 0;
1078         ENTRY;
1079
1080         LASSERT(request != NULL);
1081         LASSERT(request != LP_POISON);
1082         LASSERT(request->rq_repmsg != LP_POISON);
1083
1084         if (it->it_op & IT_READDIR)
1085                 RETURN(0);
1086
1087         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
1088                 if (it->it_status != 0)
1089                         GOTO(out, rc = it->it_status);
1090         } else {
1091                 if (!it_disposition(it, DISP_IT_EXECD)) {
1092                         /* The server failed before it even started executing
1093                          * the intent, i.e. because it couldn't unpack the
1094                          * request.
1095                          */
1096                         LASSERT(it->it_status != 0);
1097                         GOTO(out, rc = it->it_status);
1098                 }
1099                 rc = it_open_error(DISP_IT_EXECD, it);
1100                 if (rc)
1101                         GOTO(out, rc);
1102
1103                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1104                 if (rc)
1105                         GOTO(out, rc);
1106
1107                 /* keep requests around for the multiple phases of the call
1108                  * this shows the DISP_XX must guarantee we make it into the
1109                  * call
1110                  */
1111                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1112                     it_disposition(it, DISP_OPEN_CREATE) &&
1113                     !it_open_error(DISP_OPEN_CREATE, it)) {
1114                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
1115                         /* balanced in ll_create_node */
1116                         ptlrpc_request_addref(request);
1117                 }
1118                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1119                     it_disposition(it, DISP_OPEN_OPEN) &&
1120                     !it_open_error(DISP_OPEN_OPEN, it)) {
1121                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
1122                         /* balanced in ll_file_open */
1123                         ptlrpc_request_addref(request);
1124                         /* BUG 11546 - eviction in the middle of open rpc
1125                          * processing
1126                          */
1127                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1128                                          obd_timeout);
1129                 }
1130
1131                 if (it->it_op & IT_CREAT) {
1132                         /* XXX this belongs in ll_create_it */
1133                 } else if (it->it_op == IT_OPEN) {
1134                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1135                 } else {
1136                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1137                 }
1138         }
1139
1140         /* If we already have a matching lock, then cancel the new
1141          * one.  We have to set the data here instead of in
1142          * mdc_enqueue, because we need to use the child's inode as
1143          * the l_ast_data to match, and that's not available until
1144          * intent_finish has performed the iget().) */
1145         lock = ldlm_handle2lock(lockh);
1146         if (lock) {
1147                 union ldlm_policy_data policy = lock->l_policy_data;
1148                 LDLM_DEBUG(lock, "matching against this");
1149
1150                 if (it_has_reply_body(it)) {
1151                         struct mdt_body *body;
1152
1153                         body = req_capsule_server_get(&request->rq_pill,
1154                                                       &RMF_MDT_BODY);
1155                         /* mdc_enqueue checked */
1156                         LASSERT(body != NULL);
1157                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1158                                                  &lock->l_resource->lr_name),
1159                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1160                                  PLDLMRES(lock->l_resource),
1161                                  PFID(&body->mbo_fid1));
1162                 }
1163                 LDLM_LOCK_PUT(lock);
1164
1165                 memcpy(&old_lock, lockh, sizeof(*lockh));
1166                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1167                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1168                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1169                         memcpy(lockh, &old_lock, sizeof(old_lock));
1170                         it->it_lock_handle = lockh->cookie;
1171                 }
1172         }
1173
1174         EXIT;
1175 out:
1176         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1177                 (int)op_data->op_namelen, op_data->op_name,
1178                 ldlm_it2str(it->it_op), it->it_status,
1179                 it->it_disposition, rc);
1180         return rc;
1181 }
1182
1183 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1184                         struct lu_fid *fid, __u64 *bits)
1185 {
1186         /* We could just return 1 immediately, but since we should only
1187          * be called in revalidate_it if we already have a lock, let's
1188          * verify that. */
1189         struct ldlm_res_id res_id;
1190         struct lustre_handle lockh;
1191         union ldlm_policy_data policy;
1192         enum ldlm_mode mode;
1193         ENTRY;
1194
1195         if (it->it_lock_handle) {
1196                 lockh.cookie = it->it_lock_handle;
1197                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1198         } else {
1199                 fid_build_reg_res_name(fid, &res_id);
1200                 switch (it->it_op) {
1201                 case IT_GETATTR:
1202                         /* File attributes are held under multiple bits:
1203                          * nlink is under lookup lock, size and times are
1204                          * under UPDATE lock and recently we've also got
1205                          * a separate permissions lock for owner/group/acl that
1206                          * were protected by lookup lock before.
1207                          * Getattr must provide all of that information,
1208                          * so we need to ensure we have all of those locks.
1209                          * Unfortunately, if the bits are split across multiple
1210                          * locks, there's no easy way to match all of them here,
1211                          * so an extra RPC would be performed to fetch all
1212                          * of those bits at once for now. */
1213                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1214                          * but for old MDTs (< 2.4), permission is covered
1215                          * by LOOKUP lock, so it needs to match all bits here.*/
1216                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1217                                                   MDS_INODELOCK_LOOKUP |
1218                                                   MDS_INODELOCK_PERM;
1219                         break;
1220                 case IT_READDIR:
1221                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1222                         break;
1223                 case IT_LAYOUT:
1224                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1225                         break;
1226                 default:
1227                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1228                         break;
1229                 }
1230
1231                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1232                                       LDLM_IBITS, &policy,
1233                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1234                                       &lockh);
1235         }
1236
1237         if (mode) {
1238                 it->it_lock_handle = lockh.cookie;
1239                 it->it_lock_mode = mode;
1240         } else {
1241                 it->it_lock_handle = 0;
1242                 it->it_lock_mode = 0;
1243         }
1244
1245         RETURN(!!mode);
1246 }
1247
1248 /*
1249  * This long block is all about fixing up the lock and request state
1250  * so that it is correct as of the moment _before_ the operation was
1251  * applied; that way, the VFS will think that everything is normal and
1252  * call Lustre's regular VFS methods.
1253  *
1254  * If we're performing a creation, that means that unless the creation
1255  * failed with EEXIST, we should fake up a negative dentry.
1256  *
1257  * For everything else, we want to lookup to succeed.
1258  *
1259  * One additional note: if CREATE or OPEN succeeded, we add an extra
1260  * reference to the request because we need to keep it around until
1261  * ll_create/ll_open gets called.
1262  *
1263  * The server will return to us, in it_disposition, an indication of
1264  * exactly what it_status refers to.
1265  *
1266  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1267  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1268  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1269  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1270  * was successful.
1271  *
1272  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1273  * child lookup.
1274  */
1275 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1276                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1277                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1278 {
1279         struct ldlm_enqueue_info einfo = {
1280                 .ei_type        = LDLM_IBITS,
1281                 .ei_mode        = it_to_lock_mode(it),
1282                 .ei_cb_bl       = cb_blocking,
1283                 .ei_cb_cp       = ldlm_completion_ast,
1284                 .ei_cb_gl       = mdc_ldlm_glimpse_ast,
1285         };
1286         struct lustre_handle lockh;
1287         int rc = 0;
1288         ENTRY;
1289         LASSERT(it);
1290
1291         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1292                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1293                 op_data->op_name, PFID(&op_data->op_fid2),
1294                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1295                 it->it_flags);
1296
1297         lockh.cookie = 0;
1298         if (fid_is_sane(&op_data->op_fid2) &&
1299             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1300                 /* We could just return 1 immediately, but since we should only
1301                  * be called in revalidate_it if we already have a lock, let's
1302                  * verify that. */
1303                 it->it_lock_handle = 0;
1304                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1305                 /* Only return failure if it was not GETATTR by cfid
1306                    (from inode_revalidate) */
1307                 if (rc || op_data->op_namelen != 0)
1308                         RETURN(rc);
1309         }
1310
1311         /* For case if upper layer did not alloc fid, do it now. */
1312         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1313                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1314                 if (rc < 0) {
1315                         CERROR("Can't alloc new fid, rc %d\n", rc);
1316                         RETURN(rc);
1317                 }
1318         }
1319
1320         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1321                               extra_lock_flags);
1322         if (rc < 0)
1323                 RETURN(rc);
1324
1325         *reqp = it->it_request;
1326         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1327         RETURN(rc);
1328 }
1329
1330 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1331                                               struct ptlrpc_request *req,
1332                                               void *args, int rc)
1333 {
1334         struct mdc_getattr_args  *ga = args;
1335         struct obd_export *exp = ga->ga_exp;
1336         struct md_enqueue_info *minfo = ga->ga_minfo;
1337         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1338         struct lookup_intent *it;
1339         struct lustre_handle *lockh;
1340         struct obd_device *obddev;
1341         struct ldlm_reply *lockrep;
1342         __u64 flags = LDLM_FL_HAS_INTENT;
1343         ENTRY;
1344
1345         it    = &minfo->mi_it;
1346         lockh = &minfo->mi_lockh;
1347
1348         obddev = class_exp2obd(exp);
1349
1350         obd_put_request_slot(&obddev->u.cli);
1351         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1352                 rc = -ETIMEDOUT;
1353
1354         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1355                                    &flags, NULL, 0, lockh, rc);
1356         if (rc < 0) {
1357                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1358                 mdc_clear_replay_flag(req, rc);
1359                 GOTO(out, rc);
1360         }
1361
1362         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1363         LASSERT(lockrep != NULL);
1364
1365         lockrep->lock_policy_res2 =
1366                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1367
1368         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1369         if (rc)
1370                 GOTO(out, rc);
1371
1372         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1373         EXIT;
1374
1375 out:
1376         minfo->mi_cb(req, minfo, rc);
1377         return 0;
1378 }
1379
1380 int mdc_intent_getattr_async(struct obd_export *exp,
1381                              struct md_enqueue_info *minfo)
1382 {
1383         struct md_op_data       *op_data = &minfo->mi_data;
1384         struct lookup_intent    *it = &minfo->mi_it;
1385         struct ptlrpc_request   *req;
1386         struct mdc_getattr_args *ga;
1387         struct obd_device       *obddev = class_exp2obd(exp);
1388         struct ldlm_res_id       res_id;
1389         union ldlm_policy_data policy = {
1390                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1391                                                  MDS_INODELOCK_UPDATE } };
1392         int                      rc = 0;
1393         __u64                    flags = LDLM_FL_HAS_INTENT;
1394         ENTRY;
1395
1396         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1397                 (int)op_data->op_namelen, op_data->op_name,
1398                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1399
1400         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1401         /* If the MDT return -ERANGE because of large ACL, then the sponsor
1402          * of the async getattr RPC will handle that by itself. */
1403         req = mdc_intent_getattr_pack(exp, it, op_data,
1404                                       LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1405         if (IS_ERR(req))
1406                 RETURN(PTR_ERR(req));
1407
1408         rc = obd_get_request_slot(&obddev->u.cli);
1409         if (rc != 0) {
1410                 ptlrpc_req_finished(req);
1411                 RETURN(rc);
1412         }
1413
1414         /* With Data-on-MDT the glimpse callback is needed too.
1415          * It is set here in advance but not in mdc_finish_enqueue()
1416          * to avoid possible races. It is safe to have glimpse handler
1417          * for non-DOM locks and costs nothing.*/
1418         if (minfo->mi_einfo.ei_cb_gl == NULL)
1419                 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1420
1421         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1422                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1423         if (rc < 0) {
1424                 obd_put_request_slot(&obddev->u.cli);
1425                 ptlrpc_req_finished(req);
1426                 RETURN(rc);
1427         }
1428
1429         ga = ptlrpc_req_async_args(ga, req);
1430         ga->ga_exp = exp;
1431         ga->ga_minfo = minfo;
1432
1433         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1434         ptlrpcd_add_req(req);
1435
1436         RETURN(0);
1437 }