Whamcloud - gitweb
e331bfaf53ed3573a21b106bdf560cbd9c5a5b1c
[fs/lustre-release.git] / lustre / osp / osp_md_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_md_object.c
27  *
28  * Lustre MDT Proxy Device
29  *
30  * Author: Di Wang <di.wang@intel.com>
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDS
34
35 #include <lustre_log.h>
36 #include "osp_internal.h"
37
38 static const char dot[] = ".";
39 static const char dotdot[] = "..";
40
41 int osp_md_declare_object_create(const struct lu_env *env,
42                                  struct dt_object *dt,
43                                  struct lu_attr *attr,
44                                  struct dt_allocation_hint *hint,
45                                  struct dt_object_format *dof,
46                                  struct thandle *th)
47 {
48         struct osp_thread_info          *osi = osp_env_info(env);
49         struct dt_update_request        *update;
50         struct lu_fid                   *fid1;
51         int                             sizes[2] = {sizeof(struct obdo), 0};
52         char                            *bufs[2] = {NULL, NULL};
53         int                             buf_count;
54         int                             rc;
55
56         update = out_find_create_update_loc(th, dt);
57         if (IS_ERR(update)) {
58                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
59                        dt->do_lu.lo_dev->ld_obd->obd_name,
60                        (int)PTR_ERR(update));
61                 return PTR_ERR(update);
62         }
63
64         osi->osi_obdo.o_valid = 0;
65         obdo_from_la(&osi->osi_obdo, attr, attr->la_valid);
66         lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
67
68         bufs[0] = (char *)&osi->osi_obdo;
69         buf_count = 1;
70         fid1 = (struct lu_fid *)lu_object_fid(&dt->do_lu);
71         if (hint != NULL && hint->dah_parent) {
72                 struct lu_fid *fid2;
73
74                 fid2 = (struct lu_fid *)lu_object_fid(&hint->dah_parent->do_lu);
75                 sizes[1] = sizeof(*fid2);
76                 bufs[1] = (char *)fid2;
77                 buf_count++;
78         }
79
80         if (lu_object_exists(&dt->do_lu)) {
81                 /* If the object already exists, we needs to destroy
82                  * this orphan object first.
83                  *
84                  * The scenario might happen in this case
85                  *
86                  * 1. client send remote create to MDT0.
87                  * 2. MDT0 send create update to MDT1.
88                  * 3. MDT1 finished create synchronously.
89                  * 4. MDT0 failed and reboot.
90                  * 5. client resend remote create to MDT0.
91                  * 6. MDT0 tries to resend create update to MDT1,
92                  *    but find the object already exists
93                  */
94                 CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
95                        dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1));
96
97                 rc = out_insert_update(env, update, OUT_REF_DEL, fid1, 0,
98                                        NULL, NULL);
99                 if (rc != 0)
100                         GOTO(out, rc);
101
102                 if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
103                         /* decrease for ".." */
104                         rc = out_insert_update(env, update, OUT_REF_DEL, fid1,
105                                                0, NULL, NULL);
106                         if (rc != 0)
107                                 GOTO(out, rc);
108                 }
109
110                 rc = out_insert_update(env, update, OUT_DESTROY, fid1, 0, NULL,
111                                        NULL);
112                 if (rc != 0)
113                         GOTO(out, rc);
114
115                 dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
116                 /* Increase batchid to add this orphan object deletion
117                  * to separate transaction */
118                 update_inc_batchid(update);
119         }
120
121         rc = out_insert_update(env, update, OUT_CREATE, fid1, buf_count, sizes,
122                                (const char **)bufs);
123 out:
124         if (rc)
125                 CERROR("%s: Insert update error: rc = %d\n",
126                        dt->do_lu.lo_dev->ld_obd->obd_name, rc);
127
128         return rc;
129 }
130
131 int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
132                          struct lu_attr *attr, struct dt_allocation_hint *hint,
133                          struct dt_object_format *dof, struct thandle *th)
134 {
135         struct osp_object  *obj = dt2osp_obj(dt);
136
137         CDEBUG(D_INFO, "create object "DFID"\n",
138                PFID(&dt->do_lu.lo_header->loh_fid));
139
140         /* Because the create update RPC will be sent during declare phase,
141          * if creation reaches here, it means the object has been created
142          * successfully */
143         dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
144         obj->opo_empty = 1;
145
146         return 0;
147 }
148
149 static int osp_md_declare_object_ref_del(const struct lu_env *env,
150                                          struct dt_object *dt,
151                                          struct thandle *th)
152 {
153         struct dt_update_request        *update;
154         struct lu_fid                   *fid;
155         int                             rc;
156
157         update = out_find_create_update_loc(th, dt);
158         if (IS_ERR(update)) {
159                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
160                        dt->do_lu.lo_dev->ld_obd->obd_name,
161                       (int)PTR_ERR(update));
162                 return PTR_ERR(update);
163         }
164
165         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
166
167         rc = out_insert_update(env, update, OUT_REF_DEL, fid, 0, NULL, NULL);
168
169         return rc;
170 }
171
172 static int osp_md_object_ref_del(const struct lu_env *env,
173                                  struct dt_object *dt,
174                                  struct thandle *th)
175 {
176         CDEBUG(D_INFO, "ref del object "DFID"\n",
177                PFID(&dt->do_lu.lo_header->loh_fid));
178
179         return 0;
180 }
181
182 static int osp_md_declare_ref_add(const struct lu_env *env,
183                                   struct dt_object *dt, struct thandle *th)
184 {
185         struct dt_update_request        *update;
186         struct lu_fid                   *fid;
187         int                             rc;
188
189         update = out_find_create_update_loc(th, dt);
190         if (IS_ERR(update)) {
191                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
192                        dt->do_lu.lo_dev->ld_obd->obd_name,
193                        (int)PTR_ERR(update));
194                 return PTR_ERR(update);
195         }
196
197         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
198
199         rc = out_insert_update(env, update, OUT_REF_ADD, fid, 0, NULL, NULL);
200
201         return rc;
202 }
203
204 static int osp_md_object_ref_add(const struct lu_env *env,
205                                  struct dt_object *dt,
206                                  struct thandle *th)
207 {
208         CDEBUG(D_INFO, "ref add object "DFID"\n",
209                PFID(&dt->do_lu.lo_header->loh_fid));
210
211         return 0;
212 }
213
214 static void osp_md_ah_init(const struct lu_env *env,
215                            struct dt_allocation_hint *ah,
216                            struct dt_object *parent,
217                            struct dt_object *child,
218                            umode_t child_mode)
219 {
220         LASSERT(ah);
221
222         ah->dah_parent = parent;
223         ah->dah_mode = child_mode;
224 }
225
226 int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
227                             const struct lu_attr *attr, struct thandle *th)
228 {
229         struct osp_thread_info          *osi = osp_env_info(env);
230         struct dt_update_request        *update;
231         struct lu_fid                   *fid;
232         int                             size = sizeof(struct obdo);
233         char                            *buf;
234         int                             rc;
235
236         update = out_find_create_update_loc(th, dt);
237         if (IS_ERR(update)) {
238                 CERROR("%s: Get OSP update buf failed: %d\n",
239                        dt->do_lu.lo_dev->ld_obd->obd_name,
240                        (int)PTR_ERR(update));
241                 return PTR_ERR(update);
242         }
243
244         osi->osi_obdo.o_valid = 0;
245         obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr,
246                      attr->la_valid);
247         lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
248
249         buf = (char *)&osi->osi_obdo;
250         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
251
252         rc = out_insert_update(env, update, OUT_ATTR_SET, fid, 1, &size,
253                                (const char **)&buf);
254
255         return rc;
256 }
257
258 int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
259                     const struct lu_attr *attr, struct thandle *th,
260                     struct lustre_capa *capa)
261 {
262         CDEBUG(D_INFO, "attr set object "DFID"\n",
263                PFID(&dt->do_lu.lo_header->loh_fid));
264
265         RETURN(0);
266 }
267
268 static void osp_md_object_read_lock(const struct lu_env *env,
269                                     struct dt_object *dt, unsigned role)
270 {
271         struct osp_object  *obj = dt2osp_obj(dt);
272
273         LASSERT(obj->opo_owner != env);
274         down_read_nested(&obj->opo_sem, role);
275
276         LASSERT(obj->opo_owner == NULL);
277 }
278
279 static void osp_md_object_write_lock(const struct lu_env *env,
280                                      struct dt_object *dt, unsigned role)
281 {
282         struct osp_object *obj = dt2osp_obj(dt);
283
284         down_write_nested(&obj->opo_sem, role);
285
286         LASSERT(obj->opo_owner == NULL);
287         obj->opo_owner = env;
288 }
289
290 static void osp_md_object_read_unlock(const struct lu_env *env,
291                                       struct dt_object *dt)
292 {
293         struct osp_object *obj = dt2osp_obj(dt);
294
295         up_read(&obj->opo_sem);
296 }
297
298 static void osp_md_object_write_unlock(const struct lu_env *env,
299                                        struct dt_object *dt)
300 {
301         struct osp_object *obj = dt2osp_obj(dt);
302
303         LASSERT(obj->opo_owner == env);
304         obj->opo_owner = NULL;
305         up_write(&obj->opo_sem);
306 }
307
308 static int osp_md_object_write_locked(const struct lu_env *env,
309                                       struct dt_object *dt)
310 {
311         struct osp_object *obj = dt2osp_obj(dt);
312
313         return obj->opo_owner == env;
314 }
315
316 static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
317                                struct dt_rec *rec, const struct dt_key *key,
318                                struct lustre_capa *capa)
319 {
320         struct lu_buf           *lbuf   = &osp_env_info(env)->osi_lb2;
321         struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
322         struct dt_device        *dt_dev = &osp->opd_dt_dev;
323         struct dt_update_request   *update;
324         struct object_update_reply *reply;
325         struct ptlrpc_request      *req = NULL;
326         int                        size = strlen((char *)key) + 1;
327         struct lu_fid              *fid;
328         int                        rc;
329         ENTRY;
330
331         /* Because it needs send the update buffer right away,
332          * just create an update buffer, instead of attaching the
333          * update_remote list of the thandle.
334          */
335         update = out_create_update_req(dt_dev);
336         if (IS_ERR(update))
337                 RETURN(PTR_ERR(update));
338
339         rc = out_insert_update(env, update, OUT_INDEX_LOOKUP,
340                                lu_object_fid(&dt->do_lu),
341                                1, &size, (const char **)&key);
342         if (rc) {
343                 CERROR("%s: Insert update error: rc = %d\n",
344                        dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
345                 GOTO(out, rc);
346         }
347
348         rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req);
349         if (rc < 0)
350                 GOTO(out, rc);
351
352         reply = req_capsule_server_sized_get(&req->rq_pill,
353                                              &RMF_OUT_UPDATE_REPLY,
354                                              OUT_UPDATE_REPLY_SIZE);
355         if (reply->ourp_magic != UPDATE_REPLY_MAGIC) {
356                 CERROR("%s: Wrong version %x expected %x: rc = %d\n",
357                        dt_dev->dd_lu_dev.ld_obd->obd_name,
358                        reply->ourp_magic, UPDATE_REPLY_MAGIC, -EPROTO);
359                 GOTO(out, rc = -EPROTO);
360         }
361
362         rc = object_update_result_data_get(reply, lbuf, 0);
363         if (rc < 0)
364                 GOTO(out, rc = size);
365
366         if (lbuf->lb_len != sizeof(*fid)) {
367                 CERROR("%s: lookup "DFID" %s wrong size %d\n",
368                        dt_dev->dd_lu_dev.ld_obd->obd_name,
369                        PFID(lu_object_fid(&dt->do_lu)), (char *)key,
370                        (int)lbuf->lb_len);
371                 GOTO(out, rc = -EINVAL);
372         }
373
374         fid = lbuf->lb_buf;
375         if (ptlrpc_rep_need_swab(req))
376                 lustre_swab_lu_fid(fid);
377         if (!fid_is_sane(fid)) {
378                 CERROR("%s: lookup "DFID" %s invalid fid "DFID"\n",
379                        dt_dev->dd_lu_dev.ld_obd->obd_name,
380                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid));
381                 GOTO(out, rc = -EINVAL);
382         }
383
384         memcpy(rec, fid, sizeof(*fid));
385
386         GOTO(out, rc = 1);
387
388 out:
389         if (req != NULL)
390                 ptlrpc_req_finished(req);
391
392         out_destroy_update_req(update);
393
394         return rc;
395 }
396
397 static int osp_md_declare_insert(const struct lu_env *env,
398                                  struct dt_object *dt,
399                                  const struct dt_rec *rec,
400                                  const struct dt_key *key,
401                                  struct thandle *th)
402 {
403         struct dt_update_request *update;
404         struct lu_fid            *fid;
405         struct lu_fid            *rec_fid = (struct lu_fid *)rec;
406         int                      size[2] = {strlen((char *)key) + 1,
407                                                   sizeof(*rec_fid)};
408         const char               *bufs[2] = {(char *)key, (char *)rec_fid};
409         int                      rc;
410
411         update = out_find_create_update_loc(th, dt);
412         if (IS_ERR(update)) {
413                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
414                        dt->do_lu.lo_dev->ld_obd->obd_name,
415                        (int)PTR_ERR(update));
416                 return PTR_ERR(update);
417         }
418
419         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
420
421         CDEBUG(D_INFO, "%s: insert index of "DFID" %s: "DFID"\n",
422                dt->do_lu.lo_dev->ld_obd->obd_name,
423                PFID(fid), (char *)key, PFID(rec_fid));
424
425         fid_cpu_to_le(rec_fid, rec_fid);
426
427         rc = out_insert_update(env, update, OUT_INDEX_INSERT, fid,
428                                ARRAY_SIZE(size), size, bufs);
429         return rc;
430 }
431
432 static int osp_md_index_insert(const struct lu_env *env,
433                                struct dt_object *dt,
434                                const struct dt_rec *rec,
435                                const struct dt_key *key,
436                                struct thandle *th,
437                                struct lustre_capa *capa,
438                                int ignore_quota)
439 {
440         return 0;
441 }
442
443 static int osp_md_declare_delete(const struct lu_env *env,
444                                  struct dt_object *dt,
445                                  const struct dt_key *key,
446                                  struct thandle *th)
447 {
448         struct dt_update_request *update;
449         struct lu_fid *fid;
450         int size = strlen((char *)key) + 1;
451         int rc;
452
453         update = out_find_create_update_loc(th, dt);
454         if (IS_ERR(update)) {
455                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
456                        dt->do_lu.lo_dev->ld_obd->obd_name,
457                        (int)PTR_ERR(update));
458                 return PTR_ERR(update);
459         }
460
461         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
462
463         rc = out_insert_update(env, update, OUT_INDEX_DELETE, fid, 1, &size,
464                                (const char **)&key);
465
466         return rc;
467 }
468
469 static int osp_md_index_delete(const struct lu_env *env,
470                                struct dt_object *dt,
471                                const struct dt_key *key,
472                                struct thandle *th,
473                                struct lustre_capa *capa)
474 {
475         CDEBUG(D_INFO, "index delete "DFID" %s\n",
476                PFID(&dt->do_lu.lo_header->loh_fid), (char *)key);
477
478         return 0;
479 }
480
481 /**
482  * Creates or initializes iterator context.
483  *
484  * Note: for OSP, these index iterate api is only used to check
485  * whether the directory is empty now (see mdd_dir_is_empty).
486  * Since dir_empty will be return by OUT_ATTR_GET(see osp_attr_get/
487  * out_attr_get). So the implementation of these iterator is simplied
488  * to make mdd_dir_is_empty happy. The real iterator should be
489  * implemented, if we need it one day.
490  */
491 static struct dt_it *osp_it_init(const struct lu_env *env,
492                                  struct dt_object *dt,
493                                  __u32 attr,
494                                 struct lustre_capa *capa)
495 {
496         lu_object_get(&dt->do_lu);
497         return (struct dt_it *)dt;
498 }
499
500 static void osp_it_fini(const struct lu_env *env, struct dt_it *di)
501 {
502         struct dt_object *dt = (struct dt_object *)di;
503         lu_object_put(env, &dt->do_lu);
504 }
505
506 static int osp_it_get(const struct lu_env *env,
507                       struct dt_it *di, const struct dt_key *key)
508 {
509         return 1;
510 }
511
512 static void osp_it_put(const struct lu_env *env, struct dt_it *di)
513 {
514         return;
515 }
516
517 static int osp_it_next(const struct lu_env *env, struct dt_it *di)
518 {
519         struct dt_object *dt = (struct dt_object *)di;
520         struct osp_object *o = dt2osp_obj(dt);
521
522         if (o->opo_empty)
523                 return 1;
524
525         return 0;
526 }
527
528 static struct dt_key *osp_it_key(const struct lu_env *env,
529                                  const struct dt_it *di)
530 {
531         LBUG();
532         return NULL;
533 }
534
535 static int osp_it_key_size(const struct lu_env *env, const struct dt_it *di)
536 {
537         LBUG();
538         return 0;
539 }
540
541 static int osp_it_rec(const struct lu_env *env, const struct dt_it *di,
542                       struct dt_rec *lde, __u32 attr)
543 {
544         LBUG();
545         return 0;
546 }
547
548 static __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di)
549 {
550         LBUG();
551         return 0;
552 }
553
554 static int osp_it_load(const struct lu_env *env, const struct dt_it *di,
555                        __u64 hash)
556 {
557         LBUG();
558         return 0;
559 }
560
561 static int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di,
562                           void *key_rec)
563 {
564         LBUG();
565         return 0;
566 }
567
568 static const struct dt_index_operations osp_md_index_ops = {
569         .dio_lookup         = osp_md_index_lookup,
570         .dio_declare_insert = osp_md_declare_insert,
571         .dio_insert         = osp_md_index_insert,
572         .dio_declare_delete = osp_md_declare_delete,
573         .dio_delete         = osp_md_index_delete,
574         .dio_it     = {
575                 .init     = osp_it_init,
576                 .fini     = osp_it_fini,
577                 .get      = osp_it_get,
578                 .put      = osp_it_put,
579                 .next     = osp_it_next,
580                 .key      = osp_it_key,
581                 .key_size = osp_it_key_size,
582                 .rec      = osp_it_rec,
583                 .store    = osp_it_store,
584                 .load     = osp_it_load,
585                 .key_rec  = osp_it_key_rec,
586         }
587 };
588
589 static int osp_md_index_try(const struct lu_env *env,
590                             struct dt_object *dt,
591                             const struct dt_index_features *feat)
592 {
593         dt->do_index_ops = &osp_md_index_ops;
594         return 0;
595 }
596
597 static int osp_md_object_lock(const struct lu_env *env,
598                               struct dt_object *dt,
599                               struct lustre_handle *lh,
600                               struct ldlm_enqueue_info *einfo,
601                               ldlm_policy_data_t *policy)
602 {
603         struct ldlm_res_id      *res_id;
604         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
605         struct osp_device       *osp = dt2osp_dev(dt_dev);
606         struct ptlrpc_request   *req;
607         int                     rc = 0;
608         __u64                   flags = 0;
609         ldlm_mode_t             mode;
610
611         res_id = einfo->ei_res_id;
612         LASSERT(res_id != NULL);
613
614         mode = ldlm_lock_match(osp->opd_obd->obd_namespace,
615                                LDLM_FL_BLOCK_GRANTED, res_id,
616                                einfo->ei_type, policy,
617                                einfo->ei_mode, lh, 0);
618         if (mode > 0)
619                 return ELDLM_OK;
620
621         req = ldlm_enqueue_pack(osp->opd_exp, 0);
622         if (IS_ERR(req))
623                 RETURN(PTR_ERR(req));
624
625         rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id,
626                               (const ldlm_policy_data_t *)policy,
627                               &flags, NULL, 0, LVB_T_NONE, lh, 0);
628
629         ptlrpc_req_finished(req);
630
631         return rc == ELDLM_OK ? 0 : -EIO;
632 }
633
634 static int osp_md_object_unlock(const struct lu_env *env,
635                                 struct dt_object *dt,
636                                 struct ldlm_enqueue_info *einfo,
637                                 ldlm_policy_data_t *policy)
638 {
639         struct lustre_handle    *lockh = einfo->ei_cbdata;
640
641         /* unlock finally */
642         ldlm_lock_decref(lockh, einfo->ei_mode);
643
644         return 0;
645 }
646
647 struct dt_object_operations osp_md_obj_ops = {
648         .do_read_lock         = osp_md_object_read_lock,
649         .do_write_lock        = osp_md_object_write_lock,
650         .do_read_unlock       = osp_md_object_read_unlock,
651         .do_write_unlock      = osp_md_object_write_unlock,
652         .do_write_locked      = osp_md_object_write_locked,
653         .do_declare_create    = osp_md_declare_object_create,
654         .do_create            = osp_md_object_create,
655         .do_declare_ref_add   = osp_md_declare_ref_add,
656         .do_ref_add           = osp_md_object_ref_add,
657         .do_declare_ref_del   = osp_md_declare_object_ref_del,
658         .do_ref_del           = osp_md_object_ref_del,
659         .do_declare_destroy   = osp_declare_object_destroy,
660         .do_destroy           = osp_object_destroy,
661         .do_ah_init           = osp_md_ah_init,
662         .do_attr_get          = osp_attr_get,
663         .do_declare_attr_set  = osp_md_declare_attr_set,
664         .do_attr_set          = osp_md_attr_set,
665         .do_xattr_get         = osp_xattr_get,
666         .do_declare_xattr_set = osp_declare_xattr_set,
667         .do_xattr_set         = osp_xattr_set,
668         .do_index_try         = osp_md_index_try,
669         .do_object_lock       = osp_md_object_lock,
670         .do_object_unlock     = osp_md_object_unlock,
671 };
672
673 static ssize_t osp_md_declare_write(const struct lu_env *env,
674                                     struct dt_object *dt,
675                                     const struct lu_buf *buf,
676                                     loff_t pos, struct thandle *th)
677 {
678         struct dt_update_request  *update;
679         struct lu_fid             *fid;
680         int                       sizes[2] = {buf->lb_len, sizeof(pos)};
681         const char                *bufs[2] = {(char *)buf->lb_buf,
682                                               (char *)&pos};
683         ssize_t                   rc;
684
685         update = out_find_create_update_loc(th, dt);
686         if (IS_ERR(update)) {
687                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
688                        dt->do_lu.lo_dev->ld_obd->obd_name,
689                        (int)PTR_ERR(update));
690                 return PTR_ERR(update);
691         }
692
693         pos = cpu_to_le64(pos);
694         bufs[1] = (char *)&pos;
695         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
696         rc = out_insert_update(env, update, OUT_WRITE, fid,
697                                ARRAY_SIZE(sizes), sizes, bufs);
698
699         return rc;
700
701 }
702
703 static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
704                             const struct lu_buf *buf, loff_t *pos,
705                             struct thandle *handle,
706                             struct lustre_capa *capa, int ignore_quota)
707 {
708         return buf->lb_len;
709 }
710
711 /* These body operation will be used to write symlinks during migration etc */
712 struct dt_body_operations osp_md_body_ops = {
713         .dbo_declare_write      = osp_md_declare_write,
714         .dbo_write              = osp_md_write,
715 };