1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/cmm/mdc_object.c
38 * Lustre Cluster Metadata Manager (cmm)
40 * Author: Mike Pershin <tappro@clusterfs.com>
44 # define EXPORT_SYMTAB
47 #define DEBUG_SUBSYSTEM S_MDS
48 #include <obd_support.h>
49 #include <lustre_lib.h>
50 #include <obd_class.h>
51 #include <lustre_mdc.h>
52 #include "cmm_internal.h"
53 #include "mdc_internal.h"
55 static const struct md_object_operations mdc_mo_ops;
56 static const struct md_dir_operations mdc_dir_ops;
57 static const struct lu_object_operations mdc_obj_ops;
59 extern struct lu_context_key mdc_thread_key;
65 * Allocate new mdc object.
67 struct lu_object *mdc_object_alloc(const struct lu_env *env,
68 const struct lu_object_header *hdr,
71 struct mdc_object *mco;
78 lo = &mco->mco_obj.mo_lu;
79 lu_object_init(lo, NULL, ld);
80 mco->mco_obj.mo_ops = &mdc_mo_ops;
81 mco->mco_obj.mo_dir_ops = &mdc_dir_ops;
82 lo->lo_ops = &mdc_obj_ops;
88 /** Free current mdc object */
89 static void mdc_object_free(const struct lu_env *env, struct lu_object *lo)
/* Recover the mdc_object that contains this lu_object (see lu2mdc_obj()). */
91 struct mdc_object *mco = lu2mdc_obj(lo);
/*
 * NOTE(review): the statements that release mco are not visible in this
 * listing; confirm against the full file.
 */
97 * Initialize mdc object. All of them have loh_attr::LOHA_REMOTE set.
99 static int mdc_object_init(const struct lu_env *env, struct lu_object *lo,
100 const struct lu_object_conf *unused)
/*
 * Flag the object header as remote. The OR-assignment keeps whatever
 * other bits are already present in loh_attr.
 */
103 lo->lo_header->loh_attr |= LOHA_REMOTE;
108 * Instance of lu_object_operations for mdc.
110 static const struct lu_object_operations mdc_obj_ops = {
/* mdc_object_alloc() points lo->lo_ops at this table. */
111 .loo_object_init = mdc_object_init,
112 .loo_object_free = mdc_object_free,
116 * \name The set of md_object_operations.
120 * Get mdc_thread_info from lu_context
123 struct mdc_thread_info *mdc_info_get(const struct lu_env *env)
125 struct mdc_thread_info *mci;
/* Fetch the value stored under mdc_thread_key in the context env->le_ctx. */
127 mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
/*
 * NOTE(review): any validation of mci and the return statement are not
 * visible in this listing.
 */
133 * Initialize mdc_thread_info.
136 struct mdc_thread_info *mdc_info_init(const struct lu_env *env)
138 struct mdc_thread_info *mci = mdc_info_get(env);
/*
 * Zero the entire structure, i.e. both mci_opdata and mci_req, in contrast
 * to callers in this file that clear only mci_opdata.
 * NOTE(review): no NULL check on mci is visible here; mdc_info_get() may
 * already assert on it - confirm.
 */
139 memset(mci, 0, sizeof(*mci));
144 * Convert attributes from mdt_body to the md_attr.
146 static void mdc_body2attr(struct mdt_body *body, struct md_attr *ma)
148 struct lu_attr *la = &ma->ma_attr;
/*
 * ctime is taken only when the body carries it and it is not older than
 * the value already held in la. mtime is looked at only inside that same
 * case, so a body with a stale or absent ctime leaves la_mtime untouched.
 */
150 if (body->valid & OBD_MD_FLCTIME && body->ctime >= la->la_ctime) {
151 la->la_ctime = body->ctime;
152 if (body->valid & OBD_MD_FLMTIME)
153 la->la_mtime = body->mtime;
/* Each remaining field is copied only when its OBD_MD_FL* bit is set. */
156 if (body->valid & OBD_MD_FLMODE)
157 la->la_mode = body->mode;
158 if (body->valid & OBD_MD_FLSIZE)
159 la->la_size = body->size;
160 if (body->valid & OBD_MD_FLBLOCKS)
161 la->la_blocks = body->blocks;
162 if (body->valid & OBD_MD_FLUID)
163 la->la_uid = body->uid;
164 if (body->valid & OBD_MD_FLGID)
165 la->la_gid = body->gid;
166 if (body->valid & OBD_MD_FLFLAGS)
167 la->la_flags = body->flags;
168 if (body->valid & OBD_MD_FLNLINK)
169 la->la_nlink = body->nlink;
170 if (body->valid & OBD_MD_FLRDEV)
171 la->la_rdev = body->rdev;
/*
 * la_valid receives the body valid mask unchanged, and ma_valid is
 * overwritten (not OR-ed) with MA_INODE, discarding any bits set earlier.
 */
173 la->la_valid = body->valid;
174 ma->ma_valid = MA_INODE;
178 * Fill the md_attr \a ma with attributes from request.
180 static int mdc_req2attr_update(const struct lu_env *env,
183 struct mdc_thread_info *mci;
184 struct ptlrpc_request *req;
185 struct mdt_body *body;
186 struct lov_mds_md *md;
187 struct llog_cookie *cookie;
/*
 * NOTE(review): the assignment of req is not visible in this listing;
 * presumably it is taken from mci->mci_req - confirm.
 */
191 mci = mdc_info_get(env);
194 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
/*
 * Plain inode attributes first. mdc_body2attr() resets ma->ma_valid to
 * MA_INODE, so the MA_* bits OR-ed in below are added on top of that.
 */
196 mdc_body2attr(body, ma);
198 if (body->valid & OBD_MD_FLMDSCAPA) {
199 struct lustre_capa *capa;
201 /* create for cross-ref will fetch mds capa from remote obj */
202 capa = req_capsule_server_get(&req->rq_pill, &RMF_CAPA1);
203 LASSERT(capa != NULL);
204 LASSERT(ma->ma_capa != NULL);
205 *ma->ma_capa = *capa;
/*
 * EA data (OBD_MD_FLEASIZE or OBD_MD_FLDIREA): the caller buffer must hold
 * at least eadatasize bytes, and ma_lmm_size is reduced to the actual size
 * before the copy.
 */
208 if ((body->valid & OBD_MD_FLEASIZE) || (body->valid & OBD_MD_FLDIREA)) {
209 if (body->eadatasize == 0) {
210 CERROR("No size defined for easize field\n");
214 md = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD,
219 LASSERT(ma->ma_lmm != NULL);
220 LASSERT(ma->ma_lmm_size >= body->eadatasize);
221 ma->ma_lmm_size = body->eadatasize;
222 memcpy(ma->ma_lmm, md, ma->ma_lmm_size);
223 ma->ma_valid |= MA_LOV;
226 if (body->valid & OBD_MD_FLCOOKIE) {
228 * ACL and cookie share the same body->aclsize, we need
229 * to make sure that they both never come here.
231 LASSERT(!(body->valid & OBD_MD_FLACL));
233 if (body->aclsize == 0) {
234 CERROR("No size defined for cookie field\n");
238 cookie = req_capsule_server_sized_get(&req->rq_pill,
/*
 * Unlike the EA case above, the caller cookie buffer size has to match
 * body->aclsize exactly.
 */
244 LASSERT(ma->ma_cookie != NULL);
245 LASSERT(ma->ma_cookie_size == body->aclsize);
246 memcpy(ma->ma_cookie, cookie, ma->ma_cookie_size);
247 ma->ma_valid |= MA_COOKIE;
/*
 * ACL data is handled only when CONFIG_FS_POSIX_ACL is defined; the size
 * check is again an exact match against body->aclsize.
 * NOTE(review): the declaration of acl is not visible in this listing.
 */
250 #ifdef CONFIG_FS_POSIX_ACL
251 if (body->valid & OBD_MD_FLACL) {
252 if (body->aclsize == 0) {
253 CERROR("No size defined for acl field\n");
257 acl = req_capsule_server_sized_get(&req->rq_pill,
263 LASSERT(ma->ma_acl != NULL);
264 LASSERT(ma->ma_acl_size == body->aclsize);
265 memcpy(ma->ma_acl, acl, ma->ma_acl_size);
266 ma->ma_valid |= MA_ACL_DEF;
274 * The md_object_operations::moo_attr_get() in mdc.
276 static int mdc_attr_get(const struct lu_env *env, struct md_object *mo,
279 struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
280 struct mdc_thread_info *mci;
/*
 * Same lookup that mdc_info_get() performs. Only mci_opdata is cleared
 * below; mci_req is not touched by this memset.
 */
284 mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
287 memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
/* The request targets the fid of mo itself. */
289 memcpy(&mci->mci_opdata.op_fid1, lu_object_fid(&mo->mo_lu),
290 sizeof (struct lu_fid));
291 mci->mci_opdata.op_valid = OBD_MD_FLMODE | OBD_MD_FLUID |
292 OBD_MD_FLGID | OBD_MD_FLFLAGS |
/* mci_req is passed by address so that md_getattr() can hand back the request. */
295 rc = md_getattr(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
/*
 * NOTE(review): rc is assigned again just below; the condition guarding
 * that second assignment (presumably success of md_getattr()) is not
 * visible in this listing.
 */
297 /* get attr from request */
298 rc = mdc_req2attr_update(env, ma);
301 ptlrpc_req_finished(mci->mci_req);
307 * Helper to init timespec \a t.
309 static inline struct timespec *mdc_attr_time(struct timespec *t, obd_time seconds)
317 * The md_object_operations::moo_attr_set() in mdc.
319 * \note It is only used to set ctime when the source of a rename is on a remote MDS.
321 static int mdc_attr_set(const struct lu_env *env, struct md_object *mo,
322 const struct md_attr *ma)
324 struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
325 const struct lu_attr *la = &ma->ma_attr;
326 struct mdc_thread_info *mci;
327 struct md_ucred *uc = md_ucred(env);
/* The caller must supply a ctime: LA_CTIME has to be set in la_valid. */
331 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
333 mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
336 memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
338 mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
339 mdc_attr_time(&mci->mci_opdata.op_attr.ia_ctime, la->la_ctime);
/* ia_mode is filled in, but ATTR_CTIME_SET is the only bit put in ia_valid. */
340 mci->mci_opdata.op_attr.ia_mode = la->la_mode;
341 mci->mci_opdata.op_attr.ia_valid = ATTR_CTIME_SET;
/*
 * Credentials taken from the md_ucred when mu_valid is UCRED_OLD or
 * UCRED_NEW. NOTE(review): the first line of this condition is not visible
 * in this listing.
 */
343 ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
344 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
345 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
346 mci->mci_opdata.op_cap = uc->mu_cap;
347 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
348 mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
349 mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
351 mci->mci_opdata.op_suppgids[0] =
352 mci->mci_opdata.op_suppgids[1] = -1;
/*
 * Second credential set (presumably the else branch, which is not visible):
 * uid and gid come from the attributes, capabilities from
 * cfs_curproc_cap_pack(), and both supplementary group slots are set to -1.
 */
355 mci->mci_opdata.op_fsuid = la->la_uid;
356 mci->mci_opdata.op_fsgid = la->la_gid;
357 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
358 mci->mci_opdata.op_suppgids[0] =
359 mci->mci_opdata.op_suppgids[1] = -1;
362 rc = md_setattr(mc->mc_desc.cl_exp, &mci->mci_opdata,
363 NULL, 0, NULL, 0, &mci->mci_req, NULL);
/* No attributes are read back from the request in the visible lines. */
365 ptlrpc_req_finished(mci->mci_req);
371 * The md_object_operations::moo_object_create() in mdc.
373 static int mdc_object_create(const struct lu_env *env,
374 struct md_object *mo,
375 const struct md_op_spec *spec,
378 struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
379 struct lu_attr *la = &ma->ma_attr;
380 struct mdc_thread_info *mci;
382 struct md_ucred *uc = md_ucred(env);
/*
 * Asserted preconditions: the mode must describe a directory and the spec
 * must carry a parent fid.
 */
389 LASSERT(S_ISDIR(la->la_mode));
390 LASSERT(spec->u.sp_pfid != NULL);
/* Start from a fully zeroed thread info; the fid of mo goes into op_fid2. */
392 mci = mdc_info_init(env);
393 mci->mci_opdata.op_bias = MDS_CROSS_REF;
394 mci->mci_opdata.op_fid2 = *lu_object_fid(&mo->mo_lu);
396 /* Parent fid is needed to create dotdot on the remote node. */
397 mci->mci_opdata.op_fid1 = *(spec->u.sp_pfid);
398 mci->mci_opdata.op_mod_time = la->la_ctime;
/*
 * NOTE(review): most of the credential setup (the assignments of uid, gid
 * and cap that md_create() receives below) falls on lines that are not
 * visible in this listing.
 */
400 ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
402 if (la->la_mode & S_ISGID)
407 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD))
408 mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
410 mci->mci_opdata.op_suppgids[0] = -1;
415 mci->mci_opdata.op_suppgids[0] = -1;
/*
 * For MDS_CREATE_SLAVE_OBJ (and MDS_CREATE_RMT_ACL when
 * CONFIG_FS_POSIX_ACL is defined) the payload is the EA buffer from the
 * spec, and op_fid1 is replaced by spec->u.sp_ea.fid, overriding the
 * parent fid stored above.
 */
418 /* get data from spec */
419 if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
420 symname = spec->u.sp_ea.eadata;
421 symlen = spec->u.sp_ea.eadatalen;
422 mci->mci_opdata.op_fid1 = *(spec->u.sp_ea.fid);
423 mci->mci_opdata.op_flags |= MDS_CREATE_SLAVE_OBJ;
424 #ifdef CONFIG_FS_POSIX_ACL
425 } else if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) {
426 symname = spec->u.sp_ea.eadata;
427 symlen = spec->u.sp_ea.eadatalen;
428 mci->mci_opdata.op_fid1 = *(spec->u.sp_ea.fid);
429 mci->mci_opdata.op_flags |= MDS_CREATE_RMT_ACL;
/*
 * Remaining case (presumably the final else, which is not visible): the
 * payload is the sp_symname string, sent with its terminating NUL, or
 * length 0 when the pointer is NULL.
 */
432 symname = spec->u.sp_symname;
433 symlen = symname ? strlen(symname) + 1 : 0;
436 rc = md_create(mc->mc_desc.cl_exp, &mci->mci_opdata,
437 symname, symlen, la->la_mode, uid, gid,
438 cap, la->la_rdev, &mci->mci_req);
441 /* get attr from request */
442 rc = mdc_req2attr_update(env, ma);
445 ptlrpc_req_finished(mci->mci_req);
451 * The md_object_operations::moo_ref_add() in mdc.
453 static int mdc_ref_add(const struct lu_env *env, struct md_object *mo,
454 const struct md_attr *ma)
456 struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
457 const struct lu_attr *la = &ma->ma_attr;
458 struct mdc_thread_info *mci;
459 struct md_ucred *uc = md_ucred(env);
/* Only mci_opdata is cleared here; mci_req is not touched by this memset. */
463 mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key);
466 memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
/* The link request is flagged MDS_CROSS_REF and targets the fid of mo. */
467 mci->mci_opdata.op_bias = MDS_CROSS_REF;
468 mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
469 mci->mci_opdata.op_mod_time = la->la_ctime;
/*
 * Credentials taken from the md_ucred when mu_valid is UCRED_OLD or
 * UCRED_NEW. NOTE(review): the first line of this condition is not visible
 * in this listing.
 */
471 ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
472 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
473 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
474 mci->mci_opdata.op_cap = uc->mu_cap;
475 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
476 mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
477 mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
479 mci->mci_opdata.op_suppgids[0] =
480 mci->mci_opdata.op_suppgids[1] = -1;
/*
 * Second credential set (presumably the else branch, which is not visible):
 * uid and gid come from the attributes, capabilities from
 * cfs_curproc_cap_pack(), and both supplementary group slots are set to -1.
 */
483 mci->mci_opdata.op_fsuid = la->la_uid;
484 mci->mci_opdata.op_fsgid = la->la_gid;
485 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
486 mci->mci_opdata.op_suppgids[0] =
487 mci->mci_opdata.op_suppgids[1] = -1;
491 rc = md_link(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
/* No attributes are read back from the request in the visible lines. */
493 ptlrpc_req_finished(mci->mci_req);
499 * The md_object_operations::moo_ref_del() in mdc.
501 static int mdc_ref_del(const struct lu_env *env, struct md_object *mo,
504 struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
505 struct lu_attr *la = &ma->ma_attr;
506 struct mdc_thread_info *mci;
507 struct md_ucred *uc = md_ucred(env);
/* Start from a fully zeroed thread info. */
511 mci = mdc_info_init(env);
512 mci->mci_opdata.op_bias = MDS_CROSS_REF;
/* Mirror the MDS_VTX_BYPASS flag of the caller into op_bias. */
513 if (ma->ma_attr_flags & MDS_VTX_BYPASS)
514 mci->mci_opdata.op_bias |= MDS_VTX_BYPASS;
516 mci->mci_opdata.op_bias &= ~MDS_VTX_BYPASS;
517 mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
518 mci->mci_opdata.op_mode = la->la_mode;
519 mci->mci_opdata.op_mod_time = la->la_ctime;
/*
 * Credentials as in the other operations of this file, except that only
 * the first supplementary group slot is filled here, whereas
 * mdc_attr_set() and mdc_ref_add() fill two.
 * NOTE(review): the first line of this condition is not visible in this
 * listing.
 */
521 ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
522 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
523 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
524 mci->mci_opdata.op_cap = uc->mu_cap;
525 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD))
526 mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
528 mci->mci_opdata.op_suppgids[0] = -1;
530 mci->mci_opdata.op_fsuid = la->la_uid;
531 mci->mci_opdata.op_fsgid = la->la_gid;
532 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
533 mci->mci_opdata.op_suppgids[0] = -1;
536 rc = md_unlink(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req);
/* The attributes returned with the unlink are copied back into ma. */
538 /* get attr from request */
539 rc = mdc_req2attr_update(env, ma);
542 ptlrpc_req_finished(mci->mci_req);
547 #ifdef HAVE_SPLIT_SUPPORT
548 /** Send page with directory entries to another MDS. */
549 int mdc_send_page(struct cmm_device *cm, const struct lu_env *env,
550 struct md_object *mo, struct page *page, __u32 offset)
/* NOTE(review): cm and env are not referenced in the visible lines. */
552 struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
556 rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
/* Logged at D_ERROR when rc is non-zero, at D_INFO otherwise. */
558 CDEBUG(rc ? D_ERROR : D_INFO, "send page %p offset %d fid "DFID
559 " rc %d \n", page, offset, PFID(lu_object_fid(&mo->mo_lu)), rc);
565 * Instance of md_object_operations for mdc.
567 static const struct md_object_operations mdc_mo_ops = {
/* mdc_object_alloc() points mco_obj.mo_ops at this table. */
568 .moo_attr_get = mdc_attr_get,
569 .moo_attr_set = mdc_attr_set,
570 .moo_object_create = mdc_object_create,
571 .moo_ref_add = mdc_ref_add,
572 .moo_ref_del = mdc_ref_del,
577 * \name The set of md_dir_operations.
581 * The md_dir_operations::mdo_rename_tgt in mdc.
583 static int mdc_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
584 struct md_object *mo_t, const struct lu_fid *lf,
585 const struct lu_name *lname, struct md_attr *ma)
/*
 * mo_t is not referenced in the visible lines: the request is addressed
 * through the fid of mo_p (op_fid1) and lf (op_fid2).
 */
587 struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo_p));
588 struct lu_attr *la = &ma->ma_attr;
589 struct mdc_thread_info *mci;
590 struct md_ucred *uc = md_ucred(env);
594 mci = mdc_info_init(env);
595 mci->mci_opdata.op_bias = MDS_CROSS_REF;
/* Mirror the MDS_VTX_BYPASS flag of the caller into op_bias. */
596 if (ma->ma_attr_flags & MDS_VTX_BYPASS)
597 mci->mci_opdata.op_bias |= MDS_VTX_BYPASS;
599 mci->mci_opdata.op_bias &= ~MDS_VTX_BYPASS;
600 mci->mci_opdata.op_fid1 = *lu_object_fid(&mo_p->mo_lu);
601 mci->mci_opdata.op_fid2 = *lf;
602 mci->mci_opdata.op_mode = la->la_mode;
603 mci->mci_opdata.op_mod_time = la->la_ctime;
/*
 * Credentials taken from the md_ucred when mu_valid is UCRED_OLD or
 * UCRED_NEW. NOTE(review): the first line of this condition is not visible
 * in this listing.
 */
605 ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
606 mci->mci_opdata.op_fsuid = uc->mu_fsuid;
607 mci->mci_opdata.op_fsgid = uc->mu_fsgid;
608 mci->mci_opdata.op_cap = uc->mu_cap;
609 if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) {
610 mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0];
611 mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1];
613 mci->mci_opdata.op_suppgids[0] =
614 mci->mci_opdata.op_suppgids[1] = -1;
617 mci->mci_opdata.op_fsuid = la->la_uid;
618 mci->mci_opdata.op_fsgid = la->la_gid;
619 mci->mci_opdata.op_cap = cfs_curproc_cap_pack();
620 mci->mci_opdata.op_suppgids[0] =
621 mci->mci_opdata.op_suppgids[1] = -1;
624 rc = md_rename(mc->mc_desc.cl_exp, &mci->mci_opdata, NULL, 0,
625 lname->ln_name, lname->ln_namelen, &mci->mci_req);
/*
 * NOTE(review): the result of mdc_req2attr_update() is discarded here,
 * whereas mdc_attr_get(), mdc_object_create() and mdc_ref_del() assign it
 * to rc. A failure while decoding the reply would therefore go unreported
 * by this function - confirm whether that is intentional.
 */
627 /* get attr from request */
628 mdc_req2attr_update(env, ma);
631 ptlrpc_req_finished(mci->mci_req);
636 * Check that the fids are not relatives.
637 * The md_dir_operations::mdo_is_subdir() in mdc.
639 * Return resulting fid in sfid.
640 * \retval \a sfid = 0 fids are not relatives
641 * \retval \a sfid = FID at which search stopped
643 static int mdc_is_subdir(const struct lu_env *env, struct md_object *mo,
644 const struct lu_fid *fid, struct lu_fid *sfid)
646 struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
647 struct mdc_thread_info *mci;
648 struct mdt_body *body;
/* Start from a fully zeroed thread info. */
652 mci = mdc_info_init(env);
654 rc = md_is_subdir(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
/* Both 0 and -EREMOTE are treated as results that carry a body to decode. */
656 if (rc == 0 || rc == -EREMOTE) {
657 body = req_capsule_server_get(&mci->mci_req->rq_pill,
/*
 * The body is required to carry a fid (OBD_MD_FLID).
 * NOTE(review): body is dereferenced here without a visible NULL check,
 * and the store into sfid is not visible in this listing - confirm.
 */
659 LASSERT(body->valid & OBD_MD_FLID);
661 CDEBUG(D_INFO, "Remote mdo_is_subdir(), new src "DFID"\n",
665 ptlrpc_req_finished(mci->mci_req);
669 /** Instance of md_dir_operations for mdc. */
670 static const struct md_dir_operations mdc_dir_ops = {
671 .mdo_is_subdir = mdc_is_subdir,
672 .mdo_rename_tgt = mdc_rename_tgt