Whamcloud - gitweb
LU-9633 ptlrpc: Add kernel doc style for ptlrpc (14)
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 // SPDX-License-Identifier: GPL-2.0
2
3 /*
4  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
5  * Use is subject to license terms.
6  *
7  * Copyright (c) 2011, 2017, Intel Corporation.
8  */
9
10 /*
11  * This file is part of Lustre, http://www.lustre.org/
12  */
13
14 #define DEBUG_SUBSYSTEM S_MDC
15
16 #include <linux/module.h>
17
18 #include <obd.h>
19 #include <obd_class.h>
20 #include <lustre_dlm.h>
21 #include <lustre_fid.h>
22 #include <lustre_intent.h>
23 #include <lustre_mdc.h>
24 #include <lustre_net.h>
25 #include <lustre_req_layout.h>
26 #include <lustre_swab.h>
27 #include <lustre_acl.h>
28
29 #include "mdc_internal.h"
30
/*
 * Argument bundle pairing an export with a metadata operation item.
 * NOTE(review): the users of this structure are not visible in this part of
 * the file; presumably it is carried as per-request async arguments for a
 * getattr RPC -- confirm against the code that fills it in.
 */
struct mdc_getattr_args {
        struct obd_export       *ga_exp;        /* export the request is bound to */
        struct md_op_item       *ga_item;       /* operation item for the request */
};
35
/*
 * Argument bundle describing one lock enqueue: the lock, its export, the
 * lock mode and flags, and an update upcall.
 * NOTE(review): the users of this structure are not visible in this part of
 * the file; presumably it is carried as per-request async arguments for an
 * enqueue RPC -- confirm against the code that fills it in.
 */
