Whamcloud - gitweb
LU-8581 osd: misuse of RCU in osd xattr cache 44/22344/6
authorLai Siyao <lai.siyao@intel.com>
Wed, 7 Sep 2016 08:20:10 +0000 (16:20 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Thu, 29 Sep 2016 14:58:53 +0000 (14:58 +0000)
In osd xattr cache implementation several list operations didn't
consider RCU concurrency.

Signed-off-by: Lai Siyao <lai.siyao@intel.com>
Change-Id: If4f91353fda5aca829f8e5ef327d9c06536f625d
Reviewed-on: http://review.whamcloud.com/22344
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Fan Yong <fan.yong@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/osd-ldiskfs/osd_handler.c

index a15d9b8..2946850 100644 (file)
@@ -1279,57 +1279,46 @@ struct osd_xattr_entry {
        char                    oxe_buf[0];
 };
 
-static struct osd_xattr_entry *osd_oxc_lookup(struct osd_object *obj,
-                                             const char *name,
-                                             size_t namelen)
-{
-       struct osd_xattr_entry *oxe;
-
-       list_for_each_entry(oxe, &obj->oo_xattr_list, oxe_list) {
-               if (namelen == oxe->oxe_namelen &&
-                   strncmp(name, oxe->oxe_buf, namelen) == 0)
-                       return oxe;
-       }
-
-       return NULL;
-}
-
 static int osd_oxc_get(struct osd_object *obj, const char *name,
                       struct lu_buf *buf)
 {
-       struct osd_xattr_entry *oxe;
-       size_t vallen;
+       struct osd_xattr_entry *tmp;
+       struct osd_xattr_entry *oxe = NULL;
+       size_t namelen = strlen(name);
+       int rc;
        ENTRY;
 
        rcu_read_lock();
-       oxe = osd_oxc_lookup(obj, name, strlen(name));
-       if (oxe == NULL) {
-               rcu_read_unlock();
-               RETURN(-ENOENT);
+       list_for_each_entry_rcu(tmp, &obj->oo_xattr_list, oxe_list) {
+               if (namelen == tmp->oxe_namelen &&
+                   strncmp(name, tmp->oxe_buf, namelen) == 0) {
+                       oxe = tmp;
+                       break;
+               }
        }
 
-       if (!oxe->oxe_exist) {
-               rcu_read_unlock();
-               RETURN(-ENODATA);
-       }
+       if (oxe == NULL)
+               GOTO(out, rc = -ENOENT);
 
-       vallen = oxe->oxe_len - sizeof(*oxe) - oxe->oxe_namelen - 1;
-       LASSERT(vallen > 0);
+       if (!oxe->oxe_exist)
+               GOTO(out, rc = -ENODATA);
 
-       if (buf->lb_buf == NULL) {
-               rcu_read_unlock();
-               RETURN(vallen);
-       }
+       /* vallen */
+       rc = oxe->oxe_len - sizeof(*oxe) - oxe->oxe_namelen - 1;
+       LASSERT(rc > 0);
 
-       if (buf->lb_len < vallen) {
-               rcu_read_unlock();
-               RETURN(-ERANGE);
-       }
+       if (buf->lb_buf == NULL)
+               GOTO(out, rc);
+
+       if (buf->lb_len < rc)
+               GOTO(out, rc = -ERANGE);
 
-       memcpy(buf->lb_buf, oxe->oxe_buf + oxe->oxe_namelen + 1, vallen);
+       memcpy(buf->lb_buf, &oxe->oxe_buf[namelen + 1], rc);
+       EXIT;
+out:
        rcu_read_unlock();
 
-       RETURN(vallen);
+       return rc;
 }
 
 static void osd_oxc_free(struct rcu_head *head)
@@ -1340,21 +1329,12 @@ static void osd_oxc_free(struct rcu_head *head)
        OBD_FREE(oxe, oxe->oxe_len);
 }
 
-static inline void __osd_oxc_del(struct osd_object *obj, const char *name)
-{
-       struct osd_xattr_entry *oxe;
-
-       oxe = osd_oxc_lookup(obj, name, strlen(name));
-       if (oxe != NULL) {
-               list_del(&oxe->oxe_list);
-               call_rcu(&oxe->oxe_rcu, osd_oxc_free);
-       }
-}
-
 static void osd_oxc_add(struct osd_object *obj, const char *name,
                        const char *buf, int buflen)
 {
        struct osd_xattr_entry *oxe;
+       struct osd_xattr_entry *old = NULL;
+       struct osd_xattr_entry *tmp;
        size_t namelen = strlen(name);
        size_t len = sizeof(*oxe) + namelen + 1 + buflen;
 
@@ -1376,15 +1356,36 @@ static void osd_oxc_add(struct osd_object *obj, const char *name,
 
        /* this should be rarely called, just remove old and add new */
        spin_lock(&obj->oo_guard);
-       __osd_oxc_del(obj, name);
-       list_add_tail(&oxe->oxe_list, &obj->oo_xattr_list);
+       list_for_each_entry(tmp, &obj->oo_xattr_list, oxe_list) {
+               if (namelen == tmp->oxe_namelen &&
+                   strncmp(name, tmp->oxe_buf, namelen) == 0) {
+                       old = tmp;
+                       break;
+               }
+       }
+       if (old != NULL) {
+               list_replace_rcu(&old->oxe_list, &oxe->oxe_list);
+               call_rcu(&old->oxe_rcu, osd_oxc_free);
+       } else {
+               list_add_tail_rcu(&oxe->oxe_list, &obj->oo_xattr_list);
+       }
        spin_unlock(&obj->oo_guard);
 }
 
 static void osd_oxc_del(struct osd_object *obj, const char *name)
 {
+       struct osd_xattr_entry *oxe;
+       size_t namelen = strlen(name);
+
        spin_lock(&obj->oo_guard);
-       __osd_oxc_del(obj, name);
+       list_for_each_entry(oxe, &obj->oo_xattr_list, oxe_list) {
+               if (namelen == oxe->oxe_namelen &&
+                   strncmp(name, oxe->oxe_buf, namelen) == 0) {
+                       list_del_rcu(&oxe->oxe_list);
+                       call_rcu(&oxe->oxe_rcu, osd_oxc_free);
+                       break;
+               }
+       }
        spin_unlock(&obj->oo_guard);
 }