Whamcloud - gitweb
LU-1187 osp: add osp_md_object for remote directory.
authorwangdi <di.wang@whamcloud.com>
Sat, 16 Nov 2013 18:52:14 +0000 (10:52 -0800)
committerOleg Drokin <green@whamcloud.com>
Thu, 31 Jan 2013 21:23:13 +0000 (16:23 -0500)
In declare phase, the update for remote object will be added
in the remote list of the transaction, which will then being
sent to the remote MDT during trans_start.

Add osp_md_object for remote directory, any updates for
remote operation will be packed into RPC by api in
the osp_md_object

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: I5fac24e21e6663dca0e2d2fa5d90d4a07a002136
Reviewed-on: http://review.whamcloud.com/4927
Reviewed-by: Mike Pershin <mike.pershin@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
26 files changed:
lustre/include/Makefile.am
lustre/include/dt_object.h
lustre/include/lprocfs_status.h
lustre/include/lu_object.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_req_layout.h
lustre/include/lustre_update.h [new file with mode: 0644]
lustre/include/obd_class.h
lustre/include/obd_support.h
lustre/lod/lod_dev.c
lustre/lod/lod_internal.h
lustre/mdd/mdd_dir.c
lustre/mdt/mdt_lib.c
lustre/obdclass/obdo.c
lustre/osp/Makefile.in
lustre/osp/osp_dev.c
lustre/osp/osp_internal.h
lustre/osp/osp_md_object.c [new file with mode: 0644]
lustre/osp/osp_object.c
lustre/ptlrpc/layout.c
lustre/ptlrpc/lproc_ptlrpc.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/wiretest.c
lustre/utils/req-layout.c
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index 6a6c792..313c2cc 100644 (file)
@@ -50,4 +50,5 @@ EXTRA_DIST = ioctl.h liblustre.h lprocfs_status.h lustre_cfg.h        \
              md_object.h dt_object.h lustre_param.h lustre_mdt.h \
              lustre_fid.h lustre_fld.h lustre_req_layout.h lustre_capa.h \
              lustre_idmap.h lustre_eacl.h interval_tree.h obd_cksum.h \
-            lu_ref.h cl_object.h lustre_acl.h lclient.h lu_target.h
+            lu_ref.h cl_object.h lustre_acl.h lclient.h lu_target.h \
+            lustre_update.h
index 575b9b8..07f9a4b 100644 (file)
@@ -703,6 +703,12 @@ static inline int dt_object_exists(const struct dt_object *dt)
         return lu_object_exists(&dt->do_lu);
 }
 
+static inline struct dt_object *lu2dt_obj(struct lu_object *o)
+{
+       LASSERT(ergo(o != NULL, lu_device_is_dt(o->lo_dev)));
+       return container_of0(o, struct dt_object, do_lu);
+}
+
 /**
  * This is the general purpose transaction handle.
  * 1. Transaction Life Cycle
@@ -718,24 +724,32 @@ static inline int dt_object_exists(const struct dt_object *dt)
  *      No RPC request should be issued inside transaction.
  */
 struct thandle {
-        /** the dt device on which the transactions are executed */
-        struct dt_device *th_dev;
+       /** the dt device on which the transactions are executed */
+       struct dt_device *th_dev;
+
+       /** context for this transaction, tag is LCT_TX_HANDLE */
+       struct lu_context th_ctx;
 
-        /** additional tags (layers can add in declare) */
-        __u32             th_tags;
+       /** additional tags (layers can add in declare) */
+       __u32             th_tags;
 
-        /** context for this transaction, tag is LCT_TX_HANDLE */
-        struct lu_context th_ctx;
+       /** the last operation result in this transaction.
+        * this value is used in recovery */
+       __s32             th_result;
 
-        /** the last operation result in this transaction.
-         * this value is used in recovery */
-        __s32             th_result;
+       /** whether we need sync commit */
+       int               th_sync:1;
 
-        /** whether we need sync commit */
-        int               th_sync:1;
+       /* local transation, no need to inform other layers */
+       int               th_local:1;
 
-        /* local transation, no need to inform other layers */
-        int               th_local:1;
+       /* In DNE, one transaction can be disassemblied into
+        * updates on several different MDTs, and these updates
+        * will be attached to th_remote_update_list per target.
+        * Only single thread will access the list, no need lock
+        */
+       cfs_list_t              th_remote_update_list;
+       struct update_request   *th_current_request;
 };
 
 /**
index 2904728..390a13c 100644 (file)
@@ -289,7 +289,20 @@ static inline int opcode_offset(__u32 opc) {
                         OPC_RANGE(LDLM) +
                         OPC_RANGE(MDS) +
                         OPC_RANGE(OST));
-        } else {
+       } else if (opc < UPDATE_LAST_OPC) {
+               /* update opcode */
+               return (opc - UPDATE_FIRST_OPC +
+                       OPC_RANGE(FLD) +
+                       OPC_RANGE(SEC) +
+                       OPC_RANGE(SEQ) +
+                       OPC_RANGE(QUOTA) +
+                       OPC_RANGE(LLOG) +
+                       OPC_RANGE(OBD) +
+                       OPC_RANGE(MGS) +
+                       OPC_RANGE(LDLM) +
+                       OPC_RANGE(MDS) +
+                       OPC_RANGE(OST));
+       } else {
                 /* Unknown Opcode */
                 return -1;
         }
@@ -305,7 +318,8 @@ static inline int opcode_offset(__u32 opc) {
                             OPC_RANGE(SEC)  + \
                             OPC_RANGE(SEQ)  + \
                             OPC_RANGE(SEC)  + \
-                            OPC_RANGE(FLD)  )
+                           OPC_RANGE(FLD)  + \
+                           OPC_RANGE(UPDATE))
 
 #define EXTRA_MAX_OPCODES ((PTLRPC_LAST_CNTR - PTLRPC_FIRST_CNTR)  + \
                             OPC_RANGE(EXTRA))
index 82b136a..be66fda 100644 (file)
@@ -866,7 +866,7 @@ static inline int lu_object_assert_not_exists(const struct lu_object *o)
  */
 static inline __u32 lu_object_attr(const struct lu_object *o)
 {
-        LASSERT(lu_object_exists(o) > 0);
+       LASSERT(lu_object_exists(o) != 0);
         return o->lo_header->loh_attr;
 }
 
