Whamcloud - gitweb
LU-1842 protocol: add support for OBD_IDX_READ
[fs/lustre-release.git] / lustre / obdclass / dt_object.c
index 11a8984..51305b5 100644 (file)
@@ -598,3 +598,323 @@ const struct dt_index_features dt_quota_slv_features = {
        .dif_ptrsize            = 4
 };
 EXPORT_SYMBOL(dt_quota_slv_features);
+
+/*
+ * Pick the dt_index_features structure to use for an index object, based on
+ * the sequence of its FID and on its file mode. This is used when serving the
+ * OBD_IDX_READ RPC.
+ *
+ * \param seq  - is the FID sequence of the index object
+ * \param mode - is the file mode of the index object
+ *
+ * \retval pointer to the matching dt_index_features structure on success
+ * \retval ERR_PTR(-ENOENT) if a quota index isn't a regular file
+ * \retval ERR_PTR(-ENOTDIR) if a namespace object isn't a directory
+ * \retval ERR_PTR(-EOPNOTSUPP) if the sequence isn't handled here
+ */
+static inline const struct dt_index_features *dt_index_feat_select(__u64 seq,
+                                                                  __u32 mode)
+{
+       if (seq == FID_SEQ_QUOTA_GLB) {
+               /* global quota index, must be backed by a regular file */
+               if (S_ISREG(mode))
+                       return &dt_quota_glb_features;
+               return ERR_PTR(-ENOENT);
+       }
+
+       if (seq == FID_SEQ_QUOTA) {
+               /* quota slave index, must be backed by a regular file too */
+               if (S_ISREG(mode))
+                       return &dt_quota_slv_features;
+               return ERR_PTR(-ENOENT);
+       }
+
+       if (seq < FID_SEQ_NORMAL)
+               /* neither a quota index nor a namespace object */
+               return ERR_PTR(-EOPNOTSUPP);
+
+       /* object is part of the namespace, only directories can be handled */
+       if (S_ISDIR(mode))
+               return &dt_directory_features;
+       return ERR_PTR(-ENOTDIR);
+}
+
+/*
+ * Pack key/record pairs (optionally preceded by their 64-bit hash) into one
+ * lu_idxpage container for transfer via the OBD_IDX_READ RPC.
+ *
+ * Entries are copied starting from the current iterator position until the
+ * container is full, the index is exhausted or an error is hit. The idx_info
+ * structure is updated along the way: ii_hash_end tracks the hash of the last
+ * position visited (or II_END_OFF once the index is exhausted), ii_hash_start
+ * is set from the first entry of the first container and ii_count is bumped
+ * when the container holds at least one entry.
+ *
+ * \param env  - is the environment passed by the caller
+ * \param lp   - is a pointer to the lu_page to fill
+ * \param nob  - is the maximum number of bytes that should be copied
+ * \param iops - is the index operation vector associated with the index object
+ * \param it   - is a pointer to the current iterator
+ * \param attr - is the index attribute to pass to iops->rec()
+ * \param arg  - is a pointer to the idx_info structure
+ *
+ * \retval 0 if the container is full and the walk can go on
+ * \retval +ve (as returned by iops->next()) if there are no more entries
+ * \retval -EINVAL if \a nob is too small to hold a single entry
+ * \retval -ve errno returned by iops->rec() or iops->next() otherwise
+ */
+static int dt_index_page_build(const struct lu_env *env, union lu_page *lp,
+                              int nob, const struct dt_it_ops *iops,
+                              struct dt_it *it, __u32 attr, void *arg)
+{
+       struct idx_info         *info = (struct idx_info *)arg;
+       struct lu_idxpage       *page = &lp->lp_idx;
+       const int                with_hash =
+                                       (info->ii_flags & II_FL_NOHASH) == 0;
+       char                    *slot;
+       int                      pair_size;
+       int                      rc;
+       ENTRY;
+
+       /* only fixed-size keys and records are handled for now */
+       LASSERT((info->ii_flags & II_FL_VARKEY) == 0);
+       LASSERT((info->ii_flags & II_FL_VARREC) == 0);
+
+       /* set up the container header, it is accounted against the budget */
+       memset(page, 0, LIP_HDR_SIZE);
+       page->lip_magic = LIP_MAGIC;
+       nob -= LIP_HDR_SIZE;
+
+       /* space consumed by one entry: [hash +] key + record */
+       pair_size = info->ii_recsize + info->ii_keysize;
+       if (with_hash)
+               pair_size += sizeof(__u64);
+
+       slot = page->lip_entries;
+       for (;;) {
+               char    *cursor = slot;
+               __u64    hash = iops->store(env, it);
+
+               /* remember where we are, the next read resumes from there */
+               info->ii_hash_end = hash;
+
+               if (nob < pair_size) {
+                       /* container is full; not being able to store even one
+                        * entry is reported as an error */
+                       rc = (page->lip_nr == 0) ? -EINVAL : 0;
+                       GOTO(out, rc);
+               }
+
+               if (with_hash) {
+                       /* client asked for the 64-bit hash of each record */
+                       memcpy(cursor, &hash, sizeof(hash));
+                       cursor += sizeof(hash);
+               }
+
+               /* copy the key right after the (optional) hash */
+               LASSERT(iops->key_size(env, it) == info->ii_keysize);
+               memcpy(cursor, iops->key(env, it), info->ii_keysize);
+               cursor += info->ii_keysize;
+
+               /* the record is written in place by the iterator */
+               rc = iops->rec(env, it, (struct dt_rec *)cursor, attr);
+               if (rc != 0 && rc != -ESTALE)
+                       GOTO(out, rc);
+
+               if (rc == 0) {
+                       /* entry committed to the container; on -ESTALE the
+                        * slot is simply reused for the next entry */
+                       page->lip_nr++;
+                       if (unlikely(page->lip_nr == 1 && info->ii_count == 0))
+                               info->ii_hash_start = hash;
+                       slot = cursor + info->ii_recsize;
+                       nob -= pair_size;
+               }
+
+               /* advance the iterator, skipping over stale positions */
+               do {
+                       rc = iops->next(env, it);
+               } while (rc == -ESTALE);
+
+               if (rc != 0)
+                       break;
+       }
+
+       GOTO(out, rc);
+out:
+       if (rc >= 0 && page->lip_nr > 0)
+               /* one more container */
+               info->ii_count++;
+       if (rc > 0)
+               /* no more entries */
+               info->ii_hash_end = II_END_OFF;
+       return rc;
+}
+
+/*
+ * Walk index and fill lu_page containers with key/record pairs
+ *
+ * The iterator is positioned according to rdpg->rp_hash, then the pages of
+ * \a rdpg are mapped one after the other and each lu_page-sized container
+ * they hold is handed to \a filler until rdpg->rp_count bytes have been
+ * consumed, the filler reports the end of the index (rc > 0) or an error
+ * (rc < 0) occurs.
+ *
+ * \param env - is the environment passed by the caller
+ * \param obj - is the index object to parse
+ * \param rdpg - is the lu_rdpg descriptor associated with the transfer
+ * \param filler - is the callback function responsible for filling a lu_page
+ *                 with key/record pairs in the format wanted by the caller
+ * \param arg    - is an opaque argument passed to the filler function
+ *
+ * \retval sum (in bytes) of all filled lu_pages, capped to rdpg->rp_count
+ * \retval -EFAULT if rdpg->rp_count is zero
+ * \retval -ve errno on failure
+ */
+int dt_index_walk(const struct lu_env *env, struct dt_object *obj,
+                 const struct lu_rdpg *rdpg, dt_index_page_build_t filler,
+                 void *arg)
+{
+       struct dt_it            *it;
+       const struct dt_it_ops  *iops;
+       unsigned int             pageidx, nob, nlupgs = 0;
+       int                      rc;
+       ENTRY;
+
+       LASSERT(rdpg->rp_pages != NULL);
+       LASSERT(obj->do_index_ops != NULL);
+
+       nob = rdpg->rp_count;
+       if (nob == 0)
+               RETURN(-EFAULT);
+
+       /* Iterate through index and fill containers from @rdpg */
+       iops = &obj->do_index_ops->dio_it;
+       LASSERT(iops != NULL);
+       it = iops->init(env, obj, rdpg->rp_attrs, BYPASS_CAPA);
+       if (IS_ERR(it))
+               RETURN(PTR_ERR(it));
+
+       rc = iops->load(env, it, rdpg->rp_hash);
+       if (rc == 0) {
+               /*
+                * Iterator didn't find record with exactly the key requested.
+                *
+                * It is currently either
+                *
+                *     - positioned above record with key less than
+                *     requested---skip it.
+                *     - or not positioned at all (is in IAM_IT_SKEWED
+                *     state)---position it on the next item.
+                */
+               rc = iops->next(env, it);
+       } else if (rc > 0) {
+               rc = 0;
+       }
+
+       /* Fill containers one after the other. There might be multiple
+        * containers per physical page.
+        *
+        * At this point and across for-loop:
+        *  rc == 0 -> ok, proceed.
+        *  rc >  0 -> end of index.
+        *  rc <  0 -> error. */
+       for (pageidx = 0; rc == 0 && nob > 0; pageidx++) {
+               union lu_page   *lp;
+               int              i;
+
+               LASSERT(pageidx < rdpg->rp_npages);
+               lp = cfs_kmap(rdpg->rp_pages[pageidx]);
+
+               /* fill lu pages; stop as soon as the byte budget is used up
+                * so that the filler is never invoked with an empty buffer */
+               for (i = 0; i < LU_PAGE_COUNT && nob > 0; i++, lp++) {
+                       unsigned int    chunk;
+
+                       chunk = min_t(unsigned int, nob, LU_PAGE_SIZE);
+                       rc = filler(env, lp, chunk, iops, it, rdpg->rp_attrs,
+                                   arg);
+                       if (rc < 0)
+                               break;
+                       /* one more lu_page */
+                       nlupgs++;
+                       if (rc > 0)
+                               /* end of index */
+                               break;
+                       /* nob is unsigned, only subtract what was handed out
+                        * to avoid wrapping around on a short last chunk */
+                       nob -= chunk;
+               }
+               /* unmap the page mapped above: it is indexed by pageidx, not
+                * by the container index i */
+               cfs_kunmap(rdpg->rp_pages[pageidx]);
+       }
+
+       iops->put(env, it);
+       iops->fini(env, it);
+
+       if (rc >= 0)
+               rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count);
+
+       RETURN(rc);
+}
+EXPORT_SYMBOL(dt_index_walk);
+
+/**
+ * Walk key/record pairs of an index and copy them into 4KB containers to be
+ * transferred over the network. This is the common handler for OBD_IDX_READ
+ * RPC processing.
+ *
+ * On top of the containers themselves, \a ii is updated with the index
+ * features supported (ii_flags), the key & record sizes (ii_keysize and
+ * ii_recsize) and the version of the index object (ii_version). The walk is
+ * done with the read lock held on the index object.
+ *
+ * \param env - is the environment passed by the caller
+ * \param dev - is the dt_device storing the index
+ * \param ii  - is the idx_info structure packed by the client in the
+ *              OBD_IDX_READ request
+ * \param rdpg - is the lu_rdpg descriptor
+ *
+ * \retval on success, return sum (in bytes) of all filled containers
+ * \retval -EFAULT if rdpg->rp_count is zero or isn't a multiple of the
+ *         container size
+ * \retval -EPERM if the FID sequence is below FID_SEQ_SPECIAL
+ * \retval -EOPNOTSUPP for namespace objects and for indexes with variable
+ *         key or record size
+ * \retval -ENOENT if the index object doesn't exist
+ * \retval appropriate error otherwise.
+ */
+int dt_index_read(const struct lu_env *env, struct dt_device *dev,
+                  struct idx_info *ii, const struct lu_rdpg *rdpg)
+{
+       const struct dt_index_features  *feat;
+       struct dt_object                *obj;
+       int                              rc;
+       ENTRY;
+
+       /* rp_count shouldn't be null and should be a multiple of the container
+        * size; either violation is enough to reject the request */
+       if (rdpg->rp_count <= 0 || (rdpg->rp_count & (LU_PAGE_SIZE - 1)) != 0)
+               RETURN(-EFAULT);
+
+       if (fid_seq(&ii->ii_fid) < FID_SEQ_SPECIAL)
+               /* block access to local files */
+               RETURN(-EPERM);
+
+       if (fid_seq(&ii->ii_fid) >= FID_SEQ_NORMAL)
+               /* we don't support directory transfer via OBD_IDX_READ for the
+                * time being */
+               RETURN(-EOPNOTSUPP);
+
+       /* lookup index object subject to the transfer */
+       obj = dt_locate(env, dev, &ii->ii_fid);
+       if (IS_ERR(obj))
+               RETURN(PTR_ERR(obj));
+       if (dt_object_exists(obj) == 0)
+               GOTO(out, rc = -ENOENT);
+
+       /* fetch index features associated with index object */
+       feat = dt_index_feat_select(fid_seq(&ii->ii_fid),
+                                   lu_object_attr(&obj->do_lu));
+       if (IS_ERR(feat))
+               GOTO(out, rc = PTR_ERR(feat));
+
+       /* load index feature if not done already */
+       if (obj->do_index_ops == NULL) {
+               rc = obj->do_ops->do_index_try(env, obj, feat);
+               if (rc)
+                       GOTO(out, rc);
+       }
+
+       /* fill ii_flags with supported index features; II_FL_NOHASH is the
+        * only flag preserved from the client request */
+       ii->ii_flags &= II_FL_NOHASH;
+
+       ii->ii_keysize = feat->dif_keysize_max;
+       if ((feat->dif_flags & DT_IND_VARKEY) != 0) {
+               /* key size is variable */
+               ii->ii_flags |= II_FL_VARKEY;
+               /* we don't support variable key size for the time being */
+               GOTO(out, rc = -EOPNOTSUPP);
+       }
+
+       ii->ii_recsize = feat->dif_recsize_max;
+       if ((feat->dif_flags & DT_IND_VARREC) != 0) {
+               /* record size is variable */
+               ii->ii_flags |= II_FL_VARREC;
+               /* we don't support variable record size for the time being */
+               GOTO(out, rc = -EOPNOTSUPP);
+       }
+
+       if ((feat->dif_flags & DT_IND_NONUNQ) != 0)
+               /* key isn't necessarily unique */
+               ii->ii_flags |= II_FL_NONUNQ;
+
+       dt_read_lock(env, obj, 0);
+       /* fetch object version before walking the index */
+       ii->ii_version = dt_version_get(env, obj);
+
+       /* walk the index and fill lu_idxpages with key/record pairs */
+       rc = dt_index_walk(env, obj, rdpg, dt_index_page_build, ii);
+       dt_read_unlock(env, obj);
+
+       if (rc == 0) {
+               /* index is empty */
+               LASSERT(ii->ii_count == 0);
+               ii->ii_hash_end = II_END_OFF;
+       }
+
+       GOTO(out, rc);
+out:
+       /* drop the reference taken by dt_locate() */
+       lu_object_put(env, &obj->do_lu);
+       return rc;
+}
+EXPORT_SYMBOL(dt_index_read);