Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index ffc8a96..c299486 100644 (file)
 #include <lustre_mds.h>
 #include <lustre_mdt.h>
 #include "mdt_internal.h"
+#ifdef HAVE_QUOTA_SUPPORT
+# include <lustre_quota.h>
+#endif
 #include <lustre_acl.h>
 #include <lustre_param.h>
+#include <lustre_fsfilt.h>
 
 mdl_mode_t mdt_mdl_lock_modes[] = {
         [LCK_MINMODE] = MDL_MINMODE,
@@ -161,6 +165,8 @@ static struct mdt_opc_slice mdt_fld_handlers[];
 static struct mdt_device *mdt_dev(struct lu_device *d);
 static int mdt_regular_handle(struct ptlrpc_request *req);
 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
+static int mdt_fid2path(const struct lu_env *env, struct mdt_device *mdt,
+                        struct getinfo_fid2path *fp);
 
 static const struct lu_object_operations mdt_obj_ops;
 
@@ -309,7 +315,8 @@ static int mdt_getstatus(struct mdt_thread_info *info)
 
         repbody->valid |= OBD_MD_FLID;
 
-        if (mdt->mdt_opts.mo_mds_capa) {
+        if (mdt->mdt_opts.mo_mds_capa &&
+            info->mti_exp->exp_connect_flags & OBD_CONNECT_MDS_CAPA) {
                 struct mdt_object  *root;
                 struct lustre_capa *capa;
 
@@ -320,7 +327,6 @@ static int mdt_getstatus(struct mdt_thread_info *info)
                 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA1);
                 LASSERT(capa);
                 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
-
                 rc = mo_capa_get(info->mti_env, mdt_object_child(root), capa,
                                  0);
                 mdt_object_put(info->mti_env, root);
@@ -345,7 +351,7 @@ static int mdt_statfs(struct mdt_thread_info *info)
         /* This will trigger a watchdog timeout */
         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
                          (MDT_SERVICE_WATCHDOG_FACTOR *
-                          at_get(&svc->srv_at_estimate) / 1000) + 1);
+                          at_get(&svc->srv_at_estimate)) + 1);
 
         rc = mdt_check_ucred(info);
         if (rc)
@@ -432,7 +438,6 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
         struct md_object        *next = mdt_object_child(o);
         const struct mdt_body   *reqbody = info->mti_body;
         struct ptlrpc_request   *req = mdt_info_req(info);
-        struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
         struct md_attr          *ma = &info->mti_attr;
         struct lu_attr          *la = &ma->ma_attr;
         struct req_capsule      *pill = info->mti_pill;
@@ -537,7 +542,8 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                        repbody->max_cookiesize);
         }
 
-        if (med->med_rmtclient && (reqbody->valid & OBD_MD_FLRMTPERM)) {
+        if (exp_connect_rmtclient(info->mti_exp) &&
+            reqbody->valid & OBD_MD_FLRMTPERM) {
                 void *buf = req_capsule_server_get(pill, &RMF_ACL);
 
                 /* mdt_getattr_lock only */
@@ -579,8 +585,9 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
         }
 #endif
 
-        if ((reqbody->valid & OBD_MD_FLMDSCAPA) &&
-            info->mti_mdt->mdt_opts.mo_mds_capa) {
+        if (reqbody->valid & OBD_MD_FLMDSCAPA &&
+            info->mti_mdt->mdt_opts.mo_mds_capa &&
+            info->mti_exp->exp_connect_flags & OBD_CONNECT_MDS_CAPA) {
                 struct lustre_capa *capa;
 
                 capa = req_capsule_server_get(pill, &RMF_CAPA1);
@@ -596,7 +603,6 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
 
 static int mdt_renew_capa(struct mdt_thread_info *info)
 {
-        struct mdt_device  *mdt = info->mti_mdt;
         struct mdt_object  *obj = info->mti_object;
         struct mdt_body    *body;
         struct lustre_capa *capa, *c;
@@ -607,7 +613,8 @@ static int mdt_renew_capa(struct mdt_thread_info *info)
          * return directly, client will find body->valid OBD_MD_FLOSSCAPA
          * flag not set.
          */
-        if (!obj || !mdt->mdt_opts.mo_mds_capa)
+        if (!obj || !info->mti_mdt->mdt_opts.mo_oss_capa ||
+            !(info->mti_exp->exp_connect_flags & OBD_CONNECT_OSS_CAPA))
                 RETURN(0);
 
         body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
@@ -799,14 +806,14 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
         namelen = req_capsule_get_size(info->mti_pill, &RMF_NAME,
                                        RCL_CLIENT) - 1;
         if (!info->mti_cross_ref) {
-                /* 
-                 * XXX: Check for "namelen == 0" is for getattr by fid 
+                /*
+                 * XXX: Check for "namelen == 0" is for getattr by fid
                  * (OBD_CONNECT_ATTRFID), otherwise do not allow empty name,
                  * that is the name must contain at least one character and
                  * the terminating '\0'
                  */
                 if (namelen == 0) {
-                        reqbody = req_capsule_client_get(info->mti_pill, 
+                        reqbody = req_capsule_client_get(info->mti_pill,
                                                          &RMF_MDT_BODY);
                         LASSERT(fid_is_sane(&reqbody->fid2));
                         name = NULL;
@@ -818,7 +825,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                 } else {
                         lname = mdt_name(info->mti_env, (char *)name, namelen);
                         CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, "
-                               "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)), 
+                               "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)),
                                name, ldlm_rep);
                 }
         }
@@ -1063,12 +1070,15 @@ static int lu_device_is_mdt(struct lu_device *d)
         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
 }
 
+static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
+                         void *karg, void *uarg);
+
 static int mdt_set_info(struct mdt_thread_info *info)
 {
         struct ptlrpc_request *req = mdt_info_req(info);
         char *key;
-        __u32 *val;
-        int keylen, rc = 0;
+        void *val;
+        int keylen, vallen, rc = 0;
         ENTRY;
 
         rc = req_capsule_server_pack(info->mti_pill);
@@ -1090,19 +1100,39 @@ static int mdt_set_info(struct mdt_thread_info *info)
                 RETURN(-EFAULT);
         }
 
-        if (!KEY_IS(KEY_READ_ONLY))
-                RETURN(-EINVAL);
+        vallen = req_capsule_get_size(info->mti_pill, &RMF_SETINFO_VAL,
+                                      RCL_CLIENT);
 
-        req->rq_status = 0;
-        lustre_msg_set_status(req->rq_repmsg, 0);
+        if (KEY_IS(KEY_READ_ONLY)) {
+                req->rq_status = 0;
+                lustre_msg_set_status(req->rq_repmsg, 0);
 
-        spin_lock(&req->rq_export->exp_lock);
-        if (*val)
-                req->rq_export->exp_connect_flags |= OBD_CONNECT_RDONLY;
-        else
-                req->rq_export->exp_connect_flags &= ~OBD_CONNECT_RDONLY;
-        spin_unlock(&req->rq_export->exp_lock);
+                spin_lock(&req->rq_export->exp_lock);
+                if (*(__u32 *)val)
+                        req->rq_export->exp_connect_flags |= OBD_CONNECT_RDONLY;
+                else
+                        req->rq_export->exp_connect_flags &=~OBD_CONNECT_RDONLY;
+                spin_unlock(&req->rq_export->exp_lock);
+
+        } else if (KEY_IS(KEY_CHANGELOG_CLEAR)) {
+                struct changelog_setinfo *cs =
+                        (struct changelog_setinfo *)val;
+                if (vallen != sizeof(*cs)) {
+                        CERROR("Bad changelog_clear setinfo size %d\n", vallen);
+                        RETURN(-EINVAL);
+                }
+                if (lustre_msg_swabbed(req->rq_reqmsg)) {
+                        __swab64s(&cs->cs_recno);
+                        __swab32s(&cs->cs_id);
+                }
 
+                rc = mdt_iocontrol(OBD_IOC_CHANGELOG_CLEAR, info->mti_exp,
+                                   vallen, val, NULL);
+                lustre_msg_set_status(req->rq_repmsg, rc);
+
+        } else {
+                RETURN(-EINVAL);
+        }
         RETURN(0);
 }
 
@@ -1116,12 +1146,14 @@ static int mdt_connect(struct mdt_thread_info *info)
         if (rc == 0) {
                 LASSERT(req->rq_export != NULL);
                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
-                rc = mdt_init_idmap(info);
+                rc = mdt_init_sec_level(info);
+                if (rc == 0)
+                        rc = mdt_init_idmap(info);
                 if (rc != 0)
-                        /* if mdt_init_idmap failed, revocation for connect */
                         obd_disconnect(class_export_get(req->rq_export));
-        } else
+        } else {
                 rc = err_serious(rc);
+        }
         return rc;
 }
 
@@ -1161,6 +1193,10 @@ static int mdt_sendpage(struct mdt_thread_info *info,
         }
 
         LASSERT(desc->bd_nob == rdpg->rp_count);
+        rc = sptlrpc_svc_wrap_bulk(req, desc);
+        if (rc)
+                GOTO(free_desc, rc);
+
         rc = ptlrpc_start_bulk_transfer(desc);
         if (rc)
                 GOTO(free_desc, rc);
@@ -1172,8 +1208,8 @@ static int mdt_sendpage(struct mdt_thread_info *info,
         if (timeout < 0)
                 CERROR("Req deadline already passed %lu (now: %lu)\n",
                        req->rq_deadline, cfs_time_current_sec());
-        *lwi = LWI_TIMEOUT(max(timeout, 1) * HZ, NULL, NULL);
-        rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), lwi);
+        *lwi = LWI_TIMEOUT(cfs_time_seconds(max(timeout, 1)), NULL, NULL);
+        rc = l_wait_event(desc->bd_waitq, !ptlrpc_server_bulk_active(desc), lwi);
         LASSERT (rc == 0 || rc == -ETIMEDOUT);
 
         if (rc == 0) {
@@ -1258,7 +1294,7 @@ static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page,
                 memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen));
                 lname = mdt_name(info->mti_env, name,
                                  le16_to_cpu(ent->lde_namelen));
-                ma->ma_attr_flags |= MDS_PERM_BYPASS;
+                ma->ma_attr_flags |= (MDS_PERM_BYPASS | MDS_QUOTA_IGNORE);
                 rc = mdo_name_insert(info->mti_env,
                                      md_object_next(&object->mot_obj),
                                      lname, lf, ma);
@@ -1320,6 +1356,9 @@ static int mdt_writepage(struct mdt_thread_info *info)
         ptlrpc_prep_bulk_page(desc, page, (int)reqbody->size,
                               (int)reqbody->nlink);
 
+        rc = sptlrpc_svc_prep_bulk(req, desc);
+        if (rc != 0)
+                GOTO(cleanup_page, rc);
         /*
          * Check if client was evicted while we were doing i/o before touching
          * network.
@@ -1400,6 +1439,8 @@ static int mdt_readpage(struct mdt_thread_info *info)
                        rdpg->rp_hash, reqbody->size);
                 RETURN(-EFAULT);
         }
+
+        rdpg->rp_attrs = reqbody->mode;
         rdpg->rp_count  = reqbody->nlink;
         rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1)>>CFS_PAGE_SHIFT;
         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
@@ -1474,6 +1515,18 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
                 GOTO(out_shrink, rc = err_serious(rc));
         }
 
+        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10);
+
+        /* for replay no cookkie / lmm need, because client have this already */
+        if (info->mti_spec.no_create == 1)  {
+                if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
+                        req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0);
+
+                if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
+                        req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
+                                             0);
+        }
+
         rc = mdt_init_ucred_reint(info);
         if (rc)
                 GOTO(out_shrink, rc);