index 9b3a848..7f3f71a 100644 (file)
@@ -1869,6 +1869,15 @@ typedef enum {
 
 #define MDS_FIRST_OPC    MDS_GETATTR
 
+
+/* opcodes for object update */
+typedef enum {
+       UPDATE_OBJ      = 1000,
+       UPDATE_LAST_OPC
+} update_cmd_t;
+
+#define UPDATE_FIRST_OPC    UPDATE_OBJ
+
 /*
  * Do not exceed 63
  */
@@ -2997,7 +3006,8 @@ struct obdo {
         obd_flag                o_flags;
         obd_count               o_nlink;        /* brw: checksum */
         obd_count               o_parent_oid;
-        obd_count               o_misc;         /* brw: o_dropped */
+       obd_count               o_misc;         /* brw: o_dropped */
+
         __u64                   o_ioepoch;      /* epoch in ost writes */
         __u32                   o_stripe_idx;   /* holds stripe idx */
         __u32                   o_parent_ver;
@@ -3005,16 +3015,16 @@ struct obdo {
                                                  * locks */
         struct llog_cookie      o_lcookie;      /* destroy: unlink cookie from
                                                  * MDS */
-        __u32                   o_uid_h;
-        __u32                   o_gid_h;
+       __u32                   o_uid_h;
+       __u32                   o_gid_h;
 
-        __u64                   o_data_version; /* getattr: sum of iversion for
-                                                 * each stripe.
-                                                 * brw: grant space consumed on
-                                                 * the client for the write */
-        __u64                   o_padding_4;
-        __u64                   o_padding_5;
-        __u64                   o_padding_6;
+       __u64                   o_data_version; /* getattr: sum of iversion for
+                                                * each stripe.
+                                                * brw: grant space consumed on
+                                                * the client for the write */
+       __u64                   o_padding_4;
+       __u64                   o_padding_5;
+       __u64                   o_padding_6;
 };
 
 #define o_id     o_oi.oi_id
@@ -3339,5 +3349,92 @@ extern void lustre_swab_hsm_progress_kernel(struct hsm_progress_kernel *hpk);
 extern void lustre_swab_hsm_user_state(struct hsm_user_state *hus);
 extern void lustre_swab_hsm_user_item(struct hsm_user_item *hui);
 extern void lustre_swab_hsm_request(struct hsm_request *hr);
+
+/**
+ * These are object update opcode under UPDATE_OBJ, which is currently
+ * being used by cross-ref operations between MDT.
+ *
+ * During the cross-ref operation, the Master MDT, which the client send the
+ * request to, will disassembly the operation into object updates, then OSP
+ * will send these updates to the remote MDT to be executed.
+ *
+ *   Update request format
+ *   magic:  UPDATE_BUFFER_MAGIC_V1
+ *   Count:  How many updates in the req.
+ *   bufs[0] : following are packets of object.
+ *   update[0]:
+ *             type: object_update_op, the op code of update
+ *             fid: The object fid of the update.
+ *             lens/bufs: other parameters of the update.
+ *   update[1]:
+ *             type: object_update_op, the op code of update
+ *             fid: The object fid of the update.
+ *             lens/bufs: other parameters of the update.
+ *   ..........
+ *   update[7]:        type: object_update_op, the op code of update
+ *             fid: The object fid of the update.
+ *             lens/bufs: other parameters of the update.
+ *   Current 8 maxim updates per object update request.
+ *
+ *******************************************************************
+ *   update reply format:
+ *
+ *   ur_version: UPDATE_REPLY_V1
+ *   ur_count:   The count of the reply, which is usually equal
+ *              to the number of updates in the request.
+ *   ur_lens:    The reply lengths of each object update.
+ *
+ *   replies:    1st update reply  [4bytes_ret: other body]
+ *              2nd update reply  [4bytes_ret: other body]
+ *              .....
+ *              nth update reply  [4bytes_ret: other body]
+ *
+ *   For each reply of the update, the format would be
+ *      result(4 bytes):Other stuff
+ */
+
+#define UPDATE_MAX_OPS         10
+#define UPDATE_BUFFER_MAGIC_V1 0xBDDE0001
+#define UPDATE_BUFFER_MAGIC    UPDATE_BUFFER_MAGIC_V1
+#define UPDATE_BUF_COUNT       8
+enum object_update_op {
+       OBJ_CREATE              = 1,
+       OBJ_DESTROY             = 2,
+       OBJ_REF_ADD             = 3,
+       OBJ_REF_DEL             = 4,
+       OBJ_ATTR_SET            = 5,
+       OBJ_ATTR_GET            = 6,
+       OBJ_XATTR_SET           = 7,
+       OBJ_XATTR_GET           = 8,
+       OBJ_INDEX_LOOKUP        = 9,
+       OBJ_INDEX_INSERT        = 10,
+       OBJ_INDEX_DELETE        = 11,
+       OBJ_LAST
+};
+
+struct update {
+       __u32           u_type;
+       __u32           u_padding;
+       struct lu_fid   u_fid;
+       __u32           u_lens[UPDATE_BUF_COUNT];
+       __u32           u_bufs[0];
+};
+
+struct update_buf {
+       __u32   ub_magic;
+       __u32   ub_count;
+       __u32   ub_bufs[0];
+};
+
+#define UPDATE_REPLY_V1                0x00BD0001
+struct update_reply {
+       __u32   ur_version;
+       __u32   ur_count;
+       __u32   ur_lens[0];
+};
+
+void lustre_swab_update_buf(struct update_buf *ub);
+void lustre_swab_update_reply_buf(struct update_reply *ur);
+
 #endif
 /** @} lustreidl */
index f2d65cc..93bb4d1 100644 (file)
@@ -157,6 +157,8 @@ extern struct req_format RQF_MDS_GETSTATUS;
 extern struct req_format RQF_MDS_SYNC;
 extern struct req_format RQF_MDS_GETXATTR;
 extern struct req_format RQF_MDS_GETATTR;
+extern struct req_format RQF_UPDATE_OBJ;
+
 /*
  * This is format of direct (non-intent) MDS_GETATTR_NAME request.
  */
@@ -321,6 +323,9 @@ extern struct req_msg_field RMF_MGS_CONFIG_RES;
 /* generic uint32 */
 extern struct req_msg_field RMF_U32;
 
+/* OBJ update format */
+extern struct req_msg_field RMF_UPDATE;
+extern struct req_msg_field RMF_UPDATE_REPLY;
 /** @} req_layout */
 
 #endif /* _LUSTRE_REQ_LAYOUT_H__ */
diff --git a/lustre/include/lustre_update.h b/lustre/include/lustre_update.h
new file mode 100644 (file)
index 0000000..d85b8a1
--- /dev/null
@@ -0,0 +1,190 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.htm
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2013, Intel Corporation.
+ */
+/*
+ * lustre/include/lustre_update.h
+ *
+ * Author: Di Wang <di.wang@intel.com>
+ */
+
+#ifndef _LUSTRE_UPDATE_H
+#define _LUSTRE_UPDATE_H
+
+#define UPDATE_BUFFER_SIZE     4096
+struct update_request {
+       struct dt_device        *ur_dt;
+       cfs_list_t              ur_list;    /* attached itself to thandle */
+       int                     ur_flags;
+       int                     ur_rc;
+       struct update_buf       *ur_buf;   /* Holding the update req */
+};
+
+static inline unsigned long update_size(struct update *update)
+{
+       unsigned long size;
+       int        i;
+
+       size = cfs_size_round(offsetof(struct update, u_bufs[0]));
+       for (i = 0; i < UPDATE_BUF_COUNT; i++)
+               size += cfs_size_round(update->u_lens[i]);
+
+       return size;
+}
+
+static inline void *update_param_buf(struct update *update, int index,
+                                    int *size)
+{
+       int     i;
+       void    *ptr;
+
+       if (index >= UPDATE_BUF_COUNT)
+               return NULL;
+
+       ptr = (char *)update + cfs_size_round(offsetof(struct update,
+                                                      u_bufs[0]));
+       for (i = 0; i < index; i++) {
+               LASSERT(update->u_lens[i] > 0);
+               ptr += cfs_size_round(update->u_lens[i]);
+       }
+
+       if (size != NULL)
+               *size = update->u_lens[index];
+
+       return ptr;
+}
+
+static inline unsigned long update_buf_size(struct update_buf *buf)
+{
+       unsigned long size;
+       int        i = 0;
+
+       size = cfs_size_round(offsetof(struct update_buf, ub_bufs[0]));
+       for (i = 0; i < buf->ub_count; i++) {
+               struct update *update;
+
+               update = (struct update *)((char *)buf + size);
+               size += update_size(update);
+       }
+       LASSERT(size <= UPDATE_BUFFER_SIZE);
+       return size;
+}
+
+static inline void *update_buf_get(struct update_buf *buf, int index, int *size)
+{
+       int     count = buf->ub_count;
+       void    *ptr;
+       int     i = 0;
+
+       if (index >= count)
+               return NULL;
+
+       ptr = (char *)buf + cfs_size_round(offsetof(struct update_buf,
+                                                   ub_bufs[0]));
+       for (i = 0; i < index; i++)
+               ptr += update_size((struct update *)ptr);
+
+       if (size != NULL)
+               *size = update_size((struct update *)ptr);
+
+       return ptr;
+}
+
+static inline void update_init_reply_buf(struct update_reply *reply, int count)
+{
+       reply->ur_version = UPDATE_REPLY_V1;
+       reply->ur_count = count;
+}
+
+static inline void *update_get_buf_internal(struct update_reply *reply,
+                                           int index, int *size)
+{
+       char *ptr;
+       int count = reply->ur_count;
+       int i;
+
+       if (index >= count)
+               return NULL;
+
+       ptr = (char *)reply + cfs_size_round(offsetof(struct update_reply,
+                                            ur_lens[count]));
+       for (i = 0; i < index; i++) {
+               LASSERT(reply->ur_lens[i] > 0);
+               ptr += cfs_size_round(reply->ur_lens[i]);
+       }
+
+       if (size != NULL)
+               *size = reply->ur_lens[index];
+
+       return ptr;
+}
+
+static inline void update_insert_reply(struct update_reply *reply, void *data,
+                                      int data_len, int index, int rc)
+{
+       char *ptr;
+
+       ptr = update_get_buf_internal(reply, index, NULL);
+       LASSERT(ptr != NULL);
+
+       *(int *)ptr = cpu_to_le32(rc);
+       ptr += sizeof(int);
+       if (data_len > 0) {
+               LASSERT(data != NULL);
+               memcpy(ptr, data, data_len);
+       }
+       reply->ur_lens[index] = data_len + sizeof(int);
+}
+
+static inline int update_get_reply_buf(struct update_reply *reply, void **buf,
+                                      int index)
+{
+       char *ptr;
+       int  size = 0;
+       int  result;
+
+       ptr = update_get_buf_internal(reply, index, &size);
+       result = *(int *)ptr;
+
+       if (result < 0)
+               return result;
+
+       LASSERT((ptr != NULL && size >= sizeof(int)));
+       *buf = ptr + sizeof(int);
+       return size - sizeof(int);
+}
+
+static inline int update_get_reply_result(struct update_reply *reply,
+                                         void **buf, int index)
+{
+       void *ptr;
+       int  size;
+
+       ptr = update_get_buf_internal(reply, index, &size);
+       LASSERT(ptr != NULL && size > sizeof(int));
+       return *(int *)ptr;
+}
+
+#endif
+
+
index dbc8038..e754106 100644 (file)
@@ -338,6 +338,9 @@ void md_from_obdo(struct md_op_data *op_data, struct obdo *oa, obd_flag valid);
 void obdo_from_md(struct obdo *oa, struct md_op_data *op_data,
                   unsigned int valid);
 
+void obdo_cpu_to_le(struct obdo *dobdo, struct obdo *sobdo);
+void obdo_le_to_cpu(struct obdo *dobdo, struct obdo *sobdo);
+
 #define OBT(dev)        (dev)->obd_type
 #define OBP(dev, op)    (dev)->obd_type->typ_dt_ops->o_ ## op
 #define MDP(dev, op)    (dev)->obd_type->typ_md_ops->m_ ## op
index 76838f0..3a91473 100644 (file)
@@ -255,6 +255,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_MDS_RECOVERY_ACCEPTS_GAPS 0x185
 #define OBD_FAIL_MDS_GET_INFO_NET        0x186
 #define OBD_FAIL_MDS_DQACQ_NET           0x187
+#define OBD_FAIL_MDS_OBJ_UPDATE_NET      0x188
 
 /* OI scrub */
 #define OBD_FAIL_OSD_SCRUB_DELAY        0x190
index 5ea6737..4a672dc 100644 (file)
@@ -46,6 +46,7 @@
 #include <obd_class.h>
 #include <lustre_fid.h>
 #include <lustre_param.h>
+#include <lustre_update.h>
 
 #include "lod_internal.h"
 
@@ -507,19 +508,77 @@ static int lod_statfs(const struct lu_env *env,
 static struct thandle *lod_trans_create(const struct lu_env *env,
                                        struct dt_device *dev)
 {
-       return dt_trans_create(env, dt2lod_dev(dev)->lod_child);
+       struct thandle *th;
+
+       th = dt_trans_create(env, dt2lod_dev(dev)->lod_child);
+       if (IS_ERR(th))
+               return th;
+
+       CFS_INIT_LIST_HEAD(&th->th_remote_update_list);
+       return th;
+}
+
+static int lod_remote_sync(const struct lu_env *env, struct dt_device *dev,
+                          struct thandle *th)
+{
+       struct update_request *update;
+       int    rc = 0;
+       ENTRY;
+
+       if (cfs_list_empty(&th->th_remote_update_list))
+               RETURN(0);
+
+       cfs_list_for_each_entry(update, &th->th_remote_update_list,
+                               ur_list) {
+               /* In DNE phase I, there should be only one OSP
+                * here, so we will do send/receive one by one,
+                * instead of sending them parallel, will fix this
+                * in Phase II */
+               th->th_current_request = update;
+               rc = dt_trans_start(env, update->ur_dt, th);
+               if (rc != 0) {
+                       /* FIXME how to revert the partial results
+                        * once error happened? Resolved by 2 Phase commit */
+                       update->ur_rc = rc;
+                       break;
+               }
+       }
+
+       RETURN(rc);
 }
 
 static int lod_trans_start(const struct lu_env *env, struct dt_device *dev,
                           struct thandle *th)
 {
-       return dt_trans_start(env, dt2lod_dev(dev)->lod_child, th);
+       struct lod_device *lod = dt2lod_dev((struct dt_device *) dev);
+       int rc;
+
+       rc = lod_remote_sync(env, dev, th);
+       if (rc)
+               return rc;
+
+       return dt_trans_start(env, lod->lod_child, th);
 }
 
 static int lod_trans_stop(const struct lu_env *env, struct thandle *th)
 {
-       /* XXX: we don't know next device, will be fixed with DNE */
-       return dt_trans_stop(env, th->th_dev, th);
+       struct update_request *update;
+       struct update_request *tmp;
+       int rc = 0;
+       int rc2 = 0;
+
+       cfs_list_for_each_entry_safe(update, tmp,
+                                    &th->th_remote_update_list,
+                                    ur_list) {
+               th->th_current_request = update;
+               rc2 = dt_trans_stop(env, update->ur_dt, th);
+               if (unlikely(rc2 != 0 && rc == 0))
+                       rc = rc2;
+       }
+
+       rc2 = dt_trans_stop(env, th->th_dev, th);
+
+       return rc2 != 0 ? rc2 : rc;
 }
 
 static void lod_conf_get(const struct lu_env *env,
index 719887f..a83479d 100644 (file)
@@ -240,12 +240,6 @@ static inline struct dt_object* lod_object_child(struct lod_object *o)
                        struct dt_object, do_lu);
 }
 
-static inline struct dt_object *lu2dt_obj(struct lu_object *o)
-{
-       LASSERT(ergo(o != NULL, lu_device_is_dt(o->lo_dev)));
-       return container_of0(o, struct dt_object, do_lu);
-}
-
 static inline struct dt_object *dt_object_child(struct dt_object *o)
 {
        return container_of0(lu_object_next(&(o)->do_lu),
index 7344c49..f15e37d 100644 (file)
@@ -1397,7 +1397,16 @@ int mdd_declare_object_initialize(const struct lu_env *env,
 {
         int rc;
 
+       /*
+        * inode mode has been set in creation time, and it's based on umask,
+        * la_mode and acl, don't set here again! (which will go wrong
+        * because below function doesn't consider umask).
+        * I'd suggest set all object attributes in creation time, see above.
+        */
+       LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
+       attr->la_valid &= ~(LA_MODE | LA_TYPE);
        rc = mdo_declare_attr_set(env, child, attr, handle);
+       attr->la_valid |= LA_MODE | LA_TYPE;
        if (rc == 0 && S_ISDIR(attr->la_mode)) {
                rc = mdo_declare_index_insert(env, child, mdo2fid(child),
                                              dot, handle);
@@ -1433,11 +1442,11 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
         * because below function doesn't consider umask).
         * I'd suggest set all object attributes in creation time, see above.
         */
-       LASSERT(attr->la_valid & LA_MODE);
-       attr->la_valid &= ~LA_MODE;
+       LASSERT(attr->la_valid & (LA_MODE | LA_TYPE));
+       attr->la_valid &= ~(LA_MODE | LA_TYPE);
        rc = mdd_attr_set_internal(env, child, attr, handle, 0);
        /* arguments are supposed to stay the same */
-       attr->la_valid |= LA_MODE;
+       attr->la_valid |= LA_MODE | LA_TYPE;
        if (rc != 0)
                RETURN(rc);
 
index 05fc80e..85fa8f5 100644 (file)
@@ -954,8 +954,8 @@ static int mdt_create_unpack(struct mdt_thread_info *info)
         attr->la_ctime = rec->cr_time;
         attr->la_mtime = rec->cr_time;
         attr->la_atime = rec->cr_time;
-        attr->la_valid = LA_MODE | LA_RDEV | LA_UID | LA_GID |
-                         LA_CTIME | LA_MTIME | LA_ATIME;
+       attr->la_valid = LA_MODE | LA_RDEV | LA_UID | LA_GID | LA_TYPE |
+                        LA_CTIME | LA_MTIME | LA_ATIME;
         memset(&sp->u, 0, sizeof(sp->u));
         sp->sp_cr_flags = get_mrc_cr_flags(rec);
 
index d958752..dded526 100644 (file)
@@ -331,3 +331,38 @@ void obdo_from_md(struct obdo *oa, struct md_op_data *op_data,
         }
 }
 EXPORT_SYMBOL(obdo_from_md);
+
+void obdo_cpu_to_le(struct obdo *dobdo, struct obdo *sobdo)
+{
+       dobdo->o_size = cpu_to_le64(sobdo->o_size);
+       dobdo->o_mtime = cpu_to_le64(sobdo->o_mtime);
+       dobdo->o_atime = cpu_to_le64(sobdo->o_atime);
+       dobdo->o_ctime = cpu_to_le64(sobdo->o_ctime);
+       dobdo->o_blocks = cpu_to_le64(sobdo->o_blocks);
+       dobdo->o_mode = cpu_to_le32(sobdo->o_mode);
+       dobdo->o_uid = cpu_to_le32(sobdo->o_uid);
+       dobdo->o_gid = cpu_to_le32(sobdo->o_gid);
+       dobdo->o_flags = cpu_to_le32(sobdo->o_flags);
+       dobdo->o_nlink = cpu_to_le32(sobdo->o_nlink);
+       dobdo->o_blksize = cpu_to_le32(sobdo->o_blksize);
+       dobdo->o_valid = cpu_to_le64(sobdo->o_valid);
+}
+EXPORT_SYMBOL(obdo_cpu_to_le);
+
+void obdo_le_to_cpu(struct obdo *dobdo, struct obdo *sobdo)
+{
+       dobdo->o_size = le64_to_cpu(sobdo->o_size);
+       dobdo->o_mtime = le64_to_cpu(sobdo->o_mtime);
+       dobdo->o_atime = le64_to_cpu(sobdo->o_atime);
+       dobdo->o_ctime = le64_to_cpu(sobdo->o_ctime);
+       dobdo->o_blocks = le64_to_cpu(sobdo->o_blocks);
+       dobdo->o_mode = le32_to_cpu(sobdo->o_mode);
+       dobdo->o_uid = le32_to_cpu(sobdo->o_uid);
+       dobdo->o_gid = le32_to_cpu(sobdo->o_gid);
+       dobdo->o_flags = le32_to_cpu(sobdo->o_flags);
+       dobdo->o_nlink = le32_to_cpu(sobdo->o_nlink);
+       dobdo->o_blksize = le32_to_cpu(sobdo->o_blksize);
+       dobdo->o_valid = le64_to_cpu(sobdo->o_valid);
+}
+EXPORT_SYMBOL(obdo_le_to_cpu);
+
index 4176d21..4c27a13 100644 (file)
@@ -1,6 +1,6 @@
 MODULES = osp
 osp-objs = osp_dev.o osp_object.o osp_precreate.o osp_sync.o lproc_osp.o
-osp-objs += lwp_dev.o
+osp-objs += lwp_dev.o  osp_md_object.o
 
 EXTRA_DIST = $(osp-objs:.o=.c) osp_internal.h
 
index f0206dc..aaf5073 100644 (file)
@@ -488,6 +488,8 @@ static int osp_sync(const struct lu_env *env, struct dt_device *dev)
 const struct dt_device_operations osp_dt_ops = {
        .dt_statfs      = osp_statfs,
        .dt_sync        = osp_sync,
+       .dt_trans_start = osp_trans_start,
+       .dt_trans_stop  = osp_trans_stop,
 };
 
 static int osp_connect_to_osd(const struct lu_env *env, struct osp_device *m,
index 81e3d60..75821a5 100644 (file)
@@ -180,14 +180,21 @@ extern cfs_mem_cache_t *osp_object_kmem;
 
 /* this is a top object */
 struct osp_object {
-       struct lu_object_header  opo_header;
-       struct dt_object         opo_obj;
-       int                      opo_reserved:1,
-                                opo_new:1;
+       struct lu_object_header opo_header;
+       struct dt_object        opo_obj;
+       int                     opo_reserved:1,
+                               opo_new:1,
+                               opo_empty:1;
+
+       /* read/write lock for md osp object */
+       struct rw_semaphore     opo_sem;
+       const struct lu_env     *opo_owner;
 };
 
 extern struct lu_object_operations osp_lu_obj_ops;
 extern const struct dt_device_operations osp_dt_ops;
+extern struct dt_object_operations osp_md_obj_ops;
+extern struct dt_lock_operations osp_md_lock_ops;
 
 struct osp_thread_info {
        struct lu_buf            osi_lb;
@@ -207,6 +214,8 @@ struct osp_thread_info {
        struct llog_cookie       osi_cookie;
        struct llog_catid        osi_cid;
        struct lu_seq_range      osi_seq;
+       struct ldlm_res_id       osi_resid;
+       struct obdo              osi_obdo;
 };
 
 static inline void osp_objid_buf_prep(struct lu_buf *buf, loff_t *off,
@@ -378,6 +387,10 @@ static inline int osp_is_fid_client(struct osp_device *osp)
 void osp_update_last_id(struct osp_device *d, obd_id objid);
 extern struct llog_operations osp_mds_ost_orig_logops;
 
+/* osp_md_object.c */
+int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
+                   struct thandle *th);
+int osp_trans_stop(const struct lu_env *env, struct thandle *th);
 /* osp_precreate.c */
 int osp_init_precreate(struct osp_device *d);
 int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d);
diff --git a/lustre/osp/osp_md_object.c b/lustre/osp/osp_md_object.c
new file mode 100644 (file)
index 0000000..9df10ed
--- /dev/null
@@ -0,0 +1,1071 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2013, Intel Corporation.
+ */
+/*
+ * lustre/osp/osp_md_object.c
+ *
+ * Lustre MDT Proxy Device
+ *
+ * Author: Di Wang <di.wang@intel.com>
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include <lustre_log.h>
+#include <lustre_update.h>
+#include "osp_internal.h"
+static const char dot[] = ".";
+static const char dotdot[] = "..";
+
+static int osp_prep_update_req(const struct lu_env *env,
+                              struct osp_device *osp,
+                              struct update_buf *ubuf, int ubuf_len,
+                              struct ptlrpc_request **reqp)
+{
+       struct obd_import      *imp;
+       struct ptlrpc_request  *req;
+       struct update_buf      *tmp;
+       int                     rc;
+       ENTRY;
+
+       imp = osp->opd_obd->u.cli.cl_import;
+       LASSERT(imp);
+
+       req = ptlrpc_request_alloc(imp, &RQF_UPDATE_OBJ);
+       if (req == NULL)
+               RETURN(-ENOMEM);
+
+       req_capsule_set_size(&req->rq_pill, &RMF_UPDATE, RCL_CLIENT,
+                            UPDATE_BUFFER_SIZE);
+
+       rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, UPDATE_OBJ);
+       if (rc != 0) {
+               ptlrpc_req_finished(req);
+               RETURN(rc);
+       }
+
+       req_capsule_set_size(&req->rq_pill, &RMF_UPDATE_REPLY, RCL_SERVER,
+                            UPDATE_BUFFER_SIZE);
+
+       tmp = req_capsule_client_get(&req->rq_pill, &RMF_UPDATE);
+       memcpy(tmp, ubuf, ubuf_len);
+
+       ptlrpc_request_set_replen(req);
+
+       *reqp = req;
+
+       RETURN(rc);
+}
+
+static int osp_remote_sync(const struct lu_env *env, struct dt_device *dt,
+                          struct update_request *update,
+                          struct ptlrpc_request **reqp)
+{
+       struct osp_device       *osp = dt2osp_dev(dt);
+       struct ptlrpc_request   *req = NULL;
+       int                     rc;
+       ENTRY;
+
+       rc = osp_prep_update_req(env, osp, update->ur_buf,
+                                UPDATE_BUFFER_SIZE, &req);
+       if (rc)
+               RETURN(rc);
+
+       /* Note: some dt index api might return non-zero result here, like
+        * osd_index_ea_lookup, so we should only check rc < 0 here */
+       rc = ptlrpc_queue_wait(req);
+       if (rc < 0) {
+               ptlrpc_req_finished(req);
+               update->ur_rc = rc;
+               RETURN(rc);
+       }
+
+       if (reqp != NULL) {
+               *reqp = req;
+               RETURN(rc);
+       }
+
+       update->ur_rc = rc;
+
+       ptlrpc_req_finished(req);
+
+       RETURN(rc);
+}
+
+/**
+ * Create a new update request for the device.
+ */
+static struct update_request
+*osp_create_update_req(struct dt_device *dt)
+{
+       struct update_request *update;
+
+       OBD_ALLOC_PTR(update);
+       if (!update)
+               return ERR_PTR(-ENOMEM);
+
+       OBD_ALLOC(update->ur_buf, UPDATE_BUFFER_SIZE);
+       if (update->ur_buf == NULL) {
+               OBD_FREE_PTR(update);
+               return ERR_PTR(-ENOMEM);
+       }
+
+       CFS_INIT_LIST_HEAD(&update->ur_list);
+       update->ur_dt = dt;
+
+       update->ur_buf->ub_magic = UPDATE_BUFFER_MAGIC;
+       update->ur_buf->ub_count = 0;
+
+       return update;
+}
+
+static void osp_destroy_update_req(struct update_request *update)
+{
+       if (update == NULL)
+               return;
+
+       cfs_list_del(&update->ur_list);
+       if (update->ur_buf != NULL)
+               OBD_FREE(update->ur_buf, UPDATE_BUFFER_SIZE);
+
+       OBD_FREE_PTR(update);
+       return;
+}
+
+int osp_trans_stop(const struct lu_env *env, struct thandle *th)
+{
+       int rc = 0;
+
+       osp_destroy_update_req(th->th_current_request);
+       rc = th->th_current_request->ur_rc;
+       th->th_current_request = NULL;
+
+       return rc;
+}
+
+/**
+ * In DNE phase I, all remote updates will be packed into RPC (the format
+ * description is in lustre_idl.h) during declare phase, all of updates
+ * are attached to the transaction, one entry per OSP. Then in trans start,
+ * LOD will walk through these entries and send these UPDATEs to the remote
+ * MDT to be executed synchronously.
+ */
+int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
+                   struct thandle *th)
+{
+       struct update_request *update;
+       int rc = 0;
+
+       /* In phase I, if the transaction includes remote updates, the local
+        * update should be synchronized, so it will set th_sync = 1 */
+       update = th->th_current_request;
+       LASSERT(update != NULL && update->ur_dt == dt);
+       if (update->ur_buf->ub_count > 0) {
+               rc = osp_remote_sync(env, dt, update, NULL);
+               th->th_sync = 1;
+       }
+
+       RETURN(rc);
+}
+
+/**
+ * Insert the update into the th_bufs for the device.
+ */
+static int osp_insert_update(const struct lu_env *env,
+                            struct update_request *update, int op,
+                            struct lu_fid *fid, int count, int *lens,
+                            char **bufs)
+{
+       struct update_buf    *ubuf = update->ur_buf;
+       struct update        *obj_update;
+       char                 *ptr;
+       int                   i;
+       int                   update_length;
+       int                   rc = 0;
+       ENTRY;
+
+       obj_update = (struct update *)((char *)ubuf +
+                     cfs_size_round(update_buf_size(ubuf)));
+
+       /* Check update size to make sure it can fit into the buffer */
+       update_length = cfs_size_round(offsetof(struct update,
+                                      u_bufs[0]));
+       for (i = 0; i < count; i++)
+               update_length += cfs_size_round(lens[i]);
+
+       if (cfs_size_round(update_buf_size(ubuf)) + update_length >
+           UPDATE_BUFFER_SIZE || ubuf->ub_count >= UPDATE_MAX_OPS) {
+               CERROR("%s: insert up %p, idx %d cnt %d len %lu: rc = %d\n",
+                       update->ur_dt->dd_lu_dev.ld_obd->obd_name, ubuf,
+                       update_length, ubuf->ub_count, update_buf_size(ubuf),
+                       -E2BIG);
+               RETURN(-E2BIG);
+       }
+
+       if (count > UPDATE_BUF_COUNT) {
+               CERROR("%s: Insert too much params %d "DFID" op %d: rc = %d\n",
+                       update->ur_dt->dd_lu_dev.ld_obd->obd_name, count,
+                       PFID(fid), op, -E2BIG);
+               RETURN(-E2BIG);
+       }
+
+       /* fill the update into the update buffer */
+       fid_cpu_to_le(&obj_update->u_fid, fid);
+       obj_update->u_type = cpu_to_le32(op);
+       for (i = 0; i < count; i++)
+               obj_update->u_lens[i] = cpu_to_le32(lens[i]);
+
+       ptr = (char *)obj_update +
+                       cfs_size_round(offsetof(struct update, u_bufs[0]));
+       for (i = 0; i < count; i++)
+               LOGL(bufs[i], lens[i], ptr);
+
+       ubuf->ub_count++;
+
+       CDEBUG(D_INFO, "%s: %p "DFID" idx %d: op %d params %d:%lu\n",
+              update->ur_dt->dd_lu_dev.ld_obd->obd_name, ubuf, PFID(fid),
+              ubuf->ub_count, op, count, update_buf_size(ubuf));
+
+       RETURN(rc);
+}
+
+static struct update_request
+*osp_find_update(struct thandle *th, struct dt_device *dt_dev)
+{
+       struct update_request   *update;
+
+       /* Because transaction api does not proivde the interface
+        * to transfer the update from LOD to OSP,  we need walk
+        * remote update list to find the update, this probably
+        * should move to LOD layer, when update can be part of
+        * the trancation api parameter. XXX */
+       cfs_list_for_each_entry(update, &th->th_remote_update_list, ur_list) {
+               if (update->ur_dt == dt_dev)
+                       return update;
+       }
+       return NULL;
+}
+
+/**
+ * Find one loc in th_dev/dev_obj_update for the update,
+ * Because only one thread can access this thandle, no need
+ * lock now.
+ */
+static struct update_request
+*osp_find_create_update_loc(struct thandle *th, struct dt_object *dt)
+{
+       struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
+       struct update_request   *update;
+       ENTRY;
+
+       update = osp_find_update(th, dt_dev);
+       if (update != NULL)
+               RETURN(update);
+
+       update = osp_create_update_req(dt_dev);
+       if (IS_ERR(update))
+               RETURN(update);
+
+       cfs_list_add_tail(&update->ur_list, &th->th_remote_update_list);
+
+       RETURN(update);
+}
+
+static int osp_get_attr_from_req(const struct lu_env *env,
+                                struct ptlrpc_request *req,
+                                struct lu_attr *attr, int index)
+{
+       struct update_reply     *reply;
+       struct obdo             *lobdo = &osp_env_info(env)->osi_obdo;
+       struct obdo             *wobdo;
+       int                     size;
+
+       LASSERT(attr != NULL);
+
+       reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
+                                            UPDATE_BUFFER_SIZE);
+       if (reply->ur_version != UPDATE_REPLY_V1)
+               return -EPROTO;
+
+       size = update_get_reply_buf(reply, (void **)&wobdo, index);
+       if (size != sizeof(struct obdo))
+               return -EPROTO;
+
+       obdo_le_to_cpu(wobdo, wobdo);
+       lustre_get_wire_obdo(lobdo, wobdo);
+       la_from_obdo(attr, lobdo, lobdo->o_valid);
+
+       return 0;
+}
+
+static int osp_md_declare_object_create(const struct lu_env *env,
+                                       struct dt_object *dt,
+                                       struct lu_attr *attr,
+                                       struct dt_allocation_hint *hint,
+                                       struct dt_object_format *dof,
+                                       struct thandle *th)
+{
+       struct osp_thread_info  *osi = osp_env_info(env);
+       struct update_request   *update;
+       struct lu_fid           *fid1;
+       int                     sizes[2] = {sizeof(struct obdo), 0};
+       char                    *bufs[2] = {NULL, NULL};
+       int                     buf_count;
+       int                     rc;
+
+       update = osp_find_create_update_loc(th, dt);
+       if (IS_ERR(update)) {
+               CERROR("%s: Get OSP update buf failed: rc = %d\n",
+                      dt->do_lu.lo_dev->ld_obd->obd_name,
+                      (int)PTR_ERR(update));
+               return PTR_ERR(update);
+       }
+
+       osi->osi_obdo.o_valid = 0;
+       LASSERT(S_ISDIR(attr->la_mode));
+       obdo_from_la(&osi->osi_obdo, attr, attr->la_valid);
+       lustre_set_wire_obdo(&osi->osi_obdo, &osi->osi_obdo);
+       obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
+
+       bufs[0] = (char *)&osi->osi_obdo;
+       buf_count = 1;
+       fid1 = (struct lu_fid *)lu_object_fid(&dt->do_lu);
+       if (hint->dah_parent) {
+               struct lu_fid *fid2;
+               struct lu_fid *tmp_fid = &osi->osi_fid;
+
+               fid2 = (struct lu_fid *)lu_object_fid(&hint->dah_parent->do_lu);
+               fid_cpu_to_le(tmp_fid, fid2);
+               sizes[1] = sizeof(*tmp_fid);
+               bufs[1] = (char *)tmp_fid;
+               buf_count++;
+       }
+
+       rc = osp_insert_update(env, update, OBJ_CREATE, fid1, buf_count, sizes,
+                              bufs);
+       if (rc) {
+               CERROR("%s: Insert update error: rc = %d\n",
+                      dt->do_lu.lo_dev->ld_obd->obd_name, rc);
+       }
+
+       return rc;
+}
+
+static int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
+                               struct lu_attr *attr,
+                               struct dt_allocation_hint *hint,
+                               struct dt_object_format *dof,
+                               struct thandle *th)
+{
+       struct osp_object  *obj = dt2osp_obj(dt);
+
+       CDEBUG(D_INFO, "create object "DFID"\n",
+              PFID(&dt->do_lu.lo_header->loh_fid));
+
+       /* Because the create update RPC will be sent during declare phase,
+        * if creation reaches here, it means the object has been created
+        * successfully */
+       dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
+       obj->opo_empty = 1;
+
+       return 0;
+}
+
+static int osp_md_declare_object_ref_del(const struct lu_env *env,
+                                        struct dt_object *dt,
+                                        struct thandle *th)
+{
+       struct update_request   *update;
+       struct lu_fid           *fid;
+       int                     rc;
+
+       update = osp_find_create_update_loc(th, dt);
+       if (IS_ERR(update)) {
+               CERROR("%s: Get OSP update buf failed: rc = %d\n",
+                      dt->do_lu.lo_dev->ld_obd->obd_name,
+                     (int)PTR_ERR(update));
+               return PTR_ERR(update);
+       }
+
+       fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
+
+       rc = osp_insert_update(env, update, OBJ_REF_DEL, fid, 0, NULL, NULL);
+
+       return rc;
+}
+
+static int osp_md_object_ref_del(const struct lu_env *env,
+                                struct dt_object *dt,
+                                struct thandle *th)
+{
+       CDEBUG(D_INFO, "ref del object "DFID"\n",
+              PFID(&dt->do_lu.lo_header->loh_fid));
+
+       return 0;
+}
+
+static int osp_md_declare_ref_add(const struct lu_env *env,
+                                 struct dt_object *dt, struct thandle *th)
+{
+       struct update_request   *update;
+       struct lu_fid           *fid;
+       int                     rc;
+
+       update = osp_find_create_update_loc(th, dt);
+       if (IS_ERR(update)) {
+               CERROR("%s: Get OSP update buf failed: rc = %d\n",
+                      dt->do_lu.lo_dev->ld_obd->obd_name,
+                      (int)PTR_ERR(update));
+               return PTR_ERR(update);
+       }
+
+       fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
+
+       rc = osp_insert_update(env, update, OBJ_REF_ADD, fid, 0, NULL, NULL);
+
+       return rc;
+}
+
+static int osp_md_object_ref_add(const struct lu_env *env,
+                                struct dt_object *dt,
+                                struct thandle *th)
+{
+       CDEBUG(D_INFO, "ref add object "DFID"\n",
+              PFID(&dt->do_lu.lo_header->loh_fid));
+
+       return 0;
+}
+
+static void osp_md_ah_init(const struct lu_env *env,
+                          struct dt_allocation_hint *ah,
+                          struct dt_object *parent,
+                          struct dt_object *child,
+                          cfs_umode_t child_mode)
+{
+       LASSERT(ah);
+
+       memset(ah, 0, sizeof(*ah));
+       ah->dah_parent = parent;
+       ah->dah_mode = child_mode;
+}
+
+static int osp_md_declare_attr_set(const struct lu_env *env,
+                                  struct dt_object *dt,
+                                  const struct lu_attr *attr,
+                                  struct thandle *th)
+{
+       struct osp_thread_info *osi = osp_env_info(env);
+       struct update_request  *update;
+       struct lu_fid          *fid;
+       int                     size = sizeof(struct obdo);
+       char                   *buf;
+       int                     rc;
+
+       update = osp_find_create_update_loc(th, dt);
+       if (IS_ERR(update)) {
+               CERROR("%s: Get OSP update buf failed: %d\n",
+                      dt->do_lu.lo_dev->ld_obd->obd_name,
+                      (int)PTR_ERR(update));
+               return PTR_ERR(update);
+       }
+
+       osi->osi_obdo.o_valid = 0;
+       LASSERT(!(attr->la_valid & (LA_MODE | LA_TYPE)));
+       obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr,
+                    attr->la_valid);
+       lustre_set_wire_obdo(&osi->osi_obdo, &osi->osi_obdo);
+       obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
+
+       buf = (char *)&osi->osi_obdo;
+       fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
+
+       rc = osp_insert_update(env, update, OBJ_ATTR_SET, fid, 1, &size, &buf);
+
+       return rc;
+}
+
+static int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
+                          const struct lu_attr *attr, struct thandle *th,
+                          struct lustre_capa *capa)
+{
+       CDEBUG(D_INFO, "attr set object "DFID"\n",
+              PFID(&dt->do_lu.lo_header->loh_fid));
+
+       RETURN(0);
+}
+
+static int osp_md_declare_xattr_set(const struct lu_env *env,
+                                   struct dt_object *dt,
+                                   const struct lu_buf *buf,
+                                   const char *name, int flag,
+                                   struct thandle *th)
+{
+       struct update_request   *update;
+       struct lu_fid           *fid;
+       int                     sizes[3] = {strlen(name), buf->lb_len,
+                                           sizeof(int)};
+       char                    *bufs[3] = {(char *)name, (char *)buf->lb_buf };
+       int                     rc;
+
+       update = osp_find_create_update_loc(th, dt);
+       if (IS_ERR(update)) {
+               CERROR("%s: Get OSP update buf failed: rc = %d\n",
+                      dt->do_lu.lo_dev->ld_obd->obd_name,
+                      (int)PTR_ERR(update));
+               return PTR_ERR(update);
+       }
+
+       flag = cpu_to_le32(flag);
+       bufs[2] = (char *)&flag;
+
+       fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
+       rc = osp_insert_update(env, update, OBJ_XATTR_SET, fid,
+                              ARRAY_SIZE(sizes), sizes, bufs);
+
+       return rc;
+}
+
+static int osp_md_xattr_set(const struct lu_env *env, struct dt_object *dt,
+                           const struct lu_buf *buf, const char *name, int fl,
+                           struct thandle *th, struct lustre_capa *capa)
+{
+       CDEBUG(D_INFO, "xattr %s set object "DFID"\n", name,
+              PFID(&dt->do_lu.lo_header->loh_fid));
+
+       return 0;
+}
+
+static int osp_md_xattr_get(const struct lu_env *env, struct dt_object *dt,
+                           struct lu_buf *buf, const char *name,
+                           struct lustre_capa *capa)
+{
+       struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
+       struct update_request   *update = NULL;
+       struct ptlrpc_request   *req = NULL;
+       int                     rc;
+       int                     buf_len;
+       int                     size;
+       struct update_reply     *reply;
+       void                    *ea_buf;
+       ENTRY;
+
+       /* Because it needs send the update buffer right away,
+        * just create an update buffer, instead of attaching the
+        * update_remote list of the thandle.
+        */
+       update = osp_create_update_req(dt_dev);
+       if (IS_ERR(update))
+               RETURN(PTR_ERR(update));
+
+       LASSERT(name != NULL);
+       buf_len = strlen(name);
+       rc = osp_insert_update(env, update, OBJ_XATTR_GET,
+                              (struct lu_fid *)lu_object_fid(&dt->do_lu),
+                              1, &buf_len, (char **)&name);
+       if (rc != 0) {
+               CERROR("%s: Insert update error: rc = %d\n",
+                      dt->do_lu.lo_dev->ld_obd->obd_name, rc);
+               GOTO(out, rc);
+       }
+       dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
+
+       rc = osp_remote_sync(env, dt_dev, update, &req);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
+                                           UPDATE_BUFFER_SIZE);
+       if (reply->ur_version != UPDATE_REPLY_V1) {
+               CERROR("%s: Wrong version %x expected %x: rc = %d\n",
+                      dt_dev->dd_lu_dev.ld_obd->obd_name,
+                      reply->ur_version, UPDATE_REPLY_V1, -EPROTO);
+               GOTO(out, rc = -EPROTO);
+       }
+
+       size = update_get_reply_buf(reply, &ea_buf, 0);
+       if (size < 0)
+               GOTO(out, rc = size);
+
+       LASSERT(size > 0 && size < CFS_PAGE_SIZE);
+       LASSERT(ea_buf != NULL);
+
+       buf->lb_len = size;
+       memcpy(buf->lb_buf, ea_buf, size);
+out:
+       if (req != NULL)
+               ptlrpc_req_finished(req);
+
+       if (update != NULL)
+               osp_destroy_update_req(update);
+
+       RETURN(rc);
+}
+
+static void osp_md_object_read_lock(const struct lu_env *env,
+                                   struct dt_object *dt, unsigned role)
+{
+       struct osp_object  *obj = dt2osp_obj(dt);
+
+       LASSERT(obj->opo_owner != env);
+       down_read_nested(&obj->opo_sem, role);
+
+       LASSERT(obj->opo_owner == NULL);
+}
+
+static void osp_md_object_write_lock(const struct lu_env *env,
+                                    struct dt_object *dt, unsigned role)
+{
+       struct osp_object *obj = dt2osp_obj(dt);
+
+       down_write_nested(&obj->opo_sem, role);
+
+       LASSERT(obj->opo_owner == NULL);
+       obj->opo_owner = env;
+}
+
+static void osp_md_object_read_unlock(const struct lu_env *env,
+                                     struct dt_object *dt)
+{
+       struct osp_object *obj = dt2osp_obj(dt);
+
+       up_read(&obj->opo_sem);
+}
+
+static void osp_md_object_write_unlock(const struct lu_env *env,
+                                      struct dt_object *dt)
+{
+       struct osp_object *obj = dt2osp_obj(dt);
+
+       LASSERT(obj->opo_owner == env);
+       obj->opo_owner = NULL;
+       up_write(&obj->opo_sem);
+}
+
+static int osp_md_object_write_locked(const struct lu_env *env,
+                                     struct dt_object *dt)
+{
+       struct osp_object *obj = dt2osp_obj(dt);
+
+       return obj->opo_owner == env;
+}
+
+static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
+                              struct dt_rec *rec, const struct dt_key *key,
+                              struct lustre_capa *capa)
+{
+       struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
+       struct update_request   *update;
+       struct ptlrpc_request   *req = NULL;
+       int                     size = strlen((char *)key) + 1;
+       char                    *name = (char *)key;
+       int                     rc;
+       struct update_reply     *reply;
+       struct lu_fid           *fid;
+
+       ENTRY;
+
+       /* Because it needs send the update buffer right away,
+        * just create an update buffer, instead of attaching the
+        * update_remote list of the thandle.
+        */
+       update = osp_create_update_req(dt_dev);
+       if (IS_ERR(update))
+               RETURN(PTR_ERR(update));
+
+       rc = osp_insert_update(env, update, OBJ_INDEX_LOOKUP,
+                              (struct lu_fid *)lu_object_fid(&dt->do_lu),
+                              1, &size, (char **)&name);
+       if (rc) {
+               CERROR("%s: Insert update error: rc = %d\n",
+                      dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
+               GOTO(out, rc);
+       }
+
+       rc = osp_remote_sync(env, dt_dev, update, &req);
+       if (rc < 0) {
+               CERROR("%s: lookup "DFID" %s failed: rc = %d\n",
+                      dt_dev->dd_lu_dev.ld_obd->obd_name,
+                      PFID(lu_object_fid(&dt->do_lu)), (char *)key, rc);
+               GOTO(out, rc);
+       }
+
+       reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
+                                            UPDATE_BUFFER_SIZE);
+       if (reply->ur_version != UPDATE_REPLY_V1) {
+               CERROR("%s: Wrong version %x expected %x: rc = %d\n",
+                      dt_dev->dd_lu_dev.ld_obd->obd_name,
+                      reply->ur_version, UPDATE_REPLY_V1, -EPROTO);
+               GOTO(out, rc = -EPROTO);
+       }
+
+       rc = update_get_reply_result(reply, NULL, 0);
+       if (rc < 0) {
+               CERROR("%s: wrong version lookup "DFID" %s: rc = %d\n",
+                      dt_dev->dd_lu_dev.ld_obd->obd_name,
+                      PFID(lu_object_fid(&dt->do_lu)), (char *)key, rc);
+               GOTO(out, rc);
+       }
+
+       size = update_get_reply_buf(reply, (void **)&fid, 0);
+       if (size < 0)
+               GOTO(out, rc = size);
+
+       if (size != sizeof(struct lu_fid)) {
+               CERROR("%s: lookup "DFID" %s wrong size %d: rc = %d\n",
+                      dt_dev->dd_lu_dev.ld_obd->obd_name,
+                      PFID(lu_object_fid(&dt->do_lu)), (char *)key, size, rc);
+               GOTO(out, rc = -EINVAL);
+       }
+
+       fid_le_to_cpu(fid, fid);
+       if (!fid_is_sane(fid)) {
+               CERROR("%s: lookup "DFID" %s invalid fid "DFID": rc = %d\n",
+                      dt_dev->dd_lu_dev.ld_obd->obd_name,
+                      PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid),
+                      rc);
+               GOTO(out, rc = -EINVAL);
+       }
+       memcpy(rec, fid, sizeof(*fid));
+out:
+       if (req != NULL)
+               ptlrpc_req_finished(req);
+
+       if (update != NULL)
+               osp_destroy_update_req(update);
+
+       RETURN(rc);
+}
+
+static int osp_md_declare_insert(const struct lu_env *env,
+                                struct dt_object *dt,
+                                const struct dt_rec *rec,
+                                const struct dt_key *key,
+                                struct thandle *th)
+{
+       struct update_request   *update;
+       struct lu_fid           *fid;
+       struct lu_fid           *rec_fid = (struct lu_fid *)rec;
+       int                     size[2] = {strlen((char *)key) + 1,
+                                                 sizeof(*rec_fid)};
+       char                    *bufs[2] = {(char *)key, (char *)rec_fid};
+       int                     rc;
+
+       update = osp_find_create_update_loc(th, dt);
+       if (IS_ERR(update)) {
+               CERROR("%s: Get OSP update buf failed: rc = %d\n",
+                      dt->do_lu.lo_dev->ld_obd->obd_name,
+                      (int)PTR_ERR(update));
+               return PTR_ERR(update);
+       }
+
+       fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
+
+       CDEBUG(D_INFO, "%s: insert index of "DFID" %s: "DFID"\n",
+              dt->do_lu.lo_dev->ld_obd->obd_name,
+              PFID(fid), (char *)key, PFID(rec_fid));
+
+       fid_cpu_to_le(rec_fid, rec_fid);
+
+       rc = osp_insert_update(env, update, OBJ_INDEX_INSERT, fid,
+                              ARRAY_SIZE(size), size, bufs);
+       return rc;
+}
+
+static int osp_md_index_insert(const struct lu_env *env,
+                              struct dt_object *dt,
+                              const struct dt_rec *rec,
+                              const struct dt_key *key,
+                              struct thandle *th,
+                              struct lustre_capa *capa,
+                              int ignore_quota)
+{
+       return 0;
+}
+
+static int osp_md_declare_delete(const struct lu_env *env,
+                                struct dt_object *dt,
+                                const struct dt_key *key,
+                                struct thandle *th)
+{
+       struct update_request *update;
+       struct lu_fid *fid;
+       int size = strlen((char *)key) + 1;
+       char *buf = (char *)key;
+       int rc;
+
+       update = osp_find_create_update_loc(th, dt);
+       if (IS_ERR(update)) {
+               CERROR("%s: Get OSP update buf failed: rc = %d\n",
+                      dt->do_lu.lo_dev->ld_obd->obd_name,
+                      (int)PTR_ERR(update));
+               return PTR_ERR(update);
+       }
+
+       fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
+
+       rc = osp_insert_update(env, update, OBJ_INDEX_DELETE, fid, 1, &size,
+                              &buf);
+
+       return rc;
+}
+
+static int osp_md_index_delete(const struct lu_env *env,
+                              struct dt_object *dt,
+                              const struct dt_key *key,
+                              struct thandle *th,
+                              struct lustre_capa *capa)
+{
+       CDEBUG(D_INFO, "index delete "DFID" %s\n",
+              PFID(&dt->do_lu.lo_header->loh_fid), (char *)key);
+
+       return 0;
+}
+
+/**
+ * Creates or initializes iterator context.
+ *
+ * Note: for OSP, these index iterate api is only used to check
+ * whether the directory is empty now (see mdd_dir_is_empty).
+ * Since dir_empty will be return by OBJ_ATTR_GET(see osp_md_attr_get/
+ * out_attr_get). So the implementation of these iterator is simplied
+ * to make mdd_dir_is_empty happy. The real iterator should be
+ * implemented, if we need it one day.
+ */
+static struct dt_it *osp_it_init(const struct lu_env *env,
+                                struct dt_object *dt,
+                                __u32 attr,
+                               struct lustre_capa *capa)
+{
+       lu_object_get(&dt->do_lu);
+       return (struct dt_it *)dt;
+}
+
+static void osp_it_fini(const struct lu_env *env, struct dt_it *di)
+{
+       struct dt_object *dt = (struct dt_object *)di;
+       lu_object_put(env, &dt->do_lu);
+}
+
+static int osp_it_get(const struct lu_env *env,
+                     struct dt_it *di, const struct dt_key *key)
+{
+       return 1;
+}
+
+static void osp_it_put(const struct lu_env *env, struct dt_it *di)
+{
+       return;
+}
+
+static int osp_it_next(const struct lu_env *env, struct dt_it *di)
+{
+       struct dt_object *dt = (struct dt_object *)di;
+       struct osp_object *osp_obj = dt2osp_obj(dt);
+       if (osp_obj->opo_empty)
+               return 1;
+       return 0;
+}
+
+static struct dt_key *osp_it_key(const struct lu_env *env,
+                                const struct dt_it *di)
+{
+       LBUG();
+       return NULL;
+}
+
+static int osp_it_key_size(const struct lu_env *env, const struct dt_it *di)
+{
+       LBUG();
+       return 0;
+}
+
+static int osp_it_rec(const struct lu_env *env, const struct dt_it *di,
+                     struct dt_rec *lde, __u32 attr)
+{
+       LBUG();
+       return 0;
+}
+
+static __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di)
+{
+       LBUG();
+       return 0;
+}
+
+static int osp_it_load(const struct lu_env *env, const struct dt_it *di,
+                      __u64 hash)
+{
+       LBUG();
+       return 0;
+}
+
+static int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di,
+                         void *key_rec)
+{
+       LBUG();
+       return 0;
+}
+
+static const struct dt_index_operations osp_md_index_ops = {
+       .dio_lookup         = osp_md_index_lookup,
+       .dio_declare_insert = osp_md_declare_insert,
+       .dio_insert         = osp_md_index_insert,
+       .dio_declare_delete = osp_md_declare_delete,
+       .dio_delete         = osp_md_index_delete,
+       .dio_it     = {
+               .init     = osp_it_init,
+               .fini     = osp_it_fini,
+               .get      = osp_it_get,
+               .put      = osp_it_put,
+               .next     = osp_it_next,
+               .key      = osp_it_key,
+               .key_size = osp_it_key_size,
+               .rec      = osp_it_rec,
+               .store    = osp_it_store,
+               .load     = osp_it_load,
+               .key_rec  = osp_it_key_rec,
+       }
+};
+
+static int osp_md_index_try(const struct lu_env *env,
+                           struct dt_object *dt,
+                           const struct dt_index_features *feat)
+{
+       dt->do_index_ops = &osp_md_index_ops;
+       return 0;
+}
+
+static int osp_md_attr_get(const struct lu_env *env,
+                          struct dt_object *dt, struct lu_attr *attr,
+                          struct lustre_capa *capa)
+{
+       struct osp_object     *obj = dt2osp_obj(dt);
+       struct dt_device      *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
+       struct update_request *update = NULL;
+       struct ptlrpc_request *req = NULL;
+       int rc;
+       ENTRY;
+
+       /* Because it needs send the update buffer right away,
+        * just create an update buffer, instead of attaching the
+        * update_remote list of the thandle.
+        */
+       update = osp_create_update_req(dt_dev);
+       if (IS_ERR(update))
+               RETURN(PTR_ERR(update));
+
+       rc = osp_insert_update(env, update, OBJ_ATTR_GET,
+                              (struct lu_fid *)lu_object_fid(&dt->do_lu),
+                              0, NULL, NULL);
+       if (rc) {
+               CERROR("%s: Insert update error: rc = %d\n",
+                      dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
+               GOTO(out, rc);
+       }
+       dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
+
+       rc = osp_remote_sync(env, dt_dev, update, &req);
+       if (rc < 0)
+               GOTO(out, rc);
+
+       rc = osp_get_attr_from_req(env, req, attr, 0);
+       if (rc)
+               GOTO(out, rc);
+
+       if (attr->la_flags == 1)
+               obj->opo_empty = 0;
+       else
+               obj->opo_empty = 1;
+out:
+       if (req != NULL)
+               ptlrpc_req_finished(req);
+
+       if (update != NULL)
+               osp_destroy_update_req(update);
+
+       RETURN(rc);
+}
+
+static int osp_md_declare_object_destroy(const struct lu_env *env,
+                                        struct dt_object *dt,
+                                        struct thandle *th)
+{
+       struct osp_object  *o = dt2osp_obj(dt);
+       int                 rc = 0;
+       ENTRY;
+
+       /*
+        * track objects to be destroyed via llog
+        */
+       rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th);
+
+       RETURN(rc);
+}
+
+static int osp_md_object_destroy(const struct lu_env *env,
+                                struct dt_object *dt, struct thandle *th)
+{
+       struct osp_object  *o = dt2osp_obj(dt);
+       int                 rc = 0;
+       ENTRY;
+
+       /*
+        * once transaction is committed put proper command on
+        * the queue going to our OST
+        */
+       rc = osp_sync_add(env, o, MDS_UNLINK64_REC, th, NULL);
+
+       /* not needed in cache any more */
+       set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
+
+       RETURN(rc);
+}
+
+struct dt_object_operations osp_md_obj_ops = {
+       .do_read_lock         = osp_md_object_read_lock,
+       .do_write_lock        = osp_md_object_write_lock,
+       .do_read_unlock       = osp_md_object_read_unlock,
+       .do_write_unlock      = osp_md_object_write_unlock,
+       .do_write_locked      = osp_md_object_write_locked,
+       .do_declare_create    = osp_md_declare_object_create,
+       .do_create            = osp_md_object_create,
+       .do_declare_ref_add   = osp_md_declare_ref_add,
+       .do_ref_add           = osp_md_object_ref_add,
+       .do_declare_ref_del   = osp_md_declare_object_ref_del,
+       .do_ref_del           = osp_md_object_ref_del,
+       .do_declare_destroy   = osp_md_declare_object_destroy,
+       .do_destroy           = osp_md_object_destroy,
+       .do_ah_init           = osp_md_ah_init,
+       .do_attr_get          = osp_md_attr_get,
+       .do_declare_attr_set  = osp_md_declare_attr_set,
+       .do_attr_set          = osp_md_attr_set,
+       .do_declare_xattr_set = osp_md_declare_xattr_set,
+       .do_xattr_set         = osp_md_xattr_set,
+       .do_xattr_get         = osp_md_xattr_get,
+       .do_index_try         = osp_md_index_try,
+};
+
index e5ffe31..695a31e 100644 (file)
@@ -356,15 +356,30 @@ static int is_ost_obj(struct lu_object *lo)
 }
 
 static int osp_object_init(const struct lu_env *env, struct lu_object *o,
-                          const struct lu_object_conf *unused)
+                          const struct lu_object_conf *conf)
 {
        struct osp_object       *po = lu2osp_obj(o);
        int                     rc = 0;
        ENTRY;
 
-       if (is_ost_obj(o))
+       if (is_ost_obj(o)) {
                po->opo_obj.do_ops = &osp_obj_ops;
-
+       } else {
+               struct lu_attr          *la = &osp_env_info(env)->osi_attr;
+
+               po->opo_obj.do_ops = &osp_md_obj_ops;
+               o->lo_header->loh_attr |= LOHA_REMOTE;
+               /* Do not need get attr for new object */
+               if (!(conf != NULL && (conf->loc_flags & LOC_F_NEW) != 0)) {
+                       rc = po->opo_obj.do_ops->do_attr_get(env, lu2dt_obj(o),
+                                                            la, NULL);
+                       if (rc == 0)
+                               o->lo_header->loh_attr |=
+                                       LOHA_EXISTS | (la->la_mode & S_IFMT);
+                       if (rc == -ENOENT)
+                               rc = 0;
+               }
+       }
        RETURN(rc);
 }
 
