Whamcloud - gitweb
2c151df4839e408c6a5abce6e949da6d2d5f44f3
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 #include <linux/module.h>
36
37 #include <obd.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46 #include <lustre_acl.h>
47
48 #include "mdc_internal.h"
49
50 struct mdc_getattr_args {
51         struct obd_export               *ga_exp;
52         struct md_enqueue_info          *ga_minfo;
53 };
54
55 int it_open_error(int phase, struct lookup_intent *it)
56 {
57         if (it_disposition(it, DISP_OPEN_LEASE)) {
58                 if (phase >= DISP_OPEN_LEASE)
59                         return it->it_status;
60                 else
61                         return 0;
62         }
63         if (it_disposition(it, DISP_OPEN_OPEN)) {
64                 if (phase >= DISP_OPEN_OPEN)
65                         return it->it_status;
66                 else
67                         return 0;
68         }
69
70         if (it_disposition(it, DISP_OPEN_CREATE)) {
71                 if (phase >= DISP_OPEN_CREATE)
72                         return it->it_status;
73                 else
74                         return 0;
75         }
76
77         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
78                 if (phase >= DISP_LOOKUP_EXECD)
79                         return it->it_status;
80                 else
81                         return 0;
82         }
83
84         if (it_disposition(it, DISP_IT_EXECD)) {
85                 if (phase >= DISP_IT_EXECD)
86                         return it->it_status;
87                 else
88                         return 0;
89         }
90
91         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
92         LBUG();
93
94         return 0;
95 }
96 EXPORT_SYMBOL(it_open_error);
97
98 /* this must be called on a lockh that is known to have a referenced lock */
99 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
100                       void *data, __u64 *bits)
101 {
102         struct ldlm_lock *lock;
103         struct inode *new_inode = data;
104         ENTRY;
105
106         if(bits)
107                 *bits = 0;
108
109         if (!lustre_handle_is_used(lockh))
110                 RETURN(0);
111
112         lock = ldlm_handle2lock(lockh);
113
114         LASSERT(lock != NULL);
115         lock_res_and_lock(lock);
116         if (lock->l_resource->lr_lvb_inode &&
117             lock->l_resource->lr_lvb_inode != data) {
118                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
119                 LASSERTF(old_inode->i_state & I_FREEING,
120                          "Found existing inode %p/%lu/%u state %lu in lock: "
121                          "setting data to %p/%lu/%u\n", old_inode,
122                          old_inode->i_ino, old_inode->i_generation,
123                          old_inode->i_state,
124                          new_inode, new_inode->i_ino, new_inode->i_generation);
125         }
126         lock->l_resource->lr_lvb_inode = new_inode;
127         if (bits)
128                 *bits = lock->l_policy_data.l_inodebits.bits;
129
130         unlock_res_and_lock(lock);
131         LDLM_LOCK_PUT(lock);
132
133         RETURN(0);
134 }
135
136 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
137                               const struct lu_fid *fid, enum ldlm_type type,
138                               union ldlm_policy_data *policy,
139                               enum ldlm_mode mode, struct lustre_handle *lockh)
140 {
141         struct ldlm_res_id res_id;
142         enum ldlm_mode rc;
143         ENTRY;
144
145         fid_build_reg_res_name(fid, &res_id);
146         /* LU-4405: Clear bits not supported by server */
147         policy->l_inodebits.bits &= exp_connect_ibits(exp);
148         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
149                              &res_id, type, policy, mode, lockh, 0);
150         RETURN(rc);
151 }
152
153 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
154                       union ldlm_policy_data *policy, enum ldlm_mode mode,
155                       enum ldlm_cancel_flags flags, void *opaque)
156 {
157         struct obd_device *obd = class_exp2obd(exp);
158         struct ldlm_res_id res_id;
159         int rc;
160
161         ENTRY;
162
163         fid_build_reg_res_name(fid, &res_id);
164         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
165                                              policy, mode, flags, opaque);
166         RETURN(rc);
167 }
168
169 int mdc_null_inode(struct obd_export *exp,
170                    const struct lu_fid *fid)
171 {
172         struct ldlm_res_id res_id;
173         struct ldlm_resource *res;
174         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
175         ENTRY;
176
177         LASSERTF(ns != NULL, "no namespace passed\n");
178
179         fid_build_reg_res_name(fid, &res_id);
180
181         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
182         if (IS_ERR(res))
183                 RETURN(0);
184
185         lock_res(res);
186         res->lr_lvb_inode = NULL;
187         unlock_res(res);
188
189         ldlm_resource_putref(res);
190         RETURN(0);
191 }
192
193 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
194 {
195         /* Don't hold error requests for replay. */
196         if (req->rq_replay) {
197                 spin_lock(&req->rq_lock);
198                 req->rq_replay = 0;
199                 spin_unlock(&req->rq_lock);
200         }
201         if (rc && req->rq_transno != 0) {
202                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
203                 LBUG();
204         }
205 }
206
207 /* Save a large LOV EA into the request buffer so that it is available
208  * for replay.  We don't do this in the initial request because the
209  * original request doesn't need this buffer (at most it sends just the
210  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
211  * buffer and may also be difficult to allocate and save a very large
212  * request buffer for each open. (bug 5707)
213  *
214  * OOM here may cause recovery failure if lmm is needed (only for the
215  * original open if the MDS crashed just when this client also OOM'd)
216  * but this is incredibly unlikely, and questionable whether the client
217  * could do MDS recovery under OOM anyways... */
218 int mdc_save_lovea(struct ptlrpc_request *req,
219                    const struct req_msg_field *field,
220                    void *data, u32 size)
221 {
222         struct req_capsule *pill = &req->rq_pill;
223         void *lmm;
224         int rc = 0;
225
226         if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
227                 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
228                 if (rc) {
229                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
230                                req->rq_export->exp_obd->obd_name,
231                                size, rc);
232                         return rc;
233                 }
234         } else {
235                 req_capsule_shrink(pill, field, size, RCL_CLIENT);
236         }
237
238         req_capsule_set_size(pill, field, RCL_CLIENT, size);
239         lmm = req_capsule_client_get(pill, field);
240         if (lmm)
241                 memcpy(lmm, data, size);
242
243         return rc;
244 }
245
246 static struct ptlrpc_request *
247 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
248                      struct md_op_data *op_data, __u32 acl_bufsize)
249 {
250         struct ptlrpc_request   *req;
251         struct obd_device       *obddev = class_exp2obd(exp);
252         struct ldlm_intent      *lit;
253         const void              *lmm = op_data->op_data;
254         __u32                    lmmsize = op_data->op_data_size;
255         struct list_head         cancels = LIST_HEAD_INIT(cancels);
256         int                      count = 0;
257         enum ldlm_mode           mode;
258         int                      rc;
259         ENTRY;
260
261         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
262
263         /* XXX: openlock is not cancelled for cross-refs. */
264         /* If inode is known, cancel conflicting OPEN locks. */
265         if (fid_is_sane(&op_data->op_fid2)) {
266                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
267                         if (it->it_flags & FMODE_WRITE)
268                                 mode = LCK_EX;
269                         else
270                                 mode = LCK_PR;
271                 } else {
272                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
273                                 mode = LCK_CW;
274 #ifdef FMODE_EXEC
275                         else if (it->it_flags & FMODE_EXEC)
276                                 mode = LCK_PR;
277 #endif
278                         else
279                                 mode = LCK_CR;
280                 }
281                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
282                                                 &cancels, mode,
283                                                 MDS_INODELOCK_OPEN);
284         }
285
286         /* If CREATE, cancel parent's UPDATE lock. */
287         if (it->it_op & IT_CREAT)
288                 mode = LCK_EX;
289         else
290                 mode = LCK_CR;
291         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
292                                          &cancels, mode,
293                                          MDS_INODELOCK_UPDATE);
294
295         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
296                                    &RQF_LDLM_INTENT_OPEN);
297         if (req == NULL) {
298                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
299                 RETURN(ERR_PTR(-ENOMEM));
300         }
301
302         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
303                              op_data->op_namelen + 1);
304         if (cl_is_lov_delay_create(it->it_flags)) {
305                 /* open(O_LOV_DELAY_CREATE) won't pack lmm */
306                 LASSERT(lmmsize == 0);
307                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
308         } else {
309                 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
310                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
311         }
312
313         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
314                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
315                              strlen(op_data->op_file_secctx_name) + 1 : 0);
316
317         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
318                              op_data->op_file_secctx_size);
319
320         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
321         if (rc < 0) {
322                 ptlrpc_request_free(req);
323                 RETURN(ERR_PTR(rc));
324         }
325
326         spin_lock(&req->rq_lock);
327         req->rq_replay = req->rq_import->imp_replayable;
328         spin_unlock(&req->rq_lock);
329
330         /* pack the intent */
331         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
332         lit->opc = (__u64)it->it_op;
333
334         /* pack the intended request */
335         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
336                       lmmsize);
337
338         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
339                              obddev->u.cli.cl_max_mds_easize);
340         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
341         ptlrpc_request_set_replen(req);
342         return req;
343 }
344
345 #define GA_DEFAULT_EA_NAME_LEN 20
346 #define GA_DEFAULT_EA_VAL_LEN  250
347 #define GA_DEFAULT_EA_NUM      10
348
349 static struct ptlrpc_request *
350 mdc_intent_getxattr_pack(struct obd_export *exp,
351                          struct lookup_intent *it,
352                          struct md_op_data *op_data)
353 {
354         struct ptlrpc_request   *req;
355         struct ldlm_intent      *lit;
356         int                     rc, count = 0;
357         struct list_head        cancels = LIST_HEAD_INIT(cancels);
358
359         ENTRY;
360
361         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
362                                         &RQF_LDLM_INTENT_GETXATTR);
363         if (req == NULL)
364                 RETURN(ERR_PTR(-ENOMEM));
365
366         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
367         if (rc) {
368                 ptlrpc_request_free(req);
369                 RETURN(ERR_PTR(rc));
370         }
371
372         /* pack the intent */
373         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
374         lit->opc = IT_GETXATTR;
375
376         /* pack the intended request */
377         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
378                       GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM,
379                       -1, 0);
380
381         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
382                              GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
383
384         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
385                              GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM);
386
387         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
388                              sizeof(__u32) * GA_DEFAULT_EA_NUM);
389
390         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
391
392         ptlrpc_request_set_replen(req);
393
394         RETURN(req);
395 }
396
397 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
398                                                      struct lookup_intent *it,
399                                                      struct md_op_data *op_data)
400 {
401         struct ptlrpc_request *req;
402         struct obd_device     *obddev = class_exp2obd(exp);
403         struct ldlm_intent    *lit;
404         int                    rc;
405         ENTRY;
406
407         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
408                                    &RQF_LDLM_INTENT_UNLINK);
409         if (req == NULL)
410                 RETURN(ERR_PTR(-ENOMEM));
411
412         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
413                              op_data->op_namelen + 1);
414
415         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
416         if (rc) {
417                 ptlrpc_request_free(req);
418                 RETURN(ERR_PTR(rc));
419         }
420
421         /* pack the intent */
422         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
423         lit->opc = (__u64)it->it_op;
424
425         /* pack the intended request */
426         mdc_unlink_pack(req, op_data);
427
428         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
429                              obddev->u.cli.cl_default_mds_easize);
430         ptlrpc_request_set_replen(req);
431         RETURN(req);
432 }
433
434 static struct ptlrpc_request *
435 mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
436                         struct md_op_data *op_data, __u32 acl_bufsize)
437 {
438         struct ptlrpc_request   *req;
439         struct obd_device       *obddev = class_exp2obd(exp);
440         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
441                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
442                                          OBD_MD_MEA | OBD_MD_FLACL;
443         struct ldlm_intent      *lit;
444         int                      rc;
445         __u32                    easize;
446         ENTRY;
447
448         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
449                                    &RQF_LDLM_INTENT_GETATTR);
450         if (req == NULL)
451                 RETURN(ERR_PTR(-ENOMEM));
452
453         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
454                              op_data->op_namelen + 1);
455
456         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
457         if (rc) {
458                 ptlrpc_request_free(req);
459                 RETURN(ERR_PTR(rc));
460         }
461
462         /* pack the intent */
463         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
464         lit->opc = (__u64)it->it_op;
465
466         if (obddev->u.cli.cl_default_mds_easize > 0)
467                 easize = obddev->u.cli.cl_default_mds_easize;
468         else
469                 easize = obddev->u.cli.cl_max_mds_easize;
470
471         /* pack the intended request */
472         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
473
474         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
475         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
476         ptlrpc_request_set_replen(req);
477         RETURN(req);
478 }
479
480 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
481                                                      struct lookup_intent *it,
482                                                      struct md_op_data *op_data)
483 {
484         struct obd_device     *obd = class_exp2obd(exp);
485         struct ptlrpc_request *req;
486         struct ldlm_intent    *lit;
487         struct layout_intent  *layout;
488         int rc;
489         ENTRY;
490
491         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
492                                 &RQF_LDLM_INTENT_LAYOUT);
493         if (req == NULL)
494                 RETURN(ERR_PTR(-ENOMEM));
495
496         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
497         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
498         if (rc) {
499                 ptlrpc_request_free(req);
500                 RETURN(ERR_PTR(rc));
501         }
502
503         /* pack the intent */
504         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
505         lit->opc = (__u64)it->it_op;
506
507         /* pack the layout intent request */
508         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
509         LASSERT(op_data->op_data != NULL);
510         LASSERT(op_data->op_data_size == sizeof(*layout));
511         memcpy(layout, op_data->op_data, sizeof(*layout));
512
513         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
514                              obd->u.cli.cl_default_mds_easize);
515         ptlrpc_request_set_replen(req);
516         RETURN(req);
517 }
518
519 static struct ptlrpc_request *
520 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
521 {
522         struct ptlrpc_request *req;
523         int rc;
524         ENTRY;
525
526         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
527         if (req == NULL)
528                 RETURN(ERR_PTR(-ENOMEM));
529
530         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
531         if (rc) {
532                 ptlrpc_request_free(req);
533                 RETURN(ERR_PTR(rc));
534         }
535
536         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
537         ptlrpc_request_set_replen(req);
538         RETURN(req);
539 }
540
541 static int mdc_finish_enqueue(struct obd_export *exp,
542                               struct ptlrpc_request *req,
543                               struct ldlm_enqueue_info *einfo,
544                               struct lookup_intent *it,
545                               struct lustre_handle *lockh,
546                               int rc)
547 {
548         struct req_capsule  *pill = &req->rq_pill;
549         struct ldlm_request *lockreq;
550         struct ldlm_reply   *lockrep;
551         struct ldlm_lock    *lock;
552         struct mdt_body     *body = NULL;
553         void                *lvb_data = NULL;
554         __u32                lvb_len = 0;
555
556         ENTRY;
557
558         LASSERT(rc >= 0);
559         /* Similarly, if we're going to replay this request, we don't want to
560          * actually get a lock, just perform the intent. */
561         if (req->rq_transno || req->rq_replay) {
562                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
563                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
564         }
565
566         if (rc == ELDLM_LOCK_ABORTED) {
567                 einfo->ei_mode = 0;
568                 memset(lockh, 0, sizeof(*lockh));
569                 rc = 0;
570         } else { /* rc = 0 */
571                 lock = ldlm_handle2lock(lockh);
572                 LASSERT(lock != NULL);
573
574                 /* If the server gave us back a different lock mode, we should
575                  * fix up our variables. */
576                 if (lock->l_req_mode != einfo->ei_mode) {
577                         ldlm_lock_addref(lockh, lock->l_req_mode);
578                         ldlm_lock_decref(lockh, einfo->ei_mode);
579                         einfo->ei_mode = lock->l_req_mode;
580                 }
581                 LDLM_LOCK_PUT(lock);
582         }
583
584         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
585         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
586
587         it->it_disposition = (int)lockrep->lock_policy_res1;
588         it->it_status = (int)lockrep->lock_policy_res2;
589         it->it_lock_mode = einfo->ei_mode;
590         it->it_lock_handle = lockh->cookie;
591         it->it_request = req;
592
593         /* Technically speaking rq_transno must already be zero if
594          * it_status is in error, so the check is a bit redundant */
595         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
596                 mdc_clear_replay_flag(req, it->it_status);
597
598         /* If we're doing an IT_OPEN which did not result in an actual
599          * successful open, then we need to remove the bit which saves
600          * this request for unconditional replay.
601          *
602          * It's important that we do this first!  Otherwise we might exit the
603          * function without doing so, and try to replay a failed create
604          * (bug 3440) */
605         if (it->it_op & IT_OPEN && req->rq_replay &&
606             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
607                 mdc_clear_replay_flag(req, it->it_status);
608
609         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
610                   it->it_op, it->it_disposition, it->it_status);
611
612         /* We know what to expect, so we do any byte flipping required here */
613         if (it_has_reply_body(it)) {
614                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
615                 if (body == NULL) {
616                         CERROR ("Can't swab mdt_body\n");
617                         RETURN (-EPROTO);
618                 }
619
620                 if (it_disposition(it, DISP_OPEN_OPEN) &&
621                     !it_open_error(DISP_OPEN_OPEN, it)) {
622                         /*
623                          * If this is a successful OPEN request, we need to set
624                          * replay handler and data early, so that if replay
625                          * happens immediately after swabbing below, new reply
626                          * is swabbed by that handler correctly.
627                          */
628                         mdc_set_open_replay_data(NULL, NULL, it);
629                 }
630
631                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
632                         void *eadata;
633
634                         mdc_update_max_ea_from_body(exp, body);
635
636                         /*
637                          * The eadata is opaque; just check that it is there.
638                          * Eventually, obd_unpackmd() will check the contents.
639                          */
640                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
641                                                         body->mbo_eadatasize);
642                         if (eadata == NULL)
643                                 RETURN(-EPROTO);
644
645                         /* save lvb data and length in case this is for layout
646                          * lock */
647                         lvb_data = eadata;
648                         lvb_len = body->mbo_eadatasize;
649
650                         /*
651                          * We save the reply LOV EA in case we have to replay a
652                          * create for recovery.  If we didn't allocate a large
653                          * enough request buffer above we need to reallocate it
654                          * here to hold the actual LOV EA.
655                          *
656                          * To not save LOV EA if request is not going to replay
657                          * (for example error one).
658                          */
659                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
660                                 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
661                                                     body->mbo_eadatasize);
662                                 if (rc) {
663                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
664                                         body->mbo_eadatasize = 0;
665                                         rc = 0;
666                                 }
667                         }
668                 }
669         } else if (it->it_op & IT_LAYOUT) {
670                 /* maybe the lock was granted right away and layout
671                  * is packed into RMF_DLM_LVB of req */
672                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
673                 if (lvb_len > 0) {
674                         lvb_data = req_capsule_server_sized_get(pill,
675                                                         &RMF_DLM_LVB, lvb_len);
676                         if (lvb_data == NULL)
677                                 RETURN(-EPROTO);
678
679                         /**
680                          * save replied layout data to the request buffer for
681                          * recovery consideration (lest MDS reinitialize
682                          * another set of OST objects).
683                          */
684                         if (req->rq_transno)
685                                 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
686                                                      lvb_len);
687                 }
688         }
689
690         /* fill in stripe data for layout lock.
691          * LU-6581: trust layout data only if layout lock is granted. The MDT
692          * has stopped sending layout unless the layout lock is granted. The
693          * client still does this checking in case it's talking with an old
694          * server. - Jinshan */
695         lock = ldlm_handle2lock(lockh);
696         if (lock == NULL)
697                 RETURN(rc);
698
699         if (ldlm_has_layout(lock) && lvb_data != NULL &&
700             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
701                 void *lmm;
702
703                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
704                         ldlm_it2str(it->it_op), lvb_len);
705
706                 OBD_ALLOC_LARGE(lmm, lvb_len);
707                 if (lmm == NULL)
708                         GOTO(out_lock, rc = -ENOMEM);
709
710                 memcpy(lmm, lvb_data, lvb_len);
711
712                 /* install lvb_data */
713                 lock_res_and_lock(lock);
714                 if (lock->l_lvb_data == NULL) {
715                         lock->l_lvb_type = LVB_T_LAYOUT;
716                         lock->l_lvb_data = lmm;
717                         lock->l_lvb_len = lvb_len;
718                         lmm = NULL;
719                 }
720                 unlock_res_and_lock(lock);
721                 if (lmm != NULL)
722                         OBD_FREE_LARGE(lmm, lvb_len);
723         }
724
725         if (ldlm_has_dom(lock)) {
726                 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
727
728                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
729                 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
730                         LDLM_ERROR(lock, "%s: DoM lock without size.\n",
731                                    exp->exp_obd->obd_name);
732                         GOTO(out_lock, rc = -EPROTO);
733                 }
734
735                 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
736                            ldlm_it2str(it->it_op), body->mbo_dom_size);
737
738                 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
739         }
740 out_lock:
741         LDLM_LOCK_PUT(lock);
742
743         RETURN(rc);
744 }
745
746 /* We always reserve enough space in the reply packet for a stripe MD, because
747  * we don't know in advance the file type. */
748 static int mdc_enqueue_base(struct obd_export *exp,
749                             struct ldlm_enqueue_info *einfo,
750                             const union ldlm_policy_data *policy,
751                             struct lookup_intent *it,
752                             struct md_op_data *op_data,
753                             struct lustre_handle *lockh,
754                             __u64 extra_lock_flags)
755 {
756         struct obd_device *obddev = class_exp2obd(exp);
757         struct ptlrpc_request *req = NULL;
758         __u64 flags, saved_flags = extra_lock_flags;
759         struct ldlm_res_id res_id;
760         static const union ldlm_policy_data lookup_policy = {
761                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
762         static const union ldlm_policy_data update_policy = {
763                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
764         static const union ldlm_policy_data layout_policy = {
765                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
766         static const union ldlm_policy_data getxattr_policy = {
767                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
768         int generation, resends = 0;
769         struct ldlm_reply *lockrep;
770         struct obd_import *imp = class_exp2cliimp(exp);
771         __u32 acl_bufsize;
772         enum lvb_type lvb_type = 0;
773         int rc;
774         ENTRY;
775
776         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
777                  einfo->ei_type);
778         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
779
780         if (it != NULL) {
781                 LASSERT(policy == NULL);
782
783                 saved_flags |= LDLM_FL_HAS_INTENT;
784                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
785                         policy = &update_policy;
786                 else if (it->it_op & IT_LAYOUT)
787                         policy = &layout_policy;
788                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
789                         policy = &getxattr_policy;
790                 else
791                         policy = &lookup_policy;
792         }
793
794         generation = obddev->u.cli.cl_import->imp_generation;
795         if (!it || (it->it_op & (IT_CREAT | IT_OPEN_CREAT)))
796                 acl_bufsize = imp->imp_connect_data.ocd_max_easize;
797         else
798                 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
799
800 resend:
801         flags = saved_flags;
802         if (it == NULL) {
803                 /* The only way right now is FLOCK. */
804                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
805                          einfo->ei_type);
806                 res_id.name[3] = LDLM_FLOCK;
807         } else if (it->it_op & IT_OPEN) {
808                 req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
809         } else if (it->it_op & IT_UNLINK) {
810                 req = mdc_intent_unlink_pack(exp, it, op_data);
811         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
812                 req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
813         } else if (it->it_op & IT_READDIR) {
814                 req = mdc_enqueue_pack(exp, 0);
815         } else if (it->it_op & IT_LAYOUT) {
816                 if (!imp_connect_lvb_type(imp))
817                         RETURN(-EOPNOTSUPP);
818                 req = mdc_intent_layout_pack(exp, it, op_data);
819                 lvb_type = LVB_T_LAYOUT;
820         } else if (it->it_op & IT_GETXATTR) {
821                 req = mdc_intent_getxattr_pack(exp, it, op_data);
822         } else {
823                 LBUG();
824                 RETURN(-EINVAL);
825         }
826
827         if (IS_ERR(req))
828                 RETURN(PTR_ERR(req));
829
830         if (resends) {
831                 req->rq_generation_set = 1;
832                 req->rq_import_generation = generation;
833                 req->rq_sent = ktime_get_real_seconds() + resends;
834         }
835
836         /* It is important to obtain modify RPC slot first (if applicable), so
837          * that threads that are waiting for a modify RPC slot are not polluting
838          * our rpcs in flight counter.
839          * We do not do flock request limiting, though */
840         if (it) {
841                 mdc_get_mod_rpc_slot(req, it);
842                 rc = obd_get_request_slot(&obddev->u.cli);
843                 if (rc != 0) {
844                         mdc_put_mod_rpc_slot(req, it);
845                         mdc_clear_replay_flag(req, 0);
846                         ptlrpc_req_finished(req);
847                         RETURN(rc);
848                 }
849         }
850
851         /* With Data-on-MDT the glimpse callback is needed too.
852          * It is set here in advance but not in mdc_finish_enqueue()
853          * to avoid possible races. It is safe to have glimpse handler
854          * for non-DOM locks and costs nothing.*/
855         if (einfo->ei_cb_gl == NULL)
856                 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
857
858         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
859                               0, lvb_type, lockh, 0);
860         if (!it) {
861                 /* For flock requests we immediatelly return without further
862                    delay and let caller deal with the rest, since rest of
863                    this function metadata processing makes no sense for flock
864                    requests anyway. But in case of problem during comms with
865                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
866                    can not rely on caller and this mainly for F_UNLCKs
867                    (explicits or automatically generated by Kernel to clean
868                    current FLocks upon exit) that can't be trashed */
869                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
870                     (einfo->ei_type == LDLM_FLOCK) &&
871                     (einfo->ei_mode == LCK_NL))
872                         goto resend;
873                 RETURN(rc);
874         }
875
876         obd_put_request_slot(&obddev->u.cli);
877         mdc_put_mod_rpc_slot(req, it);
878
879         if (rc < 0) {
880                 CDEBUG(D_INFO,
881                       "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
882                       obddev->obd_name, PFID(&op_data->op_fid1),
883                       PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
884
885                 mdc_clear_replay_flag(req, rc);
886                 ptlrpc_req_finished(req);
887                 RETURN(rc);
888         }
889
890         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
891         LASSERT(lockrep != NULL);
892
893         lockrep->lock_policy_res2 =
894                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
895
896         /* Retry infinitely when the server returns -EINPROGRESS for the
897          * intent operation, when server returns -EINPROGRESS for acquiring
898          * intent lock, we'll retry in after_reply(). */
899         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
900                 mdc_clear_replay_flag(req, rc);
901                 ptlrpc_req_finished(req);
902                 if (generation == obddev->u.cli.cl_import->imp_generation) {
903                         if (signal_pending(current))
904                                 RETURN(-EINTR);
905
906                         resends++;
907                         CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
908                                obddev->obd_name, resends, it->it_op,
909                                PFID(&op_data->op_fid1),
910                                PFID(&op_data->op_fid2));
911                         goto resend;
912                 } else {
913                         CDEBUG(D_HA, "resend cross eviction\n");
914                         RETURN(-EIO);
915                 }
916         }
917
918         if ((int)lockrep->lock_policy_res2 == -ERANGE &&
919             it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
920             acl_bufsize != imp->imp_connect_data.ocd_max_easize) {
921                 mdc_clear_replay_flag(req, -ERANGE);
922                 ptlrpc_req_finished(req);
923                 acl_bufsize = imp->imp_connect_data.ocd_max_easize;
924                 goto resend;
925         }
926
927         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
928         if (rc < 0) {
929                 if (lustre_handle_is_used(lockh)) {
930                         ldlm_lock_decref(lockh, einfo->ei_mode);
931                         memset(lockh, 0, sizeof(*lockh));
932                 }
933                 ptlrpc_req_finished(req);
934
935                 it->it_lock_handle = 0;
936                 it->it_lock_mode = 0;
937                 it->it_request = NULL;
938         }
939
940         RETURN(rc);
941 }
942
943 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
944                 const union ldlm_policy_data *policy,
945                 struct md_op_data *op_data,
946                 struct lustre_handle *lockh, __u64 extra_lock_flags)
947 {
948         return mdc_enqueue_base(exp, einfo, policy, NULL,
949                                 op_data, lockh, extra_lock_flags);
950 }
951
952 static int mdc_finish_intent_lock(struct obd_export *exp,
953                                   struct ptlrpc_request *request,
954                                   struct md_op_data *op_data,
955                                   struct lookup_intent *it,
956                                   struct lustre_handle *lockh)
957 {
958         struct lustre_handle old_lock;
959         struct ldlm_lock *lock;
960         int rc = 0;
961         ENTRY;
962
963         LASSERT(request != NULL);
964         LASSERT(request != LP_POISON);
965         LASSERT(request->rq_repmsg != LP_POISON);
966
967         if (it->it_op & IT_READDIR)
968                 RETURN(0);
969
970         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
971                 if (it->it_status != 0)
972                         GOTO(out, rc = it->it_status);
973         } else {
974                 if (!it_disposition(it, DISP_IT_EXECD)) {
975                         /* The server failed before it even started executing
976                          * the intent, i.e. because it couldn't unpack the
977                          * request.
978                          */
979                         LASSERT(it->it_status != 0);
980                         GOTO(out, rc = it->it_status);
981                 }
982                 rc = it_open_error(DISP_IT_EXECD, it);
983                 if (rc)
984                         GOTO(out, rc);
985
986                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
987                 if (rc)
988                         GOTO(out, rc);
989
990                 /* keep requests around for the multiple phases of the call
991                  * this shows the DISP_XX must guarantee we make it into the
992                  * call
993                  */
994                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
995                     it_disposition(it, DISP_OPEN_CREATE) &&
996                     !it_open_error(DISP_OPEN_CREATE, it)) {
997                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
998                         /* balanced in ll_create_node */
999                         ptlrpc_request_addref(request);
1000                 }
1001                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
1002                     it_disposition(it, DISP_OPEN_OPEN) &&
1003                     !it_open_error(DISP_OPEN_OPEN, it)) {
1004                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
1005                         /* balanced in ll_file_open */
1006                         ptlrpc_request_addref(request);
1007                         /* BUG 11546 - eviction in the middle of open rpc
1008                          * processing
1009                          */
1010                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
1011                                          obd_timeout);
1012                 }
1013
1014                 if (it->it_op & IT_CREAT) {
1015                         /* XXX this belongs in ll_create_it */
1016                 } else if (it->it_op == IT_OPEN) {
1017                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
1018                 } else {
1019                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
1020                 }
1021         }
1022
1023         /* If we already have a matching lock, then cancel the new
1024          * one.  We have to set the data here instead of in
1025          * mdc_enqueue, because we need to use the child's inode as
1026          * the l_ast_data to match, and that's not available until
1027          * intent_finish has performed the iget().) */
1028         lock = ldlm_handle2lock(lockh);
1029         if (lock) {
1030                 union ldlm_policy_data policy = lock->l_policy_data;
1031                 LDLM_DEBUG(lock, "matching against this");
1032
1033                 if (it_has_reply_body(it)) {
1034                         struct mdt_body *body;
1035
1036                         body = req_capsule_server_get(&request->rq_pill,
1037                                                       &RMF_MDT_BODY);
1038                         /* mdc_enqueue checked */
1039                         LASSERT(body != NULL);
1040                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1041                                                  &lock->l_resource->lr_name),
1042                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1043                                  PLDLMRES(lock->l_resource),
1044                                  PFID(&body->mbo_fid1));
1045                 }
1046                 LDLM_LOCK_PUT(lock);
1047
1048                 memcpy(&old_lock, lockh, sizeof(*lockh));
1049                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1050                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1051                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1052                         memcpy(lockh, &old_lock, sizeof(old_lock));
1053                         it->it_lock_handle = lockh->cookie;
1054                 }
1055         }
1056
1057         EXIT;
1058 out:
1059         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1060                 (int)op_data->op_namelen, op_data->op_name,
1061                 ldlm_it2str(it->it_op), it->it_status,
1062                 it->it_disposition, rc);
1063         return rc;
1064 }
1065
1066 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1067                         struct lu_fid *fid, __u64 *bits)
1068 {
1069         /* We could just return 1 immediately, but since we should only
1070          * be called in revalidate_it if we already have a lock, let's
1071          * verify that. */
1072         struct ldlm_res_id res_id;
1073         struct lustre_handle lockh;
1074         union ldlm_policy_data policy;
1075         enum ldlm_mode mode;
1076         ENTRY;
1077
1078         if (it->it_lock_handle) {
1079                 lockh.cookie = it->it_lock_handle;
1080                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1081         } else {
1082                 fid_build_reg_res_name(fid, &res_id);
1083                 switch (it->it_op) {
1084                 case IT_GETATTR:
1085                         /* File attributes are held under multiple bits:
1086                          * nlink is under lookup lock, size and times are
1087                          * under UPDATE lock and recently we've also got
1088                          * a separate permissions lock for owner/group/acl that
1089                          * were protected by lookup lock before.
1090                          * Getattr must provide all of that information,
1091                          * so we need to ensure we have all of those locks.
1092                          * Unfortunately, if the bits are split across multiple
1093                          * locks, there's no easy way to match all of them here,
1094                          * so an extra RPC would be performed to fetch all
1095                          * of those bits at once for now. */
1096                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1097                          * but for old MDTs (< 2.4), permission is covered
1098                          * by LOOKUP lock, so it needs to match all bits here.*/
1099                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1100                                                   MDS_INODELOCK_LOOKUP |
1101                                                   MDS_INODELOCK_PERM;
1102                         break;
1103                 case IT_READDIR:
1104                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1105                         break;
1106                 case IT_LAYOUT:
1107                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1108                         break;
1109                 default:
1110                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1111                         break;
1112                 }
1113
1114                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1115                                       LDLM_IBITS, &policy,
1116                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1117                                       &lockh);
1118         }
1119
1120         if (mode) {
1121                 it->it_lock_handle = lockh.cookie;
1122                 it->it_lock_mode = mode;
1123         } else {
1124                 it->it_lock_handle = 0;
1125                 it->it_lock_mode = 0;
1126         }
1127
1128         RETURN(!!mode);
1129 }
1130
1131 /*
1132  * This long block is all about fixing up the lock and request state
1133  * so that it is correct as of the moment _before_ the operation was
1134  * applied; that way, the VFS will think that everything is normal and
1135  * call Lustre's regular VFS methods.
1136  *
1137  * If we're performing a creation, that means that unless the creation
1138  * failed with EEXIST, we should fake up a negative dentry.
1139  *
1140  * For everything else, we want to lookup to succeed.
1141  *
1142  * One additional note: if CREATE or OPEN succeeded, we add an extra
1143  * reference to the request because we need to keep it around until
1144  * ll_create/ll_open gets called.
1145  *
1146  * The server will return to us, in it_disposition, an indication of
1147  * exactly what it_status refers to.
1148  *
1149  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1150  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1151  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1152  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1153  * was successful.
1154  *
1155  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1156  * child lookup.
1157  */
1158 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1159                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1160                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1161 {
1162         struct ldlm_enqueue_info einfo = {
1163                 .ei_type        = LDLM_IBITS,
1164                 .ei_mode        = it_to_lock_mode(it),
1165                 .ei_cb_bl       = cb_blocking,
1166                 .ei_cb_cp       = ldlm_completion_ast,
1167                 .ei_cb_gl       = mdc_ldlm_glimpse_ast,
1168         };
1169         struct lustre_handle lockh;
1170         int rc = 0;
1171         ENTRY;
1172         LASSERT(it);
1173
1174         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1175                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1176                 op_data->op_name, PFID(&op_data->op_fid2),
1177                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1178                 it->it_flags);
1179
1180         lockh.cookie = 0;
1181         if (fid_is_sane(&op_data->op_fid2) &&
1182             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1183                 /* We could just return 1 immediately, but since we should only
1184                  * be called in revalidate_it if we already have a lock, let's
1185                  * verify that. */
1186                 it->it_lock_handle = 0;
1187                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1188                 /* Only return failure if it was not GETATTR by cfid
1189                    (from inode_revalidate) */
1190                 if (rc || op_data->op_namelen != 0)
1191                         RETURN(rc);
1192         }
1193
1194         /* For case if upper layer did not alloc fid, do it now. */
1195         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1196                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1197                 if (rc < 0) {
1198                         CERROR("Can't alloc new fid, rc %d\n", rc);
1199                         RETURN(rc);
1200                 }
1201         }
1202
1203         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1204                               extra_lock_flags);
1205         if (rc < 0)
1206                 RETURN(rc);
1207
1208         *reqp = it->it_request;
1209         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1210         RETURN(rc);
1211 }
1212
1213 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1214                                               struct ptlrpc_request *req,
1215                                               void *args, int rc)
1216 {
1217         struct mdc_getattr_args  *ga = args;
1218         struct obd_export        *exp = ga->ga_exp;
1219         struct md_enqueue_info   *minfo = ga->ga_minfo;
1220         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1221         struct lookup_intent     *it;
1222         struct lustre_handle     *lockh;
1223         struct obd_device        *obddev;
1224         struct ldlm_reply        *lockrep;
1225         __u64                     flags = LDLM_FL_HAS_INTENT;
1226         ENTRY;
1227
1228         it    = &minfo->mi_it;
1229         lockh = &minfo->mi_lockh;
1230
1231         obddev = class_exp2obd(exp);
1232
1233         obd_put_request_slot(&obddev->u.cli);
1234         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1235                 rc = -ETIMEDOUT;
1236
1237         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1238                                    &flags, NULL, 0, lockh, rc);
1239         if (rc < 0) {
1240                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1241                 mdc_clear_replay_flag(req, rc);
1242                 GOTO(out, rc);
1243         }
1244
1245         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1246         LASSERT(lockrep != NULL);
1247
1248         lockrep->lock_policy_res2 =
1249                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1250
1251         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1252         if (rc)
1253                 GOTO(out, rc);
1254
1255         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1256         EXIT;
1257
1258 out:
1259         minfo->mi_cb(req, minfo, rc);
1260         return 0;
1261 }
1262
1263 int mdc_intent_getattr_async(struct obd_export *exp,
1264                              struct md_enqueue_info *minfo)
1265 {
1266         struct md_op_data       *op_data = &minfo->mi_data;
1267         struct lookup_intent    *it = &minfo->mi_it;
1268         struct ptlrpc_request   *req;
1269         struct mdc_getattr_args *ga;
1270         struct obd_device       *obddev = class_exp2obd(exp);
1271         struct ldlm_res_id       res_id;
1272         union ldlm_policy_data policy = {
1273                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1274                                                  MDS_INODELOCK_UPDATE } };
1275         int                      rc = 0;
1276         __u64                    flags = LDLM_FL_HAS_INTENT;
1277         ENTRY;
1278
1279         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1280                 (int)op_data->op_namelen, op_data->op_name,
1281                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1282
1283         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1284         /* If the MDT return -ERANGE because of large ACL, then the sponsor
1285          * of the async getattr RPC will handle that by itself. */
1286         req = mdc_intent_getattr_pack(exp, it, op_data,
1287                                       LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1288         if (IS_ERR(req))
1289                 RETURN(PTR_ERR(req));
1290
1291         rc = obd_get_request_slot(&obddev->u.cli);
1292         if (rc != 0) {
1293                 ptlrpc_req_finished(req);
1294                 RETURN(rc);
1295         }
1296
1297         /* With Data-on-MDT the glimpse callback is needed too.
1298          * It is set here in advance but not in mdc_finish_enqueue()
1299          * to avoid possible races. It is safe to have glimpse handler
1300          * for non-DOM locks and costs nothing.*/
1301         if (minfo->mi_einfo.ei_cb_gl == NULL)
1302                 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1303
1304         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1305                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1306         if (rc < 0) {
1307                 obd_put_request_slot(&obddev->u.cli);
1308                 ptlrpc_req_finished(req);
1309                 RETURN(rc);
1310         }
1311
1312         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1313         ga = ptlrpc_req_async_args(req);
1314         ga->ga_exp = exp;
1315         ga->ga_minfo = minfo;
1316
1317         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1318         ptlrpcd_add_req(req);
1319
1320         RETURN(0);
1321 }