Whamcloud - gitweb
Land b1_8_gate onto b1_8 (20081218_1708)
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_MDC
41
42 #ifdef __KERNEL__
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
47 #else
48 # include <liblustre.h>
49 #endif
50
51 #include <obd_class.h>
52 #include <lustre_dlm.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
55
56 int it_disposition(struct lookup_intent *it, int flag)
57 {
58         return it->d.lustre.it_disposition & flag;
59 }
60 EXPORT_SYMBOL(it_disposition);
61
62 void it_set_disposition(struct lookup_intent *it, int flag)
63 {
64         it->d.lustre.it_disposition |= flag;
65 }
66 EXPORT_SYMBOL(it_set_disposition);
67
68 void it_clear_disposition(struct lookup_intent *it, int flag)
69 {
70         it->d.lustre.it_disposition &= ~flag;
71 }
72 EXPORT_SYMBOL(it_clear_disposition);
73
74 int it_open_error(int phase, struct lookup_intent *it)
75 {
76         if (it_disposition(it, DISP_OPEN_OPEN)) {
77                 if (phase >= DISP_OPEN_OPEN)
78                         return it->d.lustre.it_status;
79                 else
80                         return 0;
81         }
82
83         if (it_disposition(it, DISP_OPEN_CREATE)) {
84                 if (phase >= DISP_OPEN_CREATE)
85                         return it->d.lustre.it_status;
86                 else
87                         return 0;
88         }
89
90         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
91                 if (phase >= DISP_LOOKUP_EXECD)
92                         return it->d.lustre.it_status;
93                 else
94                         return 0;
95         }
96
97         if (it_disposition(it, DISP_IT_EXECD)) {
98                 if (phase >= DISP_IT_EXECD)
99                         return it->d.lustre.it_status;
100                 else
101                         return 0;
102         }
103         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
104                it->d.lustre.it_status);
105         LBUG();
106         return 0;
107 }
108 EXPORT_SYMBOL(it_open_error);
109
110 /* this must be called on a lockh that is known to have a referenced lock */
111 void mdc_set_lock_data(__u64 *l, void *data)
112 {
113         struct ldlm_lock *lock;
114         struct lustre_handle *lockh = (struct lustre_handle *)l;
115         ENTRY;
116
117         if (!*l) {
118                 EXIT;
119                 return;
120         }
121
122         lock = ldlm_handle2lock(lockh);
123
124         LASSERT(lock != NULL);
125         lock_res_and_lock(lock);
126 #ifdef __KERNEL__
127         if (lock->l_ast_data && lock->l_ast_data != data) {
128                 struct inode *new_inode = data;
129                 struct inode *old_inode = lock->l_ast_data;
130                 LASSERTF(old_inode->i_state & I_FREEING,
131                          "Found existing inode %p/%lu/%u state %lu in lock: "
132                          "setting data to %p/%lu/%u\n", old_inode,
133                          old_inode->i_ino, old_inode->i_generation,
134                          old_inode->i_state,
135                          new_inode, new_inode->i_ino, new_inode->i_generation);
136         }
137 #endif
138         lock->l_ast_data = data;
139         unlock_res_and_lock(lock);
140         LDLM_LOCK_PUT(lock);
141
142         EXIT;
143 }
144 EXPORT_SYMBOL(mdc_set_lock_data);
145
146 int mdc_change_cbdata(struct obd_export *exp, struct ll_fid *fid, 
147                       ldlm_iterator_t it, void *data)
148 {
149         struct ldlm_res_id res_id;
150         ENTRY;
151
152         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
153         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
154                               it, data);
155
156         EXIT;
157         return 0;
158 }
159
160 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
161 {
162         /* Don't hold error requests for replay. */
163         if (req->rq_replay) {
164                 spin_lock(&req->rq_lock);
165                 req->rq_replay = 0;
166                 spin_unlock(&req->rq_lock);
167         }
168         if (rc && req->rq_transno != 0) {
169                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
170                 LBUG();
171         }
172 }
173
/* Return the smallest power of two that is strictly greater than @val
 * (so 5 -> 8, but also 8 -> 16 and 0 -> 1).
 *
 * Edge cases:
 *  - @val <= 0 yields 1.  Shifting a negative int right never reaches zero
 *    with an arithmetic shift, so the loop must not be entered for it.
 *  - When the next power of two does not fit in an int, @val is returned
 *    unchanged, so that a caller computing "round_up(x) - x" as padding
 *    gets zero instead of an overflowed value.
 *
 * The arithmetic is done on unsigned values to avoid overflowing a signed
 * int in the left shift. */
