Whamcloud - gitweb
capa_renew should increase the refc.
[fs/lustre-release.git] / lustre / mds / mds_capa.c
index 63f9aec..6f8d091 100644 (file)
@@ -45,15 +45,18 @@ static struct thread_ctl {
 } mds_eck_ctl;
 
 static LIST_HEAD(mds_capa_key_list);
-static LIST_HEAD(mds_expired_capa_keys);
 static spinlock_t mds_capa_lock; /* protect capa and capa key */
 struct timer_list mds_eck_timer;
 
-#define CUR_MDS_CAPA_KEY(mds) mds->mds_capa_keys[mds->mds_capa_key_idx]
+#define CAPA_KEY_JIFFIES(key) \
+        expiry_to_jiffies(le64_to_cpu((key)->k_key->lk_expiry))
+
+#define CUR_MDS_CAPA_KEY(mds) (mds)->mds_capa_keys[(mds)->mds_capa_key_idx]
 #define CUR_CAPA_KEY(mds) CUR_MDS_CAPA_KEY(mds).k_key
 #define CUR_CAPA_KEY_ID(mds) CUR_MDS_CAPA_KEY(mds).k_key->lk_keyid
-#define CUR_CAPA_KEY_EXPIRY(mds) expiry_to_jiffies(CUR_CAPA_KEY(mds)->lk_expiry)
 #define CUR_CAPA_KEY_LIST(mds) CUR_MDS_CAPA_KEY(mds).k_list
+#define CUR_CAPA_KEY_EXPIRY(mds) le64_to_cpu(CUR_CAPA_KEY(mds)->lk_expiry)
+#define CUR_CAPA_KEY_JIFFIES(mds) CAPA_KEY_JIFFIES(&CUR_MDS_CAPA_KEY(mds))
 
 static int mds_write_capa_key(struct obd_device *obd, int force_sync)
 {
@@ -62,7 +65,7 @@ static int mds_write_capa_key(struct obd_device *obd, int force_sync)
         struct file *filp = mds->mds_capa_keys_filp;
         struct lvfs_run_ctxt saved;
         loff_t off = 0;
-        int i, rc;
+        int i, rc = 0;
         ENTRY;
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
@@ -83,8 +86,8 @@ static int mds_write_capa_key(struct obd_device *obd, int force_sync)
 static inline int
 mds_capa_key_cmp(struct mds_obd *mds)
 {
-        return capa_key_cmp(mds->mds_capa_keys[0].k_key,
-                            mds->mds_capa_keys[1].k_key);
+        return le32_to_cpu(mds->mds_capa_keys[0].k_key->lk_keyid) -
+               le32_to_cpu(mds->mds_capa_keys[1].k_key->lk_keyid);
 }
 
 static void
@@ -94,8 +97,10 @@ do_update_capa_key(struct mds_obd *mds, struct lustre_capa_key *key)
         __u64 expiry_rounded;
 
         if (CUR_CAPA_KEY(mds))
-                keyid = le32_to_cpu(CUR_CAPA_KEY(mds)->lk_keyid) + 1;
+                keyid = le32_to_cpu(CUR_CAPA_KEY_ID(mds)) + 1;
+        spin_lock(&mds_capa_lock);
         expiry_rounded = round_expiry(mds->mds_capa_key_timeout);
+        spin_unlock(&mds_capa_lock);
 
         key->lk_mdsid = cpu_to_le32(mds->mds_num);
         key->lk_keyid = cpu_to_le32(keyid);
@@ -108,13 +113,15 @@ static void list_add_capa_key(struct mds_capa_key *key, struct list_head *head)
         struct mds_capa_key *tmp;
 
         list_for_each_entry_reverse(tmp, head, k_list) {
-                if (key->k_key->lk_expiry > tmp->k_key->lk_expiry) {
-                        list_add(&key->k_list, &tmp->k_list);
+                if (le64_to_cpu(key->k_key->lk_expiry) <
+                    le64_to_cpu(tmp->k_key->lk_expiry)) {
+                        /* put key before tmp */
+                        list_add_tail(&key->k_list, &tmp->k_list);
                         return;
                 }
         }
 
-        list_add(&key->k_list, head);
+        list_add_tail(&key->k_list, head);
 }
 
 int mds_read_capa_key(struct obd_device *obd, struct file *file)
@@ -123,6 +130,7 @@ int mds_read_capa_key(struct obd_device *obd, struct file *file)
         struct mds_obd *mds = &obd->u.mds;
         struct lustre_capa_key *key;
         unsigned long capa_keys_size = file->f_dentry->d_inode->i_size;
+        unsigned long expiry;
         int i = 0, rc = 0;
         ENTRY;
 
@@ -173,11 +181,13 @@ int mds_read_capa_key(struct obd_device *obd, struct file *file)
                         mds->mds_capa_key_idx = 1;
         }
 
