Whamcloud - gitweb
Branch b1_6
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_MDC
29
30 #ifdef __KERNEL__
31 # include <linux/module.h>
32 # include <linux/pagemap.h>
33 # include <linux/miscdevice.h>
34 # include <linux/init.h>
35 #else
36 # include <liblustre.h>
37 #endif
38
39 #include <obd_class.h>
40 #include <lustre_dlm.h>
41 #include <lprocfs_status.h>
42 #include "mdc_internal.h"
43
44 int it_disposition(struct lookup_intent *it, int flag)
45 {
46         return it->d.lustre.it_disposition & flag;
47 }
48 EXPORT_SYMBOL(it_disposition);
49
50 void it_set_disposition(struct lookup_intent *it, int flag)
51 {
52         it->d.lustre.it_disposition |= flag;
53 }
54 EXPORT_SYMBOL(it_set_disposition);
55
56 void it_clear_disposition(struct lookup_intent *it, int flag)
57 {
58         it->d.lustre.it_disposition &= ~flag;
59 }
60 EXPORT_SYMBOL(it_clear_disposition);
61
62 int it_open_error(int phase, struct lookup_intent *it)
63 {
64         if (it_disposition(it, DISP_OPEN_OPEN)) {
65                 if (phase >= DISP_OPEN_OPEN)
66                         return it->d.lustre.it_status;
67                 else
68                         return 0;
69         }
70
71         if (it_disposition(it, DISP_OPEN_CREATE)) {
72                 if (phase >= DISP_OPEN_CREATE)
73                         return it->d.lustre.it_status;
74                 else
75                         return 0;
76         }
77
78         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
79                 if (phase >= DISP_LOOKUP_EXECD)
80                         return it->d.lustre.it_status;
81                 else
82                         return 0;
83         }
84
85         if (it_disposition(it, DISP_IT_EXECD)) {
86                 if (phase >= DISP_IT_EXECD)
87                         return it->d.lustre.it_status;
88                 else
89                         return 0;
90         }
91         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
92                it->d.lustre.it_status);
93         LBUG();
94         return 0;
95 }
96 EXPORT_SYMBOL(it_open_error);
97
98 /* this must be called on a lockh that is known to have a referenced lock */
99 void mdc_set_lock_data(__u64 *l, void *data)
100 {
101         struct ldlm_lock *lock;
102         struct lustre_handle *lockh = (struct lustre_handle *)l;
103         ENTRY;
104
105         if (!*l) {
106                 EXIT;
107                 return;
108         }
109
110         lock = ldlm_handle2lock(lockh);
111
112         LASSERT(lock != NULL);
113         lock_res_and_lock(lock);
114 #ifdef __KERNEL__
115         if (lock->l_ast_data && lock->l_ast_data != data) {
116                 struct inode *new_inode = data;
117                 struct inode *old_inode = lock->l_ast_data;
118                 LASSERTF(old_inode->i_state & I_FREEING,
119                          "Found existing inode %p/%lu/%u state %lu in lock: "
120                          "setting data to %p/%lu/%u\n", old_inode,
121                          old_inode->i_ino, old_inode->i_generation,
122                          old_inode->i_state,
123                          new_inode, new_inode->i_ino, new_inode->i_generation);
124         }
125 #endif
126         lock->l_ast_data = data;
127         unlock_res_and_lock(lock);
128         LDLM_LOCK_PUT(lock);
129
130         EXIT;
131 }
132 EXPORT_SYMBOL(mdc_set_lock_data);
133
134 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid, 
135                       ldlm_iterator_t it, void *data)
136 {
137         struct ldlm_res_id res_id = { .name = {0} };
138         ENTRY;
139
140         res_id.name[0] = fid->id;
141         res_id.name[1] = fid->generation;
142
143         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
144                               it, data);
145
146         EXIT;
147         return 0;
148 }
149
150 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
151 {
152         /* Don't hold error requests for replay. */
153         if (req->rq_replay) {
154                 spin_lock(&req->rq_lock);
155                 req->rq_replay = 0;
156                 spin_unlock(&req->rq_lock);
157         }
158         if (rc && req->rq_transno != 0) {
159                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
160                 LBUG();
161         }
162 }
163
/* Return the smallest power of two that is strictly greater than @val,
 * i.e. 1 << (number of significant bits in @val).
 *
 * Note that an exact power of two is doubled (8 -> 16); the caller in this
 * file only invokes this for sizes that are not already a power of two.
 *
 * @val <= 0 yields 1.  Previously a negative @val never terminated: an
 * arithmetic right shift of a negative value does not reach zero, and the
 * accumulator overflowed a signed int.  The shifting is now done on
 * unsigned values so neither can happen.
 *
 * For @val >= 2^30 the mathematical result does not fit in an int; the
 * value returned in that case is implementation-defined. */
