Whamcloud - gitweb
LU-17662 osd-zfs: Support for ZFS 2.2.3
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  */
31
32 #define DEBUG_SUBSYSTEM S_MDC
33
34 #include <linux/module.h>
35
36 #include <obd.h>
37 #include <obd_class.h>
38 #include <lustre_dlm.h>
39 #include <lustre_fid.h>
40 #include <lustre_intent.h>
41 #include <lustre_mdc.h>
42 #include <lustre_net.h>
43 #include <lustre_req_layout.h>
44 #include <lustre_swab.h>
45 #include <lustre_acl.h>
46
47 #include "mdc_internal.h"
48
/*
 * Argument bundle pairing an export with a metadata operation item.
 * NOTE(review): judging by the name this is used for asynchronous getattr
 * handling; the users are outside the visible part of the file - confirm.
 */
struct mdc_getattr_args {
        /* export the request is issued on (presumably; verify against users) */
        struct obd_export *ga_exp;
        /* metadata operation item the request belongs to */
        struct md_op_item *ga_item;
};
53
54 int it_open_error(int phase, struct lookup_intent *it)
55 {
56         if (it_disposition(it, DISP_OPEN_LEASE)) {
57                 if (phase >= DISP_OPEN_LEASE)
58                         return it->it_status;
59                 else
60                         return 0;
61         }
62         if (it_disposition(it, DISP_OPEN_OPEN)) {
63                 if (phase >= DISP_OPEN_OPEN)
64                         return it->it_status;
65                 else
66                         return 0;
67         }
68
69         if (it_disposition(it, DISP_OPEN_CREATE)) {
70                 if (phase >= DISP_OPEN_CREATE)
71                         return it->it_status;
72                 else
73                         return 0;
74         }
75
76         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
77                 if (phase >= DISP_LOOKUP_EXECD)
78                         return it->it_status;
79                 else
80                         return 0;
81         }
82
83         if (it_disposition(it, DISP_IT_EXECD)) {
84                 if (phase >= DISP_IT_EXECD)
85                         return it->it_status;
86                 else
87                         return 0;
88         }
89
90         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
91         LBUG();
92
93         return 0;
94 }
95 EXPORT_SYMBOL(it_open_error);
96
97 /* this must be called on a lockh that is known to have a referenced lock */
98 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
99                       void *data, __u64 *bits)
100 {
101         struct ldlm_lock *lock;
102         struct inode *new_inode = data;
103
104         ENTRY;
105         if (bits)
106                 *bits = 0;
107
108         if (!lustre_handle_is_used(lockh))
109                 RETURN(0);
110
111         lock = ldlm_handle2lock(lockh);
112
113         LASSERT(lock != NULL);
114         lock_res_and_lock(lock);
115         if (lock->l_resource->lr_lvb_inode &&
116             lock->l_resource->lr_lvb_inode != data) {
117                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
118
119                 LASSERTF(old_inode->i_state & I_FREEING,
120                          "Found existing inode %px/%lu/%u state %lu in lock: setting data to %px/%lu/%u\n",
121                          old_inode, old_inode->i_ino, old_inode->i_generation,
122                          old_inode->i_state,
123                          new_inode, new_inode->i_ino, new_inode->i_generation);
124         }
125         lock->l_resource->lr_lvb_inode = new_inode;
126         if (bits)
127                 *bits = lock->l_policy_data.l_inodebits.bits;
128
129         unlock_res_and_lock(lock);
130         LDLM_LOCK_PUT(lock);
131
132         RETURN(0);
133 }
134
135 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
136                               const struct lu_fid *fid, enum ldlm_type type,
137                               union ldlm_policy_data *policy,
138                               enum ldlm_mode mode, struct lustre_handle *lockh)
139 {
140         struct ldlm_res_id res_id;
141         enum ldlm_mode rc;
142
143         ENTRY;
144         fid_build_reg_res_name(fid, &res_id);
145         /* LU-4405: Clear bits not supported by server */
146         policy->l_inodebits.bits &= exp_connect_ibits(exp);
147         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
148                              &res_id, type, policy, mode, lockh);
149         RETURN(rc);
150 }
151
152 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
153                       union ldlm_policy_data *policy, enum ldlm_mode mode,
154                       enum ldlm_cancel_flags flags, void *opaque)
155 {
156         struct obd_device *obd = class_exp2obd(exp);
157         struct ldlm_res_id res_id;
158         int rc;
159
160         ENTRY;
161         fid_build_reg_res_name(fid, &res_id);
162         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
163                                              policy, mode, flags, opaque);
164         RETURN(rc);
165 }
166
167 int mdc_null_inode(struct obd_export *exp,
168                    const struct lu_fid *fid)
169 {
170         struct ldlm_res_id res_id;
171         struct ldlm_resource *res;
172         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
173
174         ENTRY;
175         LASSERTF(ns != NULL, "no namespace passed\n");
176
177         fid_build_reg_res_name(fid, &res_id);
178
179         res = ldlm_resource_get(ns, &res_id, 0, 0);
180         if (IS_ERR(res))
181                 RETURN(0);
182
183         lock_res(res);
184         res->lr_lvb_inode = NULL;
185         unlock_res(res);
186
187         ldlm_resource_putref(res);
188         RETURN(0);
189 }
190
191 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
192 {
193         /* Don't hold error requests for replay. */
194         if (req->rq_replay) {
195                 spin_lock(&req->rq_lock);
196                 req->rq_replay = 0;
197                 spin_unlock(&req->rq_lock);
198         }
199         if (rc && req->rq_transno != 0) {
200                 DEBUG_REQ(D_ERROR, req, "transno returned on error: rc = %d",
201                           rc);
202                 LBUG();
203         }
204 }
205
206 /**
207  * Save a large LOV/LMV EA into the request buffer so that it is available
208  * for replay.  We don't do this in the initial request because the
209  * original request doesn't need this buffer (at most it sends just the
210  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211  * buffer and may also be difficult to allocate and save a very large
212  * request buffer for each open. (b=5707)
213  *
214  * OOM here may cause recovery failure if lmm is needed (only for the
215  * original open if the MDS crashed just when this client also OOM'd)
216  * but this is incredibly unlikely, and questionable whether the client
217  * could do MDS recovery under OOM anyways...
218  */
219 int mdc_save_lmm(struct ptlrpc_request *req, void *data, u32 size)
220 {
221         struct req_capsule *pill = &req->rq_pill;
222         void *lmm;
223         int rc = 0;
224
225         if (req_capsule_get_size(pill, &RMF_EADATA, RCL_CLIENT) < size) {
226                 rc = sptlrpc_cli_enlarge_reqbuf(req, &RMF_EADATA, size);
227                 if (rc) {
228                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
229                                req->rq_export->exp_obd->obd_name,
230                                size, rc);
231                         return rc;
232                 }
233         } else {
234                 req_capsule_shrink(pill, &RMF_EADATA, size, RCL_CLIENT);
235         }
236
237         req_capsule_set_size(pill, &RMF_EADATA, RCL_CLIENT, size);
238         lmm = req_capsule_client_get(pill, &RMF_EADATA);
239         if (lmm) {
240                 memcpy(lmm, data, size);
241                 lov_fix_ea_for_replay(lmm);
242         }
243
244         return rc;
245 }
246
247 static struct ptlrpc_request *
248 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
249                      struct md_op_data *op_data, __u32 acl_bufsize)
250 {
251         struct ptlrpc_request *req;
252         struct obd_device *obd = class_exp2obd(exp);
253         struct ldlm_intent *lit;
254         const void *lmm = op_data->op_data;
255         __u32 lmmsize = op_data->op_data_size;
256         __u32  mdt_md_capsule_size;
257         LIST_HEAD(cancels);
258         int count = 0;
259         enum ldlm_mode mode;
260         int repsize, repsize_estimate;
261         struct sptlrpc_sepol *sepol;
262         int rc;
263
264         ENTRY;
265
266         mdt_md_capsule_size = obd->u.cli.cl_default_mds_easize;
267
268         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
269
270         /* XXX: openlock is not cancelled for cross-refs. */
271         /* If inode is known, cancel conflicting OPEN locks. */
272         if (fid_is_sane(&op_data->op_fid2)) {
273                 if (it->it_open_flags & MDS_OPEN_LEASE) { /* try to get lease */
274                         if (it->it_open_flags & MDS_FMODE_WRITE)
275                                 mode = LCK_EX;
276                         else
277                                 mode = LCK_PR;
278                 } else {
279                         if (it->it_open_flags & (MDS_FMODE_WRITE |
280                                                  MDS_OPEN_TRUNC))
281                                 mode = LCK_CW;
282 #ifdef FMODE_EXEC
283                         else if (it->it_open_flags & FMODE_EXEC)
284                                 mode = LCK_PR;
285 #endif
286                         else
287                                 mode = LCK_CR;
288                 }
289                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
290                                                 &cancels, mode,
291                                                 MDS_INODELOCK_OPEN);
292         }
293
294         /* If CREATE, cancel parent's UPDATE lock. */
295         if (it->it_op & IT_CREAT)
296                 mode = LCK_EX;
297         else
298                 mode = LCK_CR;
299         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
300                                          &cancels, mode,
301                                          MDS_INODELOCK_UPDATE);
302
303         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
304                                    &RQF_LDLM_INTENT_OPEN);
305         if (req == NULL) {
306                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
307                 RETURN(ERR_PTR(-ENOMEM));
308         }
309
310         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
311                              op_data->op_namelen + 1);
312         if (cl_is_lov_delay_create(it->it_open_flags)) {
313                 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
314                 LASSERT(lmmsize == 0);
315                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
316         } else {
317                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
318                              max(lmmsize, obd->u.cli.cl_default_mds_easize));
319         }
320
321         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
322                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
323                              op_data->op_file_secctx_name_size : 0);
324
325         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
326                              op_data->op_file_secctx_size);
327
328         req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX, RCL_CLIENT,
329                              op_data->op_file_encctx_size);
330
331         /* get SELinux policy info if any */
332         sepol = sptlrpc_sepol_get(req);
333         if (IS_ERR(sepol))
334                 GOTO(err_free_rq, rc = PTR_ERR(sepol));
335
336         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
337                              sptlrpc_sepol_size(sepol));
338
339         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
340         if (rc < 0)
341                 GOTO(err_put_sepol, rc);
342
343         spin_lock(&req->rq_lock);
344         req->rq_replay = req->rq_import->imp_replayable;
345         spin_unlock(&req->rq_lock);
346
347         /* pack the intent */
348         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
349         lit->opc = (__u64)it->it_op;
350
351         /* pack the intended request */
352         mdc_open_pack(&req->rq_pill, op_data, it->it_create_mode, 0,
353                       it->it_open_flags, lmm, lmmsize, sepol);
354
355         sptlrpc_sepol_put(sepol);
356
357         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
358                              mdt_md_capsule_size);
359         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
360
361         if (!(it->it_op & IT_CREAT) && it->it_op & IT_OPEN &&
362             req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
363                                   RCL_CLIENT) &&
364             op_data->op_file_secctx_name_size > 0 &&
365             op_data->op_file_secctx_name != NULL) {
366                 char *secctx_name;
367
368                 secctx_name = req_capsule_client_get(&req->rq_pill,
369                                                      &RMF_FILE_SECCTX_NAME);
370                 memcpy(secctx_name, op_data->op_file_secctx_name,
371                        op_data->op_file_secctx_name_size);
372                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
373                                      RCL_SERVER,
374                                      obd->u.cli.cl_max_mds_easize);
375
376                 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
377                        op_data->op_file_secctx_name_size,
378                        op_data->op_file_secctx_name);
379
380         } else {
381                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
382                                      RCL_SERVER, 0);
383         }
384
385         if (exp_connect_encrypt(exp) && !(it->it_op & IT_CREAT) &&
386             it->it_op & IT_OPEN)
387                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
388                                      RCL_SERVER,
389                                      obd->u.cli.cl_max_mds_easize);
390         else
391                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
392                                      RCL_SERVER, 0);
393
394         /**
395          * Inline buffer for possible data from Data-on-MDT files.
396          */
397         req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
398                              sizeof(struct niobuf_remote));
399         req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
400                              sizeof(struct lmv_user_md));
401         ptlrpc_request_set_replen(req);
402
403         /* Get real repbuf allocated size as rounded up power of 2 */
404         repsize = size_roundup_power2(req->rq_replen +
405                                       lustre_msg_early_size);
406         /* Estimate free space for DoM files in repbuf */
407         repsize_estimate = repsize - (req->rq_replen -
408                            mdt_md_capsule_size +
409                            sizeof(struct lov_comp_md_v1) +
410                            sizeof(struct lov_comp_md_entry_v1) +
411                            lov_mds_md_size(0, LOV_MAGIC_V3));
412
413         if (repsize_estimate < obd->u.cli.cl_dom_min_inline_repsize) {
414                 repsize = obd->u.cli.cl_dom_min_inline_repsize -
415                           repsize_estimate + sizeof(struct niobuf_remote);
416                 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
417                                      RCL_SERVER,
418                                      sizeof(struct niobuf_remote) + repsize);
419                 ptlrpc_request_set_replen(req);
420                 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
421                        repsize, req->rq_replen);
422                 repsize = size_roundup_power2(req->rq_replen +
423                                               lustre_msg_early_size);
424         }
425         /* The only way to report real allocated repbuf size to the server
426          * is the lm_repsize but it must be set prior buffer allocation itself
427          * due to security reasons - it is part of buffer used in signature
428          * calculation (see LU-11414). Therefore the saved size is predicted
429          * value as rq_replen rounded to the next higher power of 2.
430          * Such estimation is safe. Though the final allocated buffer might
431          * be even larger, it is not possible to know that at this point.
432          */
433         req->rq_reqmsg->lm_repsize = repsize;
434         RETURN(req);
435
436 err_put_sepol:
437         sptlrpc_sepol_put(sepol);
438 err_free_rq:
439         ptlrpc_request_free(req);
440         return ERR_PTR(rc);
441 }
442
443 static struct ptlrpc_request *
444 mdc_intent_create_pack(struct obd_export *exp, struct lookup_intent *it,
445                        struct md_op_data *op_data, __u32 acl_bufsize,
446                        __u64 extra_lock_flags)
447 {
448         LIST_HEAD(cancels);
449         struct ptlrpc_request *req;
450         struct obd_device *obd = class_exp2obd(exp);
451         struct sptlrpc_sepol *sepol;
452         struct ldlm_intent *lit;
453         int count = 0;
454         int rc;
455
456         ENTRY;
457
458         if (fid_is_sane(&op_data->op_fid1))
459                 /* cancel parent's UPDATE lock. */
460                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
461                                                 &cancels, LCK_EX,
462                                                 MDS_INODELOCK_UPDATE);
463
464         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
465                                    &RQF_LDLM_INTENT_CREATE);
466         if (req == NULL) {
467                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
468                 RETURN(ERR_PTR(-ENOMEM));
469         }
470
471         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
472                              op_data->op_namelen + 1);
473         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
474                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
475                              strlen(op_data->op_file_secctx_name) + 1 : 0);
476         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
477                              op_data->op_file_secctx_size);
478         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
479                              op_data->op_data_size);
480         req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX, RCL_CLIENT,
481                              op_data->op_file_encctx_size);
482
483         /* get SELinux policy info if any */
484         sepol = sptlrpc_sepol_get(req);
485         if (IS_ERR(sepol)) {
486                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
487                 GOTO(err_free_rq, rc = PTR_ERR(sepol));
488         }
489         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
490                              sptlrpc_sepol_size(sepol));
491
492         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
493         if (rc < 0)
494                 GOTO(err_put_sepol, rc);
495
496         /* Pack the intent */
497         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
498         lit->opc = (__u64)it->it_op;
499
500         /* Pack the intent request. */
501         mdc_create_pack(&req->rq_pill, op_data, op_data->op_data,
502                         op_data->op_data_size, it->it_create_mode,
503                         op_data->op_fsuid, op_data->op_fsgid,
504                         op_data->op_cap, 0, sepol);
505
506         sptlrpc_sepol_put(sepol);
507
508         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
509                              obd->u.cli.cl_default_mds_easize);
510         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
511         req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
512                              sizeof(struct lmv_user_md));
513         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
514                              RCL_SERVER, 0);
515         req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX, RCL_SERVER, 0);
516
517         ptlrpc_request_set_replen(req);
518         RETURN(req);
519
520 err_put_sepol:
521         sptlrpc_sepol_put(sepol);
522 err_free_rq:
523         ptlrpc_request_free(req);
524         return ERR_PTR(rc);
525 }
526
527 #define GA_DEFAULT_EA_NAME_LEN   20
528 #define GA_DEFAULT_EA_VAL_LEN   250
529 #define GA_DEFAULT_EA_NUM        10
530
531 static struct ptlrpc_request *
532 mdc_intent_getxattr_pack(struct obd_export *exp, struct lookup_intent *it,
533                          struct md_op_data *op_data)
534 {
535         struct ptlrpc_request *req;
536         struct ldlm_intent *lit;
537         struct sptlrpc_sepol *sepol;
538         int rc, count = 0;
539         LIST_HEAD(cancels);
540         u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
541
542         ENTRY;
543         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
544                                         &RQF_LDLM_INTENT_GETXATTR);
545         if (req == NULL)
546                 RETURN(ERR_PTR(-ENOMEM));
547
548         /* get SELinux policy info if any */
549         sepol = sptlrpc_sepol_get(req);
550         if (IS_ERR(sepol))
551                 GOTO(err_free_rq, rc = PTR_ERR(sepol));
552
553         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
554                              sptlrpc_sepol_size(sepol));
555
556         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
557         if (rc)
558                 GOTO(err_put_sepol, rc);
559
560         /* pack the intent */
561         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
562         lit->opc = IT_GETXATTR;
563         /* Message below is checked in sanity-selinux test_20d
564          * and sanity-sec test_49
565          */
566         CDEBUG(D_INFO, "%s: get xattrs for "DFID"\n",
567                exp->exp_obd->obd_name, PFID(&op_data->op_fid1));
568
569 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
570         /* If the supplied buffer is too small then the server will return
571          * -ERANGE and llite will fallback to using non cached xattr
572          * operations. On servers before 2.10.1 a (non-cached) listxattr RPC
573          * for an orphan or dead file causes an oops. So let's try to avoid
574          * sending too small a buffer to too old a server. This is effectively
575          * undoing the memory conservation of LU-9417 when it would be *more*
576          * likely to crash the server. See LU-9856.
577          */
578         if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
579                 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
580                                          exp->exp_connect_data.ocd_max_easize);
581 #endif
582
583         /* pack the intended request */
584         mdc_pack_body(&req->rq_pill, &op_data->op_fid1, op_data->op_valid,
585                       ea_vals_buf_size, -1, 0);
586
587         /* get SELinux policy info if any */
588         mdc_file_sepol_pack(&req->rq_pill, sepol);
589         sptlrpc_sepol_put(sepol);
590
591         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
592                              GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
593
594         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
595                              ea_vals_buf_size);
596
597         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
598                              sizeof(u32) * GA_DEFAULT_EA_NUM);
599
600         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
601
602         ptlrpc_request_set_replen(req);
603
604         RETURN(req);
605
606 err_put_sepol:
607         sptlrpc_sepol_put(sepol);
608 err_free_rq:
609         ptlrpc_request_free(req);
610         RETURN(ERR_PTR(rc));
611 }
612
613 static struct ptlrpc_request *
614 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
615                         struct md_op_data *op_data, __u32 acl_bufsize)
616 {
617         struct ptlrpc_request *req;
618         struct obd_device *obd = class_exp2obd(exp);
619         u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE |
620                     OBD_MD_FLDIREA | OBD_MD_MEA | OBD_MD_FLACL |
621                     OBD_MD_DEFAULT_MEA;
622         struct ldlm_intent *lit;
623         __u32 easize;
624         bool have_secctx = false;
625         int rc;
626
627         ENTRY;
628         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
629                                    &RQF_LDLM_INTENT_GETATTR);
630         if (req == NULL)
631                 RETURN(ERR_PTR(-ENOMEM));
632
633         /* send name of security xattr to get upon intent */
634         if (it->it_op & (IT_LOOKUP | IT_GETATTR) &&
635             req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
636                                   RCL_CLIENT) &&
637             op_data->op_file_secctx_name_size > 0 &&
638             op_data->op_file_secctx_name != NULL) {
639                 have_secctx = true;
640                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
641                                      RCL_CLIENT,
642                                      op_data->op_file_secctx_name_size);
643         }
644
645         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
646                              op_data->op_namelen + 1);
647
648         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
649         if (rc) {
650                 ptlrpc_request_free(req);
651                 RETURN(ERR_PTR(rc));
652         }
653
654         /* pack the intent */
655         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
656         lit->opc = (__u64)it->it_op;
657
658         if (obd->u.cli.cl_default_mds_easize > 0)
659                 easize = obd->u.cli.cl_default_mds_easize;
660         else
661                 easize = obd->u.cli.cl_max_mds_easize;
662
663         /* pack the intended request */
664         mdc_getattr_pack(&req->rq_pill, valid, it->it_open_flags, op_data,
665                          easize);
666
667         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
668         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
669         req_capsule_set_size(&req->rq_pill, &RMF_DEFAULT_MDT_MD, RCL_SERVER,
670                              sizeof(struct lmv_user_md));
671
672         if (have_secctx) {
673                 char *secctx_name;
674
675                 secctx_name = req_capsule_client_get(&req->rq_pill,
676                                                      &RMF_FILE_SECCTX_NAME);
677                 memcpy(secctx_name, op_data->op_file_secctx_name,
678                        op_data->op_file_secctx_name_size);
679
680                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
681                                      RCL_SERVER, easize);
682
683                 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
684                        op_data->op_file_secctx_name_size,
685                        op_data->op_file_secctx_name);
686         } else {
687                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
688                                      RCL_SERVER, 0);
689         }
690
691         if (exp_connect_encrypt(exp) && it->it_op & (IT_LOOKUP | IT_GETATTR))
692                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
693                                      RCL_SERVER, easize);
694         else
695                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_ENCCTX,
696                                      RCL_SERVER, 0);
697
698         ptlrpc_request_set_replen(req);
699         RETURN(req);
700 }
701
702 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
703                                                      struct lookup_intent *it,
704                                                      struct md_op_data *op_data)
705 {
706         struct obd_device *obd = class_exp2obd(exp);
707         struct ptlrpc_request *req;
708         struct ldlm_intent *lit;
709         struct layout_intent *layout;
710         LIST_HEAD(cancels);
711         int count = 0, rc;
712
713         ENTRY;
714         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
715                                 &RQF_LDLM_INTENT_LAYOUT);
716         if (req == NULL)
717                 RETURN(ERR_PTR(-ENOMEM));
718
719         if (fid_is_sane(&op_data->op_fid2) && (it->it_op & IT_LAYOUT) &&
720             (it->it_open_flags & FMODE_WRITE)) {
721                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
722                                                 &cancels, LCK_EX,
723                                                 MDS_INODELOCK_LAYOUT);
724         }
725
726         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
727         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
728         if (rc) {
729                 ptlrpc_request_free(req);
730                 RETURN(ERR_PTR(rc));
731         }
732
733         /* pack the intent */
734         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
735         lit->opc = (__u64)it->it_op;
736
737         /* pack the layout intent request */
738         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
739         LASSERT(op_data->op_data != NULL);
740         LASSERT(op_data->op_data_size == sizeof(*layout));
741         memcpy(layout, op_data->op_data, sizeof(*layout));
742
743         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
744                              obd->u.cli.cl_default_mds_easize);
745         ptlrpc_request_set_replen(req);
746         RETURN(req);
747 }
748
749 static struct ptlrpc_request *mdc_enqueue_pack(struct obd_export *exp,
750                                                int lvb_len)
751 {
752         struct ptlrpc_request *req;
753         int rc;
754
755         ENTRY;
756         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
757         if (req == NULL)
758                 RETURN(ERR_PTR(-ENOMEM));
759
760         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
761         if (rc) {
762                 ptlrpc_request_free(req);
763                 RETURN(ERR_PTR(rc));
764         }
765
766         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
767         ptlrpc_request_set_replen(req);
768         RETURN(req);
769 }
770
771 int mdc_finish_enqueue(struct obd_export *exp,
772                        struct req_capsule *pill,
773                        struct ldlm_enqueue_info *einfo,
774                        struct lookup_intent *it,
775                        struct lustre_handle *lockh, int rc)
776 {
777         struct ptlrpc_request *req = pill->rc_req;
778         struct ldlm_request *lockreq;
779         struct ldlm_reply *lockrep;
780         struct ldlm_lock *lock;
781         struct mdt_body *body = NULL;
782         void *lvb_data = NULL;
783         __u32 lvb_len = 0;
784
785         ENTRY;
786         LASSERT(rc >= 0);
787         /* Similarly, if we're going to replay this request, we don't want to
788          * actually get a lock, just perform the intent.
789          */
790         if (req->rq_transno || req->rq_replay) {
791                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
792                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
793         }
794
795         if (rc == ELDLM_LOCK_ABORTED) {
796                 einfo->ei_mode = 0;
797                 memset(lockh, 0, sizeof(*lockh));
798                 rc = 0;
799         } else { /* rc = 0 */
800                 lock = ldlm_handle2lock(lockh);
801                 LASSERT(lock != NULL);
802
803                 /* If server returned a different lock mode, fix up variables */
804                 if (lock->l_req_mode != einfo->ei_mode) {
805                         ldlm_lock_addref(lockh, lock->l_req_mode);
806                         ldlm_lock_decref(lockh, einfo->ei_mode);
807                         einfo->ei_mode = lock->l_req_mode;
808                 }
809                 LDLM_LOCK_PUT(lock);
810         }
811
812         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
813         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
814
815         it->it_disposition = (int)lockrep->lock_policy_res1;
816         it->it_status = (int)lockrep->lock_policy_res2;
817         it->it_lock_mode = einfo->ei_mode;
818         it->it_lock_handle = lockh->cookie;
819         it->it_request = req;
820
821         /* Technically speaking rq_transno must already be zero if
822          * it_status is in error, so the check is a bit redundant.
823          */
824         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
825                 mdc_clear_replay_flag(req, it->it_status);
826
827         /* If we're doing an IT_OPEN which did not result in an actual
828          * successful open, then we need to remove the bit which saves
829          * this request for unconditional replay.
830          *
831          * It's important that we do this first!  Otherwise we might exit the
832          * function without doing so, and try to replay a failed create.
833          * (b=3440)
834          */
835         if (it->it_op & IT_OPEN && req->rq_replay &&
836             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
837                 mdc_clear_replay_flag(req, it->it_status);
838
839         DEBUG_REQ(D_RPCTRACE, req, "op=%x disposition=%x, status=%d",
840                   it->it_op, it->it_disposition, it->it_status);
841
842         /* We know what to expect, so we do any byte flipping required here */
843         if (it_has_reply_body(it)) {
844                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
845                 if (body == NULL) {
846                         rc = -EPROTO;
847                         CERROR("%s: cannot swab mdt_body: rc = %d\n",
848                                exp->exp_obd->obd_name, rc);
849                         RETURN(rc);
850                 }
851
852                 if (it_disposition(it, DISP_OPEN_OPEN) &&
853                     !it_open_error(DISP_OPEN_OPEN, it)) {
854                         /*
855                          * If this is a successful OPEN request, we need to set
856                          * replay handler and data early, so that if replay
857                          * happens immediately after swabbing below, new reply
858                          * is swabbed by that handler correctly.
859                          */
860                         mdc_set_open_replay_data(NULL, NULL, it);
861                 }
862
863                 if (it_disposition(it, DISP_OPEN_CREATE) &&
864                     !it_open_error(DISP_OPEN_CREATE, it)) {
865                         lprocfs_counter_incr(exp->exp_obd->obd_md_stats,
866                                              LPROC_MD_CREATE);
867                 }
868
869                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
870                         void *eadata;
871
872                         mdc_update_max_ea_from_body(exp, body);
873
874                         /*
875                          * The eadata is opaque; just check that it is there.
876                          * Eventually, obd_unpackmd() will check the contents.
877                          */
878                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
879                                                         body->mbo_eadatasize);
880                         if (eadata == NULL)
881                                 RETURN(-EPROTO);
882
883                         /* save LVB data and length if for layout lock */
884                         lvb_data = eadata;
885                         lvb_len = body->mbo_eadatasize;
886
887                         /*
888                          * We save the reply LOV EA in case we have to replay a
889                          * create for recovery.  If we didn't allocate a large
890                          * enough request buffer above we need to reallocate it
891                          * here to hold the actual LOV EA.
892                          *
893                          * To not save LOV EA if request is not going to replay
894                          * (for example error one).
895                          */
896                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
897                                 rc = mdc_save_lmm(req, eadata,
898                                                   body->mbo_eadatasize);
899                                 if (rc) {
900                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
901                                         body->mbo_eadatasize = 0;
902                                         rc = 0;
903                                 }
904                         }
905                 }
906         } else if (it->it_op & IT_LAYOUT) {
907                 /* maybe the lock was granted right away and layout
908                  * is packed into RMF_DLM_LVB of req
909                  */
910                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
911                 CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
912                        class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
913                 if (lvb_len > 0) {
914                         lvb_data = req_capsule_server_sized_get(pill,
915                                                         &RMF_DLM_LVB, lvb_len);
916                         if (lvb_data == NULL)
917                                 RETURN(-EPROTO);
918
919                         /**
920                          * save replied layout data to the request buffer for
921                          * recovery consideration (lest MDS reinitialize
922                          * another set of OST objects).
923                          */
924                         if (req->rq_transno)
925                                 mdc_save_lmm(req, lvb_data, lvb_len);
926                 }
927         }
928
929         /* fill in stripe data for layout lock.
930          * LU-6581: trust layout data only if layout lock is granted. The MDT
931          * has stopped sending layout unless the layout lock is granted. The
932          * client still does this checking in case it's talking with an old
933          * server. - Jinshan
934          */
935         lock = ldlm_handle2lock(lockh);
936         if (lock == NULL)
937                 RETURN(rc);
938
939         if (ldlm_has_layout(lock) && lvb_data != NULL &&
940             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
941                 void *lmm;
942
943                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
944                         ldlm_it2str(it->it_op), lvb_len);
945
946                 OBD_ALLOC_LARGE(lmm, lvb_len);
947                 if (lmm == NULL)
948                         GOTO(out_lock, rc = -ENOMEM);
949
950                 memcpy(lmm, lvb_data, lvb_len);
951
952                 /* install lvb_data */
953                 lock_res_and_lock(lock);
954                 if (lock->l_lvb_data == NULL) {
955                         lock->l_lvb_type = LVB_T_LAYOUT;
956                         lock->l_lvb_data = lmm;
957                         lock->l_lvb_len = lvb_len;
958                         lmm = NULL;
959                 }
960                 unlock_res_and_lock(lock);
961                 if (lmm != NULL)
962                         OBD_FREE_LARGE(lmm, lvb_len);
963         }
964
965         if (ldlm_has_dom(lock)) {
966                 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
967
968                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
969                 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
970                         LDLM_ERROR(lock, "%s: DoM lock without size.",
971                                    exp->exp_obd->obd_name);
972                         GOTO(out_lock, rc = -EPROTO);
973                 }
974
975                 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
976                            ldlm_it2str(it->it_op), body->mbo_dom_size);
977
978                 lock_res_and_lock(lock);
979                 mdc_body2lvb(body, &lock->l_ost_lvb);
980                 ldlm_lock_allow_match_locked(lock);
981                 unlock_res_and_lock(lock);
982         }
983 out_lock:
984         LDLM_LOCK_PUT(lock);
985
986         RETURN(rc);
987 }
988
989 static inline bool mdc_skip_mod_rpc_slot(const struct lookup_intent *it)
990 {
991         if (it != NULL &&
992             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
993              it->it_op == IT_READDIR || it->it_op == IT_GETXATTR ||
994              (it->it_op == IT_LAYOUT && !(it->it_open_flags &
995                                           MDS_FMODE_WRITE))))
996                 return true;
997         return false;
998 }
999
/**
 * Common MDC enqueue path: pack the request that matches the intent (or a
 * flock request when \a it is NULL), send it with ldlm_cli_enqueue(), resend
 * it in the cases handled below, and pass the reply to mdc_finish_enqueue().
 *
 * We always reserve enough space in the reply packet for a stripe MD, because
 * we don't know in advance the file type.
 *
 * \param[in] exp               export to enqueue on
 * \param[in,out] einfo         enqueue info; ei_req_slot, ei_mod_slot and
 *                              (if NULL) ei_cb_gl are set here
 * \param[in] policy            lock policy; must be NULL when \a it is given,
 *                              in which case it is chosen from it->it_op
 * \param[in,out] it            intent, or NULL for a flock enqueue
 * \param[in] op_data           operation data; op_fid1 names the resource
 * \param[in,out] lockh         lock handle for the enqueue
 * \param[in] extra_lock_flags  flags the enqueue flags start from
 *
 * \retval                      result of ldlm_cli_enqueue() for flock
 *                              requests (\a it == NULL)
 * \retval                      result of mdc_finish_enqueue() otherwise
 * \retval negative errno       on packing, enqueue or resend failure
 */
