Whamcloud - gitweb
LU-3285 merge: 'dom' branch merging
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 #include <linux/module.h>
36
37 #include <obd.h>
38 #include <obd_class.h>
39 #include <lustre_dlm.h>
40 #include <lustre_fid.h>
41 #include <lustre_intent.h>
42 #include <lustre_mdc.h>
43 #include <lustre_net.h>
44 #include <lustre_req_layout.h>
45 #include <lustre_swab.h>
46
47 #include "mdc_internal.h"
48
/*
 * Pairs an export with a metadata enqueue-info descriptor.  The code that
 * fills in and consumes this bundle is outside this section of the file.
 */
struct mdc_getattr_args {
        struct obd_export *ga_exp;              /* export pointer */
        struct md_enqueue_info *ga_minfo;       /* enqueue info pointer */
};
53
54 int it_open_error(int phase, struct lookup_intent *it)
55 {
56         if (it_disposition(it, DISP_OPEN_LEASE)) {
57                 if (phase >= DISP_OPEN_LEASE)
58                         return it->it_status;
59                 else
60                         return 0;
61         }
62         if (it_disposition(it, DISP_OPEN_OPEN)) {
63                 if (phase >= DISP_OPEN_OPEN)
64                         return it->it_status;
65                 else
66                         return 0;
67         }
68
69         if (it_disposition(it, DISP_OPEN_CREATE)) {
70                 if (phase >= DISP_OPEN_CREATE)
71                         return it->it_status;
72                 else
73                         return 0;
74         }
75
76         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
77                 if (phase >= DISP_LOOKUP_EXECD)
78                         return it->it_status;
79                 else
80                         return 0;
81         }
82
83         if (it_disposition(it, DISP_IT_EXECD)) {
84                 if (phase >= DISP_IT_EXECD)
85                         return it->it_status;
86                 else
87                         return 0;
88         }
89
90         CERROR("it disp: %X, status: %d\n", it->it_disposition, it->it_status);
91         LBUG();
92
93         return 0;
94 }
95 EXPORT_SYMBOL(it_open_error);
96
97 /* this must be called on a lockh that is known to have a referenced lock */
98 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
99                       void *data, __u64 *bits)
100 {
101         struct ldlm_lock *lock;
102         struct inode *new_inode = data;
103         ENTRY;
104
105         if(bits)
106                 *bits = 0;
107
108         if (!lustre_handle_is_used(lockh))
109                 RETURN(0);
110
111         lock = ldlm_handle2lock(lockh);
112
113         LASSERT(lock != NULL);
114         lock_res_and_lock(lock);
115         if (lock->l_resource->lr_lvb_inode &&
116             lock->l_resource->lr_lvb_inode != data) {
117                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
118                 LASSERTF(old_inode->i_state & I_FREEING,
119                          "Found existing inode %p/%lu/%u state %lu in lock: "
120                          "setting data to %p/%lu/%u\n", old_inode,
121                          old_inode->i_ino, old_inode->i_generation,
122                          old_inode->i_state,
123                          new_inode, new_inode->i_ino, new_inode->i_generation);
124         }
125         lock->l_resource->lr_lvb_inode = new_inode;
126         if (bits)
127                 *bits = lock->l_policy_data.l_inodebits.bits;
128
129         unlock_res_and_lock(lock);
130         LDLM_LOCK_PUT(lock);
131
132         RETURN(0);
133 }
134
135 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
136                               const struct lu_fid *fid, enum ldlm_type type,
137                               union ldlm_policy_data *policy,
138                               enum ldlm_mode mode, struct lustre_handle *lockh)
139 {
140         struct ldlm_res_id res_id;
141         enum ldlm_mode rc;
142         ENTRY;
143
144         fid_build_reg_res_name(fid, &res_id);
145         /* LU-4405: Clear bits not supported by server */
146         policy->l_inodebits.bits &= exp_connect_ibits(exp);
147         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
148                              &res_id, type, policy, mode, lockh, 0);
149         RETURN(rc);
150 }
151
152 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
153                       union ldlm_policy_data *policy, enum ldlm_mode mode,
154                       enum ldlm_cancel_flags flags, void *opaque)
155 {
156         struct obd_device *obd = class_exp2obd(exp);
157         struct ldlm_res_id res_id;
158         int rc;
159
160         ENTRY;
161
162         fid_build_reg_res_name(fid, &res_id);
163         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
164                                              policy, mode, flags, opaque);
165         RETURN(rc);
166 }
167
168 int mdc_null_inode(struct obd_export *exp,
169                    const struct lu_fid *fid)
170 {
171         struct ldlm_res_id res_id;
172         struct ldlm_resource *res;
173         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
174         ENTRY;
175
176         LASSERTF(ns != NULL, "no namespace passed\n");
177
178         fid_build_reg_res_name(fid, &res_id);
179
180         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
181         if (IS_ERR(res))
182                 RETURN(0);
183
184         lock_res(res);
185         res->lr_lvb_inode = NULL;
186         unlock_res(res);
187
188         ldlm_resource_putref(res);
189         RETURN(0);
190 }
191
192 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
193 {
194         /* Don't hold error requests for replay. */
195         if (req->rq_replay) {
196                 spin_lock(&req->rq_lock);
197                 req->rq_replay = 0;
198                 spin_unlock(&req->rq_lock);
199         }
200         if (rc && req->rq_transno != 0) {
201                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
202                 LBUG();
203         }
204 }
205
206 /* Save a large LOV EA into the request buffer so that it is available
207  * for replay.  We don't do this in the initial request because the
208  * original request doesn't need this buffer (at most it sends just the
209  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
210  * buffer and may also be difficult to allocate and save a very large
211  * request buffer for each open. (bug 5707)
212  *
213  * OOM here may cause recovery failure if lmm is needed (only for the
214  * original open if the MDS crashed just when this client also OOM'd)
215  * but this is incredibly unlikely, and questionable whether the client
216  * could do MDS recovery under OOM anyways... */
217 int mdc_save_lovea(struct ptlrpc_request *req,
218                    const struct req_msg_field *field,
219                    void *data, u32 size)
220 {
221         struct req_capsule *pill = &req->rq_pill;
222         void *lmm;
223         int rc = 0;
224
225         if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
226                 rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
227                 if (rc) {
228                         CERROR("%s: Can't enlarge ea size to %d: rc = %d\n",
229                                req->rq_export->exp_obd->obd_name,
230                                size, rc);
231                         return rc;
232                 }
233         } else {
234                 req_capsule_shrink(pill, field, size, RCL_CLIENT);
235         }
236
237         req_capsule_set_size(pill, field, RCL_CLIENT, size);
238         lmm = req_capsule_client_get(pill, field);
239         if (lmm)
240                 memcpy(lmm, data, size);
241
242         return rc;
243 }
244
245 static struct ptlrpc_request *
246 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
247                      struct md_op_data *op_data)
248 {
249         struct ptlrpc_request   *req;
250         struct obd_device       *obddev = class_exp2obd(exp);
251         struct ldlm_intent      *lit;
252         const void              *lmm = op_data->op_data;
253         __u32                    lmmsize = op_data->op_data_size;
254         struct list_head         cancels = LIST_HEAD_INIT(cancels);
255         int                      count = 0;
256         enum ldlm_mode           mode;
257         int                      rc;
258         ENTRY;
259
260         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
261
262         /* XXX: openlock is not cancelled for cross-refs. */
263         /* If inode is known, cancel conflicting OPEN locks. */
264         if (fid_is_sane(&op_data->op_fid2)) {
265                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
266                         if (it->it_flags & FMODE_WRITE)
267                                 mode = LCK_EX;
268                         else
269                                 mode = LCK_PR;
270                 } else {
271                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
272                                 mode = LCK_CW;
273 #ifdef FMODE_EXEC
274                         else if (it->it_flags & FMODE_EXEC)
275                                 mode = LCK_PR;
276 #endif
277                         else
278                                 mode = LCK_CR;
279                 }
280                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
281                                                 &cancels, mode,
282                                                 MDS_INODELOCK_OPEN);
283         }
284
285         /* If CREATE, cancel parent's UPDATE lock. */
286         if (it->it_op & IT_CREAT)
287                 mode = LCK_EX;
288         else
289                 mode = LCK_CR;
290         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
291                                          &cancels, mode,
292                                          MDS_INODELOCK_UPDATE);
293
294         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
295                                    &RQF_LDLM_INTENT_OPEN);
296         if (req == NULL) {
297                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
298                 RETURN(ERR_PTR(-ENOMEM));
299         }
300
301         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
302                              op_data->op_namelen + 1);
303         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
304                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
305
306         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX_NAME,
307                              RCL_CLIENT, op_data->op_file_secctx_name != NULL ?
308                              strlen(op_data->op_file_secctx_name) + 1 : 0);
309
310         req_capsule_set_size(&req->rq_pill, &RMF_FILE_SECCTX, RCL_CLIENT,
311                              op_data->op_file_secctx_size);
312
313         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
314         if (rc < 0) {
315                 ptlrpc_request_free(req);
316                 RETURN(ERR_PTR(rc));
317         }
318
319         spin_lock(&req->rq_lock);
320         req->rq_replay = req->rq_import->imp_replayable;
321         spin_unlock(&req->rq_lock);
322
323         /* pack the intent */
324         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
325         lit->opc = (__u64)it->it_op;
326
327         /* pack the intended request */
328         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
329                       lmmsize);
330
331         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
332                              obddev->u.cli.cl_max_mds_easize);
333         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
334                              req->rq_import->imp_connect_data.ocd_max_easize);
335         ptlrpc_request_set_replen(req);
336         return req;
337 }
338
339 #define GA_DEFAULT_EA_NAME_LEN 20
340 #define GA_DEFAULT_EA_VAL_LEN  250
341 #define GA_DEFAULT_EA_NUM      10
342
343 static struct ptlrpc_request *
344 mdc_intent_getxattr_pack(struct obd_export *exp,
345                          struct lookup_intent *it,
346                          struct md_op_data *op_data)
347 {
348         struct ptlrpc_request   *req;
349         struct ldlm_intent      *lit;
350         int                     rc, count = 0;
351         struct list_head        cancels = LIST_HEAD_INIT(cancels);
352
353         ENTRY;
354
355         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
356                                         &RQF_LDLM_INTENT_GETXATTR);
357         if (req == NULL)
358                 RETURN(ERR_PTR(-ENOMEM));
359
360         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
361         if (rc) {
362                 ptlrpc_request_free(req);
363                 RETURN(ERR_PTR(rc));
364         }
365
366         /* pack the intent */
367         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
368         lit->opc = IT_GETXATTR;
369
370         /* pack the intended request */
371         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
372                       GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM,
373                       -1, 0);
374
375         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER,
376                              GA_DEFAULT_EA_NAME_LEN * GA_DEFAULT_EA_NUM);
377
378         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER,
379                              GA_DEFAULT_EA_VAL_LEN * GA_DEFAULT_EA_NUM);
380
381         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS, RCL_SERVER,
382                              sizeof(__u32) * GA_DEFAULT_EA_NUM);
383
384         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0);
385
386         ptlrpc_request_set_replen(req);
387
388         RETURN(req);
389 }
390
391 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
392                                                      struct lookup_intent *it,
393                                                      struct md_op_data *op_data)
394 {
395         struct ptlrpc_request *req;
396         struct obd_device     *obddev = class_exp2obd(exp);
397         struct ldlm_intent    *lit;
398         int                    rc;
399         ENTRY;
400
401         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
402                                    &RQF_LDLM_INTENT_UNLINK);
403         if (req == NULL)
404                 RETURN(ERR_PTR(-ENOMEM));
405
406         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
407                              op_data->op_namelen + 1);
408
409         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
410         if (rc) {
411                 ptlrpc_request_free(req);
412                 RETURN(ERR_PTR(rc));
413         }
414
415         /* pack the intent */
416         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
417         lit->opc = (__u64)it->it_op;
418
419         /* pack the intended request */
420         mdc_unlink_pack(req, op_data);
421
422         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
423                              obddev->u.cli.cl_default_mds_easize);
424         ptlrpc_request_set_replen(req);
425         RETURN(req);
426 }
427
428 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
429                                                       struct lookup_intent *it,
430                                                       struct md_op_data *op_data)
431 {
432         struct ptlrpc_request   *req;
433         struct obd_device       *obddev = class_exp2obd(exp);
434         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
435                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
436                                          OBD_MD_MEA | OBD_MD_FLACL;
437         struct ldlm_intent      *lit;
438         int                      rc;
439         __u32                    easize;
440         ENTRY;
441
442         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
443                                    &RQF_LDLM_INTENT_GETATTR);
444         if (req == NULL)
445                 RETURN(ERR_PTR(-ENOMEM));
446
447         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
448                              op_data->op_namelen + 1);
449
450         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
451         if (rc) {
452                 ptlrpc_request_free(req);
453                 RETURN(ERR_PTR(rc));
454         }
455
456         /* pack the intent */
457         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
458         lit->opc = (__u64)it->it_op;
459
460         if (obddev->u.cli.cl_default_mds_easize > 0)
461                 easize = obddev->u.cli.cl_default_mds_easize;
462         else
463                 easize = obddev->u.cli.cl_max_mds_easize;
464
465         /* pack the intended request */
466         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
467
468         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
469         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
470                              req->rq_import->imp_connect_data.ocd_max_easize);
471         ptlrpc_request_set_replen(req);
472         RETURN(req);
473 }
474
475 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
476                                                      struct lookup_intent *it,
477                                                      struct md_op_data *op_data)
478 {
479         struct obd_device     *obd = class_exp2obd(exp);
480         struct ptlrpc_request *req;
481         struct ldlm_intent    *lit;
482         struct layout_intent  *layout;
483         int rc;
484         ENTRY;
485
486         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
487                                 &RQF_LDLM_INTENT_LAYOUT);
488         if (req == NULL)
489                 RETURN(ERR_PTR(-ENOMEM));
490
491         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
492         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
493         if (rc) {
494                 ptlrpc_request_free(req);
495                 RETURN(ERR_PTR(rc));
496         }
497
498         /* pack the intent */
499         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
500         lit->opc = (__u64)it->it_op;
501
502         /* pack the layout intent request */
503         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
504         LASSERT(op_data->op_data != NULL);
505         LASSERT(op_data->op_data_size == sizeof(*layout));
506         memcpy(layout, op_data->op_data, sizeof(*layout));
507
508         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
509                              obd->u.cli.cl_default_mds_easize);
510         ptlrpc_request_set_replen(req);
511         RETURN(req);
512 }
513
514 static struct ptlrpc_request *
515 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
516 {
517         struct ptlrpc_request *req;
518         int rc;
519         ENTRY;
520
521         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
522         if (req == NULL)
523                 RETURN(ERR_PTR(-ENOMEM));
524
525         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
526         if (rc) {
527                 ptlrpc_request_free(req);
528                 RETURN(ERR_PTR(rc));
529         }
530
531         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
532         ptlrpc_request_set_replen(req);
533         RETURN(req);
534 }
535
536 static int mdc_finish_enqueue(struct obd_export *exp,
537                               struct ptlrpc_request *req,
538                               struct ldlm_enqueue_info *einfo,
539                               struct lookup_intent *it,
540                               struct lustre_handle *lockh,
541                               int rc)
542 {
543         struct req_capsule  *pill = &req->rq_pill;
544         struct ldlm_request *lockreq;
545         struct ldlm_reply   *lockrep;
546         struct ldlm_lock    *lock;
547         struct mdt_body     *body = NULL;
548         void                *lvb_data = NULL;
549         __u32                lvb_len = 0;
550
551         ENTRY;
552
553         LASSERT(rc >= 0);
554         /* Similarly, if we're going to replay this request, we don't want to
555          * actually get a lock, just perform the intent. */
556         if (req->rq_transno || req->rq_replay) {
557                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
558                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
559         }
560
561         if (rc == ELDLM_LOCK_ABORTED) {
562                 einfo->ei_mode = 0;
563                 memset(lockh, 0, sizeof(*lockh));
564                 rc = 0;
565         } else { /* rc = 0 */
566                 lock = ldlm_handle2lock(lockh);
567                 LASSERT(lock != NULL);
568
569                 /* If the server gave us back a different lock mode, we should
570                  * fix up our variables. */
571                 if (lock->l_req_mode != einfo->ei_mode) {
572                         ldlm_lock_addref(lockh, lock->l_req_mode);
573                         ldlm_lock_decref(lockh, einfo->ei_mode);
574                         einfo->ei_mode = lock->l_req_mode;
575                 }
576                 LDLM_LOCK_PUT(lock);
577         }
578
579         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
580         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
581
582         it->it_disposition = (int)lockrep->lock_policy_res1;
583         it->it_status = (int)lockrep->lock_policy_res2;
584         it->it_lock_mode = einfo->ei_mode;
585         it->it_lock_handle = lockh->cookie;
586         it->it_request = req;
587
588         /* Technically speaking rq_transno must already be zero if
589          * it_status is in error, so the check is a bit redundant */
590         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
591                 mdc_clear_replay_flag(req, it->it_status);
592
593         /* If we're doing an IT_OPEN which did not result in an actual
594          * successful open, then we need to remove the bit which saves
595          * this request for unconditional replay.
596          *
597          * It's important that we do this first!  Otherwise we might exit the
598          * function without doing so, and try to replay a failed create
599          * (bug 3440) */
600         if (it->it_op & IT_OPEN && req->rq_replay &&
601             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
602                 mdc_clear_replay_flag(req, it->it_status);
603
604         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
605                   it->it_op, it->it_disposition, it->it_status);
606
607         /* We know what to expect, so we do any byte flipping required here */
608         if (it_has_reply_body(it)) {
609                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
610                 if (body == NULL) {
611                         CERROR ("Can't swab mdt_body\n");
612                         RETURN (-EPROTO);
613                 }
614
615                 if (it_disposition(it, DISP_OPEN_OPEN) &&
616                     !it_open_error(DISP_OPEN_OPEN, it)) {
617                         /*
618                          * If this is a successful OPEN request, we need to set
619                          * replay handler and data early, so that if replay
620                          * happens immediately after swabbing below, new reply
621                          * is swabbed by that handler correctly.
622                          */
623                         mdc_set_open_replay_data(NULL, NULL, it);
624                 }
625
626                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
627                         void *eadata;
628
629                         mdc_update_max_ea_from_body(exp, body);
630
631                         /*
632                          * The eadata is opaque; just check that it is there.
633                          * Eventually, obd_unpackmd() will check the contents.
634                          */
635                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
636                                                         body->mbo_eadatasize);
637                         if (eadata == NULL)
638                                 RETURN(-EPROTO);
639
640                         /* save lvb data and length in case this is for layout
641                          * lock */
642                         lvb_data = eadata;
643                         lvb_len = body->mbo_eadatasize;
644
645                         /*
646                          * We save the reply LOV EA in case we have to replay a
647                          * create for recovery.  If we didn't allocate a large
648                          * enough request buffer above we need to reallocate it
649                          * here to hold the actual LOV EA.
650                          *
651                          * To not save LOV EA if request is not going to replay
652                          * (for example error one).
653                          */
654                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
655                                 rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
656                                                     body->mbo_eadatasize);
657                                 if (rc) {
658                                         body->mbo_valid &= ~OBD_MD_FLEASIZE;
659                                         body->mbo_eadatasize = 0;
660                                         rc = 0;
661                                 }
662                         }
663                 }
664         } else if (it->it_op & IT_LAYOUT) {
665                 /* maybe the lock was granted right away and layout
666                  * is packed into RMF_DLM_LVB of req */
667                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
668                 if (lvb_len > 0) {
669                         lvb_data = req_capsule_server_sized_get(pill,
670                                                         &RMF_DLM_LVB, lvb_len);
671                         if (lvb_data == NULL)
672                                 RETURN(-EPROTO);
673
674                         /**
675                          * save replied layout data to the request buffer for
676                          * recovery consideration (lest MDS reinitialize
677                          * another set of OST objects).
678                          */
679                         if (req->rq_transno)
680                                 (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
681                                                      lvb_len);
682                 }
683         }
684
685         /* fill in stripe data for layout lock.
686          * LU-6581: trust layout data only if layout lock is granted. The MDT
687          * has stopped sending layout unless the layout lock is granted. The
688          * client still does this checking in case it's talking with an old
689          * server. - Jinshan */
690         lock = ldlm_handle2lock(lockh);
691         if (lock == NULL)
692                 RETURN(rc);
693
694         if (ldlm_has_layout(lock) && lvb_data != NULL &&
695             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
696                 void *lmm;
697
698                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
699                         ldlm_it2str(it->it_op), lvb_len);
700
701                 OBD_ALLOC_LARGE(lmm, lvb_len);
702                 if (lmm == NULL)
703                         GOTO(out_lock, rc = -ENOMEM);
704
705                 memcpy(lmm, lvb_data, lvb_len);
706
707                 /* install lvb_data */
708                 lock_res_and_lock(lock);
709                 if (lock->l_lvb_data == NULL) {
710                         lock->l_lvb_type = LVB_T_LAYOUT;
711                         lock->l_lvb_data = lmm;
712                         lock->l_lvb_len = lvb_len;
713                         lmm = NULL;
714                 }
715                 unlock_res_and_lock(lock);
716                 if (lmm != NULL)
717                         OBD_FREE_LARGE(lmm, lvb_len);
718         }
719
720         if (ldlm_has_dom(lock)) {
721                 LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast);
722
723                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
724                 if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) {
725                         LDLM_ERROR(lock, "%s: DoM lock without size.\n",
726                                    exp->exp_obd->obd_name);
727                         GOTO(out_lock, rc = -EPROTO);
728                 }
729
730                 LDLM_DEBUG(lock, "DoM lock is returned by: %s, size: %llu",
731                            ldlm_it2str(it->it_op), body->mbo_dom_size);
732
733                 rc = mdc_fill_lvb(req, &lock->l_ost_lvb);
734         }
735 out_lock:
736         LDLM_LOCK_PUT(lock);
737
738         RETURN(rc);
739 }
740
741 /*
 * Enqueue an LDLM lock for the resource named by \a op_data->op_fid1,
 * optionally packing an intent into the request, and post-process the reply.
 *
 * We always reserve enough space in the reply packet for a stripe MD, because
 * we don't know in advance the file type.
 *
 * \param[in] exp		export the request is packed for and sent on
 * \param[in,out] einfo		enqueue description; ei_type must be LDLM_IBITS
 *				when \a it is given and LDLM_FLOCK otherwise.
 *				ei_cb_gl is set to mdc_ldlm_glimpse_ast if the
 *				caller left it NULL.
 * \param[in] policy		lock policy; must be NULL when \a it is given
 *				(it is then chosen from it->it_op)
 * \param[in,out] it		intent to execute, or NULL for a flock enqueue
 * \param[in] op_data		operation data; op_fid1 names the resource
 * \param[out] lockh		lock handle passed to ldlm_cli_enqueue();
 *				zeroed again if reply post-processing fails
 * \param[in] extra_lock_flags	flags used for the enqueue (LDLM_FL_HAS_INTENT
 *				is added when \a it is given)
 *
 * \retval -EOPNOTSUPP	IT_LAYOUT requested but imp_connect_lvb_type() is false
 * \retval -EIO		intent must be resent but the import generation changed
 * \retval negative	packing, request-slot or enqueue failure
 * \retval other	for flock: rc of ldlm_cli_enqueue(); for intents: rc of
 *			mdc_finish_enqueue()
 */
