Whamcloud - gitweb
b=23120 eliminate defunct changelog_send_thread processes
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_MDC
41
42 #ifdef __KERNEL__
43 # include <linux/module.h>
44 # include <linux/pagemap.h>
45 # include <linux/miscdevice.h>
46 # include <linux/init.h>
47 #else
48 # include <liblustre.h>
49 #endif
50
51 #include <lustre_acl.h>
52 #include <obd_class.h>
53 #include <lustre_dlm.h>
54 /* fid_res_name_eq() */
55 #include <lustre_fid.h>
56 #include <lprocfs_status.h>
57 #include "mdc_internal.h"
58
59 int it_disposition(struct lookup_intent *it, int flag)
60 {
61         return it->d.lustre.it_disposition & flag;
62 }
63 EXPORT_SYMBOL(it_disposition);
64
65 void it_set_disposition(struct lookup_intent *it, int flag)
66 {
67         it->d.lustre.it_disposition |= flag;
68 }
69 EXPORT_SYMBOL(it_set_disposition);
70
71 void it_clear_disposition(struct lookup_intent *it, int flag)
72 {
73         it->d.lustre.it_disposition &= ~flag;
74 }
75 EXPORT_SYMBOL(it_clear_disposition);
76
77 int it_open_error(int phase, struct lookup_intent *it)
78 {
79         if (it_disposition(it, DISP_OPEN_OPEN)) {
80                 if (phase >= DISP_OPEN_OPEN)
81                         return it->d.lustre.it_status;
82                 else
83                         return 0;
84         }
85
86         if (it_disposition(it, DISP_OPEN_CREATE)) {
87                 if (phase >= DISP_OPEN_CREATE)
88                         return it->d.lustre.it_status;
89                 else
90                         return 0;
91         }
92
93         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
94                 if (phase >= DISP_LOOKUP_EXECD)
95                         return it->d.lustre.it_status;
96                 else
97                         return 0;
98         }
99
100         if (it_disposition(it, DISP_IT_EXECD)) {
101                 if (phase >= DISP_IT_EXECD)
102                         return it->d.lustre.it_status;
103                 else
104                         return 0;
105         }
106         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
107                it->d.lustre.it_status);
108         LBUG();
109         return 0;
110 }
111 EXPORT_SYMBOL(it_open_error);
112
113 /* this must be called on a lockh that is known to have a referenced lock */
114 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
115                       __u32 *bits)
116 {
117         struct ldlm_lock *lock;
118         ENTRY;
119
120         if(bits)
121                 *bits = 0;
122
123         if (!*lockh) {
124                 EXIT;
125                 RETURN(0);
126         }
127
128         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
129
130         LASSERT(lock != NULL);
131         lock_res_and_lock(lock);
132 #ifdef __KERNEL__
133         if (lock->l_ast_data && lock->l_ast_data != data) {
134                 struct inode *new_inode = data;
135                 struct inode *old_inode = lock->l_ast_data;
136                 LASSERTF(old_inode->i_state & I_FREEING,
137                          "Found existing inode %p/%lu/%u state %lu in lock: "
138                          "setting data to %p/%lu/%u\n", old_inode,
139                          old_inode->i_ino, old_inode->i_generation,
140                          old_inode->i_state,
141                          new_inode, new_inode->i_ino, new_inode->i_generation);
142         }
143 #endif
144         lock->l_ast_data = data;
145         if (bits)
146                 *bits = lock->l_policy_data.l_inodebits.bits;
147
148         unlock_res_and_lock(lock);
149         LDLM_LOCK_PUT(lock);
150
151         RETURN(0);
152 }
153
154 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
155                            const struct lu_fid *fid, ldlm_type_t type,
156                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
157                            struct lustre_handle *lockh)
158 {
159         struct ldlm_res_id res_id;
160         ldlm_mode_t rc;
161         ENTRY;
162
163         fid_build_reg_res_name(fid, &res_id);
164         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
165                              &res_id, type, policy, mode, lockh, 0);
166         RETURN(rc);
167 }
168
169 int mdc_cancel_unused(struct obd_export *exp,
170                       const struct lu_fid *fid,
171                       ldlm_policy_data_t *policy,
172                       ldlm_mode_t mode,
173                       ldlm_cancel_flags_t flags,
174                       void *opaque)
175 {
176         struct ldlm_res_id res_id;
177         struct obd_device *obd = class_exp2obd(exp);
178         int rc;
179
180         ENTRY;
181
182         fid_build_reg_res_name(fid, &res_id);
183         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
184                                              policy, mode, flags, opaque);
185         RETURN(rc);
186 }
187
188 int mdc_change_cbdata(struct obd_export *exp,
189                       const struct lu_fid *fid,
190                       ldlm_iterator_t it, void *data)
191 {
192         struct ldlm_res_id res_id;
193         ENTRY;
194
195         fid_build_reg_res_name(fid, &res_id);
196         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
197                               &res_id, it, data);
198
199         EXIT;
200         return 0;
201 }
202
203 /* find any ldlm lock of the inode in mdc
204  * return 0    not find
205  *        1    find one
206  *      < 0    error */
207 int mdc_find_cbdata(struct obd_export *exp,
208                     const struct lu_fid *fid,
209                     ldlm_iterator_t it, void *data)
210 {
211         struct ldlm_res_id res_id;
212         int rc = 0;
213         ENTRY;
214
215         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
216         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
217                                    it, data);
218         if (rc == LDLM_ITER_STOP)
219                 RETURN(1);
220         else if (rc == LDLM_ITER_CONTINUE)
221                 RETURN(0);
222         RETURN(rc);
223 }
224
225 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
226 {
227         /* Don't hold error requests for replay. */
228         if (req->rq_replay) {
229                 cfs_spin_lock(&req->rq_lock);
230                 req->rq_replay = 0;
231                 cfs_spin_unlock(&req->rq_lock);
232         }
233         if (rc && req->rq_transno != 0) {
234                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
235                 LBUG();
236         }
237 }
238
239 /* Save a large LOV EA into the request buffer so that it is available
240  * for replay.  We don't do this in the initial request because the
241  * original request doesn't need this buffer (at most it sends just the
242  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
243  * buffer and may also be difficult to allocate and save a very large
244  * request buffer for each open. (bug 5707)
245  *
246  * OOM here may cause recovery failure if lmm is needed (only for the
247  * original open if the MDS crashed just when this client also OOM'd)
248  * but this is incredibly unlikely, and questionable whether the client
249  * could do MDS recovery under OOM anyways... */
250 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
251                                 struct mdt_body *body)
252 {
253         int     rc;
254
255         /* FIXME: remove this explicit offset. */
256         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
257                                         body->eadatasize);
258         if (rc) {
259                 CERROR("Can't enlarge segment %d size to %d\n",
260                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
261                 body->valid &= ~OBD_MD_FLEASIZE;
262                 body->eadatasize = 0;
263         }
264 }
265
266 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
267                                                    struct lookup_intent *it,
268                                                    struct md_op_data *op_data,
269                                                    void *lmm, int lmmsize,
270                                                    void *cb_data)
271 {
272         struct ptlrpc_request *req;
273         struct obd_device     *obddev = class_exp2obd(exp);
274         struct ldlm_intent    *lit;
275         CFS_LIST_HEAD(cancels);
276         int                    count = 0;
277         int                    mode;
278         int                    rc;
279         ENTRY;
280
281         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
282
283         /* XXX: openlock is not cancelled for cross-refs. */
284         /* If inode is known, cancel conflicting OPEN locks. */
285         if (fid_is_sane(&op_data->op_fid2)) {
286                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
287                         mode = LCK_CW;
288 #ifdef FMODE_EXEC
289                 else if (it->it_flags & FMODE_EXEC)
290                         mode = LCK_PR;
291 #endif
292                 else
293                         mode = LCK_CR;
294                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
295                                                 &cancels, mode,
296                                                 MDS_INODELOCK_OPEN);
297         }
298
299         /* If CREATE, cancel parent's UPDATE lock. */
300         if (it->it_op & IT_CREAT)
301                 mode = LCK_EX;
302         else
303                 mode = LCK_CR;
304         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
305                                          &cancels, mode,
306                                          MDS_INODELOCK_UPDATE);
307
308         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
309                                    &RQF_LDLM_INTENT_OPEN);
310         if (req == NULL) {
311                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
312                 RETURN(ERR_PTR(-ENOMEM));
313         }
314
315         /* parent capability */
316         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
317         /* child capability, reserve the size according to parent capa, it will
318          * be filled after we get the reply */
319         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
320
321         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
322                              op_data->op_namelen + 1);
323         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
324                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
325
326         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
327         if (rc) {
328                 ptlrpc_request_free(req);
329                 return NULL;
330         }
331
332         cfs_spin_lock(&req->rq_lock);
333         req->rq_replay = req->rq_import->imp_replayable;
334         cfs_spin_unlock(&req->rq_lock);
335
336         /* pack the intent */
337         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
338         lit->opc = (__u64)it->it_op;
339
340         /* pack the intended request */
341         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
342                       lmmsize);
343
344         /* for remote client, fetch remote perm for current user */
345         if (client_is_remote(exp))
346                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
347                                      sizeof(struct mdt_remote_perm));
348         ptlrpc_request_set_replen(req);
349         return req;
350 }
351
352 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
353                                                      struct lookup_intent *it,
354                                                      struct md_op_data *op_data)
355 {
356         struct ptlrpc_request *req;
357         struct obd_device     *obddev = class_exp2obd(exp);
358         struct ldlm_intent    *lit;
359         int                    rc;
360         ENTRY;
361
362         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
363                                    &RQF_LDLM_INTENT_UNLINK);
364         if (req == NULL)
365                 RETURN(ERR_PTR(-ENOMEM));
366
367         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
368         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
369                              op_data->op_namelen + 1);
370
371         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
372         if (rc) {
373                 ptlrpc_request_free(req);
374                 RETURN(ERR_PTR(rc));
375         }
376
377         /* pack the intent */
378         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
379         lit->opc = (__u64)it->it_op;
380
381         /* pack the intended request */
382         mdc_unlink_pack(req, op_data);
383
384         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
385                              obddev->u.cli.cl_max_mds_easize);
386         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
387                              obddev->u.cli.cl_max_mds_cookiesize);
388         ptlrpc_request_set_replen(req);
389         RETURN(req);
390 }
391
392 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
393                                                       struct lookup_intent *it,
394                                                       struct md_op_data *op_data)
395 {
396         struct ptlrpc_request *req;
397         struct obd_device     *obddev = class_exp2obd(exp);
398         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
399                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
400                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
401                                        (client_is_remote(exp) ?
402                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
403         struct ldlm_intent    *lit;
404         int                    rc;
405         ENTRY;
406
407         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
408                                    &RQF_LDLM_INTENT_GETATTR);
409         if (req == NULL)
410                 RETURN(ERR_PTR(-ENOMEM));
411
412         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
413         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
414                              op_data->op_namelen + 1);
415
416         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
417         if (rc) {
418                 ptlrpc_request_free(req);
419                 RETURN(ERR_PTR(rc));
420         }
421
422         /* pack the intent */
423         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
424         lit->opc = (__u64)it->it_op;
425
426         /* pack the intended request */
427         mdc_getattr_pack(req, valid, it->it_flags, op_data);
428
429         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
430                              obddev->u.cli.cl_max_mds_easize);
431         if (client_is_remote(exp))
432                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
433                                      sizeof(struct mdt_remote_perm));
434         ptlrpc_request_set_replen(req);
435         RETURN(req);
436 }
437
438 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
439 {
440         struct ptlrpc_request *req;
441         int rc;
442         ENTRY;
443
444         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
445         if (req == NULL)
446                 RETURN(ERR_PTR(-ENOMEM));
447
448         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
449         if (rc) {
450                 ptlrpc_request_free(req);
451                 RETURN(ERR_PTR(rc));
452         }
453
454         ptlrpc_request_set_replen(req);
455         RETURN(req);
456 }
457
458 static int mdc_finish_enqueue(struct obd_export *exp,
459                               struct ptlrpc_request *req,
460                               struct ldlm_enqueue_info *einfo,
461                               struct lookup_intent *it,
462                               struct lustre_handle *lockh,
463                               int rc)
464 {
465         struct req_capsule  *pill = &req->rq_pill;
466         struct ldlm_request *lockreq;
467         struct ldlm_reply   *lockrep;
468         ENTRY;
469
470         LASSERT(rc >= 0);
471         /* Similarly, if we're going to replay this request, we don't want to
472          * actually get a lock, just perform the intent. */
473         if (req->rq_transno || req->rq_replay) {
474                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
475                 lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
476         }
477
478         if (rc == ELDLM_LOCK_ABORTED) {
479                 einfo->ei_mode = 0;
480                 memset(lockh, 0, sizeof(*lockh));
481                 rc = 0;
482         } else { /* rc = 0 */
483                 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
484                 LASSERT(lock);
485
486                 /* If the server gave us back a different lock mode, we should
487                  * fix up our variables. */
488                 if (lock->l_req_mode != einfo->ei_mode) {
489                         ldlm_lock_addref(lockh, lock->l_req_mode);
490                         ldlm_lock_decref(lockh, einfo->ei_mode);
491                         einfo->ei_mode = lock->l_req_mode;
492                 }
493                 LDLM_LOCK_PUT(lock);
494         }
495
496         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
497         LASSERT(lockrep != NULL);                 /* checked by ldlm_cli_enqueue() */
498
499         it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
500         it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
501         it->d.lustre.it_lock_mode = einfo->ei_mode;
502         it->d.lustre.it_lock_handle = lockh->cookie;
503         it->d.lustre.it_data = req;
504
505         if (it->d.lustre.it_status < 0 && req->rq_replay)
506                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
507
508         /* If we're doing an IT_OPEN which did not result in an actual
509          * successful open, then we need to remove the bit which saves
510          * this request for unconditional replay.
511          *
512          * It's important that we do this first!  Otherwise we might exit the
513          * function without doing so, and try to replay a failed create
514          * (bug 3440) */
515         if (it->it_op & IT_OPEN && req->rq_replay &&
516             (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
517                 mdc_clear_replay_flag(req, it->d.lustre.it_status);
518
519         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
520                   it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);
521
522         /* We know what to expect, so we do any byte flipping required here */
523         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
524                 struct mdt_body *body;
525
526                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
527                 if (body == NULL) {
528                         CERROR ("Can't swab mdt_body\n");
529                         RETURN (-EPROTO);
530                 }
531
532                 if (it_disposition(it, DISP_OPEN_OPEN) &&
533                     !it_open_error(DISP_OPEN_OPEN, it)) {
534                         /*
535                          * If this is a successful OPEN request, we need to set
536                          * replay handler and data early, so that if replay
537                          * happens immediately after swabbing below, new reply
538                          * is swabbed by that handler correctly.
539                          */
540                         mdc_set_open_replay_data(NULL, NULL, req);
541                 }
542
543                 if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
544                         void *eadata;
545
546                          mdc_update_max_ea_from_body(exp, body);
547
548                         /*
549                          * The eadata is opaque; just check that it is there.
550                          * Eventually, obd_unpackmd() will check the contents.
551                          */
552                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
553                                                               body->eadatasize);
554                         if (eadata == NULL)
555                                 RETURN(-EPROTO);
556
557                         /*
558                          * We save the reply LOV EA in case we have to replay a
559                          * create for recovery.  If we didn't allocate a large
560                          * enough request buffer above we need to reallocate it
561                          * here to hold the actual LOV EA.
562                          *
563                          * To not save LOV EA if request is not going to replay
564                          * (for example error one).
565                          */
566                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
567                                 void *lmm;
568                                 if (req_capsule_get_size(pill, &RMF_EADATA,
569                                                          RCL_CLIENT) <
570                                     body->eadatasize)
571                                         mdc_realloc_openmsg(req, body);
572                                 else
573                                         req_capsule_shrink(pill, &RMF_EADATA,
574                                                            body->eadatasize,
575                                                            RCL_CLIENT);
576
577                                 req_capsule_set_size(pill, &RMF_EADATA,
578                                                      RCL_CLIENT,
579                                                      body->eadatasize);
580
581                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
582                                 if (lmm)
583                                         memcpy(lmm, eadata, body->eadatasize);
584                         }
585                 }
586
587                 if (body->valid & OBD_MD_FLRMTPERM) {
588                         struct mdt_remote_perm *perm;
589
590                         LASSERT(client_is_remote(exp));
591                         perm = req_capsule_server_swab_get(pill, &RMF_ACL,
592                                                 lustre_swab_mdt_remote_perm);
593                         if (perm == NULL)
594                                 RETURN(-EPROTO);
595                 }
596                 if (body->valid & OBD_MD_FLMDSCAPA) {
597                         struct lustre_capa *capa, *p;
598
599                         capa = req_capsule_server_get(pill, &RMF_CAPA1);
600                         if (capa == NULL)
601                                 RETURN(-EPROTO);
602
603                         if (it->it_op & IT_OPEN) {
604                                 /* client fid capa will be checked in replay */
605                                 p = req_capsule_client_get(pill, &RMF_CAPA2);
606                                 LASSERT(p);
607                                 *p = *capa;
608                         }
609                 }
610                 if (body->valid & OBD_MD_FLOSSCAPA) {
611                         struct lustre_capa *capa;
612
613                         capa = req_capsule_server_get(pill, &RMF_CAPA2);
614                         if (capa == NULL)
615                                 RETURN(-EPROTO);
616                 }
617         }
618
619         RETURN(rc);
620 }
621
622 /* We always reserve enough space in the reply packet for a stripe MD, because
623  * we don't know in advance the file type. */
624 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
625                 struct lookup_intent *it, struct md_op_data *op_data,
626                 struct lustre_handle *lockh, void *lmm, int lmmsize,
627                 struct ptlrpc_request **reqp, int extra_lock_flags)
628 {
629         struct obd_device     *obddev = class_exp2obd(exp);
630         struct ptlrpc_request *req = NULL;
631         struct req_capsule    *pill;
632         int                    flags = extra_lock_flags;
633         int                    rc;
634         struct ldlm_res_id res_id;
635         static const ldlm_policy_data_t lookup_policy =
636                             { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
637         static const ldlm_policy_data_t update_policy =
638                             { .l_inodebits = { MDS_INODELOCK_UPDATE } };
639         ldlm_policy_data_t const *policy = &lookup_policy;
640         ENTRY;
641
642         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
643                  einfo->ei_type);
644
645         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
646
647         if (it)
648                 flags |= LDLM_FL_HAS_INTENT;
649         if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
650                 policy = &update_policy;
651
652         if (reqp)
653                 req = *reqp;
654
655         if (!it) {
656                 /* The only way right now is FLOCK, in this case we hide flock
657                    policy as lmm, but lmmsize is 0 */
658                 LASSERT(lmm && lmmsize == 0);
659                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
660                          einfo->ei_type);
661                 policy = (ldlm_policy_data_t *)lmm;
662                 res_id.name[3] = LDLM_FLOCK;
663         } else if (it->it_op & IT_OPEN) {
664                 req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
665                                            einfo->ei_cbdata);
666                 policy = &update_policy;
667                 einfo->ei_cbdata = NULL;
668                 lmm = NULL;
669         } else if (it->it_op & IT_UNLINK)
670                 req = mdc_intent_unlink_pack(exp, it, op_data);
671         else if (it->it_op & (IT_GETATTR | IT_LOOKUP))
672                 req = mdc_intent_getattr_pack(exp, it, op_data);
673         else if (it->it_op == IT_READDIR)
674                 req = ldlm_enqueue_pack(exp);
675         else {
676                 LBUG();
677                 RETURN(-EINVAL);
678         }
679
680         if (IS_ERR(req))
681                 RETURN(PTR_ERR(req));
682         pill = &req->rq_pill;
683
684         /* It is important to obtain rpc_lock first (if applicable), so that
685          * threads that are serialised with rpc_lock are not polluting our
686          * rpcs in flight counter. We do not do flock request limiting, though*/
687         if (it) {
688                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
689                 mdc_enter_request(&obddev->u.cli);
690         }
691
692         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
693                               0, lockh, 0);
694         if (reqp)
695                 *reqp = req;
696
697         if (it) {
698                 mdc_exit_request(&obddev->u.cli);
699                 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
700         }
701         if (!it) {
702                 /* For flock requests we immediatelly return without further
703                    delay and let caller deal with the rest, since rest of
704                    this function metadata processing makes no sense for flock
705                    requests anyway */
706                 RETURN(rc);
707         }
708
709         if (rc < 0) {
710                 CERROR("ldlm_cli_enqueue: %d\n", rc);
711                 mdc_clear_replay_flag(req, rc);
712                 ptlrpc_req_finished(req);
713                 RETURN(rc);
714         }
715         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
716
717         RETURN(rc);
718 }
719
720 static int mdc_finish_intent_lock(struct obd_export *exp,
721                                   struct ptlrpc_request *request,
722                                   struct md_op_data *op_data,
723                                   struct lookup_intent *it,
724                                   struct lustre_handle *lockh)
725 {
726         struct lustre_handle old_lock;
727         struct mdt_body *mdt_body;
728         struct ldlm_lock *lock;
729         int rc;
730
731
732         LASSERT(request != NULL);
733         LASSERT(request != LP_POISON);
734         LASSERT(request->rq_repmsg != LP_POISON);
735
736         if (!it_disposition(it, DISP_IT_EXECD)) {
737                 /* The server failed before it even started executing the
738                  * intent, i.e. because it couldn't unpack the request. */
739                 LASSERT(it->d.lustre.it_status != 0);
740                 RETURN(it->d.lustre.it_status);
741         }
742         rc = it_open_error(DISP_IT_EXECD, it);
743         if (rc)
744                 RETURN(rc);
745
746         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
747         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
748
749         /* If we were revalidating a fid/name pair, mark the intent in
750          * case we fail and get called again from lookup */
751         if (fid_is_sane(&op_data->op_fid2) &&
752             it->it_create_mode & M_CHECK_STALE &&
753             it->it_op != IT_GETATTR) {
754                 it_set_disposition(it, DISP_ENQ_COMPLETE);
755
756                 /* Also: did we find the same inode? */
757                 /* sever can return one of two fids:
758                  * op_fid2 - new allocated fid - if file is created.
759                  * op_fid3 - existent fid - if file only open.
760                  * op_fid3 is saved in lmv_intent_open */
761                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
762                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
763                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
764                                "\n", PFID(&op_data->op_fid2),
765                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
766                         RETURN(-ESTALE);
767                 }
768         }
769
770         rc = it_open_error(DISP_LOOKUP_EXECD, it);
771         if (rc)
772                 RETURN(rc);
773
774         /* keep requests around for the multiple phases of the call
775          * this shows the DISP_XX must guarantee we make it into the call
776          */
777         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
778             it_disposition(it, DISP_OPEN_CREATE) &&
779             !it_open_error(DISP_OPEN_CREATE, it)) {
780                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
781                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
782         }
783         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
784             it_disposition(it, DISP_OPEN_OPEN) &&
785             !it_open_error(DISP_OPEN_OPEN, it)) {
786                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
787                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
788                 /* BUG 11546 - eviction in the middle of open rpc processing */
789                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
790         }
791
792         if (it->it_op & IT_CREAT) {
793                 /* XXX this belongs in ll_create_it */
794         } else if (it->it_op == IT_OPEN) {
795                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
796         } else {
797                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP));
798         }
799
800         /* If we already have a matching lock, then cancel the new
801          * one.  We have to set the data here instead of in
802          * mdc_enqueue, because we need to use the child's inode as
803          * the l_ast_data to match, and that's not available until
804          * intent_finish has performed the iget().) */
805         lock = ldlm_handle2lock(lockh);
806         if (lock) {
807                 ldlm_policy_data_t policy = lock->l_policy_data;
808                 LDLM_DEBUG(lock, "matching against this");
809
810                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
811                                          &lock->l_resource->lr_name),
812                          "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
813                          (unsigned long)lock->l_resource->lr_name.name[0],
814                          (unsigned long)lock->l_resource->lr_name.name[1],
815                          (unsigned long)lock->l_resource->lr_name.name[2],
816                          (unsigned long)fid_seq(&mdt_body->fid1),
817                          (unsigned long)fid_oid(&mdt_body->fid1),
818                          (unsigned long)fid_ver(&mdt_body->fid1));
819                 LDLM_LOCK_PUT(lock);
820
821                 memcpy(&old_lock, lockh, sizeof(*lockh));
822                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
823                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
824                         ldlm_lock_decref_and_cancel(lockh,
825                                                     it->d.lustre.it_lock_mode);
826                         memcpy(lockh, &old_lock, sizeof(old_lock));
827                         it->d.lustre.it_lock_handle = lockh->cookie;
828                 }
829         }
830         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
831                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
832                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
833         RETURN(rc);
834 }
835
836 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
837                         struct lu_fid *fid, __u32 *bits)
838 {
839         /* We could just return 1 immediately, but since we should only
840          * be called in revalidate_it if we already have a lock, let's
841          * verify that. */
842         struct ldlm_res_id res_id;
843         struct lustre_handle lockh;
844         ldlm_policy_data_t policy;
845         ldlm_mode_t mode;
846         ENTRY;
847
848         fid_build_reg_res_name(fid, &res_id);
849         /* Firstly consider the bits */
850         if (bits && *bits)
851                 policy.l_inodebits.bits = *bits;
852         else
853                 policy.l_inodebits.bits = (it->it_op == IT_GETATTR) ?
854                         MDS_INODELOCK_UPDATE : MDS_INODELOCK_LOOKUP;
855
856         mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
857                                LDLM_FL_BLOCK_GRANTED, &res_id, LDLM_IBITS,
858                                &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
859         if (mode) {
860                 it->d.lustre.it_lock_handle = lockh.cookie;
861                 it->d.lustre.it_lock_mode = mode;
862                 if (bits) {
863                         struct ldlm_lock *lock = ldlm_handle2lock(&lockh);
864
865                         LASSERT(lock != NULL);
866                         *bits = lock->l_policy_data.l_inodebits.bits; 
867                         LDLM_LOCK_PUT(lock);
868                 }
869         }
870
871         RETURN(!!mode);
872 }
873
874 /*
875  * This long block is all about fixing up the lock and request state
876  * so that it is correct as of the moment _before_ the operation was
877  * applied; that way, the VFS will think that everything is normal and
878  * call Lustre's regular VFS methods.
879  *
880  * If we're performing a creation, that means that unless the creation
881  * failed with EEXIST, we should fake up a negative dentry.
882  *
883  * For everything else, we want to lookup to succeed.
884  *
885  * One additional note: if CREATE or OPEN succeeded, we add an extra
886  * reference to the request because we need to keep it around until
887  * ll_create/ll_open gets called.
888  *
889  * The server will return to us, in it_disposition, an indication of
890  * exactly what d.lustre.it_status refers to.
891  *
892  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
893  * otherwise if DISP_OPEN_CREATE is set, then it status is the
894  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
895  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
896  * was successful.
897  *
898  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
899  * child lookup.
900  */
901 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
902                     void *lmm, int lmmsize, struct lookup_intent *it,
903                     int lookup_flags, struct ptlrpc_request **reqp,
904                     ldlm_blocking_callback cb_blocking,
905                     int extra_lock_flags)
906 {
907         struct lustre_handle lockh;
908         int rc = 0;
909         ENTRY;
910         LASSERT(it);
911
912         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
913                ", intent: %s flags %#o\n", op_data->op_namelen,
914                op_data->op_name, PFID(&op_data->op_fid2),
915                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
916                it->it_flags);
917
918         lockh.cookie = 0;
919         if (fid_is_sane(&op_data->op_fid2) &&
920             (it->it_op & (IT_LOOKUP | IT_GETATTR))) {
921                 /* We could just return 1 immediately, but since we should only
922                  * be called in revalidate_it if we already have a lock, let's
923                  * verify that. */
924                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
925                 /* Only return failure if it was not GETATTR by cfid
926                    (from inode_revalidate) */
927                 if (rc || op_data->op_namelen != 0)
928                         RETURN(rc);
929         }
930
931         /* lookup_it may be called only after revalidate_it has run, because
932          * revalidate_it cannot return errors, only zero.  Returning zero causes
933          * this call to lookup, which *can* return an error.
934          *
935          * We only want to execute the request associated with the intent one
936          * time, however, so don't send the request again.  Instead, skip past
937          * this and use the request from revalidate.  In this case, revalidate
938          * never dropped its reference, so the refcounts are all OK */
939         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
940                 struct ldlm_enqueue_info einfo =
941                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
942                           ldlm_completion_ast, NULL, NULL, NULL };
943
944                 /* For case if upper layer did not alloc fid, do it now. */
945                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
946                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
947                         if (rc < 0) {
948                                 CERROR("Can't alloc new fid, rc %d\n", rc);
949                                 RETURN(rc);
950                         }
951                 }
952                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
953                                  lmm, lmmsize, NULL, extra_lock_flags);
954                 if (rc < 0)
955                         RETURN(rc);
956         } else if (!fid_is_sane(&op_data->op_fid2) ||
957                    !(it->it_create_mode & M_CHECK_STALE)) {
958                 /* DISP_ENQ_COMPLETE set means there is extra reference on
959                  * request referenced from this intent, saved for subsequent
960                  * lookup.  This path is executed when we proceed to this
961                  * lookup, so we clear DISP_ENQ_COMPLETE */
962                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
963         }
964         *reqp = it->d.lustre.it_data;
965         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
966         RETURN(rc);
967 }
968
969 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
970                                               struct ptlrpc_request *req,
971                                               void *unused, int rc)
972 {
973         struct obd_export        *exp = req->rq_async_args.pointer_arg[0];
974         struct md_enqueue_info   *minfo = req->rq_async_args.pointer_arg[1];
975         struct ldlm_enqueue_info *einfo = req->rq_async_args.pointer_arg[2];
976         struct lookup_intent     *it;
977         struct lustre_handle     *lockh;
978         struct obd_device        *obddev;
979         int                       flags = LDLM_FL_HAS_INTENT;
980         ENTRY;
981
982         it    = &minfo->mi_it;
983         lockh = &minfo->mi_lockh;
984
985         obddev = class_exp2obd(exp);
986
987         mdc_exit_request(&obddev->u.cli);
988         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
989                 rc = -ETIMEDOUT;
990
991         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
992                                    &flags, NULL, 0, lockh, rc);
993         if (rc < 0) {
994                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
995                 mdc_clear_replay_flag(req, rc);
996                 GOTO(out, rc);
997         }
998
999         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1000         if (rc)
1001                 GOTO(out, rc);
1002
1003         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1004         EXIT;
1005
1006 out:
1007         OBD_FREE_PTR(einfo);
1008         minfo->mi_cb(req, minfo, rc);
1009         return 0;
1010 }
1011
1012 int mdc_intent_getattr_async(struct obd_export *exp,
1013                              struct md_enqueue_info *minfo,
1014                              struct ldlm_enqueue_info *einfo)
1015 {
1016         struct md_op_data       *op_data = &minfo->mi_data;
1017         struct lookup_intent    *it = &minfo->mi_it;
1018         struct ptlrpc_request   *req;
1019         struct obd_device       *obddev = class_exp2obd(exp);
1020         struct ldlm_res_id       res_id;
1021         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1022          *     for statahead currently. Consider CMD in future, such two bits
1023          *     maybe managed by different MDS, should be adjusted then. */
1024         ldlm_policy_data_t       policy = {
1025                                         .l_inodebits = { MDS_INODELOCK_LOOKUP | 
1026                                                          MDS_INODELOCK_UPDATE }
1027                                  };
1028         int                      rc;
1029         int                      flags = LDLM_FL_HAS_INTENT;
1030         ENTRY;
1031
1032         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1033                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1034                ldlm_it2str(it->it_op), it->it_flags);
1035
1036         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1037         req = mdc_intent_getattr_pack(exp, it, op_data);
1038         if (!req)
1039                 RETURN(-ENOMEM);
1040
1041         mdc_enter_request(&obddev->u.cli);
1042         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1043                               0, &minfo->mi_lockh, 1);
1044         if (rc < 0) {
1045                 mdc_exit_request(&obddev->u.cli);
1046                 RETURN(rc);
1047         }
1048
1049         req->rq_async_args.pointer_arg[0] = exp;
1050         req->rq_async_args.pointer_arg[1] = minfo;
1051         req->rq_async_args.pointer_arg[2] = einfo;
1052         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1053         ptlrpcd_add_req(req, PSCOPE_OTHER);
1054
1055         RETURN(0);
1056 }