+#define FILTER_VALID_FLAGS (OBD_MD_FLTYPE | OBD_MD_FLMODE | OBD_MD_FLGENER |\
+ OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ|\
+ OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME|\
+ OBD_MD_FLID)
+
+static void reconstruct_create(struct ptlrpc_request *req)
+{
+ struct mds_export_data *med = &req->rq_export->exp_mds_data;
+ struct mds_client_data *mcd = med->med_mcd;
+ struct ost_body *body;
+
+ body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
+
+ /* copy rc, transno and disp; steal locks */
+ mds_req_from_mcd(req, mcd);
+ CERROR("reconstruct reply for x"LPU64"\n", req->rq_xid);
+}
+
+static int mdt_obj_create(struct ptlrpc_request *req)
+{
+ struct obd_device *obd = req->rq_export->exp_obd;
+ struct ldlm_res_id res_id = { .name = {0} };
+ struct mds_obd *mds = &obd->u.mds;
+ struct ost_body *body, *repbody;
+ int rc, size = sizeof(*repbody);
+ char fidname[LL_FID_NAMELEN];
+ struct inode *parent_inode;
+ struct lustre_handle lockh;
+ struct lvfs_run_ctxt saved;
+ ldlm_policy_data_t policy;
+ struct dentry *new = NULL;
+ struct dentry_params dp;
+ int mealen, flags = 0;
+ unsigned int tmpname;
+ struct lvfs_ucred uc;
+ struct mea *mea;
+ void *handle;
+ ENTRY;
+
+ DEBUG_REQ(D_HA, req, "create remote object");
+
+ parent_inode = mds->mds_objects_dir->d_inode;
+
+ body = lustre_swab_reqbuf(req, 0, sizeof(*body),
+ lustre_swab_ost_body);
+ if (body == NULL)
+ RETURN(-EFAULT);
+
+ MDS_CHECK_RESENT(req, reconstruct_create(req));
+
+ uc.luc_fsuid = body->oa.o_uid;
+ uc.luc_fsgid = body->oa.o_gid;
+
+ push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
+
+ rc = lustre_pack_reply(req, 1, &size, NULL);
+ if (rc)
+ RETURN(rc);
+
+ repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
+
+ if (body->oa.o_flags & OBD_FL_RECREATE_OBJS) {
+ /* this is re-create request from MDS holding directory name.
+ * we have to lookup given ino/generation first. if it exists
+ * (good case) then there is nothing to do. if it does not
+ * then we have to recreate it */
+ struct ll_fid fid;
+ fid.id = body->oa.o_id;
+ fid.generation = body->oa.o_generation;
+ new = mds_fid2dentry(mds, &fid, NULL);
+ if (!IS_ERR(new) && new->d_inode) {
+ CWARN("mkdir() repairing is on its way: %lu/%lu\n",
+ (unsigned long) fid.id,
+ (unsigned long) fid.generation);
+ obdo_from_inode(&repbody->oa, new->d_inode,
+ FILTER_VALID_FLAGS);
+ repbody->oa.o_id = new->d_inode->i_ino;
+ repbody->oa.o_generation = new->d_inode->i_generation;
+ repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
+ GOTO(cleanup2, rc = 0);
+ }
+ CWARN("hmm. for some reason dir %lu/%lu (or reply) got lost\n",
+ (unsigned long) fid.id, (unsigned long) fid.generation);
+ LASSERT(new->d_inode == NULL ||
+ new->d_inode->i_generation != fid.generation);
+ l_dput(new);
+ }
+
+ down(&parent_inode->i_sem);
+ handle = fsfilt_start(obd, parent_inode, FSFILT_OP_MKDIR, NULL);
+ LASSERT(!IS_ERR(handle));
+
+repeat:
+ tmpname = ll_insecure_random_int();
+ rc = sprintf(fidname, "%u", tmpname);
+ new = lookup_one_len(fidname, mds->mds_objects_dir, rc);
+ if (IS_ERR(new)) {
+ CERROR("%s: can't lookup new inode (%s) for mkdir: %d\n",
+ obd->obd_name, fidname, (int) PTR_ERR(new));
+ fsfilt_commit(obd, mds->mds_sb, new->d_inode, handle, 0);
+ up(&parent_inode->i_sem);
+ RETURN(PTR_ERR(new));
+ } else if (new->d_inode) {
+ CERROR("%s: name exists. repeat\n", obd->obd_name);
+ goto repeat;
+ }
+
+ new->d_fsdata = (void *) &dp;
+ dp.p_inum = 0;
+ dp.p_ptr = req;
+
+ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+ DEBUG_REQ(D_HA, req, "replay create obj %lu/%lu",
+ (unsigned long) body->oa.o_id,
+ (unsigned long) body->oa.o_generation);
+ dp.p_inum = body->oa.o_id;
+ dp.p_generation = body->oa.o_generation;
+ }
+ rc = vfs_mkdir(parent_inode, new, body->oa.o_mode);
+ if (rc == 0) {
+ obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS);
+ repbody->oa.o_id = new->d_inode->i_ino;
+ repbody->oa.o_generation = new->d_inode->i_generation;
+ repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
+
+ rc = fsfilt_del_dir_entry(obd, new);
+ up(&parent_inode->i_sem);
+
+ if (rc) {
+ CERROR("can't remove name for object: %d\n", rc);
+ GOTO(cleanup, rc);
+ }
+
+ /* this lock should be taken to serialize MDS modifications
+ * in failure case */
+ res_id.name[0] = new->d_inode->i_ino;
+ res_id.name[1] = new->d_inode->i_generation;
+ policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
+ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
+ res_id, LDLM_IBITS, &policy,
+ LCK_EX, &flags, mds_blocking_ast,
+ ldlm_completion_ast, NULL, NULL,
+ NULL, 0, NULL, &lockh);
+ if (rc != ELDLM_OK)
+ GOTO(cleanup, rc);
+
+ CDEBUG(D_OTHER, "created dirobj: %lu/%lu mode %o\n",
+ (unsigned long) new->d_inode->i_ino,
+ (unsigned long) new->d_inode->i_generation,
+ (unsigned) new->d_inode->i_mode);
+ } else {
+ up(&parent_inode->i_sem);
+ CERROR("%s: can't create dirobj: %d\n", obd->obd_name, rc);
+ }
+
+ if (rc == 0 && body->oa.o_valid & OBD_MD_FLID) {
+ /* this is new object for splitted dir. we have to
+ * prevent recursive splitting on it -bzzz */
+ mealen = obd_size_diskmd(mds->mds_lmv_exp, NULL);
+ OBD_ALLOC(mea, mealen);
+ if (mea == NULL)
+ GOTO(cleanup, rc = -ENOMEM);
+ mea->mea_count = 0;
+ down(&new->d_inode->i_sem);
+ rc = fsfilt_set_md(obd, new->d_inode, handle, mea, mealen);
+ up(&new->d_inode->i_sem);
+ OBD_FREE(mea, mealen);
+ }
+
+cleanup:
+ rc = mds_finish_transno(mds, parent_inode, handle, req, rc, 0);
+ if (rc == 0)
+ ptlrpc_save_lock(req, &lockh, LCK_EX);
+ else
+ ldlm_lock_decref(&lockh, LCK_EX);
+cleanup2:
+ l_dput(new);
+ pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
+ RETURN(rc);
+}
+
+static int mdt_get_info(struct ptlrpc_request *req)
+{
+ char *key;
+ struct obd_export *exp = req->rq_export;
+ int keylen, rc = 0, size = sizeof(obd_id);
+ obd_id *reply;
+ ENTRY;
+
+ key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
+ if (key == NULL) {
+ DEBUG_REQ(D_HA, req, "no get_info key");
+ RETURN(-EFAULT);
+ }
+ keylen = req->rq_reqmsg->buflens[0];
+
+ if (keylen < strlen("mdsize") || memcmp(key, "mdsize", 6) != 0)
+ RETURN(-EPROTO);
+
+ rc = lustre_pack_reply(req, 1, &size, NULL);
+ if (rc)
+ RETURN(rc);
+
+ reply = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*reply));
+ rc = obd_get_info(exp, keylen, key, &size, reply);
+ req->rq_repmsg->status = 0;
+ RETURN(rc);
+}
+
+static int mds_set_info(struct obd_export *exp, __u32 keylen,
+ void *key, __u32 vallen, void *val)
+{
+ struct obd_device *obd;
+ struct mds_obd *mds;
+ int rc = 0;
+ ENTRY;
+
+ obd = class_exp2obd(exp);
+ if (obd == NULL) {
+ CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
+ exp->exp_handle.h_cookie);
+ RETURN(-EINVAL);
+ }
+
+ mds = &obd->u.mds;
+ if (keylen == strlen("mds_num") &&
+ memcmp(key, "mds_num", keylen) == 0) {
+ int valsize;
+ __u32 group;
+ CDEBUG(D_IOCTL, "set mds num %d\n", *(int*)val);
+ mds->mds_num = *(int*)val;
+ group = FILTER_GROUP_FIRST_MDS + mds->mds_num;
+ valsize = sizeof(group);
+ /*mds number has been changed, so the corresponding obdfilter exp
+ *need to be changed too*/
+ rc = obd_set_info(mds->mds_osc_exp, strlen("mds_conn"), "mds_conn",
+ valsize, &group);
+ RETURN(rc);
+ } else if (keylen == strlen("client") &&
+ memcmp(key, "client", keylen) == 0) {
+ if (!(exp->exp_flags & OBD_OPT_REAL_CLIENT)) {
+ atomic_inc(&mds->mds_real_clients);
+ CDEBUG(D_OTHER,"%s: peer from %s is real client (%d)\n",
+ obd->obd_name, exp->exp_client_uuid.uuid,
+ atomic_read(&mds->mds_real_clients));
+ exp->exp_flags |= OBD_OPT_REAL_CLIENT;
+ }
+ if (mds->mds_lmv_name) {
+ rc = mds_lmv_connect(obd, mds->mds_lmv_name);
+ LASSERT(rc == 0);
+ }
+ RETURN(0);
+ }
+ CDEBUG(D_IOCTL, "invalid key\n");
+ RETURN(-EINVAL);
+}
+
+static int mdt_set_info(struct ptlrpc_request *req)