index 2903d1e..fe04c8c 100644 (file)
@@ -69,6 +69,7 @@
 #endif
 /* struct ptlrpc_request, lustre_msg* */
 #include <lustre_req_layout.h>
+#include <lustre_update.h>
 #include <lustre_acl.h>
 #include <lustre_debug.h>
 
@@ -480,6 +481,16 @@ static const struct req_msg_field *mds_setattr_server[] = {
         &RMF_CAPA2
 };
 
+static const struct req_msg_field *mds_update_client[] = {
+       &RMF_PTLRPC_BODY,
+       &RMF_UPDATE,
+};
+
+static const struct req_msg_field *mds_update_server[] = {
+       &RMF_PTLRPC_BODY,
+       &RMF_UPDATE_REPLY,
+};
+
 static const struct req_msg_field *llog_origin_handle_create_client[] = {
         &RMF_PTLRPC_BODY,
         &RMF_LLOGD_BODY,
@@ -667,7 +678,10 @@ static struct req_format *req_formats[] = {
        &RQF_MDS_HSM_STATE_SET,
        &RQF_MDS_HSM_ACTION,
        &RQF_MDS_HSM_REQUEST,
-        &RQF_QC_CALLBACK,
+
+       &RQF_UPDATE_OBJ,
+
+       &RQF_QC_CALLBACK,
         &RQF_OST_CONNECT,
         &RQF_OST_DISCONNECT,
         &RQF_OST_QUOTACHECK,
@@ -1081,6 +1095,15 @@ struct req_msg_field RMF_MDS_HSM_REQUEST =
                    lustre_swab_hsm_request, NULL);
 EXPORT_SYMBOL(RMF_MDS_HSM_REQUEST);
 
+struct req_msg_field RMF_UPDATE = DEFINE_MSGF("update", 0, -1,
+                                             lustre_swab_update_buf, NULL);
+EXPORT_SYMBOL(RMF_UPDATE);
+
+struct req_msg_field RMF_UPDATE_REPLY = DEFINE_MSGF("update_reply", 0, -1,
+                                               lustre_swab_update_reply_buf,
+                                                   NULL);
+EXPORT_SYMBOL(RMF_UPDATE_REPLY);
+
 /*
  * Request formats.
  */
@@ -1280,6 +1303,11 @@ struct req_format RQF_MDS_GET_INFO =
                         mds_getinfo_server);
 EXPORT_SYMBOL(RQF_MDS_GET_INFO);
 
