Whamcloud - gitweb
Branch b1_6
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_MDC
29
30 #ifdef __KERNEL__
31 # include <linux/module.h>
32 # include <linux/pagemap.h>
33 # include <linux/miscdevice.h>
34 # include <linux/init.h>
35 #else
36 # include <liblustre.h>
37 #endif
38
39 #include <obd_class.h>
40 #include <lustre_dlm.h>
41 #include <lprocfs_status.h>
42 #include "mdc_internal.h"
43
44 int it_disposition(struct lookup_intent *it, int flag)
45 {
46         return it->d.lustre.it_disposition & flag;
47 }
48 EXPORT_SYMBOL(it_disposition);
49
50 void it_set_disposition(struct lookup_intent *it, int flag)
51 {
52         it->d.lustre.it_disposition |= flag;
53 }
54 EXPORT_SYMBOL(it_set_disposition);
55
56 void it_clear_disposition(struct lookup_intent *it, int flag)
57 {
58         it->d.lustre.it_disposition &= ~flag;
59 }
60 EXPORT_SYMBOL(it_clear_disposition);
61
62 static int it_to_lock_mode(struct lookup_intent *it)
63 {
64         /* CREAT needs to be tested before open (both could be set) */
65         if (it->it_op & IT_CREAT)
66                 return LCK_CW;
67         else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_LOOKUP))
68                 return LCK_CR;
69
70         LBUG();
71         RETURN(-EINVAL);
72 }
73
74 int it_open_error(int phase, struct lookup_intent *it)
75 {
76         if (it_disposition(it, DISP_OPEN_OPEN)) {
77                 if (phase >= DISP_OPEN_OPEN)
78                         return it->d.lustre.it_status;
79                 else
80                         return 0;
81         }
82
83         if (it_disposition(it, DISP_OPEN_CREATE)) {
84                 if (phase >= DISP_OPEN_CREATE)
85                         return it->d.lustre.it_status;
86                 else
87                         return 0;
88         }
89
90         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
91                 if (phase >= DISP_LOOKUP_EXECD)
92                         return it->d.lustre.it_status;
93                 else
94                         return 0;
95         }
96
97         if (it_disposition(it, DISP_IT_EXECD)) {
98                 if (phase >= DISP_IT_EXECD)
99                         return it->d.lustre.it_status;
100                 else
101                         return 0;
102         }
103         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
104                it->d.lustre.it_status);
105         LBUG();
106         return 0;
107 }
108 EXPORT_SYMBOL(it_open_error);
109
110 /* this must be called on a lockh that is known to have a referenced lock */
111 void mdc_set_lock_data(__u64 *l, void *data)
112 {
113         struct ldlm_lock *lock;
114         struct lustre_handle *lockh = (struct lustre_handle *)l;
115         ENTRY;
116
117         if (!*l) {
118                 EXIT;
119                 return;
120         }
121
122         lock = ldlm_handle2lock(lockh);
123
124         LASSERT(lock != NULL);
125         lock_res_and_lock(lock);
126 #ifdef __KERNEL__
127         if (lock->l_ast_data && lock->l_ast_data != data) {
128                 struct inode *new_inode = data;
129                 struct inode *old_inode = lock->l_ast_data;
130                 LASSERTF(old_inode->i_state & I_FREEING,
131                          "Found existing inode %p/%lu/%u state %lu in lock: "
132                          "setting data to %p/%lu/%u\n", old_inode,
133                          old_inode->i_ino, old_inode->i_generation,
134                          old_inode->i_state,
135                          new_inode, new_inode->i_ino, new_inode->i_generation);
136         }
137 #endif
138         lock->l_ast_data = data;
139         unlock_res_and_lock(lock);
140         LDLM_LOCK_PUT(lock);
141
142         EXIT;
143 }
144 EXPORT_SYMBOL(mdc_set_lock_data);
145
146 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid, 
147                       ldlm_iterator_t it, void *data)
148 {
149         struct ldlm_res_id res_id = { .name = {0} };
150         ENTRY;
151
152         res_id.name[0] = fid->id;
153         res_id.name[1] = fid->generation;
154
155         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
156                               it, data);
157
158         EXIT;
159         return 0;
160 }
161
162 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
163 {
164         /* Don't hold error requests for replay. */
165         if (req->rq_replay) {
166                 spin_lock(&req->rq_lock);
167                 req->rq_replay = 0;
168                 spin_unlock(&req->rq_lock);
169         }
170         if (rc && req->rq_transno != 0) {
171                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
172                 LBUG();
173         }
174 }
175
/* Return the smallest power of two strictly greater than @val.
 *
 * Examples: 0 -> 1, 1 -> 2, 3 -> 4, 4 -> 8 (an exact power of two is
 * doubled, not returned unchanged).
 *
 * The computation is done on unsigned values so that it is well defined for
 * every input:
 *  - a negative @val is treated like 0 and yields 1 (a signed right shift
 *    of a negative value would never reach zero, so the loop could not
 *    terminate);
 *  - the result saturates at the largest power of two that fits in an int
 *    instead of overflowing the signed accumulator. */