+        expiry = CUR_CAPA_KEY_JIFFIES(mds);
         spin_lock(&mds_capa_lock);
-        if (time_before(CUR_CAPA_KEY_EXPIRY(mds), mds_eck_timer.expires)
-            || !timer_pending(&mds_eck_timer))
-                mod_timer(&mds_eck_timer, CUR_CAPA_KEY_EXPIRY(mds));
-
+        if (time_before(expiry, mds_eck_timer.expires) ||
+            !timer_pending(&mds_eck_timer)) {
+                mod_timer(&mds_eck_timer, expiry);
+                CDEBUG(D_INFO, "mds_eck_timer %lu", expiry);
+        }
         list_add_capa_key(&CUR_MDS_CAPA_KEY(mds), &mds_capa_key_list);
         spin_unlock(&mds_capa_lock);
 out:
@@ -189,6 +199,7 @@ void mds_capa_keys_cleanup(struct obd_device *obd)
         struct mds_obd *mds = &obd->u.mds;
         int i;
 
+        del_timer(&mds_eck_timer);
         spin_lock(&mds_capa_lock);
         if (CUR_CAPA_KEY(mds))
                 list_del_init(&CUR_CAPA_KEY_LIST(mds));
@@ -208,9 +219,6 @@ static int mds_set_capa_key(struct obd_device *obd, struct lustre_capa_key *key)
 
         rc = obd_set_info(mds->mds_dt_exp, strlen("capa_key"), "capa_key",
                           sizeof(*key), key);
-        if (rc)
-                CERROR("obd_set_info capa_key failed: rc = %d\n", rc);
-
         RETURN(rc);
 }
 
