Whamcloud - gitweb
6ef301972e01140d42c327570f4f5ca00eb663db
[fs/lustre-release.git] / lustre / osp / osp_md_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_md_object.c
27  *
28  * Lustre MDT Proxy Device
29  *
30  * Author: Di Wang <di.wang@intel.com>
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDS
34
35 #include <lustre_log.h>
36 #include "osp_internal.h"
37
38 static const char dot[] = ".";
39 static const char dotdot[] = "..";
40
41 int osp_md_declare_object_create(const struct lu_env *env,
42                                  struct dt_object *dt,
43                                  struct lu_attr *attr,
44                                  struct dt_allocation_hint *hint,
45                                  struct dt_object_format *dof,
46                                  struct thandle *th)
47 {
48         struct osp_thread_info  *osi = osp_env_info(env);
49         struct update_request   *update;
50         struct lu_fid           *fid1;
51         int                     sizes[2] = {sizeof(struct obdo), 0};
52         char                    *bufs[2] = {NULL, NULL};
53         int                     buf_count;
54         int                     rc;
55
56         update = out_find_create_update_loc(th, dt);
57         if (IS_ERR(update)) {
58                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
59                        dt->do_lu.lo_dev->ld_obd->obd_name,
60                        (int)PTR_ERR(update));
61                 return PTR_ERR(update);
62         }
63
64         osi->osi_obdo.o_valid = 0;
65         obdo_from_la(&osi->osi_obdo, attr, attr->la_valid);
66         lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
67         obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
68
69         bufs[0] = (char *)&osi->osi_obdo;
70         buf_count = 1;
71         fid1 = (struct lu_fid *)lu_object_fid(&dt->do_lu);
72         if (hint != NULL && hint->dah_parent) {
73                 struct lu_fid *fid2;
74                 struct lu_fid *tmp_fid = &osi->osi_fid;
75
76                 fid2 = (struct lu_fid *)lu_object_fid(&hint->dah_parent->do_lu);
77                 fid_cpu_to_le(tmp_fid, fid2);
78                 sizes[1] = sizeof(*tmp_fid);
79                 bufs[1] = (char *)tmp_fid;
80                 buf_count++;
81         }
82
83         if (lu_object_exists(&dt->do_lu)) {
84                 /* If the object already exists, we needs to destroy
85                  * this orphan object first.
86                  *
87                  * The scenario might happen in this case
88                  *
89                  * 1. client send remote create to MDT0.
90                  * 2. MDT0 send create update to MDT1.
91                  * 3. MDT1 finished create synchronously.
92                  * 4. MDT0 failed and reboot.
93                  * 5. client resend remote create to MDT0.
94                  * 6. MDT0 tries to resend create update to MDT1,
95                  *    but find the object already exists
96                  */
97                 CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
98                        dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1));
99
100                 rc = out_insert_update(env, update, OBJ_REF_DEL, fid1, 0,
101                                        NULL, NULL);
102                 if (rc != 0)
103                         GOTO(out, rc);
104
105                 if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
106                         /* decrease for ".." */
107                         rc = out_insert_update(env, update, OBJ_REF_DEL, fid1,
108                                                0, NULL, NULL);
109                         if (rc != 0)
110                                 GOTO(out, rc);
111                 }
112
113                 rc = out_insert_update(env, update, OBJ_DESTROY, fid1, 0, NULL,
114                                        NULL);
115                 if (rc != 0)
116                         GOTO(out, rc);
117
118                 dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
119                 /* Increase batchid to add this orphan object deletion
120                  * to separate transaction */
121                 update_inc_batchid(update);
122         }
123
124         rc = out_insert_update(env, update, OBJ_CREATE, fid1, buf_count, sizes,
125                                (const char **)bufs);
126 out:
127         if (rc)
128                 CERROR("%s: Insert update error: rc = %d\n",
129                        dt->do_lu.lo_dev->ld_obd->obd_name, rc);
130
131         return rc;
132 }
133
134 int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
135                          struct lu_attr *attr, struct dt_allocation_hint *hint,
136                          struct dt_object_format *dof, struct thandle *th)
137 {
138         struct osp_object  *obj = dt2osp_obj(dt);
139
140         CDEBUG(D_INFO, "create object "DFID"\n",
141                PFID(&dt->do_lu.lo_header->loh_fid));
142
143         /* Because the create update RPC will be sent during declare phase,
144          * if creation reaches here, it means the object has been created
145          * successfully */
146         dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
147         if (S_ISDIR(attr->la_mode))
148                 obj->opo_empty = 1;
149
150         return 0;
151 }
152
153 static int osp_md_declare_object_ref_del(const struct lu_env *env,
154                                          struct dt_object *dt,
155                                          struct thandle *th)
156 {
157         struct update_request   *update;
158         struct lu_fid           *fid;
159         int                     rc;
160
161         update = out_find_create_update_loc(th, dt);
162         if (IS_ERR(update)) {
163                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
164                        dt->do_lu.lo_dev->ld_obd->obd_name,
165                       (int)PTR_ERR(update));
166                 return PTR_ERR(update);
167         }
168
169         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
170
171         rc = out_insert_update(env, update, OBJ_REF_DEL, fid, 0, NULL, NULL);
172
173         return rc;
174 }
175
176 static int osp_md_object_ref_del(const struct lu_env *env,
177                                  struct dt_object *dt,
178                                  struct thandle *th)
179 {
180         CDEBUG(D_INFO, "ref del object "DFID"\n",
181                PFID(&dt->do_lu.lo_header->loh_fid));
182
183         return 0;
184 }
185
186 static int osp_md_declare_ref_add(const struct lu_env *env,
187                                   struct dt_object *dt, struct thandle *th)
188 {
189         struct update_request   *update;
190         struct lu_fid           *fid;
191         int                     rc;
192
193         update = out_find_create_update_loc(th, dt);
194         if (IS_ERR(update)) {
195                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
196                        dt->do_lu.lo_dev->ld_obd->obd_name,
197                        (int)PTR_ERR(update));
198                 return PTR_ERR(update);
199         }
200
201         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
202
203         rc = out_insert_update(env, update, OBJ_REF_ADD, fid, 0, NULL, NULL);
204
205         return rc;
206 }
207
208 static int osp_md_object_ref_add(const struct lu_env *env,
209                                  struct dt_object *dt,
210                                  struct thandle *th)
211 {
212         CDEBUG(D_INFO, "ref add object "DFID"\n",
213                PFID(&dt->do_lu.lo_header->loh_fid));
214
215         return 0;
216 }
217
218 static void osp_md_ah_init(const struct lu_env *env,
219                            struct dt_allocation_hint *ah,
220                            struct dt_object *parent,
221                            struct dt_object *child,
222                            umode_t child_mode)
223 {
224         LASSERT(ah);
225
226         memset(ah, 0, sizeof(*ah));
227         ah->dah_parent = parent;
228         ah->dah_mode = child_mode;
229 }
230
231 static int osp_md_declare_attr_set(const struct lu_env *env,
232                                    struct dt_object *dt,
233                                    const struct lu_attr *attr,
234                                    struct thandle *th)
235 {
236         struct osp_thread_info *osi = osp_env_info(env);
237         struct update_request  *update;
238         struct lu_fid          *fid;
239         int                     size = sizeof(struct obdo);
240         char                   *buf;
241         int                     rc;
242
243         update = out_find_create_update_loc(th, dt);
244         if (IS_ERR(update)) {
245                 CERROR("%s: Get OSP update buf failed: %d\n",
246                        dt->do_lu.lo_dev->ld_obd->obd_name,
247                        (int)PTR_ERR(update));
248                 return PTR_ERR(update);
249         }
250
251         osi->osi_obdo.o_valid = 0;
252         obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr,
253                      attr->la_valid);
254         lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
255         obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
256
257         buf = (char *)&osi->osi_obdo;
258         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
259
260         rc = out_insert_update(env, update, OBJ_ATTR_SET, fid, 1, &size,
261                                (const char **)&buf);
262
263         return rc;
264 }
265
266 static int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
267                            const struct lu_attr *attr, struct thandle *th,
268                            struct lustre_capa *capa)
269 {
270         CDEBUG(D_INFO, "attr set object "DFID"\n",
271                PFID(&dt->do_lu.lo_header->loh_fid));
272
273         RETURN(0);
274 }
275
276 static void osp_md_object_read_lock(const struct lu_env *env,
277                                     struct dt_object *dt, unsigned role)
278 {
279         struct osp_object  *obj = dt2osp_obj(dt);
280
281         LASSERT(obj->opo_owner != env);
282         down_read_nested(&obj->opo_sem, role);
283
284         LASSERT(obj->opo_owner == NULL);
285 }
286
287 static void osp_md_object_write_lock(const struct lu_env *env,
288                                      struct dt_object *dt, unsigned role)
289 {
290         struct osp_object *obj = dt2osp_obj(dt);
291
292         down_write_nested(&obj->opo_sem, role);
293
294         LASSERT(obj->opo_owner == NULL);
295         obj->opo_owner = env;
296 }
297
298 static void osp_md_object_read_unlock(const struct lu_env *env,
299                                       struct dt_object *dt)
300 {
301         struct osp_object *obj = dt2osp_obj(dt);
302
303         up_read(&obj->opo_sem);
304 }
305
306 static void osp_md_object_write_unlock(const struct lu_env *env,
307                                        struct dt_object *dt)
308 {
309         struct osp_object *obj = dt2osp_obj(dt);
310
311         LASSERT(obj->opo_owner == env);
312         obj->opo_owner = NULL;
313         up_write(&obj->opo_sem);
314 }
315
316 static int osp_md_object_write_locked(const struct lu_env *env,
317                                       struct dt_object *dt)
318 {
319         struct osp_object *obj = dt2osp_obj(dt);
320
321         return obj->opo_owner == env;
322 }
323
324 static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
325                                struct dt_rec *rec, const struct dt_key *key,
326                                struct lustre_capa *capa)
327 {
328         struct lu_buf           *lbuf   = &osp_env_info(env)->osi_lb2;
329         struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
330         struct dt_device        *dt_dev = &osp->opd_dt_dev;
331         struct update_request   *update;
332         struct ptlrpc_request   *req = NULL;
333         int                     size = strlen((char *)key) + 1;
334         int                     rc;
335         struct update_reply     *reply;
336         struct lu_fid           *fid;
337
338         ENTRY;
339
340         /* Because it needs send the update buffer right away,
341          * just create an update buffer, instead of attaching the
342          * update_remote list of the thandle.
343          */
344         update = out_create_update_req(dt_dev);
345         if (IS_ERR(update))
346                 RETURN(PTR_ERR(update));
347
348         rc = out_insert_update(env, update, OBJ_INDEX_LOOKUP,
349                                lu_object_fid(&dt->do_lu), 1, &size,
350                                (const char **)&key);
351         if (rc) {
352                 CERROR("%s: Insert update error: rc = %d\n",
353                        dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
354                 GOTO(out, rc);
355         }
356
357         rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req);
358         if (rc < 0)
359                 GOTO(out, rc);
360
361         reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
362                                              UPDATE_BUFFER_SIZE);
363         if (reply->ur_version != UPDATE_REPLY_V1) {
364                 CERROR("%s: Wrong version %x expected %x: rc = %d\n",
365                        dt_dev->dd_lu_dev.ld_obd->obd_name,
366                        reply->ur_version, UPDATE_REPLY_V1, -EPROTO);
367                 GOTO(out, rc = -EPROTO);
368         }
369
370         rc = update_get_reply_result(reply, NULL, 0);
371         if (rc < 0) {
372                 CERROR("%s: wrong version lookup "DFID" %s: rc = %d\n",
373                        dt_dev->dd_lu_dev.ld_obd->obd_name,
374                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, rc);
375                 GOTO(out, rc);
376         }
377
378         rc = update_get_reply_buf(reply, lbuf, 0);
379         if (rc < 0)
380                 GOTO(out, rc);
381
382         if (lbuf->lb_len != sizeof(*fid)) {
383                 CERROR("%s: lookup "DFID" %s wrong size %d\n",
384                        dt_dev->dd_lu_dev.ld_obd->obd_name,
385                        PFID(lu_object_fid(&dt->do_lu)), (char *)key,
386                        (int)lbuf->lb_len);
387                 GOTO(out, rc = -EINVAL);
388         }
389
390         fid = lbuf->lb_buf;
391         fid_le_to_cpu(fid, fid);
392         if (!fid_is_sane(fid)) {
393                 CERROR("%s: lookup "DFID" %s invalid fid "DFID"\n",
394                        dt_dev->dd_lu_dev.ld_obd->obd_name,
395                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid));
396                 GOTO(out, rc = -EINVAL);
397         }
398
399         memcpy(rec, fid, sizeof(*fid));
400
401         GOTO(out, rc = 1);
402
403 out:
404         if (req != NULL)
405                 ptlrpc_req_finished(req);
406
407         out_destroy_update_req(update);
408
409         return rc;
410 }
411
412 static int osp_md_declare_insert(const struct lu_env *env,
413                                  struct dt_object *dt,
414                                  const struct dt_rec *rec,
415                                  const struct dt_key *key,
416                                  struct thandle *th)
417 {
418         struct update_request   *update;
419         struct lu_fid           *fid;
420         struct lu_fid           *rec_fid = (struct lu_fid *)rec;
421         int                     size[2] = {strlen((char *)key) + 1,
422                                                   sizeof(*rec_fid)};
423         const char              *bufs[2] = {(char *)key, (char *)rec_fid};
424         int                     rc;
425
426         update = out_find_create_update_loc(th, dt);
427         if (IS_ERR(update)) {
428                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
429                        dt->do_lu.lo_dev->ld_obd->obd_name,
430                        (int)PTR_ERR(update));
431                 return PTR_ERR(update);
432         }
433
434         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
435
436         CDEBUG(D_INFO, "%s: insert index of "DFID" %s: "DFID"\n",
437                dt->do_lu.lo_dev->ld_obd->obd_name,
438                PFID(fid), (char *)key, PFID(rec_fid));
439
440         fid_cpu_to_le(rec_fid, rec_fid);
441
442         rc = out_insert_update(env, update, OBJ_INDEX_INSERT, fid,
443                                ARRAY_SIZE(size), size, bufs);
444         return rc;
445 }
446
447 static int osp_md_index_insert(const struct lu_env *env,
448                                struct dt_object *dt,
449                                const struct dt_rec *rec,
450                                const struct dt_key *key,
451                                struct thandle *th,
452                                struct lustre_capa *capa,
453                                int ignore_quota)
454 {
455         return 0;
456 }
457
458 static int osp_md_declare_delete(const struct lu_env *env,
459                                  struct dt_object *dt,
460                                  const struct dt_key *key,
461                                  struct thandle *th)
462 {
463         struct update_request *update;
464         struct lu_fid *fid;
465         int size = strlen((char *)key) + 1;
466         int rc;
467
468         update = out_find_create_update_loc(th, dt);
469         if (IS_ERR(update)) {
470                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
471                        dt->do_lu.lo_dev->ld_obd->obd_name,
472                        (int)PTR_ERR(update));
473                 return PTR_ERR(update);
474         }
475
476         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
477
478         rc = out_insert_update(env, update, OBJ_INDEX_DELETE, fid, 1, &size,
479                                (const char **)&key);
480
481         return rc;
482 }
483
484 static int osp_md_index_delete(const struct lu_env *env,
485                                struct dt_object *dt,
486                                const struct dt_key *key,
487                                struct thandle *th,
488                                struct lustre_capa *capa)
489 {
490         CDEBUG(D_INFO, "index delete "DFID" %s\n",
491                PFID(&dt->do_lu.lo_header->loh_fid), (char *)key);
492
493         return 0;
494 }
495
496 /**
497  * Creates or initializes iterator context.
498  *
499  * Note: for OSP, these index iterate api is only used to check
500  * whether the directory is empty now (see mdd_dir_is_empty).
501  * Since dir_empty will be return by OBJ_ATTR_GET(see osp_attr_get/
502  * out_attr_get). So the implementation of these iterator is simplied
503  * to make mdd_dir_is_empty happy. The real iterator should be
504  * implemented, if we need it one day.
505  */
506 static struct dt_it *osp_it_init(const struct lu_env *env,
507                                  struct dt_object *dt,
508                                  __u32 attr,
509                                 struct lustre_capa *capa)
510 {
511         lu_object_get(&dt->do_lu);
512         return (struct dt_it *)dt;
513 }
514
515 static void osp_it_fini(const struct lu_env *env, struct dt_it *di)
516 {
517         struct dt_object *dt = (struct dt_object *)di;
518         lu_object_put(env, &dt->do_lu);
519 }
520
521 static int osp_it_get(const struct lu_env *env,
522                       struct dt_it *di, const struct dt_key *key)
523 {
524         return 1;
525 }
526
527 static void osp_it_put(const struct lu_env *env, struct dt_it *di)
528 {
529         return;
530 }
531
532 static int osp_it_next(const struct lu_env *env, struct dt_it *di)
533 {
534         struct dt_object *dt = (struct dt_object *)di;
535         struct osp_object *o = dt2osp_obj(dt);
536
537         if (o->opo_empty)
538                 return 1;
539
540         return 0;
541 }
542
543 static struct dt_key *osp_it_key(const struct lu_env *env,
544                                  const struct dt_it *di)
545 {
546         LBUG();
547         return NULL;
548 }
549
550 static int osp_it_key_size(const struct lu_env *env, const struct dt_it *di)
551 {
552         LBUG();
553         return 0;
554 }
555
556 static int osp_it_rec(const struct lu_env *env, const struct dt_it *di,
557                       struct dt_rec *lde, __u32 attr)
558 {
559         LBUG();
560         return 0;
561 }
562
563 static __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di)
564 {
565         LBUG();
566         return 0;
567 }
568
569 static int osp_it_load(const struct lu_env *env, const struct dt_it *di,
570                        __u64 hash)
571 {
572         LBUG();
573         return 0;
574 }
575
576 static int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di,
577                           void *key_rec)
578 {
579         LBUG();
580         return 0;
581 }
582
583 static const struct dt_index_operations osp_md_index_ops = {
584         .dio_lookup         = osp_md_index_lookup,
585         .dio_declare_insert = osp_md_declare_insert,
586         .dio_insert         = osp_md_index_insert,
587         .dio_declare_delete = osp_md_declare_delete,
588         .dio_delete         = osp_md_index_delete,
589         .dio_it     = {
590                 .init     = osp_it_init,
591                 .fini     = osp_it_fini,
592                 .get      = osp_it_get,
593                 .put      = osp_it_put,
594                 .next     = osp_it_next,
595                 .key      = osp_it_key,
596                 .key_size = osp_it_key_size,
597                 .rec      = osp_it_rec,
598                 .store    = osp_it_store,
599                 .load     = osp_it_load,
600                 .key_rec  = osp_it_key_rec,
601         }
602 };
603
604 static int osp_md_index_try(const struct lu_env *env,
605                             struct dt_object *dt,
606                             const struct dt_index_features *feat)
607 {
608         dt->do_index_ops = &osp_md_index_ops;
609         return 0;
610 }
611
612 static int osp_md_object_lock(const struct lu_env *env,
613                               struct dt_object *dt,
614                               struct lustre_handle *lh,
615                               struct ldlm_enqueue_info *einfo,
616                               void *policy)
617 {
618         struct osp_thread_info *info = osp_env_info(env);
619         struct ldlm_res_id     *res_id = &info->osi_resid;
620         struct dt_device       *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
621         struct osp_device      *osp = dt2osp_dev(dt_dev);
622         struct ptlrpc_request  *req = NULL;
623         int                     rc = 0;
624         __u64                   flags = 0;
625         ldlm_mode_t             mode;
626
627         fid_build_reg_res_name(lu_object_fid(&dt->do_lu), res_id);
628
629         mode = ldlm_lock_match(osp->opd_obd->obd_namespace,
630                                LDLM_FL_BLOCK_GRANTED, res_id,
631                                einfo->ei_type,
632                                (ldlm_policy_data_t *)policy,
633                                einfo->ei_mode, lh, 0);
634         if (mode > 0)
635                 return ELDLM_OK;
636
637         req = ldlm_enqueue_pack(osp->opd_exp, 0);
638         if (IS_ERR(req))
639                 RETURN(PTR_ERR(req));
640
641         rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id,
642                               (const ldlm_policy_data_t *)policy,
643                               &flags, NULL, 0, LVB_T_NONE, lh, 0);
644
645         ptlrpc_req_finished(req);
646
647         return rc == ELDLM_OK ? 0 : -EIO;
648 }
649
650 struct dt_object_operations osp_md_obj_ops = {
651         .do_read_lock         = osp_md_object_read_lock,
652         .do_write_lock        = osp_md_object_write_lock,
653         .do_read_unlock       = osp_md_object_read_unlock,
654         .do_write_unlock      = osp_md_object_write_unlock,
655         .do_write_locked      = osp_md_object_write_locked,
656         .do_declare_create    = osp_md_declare_object_create,
657         .do_create            = osp_md_object_create,
658         .do_declare_ref_add   = osp_md_declare_ref_add,
659         .do_ref_add           = osp_md_object_ref_add,
660         .do_declare_ref_del   = osp_md_declare_object_ref_del,
661         .do_ref_del           = osp_md_object_ref_del,
662         .do_declare_destroy   = osp_declare_object_destroy,
663         .do_destroy           = osp_object_destroy,
664         .do_ah_init           = osp_md_ah_init,
665         .do_attr_get          = osp_attr_get,
666         .do_declare_attr_set  = osp_md_declare_attr_set,
667         .do_attr_set          = osp_md_attr_set,
668         .do_xattr_get         = osp_xattr_get,
669         .do_declare_xattr_set = osp_declare_xattr_set,
670         .do_xattr_set         = osp_xattr_set,
671         .do_index_try         = osp_md_index_try,
672         .do_object_lock       = osp_md_object_lock,
673 };