Whamcloud - gitweb
LU-1201 checksum: add libcfs crypto hash
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_MDC
38
39 #ifdef __KERNEL__
40 # include <linux/module.h>
41 # include <linux/pagemap.h>
42 # include <linux/miscdevice.h>
43 # include <linux/init.h>
44 #else
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_acl.h>
49 #include <obd_class.h>
50 #include <lustre_dlm.h>
51 /* fid_res_name_eq() */
52 #include <lustre_fid.h>
53 #include <lprocfs_status.h>
54 #include "mdc_internal.h"
55
/* Groups an export with the metadata-enqueue and LDLM-enqueue descriptors
 * of a single getattr operation.
 * NOTE(review): no user of this structure is visible in this part of the
 * file; presumably it is the argument block of an asynchronous getattr
 * enqueue - confirm against the code that fills it in. */
struct mdc_getattr_args {
        struct obd_export           *ga_exp;    /* export involved in the getattr */
        struct md_enqueue_info      *ga_minfo;  /* metadata enqueue descriptor */
        struct ldlm_enqueue_info    *ga_einfo;  /* LDLM enqueue descriptor */
};
61
62 int it_disposition(struct lookup_intent *it, int flag)
63 {
64         return it->d.lustre.it_disposition & flag;
65 }
66 EXPORT_SYMBOL(it_disposition);
67
68 void it_set_disposition(struct lookup_intent *it, int flag)
69 {
70         it->d.lustre.it_disposition |= flag;
71 }
72 EXPORT_SYMBOL(it_set_disposition);
73
74 void it_clear_disposition(struct lookup_intent *it, int flag)
75 {
76         it->d.lustre.it_disposition &= ~flag;
77 }
78 EXPORT_SYMBOL(it_clear_disposition);
79
80 int it_open_error(int phase, struct lookup_intent *it)
81 {
82         if (it_disposition(it, DISP_OPEN_OPEN)) {
83                 if (phase >= DISP_OPEN_OPEN)
84                         return it->d.lustre.it_status;
85                 else
86                         return 0;
87         }
88
89         if (it_disposition(it, DISP_OPEN_CREATE)) {
90                 if (phase >= DISP_OPEN_CREATE)
91                         return it->d.lustre.it_status;
92                 else
93                         return 0;
94         }
95
96         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
97                 if (phase >= DISP_LOOKUP_EXECD)
98                         return it->d.lustre.it_status;
99                 else
100                         return 0;
101         }
102
103         if (it_disposition(it, DISP_IT_EXECD)) {
104                 if (phase >= DISP_IT_EXECD)
105                         return it->d.lustre.it_status;
106                 else
107                         return 0;
108         }
109         CERROR("it disp: %X, status: %d\n", it->d.lustre.it_disposition,
110                it->d.lustre.it_status);
111         LBUG();
112         return 0;
113 }
114 EXPORT_SYMBOL(it_open_error);
115
116 /* this must be called on a lockh that is known to have a referenced lock */
117 int mdc_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
118                       __u64 *bits)
119 {
120         struct ldlm_lock *lock;
121         ENTRY;
122
123         if(bits)
124                 *bits = 0;
125
126         if (!*lockh)
127                 RETURN(0);
128
129         lock = ldlm_handle2lock((struct lustre_handle *)lockh);
130
131         LASSERT(lock != NULL);
132         lock_res_and_lock(lock);
133 #ifdef __KERNEL__
134         if (lock->l_ast_data && lock->l_ast_data != data) {
135                 struct inode *new_inode = data;
136                 struct inode *old_inode = lock->l_ast_data;
137                 LASSERTF(old_inode->i_state & I_FREEING,
138                          "Found existing inode %p/%lu/%u state %lu in lock: "
139                          "setting data to %p/%lu/%u\n", old_inode,
140                          old_inode->i_ino, old_inode->i_generation,
141                          old_inode->i_state,
142                          new_inode, new_inode->i_ino, new_inode->i_generation);
143         }
144 #endif
145         lock->l_ast_data = data;
146         if (bits)
147                 *bits = lock->l_policy_data.l_inodebits.bits;
148
149         unlock_res_and_lock(lock);
150         LDLM_LOCK_PUT(lock);
151
152         RETURN(0);
153 }
154
155 ldlm_mode_t mdc_lock_match(struct obd_export *exp, int flags,
156                            const struct lu_fid *fid, ldlm_type_t type,
157                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
158                            struct lustre_handle *lockh)
159 {
160         struct ldlm_res_id res_id;
161         ldlm_mode_t rc;
162         ENTRY;
163
164         fid_build_reg_res_name(fid, &res_id);
165         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
166                              &res_id, type, policy, mode, lockh, 0);
167         RETURN(rc);
168 }
169
170 int mdc_cancel_unused(struct obd_export *exp,
171                       const struct lu_fid *fid,
172                       ldlm_policy_data_t *policy,
173                       ldlm_mode_t mode,
174                       ldlm_cancel_flags_t flags,
175                       void *opaque)
176 {
177         struct ldlm_res_id res_id;
178         struct obd_device *obd = class_exp2obd(exp);
179         int rc;
180
181         ENTRY;
182
183         fid_build_reg_res_name(fid, &res_id);
184         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
185                                              policy, mode, flags, opaque);
186         RETURN(rc);
187 }
188
189 int mdc_change_cbdata(struct obd_export *exp,
190                       const struct lu_fid *fid,
191                       ldlm_iterator_t it, void *data)
192 {
193         struct ldlm_res_id res_id;
194         ENTRY;
195
196         fid_build_reg_res_name(fid, &res_id);
197         ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace,
198                               &res_id, it, data);
199
200         EXIT;
201         return 0;
202 }
203
204 /* find any ldlm lock of the inode in mdc
205  * return 0    not find
206  *        1    find one
207  *      < 0    error */
208 int mdc_find_cbdata(struct obd_export *exp,
209                     const struct lu_fid *fid,
210                     ldlm_iterator_t it, void *data)
211 {
212         struct ldlm_res_id res_id;
213         int rc = 0;
214         ENTRY;
215
216         fid_build_reg_res_name((struct lu_fid*)fid, &res_id);
217         rc = ldlm_resource_iterate(class_exp2obd(exp)->obd_namespace, &res_id,
218                                    it, data);
219         if (rc == LDLM_ITER_STOP)
220                 RETURN(1);
221         else if (rc == LDLM_ITER_CONTINUE)
222                 RETURN(0);
223         RETURN(rc);
224 }
225
226 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
227 {
228         /* Don't hold error requests for replay. */
229         if (req->rq_replay) {
230                 cfs_spin_lock(&req->rq_lock);
231                 req->rq_replay = 0;
232                 cfs_spin_unlock(&req->rq_lock);
233         }
234         if (rc && req->rq_transno != 0) {
235                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
236                 LBUG();
237         }
238 }
239
240 /* Save a large LOV EA into the request buffer so that it is available
241  * for replay.  We don't do this in the initial request because the
242  * original request doesn't need this buffer (at most it sends just the
243  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
244  * buffer and may also be difficult to allocate and save a very large
245  * request buffer for each open. (bug 5707)
246  *
247  * OOM here may cause recovery failure if lmm is needed (only for the
248  * original open if the MDS crashed just when this client also OOM'd)
249  * but this is incredibly unlikely, and questionable whether the client
250  * could do MDS recovery under OOM anyways... */
251 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
252                                 struct mdt_body *body)
253 {
254         int     rc;
255
256         /* FIXME: remove this explicit offset. */
257         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
258                                         body->eadatasize);
259         if (rc) {
260                 CERROR("Can't enlarge segment %d size to %d\n",
261                        DLM_INTENT_REC_OFF + 4, body->eadatasize);
262                 body->valid &= ~OBD_MD_FLEASIZE;
263                 body->eadatasize = 0;
264         }
265 }
266
267 static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
268                                                    struct lookup_intent *it,
269                                                    struct md_op_data *op_data,
270                                                    void *lmm, int lmmsize,
271                                                    void *cb_data)
272 {
273         struct ptlrpc_request *req;
274         struct obd_device     *obddev = class_exp2obd(exp);
275         struct ldlm_intent    *lit;
276         CFS_LIST_HEAD(cancels);
277         int                    count = 0;
278         int                    mode;
279         int                    rc;
280         ENTRY;
281
282         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
283
284         /* XXX: openlock is not cancelled for cross-refs. */
285         /* If inode is known, cancel conflicting OPEN locks. */
286         if (fid_is_sane(&op_data->op_fid2)) {
287                 if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
288                         mode = LCK_CW;
289 #ifdef FMODE_EXEC
290                 else if (it->it_flags & FMODE_EXEC)
291                         mode = LCK_PR;
292 #endif
293                 else
294                         mode = LCK_CR;
295                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
296                                                 &cancels, mode,
297                                                 MDS_INODELOCK_OPEN);
298         }
299
300         /* If CREATE, cancel parent's UPDATE lock. */
301         if (it->it_op & IT_CREAT)
302                 mode = LCK_EX;
303         else
304                 mode = LCK_CR;
305         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
306                                          &cancels, mode,
307                                          MDS_INODELOCK_UPDATE);
308
309         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
310                                    &RQF_LDLM_INTENT_OPEN);
311         if (req == NULL) {
312                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
313                 RETURN(ERR_PTR(-ENOMEM));
314         }
315
316         /* parent capability */
317         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
318         /* child capability, reserve the size according to parent capa, it will
319          * be filled after we get the reply */
320         mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa1);
321
322         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
323                              op_data->op_namelen + 1);
324         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
325                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
326
327         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
328         if (rc) {
329                 ptlrpc_request_free(req);
330                 return NULL;
331         }
332
333         cfs_spin_lock(&req->rq_lock);
334         req->rq_replay = req->rq_import->imp_replayable;
335         cfs_spin_unlock(&req->rq_lock);
336
337         /* pack the intent */
338         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
339         lit->opc = (__u64)it->it_op;
340
341         /* pack the intended request */
342         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
343                       lmmsize);
344
345         /* for remote client, fetch remote perm for current user */
346         if (client_is_remote(exp))
347                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
348                                      sizeof(struct mdt_remote_perm));
349         ptlrpc_request_set_replen(req);
350         return req;
351 }
352
353 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
354                                                      struct lookup_intent *it,
355                                                      struct md_op_data *op_data)
356 {
357         struct ptlrpc_request *req;
358         struct obd_device     *obddev = class_exp2obd(exp);
359         struct ldlm_intent    *lit;
360         int                    rc;
361         ENTRY;
362
363         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
364                                    &RQF_LDLM_INTENT_UNLINK);
365         if (req == NULL)
366                 RETURN(ERR_PTR(-ENOMEM));
367
368         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
369         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
370                              op_data->op_namelen + 1);
371
372         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
373         if (rc) {
374                 ptlrpc_request_free(req);
375                 RETURN(ERR_PTR(rc));
376         }
377
378         /* pack the intent */
379         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
380         lit->opc = (__u64)it->it_op;
381
382         /* pack the intended request */
383         mdc_unlink_pack(req, op_data);
384
385         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
386                              obddev->u.cli.cl_max_mds_easize);
387         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
388                              obddev->u.cli.cl_max_mds_cookiesize);
389         ptlrpc_request_set_replen(req);
390         RETURN(req);
391 }
392
393 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
394                                                       struct lookup_intent *it,
395                                                       struct md_op_data *op_data)
396 {
397         struct ptlrpc_request *req;
398         struct obd_device     *obddev = class_exp2obd(exp);
399         obd_valid              valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
400                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
401                                        OBD_MD_FLMDSCAPA | OBD_MD_MEA |
402                                        (client_is_remote(exp) ?
403                                                OBD_MD_FLRMTPERM : OBD_MD_FLACL);
404         struct ldlm_intent    *lit;
405         int                    rc;
406         ENTRY;
407
408         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
409                                    &RQF_LDLM_INTENT_GETATTR);
410         if (req == NULL)
411                 RETURN(ERR_PTR(-ENOMEM));
412
413         mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
414         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
415                              op_data->op_namelen + 1);
416
417         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
418         if (rc) {
419                 ptlrpc_request_free(req);
420                 RETURN(ERR_PTR(rc));
421         }
422
423         /* pack the intent */
424         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
425         lit->opc = (__u64)it->it_op;
426
427         /* pack the intended request */
428         mdc_getattr_pack(req, valid, it->it_flags, op_data,
429                          obddev->u.cli.cl_max_mds_easize);
430
431         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
432                              obddev->u.cli.cl_max_mds_easize);
433         if (client_is_remote(exp))
434                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
435                                      sizeof(struct mdt_remote_perm));
436         ptlrpc_request_set_replen(req);
437         RETURN(req);
438 }
439
440 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
441 {
442         struct ptlrpc_request *req;
443         int rc;
444         ENTRY;
445
446         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
447         if (req == NULL)
448                 RETURN(ERR_PTR(-ENOMEM));
449
450         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
451         if (rc) {
452                 ptlrpc_request_free(req);
453                 RETURN(ERR_PTR(rc));
454         }
455
456         ptlrpc_request_set_replen(req);
457         RETURN(req);
458 }
459
/* Post-process the reply of an intent enqueue.
 *
 * In order, this function:
 *  - marks the lock request INTENT_ONLY if the request has a transno or is
 *    flagged for replay;
 *  - on ELDLM_LOCK_ABORTED clears the lock mode and handle and turns rc
 *    into 0, otherwise adjusts the reference held on the lock if its mode
 *    differs from the one recorded in @einfo;
 *  - copies disposition, status, lock mode, lock handle and the request
 *    itself into the intent;
 *  - clears the replay flag of requests that must not be replayed;
 *  - for OPEN/UNLINK/LOOKUP/GETATTR intents checks that the reply fields
 *    announced by the mdt_body are present and, for a replayable open,
 *    saves the returned LOV EA in the request buffer.
 *
 * \param exp    export the request was sent through
 * \param req    the completed request; stored in the intent as it_data
 * \param einfo  enqueue info; ei_mode is updated to the mode actually held
 * \param it     intent to fill in
 * \param lockh  handle of the lock; zeroed if the lock was aborted
 * \param rc     result of the enqueue, must be >= 0
 *
 * \retval rc as passed in, with ELDLM_LOCK_ABORTED mapped to 0
 * \retval -EPROTO if a reply field that should be present is missing
 */
