Whamcloud - gitweb
LU-11835 mdt: return DOM size on open resend
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 #include <linux/module.h>
36
37 #include <obd.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
47
48 #include "mdc_internal.h"
49
50 struct mdc_getattr_args {
51         struct obd_export               *ga_exp;
52         struct md_enqueue_info          *ga_minfo;
53 };
54
55 int it_open_error(int phase, struct lookup_intent *it)
56 {
57         if (it_disposition(it, DISP_OPEN_LEASE)) {
58                 if (phase >= DISP_OPEN_LEASE)
59                         return it->it_status;
60                 else
61                         return 0;
62         }
63         if (it_disposition(it, DISP_OPEN_OPEN)) {
64                 if (phase >= DISP_OPEN_OPEN)
65                         return it->it_status;
66                 else
67                         return 0;
68         }
69
70         if (it_disposition(it, DISP_OPEN_CREATE)) {
71                 if (phase >= DISP_OPEN_CREATE)
72                         return it->it_status;
73                 else
74                         return 0;
75         }
76
77         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78                 if (phase >= DISP_LOOKUP_EXECD)
79                         return it->it_status;
80                 else
81                         return 0;
82         }
83
84         if (it_disposition(it, DISP_IT_EXECD)) {
85                 if (phase >= DISP_IT_EXECD)
86                         return it->it_status;
87                 else
88                         return 0;
89         }
90
91         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
92         LBUG();
93
94         return 0;
95 }
96 EXPORT_SYMBOL(it_open_error);
97
98 /* this must be called on a lockh that is known to have a referenced lock */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100                       void *data, __u64 *bits)
101 {
102         struct ldlm_lock *lock;
103         struct inode *new_inode = data;
104         ENTRY;
105
106         if(bits)
107                 *bits = 0;
108
109         if (!lustre_handle_is_used(lockh))
110                 RETURN(0);
111
112         lock = ldlm_handle2lock(lockh);
113
114         LASSERT(lock != NULL);
115         lock_res_and_lock(lock);
116         if (lock->l_resource->lr_lvb_inode &&
117             lock->l_resource->lr_lvb_inode != data) {
118                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119                 LASSERTF(old_inode->i_state & I_FREEING,
120                          "Found existing inode %p/%lu/%u state %lu in lock: "
121                          "setting data to %p/%lu/%u\n", old_inode,
122                          old_inode->i_ino, old_inode->i_generation,
123                          old_inode->i_state,
124                          new_inode, new_inode->i_ino, new_inode->i_generation);
125         }
126         lock->l_resource->lr_lvb_inode = new_inode;
127         if (bits)
128                 *bits = lock->l_policy_data.l_inodebits.bits;
129
130         unlock_res_and_lock(lock);
131         LDLM_LOCK_PUT(lock);
132
133         RETURN(0);
134 }
135
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137                               const struct lu_fid *fid, enum ldlm_type type,
138                               union ldlm_policy_data *policy,
139                               enum ldlm_mode mode, struct lustre_handle *lockh)
140 {
141         struct ldlm_res_id res_id;
142         enum ldlm_mode rc;
143         ENTRY;
144
145         fid_build_reg_res_name(fid, &res_id);
146         /* LU-4405: Clear bits not supported by server */
147         policy->l_inodebits.bits &= exp_connect_ibits(exp);
148         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149                              &res_id, type, policy, mode, lockh, 0);
150         RETURN(rc);
151 }
152
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154                       union ldlm_policy_data *policy, enum ldlm_mode mode,
155                       enum ldlm_cancel_flags flags, void *opaque)
156 {
157         struct obd_device *obd = class_exp2obd(exp);
158         struct ldlm_res_id res_id;
159         int rc;
160
161         ENTRY;
162
163         fid_build_reg_res_name(fid, &res_id);
164         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165                                              policy, mode, flags, opaque);
166         RETURN(rc);
167 }
168
169 int mdc_null_inode(struct obd_export *exp,
170                    const struct lu_fid *fid)
171 {
172         struct ldlm_res_id res_id;
173         struct ldlm_resource *res;
174         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
175         ENTRY;
176
177         LASSERTF(ns != NULL, "no namespace passed\n");
178
179         fid_build_reg_res_name(fid, &res_id);
180
181         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
182         if (IS_ERR(res))
183                 RETURN(0);
184
185         lock_res(res);
186         res->lr_lvb_inode = NULL;
187         unlock_res(res);
188
189         ldlm_resource_putref(res);
190         RETURN(0);
191 }
192
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
194 {
195         /* Don't hold error requests for replay. */
196         if (req->rq_replay) {
197                 spin_lock(&req->rq_lock);
198                 req->rq_replay = 0;
199                 spin_unlock(&req->rq_lock);
200         }
201         if (rc && req->rq_transno != 0) {
202                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
203                 LBUG();
204         }
205 }
206
207 /* Save a large LOV EA into the request buffer so that it is available
208  * for replay.  We don't do this in the initial request because the
209  * original request doesn't need this buffer (at most it sends just the
210  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211  * buffer and may also be difficult to allocate and save a very large
212  * request buffer for each open. (bug 5707)
213  *
214  * OOM here may cause recovery failure if lmm is needed (only for the
215  * original open if the MDS crashed just when this client also OOM'd)
216  * but this is incredibly unlikely, and questionable whether the client
217  * could do MDS recovery under OOM anyways... */
218 int mdc_save_lovea(struct ptlrpc_request *req,
219                    const struct req_msg_field *field,
220                    void *data, u32 size)
221 {
222         struct req_capsule *pill = &req->rq_pill;
223         void *lmm;
224         int rc = 0;
225
226         if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
227                 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
228                 if (rc) {
229                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230                                req->rq_export->exp_obd->obd_name,
231                                size, rc);
232                         return rc;
233                 }
234         } else {
235                 req_capsule_shrink(pill, field, size, RCL_CLIENT);
236         }
237
238         req_capsule_set_size(pill, field, RCL_CLIENT, size);
239         lmm = req_capsule_client_get(pill, field);
240         if (lmm)
241                 memcpy(lmm, data, size);
242
243         return rc;
244 }
245
246 static struct ptlrpc_request *
247 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
248                      struct md_op_data *op_data, __u32 acl_bufsize)
249 {
250         struct ptlrpc_request   *req;
251         struct obd_device       *obddev = class_exp2obd(exp);
252         struct ldlm_intent      *lit;
253         const void              *lmm = op_data->op_data;
254         __u32                    lmmsize = op_data->op_data_size;
255         struct list_head         cancels = LIST_HEAD_INIT(cancels);
256         int                      count = 0;
257         enum ldlm_mode           mode;
258         int                      rc;
259         int repsize, repsize_estimate;
260
261         ENTRY;
262
263         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
264
265         /* XXX: openlock is not cancelled for cross-refs. */
266         /* If inode is known, cancel conflicting OPEN locks. */
267         if (fid_is_sane(&op_data->op_fid2)) {
268                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
269                         if (it->it_flags & MDS_FMODE_WRITE)
270                                 mode = LCK_EX;
271                         else
272                                 mode = LCK_PR;
273                 } else {
274                         if (it->it_flags & (MDS_FMODE_WRITE | MDS_OPEN_TRUNC))
275                                 mode = LCK_CW;
276 #ifdef FMODE_EXEC
277                         else if (it->it_flags & FMODE_EXEC)
278                                 mode = LCK_PR;
279 #endif
280                         else
281                                 mode = LCK_CR;
282                 }
283                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
284                                                 &cancels, mode,
285                                                 MDS_INODELOCK_OPEN);
286         }
287
288         /* If CREATE, cancel parent's UPDATE lock. */
289         if (it->it_op & IT_CREAT)
290                 mode = LCK_EX;
291         else
292                 mode = LCK_CR;
293         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
294                                          &cancels, mode,
295                                          MDS_INODELOCK_UPDATE);
296
297         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
298                                    &RQF_LDLM_INTENT_OPEN);
299         if (req == NULL) {
300                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
301                 RETURN(ERR_PTR(-ENOMEM));
302         }
303
304         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
305                              op_data->op_namelen + 1);
306         if (cl_is_lov_delay_create(it->it_flags)) {
307                 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
308                 LASSERT(lmmsize == 0);
309                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
310         } else {
311                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
312                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
313         }
314
315         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
316                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
317                              strlen(op_data->op_file_secctx_name) + 1 : 0);
318
319         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
320                              op_data->op_file_secctx_size);
321
322         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
323         if (rc < 0) {
324                 ptlrpc_request_free(req);
325                 RETURN(ERR_PTR(rc));
326         }
327
328         spin_lock(&req->rq_lock);
329         req->rq_replay = req->rq_import->imp_replayable;
330         spin_unlock(&req->rq_lock);
331
332         /* pack the intent */
333         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
334         lit->opc = (__u64)it->it_op;
335
336         /* pack the intended request */
337         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
338                       lmmsize);
339
340         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
341                              obddev->u.cli.cl_max_mds_easize);
342         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
343
344         /**
345          * Inline buffer for possible data from Data-on-MDT files.
346          */
347         req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
348                              sizeof(struct niobuf_remote));
349         ptlrpc_request_set_replen(req);
350
351         /* Get real repbuf allocated size as rounded up power of 2 */
352         repsize = size_roundup_power2(req->rq_replen +
353                                       lustre_msg_early_size());
354         /* Estimate free space for DoM files in repbuf */
355         repsize_estimate = repsize - (req->rq_replen -
356                            obddev->u.cli.cl_max_mds_easize +
357                            sizeof(struct lov_comp_md_v1) +
358                            sizeof(struct lov_comp_md_entry_v1) +
359                            lov_mds_md_size(0, LOV_MAGIC_V3));
360
361         if (repsize_estimate < obddev->u.cli.cl_dom_min_inline_repsize) {
362                 repsize = obddev->u.cli.cl_dom_min_inline_repsize -
363                           repsize_estimate + sizeof(struct niobuf_remote);
364                 req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
365                                      RCL_SERVER,
366                                      sizeof(struct niobuf_remote) + repsize);
367                 ptlrpc_request_set_replen(req);
368                 CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
369                        repsize, req->rq_replen);
370                 repsize = size_roundup_power2(req->rq_replen +
371                                               lustre_msg_early_size());
372         }
373         /* The only way to report real allocated repbuf size to the server
374          * is the lm_repsize but it must be set prior buffer allocation itself
375          * due to security reasons - it is part of buffer used in signature
376          * calculation (see LU-11414). Therefore the saved size is predicted
377          * value as rq_replen rounded to the next higher power of 2.
378          * Such estimation is safe. Though the final allocated buffer might
379          * be even larger, it is not possible to know that at this point.
380          */
381         req->rq_reqmsg->lm_repsize = repsize;
382         RETURN(req);
383 }
384
385 #define GA_DEFAULT_EA_NAME_LEN 20
386 #define GA_DEFAULT_EA_VAL_LEN  250
387 #define GA_DEFAULT_EA_NUM      10
388
389 static struct ptlrpc_request *
390 mdc_intent_getxattr_pack(struct obd_export *exp,
391                          struct lookup_intent *it,
392                          struct md_op_data *op_data)
393 {
394         struct ptlrpc_request   *req;
395         struct ldlm_intent      *lit;
396         int                     rc, count = 0;
397         struct list_head        cancels = LIST_HEAD_INIT(cancels);
398         u32 ea_vals_buf_size = GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM;
399
400         ENTRY;
401
402         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
403                                         &RQF_LDLM_INTENT_GETXATTR);
404         if (req == NULL)
405                 RETURN(ERR_PTR(-ENOMEM));
406
407         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
408         if (rc) {
409                 ptlrpc_request_free(req);
410                 RETURN(ERR_PTR(rc));
411         }
412
413         /* pack the intent */
414         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
415         lit->opc = IT_GETXATTR;
416
417 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
418         /* If the supplied buffer is too small then the server will
419          * return -ERANGE and llite will fallback to using non cached
420          * xattr operations. On servers before 2.10.1 a (non-cached)
421          * listxattr RPC for an orphan or dead file causes an oops. So
422          * let's try to avoid sending too small a buffer to too old a
423          * server. This is effectively undoing the memory conservation
424          * of LU-9417 when it would be *more* likely to crash the
425          * server. See LU-9856. */
426         if (exp->exp_connect_data.ocd_version < OBD_OCD_VERSION(2, 10, 1, 0))
427                 ea_vals_buf_size = max_t(u32, ea_vals_buf_size,
428                                          exp->exp_connect_data.ocd_max_easize);
429 #endif
430
431         /* pack the intended request */
432         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
433                       ea_vals_buf_size, -1, 0);
434
435         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
436                              GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
437
438         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
439                              ea_vals_buf_size);
440
441         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
442                              sizeof(u32) * GA_DEFAULT_EA_NUM);
443
444         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
445
446         ptlrpc_request_set_replen(req);
447
448         RETURN(req);
449 }
450
451 static struct ptlrpc_request *
452 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
453                         struct md_op_data *op_data, __u32 acl_bufsize)
454 {
455         struct ptlrpc_request   *req;
456         struct obd_device       *obddev = class_exp2obd(exp);
457         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
458                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
459                                          OBD_MD_MEA | OBD_MD_FLACL;
460         struct ldlm_intent      *lit;
461         int                      rc;
462         __u32                    easize;
463         ENTRY;
464
465         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
466                                    &RQF_LDLM_INTENT_GETATTR);
467         if (req == NULL)
468                 RETURN(ERR_PTR(-ENOMEM));
469
470         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
471                              op_data->op_namelen + 1);
472
473         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
474         if (rc) {
475                 ptlrpc_request_free(req);
476                 RETURN(ERR_PTR(rc));
477         }
478
479         /* pack the intent */
480         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
481         lit->opc = (__u64)it->it_op;
482
483         if (obddev->u.cli.cl_default_mds_easize > 0)
484                 easize = obddev->u.cli.cl_default_mds_easize;
485         else
486                 easize = obddev->u.cli.cl_max_mds_easize;
487
488         /* pack the intended request */
489         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
490
491         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
492         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
493         ptlrpc_request_set_replen(req);
494         RETURN(req);
495 }
496
497 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
498                                                      struct lookup_intent *it,
499                                                      struct md_op_data *op_data)
500 {
501         struct obd_device     *obd = class_exp2obd(exp);
502         struct ptlrpc_request *req;
503         struct ldlm_intent    *lit;
504         struct layout_intent  *layout;
505         int rc;
506         ENTRY;
507
508         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
509                                 &RQF_LDLM_INTENT_LAYOUT);
510         if (req == NULL)
511                 RETURN(ERR_PTR(-ENOMEM));
512
513         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
514         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
515         if (rc) {
516                 ptlrpc_request_free(req);
517                 RETURN(ERR_PTR(rc));
518         }
519
520         /* pack the intent */
521         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
522         lit->opc = (__u64)it->it_op;
523
524         /* pack the layout intent request */
525         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
526         LASSERT(op_data->op_data != NULL);
527         LASSERT(op_data->op_data_size == sizeof(*layout));
528         memcpy(layout, op_data->op_data, sizeof(*layout));
529
530         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
531                              obd->u.cli.cl_default_mds_easize);
532         ptlrpc_request_set_replen(req);
533         RETURN(req);
534 }
535
536 static struct ptlrpc_request *
537 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
538 {
539         struct ptlrpc_request *req;
540         int rc;
541         ENTRY;
542
543         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
544         if (req == NULL)
545                 RETURN(ERR_PTR(-ENOMEM));
546
547         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
548         if (rc) {
549                 ptlrpc_request_free(req);
550                 RETURN(ERR_PTR(rc));
551         }
552
553         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
554         ptlrpc_request_set_replen(req);
555         RETURN(req);
556 }
557
558 static int mdc_finish_enqueue(struct obd_export *exp,
559                               struct ptlrpc_request *req,
560                               struct ldlm_enqueue_info *einfo,
561                               struct lookup_intent *it,
562                               struct lustre_handle *lockh,
563                               int rc)
564 {
565         struct req_capsule  *pill = &req->rq_pill;
566         struct ldlm_request *lockreq;
567         struct ldlm_reply   *lockrep;
568         struct ldlm_lock    *lock;
569         struct mdt_body     *body = NULL;
570         void                *lvb_data = NULL;
571         __u32                lvb_len = 0;
572
573         ENTRY;
574
575         LASSERT(rc >= 0);
576         /* Similarly, if we're going to replay this request, we don't want to
577          * actually get a lock, just perform the intent. */
578         if (req->rq_transno || req->rq_replay) {
579                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
580                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
581         }
582
583         if (rc == ELDLM_LOCK_ABORTED) {
584                 einfo->ei_mode = 0;
585                 memset(lockh, 0, sizeof(*lockh));
586                 rc = 0;
587         } else { /* rc = 0 */
588                 lock = ldlm_handle2lock(lockh);
589                 LASSERT(lock != NULL);
590
591                 /* If the server gave us back a different lock mode, we should
592                  * fix up our variables. */
593                 if (lock->l_req_mode != einfo->ei_mode) {
594                         ldlm_lock_addref(lockh, lock->l_req_mode);
595                         ldlm_lock_decref(lockh, einfo->ei_mode);
596                         einfo->ei_mode = lock->l_req_mode;
597                 }
598                 LDLM_LOCK_PUT(lock);
599         }
600
601         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
602         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
603
604         it->it_disposition = (int)lockrep->lock_policy_res1;
605         it->it_status = (int)lockrep->lock_policy_res2;
606         it->it_lock_mode = einfo->ei_mode;
607         it->it_lock_handle = lockh->cookie;
608         it->it_request = req;
609
610         /* Technically speaking rq_transno must already be zero if
611          * it_status is in error, so the check is a bit redundant */
612         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
613                 mdc_clear_replay_flag(req, it->it_status);
614
615         /* If we're doing an IT_OPEN which did not result in an actual
616          * successful open, then we need to remove the bit which saves
617          * this request for unconditional replay.
618          *
619          * It's important that we do this first!  Otherwise we might exit the
620          * function without doing so, and try to replay a failed create
621          * (bug 3440) */
622         if (it->it_op & IT_OPEN && req->rq_replay &&
623             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
624                 mdc_clear_replay_flag(req, it->it_status);
625
626         DEBUG_REQ(D_RPCTRACE, req, "op: %x disposition: %x, status: %d",
627                   it->it_op, it->it_disposition, it->it_status);
628
629         /* We know what to expect, so we do any byte flipping required here */
630         if (it_has_reply_body(it)) {
631                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
632                 if (body == NULL) {
633                         CERROR ("Can't swab mdt_body\n");
634                         RETURN (-EPROTO);
635                 }
636
637                 if (it_disposition(it, DISP_OPEN_OPEN) &&
638                     !it_open_error(DISP_OPEN_OPEN, it)) {
639                         /*
640                          * If this is a successful OPEN request, we need to set
641                          * replay handler and data early, so that if replay
642                          * happens immediately after swabbing below, new reply
643                          * is swabbed by that handler correctly.
644                          */
645                         mdc_set_open_replay_data(NULL, NULL, it);
646                 }
647
648                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
649                         void *eadata;
650
651                         mdc_update_max_ea_from_body(exp, body);
652
653                         /*
654                          * The eadata is opaque; just check that it is there.
655                          * Eventually, obd_unpackmd() will check the contents.
656                          */
657                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
658                                                         body->mbo_eadatasize);
659                         if (eadata == NULL)
660                                 RETURN(-EPROTO);
661
662                         /* save lvb data and length in case this is for layout
663                          * lock */
664                         lvb_data = eadata;
665                         lvb_len = body->mbo_eadatasize;
666
667                         /*
668                          * We save the reply LOV EA in case we have to replay a
669                          * create for recovery.  If we didn't allocate a large
670                          * enough request buffer above we need to reallocate it
671                          * here to hold the actual LOV EA.
672                          *
673                          * To not save LOV EA if request is not going to replay
674                          * (for example error one).
675                          */
676                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
677                                 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
678                                                     body->mbo_eadatasize);
679                                 if (rc) {
680                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
681                                         body->mbo_eadatasize = 0;
682                                         rc = 0;
683                                 }
684                         }
685                 }
686         } else if (it->it_op & IT_LAYOUT) {
687                 /* maybe the lock was granted right away and layout
688                  * is packed into RMF_DLM_LVB of req */
689                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
690                 CDEBUG(D_INFO, "%s: layout return lvb %d transno %lld\n",
691                        class_exp2obd(exp)->obd_name, lvb_len, req->rq_transno);
692                 if (lvb_len > 0) {
693                         lvb_data = req_capsule_server_sized_get(pill,
694                                                         &RMF_DLM_LVB, lvb_len);
695                         if (lvb_data == NULL)
696                                 RETURN(-EPROTO);
697
698                         /**
699                          * save replied layout data to the request buffer for
700                          * recovery consideration (lest MDS reinitialize
701                          * another set of OST objects).
702                          */
703                         if (req->rq_transno)
704                                 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
705                                                      lvb_len);
706                 }
707         }
708
709         /* fill in stripe data for layout lock.
710          * LU-6581: trust layout data only if layout lock is granted. The MDT
711          * has stopped sending layout unless the layout lock is granted. The
712          * client still does this checking in case it's talking with an old
713          * server. - Jinshan */
714         lock = ldlm_handle2lock(lockh);
715         if (lock == NULL)
716                 RETURN(rc);
717
718         if (ldlm_has_layout(lock) && lvb_data != NULL &&
719             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
720                 void *lmm;
721
722                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
723                         ldlm_it2str(it->it_op), lvb_len);
724
725                 OBD_ALLOC_LARGE(lmm, lvb_len);
726                 if (lmm == NULL)
727                         GOTO(out_lock, rc = -ENOMEM);
728
729                 memcpy(lmm, lvb_data, lvb_len);
730
731                 /* install lvb_data */
732                 lock_res_and_lock(lock);
733                 if (lock->l_lvb_data == NULL) {
734                         lock->l_lvb_type = LVB_T_LAYOUT;
735                         lock->l_lvb_data = lmm;
736                         lock->l_lvb_len = lvb_len;
737                         lmm = NULL;
738                 }
739                 unlock_res_and_lock(lock);
740                 if (lmm != NULL)
741                         OBD_FREE_LARGE(lmm, lvb_len);
742         }
743
744         if (ldlm_has_dom(lock)) {
745                 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
746
747                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
748                 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
749                         LDLM_ERROR(lock, "%s: DoM lock without size.",
750                                    exp->exp_obd->obd_name);
751                         GOTO(out_lock, rc = -EPROTO);
752                 }
753
754                 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
755                            ldlm_it2str(it->it_op), body->mbo_dom_size);
756
757                 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
758         }
759 out_lock:
760         LDLM_LOCK_PUT(lock);
761
762         RETURN(rc);
763 }
764
765 /* We always reserve enough space in the reply packet for a stripe MD, because
766  * we don't know in advance the file type. */
767 static int mdc_enqueue_base(struct obd_export *exp,
768                             struct ldlm_enqueue_info *einfo,
769                             const union ldlm_policy_data *policy,
770                             struct lookup_intent *it,
771                             struct md_op_data *op_data,
772                             struct lustre_handle *lockh,
773                             __u64 extra_lock_flags)
774 {
775         struct obd_device *obddev = class_exp2obd(exp);
776         struct ptlrpc_request *req = NULL;
777         __u64 flags, saved_flags = extra_lock_flags;
778         struct ldlm_res_id res_id;
779         static const union ldlm_policy_data lookup_policy = {
780                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
781         static const union ldlm_policy_data update_policy = {
782                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
783         static const union ldlm_policy_data layout_policy = {
784                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
785         static const union ldlm_policy_data getxattr_policy = {
786                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
787         int generation, resends = 0;
788         struct ldlm_reply *lockrep;
789         struct obd_import *imp = class_exp2cliimp(exp);
790         __u32 acl_bufsize;
791         enum lvb_type lvb_type = 0;
792         int rc;
793         ENTRY;
794
795         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
796                  einfo->ei_type);
797         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
798
799         if (it != NULL) {
800                 LASSERT(policy == NULL);
801
802                 saved_flags |= LDLM_FL_HAS_INTENT;
803                 if (it->it_op & (IT_GETATTR | IT_READDIR))
804                         policy = &update_policy;
805                 else if (it->it_op & IT_LAYOUT)
806                         policy = &layout_policy;
807                 else if (it->it_op & IT_GETXATTR)
808                         policy = &getxattr_policy;
809                 else
810                         policy = &lookup_policy;
811         }
812
813         generation = obddev->u.cli.cl_import->imp_generation;
814         if (!it || (it->it_op & (IT_OPEN | IT_CREAT)))
815                 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
816                                   XATTR_SIZE_MAX);
817         else
818                 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
819
820 resend:
821         flags = saved_flags;
822         if (it == NULL) {
823                 /* The only way right now is FLOCK. */
824                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
825                          einfo->ei_type);
826                 res_id.name[3] = LDLM_FLOCK;
827         } else if (it->it_op & IT_OPEN) {
828                 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
829         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
830                 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
831         } else if (it->it_op & IT_READDIR) {
832                 req = mdc_enqueue_pack(exp, 0);
833         } else if (it->it_op & IT_LAYOUT) {
834                 if (!imp_connect_lvb_type(imp))
835                         RETURN(-EOPNOTSUPP);
836                 req = mdc_intent_layout_pack(exp, it, op_data);
837                 lvb_type = LVB_T_LAYOUT;
838         } else if (it->it_op & IT_GETXATTR) {
839                 req = mdc_intent_getxattr_pack(exp, it, op_data);
840         } else {
841                 LBUG();
842                 RETURN(-EINVAL);
843         }
844
845         if (IS_ERR(req))
846                 RETURN(PTR_ERR(req));
847
848         if (resends) {
849                 req->rq_generation_set = 1;
850                 req->rq_import_generation = generation;
851                 req->rq_sent = ktime_get_real_seconds() + resends;
852         }
853
854         /* It is important to obtain modify RPC slot first (if applicable), so
855          * that threads that are waiting for a modify RPC slot are not polluting
856          * our rpcs in flight counter.
857          * We do not do flock request limiting, though */
858         if (it) {
859                 mdc_get_mod_rpc_slot(req, it);
860                 rc = obd_get_request_slot(&obddev->u.cli);
861                 if (rc != 0) {
862                         mdc_put_mod_rpc_slot(req, it);
863                         mdc_clear_replay_flag(req, 0);
864                         ptlrpc_req_finished(req);
865                         RETURN(rc);
866                 }
867         }
868
869         /* With Data-on-MDT the glimpse callback is needed too.
870          * It is set here in advance but not in mdc_finish_enqueue()
871          * to avoid possible races. It is safe to have glimpse handler
872          * for non-DOM locks and costs nothing.*/
873         if (einfo->ei_cb_gl == NULL)
874                 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
875
876         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
877                               0, lvb_type, lockh, 0);
878         if (!it) {
879                 /* For flock requests we immediatelly return without further
880                    delay and let caller deal with the rest, since rest of
881                    this function metadata processing makes no sense for flock
882                    requests anyway. But in case of problem during comms with
883                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
884                    can not rely on caller and this mainly for F_UNLCKs
885                    (explicits or automatically generated by Kernel to clean
886                    current FLocks upon exit) that can't be trashed */
887                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
888                     (einfo->ei_type == LDLM_FLOCK) &&
889                     (einfo->ei_mode == LCK_NL))
890                         goto resend;
891                 RETURN(rc);
892         }
893
894         obd_put_request_slot(&obddev->u.cli);
895         mdc_put_mod_rpc_slot(req, it);
896
897         if (rc < 0) {
898                 CDEBUG(D_INFO,
899                       "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
900                       obddev->obd_name, PFID(&op_data->op_fid1),
901                       PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
902
903                 mdc_clear_replay_flag(req, rc);
904                 ptlrpc_req_finished(req);
905                 RETURN(rc);
906         }
907
908         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
909         LASSERT(lockrep != NULL);
910
911         lockrep->lock_policy_res2 =
912                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
913
914         /* Retry infinitely when the server returns -EINPROGRESS for the
915          * intent operation, when server returns -EINPROGRESS for acquiring
916          * intent lock, we'll retry in after_reply(). */
917         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
918                 mdc_clear_replay_flag(req, rc);
919                 ptlrpc_req_finished(req);
920                 if (generation == obddev->u.cli.cl_import->imp_generation) {
921                         if (signal_pending(current))
922                                 RETURN(-EINTR);
923
924                         resends++;
925                         CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
926                                obddev->obd_name, resends, it->it_op,
927                                PFID(&op_data->op_fid1),
928                                PFID(&op_data->op_fid2));
929                         goto resend;
930                 } else {
931                         CDEBUG(D_HA, "resend cross eviction\n");
932                         RETURN(-EIO);
933                 }
934         }
935
936         if ((int)lockrep->lock_policy_res2 == -ERANGE &&
937             it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
938             acl_bufsize == LUSTRE_POSIX_ACL_MAX_SIZE_OLD) {
939                 mdc_clear_replay_flag(req, -ERANGE);
940                 ptlrpc_req_finished(req);
941                 acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize,
942                                   XATTR_SIZE_MAX);
943                 goto resend;
944         }
945
946         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
947         if (rc < 0) {
948                 if (lustre_handle_is_used(lockh)) {
949                         ldlm_lock_decref(lockh, einfo->ei_mode);
950                         memset(lockh, 0, sizeof(*lockh));
951                 }
952                 ptlrpc_req_finished(req);
953
954                 it->it_lock_handle = 0;
955                 it->it_lock_mode = 0;
956                 it->it_request = NULL;
957         }
958
959         RETURN(rc);
960 }
961
962 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
963                 const union ldlm_policy_data *policy,
964                 struct md_op_data *op_data,
965                 struct lustre_handle *lockh, __u64 extra_lock_flags)
966 {
967         return mdc_enqueue_base(exp, einfo, policy, NULL,
968                                 op_data, lockh, extra_lock_flags);
969 }
970
971 static int mdc_finish_intent_lock(struct obd_export *exp,
972                                   struct ptlrpc_request *request,
973                                   struct md_op_data *op_data,
974                                   struct lookup_intent *it,
975                                   struct lustre_handle *lockh)
976 {
977         struct lustre_handle old_lock;
978         struct ldlm_lock *lock;
979         int rc = 0;
980         ENTRY;
981
982         LASSERT(request != NULL);
983         LASSERT(request != LP_POISON);
984         LASSERT(request->rq_repmsg != LP_POISON);
985
986         if (it->it_op & IT_READDIR)
987                 RETURN(0);
988
989         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
990                 if (it->it_status != 0)
991                         GOTO(out, rc = it->it_status);
992         } else {
993                 if (!it_disposition(it, DISP_IT_EXECD)) {
994                         /* The server failed before it even started executing
995                          * the intent, i.e. because it couldn't unpack the
996                          * request.
997                          */
998                         LASSERT(it->it_status != 0);
999                         GOTO(out, rc = it->it_status);
1000                 }
1001                 rc = it_open_error(DISP_IT_EXECD, it);
1002                 if (rc)
1003                         GOTO(out, rc);
1004
1005                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
1006                 if (rc)
1007                         GOTO(out, rc);
1008
1009                 /* keep requests around for the multiple phases of the call
1010                  * this shows the DISP_XX must guarantee we make it into the
1011                  * call
1012                  */
1013                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
1014                     it_disposition(it, DISP_OPEN_CREATE) &&
1015                     !it_open_error(DISP_OPEN_CREATE, it)) {
1016                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
1017                         /* balanced in ll_create_node */
1018                         ptlrpc_request_addref(request);
1019                 }
1020                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1021                     it_disposition(it, DISP_OPEN_OPEN) &&
1022                     !it_open_error(DISP_OPEN_OPEN, it)) {
1023                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
1024                         /* balanced in ll_file_open */
1025                         ptlrpc_request_addref(request);
1026                         /* BUG 11546 - eviction in the middle of open rpc
1027                          * processing
1028                          */
1029                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1030                                          obd_timeout);
1031                 }
1032
1033                 if (it->it_op & IT_CREAT) {
1034                         /* XXX this belongs in ll_create_it */
1035                 } else if (it->it_op == IT_OPEN) {
1036                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1037                 } else {
1038                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1039                 }
1040         }
1041
1042         /* If we already have a matching lock, then cancel the new
1043          * one.  We have to set the data here instead of in
1044          * mdc_enqueue, because we need to use the child's inode as
1045          * the l_ast_data to match, and that's not available until
1046          * intent_finish has performed the iget().) */
1047         lock = ldlm_handle2lock(lockh);
1048         if (lock) {
1049                 union ldlm_policy_data policy = lock->l_policy_data;
1050                 LDLM_DEBUG(lock, "matching against this");
1051
1052                 if (it_has_reply_body(it)) {
1053                         struct mdt_body *body;
1054
1055                         body = req_capsule_server_get(&request->rq_pill,
1056                                                       &RMF_MDT_BODY);
1057                         /* mdc_enqueue checked */
1058                         LASSERT(body != NULL);
1059                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1060                                                  &lock->l_resource->lr_name),
1061                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1062                                  PLDLMRES(lock->l_resource),
1063                                  PFID(&body->mbo_fid1));
1064                 }
1065                 LDLM_LOCK_PUT(lock);
1066
1067                 memcpy(&old_lock, lockh, sizeof(*lockh));
1068                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1069                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1070                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1071                         memcpy(lockh, &old_lock, sizeof(old_lock));
1072                         it->it_lock_handle = lockh->cookie;
1073                 }
1074         }
1075
1076         EXIT;
1077 out:
1078         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1079                 (int)op_data->op_namelen, op_data->op_name,
1080                 ldlm_it2str(it->it_op), it->it_status,
1081                 it->it_disposition, rc);
1082         return rc;
1083 }
1084
1085 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1086                         struct lu_fid *fid, __u64 *bits)
1087 {
1088         /* We could just return 1 immediately, but since we should only
1089          * be called in revalidate_it if we already have a lock, let's
1090          * verify that. */
1091         struct ldlm_res_id res_id;
1092         struct lustre_handle lockh;
1093         union ldlm_policy_data policy;
1094         enum ldlm_mode mode;
1095         ENTRY;
1096
1097         if (it->it_lock_handle) {
1098                 lockh.cookie = it->it_lock_handle;
1099                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1100         } else {
1101                 fid_build_reg_res_name(fid, &res_id);
1102                 switch (it->it_op) {
1103                 case IT_GETATTR:
1104                         /* File attributes are held under multiple bits:
1105                          * nlink is under lookup lock, size and times are
1106                          * under UPDATE lock and recently we've also got
1107                          * a separate permissions lock for owner/group/acl that
1108                          * were protected by lookup lock before.
1109                          * Getattr must provide all of that information,
1110                          * so we need to ensure we have all of those locks.
1111                          * Unfortunately, if the bits are split across multiple
1112                          * locks, there's no easy way to match all of them here,
1113                          * so an extra RPC would be performed to fetch all
1114                          * of those bits at once for now. */
1115                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1116                          * but for old MDTs (< 2.4), permission is covered
1117                          * by LOOKUP lock, so it needs to match all bits here.*/
1118                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1119                                                   MDS_INODELOCK_LOOKUP |
1120                                                   MDS_INODELOCK_PERM;
1121                         break;
1122                 case IT_READDIR:
1123                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1124                         break;
1125                 case IT_LAYOUT:
1126                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1127                         break;
1128                 default:
1129                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1130                         break;
1131                 }
1132
1133                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1134                                       LDLM_IBITS, &policy,
1135                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1136                                       &lockh);
1137         }
1138
1139         if (mode) {
1140                 it->it_lock_handle = lockh.cookie;
1141                 it->it_lock_mode = mode;
1142         } else {
1143                 it->it_lock_handle = 0;
1144                 it->it_lock_mode = 0;
1145         }
1146
1147         RETURN(!!mode);
1148 }
1149
1150 /*
1151  * This long block is all about fixing up the lock and request state
1152  * so that it is correct as of the moment _before_ the operation was
1153  * applied; that way, the VFS will think that everything is normal and
1154  * call Lustre's regular VFS methods.
1155  *
1156  * If we're performing a creation, that means that unless the creation
1157  * failed with EEXIST, we should fake up a negative dentry.
1158  *
1159  * For everything else, we want to lookup to succeed.
1160  *
1161  * One additional note: if CREATE or OPEN succeeded, we add an extra
1162  * reference to the request because we need to keep it around until
1163  * ll_create/ll_open gets called.
1164  *
1165  * The server will return to us, in it_disposition, an indication of
1166  * exactly what it_status refers to.
1167  *
1168  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1169  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1170  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1171  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1172  * was successful.
1173  *
1174  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1175  * child lookup.
1176  */
1177 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1178                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1179                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1180 {
1181         struct ldlm_enqueue_info einfo = {
1182                 .ei_type        = LDLM_IBITS,
1183                 .ei_mode        = it_to_lock_mode(it),
1184                 .ei_cb_bl       = cb_blocking,
1185                 .ei_cb_cp       = ldlm_completion_ast,
1186                 .ei_cb_gl       = mdc_ldlm_glimpse_ast,
1187         };
1188         struct lustre_handle lockh;
1189         int rc = 0;
1190         ENTRY;
1191         LASSERT(it);
1192
1193         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1194                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1195                 op_data->op_name, PFID(&op_data->op_fid2),
1196                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1197                 it->it_flags);
1198
1199         lockh.cookie = 0;
1200         if (fid_is_sane(&op_data->op_fid2) &&
1201             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1202                 /* We could just return 1 immediately, but since we should only
1203                  * be called in revalidate_it if we already have a lock, let's
1204                  * verify that. */
1205                 it->it_lock_handle = 0;
1206                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1207                 /* Only return failure if it was not GETATTR by cfid
1208                    (from inode_revalidate) */
1209                 if (rc || op_data->op_namelen != 0)
1210                         RETURN(rc);
1211         }
1212
1213         /* For case if upper layer did not alloc fid, do it now. */
1214         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1215                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1216                 if (rc < 0) {
1217                         CERROR("Can't alloc new fid, rc %d\n", rc);
1218                         RETURN(rc);
1219                 }
1220         }
1221
1222         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1223                               extra_lock_flags);
1224         if (rc < 0)
1225                 RETURN(rc);
1226
1227         *reqp = it->it_request;
1228         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1229         RETURN(rc);
1230 }
1231
1232 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1233                                               struct ptlrpc_request *req,
1234                                               void *args, int rc)
1235 {
1236         struct mdc_getattr_args  *ga = args;
1237         struct obd_export *exp = ga->ga_exp;
1238         struct md_enqueue_info *minfo = ga->ga_minfo;
1239         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1240         struct lookup_intent *it;
1241         struct lustre_handle *lockh;
1242         struct obd_device *obddev;
1243         struct ldlm_reply *lockrep;
1244         __u64 flags = LDLM_FL_HAS_INTENT;
1245         ENTRY;
1246
1247         it    = &minfo->mi_it;
1248         lockh = &minfo->mi_lockh;
1249
1250         obddev = class_exp2obd(exp);
1251
1252         obd_put_request_slot(&obddev->u.cli);
1253         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1254                 rc = -ETIMEDOUT;
1255
1256         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1257                                    &flags, NULL, 0, lockh, rc);
1258         if (rc < 0) {
1259                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1260                 mdc_clear_replay_flag(req, rc);
1261                 GOTO(out, rc);
1262         }
1263
1264         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1265         LASSERT(lockrep != NULL);
1266
1267         lockrep->lock_policy_res2 =
1268                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1269
1270         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1271         if (rc)
1272                 GOTO(out, rc);
1273
1274         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1275         EXIT;
1276
1277 out:
1278         minfo->mi_cb(req, minfo, rc);
1279         return 0;
1280 }
1281
1282 int mdc_intent_getattr_async(struct obd_export *exp,
1283                              struct md_enqueue_info *minfo)
1284 {
1285         struct md_op_data       *op_data = &minfo->mi_data;
1286         struct lookup_intent    *it = &minfo->mi_it;
1287         struct ptlrpc_request   *req;
1288         struct mdc_getattr_args *ga;
1289         struct obd_device       *obddev = class_exp2obd(exp);
1290         struct ldlm_res_id       res_id;
1291         union ldlm_policy_data policy = {
1292                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1293                                                  MDS_INODELOCK_UPDATE } };
1294         int                      rc = 0;
1295         __u64                    flags = LDLM_FL_HAS_INTENT;
1296         ENTRY;
1297
1298         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1299                 (int)op_data->op_namelen, op_data->op_name,
1300                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1301
1302         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1303         /* If the MDT return -ERANGE because of large ACL, then the sponsor
1304          * of the async getattr RPC will handle that by itself. */
1305         req = mdc_intent_getattr_pack(exp, it, op_data,
1306                                       LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1307         if (IS_ERR(req))
1308                 RETURN(PTR_ERR(req));
1309
1310         rc = obd_get_request_slot(&obddev->u.cli);
1311         if (rc != 0) {
1312                 ptlrpc_req_finished(req);
1313                 RETURN(rc);
1314         }
1315
1316         /* With Data-on-MDT the glimpse callback is needed too.
1317          * It is set here in advance but not in mdc_finish_enqueue()
1318          * to avoid possible races. It is safe to have glimpse handler
1319          * for non-DOM locks and costs nothing.*/
1320         if (minfo->mi_einfo.ei_cb_gl == NULL)
1321                 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1322
1323         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1324                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1325         if (rc < 0) {
1326                 obd_put_request_slot(&obddev->u.cli);
1327                 ptlrpc_req_finished(req);
1328                 RETURN(rc);
1329         }
1330
1331         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1332         ga = ptlrpc_req_async_args(req);
1333         ga->ga_exp = exp;
1334         ga->ga_minfo = minfo;
1335
1336         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1337         ptlrpcd_add_req(req);
1338
1339         RETURN(0);
1340 }