static int mdc_enqueue_base(struct obd_export *exp,
                            struct ldlm_enqueue_info *einfo,
                            const union ldlm_policy_data *policy,
                            struct lookup_intent *it,
                            struct md_op_data *op_data,
                            struct lustre_handle *lockh,
                            __u64 extra_lock_flags)
{
        struct obd_device *obd = class_exp2obd(exp);
        struct ptlrpc_request *req;
        __u64 flags, saved_flags = extra_lock_flags;
        struct ldlm_res_id res_id;
        static const union ldlm_policy_data lookup_policy = {
                                  .l_inodebits = { MDS_INODELOCK_LOOKUP } };
        static const union ldlm_policy_data update_policy = {
                                  .l_inodebits = { MDS_INODELOCK_UPDATE } };
        static const union ldlm_policy_data layout_policy = {
                                  .l_inodebits = { MDS_INODELOCK_LAYOUT } };
        static const union ldlm_policy_data getxattr_policy = {
                                  .l_inodebits = { MDS_INODELOCK_XATTR } };
        int generation, resends = 0;
        struct ldlm_reply *lockrep;
        struct obd_import *imp = class_exp2cliimp(exp);
        __u32 acl_bufsize;
        enum lvb_type lvb_type = 0;
        int rc;

        ENTRY;
        /* intent enqueues are always inodebits locks */
        LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
                 einfo->ei_type);
        fid_build_reg_res_name(&op_data->op_fid1, &res_id);

        if (it != NULL) {
                LASSERT(policy == NULL);

                /* pick the inodebits policy from the intent operation */
                saved_flags |= LDLM_FL_HAS_INTENT;
                if (it->it_op & (IT_GETATTR | IT_READDIR | IT_CREAT))
                        policy = &update_policy;
                else if (it->it_op & IT_LAYOUT)
                        policy = &layout_policy;
                else if (it->it_op & IT_GETXATTR)
                        policy = &getxattr_policy;
                else
                        policy = &lookup_policy;
        }

        /* remember the import generation to detect a change across resends */
        generation = obd->u.cli.cl_import->imp_generation;
        if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
                acl_bufsize = min_t(__u32, imp->imp_connect_data.ocd_max_easize,
                                    XATTR_SIZE_MAX);
        else
                acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;