+struct req_format RQF_UPDATE_OBJ =
+       DEFINE_REQ_FMT0("OBJECT_UPDATE_OBJ", mds_update_client,
+                       mds_update_server);
+EXPORT_SYMBOL(RQF_UPDATE_OBJ);
+
 struct req_format RQF_LDLM_ENQUEUE =
         DEFINE_REQ_FMT0("LDLM_ENQUEUE",
                         ldlm_enqueue_client, ldlm_enqueue_lvb_server);
index ad4777c..814e939 100644 (file)
@@ -134,7 +134,8 @@ struct ll_rpc_opcode {
         { SEC_CTX_INIT,     "sec_ctx_init" },
         { SEC_CTX_INIT_CONT,"sec_ctx_init_cont" },
         { SEC_CTX_FINI,     "sec_ctx_fini" },
-        { FLD_QUERY,        "fld_query" }
+       { FLD_QUERY,        "fld_query" },
+       { UPDATE_OBJ,       "update_obj" },
 };
 
 struct ll_eopcode {
index fb8a810..12b0f3b 100644 (file)
@@ -2544,3 +2544,21 @@ void lustre_swab_hsm_request(struct hsm_request *hr)
 }
 EXPORT_SYMBOL(lustre_swab_hsm_request);
 
+void lustre_swab_update_buf(struct update_buf *ub)
+{
+       __swab32s(&ub->ub_magic);
+       __swab32s(&ub->ub_count);
+}
+EXPORT_SYMBOL(lustre_swab_update_buf);
+
+void lustre_swab_update_reply_buf(struct update_reply *ur)
+{
+       int i;
+
+       __swab32s(&ur->ur_version);
+       __swab32s(&ur->ur_count);
+       for (i = 0; i < ur->ur_count; i++)
+               __swab32s(&ur->ur_lens[i]);
+}
+EXPORT_SYMBOL(lustre_swab_update_reply_buf);
+
index b14eb56..ff1f7bd 100644 (file)
 #include <obd_class.h>
 #include <lustre_net.h>
 #include <lustre_disk.h>