static int mdc_finish_enqueue(struct obd_export *exp,
                              struct ptlrpc_request *req,
                              struct ldlm_enqueue_info *einfo,
                              struct lookup_intent *it,
                              struct lustre_handle *lockh,
                              int rc)
{
        struct req_capsule  *pill = &req->rq_pill;
        struct ldlm_request *lockreq;
        struct ldlm_reply   *lockrep;
        ENTRY;

        LASSERT(rc >= 0);
        /* Similarly, if we're going to replay this request, we don't want to
         * actually get a lock, just perform the intent. */
        if (req->rq_transno || req->rq_replay) {
                lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
                lockreq->lock_flags |= LDLM_FL_INTENT_ONLY;
        }

        if (rc == ELDLM_LOCK_ABORTED) {
                /* No lock is held: forget mode and handle, and report
                 * success to the caller. */
                einfo->ei_mode = 0;
                memset(lockh, 0, sizeof(*lockh));
                rc = 0;
        } else { /* rc = 0 */
                struct ldlm_lock *lock = ldlm_handle2lock(lockh);
                LASSERT(lock);

                /* If the server gave us back a different lock mode, we should
                 * fix up our variables. */
                if (lock->l_req_mode != einfo->ei_mode) {
                        ldlm_lock_addref(lockh, lock->l_req_mode);
                        ldlm_lock_decref(lockh, einfo->ei_mode);
                        einfo->ei_mode = lock->l_req_mode;
                }
                /* drop the reference taken by ldlm_handle2lock() */
                LDLM_LOCK_PUT(lock);
        }

        lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
        LASSERT(lockrep != NULL);                 /* checked by ldlm_cli_enqueue() */

        /* The reply carries the intent disposition and status in the two
         * policy result fields; record them together with the lock state. */
        it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
        it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
        it->d.lustre.it_lock_mode = einfo->ei_mode;
        it->d.lustre.it_lock_handle = lockh->cookie;
        it->d.lustre.it_data = req;

        /* A failed intent is never replayed. */
        if (it->d.lustre.it_status < 0 && req->rq_replay)
                mdc_clear_replay_flag(req, it->d.lustre.it_status);

        /* If we're doing an IT_OPEN which did not result in an actual
         * successful open, then we need to remove the bit which saves
         * this request for unconditional replay.
         *
         * It's important that we do this first!  Otherwise we might exit the
         * function without doing so, and try to replay a failed create
         * (bug 3440) */
        if (it->it_op & IT_OPEN && req->rq_replay &&
            (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0))
                mdc_clear_replay_flag(req, it->d.lustre.it_status);

        DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
                  it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status);

        /* We know what to expect, so we do any byte flipping required here */
        if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
                struct mdt_body *body;

                body = req_capsule_server_get(pill, &RMF_MDT_BODY);
                if (body == NULL) {
                        CERROR ("Can't swab mdt_body\n");
                        RETURN (-EPROTO);
                }

                if (it_disposition(it, DISP_OPEN_OPEN) &&
                    !it_open_error(DISP_OPEN_OPEN, it)) {
                        /*
                         * If this is a successful OPEN request, we need to set
                         * replay handler and data early, so that if replay
                         * happens immediately after swabbing below, new reply
                         * is swabbed by that handler correctly.
                         */
                        mdc_set_open_replay_data(NULL, NULL, req);
                }

                if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
                        void *eadata;

                         mdc_update_max_ea_from_body(exp, body);

                        /*
                         * The eadata is opaque; just check that it is there.
                         * Eventually, obd_unpackmd() will check the contents.
                         */
                        eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
                                                              body->eadatasize);
                        if (eadata == NULL)
                                RETURN(-EPROTO);

                        /*
                         * We save the reply LOV EA in case we have to replay a
                         * create for recovery.  If we didn't allocate a large
                         * enough request buffer above we need to reallocate it
                         * here to hold the actual LOV EA.
                         *
                         * To not save LOV EA if request is not going to replay
                         * (for example error one).
                         */
                        if ((it->it_op & IT_OPEN) && req->rq_replay) {
                                void *lmm;
                                /* Grow or shrink the request's EA buffer to
                                 * exactly the size of the EA in the reply.
                                 * If growing fails, mdc_realloc_openmsg()
                                 * resets body->eadatasize to 0. */
                                if (req_capsule_get_size(pill, &RMF_EADATA,
                                                         RCL_CLIENT) <
                                    body->eadatasize)
                                        mdc_realloc_openmsg(req, body);
                                else
                                        req_capsule_shrink(pill, &RMF_EADATA,
                                                           body->eadatasize,
                                                           RCL_CLIENT);

                                req_capsule_set_size(pill, &RMF_EADATA,
                                                     RCL_CLIENT,
                                                     body->eadatasize);

                                lmm = req_capsule_client_get(pill, &RMF_EADATA);
                                if (lmm)
                                        memcpy(lmm, eadata, body->eadatasize);
                        }
                }

                if (body->valid & OBD_MD_FLRMTPERM) {
                        struct mdt_remote_perm *perm;

                        /* remote permissions are only requested by remote
                         * clients */
                        LASSERT(client_is_remote(exp));
                        perm = req_capsule_server_swab_get(pill, &RMF_ACL,
                                                lustre_swab_mdt_remote_perm);
                        if (perm == NULL)
                                RETURN(-EPROTO);
                }
                if (body->valid & OBD_MD_FLMDSCAPA) {
                        struct lustre_capa *capa, *p;

                        capa = req_capsule_server_get(pill, &RMF_CAPA1);
                        if (capa == NULL)
                                RETURN(-EPROTO);

                        if (it->it_op & IT_OPEN) {
                                /* client fid capa will be checked in replay */
                                p = req_capsule_client_get(pill, &RMF_CAPA2);
                                LASSERT(p);
                                *p = *capa;
                        }
                }
                if (body->valid & OBD_MD_FLOSSCAPA) {
                        struct lustre_capa *capa;

                        /* only verify that the capability is in the reply */
                        capa = req_capsule_server_get(pill, &RMF_CAPA2);
                        if (capa == NULL)
                                RETURN(-EPROTO);
                }
        }

        RETURN(rc);
}
623
/* Enqueue a lock on the resource named after op_data->op_fid1, optionally
 * carrying an intent.
 *
 * We always reserve enough space in the reply packet for a stripe MD, because
 * we don't know in advance the file type.
 *
 * \param exp              export the request is sent through
 * \param einfo            enqueue info; must describe an LDLM_IBITS lock when
 *                         an intent is given and an LDLM_FLOCK lock otherwise
 * \param it               intent to execute, or NULL for a flock enqueue
 * \param op_data          operation data for the intent
 * \param lockh            lock handle handed to ldlm_cli_enqueue()
 * \param lmm              striping data for an open; for a flock enqueue
 *                         this is the flock policy instead
 * \param lmmsize          size of \a lmm, must be 0 for a flock enqueue
 * \param reqp             must be NULL
 * \param extra_lock_flags flags added to the enqueue flags
 *
 * \retval result of mdc_finish_enqueue() for intent requests that were
 *         enqueued without a negative error
 * \retval result of ldlm_cli_enqueue() for flock requests
 * \retval negative errno on failure; -EIO when a create being resent on
 *         -EINPROGRESS finds that the import generation has changed
 */
