Whamcloud - gitweb
LU-7925 llite: avoid clearing i_nlink for inodes in use
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #include <linux/module.h>
40
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <lustre_dlm.h>
44 #include <lustre_fid.h>
45 #include <lustre_intent.h>
46 #include <lustre_mdc.h>
47 #include <lustre_net.h>
48 #include <lustre_req_layout.h>
49 #include <lustre_swab.h>
50
51 #include "mdc_internal.h"
52
/*
 * Pairs an export with a md_enqueue_info.
 *
 * NOTE(review): the consumer of this structure is not visible in this part
 * of the file; presumably it carries state for an asynchronous getattr
 * enqueue -- confirm against the caller.
 */
struct mdc_getattr_args {
	struct obd_export	*ga_exp;	/* export */
	struct md_enqueue_info	*ga_minfo;	/* enqueue info */
};
57
58 int it_open_error(int phase, struct lookup_intent *it)
59 {
60         if (it_disposition(it, DISP_OPEN_LEASE)) {
61                 if (phase >= DISP_OPEN_LEASE)
62                         return it->d.lustre.it_status;
63                 else
64                         return 0;
65         }
66         if (it_disposition(it, DISP_OPEN_OPEN)) {
67                 if (phase >= DISP_OPEN_OPEN)
68                         return it->d.lustre.it_status;
69                 else
70                         return 0;
71         }
72
73         if (it_disposition(it, DISP_OPEN_CREATE)) {
74                 if (phase >= DISP_OPEN_CREATE)
75                         return it->d.lustre.it_status;
76                 else
77                         return 0;
78         }
79
80         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
81                 if (phase >= DISP_LOOKUP_EXECD)
82                         return it->d.lustre.it_status;
83                 else
84                         return 0;
85         }
86
87         if (it_disposition(it, DISP_IT_EXECD)) {
88                 if (phase >= DISP_IT_EXECD)
89                         return it->d.lustre.it_status;
90                 else
91                         return 0;
92         }
93         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
94                it->d.lustre.it_status);
95         LBUG();
96         return 0;
97 }
98 EXPORT_SYMBOL(it_open_error);
99
100 /* this must be called on a lockh that is known to have a referenced lock */
101 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
102                       __u64 *bits)
103 {
104         struct ldlm_lock *lock;
105         struct inode *new_inode = data;
106         ENTRY;
107
108         if(bits)
109                 *bits = 0;
110
111         if (!*lockh)
112                 RETURN(0);
113
114         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
115
116         LASSERT(lock != NULL);
117         lock_res_and_lock(lock);
118         if (lock->l_resource->lr_lvb_inode &&
119             lock->l_resource->lr_lvb_inode != data) {
120                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
121                 LASSERTF(old_inode->i_state & I_FREEING,
122                          "Found existing inode %p/%lu/%u state %lu in lock: "
123                          "setting data to %p/%lu/%u\n", old_inode,
124                          old_inode->i_ino, old_inode->i_generation,
125                          old_inode->i_state,
126                          new_inode, new_inode->i_ino, new_inode->i_generation);
127         }
128         lock->l_resource->lr_lvb_inode = new_inode;
129         if (bits)
130                 *bits = lock->l_policy_data.l_inodebits.bits;
131
132         unlock_res_and_lock(lock);
133         LDLM_LOCK_PUT(lock);
134
135         RETURN(0);
136 }
137
138 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
139                               const struct lu_fid *fid, enum ldlm_type type,
140                               union ldlm_policy_data *policy,
141                               enum ldlm_mode mode, struct lustre_handle *lockh)
142 {
143         struct ldlm_res_id res_id;
144         enum ldlm_mode rc;
145         ENTRY;
146
147         fid_build_reg_res_name(fid, &res_id);
148         /* LU-4405: Clear bits not supported by server */
149         policy->l_inodebits.bits &= exp_connect_ibits(exp);
150         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
151                              &res_id, type, policy, mode, lockh, 0);
152         RETURN(rc);
153 }
154
155 int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
156                       union ldlm_policy_data *policy, enum ldlm_mode mode,
157                       enum ldlm_cancel_flags flags, void *opaque)
158 {
159         struct obd_device *obd = class_exp2obd(exp);
160         struct ldlm_res_id res_id;
161         int rc;
162
163         ENTRY;
164
165         fid_build_reg_res_name(fid, &res_id);
166         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
167                                              policy, mode, flags, opaque);
168         RETURN(rc);
169 }
170
171 int mdc_null_inode(struct obd_export *exp,
172                    const struct lu_fid *fid)
173 {
174         struct ldlm_res_id res_id;
175         struct ldlm_resource *res;
176         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
177         ENTRY;
178
179         LASSERTF(ns != NULL, "no namespace passed\n");
180
181         fid_build_reg_res_name(fid, &res_id);
182
183         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
184         if (IS_ERR(res))
185                 RETURN(0);
186
187         lock_res(res);
188         res->lr_lvb_inode = NULL;
189         unlock_res(res);
190
191         ldlm_resource_putref(res);
192         RETURN(0);
193 }
194
195 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
196 {
197         /* Don't hold error requests for replay. */
198         if (req->rq_replay) {
199                 spin_lock(&req->rq_lock);
200                 req->rq_replay = 0;
201                 spin_unlock(&req->rq_lock);
202         }
203         if (rc && req->rq_transno != 0) {
204                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
205                 LBUG();
206         }
207 }
208
209 /* Save a large LOV EA into the request buffer so that it is available
210  * for replay.  We don't do this in the initial request because the
211  * original request doesn't need this buffer (at most it sends just the
212  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
213  * buffer and may also be difficult to allocate and save a very large
214  * request buffer for each open. (bug 5707)
215  *
216  * OOM here may cause recovery failure if lmm is needed (only for the
217  * original open if the MDS crashed just when this client also OOM'd)
218  * but this is incredibly unlikely, and questionable whether the client
219  * could do MDS recovery under OOM anyways... */
220 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
221                                 struct mdt_body *body)
222 {
223         int     rc;
224
225         /* FIXME: remove this explicit offset. */
226         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
227                                         body->mbo_eadatasize);
228         if (rc) {
229                 CERROR("Can't enlarge segment %d size to %d\n",
230                        DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
231                 body->mbo_valid &= ~OBD_MD_FLEASIZE;
232                 body->mbo_eadatasize = 0;
233         }
234 }
235
236 static struct ptlrpc_request *
237 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
238                      struct md_op_data *op_data)
239 {
240         struct ptlrpc_request   *req;
241         struct obd_device       *obddev = class_exp2obd(exp);
242         struct ldlm_intent      *lit;
243         const void              *lmm = op_data->op_data;
244         __u32                    lmmsize = op_data->op_data_size;
245         struct list_head         cancels = LIST_HEAD_INIT(cancels);
246         int                      count = 0;
247         enum ldlm_mode           mode;
248         int                      rc;
249         ENTRY;
250
251         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
252
253         /* XXX: openlock is not cancelled for cross-refs. */
254         /* If inode is known, cancel conflicting OPEN locks. */
255         if (fid_is_sane(&op_data->op_fid2)) {
256                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
257                         if (it->it_flags & FMODE_WRITE)
258                                 mode = LCK_EX;
259                         else
260                                 mode = LCK_PR;
261                 } else {
262                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
263                                 mode = LCK_CW;
264 #ifdef FMODE_EXEC
265                         else if (it->it_flags & FMODE_EXEC)
266                                 mode = LCK_PR;
267 #endif
268                         else
269                                 mode = LCK_CR;
270                 }
271                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
272                                                 &cancels, mode,
273                                                 MDS_INODELOCK_OPEN);
274         }
275
276         /* If CREATE, cancel parent's UPDATE lock. */
277         if (it->it_op & IT_CREAT)
278                 mode = LCK_EX;
279         else
280                 mode = LCK_CR;
281         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
282                                          &cancels, mode,
283                                          MDS_INODELOCK_UPDATE);
284
285         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
286                                    &RQF_LDLM_INTENT_OPEN);
287         if (req == NULL) {
288                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
289                 RETURN(ERR_PTR(-ENOMEM));
290         }
291
292         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
293                              op_data->op_namelen + 1);
294         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
295                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
296
297         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
298         if (rc < 0) {
299                 ptlrpc_request_free(req);
300                 RETURN(ERR_PTR(rc));
301         }
302
303         spin_lock(&req->rq_lock);
304         req->rq_replay = req->rq_import->imp_replayable;
305         spin_unlock(&req->rq_lock);
306
307         /* pack the intent */
308         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
309         lit->opc = (__u64)it->it_op;
310
311         /* pack the intended request */
312         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
313                       lmmsize);
314
315         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
316                              obddev->u.cli.cl_max_mds_easize);
317
318         /* for remote client, fetch remote perm for current user */
319         if (client_is_remote(exp))
320                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
321                                      sizeof(struct mdt_remote_perm));
322         ptlrpc_request_set_replen(req);
323         return req;
324 }
325
326 static struct ptlrpc_request *
327 mdc_intent_getxattr_pack(struct obd_export *exp,
328                          struct lookup_intent *it,
329                          struct md_op_data *op_data)
330 {
331         struct ptlrpc_request   *req;
332         struct ldlm_intent      *lit;
333         int                     rc, count = 0;
334         __u32                   maxdata;
335         struct list_head        cancels = LIST_HEAD_INIT(cancels);
336
337         ENTRY;
338
339         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
340                                         &RQF_LDLM_INTENT_GETXATTR);
341         if (req == NULL)
342                 RETURN(ERR_PTR(-ENOMEM));
343
344         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
345         if (rc) {
346                 ptlrpc_request_free(req);
347                 RETURN(ERR_PTR(rc));
348         }
349
350         /* pack the intent */
351         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
352         lit->opc = IT_GETXATTR;
353
354         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
355
356         /* pack the intended request */
357         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
358                       0);
359
360         req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
361                                 RCL_SERVER, maxdata);
362
363         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS,
364                                 RCL_SERVER, maxdata);
365
366         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
367                                 RCL_SERVER, maxdata);
368
369         ptlrpc_request_set_replen(req);
370
371         RETURN(req);
372 }
373
374 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
375                                                      struct lookup_intent *it,
376                                                      struct md_op_data *op_data)
377 {
378         struct ptlrpc_request *req;
379         struct obd_device     *obddev = class_exp2obd(exp);
380         struct ldlm_intent    *lit;
381         int                    rc;
382         ENTRY;
383
384         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
385                                    &RQF_LDLM_INTENT_UNLINK);
386         if (req == NULL)
387                 RETURN(ERR_PTR(-ENOMEM));
388
389         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
390                              op_data->op_namelen + 1);
391
392         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
393         if (rc) {
394                 ptlrpc_request_free(req);
395                 RETURN(ERR_PTR(rc));
396         }
397
398         /* pack the intent */
399         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
400         lit->opc = (__u64)it->it_op;
401
402         /* pack the intended request */
403         mdc_unlink_pack(req, op_data);
404
405         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
406                              obddev->u.cli.cl_default_mds_easize);
407         ptlrpc_request_set_replen(req);
408         RETURN(req);
409 }
410
411 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
412                                                       struct lookup_intent *it,
413                                                       struct md_op_data *op_data)
414 {
415         struct ptlrpc_request   *req;
416         struct obd_device       *obddev = class_exp2obd(exp);
417         u64                      valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
418                                          OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
419                                          OBD_MD_MEA |
420                                          (client_is_remote(exp) ?
421                                           OBD_MD_FLRMTPERM : OBD_MD_FLACL);
422         struct ldlm_intent      *lit;
423         int                      rc;
424         __u32                    easize;
425         ENTRY;
426
427         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
428                                    &RQF_LDLM_INTENT_GETATTR);
429         if (req == NULL)
430                 RETURN(ERR_PTR(-ENOMEM));
431
432         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
433                              op_data->op_namelen + 1);
434
435         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
436         if (rc) {
437                 ptlrpc_request_free(req);
438                 RETURN(ERR_PTR(rc));
439         }
440
441         /* pack the intent */
442         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
443         lit->opc = (__u64)it->it_op;
444
445         if (obddev->u.cli.cl_default_mds_easize > 0)
446                 easize = obddev->u.cli.cl_default_mds_easize;
447         else
448                 easize = obddev->u.cli.cl_max_mds_easize;
449
450         /* pack the intended request */
451         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
452
453         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
454         if (client_is_remote(exp))
455                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
456                                      sizeof(struct mdt_remote_perm));
457         ptlrpc_request_set_replen(req);
458         RETURN(req);
459 }
460
461 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
462                                                      struct lookup_intent *it,
463                                                      struct md_op_data *unused)
464 {
465         struct obd_device     *obd = class_exp2obd(exp);
466         struct ptlrpc_request *req;
467         struct ldlm_intent    *lit;
468         struct layout_intent  *layout;
469         int rc;
470         ENTRY;
471
472         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
473                                 &RQF_LDLM_INTENT_LAYOUT);
474         if (req == NULL)
475                 RETURN(ERR_PTR(-ENOMEM));
476
477         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
478         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
479         if (rc) {
480                 ptlrpc_request_free(req);
481                 RETURN(ERR_PTR(rc));
482         }
483
484         /* pack the intent */
485         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
486         lit->opc = (__u64)it->it_op;
487
488         /* pack the layout intent request */
489         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
490         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
491          * set for replication */
492         layout->li_opc = LAYOUT_INTENT_ACCESS;
493
494         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
495                              obd->u.cli.cl_default_mds_easize);
496         ptlrpc_request_set_replen(req);
497         RETURN(req);
498 }
499
500 static struct ptlrpc_request *
501 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
502 {
503         struct ptlrpc_request *req;
504         int rc;
505         ENTRY;
506
507         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
508         if (req == NULL)
509                 RETURN(ERR_PTR(-ENOMEM));
510
511         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
512         if (rc) {
513                 ptlrpc_request_free(req);
514                 RETURN(ERR_PTR(rc));
515         }
516
517         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
518         ptlrpc_request_set_replen(req);
519         RETURN(req);
520 }
521
522 static int mdc_finish_enqueue(struct obd_export *exp,
523                               struct ptlrpc_request *req,
524                               struct ldlm_enqueue_info *einfo,
525                               struct lookup_intent *it,
526                               struct lustre_handle *lockh,
527                               int rc)
528 {
529         struct req_capsule  *pill = &req->rq_pill;
530         struct ldlm_request *lockreq;
531         struct ldlm_reply   *lockrep;
532         struct lustre_intent_data *intent = &it->d.lustre;
533         struct ldlm_lock    *lock;
534         void                *lvb_data = NULL;
535         __u32                lvb_len = 0;
536         ENTRY;
537
538         LASSERT(rc >= 0);
539         /* Similarly, if we're going to replay this request, we don't want to
540          * actually get a lock, just perform the intent. */
541         if (req->rq_transno || req->rq_replay) {
542                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
543                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
544         }
545
546         if (rc == ELDLM_LOCK_ABORTED) {
547                 einfo->ei_mode = 0;
548                 memset(lockh, 0, sizeof(*lockh));
549                 rc = 0;
550         } else { /* rc = 0 */
551                 lock = ldlm_handle2lock(lockh);
552                 LASSERT(lock != NULL);
553
554                 /* If the server gave us back a different lock mode, we should
555                  * fix up our variables. */
556                 if (lock->l_req_mode != einfo->ei_mode) {
557                         ldlm_lock_addref(lockh, lock->l_req_mode);
558                         ldlm_lock_decref(lockh, einfo->ei_mode);
559                         einfo->ei_mode = lock->l_req_mode;
560                 }
561                 LDLM_LOCK_PUT(lock);
562         }
563
564         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
565         LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */
566
567         intent->it_disposition = (int)lockrep->lock_policy_res1;
568         intent->it_status = (int)lockrep->lock_policy_res2;
569         intent->it_lock_mode = einfo->ei_mode;
570         intent->it_lock_handle = lockh->cookie;
571         intent->it_data = req;
572
573         /* Technically speaking rq_transno must already be zero if
574          * it_status is in error, so the check is a bit redundant */
575         if ((!req->rq_transno || intent->it_status < 0) && req->rq_replay)
576                 mdc_clear_replay_flag(req, intent->it_status);
577
578         /* If we're doing an IT_OPEN which did not result in an actual
579          * successful open, then we need to remove the bit which saves
580          * this request for unconditional replay.
581          *
582          * It's important that we do this first!  Otherwise we might exit the
583          * function without doing so, and try to replay a failed create
584          * (bug 3440) */
585         if (it->it_op & IT_OPEN && req->rq_replay &&
586             (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0))
587                 mdc_clear_replay_flag(req, intent->it_status);
588
589         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
590                   it->it_op, intent->it_disposition, intent->it_status);
591
592         /* We know what to expect, so we do any byte flipping required here */
593         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
594                 struct mdt_body *body;
595
596                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
597                 if (body == NULL) {
598                         CERROR ("Can't swab mdt_body\n");
599                         RETURN (-EPROTO);
600                 }
601
602                 if (it_disposition(it, DISP_OPEN_OPEN) &&
603                     !it_open_error(DISP_OPEN_OPEN, it)) {
604                         /*
605                          * If this is a successful OPEN request, we need to set
606                          * replay handler and data early, so that if replay
607                          * happens immediately after swabbing below, new reply
608                          * is swabbed by that handler correctly.
609                          */
610                         mdc_set_open_replay_data(NULL, NULL, it);
611                 }
612
613                 if (body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) {
614                         void *eadata;
615
616                         mdc_update_max_ea_from_body(exp, body);
617
618                         /*
619                          * The eadata is opaque; just check that it is there.
620                          * Eventually, obd_unpackmd() will check the contents.
621                          */
622                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
623                                                         body->mbo_eadatasize);
624                         if (eadata == NULL)
625                                 RETURN(-EPROTO);
626
627                         /* save lvb data and length in case this is for layout
628                          * lock */
629                         lvb_data = eadata;
630                         lvb_len = body->mbo_eadatasize;
631
632                         /*
633                          * We save the reply LOV EA in case we have to replay a
634                          * create for recovery.  If we didn't allocate a large
635                          * enough request buffer above we need to reallocate it
636                          * here to hold the actual LOV EA.
637                          *
638                          * To not save LOV EA if request is not going to replay
639                          * (for example error one).
640                          */
641                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
642                                 void *lmm;
643                                 if (req_capsule_get_size(pill, &RMF_EADATA,
644                                                          RCL_CLIENT) <
645                                     body->mbo_eadatasize)
646                                         mdc_realloc_openmsg(req, body);
647                                 else
648                                         req_capsule_shrink(pill, &RMF_EADATA,
649                                                            body->mbo_eadatasize,
650                                                            RCL_CLIENT);
651
652                                 req_capsule_set_size(pill, &RMF_EADATA,
653                                                      RCL_CLIENT,
654                                                      body->mbo_eadatasize);
655
656                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
657                                 if (lmm)
658                                         memcpy(lmm, eadata,
659                                                body->mbo_eadatasize);
660                         }
661                 }
662
663                 if (body->mbo_valid & OBD_MD_FLRMTPERM) {
664                         struct mdt_remote_perm *perm;
665
666                         LASSERT(client_is_remote(exp));
667                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
668                                                 lustre_swab_mdt_remote_perm);
669                         if (perm == NULL)
670                                 RETURN(-EPROTO);
671                 }
672         } else if (it->it_op & IT_LAYOUT) {
673                 /* maybe the lock was granted right away and layout
674                  * is packed into RMF_DLM_LVB of req */
675                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
676                 if (lvb_len > 0) {
677                         lvb_data = req_capsule_server_sized_get(pill,
678                                                         &RMF_DLM_LVB, lvb_len);
679                         if (lvb_data == NULL)
680                                 RETURN(-EPROTO);
681                 }
682         }
683
684         /* fill in stripe data for layout lock.
685          * LU-6581: trust layout data only if layout lock is granted. The MDT
686          * has stopped sending layout unless the layout lock is granted. The
687          * client still does this checking in case it's talking with an old
688          * server. - Jinshan */
689         lock = ldlm_handle2lock(lockh);
690         if (lock != NULL && ldlm_has_layout(lock) && lvb_data != NULL &&
691             !(lockrep->lock_flags & LDLM_FL_BLOCKED_MASK)) {
692                 void *lmm;
693
694                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d\n",
695                         ldlm_it2str(it->it_op), lvb_len);
696
697                 OBD_ALLOC_LARGE(lmm, lvb_len);
698                 if (lmm == NULL) {
699                         LDLM_LOCK_PUT(lock);
700                         RETURN(-ENOMEM);
701                 }
702                 memcpy(lmm, lvb_data, lvb_len);
703
704                 /* install lvb_data */
705                 lock_res_and_lock(lock);
706                 if (lock->l_lvb_data == NULL) {
707                         lock->l_lvb_type = LVB_T_LAYOUT;
708                         lock->l_lvb_data = lmm;
709                         lock->l_lvb_len = lvb_len;
710                         lmm = NULL;
711                 }
712                 unlock_res_and_lock(lock);
713                 if (lmm != NULL)
714                         OBD_FREE_LARGE(lmm, lvb_len);
715         }
716         if (lock != NULL)
717                 LDLM_LOCK_PUT(lock);
718
719         RETURN(rc);
720 }
721
722 /* We always reserve enough space in the reply packet for a stripe MD, because
723  * we don't know in advance the file type. */
724 int mdc_enqueue(struct obd_export *exp,
725                 struct ldlm_enqueue_info *einfo,
726                 const union ldlm_policy_data *policy,
727                 struct lookup_intent *it, struct md_op_data *op_data,
728                 struct lustre_handle *lockh, __u64 extra_lock_flags)
729 {
730         struct obd_device *obddev = class_exp2obd(exp);
731         struct ptlrpc_request *req = NULL;
732         __u64 flags, saved_flags = extra_lock_flags;
733         struct ldlm_res_id res_id;
734         static const union ldlm_policy_data lookup_policy = {
735                                   .l_inodebits = { MDS_INODELOCK_LOOKUP } };
736         static const union ldlm_policy_data update_policy = {
737                                   .l_inodebits = { MDS_INODELOCK_UPDATE } };
738         static const union ldlm_policy_data layout_policy = {
739                                   .l_inodebits = { MDS_INODELOCK_LAYOUT } };
740         static const union ldlm_policy_data getxattr_policy = {
741                                   .l_inodebits = { MDS_INODELOCK_XATTR } };
742         int generation, resends = 0;
743         struct ldlm_reply *lockrep;
744         enum lvb_type lvb_type = 0;
745         int rc;
746         ENTRY;
747
748         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
749                  einfo->ei_type);
750         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
751
752         if (it != NULL) {
753                 LASSERT(policy == NULL);
754
755                 saved_flags |= LDLM_FL_HAS_INTENT;
756                 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
757                         policy = &update_policy;
758                 else if (it->it_op & IT_LAYOUT)
759                         policy = &layout_policy;
760                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
761                         policy = &getxattr_policy;
762                 else
763                         policy = &lookup_policy;
764         }
765
766         generation = obddev->u.cli.cl_import->imp_generation;
767 resend:
768         flags = saved_flags;
769         if (it == NULL) {
770                 /* The only way right now is FLOCK. */
771                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
772                          einfo->ei_type);
773                 res_id.name[3] = LDLM_FLOCK;
774         } else if (it->it_op & IT_OPEN) {
775                 req = mdc_intent_open_pack(exp, it, op_data);
776         } else if (it->it_op & IT_UNLINK) {
777                 req = mdc_intent_unlink_pack(exp, it, op_data);
778         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
779                 req = mdc_intent_getattr_pack(exp, it, op_data);
780         } else if (it->it_op & IT_READDIR) {
781                 req = mdc_enqueue_pack(exp, 0);
782         } else if (it->it_op & IT_LAYOUT) {
783                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
784                         RETURN(-EOPNOTSUPP);
785                 req = mdc_intent_layout_pack(exp, it, op_data);
786                 lvb_type = LVB_T_LAYOUT;
787         } else if (it->it_op & IT_GETXATTR) {
788                 req = mdc_intent_getxattr_pack(exp, it, op_data);
789         } else {
790                 LBUG();
791                 RETURN(-EINVAL);
792         }
793
794         if (IS_ERR(req))
795                 RETURN(PTR_ERR(req));
796
797         if (resends) {
798                 req->rq_generation_set = 1;
799                 req->rq_import_generation = generation;
800                 req->rq_sent = cfs_time_current_sec() + resends;
801         }
802
803         /* It is important to obtain modify RPC slot first (if applicable), so
804          * that threads that are waiting for a modify RPC slot are not polluting
805          * our rpcs in flight counter.
806          * We do not do flock request limiting, though */
807         if (it) {
808                 mdc_get_mod_rpc_slot(req, it);
809                 rc = obd_get_request_slot(&obddev->u.cli);
810                 if (rc != 0) {
811                         mdc_put_mod_rpc_slot(req, it);
812                         mdc_clear_replay_flag(req, 0);
813                         ptlrpc_req_finished(req);
814                         RETURN(rc);
815                 }
816         }
817
818         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
819                               0, lvb_type, lockh, 0);
820         if (!it) {
821                 /* For flock requests we immediatelly return without further
822                    delay and let caller deal with the rest, since rest of
823                    this function metadata processing makes no sense for flock
824                    requests anyway. But in case of problem during comms with
825                    Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
826                    can not rely on caller and this mainly for F_UNLCKs
827                    (explicits or automatically generated by Kernel to clean
828                    current FLocks upon exit) that can't be trashed */
829                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
830                     (einfo->ei_type == LDLM_FLOCK) &&
831                     (einfo->ei_mode == LCK_NL))
832                         goto resend;
833                 RETURN(rc);
834         }
835
836         obd_put_request_slot(&obddev->u.cli);
837         mdc_put_mod_rpc_slot(req, it);
838
839         if (rc < 0) {
840                 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
841                        obddev->obd_name, rc);
842
843                 mdc_clear_replay_flag(req, rc);
844                 ptlrpc_req_finished(req);
845                 RETURN(rc);
846         }
847
848         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
849         LASSERT(lockrep != NULL);
850
851         lockrep->lock_policy_res2 =
852                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
853
854         /* Retry infinitely when the server returns -EINPROGRESS for the
855          * intent operation, when server returns -EINPROGRESS for acquiring
856          * intent lock, we'll retry in after_reply(). */
857         if (it && (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
858                 mdc_clear_replay_flag(req, rc);
859                 ptlrpc_req_finished(req);
860                 resends++;
861
862                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
863                        obddev->obd_name, resends, it->it_op,
864                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
865
866                 if (generation == obddev->u.cli.cl_import->imp_generation) {
867                         goto resend;
868                 } else {
869                         CDEBUG(D_HA, "resend cross eviction\n");
870                         RETURN(-EIO);
871                 }
872         }
873
874         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
875         if (rc < 0) {
876                 if (lustre_handle_is_used(lockh)) {
877                         ldlm_lock_decref(lockh, einfo->ei_mode);
878                         memset(lockh, 0, sizeof(*lockh));
879                 }
880                 ptlrpc_req_finished(req);
881
882                 it->d.lustre.it_lock_handle = 0;
883                 it->d.lustre.it_lock_mode = 0;
884                 it->d.lustre.it_data = NULL;
885         }
886
887         RETURN(rc);
888 }
889
890 static int mdc_finish_intent_lock(struct obd_export *exp,
891                                   struct ptlrpc_request *request,
892                                   struct md_op_data *op_data,
893                                   struct lookup_intent *it,
894                                   struct lustre_handle *lockh)
895 {
896         struct lustre_handle old_lock;
897         struct mdt_body *mdt_body;
898         struct ldlm_lock *lock;
899         int rc;
900         ENTRY;
901
902         LASSERT(request != NULL);
903         LASSERT(request != LP_POISON);
904         LASSERT(request->rq_repmsg != LP_POISON);
905
906         if (it->it_op & IT_READDIR)
907                 RETURN(0);
908
909         if (!it_disposition(it, DISP_IT_EXECD)) {
910                 /* The server failed before it even started executing the
911                  * intent, i.e. because it couldn't unpack the request. */
912                 LASSERT(it->d.lustre.it_status != 0);
913                 RETURN(it->d.lustre.it_status);
914         }
915         rc = it_open_error(DISP_IT_EXECD, it);
916         if (rc)
917                 RETURN(rc);
918
919         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
920         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
921
922         rc = it_open_error(DISP_LOOKUP_EXECD, it);
923         if (rc)
924                 RETURN(rc);
925
926         /* keep requests around for the multiple phases of the call
927          * this shows the DISP_XX must guarantee we make it into the call
928          */
929         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
930             it_disposition(it, DISP_OPEN_CREATE) &&
931             !it_open_error(DISP_OPEN_CREATE, it)) {
932                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
933                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
934         }
935         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
936             it_disposition(it, DISP_OPEN_OPEN) &&
937             !it_open_error(DISP_OPEN_OPEN, it)) {
938                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
939                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
940                 /* BUG 11546 - eviction in the middle of open rpc processing */
941                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
942         }
943
944         if (it->it_op & IT_CREAT) {
945                 /* XXX this belongs in ll_create_it */
946         } else if (it->it_op == IT_OPEN) {
947                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
948         } else {
949                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
950         }
951
952         /* If we already have a matching lock, then cancel the new
953          * one.  We have to set the data here instead of in
954          * mdc_enqueue, because we need to use the child's inode as
955          * the l_ast_data to match, and that's not available until
956          * intent_finish has performed the iget().) */
957         lock = ldlm_handle2lock(lockh);
958         if (lock) {
959                 union ldlm_policy_data policy = lock->l_policy_data;
960                 LDLM_DEBUG(lock, "matching against this");
961
962                 LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
963                                          &lock->l_resource->lr_name),
964                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
965                          PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
966                 LDLM_LOCK_PUT(lock);
967
968                 memcpy(&old_lock, lockh, sizeof(*lockh));
969                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
970                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
971                         ldlm_lock_decref_and_cancel(lockh,
972                                                     it->d.lustre.it_lock_mode);
973                         memcpy(lockh, &old_lock, sizeof(old_lock));
974                         it->d.lustre.it_lock_handle = lockh->cookie;
975                 }
976         }
977         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
978                 (int)op_data->op_namelen, op_data->op_name,
979                 ldlm_it2str(it->it_op), it->d.lustre.it_status,
980                 it->d.lustre.it_disposition, rc);
981         RETURN(rc);
982 }
983
984 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
985                         struct lu_fid *fid, __u64 *bits)
986 {
987         /* We could just return 1 immediately, but since we should only
988          * be called in revalidate_it if we already have a lock, let's
989          * verify that. */
990         struct ldlm_res_id res_id;
991         struct lustre_handle lockh;
992         union ldlm_policy_data policy;
993         enum ldlm_mode mode;
994         ENTRY;
995
996         if (it->d.lustre.it_lock_handle) {
997                 lockh.cookie = it->d.lustre.it_lock_handle;
998                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
999         } else {
1000                 fid_build_reg_res_name(fid, &res_id);
1001                 switch (it->it_op) {
1002                 case IT_GETATTR:
1003                         /* File attributes are held under multiple bits:
1004                          * nlink is under lookup lock, size and times are
1005                          * under UPDATE lock and recently we've also got
1006                          * a separate permissions lock for owner/group/acl that
1007                          * were protected by lookup lock before.
1008                          * Getattr must provide all of that information,
1009                          * so we need to ensure we have all of those locks.
1010                          * Unfortunately, if the bits are split across multiple
1011                          * locks, there's no easy way to match all of them here,
1012                          * so an extra RPC would be performed to fetch all
1013                          * of those bits at once for now. */
1014                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
1015                          * but for old MDTs (< 2.4), permission is covered
1016                          * by LOOKUP lock, so it needs to match all bits here.*/
1017                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
1018                                                   MDS_INODELOCK_LOOKUP |
1019                                                   MDS_INODELOCK_PERM;
1020                         break;
1021                 case IT_READDIR:
1022                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1023                         break;
1024                 case IT_LAYOUT:
1025                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1026                         break;
1027                 default:
1028                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1029                         break;
1030                 }
1031
1032                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1033                                       LDLM_IBITS, &policy,
1034                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1035                                       &lockh);
1036         }
1037
1038         if (mode) {
1039                 it->d.lustre.it_lock_handle = lockh.cookie;
1040                 it->d.lustre.it_lock_mode = mode;
1041         } else {
1042                 it->d.lustre.it_lock_handle = 0;
1043                 it->d.lustre.it_lock_mode = 0;
1044         }
1045
1046         RETURN(!!mode);
1047 }
1048
1049 /*
1050  * This long block is all about fixing up the lock and request state
1051  * so that it is correct as of the moment _before_ the operation was
1052  * applied; that way, the VFS will think that everything is normal and
1053  * call Lustre's regular VFS methods.
1054  *
1055  * If we're performing a creation, that means that unless the creation
1056  * failed with EEXIST, we should fake up a negative dentry.
1057  *
1058  * For everything else, we want to lookup to succeed.
1059  *
1060  * One additional note: if CREATE or OPEN succeeded, we add an extra
1061  * reference to the request because we need to keep it around until
1062  * ll_create/ll_open gets called.
1063  *
1064  * The server will return to us, in it_disposition, an indication of
1065  * exactly what d.lustre.it_status refers to.
1066  *
1067  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
1068  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1069  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1070  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1071  * was successful.
1072  *
1073  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
1074  * child lookup.
1075  */
1076 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1077                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1078                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1079 {
1080         struct ldlm_enqueue_info einfo = {
1081                 .ei_type        = LDLM_IBITS,
1082                 .ei_mode        = it_to_lock_mode(it),
1083                 .ei_cb_bl       = cb_blocking,
1084                 .ei_cb_cp       = ldlm_completion_ast,
1085         };
1086         struct lustre_handle lockh;
1087         int rc = 0;
1088         ENTRY;
1089         LASSERT(it);
1090
1091         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1092                 ", intent: %s flags %#"LPF64"o\n", (int)op_data->op_namelen,
1093                 op_data->op_name, PFID(&op_data->op_fid2),
1094                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1095                 it->it_flags);
1096
1097         lockh.cookie = 0;
1098         if (fid_is_sane(&op_data->op_fid2) &&
1099             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1100                 /* We could just return 1 immediately, but since we should only
1101                  * be called in revalidate_it if we already have a lock, let's
1102                  * verify that. */
1103                 it->d.lustre.it_lock_handle = 0;
1104                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1105                 /* Only return failure if it was not GETATTR by cfid
1106                    (from inode_revalidate) */
1107                 if (rc || op_data->op_namelen != 0)
1108                         RETURN(rc);
1109         }
1110
1111         /* For case if upper layer did not alloc fid, do it now. */
1112         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1113                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1114                 if (rc < 0) {
1115                         CERROR("Can't alloc new fid, rc %d\n", rc);
1116                         RETURN(rc);
1117                 }
1118         }
1119
1120         rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
1121                          extra_lock_flags);
1122         if (rc < 0)
1123                 RETURN(rc);
1124
1125         *reqp = it->d.lustre.it_data;
1126         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1127         RETURN(rc);
1128 }
1129
1130 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1131                                               struct ptlrpc_request *req,
1132                                               void *args, int rc)
1133 {
1134         struct mdc_getattr_args  *ga = args;
1135         struct obd_export        *exp = ga->ga_exp;
1136         struct md_enqueue_info   *minfo = ga->ga_minfo;
1137         struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
1138         struct lookup_intent     *it;
1139         struct lustre_handle     *lockh;
1140         struct obd_device        *obddev;
1141         struct ldlm_reply        *lockrep;
1142         __u64                     flags = LDLM_FL_HAS_INTENT;
1143         ENTRY;
1144
1145         it    = &minfo->mi_it;
1146         lockh = &minfo->mi_lockh;
1147
1148         obddev = class_exp2obd(exp);
1149
1150         obd_put_request_slot(&obddev->u.cli);
1151         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1152                 rc = -ETIMEDOUT;
1153
1154         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1155                                    &flags, NULL, 0, lockh, rc);
1156         if (rc < 0) {
1157                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1158                 mdc_clear_replay_flag(req, rc);
1159                 GOTO(out, rc);
1160         }
1161
1162         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1163         LASSERT(lockrep != NULL);
1164
1165         lockrep->lock_policy_res2 =
1166                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1167
1168         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1169         if (rc)
1170                 GOTO(out, rc);
1171
1172         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1173         EXIT;
1174
1175 out:
1176         minfo->mi_cb(req, minfo, rc);
1177         return 0;
1178 }
1179
1180 int mdc_intent_getattr_async(struct obd_export *exp,
1181                              struct md_enqueue_info *minfo)
1182 {
1183         struct md_op_data       *op_data = &minfo->mi_data;
1184         struct lookup_intent    *it = &minfo->mi_it;
1185         struct ptlrpc_request   *req;
1186         struct mdc_getattr_args *ga;
1187         struct obd_device       *obddev = class_exp2obd(exp);
1188         struct ldlm_res_id       res_id;
1189         union ldlm_policy_data policy = {
1190                                 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1191                                                  MDS_INODELOCK_UPDATE } };
1192         int                      rc = 0;
1193         __u64                    flags = LDLM_FL_HAS_INTENT;
1194         ENTRY;
1195
1196         CDEBUG(D_DLMTRACE, "name: %.*s in inode "DFID", intent: %s flags %#"
1197                 LPF64"o\n",
1198                 (int)op_data->op_namelen, op_data->op_name,
1199                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1200
1201         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1202         req = mdc_intent_getattr_pack(exp, it, op_data);
1203         if (IS_ERR(req))
1204                 RETURN(PTR_ERR(req));
1205
1206         rc = obd_get_request_slot(&obddev->u.cli);
1207         if (rc != 0) {
1208                 ptlrpc_req_finished(req);
1209                 RETURN(rc);
1210         }
1211
1212         rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
1213                               &flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1214         if (rc < 0) {
1215                 obd_put_request_slot(&obddev->u.cli);
1216                 ptlrpc_req_finished(req);
1217                 RETURN(rc);
1218         }
1219
1220         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1221         ga = ptlrpc_req_async_args(req);
1222         ga->ga_exp = exp;
1223         ga->ga_minfo = minfo;
1224
1225         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1226         ptlrpcd_add_req(req);
1227
1228         RETURN(0);
1229 }