Whamcloud - gitweb
- zero out used locks in MDT before finishing req handling;
[fs/lustre-release.git] / lustre / fld / fld_handler.c
index e428655..375982c 100644 (file)
@@ -5,8 +5,8 @@
  *  FLD (Fids Location Database)
  *
  *  Copyright (C) 2006 Cluster File Systems, Inc.
- *   Author: WangDi <wangdi@clusterfs.com>
- *           Yury Umanets <umka@clusterfs.com>
+ *   Author: Yury Umanets <umka@clusterfs.com>
+ *           WangDi <wangdi@clusterfs.com>
  *
  *   This file is part of the Lustre file system, http://www.lustre.org
  *   Lustre is a trademark of Cluster File Systems, Inc.
 #include <obd_support.h>
 #include <lprocfs_status.h>
 
-#include <dt_object.h>
 #include <md_object.h>
 #include <lustre_req_layout.h>
-#include <lustre_fld.h>
 #include "fld_internal.h"
 
 #ifdef __KERNEL__
-struct fld_cache_info *fld_cache = NULL;
-
-static int fld_init(void)
+static void *fld_key_init(const struct lu_context *ctx,
+                          struct lu_context_key *key)
 {
-        int i;
+        struct fld_thread_info *info;
         ENTRY;
 
-        OBD_ALLOC_PTR(fld_cache);
-        if (fld_cache == NULL)
-                RETURN(-ENOMEM);
-
-        spin_lock_init(&fld_cache->fci_lock);
-
-        /* init fld cache info */
-        fld_cache->fci_hash_mask = FLD_HTABLE_MASK;
-        OBD_ALLOC(fld_cache->fci_hash, FLD_HTABLE_SIZE *
-                  sizeof(*fld_cache->fci_hash));
-        if (fld_cache->fci_hash == NULL) {
-                OBD_FREE_PTR(fld_cache);
-                RETURN(-ENOMEM);
-        }
-        
-        for (i = 0; i < FLD_HTABLE_SIZE; i++)
-                INIT_HLIST_HEAD(&fld_cache->fci_hash[i]);
-
-        CDEBUG(D_INFO|D_WARNING, "Client FLD, cache size %d\n",
-               FLD_HTABLE_SIZE);
-        
-        RETURN(0);
+        OBD_ALLOC_PTR(info);
+        if (info == NULL)
+                info = ERR_PTR(-ENOMEM);
+        RETURN(info);
 }
 
-static int fld_fini(void)
+static void fld_key_fini(const struct lu_context *ctx,
+                         struct lu_context_key *key, void *data)
 {
+        struct fld_thread_info *info = data;
         ENTRY;
-        if (fld_cache != NULL) {
-                OBD_FREE(fld_cache->fci_hash, FLD_HTABLE_SIZE *
-                         sizeof(*fld_cache->fci_hash));
-                OBD_FREE_PTR(fld_cache);
-        }
-        RETURN(0);
+        OBD_FREE_PTR(info);
+        EXIT;
 }
 
