Whamcloud - gitweb
LU-4621 ofd: create same count of objects
[fs/lustre-release.git] / lustre / osp / osp_md_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_md_object.c
27  *
28  * Lustre MDT Proxy Device
29  *
30  * Author: Di Wang <di.wang@intel.com>
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDS
34
35 #include <lustre_log.h>
36 #include "osp_internal.h"
37
/* Names of the self and parent directory entries.
 * NOTE(review): neither constant is referenced in the visible part of this
 * file - confirm whether they are still needed. */
static const char dot[] = ".";
static const char dotdot[] = "..";
40
41 int osp_md_declare_object_create(const struct lu_env *env,
42                                  struct dt_object *dt,
43                                  struct lu_attr *attr,
44                                  struct dt_allocation_hint *hint,
45                                  struct dt_object_format *dof,
46                                  struct thandle *th)
47 {
48         struct osp_thread_info          *osi = osp_env_info(env);
49         struct dt_update_request        *update;
50         struct lu_fid                   *fid1;
51         int                             sizes[2] = {sizeof(struct obdo), 0};
52         char                            *bufs[2] = {NULL, NULL};
53         int                             buf_count;
54         int                             rc;
55
56         update = out_find_create_update_loc(th, dt);
57         if (IS_ERR(update)) {
58                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
59                        dt->do_lu.lo_dev->ld_obd->obd_name,
60                        (int)PTR_ERR(update));
61                 return PTR_ERR(update);
62         }
63
64         osi->osi_obdo.o_valid = 0;
65         obdo_from_la(&osi->osi_obdo, attr, attr->la_valid);
66         lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
67
68         bufs[0] = (char *)&osi->osi_obdo;
69         buf_count = 1;
70         fid1 = (struct lu_fid *)lu_object_fid(&dt->do_lu);
71         if (hint != NULL && hint->dah_parent) {
72                 struct lu_fid *fid2;
73
74                 fid2 = (struct lu_fid *)lu_object_fid(&hint->dah_parent->do_lu);
75                 sizes[1] = sizeof(*fid2);
76                 bufs[1] = (char *)fid2;
77                 buf_count++;
78         }
79
80         if (lu_object_exists(&dt->do_lu)) {
81                 /* If the object already exists, we needs to destroy
82                  * this orphan object first.
83                  *
84                  * The scenario might happen in this case
85                  *
86                  * 1. client send remote create to MDT0.
87                  * 2. MDT0 send create update to MDT1.
88                  * 3. MDT1 finished create synchronously.
89                  * 4. MDT0 failed and reboot.
90                  * 5. client resend remote create to MDT0.
91                  * 6. MDT0 tries to resend create update to MDT1,
92                  *    but find the object already exists
93                  */
94                 CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
95                        dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1));
96
97                 rc = out_insert_update(env, update, OUT_REF_DEL, fid1, 0,
98                                        NULL, NULL);
99                 if (rc != 0)
100                         GOTO(out, rc);
101
102                 if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
103                         /* decrease for ".." */
104                         rc = out_insert_update(env, update, OUT_REF_DEL, fid1,
105                                                0, NULL, NULL);
106                         if (rc != 0)
107                                 GOTO(out, rc);
108                 }
109
110                 rc = out_insert_update(env, update, OUT_DESTROY, fid1, 0, NULL,
111                                        NULL);
112                 if (rc != 0)
113                         GOTO(out, rc);
114
115                 dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
116                 /* Increase batchid to add this orphan object deletion
117                  * to separate transaction */
118                 update_inc_batchid(update);
119         }
120
121         rc = out_insert_update(env, update, OUT_CREATE, fid1, buf_count, sizes,
122                                (const char **)bufs);
123 out:
124         if (rc)
125                 CERROR("%s: Insert update error: rc = %d\n",
126                        dt->do_lu.lo_dev->ld_obd->obd_name, rc);
127
128         return rc;
129 }
130
131 int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
132                          struct lu_attr *attr, struct dt_allocation_hint *hint,
133                          struct dt_object_format *dof, struct thandle *th)
134 {
135         CDEBUG(D_INFO, "create object "DFID"\n",
136                PFID(&dt->do_lu.lo_header->loh_fid));
137
138         /* Because the create update RPC will be sent during declare phase,
139          * if creation reaches here, it means the object has been created
140          * successfully */
141         dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
142
143         return 0;
144 }
145
146 static int osp_md_declare_object_ref_del(const struct lu_env *env,
147                                          struct dt_object *dt,
148                                          struct thandle *th)
149 {
150         struct dt_update_request        *update;
151         struct lu_fid                   *fid;
152         int                             rc;
153
154         update = out_find_create_update_loc(th, dt);
155         if (IS_ERR(update)) {
156                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
157                        dt->do_lu.lo_dev->ld_obd->obd_name,
158                       (int)PTR_ERR(update));
159                 return PTR_ERR(update);
160         }
161
162         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
163
164         rc = out_insert_update(env, update, OUT_REF_DEL, fid, 0, NULL, NULL);
165
166         return rc;
167 }
168
169 static int osp_md_object_ref_del(const struct lu_env *env,
170                                  struct dt_object *dt,
171                                  struct thandle *th)
172 {
173         CDEBUG(D_INFO, "ref del object "DFID"\n",
174                PFID(&dt->do_lu.lo_header->loh_fid));
175
176         return 0;
177 }
178
179 static int osp_md_declare_ref_add(const struct lu_env *env,
180                                   struct dt_object *dt, struct thandle *th)
181 {
182         struct dt_update_request        *update;
183         struct lu_fid                   *fid;
184         int                             rc;
185
186         update = out_find_create_update_loc(th, dt);
187         if (IS_ERR(update)) {
188                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
189                        dt->do_lu.lo_dev->ld_obd->obd_name,
190                        (int)PTR_ERR(update));
191                 return PTR_ERR(update);
192         }
193
194         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
195
196         rc = out_insert_update(env, update, OUT_REF_ADD, fid, 0, NULL, NULL);
197
198         return rc;
199 }
200
201 static int osp_md_object_ref_add(const struct lu_env *env,
202                                  struct dt_object *dt,
203                                  struct thandle *th)
204 {
205         CDEBUG(D_INFO, "ref add object "DFID"\n",
206                PFID(&dt->do_lu.lo_header->loh_fid));
207
208         return 0;
209 }
210
211 static void osp_md_ah_init(const struct lu_env *env,
212                            struct dt_allocation_hint *ah,
213                            struct dt_object *parent,
214                            struct dt_object *child,
215                            umode_t child_mode)
216 {
217         LASSERT(ah);
218
219         ah->dah_parent = parent;
220         ah->dah_mode = child_mode;
221 }
222
223 int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
224                             const struct lu_attr *attr, struct thandle *th)
225 {
226         struct osp_thread_info          *osi = osp_env_info(env);
227         struct dt_update_request        *update;
228         struct lu_fid                   *fid;
229         int                             size = sizeof(struct obdo);
230         char                            *buf;
231         int                             rc;
232
233         update = out_find_create_update_loc(th, dt);
234         if (IS_ERR(update)) {
235                 CERROR("%s: Get OSP update buf failed: %d\n",
236                        dt->do_lu.lo_dev->ld_obd->obd_name,
237                        (int)PTR_ERR(update));
238                 return PTR_ERR(update);
239         }
240
241         osi->osi_obdo.o_valid = 0;
242         obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr,
243                      attr->la_valid);
244         lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
245
246         buf = (char *)&osi->osi_obdo;
247         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
248
249         rc = out_insert_update(env, update, OUT_ATTR_SET, fid, 1, &size,
250                                (const char **)&buf);
251
252         return rc;
253 }
254
255 int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
256                     const struct lu_attr *attr, struct thandle *th,
257                     struct lustre_capa *capa)
258 {
259         CDEBUG(D_INFO, "attr set object "DFID"\n",
260                PFID(&dt->do_lu.lo_header->loh_fid));
261
262         RETURN(0);
263 }
264
265 static void osp_md_object_read_lock(const struct lu_env *env,
266                                     struct dt_object *dt, unsigned role)
267 {
268         struct osp_object  *obj = dt2osp_obj(dt);
269
270         LASSERT(obj->opo_owner != env);
271         down_read_nested(&obj->opo_sem, role);
272
273         LASSERT(obj->opo_owner == NULL);
274 }
275
276 static void osp_md_object_write_lock(const struct lu_env *env,
277                                      struct dt_object *dt, unsigned role)
278 {
279         struct osp_object *obj = dt2osp_obj(dt);
280
281         down_write_nested(&obj->opo_sem, role);
282
283         LASSERT(obj->opo_owner == NULL);
284         obj->opo_owner = env;
285 }
286
287 static void osp_md_object_read_unlock(const struct lu_env *env,
288                                       struct dt_object *dt)
289 {
290         struct osp_object *obj = dt2osp_obj(dt);
291
292         up_read(&obj->opo_sem);
293 }
294
295 static void osp_md_object_write_unlock(const struct lu_env *env,
296                                        struct dt_object *dt)
297 {
298         struct osp_object *obj = dt2osp_obj(dt);
299
300         LASSERT(obj->opo_owner == env);
301         obj->opo_owner = NULL;
302         up_write(&obj->opo_sem);
303 }
304
305 static int osp_md_object_write_locked(const struct lu_env *env,
306                                       struct dt_object *dt)
307 {
308         struct osp_object *obj = dt2osp_obj(dt);
309
310         return obj->opo_owner == env;
311 }
312
313 static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
314                                struct dt_rec *rec, const struct dt_key *key,
315                                struct lustre_capa *capa)
316 {
317         struct lu_buf           *lbuf   = &osp_env_info(env)->osi_lb2;
318         struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
319         struct dt_device        *dt_dev = &osp->opd_dt_dev;
320         struct dt_update_request   *update;
321         struct object_update_reply *reply;
322         struct ptlrpc_request      *req = NULL;
323         int                        size = strlen((char *)key) + 1;
324         struct lu_fid              *fid;
325         int                        rc;
326         ENTRY;
327
328         /* Because it needs send the update buffer right away,
329          * just create an update buffer, instead of attaching the
330          * update_remote list of the thandle.
331          */
332         update = out_create_update_req(dt_dev);
333         if (IS_ERR(update))
334                 RETURN(PTR_ERR(update));
335
336         rc = out_insert_update(env, update, OUT_INDEX_LOOKUP,
337                                lu_object_fid(&dt->do_lu),
338                                1, &size, (const char **)&key);
339         if (rc) {
340                 CERROR("%s: Insert update error: rc = %d\n",
341                        dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
342                 GOTO(out, rc);
343         }
344
345         rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req);
346         if (rc < 0)
347                 GOTO(out, rc);
348
349         reply = req_capsule_server_sized_get(&req->rq_pill,
350                                              &RMF_OUT_UPDATE_REPLY,
351                                              OUT_UPDATE_REPLY_SIZE);
352         if (reply->ourp_magic != UPDATE_REPLY_MAGIC) {
353                 CERROR("%s: Wrong version %x expected %x: rc = %d\n",
354                        dt_dev->dd_lu_dev.ld_obd->obd_name,
355                        reply->ourp_magic, UPDATE_REPLY_MAGIC, -EPROTO);
356                 GOTO(out, rc = -EPROTO);
357         }
358
359         rc = object_update_result_data_get(reply, lbuf, 0);
360         if (rc < 0)
361                 GOTO(out, rc = size);
362
363         if (lbuf->lb_len != sizeof(*fid)) {
364                 CERROR("%s: lookup "DFID" %s wrong size %d\n",
365                        dt_dev->dd_lu_dev.ld_obd->obd_name,
366                        PFID(lu_object_fid(&dt->do_lu)), (char *)key,
367                        (int)lbuf->lb_len);
368                 GOTO(out, rc = -EINVAL);
369         }
370
371         fid = lbuf->lb_buf;
372         if (ptlrpc_rep_need_swab(req))
373                 lustre_swab_lu_fid(fid);
374         if (!fid_is_sane(fid)) {
375                 CERROR("%s: lookup "DFID" %s invalid fid "DFID"\n",
376                        dt_dev->dd_lu_dev.ld_obd->obd_name,
377                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid));
378                 GOTO(out, rc = -EINVAL);
379         }
380
381         memcpy(rec, fid, sizeof(*fid));
382
383         GOTO(out, rc = 1);
384
385 out:
386         if (req != NULL)
387                 ptlrpc_req_finished(req);
388
389         out_destroy_update_req(update);
390
391         return rc;
392 }
393
394 static int osp_md_declare_insert(const struct lu_env *env,
395                                  struct dt_object *dt,
396                                  const struct dt_rec *rec,
397                                  const struct dt_key *key,
398                                  struct thandle *th)
399 {
400         struct osp_thread_info     *info = osp_env_info(env);
401         struct dt_update_request   *update;
402         struct dt_insert_rec       *rec1 = (struct dt_insert_rec *)rec;
403         struct lu_fid              *fid =
404                                 (struct lu_fid *)lu_object_fid(&dt->do_lu);
405         struct lu_fid              *rec_fid = &info->osi_fid;
406         __u32                       type = cpu_to_le32(rec1->rec_type);
407         int                         size[3] = { strlen((char *)key) + 1,
408                                                 sizeof(*rec_fid),
409                                                 sizeof(type) };
410         const char                 *bufs[3] = { (char *)key,
411                                                 (char *)rec_fid,
412                                                 (char *)&type };
413         int                         rc;
414
415         update = out_find_create_update_loc(th, dt);
416         if (IS_ERR(update)) {
417                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
418                        dt->do_lu.lo_dev->ld_obd->obd_name,
419                        (int)PTR_ERR(update));
420                 return PTR_ERR(update);
421         }
422
423         CDEBUG(D_INFO, "%s: insert index of "DFID" %s: "DFID", %u\n",
424                dt->do_lu.lo_dev->ld_obd->obd_name,
425                PFID(fid), (char *)key, PFID(rec1->rec_fid), rec1->rec_type);
426
427         fid_cpu_to_le(rec_fid, rec1->rec_fid);
428         rc = out_insert_update(env, update, OUT_INDEX_INSERT, fid,
429                                ARRAY_SIZE(size), size, bufs);
430         return rc;
431 }
432
/**
 * Execute phase of an index insertion.
 *
 * Does nothing; the update is queued by osp_md_declare_insert().
 *
 * \retval		0 always
 */