static int round_up(int val)
{
        /* Largest power of two representable in an int (2^30 for 32 bits). */
        const unsigned int max_pow2 = (~0U >> 2) + 1;
        unsigned int rest = val > 0 ? (unsigned int)val : 0;
        unsigned int pow2 = 1;

        while (rest != 0 && pow2 < max_pow2) {
                rest >>= 1;
                pow2 <<= 1;
        }
        return (int)pow2;
}
185
186 /* Save a large LOV EA into the request buffer so that it is available
187  * for replay.  We don't do this in the initial request because the
188  * original request doesn't need this buffer (at most it sends just the
189  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
190  * buffer and may also be difficult to allocate and save a very large
191  * request buffer for each open. (bug 5707)
192  *
193  * OOM here may cause recovery failure if lmm is needed (only for the
194  * original open if the MDS crashed just when this client also OOM'd)
195  * but this is incredibly unlikely, and questionable whether the client
196  * could do MDS recovery under OOM anyways... */
197 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
198                                 struct mds_body *body, int size[6])
199 {
200         int new_size, old_size;
201         struct lustre_msg *new_msg;
202
203         /* save old size */
204         old_size = lustre_msg_size(lustre_request_magic(req), 6, size);
205
206         size[DLM_INTENT_REC_OFF + 2] = body->eadatasize;
207         new_size = lustre_msg_size(lustre_request_magic(req), 6, size);
208         OBD_ALLOC(new_msg, new_size);
209         if (new_msg != NULL) {
210                 struct lustre_msg *old_msg = req->rq_reqmsg;
211
212                 DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u",
213                           body->eadatasize);
214                 memcpy(new_msg, old_msg, old_size);
215                 lustre_msg_set_buflen(new_msg, DLM_INTENT_REC_OFF + 2,
216                                       body->eadatasize);
217
218                 spin_lock(&req->rq_lock);
219                 req->rq_reqmsg = new_msg;
220                 req->rq_reqlen = new_size;
221                 spin_unlock(&req->rq_lock);
222
223                 OBD_FREE(old_msg, old_size);
224         } else {
225                 body->valid &= ~OBD_MD_FLEASIZE;
226                 body->eadatasize = 0;
227         }
228 }
229
230 /* Build and send an intent lock enqueue RPC for @it and record the outcome
 * in the intent.
 *
 * The request layout depends on it->it_op: IT_OPEN (optionally with
 * O_JOIN_FILE), IT_UNLINK, IT_GETATTR/IT_LOOKUP or IT_READDIR.  Any other
 * operation raises LBUG() and returns -EINVAL.  einfo->ei_type must be
 * LDLM_IBITS (asserted).
 *
 * exp              - export used to reach the MDS
 * einfo            - enqueue parameters; ei_mode is rewritten if the lock was
 *                    aborted (set to 0) or came back with a different mode
 * it               - intent; on return d.lustre.it_disposition, it_status,
 *                    it_lock_mode and it_data (the request) are filled in
 * op_data          - fid1 names the resource to lock; fid2, name and namelen
 *                    are packed into the request
 * lockh            - receives the lock handle; zeroed when the enqueue
 *                    returns ELDLM_LOCK_ABORTED
 * lmm, lmmsize     - optional LOV EA passed to mdc_open_pack() for IT_OPEN
 * extra_lock_flags - OR-ed with LDLM_FL_HAS_INTENT for the enqueue
 *
 * Returns 0 on success (including the lock-aborted case), -ENOMEM if the
 * request cannot be prepared, the negative enqueue error (in which case the
 * request is finished here), or -EPROTO if the reply body or EA is missing.
 *
 * We always reserve enough space in the reply packet for a stripe MD, because
 * we don't know in advance the file type. */