743 static int mdc_enqueue_base(struct obd_export *exp,
744                             struct ldlm_enqueue_info *einfo,
745                             const union ldlm_policy_data *policy,
746                             struct lookup_intent *it,
747                             struct md_op_data *op_data,
748                             struct lustre_handle *lockh,
749                             __u64 extra_lock_flags)
750 {
751         struct obd_device *obddev = class_exp2obd(exp);
752         struct ptlrpc_request *req = NULL;
753         __u64 flags, saved_flags = extra_lock_flags;
754         struct ldlm_res_id res_id;
755         static const union ldlm_policy_data lookup_policy = {
756                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
757         static const union ldlm_policy_data update_policy = {
758                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
759         static const union ldlm_policy_data layout_policy = {
760                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
761         static const union ldlm_policy_data getxattr_policy = {
762                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
763         int generation, resends = 0;
764         struct ldlm_reply *lockrep;
765         enum lvb_type lvb_type = 0;
766         int rc;
767         ENTRY;
768
            /* An intent may only be combined with an inodebits lock. */
769         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
770                  einfo->ei_type);
771         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
772
            /* With an intent the caller must not pass a policy: the inodebits
             * to request are derived from the intent operation, falling back
             * to the LOOKUP bit. */
773         if (it != NULL) {
774                 LASSERT(policy == NULL);
775
776                 saved_flags |= LDLM_FL_HAS_INTENT;
777                 if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
778                         policy = &update_policy;
779                 else if (it->it_op & IT_LAYOUT)
780                         policy = &layout_policy;
781                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
782                         policy = &getxattr_policy;
783                 else
784                         policy = &lookup_policy;
785         }
786
            /* Remember the import generation so that a resend can detect that
             * it changed in the meantime (reported as -EIO below). */
787         generation = obddev->u.cli.cl_import->imp_generation;
788 resend:
            /* Each attempt starts from the same flags and packs a new
             * request. */
789         flags = saved_flags;
790         if (it == NULL) {
791                 /* The only way right now is FLOCK. */
792                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
793                          einfo->ei_type);
794                 res_id.name[3] = LDLM_FLOCK;
795         } else if (it->it_op & IT_OPEN) {
796                 req = mdc_intent_open_pack(exp, it, op_data);
797         } else if (it->it_op & IT_UNLINK) {
798                 req = mdc_intent_unlink_pack(exp, it, op_data);
799         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
800                 req = mdc_intent_getattr_pack(exp, it, op_data);
801         } else if (it->it_op & IT_READDIR) {
802                 req = mdc_enqueue_pack(exp, 0);
803         } else if (it->it_op & IT_LAYOUT) {
804                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
805                         RETURN(-EOPNOTSUPP);
806                 req = mdc_intent_layout_pack(exp, it, op_data);
807                 lvb_type = LVB_T_LAYOUT;
808         } else if (it->it_op & IT_GETXATTR) {
809                 req = mdc_intent_getxattr_pack(exp, it, op_data);
810         } else {
811                 LBUG();
812                 RETURN(-EINVAL);
813         }
814
            /* req is still NULL here for flock (nothing was packed above), and
             * IS_ERR(NULL) is false, so only pack failures return here. */
815         if (IS_ERR(req))
816                 RETURN(PTR_ERR(req));
817
            /* Only intent requests increment 'resends', so req is non-NULL
             * here. The resend is pinned to the saved import generation and
             * rq_sent is set to now + resends seconds (presumably delaying the
             * send -- TODO confirm against ptlrpc). */
818         if (resends) {
819                 req->rq_generation_set = 1;
820                 req->rq_import_generation = generation;
821                 req->rq_sent = ktime_get_real_seconds() + resends;
822         }
823
824         /* It is important to obtain modify RPC slot first (if applicable), so
825          * that threads that are waiting for a modify RPC slot are not polluting
826          * our rpcs in flight counter.
827          * We do not do flock request limiting, though */
828         if (it) {
829                 mdc_get_mod_rpc_slot(req, it);
830                 rc = obd_get_request_slot(&obddev->u.cli);
831                 if (rc != 0) {
832                         mdc_put_mod_rpc_slot(req, it);
833                         mdc_clear_replay_flag(req, 0);
834                         ptlrpc_req_finished(req);
835                         RETURN(rc);
836                 }
837         }
838
839         /* With Data-on-MDT the glimpse callback is needed too.
840          * It is set here in advance but not in mdc_finish_enqueue()
841          * to avoid possible races. It is safe to have glimpse handler
842          * for non-DOM locks and costs nothing.*/
843         if (einfo->ei_cb_gl == NULL)
844                 einfo->ei_cb_gl = mdc_ldlm_glimpse_ast;
845
846         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
847                               0, lvb_type, lockh, 0);
848         if (!it) {
849                 /* For flock requests we immediately return without further
                     * delay and let caller deal with the rest, since rest of
                     * this function metadata processing makes no sense for flock
                     * requests anyway. But in case of problem during comms with
                     * Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
                     * can not rely on caller and this mainly for F_UNLCKs
                     * (explicits or automatically generated by Kernel to clean
                     * current FLocks upon exit) that can't be trashed */
857                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
858                     (einfo->ei_type == LDLM_FLOCK) &&
859                     (einfo->ei_mode == LCK_NL))
860                         goto resend;
861                 RETURN(rc);
862         }
863
            /* Intent path: give back the slots taken before the enqueue, in
             * the reverse order of acquisition. */
864         obd_put_request_slot(&obddev->u.cli);
865         mdc_put_mod_rpc_slot(req, it);
866
867         if (rc < 0) {
868                 CDEBUG(D_INFO,
869                       "%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
870                       obddev->obd_name, PFID(&op_data->op_fid1),
871                       PFID(&op_data->op_fid2), op_data->op_name ?: "", rc);
872
873                 mdc_clear_replay_flag(req, rc);
874                 ptlrpc_req_finished(req);
875                 RETURN(rc);
876         }
877
878         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
879         LASSERT(lockrep != NULL);
880
            /* lock_policy_res2 is rewritten in place in the reply buffer with
             * the value returned by ptlrpc_status_ntoh(); it is compared
             * against -EINPROGRESS right below. */
881         lockrep->lock_policy_res2 =
882                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
883
884         /* Retry infinitely when the server returns -EINPROGRESS for the
885          * intent operation, when server returns -EINPROGRESS for acquiring
886          * intent lock, we'll retry in after_reply(). */
887         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
888                 mdc_clear_replay_flag(req, rc);
889                 ptlrpc_req_finished(req);
890                 resends++;
891
892                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
893                        obddev->obd_name, resends, it->it_op,
894                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
895
                    /* Resend only while the import generation is unchanged. */
896                 if (generation == obddev->u.cli.cl_import->imp_generation) {
897                         goto resend;
898                 } else {
899                         CDEBUG(D_HA, "resend cross eviction\n");
900                         RETURN(-EIO);
901                 }
902         }
903
904         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
905         if (rc < 0) {
                    /* Post-processing failed: drop the lock reference if a
                     * handle was set, release the request and reset the lock
                     * and request fields of the intent. */
906                 if (lustre_handle_is_used(lockh)) {
907                         ldlm_lock_decref(lockh, einfo->ei_mode);
908                         memset(lockh, 0, sizeof(*lockh));
909                 }
910                 ptlrpc_req_finished(req);
911
912                 it->it_lock_handle = 0;
913                 it->it_lock_mode = 0;
914                 it->it_request = NULL;
915         }
916
917         RETURN(rc);
918 }
919
920 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
921                 const union ldlm_policy_data *policy,
922                 struct md_op_data *op_data,
923                 struct lustre_handle *lockh, __u64 extra_lock_flags)
924 {
925         return mdc_enqueue_base(exp, einfo, policy, NULL,
926                                 op_data, lockh, extra_lock_flags);
927 }
928
929 static int mdc_finish_intent_lock(struct obd_export *exp,
930                                   struct ptlrpc_request *request,
931                                   struct md_op_data *op_data,
932                                   struct lookup_intent *it,
933                                   struct lustre_handle *lockh)
934 {
935         struct lustre_handle old_lock;
936         struct ldlm_lock *lock;
937         int rc = 0;
938         ENTRY;
939
940         LASSERT(request != NULL);
941         LASSERT(request != LP_POISON);
942         LASSERT(request->rq_repmsg != LP_POISON);
943
944         if (it->it_op & IT_READDIR)
945                 RETURN(0);
946
947         if (it->it_op & (IT_GETXATTR | IT_LAYOUT)) {
948                 if (it->it_status != 0)
949                         GOTO(out, rc = it->it_status);
950         } else {
951                 if (!it_disposition(it, DISP_IT_EXECD)) {
952                         /* The server failed before it even started executing
953                          * the intent, i.e. because it couldn't unpack the
954                          * request.
955                          */
956                         LASSERT(it->it_status != 0);
957                         GOTO(out, rc = it->it_status);
958                 }
959                 rc = it_open_error(DISP_IT_EXECD, it);
960                 if (rc)
961                         GOTO(out, rc);
962
963                 rc = it_open_error(DISP_LOOKUP_EXECD, it);
964                 if (rc)
965                         GOTO(out, rc);
966
967                 /* keep requests around for the multiple phases of the call
968                  * this shows the DISP_XX must guarantee we make it into the
969                  * call
970                  */
971                 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
972                     it_disposition(it, DISP_OPEN_CREATE) &&
973                     !it_open_error(DISP_OPEN_CREATE, it)) {
974                         it_set_disposition(it, DISP_ENQ_CREATE_REF);
975                         /* balanced in ll_create_node */
976                         ptlrpc_request_addref(request);
977                 }
978                 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
979                     it_disposition(it, DISP_OPEN_OPEN) &&
980                     !it_open_error(DISP_OPEN_OPEN, it)) {
981                         it_set_disposition(it, DISP_ENQ_OPEN_REF);
982                         /* balanced in ll_file_open */
983                         ptlrpc_request_addref(request);
984                         /* BUG 11546 - eviction in the middle of open rpc
985                          * processing
986                          */
987                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE,
988                                          obd_timeout);
989                 }
990
991                 if (it->it_op & IT_CREAT) {
992                         /* XXX this belongs in ll_create_it */
993                 } else if (it->it_op == IT_OPEN) {
994                         LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
995                 } else {
996                         LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
997                 }
998         }
999
1000         /* If we already have a matching lock, then cancel the new
1001          * one.  We have to set the data here instead of in
1002          * mdc_enqueue, because we need to use the child's inode as
1003          * the l_ast_data to match, and that's not available until
1004          * intent_finish has performed the iget().) */
1005         lock = ldlm_handle2lock(lockh);
1006         if (lock) {
1007                 union ldlm_policy_data policy = lock->l_policy_data;
1008                 LDLM_DEBUG(lock, "matching against this");
1009
1010                 if (it_has_reply_body(it)) {
1011                         struct mdt_body *body;
1012
1013                         body = req_capsule_server_get(&request->rq_pill,
1014                                                       &RMF_MDT_BODY);
1015                         /* mdc_enqueue checked */
1016                         LASSERT(body != NULL);
1017                         LASSERTF(fid_res_name_eq(&body->mbo_fid1,
1018                                                  &lock->l_resource->lr_name),
1019                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1020                                  PLDLMRES(lock->l_resource),
1021                                  PFID(&body->mbo_fid1));
1022                 }
1023                 LDLM_LOCK_PUT(lock);
1024
1025                 memcpy(&old_lock, lockh, sizeof(*lockh));
1026                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
1027                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
1028                         ldlm_lock_decref_and_cancel(lockh, it->it_lock_mode);
1029                         memcpy(lockh, &old_lock, sizeof(old_lock));
1030                         it->it_lock_handle = lockh->cookie;
1031                 }
1032         }
1033
1034         EXIT;
1035 out:
1036         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
1037                 (int)op_data->op_namelen, op_data->op_name,
1038                 ldlm_it2str(it->it_op), it->it_status,
1039                 it->it_disposition, rc);
1040         return rc;
1041 }
1042
1043 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
1044                         struct lu_fid *fid, __u64 *bits)
1045 {
1046         /* We could just return 1 immediately, but since we should only
1047          * be called in revalidate_it if we already have a lock, let's
1048          * verify that. */
1049         struct ldlm_res_id res_id;
1050         struct lustre_handle lockh;
1051         union ldlm_policy_data policy;
1052         enum ldlm_mode mode;
1053         ENTRY;
1054
1055         if (it->it_lock_handle) {
1056                 lockh.cookie = it->it_lock_handle;
1057                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
1058         } else {
1059                 fid_build_reg_res_name(fid, &res_id);
1060                 switch (it->it_op) {
1061                 case IT_GETATTR:
1062                         /* File attributes are held under multiple bits:
1063                          * nlink is under lookup lock, size and times are
1064                          * under UPDATE lock and recently we've also got
1065                          * a separate permissions lock for owner/group/acl that
1066                          * were protected by lookup lock before.
1067                          * Getattr must provide all of that information,
1068                          * so we need to ensure we have all of those locks.
1069                          * Unfortunately, if the bits are split across multiple
1070                          * locks, there's no easy way to match all of them here,
1071                          * so an extra RPC would be performed to fetch all
1072                          * of those bits at once for now. */
1073                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1074                          * but for old MDTs (< 2.4), permission is covered
1075                          * by LOOKUP lock, so it needs to match all bits here.*/
1076                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1077                                                   MDS_INODELOCK_LOOKUP |
1078                                                   MDS_INODELOCK_PERM;
1079                         break;
1080                 case IT_READDIR:
1081                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1082                         break;
1083                 case IT_LAYOUT:
1084                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1085                         break;
1086                 default:
1087                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1088                         break;
1089                 }
1090
1091                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1092                                       LDLM_IBITS, &policy,
1093                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1094                                       &lockh);
1095         }
1096
1097         if (mode) {
1098                 it->it_lock_handle = lockh.cookie;
1099                 it->it_lock_mode = mode;
1100         } else {
1101                 it->it_lock_handle = 0;
1102                 it->it_lock_mode = 0;
1103         }
1104
1105         RETURN(!!mode);
1106 }
1107
1108 /*
1109  * This long block is all about fixing up the lock and request state
1110  * so that it is correct as of the moment _before_ the operation was
1111  * applied; that way, the VFS will think that everything is normal and
1112  * call Lustre's regular VFS methods.
1113  *
1114  * If we're performing a creation, that means that unless the creation
1115  * failed with EEXIST, we should fake up a negative dentry.
1116  *
1117  * For everything else, we want to lookup to succeed.
1118  *
1119  * One additional note: if CREATE or OPEN succeeded, we add an extra
1120  * reference to the request because we need to keep it around until
1121  * ll_create/ll_open gets called.
1122  *
1123  * The server will return to us, in it_disposition, an indication of
1124  * exactly what it_status refers to.
1125  *
1126  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1127  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1128  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1129  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1130  * was successful.
1131  *
1132  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1133  * child lookup.
1134  */
1135 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1136                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1137                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1138 {
1139         struct ldlm_enqueue_info einfo = {
1140                 .ei_type        = LDLM_IBITS,
1141                 .ei_mode        = it_to_lock_mode(it),
1142                 .ei_cb_bl       = cb_blocking,
1143                 .ei_cb_cp       = ldlm_completion_ast,
1144                 .ei_cb_gl       = mdc_ldlm_glimpse_ast,
1145         };
1146         struct lustre_handle lockh;
1147         int rc = 0;
1148         ENTRY;
1149         LASSERT(it);
1150
1151         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1152                 ", intent: %s flags %#llo\n", (int)op_data->op_namelen,
1153                 op_data->op_name, PFID(&op_data->op_fid2),
1154                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1155                 it->it_flags);
1156
1157         lockh.cookie = 0;
1158         if (fid_is_sane(&op_data->op_fid2) &&
1159             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1160                 /* We could just return 1 immediately, but since we should only
1161                  * be called in revalidate_it if we already have a lock, let's
1162                  * verify that. */
1163                 it->it_lock_handle = 0;
1164                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1165                 /* Only return failure if it was not GETATTR by cfid
1166                    (from inode_revalidate) */
1167                 if (rc || op_data->op_namelen != 0)
1168                         RETURN(rc);
1169         }
1170
1171         /* For case if upper layer did not alloc fid, do it now. */
1172         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1173                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1174                 if (rc < 0) {
1175                         CERROR("Can't alloc new fid, rc %d\n", rc);
1176                         RETURN(rc);
1177                 }
1178         }
1179
1180         rc = mdc_enqueue_base(exp, &einfo, NULL, it, op_data, &lockh,
1181                               extra_lock_flags);
1182         if (rc < 0)
1183                 RETURN(rc);
1184
1185         *reqp = it->it_request;
1186         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1187         RETURN(rc);
1188 }
1189
1190 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1191                                               struct ptlrpc_request *req,
1192                                               void *args, int rc)
1193 {
1194         struct mdc_getattr_args  *ga = args;
1195         struct obd_export        *exp = ga->ga_exp;
1196         struct md_enqueue_info   *minfo = ga->ga_minfo;
1197         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1198         struct lookup_intent     *it;
1199         struct lustre_handle     *lockh;
1200         struct obd_device        *obddev;
1201         struct ldlm_reply        *lockrep;
1202         __u64                     flags = LDLM_FL_HAS_INTENT;
1203         ENTRY;
1204
1205         it    = &minfo->mi_it;
1206         lockh = &minfo->mi_lockh;
1207
1208         obddev = class_exp2obd(exp);
1209
1210         obd_put_request_slot(&obddev->u.cli);
1211         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1212                 rc = -ETIMEDOUT;
1213
1214         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1215                                    &flags, NULL, 0, lockh, rc);
1216         if (rc < 0) {
1217                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1218                 mdc_clear_replay_flag(req, rc);
1219                 GOTO(out, rc);
1220         }
1221
1222         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1223         LASSERT(lockrep != NULL);
1224
1225         lockrep->lock_policy_res2 =
1226                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1227
1228         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1229         if (rc)
1230                 GOTO(out, rc);
1231
1232         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1233         EXIT;
1234
1235 out:
1236         minfo->mi_cb(req, minfo, rc);
1237         return 0;
1238 }
1239
1240 int mdc_intent_getattr_async(struct obd_export *exp,
1241                              struct md_enqueue_info *minfo)
1242 {
1243         struct md_op_data       *op_data = &minfo->mi_data;
1244         struct lookup_intent    *it = &minfo->mi_it;
1245         struct ptlrpc_request   *req;
1246         struct mdc_getattr_args *ga;
1247         struct obd_device       *obddev = class_exp2obd(exp);
1248         struct ldlm_res_id       res_id;
1249         union ldlm_policy_data policy = {
1250                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1251                                                  MDS_INODELOCK_UPDATE } };
1252         int                      rc = 0;
1253         __u64                    flags = LDLM_FL_HAS_INTENT;
1254         ENTRY;
1255
1256         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#llo\n",
1257                 (int)op_data->op_namelen, op_data->op_name,
1258                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1259
1260         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1261         req = mdc_intent_getattr_pack(exp, it, op_data);
1262         if (IS_ERR(req))
1263                 RETURN(PTR_ERR(req));
1264
1265         rc = obd_get_request_slot(&obddev->u.cli);
1266         if (rc != 0) {
1267                 ptlrpc_req_finished(req);
1268                 RETURN(rc);
1269         }
1270
1271         /* With Data-on-MDT the glimpse callback is needed too.
1272          * It is set here in advance but not in mdc_finish_enqueue()
1273          * to avoid possible races. It is safe to have glimpse handler
1274          * for non-DOM locks and costs nothing.*/
1275         if (minfo->mi_einfo.ei_cb_gl == NULL)
1276                 minfo->mi_einfo.ei_cb_gl = mdc_ldlm_glimpse_ast;
1277
1278         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1279                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1280         if (rc < 0) {
1281                 obd_put_request_slot(&obddev->u.cli);
1282                 ptlrpc_req_finished(req);
1283                 RETURN(rc);
1284         }
1285
1286         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1287         ga = ptlrpc_req_async_args(req);
1288         ga->ga_exp = exp;
1289         ga->ga_minfo = minfo;
1290
1291         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1292         ptlrpcd_add_req(req);
1293
1294         RETURN(0);
1295 }