* Use is subject to license terms.
*/
/*
- * Copyright (c) 2012, Intel Corporation.
+ * Copyright (c) 2012, 2013, Intel Corporation.
* Use is subject to license terms.
*/
/*
* Author: Mike Pershin <tappro@whamcloud.com>
*/
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
#define DEBUG_SUBSYSTEM S_OSD
#include <lustre_ver.h>
#include <libcfs/libcfs.h>
-#include <lustre_fsfilt.h>
#include <obd_support.h>
#include <lustre_net.h>
#include <obd.h>
lu_object_put(env, &obj->oo_dt.do_lu);
}
-static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
- struct lu_fid *fid)
+static int osd_seq_exists(const struct lu_env *env, struct osd_device *osd,
+ obd_seq seq)
{
struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range;
struct seq_server_site *ss = osd_seq_site(osd);
int rc;
ENTRY;
- if (!fid_is_norm(fid) && !fid_is_root(fid))
- RETURN(0);
+ if (ss == NULL)
+ RETURN(1);
- rc = osd_fld_lookup(env, osd, fid, range);
+ rc = osd_fld_lookup(env, osd, seq, range);
if (rc != 0) {
- CERROR("%s: Can not lookup fld for "DFID"\n",
- osd_name(osd), PFID(fid));
- RETURN(rc);
+ CERROR("%s: Can not lookup fld for "LPX64"\n",
+ osd_name(osd), seq);
+ RETURN(0);
}
- RETURN(ss->ss_node_id != range->lsr_index);
+ RETURN(ss->ss_node_id == range->lsr_index);
+}
+
+static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
+ struct lu_fid *fid)
+{
+ ENTRY;
+
+ if (!fid_is_norm(fid) && !fid_is_root(fid))
+ RETURN(0);
+
+ if (osd_seq_exists(env, osd, fid_seq(fid)))
+ RETURN(0);
+
+ RETURN(1);
}
/**
if (IS_ERR(child))
RETURN(PTR_ERR(child));
-#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 91, 0)
-#define OSD_ZFS_INSERT_DOTS_FOR_TESTING__
-#endif
LASSERT(child->oo_db);
if (name[0] == '.') {
if (name[1] == 0) {
/* do not store ".", instead generate it
* during iteration */
-#ifndef OSD_ZFS_INSERT_DOTS_FOR_TESTING
GOTO(out, rc = 0);
-#endif
} else if (name[1] == '.' && name[2] == 0) {
/* update parent dnode in the child.
* later it will be used to generate ".." */
udmu_objset_t *uos = &osd->od_objset;
- rc = osd_object_sa_update(parent, SA_ZPL_PARENT(uos),
- &child->oo_db->db_object,
- 8, oh);
-#ifndef OSD_ZFS_INSERT_DOTS_FOR_TESTING
+ rc = osd_object_sa_update(parent,
+ SA_ZPL_PARENT(uos),
+ &child->oo_db->db_object,
+ 8, oh);
GOTO(out, rc);
-#endif
}
}
CLASSERT(sizeof(oti->oti_zde.lzd_reg) == 8);
(char *)key, 8, sizeof(oti->oti_zde) / 8,
(void *)&oti->oti_zde, oh->ot_tx);
-#ifndef OSD_ZFS_INSERT_DOTS_FOR_TESTING
out:
-#endif
if (child != NULL)
osd_object_put(env, child);
LASSERT(th != NULL);
oh = container_of0(th, struct osd_thandle, ot_super);
-#ifndef OSD_ZFS_INSERT_DOTS_FOR_TESTING
/*
- * in Orion . and .. were stored in the directory (not generated up on
- * request as now. we preserve them for backward compatibility
+ * In Orion . and .. were stored in the directory (not generated upon
+ * request as now). we preserve them for backward compatibility
*/
if (name[0] == '.') {
if (name[1] == 0) {
RETURN(0);
}
}
-#endif
/* Remove key from the ZAP */
rc = -zap_remove(osd->od_objset.os, zap_db->db_object,
strcpy(it->ozi_name, za->za_name);
-#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 91, 0)
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 90, 0)
if (za->za_name[0] == '.') {
if (za->za_name[1] == 0 || (za->za_name[1] == '.' &&
za->za_name[2] == 0)) {
if ((rc = -zap_cursor_retrieve(it->ozi_zc, za)) == 0)
rc = strlen(za->za_name);
-#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 99, 0)
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 90, 0)
if (rc == 0 && za->za_name[0] == '.') {
if (za->za_name[1] == 0 || (za->za_name[1] == '.' &&
za->za_name[2] == 0)) {
/*
* Primitives for index files using binary keys.
- * XXX: only 64-bit keys are supported for now.
*/
+static int osd_prepare_key(struct osd_object *o, __u64 *dst,
+ const struct dt_key *src)
+{
+ int size;
+
+ LASSERT(dst);
+ LASSERT(src);
+
+ /* align keysize to 64bit */
+ size = (o->oo_keysize + sizeof(__u64) - 1) / sizeof(__u64);
+ size *= sizeof(__u64);
+
+ LASSERT(size <= MAXNAMELEN);
+
+ if (unlikely(size > o->oo_keysize))
+ memset(dst + o->oo_keysize, 0, size - o->oo_keysize);
+ memcpy(dst, (const char *)src, o->oo_keysize);
+
+ return size;
+}
+
static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
struct dt_rec *rec, const struct dt_key *key,
struct lustre_capa *capa)
{
struct osd_object *obj = osd_dt_obj(dt);
struct osd_device *osd = osd_obj2dev(obj);
+ __u64 *k = osd_oti_get(env)->oti_key64;
int rc;
ENTRY;
+ rc = osd_prepare_key(obj, k, key);
+
rc = -zap_lookup_uint64(osd->od_objset.os, obj->oo_db->db_object,
- (const __u64 *)key, 1, 8, obj->oo_recsize,
+ k, rc, obj->oo_recusize, obj->oo_recsize,
(void *)rec);
RETURN(rc == 0 ? 1 : rc);
}
struct osd_object *obj = osd_dt_obj(dt);
struct osd_device *osd = osd_obj2dev(obj);
struct osd_thandle *oh;
+ __u64 *k = osd_oti_get(env)->oti_key64;
int rc;
ENTRY;
oh = container_of0(th, struct osd_thandle, ot_super);
+ rc = osd_prepare_key(obj, k, key);
+
/* Insert (key,oid) into ZAP */
rc = -zap_add_uint64(osd->od_objset.os, obj->oo_db->db_object,
- (const __u64 *)key, 1, 8, obj->oo_recsize,
+ k, rc, obj->oo_recusize, obj->oo_recsize,
(void *)rec, oh->ot_tx);
RETURN(rc);
}
struct osd_object *obj = osd_dt_obj(dt);
struct osd_device *osd = osd_obj2dev(obj);
struct osd_thandle *oh;
+ __u64 *k = osd_oti_get(env)->oti_key64;
int rc;
ENTRY;
LASSERT(th != NULL);
oh = container_of0(th, struct osd_thandle, ot_super);
+ rc = osd_prepare_key(obj, k, key);
+
/* Remove binary key from the ZAP */
rc = -zap_remove_uint64(osd->od_objset.os, obj->oo_db->db_object,
- (const __u64 *)key, 1, oh->ot_tx);
+ k, rc, oh->ot_tx);
RETURN(rc);
}
LASSERT(it);
LASSERT(it->ozi_zc);
- /* XXX: API is broken at the moment */
- LASSERT(*((const __u64 *)key) == 0);
+ /*
+ * XXX: we need a binary version of zap_cursor_move_to_key()
+ * to implement this API */
+ if (*((const __u64 *)key) != 0)
+ CERROR("NOT IMPLEMETED YET (move to %Lx)\n", *((__u64 *)key));
zap_cursor_fini(it->ozi_zc);
memset(it->ozi_zc, 0, sizeof(*it->ozi_zc));
const struct dt_it *di)
{
struct osd_zap_it *it = (struct osd_zap_it *)di;
+ struct osd_object *obj = it->ozi_obj;
zap_attribute_t *za = &osd_oti_get(env)->oti_za;
int rc = 0;
ENTRY;
RETURN(ERR_PTR(rc));
/* the binary key is stored in the name */
- it->ozi_key = *((__u64 *)za->za_name);
+ memcpy(&it->ozi_key, za->za_name, obj->oo_keysize);
RETURN((struct dt_key *)&it->ozi_key);
}
static int osd_index_it_key_size(const struct lu_env *env,
const struct dt_it *di)
{
- /* we only support 64-bit binary keys for the time being */
- RETURN(sizeof(__u64));
+ struct osd_zap_it *it = (struct osd_zap_it *)di;
+ struct osd_object *obj = it->ozi_obj;
+ RETURN(obj->oo_keysize);
}
static int osd_index_it_rec(const struct lu_env *env, const struct dt_it *di,
struct osd_zap_it *it = (struct osd_zap_it *)di;
struct osd_object *obj = it->ozi_obj;
struct osd_device *osd = osd_obj2dev(obj);
+ __u64 *k = osd_oti_get(env)->oti_key64;
int rc;
ENTRY;
if (rc)
RETURN(rc);
+ rc = osd_prepare_key(obj, k, (const struct dt_key *)za->za_name);
+
rc = -zap_lookup_uint64(osd->od_objset.os, obj->oo_db->db_object,
- (const __u64 *)za->za_name, 1, 8,
- obj->oo_recsize, (void *)rec);
+ k, rc, obj->oo_recusize, obj->oo_recsize,
+ (void *)rec);
RETURN(rc);
}
if ((feat->dif_flags & ~DT_IND_UPDATE) != 0)
RETURN(-EINVAL);
- /* Although the zap_*_uint64() primitives support large keys, we
- * limit ourselves to 64-bit keys for now */
- if (feat->dif_keysize_max != sizeof(__u64) ||
- feat->dif_keysize_min != sizeof(__u64))
+ if (feat->dif_keysize_max > ZAP_MAXNAMELEN)
+ RETURN(-E2BIG);
+ if (feat->dif_keysize_max != feat->dif_keysize_min)
RETURN(-EINVAL);
/* As for the record size, it should be a multiple of 8 bytes
*/
if (feat->dif_recsize_max > ZAP_MAXVALUELEN)
RETURN(-E2BIG);
- if (feat->dif_recsize_max != feat->dif_recsize_min ||
- (feat->dif_recsize_max & (sizeof(__u64) - 1)))
+ if (feat->dif_recsize_max != feat->dif_recsize_min)
RETURN(-EINVAL);
- obj->oo_recsize = feat->dif_recsize_max / sizeof(__u64);
+ obj->oo_keysize = feat->dif_keysize_max;
+ obj->oo_recsize = feat->dif_recsize_max;
+ obj->oo_recusize = 1;
+
+ /* ZFS prefers to work with array of 64bits */
+ if ((obj->oo_recsize & 7) == 0) {
+ obj->oo_recsize >>= 3;
+ obj->oo_recusize = 8;
+ }
dt->do_index_ops = &osd_index_ops;
}