232 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
233                 struct lookup_intent *it, struct mdc_op_data *op_data,
234                 struct lustre_handle *lockh, void *lmm, int lmmsize,
235                 int extra_lock_flags)
236 {
237         struct ptlrpc_request *req;
238         struct obd_device *obddev = class_exp2obd(exp);
239         struct ldlm_res_id res_id =
240                 { .name = {op_data->fid1.id, op_data->fid1.generation} };
241         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
242         struct ldlm_request *lockreq;
243         struct ldlm_intent *lit;
244         struct ldlm_reply *lockrep;
245         int size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
246                         [DLM_LOCKREQ_OFF]     = sizeof(*lockreq),
247                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
248                         0, 0, 0, 0 };
249         int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
250                            [DLM_LOCKREPLY_OFF]   = sizeof(*lockrep),
251                            [DLM_REPLY_REC_OFF]   = sizeof(struct mds_body),
252                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
253                                                    cl_max_mds_easize, 0 };
254         int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
            /* repbufcnt starts at 4; the OPEN, UNLINK and GETATTR/LOOKUP
             * branches each append one more reply buffer (making 5), while
             * READDIR resets it to 2.  The LASSERT before the reply is
             * unpacked relies on exactly these two outcomes. */
255         int repbufcnt = 4, rc;
256         void *eadata;
257         ENTRY;
258
259         LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
260 //        LDLM_DEBUG_NOLOCK("mdsintent=%s,name=%s,dir=%lu",
261 //                          ldlm_it2str(it->it_op), it_name, it_inode->i_ino);
262
263         if (it->it_op & IT_OPEN) {
264                 CFS_LIST_HEAD(cancels);
265                 int count = 0;
266                 int mode;
267                 
268                 it->it_create_mode |= S_IFREG;
269
270                 size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create);
271                 size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
272                 /* As an optimization, we allocate an RPC request buffer for
273                  * at least a default-sized LOV EA even if we aren't sending
274                  * one.  We grow the whole request to the next power-of-two
275                  * size since we get that much from a slab allocation anyways.
276                  * This avoids an allocation below in the common case where
277                  * we need to save a default-sized LOV EA for open replay. */
278                 size[DLM_INTENT_REC_OFF + 2] = max(lmmsize,
279                                           obddev->u.cli.cl_default_mds_easize);
                    /* rc is used here as scratch space for the packed request
                     * size; (rc & (rc - 1)) is non-zero when that size is not
                     * a power of two, in which case the EA slot absorbs the
                     * slack up to round_up(rc), capped at cl_max_mds_easize. */
280                 rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6,
281                                      size);
282                 if (rc & (rc - 1))
283                         size[DLM_INTENT_REC_OFF + 2] =
284                                  min(size[DLM_INTENT_REC_OFF+2]+round_up(rc)-rc,
285                                      obddev->u.cli.cl_max_mds_easize);
286
287                 /* If inode is known, cancel conflicting OPEN locks. */
288                 if (op_data->fid2.id) {
289                         if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
290                                 mode = LCK_CW;
291 #ifdef FMODE_EXEC
292                         else if (it->it_flags & FMODE_EXEC)
293                                 mode = LCK_PR;
294 #endif
295                         else 
296                                 mode = LCK_CR;
297                         count = mdc_resource_get_unused(exp, &op_data->fid2,
298                                                         &cancels, mode,
299                                                         MDS_INODELOCK_OPEN);
300                 }
301
302                 /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
303                 if (it->it_op & IT_CREAT || it->it_flags & O_JOIN_FILE)
304                         mode = LCK_EX;
305                 else
306                         mode = LCK_CR;
307                 count += mdc_resource_get_unused(exp, &op_data->fid1, &cancels,
308                                                  mode, MDS_INODELOCK_UPDATE);
309                 if (it->it_flags & O_JOIN_FILE) {
310                         /* join is like an unlink of the tail */
311                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
312                         size[DLM_INTENT_REC_OFF + 3] =
313                                                  sizeof(struct mds_rec_join);
314                         req = ldlm_prep_enqueue_req(exp, 7, size, &cancels,
315                                                     count);
                            /* NOTE(review): req is passed to mdc_join_pack()
                             * before the NULL check below -- confirm that
                             * helper tolerates a NULL request. */
316                         mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, op_data,
317                                       (*(__u64 *)op_data->data));
318                 } else {
319                         req = ldlm_prep_enqueue_req(exp, 6, size, &cancels,
320                                                     count);
321                 }
322
323                 if (!req)
324                         RETURN(-ENOMEM);
325
                    /* Open requests are marked for replay up front; the flag
                     * is cleared again further down if the open did not
                     * actually succeed. */
