Whamcloud - gitweb
LU-12040 mdc: reset lmm->lmm_stripe_offset in mdc_save_lovea
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 #include <linux/module.h>
36
37 #include <obd.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
47
48 #include "mdc_internal.h"
49
50 struct mdc_getattr_args {
51         struct obd_export               *ga_exp;
52         struct md_enqueue_info          *ga_minfo;
53 };
54
55 int it_open_error(int phase, struct lookup_intent *it)
56 {
57         if (it_disposition(it, DISP_OPEN_LEASE)) {
58                 if (phase >= DISP_OPEN_LEASE)
59                         return it->it_status;
60                 else
61                         return 0;
62         }
63         if (it_disposition(it, DISP_OPEN_OPEN)) {
64                 if (phase >= DISP_OPEN_OPEN)
65                         return it->it_status;
66                 else
67                         return 0;
68         }
69
70         if (it_disposition(it, DISP_OPEN_CREATE)) {
71                 if (phase >= DISP_OPEN_CREATE)
72                         return it->it_status;
73                 else
74                         return 0;
75         }
76
77         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78                 if (phase >= DISP_LOOKUP_EXECD)
79                         return it->it_status;
80                 else
81                         return 0;
82         }
83
84         if (it_disposition(it, DISP_IT_EXECD)) {
85                 if (phase >= DISP_IT_EXECD)
86                         return it->it_status;
87                 else
88                         return 0;
89         }
90
91         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
92         LBUG();
93
94         return 0;
95 }
96 EXPORT_SYMBOL(it_open_error);
97
98 /* this must be called on a lockh that is known to have a referenced lock */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100                       void *data, __u64 *bits)
101 {
102         struct ldlm_lock *lock;
103         struct inode *new_inode = data;
104         ENTRY;
105
106         if(bits)
107                 *bits = 0;
108
109         if (!lustre_handle_is_used(lockh))
110                 RETURN(0);
111
112         lock = ldlm_handle2lock(lockh);
113
114         LASSERT(lock != NULL);
115         lock_res_and_lock(lock);
116         if (lock->l_resource->lr_lvb_inode &&
117             lock->l_resource->lr_lvb_inode != data) {
118                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119                 LASSERTF(old_inode->i_state & I_FREEING,
120                          "Found existing inode %p/%lu/%u state %lu in lock: "
121                          "setting data to %p/%lu/%u\n", old_inode,
122                          old_inode->i_ino, old_inode->i_generation,
123                          old_inode->i_state,
124                          new_inode, new_inode->i_ino, new_inode->i_generation);
125         }
126         lock->l_resource->lr_lvb_inode = new_inode;
127         if (bits)
128                 *bits = lock->l_policy_data.l_inodebits.bits;
129
130         unlock_res_and_lock(lock);
131         LDLM_LOCK_PUT(lock);
132
133         RETURN(0);
134 }
135
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137                               const struct lu_fid *fid, enum ldlm_type type,
138                               union ldlm_policy_data *policy,
139                               enum ldlm_mode mode, struct lustre_handle *lockh)
140 {
141         struct ldlm_res_id res_id;
142         enum ldlm_mode rc;
143         ENTRY;
144
145         fid_build_reg_res_name(fid, &res_id);
146         /* LU-4405: Clear bits not supported by server */
147         policy->l_inodebits.bits &= exp_connect_ibits(exp);
148         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149                              &res_id, type, policy, mode, lockh, 0);
150         RETURN(rc);
151 }
152
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154                       union ldlm_policy_data *policy, enum ldlm_mode mode,
155                       enum ldlm_cancel_flags flags, void *opaque)
156 {
157         struct obd_device *obd = class_exp2obd(exp);
158         struct ldlm_res_id res_id;
159         int rc;
160
161         ENTRY;
162
163         fid_build_reg_res_name(fid, &res_id);
164         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165                                              policy, mode, flags, opaque);
166         RETURN(rc);
167 }
168
169 int mdc_null_inode(struct obd_export *exp,
170                    const struct lu_fid *fid)
171 {
172         struct ldlm_res_id res_id;
173         struct ldlm_resource *res;
174         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
175         ENTRY;
176
177         LASSERTF(ns != NULL, "no namespace passed\n");
178
179         fid_build_reg_res_name(fid, &res_id);
180
181         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
182         if (IS_ERR(res))
183                 RETURN(0);
184
185         lock_res(res);
186         res->lr_lvb_inode = NULL;
187         unlock_res(res);
188
189         ldlm_resource_putref(res);
190         RETURN(0);
191 }
192
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
194 {
195         /* Don't hold error requests for replay. */
196         if (req->rq_replay) {
197                 spin_lock(&req->rq_lock);
198                 req->rq_replay = 0;
199                 spin_unlock(&req->rq_lock);
200         }
201         if (rc && req->rq_transno != 0) {
202                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
203                 LBUG();
204         }
205 }
206
207 /* Save a large LOV EA into the request buffer so that it is available
208  * for replay.  We don't do this in the initial request because the
209  * original request doesn't need this buffer (at most it sends just the
210  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211  * buffer and may also be difficult to allocate and save a very large
212  * request buffer for each open. (bug 5707)
213  *
214  * OOM here may cause recovery failure if lmm is needed (only for the
215  * original open if the MDS crashed just when this client also OOM'd)
216  * but this is incredibly unlikely, and questionable whether the client
217  * could do MDS recovery under OOM anyways... */
218 int mdc_save_lovea(struct ptlrpc_request *req,
219                    const struct req_msg_field *field,
220                    void *data, u32 size)
221 {
222         struct req_capsule *pill = &req->rq_pill;
223         struct lov_user_md *lmm;
224         int rc = 0;
225
226         if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
227                 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
228                 if (rc) {
229                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230                                req->rq_export->exp_obd->obd_name,
231                                size, rc);
232                         return rc;
233                 }
234         } else {
235                 req_capsule_shrink(pill, field, size, RCL_CLIENT);
236         }
237
238         req_capsule_set_size(pill, field, RCL_CLIENT, size);
239         lmm = req_capsule_client_get(pill, field);
240         if (lmm) {
241                 memcpy(lmm, data, size);
242                 /* overwrite layout generation returned from the MDS */
243                 lmm->lmm_stripe_offset =
244                   (typeof(lmm->lmm_stripe_offset))LOV_OFFSET_DEFAULT;
245         }
246
247         return rc;
248 }
249
250 static struct ptlrpc_request *
251 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
252                      struct md_op_data *op_data, __u32 acl_bufsize)
253 {
254         struct ptlrpc_request   *req;
255         struct obd_device       *obddev = class_exp2obd(exp);
256         struct ldlm_intent      *lit;
257         const void              *lmm = op_data->op_data;
258         __u32                    lmmsize = op_data->op_data_size;
259         struct list_head         cancels = LIST_HEAD_INIT(cancels);
260         int                      count = 0;
261         enum ldlm_mode           mode;
262         int                      rc;
263         int repsize, repsize_estimate;
264
265         ENTRY;
266
267         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
268
269         /* XXX: openlock is not cancelled for cross-refs. */
270         /* If inode is known, cancel conflicting OPEN locks. */
271         if (fid_is_sane(&op_data->op_fid2)) {
272                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
273                         if (it->it_flags & MDS_FMODE_WRITE)
274                                 mode = LCK_EX;
275                         else
276                                 mode = LCK_PR;
277                 } else {
278                         if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
279                                 mode = LCK_CW;
280 #ifdef FMODE_EXEC
281                         else if (it->it_flags & FMODE_EXEC)
282                                 mode = LCK_PR;
283 #endif
284                         else
285                                 mode = LCK_CR;
286                 }
287                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
288                                                 &cancels, mode,
289                                                 MDS_INODELOCK_OPEN);
290         }
291
292         /* If CREATE, cancel parent's UPDATE lock. */
293         if (it->it_op & IT_CREAT)
294                 mode = LCK_EX;
295         else
296                 mode = LCK_CR;
297         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
298                                          &cancels, mode,
299                                          MDS_INODELOCK_UPDATE);
300
301         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
302                                    &RQF_LDLM_INTENT_OPEN);
303         if (req == NULL) {
304                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
305                 RETURN(ERR_PTR(-ENOMEM));
306         }
307
308         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
309                              op_data->op_namelen + 1);
310         if (cl_is_lov_delay_create(it->it_flags)) {
311                 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
312                 LASSERT(lmmsize == 0);
313                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
314         } else {
315                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
316                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
317         }
318
319         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
320                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
321                              op_data->op_file_secctx_name_size : 0);
322
323         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
324                              op_data->op_file_secctx_size);
325
326         /* get SELinux policy info if any */
327         rc = sptlrpc_get_sepol(req);
328         if (rc < 0) {
329                 ptlrpc_request_free(req);
330                 RETURN(ERR_PTR(rc));
331         }
332         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
333                              strlen(req->rq_sepol) ?
334                              strlen(req->rq_sepol) + 1 : 0);
335
336         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
337         if (rc < 0) {
338                 ptlrpc_request_free(req);
339                 RETURN(ERR_PTR(rc));
340         }
341
342         spin_lock(&req->rq_lock);
343         req->rq_replay = req->rq_import->imp_replayable;
344         spin_unlock(&req->rq_lock);
345
346         /* pack the intent */
347         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
348         lit->opc = (__u64)it->it_op;
349
350         /* pack the intended request */
351         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
352                       lmmsize);
353
354         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
355                              obddev->u.cli.cl_max_mds_easize);
356         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
357
358         if (!(it->it_op & IT_CREAT) && it->it_op & IT_OPEN &&
359             req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
360                                   RCL_CLIENT) &&
361             op_data->op_file_secctx_name_size > 0 &&
362             op_data->op_file_secctx_name != NULL) {
363                 char *secctx_name;
364
365                 secctx_name = req_capsule_client_get(&req->rq_pill,
366                                                      &RMF_FILE_SECCTX_NAME);
367                 memcpy(secctx_name, op_data->op_file_secctx_name,
368                        op_data->op_file_secctx_name_size);
369                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
370                                      RCL_SERVER,
371                                      obddev->u.cli.cl_max_mds_easize);
372
373                 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
374                        op_data->op_file_secctx_name_size,
375                        op_data->op_file_secctx_name);
376
377         } else {
378                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
379                                      RCL_SERVER, 0);
380         }
381
382         /**
383          * Inline buffer for possible data from Data-on-MDT files.
384          */
385         req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
386                              sizeof(struct niobuf_remote));
387         ptlrpc_request_set_replen(req);
388
389         /* Get real repbuf allocated size as rounded up power of 2 */
390         repsize = size_roundup_power2(req->rq_replen +
391                                       lustre_msg_early_size());
392         /* Estimate free space for DoM files in repbuf */
393         repsize_estimate = repsize - (req->rq_replen -
394                            obddev->u.cli.cl_max_mds_easize +
395                            sizeof(struct lov_comp_md_v1) +
396                            sizeof(struct lov_comp_md_entry_v1) +
397                            lov_mds_md_size(0, LOV_MAGIC_V3));
398
399         if (repsize_estimate < obddev->u.cli.cl_dom_min_inline_repsize) {
400                 repsize = obddev->u.cli.cl_dom_min_inline_repsize -
401                           repsize_estimate + sizeof(struct niobuf_remote);
402                 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
403                                      RCL_SERVER,
404                                      sizeof(struct niobuf_remote) + repsize);
405                 ptlrpc_request_set_replen(req);
406                 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
407                        repsize, req->rq_replen);
408                 repsize = size_roundup_power2(req->rq_replen +
409                                               lustre_msg_early_size());
410         }
411         /* The only way to report real allocated repbuf size to the server
412          * is the lm_repsize but it must be set prior buffer allocation itself
413          * due to security reasons - it is part of buffer used in signature
414          * calculation (see LU-11414). Therefore the saved size is predicted
415          * value as rq_replen rounded to the next higher power of 2.
416          * Such estimation is safe. Though the final allocated buffer might
417          * be even larger, it is not possible to know that at this point.
418          */
419         req->rq_reqmsg->lm_repsize = repsize;
420         RETURN(req);
421 }
422
423 #define GA_DEFAULT_EA_NAME_LEN 20
424 #define GA_DEFAULT_EA_VAL_LEN  250
425 #define GA_DEFAULT_EA_NUM      10
426
427 static struct ptlrpc_request *
428 mdc_intent_getxattr_pack(struct obd_export *exp,
429                          struct lookup_intent *it,
430                          struct md_op_data *op_data)
431 {
432         struct ptlrpc_request   *req;
433         struct ldlm_intent      *lit;
434         int                     rc, count = 0;
435         struct list_head        cancels = LIST_HEAD_INIT(cancels);
436         u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
437
438         ENTRY;
439
440         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
441                                         &RQF_LDLM_INTENT_GETXATTR);
442         if (req == NULL)
443                 RETURN(ERR_PTR(-ENOMEM));
444
445         /* get SELinux policy info if any */
446         rc = sptlrpc_get_sepol(req);
447         if (rc < 0) {
448                 ptlrpc_request_free(req);
449                 RETURN(ERR_PTR(rc));
450         }
451         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
452                              strlen(req->rq_sepol) ?
453                              strlen(req->rq_sepol) + 1 : 0);
454
455         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
456         if (rc) {
457                 ptlrpc_request_free(req);
458                 RETURN(ERR_PTR(rc));
459         }
460
461         /* pack the intent */
462         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
463         lit->opc = IT_GETXATTR;
464         CDEBUG(D_INFO, "%s: get xattrs for "DFID"\n",
465                exp->exp_obd->obd_name, PFID(&op_data->op_fid1));
466
467 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
468         /* If the supplied buffer is too small then the server will
469          * return -ERANGE and llite will fallback to using non cached
470          * xattr operations. On servers before 2.10.1 a (non-cached)
471          * listxattr RPC for an orphan or dead file causes an oops. So
472          * let's try to avoid sending too small a buffer to too old a
473          * server. This is effectively undoing the memory conservation
474          * of LU-9417 when it would be *more* likely to crash the
475          * server. See LU-9856. */
476         if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
477                 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
478                                          exp->exp_connect_data.ocd_max_easize);
479 #endif
480
481         /* pack the intended request */
482         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
483                       ea_vals_buf_size, -1, 0);
484
485         /* get SELinux policy info if any */
486         mdc_file_sepol_pack(req);
487
488         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
489                              GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
490
491         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
492                              ea_vals_buf_size);
493
494         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
495                              sizeof(u32) * GA_DEFAULT_EA_NUM);
496
497         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
498
499         ptlrpc_request_set_replen(req);
500
501         RETURN(req);
502 }
503
504 static struct ptlrpc_request *
505 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
506                         struct md_op_data *op_data, __u32 acl_bufsize)
507 {
508         struct ptlrpc_request   *req;
509         struct obd_device       *obddev = class_exp2obd(exp);
510         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
511                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
512                                          OBD_MD_MEA | OBD_MD_FLACL;
513         struct ldlm_intent      *lit;
514         int                      rc;
515         __u32                    easize;
516         bool                     have_secctx = false;
517         ENTRY;
518
519         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
520                                    &RQF_LDLM_INTENT_GETATTR);
521         if (req == NULL)
522                 RETURN(ERR_PTR(-ENOMEM));
523
524         /* send name of security xattr to get upon intent */
525         if (it->it_op & (IT_LOOKUP | IT_GETATTR) &&
526             req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
527                                   RCL_CLIENT) &&
528             op_data->op_file_secctx_name_size > 0 &&
529             op_data->op_file_secctx_name != NULL) {
530                 have_secctx = true;
531                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
532                                      RCL_CLIENT,
533                                      op_data->op_file_secctx_name_size);
534         }
535
536         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
537                              op_data->op_namelen + 1);
538
539         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
540         if (rc) {
541                 ptlrpc_request_free(req);
542                 RETURN(ERR_PTR(rc));
543         }
544
545         /* pack the intent */
546         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
547         lit->opc = (__u64)it->it_op;
548
549         if (obddev->u.cli.cl_default_mds_easize > 0)
550                 easize = obddev->u.cli.cl_default_mds_easize;
551         else
552                 easize = obddev->u.cli.cl_max_mds_easize;
553
554         /* pack the intended request */
555         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
556
557         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
558         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
559
560         if (have_secctx) {
561                 char *secctx_name;
562
563                 secctx_name = req_capsule_client_get(&req->rq_pill,
564                                                      &RMF_FILE_SECCTX_NAME);
565                 memcpy(secctx_name, op_data->op_file_secctx_name,
566                        op_data->op_file_secctx_name_size);
567
568                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
569                                      RCL_SERVER, easize);
570
571                 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
572                        op_data->op_file_secctx_name_size,
573                        op_data->op_file_secctx_name);
574         } else {
575                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
576                                      RCL_SERVER, 0);
577         }
578
579         ptlrpc_request_set_replen(req);
580         RETURN(req);
581 }
582
583 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
584                                                      struct lookup_intent *it,
585                                                      struct md_op_data *op_data)
586 {
587         struct obd_device     *obd = class_exp2obd(exp);
588         struct ptlrpc_request *req;
589         struct ldlm_intent    *lit;
590         struct layout_intent  *layout;
591         int rc;
592         ENTRY;
593
594         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
595                                 &RQF_LDLM_INTENT_LAYOUT);
596         if (req == NULL)
597                 RETURN(ERR_PTR(-ENOMEM));
598
599         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
600         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
601         if (rc) {
602                 ptlrpc_request_free(req);
603                 RETURN(ERR_PTR(rc));
604         }
605
606         /* pack the intent */
607         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
608         lit->opc = (__u64)it->it_op;
609
610         /* pack the layout intent request */
611         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
612         LASSERT(op_data->op_data != NULL);
613         LASSERT(op_data->op_data_size == sizeof(*layout));
614         memcpy(layout, op_data->op_data, sizeof(*layout));
615
616         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
617                              obd->u.cli.cl_default_mds_easize);
618         ptlrpc_request_set_replen(req);
619         RETURN(req);
620 }
621
622 static struct ptlrpc_request *
623 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
624 {
625         struct ptlrpc_request *req;
626         int rc;
627         ENTRY;
628
629         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
630         if (req == NULL)
631                 RETURN(ERR_PTR(-ENOMEM));
632
633         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
634         if (rc) {
635                 ptlrpc_request_free(req);
636                 RETURN(ERR_PTR(rc));
637         }
638
639         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
640         ptlrpc_request_set_replen(req);
641         RETURN(req);
642 }
643
644 static int mdc_finish_enqueue(struct obd_export *exp,
645                               struct ptlrpc_request *req,
646                               struct ldlm_enqueue_info *einfo,
647                               struct lookup_intent *it,
648                               struct lustre_handle *lockh,
649                               int rc)
650 {
651         struct req_capsule  *pill = &req->rq_pill;
652         struct ldlm_request *lockreq;
653         struct ldlm_reply   *lockrep;
654         struct ldlm_lock    *lock;
655         struct mdt_body     *body = NULL;
656         void                *lvb_data = NULL;
657         __u32                lvb_len = 0;
658
659         ENTRY;
660
661         LASSERT(rc >= 0);
662         /* Similarly, if we're going to replay this request, we don't want to
663          * actually get a lock, just perform the intent. */
664         if (req->rq_transno || req->rq_replay) {
665                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
666                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
667         }
668
669         if (rc == ELDLM_LOCK_ABORTED) {
670                 einfo->ei_mode = 0;
671                 memset(lockh, 0, sizeof(*lockh));
672                 rc = 0;
673         } else { /* rc = 0 */
674                 lock = ldlm_handle2lock(lockh);
675                 LASSERT(lock != NULL);
676
677                 /* If the server gave us back a different lock mode, we should
678                  * fix up our variables. */
679                 if (lock->l_req_mode != einfo->ei_mode) {
680                         ldlm_lock_addref(lockh, lock->l_req_mode);
681                         ldlm_lock_decref(lockh, einfo->ei_mode);
682                         einfo->ei_mode = lock->l_req_mode;
683                 }
684                 LDLM_LOCK_PUT(lock);
685         }
686
687         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
688         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
689
690         it->it_disposition = (int)lockrep->lock_policy_res1;
691         it->it_status = (int)lockrep->lock_policy_res2;
692         it->it_lock_mode = einfo->ei_mode;
693         it->it_lock_handle = lockh->cookie;
694         it->it_request = req;
695
696         /* Technically speaking rq_transno must already be zero if
697          * it_status is in error, so the check is a bit redundant */
698         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
699                 mdc_clear_replay_flag(req, it->it_status);
700
701         /* If we're doing an IT_OPEN which did not result in an actual
702          * successful open, then we need to remove the bit which saves
703          * this request for unconditional replay.
704          *
705          * It's important that we do this first!  Otherwise we might exit the
706          * function without doing so, and try to replay a failed create
707          * (bug 3440) */
708         if (it->it_op & IT_OPEN && req->rq_replay &&
709             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
710                 mdc_clear_replay_flag(req, it->it_status);
711
712         DEBUG_REQ(D_RPCTRACE, req, "op: %x disposition: %x, status: %d",
713                   it->it_op, it->it_disposition, it->it_status);
714
715         /* We know what to expect, so we do any byte flipping required here */
716         if (it_has_reply_body(it)) {
717                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
718                 if (body == NULL) {
719                         CERROR ("Can't swab mdt_body\n");
720                         RETURN (-EPROTO);
721                 }
722
723                 if (it_disposition(it, DISP_OPEN_OPEN) &&
724                     !it_open_error(DISP_OPEN_OPEN, it)) {
725                         /*
726                          * If this is a successful OPEN request, we need to set
727                          * replay handler and data early, so that if replay
728                          * happens immediately after swabbing below, new reply
729                          * is swabbed by that handler correctly.
730                          */
731                         mdc_set_open_replay_data(NULL, NULL, it);
732                 }
733
734                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
735                         void *eadata;
736
737                         mdc_update_max_ea_from_body(exp, body);
738
739                         /*
740                          * The eadata is opaque; just check that it is there.
741                          * Eventually, obd_unpackmd() will check the contents.
742                          */
743                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
744                                                         body->mbo_eadatasize);
745                         if (eadata == NULL)
746                                 RETURN(-EPROTO);
747
748                         /* save lvb data and length in case this is for layout
749                          * lock */
750                         lvb_data = eadata;
751                         lvb_len = body->mbo_eadatasize;
752
753                         /*
754                          * We save the reply LOV EA in case we have to replay a
755                          * create for recovery.  If we didn't allocate a large
756                          * enough request buffer above we need to reallocate it
757                          * here to hold the actual LOV EA.
758                          *
759                          * To not save LOV EA if request is not going to replay
760                          * (for example error one).
761                          */
762                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
763                                 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
764                                                     body->mbo_eadatasize);
765                                 if (rc) {
766                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
767                                         body->mbo_eadatasize = 0;
768                                         rc = 0;
769                                 }
770                         }
771                 }
772         } else if (it->it_op & IT_LAYOUT) {
773                 /* maybe the lock was granted right away and layout
774                  * is packed into RMF_DLM_LVB of req */
775                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
776                 CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
777                        class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
778                 if (lvb_len > 0) {
779                         lvb_data = req_capsule_server_sized_get(pill,
780                                                         &RMF_DLM_LVB, lvb_len);
781                         if (lvb_data == NULL)
782                                 RETURN(-EPROTO);
783
784                         /**
785                          * save replied layout data to the request buffer for
786                          * recovery consideration (lest MDS reinitialize
787                          * another set of OST objects).
788                          */
789                         if (req->rq_transno)
790                                 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
791                                                      lvb_len);
792                 }
793         }
794
795         /* fill in stripe data for layout lock.
796          * LU-6581: trust layout data only if layout lock is granted. The MDT
797          * has stopped sending layout unless the layout lock is granted. The
798          * client still does this checking in case it's talking with an old
799          * server. - Jinshan */
800         lock = ldlm_handle2lock(lockh);
801         if (lock == NULL)
802                 RETURN(rc);
803
804         if (ldlm_has_layout(lock) && lvb_data != NULL &&
805             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
806                 void *lmm;
807
808                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
809                         ldlm_it2str(it->it_op), lvb_len);
810
811                 OBD_ALLOC_LARGE(lmm, lvb_len);
812                 if (lmm == NULL)
813                         GOTO(out_lock, rc = -ENOMEM);
814
815                 memcpy(lmm, lvb_data, lvb_len);
816
817                 /* install lvb_data */
818                 lock_res_and_lock(lock);
819                 if (lock->l_lvb_data == NULL) {
820                         lock->l_lvb_type = LVB_T_LAYOUT;
821                         lock->l_lvb_data = lmm;
822                         lock->l_lvb_len = lvb_len;
823                         lmm = NULL;
824                 }
825                 unlock_res_and_lock(lock);
826                 if (lmm != NULL)
827                         OBD_FREE_LARGE(lmm, lvb_len);
828         }
829
830         if (ldlm_has_dom(lock)) {
831                 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
832
833                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
834                 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
835                         LDLM_ERROR(lock, "%s: DoM lock without size.",
836                                    exp->exp_obd->obd_name);
837                         GOTO(out_lock, rc = -EPROTO);
838                 }
839
840                 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
841                            ldlm_it2str(it->it_op), body->mbo_dom_size);
842
843                 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
844         }
845 out_lock:
846         LDLM_LOCK_PUT(lock);
847
848         RETURN(rc);
849 }
850
851 /* We always reserve enough space in the reply packet for a stripe MD, because
852  * we don't know in advance the file type. */
853 static int mdc_enqueue_base(struct obd_export *exp,
854                             struct ldlm_enqueue_info *einfo,
855                             const union ldlm_policy_data *policy,
856                             struct lookup_intent *it,
857                             struct md_op_data *op_data,
858                             struct lustre_handle *lockh,
859                             __u64 extra_lock_flags)
860 {
861         struct obd_device *obddev = class_exp2obd(exp);
862         struct ptlrpc_request *req = NULL;
863         __u64 flags, saved_flags = extra_lock_flags;
864         struct ldlm_res_id res_id;
865         static const union ldlm_policy_data lookup_policy = {
866                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
867         static const union ldlm_policy_data update_policy = {
868                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
869         static const union ldlm_policy_data layout_policy = {
870                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
871         static const union ldlm_policy_data getxattr_policy = {
872                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
873         int generation, resends = 0;
874         struct ldlm_reply *lockrep;
875         struct obd_import *imp = class_exp2cliimp(exp);
876         __u32 acl_bufsize;
877         enum lvb_type lvb_type = 0;
878         int rc;
879         ENTRY;
880
881         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
882                  einfo->ei_type);
883         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
884
885         if (it != NULL) {
886                 LASSERT(policy == NULL);
887
888                 saved_flags |= LDLM_FL_HAS_INTENT;
889                 if (it->it_op & (IT_GETATTR | IT_READDIR))
890                         policy = &update_policy;
891                 else if (it->it_op & IT_LAYOUT)
892                         policy = &layout_policy;
893                 else if (it->it_op & IT_GETXATTR)
894                         policy = &getxattr_policy;
895                 else
896                         policy = &lookup_policy;
897         }
898
899         generation = obddev->u.cli.cl_import->imp_generation;
900         if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
901                 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
902                                   XATTR_SIZE_MAX);
903         else
904                 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
905
906 resend:
907         flags = saved_flags;
908         if (it == NULL) {
909                 /* The only way right now is FLOCK. */
910                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
911                          einfo->ei_type);
912                 res_id.name[3] = LDLM_FLOCK;
913         } else if (it->it_op & IT_OPEN) {
914                 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
915         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
916                 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
917         } else if (it->it_op & IT_READDIR) {
918                 req = mdc_enqueue_pack(exp, 0);
919         } else if (it->it_op & IT_LAYOUT) {
920                 if (!imp_connect_lvb_type(imp))
921                         RETURN(-EOPNOTSUPP);
922                 req = mdc_intent_layout_pack(exp, it, op_data);
923                 lvb_type = LVB_T_LAYOUT;
924         } else if (it->it_op & IT_GETXATTR) {
925                 req = mdc_intent_getxattr_pack(exp, it, op_data);
926         } else {
927                 LBUG();
928                 RETURN(-EINVAL);
929         }
930
931         if (IS_ERR(req))
932                 RETURN(PTR_ERR(req));
933
934         if (resends) {
935                 req->rq_generation_set = 1;
936                 req->rq_import_generation = generation;
937                 req->rq_sent = ktime_get_real_seconds() + resends;
938         }
939
940         /* It is important to obtain modify RPC slot first (if applicable), so
941          * that threads that are waiting for a modify RPC slot are not polluting
942          * our rpcs in flight counter.
943          * We do not do flock request limiting, though */
944         if (it) {
945                 mdc_get_mod_rpc_slot(req, it);
946                 rc = obd_get_request_slot(&obddev->u.cli);
947                 if (rc != 0) {
948                         mdc_put_mod_rpc_slot(req, it);
949                         mdc_clear_replay_flag(req, 0);
950                         ptlrpc_req_finished(req);
951                         RETURN(rc);
952                 }
953         }
954
955         /* With Data-on-MDT the glimpse callback is needed too.
956          * It is set here in advance but not in mdc_finish_enqueue()
957          * to avoid possible races. It is safe to have glimpse handler
958          * for non-DOM locks and costs nothing.*/
959         if (einfo->ei_cb_gl == NULL)
960                 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
961
962         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
963                               0, lvb_type, lockh, 0);
964         if (!it) {
965                 /* For flock requests we immediatelly return without further
966                    delay and let caller deal with the rest, since rest of
967                    this function metadata processing makes no sense for flock
968                    requests anyway. But in case of problem during comms with
969                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
970                    can not rely on caller and this mainly for F_UNLCKs
971                    (explicits or automatically generated by Kernel to clean
972                    current FLocks upon exit) that can't be trashed */
973                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
974                     (einfo->ei_type == LDLM_FLOCK) &&
975                     (einfo->ei_mode == LCK_NL))
976                         goto resend;
977                 RETURN(rc);
978         }
979
980         obd_put_request_slot(&obddev->u.cli);
981         mdc_put_mod_rpc_slot(req, it);
982
983         if (rc < 0) {
984                 CDEBUG(D_INFO,
985                       "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
986                       obddev->obd_name, PFID(&op_data->op_fid1),
987                       PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
988
989                 mdc_clear_replay_flag(req, rc);
990                 ptlrpc_req_finished(req);
991                 RETURN(rc);
992         }
993
994         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
995         LASSERT(lockrep != NULL);
996
997         lockrep->lock_policy_res2 =
998                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
999
1000         /* Retry infinitely when the server returns -EINPROGRESS for the
1001          * intent operation, when server returns -EINPROGRESS for acquiring
1002          * intent lock, we'll retry in after_reply(). */
1003         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
1004                 mdc_clear_replay_flag(req, rc);
1005                 ptlrpc_req_finished(req);
1006                 if (generation == obddev->u.cli.cl_import->imp_generation) {
1007                         if (signal_pending(current))
1008                                 RETURN(-EINTR);
1009
1010                         resends++;
1011                         CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
1012                                obddev->obd_name, resends, it->it_op,
1013                                PFID(&op_data->op_fid1),
1014                                PFID(&op_data->op_fid2));
1015                         goto resend;
1016                 } else {
1017                         CDEBUG(D_HA, "resend cross eviction\n");
1018                         RETURN(-EIO);
1019                 }
1020         }
1021
1022         if ((int)lockrep->lock_policy_res2 == -ERANGE &&
1023             it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
1024             acl_bufsize == LUSTRE_POSIX_ACL_MAX_SIZE_OLD) {
1025                 mdc_clear_replay_flag(req, -ERANGE);
1026                 ptlrpc_req_finished(req);
1027                 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
1028                                   XATTR_SIZE_MAX);
1029                 goto resend;
1030         }
1031
1032         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1033         if (rc < 0) {
1034                 if (lustre_handle_is_used(lockh)) {
1035                         ldlm_lock_decref(lockh, einfo->ei_mode);
1036                         memset(lockh, 0, sizeof(*lockh));
1037                 }
1038                 ptlrpc_req_finished(req);
1039
1040                 it->it_lock_handle = 0;
1041                 it->it_lock_mode = 0;
1042                 it->it_request = NULL;
1043         }
1044
1045         RETURN(rc);
1046 }
1047
1048 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1049                 const union ldlm_policy_data *policy,
1050                 struct md_op_data *op_data,
1051                 struct lustre_handle *lockh, __u64 extra_lock_flags)
1052 {
1053         return mdc_enqueue_base(exp, einfo, policy, NULL,
1054                                 op_data, lockh, extra_lock_flags);
1055 }
1056
1057 static int mdc_finish_intent_lock(struct obd_export *exp,
1058                                   struct ptlrpc_request *request,
1059                                   struct md_op_data *op_data,
1060                                   struct lookup_intent *it,
1061                                   struct lustre_handle *lockh)
1062 {
1063         struct lustre_handle old_lock;
1064         struct ldlm_lock *lock;
1065         int rc = 0;
1066         ENTRY;
1067
1068         LASSERT(request != NULL);
1069         LASSERT(request != LP_POISON);
1070         LASSERT(request->rq_repmsg != LP_POISON);
1071
1072         if (it->it_op & IT_READDIR)
1073                 RETURN(0);
1074
1075         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
1076                 if (it->it_status != 0)
1077                         GOTO(out, rc = it->it_status);
1078         } else {
1079                 if (!it_disposition(it, DISP_IT_EXECD)) {
1080                         /* The server failed before it even started executing
1081                          * the intent, i.e. because it couldn't unpack the
1082                          * request.
1083                          */
1084                         LASSERT(it->it_status != 0);
1085                         GOTO(out, rc = it->it_status);
1086                 }
1087                 rc = it_open_error(DISP_IT_EXECD, it);
1088                 if (rc)
1089                         GOTO(out, rc);
1090
1091                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1092                 if (rc)
1093                         GOTO(out, rc);
1094
1095                 /* keep requests around for the multiple phases of the call
1096                  * this shows the DISP_XX must guarantee we make it into the
1097                  * call
1098                  */
1099                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1100                     it_disposition(it, DISP_OPEN_CREATE) &&
1101                     !it_open_error(DISP_OPEN_CREATE, it)) {
1102                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
1103                         /* balanced in ll_create_node */
1104                         ptlrpc_request_addref(request);
1105                 }
1106                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1107                     it_disposition(it, DISP_OPEN_OPEN) &&
1108                     !it_open_error(DISP_OPEN_OPEN, it)) {
1109                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
1110                         /* balanced in ll_file_open */
1111                         ptlrpc_request_addref(request);
1112                         /* BUG 11546 - eviction in the middle of open rpc
1113                          * processing
1114                          */
1115                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1116                                          obd_timeout);
1117                 }
1118
1119                 if (it->it_op & IT_CREAT) {
1120                         /* XXX this belongs in ll_create_it */
1121                 } else if (it->it_op == IT_OPEN) {
1122                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1123                 } else {
1124                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1125                 }
1126         }
1127
1128         /* If we already have a matching lock, then cancel the new
1129          * one.  We have to set the data here instead of in
1130          * mdc_enqueue, because we need to use the child's inode as
1131          * the l_ast_data to match, and that's not available until
1132          * intent_finish has performed the iget().) */
1133         lock = ldlm_handle2lock(lockh);
1134         if (lock) {
1135                 union ldlm_policy_data policy = lock->l_policy_data;
1136                 LDLM_DEBUG(lock, "matching against this");
1137
1138                 if (it_has_reply_body(it)) {
1139                         struct mdt_body *body;
1140
1141                         body = req_capsule_server_get(&request->rq_pill,
1142                                                       &RMF_MDT_BODY);
1143                         /* mdc_enqueue checked */
1144                         LASSERT(body != NULL);
1145                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1146                                                  &lock->l_resource->lr_name),
1147                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1148                                  PLDLMRES(lock->l_resource),
1149                                  PFID(&body->mbo_fid1));
1150                 }
1151                 LDLM_LOCK_PUT(lock);
1152
1153                 memcpy(&old_lock, lockh, sizeof(*lockh));
1154                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1155                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1156                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1157                         memcpy(lockh, &old_lock, sizeof(old_lock));
1158                         it->it_lock_handle = lockh->cookie;
1159                 }
1160         }
1161
1162         EXIT;
1163 out:
1164         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1165                 (int)op_data->op_namelen, op_data->op_name,
1166                 ldlm_it2str(it->it_op), it->it_status,
1167                 it->it_disposition, rc);
1168         return rc;
1169 }
1170
1171 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1172                         struct lu_fid *fid, __u64 *bits)
1173 {
1174         /* We could just return 1 immediately, but since we should only
1175          * be called in revalidate_it if we already have a lock, let's
1176          * verify that. */
1177         struct ldlm_res_id res_id;
1178         struct lustre_handle lockh;
1179         union ldlm_policy_data policy;
1180         enum ldlm_mode mode;
1181         ENTRY;
1182
1183         if (it->it_lock_handle) {
1184                 lockh.cookie = it->it_lock_handle;
1185                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1186         } else {
1187                 fid_build_reg_res_name(fid, &res_id);
1188                 switch (it->it_op) {
1189                 case IT_GETATTR:
1190                         /* File attributes are held under multiple bits:
1191                          * nlink is under lookup lock, size and times are
1192                          * under UPDATE lock and recently we've also got
1193                          * a separate permissions lock for owner/group/acl that
1194                          * were protected by lookup lock before.
1195                          * Getattr must provide all of that information,
1196                          * so we need to ensure we have all of those locks.
1197                          * Unfortunately, if the bits are split across multiple
1198                          * locks, there's no easy way to match all of them here,
1199                          * so an extra RPC would be performed to fetch all
1200                          * of those bits at once for now. */
1201                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1202                          * but for old MDTs (< 2.4), permission is covered
1203                          * by LOOKUP lock, so it needs to match all bits here.*/
1204                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1205                                                   MDS_INODELOCK_LOOKUP |
1206                                                   MDS_INODELOCK_PERM;
1207                         break;
1208                 case IT_READDIR:
1209                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1210                         break;
1211                 case IT_LAYOUT:
1212                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1213                         break;
1214                 default:
1215                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1216                         break;
1217                 }
1218
1219                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1220                                       LDLM_IBITS, &policy,
1221                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1222                                       &lockh);
1223         }
1224
1225         if (mode) {
1226                 it->it_lock_handle = lockh.cookie;
1227                 it->it_lock_mode = mode;
1228         } else {
1229                 it->it_lock_handle = 0;
1230                 it->it_lock_mode = 0;
1231         }
1232
1233         RETURN(!!mode);
1234 }
1235
1236 /*
1237  * This long block is all about fixing up the lock and request state
1238  * so that it is correct as of the moment _before_ the operation was
1239  * applied; that way, the VFS will think that everything is normal and
1240  * call Lustre's regular VFS methods.
1241  *
1242  * If we're performing a creation, that means that unless the creation
1243  * failed with EEXIST, we should fake up a negative dentry.
1244  *
1245  * For everything else, we want to lookup to succeed.
1246  *
1247  * One additional note: if CREATE or OPEN succeeded, we add an extra
1248  * reference to the request because we need to keep it around until
1249  * ll_create/ll_open gets called.
1250  *
1251  * The server will return to us, in it_disposition, an indication of
1252  * exactly what it_status refers to.
1253  *
1254  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1255  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1256  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1257  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1258  * was successful.
1259  *
1260  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1261  * child lookup.
1262  */
1263 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1264                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1265                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1266 {
1267         struct ldlm_enqueue_info einfo = {
1268                 .ei_type        = LDLM_IBITS,
1269                 .ei_mode        = it_to_lock_mode(it),
1270                 .ei_cb_bl       = cb_blocking,
1271                 .ei_cb_cp       = ldlm_completion_ast,
1272                 .ei_cb_gl       = mdc_ldlm_glimpse_ast,
1273         };
1274         struct lustre_handle lockh;
1275         int rc = 0;
1276         ENTRY;
1277         LASSERT(it);
1278
1279         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1280                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1281                 op_data->op_name, PFID(&op_data->op_fid2),
1282                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1283                 it->it_flags);
1284
1285         lockh.cookie = 0;
1286         if (fid_is_sane(&op_data->op_fid2) &&
1287             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1288                 /* We could just return 1 immediately, but since we should only
1289                  * be called in revalidate_it if we already have a lock, let's
1290                  * verify that. */
1291                 it->it_lock_handle = 0;
1292                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1293                 /* Only return failure if it was not GETATTR by cfid
1294                    (from inode_revalidate) */
1295                 if (rc || op_data->op_namelen != 0)
1296                         RETURN(rc);
1297         }
1298
1299         /* For case if upper layer did not alloc fid, do it now. */
1300         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1301                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1302                 if (rc < 0) {
1303                         CERROR("Can't alloc new fid, rc %d\n", rc);
1304                         RETURN(rc);
1305                 }
1306         }
1307
1308         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1309                               extra_lock_flags);
1310         if (rc < 0)
1311                 RETURN(rc);
1312
1313         *reqp = it->it_request;
1314         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1315         RETURN(rc);
1316 }
1317
1318 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1319                                               struct ptlrpc_request *req,
1320                                               void *args, int rc)
1321 {
1322         struct mdc_getattr_args  *ga = args;
1323         struct obd_export *exp = ga->ga_exp;
1324         struct md_enqueue_info *minfo = ga->ga_minfo;
1325         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1326         struct lookup_intent *it;
1327         struct lustre_handle *lockh;
1328         struct obd_device *obddev;
1329         struct ldlm_reply *lockrep;
1330         __u64 flags = LDLM_FL_HAS_INTENT;
1331         ENTRY;
1332
1333         it    = &minfo->mi_it;
1334         lockh = &minfo->mi_lockh;
1335
1336         obddev = class_exp2obd(exp);
1337
1338         obd_put_request_slot(&obddev->u.cli);
1339         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1340                 rc = -ETIMEDOUT;
1341
1342         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1343                                    &flags, NULL, 0, lockh, rc);
1344         if (rc < 0) {
1345                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1346                 mdc_clear_replay_flag(req, rc);
1347                 GOTO(out, rc);
1348         }
1349
1350         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1351         LASSERT(lockrep != NULL);
1352
1353         lockrep->lock_policy_res2 =
1354                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1355
1356         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1357         if (rc)
1358                 GOTO(out, rc);
1359
1360         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1361         EXIT;
1362
1363 out:
1364         minfo->mi_cb(req, minfo, rc);
1365         return 0;
1366 }
1367
1368 int mdc_intent_getattr_async(struct obd_export *exp,
1369                              struct md_enqueue_info *minfo)
1370 {
1371         struct md_op_data       *op_data = &minfo->mi_data;
1372         struct lookup_intent    *it = &minfo->mi_it;
1373         struct ptlrpc_request   *req;
1374         struct mdc_getattr_args *ga;
1375         struct obd_device       *obddev = class_exp2obd(exp);
1376         struct ldlm_res_id       res_id;
1377         union ldlm_policy_data policy = {
1378                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1379                                                  MDS_INODELOCK_UPDATE } };
1380         int                      rc = 0;
1381         __u64                    flags = LDLM_FL_HAS_INTENT;
1382         ENTRY;
1383
1384         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1385                 (int)op_data->op_namelen, op_data->op_name,
1386                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1387
1388         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1389         /* If the MDT return -ERANGE because of large ACL, then the sponsor
1390          * of the async getattr RPC will handle that by itself. */
1391         req = mdc_intent_getattr_pack(exp, it, op_data,
1392                                       LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1393         if (IS_ERR(req))
1394                 RETURN(PTR_ERR(req));
1395
1396         rc = obd_get_request_slot(&obddev->u.cli);
1397         if (rc != 0) {
1398                 ptlrpc_req_finished(req);
1399                 RETURN(rc);
1400         }
1401
1402         /* With Data-on-MDT the glimpse callback is needed too.
1403          * It is set here in advance but not in mdc_finish_enqueue()
1404          * to avoid possible races. It is safe to have glimpse handler
1405          * for non-DOM locks and costs nothing.*/
1406         if (minfo->mi_einfo.ei_cb_gl == NULL)
1407                 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1408
1409         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1410                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1411         if (rc < 0) {
1412                 obd_put_request_slot(&obddev->u.cli);
1413                 ptlrpc_req_finished(req);
1414                 RETURN(rc);
1415         }
1416
1417         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1418         ga = ptlrpc_req_async_args(req);
1419         ga->ga_exp = exp;
1420         ga->ga_minfo = minfo;
1421
1422         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1423         ptlrpcd_add_req(req);
1424
1425         RETURN(0);
1426 }