+
 void lustre_assert_wire_constants(void)
 {
         /* Wire protocol assertions generated by 'wirecheck'
          * (make -C lustre/utils newwiretest)
-         * running on Linux centos6-bis 2.6.32-279.2.1.el6-head #0 SMP Thu Aug 9 23:25:09 CEST 2012 x
-         * with gcc version 4.4.6 20110731 (Red Hat 4.4.6-3) (GCC)  */
+         * running on Linux testnode 2.6.32-279.5.1.el6_lustre.g53f705f.x86_64 #1 SMP Mon Oct 8 05:12
+         * with gcc version 4.4.6 20120305 (Red Hat 4.4.6-4) (GCC)  */
 
 
        /* Constants... */
@@ -333,6 +334,10 @@ void lustre_assert_wire_constants(void)
        CLASSERT(LDLM_MAX_TYPE == 14);
        CLASSERT(LUSTRE_RES_ID_SEQ_OFF == 0);
        CLASSERT(LUSTRE_RES_ID_VER_OID_OFF == 1);
+       LASSERTF(UPDATE_OBJ == 1000, "found %lld\n",
+                (long long)UPDATE_OBJ);
+       LASSERTF(UPDATE_LAST_OPC == 1001, "found %lld\n",
+                (long long)UPDATE_LAST_OPC);
        CLASSERT(LUSTRE_RES_ID_QUOTA_SEQ_OFF == 2);
        CLASSERT(LUSTRE_RES_ID_QUOTA_VER_OID_OFF == 3);
        CLASSERT(LUSTRE_RES_ID_HSH_OFF == 3);
@@ -433,6 +438,30 @@ void lustre_assert_wire_constants(void)
                (unsigned)LMAC_HSM);
        LASSERTF(LMAC_SOM == 0x00000002UL, "found 0x%.8xUL\n",
                (unsigned)LMAC_SOM);