@@ -1629,15 +1682,142 @@ static int mdt_sync(struct mdt_thread_info *info)
         RETURN(rc);
 }
 
+#ifdef HAVE_QUOTA_SUPPORT
 static int mdt_quotacheck_handle(struct mdt_thread_info *info)
 {
-        return err_serious(-EOPNOTSUPP);
+        struct obd_quotactl *oqctl;
+        struct req_capsule *pill = info->mti_pill;
+        struct obd_export *exp = info->mti_exp;
+        struct md_device *next = info->mti_mdt->mdt_child;
+        int rc;
+        ENTRY;
+
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_QUOTACHECK_NET))
+                RETURN(0);
+
+        oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL);
+        if (oqctl == NULL)
+                RETURN(-EPROTO);
+
+        /* remote client has no permission for quotacheck */
+        if (unlikely(exp_connect_rmtclient(exp)))
+                RETURN(-EPERM);
+
+        rc = req_capsule_server_pack(pill);
+        if (rc)
+                RETURN(rc);
+
+        rc = next->md_ops->mdo_quota.mqo_check(info->mti_env, next, exp,
+                                               oqctl->qc_type);
+        RETURN(rc);
 }
 
 static int mdt_quotactl_handle(struct mdt_thread_info *info)
 {
-        return err_serious(-EOPNOTSUPP);
+        struct obd_quotactl *oqctl, *repoqc;
+        struct req_capsule *pill = info->mti_pill;
+        struct obd_export *exp = info->mti_exp;
+        struct md_device *next = info->mti_mdt->mdt_child;
+        const struct md_quota_operations *mqo = &next->md_ops->mdo_quota;
+        int id, rc;
+        ENTRY;
+
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_QUOTACTL_NET))
+                RETURN(0);
+
+        oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL);
+        if (oqctl == NULL)
+                RETURN(-EPROTO);
+
+        id = oqctl->qc_id;
+        if (exp_connect_rmtclient(exp)) {
+                struct ptlrpc_request *req = mdt_info_req(info);
+                struct mdt_export_data *med = mdt_req2med(req);
+                struct lustre_idmap_table *idmap = med->med_idmap;
+
+                if (unlikely(oqctl->qc_cmd != Q_GETQUOTA &&
+                             oqctl->qc_cmd != Q_GETINFO))
+                        RETURN(-EPERM);
+
+
+                if (oqctl->qc_type == USRQUOTA)
+                        id = lustre_idmap_lookup_uid(NULL, idmap, 0,
+                                                     oqctl->qc_id);
+                else if (oqctl->qc_type == GRPQUOTA)
+                        id = lustre_idmap_lookup_gid(NULL, idmap, 0,
+                                                     oqctl->qc_id);
+                else
+                        RETURN(-EINVAL);
+
+                if (id == CFS_IDMAP_NOTFOUND) {
+                        CDEBUG(D_QUOTA, "no mapping for id %u\n",
+                               oqctl->qc_id);
+                        RETURN(-EACCES);
+                }
+        }
+
+        rc = req_capsule_server_pack(pill);
+        if (rc)
+                RETURN(rc);
+
+        repoqc = req_capsule_server_get(pill, &RMF_OBD_QUOTACTL);
+        LASSERT(repoqc != NULL);
+
+        switch (oqctl->qc_cmd) {
+        case Q_QUOTAON:
+                if (info->mti_mdt->mdt_som_conf) {
+                        /* Quota cannot be used together with SOM while
+                         * SOM stored blocks in i_blocks but not in SOM EA. */
+                        LCONSOLE_ERROR("Fail to turn Quota on: SOM is enabled "
+                                       "and temporary conflicts with quota.\n");
+                        RETURN(-ENOTSUPP);
+                }
+                rc = mqo->mqo_on(info->mti_env, next, oqctl->qc_type);
+                break;
+        case Q_QUOTAOFF:
+                rc = mqo->mqo_off(info->mti_env, next, oqctl->qc_type);
+                break;
+        case Q_SETINFO:
+                rc = mqo->mqo_setinfo(info->mti_env, next, oqctl->qc_type, id,
+                                      &oqctl->qc_dqinfo);
+                break;
+        case Q_GETINFO:
+                rc = mqo->mqo_getinfo(info->mti_env, next, oqctl->qc_type, id,
+                                      &oqctl->qc_dqinfo);
+                break;
+        case Q_SETQUOTA:
+                rc = mqo->mqo_setquota(info->mti_env, next, oqctl->qc_type, id,
+                                       &oqctl->qc_dqblk);
+                break;
+        case Q_GETQUOTA:
+                rc = mqo->mqo_getquota(info->mti_env, next, oqctl->qc_type, id,
+                                       &oqctl->qc_dqblk);
+                break;
+        case Q_GETOINFO:
+                rc = mqo->mqo_getoinfo(info->mti_env, next, oqctl->qc_type, id,
+                                       &oqctl->qc_dqinfo);
+                break;
+        case Q_GETOQUOTA:
+                rc = mqo->mqo_getoquota(info->mti_env, next, oqctl->qc_type, id,
+                                        &oqctl->qc_dqblk);
+                break;
+        case LUSTRE_Q_INVALIDATE:
+                rc = mqo->mqo_invalidate(info->mti_env, next, oqctl->qc_type);
+                break;
+        case LUSTRE_Q_FINVALIDATE:
+                rc = mqo->mqo_finvalidate(info->mti_env, next, oqctl->qc_type);
+                break;
+        default:
+                CERROR("unsupported mdt_quotactl command: %d\n",
+                       oqctl->qc_cmd);
+                RETURN(-EFAULT);
+        }
+
+        *repoqc = *oqctl;
+        RETURN(rc);
 }
+#endif
+
 
 /*
  * OBD PING and other handlers.
@@ -1667,6 +1847,101 @@ static int mdt_obd_qc_callback(struct mdt_thread_info *info)
 
 
 /*
+ * LLOG handlers.
+ */
+
+/** clone llog ctxt from child (mdd)
+ * This allows remote llog (replicator) access.
+ * We can either pass all llog RPCs (eg mdt_llog_create) on to child where the
+ * context was originally set up, or we can handle them directly.
+ * I choose the latter, but that means I need any llog
+ * contexts set up by child to be accessable by the mdt.  So we clone the
+ * context into our context list here.
+ */
+static int mdt_llog_ctxt_clone(const struct lu_env *env, struct mdt_device *mdt,
+                               int idx)
+{
+        struct md_device  *next = mdt->mdt_child;
+        struct llog_ctxt *ctxt;
+        int rc;
+
+        if (!llog_ctxt_null(mdt2obd_dev(mdt), idx))
+                return 0;
+
+        rc = next->md_ops->mdo_llog_ctxt_get(env, next, idx, (void **)&ctxt);
+        if (rc || ctxt == NULL) {
+                CERROR("Can't get mdd ctxt %d\n", rc);
+                return rc;
+        }
+
+        rc = llog_group_set_ctxt(&mdt2obd_dev(mdt)->obd_olg, ctxt, idx);
+        if (rc)
+                CERROR("Can't set mdt ctxt %d\n", rc);
+
+        return rc;
+}
+
+static int mdt_llog_ctxt_unclone(const struct lu_env *env,
+                                 struct mdt_device *mdt, int idx)
+{
+        struct llog_ctxt *ctxt;
+
+        ctxt = llog_get_context(mdt2obd_dev(mdt), idx);
+        if (ctxt == NULL)
+                return 0;
+        /* Put once for the get we just did, and once for the clone */
+        llog_ctxt_put(ctxt);
+        llog_ctxt_put(ctxt);
+        return 0;
+}
+
+static int mdt_llog_create(struct mdt_thread_info *info)
+{
+        int rc;
+
+        req_capsule_set(info->mti_pill, &RQF_LLOG_ORIGIN_HANDLE_CREATE);
+        rc = llog_origin_handle_create(mdt_info_req(info));
+        return (rc < 0 ? err_serious(rc) : rc);
+}
+
+static int mdt_llog_destroy(struct mdt_thread_info *info)
+{
+        int rc;
+
+        req_capsule_set(info->mti_pill, &RQF_LLOG_ORIGIN_HANDLE_DESTROY);
+        rc = llog_origin_handle_destroy(mdt_info_req(info));
+        return (rc < 0 ? err_serious(rc) : rc);
+}
+
+static int mdt_llog_read_header(struct mdt_thread_info *info)
+{
+        int rc;
+
+        req_capsule_set(info->mti_pill, &RQF_LLOG_ORIGIN_HANDLE_READ_HEADER);
+        rc = llog_origin_handle_read_header(mdt_info_req(info));
+        return (rc < 0 ? err_serious(rc) : rc);
+}
+
+static int mdt_llog_next_block(struct mdt_thread_info *info)
+{
+        int rc;
+
+        req_capsule_set(info->mti_pill, &RQF_LLOG_ORIGIN_HANDLE_NEXT_BLOCK);
+        rc = llog_origin_handle_next_block(mdt_info_req(info));
+        return (rc < 0 ? err_serious(rc) : rc);
+}
+
+static int mdt_llog_prev_block(struct mdt_thread_info *info)
+{
+        int rc;
+
+        req_capsule_set(info->mti_pill, &RQF_LLOG_ORIGIN_HANDLE_PREV_BLOCK);
+        rc = llog_origin_handle_prev_block(mdt_info_req(info));
+        return (rc < 0 ? err_serious(rc) : rc);
+}
+
+
+/*
  * DLM handlers.
  */
 static struct ldlm_callback_suite cbs = {
@@ -1776,6 +2051,108 @@ struct mdt_object *mdt_object_find(const struct lu_env *env,
         RETURN(m);
 }
 
+/**
+ * Asyncronous commit for mdt device.
+ *
+ * Pass asynchonous commit call down the MDS stack.
+ *
+ * \param env environment
+ * \param mdt the mdt device
+ */
+static void mdt_device_commit_async(const struct lu_env *env,
+                                    struct mdt_device *mdt)
+{
+        struct dt_device *dt = mdt->mdt_bottom;
+        int rc;
+
+        rc = dt->dd_ops->dt_commit_async(env, dt);
+        if (unlikely(rc != 0))
+                CWARN("async commit start failed with rc = %d", rc);
+}
+
+/**
+ * Mark the lock as "synchonous".
+ *
+ * Mark the lock to deffer transaction commit to the unlock time.
+ *
+ * \param lock the lock to mark as "synchonous"
+ *
+ * \see mdt_is_lock_sync
+ * \see mdt_save_lock
+ */
+static inline void mdt_set_lock_sync(struct ldlm_lock *lock)
+{
+        lock->l_ast_data = (void*)1;
+}
+
+/**
+ * Check whehter the lock "synchonous" or not.
+ *
+ * \param lock the lock to check
+ * \retval 1 the lock is "synchonous"
+ * \retval 0 the lock isn't "synchronous"
+ *
+ * \see mdt_set_lock_sync
+ * \see mdt_save_lock
+ */
+static inline int mdt_is_lock_sync(struct ldlm_lock *lock)
+{
+        return lock->l_ast_data != NULL;
+}
+
+/**
+ * Blocking AST for mdt locks.
+ *
+ * Starts transaction commit if in case of COS lock conflict or
+ * deffers such a commit to the mdt_save_lock.
+ *
+ * \param lock the lock which blocks a request or cancelling lock
+ * \param desc unused
+ * \param data unused
+ * \param flag indicates whether this cancelling or blocking callback
+ * \retval 0
+ * \see ldlm_blocking_ast_nocheck
+ */
+int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+                     void *data, int flag)
+{
+        struct obd_device *obd = lock->l_resource->lr_namespace->ns_obd;
+        struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+        int rc;
+        ENTRY;
+
+        if (flag == LDLM_CB_CANCELING)
+                RETURN(0);
+        lock_res_and_lock(lock);
+        if (lock->l_blocking_ast != mdt_blocking_ast) {
+                unlock_res_and_lock(lock);
+                RETURN(0);
+        }
+        if (mdt_cos_is_enabled(mdt) &&
+            lock->l_req_mode & (LCK_PW | LCK_EX) &&
+            lock->l_blocking_lock != NULL &&
+            lock->l_client_cookie != lock->l_blocking_lock->l_client_cookie) {
+                mdt_set_lock_sync(lock);
+        }
+        rc = ldlm_blocking_ast_nocheck(lock);
+
+        /* There is no lock conflict if l_blocking_lock == NULL,
+         * it indicates a blocking ast sent from ldlm_lock_decref_internal
+         * when the last reference to a local lock was released */
+        if (lock->l_req_mode == LCK_COS && lock->l_blocking_lock != NULL) {
+                struct lu_env env;
+
+                rc = lu_env_init(&env, LCT_MD_THREAD);
+                if (unlikely(rc != 0))
+                        CWARN("lu_env initialization failed with rc = %d,"
+                              "cannot start asynchronous commit\n", rc);
+                else
+                        mdt_device_commit_async(&env, mdt);
+                lu_env_fini(&env);
+        }
+        RETURN(rc);
+}
+
 int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
                     struct mdt_lock_handle *lh, __u64 ibits, int locality)
 {
@@ -1832,7 +2209,8 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
                          */
                         policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
                         rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode,
-                                          policy, res_id, LDLM_FL_ATOMIC_CB);
+                                          policy, res_id, LDLM_FL_ATOMIC_CB,
+                                          &info->mti_exp->exp_handle.h_cookie);
                         if (unlikely(rc))
                                 RETURN(rc);
                 }