static int osp_md_index_insert(const struct lu_env *env,
                               struct dt_object *dt,
                               const struct dt_rec *rec,
                               const struct dt_key *key,
                               struct thandle *th,
                               struct lustre_capa *capa,
                               int ignore_quota)
{
        /* nothing to do in the execute phase */
        return 0;
}
443
444 static int osp_md_declare_delete(const struct lu_env *env,
445                                  struct dt_object *dt,
446                                  const struct dt_key *key,
447                                  struct thandle *th)
448 {
449         struct dt_update_request *update;
450         struct lu_fid *fid;
451         int size = strlen((char *)key) + 1;
452         int rc;
453
454         update = out_find_create_update_loc(th, dt);
455         if (IS_ERR(update)) {
456                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
457                        dt->do_lu.lo_dev->ld_obd->obd_name,
458                        (int)PTR_ERR(update));
459                 return PTR_ERR(update);
460         }
461
462         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
463
464         rc = out_insert_update(env, update, OUT_INDEX_DELETE, fid, 1, &size,
465                                (const char **)&key);
466
467         return rc;
468 }
469
470 static int osp_md_index_delete(const struct lu_env *env,
471                                struct dt_object *dt,
472                                const struct dt_key *key,
473                                struct thandle *th,
474                                struct lustre_capa *capa)
475 {
476         CDEBUG(D_INFO, "index delete "DFID" %s\n",
477                PFID(&dt->do_lu.lo_header->loh_fid), (char *)key);
478
479         return 0;
480 }
481
482 int osp_md_index_it_next(const struct lu_env *env, struct dt_it *di)
483 {
484         struct osp_it           *it = (struct osp_it *)di;
485         struct lu_idxpage       *idxpage;
486         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
487         int                     rc;
488         ENTRY;
489
490 again:
491         idxpage = it->ooi_cur_idxpage;
492         if (idxpage != NULL) {
493                 if (idxpage->lip_nr == 0)
494                         RETURN(1);
495
496                 it->ooi_pos_ent++;
497                 if (ent == NULL) {
498                         it->ooi_ent =
499                               (struct lu_dirent *)idxpage->lip_entries;
500                         RETURN(0);
501                 } else if (le16_to_cpu(ent->lde_reclen) != 0 &&
502                            it->ooi_pos_ent < idxpage->lip_nr) {
503                         ent = (struct lu_dirent *)(((char *)ent) +
504                                         le16_to_cpu(ent->lde_reclen));
505                         it->ooi_ent = ent;
506                         RETURN(0);
507                 } else {
508                         it->ooi_ent = NULL;
509                 }
510         }
511
512         rc = osp_it_next_page(env, di);
513         if (rc == 0)
514                 goto again;
515
516         RETURN(rc);
517 }
518
519 static struct dt_key *osp_it_key(const struct lu_env *env,
520                                  const struct dt_it *di)
521 {
522         struct osp_it           *it = (struct osp_it *)di;
523         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
524
525         return (struct dt_key *)ent->lde_name;
526 }
527
528 static int osp_it_key_size(const struct lu_env *env, const struct dt_it *di)
529 {
530         struct osp_it           *it = (struct osp_it *)di;
531         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
532
533         return (int)le16_to_cpu(ent->lde_namelen);
534 }
535
536 static int osp_md_index_it_rec(const struct lu_env *env, const struct dt_it *di,
537                                struct dt_rec *rec, __u32 attr)
538 {
539         struct osp_it           *it = (struct osp_it *)di;
540         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
541         int                     reclen;
542
543         reclen = lu_dirent_calc_size(le16_to_cpu(ent->lde_namelen), attr);
544         memcpy(rec, ent, reclen);
545         return 0;
546 }
547
548 /**
549  * Locate the iteration cursor to the specified position (cookie).
550  *
551  * \param[in] env       pointer to the thread context
552  * \param[in] di        pointer to the iteration structure
553  * \param[in] hash      the specified position
554  *
555  * \retval              positive number for locating to the exactly position
556  *                      or the next
557  * \retval              0 for arriving at the end of the iteration
558  * \retval              negative error number on failure
559  */
560 static int osp_it_load(const struct lu_env *env, const struct dt_it *di,
561                        __u64 hash)
562 {
563         struct osp_it   *it     = (struct osp_it *)di;
564         int              rc;
565
566         it->ooi_next = hash;
567         rc = osp_md_index_it_next(env, (struct dt_it *)di);
568         if (rc == 1)
569                 return 0;
570
571         if (rc == 0)
572                 return 1;
573
574         return rc;
575 }
576
577 const struct dt_index_operations osp_md_index_ops = {
578         .dio_lookup         = osp_md_index_lookup,
579         .dio_declare_insert = osp_md_declare_insert,
580         .dio_insert         = osp_md_index_insert,
581         .dio_declare_delete = osp_md_declare_delete,
582         .dio_delete         = osp_md_index_delete,
583         .dio_it     = {
584                 .init     = osp_it_init,
585                 .fini     = osp_it_fini,
586                 .get      = osp_it_get,
587                 .put      = osp_it_put,
588                 .next     = osp_md_index_it_next,
589                 .key      = osp_it_key,
590                 .key_size = osp_it_key_size,
591                 .rec      = osp_md_index_it_rec,
592                 .store    = osp_it_store,
593                 .load     = osp_it_load,
594                 .key_rec  = osp_it_key_rec,
595         }
596 };
597
598 static int osp_md_index_try(const struct lu_env *env,
599                             struct dt_object *dt,
600                             const struct dt_index_features *feat)
601 {
602         dt->do_index_ops = &osp_md_index_ops;
603         return 0;
604 }
605
606 static int osp_md_object_lock(const struct lu_env *env,
607                               struct dt_object *dt,
608                               struct lustre_handle *lh,
609                               struct ldlm_enqueue_info *einfo,
610                               ldlm_policy_data_t *policy)
611 {
612         struct ldlm_res_id      *res_id;
613         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
614         struct osp_device       *osp = dt2osp_dev(dt_dev);
615         struct ptlrpc_request   *req;
616         int                     rc = 0;
617         __u64                   flags = 0;
618         ldlm_mode_t             mode;
619
620         res_id = einfo->ei_res_id;
621         LASSERT(res_id != NULL);
622
623         mode = ldlm_lock_match(osp->opd_obd->obd_namespace,
624                                LDLM_FL_BLOCK_GRANTED, res_id,
625                                einfo->ei_type, policy,
626                                einfo->ei_mode, lh, 0);
627         if (mode > 0)
628                 return ELDLM_OK;
629
630         req = ldlm_enqueue_pack(osp->opd_exp, 0);
631         if (IS_ERR(req))
632                 RETURN(PTR_ERR(req));
633
634         rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id,
635                               (const ldlm_policy_data_t *)policy,
636                               &flags, NULL, 0, LVB_T_NONE, lh, 0);
637
638         ptlrpc_req_finished(req);
639
640         return rc == ELDLM_OK ? 0 : -EIO;
641 }
642
643 static int osp_md_object_unlock(const struct lu_env *env,
644                                 struct dt_object *dt,
645                                 struct ldlm_enqueue_info *einfo,
646                                 ldlm_policy_data_t *policy)
647 {
648         struct lustre_handle    *lockh = einfo->ei_cbdata;
649
650         /* unlock finally */
651         ldlm_lock_decref(lockh, einfo->ei_mode);
652
653         return 0;
654 }
655
656 struct dt_object_operations osp_md_obj_ops = {
657         .do_read_lock         = osp_md_object_read_lock,
658         .do_write_lock        = osp_md_object_write_lock,
659         .do_read_unlock       = osp_md_object_read_unlock,
660         .do_write_unlock      = osp_md_object_write_unlock,
661         .do_write_locked      = osp_md_object_write_locked,
662         .do_declare_create    = osp_md_declare_object_create,
663         .do_create            = osp_md_object_create,
664         .do_declare_ref_add   = osp_md_declare_ref_add,
665         .do_ref_add           = osp_md_object_ref_add,
666         .do_declare_ref_del   = osp_md_declare_object_ref_del,
667         .do_ref_del           = osp_md_object_ref_del,
668         .do_declare_destroy   = osp_declare_object_destroy,
669         .do_destroy           = osp_object_destroy,
670         .do_ah_init           = osp_md_ah_init,
671         .do_attr_get          = osp_attr_get,
672         .do_declare_attr_set  = osp_md_declare_attr_set,
673         .do_attr_set          = osp_md_attr_set,
674         .do_xattr_get         = osp_xattr_get,
675         .do_declare_xattr_set = osp_declare_xattr_set,
676         .do_xattr_set         = osp_xattr_set,
677         .do_declare_xattr_del = osp_declare_xattr_del,
678         .do_xattr_del         = osp_xattr_del,
679         .do_index_try         = osp_md_index_try,
680         .do_object_lock       = osp_md_object_lock,
681         .do_object_unlock     = osp_md_object_unlock,
682 };
683
684 static ssize_t osp_md_declare_write(const struct lu_env *env,
685                                     struct dt_object *dt,
686                                     const struct lu_buf *buf,
687                                     loff_t pos, struct thandle *th)
688 {
689         struct dt_update_request  *update;
690         struct lu_fid             *fid;
691         int                       sizes[2] = {buf->lb_len, sizeof(pos)};
692         const char                *bufs[2] = {(char *)buf->lb_buf,
693                                               (char *)&pos};
694         ssize_t                   rc;
695
696         update = out_find_create_update_loc(th, dt);
697         if (IS_ERR(update)) {
698                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
699                        dt->do_lu.lo_dev->ld_obd->obd_name,
700                        (int)PTR_ERR(update));
701                 return PTR_ERR(update);
702         }
703
704         pos = cpu_to_le64(pos);
705         bufs[1] = (char *)&pos;
706         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
707         rc = out_insert_update(env, update, OUT_WRITE, fid,
708                                ARRAY_SIZE(sizes), sizes, bufs);
709
710         return rc;
711
712 }
713
714 static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
715                             const struct lu_buf *buf, loff_t *pos,
716                             struct thandle *handle,
717                             struct lustre_capa *capa, int ignore_quota)
718 {
719         return buf->lb_len;
720 }
721
722 /* These body operation will be used to write symlinks during migration etc */
723 struct dt_body_operations osp_md_body_ops = {
724         .dbo_declare_write      = osp_md_declare_write,
725         .dbo_write              = osp_md_write,
726 };