+struct lu_context_key fld_thread_key = {
+        .lct_tags = LCT_MD_THREAD|LCT_DT_THREAD,
+        .lct_init = fld_key_init,
+        .lct_fini = fld_key_fini
+};
+
+cfs_proc_dir_entry_t *fld_type_proc_dir = NULL;
+
 static int __init fld_mod_init(void)
 {
-        fld_init();
+        printk(KERN_INFO "Lustre: Fid Location Database; "
+               "info@clusterfs.com\n");
+
+        fld_type_proc_dir = lprocfs_register(LUSTRE_FLD_NAME,
+                                             proc_lustre_root,
+                                             NULL, NULL);
+        if (IS_ERR(fld_type_proc_dir))
+                return PTR_ERR(fld_type_proc_dir);
+
+        lu_context_key_register(&fld_thread_key);
         return 0;
 }
 
 static void __exit fld_mod_exit(void)
 {
-        fld_fini();
-        return;
+        lu_context_key_degister(&fld_thread_key);
+        if (fld_type_proc_dir != NULL && !IS_ERR(fld_type_proc_dir)) {
+                lprocfs_remove(fld_type_proc_dir);
+                fld_type_proc_dir = NULL;
+        }
+}
+
+/* insert index entry and update cache */
+int fld_server_create(struct lu_server_fld *fld,
+                      const struct lu_env *env,
+                      seqno_t seq, mdsno_t mds)
+{
+        return fld_index_create(fld, env, seq, mds);
+}
+EXPORT_SYMBOL(fld_server_create);
+
+/* delete index entry */
+int fld_server_delete(struct lu_server_fld *fld,
+                      const struct lu_env *env,
+                      seqno_t seq)
+{
+        return fld_index_delete(fld, env, seq);
+}
+EXPORT_SYMBOL(fld_server_delete);
+
+/* issue on-disk index lookup */
+int fld_server_lookup(struct lu_server_fld *fld,
+                      const struct lu_env *env,
+                      seqno_t seq, mdsno_t *mds)
+{
+        return fld_index_lookup(fld, env, seq, mds);
 }
+EXPORT_SYMBOL(fld_server_lookup);
 
-static int
-fld_server_handle(struct lu_server_fld *fld,
-                  const struct lu_context *ctx,
-                  __u32 opc, struct md_fld *mf)
+static int fld_server_handle(struct lu_server_fld *fld,
+                             const struct lu_env *env,
+                             __u32 opc, struct md_fld *mf,
+                             struct fld_thread_info *info)
 {
         int rc;
         ENTRY;
 
         switch (opc) {
         case FLD_CREATE:
-                rc = fld_index_insert(fld, ctx,
-                                      mf->mf_seq, mf->mf_mds);
+                rc = fld_server_create(fld, env,
+                                       mf->mf_seq, mf->mf_mds);
+
+                /* do not return -EEXIST error for resent case */
+                if ((info->fti_flags & MSG_RESENT) && rc == -EEXIST)
+                        rc = 0;
                 break;
         case FLD_DELETE:
-                rc = fld_index_delete(fld, ctx, mf->mf_seq);
+                rc = fld_server_delete(fld, env, mf->mf_seq);
+
+                /* do not return -ENOENT error for resent case */
+                if ((info->fti_flags & MSG_RESENT) && rc == -ENOENT)
+                        rc = 0;
                 break;
         case FLD_LOOKUP:
-                rc = fld_index_lookup(fld, ctx,
-                                      mf->mf_seq, &mf->mf_mds);
+                rc = fld_server_lookup(fld, env,
+                                       mf->mf_seq, &mf->mf_mds);
                 break;
         default:
                 rc = -EINVAL;
                 break;
         }
+        if (rc) {
+                CERROR("%s: FLD req (opc: %d, seq: "LPX64", mds: "
+                       LPU64") handle error %d\n", fld->lsf_name,
+                       opc, mf->mf_seq, mf->mf_mds, rc);
+        }
         RETURN(rc);
 
 }
 