@@ -1852,8 +2230,8 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
          * fix it up and turn FL_LOCAL flag off.
          */
         rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
-                          res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB);
-
+                          res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB,
+                          &info->mti_exp->exp_handle.h_cookie);
         if (rc)
                 GOTO(out, rc);
 
@@ -1865,36 +2243,79 @@ out:
         RETURN(rc);
 }
 
-static inline
-void mdt_save_lock(struct ptlrpc_request *req, struct lustre_handle *h,
+/**
+ * Save a lock within request object.
+ *
+ * Keep the lock referenced until whether client ACK or transaction
+ * commit happens or release the lock immediately depending on input
+ * parameters. If COS is ON, a write lock is converted to COS lock
+ * before saving.
+ *
+ * \param info thead info object
+ * \param h lock handle
+ * \param mode lock mode
+ * \param decref force immediate lock releasing
+ */
+static
+void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
                    ldlm_mode_t mode, int decref)
 {
         ENTRY;
 
         if (lustre_handle_is_used(h)) {
-                if (decref)
+                if (decref || !info->mti_has_trans ||
+                    !(mode & (LCK_PW | LCK_EX))){
                         mdt_fid_unlock(h, mode);
-                else
-                        ptlrpc_save_lock(req, h, mode);
+                } else {
+                        struct mdt_device *mdt = info->mti_mdt;
+                        struct ldlm_lock *lock = ldlm_handle2lock(h);
+                        struct ptlrpc_request *req = mdt_info_req(info);
+                        int no_ack = 0;
+
+                        LASSERTF(lock != NULL, "no lock for cookie "LPX64"\n",
+                                 h->cookie);
+                        CDEBUG(D_HA, "request = %p reply state = %p"
+                               " transno = "LPD64"\n",
+                               req, req->rq_reply_state, req->rq_transno);
+                        if (mdt_cos_is_enabled(mdt)) {
+                                no_ack = 1;
+                                ldlm_lock_downgrade(lock, LCK_COS);
+                                mode = LCK_COS;
+                        }
+                        ptlrpc_save_lock(req, h, mode, no_ack);
+                        if (mdt_is_lock_sync(lock)) {
+                                CDEBUG(D_HA, "found sync-lock,"
+                                       " async commit started\n");
+                                mdt_device_commit_async(info->mti_env,
+                                                        mdt);
+                        }
+                        LDLM_LOCK_PUT(lock);
+                }
                 h->cookie = 0ull;
         }
 
         EXIT;
 }
 
-/*
- * Just call ldlm_lock_decref() if decref, else we only call ptlrpc_save_lock()
- * to save this lock in req.  when transaction committed, req will be released,
- * and lock will, too.
+/**
+ * Unlock mdt object.
+ *
+ * Immeditely release the regular lock and the PDO lock or save the
+ * lock in reqeuest and keep them referenced until client ACK or
+ * transaction commit.
+ *
+ * \param info thread info object
+ * \param o mdt object
+ * \param h mdt lock handle referencing regular and PDO locks
+ * \param decref force immediate lock releasing
  */
 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
                        struct mdt_lock_handle *lh, int decref)
 {
-        struct ptlrpc_request *req = mdt_info_req(info);
         ENTRY;
 
-        mdt_save_lock(req, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref);
-        mdt_save_lock(req, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref);
+        mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref);
+        mdt_save_lock(info, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref);
 
         EXIT;
 }
@@ -1940,7 +2361,9 @@ static struct mdt_handler *mdt_handler_find(__u32 opc,
                 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
                         h = s->mos_hs + (opc - s->mos_opc_start);
                         if (likely(h->mh_opc != 0))
-                                LASSERT(h->mh_opc == opc);
+                                LASSERTF(h->mh_opc == opc,
+                                         "opcode mismatch %d != %d\n",
+                                         h->mh_opc, opc);
                         else
                                 h = NULL; /* unsupported opc */
                         break;
@@ -2044,6 +2467,7 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
                 struct mdt_device *mdt = info->mti_mdt;
 
                 /* Pack reply. */
+
                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
                                              mdt->mdt_max_mdsize);
@@ -2176,7 +2600,7 @@ static int mdt_req_handle(struct mdt_thread_info *info,
         }
 
         /* If we're DISCONNECTing, the mdt_export_data is already freed */
