Whamcloud - gitweb
9df10ed502449de6bbf8a1530106af596186af9e
[fs/lustre-release.git] / lustre / osp / osp_md_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_md_object.c
27  *
28  * Lustre MDT Proxy Device
29  *
30  * Author: Di Wang <di.wang@intel.com>
31  */
32
33 #ifndef EXPORT_SYMTAB
34 # define EXPORT_SYMTAB
35 #endif
36 #define DEBUG_SUBSYSTEM S_MDS
37
38 #include <lustre_log.h>
39 #include <lustre_update.h>
40 #include "osp_internal.h"
41 static const char dot[] = ".";
42 static const char dotdot[] = "..";
43
44 static int osp_prep_update_req(const struct lu_env *env,
45                                struct osp_device *osp,
46                                struct update_buf *ubuf, int ubuf_len,
47                                struct ptlrpc_request **reqp)
48 {
49         struct obd_import      *imp;
50         struct ptlrpc_request  *req;
51         struct update_buf      *tmp;
52         int                     rc;
53         ENTRY;
54
55         imp = osp->opd_obd->u.cli.cl_import;
56         LASSERT(imp);
57
58         req = ptlrpc_request_alloc(imp, &RQF_UPDATE_OBJ);
59         if (req == NULL)
60                 RETURN(-ENOMEM);
61
62         req_capsule_set_size(&req->rq_pill, &RMF_UPDATE, RCL_CLIENT,
63                              UPDATE_BUFFER_SIZE);
64
65         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, UPDATE_OBJ);
66         if (rc != 0) {
67                 ptlrpc_req_finished(req);
68                 RETURN(rc);
69         }
70
71         req_capsule_set_size(&req->rq_pill, &RMF_UPDATE_REPLY, RCL_SERVER,
72                              UPDATE_BUFFER_SIZE);
73
74         tmp = req_capsule_client_get(&req->rq_pill, &RMF_UPDATE);
75         memcpy(tmp, ubuf, ubuf_len);
76
77         ptlrpc_request_set_replen(req);
78
79         *reqp = req;
80
81         RETURN(rc);
82 }
83
84 static int osp_remote_sync(const struct lu_env *env, struct dt_device *dt,
85                            struct update_request *update,
86                            struct ptlrpc_request **reqp)
87 {
88         struct osp_device       *osp = dt2osp_dev(dt);
89         struct ptlrpc_request   *req = NULL;
90         int                     rc;
91         ENTRY;
92
93         rc = osp_prep_update_req(env, osp, update->ur_buf,
94                                  UPDATE_BUFFER_SIZE, &req);
95         if (rc)
96                 RETURN(rc);
97
98         /* Note: some dt index api might return non-zero result here, like
99          * osd_index_ea_lookup, so we should only check rc < 0 here */
100         rc = ptlrpc_queue_wait(req);
101         if (rc < 0) {
102                 ptlrpc_req_finished(req);
103                 update->ur_rc = rc;
104                 RETURN(rc);
105         }
106
107         if (reqp != NULL) {
108                 *reqp = req;
109                 RETURN(rc);
110         }
111
112         update->ur_rc = rc;
113
114         ptlrpc_req_finished(req);
115
116         RETURN(rc);
117 }
118
119 /**
120  * Create a new update request for the device.
121  */
122 static struct update_request
123 *osp_create_update_req(struct dt_device *dt)
124 {
125         struct update_request *update;
126
127         OBD_ALLOC_PTR(update);
128         if (!update)
129                 return ERR_PTR(-ENOMEM);
130
131         OBD_ALLOC(update->ur_buf, UPDATE_BUFFER_SIZE);
132         if (update->ur_buf == NULL) {
133                 OBD_FREE_PTR(update);
134                 return ERR_PTR(-ENOMEM);
135         }
136
137         CFS_INIT_LIST_HEAD(&update->ur_list);
138         update->ur_dt = dt;
139
140         update->ur_buf->ub_magic = UPDATE_BUFFER_MAGIC;
141         update->ur_buf->ub_count = 0;
142
143         return update;
144 }
145
146 static void osp_destroy_update_req(struct update_request *update)
147 {
148         if (update == NULL)
149                 return;
150
151         cfs_list_del(&update->ur_list);
152         if (update->ur_buf != NULL)
153                 OBD_FREE(update->ur_buf, UPDATE_BUFFER_SIZE);
154
155         OBD_FREE_PTR(update);
156         return;
157 }
158
159 int osp_trans_stop(const struct lu_env *env, struct thandle *th)
160 {
161         int rc = 0;
162
163         osp_destroy_update_req(th->th_current_request);
164         rc = th->th_current_request->ur_rc;
165         th->th_current_request = NULL;
166
167         return rc;
168 }
169
170 /**
171  * In DNE phase I, all remote updates will be packed into RPC (the format
172  * description is in lustre_idl.h) during declare phase, all of updates
173  * are attached to the transaction, one entry per OSP. Then in trans start,
174  * LOD will walk through these entries and send these UPDATEs to the remote
175  * MDT to be executed synchronously.
176  */
177 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
178                     struct thandle *th)
179 {
180         struct update_request *update;
181         int rc = 0;
182
183         /* In phase I, if the transaction includes remote updates, the local
184          * update should be synchronized, so it will set th_sync = 1 */
185         update = th->th_current_request;
186         LASSERT(update != NULL && update->ur_dt == dt);
187         if (update->ur_buf->ub_count > 0) {
188                 rc = osp_remote_sync(env, dt, update, NULL);
189                 th->th_sync = 1;
190         }
191
192         RETURN(rc);
193 }
194
195 /**
196  * Insert the update into the th_bufs for the device.
197  */
198 static int osp_insert_update(const struct lu_env *env,
199                              struct update_request *update, int op,
200                              struct lu_fid *fid, int count, int *lens,
201                              char **bufs)
202 {
203         struct update_buf    *ubuf = update->ur_buf;
204         struct update        *obj_update;
205         char                 *ptr;
206         int                   i;
207         int                   update_length;
208         int                   rc = 0;
209         ENTRY;
210
211         obj_update = (struct update *)((char *)ubuf +
212                       cfs_size_round(update_buf_size(ubuf)));
213
214         /* Check update size to make sure it can fit into the buffer */
215         update_length = cfs_size_round(offsetof(struct update,
216                                        u_bufs[0]));
217         for (i = 0; i < count; i++)
218                 update_length += cfs_size_round(lens[i]);
219
220         if (cfs_size_round(update_buf_size(ubuf)) + update_length >
221             UPDATE_BUFFER_SIZE || ubuf->ub_count >= UPDATE_MAX_OPS) {
222                 CERROR("%s: insert up %p, idx %d cnt %d len %lu: rc = %d\n",
223                         update->ur_dt->dd_lu_dev.ld_obd->obd_name, ubuf,
224                         update_length, ubuf->ub_count, update_buf_size(ubuf),
225                         -E2BIG);
226                 RETURN(-E2BIG);
227         }
228
229         if (count > UPDATE_BUF_COUNT) {
230                 CERROR("%s: Insert too much params %d "DFID" op %d: rc = %d\n",
231                         update->ur_dt->dd_lu_dev.ld_obd->obd_name, count,
232                         PFID(fid), op, -E2BIG);
233                 RETURN(-E2BIG);
234         }
235
236         /* fill the update into the update buffer */
237         fid_cpu_to_le(&obj_update->u_fid, fid);
238         obj_update->u_type = cpu_to_le32(op);
239         for (i = 0; i < count; i++)
240                 obj_update->u_lens[i] = cpu_to_le32(lens[i]);
241
242         ptr = (char *)obj_update +
243                         cfs_size_round(offsetof(struct update, u_bufs[0]));
244         for (i = 0; i < count; i++)
245                 LOGL(bufs[i], lens[i], ptr);
246
247         ubuf->ub_count++;
248
249         CDEBUG(D_INFO, "%s: %p "DFID" idx %d: op %d params %d:%lu\n",
250                update->ur_dt->dd_lu_dev.ld_obd->obd_name, ubuf, PFID(fid),
251                ubuf->ub_count, op, count, update_buf_size(ubuf));
252
253         RETURN(rc);
254 }
255
256 static struct update_request
257 *osp_find_update(struct thandle *th, struct dt_device *dt_dev)
258 {
259         struct update_request   *update;
260
261         /* Because transaction api does not proivde the interface
262          * to transfer the update from LOD to OSP,  we need walk
263          * remote update list to find the update, this probably
264          * should move to LOD layer, when update can be part of
265          * the trancation api parameter. XXX */
266         cfs_list_for_each_entry(update, &th->th_remote_update_list, ur_list) {
267                 if (update->ur_dt == dt_dev)
268                         return update;
269         }
270         return NULL;
271 }
272
273 /**
274  * Find one loc in th_dev/dev_obj_update for the update,
275  * Because only one thread can access this thandle, no need
276  * lock now.
277  */
278 static struct update_request
279 *osp_find_create_update_loc(struct thandle *th, struct dt_object *dt)
280 {
281         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
282         struct update_request   *update;
283         ENTRY;
284
285         update = osp_find_update(th, dt_dev);
286         if (update != NULL)
287                 RETURN(update);
288
289         update = osp_create_update_req(dt_dev);
290         if (IS_ERR(update))
291                 RETURN(update);
292
293         cfs_list_add_tail(&update->ur_list, &th->th_remote_update_list);
294
295         RETURN(update);
296 }
297
298 static int osp_get_attr_from_req(const struct lu_env *env,
299                                  struct ptlrpc_request *req,
300                                  struct lu_attr *attr, int index)
301 {
302         struct update_reply     *reply;
303         struct obdo             *lobdo = &osp_env_info(env)->osi_obdo;
304         struct obdo             *wobdo;
305         int                     size;
306
307         LASSERT(attr != NULL);
308
309         reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
310                                              UPDATE_BUFFER_SIZE);
311         if (reply->ur_version != UPDATE_REPLY_V1)
312                 return -EPROTO;
313
314         size = update_get_reply_buf(reply, (void **)&wobdo, index);
315         if (size != sizeof(struct obdo))
316                 return -EPROTO;
317
318         obdo_le_to_cpu(wobdo, wobdo);
319         lustre_get_wire_obdo(lobdo, wobdo);
320         la_from_obdo(attr, lobdo, lobdo->o_valid);
321
322         return 0;
323 }
324
325 static int osp_md_declare_object_create(const struct lu_env *env,
326                                         struct dt_object *dt,
327                                         struct lu_attr *attr,
328                                         struct dt_allocation_hint *hint,
329                                         struct dt_object_format *dof,
330                                         struct thandle *th)
331 {
332         struct osp_thread_info  *osi = osp_env_info(env);
333         struct update_request   *update;
334         struct lu_fid           *fid1;
335         int                     sizes[2] = {sizeof(struct obdo), 0};
336         char                    *bufs[2] = {NULL, NULL};
337         int                     buf_count;
338         int                     rc;
339
340         update = osp_find_create_update_loc(th, dt);
341         if (IS_ERR(update)) {
342                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
343                        dt->do_lu.lo_dev->ld_obd->obd_name,
344                        (int)PTR_ERR(update));
345                 return PTR_ERR(update);
346         }
347
348         osi->osi_obdo.o_valid = 0;
349         LASSERT(S_ISDIR(attr->la_mode));
350         obdo_from_la(&osi->osi_obdo, attr, attr->la_valid);
351         lustre_set_wire_obdo(&osi->osi_obdo, &osi->osi_obdo);
352         obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
353
354         bufs[0] = (char *)&osi->osi_obdo;
355         buf_count = 1;
356         fid1 = (struct lu_fid *)lu_object_fid(&dt->do_lu);
357         if (hint->dah_parent) {
358                 struct lu_fid *fid2;
359                 struct lu_fid *tmp_fid = &osi->osi_fid;
360
361                 fid2 = (struct lu_fid *)lu_object_fid(&hint->dah_parent->do_lu);
362                 fid_cpu_to_le(tmp_fid, fid2);
363                 sizes[1] = sizeof(*tmp_fid);
364                 bufs[1] = (char *)tmp_fid;
365                 buf_count++;
366         }
367
368         rc = osp_insert_update(env, update, OBJ_CREATE, fid1, buf_count, sizes,
369                                bufs);
370         if (rc) {
371                 CERROR("%s: Insert update error: rc = %d\n",
372                        dt->do_lu.lo_dev->ld_obd->obd_name, rc);
373         }
374
375         return rc;
376 }
377
378 static int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
379                                 struct lu_attr *attr,
380                                 struct dt_allocation_hint *hint,
381                                 struct dt_object_format *dof,
382                                 struct thandle *th)
383 {
384         struct osp_object  *obj = dt2osp_obj(dt);
385
386         CDEBUG(D_INFO, "create object "DFID"\n",
387                PFID(&dt->do_lu.lo_header->loh_fid));
388
389         /* Because the create update RPC will be sent during declare phase,
390          * if creation reaches here, it means the object has been created
391          * successfully */
392         dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
393         obj->opo_empty = 1;
394
395         return 0;
396 }
397
398 static int osp_md_declare_object_ref_del(const struct lu_env *env,
399                                          struct dt_object *dt,
400                                          struct thandle *th)
401 {
402         struct update_request   *update;
403         struct lu_fid           *fid;
404         int                     rc;
405
406         update = osp_find_create_update_loc(th, dt);
407         if (IS_ERR(update)) {
408                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
409                        dt->do_lu.lo_dev->ld_obd->obd_name,
410                       (int)PTR_ERR(update));
411                 return PTR_ERR(update);
412         }
413
414         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
415
416         rc = osp_insert_update(env, update, OBJ_REF_DEL, fid, 0, NULL, NULL);
417
418         return rc;
419 }
420
421 static int osp_md_object_ref_del(const struct lu_env *env,
422                                  struct dt_object *dt,
423                                  struct thandle *th)
424 {
425         CDEBUG(D_INFO, "ref del object "DFID"\n",
426                PFID(&dt->do_lu.lo_header->loh_fid));
427
428         return 0;
429 }
430
431 static int osp_md_declare_ref_add(const struct lu_env *env,
432                                   struct dt_object *dt, struct thandle *th)
433 {
434         struct update_request   *update;
435         struct lu_fid           *fid;
436         int                     rc;
437
438         update = osp_find_create_update_loc(th, dt);
439         if (IS_ERR(update)) {
440                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
441                        dt->do_lu.lo_dev->ld_obd->obd_name,
442                        (int)PTR_ERR(update));
443                 return PTR_ERR(update);
444         }
445
446         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
447
448         rc = osp_insert_update(env, update, OBJ_REF_ADD, fid, 0, NULL, NULL);
449
450         return rc;
451 }
452
453 static int osp_md_object_ref_add(const struct lu_env *env,
454                                  struct dt_object *dt,
455                                  struct thandle *th)
456 {
457         CDEBUG(D_INFO, "ref add object "DFID"\n",
458                PFID(&dt->do_lu.lo_header->loh_fid));
459
460         return 0;
461 }
462
463 static void osp_md_ah_init(const struct lu_env *env,
464                            struct dt_allocation_hint *ah,
465                            struct dt_object *parent,
466                            struct dt_object *child,
467                            cfs_umode_t child_mode)
468 {
469         LASSERT(ah);
470
471         memset(ah, 0, sizeof(*ah));
472         ah->dah_parent = parent;
473         ah->dah_mode = child_mode;
474 }
475
476 static int osp_md_declare_attr_set(const struct lu_env *env,
477                                    struct dt_object *dt,
478                                    const struct lu_attr *attr,
479                                    struct thandle *th)
480 {
481         struct osp_thread_info *osi = osp_env_info(env);
482         struct update_request  *update;
483         struct lu_fid          *fid;
484         int                     size = sizeof(struct obdo);
485         char                   *buf;
486         int                     rc;
487
488         update = osp_find_create_update_loc(th, dt);
489         if (IS_ERR(update)) {
490                 CERROR("%s: Get OSP update buf failed: %d\n",
491                        dt->do_lu.lo_dev->ld_obd->obd_name,
492                        (int)PTR_ERR(update));
493                 return PTR_ERR(update);
494         }
495
496         osi->osi_obdo.o_valid = 0;
497         LASSERT(!(attr->la_valid & (LA_MODE | LA_TYPE)));
498         obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr,
499                      attr->la_valid);
500         lustre_set_wire_obdo(&osi->osi_obdo, &osi->osi_obdo);
501         obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
502
503         buf = (char *)&osi->osi_obdo;
504         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
505
506         rc = osp_insert_update(env, update, OBJ_ATTR_SET, fid, 1, &size, &buf);
507
508         return rc;
509 }
510
511 static int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
512                            const struct lu_attr *attr, struct thandle *th,
513                            struct lustre_capa *capa)
514 {
515         CDEBUG(D_INFO, "attr set object "DFID"\n",
516                PFID(&dt->do_lu.lo_header->loh_fid));
517
518         RETURN(0);
519 }
520
521 static int osp_md_declare_xattr_set(const struct lu_env *env,
522                                     struct dt_object *dt,
523                                     const struct lu_buf *buf,
524                                     const char *name, int flag,
525                                     struct thandle *th)
526 {
527         struct update_request   *update;
528         struct lu_fid           *fid;
529         int                     sizes[3] = {strlen(name), buf->lb_len,
530                                             sizeof(int)};
531         char                    *bufs[3] = {(char *)name, (char *)buf->lb_buf };
532         int                     rc;
533
534         update = osp_find_create_update_loc(th, dt);
535         if (IS_ERR(update)) {
536                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
537                        dt->do_lu.lo_dev->ld_obd->obd_name,
538                        (int)PTR_ERR(update));
539                 return PTR_ERR(update);
540         }
541
542         flag = cpu_to_le32(flag);
543         bufs[2] = (char *)&flag;
544
545         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
546         rc = osp_insert_update(env, update, OBJ_XATTR_SET, fid,
547                                ARRAY_SIZE(sizes), sizes, bufs);
548
549         return rc;
550 }
551
552 static int osp_md_xattr_set(const struct lu_env *env, struct dt_object *dt,
553                             const struct lu_buf *buf, const char *name, int fl,
554                             struct thandle *th, struct lustre_capa *capa)
555 {
556         CDEBUG(D_INFO, "xattr %s set object "DFID"\n", name,
557                PFID(&dt->do_lu.lo_header->loh_fid));
558
559         return 0;
560 }
561
562 static int osp_md_xattr_get(const struct lu_env *env, struct dt_object *dt,
563                             struct lu_buf *buf, const char *name,
564                             struct lustre_capa *capa)
565 {
566         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
567         struct update_request   *update = NULL;
568         struct ptlrpc_request   *req = NULL;
569         int                     rc;
570         int                     buf_len;
571         int                     size;
572         struct update_reply     *reply;
573         void                    *ea_buf;
574         ENTRY;
575
576         /* Because it needs send the update buffer right away,
577          * just create an update buffer, instead of attaching the
578          * update_remote list of the thandle.
579          */
580         update = osp_create_update_req(dt_dev);
581         if (IS_ERR(update))
582                 RETURN(PTR_ERR(update));
583
584         LASSERT(name != NULL);
585         buf_len = strlen(name);
586         rc = osp_insert_update(env, update, OBJ_XATTR_GET,
587                                (struct lu_fid *)lu_object_fid(&dt->do_lu),
588                                1, &buf_len, (char **)&name);
589         if (rc != 0) {
590                 CERROR("%s: Insert update error: rc = %d\n",
591                        dt->do_lu.lo_dev->ld_obd->obd_name, rc);
592                 GOTO(out, rc);
593         }
594         dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
595
596         rc = osp_remote_sync(env, dt_dev, update, &req);
597         if (rc != 0)
598                 GOTO(out, rc);
599
600         reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
601                                             UPDATE_BUFFER_SIZE);
602         if (reply->ur_version != UPDATE_REPLY_V1) {
603                 CERROR("%s: Wrong version %x expected %x: rc = %d\n",
604                        dt_dev->dd_lu_dev.ld_obd->obd_name,
605                        reply->ur_version, UPDATE_REPLY_V1, -EPROTO);
606                 GOTO(out, rc = -EPROTO);
607         }
608
609         size = update_get_reply_buf(reply, &ea_buf, 0);
610         if (size < 0)
611                 GOTO(out, rc = size);
612
613         LASSERT(size > 0 && size < CFS_PAGE_SIZE);
614         LASSERT(ea_buf != NULL);
615
616         buf->lb_len = size;
617         memcpy(buf->lb_buf, ea_buf, size);
618 out:
619         if (req != NULL)
620                 ptlrpc_req_finished(req);
621
622         if (update != NULL)
623                 osp_destroy_update_req(update);
624
625         RETURN(rc);
626 }
627
628 static void osp_md_object_read_lock(const struct lu_env *env,
629                                     struct dt_object *dt, unsigned role)
630 {
631         struct osp_object  *obj = dt2osp_obj(dt);
632
633         LASSERT(obj->opo_owner != env);
634         down_read_nested(&obj->opo_sem, role);
635
636         LASSERT(obj->opo_owner == NULL);
637 }
638
639 static void osp_md_object_write_lock(const struct lu_env *env,
640                                      struct dt_object *dt, unsigned role)
641 {
642         struct osp_object *obj = dt2osp_obj(dt);
643
644         down_write_nested(&obj->opo_sem, role);
645
646         LASSERT(obj->opo_owner == NULL);
647         obj->opo_owner = env;
648 }
649
650 static void osp_md_object_read_unlock(const struct lu_env *env,
651                                       struct dt_object *dt)
652 {
653         struct osp_object *obj = dt2osp_obj(dt);
654
655         up_read(&obj->opo_sem);
656 }
657
658 static void osp_md_object_write_unlock(const struct lu_env *env,
659                                        struct dt_object *dt)
660 {
661         struct osp_object *obj = dt2osp_obj(dt);
662
663         LASSERT(obj->opo_owner == env);
664         obj->opo_owner = NULL;
665         up_write(&obj->opo_sem);
666 }
667
668 static int osp_md_object_write_locked(const struct lu_env *env,
669                                       struct dt_object *dt)
670 {
671         struct osp_object *obj = dt2osp_obj(dt);
672
673         return obj->opo_owner == env;
674 }
675
676 static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
677                                struct dt_rec *rec, const struct dt_key *key,
678                                struct lustre_capa *capa)
679 {
680         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
681         struct update_request   *update;
682         struct ptlrpc_request   *req = NULL;
683         int                     size = strlen((char *)key) + 1;
684         char                    *name = (char *)key;
685         int                     rc;
686         struct update_reply     *reply;
687         struct lu_fid           *fid;
688
689         ENTRY;
690
691         /* Because it needs send the update buffer right away,
692          * just create an update buffer, instead of attaching the
693          * update_remote list of the thandle.
694          */
695         update = osp_create_update_req(dt_dev);
696         if (IS_ERR(update))
697                 RETURN(PTR_ERR(update));
698
699         rc = osp_insert_update(env, update, OBJ_INDEX_LOOKUP,
700                                (struct lu_fid *)lu_object_fid(&dt->do_lu),
701                                1, &size, (char **)&name);
702         if (rc) {
703                 CERROR("%s: Insert update error: rc = %d\n",
704                        dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
705                 GOTO(out, rc);
706         }
707
708         rc = osp_remote_sync(env, dt_dev, update, &req);
709         if (rc < 0) {
710                 CERROR("%s: lookup "DFID" %s failed: rc = %d\n",
711                        dt_dev->dd_lu_dev.ld_obd->obd_name,
712                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, rc);
713                 GOTO(out, rc);
714         }
715
716         reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
717                                              UPDATE_BUFFER_SIZE);
718         if (reply->ur_version != UPDATE_REPLY_V1) {
719                 CERROR("%s: Wrong version %x expected %x: rc = %d\n",
720                        dt_dev->dd_lu_dev.ld_obd->obd_name,
721                        reply->ur_version, UPDATE_REPLY_V1, -EPROTO);
722                 GOTO(out, rc = -EPROTO);
723         }
724
725         rc = update_get_reply_result(reply, NULL, 0);
726         if (rc < 0) {
727                 CERROR("%s: wrong version lookup "DFID" %s: rc = %d\n",
728                        dt_dev->dd_lu_dev.ld_obd->obd_name,
729                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, rc);
730                 GOTO(out, rc);
731         }
732
733         size = update_get_reply_buf(reply, (void **)&fid, 0);
734         if (size < 0)
735                 GOTO(out, rc = size);
736
737         if (size != sizeof(struct lu_fid)) {
738                 CERROR("%s: lookup "DFID" %s wrong size %d: rc = %d\n",
739                        dt_dev->dd_lu_dev.ld_obd->obd_name,
740                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, size, rc);
741                 GOTO(out, rc = -EINVAL);
742         }
743
744         fid_le_to_cpu(fid, fid);
745         if (!fid_is_sane(fid)) {
746                 CERROR("%s: lookup "DFID" %s invalid fid "DFID": rc = %d\n",
747                        dt_dev->dd_lu_dev.ld_obd->obd_name,
748                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid),
749                        rc);
750                 GOTO(out, rc = -EINVAL);
751         }
752         memcpy(rec, fid, sizeof(*fid));
753 out:
754         if (req != NULL)
755                 ptlrpc_req_finished(req);
756
757         if (update != NULL)
758                 osp_destroy_update_req(update);
759
760         RETURN(rc);
761 }
762
763 static int osp_md_declare_insert(const struct lu_env *env,
764                                  struct dt_object *dt,
765                                  const struct dt_rec *rec,
766                                  const struct dt_key *key,
767                                  struct thandle *th)
768 {
769         struct update_request   *update;
770         struct lu_fid           *fid;
771         struct lu_fid           *rec_fid = (struct lu_fid *)rec;
772         int                     size[2] = {strlen((char *)key) + 1,
773                                                   sizeof(*rec_fid)};
774         char                    *bufs[2] = {(char *)key, (char *)rec_fid};
775         int                     rc;
776
777         update = osp_find_create_update_loc(th, dt);
778         if (IS_ERR(update)) {
779                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
780                        dt->do_lu.lo_dev->ld_obd->obd_name,
781                        (int)PTR_ERR(update));
782                 return PTR_ERR(update);
783         }
784
785         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
786
787         CDEBUG(D_INFO, "%s: insert index of "DFID" %s: "DFID"\n",
788                dt->do_lu.lo_dev->ld_obd->obd_name,
789                PFID(fid), (char *)key, PFID(rec_fid));
790
791         fid_cpu_to_le(rec_fid, rec_fid);
792
793         rc = osp_insert_update(env, update, OBJ_INDEX_INSERT, fid,
794                                ARRAY_SIZE(size), size, bufs);
795         return rc;
796 }
797
798 static int osp_md_index_insert(const struct lu_env *env,
799                                struct dt_object *dt,
800                                const struct dt_rec *rec,
801                                const struct dt_key *key,
802                                struct thandle *th,
803                                struct lustre_capa *capa,
804                                int ignore_quota)
805 {
806         return 0;
807 }
808
809 static int osp_md_declare_delete(const struct lu_env *env,
810                                  struct dt_object *dt,
811                                  const struct dt_key *key,
812                                  struct thandle *th)
813 {
814         struct update_request *update;
815         struct lu_fid *fid;
816         int size = strlen((char *)key) + 1;
817         char *buf = (char *)key;
818         int rc;
819
820         update = osp_find_create_update_loc(th, dt);
821         if (IS_ERR(update)) {
822                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
823                        dt->do_lu.lo_dev->ld_obd->obd_name,
824                        (int)PTR_ERR(update));
825                 return PTR_ERR(update);
826         }
827
828         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
829
830         rc = osp_insert_update(env, update, OBJ_INDEX_DELETE, fid, 1, &size,
831                                &buf);
832
833         return rc;
834 }
835
836 static int osp_md_index_delete(const struct lu_env *env,
837                                struct dt_object *dt,
838                                const struct dt_key *key,
839                                struct thandle *th,
840                                struct lustre_capa *capa)
841 {
842         CDEBUG(D_INFO, "index delete "DFID" %s\n",
843                PFID(&dt->do_lu.lo_header->loh_fid), (char *)key);
844
845         return 0;
846 }
847
848 /**
849  * Creates or initializes iterator context.
850  *
851  * Note: for OSP, these index iterate api is only used to check
852  * whether the directory is empty now (see mdd_dir_is_empty).
853  * Since dir_empty will be return by OBJ_ATTR_GET(see osp_md_attr_get/
854  * out_attr_get). So the implementation of these iterator is simplied
855  * to make mdd_dir_is_empty happy. The real iterator should be
856  * implemented, if we need it one day.
857  */
858 static struct dt_it *osp_it_init(const struct lu_env *env,
859                                  struct dt_object *dt,
860                                  __u32 attr,
861                                 struct lustre_capa *capa)
862 {
863         lu_object_get(&dt->do_lu);
864         return (struct dt_it *)dt;
865 }
866
867 static void osp_it_fini(const struct lu_env *env, struct dt_it *di)
868 {
869         struct dt_object *dt = (struct dt_object *)di;
870         lu_object_put(env, &dt->do_lu);
871 }
872
873 static int osp_it_get(const struct lu_env *env,
874                       struct dt_it *di, const struct dt_key *key)
875 {
876         return 1;
877 }
878
879 static void osp_it_put(const struct lu_env *env, struct dt_it *di)
880 {
881         return;
882 }
883
884 static int osp_it_next(const struct lu_env *env, struct dt_it *di)
885 {
886         struct dt_object *dt = (struct dt_object *)di;
887         struct osp_object *osp_obj = dt2osp_obj(dt);
888         if (osp_obj->opo_empty)
889                 return 1;
890         return 0;
891 }
892
893 static struct dt_key *osp_it_key(const struct lu_env *env,
894                                  const struct dt_it *di)
895 {
896         LBUG();
897         return NULL;
898 }
899
900 static int osp_it_key_size(const struct lu_env *env, const struct dt_it *di)
901 {
902         LBUG();
903         return 0;
904 }
905
906 static int osp_it_rec(const struct lu_env *env, const struct dt_it *di,
907                       struct dt_rec *lde, __u32 attr)
908 {
909         LBUG();
910         return 0;
911 }
912
913 static __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di)
914 {
915         LBUG();
916         return 0;
917 }
918
919 static int osp_it_load(const struct lu_env *env, const struct dt_it *di,
920                        __u64 hash)
921 {
922         LBUG();
923         return 0;
924 }
925
926 static int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di,
927                           void *key_rec)
928 {
929         LBUG();
930         return 0;
931 }
932
933 static const struct dt_index_operations osp_md_index_ops = {
934         .dio_lookup         = osp_md_index_lookup,
935         .dio_declare_insert = osp_md_declare_insert,
936         .dio_insert         = osp_md_index_insert,
937         .dio_declare_delete = osp_md_declare_delete,
938         .dio_delete         = osp_md_index_delete,
939         .dio_it     = {
940                 .init     = osp_it_init,
941                 .fini     = osp_it_fini,
942                 .get      = osp_it_get,
943                 .put      = osp_it_put,
944                 .next     = osp_it_next,
945                 .key      = osp_it_key,
946                 .key_size = osp_it_key_size,
947                 .rec      = osp_it_rec,
948                 .store    = osp_it_store,
949                 .load     = osp_it_load,
950                 .key_rec  = osp_it_key_rec,
951         }
952 };
953
954 static int osp_md_index_try(const struct lu_env *env,
955                             struct dt_object *dt,
956                             const struct dt_index_features *feat)
957 {
958         dt->do_index_ops = &osp_md_index_ops;
959         return 0;
960 }
961
962 static int osp_md_attr_get(const struct lu_env *env,
963                            struct dt_object *dt, struct lu_attr *attr,
964                            struct lustre_capa *capa)
965 {
966         struct osp_object     *obj = dt2osp_obj(dt);
967         struct dt_device      *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
968         struct update_request *update = NULL;
969         struct ptlrpc_request *req = NULL;
970         int rc;
971         ENTRY;
972
973         /* Because it needs send the update buffer right away,
974          * just create an update buffer, instead of attaching the
975          * update_remote list of the thandle.
976          */
977         update = osp_create_update_req(dt_dev);
978         if (IS_ERR(update))
979                 RETURN(PTR_ERR(update));
980
981         rc = osp_insert_update(env, update, OBJ_ATTR_GET,
982                                (struct lu_fid *)lu_object_fid(&dt->do_lu),
983                                0, NULL, NULL);
984         if (rc) {
985                 CERROR("%s: Insert update error: rc = %d\n",
986                        dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
987                 GOTO(out, rc);
988         }
989         dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
990
991         rc = osp_remote_sync(env, dt_dev, update, &req);
992         if (rc < 0)
993                 GOTO(out, rc);
994
995         rc = osp_get_attr_from_req(env, req, attr, 0);
996         if (rc)
997                 GOTO(out, rc);
998
999         if (attr->la_flags == 1)
1000                 obj->opo_empty = 0;
1001         else
1002                 obj->opo_empty = 1;
1003 out:
1004         if (req != NULL)
1005                 ptlrpc_req_finished(req);
1006
1007         if (update != NULL)
1008                 osp_destroy_update_req(update);
1009
1010         RETURN(rc);
1011 }
1012
1013 static int osp_md_declare_object_destroy(const struct lu_env *env,
1014                                          struct dt_object *dt,
1015                                          struct thandle *th)
1016 {
1017         struct osp_object  *o = dt2osp_obj(dt);
1018         int                 rc = 0;
1019         ENTRY;
1020
1021         /*
1022          * track objects to be destroyed via llog
1023          */
1024         rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th);
1025
1026         RETURN(rc);
1027 }
1028
1029 static int osp_md_object_destroy(const struct lu_env *env,
1030                                  struct dt_object *dt, struct thandle *th)
1031 {
1032         struct osp_object  *o = dt2osp_obj(dt);
1033         int                 rc = 0;
1034         ENTRY;
1035
1036         /*
1037          * once transaction is committed put proper command on
1038          * the queue going to our OST
1039          */
1040         rc = osp_sync_add(env, o, MDS_UNLINK64_REC, th, NULL);
1041
1042         /* not needed in cache any more */
1043         set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
1044
1045         RETURN(rc);
1046 }
1047
1048 struct dt_object_operations osp_md_obj_ops = {
1049         .do_read_lock         = osp_md_object_read_lock,
1050         .do_write_lock        = osp_md_object_write_lock,
1051         .do_read_unlock       = osp_md_object_read_unlock,
1052         .do_write_unlock      = osp_md_object_write_unlock,
1053         .do_write_locked      = osp_md_object_write_locked,
1054         .do_declare_create    = osp_md_declare_object_create,
1055         .do_create            = osp_md_object_create,
1056         .do_declare_ref_add   = osp_md_declare_ref_add,
1057         .do_ref_add           = osp_md_object_ref_add,
1058         .do_declare_ref_del   = osp_md_declare_object_ref_del,
1059         .do_ref_del           = osp_md_object_ref_del,
1060         .do_declare_destroy   = osp_md_declare_object_destroy,
1061         .do_destroy           = osp_md_object_destroy,
1062         .do_ah_init           = osp_md_ah_init,
1063         .do_attr_get          = osp_md_attr_get,
1064         .do_declare_attr_set  = osp_md_declare_attr_set,
1065         .do_attr_set          = osp_md_attr_set,
1066         .do_declare_xattr_set = osp_md_declare_xattr_set,
1067         .do_xattr_set         = osp_md_xattr_set,
1068         .do_xattr_get         = osp_md_xattr_get,
1069         .do_index_try         = osp_md_index_try,
1070 };
1071