+ mds_fs_cleanup(obddev);
+
+ MOD_DEC_USE_COUNT;
+ RETURN(0);
+}
+
+static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie,
+ ldlm_mode_t mode, int flags, void *data)
+{
+ struct ptlrpc_request *req = req_cookie;
+ int rc = 0;
+ ENTRY;
+
+ if (!req_cookie)
+ RETURN(0);
+
+ if (req->rq_reqmsg->bufcount > 1) {
+ /* an intent needs to be considered */
+ struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1);
+ struct mds_obd *mds= &req->rq_export->exp_obd->u.mds;
+ struct mds_body *mds_rep;
+ struct ldlm_reply *rep;
+ __u64 new_resid[3] = {0, 0, 0}, old_res;
+ int rc, size[3] = {sizeof(struct ldlm_reply),
+ sizeof(struct mds_body),
+ mds->mds_max_mdsize};
+
+ it->opc = NTOH__u64(it->opc);
+
+ LDLM_DEBUG(lock, "intent policy, opc: %s",
+ ldlm_it2str(it->opc));
+
+ rc = lustre_pack_msg(3, size, NULL, &req->rq_replen,
+ &req->rq_repmsg);
+ if (rc) {
+ rc = req->rq_status = -ENOMEM;
+ RETURN(rc);
+ }
+
+ rep = lustre_msg_buf(req->rq_repmsg, 0);
+ rep->lock_policy_res1 = 1;
+
+ /* execute policy */
+ switch ((long)it->opc) {
+ case IT_CREAT|IT_OPEN:
+ rc = mds_reint(2, req);
+ if (rc || (req->rq_status != 0 &&
+ req->rq_status != -EEXIST)) {
+ rep->lock_policy_res2 = req->rq_status;
+ RETURN(ELDLM_LOCK_ABORTED);
+ }
+ break;
+ case IT_CREAT:
+ case IT_MKDIR:
+ case IT_MKNOD:
+ case IT_RENAME2:
+ case IT_LINK2:
+ case IT_RMDIR:
+ case IT_SYMLINK:
+ case IT_UNLINK:
+ rc = mds_reint(2, req);
+ if (rc || (req->rq_status != 0 &&
+ req->rq_status != -EISDIR &&
+ req->rq_status != -ENOTDIR)) {
+ rep->lock_policy_res2 = req->rq_status;
+ RETURN(ELDLM_LOCK_ABORTED);
+ }
+ break;
+ case IT_GETATTR:
+ case IT_LOOKUP:
+ case IT_OPEN:
+ case IT_READDIR:
+ case IT_READLINK:
+ case IT_RENAME:
+ case IT_LINK:
+ case IT_SETATTR:
+ rc = mds_getattr_name(2, req);
+ /* FIXME: we need to sit down and decide on who should
+ * set req->rq_status, who should return negative and
+ * positive return values, and what they all mean. */
+ if (rc || req->rq_status != 0) {
+ rep->lock_policy_res2 = req->rq_status;
+ RETURN(ELDLM_LOCK_ABORTED);
+ }
+ break;
+ case IT_READDIR|IT_OPEN:
+ LBUG();
+ break;
+ default:
+ CERROR("Unhandled intent\n");
+ LBUG();
+ }
+
+ /* We don't bother returning a lock to the client for a file
+ * or directory we are removing.
+ *
+ * As for link and rename, there is no reason for the client
+ * to get a lock on the target at this point. If they are
+ * going to modify the file/directory later they will get a
+ * lock at that time.
+ */
+ if (it->opc & (IT_UNLINK | IT_RMDIR | IT_LINK | IT_LINK2 |
+ IT_RENAME | IT_RENAME2))
+ RETURN(ELDLM_LOCK_ABORTED);
+
+ rep->lock_policy_res2 = req->rq_status;
+ mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
+
+ /* If the client is about to open a file that doesn't have an MD
+ * stripe record, it's going to need a write lock. */
+ if (it->opc & IT_OPEN) {
+ struct lov_mds_md *lmm =
+ lustre_msg_buf(req->rq_repmsg, 2);
+ if (lmm->lmm_easize == 0) {
+ LDLM_DEBUG(lock, "open with no EA; returning PW"
+ " lock");
+ lock->l_req_mode = LCK_PW;
+ }
+ }
+
+ if (flags & LDLM_FL_INTENT_ONLY) {
+ LDLM_DEBUG(lock, "INTENT_ONLY, aborting lock");
+ RETURN(ELDLM_LOCK_ABORTED);
+ }
+ /* Give the client a lock on the child object, instead of the
+ * parent that it requested. */
+ new_resid[0] = NTOH__u32(mds_rep->ino);
+ new_resid[1] = NTOH__u32(mds_rep->generation);
+ if (new_resid[0] == 0)
+ LBUG();
+ old_res = lock->l_resource->lr_name[0];
+
+ ldlm_lock_change_resource(lock, new_resid);
+ if (lock->l_resource == NULL) {
+ LBUG();
+ RETURN(-ENOMEM);
+ }
+ LDLM_DEBUG(lock, "intent policy, old res %ld",
+ (long)old_res);
+ RETURN(ELDLM_LOCK_CHANGED);
+ } else {
+ int size = sizeof(struct ldlm_reply);
+ rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
+ &req->rq_repmsg);
+ if (rc) {
+ LBUG();
+ RETURN(-ENOMEM);
+ }
+ }
+ RETURN(rc);
+}
+
+int mds_attach(struct obd_device *dev,
+ obd_count len, void *data)
+{
+ int rc;
+ rc = lprocfs_reg_obd(dev, (lprocfs_vars_t*)status_var_nm_1, (void*)dev);
+ return rc;
+}
+
+int mds_detach(struct obd_device *dev)
+{
+ int rc;
+ rc = lprocfs_dereg_obd(dev);
+ return rc;
+
+}
+
+#define MDT_NUM_THREADS 8
+static int mdt_setup(struct obd_device *obddev, obd_count len, void *buf)
+{
+ int i;
+ // struct obd_ioctl_data* data = buf;
+ struct mds_obd *mds = &obddev->u.mds;
+ int rc = 0;
+ ENTRY;
+
+ MOD_INC_USE_COUNT;
+
+ mds->mds_service = ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
+ MDS_BUFSIZE, MDS_MAXREQSIZE,
+ MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
+ "self", mds_handle, "mds");
+ if (!mds->mds_service) {
+ CERROR("failed to start service\n");
+ GOTO(err_dec, rc = -EINVAL);
+ }
+
+ for (i = 0; i < MDT_NUM_THREADS; i++) {
+ char name[32];
+ sprintf(name, "lustre_MDT_%02d", i);
+ rc = ptlrpc_start_thread(obddev, mds->mds_service, name);
+ if (rc) {
+ CERROR("cannot start MDT thread #%d: rc %d\n", i, rc);
+ GOTO(err_thread, rc);
+ }
+ }
+
+ RETURN(0);
+
+err_thread:
+ ptlrpc_stop_all_threads(mds->mds_service);
+ ptlrpc_unregister_service(mds->mds_service);
+err_dec:
+ MOD_DEC_USE_COUNT;
+ RETURN(rc);
+}
+
+
+static int mdt_cleanup(struct obd_device *obddev)
+{
+ struct mds_obd *mds = &obddev->u.mds;
+ ENTRY;
+
+ ptlrpc_stop_all_threads(mds->mds_service);
+ ptlrpc_unregister_service(mds->mds_service);