Whamcloud - gitweb
LU-13577 wbc: reimplement mkdir() by using intent lock
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  */
31
32 #define DEBUG_SUBSYSTEM S_MDC
33
34 #include <linux/module.h>
35
36 #include <obd.h>
37 #include <obd_class.h>
38 #include <lustre_dlm.h>
39 #include <lustre_fid.h>
40 #include <lustre_intent.h>
41 #include <lustre_mdc.h>
42 #include <lustre_net.h>
43 #include <lustre_req_layout.h>
44 #include <lustre_swab.h>
45 #include <lustre_acl.h>
46
47 #include "mdc_internal.h"
48
/*
 * Pairs an export with a metadata operation item.
 * NOTE(review): not used in the visible part of this file; presumably the
 * argument bundle handed to an asynchronous getattr callback - confirm.
 */
struct mdc_getattr_args {
        struct obd_export       *ga_exp;        /* export of the request */
        struct md_op_item       *ga_item;       /* operation item */
};
53
54 int it_open_error(int phase, struct lookup_intent *it)
55 {
56         if (it_disposition(it, DISP_OPEN_LEASE)) {
57                 if (phase >= DISP_OPEN_LEASE)
58                         return it->it_status;
59                 else
60                         return 0;
61         }
62         if (it_disposition(it, DISP_OPEN_OPEN)) {
63                 if (phase >= DISP_OPEN_OPEN)
64                         return it->it_status;
65                 else
66                         return 0;
67         }
68
69         if (it_disposition(it, DISP_OPEN_CREATE)) {
70                 if (phase >= DISP_OPEN_CREATE)
71                         return it->it_status;
72                 else
73                         return 0;
74         }
75
76         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
77                 if (phase >= DISP_LOOKUP_EXECD)
78                         return it->it_status;
79                 else
80                         return 0;
81         }
82
83         if (it_disposition(it, DISP_IT_EXECD)) {
84                 if (phase >= DISP_IT_EXECD)
85                         return it->it_status;
86                 else
87                         return 0;
88         }
89
90         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
91         LBUG();
92
93         return 0;
94 }
95 EXPORT_SYMBOL(it_open_error);
96
97 /* this must be called on a lockh that is known to have a referenced lock */
98 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
99                       void *data, __u64 *bits)
100 {
101         struct ldlm_lock *lock;
102         struct inode *new_inode = data;
103
104         ENTRY;
105         if (bits)
106                 *bits = 0;
107
108         if (!lustre_handle_is_used(lockh))
109                 RETURN(0);
110
111         lock = ldlm_handle2lock(lockh);
112
113         LASSERT(lock != NULL);
114         lock_res_and_lock(lock);
115         if (lock->l_resource->lr_lvb_inode &&
116             lock->l_resource->lr_lvb_inode != data) {
117                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
118
119                 LASSERTF(old_inode->i_state & I_FREEING,
120                          "Found existing inode %px/%lu/%u state %lu in lock: setting data to %px/%lu/%u\n",
121                          old_inode, old_inode->i_ino, old_inode->i_generation,
122                          old_inode->i_state,
123                          new_inode, new_inode->i_ino, new_inode->i_generation);
124         }
125         lock->l_resource->lr_lvb_inode = new_inode;
126         if (bits)
127                 *bits = lock->l_policy_data.l_inodebits.bits;
128
129         unlock_res_and_lock(lock);
130         LDLM_LOCK_PUT(lock);
131
132         RETURN(0);
133 }
134
135 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
136                               const struct lu_fid *fid, enum ldlm_type type,
137                               union ldlm_policy_data *policy,
138                               enum ldlm_mode mode, struct lustre_handle *lockh)
139 {
140         struct ldlm_res_id res_id;
141         enum ldlm_mode rc;
142
143         ENTRY;
144         fid_build_reg_res_name(fid, &res_id);
145         /* LU-4405: Clear bits not supported by server */
146         policy->l_inodebits.bits &= exp_connect_ibits(exp);
147         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
148                              &res_id, type, policy, mode, lockh);
149         RETURN(rc);
150 }
151
152 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
153                       union ldlm_policy_data *policy, enum ldlm_mode mode,
154                       enum ldlm_cancel_flags flags, void *opaque)
155 {
156         struct obd_device *obd = class_exp2obd(exp);
157         struct ldlm_res_id res_id;
158         int rc;
159
160         ENTRY;
161         fid_build_reg_res_name(fid, &res_id);
162         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
163                                              policy, mode, flags, opaque);
164         RETURN(rc);
165 }
166
167 int mdc_null_inode(struct obd_export *exp,
168                    const struct lu_fid *fid)
169 {
170         struct ldlm_res_id res_id;
171         struct ldlm_resource *res;
172         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
173
174         ENTRY;
175         LASSERTF(ns != NULL, "no namespace passed\n");
176
177         fid_build_reg_res_name(fid, &res_id);
178
179         res = ldlm_resource_get(ns, &res_id, 0, 0);
180         if (IS_ERR(res))
181                 RETURN(0);
182
183         lock_res(res);
184         res->lr_lvb_inode = NULL;
185         unlock_res(res);
186
187         ldlm_resource_putref(res);
188         RETURN(0);
189 }
190
191 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
192 {
193         /* Don't hold error requests for replay. */
194         if (req->rq_replay) {
195                 spin_lock(&req->rq_lock);
196                 req->rq_replay = 0;
197                 spin_unlock(&req->rq_lock);
198         }
199         if (rc && req->rq_transno != 0) {
200                 DEBUG_REQ(D_ERROR, req, "transno returned on error: rc = %d",
201                           rc);
202                 LBUG();
203         }
204 }
205
206 /**
207  * Save a large LOV/LMV EA into the request buffer so that it is available
208  * for replay.  We don't do this in the initial request because the
209  * original request doesn't need this buffer (at most it sends just the
210  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211  * buffer and may also be difficult to allocate and save a very large
212  * request buffer for each open. (b=5707)
213  *
214  * OOM here may cause recovery failure if lmm is needed (only for the
215  * original open if the MDS crashed just when this client also OOM'd)
216  * but this is incredibly unlikely, and questionable whether the client
217  * could do MDS recovery under OOM anyways...
218  */
219 int mdc_save_lmm(struct ptlrpc_request *req, void *data, u32 size)
220 {
221         struct req_capsule *pill = &req->rq_pill;
222         void *lmm;
223         int rc = 0;
224
225         if (req_capsule_get_size(pill, &RMF_EADATA, RCL_CLIENT) < size) {
226                 rc = sptlrpc_cli_enlarge_reqbuf(req, &RMF_EADATA, size);
227                 if (rc) {
228                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
229                                req->rq_export->exp_obd->obd_name,
230                                size, rc);
231                         return rc;
232                 }
233         } else {
234                 req_capsule_shrink(pill, &RMF_EADATA, size, RCL_CLIENT);
235         }
236
237         req_capsule_set_size(pill, &RMF_EADATA, RCL_CLIENT, size);
238         lmm = req_capsule_client_get(pill, &RMF_EADATA);
239         if (lmm) {
240                 memcpy(lmm, data, size);
241                 lov_fix_ea_for_replay(lmm);
242         }
243
244         return rc;
245 }
246
247 static struct ptlrpc_request *
248 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
249                      struct md_op_data *op_data, __u32 acl_bufsize)
250 {
251         struct ptlrpc_request *req;
252         struct obd_device *obd = class_exp2obd(exp);
253         struct ldlm_intent *lit;
254         const void *lmm = op_data->op_data;
255         __u32 lmmsize = op_data->op_data_size;
256         __u32  mdt_md_capsule_size;
257         LIST_HEAD(cancels);
258         int count = 0;
259         enum ldlm_mode mode;
260         int repsize, repsize_estimate;
261         struct sptlrpc_sepol *sepol;
262         int rc;
263
264         ENTRY;
265
266         mdt_md_capsule_size = obd->u.cli.cl_default_mds_easize;
267
268         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
269
270         /* XXX: openlock is not cancelled for cross-refs. */
271         /* If inode is known, cancel conflicting OPEN locks. */
272         if (fid_is_sane(&op_data->op_fid2)) {
273                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
274                         if (it->it_flags & MDS_FMODE_WRITE)
275                                 mode = LCK_EX;
276                         else
277                                 mode = LCK_PR;
278                 } else {
279                         if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
280                                 mode = LCK_CW;
281 #ifdef FMODE_EXEC
282                         else if (it->it_flags & FMODE_EXEC)
283                                 mode = LCK_PR;
284 #endif
285                         else
286                                 mode = LCK_CR;
287                 }
288                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
289                                                 &cancels, mode,
290                                                 MDS_INODELOCK_OPEN);
291         }
292
293         /* If CREATE, cancel parent's UPDATE lock. */
294         if (it->it_op & IT_CREAT)
295                 mode = LCK_EX;
296         else
297                 mode = LCK_CR;
298         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
299                                          &cancels, mode,
300                                          MDS_INODELOCK_UPDATE);
301
302         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
303                                    &RQF_LDLM_INTENT_OPEN);
304         if (req == NULL) {
305                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
306                 RETURN(ERR_PTR(-ENOMEM));
307         }
308
309         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
310                              op_data->op_namelen + 1);
311         if (cl_is_lov_delay_create(it->it_flags)) {
312                 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
313                 LASSERT(lmmsize == 0);
314                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
315         } else {
316                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
317                              max(lmmsize, obd->u.cli.cl_default_mds_easize));
318         }
319
320         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
321                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
322                              op_data->op_file_secctx_name_size : 0);
323
324         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
325                              op_data->op_file_secctx_size);
326
327         req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX, RCL_CLIENT,
328                              op_data->op_file_encctx_size);
329
330         /* get SELinux policy info if any */
331         sepol = sptlrpc_sepol_get(req);
332         if (IS_ERR(sepol))
333                 GOTO(err_free_rq, rc = PTR_ERR(sepol));
334
335         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
336                              sptlrpc_sepol_size(sepol));
337
338         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
339         if (rc < 0)
340                 GOTO(err_put_sepol, rc);
341
342         spin_lock(&req->rq_lock);
343         req->rq_replay = req->rq_import->imp_replayable;
344         spin_unlock(&req->rq_lock);
345
346         /* pack the intent */
347         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
348         lit->opc = (__u64)it->it_op;
349
350         /* pack the intended request */
351         mdc_open_pack(&req->rq_pill, op_data, it->it_create_mode, 0,
352                       it->it_flags, lmm, lmmsize, sepol);
353
354         sptlrpc_sepol_put(sepol);
355
356         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
357                              mdt_md_capsule_size);
358         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
359
360         if (!(it->it_op & IT_CREAT) && it->it_op & IT_OPEN &&
361             req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
362                                   RCL_CLIENT) &&
363             op_data->op_file_secctx_name_size > 0 &&
364             op_data->op_file_secctx_name != NULL) {
365                 char *secctx_name;
366
367                 secctx_name = req_capsule_client_get(&req->rq_pill,
368                                                      &RMF_FILE_SECCTX_NAME);
369                 memcpy(secctx_name, op_data->op_file_secctx_name,
370                        op_data->op_file_secctx_name_size);
371                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
372                                      RCL_SERVER,
373                                      obd->u.cli.cl_max_mds_easize);
374
375                 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
376                        op_data->op_file_secctx_name_size,
377                        op_data->op_file_secctx_name);
378
379         } else {
380                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
381                                      RCL_SERVER, 0);
382         }
383
384         if (exp_connect_encrypt(exp) && !(it->it_op & IT_CREAT) &&
385             it->it_op & IT_OPEN)
386                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
387                                      RCL_SERVER,
388                                      obd->u.cli.cl_max_mds_easize);
389         else
390                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
391                                      RCL_SERVER, 0);
392
393         /**
394          * Inline buffer for possible data from Data-on-MDT files.
395          */
396         req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
397                              sizeof(struct niobuf_remote));
398         req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
399                              sizeof(struct lmv_user_md));
400         ptlrpc_request_set_replen(req);
401
402         /* Get real repbuf allocated size as rounded up power of 2 */
403         repsize = size_roundup_power2(req->rq_replen +
404                                       lustre_msg_early_size);
405         /* Estimate free space for DoM files in repbuf */
406         repsize_estimate = repsize - (req->rq_replen -
407                            mdt_md_capsule_size +
408                            sizeof(struct lov_comp_md_v1) +
409                            sizeof(struct lov_comp_md_entry_v1) +
410                            lov_mds_md_size(0, LOV_MAGIC_V3));
411
412         if (repsize_estimate < obd->u.cli.cl_dom_min_inline_repsize) {
413                 repsize = obd->u.cli.cl_dom_min_inline_repsize -
414                           repsize_estimate + sizeof(struct niobuf_remote);
415                 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
416                                      RCL_SERVER,
417                                      sizeof(struct niobuf_remote) + repsize);
418                 ptlrpc_request_set_replen(req);
419                 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
420                        repsize, req->rq_replen);
421                 repsize = size_roundup_power2(req->rq_replen +
422                                               lustre_msg_early_size);
423         }
424         /* The only way to report real allocated repbuf size to the server
425          * is the lm_repsize but it must be set prior buffer allocation itself
426          * due to security reasons - it is part of buffer used in signature
427          * calculation (see LU-11414). Therefore the saved size is predicted
428          * value as rq_replen rounded to the next higher power of 2.
429          * Such estimation is safe. Though the final allocated buffer might
430          * be even larger, it is not possible to know that at this point.
431          */
432         req->rq_reqmsg->lm_repsize = repsize;
433         RETURN(req);
434
435 err_put_sepol:
436         sptlrpc_sepol_put(sepol);
437 err_free_rq:
438         ptlrpc_request_free(req);
439         return ERR_PTR(rc);
440 }
441
442 static struct ptlrpc_request *
443 mdc_intent_create_pack(struct obd_export *exp, struct lookup_intent *it,
444                        struct md_op_data *op_data, __u32 acl_bufsize,
445                        __u64 extra_lock_flags)
446 {
447         LIST_HEAD(cancels);
448         struct ptlrpc_request *req;
449         struct obd_device *obd = class_exp2obd(exp);
450         struct sptlrpc_sepol *sepol;
451         struct ldlm_intent *lit;
452         int count = 0;
453         int rc;
454
455         ENTRY;
456
457         if (fid_is_sane(&op_data->op_fid1))
458                 /* cancel parent's UPDATE lock. */
459                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
460                                                 &cancels, LCK_EX,
461                                                 MDS_INODELOCK_UPDATE);
462
463         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
464                                    &RQF_LDLM_INTENT_CREATE);
465         if (req == NULL) {
466                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
467                 RETURN(ERR_PTR(-ENOMEM));
468         }
469
470         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
471                              op_data->op_namelen + 1);
472         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
473                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
474                              strlen(op_data->op_file_secctx_name) + 1 : 0);
475         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
476                              op_data->op_file_secctx_size);
477         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
478                              op_data->op_data_size);
479         req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX, RCL_CLIENT,
480                              op_data->op_file_encctx_size);
481
482         /* get SELinux policy info if any */
483         sepol = sptlrpc_sepol_get(req);
484         if (IS_ERR(sepol)) {
485                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
486                 GOTO(err_free_rq, rc = PTR_ERR(sepol));
487         }
488         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
489                              sptlrpc_sepol_size(sepol));
490
491         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
492         if (rc < 0)
493                 GOTO(err_put_sepol, rc);
494
495         /* Pack the intent */
496         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
497         lit->opc = (__u64)it->it_op;
498
499         /* Pack the intent request. */
500         mdc_create_pack(&req->rq_pill, op_data, op_data->op_data,
501                         op_data->op_data_size, it->it_create_mode,
502                         op_data->op_fsuid, op_data->op_fsgid,
503                         op_data->op_cap, 0, sepol);
504
505         sptlrpc_sepol_put(sepol);
506
507         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
508                              obd->u.cli.cl_default_mds_easize);
509         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
510         req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
511                              sizeof(struct lmv_user_md));
512         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
513                              RCL_SERVER, 0);
514         req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX, RCL_SERVER, 0);
515
516         ptlrpc_request_set_replen(req);
517         RETURN(req);
518
519 err_put_sepol:
520         sptlrpc_sepol_put(sepol);
521 err_free_rq:
522         ptlrpc_request_free(req);
523         return ERR_PTR(rc);
524 }
525
/*
 * Default sizing of the reply buffers of an intent getxattr request, as used
 * by mdc_intent_getxattr_pack():
 *   names buffer        = GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM
 *   values buffer       = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM
 *   value lengths array = sizeof(u32) * GA_DEFAULT_EA_NUM
 */