struct mdc_enqueue_args {
        struct ldlm_lock                *mea_lock;      /* lock being enqueued */
        struct obd_export               *mea_exp;       /* export used for the enqueue */
        enum ldlm_mode                  mea_mode;       /* lock mode */
        __u64                           mea_flags;      /* lock flags */
        obd_enqueue_update_f            mea_upcall;     /* update callback */
};
43
44 int it_open_error(int phase, struct lookup_intent *it)
45 {
46         if (it_disposition(it, DISP_OPEN_LEASE)) {
47                 if (phase >= DISP_OPEN_LEASE)
48                         return it->it_status;
49                 else
50                         return 0;
51         }
52         if (it_disposition(it, DISP_OPEN_OPEN)) {
53                 if (phase >= DISP_OPEN_OPEN)
54                         return it->it_status;
55                 else
56                         return 0;
57         }
58
59         if (it_disposition(it, DISP_OPEN_CREATE)) {
60                 if (phase >= DISP_OPEN_CREATE)
61                         return it->it_status;
62                 else
63                         return 0;
64         }
65
66         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
67                 if (phase >= DISP_LOOKUP_EXECD)
68                         return it->it_status;
69                 else
70                         return 0;
71         }
72
73         if (it_disposition(it, DISP_IT_EXECD)) {
74                 if (phase >= DISP_IT_EXECD)
75                         return it->it_status;
76                 else
77                         return 0;
78         }
79
80         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
81         LBUG();
82
83         return 0;
84 }
85 EXPORT_SYMBOL(it_open_error);
86
87 /* this must be called on a lockh that is known to have a referenced lock */
88 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
89                       void *data, enum mds_ibits_locks *bits)
90 {
91         struct ldlm_lock *lock;
92         struct inode *new_inode = data;
93
94         ENTRY;
95         if (bits)
96                 *bits = 0;
97
98         if (!lustre_handle_is_used(lockh))
99                 RETURN(0);
100
101         lock = ldlm_handle2lock(lockh);
102
103         LASSERT(lock != NULL);
104         lock_res_and_lock(lock);
105         if (lock->l_resource->lr_lvb_inode &&
106             lock->l_resource->lr_lvb_inode != data) {
107                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
108
109                 LASSERTF(old_inode->i_state & I_FREEING,
110                          "Found existing inode %px/%lu/%u state %lu in lock: setting data to %px/%lu/%u\n",
111                          old_inode, old_inode->i_ino, old_inode->i_generation,
112                          (unsigned long)old_inode->i_state,
113                          new_inode, new_inode->i_ino, new_inode->i_generation);
114         }
115         lock->l_resource->lr_lvb_inode = new_inode;
116         if (bits)
117                 *bits = lock->l_policy_data.l_inodebits.bits;
118
119         unlock_res_and_lock(lock);
120         ldlm_lock_put(lock);
121
122         RETURN(0);
123 }
124
125 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
126                               const struct lu_fid *fid, enum ldlm_type type,
127                               union ldlm_policy_data *policy,
128                               enum ldlm_mode mode,
129                               enum ldlm_match_flags match_flags,
130                               struct lustre_handle *lockh)
131 {
132         struct ldlm_res_id res_id;
133         enum ldlm_mode rc;
134
135         ENTRY;
136         fid_build_reg_res_name(fid, &res_id);
137         /* LU-4405: Clear bits not supported by server */
138         policy->l_inodebits.bits &= exp_connect_ibits(exp);
139         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
140                              &res_id, type, policy, mode, match_flags, lockh);
141         RETURN(rc);
142 }
143
144 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
145                       union ldlm_policy_data *policy, enum ldlm_mode mode,
146                       enum ldlm_cancel_flags flags, void *opaque)
147 {
148         struct obd_device *obd = class_exp2obd(exp);
149         struct ldlm_res_id res_id;
150         int rc;
151
152         ENTRY;
153         fid_build_reg_res_name(fid, &res_id);
154         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
155                                              policy, mode, flags, opaque);
156         RETURN(rc);
157 }
158
159 int mdc_null_inode(struct obd_export *exp,
160                    const struct lu_fid *fid)
161 {
162         struct ldlm_res_id res_id;
163         struct ldlm_resource *res;
164         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
165
166         ENTRY;
167         LASSERTF(ns != NULL, "no namespace passed\n");
168
169         fid_build_reg_res_name(fid, &res_id);
170
171         res = ldlm_resource_get(ns, &res_id, 0, 0);
172         if (IS_ERR(res))
173                 RETURN(0);
174
175         lock_res(res);
176         res->lr_lvb_inode = NULL;
177         unlock_res(res);
178
179         ldlm_resource_putref(res);
180         RETURN(0);
181 }
182
183 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
184 {
185         /* Don't hold error requests for replay. */
186         if (req->rq_replay) {
187                 spin_lock(&req->rq_lock);
188                 req->rq_replay = 0;
189                 spin_unlock(&req->rq_lock);
190         }
191         if (rc && req->rq_transno != 0) {
192                 DEBUG_REQ(D_ERROR, req, "transno returned on error: rc = %d",
193                           rc);
194                 LBUG();
195         }
196 }
197
198 /**
199  * Save a large LOV/LMV EA into the request buffer so that it is available
200  * for replay.  We don't do this in the initial request because the
201  * original request doesn't need this buffer (at most it sends just the
202  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
203  * buffer and may also be difficult to allocate and save a very large
204  * request buffer for each open. (b=5707)
205  *
206  * OOM here may cause recovery failure if lmm is needed (only for the
207  * original open if the MDS crashed just when this client also OOM'd)
208  * but this is incredibly unlikely, and questionable whether the client
209  * could do MDS recovery under OOM anyways...
210  */
211 int mdc_save_lmm(struct ptlrpc_request *req, void *data, u32 size)
212 {
213         struct req_capsule *pill = &req->rq_pill;
214         void *lmm;
215         int rc = 0;
216
217         if (req_capsule_get_size(pill, &RMF_EADATA, RCL_CLIENT) < size) {
218                 rc = sptlrpc_cli_enlarge_reqbuf(req, &RMF_EADATA, size);
219                 if (rc) {
220                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
221                                req->rq_export->exp_obd->obd_name,
222                                size, rc);
223                         return rc;
224                 }
225         } else {
226                 req_capsule_shrink(pill, &RMF_EADATA, size, RCL_CLIENT);
227         }
228
229         req_capsule_set_size(pill, &RMF_EADATA, RCL_CLIENT, size);
230         lmm = req_capsule_client_get(pill, &RMF_EADATA);
231         if (lmm) {
232                 memcpy(lmm, data, size);
233                 lov_fix_ea_for_replay(lmm);
234         }
235
236         return rc;
237 }
238
239 static struct ptlrpc_request *
240 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
241                      struct md_op_data *op_data, __u32 acl_bufsize)
242 {
243         struct ptlrpc_request *req;
244         struct obd_device *obd = class_exp2obd(exp);
245         struct ldlm_intent *lit;
246         const void *lmm = op_data->op_data;
247         __u32 lmmsize = op_data->op_data_size;
248         __u32  mdt_md_capsule_size;
249         LIST_HEAD(cancels);
250         int count = 0;
251         enum ldlm_mode mode;
252         int repsize, repsize_estimate;
253         struct sptlrpc_sepol *sepol;
254         int rc;
255
256         ENTRY;
257
258         mdt_md_capsule_size = obd->u.cli.cl_default_mds_easize;
259
260         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
261
262         /* XXX: openlock is not cancelled for cross-refs. */
263         /* If inode is known, cancel conflicting OPEN locks. */
264         if (fid_is_sane(&op_data->op_fid2)) {
265                 if (it->it_open_flags & MDS_OPEN_LEASE) { /* try to get lease */
266                         if (it->it_open_flags & MDS_FMODE_WRITE)
267                                 mode = LCK_EX;
268                         else
269                                 mode = LCK_PR;
270                 } else {
271                         if (it->it_open_flags & (MDS_FMODE_WRITE |
272                                                  MDS_OPEN_TRUNC))
273                                 mode = LCK_CW;
274 #ifdef FMODE_EXEC
275                         else if (it->it_open_flags & FMODE_EXEC)
276                                 mode = LCK_PR;
277 #endif
278                         else
279                                 mode = LCK_CR;
280                 }
281                 count = mdc_resource_cancel_unused(exp, &op_data->op_fid2,
282                                                    &cancels, mode,
283                                                    MDS_INODELOCK_OPEN);
284         }
285
286         /* If CREATE, cancel parent's UPDATE lock. */
287         if (it->it_op & IT_CREAT)
288                 mode = LCK_EX;
289         else
290                 mode = LCK_CR;
291         count += mdc_resource_cancel_unused(exp, &op_data->op_fid1,
292                                             &cancels, mode,
293                                             MDS_INODELOCK_UPDATE);
294
295         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
296                                    &RQF_LDLM_INTENT_OPEN);
297         if (req == NULL) {
298                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
299                 RETURN(ERR_PTR(-ENOMEM));
300         }
301
302         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
303                              op_data->op_namelen + 1);
304         if (cl_is_lov_delay_create(it->it_open_flags)) {
305                 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
306                 LASSERT(lmmsize == 0);
307                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
308         } else {
309                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
310                              max(lmmsize, obd->u.cli.cl_default_mds_easize));
311         }
312
313         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
314                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
315                              op_data->op_file_secctx_name_size : 0);
316
317         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
318                              op_data->op_file_secctx_size);
319
320         req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX, RCL_CLIENT,
321                              op_data->op_file_encctx_size);
322
323         /* get SELinux policy info if any */
324         sepol = sptlrpc_sepol_get(req);
325         if (IS_ERR(sepol))
326                 GOTO(err_free_rq, rc = PTR_ERR(sepol));
327
328         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
329                              sptlrpc_sepol_size(sepol));
330
331         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
332         if (rc < 0)
333                 GOTO(err_put_sepol, rc);
334
335         spin_lock(&req->rq_lock);
336         req->rq_replay = req->rq_import->imp_replayable;
337         spin_unlock(&req->rq_lock);
338
339         /* pack the intent */
340         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
341         lit->opc = (__u64)it->it_op;
342
343         /* pack the intended request */
344         mdc_open_pack(&req->rq_pill, op_data, it->it_create_mode, 0,
345                       it->it_open_flags, lmm, lmmsize, sepol);
346
347         sptlrpc_sepol_put(sepol);
348
349         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
350                              mdt_md_capsule_size);
351         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
352
353         if (!(it->it_op & IT_CREAT) && it->it_op & IT_OPEN &&
354             req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
355                                   RCL_CLIENT) &&
356             op_data->op_file_secctx_name_size > 0 &&
357             op_data->op_file_secctx_name != NULL) {
358                 char *secctx_name;
359
360                 secctx_name = req_capsule_client_get(&req->rq_pill,
361                                                      &RMF_FILE_SECCTX_NAME);
362                 memcpy(secctx_name, op_data->op_file_secctx_name,
363                        op_data->op_file_secctx_name_size);
364                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
365                                      RCL_SERVER,
366                                      obd->u.cli.cl_max_mds_easize);
367
368                 CDEBUG(D_SEC, "packed '"DNAME"' as security xattr name\n",
369                        encode_fn_opdata(op_data));
370
371         } else {
372                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
373                                      RCL_SERVER, 0);
374         }
375
376         if (exp_connect_encrypt(exp) && !(it->it_op & IT_CREAT) &&
377             it->it_op & IT_OPEN)
378                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
379                                      RCL_SERVER,
380                                      obd->u.cli.cl_max_mds_easize);
381         else
382                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
383                                      RCL_SERVER, 0);
384
385         /**
386          * Inline buffer for possible data from Data-on-MDT files.
387          */
388         req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
389                              sizeof(struct niobuf_remote));
390         req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
391                              sizeof(struct lmv_user_md));
392         ptlrpc_request_set_replen(req);
393
394         /* Get real repbuf allocated size as rounded up power of 2 */
395         repsize = size_roundup_power2(req->rq_replen +
396                                       lustre_msg_early_size);
397         /* Estimate free space for DoM files in repbuf */
398         repsize_estimate = repsize - (req->rq_replen -
399                            mdt_md_capsule_size +
400                            sizeof(struct lov_comp_md_v1) +
401                            sizeof(struct lov_comp_md_entry_v1) +
402                            lov_mds_md_size(0, LOV_MAGIC_V3));
403
404         if (repsize_estimate < obd->u.cli.cl_dom_min_inline_repsize) {
405                 repsize = obd->u.cli.cl_dom_min_inline_repsize -
406                           repsize_estimate + sizeof(struct niobuf_remote);
407                 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
408                                      RCL_SERVER,
409                                      sizeof(struct niobuf_remote) + repsize);
410                 ptlrpc_request_set_replen(req);
411                 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
412                        repsize, req->rq_replen);
413                 repsize = size_roundup_power2(req->rq_replen +
414                                               lustre_msg_early_size);
415         }
416         /* The only way to report real allocated repbuf size to the server
417          * is the lm_repsize but it must be set prior buffer allocation itself
418          * due to security reasons - it is part of buffer used in signature
419          * calculation (see LU-11414). Therefore the saved size is predicted
420          * value as rq_replen rounded to the next higher power of 2.
421          * Such estimation is safe. Though the final allocated buffer might
422          * be even larger, it is not possible to know that at this point.
423          */
424         if ((op_data->op_cli_flags & CLI_READ_ON_OPEN) != 0)
425                 req->rq_reqmsg->lm_repsize = repsize;
426         else
427                 req->rq_reqmsg->lm_repsize = 0;
428         RETURN(req);
429
430 err_put_sepol:
431         sptlrpc_sepol_put(sepol);
432 err_free_rq:
433         ptlrpc_request_free(req);
434         return ERR_PTR(rc);
435 }
436
437 static struct ptlrpc_request *
438 mdc_intent_create_pack(struct obd_export *exp, struct lookup_intent *it,
439                        struct md_op_data *op_data, __u32 acl_bufsize,
440                        __u64 extra_lock_flags)
441 {
442         LIST_HEAD(cancels);
443         struct ptlrpc_request *req;
444         struct obd_device *obd = class_exp2obd(exp);
445         struct sptlrpc_sepol *sepol;
446         struct ldlm_intent *lit;
447         int count = 0;
448         int rc;
449
450         ENTRY;
451
452         if (fid_is_sane(&op_data->op_fid1))
453                 /* cancel parent's UPDATE lock. */
454                 count = mdc_resource_cancel_unused(exp, &op_data->op_fid1,
455                                                    &cancels, LCK_EX,
456                                                    MDS_INODELOCK_UPDATE);
457
458         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
459                                    &RQF_LDLM_INTENT_CREATE);
460         if (req == NULL) {
461                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
462                 RETURN(ERR_PTR(-ENOMEM));
463         }
464
465         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
466                              op_data->op_namelen + 1);
467         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
468                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
469                              strlen(op_data->op_file_secctx_name) + 1 : 0);
470         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
471                              op_data->op_file_secctx_size);
472         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
473                              op_data->op_data_size);
474         req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX, RCL_CLIENT,
475                              op_data->op_file_encctx_size);
476
477         /* get SELinux policy info if any */
478         sepol = sptlrpc_sepol_get(req);
479         if (IS_ERR(sepol)) {
480                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
481                 GOTO(err_free_rq, rc = PTR_ERR(sepol));
482         }
483         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
484                              sptlrpc_sepol_size(sepol));
485
486         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
487         if (rc < 0)
488                 GOTO(err_put_sepol, rc);
489
490         /* Pack the intent */
491         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
492         lit->opc = (__u64)it->it_op;
493
494         /* Pack the intent request. */
495         mdc_create_pack(&req->rq_pill, op_data, op_data->op_data,
496                         op_data->op_data_size, it->it_create_mode,
497                         op_data->op_fsuid, op_data->op_fsgid,
498                         op_data->op_cap, 0, sepol);
499
500         sptlrpc_sepol_put(sepol);
501
502         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
503                              obd->u.cli.cl_default_mds_easize);
504         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
505         req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
506                              sizeof(struct lmv_user_md));
507         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
508                              RCL_SERVER, 0);
509         req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX, RCL_SERVER, 0);
510
511         ptlrpc_request_set_replen(req);
512         RETURN(req);
513
514 err_put_sepol:
515         sptlrpc_sepol_put(sepol);
516 err_free_rq:
517         ptlrpc_request_free(req);
518         return ERR_PTR(rc);
519 }
520
521 #define GA_DEFAULT_EA_NAME_LEN   20
522 #define GA_DEFAULT_EA_VAL_LEN   250
523 #define GA_DEFAULT_EA_NUM        10
524
525 static struct ptlrpc_request *
526 mdc_intent_getxattr_pack(struct obd_export *exp, struct lookup_intent *it,
527                          struct md_op_data *op_data)
528 {
529         struct ptlrpc_request *req;
530         struct ldlm_intent *lit;
531         struct sptlrpc_sepol *sepol;
532         int rc, count = 0;
533         LIST_HEAD(cancels);
534         u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
535
536         ENTRY;
537         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
538                                         &RQF_LDLM_INTENT_GETXATTR);
539         if (req == NULL)
540                 RETURN(ERR_PTR(-ENOMEM));
541
542         /* get SELinux policy info if any */
543         sepol = sptlrpc_sepol_get(req);
544         if (IS_ERR(sepol))
545                 GOTO(err_free_rq, rc = PTR_ERR(sepol));
546
547         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
548                              sptlrpc_sepol_size(sepol));
549
550         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
551         if (rc)
552                 GOTO(err_put_sepol, rc);
553
554         /* pack the intent */
555         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
556         lit->opc = IT_GETXATTR;
557         /* Message below is checked in sanity-selinux test_20d
558          * and sanity-sec test_49
559          */
560         CDEBUG(D_INFO, "%s: get xattrs for "DFID"\n",
561                exp->exp_obd->obd_name, PFID(&op_data->op_fid1));
562
563 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
564         /* If the supplied buffer is too small then the server will return
565          * -ERANGE and llite will fallback to using non cached xattr
566          * operations. On servers before 2.10.1 a (non-cached) listxattr RPC
567          * for an orphan or dead file causes an oops. So let's try to avoid
568          * sending too small a buffer to too old a server. This is effectively
569          * undoing the memory conservation of LU-9417 when it would be *more*
570          * likely to crash the server. See LU-9856.
571          */
572         if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
573                 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
574                                          exp->exp_connect_data.ocd_max_easize);
575 #endif
576
577         /* pack the intended request */
578         mdc_pack_body(&req->rq_pill, &op_data->op_fid1, op_data->op_valid,
579                       ea_vals_buf_size, -1, 0, op_data->op_projid);
580
581         /* get SELinux policy info if any */
582         mdc_file_sepol_pack(&req->rq_pill, sepol);
583         sptlrpc_sepol_put(sepol);
584
585         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
586                              GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
587
588         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
589                              ea_vals_buf_size);
590
591         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
592                              sizeof(u32) * GA_DEFAULT_EA_NUM);
593
594         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
595
596         ptlrpc_request_set_replen(req);
597
598         RETURN(req);
599
600 err_put_sepol:
601         sptlrpc_sepol_put(sepol);
602 err_free_rq:
603         ptlrpc_request_free(req);
604         RETURN(ERR_PTR(rc));
605 }
606
607 static struct ptlrpc_request *
608 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
609                         struct md_op_data *op_data, __u32 acl_bufsize)
610 {
611         struct ptlrpc_request *req;
612         struct obd_device *obd = class_exp2obd(exp);
613         u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE |
614                     OBD_MD_FLDIREA | OBD_MD_MEA | OBD_MD_FLACL |
615                     OBD_MD_DEFAULT_MEA;
616         struct ldlm_intent *lit;
617         __u32 easize;
618         bool have_secctx = false;
619         int rc;
620
621         ENTRY;
622         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
623                                    &RQF_LDLM_INTENT_GETATTR);
624         if (req == NULL)
625                 RETURN(ERR_PTR(-ENOMEM));
626
627         /* send name of security xattr to get upon intent */
628         if (it->it_op & (IT_LOOKUP | IT_GETATTR) &&
629             req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
630                                   RCL_CLIENT) &&
631             op_data->op_file_secctx_name_size > 0 &&
632             op_data->op_file_secctx_name != NULL) {
633                 have_secctx = true;
634                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
635                                      RCL_CLIENT,
636                                      op_data->op_file_secctx_name_size);
637         }
638
639         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
640                              op_data->op_namelen + 1);
641
642         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
643         if (rc) {
644                 ptlrpc_request_free(req);
645                 RETURN(ERR_PTR(rc));
646         }
647
648         /* pack the intent */
649         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
650         lit->opc = (__u64)it->it_op;
651
652         if (obd->u.cli.cl_default_mds_easize > 0)
653                 easize = obd->u.cli.cl_default_mds_easize;
654         else
655                 easize = obd->u.cli.cl_max_mds_easize;
656
657         /* pack the intended request */
658         mdc_getattr_pack(&req->rq_pill, valid, it->it_open_flags, op_data,
659                          easize);
660
661         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
662         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
663         req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
664                              sizeof(struct lmv_user_md));
665
666         if (have_secctx) {
667                 char *secctx_name;
668
669                 secctx_name = req_capsule_client_get(&req->rq_pill,
670                                                      &RMF_FILE_SECCTX_NAME);
671                 memcpy(secctx_name, op_data->op_file_secctx_name,
672                        op_data->op_file_secctx_name_size);
673
674                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
675                                      RCL_SERVER, easize);
676
677                 CDEBUG(D_SEC, "packed '"DNAME"' as security xattr name\n",
678                        encode_fn_opdata(op_data));
679         } else {
680                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
681                                      RCL_SERVER, 0);
682         }
683
684         if (exp_connect_encrypt(exp) && it->it_op & (IT_LOOKUP | IT_GETATTR))
685                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
686                                      RCL_SERVER, easize);
687         else
688                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
689                                      RCL_SERVER, 0);
690
691         ptlrpc_request_set_replen(req);
692         RETURN(req);
693 }
694
695 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
696                                                      struct lookup_intent *it,
697                                                      struct md_op_data *op_data)
698 {
699         struct obd_device *obd = class_exp2obd(exp);
700         struct ptlrpc_request *req;
701         struct ldlm_intent *lit;
702         struct layout_intent *layout;
703         LIST_HEAD(cancels);
704         int count = 0, rc;
705
706         ENTRY;
707         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
708                                 &RQF_LDLM_INTENT_LAYOUT);
709         if (req == NULL)
710                 RETURN(ERR_PTR(-ENOMEM));
711
712         if (fid_is_sane(&op_data->op_fid2) && (it->it_op & IT_LAYOUT) &&
713             (it->it_open_flags & FMODE_WRITE)) {
714                 count = mdc_resource_cancel_unused(exp, &op_data->op_fid2,
715                                                    &cancels, LCK_EX,
716                                                    MDS_INODELOCK_LAYOUT);
717         }
718
719         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
720         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
721         if (rc) {
722                 ptlrpc_request_free(req);
723                 RETURN(ERR_PTR(rc));
724         }
725
726         /* pack the intent */
727         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
728         lit->opc = (__u64)it->it_op;
729
730         /* pack the layout intent request */
731         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
732         LASSERT(op_data->op_data != NULL);
733         LASSERT(op_data->op_data_size == sizeof(*layout));
734         memcpy(layout, op_data->op_data, sizeof(*layout));
735
736         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
737                              obd->u.cli.cl_default_mds_easize);
738         ptlrpc_request_set_replen(req);
739         RETURN(req);
740 }
741
742 static struct ptlrpc_request *mdc_enqueue_pack(struct obd_export *exp,
743                                                int lvb_len)
744 {
745         struct ptlrpc_request *req;
746         int rc;
747
748         ENTRY;
749         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
750         if (req == NULL)
751                 RETURN(ERR_PTR(-ENOMEM));
752
753         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
754         if (rc) {
755                 ptlrpc_request_free(req);
756                 RETURN(ERR_PTR(rc));
757         }
758
759         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
760         ptlrpc_request_set_replen(req);
761         RETURN(req);
762 }
763
764 int mdc_finish_enqueue(struct obd_export *exp,
765                        struct req_capsule *pill,
766                        struct ldlm_enqueue_info *einfo,
767                        struct lookup_intent *it,
768                        struct lustre_handle *lockh, int rc)
769 {
770         struct ptlrpc_request *req = pill->rc_req;
771         struct ldlm_request *lockreq;
772         struct ldlm_reply *lockrep;
773         struct ldlm_lock *lock;
774         struct mdt_body *body = NULL;
775         void *lvb_data = NULL;
776         __u32 lvb_len = 0;
777
778         ENTRY;
779         LASSERT(rc >= 0);
780         /* Similarly, if we're going to replay this request, we don't want to
781          * actually get a lock, just perform the intent.
782          */
783         if (req->rq_transno || req->rq_replay) {
784                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
785                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
786         }
787
788         if (rc == ELDLM_LOCK_ABORTED) {
789                 einfo->ei_mode = 0;
790                 memset(lockh, 0, sizeof(*lockh));
791                 rc = 0;
792         } else { /* rc = 0 */
793                 lock = ldlm_handle2lock(lockh);
794                 LASSERT(lock != NULL);
795
796                 /* If server returned a different lock mode, fix up variables */
797                 if (lock->l_req_mode != einfo->ei_mode) {
798                         ldlm_lock_addref(lockh, lock->l_req_mode);
799                         ldlm_lock_decref(lockh, einfo->ei_mode);
800                         einfo->ei_mode = lock->l_req_mode;
801                 }
802                 ldlm_lock_put(lock);
803         }
804
805         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
806         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
807
808         it->it_disposition = (int)lockrep->lock_policy_res1;
809         it->it_status = (int)lockrep->lock_policy_res2;
810         it->it_lock_mode = einfo->ei_mode;
811         it->it_lock_handle = lockh->cookie;
812         it->it_request = req;
813
814         /* Technically speaking rq_transno must already be zero if
815          * it_status is in error, so the check is a bit redundant.
816          */
817         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
818                 mdc_clear_replay_flag(req, it->it_status);
819
820         /* If we're doing an IT_OPEN which did not result in an actual
821          * successful open, then we need to remove the bit which saves
822          * this request for unconditional replay.
823          *
824          * It's important that we do this first!  Otherwise we might exit the
825          * function without doing so, and try to replay a failed create.
826          * (b=3440)
827          */
828         if (it->it_op & IT_OPEN && req->rq_replay &&
829             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
830                 mdc_clear_replay_flag(req, it->it_status);
831
832         DEBUG_REQ(D_RPCTRACE, req, "op=%x disposition=%x, status=%d",
833                   it->it_op, it->it_disposition, it->it_status);
834
835         /* We know what to expect, so we do any byte flipping required here */
836         if (it_has_reply_body(it)) {
837                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
838                 if (body == NULL) {
839                         rc = -EPROTO;
840                         CERROR("%s: cannot swab mdt_body: rc = %d\n",
841                                exp->exp_obd->obd_name, rc);
842                         RETURN(rc);
843                 }
844
845                 if (it_disposition(it, DISP_OPEN_OPEN) &&
846                     !it_open_error(DISP_OPEN_OPEN, it)) {
847                         /*
848                          * If this is a successful OPEN request, we need to set
849                          * replay handler and data early, so that if replay
850                          * happens immediately after swabbing below, new reply
851                          * is swabbed by that handler correctly.
852                          */
853                         mdc_set_open_replay_data(NULL, NULL, it);
854                 }
855
856                 if (it_disposition(it, DISP_OPEN_CREATE) &&
857                     !it_open_error(DISP_OPEN_CREATE, it)) {
858                         lprocfs_counter_incr(exp->exp_obd->obd_md_stats,
859                                              LPROC_MD_CREATE);
860                 }
861
862                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
863                         void *eadata;
864
865                         mdc_update_max_ea_from_body(exp, body);
866
867                         /*
868                          * The eadata is opaque; just check that it is there.
869                          * Eventually, obd_unpackmd() will check the contents.
870                          */
871                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
872                                                         body->mbo_eadatasize);
873                         if (eadata == NULL)
874                                 RETURN(-EPROTO);
875
876                         /* save LVB data and length if for layout lock */
877                         lvb_data = eadata;
878                         lvb_len = body->mbo_eadatasize;
879
880                         /*
881                          * We save the reply LOV EA in case we have to replay a
882                          * create for recovery.  If we didn't allocate a large
883                          * enough request buffer above we need to reallocate it
884                          * here to hold the actual LOV EA.
885                          *
886                          * To not save LOV EA if request is not going to replay
887                          * (for example error one).
888                          */
889                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
890                                 rc = mdc_save_lmm(req, eadata,
891                                                   body->mbo_eadatasize);
892                                 if (rc) {
893                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
894                                         body->mbo_eadatasize = 0;
895                                         rc = 0;
896                                 }
897                         }
898                 }
899         } else if (it->it_op & IT_LAYOUT) {
900                 /* maybe the lock was granted right away and layout
901                  * is packed into RMF_DLM_LVB of req
902                  */
903                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
904                 CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
905                        class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
906                 if (lvb_len > 0) {
907                         lvb_data = req_capsule_server_sized_get(pill,
908                                                         &RMF_DLM_LVB, lvb_len);
909                         if (lvb_data == NULL)
910                                 RETURN(-EPROTO);
911
912                         /**
913                          * save replied layout data to the request buffer for
914                          * recovery consideration (lest MDS reinitialize
915                          * another set of OST objects).
916                          */
917                         if (req->rq_transno)
918                                 mdc_save_lmm(req, lvb_data, lvb_len);
919                 }
920         }
921
922         /* fill in stripe data for layout lock.
923          * LU-6581: trust layout data only if layout lock is granted. The MDT
924          * has stopped sending layout unless the layout lock is granted. The
925          * client still does this checking in case it's talking with an old
926          * server. - Jinshan
927          */
928         lock = ldlm_handle2lock(lockh);
929         if (lock == NULL)
930                 RETURN(rc);
931
932         if (ldlm_has_layout(lock) && lvb_data != NULL &&
933             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
934                 void *lmm;
935
936                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
937                         ldlm_it2str(it->it_op), lvb_len);
938
939                 OBD_ALLOC_LARGE(lmm, lvb_len);
940                 if (lmm == NULL)
941                         GOTO(out_lock, rc = -ENOMEM);
942
943                 memcpy(lmm, lvb_data, lvb_len);
944
945                 /* install lvb_data */
946                 lock_res_and_lock(lock);
947                 if (lock->l_lvb_data == NULL) {
948                         lock->l_lvb_type = LVB_T_LAYOUT;
949                         lock->l_lvb_data = lmm;
950                         lock->l_lvb_len = lvb_len;
951                         lmm = NULL;
952                 }
953                 unlock_res_and_lock(lock);
954                 if (lmm != NULL)
955                         OBD_FREE_LARGE(lmm, lvb_len);
956         }
957
958         if (ldlm_has_dom(lock)) {
959                 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
960
961                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
962                 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
963                         LDLM_ERROR(lock, "%s: DoM lock without size.",
964                                    exp->exp_obd->obd_name);
965                         GOTO(out_lock, rc = -EPROTO);
966                 }
967
968                 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
969                            ldlm_it2str(it->it_op), body->mbo_dom_size);
970
971                 /* l_ost_lvb is only in the LDLM_IBITS union **/
972                 LASSERT(lock->l_resource->lr_type == LDLM_IBITS);
973                 lock_res_and_lock(lock);
974                 mdc_body2lvb(body, &lock->l_ost_lvb);
975                 ldlm_lock_allow_match_locked(lock);
976                 unlock_res_and_lock(lock);
977         }
978 out_lock:
979         ldlm_lock_put(lock);
980
981         RETURN(rc);
982 }
983
984 static inline bool mdc_skip_mod_rpc_slot(const struct lookup_intent *it)
985 {
986         if (it != NULL &&
987             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
988              it->it_op == IT_READDIR || it->it_op == IT_GETXATTR ||
989              (it->it_op == IT_LAYOUT && !(it->it_open_flags &
990                                           MDS_FMODE_WRITE))))
991                 return true;
992         return false;
993 }
994
995 /* We always reserve enough space in the reply packet for a stripe MD, because
996  * we don't know in advance the file type.
997  */
998 static int mdc_enqueue_base(struct obd_export *exp,
999                             struct ldlm_enqueue_info *einfo,
1000                             const union ldlm_policy_data *policy,
1001                             struct lookup_intent *it,
1002                             struct md_op_data *op_data,
1003                             struct lustre_handle *lockh,
1004                             __u64 extra_lock_flags)
1005 {
1006         struct obd_device *obd = class_exp2obd(exp);
1007         struct ptlrpc_request *req;
1008         __u64 flags, saved_flags = extra_lock_flags;
1009         struct ldlm_res_id res_id;
1010         static const union ldlm_policy_data lookup_policy = {
1011                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
1012         static const union ldlm_policy_data update_policy = {
1013                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
1014         static const union ldlm_policy_data layout_policy = {
1015                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
1016         static const union ldlm_policy_data getxattr_policy = {
1017                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
1018         int generation, resends = 0;
1019         struct ldlm_reply *lockrep;
1020         struct obd_import *imp = class_exp2cliimp(exp);
1021         __u32 acl_bufsize;
1022         enum lvb_type lvb_type = 0;
1023         int rc;
1024
1025         ENTRY;
1026         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
1027                  einfo->ei_type);
1028         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1029
1030         if (it != NULL) {
1031                 LASSERT(policy == NULL);
1032
1033                 saved_flags |= LDLM_FL_HAS_INTENT;
1034                 if (it->it_op & (IT_GETATTR | IT_READDIR | IT_CREAT))
1035                         policy = &update_policy;
1036                 else if (it->it_op & IT_LAYOUT)
1037                         policy = &layout_policy;
1038                 else if (it->it_op & IT_GETXATTR)
1039                         policy = &getxattr_policy;
1040                 else
1041                         policy = &lookup_policy;
1042         }
1043
1044         generation = obd->u.cli.cl_import->imp_generation;
1045         if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
1046                 acl_bufsize = min_t(__u32, imp->imp_connect_data.ocd_max_easize,
1047                                     XATTR_SIZE_MAX);
1048         else
1049                 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
1050
1051 resend:
1052         flags = saved_flags;
1053         if (it == NULL) {
1054                 /* The only way right now is FLOCK. */
1055                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
1056                          einfo->ei_type);
1057                 res_id.name[3] = LDLM_FLOCK;
1058                 req = ldlm_enqueue_pack(exp, 0);
1059         } else if (it->it_op & IT_OPEN) {
1060                 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
1061         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
1062                 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
1063         } else if (it->it_op & IT_READDIR) {
1064                 req = mdc_enqueue_pack(exp, 0);
1065         } else if (it->it_op & IT_LAYOUT) {
1066                 if (!imp_connect_lvb_type(imp))
1067                         RETURN(-EOPNOTSUPP);
1068                 req = mdc_intent_layout_pack(exp, it, op_data);
1069                 lvb_type = LVB_T_LAYOUT;
1070         } else if (it->it_op & IT_GETXATTR) {
1071                 req = mdc_intent_getxattr_pack(exp, it, op_data);
1072         } else if (it->it_op == IT_CREAT) {
1073                 req = mdc_intent_create_pack(exp, it, op_data, acl_bufsize,
1074                                              extra_lock_flags);
1075         } else {
1076                 LBUG();
1077                 RETURN(-EINVAL);
1078         }
1079
1080         if (IS_ERR(req))
1081                 RETURN(PTR_ERR(req));
1082
1083         lustre_msg_set_projid(req->rq_reqmsg, op_data->op_projid);
1084
1085         if (resends) {
1086                 req->rq_generation_set = 1;
1087                 req->rq_import_generation = generation;
1088                 req->rq_sent = ktime_get_real_seconds() + resends;
1089         }
1090
1091         einfo->ei_req_slot = !(op_data->op_cli_flags & CLI_NO_SLOT);
1092         einfo->ei_mod_slot = !mdc_skip_mod_rpc_slot(it);
1093
1094         /* With Data-on-MDT the glimpse callback is needed too.
1095          * It is set here in advance but not in mdc_finish_enqueue()
1096          * to avoid possible races. It is safe to have glimpse handler
1097          * for non-DOM locks and costs nothing.
1098          */
1099         if (einfo->ei_cb_gl == NULL)
1100                 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
1101
1102         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
1103                               0, lvb_type, lockh, 0);
1104
1105         if (!it) {
1106                 /* For flock requests we immediatelly return without further
1107                  * delay and let caller deal with the rest, since rest of
1108                  * this function metadata processing makes no sense for flock
1109                  * requests anyway. But in case of problem during comms with
1110                  * server (-ETIMEDOUT) or any signal/kill attempt (-EINTR),
1111                  * we cannot rely on caller and this mainly for F_UNLCKs
1112                  * (explicits or automatically generated by kernel to clean
1113                  * current flocks upon exit) that can't be trashed.
1114                  */
1115                 ptlrpc_req_put(req);
1116                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
1117                     (einfo->ei_type == LDLM_FLOCK) &&
1118                     (einfo->ei_mode == LCK_NL))
1119                         goto resend;
1120                 RETURN(rc);
1121         }
1122
1123         if (rc < 0) {
1124                 CDEBUG(D_INFO,
1125                       "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
1126                       obd->obd_name, PFID(&op_data->op_fid1),
1127                       PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
1128
1129                 mdc_clear_replay_flag(req, rc);
1130                 ptlrpc_req_put(req);
1131                 RETURN(rc);
1132         }
1133
1134         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1135         LASSERT(lockrep != NULL);
1136
1137         lockrep->lock_policy_res2 =
1138                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1139
1140         /* Retry infinitely when the server returns -EINPROGRESS for the
1141          * intent operation, when server returns -EINPROGRESS for acquiring
1142          * intent lock, we'll retry in after_reply().
1143          */
1144         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
1145                 mdc_clear_replay_flag(req, rc);
1146                 ptlrpc_req_put(req);
1147                 if (generation == obd->u.cli.cl_import->imp_generation) {
1148                         if (signal_pending(current))
1149                                 RETURN(-EINTR);
1150
1151                         resends++;
1152                         CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
1153                                obd->obd_name, resends, it->it_op,
1154                                PFID(&op_data->op_fid1),
1155                                PFID(&op_data->op_fid2));
1156                         goto resend;
1157                 } else {
1158                         CDEBUG(D_HA, "resend cross eviction\n");
1159                         RETURN(-EIO);
1160                 }
1161         }
1162
1163         if ((int)lockrep->lock_policy_res2 == -ERANGE &&
1164             it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
1165             acl_bufsize == LUSTRE_POSIX_ACL_MAX_SIZE_OLD) {
1166                 mdc_clear_replay_flag(req, -ERANGE);
1167                 ptlrpc_req_put(req);
1168                 acl_bufsize = min_t(__u32, imp->imp_connect_data.ocd_max_easize,
1169                                     XATTR_SIZE_MAX);
1170                 goto resend;
1171         }
1172
1173         rc = mdc_finish_enqueue(exp, &req->rq_pill, einfo, it, lockh, rc);
1174         if (rc < 0) {
1175                 if (lustre_handle_is_used(lockh)) {
1176                         ldlm_lock_decref(lockh, einfo->ei_mode);
1177                         memset(lockh, 0, sizeof(*lockh));
1178                 }
1179                 ptlrpc_req_put(req);
1180
1181                 it->it_lock_handle = 0;
1182                 it->it_lock_mode = 0;
1183                 it->it_request = NULL;
1184         }
1185
1186         RETURN(rc);
1187 }
1188
1189 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1190                 const union ldlm_policy_data *policy,
1191                 struct md_op_data *op_data,
1192                 struct lustre_handle *lockh, __u64 extra_lock_flags)
1193 {
1194         return mdc_enqueue_base(exp, einfo, policy, NULL,
1195                                 op_data, lockh, extra_lock_flags);
1196 }
1197
1198 static int mdc_enqueue_async_interpret(const struct lu_env *env,
1199                                        struct ptlrpc_request *req,
1200                                        void *args, int rc)
1201 {
1202         struct mdc_enqueue_args *mea = args;
1203         struct obd_export       *exp = mea->mea_exp;
1204         struct ldlm_lock        *lock = mea->mea_lock;
1205         struct lustre_handle    lockh;
1206         struct ldlm_enqueue_info  einfo = {
1207                         .ei_type = LDLM_FLOCK,
1208                         .ei_mode = mea->mea_mode,
1209         };
1210
1211         ENTRY;
1212         CDEBUG(D_INFO, "req=%p rc=%d\n", req, rc);
1213
1214         ldlm_lock2handle(lock, &lockh);
1215         rc = ldlm_cli_enqueue_fini(exp, &req->rq_pill, &einfo, 1,
1216                                   &mea->mea_flags, NULL, 0, &lockh, rc, true);
1217         if (rc == -ENOLCK)
1218                 ldlm_lock_put(lock);
1219
1220         /* we expect failed_lock_cleanup() to destroy lock */
1221         if (rc != 0)
1222                 LASSERT(list_empty(&lock->l_res_link));
1223
1224         if (mea->mea_upcall != NULL)
1225                 mea->mea_upcall(lock, rc);
1226
1227         ldlm_lock_put(lock);
1228
1229         RETURN(rc);
1230 }
1231
1232 int mdc_enqueue_async(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1233                       obd_enqueue_update_f upcall, struct md_op_data *op_data,
1234                       const union ldlm_policy_data *policy, __u64 flags)
1235 {
1236         struct mdc_enqueue_args *mea;
1237         struct ptlrpc_request *req;
1238         int                    rc;
1239         struct ldlm_res_id res_id;
1240         struct lustre_handle lockh;
1241
1242         ENTRY;
1243         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1244
1245         LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
1246                  einfo->ei_type);
1247         res_id.name[3] = LDLM_FLOCK;
1248
1249         req = ldlm_enqueue_pack(exp, 0);
1250         if (IS_ERR(req))
1251                 RETURN(PTR_ERR(req));
1252
1253         einfo->ei_req_slot = 1;
1254         einfo->ei_mod_slot = 1;
1255
1256         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
1257                               0, 0, &lockh, 1);
1258         if (rc) {
1259                 ptlrpc_req_put(req);
1260                 RETURN(rc);
1261         }
1262
1263         mea = ptlrpc_req_async_args(mea, req);
1264         mea->mea_exp = exp;
1265         mea->mea_lock = ldlm_handle2lock(&lockh);
1266         LASSERT(mea->mea_lock != NULL);
1267
1268         mea->mea_mode = einfo->ei_mode;
1269         mea->mea_flags = flags;
1270         mea->mea_upcall = upcall;
1271
1272         req->rq_interpret_reply = mdc_enqueue_async_interpret;
1273         ptlrpcd_add_req(req);
1274
1275         RETURN(0);
1276 }
1277
1278 static int mdc_finish_intent_lock(struct obd_export *exp,
1279                                   struct ptlrpc_request *request,
1280                                   struct md_op_data *op_data,
1281                                   struct lookup_intent *it,
1282                                   struct lustre_handle *lockh)
1283 {
1284         struct lustre_handle old_lock;
1285         struct ldlm_lock *lock;
1286         int rc = 0;
1287
1288         ENTRY;
1289         LASSERT(request != NULL);
1290         LASSERT(request != LP_POISON);
1291         LASSERT(request->rq_repmsg != LP_POISON);
1292
1293         if (it->it_op & IT_READDIR)
1294                 RETURN(0);
1295
1296         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
1297                 if (it->it_status != 0)
1298                         GOTO(out, rc = it->it_status);
1299         } else {
1300                 if (!it_disposition(it, DISP_IT_EXECD)) {
1301                         /* The server failed before it even started executing
1302                          * the intent, i.e. because it couldn't unpack the
1303                          * request.
1304                          */
1305                         LASSERT(it->it_status != 0);
1306                         GOTO(out, rc = it->it_status);
1307                 }
1308                 rc = it_open_error(DISP_IT_EXECD, it);
1309                 if (rc)
1310                         GOTO(out, rc);
1311
1312                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1313                 if (rc)
1314                         GOTO(out, rc);
1315
1316                 /* keep requests around for the multiple phases of the call
1317                  * this shows the DISP_XX must guarantee we make it into the
1318                  * call
1319                  */
1320                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1321                     it_disposition(it, DISP_OPEN_CREATE) &&
1322                     !it_open_error(DISP_OPEN_CREATE, it)) {
1323                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
1324                         /* balanced in ll_create_node */
1325                         ptlrpc_request_addref(request);
1326                 }
1327                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1328                     it_disposition(it, DISP_OPEN_OPEN) &&
1329                     !it_open_error(DISP_OPEN_OPEN, it)) {
1330                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
1331                         /* balanced in ll_file_open */
1332                         ptlrpc_request_addref(request);
1333                         /* eviction in middle of open RPC processing b=11546 */
1334                         CFS_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1335                                          obd_timeout);
1336                 }
1337
1338                 if (it->it_op & IT_CREAT) {
1339                         /* XXX this belongs in ll_create_it */
1340                 } else if (it->it_op == IT_OPEN) {
1341                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1342                 } else {
1343                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1344                 }
1345         }
1346
1347         /* If we already have a matching lock, then cancel the new
1348          * one.  We have to set the data here instead of in
1349          * mdc_enqueue, because we need to use the child's inode as
1350          * the l_ast_data to match, and that's not available until
1351          * intent_finish has performed the iget().
1352          */
1353         lock = ldlm_handle2lock(lockh);
1354         if (lock) {
1355                 union ldlm_policy_data policy = lock->l_policy_data;
1356
1357                 LDLM_DEBUG(lock, "matching against this");
1358
1359                 if (it_has_reply_body(it)) {
1360                         struct mdt_body *body;
1361
1362                         body = req_capsule_server_get(&request->rq_pill,
1363                                                       &RMF_MDT_BODY);
1364                         /* mdc_enqueue checked */
1365                         LASSERT(body != NULL);
1366                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1367                                                  &lock->l_resource->lr_name),
1368                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1369                                  PLDLMRES(lock->l_resource),
1370                                  PFID(&body->mbo_fid1));
1371                 }
1372                 ldlm_lock_put(lock);
1373
1374                 memcpy(&old_lock, lockh, sizeof(*lockh));
1375                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1376                                    LDLM_IBITS, &policy, LCK_NL, 0, &old_lock)) {
1377                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1378                         memcpy(lockh, &old_lock, sizeof(old_lock));
1379                         it->it_lock_handle = lockh->cookie;
1380                 }
1381         }
1382
1383         EXIT;
1384 out:
1385         CDEBUG(D_DENTRY,
1386                "D_IT dentry="DNAME" intent=%s status=%d disp=%x: rc = %d\n",
1387                encode_fn_opdata(op_data), ldlm_it2str(it->it_op),
1388                it->it_status, it->it_disposition, rc);
1389
1390         return rc;
1391 }
1392
1393 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1394                         struct lu_fid *fid, enum mds_ibits_locks *bits)
1395 {
1396         /* We could just return 1 immediately, but as we should only be called
1397          * in revalidate_it if we already have a lock, let's verify that.
1398          */
1399         struct ldlm_res_id res_id;
1400         struct lustre_handle lockh;
1401         union ldlm_policy_data policy;
1402         enum ldlm_mode mode;
1403
1404         ENTRY;
1405         if (it->it_lock_handle) {
1406                 lockh.cookie = it->it_lock_handle;
1407                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1408         } else {
1409                 fid_build_reg_res_name(fid, &res_id);
1410                 switch (it->it_op) {
1411                 case IT_GETATTR:
1412                         /* File attributes are held under multiple bits:
1413                          * nlink is under lookup lock, size and times are
1414                          * under UPDATE lock and recently we've also got
1415                          * a separate permissions lock for owner/group/acl that
1416                          * were protected by lookup lock before.
1417                          * Getattr must provide all of that information,
1418                          * so we need to ensure we have all of those locks.
1419                          * Unfortunately, if the bits are split across multiple
1420                          * locks, there's no easy way to match all of them here,
1421                          * so an extra RPC would be performed to fetch all
1422                          * of those bits at once for now.
1423                          */
1424                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1425                          * but for old MDTs (< 2.4), permission is covered
1426                          * by LOOKUP lock, so it needs to match all bits here.
1427                          */
1428                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1429                                                   MDS_INODELOCK_PERM;
1430                         break;
1431                 case IT_READDIR:
1432                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1433                         break;
1434                 case IT_LAYOUT:
1435                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1436                         break;
1437                 default:
1438                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1439                         break;
1440                 }
1441
1442                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1443                                       LDLM_IBITS, &policy,
1444                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW, 0,
1445                                       &lockh);
1446         }
1447
1448         if (mode) {
1449                 it->it_lock_handle = lockh.cookie;
1450                 it->it_lock_mode = mode;
1451         } else {
1452                 it->it_lock_handle = 0;
1453                 it->it_lock_mode = 0;
1454         }
1455
1456         RETURN(!!mode);
1457 }
1458
1459 /*
1460  * This long block is all about fixing up the lock and request state
1461  * so that it is correct as of the moment _before_ the operation was
1462  * applied; that way, the VFS will think that everything is normal and
1463  * call Lustre's regular VFS methods.
1464  *
1465  * If we're performing a creation, that means that unless the creation
1466  * failed with EEXIST, we should fake up a negative dentry.
1467  *
1468  * For everything else, we want the lookup to succeed.
1469  *
1470  * One additional note: if CREATE or OPEN succeeded, we add an extra
1471  * reference to the request because we need to keep it around until
1472  * ll_create/ll_open gets called.
1473  *
1474  * The server will return to us, in it_disposition, an indication of
1475  * exactly what it_status refers to.
1476  *
1477  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1478  * otherwise if DISP_OPEN_CREATE is set, then it_status is the
1479  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1480  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1481  * was successful.
1482  *
1483  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1484  * child lookup.
1485  */
1486 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1487                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1488                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1489 {
1490         struct ldlm_enqueue_info einfo = {
1491                 .ei_type        = LDLM_IBITS,
1492                 .ei_mode        = it_to_lock_mode(it),
1493                 .ei_cb_bl       = cb_blocking,
1494                 .ei_cb_cp       = ldlm_completion_ast,
1495                 .ei_cb_gl       = mdc_ldlm_glimpse_ast,
1496         };
1497         struct lustre_handle lockh;
1498         int rc = 0;
1499
1500         ENTRY;
1501         LASSERT(it);
1502         CDEBUG(D_DLMTRACE,
1503                "(name: "DNAME","DFID") in obj "DFID", intent: %s flags %#lo\n",
1504                encode_fn_opdata(op_data), PFID(&op_data->op_fid2),
1505                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1506                it->it_open_flags);
1507
1508         lockh.cookie = 0;
1509         /* MDS_FID_OP is not a revalidate case */
1510         if (fid_is_sane(&op_data->op_fid2) &&
1511             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR)) &&
1512             !(op_data->op_bias & MDS_FID_OP)) {
1513                 /* We could just return 1 immediately, but since we should only
1514                  * be called in revalidate_it if we already have a lock, let's
1515                  * verify that.
1516                  */
1517                 it->it_lock_handle = 0;
1518                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1519                 /* Only return failure if it was not GETATTR by cfid
1520                  * (from inode_revalidate()).
1521                  */
1522                 if (rc || op_data->op_namelen != 0)
1523                         RETURN(rc);
1524         }
1525
1526         /* For case if upper layer did not alloc fid, do it now. */
1527         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1528                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1529                 if (rc < 0) {
1530                         CERROR("%s: cannot allocate new FID: rc=%d\n",
1531                                exp->exp_obd->obd_name, rc);
1532                         RETURN(rc);
1533                 }
1534         }
1535
1536         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1537                               extra_lock_flags);
1538         if (rc < 0)
1539                 RETURN(rc);
1540
1541         *reqp = it->it_request;
1542         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1543         RETURN(rc);
1544 }
1545
1546 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1547                                               struct ptlrpc_request *req,
1548                                               void *args, int rc)
1549 {
1550         struct mdc_getattr_args *ga = args;
1551         struct obd_export *exp = ga->ga_exp;
1552         struct md_op_item *item = ga->ga_item;
1553         struct ldlm_enqueue_info *einfo = &item->mop_einfo;
1554         struct lookup_intent *it = &item->mop_it;
1555         struct lustre_handle *lockh = &item->mop_lockh;
1556         struct req_capsule *pill = &req->rq_pill;
1557         struct ldlm_reply *lockrep;
1558         __u64 flags = LDLM_FL_HAS_INTENT;
1559
1560         ENTRY;
1561         if (CFS_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1562                 rc = -ETIMEDOUT;
1563
1564         rc = ldlm_cli_enqueue_fini(exp, pill, einfo, 1, &flags, NULL, 0,
1565                                    lockh, rc, true);
1566         if (rc < 0) {
1567                 CERROR("%s: ldlm_cli_enqueue_fini() failed: rc = %d\n",
1568                        exp->exp_obd->obd_name, rc);
1569                 mdc_clear_replay_flag(req, rc);
1570                 GOTO(out, rc);
1571         }
1572
1573         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
1574         LASSERT(lockrep != NULL);
1575
1576         lockrep->lock_policy_res2 =
1577                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1578
1579         rc = mdc_finish_enqueue(exp, pill, einfo, it, lockh, rc);
1580         if (rc)
1581                 GOTO(out, rc);
1582
1583         rc = mdc_finish_intent_lock(exp, req, &item->mop_data, it, lockh);
1584         EXIT;
1585
1586 out:
1587         item->mop_pill = pill;
1588         item->mop_cb(item, rc);
1589         return 0;
1590 }
1591
1592 int mdc_intent_getattr_async(struct obd_export *exp,
1593                              struct md_op_item *item)
1594 {
1595         struct md_op_data *op_data = &item->mop_data;
1596         struct lookup_intent *it = &item->mop_it;
1597         struct ptlrpc_request *req;
1598         struct mdc_getattr_args *ga;
1599         struct ldlm_res_id res_id;
1600         union ldlm_policy_data policy = {
1601                 .l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE }
1602         };
1603         __u64 flags = LDLM_FL_HAS_INTENT;
1604         int rc = 0;
1605
1606         ENTRY;
1607         CDEBUG(D_DLMTRACE,
1608                "name: "DNAME" in inode "DFID", intent: %s flags %#lo\n",
1609                encode_fn_opdata(op_data), PFID(&op_data->op_fid1),
1610                ldlm_it2str(it->it_op), it->it_open_flags);
1611
1612         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1613         /* If the MDT return -ERANGE because of large ACL, then the sponsor
1614          * of the async getattr RPC will handle that by itself.
1615          */
1616         req = mdc_intent_getattr_pack(exp, it, op_data,
1617                                       LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1618         if (IS_ERR(req))
1619                 RETURN(PTR_ERR(req));
1620
1621         /* With Data-on-MDT the glimpse callback is needed too.
1622          * It is set here in advance but not in mdc_finish_enqueue()
1623          * to avoid possible races. It is safe to have glimpse handler
1624          * for non-DOM locks and costs nothing.
1625          */
1626         if (item->mop_einfo.ei_cb_gl == NULL)
1627                 item->mop_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1628
1629         rc = ldlm_cli_enqueue(exp, &req, &item->mop_einfo, &res_id, &policy,
1630                               &flags, NULL, 0, LVB_T_NONE, &item->mop_lockh, 1);
1631         if (rc < 0) {
1632                 ptlrpc_req_put(req);
1633                 RETURN(rc);
1634         }
1635
1636         ga = ptlrpc_req_async_args(ga, req);
1637         ga->ga_exp = exp;
1638         ga->ga_item = item;
1639
1640         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1641         ptlrpcd_add_req(req);
1642
1643         RETURN(0);
1644 }