326                 spin_lock(&req->rq_lock);
327                 req->rq_replay = 1;
328                 spin_unlock(&req->rq_lock);
329
330                 /* pack the intent */
331                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
332                                      sizeof(*lit));
333                 lit->opc = (__u64)it->it_op;
334
335                 /* pack the intended request */
336                 mdc_open_pack(req, DLM_INTENT_REC_OFF, op_data,
337                               it->it_create_mode, 0, it->it_flags,
338                               lmm, lmmsize);
339
340                 repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
341         } else if (it->it_op & IT_UNLINK) {
342                 size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_unlink);
343                 size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
344                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
345                 req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
346                 if (!req)
347                         RETURN(-ENOMEM);
348
349                 /* pack the intent */
350                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
351                                      sizeof(*lit));
352                 lit->opc = (__u64)it->it_op;
353
354                 /* pack the intended request */
355                 mdc_unlink_pack(req, DLM_INTENT_REC_OFF, op_data);
356
357                 repsize[repbufcnt++] = obddev->u.cli.cl_max_mds_cookiesize;
358         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
359                 obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
360                                   OBD_MD_FLACL | OBD_MD_FLMODEASIZE |
361                                   OBD_MD_FLDIREA;
362                 size[DLM_INTENT_REC_OFF] = sizeof(struct mds_body);
363                 size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
364
                    /* LOOKUP keeps the default LOOKUP inodebit set in the
                     * policy initialiser; GETATTR asks for UPDATE instead. */
365                 if (it->it_op & IT_GETATTR)
366                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
367
368                 req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
369                 if (!req)
370                         RETURN(-ENOMEM);
371
372                 /* pack the intent */
373                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
374                                      sizeof(*lit));
375                 lit->opc = (__u64)it->it_op;
376
377                 /* pack the intended request */
378                 mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid,
379                                  it->it_flags, op_data);
380
381                 repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
382         } else if (it->it_op == IT_READDIR) {
                    /* READDIR sends only the first two request buffers and
                     * expects only two reply buffers; no intent is packed. */
383                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
384                 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
385                 if (!req)
386                         RETURN(-ENOMEM);
387
388                 repbufcnt = 2;
389         } else {
390                 LBUG();
391                 RETURN(-EINVAL);
392         }
393
394         /* get ready for the reply */
395         ptlrpc_req_set_repsize(req, repbufcnt, repsize);
396
397          /* It is important to obtain rpc_lock first (if applicable), so that
398           * threads that are serialised with rpc_lock are not polluting our
399           * rpcs in flight counter */
400         mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
401         mdc_enter_request(&obddev->u.cli);
402         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
403                               0, NULL, lockh, 0);
404         mdc_exit_request(&obddev->u.cli);
405         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
406
407         /* Similarly, if we're going to replay this request, we don't want to
408          * actually get a lock, just perform the intent. */
409         if (req->rq_transno || req->rq_replay) {
410                 lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
411                                          sizeof(*lockreq));
412                 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
413         }
414
            /* Three outcomes: the lock was aborted (treated as success with a
             * zeroed handle and mode), the enqueue failed (request released
             * and the negative rc returned), or a lock was granted. */