resend:
        /* every attempt starts again from the saved flags */
        flags = saved_flags;
        if (it == NULL) {
                /* The only way right now is FLOCK. */
                LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
                         einfo->ei_type);
                res_id.name[3] = LDLM_FLOCK;
                req = ldlm_enqueue_pack(exp, 0);
        } else if (it->it_op & IT_OPEN) {
                req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
        } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
                req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
        } else if (it->it_op & IT_READDIR) {
                req = mdc_enqueue_pack(exp, 0);
        } else if (it->it_op & IT_LAYOUT) {
                if (!imp_connect_lvb_type(imp))
                        RETURN(-EOPNOTSUPP);
                req = mdc_intent_layout_pack(exp, it, op_data);
                lvb_type = LVB_T_LAYOUT;
        } else if (it->it_op & IT_GETXATTR) {
                req = mdc_intent_getxattr_pack(exp, it, op_data);
        } else if (it->it_op == IT_CREAT) {
                req = mdc_intent_create_pack(exp, it, op_data, acl_bufsize,
                                             extra_lock_flags);
        } else {
                LBUG();
                RETURN(-EINVAL);
        }

        /* the pack helpers report failure as ERR_PTR() */
        if (IS_ERR(req))
                RETURN(PTR_ERR(req));

        if (resends) {
                /* on a resend, pin the request to the generation sampled
                 * above and push rq_sent 'resends' seconds into the future
                 */
                req->rq_generation_set = 1;
                req->rq_import_generation = generation;
                req->rq_sent = ktime_get_real_seconds() + resends;
        }

        einfo->ei_req_slot = !(op_data->op_cli_flags & CLI_NO_SLOT);
        einfo->ei_mod_slot = !mdc_skip_mod_rpc_slot(it);

        /* With Data-on-MDT the glimpse callback is needed too.
         * It is set here in advance but not in mdc_finish_enqueue()
         * to avoid possible races. It is safe to have glimpse handler
         * for non-DOM locks and costs nothing.
         */
        if (einfo->ei_cb_gl == NULL)
                einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;

        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
                              0, lvb_type, lockh, 0);

        if (!it) {
                /* For flock requests we immediately return without further
                 * delay and let caller deal with the rest, since rest of
                 * this function metadata processing makes no sense for flock
                 * requests anyway. But in case of problem during comms with
                 * server (-ETIMEDOUT) or any signal/kill attempt (-EINTR),
                 * we cannot rely on caller and this mainly for F_UNLCKs
                 * (explicit or automatically generated by kernel to clean
                 * current flocks upon exit) that can't be trashed.
                 */
                ptlrpc_req_put(req);
                if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
                    (einfo->ei_type == LDLM_FLOCK) &&
                    (einfo->ei_mode == LCK_NL))
                        goto resend;
                RETURN(rc);
        }

        if (rc < 0) {
                CDEBUG(D_INFO,
                      "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
                      obd->obd_name, PFID(&op_data->op_fid1),
                      PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);

                mdc_clear_replay_flag(req, rc);
                ptlrpc_req_put(req);
                RETURN(rc);
        }

        lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
        LASSERT(lockrep != NULL);

        /* convert the intent status in the reply from its wire form */
        lockrep->lock_policy_res2 =
                ptlrpc_status_ntoh(lockrep->lock_policy_res2);

        /* Retry infinitely when the server returns -EINPROGRESS for the
         * intent operation, when server returns -EINPROGRESS for acquiring
         * intent lock, we'll retry in after_reply().
         */
        if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
                mdc_clear_replay_flag(req, rc);
                ptlrpc_req_put(req);
                /* only resend while the import generation is unchanged */
                if (generation == obd->u.cli.cl_import->imp_generation) {
                        if (signal_pending(current))
                                RETURN(-EINTR);

                        resends++;
                        CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
                               obd->obd_name, resends, it->it_op,
                               PFID(&op_data->op_fid1),
                               PFID(&op_data->op_fid2));
                        goto resend;
                } else {
                        CDEBUG(D_HA, "resend cross eviction\n");
                        RETURN(-EIO);
                }
        }

        /* -ERANGE with the old-size ACL buffer: retry once with the larger
         * buffer size (the size check keeps this from looping)
         */
        if ((int)lockrep->lock_policy_res2 == -ERANGE &&
            it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
            acl_bufsize == LUSTRE_POSIX_ACL_MAX_SIZE_OLD) {
                mdc_clear_replay_flag(req, -ERANGE);
                ptlrpc_req_put(req);
                acl_bufsize = min_t(__u32, imp->imp_connect_data.ocd_max_easize,
                                    XATTR_SIZE_MAX);
                goto resend;
        }

        rc = mdc_finish_enqueue(exp, &req->rq_pill, einfo, it, lockh, rc);
        if (rc < 0) {
                /* undo what the enqueue left behind: drop the lock reference,
                 * release the request and clear the intent fields that
                 * mdc_finish_enqueue() filled in
                 */
                if (lustre_handle_is_used(lockh)) {
                        ldlm_lock_decref(lockh, einfo->ei_mode);
                        memset(lockh, 0, sizeof(*lockh));
                }
                ptlrpc_req_put(req);

                it->it_lock_handle = 0;
                it->it_lock_mode = 0;
                it->it_request = NULL;
        }

        RETURN(rc);
}
1191
1192 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1193                 const union ldlm_policy_data *policy,
1194                 struct md_op_data *op_data,
1195                 struct lustre_handle *lockh, __u64 extra_lock_flags)
1196 {
1197         return mdc_enqueue_base(exp, einfo, policy, NULL,
1198                                 op_data, lockh, extra_lock_flags);
1199 }
1200
1201 static int mdc_finish_intent_lock(struct obd_export *exp,
1202                                   struct ptlrpc_request *request,
1203                                   struct md_op_data *op_data,
1204                                   struct lookup_intent *it,
1205                                   struct lustre_handle *lockh)
1206 {
1207         struct lustre_handle old_lock;
1208         struct ldlm_lock *lock;
1209         int rc = 0;
1210
1211         ENTRY;
1212         LASSERT(request != NULL);
1213         LASSERT(request != LP_POISON);
1214         LASSERT(request->rq_repmsg != LP_POISON);
1215
1216         if (it->it_op & IT_READDIR)
1217                 RETURN(0);
1218
1219         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
1220                 if (it->it_status != 0)
1221                         GOTO(out, rc = it->it_status);
1222         } else {
1223                 if (!it_disposition(it, DISP_IT_EXECD)) {
1224                         /* The server failed before it even started executing
1225                          * the intent, i.e. because it couldn't unpack the
1226                          * request.
1227                          */
1228                         LASSERT(it->it_status != 0);
1229                         GOTO(out, rc = it->it_status);
1230                 }
1231                 rc = it_open_error(DISP_IT_EXECD, it);
1232                 if (rc)
1233                         GOTO(out, rc);
1234
1235                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1236                 if (rc)
1237                         GOTO(out, rc);
1238
1239                 /* keep requests around for the multiple phases of the call
1240                  * this shows the DISP_XX must guarantee we make it into the
1241                  * call
1242                  */
1243                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1244                     it_disposition(it, DISP_OPEN_CREATE) &&
1245                     !it_open_error(DISP_OPEN_CREATE, it)) {
1246                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
1247                         /* balanced in ll_create_node */
1248                         ptlrpc_request_addref(request);
1249                 }
1250                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1251                     it_disposition(it, DISP_OPEN_OPEN) &&
1252                     !it_open_error(DISP_OPEN_OPEN, it)) {
1253                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
1254                         /* balanced in ll_file_open */
1255                         ptlrpc_request_addref(request);
1256                         /* eviction in middle of open RPC processing b=11546 */
1257                         CFS_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1258                                          obd_timeout);
1259                 }
1260
1261                 if (it->it_op & IT_CREAT) {
1262                         /* XXX this belongs in ll_create_it */
1263                 } else if (it->it_op == IT_OPEN) {
1264                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1265                 } else {
1266                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1267                 }
1268         }
1269
1270         /* If we already have a matching lock, then cancel the new
1271          * one.  We have to set the data here instead of in
1272          * mdc_enqueue, because we need to use the child's inode as
1273          * the l_ast_data to match, and that's not available until
1274          * intent_finish has performed the iget().
1275          */
1276         lock = ldlm_handle2lock(lockh);
1277         if (lock) {
1278                 union ldlm_policy_data policy = lock->l_policy_data;
1279
1280                 LDLM_DEBUG(lock, "matching against this");
1281
1282                 if (it_has_reply_body(it)) {
1283                         struct mdt_body *body;
1284
1285                         body = req_capsule_server_get(&request->rq_pill,
1286                                                       &RMF_MDT_BODY);
1287                         /* mdc_enqueue checked */
1288                         LASSERT(body != NULL);
1289                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1290                                                  &lock->l_resource->lr_name),
1291                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1292                                  PLDLMRES(lock->l_resource),
1293                                  PFID(&body->mbo_fid1));
1294                 }
1295                 LDLM_LOCK_PUT(lock);
1296
1297                 memcpy(&old_lock, lockh, sizeof(*lockh));
1298                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1299                                    LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
1300                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1301                         memcpy(lockh, &old_lock, sizeof(old_lock));
1302                         it->it_lock_handle = lockh->cookie;
1303                 }
1304         }
1305
1306         EXIT;
1307 out:
1308         CDEBUG(D_DENTRY,
1309                "D_IT dentry=%.*s intent=%s status=%d disp=%x: rc = %d\n",
1310                 (int)op_data->op_namelen, op_data->op_name,
1311                 ldlm_it2str(it->it_op), it->it_status, it->it_disposition, rc);
1312
1313         return rc;
1314 }
1315
1316 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1317                         struct lu_fid *fid, __u64 *bits)
1318 {
1319         /* We could just return 1 immediately, but as we should only be called
1320          * in revalidate_it if we already have a lock, let's verify that.
1321          */
1322         struct ldlm_res_id res_id;
1323         struct lustre_handle lockh;
1324         union ldlm_policy_data policy;
1325         enum ldlm_mode mode;
1326
1327         ENTRY;
1328         if (it->it_lock_handle) {
1329                 lockh.cookie = it->it_lock_handle;
1330                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1331         } else {
1332                 fid_build_reg_res_name(fid, &res_id);
1333                 switch (it->it_op) {
1334                 case IT_GETATTR:
1335                         /* File attributes are held under multiple bits:
1336                          * nlink is under lookup lock, size and times are
1337                          * under UPDATE lock and recently we've also got
1338                          * a separate permissions lock for owner/group/acl that
1339                          * were protected by lookup lock before.
1340                          * Getattr must provide all of that information,
1341                          * so we need to ensure we have all of those locks.
1342                          * Unfortunately, if the bits are split across multiple
1343                          * locks, there's no easy way to match all of them here,
1344                          * so an extra RPC would be performed to fetch all
1345                          * of those bits at once for now.
1346                          */
1347                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1348                          * but for old MDTs (< 2.4), permission is covered
1349                          * by LOOKUP lock, so it needs to match all bits here.
1350                          */
1351                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1352                                                   MDS_INODELOCK_PERM;
1353                         break;
1354                 case IT_READDIR:
1355                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1356                         break;
1357                 case IT_LAYOUT:
1358                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1359                         break;
1360                 default:
1361                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1362                         break;
1363                 }
1364
1365                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1366                                       LDLM_IBITS, &policy,
1367                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1368                                       &lockh);
1369         }
1370
1371         if (mode) {
1372                 it->it_lock_handle = lockh.cookie;
1373                 it->it_lock_mode = mode;
1374         } else {
1375                 it->it_lock_handle = 0;
1376                 it->it_lock_mode = 0;
1377         }
1378
1379         RETURN(!!mode);
1380 }
1381
1382 /*
1383  * This long block is all about fixing up the lock and request state
1384  * so that it is correct as of the moment _before_ the operation was
1385  * applied; that way, the VFS will think that everything is normal and
1386  * call Lustre's regular VFS methods.
1387  *
1388  * If we're performing a creation, that means that unless the creation
1389  * failed with EEXIST, we should fake up a negative dentry.
1390  *
1391  * For everything else, we want the lookup to succeed.
1392  *
1393  * One additional note: if CREATE or OPEN succeeded, we add an extra
1394  * reference to the request because we need to keep it around until
1395  * ll_create/ll_open gets called.
1396  *
1397  * The server will return to us, in it_disposition, an indication of
1398  * exactly what it_status refers to.
1399  *
1400  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1401  * otherwise if DISP_OPEN_CREATE is set, then it_status is the
1402  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1403  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1404  * was successful.
1405  *
1406  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1407  * child lookup.
1408  */
1409 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1410                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1411                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1412 {
1413         struct ldlm_enqueue_info einfo = {
1414                 .ei_type        = LDLM_IBITS,
1415                 .ei_mode        = it_to_lock_mode(it),
1416                 .ei_cb_bl       = cb_blocking,
1417                 .ei_cb_cp       = ldlm_completion_ast,
1418                 .ei_cb_gl       = mdc_ldlm_glimpse_ast,
1419         };
1420         struct lustre_handle lockh;
1421         int rc = 0;
1422
1423         ENTRY;
1424         LASSERT(it);
1425         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1426                 ", intent: %s flags %#lo\n", (int)op_data->op_namelen,
1427                 op_data->op_name, PFID(&op_data->op_fid2),
1428                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1429                 it->it_open_flags);
1430
1431         lockh.cookie = 0;
1432         /* MDS_FID_OP is not a revalidate case */
1433         if (fid_is_sane(&op_data->op_fid2) &&
1434             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR)) &&
1435             !(op_data->op_bias & MDS_FID_OP)) {
1436                 /* We could just return 1 immediately, but since we should only
1437                  * be called in revalidate_it if we already have a lock, let's
1438                  * verify that.
1439                  */
1440                 it->it_lock_handle = 0;
1441                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1442                 /* Only return failure if it was not GETATTR by cfid
1443                  * (from inode_revalidate()).
1444                  */
1445                 if (rc || op_data->op_namelen != 0)
1446                         RETURN(rc);
1447         }
1448
1449         /* For case if upper layer did not alloc fid, do it now. */
1450         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1451                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1452                 if (rc < 0) {
1453                         CERROR("%s: cannot allocate new FID: rc=%d\n",
1454                                exp->exp_obd->obd_name, rc);
1455                         RETURN(rc);
1456                 }
1457         }
1458
1459         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1460                               extra_lock_flags);
1461         if (rc < 0)
1462                 RETURN(rc);
1463
1464         *reqp = it->it_request;
1465         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1466         RETURN(rc);
1467 }
1468
1469 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1470                                               struct ptlrpc_request *req,
1471                                               void *args, int rc)
1472 {
1473         struct mdc_getattr_args *ga = args;
1474         struct obd_export *exp = ga->ga_exp;
1475         struct md_op_item *item = ga->ga_item;
1476         struct ldlm_enqueue_info *einfo = &item->mop_einfo;
1477         struct lookup_intent *it = &item->mop_it;
1478         struct lustre_handle *lockh = &item->mop_lockh;
1479         struct req_capsule *pill = &req->rq_pill;
1480         struct ldlm_reply *lockrep;
1481         __u64 flags = LDLM_FL_HAS_INTENT;
1482
1483         ENTRY;
1484         if (CFS_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1485                 rc = -ETIMEDOUT;
1486
1487         rc = ldlm_cli_enqueue_fini(exp, pill, einfo, 1, &flags, NULL, 0,
1488                                    lockh, rc, true);
1489         if (rc < 0) {
1490                 CERROR("%s: ldlm_cli_enqueue_fini() failed: rc = %d\n",
1491                        exp->exp_obd->obd_name, rc);
1492                 mdc_clear_replay_flag(req, rc);
1493                 GOTO(out, rc);
1494         }
1495
1496         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
1497         LASSERT(lockrep != NULL);
1498
1499         lockrep->lock_policy_res2 =
1500                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1501
1502         rc = mdc_finish_enqueue(exp, pill, einfo, it, lockh, rc);
1503         if (rc)
1504                 GOTO(out, rc);
1505
1506         rc = mdc_finish_intent_lock(exp, req, &item->mop_data, it, lockh);
1507         EXIT;
1508
1509 out:
1510         item->mop_pill = pill;
1511         item->mop_cb(item, rc);
1512         return 0;
1513 }
1514
1515 int mdc_intent_getattr_async(struct obd_export *exp,
1516                              struct md_op_item *item)
1517 {
1518         struct md_op_data *op_data = &item->mop_data;
1519         struct lookup_intent *it = &item->mop_it;
1520         struct ptlrpc_request *req;
1521         struct mdc_getattr_args *ga;
1522         struct ldlm_res_id res_id;
1523         union ldlm_policy_data policy = {
1524                 .l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE }
1525         };
1526         __u64 flags = LDLM_FL_HAS_INTENT;
1527         int rc = 0;
1528
1529         ENTRY;
1530         CDEBUG(D_DLMTRACE,
1531                "name: %.*s in inode "DFID", intent: %s flags %#lo\n",
1532                (int)op_data->op_namelen, op_data->op_name,
1533                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1534                it->it_open_flags);
1535
1536         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1537         /* If the MDT return -ERANGE because of large ACL, then the sponsor
1538          * of the async getattr RPC will handle that by itself.
1539          */
1540         req = mdc_intent_getattr_pack(exp, it, op_data,
1541                                       LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1542         if (IS_ERR(req))
1543                 RETURN(PTR_ERR(req));
1544
1545         /* With Data-on-MDT the glimpse callback is needed too.
1546          * It is set here in advance but not in mdc_finish_enqueue()
1547          * to avoid possible races. It is safe to have glimpse handler
1548          * for non-DOM locks and costs nothing.
1549          */
1550         if (item->mop_einfo.ei_cb_gl == NULL)
1551                 item->mop_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1552
1553         rc = ldlm_cli_enqueue(exp, &req, &item->mop_einfo, &res_id, &policy,
1554                               &flags, NULL, 0, LVB_T_NONE, &item->mop_lockh, 1);
1555         if (rc < 0) {
1556                 ptlrpc_req_put(req);
1557                 RETURN(rc);
1558         }
1559
1560         ga = ptlrpc_req_async_args(ga, req);
1561         ga->ga_exp = exp;
1562         ga->ga_item = item;
1563
1564         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1565         ptlrpcd_add_req(req);
1566
1567         RETURN(0);
1568 }