+       LASSERTF(OBJ_CREATE == 1, "found %lld\n",
+                (long long)OBJ_CREATE);
+       LASSERTF(OBJ_DESTROY == 2, "found %lld\n",
+                (long long)OBJ_DESTROY);
+       LASSERTF(OBJ_REF_ADD == 3, "found %lld\n",
+                (long long)OBJ_REF_ADD);
+       LASSERTF(OBJ_REF_DEL == 4, "found %lld\n",
+                (long long)OBJ_REF_DEL);
+       LASSERTF(OBJ_ATTR_SET == 5, "found %lld\n",
+                (long long)OBJ_ATTR_SET);
+       LASSERTF(OBJ_ATTR_GET == 6, "found %lld\n",
+                (long long)OBJ_ATTR_GET);
+       LASSERTF(OBJ_XATTR_SET == 7, "found %lld\n",
+                (long long)OBJ_XATTR_SET);
+       LASSERTF(OBJ_XATTR_GET == 8, "found %lld\n",
+                (long long)OBJ_XATTR_GET);
+       LASSERTF(OBJ_INDEX_LOOKUP == 9, "found %lld\n",
+                (long long)OBJ_INDEX_LOOKUP);
+       LASSERTF(OBJ_INDEX_LOOKUP == 9, "found %lld\n",
+                (long long)OBJ_INDEX_LOOKUP);
+       LASSERTF(OBJ_INDEX_INSERT == 10, "found %lld\n",
+                (long long)OBJ_INDEX_INSERT);
+       LASSERTF(OBJ_INDEX_DELETE == 11, "found %lld\n",
+                (long long)OBJ_INDEX_DELETE);
 
        /* Checks for struct som_attrs */
        LASSERTF((int)sizeof(struct som_attrs) == 40, "found %lld\n",
@@ -4422,5 +4451,61 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct hsm_user_request, hur_user_item));
        LASSERTF((int)sizeof(((struct hsm_user_request *)0)->hur_user_item) == 0, "found %lld\n",
                 (long long)(int)sizeof(((struct hsm_user_request *)0)->hur_user_item));
