Whamcloud - gitweb
b=20433 decrease the usage of memory on clients.
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_MDC
41
42 #ifdef __KERNEL__
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
47 #else
48 # include <liblustre.h>
49 #endif
50
51 #include <lustre_acl.h>
52 #include <obd_class.h>
53 #include <lustre_dlm.h>
54 /* fid_res_name_eq() */
55 #include <lustre_fid.h>
56 #include <lprocfs_status.h>
57 #include "mdc_internal.h"
58
59 int it_disposition(struct lookup_intent *it, int flag)
60 {
61         return it->d.lustre.it_disposition & flag;
62 }
63 EXPORT_SYMBOL(it_disposition);
64
65 void it_set_disposition(struct lookup_intent *it, int flag)
66 {
67         it->d.lustre.it_disposition |= flag;
68 }
69 EXPORT_SYMBOL(it_set_disposition);
70
71 void it_clear_disposition(struct lookup_intent *it, int flag)
72 {
73         it->d.lustre.it_disposition &= ~flag;
74 }
75 EXPORT_SYMBOL(it_clear_disposition);
76
77 int it_open_error(int phase, struct lookup_intent *it)
78 {
79         if (it_disposition(it, DISP_OPEN_OPEN)) {
80                 if (phase >= DISP_OPEN_OPEN)
81                         return it->d.lustre.it_status;
82                 else
83                         return 0;
84         }
85
86         if (it_disposition(it, DISP_OPEN_CREATE)) {
87                 if (phase >= DISP_OPEN_CREATE)
88                         return it->d.lustre.it_status;
89                 else
90                         return 0;
91         }
92
93         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
94                 if (phase >= DISP_LOOKUP_EXECD)
95                         return it->d.lustre.it_status;
96                 else
97                         return 0;
98         }
99
100         if (it_disposition(it, DISP_IT_EXECD)) {
101                 if (phase >= DISP_IT_EXECD)
102                         return it->d.lustre.it_status;
103                 else
104                         return 0;
105         }
106         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
107                it->d.lustre.it_status);
108         LBUG();
109         return 0;
110 }
111 EXPORT_SYMBOL(it_open_error);
112
113 /* this must be called on a lockh that is known to have a referenced lock */
114 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
115                       __u32 *bits)
116 {
117         struct ldlm_lock *lock;
118         ENTRY;
119
120         if(bits)
121                 *bits = 0;
122
123         if (!*lockh) {
124                 EXIT;
125                 RETURN(0);
126         }
127
128         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
129
130         LASSERT(lock != NULL);
131         lock_res_and_lock(lock);
132 #ifdef __KERNEL__
133         if (lock->l_ast_data && lock->l_ast_data != data) {
134                 struct inode *new_inode = data;
135                 struct inode *old_inode = lock->l_ast_data;
136                 LASSERTF(old_inode->i_state & I_FREEING,
137                          "Found existing inode %p/%lu/%u state %lu in lock: "
138                          "setting data to %p/%lu/%u\n", old_inode,
139                          old_inode->i_ino, old_inode->i_generation,
140                          old_inode->i_state,
141                          new_inode, new_inode->i_ino, new_inode->i_generation);
142         }
143 #endif
144         lock->l_ast_data = data;
145         if (bits)
146                 *bits = lock->l_policy_data.l_inodebits.bits;
147
148         unlock_res_and_lock(lock);
149         LDLM_LOCK_PUT(lock);
150
151         RETURN(0);
152 }
153
154 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
155                            const struct lu_fid *fid, ldlm_type_t type,
156                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
157                            struct lustre_handle *lockh)
158 {
159         struct ldlm_res_id res_id;
160         ldlm_mode_t rc;
161         ENTRY;
162
163         fid_build_reg_res_name(fid, &res_id);
164         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
165                              &res_id, type, policy, mode, lockh, 0);
166         RETURN(rc);
167 }
168
169 int mdc_cancel_unused(struct obd_export *exp,
170                       const struct lu_fid *fid,
171                       ldlm_policy_data_t *policy,
172                       ldlm_mode_t mode, int flags, void *opaque)
173 {
174         struct ldlm_res_id res_id;
175         struct obd_device *obd = class_exp2obd(exp);
176         int rc;
177
178         ENTRY;
179
180         fid_build_reg_res_name(fid, &res_id);
181         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
182                                              policy, mode, flags, opaque);
183         RETURN(rc);
184 }
185
186 int mdc_change_cbdata(struct obd_export *exp,
187                       const struct lu_fid *fid,
188                       ldlm_iterator_t it, void *data)
189 {
190         struct ldlm_res_id res_id;
191         ENTRY;
192
193         fid_build_reg_res_name(fid, &res_id);
194         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
195                               &res_id, it, data);
196
197         EXIT;
198         return 0;
199 }
200
201 /* find any ldlm lock of the inode in mdc
202  * return 0    not find
203  *        1    find one
204  *      < 0    error */
205 int mdc_find_cbdata(struct obd_export *exp,
206                     const struct lu_fid *fid,
207                     ldlm_iterator_t it, void *data)
208 {
209         struct ldlm_res_id res_id;
210         int rc = 0;
211         ENTRY;
212
213         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
214         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
215                                    it, data);
216         if (rc == LDLM_ITER_STOP)
217                 RETURN(1);
218         else if (rc == LDLM_ITER_CONTINUE)
219                 RETURN(0);
220         RETURN(rc);
221 }
222
223 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
224 {
225         /* Don't hold error requests for replay. */
226         if (req->rq_replay) {
227                 cfs_spin_lock(&req->rq_lock);
228                 req->rq_replay = 0;
229                 cfs_spin_unlock(&req->rq_lock);
230         }
231         if (rc && req->rq_transno != 0) {
232                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
233                 LBUG();
234         }
235 }
236
237 /* Save a large LOV EA into the request buffer so that it is available
238  * for replay.  We don't do this in the initial request because the
239  * original request doesn't need this buffer (at most it sends just the
240  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
241  * buffer and may also be difficult to allocate and save a very large
242  * request buffer for each open. (bug 5707)
243  *
244  * OOM here may cause recovery failure if lmm is needed (only for the
245  * original open if the MDS crashed just when this client also OOM'd)
246  * but this is incredibly unlikely, and questionable whether the client
247  * could do MDS recovery under OOM anyways... */
248 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
249                                 struct mdt_body *body)
250 {
251         int     rc;
252
253         /* FIXME: remove this explicit offset. */
254         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
255                                         body->eadatasize);
256         if (rc) {
257                 CERROR("Can't enlarge segment %d size to %d\n",
258                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
259                 body->valid &= ~OBD_MD_FLEASIZE;
260                 body->eadatasize = 0;
261         }
262 }
263
264 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
265                                                    struct lookup_intent *it,
266                                                    struct md_op_data *op_data,
267                                                    void *lmm, int lmmsize,
268                                                    void *cb_data)
269 {
270         struct ptlrpc_request *req;
271         struct obd_device     *obddev = class_exp2obd(exp);
272         struct ldlm_intent    *lit;
273         CFS_LIST_HEAD(cancels);
274         int                    count = 0;
275         int                    mode;
276         int                    rc;
277         ENTRY;
278
279         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
280
281         /* XXX: openlock is not cancelled for cross-refs. */
282         /* If inode is known, cancel conflicting OPEN locks. */
283         if (fid_is_sane(&op_data->op_fid2)) {
284                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
285                         mode = LCK_CW;
286 #ifdef FMODE_EXEC
287                 else if (it->it_flags & FMODE_EXEC)
288                         mode = LCK_PR;
289 #endif
290                 else
291                         mode = LCK_CR;
292                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
293                                                 &cancels, mode,
294                                                 MDS_INODELOCK_OPEN);
295         }
296
297         /* If CREATE, cancel parent's UPDATE lock. */
298         if (it->it_op & IT_CREAT)
299                 mode = LCK_EX;
300         else
301                 mode = LCK_CR;
302         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
303                                          &cancels, mode,
304                                          MDS_INODELOCK_UPDATE);
305
306         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
307                                    &RQF_LDLM_INTENT_OPEN);
308         if (req == NULL) {
309                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
310                 RETURN(ERR_PTR(-ENOMEM));
311         }
312
313         /* parent capability */
314         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
315         /* child capability, reserve the size according to parent capa, it will
316          * be filled after we get the reply */
317         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
318
319         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
320                              op_data->op_namelen + 1);
321         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
322                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
323
324         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
325         if (rc) {
326                 ptlrpc_request_free(req);
327                 return NULL;
328         }
329
330         cfs_spin_lock(&req->rq_lock);
331         req->rq_replay = req->rq_import->imp_replayable;
332         cfs_spin_unlock(&req->rq_lock);
333
334         /* pack the intent */
335         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
336         lit->opc = (__u64)it->it_op;
337
338         /* pack the intended request */
339         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
340                       lmmsize);
341
342         /* for remote client, fetch remote perm for current user */
343         if (client_is_remote(exp))
344                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
345                                      sizeof(struct mdt_remote_perm));
346         ptlrpc_request_set_replen(req);
347         return req;
348 }
349
350 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
351                                                      struct lookup_intent *it,
352                                                      struct md_op_data *op_data)
353 {
354         struct ptlrpc_request *req;
355         struct obd_device     *obddev = class_exp2obd(exp);
356         struct ldlm_intent    *lit;
357         int                    rc;
358         ENTRY;
359
360         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
361                                    &RQF_LDLM_INTENT_UNLINK);
362         if (req == NULL)
363                 RETURN(ERR_PTR(-ENOMEM));
364
365         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
366         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
367                              op_data->op_namelen + 1);
368
369         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
370         if (rc) {
371                 ptlrpc_request_free(req);
372                 RETURN(ERR_PTR(rc));
373         }
374
375         /* pack the intent */
376         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
377         lit->opc = (__u64)it->it_op;
378
379         /* pack the intended request */
380         mdc_unlink_pack(req, op_data);
381
382         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
383                              obddev->u.cli.cl_max_mds_easize);
384         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
385                              obddev->u.cli.cl_max_mds_cookiesize);
386         ptlrpc_request_set_replen(req);
387         RETURN(req);
388 }
389
390 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
391                                                       struct lookup_intent *it,
392                                                       struct md_op_data *op_data)
393 {
394         struct ptlrpc_request *req;
395         struct obd_device     *obddev = class_exp2obd(exp);
396         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
397                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
398                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
399                                        (client_is_remote(exp) ?
400                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
401         struct ldlm_intent    *lit;
402         int                    rc;
403         ENTRY;
404
405         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
406                                    &RQF_LDLM_INTENT_GETATTR);
407         if (req == NULL)
408                 RETURN(ERR_PTR(-ENOMEM));
409
410         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
411         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
412                              op_data->op_namelen + 1);
413
414         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
415         if (rc) {
416                 ptlrpc_request_free(req);
417                 RETURN(ERR_PTR(rc));
418         }
419
420         /* pack the intent */
421         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
422         lit->opc = (__u64)it->it_op;
423
424         /* pack the intended request */
425         mdc_getattr_pack(req, valid, it->it_flags, op_data);
426
427         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
428                              obddev->u.cli.cl_max_mds_easize);
429         if (client_is_remote(exp))
430                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
431                                      sizeof(struct mdt_remote_perm));
432         ptlrpc_request_set_replen(req);
433         RETURN(req);
434 }
435
436 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
437 {
438         struct ptlrpc_request *req;
439         int rc;
440         ENTRY;
441
442         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
443         if (req == NULL)
444                 RETURN(ERR_PTR(-ENOMEM));
445
446         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
447         if (rc) {
448                 ptlrpc_request_free(req);
449                 RETURN(ERR_PTR(rc));
450         }
451
452         ptlrpc_request_set_replen(req);
453         RETURN(req);
454 }
455
/*
 * Post-process a completed intent enqueue.
 *
 * @rc is the enqueue result and must be non-negative (asserted).
 * ELDLM_LOCK_ABORTED means no lock is kept: einfo->ei_mode and @lockh are
 * zeroed and rc becomes 0.  Otherwise the lock behind @lockh is looked up
 * and, if its l_req_mode differs from einfo->ei_mode, the reference is moved
 * to that mode and einfo->ei_mode is updated.
 *
 * The intent's disposition, status, lock mode and lock handle are then
 * filled in from the DLM reply, and @req is stored in it->d.lustre.it_data.
 * For OPEN/UNLINK/LOOKUP/GETATTR intents the reply buffers announced in
 * body->valid are fetched to verify they are present; for a replayable OPEN
 * the reply EA is also copied into the request's RMF_EADATA buffer.
 *
 * Returns rc (0 after an aborted lock), or -EPROTO when an expected reply
 * buffer is missing.
 */