-static int
-fld_req_handle0(const struct lu_context *ctx,
-                struct lu_server_fld *fld,
-                struct ptlrpc_request *req)
+static int fld_req_handle(struct ptlrpc_request *req,
+                          struct fld_thread_info *info)
 {
-        int rep_buf_size[3] = { 0, };
-        struct req_capsule pill;
+        struct lu_site *site;
         struct md_fld *in;
         struct md_fld *out;
-        int rc = -EPROTO;
+        int rc;
         __u32 *opc;
         ENTRY;
 
-        req_capsule_init(&pill, req, RCL_SERVER,
-                         rep_buf_size);
+        site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
 
-        req_capsule_set(&pill, &RQF_FLD_QUERY);
-        req_capsule_pack(&pill);
+        rc = req_capsule_pack(&info->fti_pill);
+        if (rc)
+                RETURN(err_serious(rc));
 
-        opc = req_capsule_client_get(&pill, &RMF_FLD_OPC);
+        opc = req_capsule_client_get(&info->fti_pill, &RMF_FLD_OPC);
         if (opc != NULL) {
-                in = req_capsule_client_get(&pill, &RMF_FLD_MDFLD);
-                if (in == NULL) {
-                        CERROR("cannot unpack fld request\n");
-                        GOTO(out_pill, rc = -EPROTO);
-                }
-                out = req_capsule_server_get(&pill, &RMF_FLD_MDFLD);
-                if (out == NULL) {
-                        CERROR("cannot allocate fld response\n");
-                        GOTO(out_pill, rc = -EPROTO);
-                }
+                in = req_capsule_client_get(&info->fti_pill, &RMF_FLD_MDFLD);
+                if (in == NULL)
+                        RETURN(err_serious(-EPROTO));
+                out = req_capsule_server_get(&info->fti_pill, &RMF_FLD_MDFLD);
+                if (out == NULL)
+                        RETURN(err_serious(-EPROTO));
                 *out = *in;
-                rc = fld_server_handle(fld, ctx, *opc, out);
-        } else {
-                CERROR("cannot unpack FLD operation\n");
-        }
-        
-out_pill:
-        EXIT;
-        req_capsule_fini(&pill);
-        return rc;
+
+                rc = fld_server_handle(site->ls_server_fld,
+                                       req->rq_svc_thread->t_env,
+                                       *opc, out, info);
+        } else
+                rc = err_serious(-EPROTO);
+
+        RETURN(rc);
 }
 
-static int fld_req_handle(struct ptlrpc_request *req)
+static void fld_thread_info_init(struct ptlrpc_request *req,
+                                 struct fld_thread_info *info)
 {
-        int fail = OBD_FAIL_FLD_ALL_REPLY_NET;
-        const struct lu_context *ctx;
-        struct lu_site    *site;
-        int rc = -EPROTO;
-        ENTRY;
+        int i;
 
-        OBD_FAIL_RETURN(OBD_FAIL_FLD_ALL_REPLY_NET | OBD_FAIL_ONCE, 0);
-
-        ctx = req->rq_svc_thread->t_ctx;
-        LASSERT(ctx != NULL);
-        LASSERT(ctx->lc_thread == req->rq_svc_thread);
-        if (req->rq_reqmsg->opc == FLD_QUERY) {
-                if (req->rq_export != NULL) {
-                        site = req->rq_export->exp_obd->obd_lu_dev->ld_site;
-                        LASSERT(site != NULL);
-                        rc = fld_req_handle0(ctx, site->ls_fld, req);
-                } else {
-                        CERROR("Unconnected request\n");
-                        req->rq_status = -ENOTCONN;
-                        GOTO(out, rc = -ENOTCONN);
-                }
-        } else {
-                CERROR("Wrong opcode: %d\n", req->rq_reqmsg->opc);
-                req->rq_status = -ENOTSUPP;
-                rc = ptlrpc_error(req);
-                RETURN(rc);
-        }
+        info->fti_flags = lustre_msg_get_flags(req->rq_reqmsg);
 
-        EXIT;
-out:
-        target_send_reply(req, rc, fail);
-        return 0;
+        /* mark rep buffer as req-layout stuff expects */
+        for (i = 0; i < ARRAY_SIZE(info->fti_rep_buf_size); i++)
+                info->fti_rep_buf_size[i] = -1;
+
+        /* init request capsule */
+        req_capsule_init(&info->fti_pill, req, RCL_SERVER,
+                         info->fti_rep_buf_size);
+
+        req_capsule_set(&info->fti_pill, &RQF_FLD_QUERY);
 }
 
-#ifdef LPROCFS
-static int
-fld_server_proc_init(struct lu_server_fld *fld)
+static void fld_thread_info_fini(struct fld_thread_info *info)
 {
-        int rc;
-        ENTRY;
+        req_capsule_fini(&info->fti_pill);
+}
 