int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
                struct lookup_intent *it, struct md_op_data *op_data,
                struct lustre_handle *lockh, void *lmm, int lmmsize,
                struct ptlrpc_request **reqp, int extra_lock_flags)
{
        struct obd_device     *obddev = class_exp2obd(exp);
        struct ptlrpc_request *req = NULL;
        int                    flags, saved_flags = extra_lock_flags;
        int                    rc;
        struct ldlm_res_id res_id;
        static const ldlm_policy_data_t lookup_policy =
                            { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
        static const ldlm_policy_data_t update_policy =
                            { .l_inodebits = { MDS_INODELOCK_UPDATE } };
        ldlm_policy_data_t const *policy = &lookup_policy;
        int                    generation, resends = 0;
        struct ldlm_reply     *lockrep;
        ENTRY;

        LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
                 einfo->ei_type);

        fid_build_reg_res_name(&op_data->op_fid1, &res_id);

        if (it)
                saved_flags |= LDLM_FL_HAS_INTENT;
        if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
                policy = &update_policy;

        LASSERT(reqp == NULL);

        /* Remember the import generation so that a resend can detect that
         * the import has changed in the meantime. */
        generation = obddev->u.cli.cl_import->imp_generation;
resend:
        /* ldlm_cli_enqueue() is given &flags, so start every attempt from
         * the saved value. */
        flags = saved_flags;
        if (!it) {
                /* The only way right now is FLOCK, in this case we hide flock
                   policy as lmm, but lmmsize is 0 */
                LASSERT(lmm && lmmsize == 0);
                LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
                         einfo->ei_type);
                policy = (ldlm_policy_data_t *)lmm;
                res_id.name[3] = LDLM_FLOCK;
        } else if (it->it_op & IT_OPEN) {
                req = mdc_intent_open_pack(exp, it, op_data, lmm, lmmsize,
                                           einfo->ei_cbdata);
                policy = &update_policy;
                einfo->ei_cbdata = NULL;
                lmm = NULL;
        } else if (it->it_op & IT_UNLINK)
                req = mdc_intent_unlink_pack(exp, it, op_data);
        else if (it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT))
                req = mdc_intent_getattr_pack(exp, it, op_data);
        else if (it->it_op == IT_READDIR)
                req = ldlm_enqueue_pack(exp);
        else {
                LBUG();
                RETURN(-EINVAL);
        }

        /* req is still NULL here for a flock enqueue; IS_ERR() is only
         * true for the error pointers returned by the pack helpers. */
        if (IS_ERR(req))
                RETURN(PTR_ERR(req));

        if (resends) {
                /* Pin the resent request to the original import generation
                 * and push its send time back by one more second for each
                 * resend already done. */
                req->rq_generation_set = 1;
                req->rq_import_generation = generation;
                req->rq_sent = cfs_time_current_sec() + resends;
        }

        /* It is important to obtain rpc_lock first (if applicable), so that
         * threads that are serialised with rpc_lock are not polluting our
         * rpcs in flight counter. We do not do flock request limiting, though*/
        if (it) {
                mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
                rc = mdc_enter_request(&obddev->u.cli);
                if (rc != 0) {
                        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
                        mdc_clear_replay_flag(req, 0);
                        ptlrpc_req_finished(req);
                        RETURN(rc);
                }
        }

        rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
                              0, lockh, 0);
        if (!it) {
                /* For flock requests we immediately return without further
                   delay and let caller deal with the rest, since rest of
                   this function metadata processing makes no sense for flock
                   requests anyway */
                RETURN(rc);
        }

        mdc_exit_request(&obddev->u.cli);
        mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);

        if (rc < 0) {
                CERROR("ldlm_cli_enqueue: %d\n", rc);
                mdc_clear_replay_flag(req, rc);
                ptlrpc_req_finished(req);
                RETURN(rc);
        }

        lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
        LASSERT(lockrep != NULL);

        /* Retry the create infinitely when we get -EINPROGRESS from
         * server. This is required by the new quota design. */
        if (it && it->it_op & IT_CREAT &&
            (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
                /* This attempt is abandoned: drop the request and build a
                 * fresh one on the next pass. */
                mdc_clear_replay_flag(req, rc);
                ptlrpc_req_finished(req);
                resends++;

                CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
                       obddev->obd_name, resends, it->it_op,
                       PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));

                /* Only resend while the import generation is unchanged. */
                if (generation == obddev->u.cli.cl_import->imp_generation) {
                        goto resend;
                } else {
                        CDEBUG(D_HA, "resned cross eviction\n");
                        RETURN(-EIO);
                }
        }

        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);

        RETURN(rc);
}
755
756 static int mdc_finish_intent_lock(struct obd_export *exp,
757                                   struct ptlrpc_request *request,
758                                   struct md_op_data *op_data,
759                                   struct lookup_intent *it,
760                                   struct lustre_handle *lockh)
761 {
762         struct lustre_handle old_lock;
763         struct mdt_body *mdt_body;
764         struct ldlm_lock *lock;
765         int rc;
766
767
768         LASSERT(request != NULL);
769         LASSERT(request != LP_POISON);
770         LASSERT(request->rq_repmsg != LP_POISON);
771
772         if (!it_disposition(it, DISP_IT_EXECD)) {
773                 /* The server failed before it even started executing the
774                  * intent, i.e. because it couldn't unpack the request. */
775                 LASSERT(it->d.lustre.it_status != 0);
776                 RETURN(it->d.lustre.it_status);
777         }
778         rc = it_open_error(DISP_IT_EXECD, it);
779         if (rc)
780                 RETURN(rc);
781
782         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
783         LASSERT(mdt_body != NULL);      /* mdc_enqueue checked */
784
785         /* If we were revalidating a fid/name pair, mark the intent in
786          * case we fail and get called again from lookup */
787         if (fid_is_sane(&op_data->op_fid2) &&
788             it->it_create_mode & M_CHECK_STALE &&
789             it->it_op != IT_GETATTR) {
790                 it_set_disposition(it, DISP_ENQ_COMPLETE);
791
792                 /* Also: did we find the same inode? */
793                 /* sever can return one of two fids:
794                  * op_fid2 - new allocated fid - if file is created.
795                  * op_fid3 - existent fid - if file only open.
796                  * op_fid3 is saved in lmv_intent_open */
797                 if ((!lu_fid_eq(&op_data->op_fid2, &mdt_body->fid1)) &&
798                     (!lu_fid_eq(&op_data->op_fid3, &mdt_body->fid1))) {
799                         CDEBUG(D_DENTRY, "Found stale data "DFID"("DFID")/"DFID
800                                "\n", PFID(&op_data->op_fid2),
801                                PFID(&op_data->op_fid2), PFID(&mdt_body->fid1));
802                         RETURN(-ESTALE);
803                 }
804         }
805
806         rc = it_open_error(DISP_LOOKUP_EXECD, it);
807         if (rc)
808                 RETURN(rc);
809
810         /* keep requests around for the multiple phases of the call
811          * this shows the DISP_XX must guarantee we make it into the call
812          */
813         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
814             it_disposition(it, DISP_OPEN_CREATE) &&
815             !it_open_error(DISP_OPEN_CREATE, it)) {
816                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
817                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
818         }
819         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
820             it_disposition(it, DISP_OPEN_OPEN) &&
821             !it_open_error(DISP_OPEN_OPEN, it)) {
822                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
823                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
824                 /* BUG 11546 - eviction in the middle of open rpc processing */
825                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
826         }
827
828         if (it->it_op & IT_CREAT) {
829                 /* XXX this belongs in ll_create_it */
830         } else if (it->it_op == IT_OPEN) {
831                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
832         } else {
833                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
834         }
835
836         /* If we already have a matching lock, then cancel the new
837          * one.  We have to set the data here instead of in
838          * mdc_enqueue, because we need to use the child's inode as
839          * the l_ast_data to match, and that's not available until
840          * intent_finish has performed the iget().) */
841         lock = ldlm_handle2lock(lockh);
842         if (lock) {
843                 ldlm_policy_data_t policy = lock->l_policy_data;
844                 LDLM_DEBUG(lock, "matching against this");
845
846                 LASSERTF(fid_res_name_eq(&mdt_body->fid1,
847                                          &lock->l_resource->lr_name),
848                          "Lock res_id: %lu/%lu/%lu, fid: %lu/%lu/%lu.\n",
849                          (unsigned long)lock->l_resource->lr_name.name[0],
850                          (unsigned long)lock->l_resource->lr_name.name[1],
851                          (unsigned long)lock->l_resource->lr_name.name[2],
852                          (unsigned long)fid_seq(&mdt_body->fid1),
853                          (unsigned long)fid_oid(&mdt_body->fid1),
854                          (unsigned long)fid_ver(&mdt_body->fid1));
855                 LDLM_LOCK_PUT(lock);
856
857                 memcpy(&old_lock, lockh, sizeof(*lockh));
858                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
859                                     LDLM_IBITS, &policy, LCK_NL, &old_lock, 0)) {
860                         ldlm_lock_decref_and_cancel(lockh,
861                                                     it->d.lustre.it_lock_mode);
862                         memcpy(lockh, &old_lock, sizeof(old_lock));
863                         it->d.lustre.it_lock_handle = lockh->cookie;
864                 }
865         }
866         CDEBUG(D_DENTRY,"D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
867                op_data->op_namelen, op_data->op_name, ldlm_it2str(it->it_op),
868                it->d.lustre.it_status, it->d.lustre.it_disposition, rc);
869         RETURN(rc);
870 }
871
872 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
873                         struct lu_fid *fid, __u64 *bits)
874 {
875         /* We could just return 1 immediately, but since we should only
876          * be called in revalidate_it if we already have a lock, let's
877          * verify that. */
878         struct ldlm_res_id res_id;
879         struct lustre_handle lockh;
880         ldlm_policy_data_t policy;
881         ldlm_mode_t mode;
882         ENTRY;
883
884         if (it->d.lustre.it_lock_handle) {
885                 lockh.cookie = it->d.lustre.it_lock_handle;
886                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
887         } else {
888                 fid_build_reg_res_name(fid, &res_id);
889                 switch (it->it_op) {
890                 case IT_GETATTR:
891                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
892                         break;
893                 case IT_LAYOUT:
894                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
895                         break;
896                 default:
897                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
898                         break;
899                 }
900                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
901                                        LDLM_FL_BLOCK_GRANTED, &res_id,
902                                        LDLM_IBITS, &policy,
903                                        LCK_CR|LCK_CW|LCK_PR|LCK_PW, &lockh, 0);
904         }
905
906         if (mode) {
907                 it->d.lustre.it_lock_handle = lockh.cookie;
908                 it->d.lustre.it_lock_mode = mode;
909         } else {
910                 it->d.lustre.it_lock_handle = 0;
911                 it->d.lustre.it_lock_mode = 0;
912         }
913
914         RETURN(!!mode);
915 }
916
917 /*
918  * This long block is all about fixing up the lock and request state
919  * so that it is correct as of the moment _before_ the operation was
920  * applied; that way, the VFS will think that everything is normal and
921  * call Lustre's regular VFS methods.
922  *
923  * If we're performing a creation, that means that unless the creation
924  * failed with EEXIST, we should fake up a negative dentry.
925  *
926  * For everything else, we want to lookup to succeed.
927  *
928  * One additional note: if CREATE or OPEN succeeded, we add an extra
929  * reference to the request because we need to keep it around until
930  * ll_create/ll_open gets called.
931  *
932  * The server will return to us, in it_disposition, an indication of
933  * exactly what d.lustre.it_status refers to.
934  *
935  * If DISP_OPEN_OPEN is set, then d.lustre.it_status refers to the open() call,
936  * otherwise if DISP_OPEN_CREATE is set, then it status is the
937  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
938  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
939  * was successful.
940  *
941  * Else, if DISP_LOOKUP_EXECD then d.lustre.it_status is the rc of the
942  * child lookup.
943  */
944 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
945                     void *lmm, int lmmsize, struct lookup_intent *it,
946                     int lookup_flags, struct ptlrpc_request **reqp,
947                     ldlm_blocking_callback cb_blocking,
948                     int extra_lock_flags)
949 {
950         struct lustre_handle lockh;
951         int rc = 0;
952         ENTRY;
953         LASSERT(it);
954
955         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
956                ", intent: %s flags %#o\n", op_data->op_namelen,
957                op_data->op_name, PFID(&op_data->op_fid2),
958                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
959                it->it_flags);
960
961         lockh.cookie = 0;
962         if (fid_is_sane(&op_data->op_fid2) &&
963             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_LAYOUT))) {
964                 /* We could just return 1 immediately, but since we should only
965                  * be called in revalidate_it if we already have a lock, let's
966                  * verify that. */
967                 it->d.lustre.it_lock_handle = 0;
968                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
969                 /* Only return failure if it was not GETATTR by cfid
970                    (from inode_revalidate) */
971                 if (rc || op_data->op_namelen != 0)
972                         RETURN(rc);
973         }
974
975         /* lookup_it may be called only after revalidate_it has run, because
976          * revalidate_it cannot return errors, only zero.  Returning zero causes
977          * this call to lookup, which *can* return an error.
978          *
979          * We only want to execute the request associated with the intent one
980          * time, however, so don't send the request again.  Instead, skip past
981          * this and use the request from revalidate.  In this case, revalidate
982          * never dropped its reference, so the refcounts are all OK */
983         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
984                 struct ldlm_enqueue_info einfo =
985                         { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
986                           ldlm_completion_ast, NULL, NULL, NULL };
987
988                 /* For case if upper layer did not alloc fid, do it now. */
989                 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
990                         rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
991                         if (rc < 0) {
992                                 CERROR("Can't alloc new fid, rc %d\n", rc);
993                                 RETURN(rc);
994                         }
995                 }
996                 rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
997                                  lmm, lmmsize, NULL, extra_lock_flags);
998                 if (rc < 0)
999                         RETURN(rc);
1000         } else if (!fid_is_sane(&op_data->op_fid2) ||
1001                    !(it->it_create_mode & M_CHECK_STALE)) {
1002                 /* DISP_ENQ_COMPLETE set means there is extra reference on
1003                  * request referenced from this intent, saved for subsequent
1004                  * lookup.  This path is executed when we proceed to this
1005                  * lookup, so we clear DISP_ENQ_COMPLETE */
1006                 it_clear_disposition(it, DISP_ENQ_COMPLETE);
1007         }
1008         *reqp = it->d.lustre.it_data;
1009         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1010         RETURN(rc);
1011 }
1012
1013 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1014                                               struct ptlrpc_request *req,
1015                                               void *args, int rc)
1016 {
1017         struct mdc_getattr_args  *ga = args;
1018         struct obd_export        *exp = ga->ga_exp;
1019         struct md_enqueue_info   *minfo = ga->ga_minfo;
1020         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1021         struct lookup_intent     *it;
1022         struct lustre_handle     *lockh;
1023         struct obd_device        *obddev;
1024         int                       flags = LDLM_FL_HAS_INTENT;
1025         ENTRY;
1026
1027         it    = &minfo->mi_it;
1028         lockh = &minfo->mi_lockh;
1029
1030         obddev = class_exp2obd(exp);
1031
1032         mdc_exit_request(&obddev->u.cli);
1033         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1034                 rc = -ETIMEDOUT;
1035
1036         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1037                                    &flags, NULL, 0, lockh, rc);
1038         if (rc < 0) {
1039                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1040                 mdc_clear_replay_flag(req, rc);
1041                 GOTO(out, rc);
1042         }
1043
1044         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1045         if (rc)
1046                 GOTO(out, rc);
1047
1048         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1049         EXIT;
1050
1051 out:
1052         OBD_FREE_PTR(einfo);
1053         minfo->mi_cb(req, minfo, rc);
1054         return 0;
1055 }
1056
1057 int mdc_intent_getattr_async(struct obd_export *exp,
1058                              struct md_enqueue_info *minfo,
1059                              struct ldlm_enqueue_info *einfo)
1060 {
1061         struct md_op_data       *op_data = &minfo->mi_data;
1062         struct lookup_intent    *it = &minfo->mi_it;
1063         struct ptlrpc_request   *req;
1064         struct mdc_getattr_args *ga;
1065         struct obd_device       *obddev = class_exp2obd(exp);
1066         struct ldlm_res_id       res_id;
1067         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1068          *     for statahead currently. Consider CMD in future, such two bits
1069          *     maybe managed by different MDS, should be adjusted then. */
1070         ldlm_policy_data_t       policy = {
1071                                         .l_inodebits = { MDS_INODELOCK_LOOKUP | 
1072                                                          MDS_INODELOCK_UPDATE }
1073                                  };
1074         int                      rc = 0;
1075         int                      flags = LDLM_FL_HAS_INTENT;
1076         ENTRY;
1077
1078         CDEBUG(D_DLMTRACE,"name: %.*s in inode "DFID", intent: %s flags %#o\n",
1079                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1080                ldlm_it2str(it->it_op), it->it_flags);
1081
1082         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1083         req = mdc_intent_getattr_pack(exp, it, op_data);
1084         if (!req)
1085                 RETURN(-ENOMEM);
1086
1087         rc = mdc_enter_request(&obddev->u.cli);
1088         if (rc != 0) {
1089                 ptlrpc_req_finished(req);
1090                 RETURN(rc);
1091         }
1092
1093         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1094                               0, &minfo->mi_lockh, 1);
1095         if (rc < 0) {
1096                 mdc_exit_request(&obddev->u.cli);
1097                 ptlrpc_req_finished(req);
1098                 RETURN(rc);
1099         }
1100
1101         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1102         ga = ptlrpc_req_async_args(req);
1103         ga->ga_exp = exp;
1104         ga->ga_minfo = minfo;
1105         ga->ga_einfo = einfo;
1106
1107         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1108         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
1109
1110         RETURN(0);
1111 }