Whamcloud - gitweb
24cea59ffba54ccb8291529c3de5322b34bcbe79
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 #include <linux/module.h>
36
37 #include <obd.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
47
48 #include "mdc_internal.h"
49
50 struct mdc_getattr_args {
51         struct obd_export               *ga_exp;
52         struct md_enqueue_info          *ga_minfo;
53 };
54
55 int it_open_error(int phase, struct lookup_intent *it)
56 {
57         if (it_disposition(it, DISP_OPEN_LEASE)) {
58                 if (phase >= DISP_OPEN_LEASE)
59                         return it->it_status;
60                 else
61                         return 0;
62         }
63         if (it_disposition(it, DISP_OPEN_OPEN)) {
64                 if (phase >= DISP_OPEN_OPEN)
65                         return it->it_status;
66                 else
67                         return 0;
68         }
69
70         if (it_disposition(it, DISP_OPEN_CREATE)) {
71                 if (phase >= DISP_OPEN_CREATE)
72                         return it->it_status;
73                 else
74                         return 0;
75         }
76
77         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78                 if (phase >= DISP_LOOKUP_EXECD)
79                         return it->it_status;
80                 else
81                         return 0;
82         }
83
84         if (it_disposition(it, DISP_IT_EXECD)) {
85                 if (phase >= DISP_IT_EXECD)
86                         return it->it_status;
87                 else
88                         return 0;
89         }
90
91         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
92         LBUG();
93
94         return 0;
95 }
96 EXPORT_SYMBOL(it_open_error);
97
98 /* this must be called on a lockh that is known to have a referenced lock */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100                       void *data, __u64 *bits)
101 {
102         struct ldlm_lock *lock;
103         struct inode *new_inode = data;
104         ENTRY;
105
106         if(bits)
107                 *bits = 0;
108
109         if (!lustre_handle_is_used(lockh))
110                 RETURN(0);
111
112         lock = ldlm_handle2lock(lockh);
113
114         LASSERT(lock != NULL);
115         lock_res_and_lock(lock);
116         if (lock->l_resource->lr_lvb_inode &&
117             lock->l_resource->lr_lvb_inode != data) {
118                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119                 LASSERTF(old_inode->i_state & I_FREEING,
120                          "Found existing inode %p/%lu/%u state %lu in lock: "
121                          "setting data to %p/%lu/%u\n", old_inode,
122                          old_inode->i_ino, old_inode->i_generation,
123                          old_inode->i_state,
124                          new_inode, new_inode->i_ino, new_inode->i_generation);
125         }
126         lock->l_resource->lr_lvb_inode = new_inode;
127         if (bits)
128                 *bits = lock->l_policy_data.l_inodebits.bits;
129
130         unlock_res_and_lock(lock);
131         LDLM_LOCK_PUT(lock);
132
133         RETURN(0);
134 }
135
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137                               const struct lu_fid *fid, enum ldlm_type type,
138                               union ldlm_policy_data *policy,
139                               enum ldlm_mode mode, struct lustre_handle *lockh)
140 {
141         struct ldlm_res_id res_id;
142         enum ldlm_mode rc;
143         ENTRY;
144
145         fid_build_reg_res_name(fid, &res_id);
146         /* LU-4405: Clear bits not supported by server */
147         policy->l_inodebits.bits &= exp_connect_ibits(exp);
148         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149                              &res_id, type, policy, mode, lockh, 0);
150         RETURN(rc);
151 }
152
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154                       union ldlm_policy_data *policy, enum ldlm_mode mode,
155                       enum ldlm_cancel_flags flags, void *opaque)
156 {
157         struct obd_device *obd = class_exp2obd(exp);
158         struct ldlm_res_id res_id;
159         int rc;
160
161         ENTRY;
162
163         fid_build_reg_res_name(fid, &res_id);
164         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165                                              policy, mode, flags, opaque);
166         RETURN(rc);
167 }
168
169 int mdc_null_inode(struct obd_export *exp,
170                    const struct lu_fid *fid)
171 {
172         struct ldlm_res_id res_id;
173         struct ldlm_resource *res;
174         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
175         ENTRY;
176
177         LASSERTF(ns != NULL, "no namespace passed\n");
178
179         fid_build_reg_res_name(fid, &res_id);
180
181         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
182         if (IS_ERR(res))
183                 RETURN(0);
184
185         lock_res(res);
186         res->lr_lvb_inode = NULL;
187         unlock_res(res);
188
189         ldlm_resource_putref(res);
190         RETURN(0);
191 }
192
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
194 {
195         /* Don't hold error requests for replay. */
196         if (req->rq_replay) {
197                 spin_lock(&req->rq_lock);
198                 req->rq_replay = 0;
199                 spin_unlock(&req->rq_lock);
200         }
201         if (rc && req->rq_transno != 0) {
202                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
203                 LBUG();
204         }
205 }
206
207 /* Save a large LOV EA into the request buffer so that it is available
208  * for replay.  We don't do this in the initial request because the
209  * original request doesn't need this buffer (at most it sends just the
210  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211  * buffer and may also be difficult to allocate and save a very large
212  * request buffer for each open. (bug 5707)
213  *
214  * OOM here may cause recovery failure if lmm is needed (only for the
215  * original open if the MDS crashed just when this client also OOM'd)
216  * but this is incredibly unlikely, and questionable whether the client
217  * could do MDS recovery under OOM anyways... */
218 int mdc_save_lovea(struct ptlrpc_request *req,
219                    const struct req_msg_field *field,
220                    void *data, u32 size)
221 {
222         struct req_capsule *pill = &req->rq_pill;
223         void *lmm;
224         int rc = 0;
225
226         if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
227                 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
228                 if (rc) {
229                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230                                req->rq_export->exp_obd->obd_name,
231                                size, rc);
232                         return rc;
233                 }
234         } else {
235                 req_capsule_shrink(pill, field, size, RCL_CLIENT);
236         }
237
238         req_capsule_set_size(pill, field, RCL_CLIENT, size);
239         lmm = req_capsule_client_get(pill, field);
240         if (lmm)
241                 memcpy(lmm, data, size);
242
243         return rc;
244 }
245
246 static struct ptlrpc_request *
247 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
248                      struct md_op_data *op_data, __u32 acl_bufsize)
249 {
250         struct ptlrpc_request   *req;
251         struct obd_device       *obddev = class_exp2obd(exp);
252         struct ldlm_intent      *lit;
253         const void              *lmm = op_data->op_data;
254         __u32                    lmmsize = op_data->op_data_size;
255         struct list_head         cancels = LIST_HEAD_INIT(cancels);
256         int                      count = 0;
257         enum ldlm_mode           mode;
258         int                      rc;
259         int repsize, repsize_estimate;
260
261         ENTRY;
262
263         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
264
265         /* XXX: openlock is not cancelled for cross-refs. */
266         /* If inode is known, cancel conflicting OPEN locks. */
267         if (fid_is_sane(&op_data->op_fid2)) {
268                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
269                         if (it->it_flags & MDS_FMODE_WRITE)
270                                 mode = LCK_EX;
271                         else
272                                 mode = LCK_PR;
273                 } else {
274                         if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
275                                 mode = LCK_CW;
276 #ifdef FMODE_EXEC
277                         else if (it->it_flags & FMODE_EXEC)
278                                 mode = LCK_PR;
279 #endif
280                         else
281                                 mode = LCK_CR;
282                 }
283                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
284                                                 &cancels, mode,
285                                                 MDS_INODELOCK_OPEN);
286         }
287
288         /* If CREATE, cancel parent's UPDATE lock. */
289         if (it->it_op & IT_CREAT)
290                 mode = LCK_EX;
291         else
292                 mode = LCK_CR;
293         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
294                                          &cancels, mode,
295                                          MDS_INODELOCK_UPDATE);
296
297         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
298                                    &RQF_LDLM_INTENT_OPEN);
299         if (req == NULL) {
300                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
301                 RETURN(ERR_PTR(-ENOMEM));
302         }
303
304         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
305                              op_data->op_namelen + 1);
306         if (cl_is_lov_delay_create(it->it_flags)) {
307                 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
308                 LASSERT(lmmsize == 0);
309                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
310         } else {
311                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
312                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
313         }
314
315         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
316                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
317                              op_data->op_file_secctx_name_size : 0);
318
319         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
320                              op_data->op_file_secctx_size);
321
322         /* get SELinux policy info if any */
323         rc = sptlrpc_get_sepol(req);
324         if (rc < 0) {
325                 ptlrpc_request_free(req);
326                 RETURN(ERR_PTR(rc));
327         }
328         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
329                              strlen(req->rq_sepol) ?
330                              strlen(req->rq_sepol) + 1 : 0);
331
332         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
333         if (rc < 0) {
334                 ptlrpc_request_free(req);
335                 RETURN(ERR_PTR(rc));
336         }
337
338         spin_lock(&req->rq_lock);
339         req->rq_replay = req->rq_import->imp_replayable;
340         spin_unlock(&req->rq_lock);
341
342         /* pack the intent */
343         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
344         lit->opc = (__u64)it->it_op;
345
346         /* pack the intended request */
347         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
348                       lmmsize);
349
350         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
351                              obddev->u.cli.cl_max_mds_easize);
352         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
353
354         if (!(it->it_op & IT_CREAT) && it->it_op & IT_OPEN &&
355             req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
356                                   RCL_CLIENT) &&
357             op_data->op_file_secctx_name_size > 0 &&
358             op_data->op_file_secctx_name != NULL) {
359                 char *secctx_name;
360
361                 secctx_name = req_capsule_client_get(&req->rq_pill,
362                                                      &RMF_FILE_SECCTX_NAME);
363                 memcpy(secctx_name, op_data->op_file_secctx_name,
364                        op_data->op_file_secctx_name_size);
365                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
366                                      RCL_SERVER,
367                                      obddev->u.cli.cl_max_mds_easize);
368
369                 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
370                        op_data->op_file_secctx_name_size,
371                        op_data->op_file_secctx_name);
372
373         } else {
374                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
375                                      RCL_SERVER, 0);
376         }
377
378         /**
379          * Inline buffer for possible data from Data-on-MDT files.
380          */
381         req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
382                              sizeof(struct niobuf_remote));
383         ptlrpc_request_set_replen(req);
384
385         /* Get real repbuf allocated size as rounded up power of 2 */
386         repsize = size_roundup_power2(req->rq_replen +
387                                       lustre_msg_early_size());
388         /* Estimate free space for DoM files in repbuf */
389         repsize_estimate = repsize - (req->rq_replen -
390                            obddev->u.cli.cl_max_mds_easize +
391                            sizeof(struct lov_comp_md_v1) +
392                            sizeof(struct lov_comp_md_entry_v1) +
393                            lov_mds_md_size(0, LOV_MAGIC_V3));
394
395         if (repsize_estimate < obddev->u.cli.cl_dom_min_inline_repsize) {
396                 repsize = obddev->u.cli.cl_dom_min_inline_repsize -
397                           repsize_estimate + sizeof(struct niobuf_remote);
398                 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
399                                      RCL_SERVER,
400                                      sizeof(struct niobuf_remote) + repsize);
401                 ptlrpc_request_set_replen(req);
402                 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
403                        repsize, req->rq_replen);
404                 repsize = size_roundup_power2(req->rq_replen +
405                                               lustre_msg_early_size());
406         }
407         /* The only way to report real allocated repbuf size to the server
408          * is the lm_repsize but it must be set prior buffer allocation itself
409          * due to security reasons - it is part of buffer used in signature
410          * calculation (see LU-11414). Therefore the saved size is predicted
411          * value as rq_replen rounded to the next higher power of 2.
412          * Such estimation is safe. Though the final allocated buffer might
413          * be even larger, it is not possible to know that at this point.
414          */
415         req->rq_reqmsg->lm_repsize = repsize;
416         RETURN(req);
417 }
418
419 #define GA_DEFAULT_EA_NAME_LEN 20
420 #define GA_DEFAULT_EA_VAL_LEN  250
421 #define GA_DEFAULT_EA_NUM      10
422
423 static struct ptlrpc_request *
424 mdc_intent_getxattr_pack(struct obd_export *exp,
425                          struct lookup_intent *it,
426                          struct md_op_data *op_data)
427 {
428         struct ptlrpc_request   *req;
429         struct ldlm_intent      *lit;
430         int                     rc, count = 0;
431         struct list_head        cancels = LIST_HEAD_INIT(cancels);
432         u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
433
434         ENTRY;
435
436         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
437                                         &RQF_LDLM_INTENT_GETXATTR);
438         if (req == NULL)
439                 RETURN(ERR_PTR(-ENOMEM));
440
441         /* get SELinux policy info if any */
442         rc = sptlrpc_get_sepol(req);
443         if (rc < 0) {
444                 ptlrpc_request_free(req);
445                 RETURN(ERR_PTR(rc));
446         }
447         req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT,
448                              strlen(req->rq_sepol) ?
449                              strlen(req->rq_sepol) + 1 : 0);
450
451         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
452         if (rc) {
453                 ptlrpc_request_free(req);
454                 RETURN(ERR_PTR(rc));
455         }
456
457         /* pack the intent */
458         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
459         lit->opc = IT_GETXATTR;
460         CDEBUG(D_INFO, "%s: get xattrs for "DFID"\n",
461                exp->exp_obd->obd_name, PFID(&op_data->op_fid1));
462
463 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
464         /* If the supplied buffer is too small then the server will
465          * return -ERANGE and llite will fallback to using non cached
466          * xattr operations. On servers before 2.10.1 a (non-cached)
467          * listxattr RPC for an orphan or dead file causes an oops. So
468          * let's try to avoid sending too small a buffer to too old a
469          * server. This is effectively undoing the memory conservation
470          * of LU-9417 when it would be *more* likely to crash the
471          * server. See LU-9856. */
472         if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
473                 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
474                                          exp->exp_connect_data.ocd_max_easize);
475 #endif
476
477         /* pack the intended request */
478         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
479                       ea_vals_buf_size, -1, 0);
480
481         /* get SELinux policy info if any */
482         mdc_file_sepol_pack(req);
483
484         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
485                              GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
486
487         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
488                              ea_vals_buf_size);
489
490         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
491                              sizeof(u32) * GA_DEFAULT_EA_NUM);
492
493         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
494
495         ptlrpc_request_set_replen(req);
496
497         RETURN(req);
498 }
499
500 static struct ptlrpc_request *
501 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
502                         struct md_op_data *op_data, __u32 acl_bufsize)
503 {
504         struct ptlrpc_request   *req;
505         struct obd_device       *obddev = class_exp2obd(exp);
506         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
507                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
508                                          OBD_MD_MEA | OBD_MD_FLACL;
509         struct ldlm_intent      *lit;
510         int                      rc;
511         __u32                    easize;
512         bool                     have_secctx = false;
513         ENTRY;
514
515         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
516                                    &RQF_LDLM_INTENT_GETATTR);
517         if (req == NULL)
518                 RETURN(ERR_PTR(-ENOMEM));
519
520         /* send name of security xattr to get upon intent */
521         if (it->it_op & (IT_LOOKUP | IT_GETATTR) &&
522             req_capsule_has_field(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
523                                   RCL_CLIENT) &&
524             op_data->op_file_secctx_name_size > 0 &&
525             op_data->op_file_secctx_name != NULL) {
526                 have_secctx = true;
527                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
528                                      RCL_CLIENT,
529                                      op_data->op_file_secctx_name_size);
530         }
531
532         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
533                              op_data->op_namelen + 1);
534
535         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
536         if (rc) {
537                 ptlrpc_request_free(req);
538                 RETURN(ERR_PTR(rc));
539         }
540
541         /* pack the intent */
542         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
543         lit->opc = (__u64)it->it_op;
544
545         if (obddev->u.cli.cl_default_mds_easize > 0)
546                 easize = obddev->u.cli.cl_default_mds_easize;
547         else
548                 easize = obddev->u.cli.cl_max_mds_easize;
549
550         /* pack the intended request */
551         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
552
553         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
554         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
555
556         if (have_secctx) {
557                 char *secctx_name;
558
559                 secctx_name = req_capsule_client_get(&req->rq_pill,
560                                                      &RMF_FILE_SECCTX_NAME);
561                 memcpy(secctx_name, op_data->op_file_secctx_name,
562                        op_data->op_file_secctx_name_size);
563
564                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
565                                      RCL_SERVER, easize);
566
567                 CDEBUG(D_SEC, "packed '%.*s' as security xattr name\n",
568                        op_data->op_file_secctx_name_size,
569                        op_data->op_file_secctx_name);
570         } else {
571                 req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX,
572                                      RCL_SERVER, 0);
573         }
574
575         ptlrpc_request_set_replen(req);
576         RETURN(req);
577 }
578
579 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
580                                                      struct lookup_intent *it,
581                                                      struct md_op_data *op_data)
582 {
583         struct obd_device     *obd = class_exp2obd(exp);
584         struct ptlrpc_request *req;
585         struct ldlm_intent    *lit;
586         struct layout_intent  *layout;
587         int rc;
588         ENTRY;
589
590         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
591                                 &RQF_LDLM_INTENT_LAYOUT);
592         if (req == NULL)
593                 RETURN(ERR_PTR(-ENOMEM));
594
595         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
596         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
597         if (rc) {
598                 ptlrpc_request_free(req);
599                 RETURN(ERR_PTR(rc));
600         }
601
602         /* pack the intent */
603         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
604         lit->opc = (__u64)it->it_op;
605
606         /* pack the layout intent request */
607         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
608         LASSERT(op_data->op_data != NULL);
609         LASSERT(op_data->op_data_size == sizeof(*layout));
610         memcpy(layout, op_data->op_data, sizeof(*layout));
611
612         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
613                              obd->u.cli.cl_default_mds_easize);
614         ptlrpc_request_set_replen(req);
615         RETURN(req);
616 }
617
618 static struct ptlrpc_request *
619 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
620 {
621         struct ptlrpc_request *req;
622         int rc;
623         ENTRY;
624
625         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
626         if (req == NULL)
627                 RETURN(ERR_PTR(-ENOMEM));
628
629         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
630         if (rc) {
631                 ptlrpc_request_free(req);
632                 RETURN(ERR_PTR(rc));
633         }
634
635         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
636         ptlrpc_request_set_replen(req);
637         RETURN(req);
638 }
639
640 static int mdc_finish_enqueue(struct obd_export *exp,
641                               struct ptlrpc_request *req,
642                               struct ldlm_enqueue_info *einfo,
643                               struct lookup_intent *it,
644                               struct lustre_handle *lockh,
645                               int rc)
646 {
647         struct req_capsule  *pill = &req->rq_pill;
648         struct ldlm_request *lockreq;
649         struct ldlm_reply   *lockrep;
650         struct ldlm_lock    *lock;
651         struct mdt_body     *body = NULL;
652         void                *lvb_data = NULL;
653         __u32                lvb_len = 0;
654
655         ENTRY;
656
657         LASSERT(rc >= 0);
658         /* Similarly, if we're going to replay this request, we don't want to
659          * actually get a lock, just perform the intent. */
660         if (req->rq_transno || req->rq_replay) {
661                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
662                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
663         }
664
665         if (rc == ELDLM_LOCK_ABORTED) {
666                 einfo->ei_mode = 0;
667                 memset(lockh, 0, sizeof(*lockh));
668                 rc = 0;
669         } else { /* rc = 0 */
670                 lock = ldlm_handle2lock(lockh);
671                 LASSERT(lock != NULL);
672
673                 /* If the server gave us back a different lock mode, we should
674                  * fix up our variables. */
675                 if (lock->l_req_mode != einfo->ei_mode) {
676                         ldlm_lock_addref(lockh, lock->l_req_mode);
677                         ldlm_lock_decref(lockh, einfo->ei_mode);
678                         einfo->ei_mode = lock->l_req_mode;
679                 }
680                 LDLM_LOCK_PUT(lock);
681         }
682
683         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
684         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
685
686         it->it_disposition = (int)lockrep->lock_policy_res1;
687         it->it_status = (int)lockrep->lock_policy_res2;
688         it->it_lock_mode = einfo->ei_mode;
689         it->it_lock_handle = lockh->cookie;
690         it->it_request = req;
691
692         /* Technically speaking rq_transno must already be zero if
693          * it_status is in error, so the check is a bit redundant */
694         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
695                 mdc_clear_replay_flag(req, it->it_status);
696
697         /* If we're doing an IT_OPEN which did not result in an actual
698          * successful open, then we need to remove the bit which saves
699          * this request for unconditional replay.
700          *
701          * It's important that we do this first!  Otherwise we might exit the
702          * function without doing so, and try to replay a failed create
703          * (bug 3440) */
704         if (it->it_op & IT_OPEN && req->rq_replay &&
705             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
706                 mdc_clear_replay_flag(req, it->it_status);
707
708         DEBUG_REQ(D_RPCTRACE, req, "op: %x disposition: %x, status: %d",
709                   it->it_op, it->it_disposition, it->it_status);
710
711         /* We know what to expect, so we do any byte flipping required here */
712         if (it_has_reply_body(it)) {
713                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
714                 if (body == NULL) {
715                         CERROR ("Can't swab mdt_body\n");
716                         RETURN (-EPROTO);
717                 }
718
719                 if (it_disposition(it, DISP_OPEN_OPEN) &&
720                     !it_open_error(DISP_OPEN_OPEN, it)) {
721                         /*
722                          * If this is a successful OPEN request, we need to set
723                          * replay handler and data early, so that if replay
724                          * happens immediately after swabbing below, new reply
725                          * is swabbed by that handler correctly.
726                          */
727                         mdc_set_open_replay_data(NULL, NULL, it);
728                 }
729
730                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
731                         void *eadata;
732
733                         mdc_update_max_ea_from_body(exp, body);
734
735                         /*
736                          * The eadata is opaque; just check that it is there.
737                          * Eventually, obd_unpackmd() will check the contents.
738                          */
739                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
740                                                         body->mbo_eadatasize);
741                         if (eadata == NULL)
742                                 RETURN(-EPROTO);
743
744                         /* save lvb data and length in case this is for layout
745                          * lock */
746                         lvb_data = eadata;
747                         lvb_len = body->mbo_eadatasize;
748
749                         /*
750                          * We save the reply LOV EA in case we have to replay a
751                          * create for recovery.  If we didn't allocate a large
752                          * enough request buffer above we need to reallocate it
753                          * here to hold the actual LOV EA.
754                          *
755                          * To not save LOV EA if request is not going to replay
756                          * (for example error one).
757                          */
758                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
759                                 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
760                                                     body->mbo_eadatasize);
761                                 if (rc) {
762                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
763                                         body->mbo_eadatasize = 0;
764                                         rc = 0;
765                                 }
766                         }
767                 }
768         } else if (it->it_op & IT_LAYOUT) {
769                 /* maybe the lock was granted right away and layout
770                  * is packed into RMF_DLM_LVB of req */
771                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
772                 CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
773                        class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
774                 if (lvb_len > 0) {
775                         lvb_data = req_capsule_server_sized_get(pill,
776                                                         &RMF_DLM_LVB, lvb_len);
777                         if (lvb_data == NULL)
778                                 RETURN(-EPROTO);
779
780                         /**
781                          * save replied layout data to the request buffer for
782                          * recovery consideration (lest MDS reinitialize
783                          * another set of OST objects).
784                          */
785                         if (req->rq_transno)
786                                 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
787                                                      lvb_len);
788                 }
789         }
790
791         /* fill in stripe data for layout lock.
792          * LU-6581: trust layout data only if layout lock is granted. The MDT
793          * has stopped sending layout unless the layout lock is granted. The
794          * client still does this checking in case it's talking with an old
795          * server. - Jinshan */
796         lock = ldlm_handle2lock(lockh);
797         if (lock == NULL)
798                 RETURN(rc);
799
800         if (ldlm_has_layout(lock) && lvb_data != NULL &&
801             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
802                 void *lmm;
803
804                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
805                         ldlm_it2str(it->it_op), lvb_len);
806
807                 OBD_ALLOC_LARGE(lmm, lvb_len);
808                 if (lmm == NULL)
809                         GOTO(out_lock, rc = -ENOMEM);
810
811                 memcpy(lmm, lvb_data, lvb_len);
812
813                 /* install lvb_data */
814                 lock_res_and_lock(lock);
815                 if (lock->l_lvb_data == NULL) {
816                         lock->l_lvb_type = LVB_T_LAYOUT;
817                         lock->l_lvb_data = lmm;
818                         lock->l_lvb_len = lvb_len;
819                         lmm = NULL;
820                 }
821                 unlock_res_and_lock(lock);
822                 if (lmm != NULL)
823                         OBD_FREE_LARGE(lmm, lvb_len);
824         }
825
826         if (ldlm_has_dom(lock)) {
827                 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
828
829                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
830                 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
831                         LDLM_ERROR(lock, "%s: DoM lock without size.",
832                                    exp->exp_obd->obd_name);
833                         GOTO(out_lock, rc = -EPROTO);
834                 }
835
836                 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
837                            ldlm_it2str(it->it_op), body->mbo_dom_size);
838
839                 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
840         }
841 out_lock:
842         LDLM_LOCK_PUT(lock);
843
844         RETURN(rc);
845 }
846
847 /* We always reserve enough space in the reply packet for a stripe MD, because
848  * we don't know in advance the file type. */
849 static int mdc_enqueue_base(struct obd_export *exp,
850                             struct ldlm_enqueue_info *einfo,
851                             const union ldlm_policy_data *policy,
852                             struct lookup_intent *it,
853                             struct md_op_data *op_data,
854                             struct lustre_handle *lockh,
855                             __u64 extra_lock_flags)
856 {
857         struct obd_device *obddev = class_exp2obd(exp);
858         struct ptlrpc_request *req = NULL;
859         __u64 flags, saved_flags = extra_lock_flags;
860         struct ldlm_res_id res_id;
861         static const union ldlm_policy_data lookup_policy = {
862                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
863         static const union ldlm_policy_data update_policy = {
864                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
865         static const union ldlm_policy_data layout_policy = {
866                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
867         static const union ldlm_policy_data getxattr_policy = {
868                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
869         int generation, resends = 0;
870         struct ldlm_reply *lockrep;
871         struct obd_import *imp = class_exp2cliimp(exp);
872         __u32 acl_bufsize;
873         enum lvb_type lvb_type = 0;
874         int rc;
875         ENTRY;
876
877         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
878                  einfo->ei_type);
879         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
880
881         if (it != NULL) {
882                 LASSERT(policy == NULL);
883
884                 saved_flags |= LDLM_FL_HAS_INTENT;
885                 if (it->it_op & (IT_GETATTR | IT_READDIR))
886                         policy = &update_policy;
887                 else if (it->it_op & IT_LAYOUT)
888                         policy = &layout_policy;
889                 else if (it->it_op & IT_GETXATTR)
890                         policy = &getxattr_policy;
891                 else
892                         policy = &lookup_policy;
893         }
894
895         generation = obddev->u.cli.cl_import->imp_generation;
896         if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
897                 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
898                                   XATTR_SIZE_MAX);
899         else
900                 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
901
902 resend:
903         flags = saved_flags;
904         if (it == NULL) {
905                 /* The only way right now is FLOCK. */
906                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
907                          einfo->ei_type);
908                 res_id.name[3] = LDLM_FLOCK;
909         } else if (it->it_op & IT_OPEN) {
910                 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
911         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
912                 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
913         } else if (it->it_op & IT_READDIR) {
914                 req = mdc_enqueue_pack(exp, 0);
915         } else if (it->it_op & IT_LAYOUT) {
916                 if (!imp_connect_lvb_type(imp))
917                         RETURN(-EOPNOTSUPP);
918                 req = mdc_intent_layout_pack(exp, it, op_data);
919                 lvb_type = LVB_T_LAYOUT;
920         } else if (it->it_op & IT_GETXATTR) {
921                 req = mdc_intent_getxattr_pack(exp, it, op_data);
922         } else {
923                 LBUG();
924                 RETURN(-EINVAL);
925         }
926
927         if (IS_ERR(req))
928                 RETURN(PTR_ERR(req));
929
930         if (resends) {
931                 req->rq_generation_set = 1;
932                 req->rq_import_generation = generation;
933                 req->rq_sent = ktime_get_real_seconds() + resends;
934         }
935
936         /* It is important to obtain modify RPC slot first (if applicable), so
937          * that threads that are waiting for a modify RPC slot are not polluting
938          * our rpcs in flight counter.
939          * We do not do flock request limiting, though */
940         if (it) {
941                 mdc_get_mod_rpc_slot(req, it);
942                 rc = obd_get_request_slot(&obddev->u.cli);
943                 if (rc != 0) {
944                         mdc_put_mod_rpc_slot(req, it);
945                         mdc_clear_replay_flag(req, 0);
946                         ptlrpc_req_finished(req);
947                         RETURN(rc);
948                 }
949         }
950
951         /* With Data-on-MDT the glimpse callback is needed too.
952          * It is set here in advance but not in mdc_finish_enqueue()
953          * to avoid possible races. It is safe to have glimpse handler
954          * for non-DOM locks and costs nothing.*/
955         if (einfo->ei_cb_gl == NULL)
956                 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
957
958         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
959                               0, lvb_type, lockh, 0);
960         if (!it) {
961                 /* For flock requests we immediatelly return without further
962                    delay and let caller deal with the rest, since rest of
963                    this function metadata processing makes no sense for flock
964                    requests anyway. But in case of problem during comms with
965                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
966                    can not rely on caller and this mainly for F_UNLCKs
967                    (explicits or automatically generated by Kernel to clean
968                    current FLocks upon exit) that can't be trashed */
969                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
970                     (einfo->ei_type == LDLM_FLOCK) &&
971                     (einfo->ei_mode == LCK_NL))
972                         goto resend;
973                 RETURN(rc);
974         }
975
976         obd_put_request_slot(&obddev->u.cli);
977         mdc_put_mod_rpc_slot(req, it);
978
979         if (rc < 0) {
980                 CDEBUG(D_INFO,
981                       "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
982                       obddev->obd_name, PFID(&op_data->op_fid1),
983                       PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
984
985                 mdc_clear_replay_flag(req, rc);
986                 ptlrpc_req_finished(req);
987                 RETURN(rc);
988         }
989
990         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
991         LASSERT(lockrep != NULL);
992
993         lockrep->lock_policy_res2 =
994                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
995
996         /* Retry infinitely when the server returns -EINPROGRESS for the
997          * intent operation, when server returns -EINPROGRESS for acquiring
998          * intent lock, we'll retry in after_reply(). */
999         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
1000                 mdc_clear_replay_flag(req, rc);
1001                 ptlrpc_req_finished(req);
1002                 if (generation == obddev->u.cli.cl_import->imp_generation) {
1003                         if (signal_pending(current))
1004                                 RETURN(-EINTR);
1005
1006                         resends++;
1007                         CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
1008                                obddev->obd_name, resends, it->it_op,
1009                                PFID(&op_data->op_fid1),
1010                                PFID(&op_data->op_fid2));
1011                         goto resend;
1012                 } else {
1013                         CDEBUG(D_HA, "resend cross eviction\n");
1014                         RETURN(-EIO);
1015                 }
1016         }
1017
1018         if ((int)lockrep->lock_policy_res2 == -ERANGE &&
1019             it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
1020             acl_bufsize == LUSTRE_POSIX_ACL_MAX_SIZE_OLD) {
1021                 mdc_clear_replay_flag(req, -ERANGE);
1022                 ptlrpc_req_finished(req);
1023                 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
1024                                   XATTR_SIZE_MAX);
1025                 goto resend;
1026         }
1027
1028         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1029         if (rc < 0) {
1030                 if (lustre_handle_is_used(lockh)) {
1031                         ldlm_lock_decref(lockh, einfo->ei_mode);
1032                         memset(lockh, 0, sizeof(*lockh));
1033                 }
1034                 ptlrpc_req_finished(req);
1035
1036                 it->it_lock_handle = 0;
1037                 it->it_lock_mode = 0;
1038                 it->it_request = NULL;
1039         }
1040
1041         RETURN(rc);
1042 }
1043
1044 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1045                 const union ldlm_policy_data *policy,
1046                 struct md_op_data *op_data,
1047                 struct lustre_handle *lockh, __u64 extra_lock_flags)
1048 {
1049         return mdc_enqueue_base(exp, einfo, policy, NULL,
1050                                 op_data, lockh, extra_lock_flags);
1051 }
1052
1053 static int mdc_finish_intent_lock(struct obd_export *exp,
1054                                   struct ptlrpc_request *request,
1055                                   struct md_op_data *op_data,
1056                                   struct lookup_intent *it,
1057                                   struct lustre_handle *lockh)
1058 {
1059         struct lustre_handle old_lock;
1060         struct ldlm_lock *lock;
1061         int rc = 0;
1062         ENTRY;
1063
1064         LASSERT(request != NULL);
1065         LASSERT(request != LP_POISON);
1066         LASSERT(request->rq_repmsg != LP_POISON);
1067
1068         if (it->it_op & IT_READDIR)
1069                 RETURN(0);
1070
1071         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
1072                 if (it->it_status != 0)
1073                         GOTO(out, rc = it->it_status);
1074         } else {
1075                 if (!it_disposition(it, DISP_IT_EXECD)) {
1076                         /* The server failed before it even started executing
1077                          * the intent, i.e. because it couldn't unpack the
1078                          * request.
1079                          */
1080                         LASSERT(it->it_status != 0);
1081                         GOTO(out, rc = it->it_status);
1082                 }
1083                 rc = it_open_error(DISP_IT_EXECD, it);
1084                 if (rc)
1085                         GOTO(out, rc);
1086
1087                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1088                 if (rc)
1089                         GOTO(out, rc);
1090
1091                 /* keep requests around for the multiple phases of the call
1092                  * this shows the DISP_XX must guarantee we make it into the
1093                  * call
1094                  */
1095                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1096                     it_disposition(it, DISP_OPEN_CREATE) &&
1097                     !it_open_error(DISP_OPEN_CREATE, it)) {
1098                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
1099                         /* balanced in ll_create_node */
1100                         ptlrpc_request_addref(request);
1101                 }
1102                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1103                     it_disposition(it, DISP_OPEN_OPEN) &&
1104                     !it_open_error(DISP_OPEN_OPEN, it)) {
1105                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
1106                         /* balanced in ll_file_open */
1107                         ptlrpc_request_addref(request);
1108                         /* BUG 11546 - eviction in the middle of open rpc
1109                          * processing
1110                          */
1111                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1112                                          obd_timeout);
1113                 }
1114
1115                 if (it->it_op & IT_CREAT) {
1116                         /* XXX this belongs in ll_create_it */
1117                 } else if (it->it_op == IT_OPEN) {
1118                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1119                 } else {
1120                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1121                 }
1122         }
1123
1124         /* If we already have a matching lock, then cancel the new
1125          * one.  We have to set the data here instead of in
1126          * mdc_enqueue, because we need to use the child's inode as
1127          * the l_ast_data to match, and that's not available until
1128          * intent_finish has performed the iget().) */
1129         lock = ldlm_handle2lock(lockh);
1130         if (lock) {
1131                 union ldlm_policy_data policy = lock->l_policy_data;
1132                 LDLM_DEBUG(lock, "matching against this");
1133
1134                 if (it_has_reply_body(it)) {
1135                         struct mdt_body *body;
1136
1137                         body = req_capsule_server_get(&request->rq_pill,
1138                                                       &RMF_MDT_BODY);
1139                         /* mdc_enqueue checked */
1140                         LASSERT(body != NULL);
1141                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1142                                                  &lock->l_resource->lr_name),
1143                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1144                                  PLDLMRES(lock->l_resource),
1145                                  PFID(&body->mbo_fid1));
1146                 }
1147                 LDLM_LOCK_PUT(lock);
1148
1149                 memcpy(&old_lock, lockh, sizeof(*lockh));
1150                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1151                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1152                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1153                         memcpy(lockh, &old_lock, sizeof(old_lock));
1154                         it->it_lock_handle = lockh->cookie;
1155                 }
1156         }
1157
1158         EXIT;
1159 out:
1160         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1161                 (int)op_data->op_namelen, op_data->op_name,
1162                 ldlm_it2str(it->it_op), it->it_status,
1163                 it->it_disposition, rc);
1164         return rc;
1165 }
1166
1167 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1168                         struct lu_fid *fid, __u64 *bits)
1169 {
1170         /* We could just return 1 immediately, but since we should only
1171          * be called in revalidate_it if we already have a lock, let's
1172          * verify that. */
1173         struct ldlm_res_id res_id;
1174         struct lustre_handle lockh;
1175         union ldlm_policy_data policy;
1176         enum ldlm_mode mode;
1177         ENTRY;
1178
1179         if (it->it_lock_handle) {
1180                 lockh.cookie = it->it_lock_handle;
1181                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1182         } else {
1183                 fid_build_reg_res_name(fid, &res_id);
1184                 switch (it->it_op) {
1185                 case IT_GETATTR:
1186                         /* File attributes are held under multiple bits:
1187                          * nlink is under lookup lock, size and times are
1188                          * under UPDATE lock and recently we've also got
1189                          * a separate permissions lock for owner/group/acl that
1190                          * were protected by lookup lock before.
1191                          * Getattr must provide all of that information,
1192                          * so we need to ensure we have all of those locks.
1193                          * Unfortunately, if the bits are split across multiple
1194                          * locks, there's no easy way to match all of them here,
1195                          * so an extra RPC would be performed to fetch all
1196                          * of those bits at once for now. */
1197                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1198                          * but for old MDTs (< 2.4), permission is covered
1199                          * by LOOKUP lock, so it needs to match all bits here.*/
1200                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1201                                                   MDS_INODELOCK_LOOKUP |
1202                                                   MDS_INODELOCK_PERM;
1203                         break;
1204                 case IT_READDIR:
1205                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1206                         break;
1207                 case IT_LAYOUT:
1208                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1209                         break;
1210                 default:
1211                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1212                         break;
1213                 }
1214
1215                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1216                                       LDLM_IBITS, &policy,
1217                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1218                                       &lockh);
1219         }
1220
1221         if (mode) {
1222                 it->it_lock_handle = lockh.cookie;
1223                 it->it_lock_mode = mode;
1224         } else {
1225                 it->it_lock_handle = 0;
1226                 it->it_lock_mode = 0;
1227         }
1228
1229         RETURN(!!mode);
1230 }
1231
1232 /*
1233  * This long block is all about fixing up the lock and request state
1234  * so that it is correct as of the moment _before_ the operation was
1235  * applied; that way, the VFS will think that everything is normal and
1236  * call Lustre's regular VFS methods.
1237  *
1238  * If we're performing a creation, that means that unless the creation
1239  * failed with EEXIST, we should fake up a negative dentry.
1240  *
1241  * For everything else, we want to lookup to succeed.
1242  *
1243  * One additional note: if CREATE or OPEN succeeded, we add an extra
1244  * reference to the request because we need to keep it around until
1245  * ll_create/ll_open gets called.
1246  *
1247  * The server will return to us, in it_disposition, an indication of
1248  * exactly what it_status refers to.
1249  *
1250  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1251  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1252  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1253  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1254  * was successful.
1255  *
1256  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1257  * child lookup.
1258  */
1259 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1260                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1261                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1262 {
1263         struct ldlm_enqueue_info einfo = {
1264                 .ei_type        = LDLM_IBITS,
1265                 .ei_mode        = it_to_lock_mode(it),
1266                 .ei_cb_bl       = cb_blocking,
1267                 .ei_cb_cp       = ldlm_completion_ast,
1268                 .ei_cb_gl       = mdc_ldlm_glimpse_ast,
1269         };
1270         struct lustre_handle lockh;
1271         int rc = 0;
1272         ENTRY;
1273         LASSERT(it);
1274
1275         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1276                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1277                 op_data->op_name, PFID(&op_data->op_fid2),
1278                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1279                 it->it_flags);
1280
1281         lockh.cookie = 0;
1282         if (fid_is_sane(&op_data->op_fid2) &&
1283             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1284                 /* We could just return 1 immediately, but since we should only
1285                  * be called in revalidate_it if we already have a lock, let's
1286                  * verify that. */
1287                 it->it_lock_handle = 0;
1288                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1289                 /* Only return failure if it was not GETATTR by cfid
1290                    (from inode_revalidate) */
1291                 if (rc || op_data->op_namelen != 0)
1292                         RETURN(rc);
1293         }
1294
1295         /* For case if upper layer did not alloc fid, do it now. */
1296         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1297                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1298                 if (rc < 0) {
1299                         CERROR("Can't alloc new fid, rc %d\n", rc);
1300                         RETURN(rc);
1301                 }
1302         }
1303
1304         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1305                               extra_lock_flags);
1306         if (rc < 0)
1307                 RETURN(rc);
1308
1309         *reqp = it->it_request;
1310         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1311         RETURN(rc);
1312 }
1313
1314 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1315                                               struct ptlrpc_request *req,
1316                                               void *args, int rc)
1317 {
1318         struct mdc_getattr_args  *ga = args;
1319         struct obd_export *exp = ga->ga_exp;
1320         struct md_enqueue_info *minfo = ga->ga_minfo;
1321         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1322         struct lookup_intent *it;
1323         struct lustre_handle *lockh;
1324         struct obd_device *obddev;
1325         struct ldlm_reply *lockrep;
1326         __u64 flags = LDLM_FL_HAS_INTENT;
1327         ENTRY;
1328
1329         it    = &minfo->mi_it;
1330         lockh = &minfo->mi_lockh;
1331
1332         obddev = class_exp2obd(exp);
1333
1334         obd_put_request_slot(&obddev->u.cli);
1335         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1336                 rc = -ETIMEDOUT;
1337
1338         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1339                                    &flags, NULL, 0, lockh, rc);
1340         if (rc < 0) {
1341                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1342                 mdc_clear_replay_flag(req, rc);
1343                 GOTO(out, rc);
1344         }
1345
1346         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1347         LASSERT(lockrep != NULL);
1348
1349         lockrep->lock_policy_res2 =
1350                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1351
1352         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1353         if (rc)
1354                 GOTO(out, rc);
1355
1356         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1357         EXIT;
1358
1359 out:
1360         minfo->mi_cb(req, minfo, rc);
1361         return 0;
1362 }
1363
1364 int mdc_intent_getattr_async(struct obd_export *exp,
1365                              struct md_enqueue_info *minfo)
1366 {
1367         struct md_op_data       *op_data = &minfo->mi_data;
1368         struct lookup_intent    *it = &minfo->mi_it;
1369         struct ptlrpc_request   *req;
1370         struct mdc_getattr_args *ga;
1371         struct obd_device       *obddev = class_exp2obd(exp);
1372         struct ldlm_res_id       res_id;
1373         union ldlm_policy_data policy = {
1374                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1375                                                  MDS_INODELOCK_UPDATE } };
1376         int                      rc = 0;
1377         __u64                    flags = LDLM_FL_HAS_INTENT;
1378         ENTRY;
1379
1380         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1381                 (int)op_data->op_namelen, op_data->op_name,
1382                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1383
1384         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1385         /* If the MDT return -ERANGE because of large ACL, then the sponsor
1386          * of the async getattr RPC will handle that by itself. */
1387         req = mdc_intent_getattr_pack(exp, it, op_data,
1388                                       LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1389         if (IS_ERR(req))
1390                 RETURN(PTR_ERR(req));
1391
1392         rc = obd_get_request_slot(&obddev->u.cli);
1393         if (rc != 0) {
1394                 ptlrpc_req_finished(req);
1395                 RETURN(rc);
1396         }
1397
1398         /* With Data-on-MDT the glimpse callback is needed too.
1399          * It is set here in advance but not in mdc_finish_enqueue()
1400          * to avoid possible races. It is safe to have glimpse handler
1401          * for non-DOM locks and costs nothing.*/
1402         if (minfo->mi_einfo.ei_cb_gl == NULL)
1403                 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1404
1405         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1406                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1407         if (rc < 0) {
1408                 obd_put_request_slot(&obddev->u.cli);
1409                 ptlrpc_req_finished(req);
1410                 RETURN(rc);
1411         }
1412
1413         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1414         ga = ptlrpc_req_async_args(req);
1415         ga->ga_exp = exp;
1416         ga->ga_minfo = minfo;
1417
1418         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1419         ptlrpcd_add_req(req);
1420
1421         RETURN(0);
1422 }