415         if (rc == ELDLM_LOCK_ABORTED) {
416                 einfo->ei_mode = 0;
417                 memset(lockh, 0, sizeof(*lockh));
418                 rc = 0;
419         } else if (rc != 0) {
420                 CERROR("ldlm_cli_enqueue: %d\n", rc);
421                 LASSERTF(rc < 0, "rc %d\n", rc);
422                 mdc_clear_replay_flag(req, rc);
423                 ptlrpc_req_finished(req);
424                 RETURN(rc);
425         } else { /* rc = 0 */
426                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
427                 LASSERT(lock);
428
429                 /* If the server gave us back a different lock mode, we should
430                  * fix up our variables. */
431                 if (lock->l_req_mode != einfo->ei_mode) {
432                         ldlm_lock_addref(lockh, lock->l_req_mode);
433                         ldlm_lock_decref(lockh, einfo->ei_mode);
434                         einfo->ei_mode = lock->l_req_mode;
435                 }
436                 LDLM_LOCK_PUT(lock);
437         }
438
439         lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
440                                  sizeof(*lockrep));
441         LASSERT(lockrep != NULL);                 /* checked by ldlm_cli_enqueue() */
442         LASSERT_REPSWABBED(req, DLM_LOCKREPLY_OFF); /* swabbed by ldlm_cli_enqueue() */
443
            /* Publish the server's verdict and the request itself through the
             * intent; from here on the request is reachable via it_data. */