@@ -219,10 +227,11 @@ mds_update_capa_key(struct obd_device *obd, struct mds_capa_key *mkey,
                     int force_sync)
 {
         struct mds_obd *mds = &obd->u.mds;
-        int to_update = mds->mds_capa_key_idx ^ 1;
+        int to_update = !mds->mds_capa_key_idx;
         struct lustre_capa_key *key = mds->mds_capa_keys[to_update].k_key;
         __u32 keyid;
-        int rc = 0;
+        unsigned long expiry;
+        int rc, rc2;
         ENTRY;
 
         LASSERT(mkey != &mds->mds_capa_keys[to_update]);
@@ -241,33 +250,37 @@ mds_update_capa_key(struct obd_device *obd, struct mds_capa_key *mkey,
         keyid = le32_to_cpu(key->lk_keyid);
 
         rc = mds_set_capa_key(obd, key);
-        if (rc) {
+        if (rc)
+                /* XXX: anyway, it will be replayed */
                 CERROR("error set capa key(id %u), err = %d\n", keyid, rc);
-                GOTO(out, rc);
-        }
-
-        CDEBUG(D_INFO, "MDS capa_keyid is %u\n", keyid);
 
-        rc = mds_write_capa_key(obd, 1);
-        if (rc)
-                GOTO(out, rc);
+        rc2 = mds_write_capa_key(obd, 1);
+        if (rc2)
+                GOTO(out, rc2);
         
-        CDEBUG(D_INFO, "wrote capa keyid %u: err = %d\n", keyid, rc);
-
         spin_lock(&mds_capa_lock);
         list_del_init(&CUR_CAPA_KEY_LIST(mds));
         mds->mds_capa_key_idx = to_update;
+        expiry = CUR_CAPA_KEY_JIFFIES(mds);
         list_add_capa_key(&CUR_MDS_CAPA_KEY(mds), &mds_capa_key_list);
+
+        if (time_before(expiry, mds_eck_timer.expires) ||
+            !timer_pending(&mds_eck_timer)) {
+                mod_timer(&mds_eck_timer, expiry);
+                CDEBUG(D_INFO, "mds_eck_timer %lu\n", expiry);
+        }
         spin_unlock(&mds_capa_lock);
 
-        /* TODO: update timer here */
+        DEBUG_MDS_CAPA_KEY(D_INFO, &CUR_MDS_CAPA_KEY(mds),
+                           "mds_update_capa_key");
 out:
-        RETURN(rc);
+        RETURN(rc2);
 }
 
 static inline int have_expired_capa_key(void)
 {
         struct mds_capa_key *key;
+        unsigned long expiry;
         int expired = 0;
         ENTRY;
 
@@ -275,8 +288,18 @@ static inline int have_expired_capa_key(void)
         if (!list_empty(&mds_capa_key_list)) {
                 key = list_entry(mds_capa_key_list.next, struct mds_capa_key,
                                  k_list);
-                expired = time_before(expiry_to_jiffies(key->k_key->lk_expiry),
-                                      jiffies);
+                /* expiry is in sec, so in case it misses, the result will
+                 * minus 5 sec and then compare with jiffies. (in case the
+                 * clock is innacurate) */
+                expiry = CAPA_KEY_JIFFIES(key);
+                expired = time_before(expiry - 5 * HZ, jiffies);
+                if (!expired) {
+                        if (time_before(expiry, mds_eck_timer.expires) ||
+                            !timer_pending(&mds_eck_timer)) {
+                                mod_timer(&mds_eck_timer, expiry);
+                                CDEBUG(D_INFO, "mds_eck_timer %lu", expiry);
+                        }
+                }
         }
         spin_unlock(&mds_capa_lock);
 
@@ -292,18 +315,18 @@ static int mds_capa_key_thread_main(void *arg)
 {
         struct thread_ctl *ctl = arg;
         unsigned long flags;
+        int rc;
         ENTRY;
 
-        {
-                char name[sizeof(current->comm)];
-                snprintf(name, sizeof(name) - 1, "mds_ck");
-                kportal_daemonize(name);
-        }
+        lock_kernel();
+        ptlrpc_daemonize();
 
         SIGNAL_MASK_LOCK(current, flags);
         sigfillset(&current->blocked);
         RECALC_SIGPENDING;
         SIGNAL_MASK_UNLOCK(current, flags);
+        THREAD_NAME(current->comm, sizeof(current->comm), "mds_ck");
+        unlock_kernel();
 
         /*
          * letting starting function know, that we are ready and control may be
@@ -314,6 +337,7 @@ static int mds_capa_key_thread_main(void *arg)
 
         while (!mds_capa_key_check_stop()) {
                 struct l_wait_info lwi = { 0 };
+                unsigned long expiry;
                 struct mds_capa_key *key, *tmp, *next = NULL;
 
                 l_wait_event(mds_eck_thread.t_ctl_waitq,
@@ -323,26 +347,27 @@ static int mds_capa_key_thread_main(void *arg)
 
                 spin_lock(&mds_capa_lock);
                 list_for_each_entry_safe(key, tmp, &mds_capa_key_list, k_list) {
-                        if (time_after(expiry_to_jiffies(key->k_key->lk_expiry),
-                                       jiffies)) {
+                        if (time_after(CAPA_KEY_JIFFIES(key), jiffies)) {
                                 next = key;
                                 break;
                         }
 
                         spin_unlock(&mds_capa_lock);
 
-                        CDEBUG(D_INFO, "mds capa key expired, wake up updating "
-                                       "thread: mds #%u, key #%u\n",
+                        CDEBUG(D_INFO, "mds capa key expired: "
+                               "mds #%u, key #%u\n",
                                le32_to_cpu(key->k_key->lk_mdsid),
                                le32_to_cpu(key->k_key->lk_keyid));
 
-                        mds_update_capa_key(key->k_obd, key, 1);
+                        rc = mds_update_capa_key(key->k_obd, key, 1);
                         spin_lock(&mds_capa_lock);
                 }
 
-                if (next)
-                        mod_timer(&mds_eck_timer,
-                                  expiry_to_jiffies(next->k_key->lk_expiry));
+                if (next) {
+                        expiry = CAPA_KEY_JIFFIES(next);
+                        mod_timer(&mds_eck_timer, expiry);
+                        CDEBUG(D_INFO, "mds_eck_timer %lu", expiry);
+                }
                 spin_unlock(&mds_capa_lock);
         }
 
@@ -395,18 +420,85 @@ void mds_capa_key_stop_thread(void)
         EXIT;
 }
 
-int mds_pack_capa(struct obd_device *obd, struct mds_body *req_body,
-                  struct lustre_capa *req_capa, struct lustre_msg *repmsg,
-                  int *offset, struct mds_body *body)
+void mds_update_capa_stat(struct obd_device *obd, int stat)
+{
+        struct mds_obd *mds = &obd->u.mds;
+
+        spin_lock(&mds_capa_lock);
+        mds->mds_capa_stat = stat;
+        spin_unlock(&mds_capa_lock);
+}
+
+void mds_update_capa_timeout(struct obd_device *obd, unsigned long timeout)
+{
+        struct mds_obd *mds = &obd->u.mds;
+
+        spin_lock(&mds_capa_lock);
+        mds->mds_capa_timeout = timeout;
+        /* XXX: update all capabilities in cache if their expiry too long */
+        spin_unlock(&mds_capa_lock);
+}
+
+int mds_update_capa_key_timeout(struct obd_device *obd, unsigned long timeout)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct timeval tv;
+        int rc;
+        ENTRY;
+
+        do_gettimeofday(&tv);
+
+        spin_lock(&mds_capa_lock);
+        mds->mds_capa_key_timeout = timeout;
+        if (CUR_CAPA_KEY_EXPIRY(mds) < tv.tv_sec + timeout) {
+                spin_unlock(&mds_capa_lock);
+                RETURN(0);
+        }
+        spin_unlock(&mds_capa_lock);
+
+        rc = mds_update_capa_key(obd, &CUR_MDS_CAPA_KEY(mds), 1);
+
+        RETURN(rc);
+}
+
+static void mds_capa_reverse_map(struct mds_export_data *med,
+                                 struct lustre_capa *capa)
+{
+        uid_t uid;
+
+        if (!med->med_remote) {
+                /* when not remote uid, ruid == uid */
+                capa->lc_ruid = capa->lc_uid;
+                return;
+        }
+
+        ENTRY;
+        uid = mds_idmap_lookup_uid(med->med_idmap, 1, capa->lc_uid);
+        if (uid == MDS_IDMAP_NOTFOUND)
+                uid = med->med_nllu;
+        capa->lc_ruid = uid;
+        capa->lc_flags |= CAPA_FL_REMUID;
+        EXIT;
+}
+
+
+int mds_pack_capa(struct obd_device *obd, struct mds_export_data *med,
+                  struct mds_body *req_body, struct lustre_capa *req_capa,
+                  struct ptlrpc_request *req, int *offset, struct mds_body *body)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct lustre_capa *capa;
+        struct lustre_msg *repmsg = req->rq_repmsg;
         struct obd_capa *ocapa;
         __u8 key[CAPA_KEY_LEN];  /* key */
-        int expired, rc = 0;
+        int stat, expired, rc = 0;
         ENTRY;
 
-        if (mds->mds_capa_stat == 0) {
+        spin_lock(&mds_capa_lock);
+        stat = mds->mds_capa_stat;
+        spin_unlock(&mds_capa_lock);
+
+        if (stat == 0) {
                 (*offset)++;
                 RETURN(0); /* capability is disabled */
         }
@@ -421,19 +513,19 @@ int mds_pack_capa(struct obd_device *obd, struct mds_body *req_body,
 
                 mfd = mds_handle2mfd(&req_body->handle);
                 if (mfd == NULL) {
-                        CERROR("no handle for capa renewal ino "LPD64
-                               ": cookie "LPX64"\n",
-                               req_capa->lc_ino, req_body->handle.cookie);
+                        DEBUG_CAPA(D_INFO, req_capa, "no handle "LPX64" for",
+                                   req_body->handle.cookie);
                         RETURN(-ESTALE);
                 }
 
                 mode = accmode(mfd->mfd_mode);
                 if (!(req_capa->lc_op & mode)) {
-                        CERROR("invalid capa to renew ino "LPD64
-                               ": op %d mismatch with mode %d\n",
-                               req_capa->lc_ino, req_capa->lc_op, mfd->mfd_mode);
+                        DEBUG_CAPA(D_ERROR, req_capa, "accmode %d mismatch",
+                                   mode);
                         RETURN(-EACCES);
                 }
+
+                mds_mfd_put(mfd);
         }
 
         LASSERT(repmsg->buflens[*offset] == sizeof(*capa));
@@ -441,32 +533,37 @@ int mds_pack_capa(struct obd_device *obd, struct mds_body *req_body,
         LASSERT(capa != NULL);
 
         ocapa = capa_get(req_capa->lc_uid, req_capa->lc_op, req_capa->lc_mdsid,
-                         req_capa->lc_ino, MDS_CAPA, NULL, NULL, NULL);
+                         req_capa->lc_ino, req_capa->lc_igen, MDS_CAPA);
         if (ocapa) {
                 expired = capa_is_to_expire(ocapa);
                 if (!expired) {
                         capa_dup(capa, ocapa);
-                        capa_put(ocapa, MDS_CAPA);
+                        capa_put(ocapa);
                         GOTO(out, rc);
                 }
-                capa_put(ocapa, MDS_CAPA);
+                capa_put(ocapa);
         }
 
         memcpy(capa, req_capa, sizeof(*capa));
+        mds_capa_reverse_map(med, capa);
 
         spin_lock(&mds_capa_lock);
         capa->lc_keyid = le32_to_cpu(CUR_CAPA_KEY_ID(mds));
         capa->lc_expiry = round_expiry(mds->mds_capa_timeout);
         if (mds->mds_capa_timeout < CAPA_EXPIRY)
-                capa->lc_flags |= CAPA_FL_NOROUND;
+                capa->lc_flags |= CAPA_FL_SHORT;
         memcpy(key, CUR_CAPA_KEY(mds)->lk_key, sizeof(key));
         spin_unlock(&mds_capa_lock);
 
         capa_hmac(mds->mds_capa_hmac, key, capa);
 
-        rc = capa_renew(capa, MDS_CAPA);
+        ocapa = capa_renew(capa, MDS_CAPA);
+        if (!ocapa)
+                rc = -ENOMEM;
+        capa_put(ocapa);
 out:
         if (rc == 0)
                 body->valid |= OBD_MD_CAPA;
         RETURN(rc);
 }
+