Whamcloud - gitweb
LU-8569 linkea: linkEA size limitation
[fs/lustre-release.git] / lustre / obdclass / linkea.c
index 7b8891a..a1bcc3d 100644 (file)
@@ -38,8 +38,10 @@ int linkea_data_new(struct linkea_data *ldata, struct lu_buf *buf)
                return -ENOMEM;
        ldata->ld_leh = ldata->ld_buf->lb_buf;
        ldata->ld_leh->leh_magic = LINK_EA_MAGIC;
-       ldata->ld_leh->leh_len = sizeof(struct link_ea_header);
        ldata->ld_leh->leh_reccount = 0;
+       ldata->ld_leh->leh_len = sizeof(struct link_ea_header);
+       ldata->ld_leh->leh_overflow_time = 0;
+       ldata->ld_leh->leh_padding = 0;
        return 0;
 }
 EXPORT_SYMBOL(linkea_data_new);
@@ -54,11 +56,15 @@ int linkea_init(struct linkea_data *ldata)
                leh->leh_magic = LINK_EA_MAGIC;
                leh->leh_reccount = __swab32(leh->leh_reccount);
                leh->leh_len = __swab64(leh->leh_len);
-               /* entries are swabbed by linkea_entry_unpack */
+               leh->leh_overflow_time = __swab32(leh->leh_overflow_time);
+               leh->leh_padding = __swab32(leh->leh_padding);
+               /* individual entries are swabbed by linkea_entry_unpack() */
        }
+
        if (leh->leh_magic != LINK_EA_MAGIC)
                return -EINVAL;
-       if (leh->leh_reccount == 0)
+
+       if (leh->leh_reccount == 0 && leh->leh_overflow_time == 0)
                return -ENODATA;
 
        ldata->ld_leh = leh;
@@ -66,6 +72,18 @@ int linkea_init(struct linkea_data *ldata)
 }
 EXPORT_SYMBOL(linkea_init);
 
+int linkea_init_with_rec(struct linkea_data *ldata)
+{
+       int rc;
+
+       rc = linkea_init(ldata);
+       if (!rc && ldata->ld_leh->leh_reccount == 0)
+               rc = -ENODATA;
+
+       return rc;
+}
+EXPORT_SYMBOL(linkea_init_with_rec);
+
 /**
  * Pack a link_ea_entry.
  * All elements are stored as chars to avoid alignment issues.
@@ -97,6 +115,8 @@ EXPORT_SYMBOL(linkea_entry_pack);
 void linkea_entry_unpack(const struct link_ea_entry *lee, int *reclen,
                         struct lu_name *lname, struct lu_fid *pfid)
 {
+       LASSERT(lee != NULL);
+
        *reclen = (lee->lee_reclen[0] << 8) | lee->lee_reclen[1];
        memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
        fid_be_to_cpu(pfid, pfid);
@@ -113,25 +133,42 @@ EXPORT_SYMBOL(linkea_entry_unpack);
 int linkea_add_buf(struct linkea_data *ldata, const struct lu_name *lname,
                   const struct lu_fid *pfid)
 {
-       LASSERT(ldata->ld_leh != NULL);
+       struct link_ea_header *leh = ldata->ld_leh;
+       int reclen;
+
+       LASSERT(leh != NULL);
 
        if (lname == NULL || pfid == NULL)
                return -EINVAL;
 
-       ldata->ld_reclen = lname->ln_namelen + sizeof(struct link_ea_entry);
-       if (ldata->ld_leh->leh_len + ldata->ld_reclen >
-           ldata->ld_buf->lb_len) {
+       reclen = lname->ln_namelen + sizeof(struct link_ea_entry);
+       if (unlikely(leh->leh_len + reclen > MAX_LINKEA_SIZE)) {
+               /* Use 32-bits to save the overflow time, although it will
+                * shrink the cfs_time_current_sec() returned 64-bits value
+                * to 32-bits value, it is still quite large and can be used
+                * for about 140 years. That is enough. */
+               leh->leh_overflow_time = cfs_time_current_sec();
+               if (unlikely(leh->leh_overflow_time == 0))
+                       leh->leh_overflow_time++;
+
+               CDEBUG(D_INODE, "No enough space to hold linkea entry '"
+                      DFID": %.*s' at %u\n", PFID(pfid), lname->ln_namelen,
+                      lname->ln_name, leh->leh_overflow_time);
+               return 0;
+       }
+
+       if (leh->leh_len + reclen > ldata->ld_buf->lb_len) {
                if (lu_buf_check_and_grow(ldata->ld_buf,
-                                         ldata->ld_leh->leh_len +
-                                         ldata->ld_reclen) < 0)
+                                         leh->leh_len + reclen) < 0)
                        return -ENOMEM;