static int mdc_finish_enqueue(struct obd_export *exp,
                              struct ptlrpc_request *req,
                              struct ldlm_enqueue_info *einfo,
                              struct lookup_intent *it,
                              struct lustre_handle *lockh,
                              int rc)
{
        struct req_capsule  *pill = &req->rq_pill;
        struct ldlm_request *lockreq;
        struct ldlm_reply   *lockrep;
        ENTRY;

        LASSERT(rc >= 0);
        /* If we're going to replay this request, we don't want to
         * actually get a lock, just perform the intent. */
        if (req->rq_transno || req->rq_replay) {
                lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
                lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
        }

        if (rc == ELDLM_LOCK_ABORTED) {
                /* No lock to hand back: clear the mode and the handle. */
                einfo->ei_mode = 0;
                memset(lockh, 0, sizeof(*lockh));
                rc = 0;
        } else { /* rc = 0 */
                struct ldlm_lock *lock = ldlm_handle2lock(lockh);
                LASSERT(lock);

                /* If the server gave us back a different lock mode, we should
                 * fix up our variables. */
                if (lock->l_req_mode != einfo->ei_mode) {
                        ldlm_lock_addref(lockh, lock->l_req_mode);
                        ldlm_lock_decref(lockh, einfo->ei_mode);
                        einfo->ei_mode = lock->l_req_mode;
                }
                /* Drop the reference taken by ldlm_handle2lock(). */
                LDLM_LOCK_PUT(lock);
        }

        lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
        LASSERT(lockrep != NULL);                 /* checked by ldlm_cli_enqueue() */

        /* Publish the intent result; the request travels with the intent. */
        it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
        it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
        it->d.lustre.it_lock_mode = einfo->ei_mode;
        it->d.lustre.it_lock_handle = lockh->cookie;
        it->d.lustre.it_data = req;

        /* A failed intent is never replayed. */
        if (it->d.lustre.it_status < 0 && req->rq_replay)
                mdc_clear_replay_flag(req, it->d.lustre.it_status);

        /* If we're doing an IT_OPEN which did not result in an actual
         * successful open, then we need to remove the bit which saves
         * this request for unconditional replay.
         *
         * It's important that we do this first!  Otherwise we might exit the
         * function without doing so, and try to replay a failed create
         * (bug 3440) */
        if (it->it_op & IT_OPEN && req->rq_replay &&
            (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
                mdc_clear_replay_flag(req, it->d.lustre.it_status);

        DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
                  it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);

        /* We know what to expect, so we do any byte flipping required here */
        if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
                struct mdt_body *body;

                body = req_capsule_server_get(pill, &RMF_MDT_BODY);
                if (body == NULL) {
                        CERROR ("Can't swab mdt_body\n");
                        RETURN (-EPROTO);
                }

                if (it_disposition(it, DISP_OPEN_OPEN) &&
                    !it_open_error(DISP_OPEN_OPEN, it)) {
                        /*
                         * If this is a successful OPEN request, we need to set
                         * replay handler and data early, so that if replay
                         * happens immediately after swabbing below, new reply
                         * is swabbed by that handler correctly.
                         */
                        mdc_set_open_replay_data(NULL, NULL, req);
                }

                if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
                        void *eadata;

                         mdc_update_max_ea_from_body(exp, body);

                        /*
                         * The eadata is opaque; just check that it is there.
                         * Eventually, obd_unpackmd() will check the contents.
                         */
                        eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
                                                              body->eadatasize);
                        if (eadata == NULL)
                                RETURN(-EPROTO);

                        /*
                         * We save the reply LOV EA in case we have to replay a
                         * create for recovery.  If the request's EA buffer is
                         * smaller than the reply EA it is enlarged here;
                         * otherwise it is shrunk to the actual EA size.
                         *
                         * The LOV EA is not saved if the request is not going
                         * to be replayed (for example after an error).
                         */
                        if ((it->it_op & IT_OPEN) && req->rq_replay) {
                                void *lmm;
                                if (req_capsule_get_size(pill, &RMF_EADATA,
                                                         RCL_CLIENT) <
                                    body->eadatasize)
                                        mdc_realloc_openmsg(req, body);
                                else
                                        req_capsule_shrink(pill, &RMF_EADATA,
                                                           body->eadatasize,
                                                           RCL_CLIENT);

                                /* mdc_realloc_openmsg() zeroes eadatasize on
                                 * failure, so the size set and the copy below
                                 * then cover zero bytes. */
                                req_capsule_set_size(pill, &RMF_EADATA,
                                                     RCL_CLIENT,
                                                     body->eadatasize);

                                lmm = req_capsule_client_get(pill, &RMF_EADATA);
                                if (lmm)
                                        memcpy(lmm, eadata, body->eadatasize);
                        }
                }

                if (body->valid & OBD_MD_FLRMTPERM) {
                        struct mdt_remote_perm *perm;

                        /* Remote permissions are only valid for a remote
                         * client; just check (and swab) that they are there. */
                        LASSERT(client_is_remote(exp));
                        perm = req_capsule_server_swab_get(pill, &RMF_ACL,
                                                lustre_swab_mdt_remote_perm);
                        if (perm == NULL)
                                RETURN(-EPROTO);
                }
                if (body->valid & OBD_MD_FLMDSCAPA) {
                        struct lustre_capa *capa, *p;

                        capa = req_capsule_server_get(pill, &RMF_CAPA1);
                        if (capa == NULL)
                                RETURN(-EPROTO);

                        if (it->it_op & IT_OPEN) {
                                /* client fid capa will be checked in replay */
                                p = req_capsule_client_get(pill, &RMF_CAPA2);
                                LASSERT(p);
                                *p = *capa;
                        }
                }
                if (body->valid & OBD_MD_FLOSSCAPA) {
                        struct lustre_capa *capa;

                        /* Only presence is checked; the value is not used
                         * in this function. */
                        capa = req_capsule_server_get(pill, &RMF_CAPA2);
                        if (capa == NULL)
                                RETURN(-EPROTO);
                }
        }

        RETURN(rc);
}
619
620 /* We always reserve enough space in the reply packet for a stripe MD, because
621  * we don't know in advance the file type. */
622 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
623                 struct lookup_intent *it, struct md_op_data *op_data,
624                 struct lustre_handle *lockh, void *lmm, int lmmsize,
625                 struct ptlrpc_request **reqp, int extra_lock_flags)
626 {
627         struct obd_device     *obddev = class_exp2obd(exp);
628         struct ptlrpc_request *req = NULL;
629         struct req_capsule    *pill;
630         int                    flags = extra_lock_flags;
631         int                    rc;
632         struct ldlm_res_id res_id;
633         static const ldlm_policy_data_t lookup_policy =
634                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
635         static const ldlm_policy_data_t update_policy =
636                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
637         ldlm_policy_data_t const *policy = &lookup_policy;
638         ENTRY;
639
640         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
641                  einfo->ei_type);
642
643         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
644
645         if (it)
646                 flags |= LDLM_FL_HAS_INTENT;
647         if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
648                 policy = &update_policy;
649
650         if (reqp)
651                 req = *reqp;
652
653         if (!it) {
654                 /* The only way right now is FLOCK, in this case we hide flock
655                    policy as lmm, but lmmsize is 0 */
656                 LASSERT(lmm && lmmsize == 0);
657                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
658                          einfo->ei_type);
659                 policy = (ldlm_policy_data_t *)lmm;
660                 res_id.name[3] = LDLM_FLOCK;
661         } else if (it->it_op & IT_OPEN) {
662                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
663                                            einfo->ei_cbdata);
664                 policy = &update_policy;
665                 einfo->ei_cbdata = NULL;
666                 lmm = NULL;
667         } else if (it->it_op & IT_UNLINK)
668                 req = mdc_intent_unlink_pack(exp, it, op_data);
669         else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
670                 req = mdc_intent_getattr_pack(exp, it, op_data);
671         else if (it->it_op == IT_READDIR)
672                 req = ldlm_enqueue_pack(exp);
673         else {
674                 LBUG();
675                 RETURN(-EINVAL);
676         }
677
678         if (IS_ERR(req))
679                 RETURN(PTR_ERR(req));
680         pill = &req->rq_pill;
681
682         /* It is important to obtain rpc_lock first (if applicable), so that
683          * threads that are serialised with rpc_lock are not polluting our
684          * rpcs in flight counter. We do not do flock request limiting, though*/
685         if (it) {
686                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
687                 mdc_enter_request(&obddev->u.cli);
688         }
689
690         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
691                               0, lockh, 0);
692         if (reqp)
693                 *reqp = req;
694
695         if (it) {
696                 mdc_exit_request(&obddev->u.cli);
697                 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
698         }
699         if (!it) {
700                 /* For flock requests we immediatelly return without further
701                    delay and let caller deal with the rest, since rest of
702                    this function metadata processing makes no sense for flock
703                    requests anyway */
704                 RETURN(rc);
705         }
706
707         if (rc < 0) {
708                 CERROR("ldlm_cli_enqueue: %d\n", rc);
709                 mdc_clear_replay_flag(req, rc);
710                 ptlrpc_req_finished(req);
711                 RETURN(rc);
712         }
713         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
714
715         RETURN(rc);
716 }
717
718 static int mdc_finish_intent_lock(struct obd_export *exp,
719                                   struct ptlrpc_request *request,
720                                   struct md_op_data *op_data,
721                                   struct lookup_intent *it,
722                                   struct lustre_handle *lockh)
723 {
724         struct lustre_handle old_lock;
725         struct mdt_body *mdt_body;
726         struct ldlm_lock *lock;
727         int rc;
728
729
730         LASSERT(request != NULL);
731         LASSERT(request != LP_POISON);
732         LASSERT(request->rq_repmsg != LP_POISON);
733
734         if (!it_disposition(it, DISP_IT_EXECD)) {
735                 /* The server failed before it even started executing the
736                  * intent, i.e. because it couldn't unpack the request. */
737                 LASSERT(it->d.lustre.it_status != 0);
738                 RETURN(it->d.lustre.it_status);
739         }
740         rc = it_open_error(DISP_IT_EXECD, it);
741         if (rc)
742                 RETURN(rc);
743
744         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
745         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
746
747         /* If we were revalidating a fid/name pair, mark the intent in
748          * case we fail and get called again from lookup */
749         if (fid_is_sane(&op_data->op_fid2) &&
750             it->it_create_mode & M_CHECK_STALE &&
751             it->it_op != IT_GETATTR) {
752                 it_set_disposition(it, DISP_ENQ_COMPLETE);
753
754                 /* Also: did we find the same inode? */
755                 /* sever can return one of two fids:
756                  * op_fid2 - new allocated fid - if file is created.
757                  * op_fid3 - existent fid - if file only open.
758                  * op_fid3 is saved in lmv_intent_open */
759                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
760                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
761                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
762                                "\n", PFID(&op_data->op_fid2),
763                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
764                         RETURN(-ESTALE);
765                 }
766         }
767
768         rc = it_open_error(DISP_LOOKUP_EXECD, it);
769         if (rc)
770                 RETURN(rc);
771
772         /* keep requests around for the multiple phases of the call
773          * this shows the DISP_XX must guarantee we make it into the call
774          */
775         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
776             it_disposition(it, DISP_OPEN_CREATE) &&
777             !it_open_error(DISP_OPEN_CREATE, it)) {
778                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
779                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
780         }
781         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
782             it_disposition(it, DISP_OPEN_OPEN) &&
783             !it_open_error(DISP_OPEN_OPEN, it)) {
784                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
785                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
786                 /* BUG 11546 - eviction in the middle of open rpc processing */
787                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
788         }
789
790         if (it->it_op & IT_CREAT) {
791                 /* XXX this belongs in ll_create_it */
792         } else if (it->it_op == IT_OPEN) {
793                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
794         } else {
795                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
796         }
797
798         /* If we already have a matching lock, then cancel the new
799          * one.  We have to set the data here instead of in
800          * mdc_enqueue, because we need to use the child's inode as
801          * the l_ast_data to match, and that's not available until
802          * intent_finish has performed the iget().) */
803         lock = ldlm_handle2lock(lockh);
804         if (lock) {
805                 ldlm_policy_data_t policy = lock->l_policy_data;
806                 LDLM_DEBUG(lock, "matching against this");
807
808                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
809                                          &lock->l_resource->lr_name),
810                          "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
811                          (unsigned long)lock->l_resource->lr_name.name[0],
812                          (unsigned long)lock->l_resource->lr_name.name[1],
813                          (unsigned long)lock->l_resource->lr_name.name[2],
814                          (unsigned long)fid_seq(&mdt_body->fid1),
815                          (unsigned long)fid_oid(&mdt_body->fid1),
816                          (unsigned long)fid_ver(&mdt_body->fid1));
817                 LDLM_LOCK_PUT(lock);
818
819                 memcpy(&old_lock, lockh, sizeof(*lockh));
820                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
821                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
822                         ldlm_lock_decref_and_cancel(lockh,
823                                                     it->d.lustre.it_lock_mode);
824                         memcpy(lockh, &old_lock, sizeof(old_lock));
825                         it->d.lustre.it_lock_handle = lockh->cookie;
826                 }
827         }
828         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
829                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
830                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
831         RETURN(rc);
832 }
833
834 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
835                         struct lu_fid *fid, __u32 *bits)
836 {
837         /* We could just return 1 immediately, but since we should only
838          * be called in revalidate_it if we already have a lock, let's
839          * verify that. */
840         struct ldlm_res_id res_id;
841         struct lustre_handle lockh;
842         ldlm_policy_data_t policy;
843         ldlm_mode_t mode;
844         ENTRY;
845
846         fid_build_reg_res_name(fid, &res_id);
847         /* As not all attributes are kept under update lock, e.g.
848            owner/group/acls are under lookup lock, we need both
849            ibits for GETATTR. */
850         policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
851                 MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
852
853         mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
854                                LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
855                                &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
856         if (mode) {
857                 it->d.lustre.it_lock_handle = lockh.cookie;
858                 it->d.lustre.it_lock_mode = mode;
859                 if (bits) {
860                         struct ldlm_lock *lock = ldlm_handle2lock(&lockh);
861
862                         LASSERT(lock != NULL);
863                         *bits = lock->l_policy_data.l_inodebits.bits; 
864                         LDLM_LOCK_PUT(lock);
865                 }
866         }
867
868         RETURN(!!mode);
869 }
870
871 /*
872  * This long block is all about fixing up the lock and request state
873  * so that it is correct as of the moment _before_ the operation was
874  * applied; that way, the VFS will think that everything is normal and
875  * call Lustre's regular VFS methods.
876  *
877  * If we're performing a creation, that means that unless the creation
878  * failed with EEXIST, we should fake up a negative dentry.
879  *
880  * For everything else, we want to lookup to succeed.
881  *
882  * One additional note: if CREATE or OPEN succeeded, we add an extra
883  * reference to the request because we need to keep it around until
884  * ll_create/ll_open gets called.
885  *
886  * The server will return to us, in it_disposition, an indication of
887  * exactly what d.lustre.it_status refers to.
888  *
889  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
890  * otherwise if DISP_OPEN_CREATE is set, then it status is the
891  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
892  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
893  * was successful.
894  *
895  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
896  * child lookup.
897  */
898 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
899                     void *lmm, int lmmsize, struct lookup_intent *it,
900                     int lookup_flags, struct ptlrpc_request **reqp,
901                     ldlm_blocking_callback cb_blocking,
902                     int extra_lock_flags)
903 {
904         struct lustre_handle lockh;
905         int rc = 0;
906         ENTRY;
907         LASSERT(it);
908
909         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
910                ", intent: %s flags %#o\n", op_data->op_namelen,
911                op_data->op_name, PFID(&op_data->op_fid2),
912                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
913                it->it_flags);
914
915         lockh.cookie = 0;
916         if (fid_is_sane(&op_data->op_fid2) &&
917             (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
918                 /* We could just return 1 immediately, but since we should only
919                  * be called in revalidate_it if we already have a lock, let's
920                  * verify that. */
921                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
922                 /* Only return failure if it was not GETATTR by cfid
923                    (from inode_revalidate) */
924                 if (rc || op_data->op_namelen != 0)
925                         RETURN(rc);
926         }
927
928         /* lookup_it may be called only after revalidate_it has run, because
929          * revalidate_it cannot return errors, only zero.  Returning zero causes
930          * this call to lookup, which *can* return an error.
931          *
932          * We only want to execute the request associated with the intent one
933          * time, however, so don't send the request again.  Instead, skip past
934          * this and use the request from revalidate.  In this case, revalidate
935          * never dropped its reference, so the refcounts are all OK */
936         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
937                 struct ldlm_enqueue_info einfo =
938                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
939                           ldlm_completion_ast, NULL, NULL, NULL };
940
941                 /* For case if upper layer did not alloc fid, do it now. */
942                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
943                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
944                         if (rc < 0) {
945                                 CERROR("Can't alloc new fid, rc %d\n", rc);
946                                 RETURN(rc);
947                         }
948                 }
949                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
950                                  lmm, lmmsize, NULL, extra_lock_flags);
951                 if (rc < 0)
952                         RETURN(rc);
953         } else if (!fid_is_sane(&op_data->op_fid2) ||
954                    !(it->it_create_mode & M_CHECK_STALE)) {
955                 /* DISP_ENQ_COMPLETE set means there is extra reference on
956                  * request referenced from this intent, saved for subsequent
957                  * lookup.  This path is executed when we proceed to this
958                  * lookup, so we clear DISP_ENQ_COMPLETE */
959                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
960         }
961         *reqp = it->d.lustre.it_data;
962         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
963         RETURN(rc);
964 }
965
966 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
967                                               struct ptlrpc_request *req,
968                                               void *unused, int rc)
969 {
970         struct obd_export        *exp = req->rq_async_args.pointer_arg[0];
971         struct md_enqueue_info   *minfo = req->rq_async_args.pointer_arg[1];
972         struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
973         struct lookup_intent     *it;
974         struct lustre_handle     *lockh;
975         struct obd_device        *obddev;
976         int                       flags = LDLM_FL_HAS_INTENT;
977         ENTRY;
978
979         it    = &minfo->mi_it;
980         lockh = &minfo->mi_lockh;
981
982         obddev = class_exp2obd(exp);
983
984         mdc_exit_request(&obddev->u.cli);
985         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
986                 rc = -ETIMEDOUT;
987
988         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
989                                    &flags, NULL, 0, lockh, rc);
990         if (rc < 0) {
991                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
992                 mdc_clear_replay_flag(req, rc);
993                 GOTO(out, rc);
994         }
995
996         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
997         if (rc)
998                 GOTO(out, rc);
999
1000         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1001         EXIT;
1002
1003 out:
1004         OBD_FREE_PTR(einfo);
1005         minfo->mi_cb(req, minfo, rc);
1006         return 0;
1007 }
1008
1009 int mdc_intent_getattr_async(struct obd_export *exp,
1010                              struct md_enqueue_info *minfo,
1011                              struct ldlm_enqueue_info *einfo)
1012 {
1013         struct md_op_data       *op_data = &minfo->mi_data;
1014         struct lookup_intent    *it = &minfo->mi_it;
1015         struct ptlrpc_request   *req;
1016         struct obd_device       *obddev = class_exp2obd(exp);
1017         struct ldlm_res_id       res_id;
1018         ldlm_policy_data_t       policy = {
1019                                         .l_inodebits = { MDS_INODELOCK_LOOKUP }
1020                                  };
1021         int                      rc;
1022         int                      flags = LDLM_FL_HAS_INTENT;
1023         ENTRY;
1024
1025         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1026                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1027                ldlm_it2str(it->it_op), it->it_flags);
1028
1029         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1030         req = mdc_intent_getattr_pack(exp, it, op_data);
1031         if (!req)
1032                 RETURN(-ENOMEM);
1033
1034         mdc_enter_request(&obddev->u.cli);
1035         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1036                               0, &minfo->mi_lockh, 1);
1037         if (rc < 0) {
1038                 mdc_exit_request(&obddev->u.cli);
1039                 RETURN(rc);
1040         }
1041
1042         req->rq_async_args.pointer_arg[0] = exp;
1043         req->rq_async_args.pointer_arg[1] = minfo;
1044         req->rq_async_args.pointer_arg[2] = einfo;
1045         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1046         ptlrpcd_add_req(req, PSCOPE_OTHER);
1047
1048         RETURN(0);
1049 }