Whamcloud - gitweb
a4299e78134ebd02ae8ffcae77f5896aa38d62ae
[fs/lustre-release.git] / lustre / osp / osp_md_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_md_object.c
27  *
28  * Lustre MDT Proxy Device
29  *
30  * Author: Di Wang <di.wang@intel.com>
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDS
34
35 #include <lustre_log.h>
36 #include "osp_internal.h"
37
38 static const char dot[] = ".";
39 static const char dotdot[] = "..";
40
41 int osp_md_declare_object_create(const struct lu_env *env,
42                                  struct dt_object *dt,
43                                  struct lu_attr *attr,
44                                  struct dt_allocation_hint *hint,
45                                  struct dt_object_format *dof,
46                                  struct thandle *th)
47 {
48         struct osp_thread_info          *osi = osp_env_info(env);
49         struct dt_update_request        *update;
50         struct lu_fid                   *fid1;
51         int                             sizes[2] = {sizeof(struct obdo), 0};
52         char                            *bufs[2] = {NULL, NULL};
53         int                             buf_count;
54         int                             rc;
55
56         update = out_find_create_update_loc(th, dt);
57         if (IS_ERR(update)) {
58                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
59                        dt->do_lu.lo_dev->ld_obd->obd_name,
60                        (int)PTR_ERR(update));
61                 return PTR_ERR(update);
62         }
63
64         osi->osi_obdo.o_valid = 0;
65         obdo_from_la(&osi->osi_obdo, attr, attr->la_valid);
66         lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
67         obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
68
69         bufs[0] = (char *)&osi->osi_obdo;
70         buf_count = 1;
71         fid1 = (struct lu_fid *)lu_object_fid(&dt->do_lu);
72         if (hint != NULL && hint->dah_parent) {
73                 struct lu_fid *fid2;
74                 struct lu_fid *tmp_fid = &osi->osi_fid;
75
76                 fid2 = (struct lu_fid *)lu_object_fid(&hint->dah_parent->do_lu);
77                 fid_cpu_to_le(tmp_fid, fid2);
78                 sizes[1] = sizeof(*tmp_fid);
79                 bufs[1] = (char *)tmp_fid;
80                 buf_count++;
81         }
82
83         if (lu_object_exists(&dt->do_lu)) {
84                 /* If the object already exists, we needs to destroy
85                  * this orphan object first.
86                  *
87                  * The scenario might happen in this case
88                  *
89                  * 1. client send remote create to MDT0.
90                  * 2. MDT0 send create update to MDT1.
91                  * 3. MDT1 finished create synchronously.
92                  * 4. MDT0 failed and reboot.
93                  * 5. client resend remote create to MDT0.
94                  * 6. MDT0 tries to resend create update to MDT1,
95                  *    but find the object already exists
96                  */
97                 CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
98                        dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1));
99
100                 rc = out_insert_update(env, update, OUT_REF_DEL, fid1, 0,
101                                        NULL, NULL);
102                 if (rc != 0)
103                         GOTO(out, rc);
104
105                 if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
106                         /* decrease for ".." */
107                         rc = out_insert_update(env, update, OUT_REF_DEL, fid1,
108                                                0, NULL, NULL);
109                         if (rc != 0)
110                                 GOTO(out, rc);
111                 }
112
113                 rc = out_insert_update(env, update, OUT_DESTROY, fid1, 0, NULL,
114                                        NULL);
115                 if (rc != 0)
116                         GOTO(out, rc);
117
118                 dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
119                 /* Increase batchid to add this orphan object deletion
120                  * to separate transaction */
121                 update_inc_batchid(update);
122         }
123
124         rc = out_insert_update(env, update, OUT_CREATE, fid1, buf_count, sizes,
125                                (const char **)bufs);
126 out:
127         if (rc)
128                 CERROR("%s: Insert update error: rc = %d\n",
129                        dt->do_lu.lo_dev->ld_obd->obd_name, rc);
130
131         return rc;
132 }
133
134 int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
135                          struct lu_attr *attr, struct dt_allocation_hint *hint,
136                          struct dt_object_format *dof, struct thandle *th)
137 {
138         struct osp_object  *obj = dt2osp_obj(dt);
139
140         CDEBUG(D_INFO, "create object "DFID"\n",
141                PFID(&dt->do_lu.lo_header->loh_fid));
142
143         /* Because the create update RPC will be sent during declare phase,
144          * if creation reaches here, it means the object has been created
145          * successfully */
146         dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
147         obj->opo_empty = 1;
148
149         return 0;
150 }
151
152 static int osp_md_declare_object_ref_del(const struct lu_env *env,
153                                          struct dt_object *dt,
154                                          struct thandle *th)
155 {
156         struct dt_update_request        *update;
157         struct lu_fid                   *fid;
158         int                             rc;
159
160         update = out_find_create_update_loc(th, dt);
161         if (IS_ERR(update)) {
162                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
163                        dt->do_lu.lo_dev->ld_obd->obd_name,
164                       (int)PTR_ERR(update));
165                 return PTR_ERR(update);
166         }
167
168         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
169
170         rc = out_insert_update(env, update, OUT_REF_DEL, fid, 0, NULL, NULL);
171
172         return rc;
173 }
174
175 static int osp_md_object_ref_del(const struct lu_env *env,
176                                  struct dt_object *dt,
177                                  struct thandle *th)
178 {
179         CDEBUG(D_INFO, "ref del object "DFID"\n",
180                PFID(&dt->do_lu.lo_header->loh_fid));
181
182         return 0;
183 }
184
185 static int osp_md_declare_ref_add(const struct lu_env *env,
186                                   struct dt_object *dt, struct thandle *th)
187 {
188         struct dt_update_request        *update;
189         struct lu_fid                   *fid;
190         int                             rc;
191
192         update = out_find_create_update_loc(th, dt);
193         if (IS_ERR(update)) {
194                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
195                        dt->do_lu.lo_dev->ld_obd->obd_name,
196                        (int)PTR_ERR(update));
197                 return PTR_ERR(update);
198         }
199
200         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
201
202         rc = out_insert_update(env, update, OUT_REF_ADD, fid, 0, NULL, NULL);
203
204         return rc;
205 }
206
207 static int osp_md_object_ref_add(const struct lu_env *env,
208                                  struct dt_object *dt,
209                                  struct thandle *th)
210 {
211         CDEBUG(D_INFO, "ref add object "DFID"\n",
212                PFID(&dt->do_lu.lo_header->loh_fid));
213
214         return 0;
215 }
216
217 static void osp_md_ah_init(const struct lu_env *env,
218                            struct dt_allocation_hint *ah,
219                            struct dt_object *parent,
220                            struct dt_object *child,
221                            umode_t child_mode)
222 {
223         LASSERT(ah);
224
225         ah->dah_parent = parent;
226         ah->dah_mode = child_mode;
227 }
228
229 int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
230                             const struct lu_attr *attr, struct thandle *th)
231 {
232         struct osp_thread_info          *osi = osp_env_info(env);
233         struct dt_update_request        *update;
234         struct lu_fid                   *fid;
235         int                             size = sizeof(struct obdo);
236         char                            *buf;
237         int                             rc;
238
239         update = out_find_create_update_loc(th, dt);
240         if (IS_ERR(update)) {
241                 CERROR("%s: Get OSP update buf failed: %d\n",
242                        dt->do_lu.lo_dev->ld_obd->obd_name,
243                        (int)PTR_ERR(update));
244                 return PTR_ERR(update);
245         }
246
247         osi->osi_obdo.o_valid = 0;
248         obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr,
249                      attr->la_valid);
250         lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
251         obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
252
253         buf = (char *)&osi->osi_obdo;
254         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
255
256         rc = out_insert_update(env, update, OUT_ATTR_SET, fid, 1, &size,
257                                (const char **)&buf);
258
259         return rc;
260 }
261
262 int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
263                     const struct lu_attr *attr, struct thandle *th,
264                     struct lustre_capa *capa)
265 {
266         CDEBUG(D_INFO, "attr set object "DFID"\n",
267                PFID(&dt->do_lu.lo_header->loh_fid));
268
269         RETURN(0);
270 }
271
272 static void osp_md_object_read_lock(const struct lu_env *env,
273                                     struct dt_object *dt, unsigned role)
274 {
275         struct osp_object  *obj = dt2osp_obj(dt);
276
277         LASSERT(obj->opo_owner != env);
278         down_read_nested(&obj->opo_sem, role);
279
280         LASSERT(obj->opo_owner == NULL);
281 }
282
283 static void osp_md_object_write_lock(const struct lu_env *env,
284                                      struct dt_object *dt, unsigned role)
285 {
286         struct osp_object *obj = dt2osp_obj(dt);
287
288         down_write_nested(&obj->opo_sem, role);
289
290         LASSERT(obj->opo_owner == NULL);
291         obj->opo_owner = env;
292 }
293
294 static void osp_md_object_read_unlock(const struct lu_env *env,
295                                       struct dt_object *dt)
296 {
297         struct osp_object *obj = dt2osp_obj(dt);
298
299         up_read(&obj->opo_sem);
300 }
301
302 static void osp_md_object_write_unlock(const struct lu_env *env,
303                                        struct dt_object *dt)
304 {
305         struct osp_object *obj = dt2osp_obj(dt);
306
307         LASSERT(obj->opo_owner == env);
308         obj->opo_owner = NULL;
309         up_write(&obj->opo_sem);
310 }
311
312 static int osp_md_object_write_locked(const struct lu_env *env,
313                                       struct dt_object *dt)
314 {
315         struct osp_object *obj = dt2osp_obj(dt);
316
317         return obj->opo_owner == env;
318 }
319
320 static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
321                                struct dt_rec *rec, const struct dt_key *key,
322                                struct lustre_capa *capa)
323 {
324         struct lu_buf           *lbuf   = &osp_env_info(env)->osi_lb2;
325         struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
326         struct dt_device        *dt_dev = &osp->opd_dt_dev;
327         struct dt_update_request   *update;
328         struct object_update_reply *reply;
329         struct ptlrpc_request      *req = NULL;
330         int                        size = strlen((char *)key) + 1;
331         struct lu_fid              *fid;
332         int                        rc;
333         ENTRY;
334
335         /* Because it needs send the update buffer right away,
336          * just create an update buffer, instead of attaching the
337          * update_remote list of the thandle.
338          */
339         update = out_create_update_req(dt_dev);
340         if (IS_ERR(update))
341                 RETURN(PTR_ERR(update));
342
343         rc = out_insert_update(env, update, OUT_INDEX_LOOKUP,
344                                lu_object_fid(&dt->do_lu),
345                                1, &size, (const char **)&key);
346         if (rc) {
347                 CERROR("%s: Insert update error: rc = %d\n",
348                        dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
349                 GOTO(out, rc);
350         }
351
352         rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req);
353         if (rc < 0)
354                 GOTO(out, rc);
355
356         reply = req_capsule_server_sized_get(&req->rq_pill,
357                                              &RMF_OUT_UPDATE_REPLY,
358                                              OUT_UPDATE_REPLY_SIZE);
359         if (reply->ourp_magic != UPDATE_REPLY_MAGIC) {
360                 CERROR("%s: Wrong version %x expected %x: rc = %d\n",
361                        dt_dev->dd_lu_dev.ld_obd->obd_name,
362                        reply->ourp_magic, UPDATE_REPLY_MAGIC, -EPROTO);
363                 GOTO(out, rc = -EPROTO);
364         }
365
366         rc = object_update_result_data_get(reply, lbuf, 0);
367         if (rc < 0)
368                 GOTO(out, rc = size);
369
370         if (lbuf->lb_len != sizeof(*fid)) {
371                 CERROR("%s: lookup "DFID" %s wrong size %d\n",
372                        dt_dev->dd_lu_dev.ld_obd->obd_name,
373                        PFID(lu_object_fid(&dt->do_lu)), (char *)key,
374                        (int)lbuf->lb_len);
375                 GOTO(out, rc = -EINVAL);
376         }
377
378         fid = lbuf->lb_buf;
379         fid_le_to_cpu(fid, fid);
380         if (!fid_is_sane(fid)) {
381                 CERROR("%s: lookup "DFID" %s invalid fid "DFID"\n",
382                        dt_dev->dd_lu_dev.ld_obd->obd_name,
383                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid));
384                 GOTO(out, rc = -EINVAL);
385         }
386
387         memcpy(rec, fid, sizeof(*fid));
388
389         GOTO(out, rc = 1);
390
391 out:
392         if (req != NULL)
393                 ptlrpc_req_finished(req);
394
395         out_destroy_update_req(update);
396
397         return rc;
398 }
399
400 static int osp_md_declare_insert(const struct lu_env *env,
401                                  struct dt_object *dt,
402                                  const struct dt_rec *rec,
403                                  const struct dt_key *key,
404                                  struct thandle *th)
405 {
406         struct dt_update_request *update;
407         struct lu_fid            *fid;
408         struct lu_fid            *rec_fid = (struct lu_fid *)rec;
409         int                      size[2] = {strlen((char *)key) + 1,
410                                                   sizeof(*rec_fid)};
411         const char               *bufs[2] = {(char *)key, (char *)rec_fid};
412         int                      rc;
413
414         update = out_find_create_update_loc(th, dt);
415         if (IS_ERR(update)) {
416                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
417                        dt->do_lu.lo_dev->ld_obd->obd_name,
418                        (int)PTR_ERR(update));
419                 return PTR_ERR(update);
420         }
421
422         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
423
424         CDEBUG(D_INFO, "%s: insert index of "DFID" %s: "DFID"\n",
425                dt->do_lu.lo_dev->ld_obd->obd_name,
426                PFID(fid), (char *)key, PFID(rec_fid));
427
428         fid_cpu_to_le(rec_fid, rec_fid);
429
430         rc = out_insert_update(env, update, OUT_INDEX_INSERT, fid,
431                                ARRAY_SIZE(size), size, bufs);
432         return rc;
433 }
434
435 static int osp_md_index_insert(const struct lu_env *env,
436                                struct dt_object *dt,
437                                const struct dt_rec *rec,
438                                const struct dt_key *key,
439                                struct thandle *th,
440                                struct lustre_capa *capa,
441                                int ignore_quota)
442 {
443         return 0;
444 }
445
446 static int osp_md_declare_delete(const struct lu_env *env,
447                                  struct dt_object *dt,
448                                  const struct dt_key *key,
449                                  struct thandle *th)
450 {
451         struct dt_update_request *update;
452         struct lu_fid *fid;
453         int size = strlen((char *)key) + 1;
454         int rc;
455
456         update = out_find_create_update_loc(th, dt);
457         if (IS_ERR(update)) {
458                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
459                        dt->do_lu.lo_dev->ld_obd->obd_name,
460                        (int)PTR_ERR(update));
461                 return PTR_ERR(update);
462         }
463
464         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
465
466         rc = out_insert_update(env, update, OUT_INDEX_DELETE, fid, 1, &size,
467                                (const char **)&key);
468
469         return rc;
470 }
471
472 static int osp_md_index_delete(const struct lu_env *env,
473                                struct dt_object *dt,
474                                const struct dt_key *key,
475                                struct thandle *th,
476                                struct lustre_capa *capa)
477 {
478         CDEBUG(D_INFO, "index delete "DFID" %s\n",
479                PFID(&dt->do_lu.lo_header->loh_fid), (char *)key);
480
481         return 0;
482 }
483
484 /**
485  * Creates or initializes iterator context.
486  *
487  * Note: for OSP, these index iterate api is only used to check
488  * whether the directory is empty now (see mdd_dir_is_empty).
489  * Since dir_empty will be return by OUT_ATTR_GET(see osp_attr_get/
490  * out_attr_get). So the implementation of these iterator is simplied
491  * to make mdd_dir_is_empty happy. The real iterator should be
492  * implemented, if we need it one day.
493  */
494 static struct dt_it *osp_it_init(const struct lu_env *env,
495                                  struct dt_object *dt,
496                                  __u32 attr,
497                                 struct lustre_capa *capa)
498 {
499         lu_object_get(&dt->do_lu);
500         return (struct dt_it *)dt;
501 }
502
503 static void osp_it_fini(const struct lu_env *env, struct dt_it *di)
504 {
505         struct dt_object *dt = (struct dt_object *)di;
506         lu_object_put(env, &dt->do_lu);
507 }
508
509 static int osp_it_get(const struct lu_env *env,
510                       struct dt_it *di, const struct dt_key *key)
511 {
512         return 1;
513 }
514
515 static void osp_it_put(const struct lu_env *env, struct dt_it *di)
516 {
517         return;
518 }
519
520 static int osp_it_next(const struct lu_env *env, struct dt_it *di)
521 {
522         struct dt_object *dt = (struct dt_object *)di;
523         struct osp_object *o = dt2osp_obj(dt);
524
525         if (o->opo_empty)
526                 return 1;
527
528         return 0;
529 }
530
531 static struct dt_key *osp_it_key(const struct lu_env *env,
532                                  const struct dt_it *di)
533 {
534         LBUG();
535         return NULL;
536 }
537
538 static int osp_it_key_size(const struct lu_env *env, const struct dt_it *di)
539 {
540         LBUG();
541         return 0;
542 }
543
544 static int osp_it_rec(const struct lu_env *env, const struct dt_it *di,
545                       struct dt_rec *lde, __u32 attr)
546 {
547         LBUG();
548         return 0;
549 }
550
551 static __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di)
552 {
553         LBUG();
554         return 0;
555 }
556
557 static int osp_it_load(const struct lu_env *env, const struct dt_it *di,
558                        __u64 hash)
559 {
560         LBUG();
561         return 0;
562 }
563
564 static int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di,
565                           void *key_rec)
566 {
567         LBUG();
568         return 0;
569 }
570
571 static const struct dt_index_operations osp_md_index_ops = {
572         .dio_lookup         = osp_md_index_lookup,
573         .dio_declare_insert = osp_md_declare_insert,
574         .dio_insert         = osp_md_index_insert,
575         .dio_declare_delete = osp_md_declare_delete,
576         .dio_delete         = osp_md_index_delete,
577         .dio_it     = {
578                 .init     = osp_it_init,
579                 .fini     = osp_it_fini,
580                 .get      = osp_it_get,
581                 .put      = osp_it_put,
582                 .next     = osp_it_next,
583                 .key      = osp_it_key,
584                 .key_size = osp_it_key_size,
585                 .rec      = osp_it_rec,
586                 .store    = osp_it_store,
587                 .load     = osp_it_load,
588                 .key_rec  = osp_it_key_rec,
589         }
590 };
591
592 static int osp_md_index_try(const struct lu_env *env,
593                             struct dt_object *dt,
594                             const struct dt_index_features *feat)
595 {
596         dt->do_index_ops = &osp_md_index_ops;
597         return 0;
598 }
599
600 static int osp_md_object_lock(const struct lu_env *env,
601                               struct dt_object *dt,
602                               struct lustre_handle *lh,
603                               struct ldlm_enqueue_info *einfo,
604                               ldlm_policy_data_t *policy)
605 {
606         struct osp_thread_info  *info = osp_env_info(env);
607         struct ldlm_res_id      *res_id = &info->osi_resid;
608         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
609         struct osp_device       *osp = dt2osp_dev(dt_dev);
610         struct ptlrpc_request   *req;
611         int                     rc = 0;
612         __u64                   flags = 0;
613         ldlm_mode_t             mode;
614
615         fid_build_reg_res_name(lu_object_fid(&dt->do_lu), res_id);
616
617         mode = ldlm_lock_match(osp->opd_obd->obd_namespace,
618                                LDLM_FL_BLOCK_GRANTED, res_id,
619                                einfo->ei_type, policy,
620                                einfo->ei_mode, lh, 0);
621         if (mode > 0)
622                 return ELDLM_OK;
623
624         req = ldlm_enqueue_pack(osp->opd_exp, 0);
625         if (IS_ERR(req))
626                 RETURN(PTR_ERR(req));
627
628         rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id,
629                               (const ldlm_policy_data_t *)policy,
630                               &flags, NULL, 0, LVB_T_NONE, lh, 0);
631
632         ptlrpc_req_finished(req);
633
634         return rc == ELDLM_OK ? 0 : -EIO;
635 }
636
637 static int osp_md_object_unlock(const struct lu_env *env,
638                                 struct dt_object *dt,
639                                 struct ldlm_enqueue_info *einfo,
640                                 ldlm_policy_data_t *policy)
641 {
642         struct lustre_handle    *lockh = einfo->ei_cbdata;
643
644         /* unlock finally */
645         ldlm_lock_decref(lockh, einfo->ei_mode);
646
647         return 0;
648 }
649
650 struct dt_object_operations osp_md_obj_ops = {
651         .do_read_lock         = osp_md_object_read_lock,
652         .do_write_lock        = osp_md_object_write_lock,
653         .do_read_unlock       = osp_md_object_read_unlock,
654         .do_write_unlock      = osp_md_object_write_unlock,
655         .do_write_locked      = osp_md_object_write_locked,
656         .do_declare_create    = osp_md_declare_object_create,
657         .do_create            = osp_md_object_create,
658         .do_declare_ref_add   = osp_md_declare_ref_add,
659         .do_ref_add           = osp_md_object_ref_add,
660         .do_declare_ref_del   = osp_md_declare_object_ref_del,
661         .do_ref_del           = osp_md_object_ref_del,
662         .do_declare_destroy   = osp_declare_object_destroy,
663         .do_destroy           = osp_object_destroy,
664         .do_ah_init           = osp_md_ah_init,
665         .do_attr_get          = osp_attr_get,
666         .do_declare_attr_set  = osp_md_declare_attr_set,
667         .do_attr_set          = osp_md_attr_set,
668         .do_xattr_get         = osp_xattr_get,
669         .do_declare_xattr_set = osp_declare_xattr_set,
670         .do_xattr_set         = osp_xattr_set,
671         .do_index_try         = osp_md_index_try,
672         .do_object_lock       = osp_md_object_lock,
673         .do_object_unlock     = osp_md_object_unlock,
674 };
675
676 static ssize_t osp_md_declare_write(const struct lu_env *env,
677                                     struct dt_object *dt,
678                                     const struct lu_buf *buf,
679                                     loff_t pos, struct thandle *th)
680 {
681         struct dt_update_request  *update;
682         struct lu_fid             *fid;
683         int                       sizes[2] = {buf->lb_len, sizeof(pos)};
684         const char                *bufs[2] = {(char *)buf->lb_buf,
685                                               (char *)&pos};
686         ssize_t                   rc;
687
688         update = out_find_create_update_loc(th, dt);
689         if (IS_ERR(update)) {
690                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
691                        dt->do_lu.lo_dev->ld_obd->obd_name,
692                        (int)PTR_ERR(update));
693                 return PTR_ERR(update);
694         }
695
696         pos = cpu_to_le64(pos);
697         bufs[1] = (char *)&pos;
698         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
699         rc = out_insert_update(env, update, OUT_WRITE, fid,
700                                ARRAY_SIZE(sizes), sizes, bufs);
701
702         return rc;
703
704 }
705
706 static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
707                             const struct lu_buf *buf, loff_t *pos,
708                             struct thandle *handle,
709                             struct lustre_capa *capa, int ignore_quota)
710 {
711         return buf->lb_len;
712 }
713
714 /* These body operation will be used to write symlinks during migration etc */
715 struct dt_body_operations osp_md_body_ops = {
716         .dbo_declare_write      = osp_md_declare_write,
717         .dbo_write              = osp_md_write,
718 };