+
+               leh = ldata->ld_leh = ldata->ld_buf->lb_buf;
        }
 
-       ldata->ld_leh = ldata->ld_buf->lb_buf;
-       ldata->ld_lee = ldata->ld_buf->lb_buf + ldata->ld_leh->leh_len;
+       ldata->ld_lee = ldata->ld_buf->lb_buf + leh->leh_len;
        ldata->ld_reclen = linkea_entry_pack(ldata->ld_lee, lname, pfid);
-       ldata->ld_leh->leh_len += ldata->ld_reclen;
-       ldata->ld_leh->leh_reccount++;
+       leh->leh_len += ldata->ld_reclen;
+       leh->leh_reccount++;
        CDEBUG(D_INODE, "New link_ea name '"DFID":%.*s' is added\n",
               PFID(pfid), lname->ln_namelen, lname->ln_name);
        return 0;
@@ -142,6 +179,7 @@ EXPORT_SYMBOL(linkea_add_buf);
 void linkea_del_buf(struct linkea_data *ldata, const struct lu_name *lname)
 {
        LASSERT(ldata->ld_leh != NULL && ldata->ld_lee != NULL);
+       LASSERT(ldata->ld_leh->leh_reccount > 0);
 
        ldata->ld_leh->leh_reccount--;
        ldata->ld_leh->leh_len -= ldata->ld_reclen;
@@ -157,6 +195,70 @@ void linkea_del_buf(struct linkea_data *ldata, const struct lu_name *lname)
 }
 EXPORT_SYMBOL(linkea_del_buf);
 
+int linkea_links_new(struct linkea_data *ldata, struct lu_buf *buf,
+                    const struct lu_name *cname, const struct lu_fid *pfid)
+{
+       int rc;
+
+       rc = linkea_data_new(ldata, buf);
+       if (!rc)
+               rc = linkea_add_buf(ldata, cname, pfid);
+
+       return rc;
+}
+EXPORT_SYMBOL(linkea_links_new);
+
+/**
+ * Mark the linkEA as overflow with current timestamp,
+ * and remove the last linkEA entry.
+ *
+ * Return the new linkEA size.
+ */
+int linkea_overflow_shrink(struct linkea_data *ldata)
+{
+       struct link_ea_header *leh;
+       struct lu_name tname;
+       struct lu_fid tfid;
+       int count;
+
+       leh = ldata->ld_leh = ldata->ld_buf->lb_buf;
+       if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
+               leh->leh_magic = LINK_EA_MAGIC;
+               leh->leh_reccount = __swab32(leh->leh_reccount);
+               leh->leh_overflow_time = __swab32(leh->leh_overflow_time);
+               leh->leh_padding = __swab32(leh->leh_padding);
+       }
+
+       LASSERT(leh->leh_reccount > 0);
+
+       leh->leh_len = sizeof(struct link_ea_header);
+       leh->leh_reccount--;
+       if (unlikely(leh->leh_reccount == 0))
+               return 0;
+
+       leh->leh_overflow_time = cfs_time_current_sec();
+       if (unlikely(leh->leh_overflow_time == 0))
+               leh->leh_overflow_time++;
+       ldata->ld_reclen = 0;
+       ldata->ld_lee = (struct link_ea_entry *)(leh + 1);
+       for (count = 0; count < leh->leh_reccount; count++) {
+               linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
+                                   &tname, &tfid);
+               leh->leh_len += ldata->ld_reclen;
+               ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
+                                                        ldata->ld_reclen);
+       }
+
+       linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &tname, &tfid);
+       CDEBUG(D_INODE, "No enough space to hold the last linkea entry '"
+              DFID": %.*s', shrink it, left %d linkea entries, size %llu\n",
+              PFID(&tfid), tname.ln_namelen, tname.ln_name,
+              leh->leh_reccount, leh->leh_len);
+
+       return leh->leh_len;
+}
+EXPORT_SYMBOL(linkea_overflow_shrink);
+
 /**
  * Check if such a link exists in linkEA.
  *
@@ -177,8 +279,9 @@ int linkea_links_find(struct linkea_data *ldata, const struct lu_name *lname,
 
        LASSERT(ldata->ld_leh != NULL);
 
-       /* link #0 */
-       ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
+       /* link #0, if leh_reccount == 0 we skip the loop and return -ENOENT */
+       if (likely(ldata->ld_leh->leh_reccount > 0))
+               ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
 
        for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
                linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,