+
+       /* Checks for struct update_buf */
+       LASSERTF((int)sizeof(struct update_buf) == 8, "found %lld\n",
+                (long long)(int)sizeof(struct update_buf));
+       LASSERTF((int)offsetof(struct update_buf, ub_magic) == 0, "found %lld\n",
+                (long long)(int)offsetof(struct update_buf, ub_magic));
+       LASSERTF((int)sizeof(((struct update_buf *)0)->ub_magic) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct update_buf *)0)->ub_magic));
+       LASSERTF((int)offsetof(struct update_buf, ub_count) == 4, "found %lld\n",
+                (long long)(int)offsetof(struct update_buf, ub_count));
+       LASSERTF((int)sizeof(((struct update_buf *)0)->ub_count) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct update_buf *)0)->ub_count));
+       LASSERTF((int)offsetof(struct update_buf, ub_bufs) == 8, "found %lld\n",
+                (long long)(int)offsetof(struct update_buf, ub_bufs));
+       LASSERTF((int)sizeof(((struct update_buf *)0)->ub_bufs) == 0, "found %lld\n",
+                (long long)(int)sizeof(((struct update_buf *)0)->ub_bufs));
+
+       /* Checks for struct update_reply */
+       LASSERTF((int)sizeof(struct update_reply) == 8, "found %lld\n",
+                (long long)(int)sizeof(struct update_reply));
+       LASSERTF((int)offsetof(struct update_reply, ur_version) == 0, "found %lld\n",
+                (long long)(int)offsetof(struct update_reply, ur_version));
+       LASSERTF((int)sizeof(((struct update_reply *)0)->ur_version) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct update_reply *)0)->ur_version));
+       LASSERTF((int)offsetof(struct update_reply, ur_count) == 4, "found %lld\n",
+                (long long)(int)offsetof(struct update_reply, ur_count));
+       LASSERTF((int)sizeof(((struct update_reply *)0)->ur_count) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct update_reply *)0)->ur_count));
+       LASSERTF((int)offsetof(struct update_reply, ur_lens) == 8, "found %lld\n",
+                (long long)(int)offsetof(struct update_reply, ur_lens));
+       LASSERTF((int)sizeof(((struct update_reply *)0)->ur_lens) == 0, "found %lld\n",
+                (long long)(int)sizeof(((struct update_reply *)0)->ur_lens));
+
+       /* Checks for struct update */
+       LASSERTF((int)sizeof(struct update) == 56, "found %lld\n",
+                (long long)(int)sizeof(struct update));
+       LASSERTF((int)offsetof(struct update, u_type) == 0, "found %lld\n",
+                (long long)(int)offsetof(struct update, u_type));
+       LASSERTF((int)sizeof(((struct update *)0)->u_type) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct update *)0)->u_type));
+       LASSERTF((int)offsetof(struct update, u_padding) == 4, "found %lld\n",
+                (long long)(int)offsetof(struct update, u_padding));
+       LASSERTF((int)sizeof(((struct update *)0)->u_padding) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct update *)0)->u_padding));
+       LASSERTF((int)offsetof(struct update, u_fid) == 8, "found %lld\n",
+                (long long)(int)offsetof(struct update, u_fid));
+       LASSERTF((int)sizeof(((struct update *)0)->u_fid) == 16, "found %lld\n",
+                (long long)(int)sizeof(((struct update *)0)->u_fid));
+       LASSERTF((int)offsetof(struct update, u_lens) == 24, "found %lld\n",
+                (long long)(int)offsetof(struct update, u_lens));
+       LASSERTF((int)sizeof(((struct update *)0)->u_lens) == 32, "found %lld\n",
+                (long long)(int)sizeof(((struct update *)0)->u_lens));
+       LASSERTF((int)offsetof(struct update, u_bufs) == 56, "found %lld\n",
+                (long long)(int)offsetof(struct update, u_bufs));
+       LASSERTF((int)sizeof(((struct update *)0)->u_bufs) == 0, "found %lld\n",
+                (long long)(int)sizeof(((struct update *)0)->u_bufs));
 }
 
index cf41176..df296b5 100644 (file)
@@ -89,6 +89,9 @@
 #define lustre_swab_hsm_state_set NULL
 #define lustre_swab_hsm_current_action NULL
 #define lustre_swab_hsm_request NULL
+#define lustre_swab_update_buf NULL
+#define lustre_swab_update_reply_buf NULL
+
 #define dump_rniobuf NULL
 #define dump_ioo NULL
 #define dump_obdo NULL
