Whamcloud - gitweb
LU-5180 lfsck: linkea for orphan
[fs/lustre-release.git] / lustre / osp / osp_md_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_md_object.c
27  *
28  * Lustre MDT Proxy Device
29  *
30  * Author: Di Wang <di.wang@intel.com>
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDS
34
35 #include <lustre_log.h>
36 #include "osp_internal.h"
37
/* Presumably the names of the "." and ".." directory entries.
 * NOTE(review): neither constant is referenced in the visible part of this
 * file - confirm whether they are still needed. */
static const char dot[] = ".";
static const char dotdot[] = "..";
40
41 int osp_md_declare_object_create(const struct lu_env *env,
42                                  struct dt_object *dt,
43                                  struct lu_attr *attr,
44                                  struct dt_allocation_hint *hint,
45                                  struct dt_object_format *dof,
46                                  struct thandle *th)
47 {
48         struct osp_thread_info          *osi = osp_env_info(env);
49         struct dt_update_request        *update;
50         struct lu_fid                   *fid1;
51         int                             sizes[2] = {sizeof(struct obdo), 0};
52         char                            *bufs[2] = {NULL, NULL};
53         int                             buf_count;
54         int                             rc;
55
56         update = out_find_create_update_loc(th, dt);
57         if (IS_ERR(update)) {
58                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
59                        dt->do_lu.lo_dev->ld_obd->obd_name,
60                        (int)PTR_ERR(update));
61                 return PTR_ERR(update);
62         }
63
64         osi->osi_obdo.o_valid = 0;
65         obdo_from_la(&osi->osi_obdo, attr, attr->la_valid);
66         lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
67
68         bufs[0] = (char *)&osi->osi_obdo;
69         buf_count = 1;
70         fid1 = (struct lu_fid *)lu_object_fid(&dt->do_lu);
71         if (hint != NULL && hint->dah_parent) {
72                 struct lu_fid *fid2;
73
74                 fid2 = (struct lu_fid *)lu_object_fid(&hint->dah_parent->do_lu);
75                 sizes[1] = sizeof(*fid2);
76                 bufs[1] = (char *)fid2;
77                 buf_count++;
78         }
79
80         if (lu_object_exists(&dt->do_lu)) {
81                 /* If the object already exists, we needs to destroy
82                  * this orphan object first.
83                  *
84                  * The scenario might happen in this case
85                  *
86                  * 1. client send remote create to MDT0.
87                  * 2. MDT0 send create update to MDT1.
88                  * 3. MDT1 finished create synchronously.
89                  * 4. MDT0 failed and reboot.
90                  * 5. client resend remote create to MDT0.
91                  * 6. MDT0 tries to resend create update to MDT1,
92                  *    but find the object already exists
93                  */
94                 CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
95                        dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1));
96
97                 rc = out_insert_update(env, update, OUT_REF_DEL, fid1, 0,
98                                        NULL, NULL);
99                 if (rc != 0)
100                         GOTO(out, rc);
101
102                 if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
103                         /* decrease for ".." */
104                         rc = out_insert_update(env, update, OUT_REF_DEL, fid1,
105                                                0, NULL, NULL);
106                         if (rc != 0)
107                                 GOTO(out, rc);
108                 }
109
110                 rc = out_insert_update(env, update, OUT_DESTROY, fid1, 0, NULL,
111                                        NULL);
112                 if (rc != 0)
113                         GOTO(out, rc);
114
115                 dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
116                 /* Increase batchid to add this orphan object deletion
117                  * to separate transaction */
118                 update_inc_batchid(update);
119         }
120
121         rc = out_insert_update(env, update, OUT_CREATE, fid1, buf_count, sizes,
122                                (const char **)bufs);
123 out:
124         if (rc)
125                 CERROR("%s: Insert update error: rc = %d\n",
126                        dt->do_lu.lo_dev->ld_obd->obd_name, rc);
127
128         return rc;
129 }
130
131 int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
132                          struct lu_attr *attr, struct dt_allocation_hint *hint,
133                          struct dt_object_format *dof, struct thandle *th)
134 {
135         CDEBUG(D_INFO, "create object "DFID"\n",
136                PFID(&dt->do_lu.lo_header->loh_fid));
137
138         /* Because the create update RPC will be sent during declare phase,
139          * if creation reaches here, it means the object has been created
140          * successfully */
141         dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
142
143         return 0;
144 }
145
146 static int osp_md_declare_object_ref_del(const struct lu_env *env,
147                                          struct dt_object *dt,
148                                          struct thandle *th)
149 {
150         struct dt_update_request        *update;
151         struct lu_fid                   *fid;
152         int                             rc;
153
154         update = out_find_create_update_loc(th, dt);
155         if (IS_ERR(update)) {
156                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
157                        dt->do_lu.lo_dev->ld_obd->obd_name,
158                       (int)PTR_ERR(update));
159                 return PTR_ERR(update);
160         }
161
162         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
163
164         rc = out_insert_update(env, update, OUT_REF_DEL, fid, 0, NULL, NULL);
165
166         return rc;
167 }
168
169 static int osp_md_object_ref_del(const struct lu_env *env,
170                                  struct dt_object *dt,
171                                  struct thandle *th)
172 {
173         CDEBUG(D_INFO, "ref del object "DFID"\n",
174                PFID(&dt->do_lu.lo_header->loh_fid));
175
176         return 0;
177 }
178
179 static int osp_md_declare_ref_add(const struct lu_env *env,
180                                   struct dt_object *dt, struct thandle *th)
181 {
182         struct dt_update_request        *update;
183         struct lu_fid                   *fid;
184         int                             rc;
185
186         update = out_find_create_update_loc(th, dt);
187         if (IS_ERR(update)) {
188                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
189                        dt->do_lu.lo_dev->ld_obd->obd_name,
190                        (int)PTR_ERR(update));
191                 return PTR_ERR(update);
192         }
193
194         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
195
196         rc = out_insert_update(env, update, OUT_REF_ADD, fid, 0, NULL, NULL);
197
198         return rc;
199 }
200
201 static int osp_md_object_ref_add(const struct lu_env *env,
202                                  struct dt_object *dt,
203                                  struct thandle *th)
204 {
205         CDEBUG(D_INFO, "ref add object "DFID"\n",
206                PFID(&dt->do_lu.lo_header->loh_fid));
207
208         return 0;
209 }
210
211 static void osp_md_ah_init(const struct lu_env *env,
212                            struct dt_allocation_hint *ah,
213                            struct dt_object *parent,
214                            struct dt_object *child,
215                            umode_t child_mode)
216 {
217         LASSERT(ah);
218
219         ah->dah_parent = parent;
220         ah->dah_mode = child_mode;
221 }
222
223 int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
224                             const struct lu_attr *attr, struct thandle *th)
225 {
226         struct osp_thread_info          *osi = osp_env_info(env);
227         struct dt_update_request        *update;
228         struct lu_fid                   *fid;
229         int                             size = sizeof(struct obdo);
230         char                            *buf;
231         int                             rc;
232
233         update = out_find_create_update_loc(th, dt);
234         if (IS_ERR(update)) {
235                 CERROR("%s: Get OSP update buf failed: %d\n",
236                        dt->do_lu.lo_dev->ld_obd->obd_name,
237                        (int)PTR_ERR(update));
238                 return PTR_ERR(update);
239         }
240
241         osi->osi_obdo.o_valid = 0;
242         obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr,
243                      attr->la_valid);
244         lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
245
246         buf = (char *)&osi->osi_obdo;
247         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
248
249         rc = out_insert_update(env, update, OUT_ATTR_SET, fid, 1, &size,
250                                (const char **)&buf);
251
252         return rc;
253 }
254
255 int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
256                     const struct lu_attr *attr, struct thandle *th,
257                     struct lustre_capa *capa)
258 {
259         CDEBUG(D_INFO, "attr set object "DFID"\n",
260                PFID(&dt->do_lu.lo_header->loh_fid));
261
262         RETURN(0);
263 }
264
265 static void osp_md_object_read_lock(const struct lu_env *env,
266                                     struct dt_object *dt, unsigned role)
267 {
268         struct osp_object  *obj = dt2osp_obj(dt);
269
270         LASSERT(obj->opo_owner != env);
271         down_read_nested(&obj->opo_sem, role);
272
273         LASSERT(obj->opo_owner == NULL);
274 }
275
276 static void osp_md_object_write_lock(const struct lu_env *env,
277                                      struct dt_object *dt, unsigned role)
278 {
279         struct osp_object *obj = dt2osp_obj(dt);
280
281         down_write_nested(&obj->opo_sem, role);
282
283         LASSERT(obj->opo_owner == NULL);
284         obj->opo_owner = env;
285 }
286
287 static void osp_md_object_read_unlock(const struct lu_env *env,
288                                       struct dt_object *dt)
289 {
290         struct osp_object *obj = dt2osp_obj(dt);
291
292         up_read(&obj->opo_sem);
293 }
294
295 static void osp_md_object_write_unlock(const struct lu_env *env,
296                                        struct dt_object *dt)
297 {
298         struct osp_object *obj = dt2osp_obj(dt);
299
300         LASSERT(obj->opo_owner == env);
301         obj->opo_owner = NULL;
302         up_write(&obj->opo_sem);
303 }
304
305 static int osp_md_object_write_locked(const struct lu_env *env,
306                                       struct dt_object *dt)
307 {
308         struct osp_object *obj = dt2osp_obj(dt);
309
310         return obj->opo_owner == env;
311 }
312
313 static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
314                                struct dt_rec *rec, const struct dt_key *key,
315                                struct lustre_capa *capa)
316 {
317         struct lu_buf           *lbuf   = &osp_env_info(env)->osi_lb2;
318         struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
319         struct dt_device        *dt_dev = &osp->opd_dt_dev;
320         struct dt_update_request   *update;
321         struct object_update_reply *reply;
322         struct ptlrpc_request      *req = NULL;
323         int                        size = strlen((char *)key) + 1;
324         struct lu_fid              *fid;
325         int                        rc;
326         ENTRY;
327
328         /* Because it needs send the update buffer right away,
329          * just create an update buffer, instead of attaching the
330          * update_remote list of the thandle.
331          */
332         update = out_create_update_req(dt_dev);
333         if (IS_ERR(update))
334                 RETURN(PTR_ERR(update));
335
336         rc = out_insert_update(env, update, OUT_INDEX_LOOKUP,
337                                lu_object_fid(&dt->do_lu),
338                                1, &size, (const char **)&key);
339         if (rc) {
340                 CERROR("%s: Insert update error: rc = %d\n",
341                        dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
342                 GOTO(out, rc);
343         }
344
345         rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req);
346         if (rc < 0)
347                 GOTO(out, rc);
348
349         reply = req_capsule_server_sized_get(&req->rq_pill,
350                                              &RMF_OUT_UPDATE_REPLY,
351                                              OUT_UPDATE_REPLY_SIZE);
352         if (reply->ourp_magic != UPDATE_REPLY_MAGIC) {
353                 CERROR("%s: Wrong version %x expected %x: rc = %d\n",
354                        dt_dev->dd_lu_dev.ld_obd->obd_name,
355                        reply->ourp_magic, UPDATE_REPLY_MAGIC, -EPROTO);
356                 GOTO(out, rc = -EPROTO);
357         }
358
359         rc = object_update_result_data_get(reply, lbuf, 0);
360         if (rc < 0)
361                 GOTO(out, rc = size);
362
363         if (lbuf->lb_len != sizeof(*fid)) {
364                 CERROR("%s: lookup "DFID" %s wrong size %d\n",
365                        dt_dev->dd_lu_dev.ld_obd->obd_name,
366                        PFID(lu_object_fid(&dt->do_lu)), (char *)key,
367                        (int)lbuf->lb_len);
368                 GOTO(out, rc = -EINVAL);
369         }
370
371         fid = lbuf->lb_buf;
372         if (ptlrpc_rep_need_swab(req))
373                 lustre_swab_lu_fid(fid);
374         if (!fid_is_sane(fid)) {
375                 CERROR("%s: lookup "DFID" %s invalid fid "DFID"\n",
376                        dt_dev->dd_lu_dev.ld_obd->obd_name,
377                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid));
378                 GOTO(out, rc = -EINVAL);
379         }
380
381         memcpy(rec, fid, sizeof(*fid));
382
383         GOTO(out, rc = 1);
384
385 out:
386         if (req != NULL)
387                 ptlrpc_req_finished(req);
388
389         out_destroy_update_req(update);
390
391         return rc;
392 }
393
394 static int osp_md_declare_insert(const struct lu_env *env,
395                                  struct dt_object *dt,
396                                  const struct dt_rec *rec,
397                                  const struct dt_key *key,
398                                  struct thandle *th)
399 {
400         struct dt_update_request *update;
401         struct lu_fid            *fid;
402         struct lu_fid            *rec_fid = (struct lu_fid *)rec;
403         int                      size[2] = {strlen((char *)key) + 1,
404                                                   sizeof(*rec_fid)};
405         const char               *bufs[2] = {(char *)key, (char *)rec_fid};
406         int                      rc;
407
408         update = out_find_create_update_loc(th, dt);
409         if (IS_ERR(update)) {
410                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
411                        dt->do_lu.lo_dev->ld_obd->obd_name,
412                        (int)PTR_ERR(update));
413                 return PTR_ERR(update);
414         }
415
416         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
417
418         CDEBUG(D_INFO, "%s: insert index of "DFID" %s: "DFID"\n",
419                dt->do_lu.lo_dev->ld_obd->obd_name,
420                PFID(fid), (char *)key, PFID(rec_fid));
421
422         fid_cpu_to_le(rec_fid, rec_fid);
423
424         rc = out_insert_update(env, update, OUT_INDEX_INSERT, fid,
425                                ARRAY_SIZE(size), size, bufs);
426         return rc;
427 }
428
/**
 * Execution phase of an index insert; nothing is done here.
 *
 * \param[in] env		execution environment (unused)
 * \param[in] dt		index object (unused)
 * \param[in] rec		record to insert (unused)
 * \param[in] key		key to insert (unused)
 * \param[in] th		transaction handle (unused)
 * \param[in] capa		capability (unused)
 * \param[in] ignore_quota	quota flag (unused)
 *
 * \retval			0 always
 */