444         it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
445         it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
446         it->d.lustre.it_lock_mode = einfo->ei_mode;
447         it->d.lustre.it_data = req;
448
449         if (it->d.lustre.it_status < 0 && req->rq_replay)
450                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
451
452         /* If we're doing an IT_OPEN which did not result in an actual
453          * successful open, then we need to remove the bit which saves
454          * this request for unconditional replay.
455          *
456          * It's important that we do this first!  Otherwise we might exit the
457          * function without doing so, and try to replay a failed create
458          * (bug 3440) */
459         if (it->it_op & IT_OPEN && req->rq_replay &&
460             (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
461                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
462
463         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
464                   it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
465
466         /* We know what to expect, so we do any byte flipping required here */
467         LASSERT(repbufcnt == 5 || repbufcnt == 2);
468         if (repbufcnt == 5) {
469                 struct mds_body *body;
470
471                 body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
472                                          lustre_swab_mds_body);
                    /* NOTE(review): on this -EPROTO return (and the one for
                     * missing eadata below) req is not released here and any
                     * granted lock is not dropped; req has already been
                     * stored in it->d.lustre.it_data -- confirm the callers
                     * clean up through the intent. */
473                 if (body == NULL) {
474                         CERROR ("Can't swab mds_body\n");
475                         RETURN (-EPROTO);
476                 }
477
478                 /* If this is a successful OPEN request, we need to set
479                    replay handler and data early, so that if replay happens
480                    immediately after swabbing below, new reply is swabbed
481                    by that handler correctly */
482                 if (it_disposition(it, DISP_OPEN_OPEN) &&
483                     !it_open_error(DISP_OPEN_OPEN, it))
484                         mdc_set_open_replay_data(NULL, req);
485
486                 if ((body->valid & OBD_MD_FLEASIZE) != 0) {
487                         /* The eadata is opaque; just check that it is there.
488                          * Eventually, obd_unpackmd() will check the contents */
489                         eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
490                                                     body->eadatasize, NULL);
491                         if (eadata == NULL) {
492                                 CERROR ("Missing/short eadata\n");
493                                 RETURN (-EPROTO);
494                         }
                            /* The cached maximum EA and cookie sizes only
                             * ever grow to the values the server reports. */
495                         if (body->valid & OBD_MD_FLMODEASIZE) {
496                                 if (obddev->u.cli.cl_max_mds_easize < 
497                                                         body->max_mdsize) {
498                                         obddev->u.cli.cl_max_mds_easize = 
499                                                 body->max_mdsize;
500                                         CDEBUG(D_INFO, "maxeasize become %d\n",
501                                                body->max_mdsize);
502                                 }
503                                 if (obddev->u.cli.cl_max_mds_cookiesize <
504                                                         body->max_cookiesize) {
505                                         obddev->u.cli.cl_max_mds_cookiesize =
506                                                 body->max_cookiesize;
507                                         CDEBUG(D_INFO, "cookiesize become %d\n",
508                                                body->max_cookiesize);
509                                 }
510                         }
511                         /* We save the reply LOV EA in case we have to replay
512                          * a create for recovery.  If we didn't allocate a
513                          * large enough request buffer above we need to
514                          * reallocate it here to hold the actual LOV EA. */
515                         if (it->it_op & IT_OPEN) {
516                                 int offset = DLM_INTENT_REC_OFF + 2;
517
                                    /* mdc_realloc_openmsg() sizes the message
                                     * for 6 buffers.  NOTE(review): the
                                     * O_JOIN_FILE request above was prepared
                                     * with 7 -- confirm this path cannot be
                                     * reached for a join. */
518                                 if (lustre_msg_buflen(req->rq_reqmsg, offset) <
519                                     body->eadatasize)
520                                         mdc_realloc_openmsg(req, body, size);
521
                                    /* If the reallocation failed, eadatasize
                                     * is now 0 and nothing is copied. */
522                                 lmm = lustre_msg_buf(req->rq_reqmsg, offset,
523                                                      body->eadatasize);
524                                 if (lmm)
525                                         memcpy(lmm, eadata, body->eadatasize);
526                         }
527                 }
528         }
529
530         RETURN(rc);
531 }
532 EXPORT_SYMBOL(mdc_enqueue);
533
534 /* 
535  * This long block is all about fixing up the lock and request state
536  * so that it is correct as of the moment _before_ the operation was
537  * applied; that way, the VFS will think that everything is normal and
538  * call Lustre's regular VFS methods.
539  *
540  * If we're performing a creation, that means that unless the creation
541  * failed with EEXIST, we should fake up a negative dentry.
542  *
543  * For everything else, we want to lookup to succeed.
544  *
545  * One additional note: if CREATE or OPEN succeeded, we add an extra
546  * reference to the request because we need to keep it around until
547  * ll_create/ll_open gets called.
548  *
549  * The server will return to us, in it_disposition, an indication of
550  * exactly what d.lustre.it_status refers to.
551  *
552  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
553  * otherwise if DISP_OPEN_CREATE is set, then it status is the
554  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
555  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
556  * was successful.
557  *
558  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
559  * child lookup.
560  */
561 int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
562                     void *lmm, int lmmsize, struct lookup_intent *it,
563                     int lookup_flags, struct ptlrpc_request **reqp,
564                     ldlm_blocking_callback cb_blocking, int extra_lock_flags)
565 {
566         struct lustre_handle lockh;
567         struct ptlrpc_request *request;
568         int rc = 0;
569         struct mds_body *mds_body;
570         struct lustre_handle old_lock;
571         struct ldlm_lock *lock;
572         ENTRY;
573         LASSERT(it);
574
575         CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
576                op_data->namelen, op_data->name, op_data->fid1.id,
577                ldlm_it2str(it->it_op), it->it_flags);
578
579         if (op_data->fid2.id &&
580             (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
581                 /* We could just return 1 immediately, but since we should only
582                  * be called in revalidate_it if we already have a lock, let's
583                  * verify that. */
584                 struct ldlm_res_id res_id = {.name ={op_data->fid2.id,
585                                                      op_data->fid2.generation}};
586                 struct lustre_handle lockh;
587                 ldlm_policy_data_t policy;
588                 int mode = LCK_CR;
589
590                 /* As not all attributes are kept under update lock, e.g. 
591                    owner/group/acls are under lookup lock, we need both 
592                    ibits for GETATTR. */
593                 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
594                         MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
595                         MDS_INODELOCK_LOOKUP;
596
597                 rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
598                                      LDLM_FL_BLOCK_GRANTED, &res_id,
599                                      LDLM_IBITS, &policy, LCK_CR, &lockh);
600                 if (!rc) {
601                         mode = LCK_CW;
602                         rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
603                                              LDLM_FL_BLOCK_GRANTED, &res_id,
604                                              LDLM_IBITS, &policy,LCK_CW,&lockh);
605                 }
606                 if (!rc) {
607                         mode = LCK_PR;
608                         rc = ldlm_lock_match(exp->exp_obd->obd_namespace,
609                                              LDLM_FL_BLOCK_GRANTED, &res_id,
610                                              LDLM_IBITS, &policy,LCK_PR,&lockh);
611                 }
612                 if (rc) {
613                         memcpy(&it->d.lustre.it_lock_handle, &lockh,
614                                sizeof(lockh));
615                         it->d.lustre.it_lock_mode = mode;
616                 }
617
618                 /* Only return failure if it was not GETATTR by cfid
619                    (from inode_revalidate) */
620                 if (rc || op_data->namelen != 0)
621                         RETURN(rc);
622         }
623
624         /* lookup_it may be called only after revalidate_it has run, because
625          * revalidate_it cannot return errors, only zero.  Returning zero causes
626          * this call to lookup, which *can* return an error.
627          *
628          * We only want to execute the request associated with the intent one
629          * time, however, so don't send the request again.  Instead, skip past
630          * this and use the request from revalidate.  In this case, revalidate
631          * never dropped its reference, so the refcounts are all OK */
632         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
633                 struct ldlm_enqueue_info einfo =
634                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
635                           ldlm_completion_ast, NULL, NULL };
636
637                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
638                                  lmm, lmmsize, extra_lock_flags);
639                 if (rc < 0)
640                         RETURN(rc);
641                 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
642         } else if (!op_data->fid2.id) {
643                 /* DISP_ENQ_COMPLETE set means there is extra reference on
644                  * request referenced from this intent, saved for subsequent
645                  * lookup.  This path is executed when we proceed to this
646                  * lookup, so we clear DISP_ENQ_COMPLETE */
647                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
648         }
649         request = *reqp = it->d.lustre.it_data;
650         LASSERT(request != NULL);
651         LASSERT(request != LP_POISON);
652         LASSERT(request->rq_repmsg != LP_POISON);
653
654         if (!it_disposition(it, DISP_IT_EXECD)) {
655                 /* The server failed before it even started executing the
656                  * intent, i.e. because it couldn't unpack the request. */
657                 LASSERT(it->d.lustre.it_status != 0);
658                 RETURN(it->d.lustre.it_status);
659         }
660         rc = it_open_error(DISP_IT_EXECD, it);
661         if (rc)
662                 RETURN(rc);
663
664         mds_body = lustre_msg_buf(request->rq_repmsg, DLM_REPLY_REC_OFF,
665                                   sizeof(*mds_body));
666         LASSERT(mds_body != NULL);           /* mdc_enqueue checked */
667         LASSERT_REPSWABBED(request, 1); /* mdc_enqueue swabbed */
668
669         /* If we were revalidating a fid/name pair, mark the intent in
670          * case we fail and get called again from lookup */
671         if (op_data->fid2.id && (it->it_op != IT_GETATTR)) {
672                 it_set_disposition(it, DISP_ENQ_COMPLETE);
673                 /* Also: did we find the same inode? */
674                 if (memcmp(&op_data->fid2, &mds_body->fid1,
675                            sizeof(op_data->fid2)))
676                         RETURN (-ESTALE);
677         }
678
679         rc = it_open_error(DISP_LOOKUP_EXECD, it);
680         if (rc)
681                 RETURN(rc);
682
683         /* keep requests around for the multiple phases of the call
684          * this shows the DISP_XX must guarantee we make it into the call
685          */
686         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
687             it_disposition(it, DISP_OPEN_CREATE) &&
688             !it_open_error(DISP_OPEN_CREATE, it)) {
689                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
690                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
691         }
692         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
693             it_disposition(it, DISP_OPEN_OPEN) &&
694             !it_open_error(DISP_OPEN_OPEN, it)) {
695                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
696                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
697                 /* BUG 11546 - eviction in the middle of open rpc processing */
698                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
699         }
700
701         if (it->it_op & IT_CREAT) {
702                 /* XXX this belongs in ll_create_it */
703         } else if (it->it_op == IT_OPEN) {
704                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
705         } else {
706                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
707         }
708
709         /* If we already have a matching lock, then cancel the new
710          * one.  We have to set the data here instead of in
711          * mdc_enqueue, because we need to use the child's inode as
712          * the l_ast_data to match, and that's not available until
713          * intent_finish has performed the iget().) */
714         lock = ldlm_handle2lock(&lockh);
715         if (lock) {
716                 ldlm_policy_data_t policy = lock->l_policy_data;
717                 LDLM_DEBUG(lock, "matching against this");
718                 LDLM_LOCK_PUT(lock);
719                 memcpy(&old_lock, &lockh, sizeof(lockh));
720                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
721                                     LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
722                         ldlm_lock_decref_and_cancel(&lockh,
723                                                     it->d.lustre.it_lock_mode);
724                         memcpy(&lockh, &old_lock, sizeof(old_lock));
725                         memcpy(&it->d.lustre.it_lock_handle, &lockh,
726                                sizeof(lockh));
727                 }
728         }
729         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
730                op_data->namelen, op_data->name, ldlm_it2str(it->it_op),
731                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
732
733         RETURN(rc);
734 }
735 EXPORT_SYMBOL(mdc_intent_lock);