Whamcloud - gitweb
LU-3529 lod: create striped directory
[fs/lustre-release.git] / lustre / osp / osp_md_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_md_object.c
27  *
28  * Lustre MDT Proxy Device
29  *
30  * Author: Di Wang <di.wang@intel.com>
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDS
34
35 #include <lustre_log.h>
36 #include "osp_internal.h"
37
38 static const char dot[] = ".";
39 static const char dotdot[] = "..";
40
41 static int osp_md_declare_object_create(const struct lu_env *env,
42                                         struct dt_object *dt,
43                                         struct lu_attr *attr,
44                                         struct dt_allocation_hint *hint,
45                                         struct dt_object_format *dof,
46                                         struct thandle *th)
47 {
48         struct osp_thread_info  *osi = osp_env_info(env);
49         struct update_request   *update;
50         struct lu_fid           *fid1;
51         int                     sizes[2] = {sizeof(struct obdo), 0};
52         char                    *bufs[2] = {NULL, NULL};
53         int                     buf_count;
54         int                     rc;
55
56         update = out_find_create_update_loc(th, dt);
57         if (IS_ERR(update)) {
58                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
59                        dt->do_lu.lo_dev->ld_obd->obd_name,
60                        (int)PTR_ERR(update));
61                 return PTR_ERR(update);
62         }
63
64         osi->osi_obdo.o_valid = 0;
65         LASSERT(S_ISDIR(attr->la_mode));
66         obdo_from_la(&osi->osi_obdo, attr, attr->la_valid);
67         lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
68         obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
69
70         bufs[0] = (char *)&osi->osi_obdo;
71         buf_count = 1;
72         fid1 = (struct lu_fid *)lu_object_fid(&dt->do_lu);
73         if (hint != NULL && hint->dah_parent) {
74                 struct lu_fid *fid2;
75                 struct lu_fid *tmp_fid = &osi->osi_fid;
76
77                 fid2 = (struct lu_fid *)lu_object_fid(&hint->dah_parent->do_lu);
78                 fid_cpu_to_le(tmp_fid, fid2);
79                 sizes[1] = sizeof(*tmp_fid);
80                 bufs[1] = (char *)tmp_fid;
81                 buf_count++;
82         }
83
84         if (lu_object_exists(&dt->do_lu)) {
85                 /* If the object already exists, we needs to destroy
86                  * this orphan object first.
87                  *
88                  * The scenario might happen in this case
89                  *
90                  * 1. client send remote create to MDT0.
91                  * 2. MDT0 send create update to MDT1.
92                  * 3. MDT1 finished create synchronously.
93                  * 4. MDT0 failed and reboot.
94                  * 5. client resend remote create to MDT0.
95                  * 6. MDT0 tries to resend create update to MDT1,
96                  *    but find the object already exists
97                  */
98                 CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
99                        dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1));
100
101                 rc = out_insert_update(env, update, OBJ_REF_DEL, fid1, 0,
102                                        NULL, NULL);
103                 if (rc != 0)
104                         GOTO(out, rc);
105
106                 if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
107                         /* decrease for ".." */
108                         rc = out_insert_update(env, update, OBJ_REF_DEL, fid1,
109                                                0, NULL, NULL);
110                         if (rc != 0)
111                                 GOTO(out, rc);
112                 }
113
114                 rc = out_insert_update(env, update, OBJ_DESTROY, fid1, 0, NULL,
115                                        NULL);
116                 if (rc != 0)
117                         GOTO(out, rc);
118
119                 dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
120                 /* Increase batchid to add this orphan object deletion
121                  * to separate transaction */
122                 update_inc_batchid(update);
123         }
124
125         rc = out_insert_update(env, update, OBJ_CREATE, fid1, buf_count, sizes,
126                                (const char **)bufs);
127 out:
128         if (rc)
129                 CERROR("%s: Insert update error: rc = %d\n",
130                        dt->do_lu.lo_dev->ld_obd->obd_name, rc);
131
132         return rc;
133 }
134
135 static int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
136                                 struct lu_attr *attr,
137                                 struct dt_allocation_hint *hint,
138                                 struct dt_object_format *dof,
139                                 struct thandle *th)
140 {
141         struct osp_object  *obj = dt2osp_obj(dt);
142
143         CDEBUG(D_INFO, "create object "DFID"\n",
144                PFID(&dt->do_lu.lo_header->loh_fid));
145
146         /* Because the create update RPC will be sent during declare phase,
147          * if creation reaches here, it means the object has been created
148          * successfully */
149         dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
150         obj->opo_empty = 1;
151
152         return 0;
153 }
154
155 static int osp_md_declare_object_ref_del(const struct lu_env *env,
156                                          struct dt_object *dt,
157                                          struct thandle *th)
158 {
159         struct update_request   *update;
160         struct lu_fid           *fid;
161         int                     rc;
162
163         update = out_find_create_update_loc(th, dt);
164         if (IS_ERR(update)) {
165                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
166                        dt->do_lu.lo_dev->ld_obd->obd_name,
167                       (int)PTR_ERR(update));
168                 return PTR_ERR(update);
169         }
170
171         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
172
173         rc = out_insert_update(env, update, OBJ_REF_DEL, fid, 0, NULL, NULL);
174
175         return rc;
176 }
177
178 static int osp_md_object_ref_del(const struct lu_env *env,
179                                  struct dt_object *dt,
180                                  struct thandle *th)
181 {
182         CDEBUG(D_INFO, "ref del object "DFID"\n",
183                PFID(&dt->do_lu.lo_header->loh_fid));
184
185         return 0;
186 }
187
188 static int osp_md_declare_ref_add(const struct lu_env *env,
189                                   struct dt_object *dt, struct thandle *th)
190 {
191         struct update_request   *update;
192         struct lu_fid           *fid;
193         int                     rc;
194
195         update = out_find_create_update_loc(th, dt);
196         if (IS_ERR(update)) {
197                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
198                        dt->do_lu.lo_dev->ld_obd->obd_name,
199                        (int)PTR_ERR(update));
200                 return PTR_ERR(update);
201         }
202
203         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
204
205         rc = out_insert_update(env, update, OBJ_REF_ADD, fid, 0, NULL, NULL);
206
207         return rc;
208 }
209
210 static int osp_md_object_ref_add(const struct lu_env *env,
211                                  struct dt_object *dt,
212                                  struct thandle *th)
213 {
214         CDEBUG(D_INFO, "ref add object "DFID"\n",
215                PFID(&dt->do_lu.lo_header->loh_fid));
216
217         return 0;
218 }
219
220 static void osp_md_ah_init(const struct lu_env *env,
221                            struct dt_allocation_hint *ah,
222                            struct dt_object *parent,
223                            struct dt_object *child,
224                            umode_t child_mode)
225 {
226         LASSERT(ah);
227
228         memset(ah, 0, sizeof(*ah));
229         ah->dah_parent = parent;
230         ah->dah_mode = child_mode;
231 }
232
233 static int osp_md_declare_attr_set(const struct lu_env *env,
234                                    struct dt_object *dt,
235                                    const struct lu_attr *attr,
236                                    struct thandle *th)
237 {
238         struct osp_thread_info *osi = osp_env_info(env);
239         struct update_request  *update;
240         struct lu_fid          *fid;
241         int                     size = sizeof(struct obdo);
242         char                   *buf;
243         int                     rc;
244
245         update = out_find_create_update_loc(th, dt);
246         if (IS_ERR(update)) {
247                 CERROR("%s: Get OSP update buf failed: %d\n",
248                        dt->do_lu.lo_dev->ld_obd->obd_name,
249                        (int)PTR_ERR(update));
250                 return PTR_ERR(update);
251         }
252
253         osi->osi_obdo.o_valid = 0;
254         obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr,
255                      attr->la_valid);
256         lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
257         obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
258
259         buf = (char *)&osi->osi_obdo;
260         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
261
262         rc = out_insert_update(env, update, OBJ_ATTR_SET, fid, 1, &size,
263                                (const char **)&buf);
264
265         return rc;
266 }
267
268 static int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
269                            const struct lu_attr *attr, struct thandle *th,
270                            struct lustre_capa *capa)
271 {
272         CDEBUG(D_INFO, "attr set object "DFID"\n",
273                PFID(&dt->do_lu.lo_header->loh_fid));
274
275         RETURN(0);
276 }
277
278 static void osp_md_object_read_lock(const struct lu_env *env,
279                                     struct dt_object *dt, unsigned role)
280 {
281         struct osp_object  *obj = dt2osp_obj(dt);
282
283         LASSERT(obj->opo_owner != env);
284         down_read_nested(&obj->opo_sem, role);
285
286         LASSERT(obj->opo_owner == NULL);
287 }
288
289 static void osp_md_object_write_lock(const struct lu_env *env,
290                                      struct dt_object *dt, unsigned role)
291 {
292         struct osp_object *obj = dt2osp_obj(dt);
293
294         down_write_nested(&obj->opo_sem, role);
295
296         LASSERT(obj->opo_owner == NULL);
297         obj->opo_owner = env;
298 }
299
300 static void osp_md_object_read_unlock(const struct lu_env *env,
301                                       struct dt_object *dt)
302 {
303         struct osp_object *obj = dt2osp_obj(dt);
304
305         up_read(&obj->opo_sem);
306 }
307
308 static void osp_md_object_write_unlock(const struct lu_env *env,
309                                        struct dt_object *dt)
310 {
311         struct osp_object *obj = dt2osp_obj(dt);
312
313         LASSERT(obj->opo_owner == env);
314         obj->opo_owner = NULL;
315         up_write(&obj->opo_sem);
316 }
317
318 static int osp_md_object_write_locked(const struct lu_env *env,
319                                       struct dt_object *dt)
320 {
321         struct osp_object *obj = dt2osp_obj(dt);
322
323         return obj->opo_owner == env;
324 }
325
326 static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
327                                struct dt_rec *rec, const struct dt_key *key,
328                                struct lustre_capa *capa)
329 {
330         struct lu_buf           *lbuf   = &osp_env_info(env)->osi_lb2;
331         struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
332         struct dt_device        *dt_dev = &osp->opd_dt_dev;
333         struct update_request   *update;
334         struct ptlrpc_request   *req = NULL;
335         int                     size = strlen((char *)key) + 1;
336         int                     rc;
337         struct update_reply     *reply;
338         struct lu_fid           *fid;
339
340         ENTRY;
341
342         /* Because it needs send the update buffer right away,
343          * just create an update buffer, instead of attaching the
344          * update_remote list of the thandle.
345          */
346         update = out_create_update_req(dt_dev);
347         if (IS_ERR(update))
348                 RETURN(PTR_ERR(update));
349
350         rc = out_insert_update(env, update, OBJ_INDEX_LOOKUP,
351                                lu_object_fid(&dt->do_lu), 1, &size,
352                                (const char **)&key);
353         if (rc) {
354                 CERROR("%s: Insert update error: rc = %d\n",
355                        dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
356                 GOTO(out, rc);
357         }
358
359         rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req);
360         if (rc < 0)
361                 GOTO(out, rc);
362
363         reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
364                                              UPDATE_BUFFER_SIZE);
365         if (reply->ur_version != UPDATE_REPLY_V1) {
366                 CERROR("%s: Wrong version %x expected %x: rc = %d\n",
367                        dt_dev->dd_lu_dev.ld_obd->obd_name,
368                        reply->ur_version, UPDATE_REPLY_V1, -EPROTO);
369                 GOTO(out, rc = -EPROTO);
370         }
371
372         rc = update_get_reply_result(reply, NULL, 0);
373         if (rc < 0) {
374                 CERROR("%s: wrong version lookup "DFID" %s: rc = %d\n",
375                        dt_dev->dd_lu_dev.ld_obd->obd_name,
376                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, rc);
377                 GOTO(out, rc);
378         }
379
380         rc = update_get_reply_buf(reply, lbuf, 0);
381         if (rc < 0)
382                 GOTO(out, rc);
383
384         if (lbuf->lb_len != sizeof(*fid)) {
385                 CERROR("%s: lookup "DFID" %s wrong size %d\n",
386                        dt_dev->dd_lu_dev.ld_obd->obd_name,
387                        PFID(lu_object_fid(&dt->do_lu)), (char *)key,
388                        (int)lbuf->lb_len);
389                 GOTO(out, rc = -EINVAL);
390         }
391
392         fid = lbuf->lb_buf;
393         fid_le_to_cpu(fid, fid);
394         if (!fid_is_sane(fid)) {
395                 CERROR("%s: lookup "DFID" %s invalid fid "DFID"\n",
396                        dt_dev->dd_lu_dev.ld_obd->obd_name,
397                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid));
398                 GOTO(out, rc = -EINVAL);
399         }
400
401         memcpy(rec, fid, sizeof(*fid));
402
403         GOTO(out, rc = 1);
404
405 out:
406         if (req != NULL)
407                 ptlrpc_req_finished(req);
408
409         out_destroy_update_req(update);
410
411         return rc;
412 }
413
414 static int osp_md_declare_insert(const struct lu_env *env,
415                                  struct dt_object *dt,
416                                  const struct dt_rec *rec,
417                                  const struct dt_key *key,
418                                  struct thandle *th)
419 {
420         struct update_request   *update;
421         struct lu_fid           *fid;
422         struct lu_fid           *rec_fid = (struct lu_fid *)rec;
423         int                     size[2] = {strlen((char *)key) + 1,
424                                                   sizeof(*rec_fid)};
425         const char              *bufs[2] = {(char *)key, (char *)rec_fid};
426         int                     rc;
427
428         update = out_find_create_update_loc(th, dt);
429         if (IS_ERR(update)) {
430                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
431                        dt->do_lu.lo_dev->ld_obd->obd_name,
432                        (int)PTR_ERR(update));
433                 return PTR_ERR(update);
434         }
435
436         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
437
438         CDEBUG(D_INFO, "%s: insert index of "DFID" %s: "DFID"\n",
439                dt->do_lu.lo_dev->ld_obd->obd_name,
440                PFID(fid), (char *)key, PFID(rec_fid));
441
442         fid_cpu_to_le(rec_fid, rec_fid);
443
444         rc = out_insert_update(env, update, OBJ_INDEX_INSERT, fid,
445                                ARRAY_SIZE(size), size, bufs);
446         return rc;
447 }
448
449 static int osp_md_index_insert(const struct lu_env *env,
450                                struct dt_object *dt,
451                                const struct dt_rec *rec,
452                                const struct dt_key *key,
453                                struct thandle *th,
454                                struct lustre_capa *capa,
455                                int ignore_quota)
456 {
457         return 0;
458 }
459
460 static int osp_md_declare_delete(const struct lu_env *env,
461                                  struct dt_object *dt,
462                                  const struct dt_key *key,
463                                  struct thandle *th)
464 {
465         struct update_request *update;
466         struct lu_fid *fid;
467         int size = strlen((char *)key) + 1;
468         int rc;
469
470         update = out_find_create_update_loc(th, dt);
471         if (IS_ERR(update)) {
472                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
473                        dt->do_lu.lo_dev->ld_obd->obd_name,
474                        (int)PTR_ERR(update));
475                 return PTR_ERR(update);
476         }
477
478         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
479
480         rc = out_insert_update(env, update, OBJ_INDEX_DELETE, fid, 1, &size,
481                                (const char **)&key);
482
483         return rc;
484 }
485
486 static int osp_md_index_delete(const struct lu_env *env,
487                                struct dt_object *dt,
488                                const struct dt_key *key,
489                                struct thandle *th,
490                                struct lustre_capa *capa)
491 {
492         CDEBUG(D_INFO, "index delete "DFID" %s\n",
493                PFID(&dt->do_lu.lo_header->loh_fid), (char *)key);
494
495         return 0;
496 }
497
498 /**
499  * Creates or initializes iterator context.
500  *
501  * Note: for OSP, these index iterate api is only used to check
502  * whether the directory is empty now (see mdd_dir_is_empty).
503  * Since dir_empty will be return by OBJ_ATTR_GET(see osp_attr_get/
504  * out_attr_get). So the implementation of these iterator is simplied
505  * to make mdd_dir_is_empty happy. The real iterator should be
506  * implemented, if we need it one day.
507  */
508 static struct dt_it *osp_it_init(const struct lu_env *env,
509                                  struct dt_object *dt,
510                                  __u32 attr,
511                                 struct lustre_capa *capa)
512 {
513         lu_object_get(&dt->do_lu);
514         return (struct dt_it *)dt;
515 }
516
517 static void osp_it_fini(const struct lu_env *env, struct dt_it *di)
518 {
519         struct dt_object *dt = (struct dt_object *)di;
520         lu_object_put(env, &dt->do_lu);
521 }
522
523 static int osp_it_get(const struct lu_env *env,
524                       struct dt_it *di, const struct dt_key *key)
525 {
526         return 1;
527 }
528
529 static void osp_it_put(const struct lu_env *env, struct dt_it *di)
530 {
531         return;
532 }
533
534 static int osp_it_next(const struct lu_env *env, struct dt_it *di)
535 {
536         struct dt_object *dt = (struct dt_object *)di;
537         struct osp_object *o = dt2osp_obj(dt);
538
539         if (o->opo_empty)
540                 return 1;
541
542         return 0;
543 }
544
545 static struct dt_key *osp_it_key(const struct lu_env *env,
546                                  const struct dt_it *di)
547 {
548         LBUG();
549         return NULL;
550 }
551
552 static int osp_it_key_size(const struct lu_env *env, const struct dt_it *di)
553 {
554         LBUG();
555         return 0;
556 }
557
558 static int osp_it_rec(const struct lu_env *env, const struct dt_it *di,
559                       struct dt_rec *lde, __u32 attr)
560 {
561         LBUG();
562         return 0;
563 }
564
565 static __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di)
566 {
567         LBUG();
568         return 0;
569 }
570
571 static int osp_it_load(const struct lu_env *env, const struct dt_it *di,
572                        __u64 hash)
573 {
574         LBUG();
575         return 0;
576 }
577
578 static int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di,
579                           void *key_rec)
580 {
581         LBUG();
582         return 0;
583 }
584
585 static const struct dt_index_operations osp_md_index_ops = {
586         .dio_lookup         = osp_md_index_lookup,
587         .dio_declare_insert = osp_md_declare_insert,
588         .dio_insert         = osp_md_index_insert,
589         .dio_declare_delete = osp_md_declare_delete,
590         .dio_delete         = osp_md_index_delete,
591         .dio_it     = {
592                 .init     = osp_it_init,
593                 .fini     = osp_it_fini,
594                 .get      = osp_it_get,
595                 .put      = osp_it_put,
596                 .next     = osp_it_next,
597                 .key      = osp_it_key,
598                 .key_size = osp_it_key_size,
599                 .rec      = osp_it_rec,
600                 .store    = osp_it_store,
601                 .load     = osp_it_load,
602                 .key_rec  = osp_it_key_rec,
603         }
604 };
605
606 static int osp_md_index_try(const struct lu_env *env,
607                             struct dt_object *dt,
608                             const struct dt_index_features *feat)
609 {
610         dt->do_index_ops = &osp_md_index_ops;
611         return 0;
612 }
613
614 static int osp_md_object_lock(const struct lu_env *env,
615                               struct dt_object *dt,
616                               struct lustre_handle *lh,
617                               struct ldlm_enqueue_info *einfo,
618                               void *policy)
619 {
620         struct osp_thread_info *info = osp_env_info(env);
621         struct ldlm_res_id     *res_id = &info->osi_resid;
622         struct dt_device       *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
623         struct osp_device      *osp = dt2osp_dev(dt_dev);
624         struct ptlrpc_request  *req = NULL;
625         int                     rc = 0;
626         __u64                   flags = 0;
627         ldlm_mode_t             mode;
628
629         fid_build_reg_res_name(lu_object_fid(&dt->do_lu), res_id);
630
631         mode = ldlm_lock_match(osp->opd_obd->obd_namespace,
632                                LDLM_FL_BLOCK_GRANTED, res_id,
633                                einfo->ei_type,
634                                (ldlm_policy_data_t *)policy,
635                                einfo->ei_mode, lh, 0);
636         if (mode > 0)
637                 return ELDLM_OK;
638
639         req = ldlm_enqueue_pack(osp->opd_exp, 0);
640         if (IS_ERR(req))
641                 RETURN(PTR_ERR(req));
642
643         rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id,
644                               (const ldlm_policy_data_t *)policy,
645                               &flags, NULL, 0, LVB_T_NONE, lh, 0);
646
647         ptlrpc_req_finished(req);
648
649         return rc == ELDLM_OK ? 0 : -EIO;
650 }
651
652 struct dt_object_operations osp_md_obj_ops = {
653         .do_read_lock         = osp_md_object_read_lock,
654         .do_write_lock        = osp_md_object_write_lock,
655         .do_read_unlock       = osp_md_object_read_unlock,
656         .do_write_unlock      = osp_md_object_write_unlock,
657         .do_write_locked      = osp_md_object_write_locked,
658         .do_declare_create    = osp_md_declare_object_create,
659         .do_create            = osp_md_object_create,
660         .do_declare_ref_add   = osp_md_declare_ref_add,
661         .do_ref_add           = osp_md_object_ref_add,
662         .do_declare_ref_del   = osp_md_declare_object_ref_del,
663         .do_ref_del           = osp_md_object_ref_del,
664         .do_declare_destroy   = osp_declare_object_destroy,
665         .do_destroy           = osp_object_destroy,
666         .do_ah_init           = osp_md_ah_init,
667         .do_attr_get          = osp_attr_get,
668         .do_declare_attr_set  = osp_md_declare_attr_set,
669         .do_attr_set          = osp_md_attr_set,
670         .do_xattr_get         = osp_xattr_get,
671         .do_declare_xattr_set = osp_declare_xattr_set,
672         .do_xattr_set         = osp_xattr_set,
673         .do_index_try         = osp_md_index_try,
674         .do_object_lock       = osp_md_object_lock,
675 };