-        fld->fld_proc_dir = lprocfs_register(fld->fld_name,
-                                             proc_lustre_root,
-                                             NULL, NULL);
-        if (IS_ERR(fld->fld_proc_dir)) {
-                CERROR("LProcFS failed in fld-init\n");
-                rc = PTR_ERR(fld->fld_proc_dir);
-                GOTO(err, rc);
-        }
+static int fld_handle(struct ptlrpc_request *req)
+{
+        const struct lu_env *env;
+        struct fld_thread_info *info;
+        int rc;
 
-        fld->fld_proc_entry = lprocfs_register("services",
-                                               fld->fld_proc_dir,
-                                               NULL, NULL);
-        if (IS_ERR(fld->fld_proc_entry)) {
-                CERROR("LProcFS failed in fld-init\n");
-                rc = PTR_ERR(fld->fld_proc_entry);
-                GOTO(err_type, rc);
-        }
+        env = req->rq_svc_thread->t_env;
+        LASSERT(env != NULL);
 
-        rc = lprocfs_add_vars(fld->fld_proc_dir,
-                              fld_server_proc_list, fld);
-        if (rc) {
-                CERROR("can't init FLD proc, rc %d\n", rc);
-                GOTO(err_type, rc);
-        }
+        info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+        LASSERT(info != NULL);
 
-        RETURN(0);
+        fld_thread_info_init(req, info);
+        rc = fld_req_handle(req, info);
+        fld_thread_info_fini(info);
 
-err_type:
-        lprocfs_remove(fld->fld_proc_dir);
-err:
-        fld->fld_proc_dir = NULL;
-        fld->fld_proc_entry = NULL;
         return rc;
 }
 