index 56b9eab..254f5ec 100644 (file)
@@ -1999,8 +1999,36 @@ check_hsm_user_request(void)
        CHECK_MEMBER(hsm_user_request, hur_user_item);
 }
 
-static void
-system_string (char *cmdline, char *str, int len)
+static void check_update_buf(void)
+{
+       BLANK_LINE();
+       CHECK_STRUCT(update_buf);
+       CHECK_MEMBER(update_buf, ub_magic);
+       CHECK_MEMBER(update_buf, ub_count);
+       CHECK_MEMBER(update_buf, ub_bufs);
+}
+
+static void check_update_reply(void)
+{
+       BLANK_LINE();
+       CHECK_STRUCT(update_reply);
+       CHECK_MEMBER(update_reply, ur_version);
+       CHECK_MEMBER(update_reply, ur_count);
+       CHECK_MEMBER(update_reply, ur_lens);
+}
+
+static void check_update(void)
+{
+       BLANK_LINE();
+       CHECK_STRUCT(update);
+       CHECK_MEMBER(update, u_type);
+       CHECK_MEMBER(update, u_padding);
+       CHECK_MEMBER(update, u_fid);
+       CHECK_MEMBER(update, u_lens);
+       CHECK_MEMBER(update, u_bufs);
+}
+
+static void system_string(char *cmdline, char *str, int len)
 {
        int   fds[2];
        int   rc;
@@ -2233,6 +2261,9 @@ main(int argc, char **argv)
        CHECK_CVALUE(LUSTRE_RES_ID_SEQ_OFF);
        CHECK_CVALUE(LUSTRE_RES_ID_VER_OID_OFF);
        /* CHECK_CVALUE(LUSTRE_RES_ID_WAS_VER_OFF); packed with OID */
+
+       CHECK_VALUE(UPDATE_OBJ);
+       CHECK_VALUE(UPDATE_LAST_OPC);
        CHECK_CVALUE(LUSTRE_RES_ID_QUOTA_SEQ_OFF);
        CHECK_CVALUE(LUSTRE_RES_ID_QUOTA_VER_OID_OFF);
        CHECK_CVALUE(LUSTRE_RES_ID_HSH_OFF);
@@ -2271,6 +2302,20 @@ main(int argc, char **argv)
        CHECK_STRUCT(obd_uuid);
        check_lu_seq_range();
        check_lustre_mdt_attrs();
+
+       CHECK_VALUE(OBJ_CREATE);
+       CHECK_VALUE(OBJ_DESTROY);
+       CHECK_VALUE(OBJ_REF_ADD);
+       CHECK_VALUE(OBJ_REF_DEL);
+       CHECK_VALUE(OBJ_ATTR_SET);
+       CHECK_VALUE(OBJ_ATTR_GET);
+       CHECK_VALUE(OBJ_XATTR_SET);
+       CHECK_VALUE(OBJ_XATTR_GET);
+       CHECK_VALUE(OBJ_INDEX_LOOKUP);
+       CHECK_VALUE(OBJ_INDEX_LOOKUP);
+       CHECK_VALUE(OBJ_INDEX_INSERT);
+       CHECK_VALUE(OBJ_INDEX_DELETE);
+
        check_som_attrs();
        check_hsm_attrs();
        check_ost_id();
@@ -2370,6 +2415,10 @@ main(int argc, char **argv)
        check_hsm_request();
        check_hsm_user_request();
 
+       check_update_buf();
+       check_update_reply();
+       check_update();
+
        printf("}\n\n");
 
        return 0;
index 306a444..692664a 100644 (file)
@@ -58,12 +58,13 @@ int main()
 
         return ret;
 }
+
 void lustre_assert_wire_constants(void)
 {
         /* Wire protocol assertions generated by 'wirecheck'
          * (make -C lustre/utils newwiretest)
-         * running on Linux centos6-bis 2.6.32-279.2.1.el6-head #0 SMP Thu Aug 9 23:25:09 CEST 2012 x
-         * with gcc version 4.4.6 20110731 (Red Hat 4.4.6-3) (GCC)  */
+         * running on Linux testnode 2.6.32-279.5.1.el6_lustre.g53f705f.x86_64 #1 SMP Mon Oct 8 05:12
+         * with gcc version 4.4.6 20120305 (Red Hat 4.4.6-4) (GCC)  */
 
 
        /* Constants... */
@@ -341,6 +342,10 @@ void lustre_assert_wire_constants(void)
        CLASSERT(LDLM_MAX_TYPE == 14);
        CLASSERT(LUSTRE_RES_ID_SEQ_OFF == 0);
        CLASSERT(LUSTRE_RES_ID_VER_OID_OFF == 1);
+       LASSERTF(UPDATE_OBJ == 1000, "found %lld\n",
+                (long long)UPDATE_OBJ);
+       LASSERTF(UPDATE_LAST_OPC == 1001, "found %lld\n",
+                (long long)UPDATE_LAST_OPC);
        CLASSERT(LUSTRE_RES_ID_QUOTA_SEQ_OFF == 2);
        CLASSERT(LUSTRE_RES_ID_QUOTA_VER_OID_OFF == 3);
        CLASSERT(LUSTRE_RES_ID_HSH_OFF == 3);
@@ -441,6 +446,30 @@ void lustre_assert_wire_constants(void)
                (unsigned)LMAC_HSM);
        LASSERTF(LMAC_SOM == 0x00000002UL, "found 0x%.8xUL\n",
                (unsigned)LMAC_SOM);
+       LASSERTF(OBJ_CREATE == 1, "found %lld\n",
+                (long long)OBJ_CREATE);
+       LASSERTF(OBJ_DESTROY == 2, "found %lld\n",
+                (long long)OBJ_DESTROY);
+       LASSERTF(OBJ_REF_ADD == 3, "found %lld\n",
+                (long long)OBJ_REF_ADD);
+       LASSERTF(OBJ_REF_DEL == 4, "found %lld\n",
+                (long long)OBJ_REF_DEL);
+       LASSERTF(OBJ_ATTR_SET == 5, "found %lld\n",
+                (long long)OBJ_ATTR_SET);
+       LASSERTF(OBJ_ATTR_GET == 6, "found %lld\n",
+                (long long)OBJ_ATTR_GET);
+       LASSERTF(OBJ_XATTR_SET == 7, "found %lld\n",
+                (long long)OBJ_XATTR_SET);
+       LASSERTF(OBJ_XATTR_GET == 8, "found %lld\n",
+                (long long)OBJ_XATTR_GET);
+       LASSERTF(OBJ_INDEX_LOOKUP == 9, "found %lld\n",
+                (long long)OBJ_INDEX_LOOKUP);
+       LASSERTF(OBJ_INDEX_LOOKUP == 9, "found %lld\n",
+                (long long)OBJ_INDEX_LOOKUP);
+       LASSERTF(OBJ_INDEX_INSERT == 10, "found %lld\n",
+                (long long)OBJ_INDEX_INSERT);
+       LASSERTF(OBJ_INDEX_DELETE == 11, "found %lld\n",
+                (long long)OBJ_INDEX_DELETE);
 
        /* Checks for struct som_attrs */
        LASSERTF((int)sizeof(struct som_attrs) == 40, "found %lld\n",
@@ -4430,5 +4459,61 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct hsm_user_request, hur_user_item));
        LASSERTF((int)sizeof(((struct hsm_user_request *)0)->hur_user_item) == 0, "found %lld\n",
                 (long long)(int)sizeof(((struct hsm_user_request *)0)->hur_user_item));
+
+       /* Checks for struct update_buf */
+       LASSERTF((int)sizeof(struct update_buf) == 8, "found %lld\n",
+                (long long)(int)sizeof(struct update_buf));
+       LASSERTF((int)offsetof(struct update_buf, ub_magic) == 0, "found %lld\n",
+                (long long)(int)offsetof(struct update_buf, ub_magic));
+       LASSERTF((int)sizeof(((struct update_buf *)0)->ub_magic) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct update_buf *)0)->ub_magic));
+       LASSERTF((int)offsetof(struct update_buf, ub_count) == 4, "found %lld\n",
+                (long long)(int)offsetof(struct update_buf, ub_count));
+       LASSERTF((int)sizeof(((struct update_buf *)0)->ub_count) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct update_buf *)0)->ub_count));
+       LASSERTF((int)offsetof(struct update_buf, ub_bufs) == 8, "found %lld\n",
+                (long long)(int)offsetof(struct update_buf, ub_bufs));
+       LASSERTF((int)sizeof(((struct update_buf *)0)->ub_bufs) == 0, "found %lld\n",
+                (long long)(int)sizeof(((struct update_buf *)0)->ub_bufs));
+
+       /* Checks for struct update_reply */
+       LASSERTF((int)sizeof(struct update_reply) == 8, "found %lld\n",
+                (long long)(int)sizeof(struct update_reply));
+       LASSERTF((int)offsetof(struct update_reply, ur_version) == 0, "found %lld\n",
+                (long long)(int)offsetof(struct update_reply, ur_version));
+       LASSERTF((int)sizeof(((struct update_reply *)0)->ur_version) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct update_reply *)0)->ur_version));
+       LASSERTF((int)offsetof(struct update_reply, ur_count) == 4, "found %lld\n",
+                (long long)(int)offsetof(struct update_reply, ur_count));
+       LASSERTF((int)sizeof(((struct update_reply *)0)->ur_count) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct update_reply *)0)->ur_count));
+       LASSERTF((int)offsetof(struct update_reply, ur_lens) == 8, "found %lld\n",
+                (long long)(int)offsetof(struct update_reply, ur_lens));
+       LASSERTF((int)sizeof(((struct update_reply *)0)->ur_lens) == 0, "found %lld\n",
+                (long long)(int)sizeof(((struct update_reply *)0)->ur_lens));
+
+       /* Checks for struct update */
+       LASSERTF((int)sizeof(struct update) == 56, "found %lld\n",
+                (long long)(int)sizeof(struct update));
+       LASSERTF((int)offsetof(struct update, u_type) == 0, "found %lld\n",
+                (long long)(int)offsetof(struct update, u_type));
+       LASSERTF((int)sizeof(((struct update *)0)->u_type) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct update *)0)->u_type));
+       LASSERTF((int)offsetof(struct update, u_padding) == 4, "found %lld\n",
+                (long long)(int)offsetof(struct update, u_padding));
+       LASSERTF((int)sizeof(((struct update *)0)->u_padding) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct update *)0)->u_padding));
+       LASSERTF((int)offsetof(struct update, u_fid) == 8, "found %lld\n",
+                (long long)(int)offsetof(struct update, u_fid));
+       LASSERTF((int)sizeof(((struct update *)0)->u_fid) == 16, "found %lld\n",
+                (long long)(int)sizeof(((struct update *)0)->u_fid));
+       LASSERTF((int)offsetof(struct update, u_lens) == 24, "found %lld\n",
+                (long long)(int)offsetof(struct update, u_lens));
+       LASSERTF((int)sizeof(((struct update *)0)->u_lens) == 32, "found %lld\n",
+                (long long)(int)sizeof(((struct update *)0)->u_lens));
+       LASSERTF((int)offsetof(struct update, u_bufs) == 56, "found %lld\n",
+                (long long)(int)offsetof(struct update, u_bufs));
+       LASSERTF((int)sizeof(((struct update *)0)->u_bufs) == 0, "found %lld\n",
+                (long long)(int)sizeof(((struct update *)0)->u_bufs));
 }