Whamcloud - gitweb
LU-10070 ldlm: layout lock fixes
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 #include <linux/module.h>
36
37 #include <obd.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
47
48 #include "mdc_internal.h"
49
50 struct mdc_getattr_args {
51         struct obd_export               *ga_exp;
52         struct md_enqueue_info          *ga_minfo;
53 };
54
55 int it_open_error(int phase, struct lookup_intent *it)
56 {
57         if (it_disposition(it, DISP_OPEN_LEASE)) {
58                 if (phase >= DISP_OPEN_LEASE)
59                         return it->it_status;
60                 else
61                         return 0;
62         }
63         if (it_disposition(it, DISP_OPEN_OPEN)) {
64                 if (phase >= DISP_OPEN_OPEN)
65                         return it->it_status;
66                 else
67                         return 0;
68         }
69
70         if (it_disposition(it, DISP_OPEN_CREATE)) {
71                 if (phase >= DISP_OPEN_CREATE)
72                         return it->it_status;
73                 else
74                         return 0;
75         }
76
77         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78                 if (phase >= DISP_LOOKUP_EXECD)
79                         return it->it_status;
80                 else
81                         return 0;
82         }
83
84         if (it_disposition(it, DISP_IT_EXECD)) {
85                 if (phase >= DISP_IT_EXECD)
86                         return it->it_status;
87                 else
88                         return 0;
89         }
90
91         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
92         LBUG();
93
94         return 0;
95 }
96 EXPORT_SYMBOL(it_open_error);
97
98 /* this must be called on a lockh that is known to have a referenced lock */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100                       void *data, __u64 *bits)
101 {
102         struct ldlm_lock *lock;
103         struct inode *new_inode = data;
104         ENTRY;
105
106         if(bits)
107                 *bits = 0;
108
109         if (!lustre_handle_is_used(lockh))
110                 RETURN(0);
111
112         lock = ldlm_handle2lock(lockh);
113
114         LASSERT(lock != NULL);
115         lock_res_and_lock(lock);
116         if (lock->l_resource->lr_lvb_inode &&
117             lock->l_resource->lr_lvb_inode != data) {
118                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119                 LASSERTF(old_inode->i_state & I_FREEING,
120                          "Found existing inode %p/%lu/%u state %lu in lock: "
121                          "setting data to %p/%lu/%u\n", old_inode,
122                          old_inode->i_ino, old_inode->i_generation,
123                          old_inode->i_state,
124                          new_inode, new_inode->i_ino, new_inode->i_generation);
125         }
126         lock->l_resource->lr_lvb_inode = new_inode;
127         if (bits)
128                 *bits = lock->l_policy_data.l_inodebits.bits;
129
130         unlock_res_and_lock(lock);
131         LDLM_LOCK_PUT(lock);
132
133         RETURN(0);
134 }
135
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137                               const struct lu_fid *fid, enum ldlm_type type,
138                               union ldlm_policy_data *policy,
139                               enum ldlm_mode mode, struct lustre_handle *lockh)
140 {
141         struct ldlm_res_id res_id;
142         enum ldlm_mode rc;
143         ENTRY;
144
145         fid_build_reg_res_name(fid, &res_id);
146         /* LU-4405: Clear bits not supported by server */
147         policy->l_inodebits.bits &= exp_connect_ibits(exp);
148         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149                              &res_id, type, policy, mode, lockh, 0);
150         RETURN(rc);
151 }
152
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154                       union ldlm_policy_data *policy, enum ldlm_mode mode,
155                       enum ldlm_cancel_flags flags, void *opaque)
156 {
157         struct obd_device *obd = class_exp2obd(exp);
158         struct ldlm_res_id res_id;
159         int rc;
160
161         ENTRY;
162
163         fid_build_reg_res_name(fid, &res_id);
164         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165                                              policy, mode, flags, opaque);
166         RETURN(rc);
167 }
168
169 int mdc_null_inode(struct obd_export *exp,
170                    const struct lu_fid *fid)
171 {
172         struct ldlm_res_id res_id;
173         struct ldlm_resource *res;
174         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
175         ENTRY;
176
177         LASSERTF(ns != NULL, "no namespace passed\n");
178
179         fid_build_reg_res_name(fid, &res_id);
180
181         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
182         if (IS_ERR(res))
183                 RETURN(0);
184
185         lock_res(res);
186         res->lr_lvb_inode = NULL;
187         unlock_res(res);
188
189         ldlm_resource_putref(res);
190         RETURN(0);
191 }
192
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
194 {
195         /* Don't hold error requests for replay. */
196         if (req->rq_replay) {
197                 spin_lock(&req->rq_lock);
198                 req->rq_replay = 0;
199                 spin_unlock(&req->rq_lock);
200         }
201         if (rc && req->rq_transno != 0) {
202                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
203                 LBUG();
204         }
205 }
206
207 /* Save a large LOV EA into the request buffer so that it is available
208  * for replay.  We don't do this in the initial request because the
209  * original request doesn't need this buffer (at most it sends just the
210  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211  * buffer and may also be difficult to allocate and save a very large
212  * request buffer for each open. (bug 5707)
213  *
214  * OOM here may cause recovery failure if lmm is needed (only for the
215  * original open if the MDS crashed just when this client also OOM'd)
216  * but this is incredibly unlikely, and questionable whether the client
217  * could do MDS recovery under OOM anyways... */
218 int mdc_save_lovea(struct ptlrpc_request *req,
219                    const struct req_msg_field *field,
220                    void *data, u32 size)
221 {
222         struct req_capsule *pill = &req->rq_pill;
223         struct lov_user_md *lmm;
224         int rc = 0;
225
226         if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
227                 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
228                 if (rc) {
229                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230                                req->rq_export->exp_obd->obd_name,
231                                size, rc);
232                         return rc;
233                 }
234         } else {
235                 req_capsule_shrink(pill, field, size, RCL_CLIENT);
236         }
237
238         req_capsule_set_size(pill, field, RCL_CLIENT, size);
239         lmm = req_capsule_client_get(pill, field);
240         if (lmm) {
241                 memcpy(lmm, data, size);
242                 /* overwrite layout generation returned from the MDS */
243                 lmm->lmm_stripe_offset =
244                   (typeof(lmm->lmm_stripe_offset))LOV_OFFSET_DEFAULT;
245         }
246
247         return rc;
248 }
249
250 static struct ptlrpc_request *
251 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
252                      struct md_op_data *op_data, __u32 acl_bufsize)
253 {
254         struct ptlrpc_request   *req;
255         struct obd_device       *obddev = class_exp2obd(exp);
256         struct ldlm_intent      *lit;
257         const void              *lmm = op_data->op_data;
258         __u32                    lmmsize = op_data->op_data_size;
259         __u32                    mdt_md_capsule_size;
260         struct list_head         cancels = LIST_HEAD_INIT(cancels);
261         int                      count = 0;
262         enum ldlm_mode           mode;
263         int                      rc;
264         int repsize, repsize_estimate;
265
266         ENTRY;
267
268         mdt_md_capsule_size = obddev->u.cli.cl_default_mds_easize;
269
270         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
271
272         /* XXX: openlock is not cancelled for cross-refs. */
273         /* If inode is known, cancel conflicting OPEN locks. */
274         if (fid_is_sane(&op_data->op_fid2)) {
275                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
276                         if (it->it_flags & MDS_FMODE_WRITE)
277                                 mode = LCK_EX;
278                         else
279                                 mode = LCK_PR;
280                 } else {
281                         if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
282                                 mode = LCK_CW;
283 #ifdef FMODE_EXEC
284                         else if (it->it_flags & FMODE_EXEC)
285                                 mode = LCK_PR;
286 #endif
287                         else
288                                 mode = LCK_CR;
289                 }
290                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
291                                                 &cancels, mode,
292                                                 MDS_INODELOCK_OPEN);
293         }
294
295         /* If CREATE, cancel parent's UPDATE lock. */
296         if (it->it_op & IT_CREAT)
297                 mode = LCK_EX;
298         else
299                 mode = LCK_CR;
300         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
301                                          &cancels, mode,
302                                          MDS_INODELOCK_UPDATE);
303
304         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
305                                    &RQF_LDLM_INTENT_OPEN);
306         if (req == NULL) {
307                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
308                 RETURN(ERR_PTR(-ENOMEM));
309         }
310
311         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
312                              op_data->op_namelen + 1);
313         if (cl_is_lov_delay_create(it->it_flags)) {
314                 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
315                 LASSERT(lmmsize == 0);
316                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
317         } else {
318                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
319                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
320         }
321
322         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
323                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
324                              op_data->op_file_secctx_name_size : 0);
325
326         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
327                              op_data->op_file_secctx_size);
328
329         /* get SELinux policy info if any */
330         rc = sptlrpc_get_sepol(req);
331         if (rc < 0) {
332                 ptlrpc_request_free(req);
333                 RETURN(ERR_PTR(rc));
334         }
335         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
336                              strlen(req->rq_sepol) ?
337                              strlen(req->rq_sepol) + 1 : 0);
338
339         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
340         if (rc < 0) {
341                 ptlrpc_request_free(req);
342                 RETURN(ERR_PTR(rc));
343         }
344
345         spin_lock(&req->rq_lock);
346         req->rq_replay = req->rq_import->imp_replayable;
347         spin_unlock(&req->rq_lock);
348
349         /* pack the intent */
350         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
351         lit->opc = (__u64)it->it_op;
352
353         /* pack the intended request */
354         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
355                       lmmsize);
356
357         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
358                              mdt_md_capsule_size);
359         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
360
361         if (!(it->it_op & IT_CREAT) && it->it_op & IT_OPEN &&
362             req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
363                                   RCL_CLIENT) &&
364             op_data->op_file_secctx_name_size > 0 &&
365             op_data->op_file_secctx_name != NULL) {
366                 char *secctx_name;
367
368                 secctx_name = req_capsule_client_get(&req->rq_pill,
369                                                      &RMF_FILE_SECCTX_NAME);
370                 memcpy(secctx_name, op_data->op_file_secctx_name,
371                        op_data->op_file_secctx_name_size);
372                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
373                                      RCL_SERVER,
374                                      obddev->u.cli.cl_max_mds_easize);
375
376                 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
377                        op_data->op_file_secctx_name_size,
378                        op_data->op_file_secctx_name);
379
380         } else {
381                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
382                                      RCL_SERVER, 0);
383         }
384
385         /**
386          * Inline buffer for possible data from Data-on-MDT files.
387          */
388         req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
389                              sizeof(struct niobuf_remote));
390         ptlrpc_request_set_replen(req);
391
392         /* Get real repbuf allocated size as rounded up power of 2 */
393         repsize = size_roundup_power2(req->rq_replen +
394                                       lustre_msg_early_size());
395         /* Estimate free space for DoM files in repbuf */
396         repsize_estimate = repsize - (req->rq_replen -
397                            mdt_md_capsule_size +
398                            sizeof(struct lov_comp_md_v1) +
399                            sizeof(struct lov_comp_md_entry_v1) +
400                            lov_mds_md_size(0, LOV_MAGIC_V3));
401
402         if (repsize_estimate < obddev->u.cli.cl_dom_min_inline_repsize) {
403                 repsize = obddev->u.cli.cl_dom_min_inline_repsize -
404                           repsize_estimate + sizeof(struct niobuf_remote);
405                 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
406                                      RCL_SERVER,
407                                      sizeof(struct niobuf_remote) + repsize);
408                 ptlrpc_request_set_replen(req);
409                 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
410                        repsize, req->rq_replen);
411                 repsize = size_roundup_power2(req->rq_replen +
412                                               lustre_msg_early_size());
413         }
414         /* The only way to report real allocated repbuf size to the server
415          * is the lm_repsize but it must be set prior buffer allocation itself
416          * due to security reasons - it is part of buffer used in signature
417          * calculation (see LU-11414). Therefore the saved size is predicted
418          * value as rq_replen rounded to the next higher power of 2.
419          * Such estimation is safe. Though the final allocated buffer might
420          * be even larger, it is not possible to know that at this point.
421          */
422         req->rq_reqmsg->lm_repsize = repsize;
423         RETURN(req);
424 }
425
426 #define GA_DEFAULT_EA_NAME_LEN 20
427 #define GA_DEFAULT_EA_VAL_LEN  250
428 #define GA_DEFAULT_EA_NUM      10
429
430 static struct ptlrpc_request *
431 mdc_intent_getxattr_pack(struct obd_export *exp,
432                          struct lookup_intent *it,
433                          struct md_op_data *op_data)
434 {
435         struct ptlrpc_request   *req;
436         struct ldlm_intent      *lit;
437         int                     rc, count = 0;
438         struct list_head        cancels = LIST_HEAD_INIT(cancels);
439         u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
440
441         ENTRY;
442
443         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
444                                         &RQF_LDLM_INTENT_GETXATTR);
445         if (req == NULL)
446                 RETURN(ERR_PTR(-ENOMEM));
447
448         /* get SELinux policy info if any */
449         rc = sptlrpc_get_sepol(req);
450         if (rc < 0) {
451                 ptlrpc_request_free(req);
452                 RETURN(ERR_PTR(rc));
453         }
454         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
455                              strlen(req->rq_sepol) ?
456                              strlen(req->rq_sepol) + 1 : 0);
457
458         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
459         if (rc) {
460                 ptlrpc_request_free(req);
461                 RETURN(ERR_PTR(rc));
462         }
463
464         /* pack the intent */
465         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
466         lit->opc = IT_GETXATTR;
467         CDEBUG(D_INFO, "%s: get xattrs for "DFID"\n",
468                exp->exp_obd->obd_name, PFID(&op_data->op_fid1));
469
470 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
471         /* If the supplied buffer is too small then the server will
472          * return -ERANGE and llite will fallback to using non cached
473          * xattr operations. On servers before 2.10.1 a (non-cached)
474          * listxattr RPC for an orphan or dead file causes an oops. So
475          * let's try to avoid sending too small a buffer to too old a
476          * server. This is effectively undoing the memory conservation
477          * of LU-9417 when it would be *more* likely to crash the
478          * server. See LU-9856. */
479         if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
480                 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
481                                          exp->exp_connect_data.ocd_max_easize);
482 #endif
483
484         /* pack the intended request */
485         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
486                       ea_vals_buf_size, -1, 0);
487
488         /* get SELinux policy info if any */
489         mdc_file_sepol_pack(req);
490
491         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
492                              GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
493
494         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
495                              ea_vals_buf_size);
496
497         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
498                              sizeof(u32) * GA_DEFAULT_EA_NUM);
499
500         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
501
502         ptlrpc_request_set_replen(req);
503
504         RETURN(req);
505 }
506
507 static struct ptlrpc_request *
508 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
509                         struct md_op_data *op_data, __u32 acl_bufsize)
510 {
511         struct ptlrpc_request *req;
512         struct obd_device *obddev = class_exp2obd(exp);
513         u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE |
514                     OBD_MD_FLDIREA | OBD_MD_MEA | OBD_MD_FLACL |
515                     OBD_MD_DEFAULT_MEA;
516         struct ldlm_intent *lit;
517         __u32 easize;
518         bool have_secctx = false;
519         int rc;
520
521         ENTRY;
522
523         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
524                                    &RQF_LDLM_INTENT_GETATTR);
525         if (req == NULL)
526                 RETURN(ERR_PTR(-ENOMEM));
527
528         /* send name of security xattr to get upon intent */
529         if (it->it_op & (IT_LOOKUP | IT_GETATTR) &&
530             req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
531                                   RCL_CLIENT) &&
532             op_data->op_file_secctx_name_size > 0 &&
533             op_data->op_file_secctx_name != NULL) {
534                 have_secctx = true;
535                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
536                                      RCL_CLIENT,
537                                      op_data->op_file_secctx_name_size);
538         }
539
540         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
541                              op_data->op_namelen + 1);
542
543         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
544         if (rc) {
545                 ptlrpc_request_free(req);
546                 RETURN(ERR_PTR(rc));
547         }
548
549         /* pack the intent */
550         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
551         lit->opc = (__u64)it->it_op;
552
553         easize = obddev->u.cli.cl_default_mds_easize;
554
555         /* pack the intended request */
556         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
557
558         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
559         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
560         req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
561                              sizeof(struct lmv_user_md));
562
563         if (have_secctx) {
564                 char *secctx_name;
565
566                 secctx_name = req_capsule_client_get(&req->rq_pill,
567                                                      &RMF_FILE_SECCTX_NAME);
568                 memcpy(secctx_name, op_data->op_file_secctx_name,
569                        op_data->op_file_secctx_name_size);
570
571                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
572                                      RCL_SERVER, easize);
573
574                 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
575                        op_data->op_file_secctx_name_size,
576                        op_data->op_file_secctx_name);
577         } else {
578                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
579                                      RCL_SERVER, 0);
580         }
581
582         ptlrpc_request_set_replen(req);
583         RETURN(req);
584 }
585
586 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
587                                                      struct lookup_intent *it,
588                                                      struct md_op_data *op_data)
589 {
590         struct obd_device     *obd = class_exp2obd(exp);
591         struct list_head cancels = LIST_HEAD_INIT(cancels);
592         struct ptlrpc_request *req;
593         struct ldlm_intent    *lit;
594         struct layout_intent  *layout;
595         int count = 0, rc;
596         ENTRY;
597
598         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
599                                 &RQF_LDLM_INTENT_LAYOUT);
600         if (req == NULL)
601                 RETURN(ERR_PTR(-ENOMEM));
602
603         if (fid_is_sane(&op_data->op_fid2) && (it->it_op & IT_LAYOUT) &&
604             (it->it_flags & FMODE_WRITE)) {
605                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
606                                                 &cancels, LCK_EX,
607                                                 MDS_INODELOCK_LAYOUT);
608         }
609
610         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
611         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
612         if (rc) {
613                 ptlrpc_request_free(req);
614                 RETURN(ERR_PTR(rc));
615         }
616
617         /* pack the intent */
618         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
619         lit->opc = (__u64)it->it_op;
620
621         /* pack the layout intent request */
622         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
623         LASSERT(op_data->op_data != NULL);
624         LASSERT(op_data->op_data_size == sizeof(*layout));
625         memcpy(layout, op_data->op_data, sizeof(*layout));
626
627         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
628                              obd->u.cli.cl_default_mds_easize);
629         ptlrpc_request_set_replen(req);
630         RETURN(req);
631 }
632
633 static struct ptlrpc_request *
634 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
635 {
636         struct ptlrpc_request *req;
637         int rc;
638         ENTRY;
639
640         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
641         if (req == NULL)
642                 RETURN(ERR_PTR(-ENOMEM));
643
644         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
645         if (rc) {
646                 ptlrpc_request_free(req);
647                 RETURN(ERR_PTR(rc));
648         }
649
650         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
651         ptlrpc_request_set_replen(req);
652         RETURN(req);
653 }
654
655 static int mdc_finish_enqueue(struct obd_export *exp,
656                               struct ptlrpc_request *req,
657                               struct ldlm_enqueue_info *einfo,
658                               struct lookup_intent *it,
659                               struct lustre_handle *lockh,
660                               int rc)
661 {
662         struct req_capsule  *pill = &req->rq_pill;
663         struct ldlm_request *lockreq;
664         struct ldlm_reply   *lockrep;
665         struct ldlm_lock    *lock;
666         struct mdt_body     *body = NULL;
667         void                *lvb_data = NULL;
668         __u32                lvb_len = 0;
669
670         ENTRY;
671
672         LASSERT(rc >= 0);
673         /* Similarly, if we're going to replay this request, we don't want to
674          * actually get a lock, just perform the intent. */
675         if (req->rq_transno || req->rq_replay) {
676                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
677                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
678         }
679
680         if (rc == ELDLM_LOCK_ABORTED) {
681                 einfo->ei_mode = 0;
682                 memset(lockh, 0, sizeof(*lockh));
683                 rc = 0;
684         } else { /* rc = 0 */
685                 lock = ldlm_handle2lock(lockh);
686                 LASSERT(lock != NULL);
687
688                 /* If the server gave us back a different lock mode, we should
689                  * fix up our variables. */
690                 if (lock->l_req_mode != einfo->ei_mode) {
691                         ldlm_lock_addref(lockh, lock->l_req_mode);
692                         ldlm_lock_decref(lockh, einfo->ei_mode);
693                         einfo->ei_mode = lock->l_req_mode;
694                 }
695                 LDLM_LOCK_PUT(lock);
696         }
697
698         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
699         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
700
701         it->it_disposition = (int)lockrep->lock_policy_res1;
702         it->it_status = (int)lockrep->lock_policy_res2;
703         it->it_lock_mode = einfo->ei_mode;
704         it->it_lock_handle = lockh->cookie;
705         it->it_request = req;
706
707         /* Technically speaking rq_transno must already be zero if
708          * it_status is in error, so the check is a bit redundant */
709         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
710                 mdc_clear_replay_flag(req, it->it_status);
711
712         /* If we're doing an IT_OPEN which did not result in an actual
713          * successful open, then we need to remove the bit which saves
714          * this request for unconditional replay.
715          *
716          * It's important that we do this first!  Otherwise we might exit the
717          * function without doing so, and try to replay a failed create
718          * (bug 3440) */
719         if (it->it_op & IT_OPEN && req->rq_replay &&
720             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
721                 mdc_clear_replay_flag(req, it->it_status);
722
723         DEBUG_REQ(D_RPCTRACE, req, "op: %x disposition: %x, status: %d",
724                   it->it_op, it->it_disposition, it->it_status);
725
726         /* We know what to expect, so we do any byte flipping required here */
727         if (it_has_reply_body(it)) {
728                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
729                 if (body == NULL) {
730                         CERROR ("Can't swab mdt_body\n");
731                         RETURN (-EPROTO);
732                 }
733
734                 if (it_disposition(it, DISP_OPEN_OPEN) &&
735                     !it_open_error(DISP_OPEN_OPEN, it)) {
736                         /*
737                          * If this is a successful OPEN request, we need to set
738                          * replay handler and data early, so that if replay
739                          * happens immediately after swabbing below, new reply
740                          * is swabbed by that handler correctly.
741                          */
742                         mdc_set_open_replay_data(NULL, NULL, it);
743                 }
744
745                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
746                         void *eadata;
747
748                         mdc_update_max_ea_from_body(exp, body);
749
750                         /*
751                          * The eadata is opaque; just check that it is there.
752                          * Eventually, obd_unpackmd() will check the contents.
753                          */
754                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
755                                                         body->mbo_eadatasize);
756                         if (eadata == NULL)
757                                 RETURN(-EPROTO);
758
759                         /* save lvb data and length in case this is for layout
760                          * lock */
761                         lvb_data = eadata;
762                         lvb_len = body->mbo_eadatasize;
763
764                         /*
765                          * We save the reply LOV EA in case we have to replay a
766                          * create for recovery.  If we didn't allocate a large
767                          * enough request buffer above we need to reallocate it
768                          * here to hold the actual LOV EA.
769                          *
770                          * To not save LOV EA if request is not going to replay
771                          * (for example error one).
772                          */
773                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
774                                 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
775                                                     body->mbo_eadatasize);
776                                 if (rc) {
777                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
778                                         body->mbo_eadatasize = 0;
779                                         rc = 0;
780                                 }
781                         }
782                 }
783         } else if (it->it_op & IT_LAYOUT) {
784                 /* maybe the lock was granted right away and layout
785                  * is packed into RMF_DLM_LVB of req */
786                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
787                 CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
788                        class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
789                 if (lvb_len > 0) {
790                         lvb_data = req_capsule_server_sized_get(pill,
791                                                         &RMF_DLM_LVB, lvb_len);
792                         if (lvb_data == NULL)
793                                 RETURN(-EPROTO);
794
795                         /**
796                          * save replied layout data to the request buffer for
797                          * recovery consideration (lest MDS reinitialize
798                          * another set of OST objects).
799                          */
800                         if (req->rq_transno)
801                                 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
802                                                      lvb_len);
803                 }
804         }
805
806         /* fill in stripe data for layout lock.
807          * LU-6581: trust layout data only if layout lock is granted. The MDT
808          * has stopped sending layout unless the layout lock is granted. The
809          * client still does this checking in case it's talking with an old
810          * server. - Jinshan */
811         lock = ldlm_handle2lock(lockh);
812         if (lock == NULL)
813                 RETURN(rc);
814
815         if (ldlm_has_layout(lock) && lvb_data != NULL &&
816             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
817                 void *lmm;
818
819                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
820                         ldlm_it2str(it->it_op), lvb_len);
821
822                 OBD_ALLOC_LARGE(lmm, lvb_len);
823                 if (lmm == NULL)
824                         GOTO(out_lock, rc = -ENOMEM);
825
826                 memcpy(lmm, lvb_data, lvb_len);
827
828                 /* install lvb_data */
829                 lock_res_and_lock(lock);
830                 if (lock->l_lvb_data == NULL) {
831                         lock->l_lvb_type = LVB_T_LAYOUT;
832                         lock->l_lvb_data = lmm;
833                         lock->l_lvb_len = lvb_len;
834                         lmm = NULL;
835                 }
836                 unlock_res_and_lock(lock);
837                 if (lmm != NULL)
838                         OBD_FREE_LARGE(lmm, lvb_len);
839         }
840
841         if (ldlm_has_dom(lock)) {
842                 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
843
844                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
845                 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
846                         LDLM_ERROR(lock, "%s: DoM lock without size.",
847                                    exp->exp_obd->obd_name);
848                         GOTO(out_lock, rc = -EPROTO);
849                 }
850
851                 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
852                            ldlm_it2str(it->it_op), body->mbo_dom_size);
853
854                 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
855         }
856 out_lock:
857         LDLM_LOCK_PUT(lock);
858
859         RETURN(rc);
860 }
861
862 /* We always reserve enough space in the reply packet for a stripe MD, because
863  * we don't know in advance the file type. */
864 static int mdc_enqueue_base(struct obd_export *exp,
865                             struct ldlm_enqueue_info *einfo,
866                             const union ldlm_policy_data *policy,
867                             struct lookup_intent *it,
868                             struct md_op_data *op_data,
869                             struct lustre_handle *lockh,
870                             __u64 extra_lock_flags)
871 {
872         struct obd_device *obddev = class_exp2obd(exp);
873         struct ptlrpc_request *req = NULL;
874         __u64 flags, saved_flags = extra_lock_flags;
875         struct ldlm_res_id res_id;
876         static const union ldlm_policy_data lookup_policy = {
877                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
878         static const union ldlm_policy_data update_policy = {
879                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
880         static const union ldlm_policy_data layout_policy = {
881                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
882         static const union ldlm_policy_data getxattr_policy = {
883                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
884         int generation, resends = 0;
885         struct ldlm_reply *lockrep;
886         struct obd_import *imp = class_exp2cliimp(exp);
887         __u32 acl_bufsize;
888         enum lvb_type lvb_type = 0;
889         int rc;
890         ENTRY;
891
892         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
893                  einfo->ei_type);
894         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
895
896         if (it != NULL) {
897                 LASSERT(policy == NULL);
898
899                 saved_flags |= LDLM_FL_HAS_INTENT;
900                 if (it->it_op & (IT_GETATTR | IT_READDIR))
901                         policy = &update_policy;
902                 else if (it->it_op & IT_LAYOUT)
903                         policy = &layout_policy;
904                 else if (it->it_op & IT_GETXATTR)
905                         policy = &getxattr_policy;
906                 else
907                         policy = &lookup_policy;
908         }
909
910         generation = obddev->u.cli.cl_import->imp_generation;
911         if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
912                 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
913                                   XATTR_SIZE_MAX);
914         else
915                 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
916
917 resend:
918         flags = saved_flags;
919         if (it == NULL) {
920                 /* The only way right now is FLOCK. */
921                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
922                          einfo->ei_type);
923                 res_id.name[3] = LDLM_FLOCK;
924         } else if (it->it_op & IT_OPEN) {
925                 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
926         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
927                 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
928         } else if (it->it_op & IT_READDIR) {
929                 req = mdc_enqueue_pack(exp, 0);
930         } else if (it->it_op & IT_LAYOUT) {
931                 if (!imp_connect_lvb_type(imp))
932                         RETURN(-EOPNOTSUPP);
933                 req = mdc_intent_layout_pack(exp, it, op_data);
934                 lvb_type = LVB_T_LAYOUT;
935         } else if (it->it_op & IT_GETXATTR) {
936                 req = mdc_intent_getxattr_pack(exp, it, op_data);
937         } else {
938                 LBUG();
939                 RETURN(-EINVAL);
940         }
941
942         if (IS_ERR(req))
943                 RETURN(PTR_ERR(req));
944
945         if (resends) {
946                 req->rq_generation_set = 1;
947                 req->rq_import_generation = generation;
948                 req->rq_sent = ktime_get_real_seconds() + resends;
949         }
950
951         /* It is important to obtain modify RPC slot first (if applicable), so
952          * that threads that are waiting for a modify RPC slot are not polluting
953          * our rpcs in flight counter.
954          * We do not do flock request limiting, though */
955         if (it) {
956                 mdc_get_mod_rpc_slot(req, it);
957                 rc = obd_get_request_slot(&obddev->u.cli);
958                 if (rc != 0) {
959                         mdc_put_mod_rpc_slot(req, it);
960                         mdc_clear_replay_flag(req, 0);
961                         ptlrpc_req_finished(req);
962                         RETURN(rc);
963                 }
964         }
965
966         /* With Data-on-MDT the glimpse callback is needed too.
967          * It is set here in advance but not in mdc_finish_enqueue()
968          * to avoid possible races. It is safe to have glimpse handler
969          * for non-DOM locks and costs nothing.*/
970         if (einfo->ei_cb_gl == NULL)
971                 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
972
973         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
974                               0, lvb_type, lockh, 0);
975         if (!it) {
976                 /* For flock requests we immediatelly return without further
977                    delay and let caller deal with the rest, since rest of
978                    this function metadata processing makes no sense for flock
979                    requests anyway. But in case of problem during comms with
980                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
981                    can not rely on caller and this mainly for F_UNLCKs
982                    (explicits or automatically generated by Kernel to clean
983                    current FLocks upon exit) that can't be trashed */
984                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
985                     (einfo->ei_type == LDLM_FLOCK) &&
986                     (einfo->ei_mode == LCK_NL))
987                         goto resend;
988                 RETURN(rc);
989         }
990
991         obd_put_request_slot(&obddev->u.cli);
992         mdc_put_mod_rpc_slot(req, it);
993
994         if (rc < 0) {
995                 CDEBUG(D_INFO,
996                       "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
997                       obddev->obd_name, PFID(&op_data->op_fid1),
998                       PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
999
1000                 mdc_clear_replay_flag(req, rc);
1001                 ptlrpc_req_finished(req);
1002                 RETURN(rc);
1003         }
1004
1005         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1006         LASSERT(lockrep != NULL);
1007
1008         lockrep->lock_policy_res2 =
1009                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1010
1011         /* Retry infinitely when the server returns -EINPROGRESS for the
1012          * intent operation, when server returns -EINPROGRESS for acquiring
1013          * intent lock, we'll retry in after_reply(). */
1014         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
1015                 mdc_clear_replay_flag(req, rc);
1016                 ptlrpc_req_finished(req);
1017                 if (generation == obddev->u.cli.cl_import->imp_generation) {
1018                         if (signal_pending(current))
1019                                 RETURN(-EINTR);
1020
1021                         resends++;
1022                         CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
1023                                obddev->obd_name, resends, it->it_op,
1024                                PFID(&op_data->op_fid1),
1025                                PFID(&op_data->op_fid2));
1026                         goto resend;
1027                 } else {
1028                         CDEBUG(D_HA, "resend cross eviction\n");
1029                         RETURN(-EIO);
1030                 }
1031         }
1032
1033         if ((int)lockrep->lock_policy_res2 == -ERANGE &&
1034             it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
1035             acl_bufsize == LUSTRE_POSIX_ACL_MAX_SIZE_OLD) {
1036                 mdc_clear_replay_flag(req, -ERANGE);
1037                 ptlrpc_req_finished(req);
1038                 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
1039                                   XATTR_SIZE_MAX);
1040                 goto resend;
1041         }
1042
1043         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1044         if (rc < 0) {
1045                 if (lustre_handle_is_used(lockh)) {
1046                         ldlm_lock_decref(lockh, einfo->ei_mode);
1047                         memset(lockh, 0, sizeof(*lockh));
1048                 }
1049                 ptlrpc_req_finished(req);
1050
1051                 it->it_lock_handle = 0;
1052                 it->it_lock_mode = 0;
1053                 it->it_request = NULL;
1054         }
1055
1056         RETURN(rc);
1057 }
1058
1059 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1060                 const union ldlm_policy_data *policy,
1061                 struct md_op_data *op_data,
1062                 struct lustre_handle *lockh, __u64 extra_lock_flags)
1063 {
1064         return mdc_enqueue_base(exp, einfo, policy, NULL,
1065                                 op_data, lockh, extra_lock_flags);
1066 }
1067
1068 static int mdc_finish_intent_lock(struct obd_export *exp,
1069                                   struct ptlrpc_request *request,
1070                                   struct md_op_data *op_data,
1071                                   struct lookup_intent *it,
1072                                   struct lustre_handle *lockh)
1073 {
1074         struct lustre_handle old_lock;
1075         struct ldlm_lock *lock;
1076         int rc = 0;
1077         ENTRY;
1078
1079         LASSERT(request != NULL);
1080         LASSERT(request != LP_POISON);
1081         LASSERT(request->rq_repmsg != LP_POISON);
1082
1083         if (it->it_op & IT_READDIR)
1084                 RETURN(0);
1085
1086         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
1087                 if (it->it_status != 0)
1088                         GOTO(out, rc = it->it_status);
1089         } else {
1090                 if (!it_disposition(it, DISP_IT_EXECD)) {
1091                         /* The server failed before it even started executing
1092                          * the intent, i.e. because it couldn't unpack the
1093                          * request.
1094                          */
1095                         LASSERT(it->it_status != 0);
1096                         GOTO(out, rc = it->it_status);
1097                 }
1098                 rc = it_open_error(DISP_IT_EXECD, it);
1099                 if (rc)
1100                         GOTO(out, rc);
1101
1102                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1103                 if (rc)
1104                         GOTO(out, rc);
1105
1106                 /* keep requests around for the multiple phases of the call
1107                  * this shows the DISP_XX must guarantee we make it into the
1108                  * call
1109                  */
1110                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1111                     it_disposition(it, DISP_OPEN_CREATE) &&
1112                     !it_open_error(DISP_OPEN_CREATE, it)) {
1113                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
1114                         /* balanced in ll_create_node */
1115                         ptlrpc_request_addref(request);
1116                 }
1117                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1118                     it_disposition(it, DISP_OPEN_OPEN) &&
1119                     !it_open_error(DISP_OPEN_OPEN, it)) {
1120                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
1121                         /* balanced in ll_file_open */
1122                         ptlrpc_request_addref(request);
1123                         /* BUG 11546 - eviction in the middle of open rpc
1124                          * processing
1125                          */
1126                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1127                                          obd_timeout);
1128                 }
1129
1130                 if (it->it_op & IT_CREAT) {
1131                         /* XXX this belongs in ll_create_it */
1132                 } else if (it->it_op == IT_OPEN) {
1133                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1134                 } else {
1135                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1136                 }
1137         }
1138
1139         /* If we already have a matching lock, then cancel the new
1140          * one.  We have to set the data here instead of in
1141          * mdc_enqueue, because we need to use the child's inode as
1142          * the l_ast_data to match, and that's not available until
1143          * intent_finish has performed the iget().) */
1144         lock = ldlm_handle2lock(lockh);
1145         if (lock) {
1146                 union ldlm_policy_data policy = lock->l_policy_data;
1147                 LDLM_DEBUG(lock, "matching against this");
1148
1149                 if (it_has_reply_body(it)) {
1150                         struct mdt_body *body;
1151
1152                         body = req_capsule_server_get(&request->rq_pill,
1153                                                       &RMF_MDT_BODY);
1154                         /* mdc_enqueue checked */
1155                         LASSERT(body != NULL);
1156                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1157                                                  &lock->l_resource->lr_name),
1158                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1159                                  PLDLMRES(lock->l_resource),
1160                                  PFID(&body->mbo_fid1));
1161                 }
1162                 LDLM_LOCK_PUT(lock);
1163
1164                 memcpy(&old_lock, lockh, sizeof(*lockh));
1165                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1166                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1167                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1168                         memcpy(lockh, &old_lock, sizeof(old_lock));
1169                         it->it_lock_handle = lockh->cookie;
1170                 }
1171         }
1172
1173         EXIT;
1174 out:
1175         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1176                 (int)op_data->op_namelen, op_data->op_name,
1177                 ldlm_it2str(it->it_op), it->it_status,
1178                 it->it_disposition, rc);
1179         return rc;
1180 }
1181
1182 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1183                         struct lu_fid *fid, __u64 *bits)
1184 {
1185         /* We could just return 1 immediately, but since we should only
1186          * be called in revalidate_it if we already have a lock, let's
1187          * verify that. */
1188         struct ldlm_res_id res_id;
1189         struct lustre_handle lockh;
1190         union ldlm_policy_data policy;
1191         enum ldlm_mode mode;
1192         ENTRY;
1193
1194         if (it->it_lock_handle) {
1195                 lockh.cookie = it->it_lock_handle;
1196                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1197         } else {
1198                 fid_build_reg_res_name(fid, &res_id);
1199                 switch (it->it_op) {
1200                 case IT_GETATTR:
1201                         /* File attributes are held under multiple bits:
1202                          * nlink is under lookup lock, size and times are
1203                          * under UPDATE lock and recently we've also got
1204                          * a separate permissions lock for owner/group/acl that
1205                          * were protected by lookup lock before.
1206                          * Getattr must provide all of that information,
1207                          * so we need to ensure we have all of those locks.
1208                          * Unfortunately, if the bits are split across multiple
1209                          * locks, there's no easy way to match all of them here,
1210                          * so an extra RPC would be performed to fetch all
1211                          * of those bits at once for now. */
1212                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1213                          * but for old MDTs (< 2.4), permission is covered
1214                          * by LOOKUP lock, so it needs to match all bits here.*/
1215                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1216                                                   MDS_INODELOCK_LOOKUP |
1217                                                   MDS_INODELOCK_PERM;
1218                         break;
1219                 case IT_READDIR:
1220                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1221                         break;
1222                 case IT_LAYOUT:
1223                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1224                         break;
1225                 default:
1226                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1227                         break;
1228                 }
1229
1230                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1231                                       LDLM_IBITS, &policy,
1232                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1233                                       &lockh);
1234         }
1235
1236         if (mode) {
1237                 it->it_lock_handle = lockh.cookie;
1238                 it->it_lock_mode = mode;
1239         } else {
1240                 it->it_lock_handle = 0;
1241                 it->it_lock_mode = 0;
1242         }
1243
1244         RETURN(!!mode);
1245 }
1246
1247 /*
1248  * This long block is all about fixing up the lock and request state
1249  * so that it is correct as of the moment _before_ the operation was
1250  * applied; that way, the VFS will think that everything is normal and
1251  * call Lustre's regular VFS methods.
1252  *
1253  * If we're performing a creation, that means that unless the creation
1254  * failed with EEXIST, we should fake up a negative dentry.
1255  *
1256  * For everything else, we want to lookup to succeed.
1257  *
1258  * One additional note: if CREATE or OPEN succeeded, we add an extra
1259  * reference to the request because we need to keep it around until
1260  * ll_create/ll_open gets called.
1261  *
1262  * The server will return to us, in it_disposition, an indication of
1263  * exactly what it_status refers to.
1264  *
1265  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1266  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1267  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1268  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1269  * was successful.
1270  *
1271  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1272  * child lookup.
1273  */
1274 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1275                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1276                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1277 {
1278         struct ldlm_enqueue_info einfo = {
1279                 .ei_type        = LDLM_IBITS,
1280                 .ei_mode        = it_to_lock_mode(it),
1281                 .ei_cb_bl       = cb_blocking,
1282                 .ei_cb_cp       = ldlm_completion_ast,
1283                 .ei_cb_gl       = mdc_ldlm_glimpse_ast,
1284         };
1285         struct lustre_handle lockh;
1286         int rc = 0;
1287         ENTRY;
1288         LASSERT(it);
1289
1290         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1291                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1292                 op_data->op_name, PFID(&op_data->op_fid2),
1293                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1294                 it->it_flags);
1295
1296         lockh.cookie = 0;
1297         if (fid_is_sane(&op_data->op_fid2) &&
1298             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1299                 /* We could just return 1 immediately, but since we should only
1300                  * be called in revalidate_it if we already have a lock, let's
1301                  * verify that. */
1302                 it->it_lock_handle = 0;
1303                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1304                 /* Only return failure if it was not GETATTR by cfid
1305                    (from inode_revalidate) */
1306                 if (rc || op_data->op_namelen != 0)
1307                         RETURN(rc);
1308         }
1309
1310         /* For case if upper layer did not alloc fid, do it now. */
1311         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1312                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1313                 if (rc < 0) {
1314                         CERROR("Can't alloc new fid, rc %d\n", rc);
1315                         RETURN(rc);
1316                 }
1317         }
1318
1319         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1320                               extra_lock_flags);
1321         if (rc < 0)
1322                 RETURN(rc);
1323
1324         *reqp = it->it_request;
1325         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1326         RETURN(rc);
1327 }
1328
1329 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1330                                               struct ptlrpc_request *req,
1331                                               void *args, int rc)
1332 {
1333         struct mdc_getattr_args  *ga = args;
1334         struct obd_export *exp = ga->ga_exp;
1335         struct md_enqueue_info *minfo = ga->ga_minfo;
1336         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1337         struct lookup_intent *it;
1338         struct lustre_handle *lockh;
1339         struct obd_device *obddev;
1340         struct ldlm_reply *lockrep;
1341         __u64 flags = LDLM_FL_HAS_INTENT;
1342         ENTRY;
1343
1344         it    = &minfo->mi_it;
1345         lockh = &minfo->mi_lockh;
1346
1347         obddev = class_exp2obd(exp);
1348
1349         obd_put_request_slot(&obddev->u.cli);
1350         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1351                 rc = -ETIMEDOUT;
1352
1353         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1354                                    &flags, NULL, 0, lockh, rc);
1355         if (rc < 0) {
1356                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1357                 mdc_clear_replay_flag(req, rc);
1358                 GOTO(out, rc);
1359         }
1360
1361         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1362         LASSERT(lockrep != NULL);
1363
1364         lockrep->lock_policy_res2 =
1365                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1366
1367         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1368         if (rc)
1369                 GOTO(out, rc);
1370
1371         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1372         EXIT;
1373
1374 out:
1375         minfo->mi_cb(req, minfo, rc);
1376         return 0;
1377 }
1378
1379 int mdc_intent_getattr_async(struct obd_export *exp,
1380                              struct md_enqueue_info *minfo)
1381 {
1382         struct md_op_data       *op_data = &minfo->mi_data;
1383         struct lookup_intent    *it = &minfo->mi_it;
1384         struct ptlrpc_request   *req;
1385         struct mdc_getattr_args *ga;
1386         struct obd_device       *obddev = class_exp2obd(exp);
1387         struct ldlm_res_id       res_id;
1388         union ldlm_policy_data policy = {
1389                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1390                                                  MDS_INODELOCK_UPDATE } };
1391         int                      rc = 0;
1392         __u64                    flags = LDLM_FL_HAS_INTENT;
1393         ENTRY;
1394
1395         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1396                 (int)op_data->op_namelen, op_data->op_name,
1397                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1398
1399         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1400         /* If the MDT return -ERANGE because of large ACL, then the sponsor
1401          * of the async getattr RPC will handle that by itself. */
1402         req = mdc_intent_getattr_pack(exp, it, op_data,
1403                                       LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1404         if (IS_ERR(req))
1405                 RETURN(PTR_ERR(req));
1406
1407         rc = obd_get_request_slot(&obddev->u.cli);
1408         if (rc != 0) {
1409                 ptlrpc_req_finished(req);
1410                 RETURN(rc);
1411         }
1412
1413         /* With Data-on-MDT the glimpse callback is needed too.
1414          * It is set here in advance but not in mdc_finish_enqueue()
1415          * to avoid possible races. It is safe to have glimpse handler
1416          * for non-DOM locks and costs nothing.*/
1417         if (minfo->mi_einfo.ei_cb_gl == NULL)
1418                 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1419
1420         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1421                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1422         if (rc < 0) {
1423                 obd_put_request_slot(&obddev->u.cli);
1424                 ptlrpc_req_finished(req);
1425                 RETURN(rc);
1426         }
1427
1428         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1429         ga = ptlrpc_req_async_args(req);
1430         ga->ga_exp = exp;
1431         ga->ga_minfo = minfo;
1432
1433         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1434         ptlrpcd_add_req(req);
1435
1436         RETURN(0);
1437 }