Whamcloud - gitweb
LU-3591 lfsck: repair unmatched MDT-OST objects pairs
[fs/lustre-release.git] / lustre / osp / osp_md_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_md_object.c
27  *
28  * Lustre MDT Proxy Device
29  *
30  * Author: Di Wang <di.wang@intel.com>
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDS
34
35 #include <lustre_log.h>
36 #include "osp_internal.h"
37
38 static const char dot[] = ".";
39 static const char dotdot[] = "..";
40
41 int osp_md_declare_object_create(const struct lu_env *env,
42                                  struct dt_object *dt,
43                                  struct lu_attr *attr,
44                                  struct dt_allocation_hint *hint,
45                                  struct dt_object_format *dof,
46                                  struct thandle *th)
47 {
48         struct osp_thread_info  *osi = osp_env_info(env);
49         struct update_request   *update;
50         struct lu_fid           *fid1;
51         int                     sizes[2] = {sizeof(struct obdo), 0};
52         char                    *bufs[2] = {NULL, NULL};
53         int                     buf_count;
54         int                     rc;
55
56         update = out_find_create_update_loc(th, dt);
57         if (IS_ERR(update)) {
58                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
59                        dt->do_lu.lo_dev->ld_obd->obd_name,
60                        (int)PTR_ERR(update));
61                 return PTR_ERR(update);
62         }
63
64         osi->osi_obdo.o_valid = 0;
65         obdo_from_la(&osi->osi_obdo, attr, attr->la_valid);
66         lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
67         obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
68
69         bufs[0] = (char *)&osi->osi_obdo;
70         buf_count = 1;
71         fid1 = (struct lu_fid *)lu_object_fid(&dt->do_lu);
72         if (hint != NULL && hint->dah_parent) {
73                 struct lu_fid *fid2;
74                 struct lu_fid *tmp_fid = &osi->osi_fid;
75
76                 fid2 = (struct lu_fid *)lu_object_fid(&hint->dah_parent->do_lu);
77                 fid_cpu_to_le(tmp_fid, fid2);
78                 sizes[1] = sizeof(*tmp_fid);
79                 bufs[1] = (char *)tmp_fid;
80                 buf_count++;
81         }
82
83         if (lu_object_exists(&dt->do_lu)) {
84                 /* If the object already exists, we needs to destroy
85                  * this orphan object first.
86                  *
87                  * The scenario might happen in this case
88                  *
89                  * 1. client send remote create to MDT0.
90                  * 2. MDT0 send create update to MDT1.
91                  * 3. MDT1 finished create synchronously.
92                  * 4. MDT0 failed and reboot.
93                  * 5. client resend remote create to MDT0.
94                  * 6. MDT0 tries to resend create update to MDT1,
95                  *    but find the object already exists
96                  */
97                 CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
98                        dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1));
99
100                 rc = out_insert_update(env, update, OBJ_REF_DEL, fid1, 0,
101                                        NULL, NULL);
102                 if (rc != 0)
103                         GOTO(out, rc);
104
105                 if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
106                         /* decrease for ".." */
107                         rc = out_insert_update(env, update, OBJ_REF_DEL, fid1,
108                                                0, NULL, NULL);
109                         if (rc != 0)
110                                 GOTO(out, rc);
111                 }
112
113                 rc = out_insert_update(env, update, OBJ_DESTROY, fid1, 0, NULL,
114                                        NULL);
115                 if (rc != 0)
116                         GOTO(out, rc);
117
118                 dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
119                 /* Increase batchid to add this orphan object deletion
120                  * to separate transaction */
121                 update_inc_batchid(update);
122         }
123
124         rc = out_insert_update(env, update, OBJ_CREATE, fid1, buf_count, sizes,
125                                (const char **)bufs);
126 out:
127         if (rc)
128                 CERROR("%s: Insert update error: rc = %d\n",
129                        dt->do_lu.lo_dev->ld_obd->obd_name, rc);
130
131         return rc;
132 }
133
134 int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
135                          struct lu_attr *attr, struct dt_allocation_hint *hint,
136                          struct dt_object_format *dof, struct thandle *th)
137 {
138         struct osp_object  *obj = dt2osp_obj(dt);
139
140         CDEBUG(D_INFO, "create object "DFID"\n",
141                PFID(&dt->do_lu.lo_header->loh_fid));
142
143         /* Because the create update RPC will be sent during declare phase,
144          * if creation reaches here, it means the object has been created
145          * successfully */
146         dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
147         obj->opo_empty = 1;
148
149         return 0;
150 }
151
152 static int osp_md_declare_object_ref_del(const struct lu_env *env,
153                                          struct dt_object *dt,
154                                          struct thandle *th)
155 {
156         struct update_request   *update;
157         struct lu_fid           *fid;
158         int                     rc;
159
160         update = out_find_create_update_loc(th, dt);
161         if (IS_ERR(update)) {
162                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
163                        dt->do_lu.lo_dev->ld_obd->obd_name,
164                       (int)PTR_ERR(update));
165                 return PTR_ERR(update);
166         }
167
168         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
169
170         rc = out_insert_update(env, update, OBJ_REF_DEL, fid, 0, NULL, NULL);
171
172         return rc;
173 }
174
175 static int osp_md_object_ref_del(const struct lu_env *env,
176                                  struct dt_object *dt,
177                                  struct thandle *th)
178 {
179         CDEBUG(D_INFO, "ref del object "DFID"\n",
180                PFID(&dt->do_lu.lo_header->loh_fid));
181
182         return 0;
183 }
184
185 static int osp_md_declare_ref_add(const struct lu_env *env,
186                                   struct dt_object *dt, struct thandle *th)
187 {
188         struct update_request   *update;
189         struct lu_fid           *fid;
190         int                     rc;
191
192         update = out_find_create_update_loc(th, dt);
193         if (IS_ERR(update)) {
194                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
195                        dt->do_lu.lo_dev->ld_obd->obd_name,
196                        (int)PTR_ERR(update));
197                 return PTR_ERR(update);
198         }
199
200         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
201
202         rc = out_insert_update(env, update, OBJ_REF_ADD, fid, 0, NULL, NULL);
203
204         return rc;
205 }
206
207 static int osp_md_object_ref_add(const struct lu_env *env,
208                                  struct dt_object *dt,
209                                  struct thandle *th)
210 {
211         CDEBUG(D_INFO, "ref add object "DFID"\n",
212                PFID(&dt->do_lu.lo_header->loh_fid));
213
214         return 0;
215 }
216
217 static void osp_md_ah_init(const struct lu_env *env,
218                            struct dt_allocation_hint *ah,
219                            struct dt_object *parent,
220                            struct dt_object *child,
221                            umode_t child_mode)
222 {
223         LASSERT(ah);
224
225         memset(ah, 0, sizeof(*ah));
226         ah->dah_parent = parent;
227         ah->dah_mode = child_mode;
228 }
229
230 int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
231                             const struct lu_attr *attr, struct thandle *th)
232 {
233         struct osp_thread_info *osi = osp_env_info(env);
234         struct update_request  *update;
235         struct lu_fid          *fid;
236         int                     size = sizeof(struct obdo);
237         char                   *buf;
238         int                     rc;
239
240         update = out_find_create_update_loc(th, dt);
241         if (IS_ERR(update)) {
242                 CERROR("%s: Get OSP update buf failed: %d\n",
243                        dt->do_lu.lo_dev->ld_obd->obd_name,
244                        (int)PTR_ERR(update));
245                 return PTR_ERR(update);
246         }
247
248         osi->osi_obdo.o_valid = 0;
249         obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr,
250                      attr->la_valid);
251         lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
252         obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
253
254         buf = (char *)&osi->osi_obdo;
255         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
256
257         rc = out_insert_update(env, update, OBJ_ATTR_SET, fid, 1, &size,
258                                (const char **)&buf);
259
260         return rc;
261 }
262
263 int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
264                     const struct lu_attr *attr, struct thandle *th,
265                     struct lustre_capa *capa)
266 {
267         CDEBUG(D_INFO, "attr set object "DFID"\n",
268                PFID(&dt->do_lu.lo_header->loh_fid));
269
270         RETURN(0);
271 }
272
273 static void osp_md_object_read_lock(const struct lu_env *env,
274                                     struct dt_object *dt, unsigned role)
275 {
276         struct osp_object  *obj = dt2osp_obj(dt);
277
278         LASSERT(obj->opo_owner != env);
279         down_read_nested(&obj->opo_sem, role);
280
281         LASSERT(obj->opo_owner == NULL);
282 }
283
284 static void osp_md_object_write_lock(const struct lu_env *env,
285                                      struct dt_object *dt, unsigned role)
286 {
287         struct osp_object *obj = dt2osp_obj(dt);
288
289         down_write_nested(&obj->opo_sem, role);
290
291         LASSERT(obj->opo_owner == NULL);
292         obj->opo_owner = env;
293 }
294
295 static void osp_md_object_read_unlock(const struct lu_env *env,
296                                       struct dt_object *dt)
297 {
298         struct osp_object *obj = dt2osp_obj(dt);
299
300         up_read(&obj->opo_sem);
301 }
302
303 static void osp_md_object_write_unlock(const struct lu_env *env,
304                                        struct dt_object *dt)
305 {
306         struct osp_object *obj = dt2osp_obj(dt);
307
308         LASSERT(obj->opo_owner == env);
309         obj->opo_owner = NULL;
310         up_write(&obj->opo_sem);
311 }
312
313 static int osp_md_object_write_locked(const struct lu_env *env,
314                                       struct dt_object *dt)
315 {
316         struct osp_object *obj = dt2osp_obj(dt);
317
318         return obj->opo_owner == env;
319 }
320
321 static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
322                                struct dt_rec *rec, const struct dt_key *key,
323                                struct lustre_capa *capa)
324 {
325         struct lu_buf           *lbuf   = &osp_env_info(env)->osi_lb2;
326         struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
327         struct dt_device        *dt_dev = &osp->opd_dt_dev;
328         struct update_request   *update;
329         struct ptlrpc_request   *req = NULL;
330         int                     size = strlen((char *)key) + 1;
331         int                     rc;
332         struct update_reply     *reply;
333         struct lu_fid           *fid;
334
335         ENTRY;
336
337         /* Because it needs send the update buffer right away,
338          * just create an update buffer, instead of attaching the
339          * update_remote list of the thandle.
340          */
341         update = out_create_update_req(dt_dev);
342         if (IS_ERR(update))
343                 RETURN(PTR_ERR(update));
344
345         rc = out_insert_update(env, update, OBJ_INDEX_LOOKUP,
346                                lu_object_fid(&dt->do_lu), 1, &size,
347                                (const char **)&key);
348         if (rc) {
349                 CERROR("%s: Insert update error: rc = %d\n",
350                        dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
351                 GOTO(out, rc);
352         }
353
354         rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req);
355         if (rc < 0)
356                 GOTO(out, rc);
357
358         reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
359                                              UPDATE_BUFFER_SIZE);
360         if (reply->ur_version != UPDATE_REPLY_V1) {
361                 CERROR("%s: Wrong version %x expected %x: rc = %d\n",
362                        dt_dev->dd_lu_dev.ld_obd->obd_name,
363                        reply->ur_version, UPDATE_REPLY_V1, -EPROTO);
364                 GOTO(out, rc = -EPROTO);
365         }
366
367         rc = update_get_reply_result(reply, NULL, 0);
368         if (rc < 0) {
369                 CERROR("%s: wrong version lookup "DFID" %s: rc = %d\n",
370                        dt_dev->dd_lu_dev.ld_obd->obd_name,
371                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, rc);
372                 GOTO(out, rc);
373         }
374
375         rc = update_get_reply_buf(reply, lbuf, 0);
376         if (rc < 0)
377                 GOTO(out, rc);
378
379         if (lbuf->lb_len != sizeof(*fid)) {
380                 CERROR("%s: lookup "DFID" %s wrong size %d\n",
381                        dt_dev->dd_lu_dev.ld_obd->obd_name,
382                        PFID(lu_object_fid(&dt->do_lu)), (char *)key,
383                        (int)lbuf->lb_len);
384                 GOTO(out, rc = -EINVAL);
385         }
386
387         fid = lbuf->lb_buf;
388         fid_le_to_cpu(fid, fid);
389         if (!fid_is_sane(fid)) {
390                 CERROR("%s: lookup "DFID" %s invalid fid "DFID"\n",
391                        dt_dev->dd_lu_dev.ld_obd->obd_name,
392                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid));
393                 GOTO(out, rc = -EINVAL);
394         }
395
396         memcpy(rec, fid, sizeof(*fid));
397
398         GOTO(out, rc = 1);
399
400 out:
401         if (req != NULL)
402                 ptlrpc_req_finished(req);
403
404         out_destroy_update_req(update);
405
406         return rc;
407 }
408
409 static int osp_md_declare_insert(const struct lu_env *env,
410                                  struct dt_object *dt,
411                                  const struct dt_rec *rec,
412                                  const struct dt_key *key,
413                                  struct thandle *th)
414 {
415         struct update_request   *update;
416         struct lu_fid           *fid;
417         struct lu_fid           *rec_fid = (struct lu_fid *)rec;
418         int                     size[2] = {strlen((char *)key) + 1,
419                                                   sizeof(*rec_fid)};
420         const char              *bufs[2] = {(char *)key, (char *)rec_fid};
421         int                     rc;
422
423         update = out_find_create_update_loc(th, dt);
424         if (IS_ERR(update)) {
425                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
426                        dt->do_lu.lo_dev->ld_obd->obd_name,
427                        (int)PTR_ERR(update));
428                 return PTR_ERR(update);
429         }
430
431         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
432
433         CDEBUG(D_INFO, "%s: insert index of "DFID" %s: "DFID"\n",
434                dt->do_lu.lo_dev->ld_obd->obd_name,
435                PFID(fid), (char *)key, PFID(rec_fid));
436
437         fid_cpu_to_le(rec_fid, rec_fid);
438
439         rc = out_insert_update(env, update, OBJ_INDEX_INSERT, fid,
440                                ARRAY_SIZE(size), size, bufs);
441         return rc;
442 }
443
444 static int osp_md_index_insert(const struct lu_env *env,
445                                struct dt_object *dt,
446                                const struct dt_rec *rec,
447                                const struct dt_key *key,
448                                struct thandle *th,
449                                struct lustre_capa *capa,
450                                int ignore_quota)
451 {
452         return 0;
453 }
454
455 static int osp_md_declare_delete(const struct lu_env *env,
456                                  struct dt_object *dt,
457                                  const struct dt_key *key,
458                                  struct thandle *th)
459 {
460         struct update_request *update;
461         struct lu_fid *fid;
462         int size = strlen((char *)key) + 1;
463         int rc;
464
465         update = out_find_create_update_loc(th, dt);
466         if (IS_ERR(update)) {
467                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
468                        dt->do_lu.lo_dev->ld_obd->obd_name,
469                        (int)PTR_ERR(update));
470                 return PTR_ERR(update);
471         }
472
473         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
474
475         rc = out_insert_update(env, update, OBJ_INDEX_DELETE, fid, 1, &size,
476                                (const char **)&key);
477
478         return rc;
479 }
480
481 static int osp_md_index_delete(const struct lu_env *env,
482                                struct dt_object *dt,
483                                const struct dt_key *key,
484                                struct thandle *th,
485                                struct lustre_capa *capa)
486 {
487         CDEBUG(D_INFO, "index delete "DFID" %s\n",
488                PFID(&dt->do_lu.lo_header->loh_fid), (char *)key);
489
490         return 0;
491 }
492
493 /**
494  * Creates or initializes iterator context.
495  *
496  * Note: for OSP, these index iterate api is only used to check
497  * whether the directory is empty now (see mdd_dir_is_empty).
498  * Since dir_empty will be return by OBJ_ATTR_GET(see osp_attr_get/
499  * out_attr_get). So the implementation of these iterator is simplied
500  * to make mdd_dir_is_empty happy. The real iterator should be
501  * implemented, if we need it one day.
502  */
503 static struct dt_it *osp_it_init(const struct lu_env *env,
504                                  struct dt_object *dt,
505                                  __u32 attr,
506                                 struct lustre_capa *capa)
507 {
508         lu_object_get(&dt->do_lu);
509         return (struct dt_it *)dt;
510 }
511
512 static void osp_it_fini(const struct lu_env *env, struct dt_it *di)
513 {
514         struct dt_object *dt = (struct dt_object *)di;
515         lu_object_put(env, &dt->do_lu);
516 }
517
518 static int osp_it_get(const struct lu_env *env,
519                       struct dt_it *di, const struct dt_key *key)
520 {
521         return 1;
522 }
523
524 static void osp_it_put(const struct lu_env *env, struct dt_it *di)
525 {
526         return;
527 }
528
529 static int osp_it_next(const struct lu_env *env, struct dt_it *di)
530 {
531         struct dt_object *dt = (struct dt_object *)di;
532         struct osp_object *o = dt2osp_obj(dt);
533
534         if (o->opo_empty)
535                 return 1;
536
537         return 0;
538 }
539
540 static struct dt_key *osp_it_key(const struct lu_env *env,
541                                  const struct dt_it *di)
542 {
543         LBUG();
544         return NULL;
545 }
546
547 static int osp_it_key_size(const struct lu_env *env, const struct dt_it *di)
548 {
549         LBUG();
550         return 0;
551 }
552
553 static int osp_it_rec(const struct lu_env *env, const struct dt_it *di,
554                       struct dt_rec *lde, __u32 attr)
555 {
556         LBUG();
557         return 0;
558 }
559
560 static __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di)
561 {
562         LBUG();
563         return 0;
564 }
565
566 static int osp_it_load(const struct lu_env *env, const struct dt_it *di,
567                        __u64 hash)
568 {
569         LBUG();
570         return 0;
571 }
572
573 static int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di,
574                           void *key_rec)
575 {
576         LBUG();
577         return 0;
578 }
579
580 static const struct dt_index_operations osp_md_index_ops = {
581         .dio_lookup         = osp_md_index_lookup,
582         .dio_declare_insert = osp_md_declare_insert,
583         .dio_insert         = osp_md_index_insert,
584         .dio_declare_delete = osp_md_declare_delete,
585         .dio_delete         = osp_md_index_delete,
586         .dio_it     = {
587                 .init     = osp_it_init,
588                 .fini     = osp_it_fini,
589                 .get      = osp_it_get,
590                 .put      = osp_it_put,
591                 .next     = osp_it_next,
592                 .key      = osp_it_key,
593                 .key_size = osp_it_key_size,
594                 .rec      = osp_it_rec,
595                 .store    = osp_it_store,
596                 .load     = osp_it_load,
597                 .key_rec  = osp_it_key_rec,
598         }
599 };
600
601 static int osp_md_index_try(const struct lu_env *env,
602                             struct dt_object *dt,
603                             const struct dt_index_features *feat)
604 {
605         dt->do_index_ops = &osp_md_index_ops;
606         return 0;
607 }
608
609 static int osp_md_object_lock(const struct lu_env *env,
610                               struct dt_object *dt,
611                               struct lustre_handle *lh,
612                               struct ldlm_enqueue_info *einfo,
613                               void *policy)
614 {
615         struct osp_thread_info *info = osp_env_info(env);
616         struct ldlm_res_id     *res_id = &info->osi_resid;
617         struct dt_device       *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
618         struct osp_device      *osp = dt2osp_dev(dt_dev);
619         struct ptlrpc_request  *req = NULL;
620         int                     rc = 0;
621         __u64                   flags = 0;
622         ldlm_mode_t             mode;
623
624         fid_build_reg_res_name(lu_object_fid(&dt->do_lu), res_id);
625
626         mode = ldlm_lock_match(osp->opd_obd->obd_namespace,
627                                LDLM_FL_BLOCK_GRANTED, res_id,
628                                einfo->ei_type,
629                                (ldlm_policy_data_t *)policy,
630                                einfo->ei_mode, lh, 0);
631         if (mode > 0)
632                 return ELDLM_OK;
633
634         req = ldlm_enqueue_pack(osp->opd_exp, 0);
635         if (IS_ERR(req))
636                 RETURN(PTR_ERR(req));
637
638         rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id,
639                               (const ldlm_policy_data_t *)policy,
640                               &flags, NULL, 0, LVB_T_NONE, lh, 0);
641
642         ptlrpc_req_finished(req);
643
644         return rc == ELDLM_OK ? 0 : -EIO;
645 }
646
647 struct dt_object_operations osp_md_obj_ops = {
648         .do_read_lock         = osp_md_object_read_lock,
649         .do_write_lock        = osp_md_object_write_lock,
650         .do_read_unlock       = osp_md_object_read_unlock,
651         .do_write_unlock      = osp_md_object_write_unlock,
652         .do_write_locked      = osp_md_object_write_locked,
653         .do_declare_create    = osp_md_declare_object_create,
654         .do_create            = osp_md_object_create,
655         .do_declare_ref_add   = osp_md_declare_ref_add,
656         .do_ref_add           = osp_md_object_ref_add,
657         .do_declare_ref_del   = osp_md_declare_object_ref_del,
658         .do_ref_del           = osp_md_object_ref_del,
659         .do_declare_destroy   = osp_declare_object_destroy,
660         .do_destroy           = osp_object_destroy,
661         .do_ah_init           = osp_md_ah_init,
662         .do_attr_get          = osp_attr_get,
663         .do_declare_attr_set  = osp_md_declare_attr_set,
664         .do_attr_set          = osp_md_attr_set,
665         .do_xattr_get         = osp_xattr_get,
666         .do_declare_xattr_set = osp_declare_xattr_set,
667         .do_xattr_set         = osp_xattr_set,
668         .do_index_try         = osp_md_index_try,
669         .do_object_lock       = osp_md_object_lock,
670 };