static int round_up(int val)
{
        unsigned int remaining;
        unsigned int pow2 = 1;

        if (val <= 0)
                return 1;

        for (remaining = (unsigned int)val; remaining != 0; remaining >>= 1)
                pow2 <<= 1;

        return (int)pow2;
}
173
174 /* Save a large LOV EA into the request buffer so that it is available
175  * for replay.  We don't do this in the initial request because the
176  * original request doesn't need this buffer (at most it sends just the
177  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
178  * buffer and may also be difficult to allocate and save a very large
179  * request buffer for each open. (bug 5707)
180  *
181  * OOM here may cause recovery failure if lmm is needed (only for the
182  * original open if the MDS crashed just when this client also OOM'd)
183  * but this is incredibly unlikely, and questionable whether the client
184  * could do MDS recovery under OOM anyways... */
185 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
186                                 struct mds_body *body)
187 {
188         int old_len, new_size, old_size;
189         struct lustre_msg *old_msg = req->rq_reqmsg;
190         struct lustre_msg *new_msg;
191
192         old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + 2);
193         old_size = lustre_packed_msg_size(old_msg);
194         lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2,
195                               body->eadatasize);
196         new_size = lustre_packed_msg_size(old_msg);
197
198         OBD_ALLOC(new_msg, new_size);
199         if (new_msg != NULL) {
200                 DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u\n",
201                           body->eadatasize);
202                 memcpy(new_msg, old_msg, old_size);
203
204                 spin_lock(&req->rq_lock);
205                 req->rq_reqmsg = new_msg;
206                 req->rq_reqlen = new_size;
207                 spin_unlock(&req->rq_lock);
208
209                 OBD_FREE(old_msg, old_size);
210         } else {
211                 lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2, old_len);
212                 body->valid &= ~OBD_MD_FLEASIZE;
213                 body->eadatasize = 0;
214         }
215 }
216
217 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
218                                                    struct lookup_intent *it,
219                                                    struct mdc_op_data *data,
220                                                    void *lmm, int lmmsize)
221 {
222         struct ptlrpc_request *req;
223         struct ldlm_intent *lit;
224         struct obd_device *obddev = class_exp2obd(exp);
225         int size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
226                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
227                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
228                         [DLM_INTENT_REC_OFF]  = sizeof(struct mds_rec_create),
229                         [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
230                         /* As an optimization, we allocate an RPC request buffer
231                          * for at least a default-sized LOV EA even if we aren't
232                          * sending one.  We grow the whole request to the next
233                          * power-of-two size since we get that much from a slab
234                          * allocation anyways. This avoids an allocation below
235                          * in the common case where we need to save a
236                          * default-sized LOV EA for open replay. */
237                         [DLM_INTENT_REC_OFF+2]= max(lmmsize,
238                                          obddev->u.cli.cl_default_mds_easize) };
239         int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
240                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
241                            [DLM_REPLY_REC_OFF]   = sizeof(struct mds_body),
242                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
243                                                         cl_max_mds_easize,
244                            [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
245         CFS_LIST_HEAD(cancels);
246         int do_join = (it->it_flags & O_JOIN_FILE) && data->data;
247         int count = 0;
248         int mode;
249         int rc;
250
251         it->it_create_mode |= S_IFREG;
252
253         rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, 6, size);
254         if (rc & (rc - 1))
255                 size[DLM_INTENT_REC_OFF + 2] =
256                          min(size[DLM_INTENT_REC_OFF + 2] + round_up(rc) - rc,
257                                      obddev->u.cli.cl_max_mds_easize);
258
259                 /* If inode is known, cancel conflicting OPEN locks. */
260         if (data->fid2.id) {
261                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
262                         mode = LCK_CW;
263 #ifdef FMODE_EXEC
264                 else if (it->it_flags & FMODE_EXEC)
265                         mode = LCK_PR;
266 #endif
267                 else
268                         mode = LCK_CR;
269                 count = mdc_resource_get_unused(exp, &data->fid2, &cancels,
270                                                 mode, MDS_INODELOCK_OPEN);
271         }
272
273         /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
274         if (it->it_op & IT_CREAT || do_join)
275                 mode = LCK_EX;
276         else
277                 mode = LCK_CR;
278         count += mdc_resource_get_unused(exp, &data->fid1, &cancels, mode,
279                                          MDS_INODELOCK_UPDATE);
280         if (do_join) {
281                 __u64 head_size = (*(__u64 *)data->data);
282                         /* join is like an unlink of the tail */
283                 size[DLM_INTENT_REC_OFF + 3] = sizeof(struct mds_rec_join);
284                 req = ldlm_prep_enqueue_req(exp, 7, size, &cancels, count);
285                 if (req)
286                         mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, data, 
287                                       head_size);
288         } else {
289                 req = ldlm_prep_enqueue_req(exp, 6, size, &cancels, count);
290                 it->it_flags &= ~O_JOIN_FILE;
291         }
292
293         if (req) {
294                 spin_lock(&req->rq_lock);
295                 req->rq_replay = 1;
296                 spin_unlock(&req->rq_lock);
297
298                 /* pack the intent */
299                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
300                                      sizeof(*lit));
301                 lit->opc = (__u64)it->it_op;
302
303                 /* pack the intended request */
304                 mdc_open_pack(req, DLM_INTENT_REC_OFF, data,
305                               it->it_create_mode, 0, it->it_flags,
306                               lmm, lmmsize);
307
308                 ptlrpc_req_set_repsize(req, 5, repsize);
309         }
310         return req;
311 }
312
313 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
314                                                      struct lookup_intent *it,
315                                                      struct mdc_op_data *data)
316 {
317         struct ptlrpc_request *req;
318         struct ldlm_intent *lit;
319         struct obd_device *obddev = class_exp2obd(exp);
320         int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
321                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
322                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
323                         [DLM_INTENT_REC_OFF]  = sizeof(struct mds_rec_unlink),
324                         [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
325         int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
326                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
327                            [DLM_REPLY_REC_OFF]   = sizeof(struct mds_body),
328                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
329                                                         cl_max_mds_easize,
330                            [DLM_REPLY_REC_OFF+2] = obddev->u.cli.
331                                                         cl_max_mds_cookiesize };
332
333         req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
334         if (req) {
335                 /* pack the intent */
336                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
337                                      sizeof(*lit));
338                 lit->opc = (__u64)it->it_op;
339
340                 /* pack the intended request */
341                 mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
342
343                 ptlrpc_req_set_repsize(req, 5, repsize);
344         }
345         return req;
346 }
347
348 static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp,
349                                                      struct lookup_intent *it,
350                                                      struct mdc_op_data *data)
351 {
352         struct ptlrpc_request *req;
353         struct ldlm_intent *lit;
354         struct obd_device *obddev = class_exp2obd(exp);
355         int size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
356                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
357                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
358                         [DLM_INTENT_REC_OFF]  = sizeof(struct mds_body),
359                         [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
360         int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
361                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
362                            [DLM_REPLY_REC_OFF]   = sizeof(struct mds_body),
363                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
364                                                         cl_max_mds_easize,
365                            [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
366         obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
367                           OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA;
368
369                 req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
370         if (req) {
371                 /* pack the intent */
372                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
373                                      sizeof(*lit));
374                 lit->opc = (__u64)it->it_op;
375
376                 /* pack the intended request */
377                 mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, it->it_flags,
378                                  data);
379                 ptlrpc_req_set_repsize(req, 5, repsize);
380         }
381         return req;
382 }
383
384 static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp)
385 {
386         struct ptlrpc_request *req;
387         int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
388                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request) };
389         int repsize[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
390                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply) };
391
392         req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
393         if (req)
394                 ptlrpc_req_set_repsize(req, 2, repsize);
395         return req;
396 }
397
398 static int mdc_finish_enqueue(struct obd_export *exp,
399                               struct ptlrpc_request *req,
400                               struct ldlm_enqueue_info *einfo,
401                               struct lookup_intent *it,
402                               struct lustre_handle *lockh,
403                               int rc)
404 {
405         struct ldlm_request *lockreq;
406         struct ldlm_reply *lockrep;
407         ENTRY;
408
409         /* Similarly, if we're going to replay this request, we don't want to
410          * actually get a lock, just perform the intent. */
411         if (req->rq_transno || req->rq_replay) {
412                 lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
413                                          sizeof(*lockreq));
414                 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
415         }
416
417         if (rc == ELDLM_LOCK_ABORTED) {
418                 einfo->ei_mode = 0;
419                 memset(lockh, 0, sizeof(*lockh));
420                 rc = 0;
421         } else if (rc != 0) {
422                 CERROR("ldlm_cli_enqueue: %d\n", rc);
423                 LASSERTF(rc < 0, "rc %d\n", rc);
424                 mdc_clear_replay_flag(req, rc);
425                 ptlrpc_req_finished(req);
426                 RETURN(rc);
427         } else { /* rc = 0 */
428                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
429                 LASSERT(lock);
430
431                 /* If the server gave us back a different lock mode, we should
432                  * fix up our variables. */
433                 if (lock->l_req_mode != einfo->ei_mode) {
434                         ldlm_lock_addref(lockh, lock->l_req_mode);
435                         ldlm_lock_decref(lockh, einfo->ei_mode);
436                         einfo->ei_mode = lock->l_req_mode;
437                 }
438                 LDLM_LOCK_PUT(lock);
439         }
440
441         lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
442                                  sizeof(*lockrep));
443         LASSERT(lockrep != NULL);                 /* checked by ldlm_cli_enqueue() */
444         /* swabbed by ldlm_cli_enqueue() */
445         LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
446
447         it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
448         it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
449         it->d.lustre.it_lock_mode = einfo->ei_mode;
450         it->d.lustre.it_data = req;
451
452         if (it->d.lustre.it_status < 0 && req->rq_replay)
453                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
454
455         /* If we're doing an IT_OPEN which did not result in an actual
456          * successful open, then we need to remove the bit which saves
457          * this request for unconditional replay.
458          *
459          * It's important that we do this first!  Otherwise we might exit the
460          * function without doing so, and try to replay a failed create
461          * (bug 3440) */
462         if ((it->it_op & IT_OPEN) &&
463             req->rq_replay &&
464             (!it_disposition(it, DISP_OPEN_OPEN) ||
465              it->d.lustre.it_status != 0))
466                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
467
468         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
469                   it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
470
471         /* We know what to expect, so we do any byte flipping required here */
472         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
473                 struct mds_body *body;
474
475                 body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
476                                          lustre_swab_mds_body);
477                 if (body == NULL) {
478                         CERROR ("Can't swab mds_body\n");
479                         RETURN (-EPROTO);
480                 }
481
482                 /* If this is a successful OPEN request, we need to set
483                    replay handler and data early, so that if replay happens
484                    immediately after swabbing below, new reply is swabbed
485                    by that handler correctly */
486                 if (it_disposition(it, DISP_OPEN_OPEN) &&
487                     !it_open_error(DISP_OPEN_OPEN, it))
488                         mdc_set_open_replay_data(NULL, req);
489
490                 if ((body->valid & OBD_MD_FLEASIZE) != 0) {
491                         void *eadata;
492
493                         /* The eadata is opaque; just check that it is there.
494                          * Eventually, obd_unpackmd() will check the contents */
495                         eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
496                                                     body->eadatasize, NULL);
497                         if (eadata == NULL) {
498                                 CERROR ("Missing/short eadata\n");
499                                 RETURN (-EPROTO);
500                         }
501                         if (body->valid & OBD_MD_FLMODEASIZE) {
502                                 struct obd_device *obddev = class_exp2obd(exp);
503
504                                 if (obddev->u.cli.cl_max_mds_easize < 
505                                                         body->max_mdsize) {
506                                         obddev->u.cli.cl_max_mds_easize = 
507                                                 body->max_mdsize;
508                                         CDEBUG(D_INFO, "maxeasize become %d\n",
509                                                body->max_mdsize);
510                                 }
511                                 if (obddev->u.cli.cl_max_mds_cookiesize <
512                                                         body->max_cookiesize) {
513                                         obddev->u.cli.cl_max_mds_cookiesize =
514                                                 body->max_cookiesize;
515                                         CDEBUG(D_INFO, "cookiesize become %d\n",
516                                                body->max_cookiesize);
517                                 }
518                         }
519                         /* We save the reply LOV EA in case we have to replay
520                          * a create for recovery.  If we didn't allocate a
521                          * large enough request buffer above we need to
522                          * reallocate it here to hold the actual LOV EA. */
523                         if (it->it_op & IT_OPEN) {
524                                 int offset = DLM_INTENT_REC_OFF + 2;
525                                 void *lmm;
526
527                                 if (lustre_msg_buflen(req->rq_reqmsg, offset) <
528                                     body->eadatasize)
529                                         mdc_realloc_openmsg(req, body);
530
531                                 lmm = lustre_msg_buf(req->rq_reqmsg, offset,
532                                                      body->eadatasize);
533                                 if (lmm)
534                                         memcpy(lmm, eadata, body->eadatasize);
535                         }
536                 }
537         }
538
539         RETURN(rc);
540 }
541
542 /* We always reserve enough space in the reply packet for a stripe MD, because
543  * we don't know in advance the file type. */
544 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
545                 struct lookup_intent *it, struct mdc_op_data *data,
546                 struct lustre_handle *lockh, void *lmm, int lmmsize,
547                 int extra_lock_flags)
548 {
549         struct ptlrpc_request *req;
550         struct obd_device *obddev = class_exp2obd(exp);
551         struct ldlm_res_id res_id =
552                 { .name = {data->fid1.id, data->fid1.generation} };
553         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
554         int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
555         int rc;
556         ENTRY;
557
558         LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
559         if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
560                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
561
562         if (it->it_op & IT_OPEN) {
563                 req = mdc_intent_open_pack(exp, it, data, lmm, lmmsize);
564                 if (it->it_flags & O_JOIN_FILE) {
565                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
566                 }
567         } else if (it->it_op & IT_UNLINK) {
568                 req = mdc_intent_unlink_pack(exp, it, data);
569         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
570                 req = mdc_intent_lookup_pack(exp, it, data);
571         } else if (it->it_op == IT_READDIR) {
572                 req = mdc_intent_readdir_pack(exp);
573         } else {
574                 CERROR("bad it_op %x\n", it->it_op);
575                 RETURN(-EINVAL);
576         }
577
578         if (!req)
579                 RETURN(-ENOMEM);
580
581          /* It is important to obtain rpc_lock first (if applicable), so that
582           * threads that are serialised with rpc_lock are not polluting our
583           * rpcs in flight counter */
584         mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
585         mdc_enter_request(&obddev->u.cli);
586         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
587                               0, NULL, lockh, 0);
588         mdc_exit_request(&obddev->u.cli);
589         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
590
591         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
592
593         RETURN(rc);
594 }
595 EXPORT_SYMBOL(mdc_enqueue);
596
597 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
598                         struct ll_fid *fid)
599 {
600                 /* We could just return 1 immediately, but since we should only
601                  * be called in revalidate_it if we already have a lock, let's
602                  * verify that. */
603         struct ldlm_res_id res_id = {.name ={fid->id, fid->generation}};
604         struct lustre_handle lockh;
605         ldlm_policy_data_t policy;
606         ldlm_mode_t mode;
607
608         /* As not all attributes are kept under update lock, e.g. 
609            owner/group/acls are under lookup lock, we need both 
610            ibits for GETATTR. */
611         policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
612                 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
613                 MDS_INODELOCK_LOOKUP;
614
615         mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
616                                LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
617                                &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
618         if (mode) {
619                 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
620                 it->d.lustre.it_lock_mode = mode;
621         }
622
623         return !!mode;
624 }
625 EXPORT_SYMBOL(mdc_revalidate_lock);
626
627 static int mdc_finish_intent_lock(struct obd_export *exp,
628                                   struct ptlrpc_request *req,
629                                   struct mdc_op_data *data,
630                                   struct lookup_intent *it,
631                                   struct lustre_handle *lockh)
632 {
633         struct mds_body *mds_body;
634         struct lustre_handle old_lock;
635         struct ldlm_lock *lock;
636         int rc;
637         ENTRY;
638
639         LASSERT(req != NULL);
640         LASSERT(req != LP_POISON);
641         LASSERT(req->rq_repmsg != LP_POISON);
642
643         if (!it_disposition(it, DISP_IT_EXECD)) {
644                 /* The server failed before it even started executing the
645                  * intent, i.e. because it couldn't unpack the request. */
646                 LASSERT(it->d.lustre.it_status != 0);
647                 RETURN(it->d.lustre.it_status);
648         }
649         rc = it_open_error(DISP_IT_EXECD, it);
650         if (rc)
651                 RETURN(rc);
652
653         mds_body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
654                                   sizeof(*mds_body));
655         /* mdc_enqueue checked */
656         LASSERT(mds_body != NULL);
657         /* mdc_enqueue swabbed */
658         LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
659
660         /* If we were revalidating a fid/name pair, mark the intent in
661          * case we fail and get called again from lookup */
662         if (data->fid2.id && (it->it_op != IT_GETATTR)) {
663                 it_set_disposition(it, DISP_ENQ_COMPLETE);
664                 /* Also: did we find the same inode? */
665                 if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2))) 
666                         RETURN(-ESTALE);
667         }
668
669         rc = it_open_error(DISP_LOOKUP_EXECD, it);
670         if (rc)
671                 RETURN(rc);
672
673         /* keep requests around for the multiple phases of the call
674          * this shows the DISP_XX must guarantee we make it into the call
675          */
676         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
677             it_disposition(it, DISP_OPEN_CREATE) &&
678             !it_open_error(DISP_OPEN_CREATE, it)) {
679                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
680                 ptlrpc_request_addref(req); /* balanced in ll_create_node */
681         }
682         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
683             it_disposition(it, DISP_OPEN_OPEN) &&
684             !it_open_error(DISP_OPEN_OPEN, it)) {
685                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
686                 ptlrpc_request_addref(req); /* balanced in ll_file_open */
687                 /* BUG 11546 - eviction in the middle of open rpc processing */
688                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
689         }
690
691         if (it->it_op & IT_CREAT) {
692                 /* XXX this belongs in ll_create_it */
693         } else if (it->it_op == IT_OPEN) {
694                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
695         } else {
696                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
697         }
698
699         /* If we already have a matching lock, then cancel the new
700          * one.  We have to set the data here instead of in
701          * mdc_enqueue, because we need to use the child's inode as
702          * the l_ast_data to match, and that's not available until
703          * intent_finish has performed the iget().) */
704         lock = ldlm_handle2lock(lockh);
705         if (lock) {
706                 ldlm_policy_data_t policy = lock->l_policy_data;
707
708                 LDLM_DEBUG(lock, "matching against this");
709                 LDLM_LOCK_PUT(lock);
710                 memcpy(&old_lock, lockh, sizeof(*lockh));
711                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
712                                     LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
713                         ldlm_lock_decref_and_cancel(lockh,
714                                                     it->d.lustre.it_lock_mode);
715                         memcpy(lockh, &old_lock, sizeof(old_lock));
716                         memcpy(&it->d.lustre.it_lock_handle, lockh,
717                                sizeof(*lockh));
718                 }
719         }
720
721         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
722                data->namelen, data->name, ldlm_it2str(it->it_op),
723                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
724         RETURN(rc);
725 }
726
727 /* 
728  * This long block is all about fixing up the lock and request state
729  * so that it is correct as of the moment _before_ the operation was
730  * applied; that way, the VFS will think that everything is normal and
731  * call Lustre's regular VFS methods.
732  *
733  * If we're performing a creation, that means that unless the creation
734  * failed with EEXIST, we should fake up a negative dentry.
735  *
736  * For everything else, we want to lookup to succeed.
737  *
738  * One additional note: if CREATE or OPEN succeeded, we add an extra
739  * reference to the request because we need to keep it around until
740  * ll_create/ll_open gets called.
741  *
742  * The server will return to us, in it_disposition, an indication of
743  * exactly what d.lustre.it_status refers to.
744  *
745  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
746  * otherwise if DISP_OPEN_CREATE is set, then it status is the
747  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
748  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
749  * was successful.
750  *
751  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
752  * child lookup.
753  */
754 int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
755                     void *lmm, int lmmsize, struct lookup_intent *it,
756                     int lookup_flags, struct ptlrpc_request **reqp,
757                     ldlm_blocking_callback cb_blocking, int extra_lock_flags)
758 {
759         struct lustre_handle lockh;
760         int rc;
761         ENTRY;
762
763         LASSERT(it);
764
765         CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
766                op_data->namelen, op_data->name, op_data->fid1.id,
767                ldlm_it2str(it->it_op), it->it_flags);
768
769         if (op_data->fid2.id &&
770             (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
771                 rc = mdc_revalidate_lock(exp, it, &op_data->fid2);
772                 /* Only return failure if it was not GETATTR by cfid
773                    (from inode_revalidate) */
774                 if (rc || op_data->namelen != 0)
775                         RETURN(rc);
776         }
777
778         /* lookup_it may be called only after revalidate_it has run, because
779          * revalidate_it cannot return errors, only zero.  Returning zero causes
780          * this call to lookup, which *can* return an error.
781          *
782          * We only want to execute the request associated with the intent one
783          * time, however, so don't send the request again.  Instead, skip past
784          * this and use the request from revalidate.  In this case, revalidate
785          * never dropped its reference, so the refcounts are all OK */
786         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
787                 struct ldlm_enqueue_info einfo =
788                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
789                           ldlm_completion_ast, NULL, NULL };
790
791                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
792                                  lmm, lmmsize, extra_lock_flags);
793                 if (rc < 0)
794                         RETURN(rc);
795                 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
796         } else if (!op_data->fid2.id) {
797                 /* DISP_ENQ_COMPLETE set means there is extra reference on
798                  * request referenced from this intent, saved for subsequent
799                  * lookup.  This path is executed when we proceed to this
800                  * lookup, so we clear DISP_ENQ_COMPLETE */
801                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
802         }
803
804         *reqp = it->d.lustre.it_data;
805         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
806
807         RETURN(rc);
808 }
809 EXPORT_SYMBOL(mdc_intent_lock);
810
811 static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
812                                               void *unused, int rc)
813 {
814         struct mdc_enqueue_args  *ma;
815         struct md_enqueue_info   *minfo;
816         struct ldlm_enqueue_info *einfo;
817         struct obd_export        *exp;
818         struct lookup_intent     *it;
819         struct lustre_handle     *lockh;
820         struct obd_device        *obddev;
821         int                       flags = LDLM_FL_HAS_INTENT;
822         ENTRY;
823
824         ma = (struct mdc_enqueue_args *)&req->rq_async_args;
825         minfo = ma->ma_mi;
826         einfo = ma->ma_ei;
827
828         exp   = minfo->mi_exp;
829         it    = &minfo->mi_it;
830         lockh = &minfo->mi_lockh;
831
832         obddev = class_exp2obd(exp);
833
834         mdc_exit_request(&obddev->u.cli);
835
836         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
837                                    &flags, NULL, 0, NULL, lockh, rc);
838
839         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
840         if (rc)
841                 GOTO(out, rc);
842
843         memcpy(&it->d.lustre.it_lock_handle, lockh, sizeof(*lockh));
844
845         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
846         GOTO(out, rc);
847 out:
848         OBD_FREE_PTR(einfo);
849         minfo->mi_cb(exp, req, minfo, rc);
850
851         return 0;
852 }
853
854 int mdc_intent_getattr_async(struct obd_export *exp,
855                              struct md_enqueue_info *minfo,
856                              struct ldlm_enqueue_info *einfo)
857 {
858         struct mdc_op_data      *op_data = &minfo->mi_data;
859         struct lookup_intent    *it = &minfo->mi_it;
860         struct ptlrpc_request   *req;
861         struct obd_device       *obddev = class_exp2obd(exp);
862         struct ldlm_res_id       res_id = {
863                                         .name = {op_data->fid1.id,
864                                                  op_data->fid1.generation}
865                                  };
866         ldlm_policy_data_t       policy = {
867                                         .l_inodebits = { MDS_INODELOCK_LOOKUP }
868                                  };
869         struct mdc_enqueue_args *aa;
870         int                      rc;
871         int                      flags = LDLM_FL_HAS_INTENT;
872         ENTRY;
873
874         CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
875                op_data->namelen, op_data->name, op_data->fid1.id,
876                ldlm_it2str(it->it_op), it->it_flags);
877
878         req = mdc_intent_lookup_pack(exp, it, op_data);
879         if (!req)
880                 RETURN(-ENOMEM);
881
882         mdc_enter_request(&obddev->u.cli);
883         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
884                               0, NULL, &minfo->mi_lockh, 1);
885         if (rc < 0) {
886                 mdc_exit_request(&obddev->u.cli);
887                 RETURN(rc);
888         }
889
890         CLASSERT(sizeof(*aa) < sizeof(req->rq_async_args));
891         aa = (struct mdc_enqueue_args *)&req->rq_async_args;
892         aa->ma_mi = minfo;
893         aa->ma_ei = einfo;
894         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
895         ptlrpcd_add_req(req);
896
897         RETURN(0);
898 }
899 EXPORT_SYMBOL(mdc_intent_getattr_async);