#include <unistd.h>
#include <errno.h>
#include <limits.h>
+#include <assert.h>
#include <sys/xattr.h>
#include <sys/param.h>
struct lu_extent llc_extent; /* [start, end) of component */
uint32_t llc_id; /* unique ID of component */
uint32_t llc_flags; /* LCME_FL_* flags */
+ uint64_t llc_timestamp; /* snapshot timestamp */
struct list_head llc_list; /* linked to the llapi_layout
components list */
};
ent = &comp_v1->lcm_entries[i];
__swab32s(&ent->lcme_id);
__swab32s(&ent->lcme_flags);
+ __swab64s(&ent->lcme_timestamp);
__swab64s(&ent->lcme_extent.e_start);
__swab64s(&ent->lcme_extent.e_end);
__swab32s(&ent->lcme_offset);
* Convert the data from a lov_user_md to a newly allocated llapi_layout.
* The caller is responsible for freeing the returned pointer.
*
- * \param[in] lum LOV user metadata structure to copy data from
- * \param[in] lum_size size the the lum passed in
+ * \param[in] lov_xattr LOV user metadata xattr to copy data from
+ * \param[in] lov_xattr_size size of the lov_xattr buffer passed in
*
* \retval valid llapi_layout pointer on success
* \retval NULL if memory allocation fails
*/
-static struct llapi_layout *
-llapi_layout_from_lum(const struct lov_user_md *lum, int lum_size)
+struct llapi_layout *llapi_layout_get_by_xattr(const void *lov_xattr,
+ ssize_t lov_xattr_size)
{
+ const struct lov_user_md *lum = lov_xattr;
struct lov_comp_md_v1 *comp_v1 = NULL;
struct lov_comp_md_entry_v1 *ent;
struct lov_user_md *v1;
lum->lmm_magic == LOV_MAGIC_V3) {
ent_count = 1;
layout->llot_is_composite = false;
+
+ if (lov_xattr_size <= 0) {
+ errno = EINVAL;
+ goto error;
+ }
+ } else {
+ errno = EOPNOTSUPP;
+ goto error;
}
if (ent_count == 0) {
ent = &comp_v1->lcm_entries[i];
v1 = (struct lov_user_md *)((char *)comp_v1 +
ent->lcme_offset);
- lum_size = ent->lcme_size;
+ lov_xattr_size = ent->lcme_size;
} else {
ent = NULL;
}
- obj_count = llapi_layout_objects_in_lum(v1, lum_size);
+ obj_count = llapi_layout_objects_in_lum(v1, lov_xattr_size);
comp = __llapi_comp_alloc(obj_count);
if (comp == NULL)
goto error;
comp->llc_extent.e_end = ent->lcme_extent.e_end;
comp->llc_id = ent->lcme_id;
comp->llc_flags = ent->lcme_flags;
+ if (comp->llc_flags & LCME_FL_NOSYNC)
+ comp->llc_timestamp = ent->lcme_timestamp;
} else {
comp->llc_extent.e_start = 0;
comp->llc_extent.e_end = LUSTRE_EOF;
ent = &comp_v1->lcm_entries[ent_idx];
ent->lcme_id = comp->llc_id;
ent->lcme_flags = comp->llc_flags;
+ if (ent->lcme_flags & LCME_FL_NOSYNC)
+ ent->lcme_timestamp = comp->llc_timestamp;
ent->lcme_extent.e_start = comp->llc_extent.e_start;
ent->lcme_extent.e_end = comp->llc_extent.e_end;
ent->lcme_size = blob_size;
goto out;
}
- layout = llapi_layout_from_lum(lum, bytes_read);
+ layout = llapi_layout_get_by_xattr(lum, bytes_read);
out:
free(lum);
return layout;
*
* \param[in] path name of the file to open
* \param[in] open_flags open() flags
- * \param[in] mode permissions to create new file with
+ * \param[in] mode permissions to create file, filtered by umask
* \param[in] layout layout to create new file with
*
* \retval non-negative file descriptor on successful open
return 0;
}
+/**
+ * Map the state bits of a layout flags word to a short name.
+ *
+ * Only the bits selected by LCM_FL_FLR_MASK are examined.
+ *
+ * \param[in] flags	layout flags word
+ *
+ * \retval "ro"	masked value equals LCM_FL_RDONLY
+ * \retval "wp"	masked value equals LCM_FL_WRITE_PENDING
+ * \retval "sp"	masked value equals LCM_FL_SYNC_PENDING
+ * \retval "0"	any other masked value
+ */
+const char *llapi_layout_flags_string(uint32_t flags)
+{
+	const uint32_t masked = flags & LCM_FL_FLR_MASK;
+
+	if (masked == LCM_FL_RDONLY)
+		return "ro";
+	if (masked == LCM_FL_WRITE_PENDING)
+		return "wp";
+	if (masked == LCM_FL_SYNC_PENDING)
+		return "sp";
+
+	return "0";
+}
+
+/**
+ * Convert a short state name into the corresponding layout flag.
+ *
+ * The comparison covers strlen(\a string) characters, so a non-empty
+ * leading abbreviation of a name (e.g. "r") is accepted as well as the
+ * full name. A NULL or empty \a string matches nothing: comparing zero
+ * characters always reports equality, which would otherwise make an
+ * empty string look like "ro".
+ *
+ * \param[in] string	state name, one of "ro", "wp" or "sp"
+ *
+ * \retval LCM_FL_RDONLY	\a string matches "ro"
+ * \retval LCM_FL_WRITE_PENDING	\a string matches "wp"
+ * \retval LCM_FL_SYNC_PENDING	\a string matches "sp"
+ * \retval 0			\a string is NULL, empty or not recognized
+ */
+const __u16 llapi_layout_string_flags(char *string)
+{
+	size_t len;
+
+	if (string == NULL)
+		return 0;
+
+	/* measure once; a zero length must never reach strncmp() */
+	len = strlen(string);
+	if (len == 0)
+		return 0;
+
+	if (strncmp(string, "ro", len) == 0)
+		return LCM_FL_RDONLY;
+	if (strncmp(string, "wp", len) == 0)
+		return LCM_FL_WRITE_PENDING;
+	if (strncmp(string, "sp", len) == 0)
+		return LCM_FL_SYNC_PENDING;
+
+	return 0;
+}
+
/**
* llapi_layout_mirror_count_is_valid() - Check the validity of mirror count.
* @count: Mirror count value to be checked.
if (comp->llc_list.prev != &layout->llot_comp_list) {
prev = list_entry(comp->llc_list.prev, typeof(*prev),
llc_list);
- if (start != prev->llc_extent.e_end) {
+ if (start != 0 && start != prev->llc_extent.e_end) {
errno = EINVAL;
return -1;
}
if (comp->llc_list.next != &layout->llot_comp_list) {
next = list_entry(comp->llc_list.next, typeof(*next),
llc_list);
- if (end != next->llc_extent.e_start) {
+ if (next->llc_extent.e_start != 0 &&
+ end != next->llc_extent.e_start) {
errno = EINVAL;
return -1;
}
return 0;
}
+/**
+ * Append a new component starting at offset 0 to \a layout, to serve as
+ * the first component of a mirror.
+ *
+ * On success the new component becomes the current component of
+ * \a layout and the layout is marked as composite.
+ *
+ * \param[in] layout	existing composite or plain layout
+ *
+ * \retval 0	on success
+ * \retval -1	if __llapi_layout_cur_comp() or __llapi_comp_alloc()
+ *		returns NULL
+ */
+int llapi_layout_add_first_comp(struct llapi_layout *layout)
+{
+	struct llapi_layout_comp *first;
+
+	/* the current component is looked up only to validate the layout */
+	if (__llapi_layout_cur_comp(layout) == NULL)
+		return -1;
+
+	first = __llapi_comp_alloc(0);
+	if (first == NULL)
+		return -1;
+
+	first->llc_extent.e_start = 0;
+	list_add_tail(&first->llc_list, &layout->llot_comp_list);
+
+	layout->llot_is_composite = true;
+	layout->llot_cur_comp = first;
+
+	return 0;
+}
/**
* Deletes current component from the composite layout. The component
/**
* Iterate every components in the @layout and call callback function @cb.
*
- * \param[in]
+ * \param[in] layout component layout list.
+ * \param[in] cb callback for each component
+ * \param[in] cbdata callback data
+ *
+ * \retval < 0 error happens during the iteration
+ * \retval LLAPI_LAYOUT_ITER_CONT finished the iteration w/o error
+ * \retval LLAPI_LAYOUT_ITER_STOP got something, stop the iteration
*/
int llapi_layout_comp_iterate(struct llapi_layout *layout,
llapi_layout_iter_cb cb, void *cbdata)
if (rc < 0)
return rc;
- while (rc == 0) {
+ /**
+ * make sure on success llapi_layout_comp_use() API returns 0 with
+ * USE_FIRST.
+ */
+ assert(rc == 0);
+
+ while (1) {
rc = cb(layout, cbdata);
if (rc != LLAPI_LAYOUT_ITER_CONT)
break;
rc = llapi_layout_comp_use(layout, LLAPI_LAYOUT_COMP_USE_NEXT);
if (rc < 0)
return rc;
+ else if (rc == 1) /* reached the last comp */
+ return LLAPI_LAYOUT_ITER_CONT;
}
- return rc >= 0 ? LLAPI_LAYOUT_ITER_CONT : rc;
+ return rc;
}
/**
/* not in the specified mirror */
if (j == ids_nr)
goto next;
+ } else if (flags & LCME_FL_NOSYNC) {
+ /* if no mirrors were specified, do not resync "nosync"
+ * mirrors */
+ goto next;
}
rc = llapi_layout_comp_id_get(layout, &id);
}
/* locate @layout to a valid component covering file [file_start, file_end) */
-static uint32_t llapi_mirror_find(struct llapi_layout *layout,
- uint64_t file_start, uint64_t file_end,
- uint64_t *endp)
+uint32_t llapi_mirror_find(struct llapi_layout *layout,
+ uint64_t file_start, uint64_t file_end,
+ uint64_t *endp)
{
uint32_t mirror_id = 0;
int rc;
return mirror_id;
}
-ssize_t llapi_mirror_resync_one(int fd, struct llapi_layout *layout,
- uint32_t dst, uint64_t start, uint64_t end)
+int llapi_mirror_resync_many(int fd, struct llapi_layout *layout,
+ struct llapi_resync_comp *comp_array,
+ int comp_size, uint64_t start, uint64_t end)
{
- uint64_t mirror_end = 0;
- ssize_t result = 0;
size_t count;
+ size_t page_size = sysconf(_SC_PAGESIZE);
+ const size_t buflen = 4 << 20; /* 4M */
+ void *buf;
+ uint64_t pos = start;
+ int i;
+ int rc;
+
+ rc = posix_memalign(&buf, page_size, buflen);
+ if (rc)
+ return -rc;
if (end == OBD_OBJECT_EOF)
count = OBD_OBJECT_EOF;
while (count > 0) {
uint32_t src;
- size_t to_copy;
- ssize_t copied;
+ uint64_t mirror_end = 0;
+ ssize_t bytes_read;
+ size_t to_read;
+ size_t to_write;
- src = llapi_mirror_find(layout, start, end, &mirror_end);
+ src = llapi_mirror_find(layout, pos, end, &mirror_end);
if (src == 0)
return -ENOENT;
- if (mirror_end == OBD_OBJECT_EOF)
- to_copy = count;
- else
- to_copy = MIN(count, mirror_end - start);
-
- copied = llapi_mirror_copy(fd, src, dst, start, to_copy);
- if (copied < 0)
- return copied;
+ if (mirror_end == OBD_OBJECT_EOF) {
+ to_read = count;
+ } else {
+ to_read = MIN(count, mirror_end - pos);
+ to_read = (to_read + page_size - 1) & ~(page_size - 1);
+ }
+ to_read = MIN(buflen, to_read);
- result += copied;
- if (copied < to_copy) /* end of file */
+ bytes_read = llapi_mirror_read(fd, src, buf, to_read, pos);
+ if (bytes_read == 0) {
+ /* end of file */
+ break;
+ }
+ if (bytes_read < 0) {
+ rc = bytes_read;
break;
+ }
+
+ /* round up to page align to make direct IO happy. */
+ to_write = (bytes_read + page_size - 1) & ~(page_size - 1);
+
+ for (i = 0; i < comp_size; i++) {
+ ssize_t written;
+ off_t pos2 = pos;
+ size_t to_write2 = to_write;
+
+ /* skip non-overlapped component */
+ if (pos >= comp_array[i].lrc_end ||
+ pos + to_write <= comp_array[i].lrc_start)
+ continue;
+
+ if (pos < comp_array[i].lrc_start)
+ pos2 = comp_array[i].lrc_start;
+
+ to_write2 -= pos2 - pos;
+
+ if ((pos + to_write) > comp_array[i].lrc_end)
+ to_write2 -= pos + to_write -
+ comp_array[i].lrc_end;
+
+ written = llapi_mirror_write(fd,
+ comp_array[i].lrc_mirror_id,
+ buf + pos2 - pos,
+ to_write2, pos2);
+ if (written < 0) {
+ /**
+ * this component is not written successfully,
+ * mark it using its lrc_synced, it is supposed
+ * to be false before getting here.
+ *
+ * And before this function returns, all
+ * elements of comp_array will reverse their
+ * lrc_synced flag to reflect their true
+ * meanings.
+ */
+ comp_array[i].lrc_synced = true;
+ continue;
+ }
+ assert(written == to_write2);
+ }
- if (count != OBD_OBJECT_EOF)
- count -= copied;
- start += copied;
+ pos += bytes_read;
+ count -= bytes_read;
}
- return result;
+ free(buf);
+
+ if (rc < 0) {
+ for (i = 0; i < comp_size; i++)
+ comp_array[i].lrc_synced = false;
+ return rc;
+ }
+
+ for (i = 0; i < comp_size; i++) {
+ comp_array[i].lrc_synced = !comp_array[i].lrc_synced;
+ if (comp_array[i].lrc_synced && pos & (page_size - 1)) {
+ rc = llapi_mirror_truncate(fd,
+ comp_array[i].lrc_mirror_id, pos);
+ if (rc < 0)
+ comp_array[i].lrc_synced = false;
+ }
+ }
+
+ /* partially successful is successful */
+ return 0;
}