static int osp_md_index_insert(const struct lu_env *env, struct dt_object *dt,
			       const struct dt_rec *rec,
			       const struct dt_key *key, struct thandle *th,
			       struct lustre_capa *capa, int ignore_quota)
{
	/* intentionally a no-op */
	return 0;
}
439
440 static int osp_md_declare_delete(const struct lu_env *env,
441                                  struct dt_object *dt,
442                                  const struct dt_key *key,
443                                  struct thandle *th)
444 {
445         struct dt_update_request *update;
446         struct lu_fid *fid;
447         int size = strlen((char *)key) + 1;
448         int rc;
449
450         update = out_find_create_update_loc(th, dt);
451         if (IS_ERR(update)) {
452                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
453                        dt->do_lu.lo_dev->ld_obd->obd_name,
454                        (int)PTR_ERR(update));
455                 return PTR_ERR(update);
456         }
457
458         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
459
460         rc = out_insert_update(env, update, OUT_INDEX_DELETE, fid, 1, &size,
461                                (const char **)&key);
462
463         return rc;
464 }
465
466 static int osp_md_index_delete(const struct lu_env *env,
467                                struct dt_object *dt,
468                                const struct dt_key *key,
469                                struct thandle *th,
470                                struct lustre_capa *capa)
471 {
472         CDEBUG(D_INFO, "index delete "DFID" %s\n",
473                PFID(&dt->do_lu.lo_header->loh_fid), (char *)key);
474
475         return 0;
476 }
477
478 int osp_md_index_it_next(const struct lu_env *env, struct dt_it *di)
479 {
480         struct osp_it           *it = (struct osp_it *)di;
481         struct lu_idxpage       *idxpage;
482         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
483         int                     rc;
484         ENTRY;
485
486 again:
487         idxpage = it->ooi_cur_idxpage;
488         if (idxpage != NULL) {
489                 if (idxpage->lip_nr == 0)
490                         RETURN(1);
491
492                 it->ooi_pos_ent++;
493                 if (ent == NULL) {
494                         it->ooi_ent =
495                               (struct lu_dirent *)idxpage->lip_entries;
496                         RETURN(0);
497                 } else if (le16_to_cpu(ent->lde_reclen) != 0 &&
498                            it->ooi_pos_ent < idxpage->lip_nr) {
499                         ent = (struct lu_dirent *)(((char *)ent) +
500                                         le16_to_cpu(ent->lde_reclen));
501                         it->ooi_ent = ent;
502                         RETURN(0);
503                 } else {
504                         it->ooi_ent = NULL;
505                 }
506         }
507
508         rc = osp_it_next_page(env, di);
509         if (rc == 0)
510                 goto again;
511
512         RETURN(rc);
513 }
514
515 static struct dt_key *osp_it_key(const struct lu_env *env,
516                                  const struct dt_it *di)
517 {
518         struct osp_it           *it = (struct osp_it *)di;
519         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
520
521         return (struct dt_key *)ent->lde_name;
522 }
523
524 static int osp_it_key_size(const struct lu_env *env, const struct dt_it *di)
525 {
526         struct osp_it           *it = (struct osp_it *)di;
527         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
528
529         return (int)le16_to_cpu(ent->lde_namelen);
530 }
531
532 static int osp_md_index_it_rec(const struct lu_env *env, const struct dt_it *di,
533                                struct dt_rec *rec, __u32 attr)
534 {
535         struct osp_it           *it = (struct osp_it *)di;
536         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
537         int                     reclen;
538
539         reclen = lu_dirent_calc_size(le16_to_cpu(ent->lde_namelen), attr);
540         memcpy(rec, ent, reclen);
541         return 0;
542 }
543
544 /**
545  * Locate the iteration cursor to the specified position (cookie).
546  *
547  * \param[in] env       pointer to the thread context
548  * \param[in] di        pointer to the iteration structure
549  * \param[in] hash      the specified position
550  *
551  * \retval              positive number for locating to the exactly position
552  *                      or the next
553  * \retval              0 for arriving at the end of the iteration
554  * \retval              negative error number on failure
555  */
556 static int osp_it_load(const struct lu_env *env, const struct dt_it *di,
557                        __u64 hash)
558 {
559         struct osp_it   *it     = (struct osp_it *)di;
560         int              rc;
561
562         it->ooi_next = hash;
563         rc = osp_md_index_it_next(env, (struct dt_it *)di);
564         if (rc == 1)
565                 return 0;
566
567         if (rc == 0)
568                 return 1;
569
570         return rc;
571 }
572
573 const struct dt_index_operations osp_md_index_ops = {
574         .dio_lookup         = osp_md_index_lookup,
575         .dio_declare_insert = osp_md_declare_insert,
576         .dio_insert         = osp_md_index_insert,
577         .dio_declare_delete = osp_md_declare_delete,
578         .dio_delete         = osp_md_index_delete,
579         .dio_it     = {
580                 .init     = osp_it_init,
581                 .fini     = osp_it_fini,
582                 .get      = osp_it_get,
583                 .put      = osp_it_put,
584                 .next     = osp_md_index_it_next,
585                 .key      = osp_it_key,
586                 .key_size = osp_it_key_size,
587                 .rec      = osp_md_index_it_rec,
588                 .store    = osp_it_store,
589                 .load     = osp_it_load,
590                 .key_rec  = osp_it_key_rec,
591         }
592 };
593
594 static int osp_md_index_try(const struct lu_env *env,
595                             struct dt_object *dt,
596                             const struct dt_index_features *feat)
597 {
598         dt->do_index_ops = &osp_md_index_ops;
599         return 0;
600 }
601
602 static int osp_md_object_lock(const struct lu_env *env,
603                               struct dt_object *dt,
604                               struct lustre_handle *lh,
605                               struct ldlm_enqueue_info *einfo,
606                               ldlm_policy_data_t *policy)
607 {
608         struct ldlm_res_id      *res_id;
609         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
610         struct osp_device       *osp = dt2osp_dev(dt_dev);
611         struct ptlrpc_request   *req;
612         int                     rc = 0;
613         __u64                   flags = 0;
614         ldlm_mode_t             mode;
615
616         res_id = einfo->ei_res_id;
617         LASSERT(res_id != NULL);
618
619         mode = ldlm_lock_match(osp->opd_obd->obd_namespace,
620                                LDLM_FL_BLOCK_GRANTED, res_id,
621                                einfo->ei_type, policy,
622                                einfo->ei_mode, lh, 0);
623         if (mode > 0)
624                 return ELDLM_OK;
625
626         req = ldlm_enqueue_pack(osp->opd_exp, 0);
627         if (IS_ERR(req))
628                 RETURN(PTR_ERR(req));
629
630         rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id,
631                               (const ldlm_policy_data_t *)policy,
632                               &flags, NULL, 0, LVB_T_NONE, lh, 0);
633
634         ptlrpc_req_finished(req);
635
636         return rc == ELDLM_OK ? 0 : -EIO;
637 }
638
639 static int osp_md_object_unlock(const struct lu_env *env,
640                                 struct dt_object *dt,
641                                 struct ldlm_enqueue_info *einfo,
642                                 ldlm_policy_data_t *policy)
643 {
644         struct lustre_handle    *lockh = einfo->ei_cbdata;
645
646         /* unlock finally */
647         ldlm_lock_decref(lockh, einfo->ei_mode);
648
649         return 0;
650 }
651
652 struct dt_object_operations osp_md_obj_ops = {
653         .do_read_lock         = osp_md_object_read_lock,
654         .do_write_lock        = osp_md_object_write_lock,
655         .do_read_unlock       = osp_md_object_read_unlock,
656         .do_write_unlock      = osp_md_object_write_unlock,
657         .do_write_locked      = osp_md_object_write_locked,
658         .do_declare_create    = osp_md_declare_object_create,
659         .do_create            = osp_md_object_create,
660         .do_declare_ref_add   = osp_md_declare_ref_add,
661         .do_ref_add           = osp_md_object_ref_add,
662         .do_declare_ref_del   = osp_md_declare_object_ref_del,
663         .do_ref_del           = osp_md_object_ref_del,
664         .do_declare_destroy   = osp_declare_object_destroy,
665         .do_destroy           = osp_object_destroy,
666         .do_ah_init           = osp_md_ah_init,
667         .do_attr_get          = osp_attr_get,
668         .do_declare_attr_set  = osp_md_declare_attr_set,
669         .do_attr_set          = osp_md_attr_set,
670         .do_xattr_get         = osp_xattr_get,
671         .do_declare_xattr_set = osp_declare_xattr_set,
672         .do_xattr_set         = osp_xattr_set,
673         .do_declare_xattr_del = osp_declare_xattr_del,
674         .do_xattr_del         = osp_xattr_del,
675         .do_index_try         = osp_md_index_try,
676         .do_object_lock       = osp_md_object_lock,
677         .do_object_unlock     = osp_md_object_unlock,
678 };
679
680 static ssize_t osp_md_declare_write(const struct lu_env *env,
681                                     struct dt_object *dt,
682                                     const struct lu_buf *buf,
683                                     loff_t pos, struct thandle *th)
684 {
685         struct dt_update_request  *update;
686         struct lu_fid             *fid;
687         int                       sizes[2] = {buf->lb_len, sizeof(pos)};
688         const char                *bufs[2] = {(char *)buf->lb_buf,
689                                               (char *)&pos};
690         ssize_t                   rc;
691
692         update = out_find_create_update_loc(th, dt);
693         if (IS_ERR(update)) {
694                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
695                        dt->do_lu.lo_dev->ld_obd->obd_name,
696                        (int)PTR_ERR(update));
697                 return PTR_ERR(update);
698         }
699
700         pos = cpu_to_le64(pos);
701         bufs[1] = (char *)&pos;
702         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
703         rc = out_insert_update(env, update, OUT_WRITE, fid,
704                                ARRAY_SIZE(sizes), sizes, bufs);
705
706         return rc;
707
708 }
709
710 static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
711                             const struct lu_buf *buf, loff_t *pos,
712                             struct thandle *handle,
713                             struct lustre_capa *capa, int ignore_quota)
714 {
715         return buf->lb_len;
716 }
717
718 /* These body operation will be used to write symlinks during migration etc */
719 struct dt_body_operations osp_md_body_ops = {
720         .dbo_declare_write      = osp_md_declare_write,
721         .dbo_write              = osp_md_write,
722 };