static int round_up(int val)
{
        unsigned int rest;
        unsigned int pow2 = 1;

        if (val <= 0)
                return 1;

        for (rest = (unsigned int)val; rest != 0; rest >>= 1)
                pow2 <<= 1;

        /* ~0U >> 1 is the largest value an int can represent */
        if (pow2 > (~0U >> 1))
                return val;

        return (int)pow2;
}
183
184 /* Save a large LOV EA into the request buffer so that it is available
185  * for replay.  We don't do this in the initial request because the
186  * original request doesn't need this buffer (at most it sends just the
187  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
188  * buffer and may also be difficult to allocate and save a very large
189  * request buffer for each open. (bug 5707)
190  *
191  * OOM here may cause recovery failure if lmm is needed (only for the
192  * original open if the MDS crashed just when this client also OOM'd)
193  * but this is incredibly unlikely, and questionable whether the client
194  * could do MDS recovery under OOM anyways... */
195 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
196                                 struct mds_body *body)
197 {
198         int old_len, new_size, old_size;
199         struct lustre_msg *old_msg = req->rq_reqmsg;
200         struct lustre_msg *new_msg;
201
202         old_len = lustre_msg_buflen(old_msg, DLM_INTENT_REC_OFF + 2);
203         old_size = lustre_packed_msg_size(old_msg);
204         lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2,
205                               body->eadatasize);
206         new_size = lustre_packed_msg_size(old_msg);
207
208         OBD_ALLOC(new_msg, new_size);
209         if (new_msg != NULL) {
210                 DEBUG_REQ(D_INFO, req, "replace reqmsg for larger EA %u",
211                           body->eadatasize);
212                 memcpy(new_msg, old_msg, old_size);
213
214                 spin_lock(&req->rq_lock);
215                 req->rq_reqmsg = new_msg;
216                 req->rq_reqlen = new_size;
217                 spin_unlock(&req->rq_lock);
218
219                 OBD_FREE(old_msg, old_size);
220         } else {
221                 lustre_msg_set_buflen(old_msg, DLM_INTENT_REC_OFF + 2, old_len);
222                 body->valid &= ~OBD_MD_FLEASIZE;
223                 body->eadatasize = 0;
224         }
225 }
226
227 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
228                                                    struct lookup_intent *it,
229                                                    struct mdc_op_data *data,
230                                                    void *lmm, __u32 lmmsize)
231 {
232         struct ptlrpc_request *req;
233         struct ldlm_intent *lit;
234         struct obd_device *obddev = class_exp2obd(exp);
235         __u32 size[9] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
236                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
237                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
238                         [DLM_INTENT_REC_OFF]  = sizeof(struct mds_rec_create),
239                         [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
240                         /* As an optimization, we allocate an RPC request buffer
241                          * for at least a default-sized LOV EA even if we aren't
242                          * sending one.  We grow the whole request to the next
243                          * power-of-two size since we get that much from a slab
244                          * allocation anyways. This avoids an allocation below
245                          * in the common case where we need to save a
246                          * default-sized LOV EA for open replay. */
247                         [DLM_INTENT_REC_OFF+2]= max(lmmsize,
248                                          obddev->u.cli.cl_default_mds_easize) };
249         __u32 repsize[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
250                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
251                            [DLM_REPLY_REC_OFF]   = sizeof(struct mdt_body),
252                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
253                                                         cl_max_mds_easize,
254                            [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE };
255         CFS_LIST_HEAD(cancels);
256         int do_join = (it->it_flags & O_JOIN_FILE) && data->data;
257         int count = 0;
258         int bufcount = 6;
259         int repbufcount = 5;
260         int mode;
261         int rc;
262         ENTRY;
263
264         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
265         if (mdc_exp_is_2_0_server(exp)) {
266                 size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_rec_create);
267                 size[DLM_INTENT_REC_OFF+4] = size[DLM_INTENT_REC_OFF+2];
268                 size[DLM_INTENT_REC_OFF+3] = size[DLM_INTENT_REC_OFF+1];
269                 size[DLM_INTENT_REC_OFF+2] = 0; /* capa */
270                 size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
271                 bufcount = 8;
272                 repsize[DLM_REPLY_REC_OFF+3]=sizeof(struct lustre_capa);
273                 repsize[DLM_REPLY_REC_OFF+4]=sizeof(struct lustre_capa); 
274                 repbufcount = 7;
275         }
276         rc = lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
277                              bufcount, size);
278         if (rc & (rc - 1))
279                 size[bufcount - 1] = min(size[bufcount - 1] + round_up(rc) - rc,
280                                          obddev->u.cli.cl_max_mds_easize);
281
282         /* If inode is known, cancel conflicting OPEN locks. */
283         if (data->fid2.id) {
284                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
285                         mode = LCK_CW;
286 #ifdef FMODE_EXEC
287                 else if (it->it_flags & FMODE_EXEC)
288                         mode = LCK_PR;
289 #endif
290                 else
291                         mode = LCK_CR;
292                 count = mdc_resource_get_unused(exp, &data->fid2, &cancels,
293                                                 mode, MDS_INODELOCK_OPEN);
294         }
295
296         /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
297         if (it->it_op & IT_CREAT || do_join)
298                 mode = LCK_EX;
299         else
300                 mode = LCK_CR;
301         count += mdc_resource_get_unused(exp, &data->fid1, &cancels, mode,
302                                          MDS_INODELOCK_UPDATE);
303         if (do_join) {
304                 __u64 head_size = (*(__u64 *)data->data);
305                 /* join is like an unlink of the tail */
306                 if (mdc_exp_is_2_0_server(exp)) {
307                         size[DLM_INTENT_REC_OFF+5]=sizeof(struct mdt_rec_join);
308                 } else {
309                         size[DLM_INTENT_REC_OFF+3]=sizeof(struct mds_rec_join);
310                 }
311                 bufcount++;
312
313                 req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count);
314                 if (req)
315                         mdc_join_pack(req, bufcount - 1, data, head_size);
316         } else {
317                 req = ldlm_prep_enqueue_req(exp, bufcount, size,&cancels,count);
318                 it->it_flags &= ~O_JOIN_FILE;
319         }
320
321         if (req) {
322                 spin_lock(&req->rq_lock);
323                 req->rq_replay = 1;
324                 spin_unlock(&req->rq_lock);
325
326                 /* pack the intent */
327                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
328                                      sizeof(*lit));
329                 lit->opc = (__u64)it->it_op;
330
331                 /* pack the intended request */
332                 mdc_open_pack(req, DLM_INTENT_REC_OFF, data,
333                               it->it_create_mode, 0, it->it_flags,
334                               lmm, lmmsize);
335
336                 ptlrpc_req_set_repsize(req, repbufcount, repsize);
337         }
338         RETURN(req);
339 }
340
341 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
342                                                      struct lookup_intent *it,
343                                                      struct mdc_op_data *data)
344 {
345         struct ptlrpc_request *req;
346         struct ldlm_intent *lit;
347         struct obd_device *obddev = class_exp2obd(exp);
348         __u32 size[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
349                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
350                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
351                         [DLM_INTENT_REC_OFF]  = mdc_exp_is_2_0_server(exp) ?
352                                                 sizeof(struct mdt_rec_unlink) :
353                                                 sizeof(struct mds_rec_unlink),
354                         [DLM_INTENT_REC_OFF+1]= data->namelen + 1 };
355         __u32 repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
356                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
357                            [DLM_REPLY_REC_OFF]   = sizeof(struct mdt_body),
358                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
359                                                         cl_max_mds_easize,
360                            [DLM_REPLY_REC_OFF+2] = obddev->u.cli.
361                                                         cl_max_mds_cookiesize };
362         ENTRY;
363
364         req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
365         if (req) {
366                 /* pack the intent */
367                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
368                                      sizeof(*lit));
369                 lit->opc = (__u64)it->it_op;
370
371                 /* pack the intended request */
372                 mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
373
374                 ptlrpc_req_set_repsize(req, 5, repsize);
375         }
376         RETURN(req);
377 }
378
379 static struct ptlrpc_request *mdc_intent_lookup_pack(struct obd_export *exp,
380                                                      struct lookup_intent *it,
381                                                      struct mdc_op_data *data)
382 {
383         struct ptlrpc_request *req;
384         struct ldlm_intent *lit;
385         struct obd_device *obddev = class_exp2obd(exp);
386         __u32 size[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
387                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
388                         [DLM_INTENT_IT_OFF]   = sizeof(*lit),
389                         [DLM_INTENT_REC_OFF]  = sizeof(struct mdt_body),
390                         [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
391                         [DLM_INTENT_REC_OFF+2]= 0 };
392         __u32 repsize[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
393                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
394                            [DLM_REPLY_REC_OFF]   = sizeof(struct mdt_body),
395                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
396                                                         cl_max_mds_easize,
397                            [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE,
398                            [DLM_REPLY_REC_OFF+3] = 0 };
399         obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
400                           OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA;
401         int bufcount = 5;
402         ENTRY;
403
404         if (mdc_exp_is_2_0_server(exp)) {
405                 size[DLM_INTENT_REC_OFF+1] = 0; /* capa */
406                 size[DLM_INTENT_REC_OFF+2] = data->namelen + 1;
407                 bufcount = 6;
408         }
409         req = ldlm_prep_enqueue_req(exp, bufcount, size, NULL, 0);
410         if (req) {
411                 /* pack the intent */
412                 lit = lustre_msg_buf(req->rq_reqmsg, DLM_INTENT_IT_OFF,
413                                      sizeof(*lit));
414                 lit->opc = (__u64)it->it_op;
415
416                 /* pack the intended request */
417                 mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid, it->it_flags,
418                                  data);
419                 ptlrpc_req_set_repsize(req, bufcount, repsize);
420         }
421         RETURN(req);
422 }
423
424 static struct ptlrpc_request *mdc_intent_readdir_pack(struct obd_export *exp)
425 {
426         struct ptlrpc_request *req;
427         __u32 size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
428                         [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request) };
429         __u32 repsize[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
430                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
431                            [DLM_REPLY_REC_OFF] = sizeof(struct ost_lvb) };
432         ENTRY;
433
434         req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
435         if (req)
436                 ptlrpc_req_set_repsize(req, 3, repsize);
437         RETURN(req);
438 }
439
440 static int mdc_finish_enqueue(struct obd_export *exp,
441                               struct ptlrpc_request *req,
442                               struct ldlm_enqueue_info *einfo,
443                               struct lookup_intent *it,
444                               struct lustre_handle *lockh,
445                               int rc)
446 {
447         struct ldlm_request *lockreq;
448         struct ldlm_reply *lockrep;
449         ENTRY;
450
451         LASSERT(rc >= 0);
452         /* Similarly, if we're going to replay this request, we don't want to
453          * actually get a lock, just perform the intent. */
454         if (req->rq_transno || req->rq_replay) {
455                 lockreq = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
456                                          sizeof(*lockreq));
457                 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
458         }
459
460         if (rc == ELDLM_LOCK_ABORTED) {
461                 einfo->ei_mode = 0;
462                 memset(lockh, 0, sizeof(*lockh));
463                 rc = 0;
464         } else { /* rc = 0 */
465                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
466                 LASSERT(lock);
467
468                 /* If the server gave us back a different lock mode, we should
469                  * fix up our variables. */
470                 if (lock->l_req_mode != einfo->ei_mode) {
471                         ldlm_lock_addref(lockh, lock->l_req_mode);
472                         ldlm_lock_decref(lockh, einfo->ei_mode);
473                         einfo->ei_mode = lock->l_req_mode;
474                 }
475                 LDLM_LOCK_PUT(lock);
476         }
477
478         lockrep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
479                                  sizeof(*lockrep));
480         LASSERT(lockrep != NULL);  /* checked by ldlm_cli_enqueue() */
481         /* swabbed by ldlm_cli_enqueue() */
482         LASSERT(lustre_rep_swabbed(req, DLM_LOCKREPLY_OFF));
483
484         it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
485         it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
486         it->d.lustre.it_lock_mode = einfo->ei_mode;
487         it->d.lustre.it_data = req;
488
489         if (it->d.lustre.it_status < 0 && req->rq_replay)
490                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
491
492         /* If we're doing an IT_OPEN which did not result in an actual
493          * successful open, then we need to remove the bit which saves
494          * this request for unconditional replay.
495          *
496          * It's important that we do this first!  Otherwise we might exit the
497          * function without doing so, and try to replay a failed create
498          * (bug 3440) */
499         if ((it->it_op & IT_OPEN) &&
500             req->rq_replay &&
501             (!it_disposition(it, DISP_OPEN_OPEN) ||
502              it->d.lustre.it_status != 0))
503                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
504
505         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
506                   it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
507
508         /* We know what to expect, so we do any byte flipping required here */
509         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
510                 struct mds_body *body;
511
512                 body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
513                                          lustre_swab_mds_body);
514                 if (body == NULL) {
515                         CERROR ("Can't swab mds_body\n");
516                         RETURN (-EPROTO);
517                 }
518
519                 /* If this is a successful OPEN request, we need to set
520                    replay handler and data early, so that if replay happens
521                    immediately after swabbing below, new reply is swabbed
522                    by that handler correctly */
523                 if (it_disposition(it, DISP_OPEN_OPEN) &&
524                     !it_open_error(DISP_OPEN_OPEN, it))
525                         mdc_set_open_replay_data(NULL, req);
526
527                 if ((body->valid & OBD_MD_FLEASIZE) != 0) {
528                         void *eadata;
529
530                         /* The eadata is opaque; just check that it is there.
531                          * Eventually, obd_unpackmd() will check the contents */
532                         eadata = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF + 1,
533                                                     body->eadatasize, NULL);
534                         if (eadata == NULL) {
535                                 CERROR ("Missing/short eadata\n");
536                                 RETURN (-EPROTO);
537                         }
538                         /* We save the reply LOV EA in case we have to replay
539                          * a create for recovery.  If we didn't allocate a
540                          * large enough request buffer above we need to
541                          * reallocate it here to hold the actual LOV EA. */
542                         if (it->it_op & IT_OPEN) {
543                                 int offset = DLM_INTENT_REC_OFF + 2;
544                                 void *lmm;
545
546                                 if (lustre_msg_buflen(req->rq_reqmsg, offset) <
547                                     body->eadatasize)
548                                         mdc_realloc_openmsg(req, body);
549
550                                 lmm = lustre_msg_buf(req->rq_reqmsg, offset,
551                                                      body->eadatasize);
552                                 if (lmm)
553                                         memcpy(lmm, eadata, body->eadatasize);
554                         }
555                 }
556         }
557
558         RETURN(rc);
559 }
560
561 /* We always reserve enough space in the reply packet for a stripe MD, because
562  * we don't know in advance the file type. */
563 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
564                 struct lookup_intent *it, struct mdc_op_data *data,
565                 struct lustre_handle *lockh, void *lmm, int lmmsize,
566                 int extra_lock_flags)
567 {
568         struct ptlrpc_request *req;
569         struct obd_device *obddev = class_exp2obd(exp);
570         struct ldlm_res_id res_id;
571         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
572         int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
573         int rc;
574         ENTRY;
575
576         fid_build_reg_res_name((void *)&data->fid1, &res_id);
577         LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
578         if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
579                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
580
581         if (it->it_op & IT_OPEN) {
582                 if ((it->it_op & IT_CREAT) && mdc_exp_is_2_0_server(exp)) {
583                         struct client_obd *cli = &obddev->u.cli;
584                         data->fid3 = data->fid2;
585                         rc = mdc_fid_alloc(cli->cl_seq, (void *)&data->fid2);
586                         if (rc) {
587                                 CERROR("fid allocation result: %d\n", rc);
588                                 RETURN(rc);
589                         }
590                 }
591                 req = mdc_intent_open_pack(exp, it, data, lmm, lmmsize);
592                 if (it->it_flags & O_JOIN_FILE) {
593                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
594                 }
595         } else if (it->it_op & IT_UNLINK) {
596                 req = mdc_intent_unlink_pack(exp, it, data);
597         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
598                 req = mdc_intent_lookup_pack(exp, it, data);
599         } else if (it->it_op == IT_READDIR) {
600                 req = mdc_intent_readdir_pack(exp);
601         } else {
602                 CERROR("bad it_op %x\n", it->it_op);
603                 RETURN(-EINVAL);
604         }
605
606         if (!req)
607                 RETURN(-ENOMEM);
608
609          /* It is important to obtain rpc_lock first (if applicable), so that
610           * threads that are serialised with rpc_lock are not polluting our
611           * rpcs in flight counter */
612         mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
613         mdc_enter_request(&obddev->u.cli);
614         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
615                               0, NULL, lockh, 0);
616         mdc_exit_request(&obddev->u.cli);
617         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
618         if (rc < 0) {
619                 CERROR("ldlm_cli_enqueue: %d\n", rc);
620                 mdc_clear_replay_flag(req, rc);
621                 ptlrpc_req_finished(req);
622                 RETURN(rc);
623         }
624         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
625
626         RETURN(rc);
627 }
628 EXPORT_SYMBOL(mdc_enqueue);
629
630 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
631                         struct ll_fid *fid)
632 {
633                 /* We could just return 1 immediately, but since we should only
634                  * be called in revalidate_it if we already have a lock, let's
635                  * verify that. */
636         struct ldlm_res_id res_id;
637         struct lustre_handle lockh;
638         ldlm_policy_data_t policy;
639         ldlm_mode_t mode;
640         ENTRY;
641
642         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
643         /* As not all attributes are kept under update lock, e.g. 
644            owner/group/acls are under lookup lock, we need both 
645            ibits for GETATTR. */
646         policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
647                 MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP :
648                 MDS_INODELOCK_LOOKUP;
649
650         mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
651                                LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
652                                &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh);
653         if (mode) {
654                 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
655                 it->d.lustre.it_lock_mode = mode;
656         }
657
658         RETURN(!!mode);
659 }
660 EXPORT_SYMBOL(mdc_revalidate_lock);
661
662 static int mdc_finish_intent_lock(struct obd_export *exp,
663                                   struct ptlrpc_request *req,
664                                   struct mdc_op_data *data,
665                                   struct lookup_intent *it,
666                                   struct lustre_handle *lockh)
667 {
668         struct mds_body *mds_body;
669         struct lustre_handle old_lock;
670         struct ldlm_lock *lock;
671         int rc;
672         ENTRY;
673
674         LASSERT(req != NULL);
675         LASSERT(req != LP_POISON);
676         LASSERT(req->rq_repmsg != LP_POISON);
677
678         if (!it_disposition(it, DISP_IT_EXECD)) {
679                 /* The server failed before it even started executing the
680                  * intent, i.e. because it couldn't unpack the request. */
681                 LASSERT(it->d.lustre.it_status != 0);
682                 RETURN(it->d.lustre.it_status);
683         }
684         rc = it_open_error(DISP_IT_EXECD, it);
685         if (rc)
686                 RETURN(rc);
687
688         mds_body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF,
689                                   sizeof(*mds_body));
690         /* mdc_enqueue checked */
691         LASSERT(mds_body != NULL);
692         /* mdc_enqueue swabbed */
693         LASSERT(lustre_rep_swabbed(req, DLM_REPLY_REC_OFF));
694
695         /* If we were revalidating a fid/name pair, mark the intent in
696          * case we fail and get called again from lookup */
697
698         if (data->fid2.id && (it->it_op != IT_GETATTR) &&
699            ( !mdc_exp_is_2_0_server(exp) ||
700              (mdc_exp_is_2_0_server(exp) && (it->it_flags & O_CHECK_STALE)))) {
701                 it_set_disposition(it, DISP_ENQ_COMPLETE);
702
703                 /* Also: did we find the same inode? */
704                 if (memcmp(&data->fid2, &mds_body->fid1, sizeof(data->fid2)) &&
705                     memcmp(&data->fid3, &mds_body->fid1, sizeof(data->fid3)))
706                         RETURN(-ESTALE);
707         }
708
709         rc = it_open_error(DISP_LOOKUP_EXECD, it);
710         if (rc)
711                 RETURN(rc);
712
713         /* keep requests around for the multiple phases of the call
714          * this shows the DISP_XX must guarantee we make it into the call
715          */
716         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
717             it_disposition(it, DISP_OPEN_CREATE) &&
718             !it_open_error(DISP_OPEN_CREATE, it)) {
719                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
720                 ptlrpc_request_addref(req); /* balanced in ll_create_node */
721         }
722         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
723             it_disposition(it, DISP_OPEN_OPEN) &&
724             !it_open_error(DISP_OPEN_OPEN, it)) {
725                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
726                 ptlrpc_request_addref(req); /* balanced in ll_file_open */
727                 /* BUG 11546 - eviction in the middle of open rpc processing */
728                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
729         }
730
731         if (it->it_op & IT_CREAT) {
732                 /* XXX this belongs in ll_create_it */
733         } else if (it->it_op == IT_OPEN) {
734                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
735         } else {
736                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
737         }
738
739         /* If we already have a matching lock, then cancel the new
740          * one.  We have to set the data here instead of in
741          * mdc_enqueue, because we need to use the child's inode as
742          * the l_ast_data to match, and that's not available until
743          * intent_finish has performed the iget().) */
744         lock = ldlm_handle2lock(lockh);
745         if (lock) {
746                 ldlm_policy_data_t policy = lock->l_policy_data;
747
748                 LDLM_DEBUG(lock, "matching against this");
749                 LDLM_LOCK_PUT(lock);
750                 memcpy(&old_lock, lockh, sizeof(*lockh));
751                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
752                                     LDLM_IBITS, &policy, LCK_NL, &old_lock)) {
753                         ldlm_lock_decref_and_cancel(lockh,
754                                                     it->d.lustre.it_lock_mode);
755                         memcpy(lockh, &old_lock, sizeof(old_lock));
756                         memcpy(&it->d.lustre.it_lock_handle, lockh,
757                                sizeof(*lockh));
758                 }
759         }
760
761         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
762                data->namelen, data->name, ldlm_it2str(it->it_op),
763                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
764         RETURN(rc);
765 }
766
767 /* 
768  * This long block is all about fixing up the lock and request state
769  * so that it is correct as of the moment _before_ the operation was
770  * applied; that way, the VFS will think that everything is normal and
771  * call Lustre's regular VFS methods.
772  *
773  * If we're performing a creation, that means that unless the creation
774  * failed with EEXIST, we should fake up a negative dentry.
775  *
776  * For everything else, we want to lookup to succeed.
777  *
778  * One additional note: if CREATE or OPEN succeeded, we add an extra
779  * reference to the request because we need to keep it around until
780  * ll_create/ll_open gets called.
781  *
782  * The server will return to us, in it_disposition, an indication of
783  * exactly what d.lustre.it_status refers to.
784  *
785  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
786  * otherwise if DISP_OPEN_CREATE is set, then it status is the
787  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
788  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
789  * was successful.
790  *
791  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
792  * child lookup.
793  */
794 int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
795                     void *lmm, int lmmsize, struct lookup_intent *it,
796                     int lookup_flags, struct ptlrpc_request **reqp,
797                     ldlm_blocking_callback cb_blocking, int extra_lock_flags)
798 {
799         struct lustre_handle lockh;
800         int rc;
801         ENTRY;
802
803         LASSERT(it);
804
805         CDEBUG(D_DLMTRACE,"name: %.*s("DFID") in inode ("DFID"), "
806                "intent: %s flags %#o\n",
807                op_data->namelen, op_data->name,
808                PFID(((void *)&op_data->fid2)),
809                PFID(((void *)&op_data->fid1)),
810                ldlm_it2str(it->it_op), it->it_flags);
811
812         lockh.cookie = 0;
813         if (op_data->fid2.id &&
814             (it->it_op == IT_LOOKUP || it->it_op == IT_GETATTR)) {
815                 rc = mdc_revalidate_lock(exp, it, &op_data->fid2);
816                 /* Only return failure if it was not GETATTR by cfid
817                    (from inode_revalidate) */
818                 if (rc || op_data->namelen != 0)
819                         RETURN(rc);
820         }
821
822         /* lookup_it may be called only after revalidate_it has run, because
823          * revalidate_it cannot return errors, only zero.  Returning zero causes
824          * this call to lookup, which *can* return an error.
825          *
826          * We only want to execute the request associated with the intent one
827          * time, however, so don't send the request again.  Instead, skip past
828          * this and use the request from revalidate.  In this case, revalidate
829          * never dropped its reference, so the refcounts are all OK */
830         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
831                 struct ldlm_enqueue_info einfo =
832                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
833                           ldlm_completion_ast, NULL, NULL };
834
835                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
836                                  lmm, lmmsize, extra_lock_flags);
837                 if (rc < 0)
838                         RETURN(rc);
839                 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
840         } else if (!op_data->fid2.id) {
841                 /* DISP_ENQ_COMPLETE set means there is extra reference on
842                  * request referenced from this intent, saved for subsequent
843                  * lookup.  This path is executed when we proceed to this
844                  * lookup, so we clear DISP_ENQ_COMPLETE */
845                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
846         }
847
848         *reqp = it->d.lustre.it_data;
849         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
850
851         RETURN(rc);
852 }
853 EXPORT_SYMBOL(mdc_intent_lock);
854
855 static int mdc_intent_getattr_async_interpret(struct ptlrpc_request *req,
856                                               void *unused, int rc)
857 {
858         struct obd_export        *exp = req->rq_async_args.pointer_arg[0];
859         struct md_enqueue_info   *minfo = req->rq_async_args.pointer_arg[1];
860         struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
861         struct lookup_intent     *it;
862         struct lustre_handle     *lockh;
863         struct obd_device        *obddev;
864         int                       flags = LDLM_FL_HAS_INTENT;
865         ENTRY;
866
867         it    = &minfo->mi_it;
868         lockh = &minfo->mi_lockh;
869
870         obddev = class_exp2obd(exp);
871
872         mdc_exit_request(&obddev->u.cli);
873         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
874                 rc = -ETIMEDOUT;
875
876         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
877                                    &flags, NULL, 0, NULL, lockh, rc);
878         if (rc < 0) {
879                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
880                 mdc_clear_replay_flag(req, rc);
881                 GOTO(out, rc);
882         }
883
884         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
885         if (rc)
886                 GOTO(out, rc);
887
888         memcpy(&it->d.lustre.it_lock_handle, lockh, sizeof(*lockh));
889
890         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
891         GOTO(out, rc);
892 out:
893         OBD_FREE_PTR(einfo);
894         minfo->mi_cb(exp, req, minfo, rc);
895
896         return 0;
897 }
898
899 int mdc_intent_getattr_async(struct obd_export *exp,
900                              struct md_enqueue_info *minfo,
901                              struct ldlm_enqueue_info *einfo)
902 {
903         struct mdc_op_data      *op_data = &minfo->mi_data;
904         struct lookup_intent    *it = &minfo->mi_it;
905         struct ptlrpc_request   *req;
906         struct obd_device       *obddev = class_exp2obd(exp);
907         struct ldlm_res_id res_id;
908         ldlm_policy_data_t       policy = {
909                                         .l_inodebits = { MDS_INODELOCK_LOOKUP }
910                                  };
911         int                      rc;
912         int                      flags = LDLM_FL_HAS_INTENT;
913         ENTRY;
914
915         CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
916                op_data->namelen, op_data->name, op_data->fid1.id,
917                ldlm_it2str(it->it_op), it->it_flags);
918
919         fid_build_reg_res_name((void *)&op_data->fid1, &res_id);
920         req = mdc_intent_lookup_pack(exp, it, op_data);
921         if (!req)
922                 RETURN(-ENOMEM);
923
924         mdc_enter_request(&obddev->u.cli);
925         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
926                               0, NULL, &minfo->mi_lockh, 1);
927         if (rc < 0) {
928                 mdc_exit_request(&obddev->u.cli);
929                 RETURN(rc);
930         }
931
932         req->rq_async_args.pointer_arg[0] = exp;
933         req->rq_async_args.pointer_arg[1] = minfo;
934         req->rq_async_args.pointer_arg[2] = einfo;
935         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
936         ptlrpcd_add_req(req);
937
938         RETURN(0);
939 }
940 EXPORT_SYMBOL(mdc_intent_getattr_async);