#define GA_DEFAULT_EA_NAME_LEN   20
#define GA_DEFAULT_EA_VAL_LEN   250
#define GA_DEFAULT_EA_NUM        10
529
530 static struct ptlrpc_request *
531 mdc_intent_getxattr_pack(struct obd_export *exp, struct lookup_intent *it,
532                          struct md_op_data *op_data)
533 {
534         struct ptlrpc_request *req;
535         struct ldlm_intent *lit;
536         struct sptlrpc_sepol *sepol;
537         int rc, count = 0;
538         LIST_HEAD(cancels);
539         u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
540
541         ENTRY;
542         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
543                                         &RQF_LDLM_INTENT_GETXATTR);
544         if (req == NULL)
545                 RETURN(ERR_PTR(-ENOMEM));
546
547         /* get SELinux policy info if any */
548         sepol = sptlrpc_sepol_get(req);
549         if (IS_ERR(sepol))
550                 GOTO(err_free_rq, rc = PTR_ERR(sepol));
551
552         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
553                              sptlrpc_sepol_size(sepol));
554
555         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
556         if (rc)
557                 GOTO(err_put_sepol, rc);
558
559         /* pack the intent */
560         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
561         lit->opc = IT_GETXATTR;
562         /* Message below is checked in sanity-selinux test_20d
563          * and sanity-sec test_49
564          */
565         CDEBUG(D_INFO, "%s: get xattrs for "DFID"\n",
566                exp->exp_obd->obd_name, PFID(&op_data->op_fid1));
567
568 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
569         /* If the supplied buffer is too small then the server will return
570          * -ERANGE and llite will fallback to using non cached xattr
571          * operations. On servers before 2.10.1 a (non-cached) listxattr RPC
572          * for an orphan or dead file causes an oops. So let's try to avoid
573          * sending too small a buffer to too old a server. This is effectively
574          * undoing the memory conservation of LU-9417 when it would be *more*
575          * likely to crash the server. See LU-9856.
576          */
577         if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
578                 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
579                                          exp->exp_connect_data.ocd_max_easize);
580 #endif
581
582         /* pack the intended request */
583         mdc_pack_body(&req->rq_pill, &op_data->op_fid1, op_data->op_valid,
584                       ea_vals_buf_size, -1, 0);
585
586         /* get SELinux policy info if any */
587         mdc_file_sepol_pack(&req->rq_pill, sepol);
588         sptlrpc_sepol_put(sepol);
589
590         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
591                              GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
592
593         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
594                              ea_vals_buf_size);
595
596         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
597                              sizeof(u32) * GA_DEFAULT_EA_NUM);
598
599         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
600
601         ptlrpc_request_set_replen(req);
602
603         RETURN(req);
604
605 err_put_sepol:
606         sptlrpc_sepol_put(sepol);
607 err_free_rq:
608         ptlrpc_request_free(req);
609         RETURN(ERR_PTR(rc));
610 }
611
612 static struct ptlrpc_request *
613 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
614                         struct md_op_data *op_data, __u32 acl_bufsize)
615 {
616         struct ptlrpc_request *req;
617         struct obd_device *obd = class_exp2obd(exp);
618         u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE |
619                     OBD_MD_FLDIREA | OBD_MD_MEA | OBD_MD_FLACL |
620                     OBD_MD_DEFAULT_MEA;
621         struct ldlm_intent *lit;
622         __u32 easize;
623         bool have_secctx = false;
624         int rc;
625
626         ENTRY;
627         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
628                                    &RQF_LDLM_INTENT_GETATTR);
629         if (req == NULL)
630                 RETURN(ERR_PTR(-ENOMEM));
631
632         /* send name of security xattr to get upon intent */
633         if (it->it_op & (IT_LOOKUP | IT_GETATTR) &&
634             req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
635                                   RCL_CLIENT) &&
636             op_data->op_file_secctx_name_size > 0 &&
637             op_data->op_file_secctx_name != NULL) {
638                 have_secctx = true;
639                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
640                                      RCL_CLIENT,
641                                      op_data->op_file_secctx_name_size);
642         }
643
644         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
645                              op_data->op_namelen + 1);
646
647         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
648         if (rc) {
649                 ptlrpc_request_free(req);
650                 RETURN(ERR_PTR(rc));
651         }
652
653         /* pack the intent */
654         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
655         lit->opc = (__u64)it->it_op;
656
657         if (obd->u.cli.cl_default_mds_easize > 0)
658                 easize = obd->u.cli.cl_default_mds_easize;
659         else
660                 easize = obd->u.cli.cl_max_mds_easize;
661
662         /* pack the intended request */
663         mdc_getattr_pack(&req->rq_pill, valid, it->it_flags, op_data, easize);
664
665         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
666         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
667         req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
668                              sizeof(struct lmv_user_md));
669
670         if (have_secctx) {
671                 char *secctx_name;
672
673                 secctx_name = req_capsule_client_get(&req->rq_pill,
674                                                      &RMF_FILE_SECCTX_NAME);
675                 memcpy(secctx_name, op_data->op_file_secctx_name,
676                        op_data->op_file_secctx_name_size);
677
678                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
679                                      RCL_SERVER, easize);
680
681                 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
682                        op_data->op_file_secctx_name_size,
683                        op_data->op_file_secctx_name);
684         } else {
685                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
686                                      RCL_SERVER, 0);
687         }
688
689         if (exp_connect_encrypt(exp) && it->it_op & (IT_LOOKUP | IT_GETATTR))
690                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
691                                      RCL_SERVER, easize);
692         else
693                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
694                                      RCL_SERVER, 0);
695
696         ptlrpc_request_set_replen(req);
697         RETURN(req);
698 }
699
700 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
701                                                      struct lookup_intent *it,
702                                                      struct md_op_data *op_data)
703 {
704         struct obd_device *obd = class_exp2obd(exp);
705         struct ptlrpc_request *req;
706         struct ldlm_intent *lit;
707         struct layout_intent *layout;
708         LIST_HEAD(cancels);
709         int count = 0, rc;
710
711         ENTRY;
712         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
713                                 &RQF_LDLM_INTENT_LAYOUT);
714         if (req == NULL)
715                 RETURN(ERR_PTR(-ENOMEM));
716
717         if (fid_is_sane(&op_data->op_fid2) && (it->it_op & IT_LAYOUT) &&
718             (it->it_flags & FMODE_WRITE)) {
719                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
720                                                 &cancels, LCK_EX,
721                                                 MDS_INODELOCK_LAYOUT);
722         }
723
724         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
725         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
726         if (rc) {
727                 ptlrpc_request_free(req);
728                 RETURN(ERR_PTR(rc));
729         }
730
731         /* pack the intent */
732         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
733         lit->opc = (__u64)it->it_op;
734
735         /* pack the layout intent request */
736         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
737         LASSERT(op_data->op_data != NULL);
738         LASSERT(op_data->op_data_size == sizeof(*layout));
739         memcpy(layout, op_data->op_data, sizeof(*layout));
740
741         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
742                              obd->u.cli.cl_default_mds_easize);
743         ptlrpc_request_set_replen(req);
744         RETURN(req);
745 }
746
747 static struct ptlrpc_request *mdc_enqueue_pack(struct obd_export *exp,
748                                                int lvb_len)
749 {
750         struct ptlrpc_request *req;
751         int rc;
752
753         ENTRY;
754         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
755         if (req == NULL)
756                 RETURN(ERR_PTR(-ENOMEM));
757
758         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
759         if (rc) {
760                 ptlrpc_request_free(req);
761                 RETURN(ERR_PTR(rc));
762         }
763
764         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
765         ptlrpc_request_set_replen(req);
766         RETURN(req);
767 }
768
769 int mdc_finish_enqueue(struct obd_export *exp,
770                        struct req_capsule *pill,
771                        struct ldlm_enqueue_info *einfo,
772                        struct lookup_intent *it,
773                        struct lustre_handle *lockh, int rc)
774 {
775         struct ptlrpc_request *req = pill->rc_req;
776         struct ldlm_request *lockreq;
777         struct ldlm_reply *lockrep;
778         struct ldlm_lock *lock;
779         struct mdt_body *body = NULL;
780         void *lvb_data = NULL;
781         __u32 lvb_len = 0;
782
783         ENTRY;
784         LASSERT(rc >= 0);
785         /* Similarly, if we're going to replay this request, we don't want to
786          * actually get a lock, just perform the intent.
787          */
788         if (req->rq_transno || req->rq_replay) {
789                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
790                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
791         }
792
793         if (rc == ELDLM_LOCK_ABORTED) {
794                 einfo->ei_mode = 0;
795                 memset(lockh, 0, sizeof(*lockh));
796                 rc = 0;
797         } else { /* rc = 0 */
798                 lock = ldlm_handle2lock(lockh);
799                 LASSERT(lock != NULL);
800
801                 /* If server returned a different lock mode, fix up variables */
802                 if (lock->l_req_mode != einfo->ei_mode) {
803                         ldlm_lock_addref(lockh, lock->l_req_mode);
804                         ldlm_lock_decref(lockh, einfo->ei_mode);
805                         einfo->ei_mode = lock->l_req_mode;
806                 }
807                 LDLM_LOCK_PUT(lock);
808         }
809
810         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
811         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
812
813         it->it_disposition = (int)lockrep->lock_policy_res1;
814         it->it_status = (int)lockrep->lock_policy_res2;
815         it->it_lock_mode = einfo->ei_mode;
816         it->it_lock_handle = lockh->cookie;
817         it->it_request = req;
818
819         /* Technically speaking rq_transno must already be zero if
820          * it_status is in error, so the check is a bit redundant.
821          */
822         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
823                 mdc_clear_replay_flag(req, it->it_status);
824
825         /* If we're doing an IT_OPEN which did not result in an actual
826          * successful open, then we need to remove the bit which saves
827          * this request for unconditional replay.
828          *
829          * It's important that we do this first!  Otherwise we might exit the
830          * function without doing so, and try to replay a failed create.
831          * (b=3440)
832          */
833         if (it->it_op & IT_OPEN && req->rq_replay &&
834             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
835                 mdc_clear_replay_flag(req, it->it_status);
836
837         DEBUG_REQ(D_RPCTRACE, req, "op=%x disposition=%x, status=%d",
838                   it->it_op, it->it_disposition, it->it_status);
839
840         /* We know what to expect, so we do any byte flipping required here */
841         if (it_has_reply_body(it)) {
842                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
843                 if (body == NULL) {
844                         rc = -EPROTO;
845                         CERROR("%s: cannot swab mdt_body: rc = %d\n",
846                                exp->exp_obd->obd_name, rc);
847                         RETURN(rc);
848                 }
849
850                 if (it_disposition(it, DISP_OPEN_OPEN) &&
851                     !it_open_error(DISP_OPEN_OPEN, it)) {
852                         /*
853                          * If this is a successful OPEN request, we need to set
854                          * replay handler and data early, so that if replay
855                          * happens immediately after swabbing below, new reply
856                          * is swabbed by that handler correctly.
857                          */
858                         mdc_set_open_replay_data(NULL, NULL, it);
859                 }
860
861                 if (it_disposition(it, DISP_OPEN_CREATE) &&
862                     !it_open_error(DISP_OPEN_CREATE, it)) {
863                         lprocfs_counter_incr(exp->exp_obd->obd_md_stats,
864                                              LPROC_MD_CREATE);
865                 }
866
867                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
868                         void *eadata;
869
870                         mdc_update_max_ea_from_body(exp, body);
871
872                         /*
873                          * The eadata is opaque; just check that it is there.
874                          * Eventually, obd_unpackmd() will check the contents.
875                          */
876                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
877                                                         body->mbo_eadatasize);
878                         if (eadata == NULL)
879                                 RETURN(-EPROTO);
880
881                         /* save LVB data and length if for layout lock */
882                         lvb_data = eadata;
883                         lvb_len = body->mbo_eadatasize;
884
885                         /*
886                          * We save the reply LOV EA in case we have to replay a
887                          * create for recovery.  If we didn't allocate a large
888                          * enough request buffer above we need to reallocate it
889                          * here to hold the actual LOV EA.
890                          *
891                          * To not save LOV EA if request is not going to replay
892                          * (for example error one).
893                          */
894                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
895                                 rc = mdc_save_lmm(req, eadata,
896                                                   body->mbo_eadatasize);
897                                 if (rc) {
898                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
899                                         body->mbo_eadatasize = 0;
900                                         rc = 0;
901                                 }
902                         }
903                 }
904         } else if (it->it_op & IT_LAYOUT) {
905                 /* maybe the lock was granted right away and layout
906                  * is packed into RMF_DLM_LVB of req
907                  */
908                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
909                 CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
910                        class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
911                 if (lvb_len > 0) {
912                         lvb_data = req_capsule_server_sized_get(pill,
913                                                         &RMF_DLM_LVB, lvb_len);
914                         if (lvb_data == NULL)
915                                 RETURN(-EPROTO);
916
917                         /**
918                          * save replied layout data to the request buffer for
919                          * recovery consideration (lest MDS reinitialize
920                          * another set of OST objects).
921                          */
922                         if (req->rq_transno)
923                                 mdc_save_lmm(req, lvb_data, lvb_len);
924                 }
925         }
926
927         /* fill in stripe data for layout lock.
928          * LU-6581: trust layout data only if layout lock is granted. The MDT
929          * has stopped sending layout unless the layout lock is granted. The
930          * client still does this checking in case it's talking with an old
931          * server. - Jinshan
932          */
933         lock = ldlm_handle2lock(lockh);
934         if (lock == NULL)
935                 RETURN(rc);
936
937         if (ldlm_has_layout(lock) && lvb_data != NULL &&
938             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
939                 void *lmm;
940
941                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
942                         ldlm_it2str(it->it_op), lvb_len);
943
944                 OBD_ALLOC_LARGE(lmm, lvb_len);
945                 if (lmm == NULL)
946                         GOTO(out_lock, rc = -ENOMEM);
947
948                 memcpy(lmm, lvb_data, lvb_len);
949
950                 /* install lvb_data */
951                 lock_res_and_lock(lock);
952                 if (lock->l_lvb_data == NULL) {
953                         lock->l_lvb_type = LVB_T_LAYOUT;
954                         lock->l_lvb_data = lmm;
955                         lock->l_lvb_len = lvb_len;
956                         lmm = NULL;
957                 }
958                 unlock_res_and_lock(lock);
959                 if (lmm != NULL)
960                         OBD_FREE_LARGE(lmm, lvb_len);
961         }
962
963         if (ldlm_has_dom(lock)) {
964                 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
965
966                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
967                 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
968                         LDLM_ERROR(lock, "%s: DoM lock without size.",
969                                    exp->exp_obd->obd_name);
970                         GOTO(out_lock, rc = -EPROTO);
971                 }
972
973                 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
974                            ldlm_it2str(it->it_op), body->mbo_dom_size);
975
976                 lock_res_and_lock(lock);
977                 mdc_body2lvb(body, &lock->l_ost_lvb);
978                 ldlm_lock_allow_match_locked(lock);
979                 unlock_res_and_lock(lock);
980         }
981 out_lock:
982         LDLM_LOCK_PUT(lock);
983
984         RETURN(rc);
985 }
986
987 static inline bool mdc_skip_mod_rpc_slot(const struct lookup_intent *it)
988 {
989         if (it != NULL &&
990             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
991              it->it_op == IT_READDIR || it->it_op == IT_GETXATTR ||
992              (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
993                 return true;
994         return false;
995 }
996
997 /* We always reserve enough space in the reply packet for a stripe MD, because
998  * we don't know in advance the file type.
999  */
1000 static int mdc_enqueue_base(struct obd_export *exp,
1001                             struct ldlm_enqueue_info *einfo,
1002                             const union ldlm_policy_data *policy,
1003                             struct lookup_intent *it,
1004                             struct md_op_data *op_data,
1005                             struct lustre_handle *lockh,
1006                             __u64 extra_lock_flags)
1007 {
1008         struct obd_device *obd = class_exp2obd(exp);
1009         struct ptlrpc_request *req;
1010         __u64 flags, saved_flags = extra_lock_flags;
1011         struct ldlm_res_id res_id;
1012         static const union ldlm_policy_data lookup_policy = {
1013                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
1014         static const union ldlm_policy_data update_policy = {
1015                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
1016         static const union ldlm_policy_data layout_policy = {
1017                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
1018         static const union ldlm_policy_data getxattr_policy = {
1019                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
1020         int generation, resends = 0;
1021         struct ldlm_reply *lockrep;
1022         struct obd_import *imp = class_exp2cliimp(exp);
1023         __u32 acl_bufsize;
1024         enum lvb_type lvb_type = 0;
1025         int rc;
1026
1027         ENTRY;
1028         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
1029                  einfo->ei_type);
1030         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1031
1032         if (it != NULL) {
1033                 LASSERT(policy == NULL);
1034
1035                 saved_flags |= LDLM_FL_HAS_INTENT;
1036                 if (it->it_op & (IT_GETATTR | IT_READDIR | IT_CREAT))
1037                         policy = &update_policy;
1038                 else if (it->it_op & IT_LAYOUT)
1039                         policy = &layout_policy;
1040                 else if (it->it_op & IT_GETXATTR)
1041                         policy = &getxattr_policy;
1042                 else
1043                         policy = &lookup_policy;
1044         }
1045
1046         generation = obd->u.cli.cl_import->imp_generation;
1047         if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
1048                 acl_bufsize = min_t(__u32, imp->imp_connect_data.ocd_max_easize,
1049                                     XATTR_SIZE_MAX);
1050         else
1051                 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
1052
1053 resend:
1054         flags = saved_flags;
1055         if (it == NULL) {
1056                 /* The only way right now is FLOCK. */
1057                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
1058                          einfo->ei_type);
1059                 res_id.name[3] = LDLM_FLOCK;
1060                 req = ldlm_enqueue_pack(exp, 0);
1061         } else if (it->it_op & IT_OPEN) {
1062                 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
1063         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
1064                 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
1065         } else if (it->it_op & IT_READDIR) {
1066                 req = mdc_enqueue_pack(exp, 0);
1067         } else if (it->it_op & IT_LAYOUT) {
1068                 if (!imp_connect_lvb_type(imp))
1069                         RETURN(-EOPNOTSUPP);
1070                 req = mdc_intent_layout_pack(exp, it, op_data);
1071                 lvb_type = LVB_T_LAYOUT;
1072         } else if (it->it_op & IT_GETXATTR) {
1073                 req = mdc_intent_getxattr_pack(exp, it, op_data);
1074         } else if (it->it_op == IT_CREAT) {
1075                 req = mdc_intent_create_pack(exp, it, op_data, acl_bufsize,
1076                                              extra_lock_flags);
1077         } else {
1078                 LBUG();
1079                 RETURN(-EINVAL);
1080         }
1081
1082         if (IS_ERR(req))
1083                 RETURN(PTR_ERR(req));
1084
1085         if (resends) {
1086                 req->rq_generation_set = 1;
1087                 req->rq_import_generation = generation;
1088                 req->rq_sent = ktime_get_real_seconds() + resends;
1089         }
1090
1091         einfo->ei_req_slot = !(op_data->op_cli_flags & CLI_NO_SLOT);
1092         einfo->ei_mod_slot = !mdc_skip_mod_rpc_slot(it);
1093
1094         /* With Data-on-MDT the glimpse callback is needed too.
1095          * It is set here in advance but not in mdc_finish_enqueue()
1096          * to avoid possible races. It is safe to have glimpse handler
1097          * for non-DOM locks and costs nothing.
1098          */
1099         if (einfo->ei_cb_gl == NULL)
1100                 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
1101
1102         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
1103                               0, lvb_type, lockh, 0);
1104
1105         if (!it) {
1106                 /* For flock requests we immediatelly return without further
1107                  * delay and let caller deal with the rest, since rest of
1108                  * this function metadata processing makes no sense for flock
1109                  * requests anyway. But in case of problem during comms with
1110                  * server (-ETIMEDOUT) or any signal/kill attempt (-EINTR),
1111                  * we cannot rely on caller and this mainly for F_UNLCKs
1112                  * (explicits or automatically generated by kernel to clean
1113                  * current flocks upon exit) that can't be trashed.
1114                  */
1115                 ptlrpc_req_finished(req);
1116                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
1117                     (einfo->ei_type == LDLM_FLOCK) &&
1118                     (einfo->ei_mode == LCK_NL))
1119                         goto resend;
1120                 RETURN(rc);
1121         }
1122
1123         if (rc < 0) {
1124                 CDEBUG(D_INFO,
1125                       "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
1126                       obd->obd_name, PFID(&op_data->op_fid1),
1127                       PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
1128
1129                 mdc_clear_replay_flag(req, rc);
1130                 ptlrpc_req_finished(req);
1131                 RETURN(rc);
1132         }
1133
1134         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1135         LASSERT(lockrep != NULL);
1136
1137         lockrep->lock_policy_res2 =
1138                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1139
1140         /* Retry infinitely when the server returns -EINPROGRESS for the
1141          * intent operation, when server returns -EINPROGRESS for acquiring
1142          * intent lock, we'll retry in after_reply().
1143          */
1144         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
1145                 mdc_clear_replay_flag(req, rc);
1146                 ptlrpc_req_finished(req);
1147                 if (generation == obd->u.cli.cl_import->imp_generation) {
1148                         if (signal_pending(current))
1149                                 RETURN(-EINTR);
1150
1151                         resends++;
1152                         CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
1153                                obd->obd_name, resends, it->it_op,
1154                                PFID(&op_data->op_fid1),
1155                                PFID(&op_data->op_fid2));
1156                         goto resend;
1157                 } else {
1158                         CDEBUG(D_HA, "resend cross eviction\n");
1159                         RETURN(-EIO);
1160                 }
1161         }
1162
1163         if ((int)lockrep->lock_policy_res2 == -ERANGE &&
1164             it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
1165             acl_bufsize == LUSTRE_POSIX_ACL_MAX_SIZE_OLD) {
1166                 mdc_clear_replay_flag(req, -ERANGE);
1167                 ptlrpc_req_finished(req);
1168                 acl_bufsize = min_t(__u32, imp->imp_connect_data.ocd_max_easize,
1169                                     XATTR_SIZE_MAX);
1170                 goto resend;
1171         }
1172
1173         rc = mdc_finish_enqueue(exp, &req->rq_pill, einfo, it, lockh, rc);
1174         if (rc < 0) {
1175                 if (lustre_handle_is_used(lockh)) {
1176                         ldlm_lock_decref(lockh, einfo->ei_mode);
1177                         memset(lockh, 0, sizeof(*lockh));
1178                 }
1179                 ptlrpc_req_finished(req);
1180
1181                 it->it_lock_handle = 0;
1182                 it->it_lock_mode = 0;
1183                 it->it_request = NULL;
1184         }
1185
1186         RETURN(rc);
1187 }
1188
1189 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1190                 const union ldlm_policy_data *policy,
1191                 struct md_op_data *op_data,
1192                 struct lustre_handle *lockh, __u64 extra_lock_flags)
1193 {
1194         return mdc_enqueue_base(exp, einfo, policy, NULL,
1195                                 op_data, lockh, extra_lock_flags);
1196 }
1197
1198 static int mdc_finish_intent_lock(struct obd_export *exp,
1199                                   struct ptlrpc_request *request,
1200                                   struct md_op_data *op_data,
1201                                   struct lookup_intent *it,
1202                                   struct lustre_handle *lockh)
1203 {
1204         struct lustre_handle old_lock;
1205         struct ldlm_lock *lock;
1206         int rc = 0;
1207
1208         ENTRY;
1209         LASSERT(request != NULL);
1210         LASSERT(request != LP_POISON);
1211         LASSERT(request->rq_repmsg != LP_POISON);
1212
1213         if (it->it_op & IT_READDIR)
1214                 RETURN(0);
1215
1216         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
1217                 if (it->it_status != 0)
1218                         GOTO(out, rc = it->it_status);
1219         } else {
1220                 if (!it_disposition(it, DISP_IT_EXECD)) {
1221                         /* The server failed before it even started executing
1222                          * the intent, i.e. because it couldn't unpack the
1223                          * request.
1224                          */
1225                         LASSERT(it->it_status != 0);
1226                         GOTO(out, rc = it->it_status);
1227                 }
1228                 rc = it_open_error(DISP_IT_EXECD, it);
1229                 if (rc)
1230                         GOTO(out, rc);
1231
1232                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1233                 if (rc)
1234                         GOTO(out, rc);
1235
1236                 /* keep requests around for the multiple phases of the call
1237                  * this shows the DISP_XX must guarantee we make it into the
1238                  * call
1239                  */
1240                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1241                     it_disposition(it, DISP_OPEN_CREATE) &&
1242                     !it_open_error(DISP_OPEN_CREATE, it)) {
1243                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
1244                         /* balanced in ll_create_node */
1245                         ptlrpc_request_addref(request);
1246                 }
1247                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1248                     it_disposition(it, DISP_OPEN_OPEN) &&
1249                     !it_open_error(DISP_OPEN_OPEN, it)) {
1250                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
1251                         /* balanced in ll_file_open */
1252                         ptlrpc_request_addref(request);
1253                         /* eviction in middle of open RPC processing b=11546 */
1254                         CFS_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1255                                          obd_timeout);
1256                 }
1257
1258                 if (it->it_op & IT_CREAT) {
1259                         /* XXX this belongs in ll_create_it */
1260                 } else if (it->it_op == IT_OPEN) {
1261                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1262                 } else {
1263                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1264                 }
1265         }
1266
1267         /* If we already have a matching lock, then cancel the new
1268          * one.  We have to set the data here instead of in
1269          * mdc_enqueue, because we need to use the child's inode as
1270          * the l_ast_data to match, and that's not available until
1271          * intent_finish has performed the iget().
1272          */
1273         lock = ldlm_handle2lock(lockh);
1274         if (lock) {
1275                 union ldlm_policy_data policy = lock->l_policy_data;
1276
1277                 LDLM_DEBUG(lock, "matching against this");
1278
1279                 if (it_has_reply_body(it)) {
1280                         struct mdt_body *body;
1281
1282                         body = req_capsule_server_get(&request->rq_pill,
1283                                                       &RMF_MDT_BODY);
1284                         /* mdc_enqueue checked */
1285                         LASSERT(body != NULL);
1286                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1287                                                  &lock->l_resource->lr_name),
1288                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1289                                  PLDLMRES(lock->l_resource),
1290                                  PFID(&body->mbo_fid1));
1291                 }
1292                 LDLM_LOCK_PUT(lock);
1293
1294                 memcpy(&old_lock, lockh, sizeof(*lockh));
1295                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1296                                    LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
1297                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1298                         memcpy(lockh, &old_lock, sizeof(old_lock));
1299                         it->it_lock_handle = lockh->cookie;
1300                 }
1301         }
1302
1303         EXIT;
1304 out:
1305         CDEBUG(D_DENTRY,
1306                "D_IT dentry=%.*s intent=%s status=%d disp=%x: rc = %d\n",
1307                 (int)op_data->op_namelen, op_data->op_name,
1308                 ldlm_it2str(it->it_op), it->it_status, it->it_disposition, rc);
1309
1310         return rc;
1311 }
1312
1313 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1314                         struct lu_fid *fid, __u64 *bits)
1315 {
1316         /* We could just return 1 immediately, but as we should only be called
1317          * in revalidate_it if we already have a lock, let's verify that.
1318          */
1319         struct ldlm_res_id res_id;
1320         struct lustre_handle lockh;
1321         union ldlm_policy_data policy;
1322         enum ldlm_mode mode;
1323
1324         ENTRY;
1325         if (it->it_lock_handle) {
1326                 lockh.cookie = it->it_lock_handle;
1327                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1328         } else {
1329                 fid_build_reg_res_name(fid, &res_id);
1330                 switch (it->it_op) {
1331                 case IT_GETATTR:
1332                         /* File attributes are held under multiple bits:
1333                          * nlink is under lookup lock, size and times are
1334                          * under UPDATE lock and recently we've also got
1335                          * a separate permissions lock for owner/group/acl that
1336                          * were protected by lookup lock before.
1337                          * Getattr must provide all of that information,
1338                          * so we need to ensure we have all of those locks.
1339                          * Unfortunately, if the bits are split across multiple
1340                          * locks, there's no easy way to match all of them here,
1341                          * so an extra RPC would be performed to fetch all
1342                          * of those bits at once for now.
1343                          */
1344                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1345                          * but for old MDTs (< 2.4), permission is covered
1346                          * by LOOKUP lock, so it needs to match all bits here.
1347                          */
1348                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1349                                                   MDS_INODELOCK_PERM;
1350                         break;
1351                 case IT_READDIR:
1352                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1353                         break;
1354                 case IT_LAYOUT:
1355                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1356                         break;
1357                 default:
1358                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1359                         break;
1360                 }
1361
1362                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1363                                       LDLM_IBITS, &policy,
1364                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1365                                       &lockh);
1366         }
1367
1368         if (mode) {
1369                 it->it_lock_handle = lockh.cookie;
1370                 it->it_lock_mode = mode;
1371         } else {
1372                 it->it_lock_handle = 0;
1373                 it->it_lock_mode = 0;
1374         }
1375
1376         RETURN(!!mode);
1377 }
1378
1379 /*
1380  * This long block is all about fixing up the lock and request state
1381  * so that it is correct as of the moment _before_ the operation was
1382  * applied; that way, the VFS will think that everything is normal and
1383  * call Lustre's regular VFS methods.
1384  *
1385  * If we're performing a creation, that means that unless the creation
1386  * failed with EEXIST, we should fake up a negative dentry.
1387  *
1388  * For everything else, we want the lookup to succeed.
1389  *
1390  * One additional note: if CREATE or OPEN succeeded, we add an extra
1391  * reference to the request because we need to keep it around until
1392  * ll_create/ll_open gets called.
1393  *
1394  * The server will return to us, in it_disposition, an indication of
1395  * exactly what it_status refers to.
1396  *
1397  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1398  * otherwise if DISP_OPEN_CREATE is set, then it_status is the
1399  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1400  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1401  * was successful.
1402  *
1403  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1404  * child lookup.
1405  */
1406 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1407                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1408                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1409 {
1410         struct ldlm_enqueue_info einfo = {
1411                 .ei_type        = LDLM_IBITS,
1412                 .ei_mode        = it_to_lock_mode(it),
1413                 .ei_cb_bl       = cb_blocking,
1414                 .ei_cb_cp       = ldlm_completion_ast,
1415                 .ei_cb_gl       = mdc_ldlm_glimpse_ast,
1416         };
1417         struct lustre_handle lockh;
1418         int rc = 0;
1419
1420         ENTRY;
1421         LASSERT(it);
1422         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1423                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1424                 op_data->op_name, PFID(&op_data->op_fid2),
1425                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1426                 it->it_flags);
1427
1428         lockh.cookie = 0;
1429         /* MDS_FID_OP is not a revalidate case */
1430         if (fid_is_sane(&op_data->op_fid2) &&
1431             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR)) &&
1432             !(op_data->op_bias & MDS_FID_OP)) {
1433                 /* We could just return 1 immediately, but since we should only
1434                  * be called in revalidate_it if we already have a lock, let's
1435                  * verify that.
1436                  */
1437                 it->it_lock_handle = 0;
1438                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1439                 /* Only return failure if it was not GETATTR by cfid
1440                  * (from inode_revalidate()).
1441                  */
1442                 if (rc || op_data->op_namelen != 0)
1443                         RETURN(rc);
1444         }
1445
1446         /* For case if upper layer did not alloc fid, do it now. */
1447         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1448                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1449                 if (rc < 0) {
1450                         CERROR("%s: cannot allocate new FID: rc=%d\n",
1451                                exp->exp_obd->obd_name, rc);
1452                         RETURN(rc);
1453                 }
1454         }
1455
1456         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1457                               extra_lock_flags);
1458         if (rc < 0)
1459                 RETURN(rc);
1460
1461         *reqp = it->it_request;
1462         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1463         RETURN(rc);
1464 }
1465
1466 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1467                                               struct ptlrpc_request *req,
1468                                               void *args, int rc)
1469 {
1470         struct mdc_getattr_args *ga = args;
1471         struct obd_export *exp = ga->ga_exp;
1472         struct md_op_item *item = ga->ga_item;
1473         struct ldlm_enqueue_info *einfo = &item->mop_einfo;
1474         struct lookup_intent *it = &item->mop_it;
1475         struct lustre_handle *lockh = &item->mop_lockh;
1476         struct req_capsule *pill = &req->rq_pill;
1477         struct ldlm_reply *lockrep;
1478         __u64 flags = LDLM_FL_HAS_INTENT;
1479
1480         ENTRY;
1481         if (CFS_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1482                 rc = -ETIMEDOUT;
1483
1484         rc = ldlm_cli_enqueue_fini(exp, pill, einfo, 1, &flags, NULL, 0,
1485                                    lockh, rc, true);
1486         if (rc < 0) {
1487                 CERROR("%s: ldlm_cli_enqueue_fini() failed: rc = %d\n",
1488                        exp->exp_obd->obd_name, rc);
1489                 mdc_clear_replay_flag(req, rc);
1490                 GOTO(out, rc);
1491         }
1492
1493         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
1494         LASSERT(lockrep != NULL);
1495
1496         lockrep->lock_policy_res2 =
1497                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1498
1499         rc = mdc_finish_enqueue(exp, pill, einfo, it, lockh, rc);
1500         if (rc)
1501                 GOTO(out, rc);
1502
1503         rc = mdc_finish_intent_lock(exp, req, &item->mop_data, it, lockh);
1504         EXIT;
1505
1506 out:
1507         item->mop_pill = pill;
1508         item->mop_cb(item, rc);
1509         return 0;
1510 }
1511
1512 int mdc_intent_getattr_async(struct obd_export *exp,
1513                              struct md_op_item *item)
1514 {
1515         struct md_op_data *op_data = &item->mop_data;
1516         struct lookup_intent *it = &item->mop_it;
1517         struct ptlrpc_request *req;
1518         struct mdc_getattr_args *ga;
1519         struct ldlm_res_id res_id;
1520         union ldlm_policy_data policy = {
1521                 .l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE }
1522         };
1523         __u64 flags = LDLM_FL_HAS_INTENT;
1524         int rc = 0;
1525
1526         ENTRY;
1527         CDEBUG(D_DLMTRACE,
1528                "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1529                (int)op_data->op_namelen, op_data->op_name,
1530                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1531
1532         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1533         /* If the MDT return -ERANGE because of large ACL, then the sponsor
1534          * of the async getattr RPC will handle that by itself.
1535          */
1536         req = mdc_intent_getattr_pack(exp, it, op_data,
1537                                       LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1538         if (IS_ERR(req))
1539                 RETURN(PTR_ERR(req));
1540
1541         /* With Data-on-MDT the glimpse callback is needed too.
1542          * It is set here in advance but not in mdc_finish_enqueue()
1543          * to avoid possible races. It is safe to have glimpse handler
1544          * for non-DOM locks and costs nothing.
1545          */
1546         if (item->mop_einfo.ei_cb_gl == NULL)
1547                 item->mop_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1548
1549         rc = ldlm_cli_enqueue(exp, &req, &item->mop_einfo, &res_id, &policy,
1550                               &flags, NULL, 0, LVB_T_NONE, &item->mop_lockh, 1);
1551         if (rc < 0) {
1552                 ptlrpc_req_finished(req);
1553                 RETURN(rc);
1554         }
1555
1556         ga = ptlrpc_req_async_args(ga, req);
1557         ga->ga_exp = exp;
1558         ga->ga_item = item;
1559
1560         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1561         ptlrpcd_add_req(req);
1562
1563         RETURN(0);
1564 }