-static void
-fld_server_proc_fini(struct lu_server_fld *fld)
+/*
+ * Entry point for handling FLD RPCs called from MDT.
+ */
+int fld_query(struct com_thread_info *info)
+{
+        return fld_handle(info->cti_pill.rc_req);
+}
+EXPORT_SYMBOL(fld_query);
+
+/*
+ * Returns true, if fid is local to this server node.
+ *
+ * WARNING: this function is *not* guaranteed to return false if fid is
+ * remote: it makes an educated conservative guess only.
+ *
+ * fid_is_local() is supposed to be used in assertion checks only.
+ */
+int fid_is_local(struct lu_site *site, const struct lu_fid *fid)
+{
+        int result;
+
+        result = 1; /* conservatively assume fid is local */
+        if (site->ls_client_fld != NULL) {
+                mdsno_t mds;
+                int rc;
+
+                rc = fld_cache_lookup(site->ls_client_fld->lcf_cache,
+                                      fid_seq(fid), &mds);
+                if (rc == 0)
+                        result = (mds == site->ls_node_id);
+        }
+        return result;
+}
+EXPORT_SYMBOL(fid_is_local);
+
+static void fld_server_proc_fini(struct lu_server_fld *fld);
+
+#ifdef LPROCFS
+static int fld_server_proc_init(struct lu_server_fld *fld)
 {
+        int rc = 0;
         ENTRY;
-        if (fld->fld_proc_entry) {
-                lprocfs_remove(fld->fld_proc_entry);
-                fld->fld_proc_entry = NULL;
+
+        fld->lsf_proc_dir = lprocfs_register(fld->lsf_name,
+                                             fld_type_proc_dir,
+                                             fld_server_proc_list, fld);
+        if (IS_ERR(fld->lsf_proc_dir)) {
+                rc = PTR_ERR(fld->lsf_proc_dir);
+                RETURN(rc);
         }
 
-        if (fld->fld_proc_dir) {
-                lprocfs_remove(fld->fld_proc_dir);
-                fld->fld_proc_dir = NULL;
+        RETURN(rc);
+}
+
+static void fld_server_proc_fini(struct lu_server_fld *fld)
+{
+        ENTRY;
+        if (fld->lsf_proc_dir != NULL) {
+                if (!IS_ERR(fld->lsf_proc_dir))
+                        lprocfs_remove(fld->lsf_proc_dir);
+                fld->lsf_proc_dir = NULL;
         }
         EXIT;
 }
+#else
+static int fld_server_proc_init(struct lu_server_fld *fld)
+{
+        return 0;
+}
+
+static void fld_server_proc_fini(struct lu_server_fld *fld)
+{
+        return;
+}
 #endif
 
-int
-fld_server_init(struct lu_server_fld *fld,
-                const struct lu_context *ctx,
-                const char *uuid,
-                struct dt_device *dt)
+int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
+                    const char *prefix, const struct lu_env *env)
 {
         int rc;
-        struct ptlrpc_service_conf fld_conf = {
-                .psc_nbufs            = MDS_NBUFS,
-                .psc_bufsize          = MDS_BUFSIZE,
-                .psc_max_req_size     = MDS_MAXREQSIZE,
-                .psc_max_reply_size   = MDS_MAXREPSIZE,
-                .psc_req_portal       = FLD_REQUEST_PORTAL,
-                .psc_rep_portal       = MDC_REPLY_PORTAL,
-                .psc_watchdog_timeout = FLD_SERVICE_WATCHDOG_TIMEOUT,
-                .psc_num_threads      = FLD_NUM_THREADS
-        };
         ENTRY;
 
-        fld->fld_dt = dt;
-        lu_device_get(&dt->dd_lu_dev);
+        snprintf(fld->lsf_name, sizeof(fld->lsf_name),
+                 "srv-%s", prefix);
 
-        snprintf(fld->fld_name, sizeof(fld->fld_name),
-                 "%s-%s", LUSTRE_FLD_NAME, uuid);
-        
-        rc = fld_index_init(fld, ctx);
+        rc = fld_index_init(fld, env, dt);
         if (rc)
                 GOTO(out, rc);
 
-#ifdef LPROCFS
         rc = fld_server_proc_init(fld);
         if (rc)
                 GOTO(out, rc);
-#endif
-
-        fld->fld_service =
-                ptlrpc_init_svc_conf(&fld_conf, fld_req_handle,
-                                     LUSTRE_FLD_NAME,
-                                     fld->fld_proc_entry, NULL);
-        if (fld->fld_service != NULL)
-                rc = ptlrpc_start_threads(NULL, fld->fld_service,
-                                          LUSTRE_FLD_NAME);
-        else
-                rc = -ENOMEM;
 
         EXIT;
 out:
         if (rc)
-                fld_server_fini(fld, ctx);
-        else
-                CDEBUG(D_INFO|D_WARNING, "Server FLD\n");
+                fld_server_fini(fld, env);
         return rc;
 }
 EXPORT_SYMBOL(fld_server_init);
 
-void
-fld_server_fini(struct lu_server_fld *fld,
-                const struct lu_context *ctx)
+void fld_server_fini(struct lu_server_fld *fld,
+                     const struct lu_env *env)
 {
         ENTRY;
 
-#ifdef LPROCFS
         fld_server_proc_fini(fld);
-#endif
-        
-        if (fld->fld_service != NULL) {
-                ptlrpc_unregister_service(fld->fld_service);
-                fld->fld_service = NULL;
-        }
+        fld_index_fini(fld, env);
 
-        if (fld->fld_dt != NULL) {
-                lu_device_put(&fld->fld_dt->dd_lu_dev);
-                fld_index_fini(fld, ctx);
-                fld->fld_dt = NULL;
-        }
-        CDEBUG(D_INFO|D_WARNING, "Server FLD\n");
         EXIT;
 }
 EXPORT_SYMBOL(fld_server_fini);
@@ -358,5 +367,5 @@ MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre FLD");
 MODULE_LICENSE("GPL");
 
-cfs_module(mdd, "0.0.4", fld_mod_init, fld_mod_exit);
+cfs_module(mdd, "0.1.0", fld_mod_init, fld_mod_exit);
 #endif