Whamcloud - gitweb
LU-10181 mdt: read on open for DoM files
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 #include <linux/module.h>
36
37 #include <obd.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
47
48 #include "mdc_internal.h"
49
50 struct mdc_getattr_args {
51         struct obd_export               *ga_exp;
52         struct md_enqueue_info          *ga_minfo;
53 };
54
55 int it_open_error(int phase, struct lookup_intent *it)
56 {
57         if (it_disposition(it, DISP_OPEN_LEASE)) {
58                 if (phase >= DISP_OPEN_LEASE)
59                         return it->it_status;
60                 else
61                         return 0;
62         }
63         if (it_disposition(it, DISP_OPEN_OPEN)) {
64                 if (phase >= DISP_OPEN_OPEN)
65                         return it->it_status;
66                 else
67                         return 0;
68         }
69
70         if (it_disposition(it, DISP_OPEN_CREATE)) {
71                 if (phase >= DISP_OPEN_CREATE)
72                         return it->it_status;
73                 else
74                         return 0;
75         }
76
77         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78                 if (phase >= DISP_LOOKUP_EXECD)
79                         return it->it_status;
80                 else
81                         return 0;
82         }
83
84         if (it_disposition(it, DISP_IT_EXECD)) {
85                 if (phase >= DISP_IT_EXECD)
86                         return it->it_status;
87                 else
88                         return 0;
89         }
90
91         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
92         LBUG();
93
94         return 0;
95 }
96 EXPORT_SYMBOL(it_open_error);
97
98 /* this must be called on a lockh that is known to have a referenced lock */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100                       void *data, __u64 *bits)
101 {
102         struct ldlm_lock *lock;
103         struct inode *new_inode = data;
104         ENTRY;
105
106         if(bits)
107                 *bits = 0;
108
109         if (!lustre_handle_is_used(lockh))
110                 RETURN(0);
111
112         lock = ldlm_handle2lock(lockh);
113
114         LASSERT(lock != NULL);
115         lock_res_and_lock(lock);
116         if (lock->l_resource->lr_lvb_inode &&
117             lock->l_resource->lr_lvb_inode != data) {
118                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119                 LASSERTF(old_inode->i_state & I_FREEING,
120                          "Found existing inode %p/%lu/%u state %lu in lock: "
121                          "setting data to %p/%lu/%u\n", old_inode,
122                          old_inode->i_ino, old_inode->i_generation,
123                          old_inode->i_state,
124                          new_inode, new_inode->i_ino, new_inode->i_generation);
125         }
126         lock->l_resource->lr_lvb_inode = new_inode;
127         if (bits)
128                 *bits = lock->l_policy_data.l_inodebits.bits;
129
130         unlock_res_and_lock(lock);
131         LDLM_LOCK_PUT(lock);
132
133         RETURN(0);
134 }
135
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137                               const struct lu_fid *fid, enum ldlm_type type,
138                               union ldlm_policy_data *policy,
139                               enum ldlm_mode mode, struct lustre_handle *lockh)
140 {
141         struct ldlm_res_id res_id;
142         enum ldlm_mode rc;
143         ENTRY;
144
145         fid_build_reg_res_name(fid, &res_id);
146         /* LU-4405: Clear bits not supported by server */
147         policy->l_inodebits.bits &= exp_connect_ibits(exp);
148         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149                              &res_id, type, policy, mode, lockh, 0);
150         RETURN(rc);
151 }
152
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154                       union ldlm_policy_data *policy, enum ldlm_mode mode,
155                       enum ldlm_cancel_flags flags, void *opaque)
156 {
157         struct obd_device *obd = class_exp2obd(exp);
158         struct ldlm_res_id res_id;
159         int rc;
160
161         ENTRY;
162
163         fid_build_reg_res_name(fid, &res_id);
164         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165                                              policy, mode, flags, opaque);
166         RETURN(rc);
167 }
168
169 int mdc_null_inode(struct obd_export *exp,
170                    const struct lu_fid *fid)
171 {
172         struct ldlm_res_id res_id;
173         struct ldlm_resource *res;
174         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
175         ENTRY;
176
177         LASSERTF(ns != NULL, "no namespace passed\n");
178
179         fid_build_reg_res_name(fid, &res_id);
180
181         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
182         if (IS_ERR(res))
183                 RETURN(0);
184
185         lock_res(res);
186         res->lr_lvb_inode = NULL;
187         unlock_res(res);
188
189         ldlm_resource_putref(res);
190         RETURN(0);
191 }
192
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
194 {
195         /* Don't hold error requests for replay. */
196         if (req->rq_replay) {
197                 spin_lock(&req->rq_lock);
198                 req->rq_replay = 0;
199                 spin_unlock(&req->rq_lock);
200         }
201         if (rc && req->rq_transno != 0) {
202                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
203                 LBUG();
204         }
205 }
206
207 /* Save a large LOV EA into the request buffer so that it is available
208  * for replay.  We don't do this in the initial request because the
209  * original request doesn't need this buffer (at most it sends just the
210  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211  * buffer and may also be difficult to allocate and save a very large
212  * request buffer for each open. (bug 5707)
213  *
214  * OOM here may cause recovery failure if lmm is needed (only for the
215  * original open if the MDS crashed just when this client also OOM'd)
216  * but this is incredibly unlikely, and questionable whether the client
217  * could do MDS recovery under OOM anyways... */
218 int mdc_save_lovea(struct ptlrpc_request *req,
219                    const struct req_msg_field *field,
220                    void *data, u32 size)
221 {
222         struct req_capsule *pill = &req->rq_pill;
223         void *lmm;
224         int rc = 0;
225
226         if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
227                 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
228                 if (rc) {
229                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230                                req->rq_export->exp_obd->obd_name,
231                                size, rc);
232                         return rc;
233                 }
234         } else {
235                 req_capsule_shrink(pill, field, size, RCL_CLIENT);
236         }
237
238         req_capsule_set_size(pill, field, RCL_CLIENT, size);
239         lmm = req_capsule_client_get(pill, field);
240         if (lmm)
241                 memcpy(lmm, data, size);
242
243         return rc;
244 }
245
246 static struct ptlrpc_request *
247 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
248                      struct md_op_data *op_data, __u32 acl_bufsize)
249 {
250         struct ptlrpc_request   *req;
251         struct obd_device       *obddev = class_exp2obd(exp);
252         struct ldlm_intent      *lit;
253         const void              *lmm = op_data->op_data;
254         __u32                    lmmsize = op_data->op_data_size;
255         struct list_head         cancels = LIST_HEAD_INIT(cancels);
256         int                      count = 0;
257         enum ldlm_mode           mode;
258         int                      rc;
259         int repsize;
260
261         ENTRY;
262
263         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
264
265         /* XXX: openlock is not cancelled for cross-refs. */
266         /* If inode is known, cancel conflicting OPEN locks. */
267         if (fid_is_sane(&op_data->op_fid2)) {
268                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
269                         if (it->it_flags & MDS_FMODE_WRITE)
270                                 mode = LCK_EX;
271                         else
272                                 mode = LCK_PR;
273                 } else {
274                         if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
275                                 mode = LCK_CW;
276 #ifdef FMODE_EXEC
277                         else if (it->it_flags & FMODE_EXEC)
278                                 mode = LCK_PR;
279 #endif
280                         else
281                                 mode = LCK_CR;
282                 }
283                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
284                                                 &cancels, mode,
285                                                 MDS_INODELOCK_OPEN);
286         }
287
288         /* If CREATE, cancel parent's UPDATE lock. */
289         if (it->it_op & IT_CREAT)
290                 mode = LCK_EX;
291         else
292                 mode = LCK_CR;
293         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
294                                          &cancels, mode,
295                                          MDS_INODELOCK_UPDATE);
296
297         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
298                                    &RQF_LDLM_INTENT_OPEN);
299         if (req == NULL) {
300                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
301                 RETURN(ERR_PTR(-ENOMEM));
302         }
303
304         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
305                              op_data->op_namelen + 1);
306         if (cl_is_lov_delay_create(it->it_flags)) {
307                 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
308                 LASSERT(lmmsize == 0);
309                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
310         } else {
311                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
312                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
313         }
314
315         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
316                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
317                              strlen(op_data->op_file_secctx_name) + 1 : 0);
318
319         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
320                              op_data->op_file_secctx_size);
321
322         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
323         if (rc < 0) {
324                 ptlrpc_request_free(req);
325                 RETURN(ERR_PTR(rc));
326         }
327
328         spin_lock(&req->rq_lock);
329         req->rq_replay = req->rq_import->imp_replayable;
330         spin_unlock(&req->rq_lock);
331
332         /* pack the intent */
333         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
334         lit->opc = (__u64)it->it_op;
335
336         /* pack the intended request */
337         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
338                       lmmsize);
339
340         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
341                              obddev->u.cli.cl_max_mds_easize);
342         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
343
344         /**
345          * Inline buffer for possible data from Data-on-MDT files.
346          */
347         req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
348                              sizeof(struct niobuf_remote));
349         ptlrpc_request_set_replen(req);
350
351         /* Get real repbuf allocated size as rounded up power of 2 */
352         repsize = size_roundup_power2(req->rq_replen +
353                                       lustre_msg_early_size());
354
355         /* Estimate free space for DoM files in repbuf */
356         repsize -= req->rq_replen - obddev->u.cli.cl_max_mds_easize +
357                    sizeof(struct lov_comp_md_v1) +
358                    sizeof(struct lov_comp_md_entry_v1) +
359                    lov_mds_md_size(0, LOV_MAGIC_V3);
360
361         if (repsize < obddev->u.cli.cl_dom_min_inline_repsize) {
362                 repsize = obddev->u.cli.cl_dom_min_inline_repsize - repsize;
363                 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
364                                      RCL_SERVER,
365                                      sizeof(struct niobuf_remote) + repsize);
366                 ptlrpc_request_set_replen(req);
367                 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
368                        repsize, req->rq_replen);
369         }
370         return req;
371 }
372
373 #define GA_DEFAULT_EA_NAME_LEN 20
374 #define GA_DEFAULT_EA_VAL_LEN  250
375 #define GA_DEFAULT_EA_NUM      10
376
377 static struct ptlrpc_request *
378 mdc_intent_getxattr_pack(struct obd_export *exp,
379                          struct lookup_intent *it,
380                          struct md_op_data *op_data)
381 {
382         struct ptlrpc_request   *req;
383         struct ldlm_intent      *lit;
384         int                     rc, count = 0;
385         struct list_head        cancels = LIST_HEAD_INIT(cancels);
386         u32 min_buf_size = 0;
387
388         ENTRY;
389
390         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
391                                         &RQF_LDLM_INTENT_GETXATTR);
392         if (req == NULL)
393                 RETURN(ERR_PTR(-ENOMEM));
394
395         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
396         if (rc) {
397                 ptlrpc_request_free(req);
398                 RETURN(ERR_PTR(rc));
399         }
400
401         /* pack the intent */
402         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
403         lit->opc = IT_GETXATTR;
404
405 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
406         /* If the supplied buffer is too small then the server will
407          * return -ERANGE and llite will fallback to using non cached
408          * xattr operations. On servers before 2.10.1 a (non-cached)
409          * listxattr RPC for an orphan or dead file causes an oops. So
410          * let's try to avoid sending too small a buffer to too old a
411          * server. This is effectively undoing the memory conservation
412          * of LU-9417 when it would be *more* likely to crash the
413          * server. See LU-9856. */
414         if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
415                 min_buf_size = exp->exp_connect_data.ocd_max_easize;
416 #endif
417
418         /* pack the intended request */
419         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
420                       max_t(u32, min_buf_size,
421                             GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM),
422                       -1, 0);
423
424         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
425                              max_t(u32, min_buf_size,
426                                  GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM));
427
428         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
429                              max_t(u32, min_buf_size,
430                                  GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM));
431
432         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
433                              max_t(u32, min_buf_size,
434                                  sizeof(__u32) * GA_DEFAULT_EA_NUM));
435
436         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
437
438         ptlrpc_request_set_replen(req);
439
440         RETURN(req);
441 }
442
443 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
444                                                      struct lookup_intent *it,
445                                                      struct md_op_data *op_data)
446 {
447         struct ptlrpc_request *req;
448         struct obd_device     *obddev = class_exp2obd(exp);
449         struct ldlm_intent    *lit;
450         int                    rc;
451         ENTRY;
452
453         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
454                                    &RQF_LDLM_INTENT_UNLINK);
455         if (req == NULL)
456                 RETURN(ERR_PTR(-ENOMEM));
457
458         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
459                              op_data->op_namelen + 1);
460
461         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
462         if (rc) {
463                 ptlrpc_request_free(req);
464                 RETURN(ERR_PTR(rc));
465         }
466
467         /* pack the intent */
468         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
469         lit->opc = (__u64)it->it_op;
470
471         /* pack the intended request */
472         mdc_unlink_pack(req, op_data);
473
474         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
475                              obddev->u.cli.cl_default_mds_easize);
476         ptlrpc_request_set_replen(req);
477         RETURN(req);
478 }
479
480 static struct ptlrpc_request *
481 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
482                         struct md_op_data *op_data, __u32 acl_bufsize)
483 {
484         struct ptlrpc_request   *req;
485         struct obd_device       *obddev = class_exp2obd(exp);
486         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
487                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
488                                          OBD_MD_MEA | OBD_MD_FLACL;
489         struct ldlm_intent      *lit;
490         int                      rc;
491         __u32                    easize;
492         ENTRY;
493
494         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
495                                    &RQF_LDLM_INTENT_GETATTR);
496         if (req == NULL)
497                 RETURN(ERR_PTR(-ENOMEM));
498
499         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
500                              op_data->op_namelen + 1);
501
502         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
503         if (rc) {
504                 ptlrpc_request_free(req);
505                 RETURN(ERR_PTR(rc));
506         }
507
508         /* pack the intent */
509         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
510         lit->opc = (__u64)it->it_op;
511
512         if (obddev->u.cli.cl_default_mds_easize > 0)
513                 easize = obddev->u.cli.cl_default_mds_easize;
514         else
515                 easize = obddev->u.cli.cl_max_mds_easize;
516
517         /* pack the intended request */
518         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
519
520         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
521         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
522         ptlrpc_request_set_replen(req);
523         RETURN(req);
524 }
525
526 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
527                                                      struct lookup_intent *it,
528                                                      struct md_op_data *op_data)
529 {
530         struct obd_device     *obd = class_exp2obd(exp);
531         struct ptlrpc_request *req;
532         struct ldlm_intent    *lit;
533         struct layout_intent  *layout;
534         int rc;
535         ENTRY;
536
537         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
538                                 &RQF_LDLM_INTENT_LAYOUT);
539         if (req == NULL)
540                 RETURN(ERR_PTR(-ENOMEM));
541
542         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
543         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
544         if (rc) {
545                 ptlrpc_request_free(req);
546                 RETURN(ERR_PTR(rc));
547         }
548
549         /* pack the intent */
550         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
551         lit->opc = (__u64)it->it_op;
552
553         /* pack the layout intent request */
554         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
555         LASSERT(op_data->op_data != NULL);
556         LASSERT(op_data->op_data_size == sizeof(*layout));
557         memcpy(layout, op_data->op_data, sizeof(*layout));
558
559         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
560                              obd->u.cli.cl_default_mds_easize);
561         ptlrpc_request_set_replen(req);
562         RETURN(req);
563 }
564
565 static struct ptlrpc_request *
566 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
567 {
568         struct ptlrpc_request *req;
569         int rc;
570         ENTRY;
571
572         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
573         if (req == NULL)
574                 RETURN(ERR_PTR(-ENOMEM));
575
576         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
577         if (rc) {
578                 ptlrpc_request_free(req);
579                 RETURN(ERR_PTR(rc));
580         }
581
582         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
583         ptlrpc_request_set_replen(req);
584         RETURN(req);
585 }
586
587 static int mdc_finish_enqueue(struct obd_export *exp,
588                               struct ptlrpc_request *req,
589                               struct ldlm_enqueue_info *einfo,
590                               struct lookup_intent *it,
591                               struct lustre_handle *lockh,
592                               int rc)
593 {
594         struct req_capsule  *pill = &req->rq_pill;
595         struct ldlm_request *lockreq;
596         struct ldlm_reply   *lockrep;
597         struct ldlm_lock    *lock;
598         struct mdt_body     *body = NULL;
599         void                *lvb_data = NULL;
600         __u32                lvb_len = 0;
601
602         ENTRY;
603
604         LASSERT(rc >= 0);
605         /* Similarly, if we're going to replay this request, we don't want to
606          * actually get a lock, just perform the intent. */
607         if (req->rq_transno || req->rq_replay) {
608                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
609                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
610         }
611
612         if (rc == ELDLM_LOCK_ABORTED) {
613                 einfo->ei_mode = 0;
614                 memset(lockh, 0, sizeof(*lockh));
615                 rc = 0;
616         } else { /* rc = 0 */
617                 lock = ldlm_handle2lock(lockh);
618                 LASSERT(lock != NULL);
619
620                 /* If the server gave us back a different lock mode, we should
621                  * fix up our variables. */
622                 if (lock->l_req_mode != einfo->ei_mode) {
623                         ldlm_lock_addref(lockh, lock->l_req_mode);
624                         ldlm_lock_decref(lockh, einfo->ei_mode);
625                         einfo->ei_mode = lock->l_req_mode;
626                 }
627                 LDLM_LOCK_PUT(lock);
628         }
629
630         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
631         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
632
633         it->it_disposition = (int)lockrep->lock_policy_res1;
634         it->it_status = (int)lockrep->lock_policy_res2;
635         it->it_lock_mode = einfo->ei_mode;
636         it->it_lock_handle = lockh->cookie;
637         it->it_request = req;
638
639         /* Technically speaking rq_transno must already be zero if
640          * it_status is in error, so the check is a bit redundant */
641         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
642                 mdc_clear_replay_flag(req, it->it_status);
643
644         /* If we're doing an IT_OPEN which did not result in an actual
645          * successful open, then we need to remove the bit which saves
646          * this request for unconditional replay.
647          *
648          * It's important that we do this first!  Otherwise we might exit the
649          * function without doing so, and try to replay a failed create
650          * (bug 3440) */
651         if (it->it_op & IT_OPEN && req->rq_replay &&
652             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
653                 mdc_clear_replay_flag(req, it->it_status);
654
655         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
656                   it->it_op, it->it_disposition, it->it_status);
657
658         /* We know what to expect, so we do any byte flipping required here */
659         if (it_has_reply_body(it)) {
660                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
661                 if (body == NULL) {
662                         CERROR ("Can't swab mdt_body\n");
663                         RETURN (-EPROTO);
664                 }
665
666                 if (it_disposition(it, DISP_OPEN_OPEN) &&
667                     !it_open_error(DISP_OPEN_OPEN, it)) {
668                         /*
669                          * If this is a successful OPEN request, we need to set
670                          * replay handler and data early, so that if replay
671                          * happens immediately after swabbing below, new reply
672                          * is swabbed by that handler correctly.
673                          */
674                         mdc_set_open_replay_data(NULL, NULL, it);
675                 }
676
677                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
678                         void *eadata;
679
680                         mdc_update_max_ea_from_body(exp, body);
681
682                         /*
683                          * The eadata is opaque; just check that it is there.
684                          * Eventually, obd_unpackmd() will check the contents.
685                          */
686                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
687                                                         body->mbo_eadatasize);
688                         if (eadata == NULL)
689                                 RETURN(-EPROTO);
690
691                         /* save lvb data and length in case this is for layout
692                          * lock */
693                         lvb_data = eadata;
694                         lvb_len = body->mbo_eadatasize;
695
696                         /*
697                          * We save the reply LOV EA in case we have to replay a
698                          * create for recovery.  If we didn't allocate a large
699                          * enough request buffer above we need to reallocate it
700                          * here to hold the actual LOV EA.
701                          *
702                          * To not save LOV EA if request is not going to replay
703                          * (for example error one).
704                          */
705                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
706                                 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
707                                                     body->mbo_eadatasize);
708                                 if (rc) {
709                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
710                                         body->mbo_eadatasize = 0;
711                                         rc = 0;
712                                 }
713                         }
714                 }
715         } else if (it->it_op & IT_LAYOUT) {
716                 /* maybe the lock was granted right away and layout
717                  * is packed into RMF_DLM_LVB of req */
718                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
719                 if (lvb_len > 0) {
720                         lvb_data = req_capsule_server_sized_get(pill,
721                                                         &RMF_DLM_LVB, lvb_len);
722                         if (lvb_data == NULL)
723                                 RETURN(-EPROTO);
724
725                         /**
726                          * save replied layout data to the request buffer for
727                          * recovery consideration (lest MDS reinitialize
728                          * another set of OST objects).
729                          */
730                         if (req->rq_transno)
731                                 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
732                                                      lvb_len);
733                 }
734         }
735
736         /* fill in stripe data for layout lock.
737          * LU-6581: trust layout data only if layout lock is granted. The MDT
738          * has stopped sending layout unless the layout lock is granted. The
739          * client still does this checking in case it's talking with an old
740          * server. - Jinshan */
741         lock = ldlm_handle2lock(lockh);
742         if (lock == NULL)
743                 RETURN(rc);
744
745         if (ldlm_has_layout(lock) && lvb_data != NULL &&
746             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
747                 void *lmm;
748
749                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
750                         ldlm_it2str(it->it_op), lvb_len);
751
752                 OBD_ALLOC_LARGE(lmm, lvb_len);
753                 if (lmm == NULL)
754                         GOTO(out_lock, rc = -ENOMEM);
755
756                 memcpy(lmm, lvb_data, lvb_len);
757
758                 /* install lvb_data */
759                 lock_res_and_lock(lock);
760                 if (lock->l_lvb_data == NULL) {
761                         lock->l_lvb_type = LVB_T_LAYOUT;
762                         lock->l_lvb_data = lmm;
763                         lock->l_lvb_len = lvb_len;
764                         lmm = NULL;
765                 }
766                 unlock_res_and_lock(lock);
767                 if (lmm != NULL)
768                         OBD_FREE_LARGE(lmm, lvb_len);
769         }
770
771         if (ldlm_has_dom(lock)) {
772                 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
773
774                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
775                 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
776                         LDLM_ERROR(lock, "%s: DoM lock without size.\n",
777                                    exp->exp_obd->obd_name);
778                         GOTO(out_lock, rc = -EPROTO);
779                 }
780
781                 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
782                            ldlm_it2str(it->it_op), body->mbo_dom_size);
783
784                 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
785         }
786 out_lock:
787         LDLM_LOCK_PUT(lock);
788
789         RETURN(rc);
790 }
791
792 /* We always reserve enough space in the reply packet for a stripe MD, because
793  * we don't know in advance the file type. */
794 static int mdc_enqueue_base(struct obd_export *exp,
795                             struct ldlm_enqueue_info *einfo,
796                             const union ldlm_policy_data *policy,
797                             struct lookup_intent *it,
798                             struct md_op_data *op_data,
799                             struct lustre_handle *lockh,
800                             __u64 extra_lock_flags)
801 {
802         struct obd_device *obddev = class_exp2obd(exp);
803         struct ptlrpc_request *req = NULL;
804         __u64 flags, saved_flags = extra_lock_flags;
805         struct ldlm_res_id res_id;
806         static const union ldlm_policy_data lookup_policy = {
807                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
808         static const union ldlm_policy_data update_policy = {
809                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
810         static const union ldlm_policy_data layout_policy = {
811                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
812         static const union ldlm_policy_data getxattr_policy = {
813                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
814         int generation, resends = 0;
815         struct ldlm_reply *lockrep;
816         struct obd_import *imp = class_exp2cliimp(exp);
817         __u32 acl_bufsize;
818         enum lvb_type lvb_type = 0;
819         int rc;
820         ENTRY;
821
822         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
823                  einfo->ei_type);
824         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
825
826         if (it != NULL) {
827                 LASSERT(policy == NULL);
828
829                 saved_flags |= LDLM_FL_HAS_INTENT;
830                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
831                         policy = &update_policy;
832                 else if (it->it_op & IT_LAYOUT)
833                         policy = &layout_policy;
834                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
835                         policy = &getxattr_policy;
836                 else
837                         policy = &lookup_policy;
838         }
839
840         generation = obddev->u.cli.cl_import->imp_generation;
841         if (!it || (it->it_op & (IT_CREAT | IT_OPEN_CREAT)))
842                 acl_bufsize = imp->imp_connect_data.ocd_max_easize;
843         else
844                 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
845
846 resend:
847         flags = saved_flags;
848         if (it == NULL) {
849                 /* The only way right now is FLOCK. */
850                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
851                          einfo->ei_type);
852                 res_id.name[3] = LDLM_FLOCK;
853         } else if (it->it_op & IT_OPEN) {
854                 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
855         } else if (it->it_op & IT_UNLINK) {
856                 req = mdc_intent_unlink_pack(exp, it, op_data);
857         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
858                 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
859         } else if (it->it_op & IT_READDIR) {
860                 req = mdc_enqueue_pack(exp, 0);
861         } else if (it->it_op & IT_LAYOUT) {
862                 if (!imp_connect_lvb_type(imp))
863                         RETURN(-EOPNOTSUPP);
864                 req = mdc_intent_layout_pack(exp, it, op_data);
865                 lvb_type = LVB_T_LAYOUT;
866         } else if (it->it_op & IT_GETXATTR) {
867                 req = mdc_intent_getxattr_pack(exp, it, op_data);
868         } else {
869                 LBUG();
870                 RETURN(-EINVAL);
871         }
872
873         if (IS_ERR(req))
874                 RETURN(PTR_ERR(req));
875
876         if (resends) {
877                 req->rq_generation_set = 1;
878                 req->rq_import_generation = generation;
879                 req->rq_sent = ktime_get_real_seconds() + resends;
880         }
881
882         /* It is important to obtain modify RPC slot first (if applicable), so
883          * that threads that are waiting for a modify RPC slot are not polluting
884          * our rpcs in flight counter.
885          * We do not do flock request limiting, though */
886         if (it) {
887                 mdc_get_mod_rpc_slot(req, it);
888                 rc = obd_get_request_slot(&obddev->u.cli);
889                 if (rc != 0) {
890                         mdc_put_mod_rpc_slot(req, it);
891                         mdc_clear_replay_flag(req, 0);
892                         ptlrpc_req_finished(req);
893                         RETURN(rc);
894                 }
895         }
896
897         /* With Data-on-MDT the glimpse callback is needed too.
898          * It is set here in advance but not in mdc_finish_enqueue()
899          * to avoid possible races. It is safe to have glimpse handler
900          * for non-DOM locks and costs nothing.*/
901         if (einfo->ei_cb_gl == NULL)
902                 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
903
904         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
905                               0, lvb_type, lockh, 0);
906         if (!it) {
907                 /* For flock requests we immediatelly return without further
908                    delay and let caller deal with the rest, since rest of
909                    this function metadata processing makes no sense for flock
910                    requests anyway. But in case of problem during comms with
911                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
912                    can not rely on caller and this mainly for F_UNLCKs
913                    (explicits or automatically generated by Kernel to clean
914                    current FLocks upon exit) that can't be trashed */
915                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
916                     (einfo->ei_type == LDLM_FLOCK) &&
917                     (einfo->ei_mode == LCK_NL))
918                         goto resend;
919                 RETURN(rc);
920         }
921
922         obd_put_request_slot(&obddev->u.cli);
923         mdc_put_mod_rpc_slot(req, it);
924
925         if (rc < 0) {
926                 CDEBUG(D_INFO,
927                       "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
928                       obddev->obd_name, PFID(&op_data->op_fid1),
929                       PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
930
931                 mdc_clear_replay_flag(req, rc);
932                 ptlrpc_req_finished(req);
933                 RETURN(rc);
934         }
935
936         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
937         LASSERT(lockrep != NULL);
938
939         lockrep->lock_policy_res2 =
940                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
941
942         /* Retry infinitely when the server returns -EINPROGRESS for the
943          * intent operation, when server returns -EINPROGRESS for acquiring
944          * intent lock, we'll retry in after_reply(). */
945         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
946                 mdc_clear_replay_flag(req, rc);
947                 ptlrpc_req_finished(req);
948                 if (generation == obddev->u.cli.cl_import->imp_generation) {
949                         if (signal_pending(current))
950                                 RETURN(-EINTR);
951
952                         resends++;
953                         CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
954                                obddev->obd_name, resends, it->it_op,
955                                PFID(&op_data->op_fid1),
956                                PFID(&op_data->op_fid2));
957                         goto resend;
958                 } else {
959                         CDEBUG(D_HA, "resend cross eviction\n");
960                         RETURN(-EIO);
961                 }
962         }
963
964         if ((int)lockrep->lock_policy_res2 == -ERANGE &&
965             it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
966             acl_bufsize != imp->imp_connect_data.ocd_max_easize) {
967                 mdc_clear_replay_flag(req, -ERANGE);
968                 ptlrpc_req_finished(req);
969                 acl_bufsize = imp->imp_connect_data.ocd_max_easize;
970                 goto resend;
971         }
972
973         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
974         if (rc < 0) {
975                 if (lustre_handle_is_used(lockh)) {
976                         ldlm_lock_decref(lockh, einfo->ei_mode);
977                         memset(lockh, 0, sizeof(*lockh));
978                 }
979                 ptlrpc_req_finished(req);
980
981                 it->it_lock_handle = 0;
982                 it->it_lock_mode = 0;
983                 it->it_request = NULL;
984         }
985
986         RETURN(rc);
987 }
988
989 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
990                 const union ldlm_policy_data *policy,
991                 struct md_op_data *op_data,
992                 struct lustre_handle *lockh, __u64 extra_lock_flags)
993 {
994         return mdc_enqueue_base(exp, einfo, policy, NULL,
995                                 op_data, lockh, extra_lock_flags);
996 }
997
998 static int mdc_finish_intent_lock(struct obd_export *exp,
999                                   struct ptlrpc_request *request,
1000                                   struct md_op_data *op_data,
1001                                   struct lookup_intent *it,
1002                                   struct lustre_handle *lockh)
1003 {
1004         struct lustre_handle old_lock;
1005         struct ldlm_lock *lock;
1006         int rc = 0;
1007         ENTRY;
1008
1009         LASSERT(request != NULL);
1010         LASSERT(request != LP_POISON);
1011         LASSERT(request->rq_repmsg != LP_POISON);
1012
1013         if (it->it_op & IT_READDIR)
1014                 RETURN(0);
1015
1016         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
1017                 if (it->it_status != 0)
1018                         GOTO(out, rc = it->it_status);
1019         } else {
1020                 if (!it_disposition(it, DISP_IT_EXECD)) {
1021                         /* The server failed before it even started executing
1022                          * the intent, i.e. because it couldn't unpack the
1023                          * request.
1024                          */
1025                         LASSERT(it->it_status != 0);
1026                         GOTO(out, rc = it->it_status);
1027                 }
1028                 rc = it_open_error(DISP_IT_EXECD, it);
1029                 if (rc)
1030                         GOTO(out, rc);
1031
1032                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1033                 if (rc)
1034                         GOTO(out, rc);
1035
1036                 /* keep requests around for the multiple phases of the call
1037                  * this shows the DISP_XX must guarantee we make it into the
1038                  * call
1039                  */
1040                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1041                     it_disposition(it, DISP_OPEN_CREATE) &&
1042                     !it_open_error(DISP_OPEN_CREATE, it)) {
1043                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
1044                         /* balanced in ll_create_node */
1045                         ptlrpc_request_addref(request);
1046                 }
1047                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1048                     it_disposition(it, DISP_OPEN_OPEN) &&
1049                     !it_open_error(DISP_OPEN_OPEN, it)) {
1050                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
1051                         /* balanced in ll_file_open */
1052                         ptlrpc_request_addref(request);
1053                         /* BUG 11546 - eviction in the middle of open rpc
1054                          * processing
1055                          */
1056                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1057                                          obd_timeout);
1058                 }
1059
1060                 if (it->it_op & IT_CREAT) {
1061                         /* XXX this belongs in ll_create_it */
1062                 } else if (it->it_op == IT_OPEN) {
1063                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1064                 } else {
1065                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1066                 }
1067         }
1068
1069         /* If we already have a matching lock, then cancel the new
1070          * one.  We have to set the data here instead of in
1071          * mdc_enqueue, because we need to use the child's inode as
1072          * the l_ast_data to match, and that's not available until
1073          * intent_finish has performed the iget().) */
1074         lock = ldlm_handle2lock(lockh);
1075         if (lock) {
1076                 union ldlm_policy_data policy = lock->l_policy_data;
1077                 LDLM_DEBUG(lock, "matching against this");
1078
1079                 if (it_has_reply_body(it)) {
1080                         struct mdt_body *body;
1081
1082                         body = req_capsule_server_get(&request->rq_pill,
1083                                                       &RMF_MDT_BODY);
1084                         /* mdc_enqueue checked */
1085                         LASSERT(body != NULL);
1086                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1087                                                  &lock->l_resource->lr_name),
1088                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1089                                  PLDLMRES(lock->l_resource),
1090                                  PFID(&body->mbo_fid1));
1091                 }
1092                 LDLM_LOCK_PUT(lock);
1093
1094                 memcpy(&old_lock, lockh, sizeof(*lockh));
1095                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1096                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1097                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1098                         memcpy(lockh, &old_lock, sizeof(old_lock));
1099                         it->it_lock_handle = lockh->cookie;
1100                 }
1101         }
1102
1103         EXIT;
1104 out:
1105         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1106                 (int)op_data->op_namelen, op_data->op_name,
1107                 ldlm_it2str(it->it_op), it->it_status,
1108                 it->it_disposition, rc);
1109         return rc;
1110 }
1111
1112 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1113                         struct lu_fid *fid, __u64 *bits)
1114 {
1115         /* We could just return 1 immediately, but since we should only
1116          * be called in revalidate_it if we already have a lock, let's
1117          * verify that. */
1118         struct ldlm_res_id res_id;
1119         struct lustre_handle lockh;
1120         union ldlm_policy_data policy;
1121         enum ldlm_mode mode;
1122         ENTRY;
1123
1124         if (it->it_lock_handle) {
1125                 lockh.cookie = it->it_lock_handle;
1126                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1127         } else {
1128                 fid_build_reg_res_name(fid, &res_id);
1129                 switch (it->it_op) {
1130                 case IT_GETATTR:
1131                         /* File attributes are held under multiple bits:
1132                          * nlink is under lookup lock, size and times are
1133                          * under UPDATE lock and recently we've also got
1134                          * a separate permissions lock for owner/group/acl that
1135                          * were protected by lookup lock before.
1136                          * Getattr must provide all of that information,
1137                          * so we need to ensure we have all of those locks.
1138                          * Unfortunately, if the bits are split across multiple
1139                          * locks, there's no easy way to match all of them here,
1140                          * so an extra RPC would be performed to fetch all
1141                          * of those bits at once for now. */
1142                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1143                          * but for old MDTs (< 2.4), permission is covered
1144                          * by LOOKUP lock, so it needs to match all bits here.*/
1145                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1146                                                   MDS_INODELOCK_LOOKUP |
1147                                                   MDS_INODELOCK_PERM;
1148                         break;
1149                 case IT_READDIR:
1150                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1151                         break;
1152                 case IT_LAYOUT:
1153                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1154                         break;
1155                 default:
1156                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1157                         break;
1158                 }
1159
1160                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1161                                       LDLM_IBITS, &policy,
1162                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1163                                       &lockh);
1164         }
1165
1166         if (mode) {
1167                 it->it_lock_handle = lockh.cookie;
1168                 it->it_lock_mode = mode;
1169         } else {
1170                 it->it_lock_handle = 0;
1171                 it->it_lock_mode = 0;
1172         }
1173
1174         RETURN(!!mode);
1175 }
1176
1177 /*
1178  * This long block is all about fixing up the lock and request state
1179  * so that it is correct as of the moment _before_ the operation was
1180  * applied; that way, the VFS will think that everything is normal and
1181  * call Lustre's regular VFS methods.
1182  *
1183  * If we're performing a creation, that means that unless the creation
1184  * failed with EEXIST, we should fake up a negative dentry.
1185  *
1186  * For everything else, we want to lookup to succeed.
1187  *
1188  * One additional note: if CREATE or OPEN succeeded, we add an extra
1189  * reference to the request because we need to keep it around until
1190  * ll_create/ll_open gets called.
1191  *
1192  * The server will return to us, in it_disposition, an indication of
1193  * exactly what it_status refers to.
1194  *
1195  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1196  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1197  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1198  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1199  * was successful.
1200  *
1201  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1202  * child lookup.
1203  */
1204 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1205                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1206                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1207 {
1208         struct ldlm_enqueue_info einfo = {
1209                 .ei_type        = LDLM_IBITS,
1210                 .ei_mode        = it_to_lock_mode(it),
1211                 .ei_cb_bl       = cb_blocking,
1212                 .ei_cb_cp       = ldlm_completion_ast,
1213                 .ei_cb_gl       = mdc_ldlm_glimpse_ast,
1214         };
1215         struct lustre_handle lockh;
1216         int rc = 0;
1217         ENTRY;
1218         LASSERT(it);
1219
1220         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1221                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1222                 op_data->op_name, PFID(&op_data->op_fid2),
1223                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1224                 it->it_flags);
1225
1226         lockh.cookie = 0;
1227         if (fid_is_sane(&op_data->op_fid2) &&
1228             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1229                 /* We could just return 1 immediately, but since we should only
1230                  * be called in revalidate_it if we already have a lock, let's
1231                  * verify that. */
1232                 it->it_lock_handle = 0;
1233                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1234                 /* Only return failure if it was not GETATTR by cfid
1235                    (from inode_revalidate) */
1236                 if (rc || op_data->op_namelen != 0)
1237                         RETURN(rc);
1238         }
1239
1240         /* For case if upper layer did not alloc fid, do it now. */
1241         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1242                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1243                 if (rc < 0) {
1244                         CERROR("Can't alloc new fid, rc %d\n", rc);
1245                         RETURN(rc);
1246                 }
1247         }
1248
1249         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1250                               extra_lock_flags);
1251         if (rc < 0)
1252                 RETURN(rc);
1253
1254         *reqp = it->it_request;
1255         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1256         RETURN(rc);
1257 }
1258
1259 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1260                                               struct ptlrpc_request *req,
1261                                               void *args, int rc)
1262 {
1263         struct mdc_getattr_args  *ga = args;
1264         struct obd_export        *exp = ga->ga_exp;
1265         struct md_enqueue_info   *minfo = ga->ga_minfo;
1266         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1267         struct lookup_intent     *it;
1268         struct lustre_handle     *lockh;
1269         struct obd_device        *obddev;
1270         struct ldlm_reply        *lockrep;
1271         __u64                     flags = LDLM_FL_HAS_INTENT;
1272         ENTRY;
1273
1274         it    = &minfo->mi_it;
1275         lockh = &minfo->mi_lockh;
1276
1277         obddev = class_exp2obd(exp);
1278
1279         obd_put_request_slot(&obddev->u.cli);
1280         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1281                 rc = -ETIMEDOUT;
1282
1283         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1284                                    &flags, NULL, 0, lockh, rc);
1285         if (rc < 0) {
1286                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1287                 mdc_clear_replay_flag(req, rc);
1288                 GOTO(out, rc);
1289         }
1290
1291         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1292         LASSERT(lockrep != NULL);
1293
1294         lockrep->lock_policy_res2 =
1295                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1296
1297         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1298         if (rc)
1299                 GOTO(out, rc);
1300
1301         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1302         EXIT;
1303
1304 out:
1305         minfo->mi_cb(req, minfo, rc);
1306         return 0;
1307 }
1308
1309 int mdc_intent_getattr_async(struct obd_export *exp,
1310                              struct md_enqueue_info *minfo)
1311 {
1312         struct md_op_data       *op_data = &minfo->mi_data;
1313         struct lookup_intent    *it = &minfo->mi_it;
1314         struct ptlrpc_request   *req;
1315         struct mdc_getattr_args *ga;
1316         struct obd_device       *obddev = class_exp2obd(exp);
1317         struct ldlm_res_id       res_id;
1318         union ldlm_policy_data policy = {
1319                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1320                                                  MDS_INODELOCK_UPDATE } };
1321         int                      rc = 0;
1322         __u64                    flags = LDLM_FL_HAS_INTENT;
1323         ENTRY;
1324
1325         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1326                 (int)op_data->op_namelen, op_data->op_name,
1327                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1328
1329         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1330         /* If the MDT return -ERANGE because of large ACL, then the sponsor
1331          * of the async getattr RPC will handle that by itself. */
1332         req = mdc_intent_getattr_pack(exp, it, op_data,
1333                                       LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1334         if (IS_ERR(req))
1335                 RETURN(PTR_ERR(req));
1336
1337         rc = obd_get_request_slot(&obddev->u.cli);
1338         if (rc != 0) {
1339                 ptlrpc_req_finished(req);
1340                 RETURN(rc);
1341         }
1342
1343         /* With Data-on-MDT the glimpse callback is needed too.
1344          * It is set here in advance but not in mdc_finish_enqueue()
1345          * to avoid possible races. It is safe to have glimpse handler
1346          * for non-DOM locks and costs nothing.*/
1347         if (minfo->mi_einfo.ei_cb_gl == NULL)
1348                 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1349
1350         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1351                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1352         if (rc < 0) {
1353                 obd_put_request_slot(&obddev->u.cli);
1354                 ptlrpc_req_finished(req);
1355                 RETURN(rc);
1356         }
1357
1358         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1359         ga = ptlrpc_req_async_args(req);
1360         ga->ga_exp = exp;
1361         ga->ga_minfo = minfo;
1362
1363         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1364         ptlrpcd_add_req(req);
1365
1366         RETURN(0);
1367 }