-        if (likely(rc == 0 && h->mh_opc != MDS_DISCONNECT))
+        if (likely(rc == 0 && req->rq_export && h->mh_opc != MDS_DISCONNECT))
                 target_committed_to_req(req);
 
         if (unlikely((lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) &&
@@ -2231,9 +2655,22 @@ static void mdt_thread_info_init(struct ptlrpc_request *req,
         info->mti_env = req->rq_svc_thread->t_env;
         ci = md_capainfo(info->mti_env);
         memset(ci, 0, sizeof *ci);
+        if (req->rq_export) {
+                if (exp_connect_rmtclient(req->rq_export))
+                        ci->mc_auth = LC_ID_CONVERT;
+                else if (req->rq_export->exp_connect_flags &
+                         OBD_CONNECT_MDS_CAPA)
+                        ci->mc_auth = LC_ID_PLAIN;
+                else
+                        ci->mc_auth = LC_ID_NONE;
+        }
 
         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
         info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
+        info->mti_mos[0] = NULL;
+        info->mti_mos[1] = NULL;
+        info->mti_mos[2] = NULL;
+        info->mti_mos[3] = NULL;
 
         memset(&info->mti_attr, 0, sizeof(info->mti_attr));
         info->mti_body = NULL;
@@ -2246,6 +2683,7 @@ static void mdt_thread_info_init(struct ptlrpc_request *req,
 
         /* To not check for split by default. */
         info->mti_spec.sp_ck_split = 0;
+        info->mti_spec.no_create = 0;
 }
 
 static void mdt_thread_info_fini(struct mdt_thread_info *info)
@@ -2254,6 +2692,11 @@ static void mdt_thread_info_fini(struct mdt_thread_info *info)
 
         req_capsule_fini(info->mti_pill);
         if (info->mti_object != NULL) {
+                /*
+                 * freeing an object may lead to OSD level transaction, do not
+                 * let it mess with MDT. bz19385.
+                 */
+                info->mti_no_need_trans = 1;
                 mdt_object_put(info->mti_env, info->mti_object);
                 info->mti_object = NULL;
         }
@@ -2413,6 +2856,7 @@ static int mdt_msg_check_version(struct lustre_msg *msg)
         case MDS_GETXATTR:
         case MDS_SETXATTR:
         case MDS_SET_INFO:
+        case MDS_GET_INFO:
         case MDS_QUOTACHECK:
         case MDS_QUOTACTL:
         case QUOTA_DQACQ:
@@ -2479,12 +2923,22 @@ static int mdt_handle0(struct ptlrpc_request *req,
         if (likely(rc == 0)) {
                 rc = mdt_recovery(info);
                 if (likely(rc == +1)) {
+                        switch (lustre_msg_get_opc(msg)) {
+                        case MDS_READPAGE:
+                                req->rq_bulk_read = 1;
+                                break;
+                        case MDS_WRITEPAGE:
+                                req->rq_bulk_write = 1;
+                                break;
+                        }
+
                         h = mdt_handler_find(lustre_msg_get_opc(msg),
                                              supported);
                         if (likely(h != NULL)) {
                                 rc = mdt_req_handle(info, h, req);
                         } else {
-                                CERROR("The unsupported opc: 0x%x\n", lustre_msg_get_opc(msg) );
+                                CERROR("The unsupported opc: 0x%x\n",
+                                       lustre_msg_get_opc(msg) );
                                 req->rq_status = -ENOTSUPP;
                                 rc = ptlrpc_error(req);
                                 RETURN(rc);
@@ -2739,11 +3193,12 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info,
         new_lock->l_remote_handle = lock->l_remote_handle;
         new_lock->l_flags &= ~LDLM_FL_LOCAL;
 
+        unlock_res_and_lock(new_lock);
+
         lustre_hash_add(new_lock->l_export->exp_lock_hash,
                         &new_lock->l_remote_handle,
                         &new_lock->l_exp_hash);
 
-        unlock_res_and_lock(new_lock);
         LDLM_LOCK_RELEASE(new_lock);
         lh->mlh_reg_lh.cookie = 0;
 
@@ -2924,14 +3379,15 @@ static int mdt_intent_reint(enum mdt_it_code opcode,
         }
         rep->lock_policy_res2 = clear_serious(rc);
 
-        lhc->mlh_reg_lh.cookie = 0ull;
-        if (rc == -ENOTCONN || rc == -ENODEV) {
+        if (rc == -ENOTCONN || rc == -ENODEV ||
+            rc == -EOVERFLOW) { /**< if VBR failure then return error */
                 /*
                  * If it is the disconnect error (ENODEV & ENOCONN), the error
                  * will be returned by rq_status, and client at ptlrpc layer
                  * will detect this, then disconnect, reconnect the import
                  * immediately, instead of impacting the following the rpc.
                  */
+                lhc->mlh_reg_lh.cookie = 0ull;
                 RETURN(rc);
         } else {
                 /*
@@ -2942,7 +3398,14 @@ static int mdt_intent_reint(enum mdt_it_code opcode,
                   * FIXME: when open lock is finished, that should be
                   * checked here.
                   */
-                RETURN(ELDLM_LOCK_ABORTED);
+                if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
+                        rep->lock_policy_res2 = 0;
+                        rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
+                        RETURN(rc);
+                } else {
+                        lhc->mlh_reg_lh.cookie = 0ull;
+                        RETURN(ELDLM_LOCK_ABORTED);
+                }
         }
 }
 
@@ -3081,7 +3544,7 @@ static void mdt_seq_adjust(const struct lu_env *env,
                           struct mdt_device *m, int lost)
 {
         struct md_site *ms = mdt_md_site(m);
-        struct lu_range out;
+        struct lu_seq_range out;
         ENTRY;
 
         LASSERT(ms && ms->ms_server_seq);
@@ -3147,6 +3610,7 @@ static int mdt_seq_init(const struct lu_env *env,
                 rc = seq_server_init(ms->ms_control_seq,
                                      m->mdt_bottom, uuid,
                                      LUSTRE_SEQ_CONTROLLER,
+                                     ms,
                                      env);
 
                 if (rc)
@@ -3188,6 +3652,7 @@ static int mdt_seq_init(const struct lu_env *env,
         rc = seq_server_init(ms->ms_server_seq,
                              m->mdt_bottom, uuid,
                              LUSTRE_SEQ_SERVER,
+                             ms,
                              env);
         if (rc)
                 GOTO(out_seq_fini, rc = -ENOMEM);
@@ -3342,7 +3807,8 @@ static int mdt_fld_init(const struct lu_env *env,
                 RETURN(rc = -ENOMEM);
 
         rc = fld_server_init(ms->ms_server_fld,
-                             m->mdt_bottom, uuid, env);
+                             m->mdt_bottom, uuid,
+                             env, ms->ms_node_id);
         if (rc) {
                 OBD_FREE_PTR(ms->ms_server_fld);
                 ms->ms_server_fld = NULL;
@@ -3388,7 +3854,7 @@ static void mdt_stop_ptlrpc_service(struct mdt_device *m)
                 ptlrpc_unregister_service(m->mdt_fld_service);
                 m->mdt_fld_service = NULL;
         }
-        ENTRY;
+        EXIT;
 }
 
 static int mdt_start_ptlrpc_service(struct mdt_device *m)
@@ -3652,7 +4118,7 @@ err_mdt_svc:
 static void mdt_stack_fini(const struct lu_env *env,
                            struct mdt_device *m, struct lu_device *top)
 {
-        struct obd_device       *obd = m->mdt_md_dev.md_lu_dev.ld_obd;
+        struct obd_device       *obd = mdt2obd_dev(m);
         struct lustre_cfg_bufs  *bufs;
         struct lustre_cfg       *lcfg;
         struct mdt_thread_info  *info;
@@ -3704,20 +4170,12 @@ static struct lu_device *mdt_layer_setup(struct lu_env *env,
                 GOTO(out, rc = -ENODEV);
         }
 
-        rc = lu_context_refill(&env->le_ctx);
+        rc = lu_env_refill((struct lu_env *)env);
         if (rc != 0) {
-                CERROR("Failure to refill context: '%d'\n", rc);
+                CERROR("Failure to refill session: '%d'\n", rc);
                 GOTO(out_type, rc);
         }
 
-        if (env->le_ses != NULL) {
-                rc = lu_context_refill(env->le_ses);
-                if (rc != 0) {
-                        CERROR("Failure to refill session: '%d'\n", rc);
-                        GOTO(out_type, rc);
-                }
-        }
-
         ldt = type->typ_lu;
         if (ldt == NULL) {
                 CERROR("type: '%s'\n", typename);
@@ -3741,6 +4199,7 @@ static struct lu_device *mdt_layer_setup(struct lu_env *env,
                 GOTO(out_alloc, rc);
         }
         lu_device_get(d);
+        lu_ref_add(&d->ld_reference, "lu-stack", &lu_site_init);
 
         RETURN(d);
 
@@ -3754,11 +4213,14 @@ out:
 }
 
 static int mdt_stack_init(struct lu_env *env,
-                          struct mdt_device *m, struct lustre_cfg *cfg)
+                          struct mdt_device *m,
+                          struct lustre_cfg *cfg,
+                          struct lustre_mount_info  *lmi)
 {
         struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
         struct lu_device  *tmp;
         struct md_device  *md;
+        struct lu_device  *child_lu_dev;
         int rc;
         ENTRY;
 
@@ -3793,7 +4255,15 @@ static int mdt_stack_init(struct lu_env *env,
         /* process setup config */
         tmp = &m->mdt_md_dev.md_lu_dev;
         rc = tmp->ld_ops->ldo_process_config(env, tmp, cfg);
-        GOTO(out, rc);
+        if (rc)
+                GOTO(out, rc);
+
+        /* initialize local objects */
+        child_lu_dev = &m->mdt_child->md_lu_dev;
+
+        rc = child_lu_dev->ld_ops->ldo_prepare(env,
+                                               &m->mdt_md_dev.md_lu_dev,
+                                               child_lu_dev);
 out:
         /* fini from last known good lu_device */
         if (rc)
@@ -3802,21 +4272,77 @@ out:
         return rc;
 }
 
+/**
+ * setup CONFIG_ORIG context, used to access local config log.
+ * this may need to be rewrite as part of llog rewrite for lu-api.
+ */
+static int mdt_obd_llog_setup(struct obd_device *obd,
+                              struct lustre_sb_info *lsi)
+{
+        int     rc;
+
+        LASSERT(obd->obd_fsops == NULL);
+
+        obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
+        if (IS_ERR(obd->obd_fsops))
+                return PTR_ERR(obd->obd_fsops);
+
+        rc = fsfilt_setup(obd, lsi->lsi_srv_mnt->mnt_sb);
+        if (rc) {
+                fsfilt_put_ops(obd->obd_fsops);
+                return rc;
+        }
+
+        OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
+        obd->obd_lvfs_ctxt.pwdmnt = lsi->lsi_srv_mnt;
+        obd->obd_lvfs_ctxt.pwd = lsi->lsi_srv_mnt->mnt_root;
+        obd->obd_lvfs_ctxt.fs = get_ds();
+
+        rc = llog_setup(obd, &obd->obd_olg, LLOG_CONFIG_ORIG_CTXT, obd,
+                        0, NULL, &llog_lvfs_ops);
+        if (rc) {
+                CERROR("llog_setup() failed: %d\n", rc);
+                fsfilt_put_ops(obd->obd_fsops);
+        }
+
+        return rc;
+}
+
+static void mdt_obd_llog_cleanup(struct obd_device *obd)
+{
+        struct llog_ctxt *ctxt;
+
+        ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
+        if (ctxt)
+                llog_cleanup(ctxt);
+
+        if (obd->obd_fsops) {
+                fsfilt_put_ops(obd->obd_fsops);
+                obd->obd_fsops = NULL;
+        }
+}
+
 static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
 {
-        struct md_device *next = m->mdt_child;
-        struct lu_device *d    = &m->mdt_md_dev.md_lu_dev;
-        struct lu_site   *ls   = d->ld_site;
-        struct obd_device *obd = m->mdt_md_dev.md_lu_dev.ld_obd;
+        struct md_device  *next = m->mdt_child;
+        struct lu_device  *d    = &m->mdt_md_dev.md_lu_dev;
+        struct lu_site    *ls   = d->ld_site;
+        struct obd_device *obd = mdt2obd_dev(m);
         ENTRY;
 
+        target_recovery_fini(obd);
+
         ping_evictor_stop();
 
-        target_recovery_fini(obd);
         mdt_stop_ptlrpc_service(m);
-
+        mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
+        mdt_obd_llog_cleanup(obd);
+        obd_zombie_barrier();
+#ifdef HAVE_QUOTA_SUPPORT
+        next->md_ops->mdo_quota.mqo_cleanup(env, next);
+#endif
+        lut_fini(env, &m->mdt_lut);
         mdt_fs_cleanup(env, m);
-
         upcall_cache_cleanup(m->mdt_identity_cache);
         m->mdt_identity_cache = NULL;
 
@@ -3825,22 +4351,37 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
                 d->ld_obd->obd_namespace = m->mdt_namespace = NULL;
         }
 
+        cfs_free_nidlist(&m->mdt_nosquash_nids);
+        if (m->mdt_nosquash_str) {
+                OBD_FREE(m->mdt_nosquash_str, m->mdt_nosquash_strlen);
+                m->mdt_nosquash_str = NULL;
+                m->mdt_nosquash_strlen = 0;
+        }
+
         mdt_seq_fini(env, m);
         mdt_seq_fini_cli(m);
         mdt_fld_fini(env, m);
-        mdt_procfs_fini(m);
-        ptlrpc_lprocfs_unregister_obd(d->ld_obd);
-        lprocfs_obd_cleanup(d->ld_obd);
-
         sptlrpc_rule_set_free(&m->mdt_sptlrpc_rset);
 
         next->md_ops->mdo_init_capa_ctxt(env, next, 0, 0, 0, NULL);
         cfs_timer_disarm(&m->mdt_ck_timer);
         mdt_ck_thread_stop(m);
 
-        /* finish the stack */
+        /*
+         * Finish the stack
+         */
         mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
 
+        mdt_procfs_fini(m);
+        if (obd->obd_proc_exports_entry) {
+                lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry);
+                obd->obd_proc_exports_entry = NULL;
+        }
+        lprocfs_free_per_client_stats(obd);
+        lprocfs_free_obd_stats(obd);
+        ptlrpc_lprocfs_unregister_obd(obd);
+        lprocfs_obd_cleanup(obd);
+
         if (ls) {
                 struct md_site *mite;
 
@@ -3850,15 +4391,40 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
                 d->ld_site = NULL;
         }
         LASSERT(atomic_read(&d->ld_ref) == 0);
-        md_device_fini(&m->mdt_md_dev);
 
         EXIT;
 }
 
+static int mdt_adapt_sptlrpc_conf(struct obd_device *obd, int initial)
+{
+        struct mdt_device       *m = mdt_dev(obd->obd_lu_dev);
+        struct sptlrpc_rule_set  tmp_rset;
+        int                      rc;
+
+        sptlrpc_rule_set_init(&tmp_rset);
+        rc = sptlrpc_conf_target_get_rules(obd, &tmp_rset, initial);
+        if (rc) {
+                CERROR("mdt %s: failed get sptlrpc rules: %d\n",
+                       obd->obd_name, rc);
+                return rc;
+        }
+
+        sptlrpc_target_update_exp_flavor(obd, &tmp_rset);
+
+        write_lock(&m->mdt_sptlrpc_lock);
+        sptlrpc_rule_set_free(&m->mdt_sptlrpc_rset);
+        m->mdt_sptlrpc_rset = tmp_rset;
+        write_unlock(&m->mdt_sptlrpc_lock);
+
+        return 0;
+}
+
 static void fsoptions_to_mdt_flags(struct mdt_device *m, char *options)
 {
         char *p = options;
 
+        m->mdt_opts.mo_mds_capa = 1;
+        m->mdt_opts.mo_oss_capa = 1;
 #ifdef CONFIG_FS_POSIX_ACL
         /* ACLs should be enabled by default (b=13829) */
         m->mdt_opts.mo_acl = 1;
@@ -3906,14 +4472,32 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         struct obd_device         *obd;
         const char                *dev = lustre_cfg_string(cfg, 0);
         const char                *num = lustre_cfg_string(cfg, 2);
-        struct lustre_mount_info  *lmi;
+        struct lustre_mount_info  *lmi = NULL;
         struct lustre_sb_info     *lsi;
+        struct lustre_disk_data   *ldd;
         struct lu_site            *s;
         struct md_site            *mite;
         const char                *identity_upcall = "NONE";
+#ifdef HAVE_QUOTA_SUPPORT
+        struct md_device          *next;
+#endif
         int                        rc;
+        int                        node_id;
         ENTRY;
 
+        md_device_init(&m->mdt_md_dev, ldt);
+        /*
+         * Environment (env) might be missing mdt_thread_key values at that
+         * point, if device is allocated when mdt_thread_key is in QUIESCENT
+         * mode.
+         *
+         * Usually device allocation path doesn't use module key values, but
+         * mdt has to do a lot of work here, so allocate key value.
+         */
+        rc = lu_env_refill((struct lu_env *)env);
+        if (rc != 0)
+                RETURN(rc);
+
         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
         LASSERT(info != NULL);
 
@@ -3924,9 +4508,11 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
 
         m->mdt_max_mdsize = MAX_MD_SIZE;
         m->mdt_max_cookiesize = sizeof(struct llog_cookie);
+        m->mdt_som_conf = 0;
 
         m->mdt_opts.mo_user_xattr = 0;
         m->mdt_opts.mo_acl = 0;
+        m->mdt_opts.mo_cos = MDT_COS_DEFAULT;
         lmi = server_get_mount_2(dev);
         if (lmi == NULL) {
                 CERROR("Cannot get mount info for %s!\n", dev);
@@ -3934,10 +4520,17 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         } else {
                 lsi = s2lsi(lmi->lmi_sb);
                 fsoptions_to_mdt_flags(m, lsi->lsi_lmd->lmd_opts);
-                server_put_mount_2(dev, lmi->lmi_mnt);
+                /* CMD is supported only in IAM mode */
+                ldd = lsi->lsi_ldd;
+                LASSERT(num);
+                node_id = simple_strtol(num, NULL, 10);
+                if (!(ldd->ldd_flags & LDD_F_IAM_DIR) && node_id) {
+                        CERROR("CMD Operation not allowed in IOP mode\n");
+                        GOTO(err_lmi, rc = -EINVAL);
+                }
         }
 
-        m->mdt_sptlrpc_lock = RW_LOCK_UNLOCKED;
+        rwlock_init(&m->mdt_sptlrpc_lock);
         sptlrpc_rule_set_init(&m->mdt_sptlrpc_rset);
 
         spin_lock_init(&m->mdt_ioepoch_lock);
@@ -3945,14 +4538,19 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         m->mdt_capa_timeout = CAPA_TIMEOUT;
         m->mdt_capa_alg = CAPA_HMAC_ALG_SHA1;
         m->mdt_ck_timeout = CAPA_KEY_TIMEOUT;
+        m->mdt_squash_uid = 0;
+        m->mdt_squash_gid = 0;
+        CFS_INIT_LIST_HEAD(&m->mdt_nosquash_nids);
+        m->mdt_nosquash_str = NULL;
+        m->mdt_nosquash_strlen = 0;
+        init_rwsem(&m->mdt_squash_sem);
 
         spin_lock_init(&m->mdt_client_bitmap_lock);
 
         OBD_ALLOC_PTR(mite);
         if (mite == NULL)
-                RETURN(-ENOMEM);
+                GOTO(err_lmi, rc = -ENOMEM);
 
-        md_device_init(&m->mdt_md_dev, ldt);
         s = &mite->ms_lu;
 
         m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
@@ -3980,13 +4578,19 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                 GOTO(err_fini_proc, rc);
         }
 
+        obd->obd_proc_exports_entry = proc_mkdir("exports",
+                                                 obd->obd_proc_entry);
+        if (obd->obd_proc_exports_entry)
+                lprocfs_add_simple(obd->obd_proc_exports_entry,
+                                   "clear", lprocfs_nid_stats_clear_read,
+                                   lprocfs_nid_stats_clear_write, obd, NULL);
+
         /* set server index */
-        LASSERT(num);
-        lu_site2md(s)->ms_node_id = simple_strtol(num, NULL, 10);
+        lu_site2md(s)->ms_node_id = node_id;
 
         /* failover is the default
          * FIXME: we do not failout mds0/mgs, which may cause some problems.
-         * assumed whose ls_node_id == 0 XXX
+         * assumed whose ms_node_id == 0 XXX
          * */
         obd->obd_replayable = 1;
         /* No connection accepted until configurations will finish */
@@ -4001,16 +4605,20 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         }
 
         /* init the stack */
-        rc = mdt_stack_init((struct lu_env *)env, m, cfg);
+        rc = mdt_stack_init((struct lu_env *)env, m, cfg, lmi);
         if (rc) {
                 CERROR("Can't init device stack, rc %d\n", rc);
                 GOTO(err_fini_proc, rc);
         }
 
-        rc = mdt_fld_init(env, obd->obd_name, m);
+        rc = lut_init(env, &m->mdt_lut, obd, m->mdt_bottom);
         if (rc)
                 GOTO(err_fini_stack, rc);
 
+        rc = mdt_fld_init(env, obd->obd_name, m);
+        if (rc)
+                GOTO(err_lut, rc);
+
         rc = mdt_seq_init(env, obd->obd_name, m);
         if (rc)
                 GOTO(err_fini_fld, rc);
@@ -4046,15 +4654,35 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
         if (rc)
                 GOTO(err_free_ns, rc);
 
-        rc = mdt_fs_setup(env, m, obd);
+        rc = mdt_fs_setup(env, m, obd, lsi);
         if (rc)
                 GOTO(err_capa, rc);
 
-        target_recovery_init(obd, mdt_recovery_handle);
+        rc = mdt_obd_llog_setup(obd, lsi);
+        if (rc)
+                GOTO(err_fs_cleanup, rc);
+
+        rc = mdt_llog_ctxt_clone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
+        if (rc)
+                GOTO(err_llog_cleanup, rc);
+
+        mdt_adapt_sptlrpc_conf(obd, 1);
+
+#ifdef HAVE_QUOTA_SUPPORT
+        next = m->mdt_child;
+        rc = next->md_ops->mdo_quota.mqo_setup(env, next, lmi->lmi_mnt);
+        if (rc)
+                GOTO(err_llog_cleanup, rc);
+#endif
+
+        server_put_mount_2(dev, lmi->lmi_mnt);
+        lmi = NULL;
+
+        target_recovery_init(&m->mdt_lut, mdt_recovery_handle);
 
         rc = mdt_start_ptlrpc_service(m);
         if (rc)
-                GOTO(err_fs_cleanup, rc);
+                GOTO(err_recovery, rc);
 
         ping_evictor_start();
 
@@ -4078,8 +4706,15 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
 err_stop_service:
         ping_evictor_stop();
         mdt_stop_ptlrpc_service(m);
-err_fs_cleanup:
+err_recovery:
         target_recovery_fini(obd);
+#ifdef HAVE_QUOTA_SUPPORT
+        next->md_ops->mdo_quota.mqo_cleanup(env, next);
+#endif
+err_llog_cleanup:
+        mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
+        mdt_obd_llog_cleanup(obd);
+err_fs_cleanup:
         mdt_fs_cleanup(env, m);
 err_capa:
         cfs_timer_disarm(&m->mdt_ck_timer);
@@ -4093,18 +4728,25 @@ err_fini_seq:
         mdt_seq_fini(env, m);
 err_fini_fld:
         mdt_fld_fini(env, m);
+err_lut:
+        lut_fini(env, &m->mdt_lut);
 err_fini_stack:
         mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
 err_fini_proc:
         mdt_procfs_fini(m);
+        if (obd->obd_proc_exports_entry) {
+                lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry);
+                obd->obd_proc_exports_entry = NULL;
+        }
         ptlrpc_lprocfs_unregister_obd(obd);
         lprocfs_obd_cleanup(obd);
 err_fini_site:
         lu_site_fini(s);
 err_free_site:
-        OBD_FREE_PTR(s);
-
-        md_device_fini(&m->mdt_md_dev);
+        OBD_FREE_PTR(mite);
+err_lmi:
+        if (lmi)
+                server_put_mount_2(dev, lmi->lmi_mnt);
         return (rc);
 }
 
@@ -4119,42 +4761,27 @@ static int mdt_process_config(const struct lu_env *env,
         ENTRY;
 
         switch (cfg->lcfg_command) {
-        case LCFG_SPTLRPC_CONF: {
-                struct sptlrpc_conf_log *log;
-                struct sptlrpc_rule_set  tmp_rset;
-
-                log = sptlrpc_conf_log_extract(cfg);
-                if (IS_ERR(log)) {
-                        rc = PTR_ERR(log);
-                        break;
-                }
-
-                sptlrpc_rule_set_init(&tmp_rset);
-
-                rc = sptlrpc_rule_set_from_log(&tmp_rset, log);
-                if (rc) {
-                        CERROR("mdt %p: failed get sptlrpc rules: %d\n", m, rc);
-                        break;
-                }
-
-                write_lock(&m->mdt_sptlrpc_lock);
-                sptlrpc_rule_set_free(&m->mdt_sptlrpc_rset);
-                m->mdt_sptlrpc_rset = tmp_rset;
-                write_unlock(&m->mdt_sptlrpc_lock);
-
-                sptlrpc_target_update_exp_flavor(
-                                md2lu_dev(&m->mdt_md_dev)->ld_obd, &tmp_rset);
-
-                break;
-        }
         case LCFG_PARAM: {
                 struct lprocfs_static_vars lvars;
                 struct obd_device *obd = d->ld_obd;
 
+                /*
+                 * For interoperability between 1.8 and 2.0,
+                 * skip old "mdt.group_upcall" param.
+                 */
+                {
+                        char *param = lustre_cfg_string(cfg, 1);
+                        if (param && !strncmp("mdt.group_upcall", param, 16)) {
+                                CWARN("For 1.8 interoperability, skip this"
+                                       " mdt.group_upcall. It is obsolete\n");
+                                break;
+                        }
+                }
+
                 lprocfs_mdt_init_vars(&lvars);
                 rc = class_process_proc_param(PARAM_MDT, lvars.obd_vars,
                                               cfg, obd);
-                if (rc == -ENOSYS)
+                if (rc > 0 || rc == -ENOSYS)
                         /* we don't understand; pass it on */
                         rc = next->ld_ops->ldo_process_config(env, next, cfg);
                 break;
@@ -4240,23 +4867,51 @@ static void mdt_object_free(const struct lu_env *env, struct lu_object *o)
         EXIT;
 }
 
+static int mdt_object_print(const struct lu_env *env, void *cookie,
+                            lu_printer_t p, const struct lu_object *o)
+{
+        struct mdt_object *mdto = mdt_obj((struct lu_object *)o);
+        return (*p)(env, cookie, LUSTRE_MDT_NAME"-object@%p(ioepoch=%llu "
+                    "flags=%llx, epochcount=%d, writecount=%d)",
+                    mdto, mdto->mot_ioepoch, mdto->mot_flags,
+                    mdto->mot_epochcount, mdto->mot_writecount);
+}
+
 static const struct lu_device_operations mdt_lu_ops = {
         .ldo_object_alloc   = mdt_object_alloc,
-        .ldo_process_config = mdt_process_config
+        .ldo_process_config = mdt_process_config,
 };
 
 static const struct lu_object_operations mdt_obj_ops = {
         .loo_object_init    = mdt_object_init,
-        .loo_object_free    = mdt_object_free
+        .loo_object_free    = mdt_object_free,
+        .loo_object_print   = mdt_object_print
 };
 
+static int mdt_obd_set_info_async(struct obd_export *exp,
+                                  __u32 keylen, void *key,
+                                  __u32 vallen, void *val,
+                                  struct ptlrpc_request_set *set)
+{
+        struct obd_device     *obd = exp->exp_obd;
+        int                    rc;
+        ENTRY;
+
+        LASSERT(obd);
+
+        if (KEY_IS(KEY_SPTLRPC_CONF)) {
+                rc = mdt_adapt_sptlrpc_conf(obd, 0);
+                RETURN(rc);
+        }
+
+        RETURN(0);
+}
+
 /* mds_connect_internal */
 static int mdt_connect_internal(struct obd_export *exp,
                                 struct mdt_device *mdt,
                                 struct obd_connect_data *data)
 {
-        __u64 flags;
-
         if (data != NULL) {
                 data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
                 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
@@ -4274,12 +4929,9 @@ static int mdt_connect_internal(struct obd_export *exp,
                 if (!mdt->mdt_opts.mo_user_xattr)
                         data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
 
-                if (!mdt->mdt_opts.mo_mds_capa)
-                        data->ocd_connect_flags &= ~OBD_CONNECT_MDS_CAPA;
-
-                if (!mdt->mdt_opts.mo_oss_capa)
-                        data->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
-
+                if (!mdt->mdt_som_conf)
+                        data->ocd_connect_flags &= ~OBD_CONNECT_SOM;
+                
                 spin_lock(&exp->exp_lock);
                 exp->exp_connect_flags = data->ocd_connect_flags;
                 spin_unlock(&exp->exp_lock);
@@ -4296,88 +4948,103 @@ static int mdt_connect_internal(struct obd_export *exp,
         }
 #endif
 
-        flags = OBD_CONNECT_LCL_CLIENT | OBD_CONNECT_RMT_CLIENT;
-        if ((exp->exp_connect_flags & flags) == flags) {
-                CWARN("%s: both local and remote client flags are set\n",
+        if ((exp->exp_connect_flags & OBD_CONNECT_FID) == 0) {
+                CWARN("%s: MDS requires FID support, but client not\n",
                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
                 return -EBADE;
         }
 
-        if (mdt->mdt_opts.mo_mds_capa &&
-            ((exp->exp_connect_flags & OBD_CONNECT_MDS_CAPA) == 0)) {
-                CWARN("%s: MDS requires capability support, but client not\n",
-                      mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
+        if (mdt->mdt_som_conf &&
+            !(exp->exp_connect_flags & OBD_CONNECT_MDS_MDS) &&
+            !(exp->exp_connect_flags & OBD_CONNECT_SOM)) {
+                CWARN("%s: MDS has SOM enabled, but client does not support "
+                      "it\n", mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
                 return -EBADE;
         }
 
-        if (mdt->mdt_opts.mo_oss_capa &&
-            ((exp->exp_connect_flags & OBD_CONNECT_OSS_CAPA) == 0)) {
-                CWARN("%s: MDS requires OSS capability support, "
-                      "but client not\n",
-                      mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
-                return -EBADE;
-        }
+        return 0;
+}
 
-        if ((exp->exp_connect_flags & OBD_CONNECT_FID) == 0) {
-                CWARN("%s: MDS requires FID support, but client not\n",
-                      mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
-                return -EBADE;
+static int mdt_connect_check_sptlrpc(struct mdt_device *mdt,
+                                     struct obd_export *exp,
+                                     struct ptlrpc_request *req)
+{
+        struct sptlrpc_flavor   flvr;
+        int                     rc = 0;
+
+        if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
+                read_lock(&mdt->mdt_sptlrpc_lock);
+                sptlrpc_target_choose_flavor(&mdt->mdt_sptlrpc_rset,
+                                             req->rq_sp_from,
+                                             req->rq_peer.nid,
+                                             &flvr);
+                read_unlock(&mdt->mdt_sptlrpc_lock);
+
+                spin_lock(&exp->exp_lock);
+
+                exp->exp_sp_peer = req->rq_sp_from;
+                exp->exp_flvr = flvr;
+
+                if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
+                    exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
+                        CERROR("unauthorized rpc flavor %x from %s, "
+                               "expect %x\n", req->rq_flvr.sf_rpc,
+                               libcfs_nid2str(req->rq_peer.nid),
+                               exp->exp_flvr.sf_rpc);
+                        rc = -EACCES;
+                }
+
+                spin_unlock(&exp->exp_lock);
+        } else {
+                if (exp->exp_sp_peer != req->rq_sp_from) {
+                        CERROR("RPC source %s doesn't match %s\n",
+                               sptlrpc_part2name(req->rq_sp_from),
+                               sptlrpc_part2name(exp->exp_sp_peer));
+                        rc = -EACCES;
+                } else {
+                        rc = sptlrpc_target_export_check(exp, req);
+                }
         }
 
-        return 0;
+        return rc;
 }
 
 /* mds_connect copy */
 static int mdt_obd_connect(const struct lu_env *env,
-                           struct lustre_handle *conn, struct obd_device *obd,
+                           struct obd_export **exp, struct obd_device *obd,
                            struct obd_uuid *cluuid,
                            struct obd_connect_data *data,
                            void *localdata)
 {
         struct mdt_thread_info *info;
         struct lsd_client_data *lcd;
-        struct obd_export      *exp;
+        struct obd_export      *lexp;
+        struct lustre_handle    conn = { 0 };
         struct mdt_device      *mdt;
         struct ptlrpc_request  *req;
         int                     rc;
         ENTRY;
 
         LASSERT(env != NULL);
-        if (!conn || !obd || !cluuid)
+        if (!exp || !obd || !cluuid)
                 RETURN(-EINVAL);
 
         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
         req = info->mti_pill->rc_req;
         mdt = mdt_dev(obd->obd_lu_dev);
 
-        rc = class_connect(conn, obd, cluuid);
+        rc = class_connect(&conn, obd, cluuid);
         if (rc)
                 RETURN(rc);
 
-        exp = class_conn2export(conn);
-        LASSERT(exp != NULL);
+        lexp = class_conn2export(&conn);
+        LASSERT(lexp != NULL);
 
-        CDEBUG(D_SEC, "from %s\n", sptlrpc_part2name(req->rq_sp_from));
-
-        spin_lock(&exp->exp_lock);
-        exp->exp_sp_peer = req->rq_sp_from;
-
-        read_lock(&mdt->mdt_sptlrpc_lock);
-        sptlrpc_rule_set_choose(&mdt->mdt_sptlrpc_rset, exp->exp_sp_peer,
-                                req->rq_peer.nid, &exp->exp_flvr);
-        read_unlock(&mdt->mdt_sptlrpc_lock);
-
-        if (exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
-                CERROR("invalid rpc flavor %x, expect %x, from %s\n",
-                       req->rq_flvr.sf_rpc, exp->exp_flvr.sf_rpc,
-                       libcfs_nid2str(req->rq_peer.nid));
-                exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
-                spin_unlock(&exp->exp_lock);
-                RETURN(-EACCES);
-        }
-        spin_unlock(&exp->exp_lock);
+        rc = mdt_connect_check_sptlrpc(mdt, lexp, req);
+        if (rc)
+                GOTO(out, rc);
 
-        rc = mdt_connect_internal(exp, mdt, data);
+        rc = mdt_connect_internal(lexp, mdt, data);
         if (rc == 0) {
                 OBD_ALLOC_PTR(lcd);
                 if (lcd != NULL) {
@@ -4385,22 +5052,25 @@ static int mdt_obd_connect(const struct lu_env *env,
                         mti = lu_context_key_get(&env->le_ctx,
                                                  &mdt_thread_key);
                         LASSERT(mti != NULL);
-                        mti->mti_exp = exp;
+                        mti->mti_exp = lexp;
                         memcpy(lcd->lcd_uuid, cluuid, sizeof lcd->lcd_uuid);
-                        exp->exp_mdt_data.med_lcd = lcd;
+                        lexp->exp_mdt_data.med_lcd = lcd;
                         rc = mdt_client_new(env, mdt);
                         if (rc != 0) {
                                 OBD_FREE_PTR(lcd);
-                                exp->exp_mdt_data.med_lcd = NULL;
+                                lexp->exp_mdt_data.med_lcd = NULL;
+                        } else {
+                                mdt_export_stats_init(obd, lexp, localdata);
                         }
                 } else
                         rc = -ENOMEM;
         }
 
+out:
         if (rc != 0)
-                class_disconnect(exp);
+                class_disconnect(lexp);
         else
-                class_export_put(exp);
+                *exp = lexp;
 
         RETURN(rc);
 }
@@ -4408,7 +5078,8 @@ static int mdt_obd_connect(const struct lu_env *env,
 static int mdt_obd_reconnect(const struct lu_env *env,
                              struct obd_export *exp, struct obd_device *obd,
                              struct obd_uuid *cluuid,
-                             struct obd_connect_data *data)
+                             struct obd_connect_data *data,
+                             void *localdata)
 {
         struct mdt_thread_info *info;
         struct mdt_device      *mdt;
@@ -4423,30 +5094,88 @@ static int mdt_obd_reconnect(const struct lu_env *env,
         req = info->mti_pill->rc_req;
         mdt = mdt_dev(obd->obd_lu_dev);
 
-        CDEBUG(D_SEC, "from %s\n", sptlrpc_part2name(req->rq_sp_from));
+        rc = mdt_connect_check_sptlrpc(mdt, exp, req);
+        if (rc)
+                RETURN(rc);
 
-        spin_lock(&exp->exp_lock);
-        if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
-                exp->exp_sp_peer = req->rq_sp_from;
+        rc = mdt_connect_internal(exp, mdt_dev(obd->obd_lu_dev), data);
+        if (rc == 0)
+                mdt_export_stats_init(obd, exp, localdata);
 
-                read_lock(&mdt->mdt_sptlrpc_lock);
-                sptlrpc_rule_set_choose(&mdt->mdt_sptlrpc_rset,
-                                        exp->exp_sp_peer,
-                                        req->rq_peer.nid, &exp->exp_flvr);
-                read_unlock(&mdt->mdt_sptlrpc_lock);
+        RETURN(rc);
+}
+static int mdt_mfd_cleanup(struct obd_export *exp)
+{
+        struct mdt_export_data *med = &exp->exp_mdt_data;
+        struct obd_device      *obd = exp->exp_obd;
+        struct mdt_device      *mdt;
+        struct mdt_thread_info *info;
+        struct lu_env           env;
+        CFS_LIST_HEAD(closing_list);
+        struct mdt_file_data *mfd, *n;
+        int rc = 0;
+        ENTRY;
 
-                if (exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
-                        CERROR("invalid rpc flavor %x, expect %x, from %s\n",
-                               req->rq_flvr.sf_rpc, exp->exp_flvr.sf_rpc,
-                               libcfs_nid2str(req->rq_peer.nid));
-                        exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
-                        spin_unlock(&exp->exp_lock);
-                        RETURN(-EACCES);
-                }
+        spin_lock(&med->med_open_lock);
+        while (!list_empty(&med->med_open_head)) {
+                struct list_head *tmp = med->med_open_head.next;
+                mfd = list_entry(tmp, struct mdt_file_data, mfd_list);
+
+                /* Remove mfd handle so it can't be found again.
+                 * We are consuming the mfd_list reference here. */
+                class_handle_unhash(&mfd->mfd_handle);
+                list_move_tail(&mfd->mfd_list, &closing_list);
         }
-        spin_unlock(&exp->exp_lock);
+        spin_unlock(&med->med_open_lock);
+        mdt = mdt_dev(obd->obd_lu_dev);
+        LASSERT(mdt != NULL);
 
-        rc = mdt_connect_internal(exp, mdt_dev(obd->obd_lu_dev), data);
+        rc = lu_env_init(&env, LCT_MD_THREAD);
+        if (rc)
+                RETURN(rc);
+
+        info = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
+        LASSERT(info != NULL);
+        memset(info, 0, sizeof *info);
+        info->mti_env = &env;
+        info->mti_mdt = mdt;
+        info->mti_exp = exp;
+
+        if (!list_empty(&closing_list)) {
+                struct md_attr *ma = &info->mti_attr;
+                int lmm_size;
+                int cookie_size;
+
+                lmm_size = mdt->mdt_max_mdsize;
+                cookie_size = mdt->mdt_max_cookiesize;
+                OBD_ALLOC(ma->ma_lmm, lmm_size);
+                if (ma->ma_lmm == NULL)
+                        GOTO(out_lmm, rc = -ENOMEM);
+                OBD_ALLOC(ma->ma_cookie, cookie_size);
+                if (ma->ma_cookie == NULL)
+                        GOTO(out_cookie, rc = -ENOMEM);
+
+                /* Close any open files (which may also cause orphan unlinking). */
+                list_for_each_entry_safe(mfd, n, &closing_list, mfd_list) {
+                        list_del_init(&mfd->mfd_list);
+                        /* TODO: if we close the unlinked file,
+                         * we need to remove its objects from OST */
+                        memset(&ma->ma_attr, 0, sizeof(ma->ma_attr));
+                        ma->ma_lmm_size = lmm_size;
+                        ma->ma_cookie_size = cookie_size;
+                        ma->ma_need = MA_LOV | MA_COOKIE;
+                        ma->ma_valid = 0;
+                        mdt_mfd_close(info, mfd);
+                }
+                info->mti_mdt = NULL;
+                OBD_FREE(ma->ma_cookie, cookie_size);
+                ma->ma_cookie = NULL;
+out_cookie:
+                OBD_FREE(ma->ma_lmm, lmm_size);
+                ma->ma_lmm = NULL;
+        }
+out_lmm:
+        lu_env_fini(&env);
 
         RETURN(rc);
 }
@@ -4478,11 +5207,13 @@ static int mdt_obd_disconnect(struct obd_export *exp)
 
                 spin_lock(&svc->srv_lock);
                 list_del_init(&rs->rs_exp_list);
+                spin_lock(&rs->rs_lock);
                 ptlrpc_schedule_difficult_reply(rs);
+                spin_unlock(&rs->rs_lock);
                 spin_unlock(&svc->srv_lock);
         }
         spin_unlock(&exp->exp_lock);
-
+        rc = mdt_mfd_cleanup(exp);
         class_export_put(exp);
         RETURN(rc);
 }
@@ -4514,26 +5245,25 @@ static int mdt_destroy_export(struct obd_export *export)
         struct mdt_device      *mdt;
         struct mdt_thread_info *info;
         struct lu_env           env;
-        struct md_attr         *ma;
-        int lmm_size;
-        int cookie_size;
         int rc = 0;
         ENTRY;
 
         med = &export->exp_mdt_data;
-        if (med->med_rmtclient)
+        if (exp_connect_rmtclient(export))
                 mdt_cleanup_idmap(med);
 
         target_destroy_export(export);
         ldlm_destroy_export(export);
 
+        LASSERT(list_empty(&export->exp_outstanding_replies));
+        LASSERT(list_empty(&med->med_open_head));
         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
                 RETURN(0);
 
         mdt = mdt_dev(obd->obd_lu_dev);
         LASSERT(mdt != NULL);
 
-        rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
+        rc = lu_env_init(&env, LCT_MD_THREAD);
         if (rc)
                 RETURN(rc);
 
@@ -4541,74 +5271,30 @@ static int mdt_destroy_export(struct obd_export *export)
         LASSERT(info != NULL);
         memset(info, 0, sizeof *info);
         info->mti_env = &env;
-        info->mti_mdt = mdt;
         info->mti_exp = export;
-
-        ma = &info->mti_attr;
-        lmm_size = ma->ma_lmm_size = mdt->mdt_max_mdsize;
-        cookie_size = ma->ma_cookie_size = mdt->mdt_max_cookiesize;
-        OBD_ALLOC(ma->ma_lmm, lmm_size);
-        OBD_ALLOC(ma->ma_cookie, cookie_size);
-
-        if (ma->ma_lmm == NULL || ma->ma_cookie == NULL)
-                GOTO(out, rc = -ENOMEM);
-        ma->ma_need = MA_LOV | MA_COOKIE;
-        ma->ma_valid = 0;
-        /* Close any open files (which may also cause orphan unlinking). */
-        spin_lock(&med->med_open_lock);
-        while (!list_empty(&med->med_open_head)) {
-                struct list_head *tmp = med->med_open_head.next;
-                struct mdt_file_data *mfd =
-                        list_entry(tmp, struct mdt_file_data, mfd_list);
-
-                /* Remove mfd handle so it can't be found again.
-                 * We are consuming the mfd_list reference here. */
-                class_handle_unhash(&mfd->mfd_handle);
-                list_del_init(&mfd->mfd_list);
-                spin_unlock(&med->med_open_lock);
-                mdt_mfd_close(info, mfd);
-                /* TODO: if we close the unlinked file,
-                 * we need to remove it's objects from OST */
-                memset(&ma->ma_attr, 0, sizeof(ma->ma_attr));
-                spin_lock(&med->med_open_lock);
-                ma->ma_lmm_size = lmm_size;
-                ma->ma_cookie_size = cookie_size;
-                ma->ma_need = MA_LOV | MA_COOKIE;
-                ma->ma_valid = 0;
-        }
-        spin_unlock(&med->med_open_lock);
         info->mti_mdt = NULL;
         mdt_client_del(&env, mdt);
 
-        EXIT;
-out:
-        if (lmm_size) {
-                OBD_FREE(ma->ma_lmm, lmm_size);
-                ma->ma_lmm = NULL;
-        }
-        if (cookie_size) {
-                OBD_FREE(ma->ma_cookie, cookie_size);
-                ma->ma_cookie = NULL;
-        }
         lu_env_fini(&env);
-
-        return rc;
+        RETURN(rc);
 }
 
 static void mdt_allow_cli(struct mdt_device *m, unsigned int flag)
 {
         if (flag & CONFIG_LOG)
                 m->mdt_fl_cfglog = 1;
+
+        /* also notify active event */
         if (flag & CONFIG_SYNC)
                 m->mdt_fl_synced = 1;
 
-        if (m->mdt_fl_cfglog /* bz11778: && m->mdt_fl_synced */)
+        if (m->mdt_fl_cfglog && m->mdt_fl_synced)
                 /* Open for clients */
                 m->mdt_md_dev.md_lu_dev.ld_obd->obd_no_conn = 0;
 }
 
 static int mdt_upcall(const struct lu_env *env, struct md_device *md,
-                      enum md_upcall_event ev)
+                      enum md_upcall_event ev, void *data)
 {
         struct mdt_device *m = mdt_dev(&md->md_lu_dev);
         struct md_device  *next  = m->mdt_child;
@@ -4624,6 +5310,8 @@ static int mdt_upcall(const struct lu_env *env, struct md_device *md,
                         CDEBUG(D_INFO, "get max mdsize %d max cookiesize %d\n",
                                      m->mdt_max_mdsize, m->mdt_max_cookiesize);
                         mdt_allow_cli(m, CONFIG_SYNC);
+                        if (data)
+                                (*(__u64 *)data) = m->mdt_mount_count;
                         break;
                 case MD_NO_TRANS:
                         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
@@ -4634,6 +5322,13 @@ static int mdt_upcall(const struct lu_env *env, struct md_device *md,
                         /* Check that MDT is not yet configured */
                         LASSERT(!m->mdt_fl_cfglog);
                         break;
+#ifdef HAVE_QUOTA_SUPPORT
+                case MD_LOV_QUOTA:
+                        if (md->md_lu_dev.ld_obd->obd_recovering == 0 &&
+                            likely(md->md_lu_dev.ld_obd->obd_stopping == 0))
+                                next->md_ops->mdo_quota.mqo_recovery(env, next);
+                        break;
+#endif
                 default:
                         CERROR("invalid event\n");
                         rc = -EINVAL;
@@ -4646,11 +5341,21 @@ static int mdt_obd_notify(struct obd_device *host,
                           struct obd_device *watched,
                           enum obd_notify_event ev, void *data)
 {
+        struct mdt_device *mdt = mdt_dev(host->obd_lu_dev);
+#ifdef HAVE_QUOTA_SUPPORT
+        struct md_device *next = mdt->mdt_child;
+#endif
         ENTRY;
 
         switch (ev) {
         case OBD_NOTIFY_CONFIG:
-                mdt_allow_cli(mdt_dev(host->obd_lu_dev), (unsigned long)data);
+                mdt_allow_cli(mdt, (unsigned long)data);
+
+#ifdef HAVE_QUOTA_SUPPORT
+               /* quota_type has been processed, we can now handle
+                * incoming quota requests */
+                next->md_ops->mdo_quota.mqo_notify(NULL, next);
+#endif
                 break;
         default:
                 CDEBUG(D_INFO, "Unhandled notification %#x\n", ev);
@@ -4658,18 +5363,146 @@ static int mdt_obd_notify(struct obd_device *host,
         RETURN(0);
 }
 
+static int mdt_rpc_fid2path(struct mdt_thread_info *info, void *key,
+                            void *val, int vallen)
+{
+        struct mdt_device *mdt = mdt_dev(info->mti_exp->exp_obd->obd_lu_dev);
+        struct getinfo_fid2path *fpout, *fpin;
+        int rc = 0;
+
+        fpin = key + size_round(sizeof(KEY_FID2PATH));
+        fpout = val;
+
+        if (lustre_msg_swabbed(mdt_info_req(info)->rq_reqmsg))
+                lustre_swab_fid2path(fpin);
+
+        memcpy(fpout, fpin, sizeof(*fpin));
+        if (fpout->gf_pathlen != vallen - sizeof(*fpin))
+                RETURN(-EINVAL);
+
+        rc = mdt_fid2path(info->mti_env, mdt, fpout);
+        RETURN(rc);
+}
+
+static int mdt_fid2path(const struct lu_env *env, struct mdt_device *mdt,
+                        struct getinfo_fid2path *fp)
+{
+        struct mdt_object *obj;
+        int    rc;
+        ENTRY;
+
+        CDEBUG(D_IOCTL, "path get "DFID" from "LPU64" #%d\n",
+               PFID(&fp->gf_fid), fp->gf_recno, fp->gf_linkno);
+
+        if (!fid_is_sane(&fp->gf_fid))
+                RETURN(-EINVAL);
+
+        obj = mdt_object_find(env, mdt, &fp->gf_fid);
+        if (obj == NULL || IS_ERR(obj)) {
+                CDEBUG(D_IOCTL, "no object "DFID": %ld\n",PFID(&fp->gf_fid),
+                       PTR_ERR(obj));
+                RETURN(-EINVAL);
+        }
+
+        rc = lu_object_exists(&obj->mot_obj.mo_lu);
+        if (rc <= 0) {
+                if (rc == -1)
+                        rc = -EREMOTE;
+                else
+                        rc = -ENOENT;
+                mdt_object_put(env, obj);
+                CDEBUG(D_IOCTL, "nonlocal object "DFID": %d\n",
+                       PFID(&fp->gf_fid), rc);
+                RETURN(rc);
+        }
+
+        rc = mo_path(env, md_object_next(&obj->mot_obj), fp->gf_path,
+                     fp->gf_pathlen, &fp->gf_recno, &fp->gf_linkno);
+        mdt_object_put(env, obj);
+
+        RETURN(rc);
+}
+
+static int mdt_get_info(struct mdt_thread_info *info)
+{
+        struct ptlrpc_request *req = mdt_info_req(info);
+        char *key;
+        int keylen;
+        __u32 *vallen;
+        void *valout;
+        int rc;
+        ENTRY;
+
+        key = req_capsule_client_get(info->mti_pill, &RMF_GETINFO_KEY);
+        if (key == NULL) {
+                CDEBUG(D_IOCTL, "No GETINFO key");
+                RETURN(-EFAULT);
+        }
+        keylen = req_capsule_get_size(info->mti_pill, &RMF_GETINFO_KEY,
+                                      RCL_CLIENT);
+
+        vallen = req_capsule_client_get(info->mti_pill, &RMF_GETINFO_VALLEN);
+        if (vallen == NULL) {
+                CDEBUG(D_IOCTL, "Unable to get RMF_GETINFO_VALLEN buffer");
+                RETURN(-EFAULT);
+        }
+
+        req_capsule_set_size(info->mti_pill, &RMF_GETINFO_VAL, RCL_SERVER,
+                             *vallen);
+        rc = req_capsule_server_pack(info->mti_pill);
+        valout = req_capsule_server_get(info->mti_pill, &RMF_GETINFO_VAL);
+        if (valout == NULL) {
+                CDEBUG(D_IOCTL, "Unable to get get-info RPC out buffer");
+                RETURN(-EFAULT);
+        }
+
+        if (KEY_IS(KEY_FID2PATH))
+                rc = mdt_rpc_fid2path(info, key, valout, *vallen);
+        else
+                rc = -EINVAL;
+
+        lustre_msg_set_status(req->rq_repmsg, rc);
+
+        RETURN(rc);
+}
+
+/* Pass the ioc down */
+static int mdt_ioc_child(struct lu_env *env, struct mdt_device *mdt,
+                         unsigned int cmd, int len, void *data)
+{
+        struct lu_context ioctl_session;
+        struct md_device *next = mdt->mdt_child;
+        int rc;
+        ENTRY;
+
+        rc = lu_context_init(&ioctl_session, LCT_SESSION);
+        if (rc)
+                RETURN(rc);
+        ioctl_session.lc_thread = (struct ptlrpc_thread *)cfs_current();
+        lu_context_enter(&ioctl_session);
+        env->le_ses = &ioctl_session;
+
+        LASSERT(next->md_ops->mdo_iocontrol);
+        rc = next->md_ops->mdo_iocontrol(env, next, cmd, len, data);
+
+        lu_context_exit(&ioctl_session);
+        lu_context_fini(&ioctl_session);
+        RETURN(rc);
+}
+
+/* ioctls on obd dev */
 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                          void *karg, void *uarg)
 {
         struct lu_env      env;
-        struct obd_device *obd= exp->exp_obd;
+        struct obd_device *obd = exp->exp_obd;
         struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
         struct dt_device  *dt = mdt->mdt_bottom;
         int rc;
 
         ENTRY;
         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
-        rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
+        rc = lu_env_init(&env, LCT_MD_THREAD);
         if (rc)
                 RETURN(rc);
 
@@ -4678,7 +5511,6 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 rc = mdt_device_sync(&env, mdt);
                 break;
         case OBD_IOC_SET_READONLY:
-                rc = dt->dd_ops->dt_sync(&env, dt);
                 dt->dd_ops->dt_ro(&env, dt);
                 break;
         case OBD_IOC_ABORT_RECOVERY:
@@ -4686,6 +5518,11 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 target_stop_recovery_thread(obd);
                 rc = 0;
                 break;
+        case OBD_IOC_CHANGELOG_REG:
+        case OBD_IOC_CHANGELOG_DEREG:
+        case OBD_IOC_CHANGELOG_CLEAR:
+                rc = mdt_ioc_child(&env, mdt, cmd, len, karg);
+                break;
         default:
                 CERROR("Not supported cmd = %d for device %s\n",
                        cmd, obd->obd_name);
@@ -4699,7 +5536,10 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
 int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt)
 {
         struct lu_device *ld = md2lu_dev(mdt->mdt_child);
-        struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
+        struct obd_device *obd = mdt2obd_dev(mdt);
+#ifdef HAVE_QUOTA_SUPPORT
+        struct md_device *next = mdt->mdt_child;
+#endif
         int rc, lost;
         ENTRY;
         /* if some clients didn't participate in recovery then we can possibly
@@ -4708,6 +5548,10 @@ int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt)
         mdt_seq_adjust(env, mdt, lost);
 
         rc = ld->ld_ops->ldo_recovery_complete(env, ld);
+#ifdef HAVE_QUOTA_SUPPORT
+        if (likely(obd->obd_stopping == 0))
+                next->md_ops->mdo_quota.mqo_recovery(env, next);
+#endif
         RETURN(rc);
 }
 
@@ -4716,7 +5560,7 @@ int mdt_obd_postrecov(struct obd_device *obd)
         struct lu_env env;
         int rc;
 
-        rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
+        rc = lu_env_init(&env, LCT_MD_THREAD);
         if (rc)
                 RETURN(rc);
         rc = mdt_postrecov(&env, mdt_dev(obd->obd_lu_dev));
@@ -4726,6 +5570,7 @@ int mdt_obd_postrecov(struct obd_device *obd)
 
 static struct obd_ops mdt_obd_device_ops = {
         .o_owner          = THIS_MODULE,
+        .o_set_info_async = mdt_obd_set_info_async,
         .o_connect        = mdt_obd_connect,
         .o_reconnect      = mdt_obd_reconnect,
         .o_disconnect     = mdt_obd_disconnect,
@@ -4752,6 +5597,7 @@ static struct lu_device *mdt_device_free(const struct lu_env *env,
         struct mdt_device *m = mdt_dev(d);
         ENTRY;
 
+        md_device_fini(&m->mdt_md_dev);
         OBD_FREE_PTR(m);
         RETURN(NULL);
 }
@@ -4770,7 +5616,7 @@ static struct lu_device *mdt_device_alloc(const struct lu_env *env,
                 l = &m->mdt_md_dev.md_lu_dev;
                 rc = mdt_init0(env, m, t, cfg);
                 if (rc != 0) {
-                        OBD_FREE_PTR(m);
+                        mdt_device_free(env, l);
                         l = ERR_PTR(rc);
                         return l;
                 }
@@ -4800,6 +5646,42 @@ struct md_ucred *mdt_ucred(const struct mdt_thread_info *info)
         return md_ucred(info->mti_env);
 }
 
+/**
+ * Enable/disable COS.
+ *
+ * Set/Clear the COS flag in mdt options.
+ *
+ * \param mdt mdt device
+ * \param val 0 disables COS, other values enable COS
+ */
+void mdt_enable_cos(struct mdt_device *mdt, int val)
+{
+        struct lu_env env;
+        int rc;
+
+        mdt->mdt_opts.mo_cos = !!val;
+        rc = lu_env_init(&env, LCT_MD_THREAD);
+        if (unlikely(rc != 0)) {
+                CWARN("lu_env initialization failed with rc = %d,"
+                      "cannot sync\n", rc);
+                return;
+        }
+        mdt_device_sync(&env, mdt);
+        lu_env_fini(&env);
+}
+
+/**
+ * Check COS status.
+ *
+ * Return COS flag status/
+ *
+ * \param mdt mdt device
+ */
+int mdt_cos_is_enabled(struct mdt_device *mdt)
+{
+        return mdt->mdt_opts.mo_cos != 0;
+}
+
 /* type constructor/destructor: mdt_type_init, mdt_type_fini */
 LU_TYPE_INIT_FINI(mdt, &mdt_thread_key, &mdt_txn_key);
 
@@ -4822,11 +5704,19 @@ static struct lu_device_type mdt_device_type = {
         .ldt_ctx_tags = LCT_MD_THREAD
 };
 
+static struct lu_local_obj_desc mdt_last_recv = {
+        .llod_name      = LAST_RCVD,
+        .llod_oid       = MDT_LAST_RECV_OID,
+        .llod_is_index  = 0,
+};
+
 static int __init mdt_mod_init(void)
 {
         struct lprocfs_static_vars lvars;
         int rc;
 
+        llo_local_obj_register(&mdt_last_recv);
+
         mdt_num_threads = MDT_NUM_THREADS;
         lprocfs_mdt_init_vars(&lvars);
         rc = class_register_type(&mdt_obd_device_ops, NULL,
@@ -4838,6 +5728,7 @@ static int __init mdt_mod_init(void)
 
 static void __exit mdt_mod_exit(void)
 {
+        llo_local_obj_unregister(&mdt_last_recv);
         class_unregister_type(LUSTRE_MDT_NAME);
 }
 
@@ -4881,6 +5772,7 @@ static struct mdt_handler mdt_mds_ops[] = {
 DEF_MDT_HNDL_F(0,                         CONNECT,      mdt_connect),
 DEF_MDT_HNDL_F(0,                         DISCONNECT,   mdt_disconnect),
 DEF_MDT_HNDL_F(0,                         SET_INFO,     mdt_set_info),
+DEF_MDT_HNDL_F(0,                         GET_INFO,     mdt_get_info),
 DEF_MDT_HNDL_F(0           |HABEO_REFERO, GETSTATUS,    mdt_getstatus),
 DEF_MDT_HNDL_F(HABEO_CORPUS,              GETATTR,      mdt_getattr),
 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR_NAME, mdt_getattr_name),
@@ -4892,8 +5784,10 @@ DEF_MDT_HNDL_F(HABEO_CORPUS,              DONE_WRITING, mdt_done_writing),
 DEF_MDT_HNDL_F(0           |HABEO_REFERO, PIN,          mdt_pin),
 DEF_MDT_HNDL_0(0,                         SYNC,         mdt_sync),
 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, IS_SUBDIR,    mdt_is_subdir),
+#ifdef HAVE_QUOTA_SUPPORT
 DEF_MDT_HNDL_F(0,                         QUOTACHECK,   mdt_quotacheck_handle),
 DEF_MDT_HNDL_F(0,                         QUOTACTL,     mdt_quotactl_handle)
+#endif
 };
 
 #define DEF_OBD_HNDL(flags, name, fn)                   \
@@ -4918,7 +5812,19 @@ static struct mdt_handler mdt_dlm_ops[] = {
         DEF_DLM_HNDL_0(0,            CP_CALLBACK,    mdt_cp_callback)
 };
 
+#define DEF_LLOG_HNDL(flags, name, fn)                   \
+        DEF_HNDL(LLOG, ORIGIN_HANDLE_CREATE, _NET, flags, name, fn, NULL)
+
 static struct mdt_handler mdt_llog_ops[] = {
+        DEF_LLOG_HNDL(0, ORIGIN_HANDLE_CREATE,      mdt_llog_create),
+        DEF_LLOG_HNDL(0, ORIGIN_HANDLE_NEXT_BLOCK,  mdt_llog_next_block),
+        DEF_LLOG_HNDL(0, ORIGIN_HANDLE_READ_HEADER, mdt_llog_read_header),
+        DEF_LLOG_HNDL(0, ORIGIN_HANDLE_WRITE_REC,   NULL),
+        DEF_LLOG_HNDL(0, ORIGIN_HANDLE_CLOSE,       NULL),
+        DEF_LLOG_HNDL(0, ORIGIN_CONNECT,            NULL),
+        DEF_LLOG_HNDL(0, CATINFO,                   NULL),
+        DEF_LLOG_HNDL(0, ORIGIN_HANDLE_PREV_BLOCK,  mdt_llog_prev_block),
+        DEF_LLOG_HNDL(0, ORIGIN_HANDLE_DESTROY,     mdt_llog_destroy),
 };
 
 #define DEF_SEC_CTX_HNDL(name, fn)                      \