Whamcloud - gitweb
LU-3529 osp: init FID client for OSP on MDT.
[fs/lustre-release.git] / lustre / osp / osp_md_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_md_object.c
27  *
28  * Lustre MDT Proxy Device
29  *
30  * Author: Di Wang <di.wang@intel.com>
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDS
34
35 #include <lustre_log.h>
36 #include <lustre_update.h>
37 #include "osp_internal.h"
38 static const char dot[] = ".";
39 static const char dotdot[] = "..";
40
41 static int osp_prep_update_req(const struct lu_env *env,
42                                struct osp_device *osp,
43                                struct update_buf *ubuf, int ubuf_len,
44                                struct ptlrpc_request **reqp)
45 {
46         struct obd_import      *imp;
47         struct ptlrpc_request  *req;
48         struct update_buf      *tmp;
49         int                     rc;
50         ENTRY;
51
52         imp = osp->opd_obd->u.cli.cl_import;
53         LASSERT(imp);
54
55         req = ptlrpc_request_alloc(imp, &RQF_UPDATE_OBJ);
56         if (req == NULL)
57                 RETURN(-ENOMEM);
58
59         req_capsule_set_size(&req->rq_pill, &RMF_UPDATE, RCL_CLIENT,
60                              UPDATE_BUFFER_SIZE);
61
62         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, UPDATE_OBJ);
63         if (rc != 0) {
64                 ptlrpc_req_finished(req);
65                 RETURN(rc);
66         }
67
68         req_capsule_set_size(&req->rq_pill, &RMF_UPDATE_REPLY, RCL_SERVER,
69                              UPDATE_BUFFER_SIZE);
70
71         tmp = req_capsule_client_get(&req->rq_pill, &RMF_UPDATE);
72         memcpy(tmp, ubuf, ubuf_len);
73
74         ptlrpc_request_set_replen(req);
75
76         *reqp = req;
77
78         RETURN(rc);
79 }
80
81 static int osp_remote_sync(const struct lu_env *env, struct dt_device *dt,
82                            struct update_request *update,
83                            struct ptlrpc_request **reqp)
84 {
85         struct osp_device       *osp = dt2osp_dev(dt);
86         struct ptlrpc_request   *req = NULL;
87         int                     rc;
88         ENTRY;
89
90         rc = osp_prep_update_req(env, osp, update->ur_buf, UPDATE_BUFFER_SIZE,
91                                  &req);
92         if (rc)
93                 RETURN(rc);
94
95         /* Note: some dt index api might return non-zero result here, like
96          * osd_index_ea_lookup, so we should only check rc < 0 here */
97         rc = ptlrpc_queue_wait(req);
98         if (rc < 0) {
99                 ptlrpc_req_finished(req);
100                 update->ur_rc = rc;
101                 RETURN(rc);
102         }
103
104         if (reqp != NULL) {
105                 *reqp = req;
106                 RETURN(rc);
107         }
108
109         update->ur_rc = rc;
110
111         ptlrpc_req_finished(req);
112
113         RETURN(rc);
114 }
115
116 /**
117  * Create a new update request for the device.
118  */
119 static struct update_request
120 *osp_create_update_req(struct dt_device *dt)
121 {
122         struct update_request *update;
123
124         OBD_ALLOC_PTR(update);
125         if (!update)
126                 return ERR_PTR(-ENOMEM);
127
128         OBD_ALLOC_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
129         if (update->ur_buf == NULL) {
130                 OBD_FREE_PTR(update);
131                 return ERR_PTR(-ENOMEM);
132         }
133
134         CFS_INIT_LIST_HEAD(&update->ur_list);
135         update->ur_dt = dt;
136         update->ur_batchid = 0;
137         update->ur_buf->ub_magic = UPDATE_BUFFER_MAGIC;
138         update->ur_buf->ub_count = 0;
139
140         return update;
141 }
142
143 static void osp_destroy_update_req(struct update_request *update)
144 {
145         if (update == NULL)
146                 return;
147
148         cfs_list_del(&update->ur_list);
149         if (update->ur_buf != NULL)
150                 OBD_FREE_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
151
152         OBD_FREE_PTR(update);
153         return;
154 }
155
156 int osp_trans_stop(const struct lu_env *env, struct thandle *th)
157 {
158         int rc = 0;
159
160         rc = th->th_current_request->ur_rc;
161         osp_destroy_update_req(th->th_current_request);
162         th->th_current_request = NULL;
163
164         return rc;
165 }
166
167 /**
168  * In DNE phase I, all remote updates will be packed into RPC (the format
169  * description is in lustre_idl.h) during declare phase, all of updates
170  * are attached to the transaction, one entry per OSP. Then in trans start,
171  * LOD will walk through these entries and send these UPDATEs to the remote
172  * MDT to be executed synchronously.
173  */
174 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
175                     struct thandle *th)
176 {
177         struct update_request *update;
178         int rc = 0;
179
180         /* In phase I, if the transaction includes remote updates, the local
181          * update should be synchronized, so it will set th_sync = 1 */
182         update = th->th_current_request;
183         LASSERT(update != NULL && update->ur_dt == dt);
184         if (update->ur_buf->ub_count > 0) {
185                 rc = osp_remote_sync(env, dt, update, NULL);
186                 th->th_sync = 1;
187         }
188
189         RETURN(rc);
190 }
191
192 /**
193  * Insert the update into the th_bufs for the device.
194  */
195 static int osp_insert_update(const struct lu_env *env,
196                              struct update_request *update, int op,
197                              struct lu_fid *fid, int count,
198                              int *lens, char **bufs)
199 {
200         struct update_buf    *ubuf = update->ur_buf;
201         struct update        *obj_update;
202         char                 *ptr;
203         int                   i;
204         int                   update_length;
205         int                   rc = 0;
206         ENTRY;
207
208         obj_update = (struct update *)((char *)ubuf +
209                       cfs_size_round(update_buf_size(ubuf)));
210
211         /* Check update size to make sure it can fit into the buffer */
212         update_length = cfs_size_round(offsetof(struct update,
213                                        u_bufs[0]));
214         for (i = 0; i < count; i++)
215                 update_length += cfs_size_round(lens[i]);
216
217         if (cfs_size_round(update_buf_size(ubuf)) + update_length >
218             UPDATE_BUFFER_SIZE || ubuf->ub_count >= UPDATE_MAX_OPS) {
219                 CERROR("%s: insert up %p, idx %d cnt %d len %lu: rc = %d\n",
220                         update->ur_dt->dd_lu_dev.ld_obd->obd_name, ubuf,
221                         update_length, ubuf->ub_count, update_buf_size(ubuf),
222                         -E2BIG);
223                 RETURN(-E2BIG);
224         }
225
226         if (count > UPDATE_BUF_COUNT) {
227                 CERROR("%s: Insert too much params %d "DFID" op %d: rc = %d\n",
228                         update->ur_dt->dd_lu_dev.ld_obd->obd_name, count,
229                         PFID(fid), op, -E2BIG);
230                 RETURN(-E2BIG);
231         }
232
233         /* fill the update into the update buffer */
234         fid_cpu_to_le(&obj_update->u_fid, fid);
235         obj_update->u_type = cpu_to_le32(op);
236         obj_update->u_batchid = update->ur_batchid;
237         for (i = 0; i < count; i++)
238                 obj_update->u_lens[i] = cpu_to_le32(lens[i]);
239
240         ptr = (char *)obj_update +
241                         cfs_size_round(offsetof(struct update, u_bufs[0]));
242         for (i = 0; i < count; i++)
243                 LOGL(bufs[i], lens[i], ptr);
244
245         ubuf->ub_count++;
246
247         CDEBUG(D_INFO, "%s: %p "DFID" idx %d: op %d params %d:%lu\n",
248                update->ur_dt->dd_lu_dev.ld_obd->obd_name, ubuf, PFID(fid),
249                ubuf->ub_count, op, count, update_buf_size(ubuf));
250
251         RETURN(rc);
252 }
253
254 static struct update_request
255 *osp_find_update(struct thandle *th, struct dt_device *dt_dev)
256 {
257         struct update_request   *update;
258
259         /* Because transaction api does not proivde the interface
260          * to transfer the update from LOD to OSP,  we need walk
261          * remote update list to find the update, this probably
262          * should move to LOD layer, when update can be part of
263          * the trancation api parameter. XXX */
264         cfs_list_for_each_entry(update, &th->th_remote_update_list, ur_list) {
265                 if (update->ur_dt == dt_dev)
266                         return update;
267         }
268         return NULL;
269 }
270
271 static inline void osp_md_add_update_batchid(struct update_request *update)
272 {
273         update->ur_batchid++;
274 }
275
276 /**
277  * Find one loc in th_dev/dev_obj_update for the update,
278  * Because only one thread can access this thandle, no need
279  * lock now.
280  */
281 static struct update_request
282 *osp_find_create_update_loc(struct thandle *th, struct dt_object *dt)
283 {
284         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
285         struct update_request   *update;
286         ENTRY;
287
288         update = osp_find_update(th, dt_dev);
289         if (update != NULL)
290                 RETURN(update);
291
292         update = osp_create_update_req(dt_dev);
293         if (IS_ERR(update))
294                 RETURN(update);
295
296         cfs_list_add_tail(&update->ur_list, &th->th_remote_update_list);
297
298         RETURN(update);
299 }
300
301 static int osp_get_attr_from_req(const struct lu_env *env,
302                                  struct ptlrpc_request *req,
303                                  struct lu_attr *attr, int index)
304 {
305         struct update_reply     *reply;
306         struct obdo             *lobdo = &osp_env_info(env)->osi_obdo;
307         struct obdo             *wobdo;
308         int                     size;
309
310         LASSERT(attr != NULL);
311
312         reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
313                                              UPDATE_BUFFER_SIZE);
314         if (reply == NULL || reply->ur_version != UPDATE_REPLY_V1)
315                 return -EPROTO;
316
317         size = update_get_reply_buf(reply, (void **)&wobdo, index);
318         if (size != sizeof(struct obdo))
319                 return -EPROTO;
320
321         obdo_le_to_cpu(wobdo, wobdo);
322         lustre_get_wire_obdo(NULL, lobdo, wobdo);
323         la_from_obdo(attr, lobdo, lobdo->o_valid);
324
325         return 0;
326 }
327
328 static int osp_md_declare_object_create(const struct lu_env *env,
329                                         struct dt_object *dt,
330                                         struct lu_attr *attr,
331                                         struct dt_allocation_hint *hint,
332                                         struct dt_object_format *dof,
333                                         struct thandle *th)
334 {
335         struct osp_thread_info  *osi = osp_env_info(env);
336         struct update_request   *update;
337         struct lu_fid           *fid1;
338         int                     sizes[2] = {sizeof(struct obdo), 0};
339         char                    *bufs[2] = {NULL, NULL};
340         int                     buf_count;
341         int                     rc;
342
343         update = osp_find_create_update_loc(th, dt);
344         if (IS_ERR(update)) {
345                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
346                        dt->do_lu.lo_dev->ld_obd->obd_name,
347                        (int)PTR_ERR(update));
348                 return PTR_ERR(update);
349         }
350
351         osi->osi_obdo.o_valid = 0;
352         LASSERT(S_ISDIR(attr->la_mode));
353         obdo_from_la(&osi->osi_obdo, attr, attr->la_valid);
354         lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
355         obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
356
357         bufs[0] = (char *)&osi->osi_obdo;
358         buf_count = 1;
359         fid1 = (struct lu_fid *)lu_object_fid(&dt->do_lu);
360         if (hint->dah_parent) {
361                 struct lu_fid *fid2;
362                 struct lu_fid *tmp_fid = &osi->osi_fid;
363
364                 fid2 = (struct lu_fid *)lu_object_fid(&hint->dah_parent->do_lu);
365                 fid_cpu_to_le(tmp_fid, fid2);
366                 sizes[1] = sizeof(*tmp_fid);
367                 bufs[1] = (char *)tmp_fid;
368                 buf_count++;
369         }
370
371         if (lu_object_exists(&dt->do_lu)) {
372                 /* If the object already exists, we needs to destroy
373                  * this orphan object first.
374                  *
375                  * The scenario might happen in this case
376                  *
377                  * 1. client send remote create to MDT0.
378                  * 2. MDT0 send create update to MDT1.
379                  * 3. MDT1 finished create synchronously.
380                  * 4. MDT0 failed and reboot.
381                  * 5. client resend remote create to MDT0.
382                  * 6. MDT0 tries to resend create update to MDT1,
383                  *    but find the object already exists
384                  */
385                 CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
386                        dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1));
387
388                 rc = osp_insert_update(env, update, OBJ_REF_DEL, fid1, 0,
389                                        NULL, NULL);
390                 if (rc != 0)
391                         GOTO(out, rc);
392
393                 if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
394                         /* decrease for ".." */
395                         rc = osp_insert_update(env, update, OBJ_REF_DEL, fid1,
396                                                0, NULL, NULL);
397                         if (rc != 0)
398                                 GOTO(out, rc);
399                 }
400
401                 rc = osp_insert_update(env, update, OBJ_DESTROY, fid1, 0, NULL,
402                                        NULL);
403                 if (rc != 0)
404                         GOTO(out, rc);
405
406                 dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
407                 /* Increase batchid to add this orphan object deletion
408                  * to separate transaction */
409                 osp_md_add_update_batchid(update);
410         }
411
412         rc = osp_insert_update(env, update, OBJ_CREATE, fid1, buf_count, sizes,
413                                bufs);
414 out:
415         if (rc)
416                 CERROR("%s: Insert update error: rc = %d\n",
417                        dt->do_lu.lo_dev->ld_obd->obd_name, rc);
418
419         return rc;
420 }
421
422 static int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
423                                 struct lu_attr *attr,
424                                 struct dt_allocation_hint *hint,
425                                 struct dt_object_format *dof,
426                                 struct thandle *th)
427 {
428         struct osp_object  *obj = dt2osp_obj(dt);
429
430         CDEBUG(D_INFO, "create object "DFID"\n",
431                PFID(&dt->do_lu.lo_header->loh_fid));
432
433         /* Because the create update RPC will be sent during declare phase,
434          * if creation reaches here, it means the object has been created
435          * successfully */
436         dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
437         obj->opo_empty = 1;
438
439         return 0;
440 }
441
442 static int osp_md_declare_object_ref_del(const struct lu_env *env,
443                                          struct dt_object *dt,
444                                          struct thandle *th)
445 {
446         struct update_request   *update;
447         struct lu_fid           *fid;
448         int                     rc;
449
450         update = osp_find_create_update_loc(th, dt);
451         if (IS_ERR(update)) {
452                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
453                        dt->do_lu.lo_dev->ld_obd->obd_name,
454                       (int)PTR_ERR(update));
455                 return PTR_ERR(update);
456         }
457
458         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
459
460         rc = osp_insert_update(env, update, OBJ_REF_DEL, fid, 0, NULL, NULL);
461
462         return rc;
463 }
464
465 static int osp_md_object_ref_del(const struct lu_env *env,
466                                  struct dt_object *dt,
467                                  struct thandle *th)
468 {
469         CDEBUG(D_INFO, "ref del object "DFID"\n",
470                PFID(&dt->do_lu.lo_header->loh_fid));
471
472         return 0;
473 }
474
475 static int osp_md_declare_ref_add(const struct lu_env *env,
476                                   struct dt_object *dt, struct thandle *th)
477 {
478         struct update_request   *update;
479         struct lu_fid           *fid;
480         int                     rc;
481
482         update = osp_find_create_update_loc(th, dt);
483         if (IS_ERR(update)) {
484                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
485                        dt->do_lu.lo_dev->ld_obd->obd_name,
486                        (int)PTR_ERR(update));
487                 return PTR_ERR(update);
488         }
489
490         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
491
492         rc = osp_insert_update(env, update, OBJ_REF_ADD, fid, 0, NULL, NULL);
493
494         return rc;
495 }
496
497 static int osp_md_object_ref_add(const struct lu_env *env,
498                                  struct dt_object *dt,
499                                  struct thandle *th)
500 {
501         CDEBUG(D_INFO, "ref add object "DFID"\n",
502                PFID(&dt->do_lu.lo_header->loh_fid));
503
504         return 0;
505 }
506
507 static void osp_md_ah_init(const struct lu_env *env,
508                            struct dt_allocation_hint *ah,
509                            struct dt_object *parent,
510                            struct dt_object *child,
511                            umode_t child_mode)
512 {
513         LASSERT(ah);
514
515         memset(ah, 0, sizeof(*ah));
516         ah->dah_parent = parent;
517         ah->dah_mode = child_mode;
518 }
519
520 static int osp_md_declare_attr_set(const struct lu_env *env,
521                                    struct dt_object *dt,
522                                    const struct lu_attr *attr,
523                                    struct thandle *th)
524 {
525         struct osp_thread_info *osi = osp_env_info(env);
526         struct update_request  *update;
527         struct lu_fid          *fid;
528         int                     size = sizeof(struct obdo);
529         char                   *buf;
530         int                     rc;
531
532         update = osp_find_create_update_loc(th, dt);
533         if (IS_ERR(update)) {
534                 CERROR("%s: Get OSP update buf failed: %d\n",
535                        dt->do_lu.lo_dev->ld_obd->obd_name,
536                        (int)PTR_ERR(update));
537                 return PTR_ERR(update);
538         }
539
540         osi->osi_obdo.o_valid = 0;
541         LASSERT(!(attr->la_valid & (LA_MODE | LA_TYPE)));
542         obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr,
543                      attr->la_valid);
544         lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
545         obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
546
547         buf = (char *)&osi->osi_obdo;
548         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
549
550         rc = osp_insert_update(env, update, OBJ_ATTR_SET, fid, 1, &size, &buf);
551
552         return rc;
553 }
554
555 static int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
556                            const struct lu_attr *attr, struct thandle *th,
557                            struct lustre_capa *capa)
558 {
559         CDEBUG(D_INFO, "attr set object "DFID"\n",
560                PFID(&dt->do_lu.lo_header->loh_fid));
561
562         RETURN(0);
563 }
564
565 static int osp_md_declare_xattr_set(const struct lu_env *env,
566                                     struct dt_object *dt,
567                                     const struct lu_buf *buf,
568                                     const char *name, int flag,
569                                     struct thandle *th)
570 {
571         struct update_request   *update;
572         struct lu_fid           *fid;
573         int                     sizes[3] = {strlen(name), buf->lb_len,
574                                             sizeof(int)};
575         char                    *bufs[3] = {(char *)name, (char *)buf->lb_buf };
576         int                     rc;
577
578         LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL);
579         update = osp_find_create_update_loc(th, dt);
580         if (IS_ERR(update)) {
581                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
582                        dt->do_lu.lo_dev->ld_obd->obd_name,
583                        (int)PTR_ERR(update));
584                 return PTR_ERR(update);
585         }
586
587         flag = cpu_to_le32(flag);
588         bufs[2] = (char *)&flag;
589
590         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
591         rc = osp_insert_update(env, update, OBJ_XATTR_SET, fid,
592                                ARRAY_SIZE(sizes), sizes, bufs);
593
594         return rc;
595 }
596
597 static int osp_md_xattr_set(const struct lu_env *env, struct dt_object *dt,
598                             const struct lu_buf *buf, const char *name, int fl,
599                             struct thandle *th, struct lustre_capa *capa)
600 {
601         CDEBUG(D_INFO, "xattr %s set object "DFID"\n", name,
602                PFID(&dt->do_lu.lo_header->loh_fid));
603
604         return 0;
605 }
606
607 static int osp_md_xattr_get(const struct lu_env *env, struct dt_object *dt,
608                             struct lu_buf *buf, const char *name,
609                             struct lustre_capa *capa)
610 {
611         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
612         struct update_request   *update = NULL;
613         struct ptlrpc_request   *req = NULL;
614         int                     rc;
615         int                     buf_len;
616         int                     size;
617         struct update_reply     *reply;
618         void                    *ea_buf;
619         ENTRY;
620
621         /* Because it needs send the update buffer right away,
622          * just create an update buffer, instead of attaching the
623          * update_remote list of the thandle.
624          */
625         update = osp_create_update_req(dt_dev);
626         if (IS_ERR(update))
627                 RETURN(PTR_ERR(update));
628
629         LASSERT(name != NULL);
630         buf_len = strlen(name);
631         rc = osp_insert_update(env, update, OBJ_XATTR_GET,
632                                (struct lu_fid *)lu_object_fid(&dt->do_lu),
633                                1, &buf_len, (char **)&name);
634         if (rc != 0) {
635                 CERROR("%s: Insert update error: rc = %d\n",
636                        dt->do_lu.lo_dev->ld_obd->obd_name, rc);
637                 GOTO(out, rc);
638         }
639         dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
640
641         rc = osp_remote_sync(env, dt_dev, update, &req);
642         if (rc != 0)
643                 GOTO(out, rc);
644
645         reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
646                                             UPDATE_BUFFER_SIZE);
647         if (reply->ur_version != UPDATE_REPLY_V1) {
648                 CERROR("%s: Wrong version %x expected %x: rc = %d\n",
649                        dt_dev->dd_lu_dev.ld_obd->obd_name,
650                        reply->ur_version, UPDATE_REPLY_V1, -EPROTO);
651                 GOTO(out, rc = -EPROTO);
652         }
653
654         size = update_get_reply_buf(reply, &ea_buf, 0);
655         if (size < 0)
656                 GOTO(out, rc = size);
657
658         LASSERT(size > 0 && size < PAGE_CACHE_SIZE);
659         LASSERT(ea_buf != NULL);
660
661         rc = size;
662         if (buf->lb_buf != NULL)
663                 memcpy(buf->lb_buf, ea_buf, size);
664 out:
665         if (req != NULL)
666                 ptlrpc_req_finished(req);
667
668         osp_destroy_update_req(update);
669
670         RETURN(rc);
671 }
672
673 static void osp_md_object_read_lock(const struct lu_env *env,
674                                     struct dt_object *dt, unsigned role)
675 {
676         struct osp_object  *obj = dt2osp_obj(dt);
677
678         LASSERT(obj->opo_owner != env);
679         down_read_nested(&obj->opo_sem, role);
680
681         LASSERT(obj->opo_owner == NULL);
682 }
683
684 static void osp_md_object_write_lock(const struct lu_env *env,
685                                      struct dt_object *dt, unsigned role)
686 {
687         struct osp_object *obj = dt2osp_obj(dt);
688
689         down_write_nested(&obj->opo_sem, role);
690
691         LASSERT(obj->opo_owner == NULL);
692         obj->opo_owner = env;
693 }
694
695 static void osp_md_object_read_unlock(const struct lu_env *env,
696                                       struct dt_object *dt)
697 {
698         struct osp_object *obj = dt2osp_obj(dt);
699
700         up_read(&obj->opo_sem);
701 }
702
703 static void osp_md_object_write_unlock(const struct lu_env *env,
704                                        struct dt_object *dt)
705 {
706         struct osp_object *obj = dt2osp_obj(dt);
707
708         LASSERT(obj->opo_owner == env);
709         obj->opo_owner = NULL;
710         up_write(&obj->opo_sem);
711 }
712
713 static int osp_md_object_write_locked(const struct lu_env *env,
714                                       struct dt_object *dt)
715 {
716         struct osp_object *obj = dt2osp_obj(dt);
717
718         return obj->opo_owner == env;
719 }
720
721 static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
722                                struct dt_rec *rec, const struct dt_key *key,
723                                struct lustre_capa *capa)
724 {
725         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
726         struct update_request   *update;
727         struct ptlrpc_request   *req = NULL;
728         int                     size = strlen((char *)key) + 1;
729         char                    *name = (char *)key;
730         int                     rc;
731         struct update_reply     *reply;
732         struct lu_fid           *fid;
733
734         ENTRY;
735
736         /* Because it needs send the update buffer right away,
737          * just create an update buffer, instead of attaching the
738          * update_remote list of the thandle.
739          */
740         update = osp_create_update_req(dt_dev);
741         if (IS_ERR(update))
742                 RETURN(PTR_ERR(update));
743
744         rc = osp_insert_update(env, update, OBJ_INDEX_LOOKUP,
745                                (struct lu_fid *)lu_object_fid(&dt->do_lu),
746                                1, &size, (char **)&name);
747         if (rc) {
748                 CERROR("%s: Insert update error: rc = %d\n",
749                        dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
750                 GOTO(out, rc);
751         }
752
753         rc = osp_remote_sync(env, dt_dev, update, &req);
754         if (rc < 0)
755                 GOTO(out, rc);
756
757         reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
758                                              UPDATE_BUFFER_SIZE);
759         if (reply->ur_version != UPDATE_REPLY_V1) {
760                 CERROR("%s: Wrong version %x expected %x: rc = %d\n",
761                        dt_dev->dd_lu_dev.ld_obd->obd_name,
762                        reply->ur_version, UPDATE_REPLY_V1, -EPROTO);
763                 GOTO(out, rc = -EPROTO);
764         }
765
766         rc = update_get_reply_result(reply, NULL, 0);
767         if (rc < 0) {
768                 CERROR("%s: wrong version lookup "DFID" %s: rc = %d\n",
769                        dt_dev->dd_lu_dev.ld_obd->obd_name,
770                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, rc);
771                 GOTO(out, rc);
772         }
773
774         size = update_get_reply_buf(reply, (void **)&fid, 0);
775         if (size < 0)
776                 GOTO(out, rc = size);
777
778         if (size != sizeof(struct lu_fid)) {
779                 CERROR("%s: lookup "DFID" %s wrong size %d: rc = %d\n",
780                        dt_dev->dd_lu_dev.ld_obd->obd_name,
781                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, size, rc);
782                 GOTO(out, rc = -EINVAL);
783         }
784
785         fid_le_to_cpu(fid, fid);
786         if (!fid_is_sane(fid)) {
787                 CERROR("%s: lookup "DFID" %s invalid fid "DFID": rc = %d\n",
788                        dt_dev->dd_lu_dev.ld_obd->obd_name,
789                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid),
790                        rc);
791                 GOTO(out, rc = -EINVAL);
792         }
793         memcpy(rec, fid, sizeof(*fid));
794 out:
795         if (req != NULL)
796                 ptlrpc_req_finished(req);
797
798         osp_destroy_update_req(update);
799
800         RETURN(rc);
801 }
802
803 static int osp_md_declare_insert(const struct lu_env *env,
804                                  struct dt_object *dt,
805                                  const struct dt_rec *rec,
806                                  const struct dt_key *key,
807                                  struct thandle *th)
808 {
809         struct update_request   *update;
810         struct lu_fid           *fid;
811         struct lu_fid           *rec_fid = (struct lu_fid *)rec;
812         int                     size[2] = {strlen((char *)key) + 1,
813                                                   sizeof(*rec_fid)};
814         char                    *bufs[2] = {(char *)key, (char *)rec_fid};
815         int                     rc;
816
817         update = osp_find_create_update_loc(th, dt);
818         if (IS_ERR(update)) {
819                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
820                        dt->do_lu.lo_dev->ld_obd->obd_name,
821                        (int)PTR_ERR(update));
822                 return PTR_ERR(update);
823         }
824
825         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
826
827         CDEBUG(D_INFO, "%s: insert index of "DFID" %s: "DFID"\n",
828                dt->do_lu.lo_dev->ld_obd->obd_name,
829                PFID(fid), (char *)key, PFID(rec_fid));
830
831         fid_cpu_to_le(rec_fid, rec_fid);
832
833         rc = osp_insert_update(env, update, OBJ_INDEX_INSERT, fid,
834                                ARRAY_SIZE(size), size, bufs);
835         return rc;
836 }
837
838 static int osp_md_index_insert(const struct lu_env *env,
839                                struct dt_object *dt,
840                                const struct dt_rec *rec,
841                                const struct dt_key *key,
842                                struct thandle *th,
843                                struct lustre_capa *capa,
844                                int ignore_quota)
845 {
846         return 0;
847 }
848
849 static int osp_md_declare_delete(const struct lu_env *env,
850                                  struct dt_object *dt,
851                                  const struct dt_key *key,
852                                  struct thandle *th)
853 {
854         struct update_request *update;
855         struct lu_fid *fid;
856         int size = strlen((char *)key) + 1;
857         char *buf = (char *)key;
858         int rc;
859
860         update = osp_find_create_update_loc(th, dt);
861         if (IS_ERR(update)) {
862                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
863                        dt->do_lu.lo_dev->ld_obd->obd_name,
864                        (int)PTR_ERR(update));
865                 return PTR_ERR(update);
866         }
867
868         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
869
870         rc = osp_insert_update(env, update, OBJ_INDEX_DELETE, fid, 1, &size,
871                                &buf);
872
873         return rc;
874 }
875
876 static int osp_md_index_delete(const struct lu_env *env,
877                                struct dt_object *dt,
878                                const struct dt_key *key,
879                                struct thandle *th,
880                                struct lustre_capa *capa)
881 {
882         CDEBUG(D_INFO, "index delete "DFID" %s\n",
883                PFID(&dt->do_lu.lo_header->loh_fid), (char *)key);
884
885         return 0;
886 }
887
888 /**
889  * Creates or initializes iterator context.
890  *
891  * Note: for OSP, these index iterate api is only used to check
892  * whether the directory is empty now (see mdd_dir_is_empty).
893  * Since dir_empty will be return by OBJ_ATTR_GET(see osp_md_attr_get/
894  * out_attr_get). So the implementation of these iterator is simplied
895  * to make mdd_dir_is_empty happy. The real iterator should be
896  * implemented, if we need it one day.
897  */
898 static struct dt_it *osp_it_init(const struct lu_env *env,
899                                  struct dt_object *dt,
900                                  __u32 attr,
901                                 struct lustre_capa *capa)
902 {
903         lu_object_get(&dt->do_lu);
904         return (struct dt_it *)dt;
905 }
906
907 static void osp_it_fini(const struct lu_env *env, struct dt_it *di)
908 {
909         struct dt_object *dt = (struct dt_object *)di;
910         lu_object_put(env, &dt->do_lu);
911 }
912
913 static int osp_it_get(const struct lu_env *env,
914                       struct dt_it *di, const struct dt_key *key)
915 {
916         return 1;
917 }
918
919 static void osp_it_put(const struct lu_env *env, struct dt_it *di)
920 {
921         return;
922 }
923
924 static int osp_it_next(const struct lu_env *env, struct dt_it *di)
925 {
926         struct dt_object *dt = (struct dt_object *)di;
927         struct osp_object *o = dt2osp_obj(dt);
928
929         if (o->opo_empty)
930                 return 1;
931
932         return 0;
933 }
934
935 static struct dt_key *osp_it_key(const struct lu_env *env,
936                                  const struct dt_it *di)
937 {
938         LBUG();
939         return NULL;
940 }
941
942 static int osp_it_key_size(const struct lu_env *env, const struct dt_it *di)
943 {
944         LBUG();
945         return 0;
946 }
947
948 static int osp_it_rec(const struct lu_env *env, const struct dt_it *di,
949                       struct dt_rec *lde, __u32 attr)
950 {
951         LBUG();
952         return 0;
953 }
954
955 static __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di)
956 {
957         LBUG();
958         return 0;
959 }
960
961 static int osp_it_load(const struct lu_env *env, const struct dt_it *di,
962                        __u64 hash)
963 {
964         LBUG();
965         return 0;
966 }
967
968 static int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di,
969                           void *key_rec)
970 {
971         LBUG();
972         return 0;
973 }
974
975 static const struct dt_index_operations osp_md_index_ops = {
976         .dio_lookup         = osp_md_index_lookup,
977         .dio_declare_insert = osp_md_declare_insert,
978         .dio_insert         = osp_md_index_insert,
979         .dio_declare_delete = osp_md_declare_delete,
980         .dio_delete         = osp_md_index_delete,
981         .dio_it     = {
982                 .init     = osp_it_init,
983                 .fini     = osp_it_fini,
984                 .get      = osp_it_get,
985                 .put      = osp_it_put,
986                 .next     = osp_it_next,
987                 .key      = osp_it_key,
988                 .key_size = osp_it_key_size,
989                 .rec      = osp_it_rec,
990                 .store    = osp_it_store,
991                 .load     = osp_it_load,
992                 .key_rec  = osp_it_key_rec,
993         }
994 };
995
996 static int osp_md_index_try(const struct lu_env *env,
997                             struct dt_object *dt,
998                             const struct dt_index_features *feat)
999 {
1000         dt->do_index_ops = &osp_md_index_ops;
1001         return 0;
1002 }
1003
1004 static int osp_md_attr_get(const struct lu_env *env,
1005                            struct dt_object *dt, struct lu_attr *attr,
1006                            struct lustre_capa *capa)
1007 {
1008         struct osp_object     *obj = dt2osp_obj(dt);
1009         struct dt_device      *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
1010         struct update_request *update = NULL;
1011         struct ptlrpc_request *req = NULL;
1012         int rc;
1013         ENTRY;
1014
1015         /* Because it needs send the update buffer right away,
1016          * just create an update buffer, instead of attaching the
1017          * update_remote list of the thandle.
1018          */
1019         update = osp_create_update_req(dt_dev);
1020         if (IS_ERR(update))
1021                 RETURN(PTR_ERR(update));
1022
1023         rc = osp_insert_update(env, update, OBJ_ATTR_GET,
1024                                (struct lu_fid *)lu_object_fid(&dt->do_lu),
1025                                0, NULL, NULL);
1026         if (rc) {
1027                 CERROR("%s: Insert update error: rc = %d\n",
1028                        dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
1029                 GOTO(out, rc);
1030         }
1031         dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
1032
1033         rc = osp_remote_sync(env, dt_dev, update, &req);
1034         if (rc < 0)
1035                 GOTO(out, rc);
1036
1037         rc = osp_get_attr_from_req(env, req, attr, 0);
1038         if (rc)
1039                 GOTO(out, rc);
1040
1041         if (attr->la_flags == 1)
1042                 obj->opo_empty = 0;
1043         else
1044                 obj->opo_empty = 1;
1045 out:
1046         if (req != NULL)
1047                 ptlrpc_req_finished(req);
1048
1049         osp_destroy_update_req(update);
1050
1051         RETURN(rc);
1052 }
1053
1054 static int osp_md_declare_object_destroy(const struct lu_env *env,
1055                                          struct dt_object *dt,
1056                                          struct thandle *th)
1057 {
1058         struct osp_object  *o = dt2osp_obj(dt);
1059         int                 rc = 0;
1060         ENTRY;
1061
1062         /*
1063          * track objects to be destroyed via llog
1064          */
1065         rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th);
1066
1067         RETURN(rc);
1068 }
1069
1070 static int osp_md_object_destroy(const struct lu_env *env,
1071                                  struct dt_object *dt, struct thandle *th)
1072 {
1073         struct osp_object  *o = dt2osp_obj(dt);
1074         int                 rc = 0;
1075         ENTRY;
1076
1077         /*
1078          * once transaction is committed put proper command on
1079          * the queue going to our OST
1080          */
1081         rc = osp_sync_add(env, o, MDS_UNLINK64_REC, th, NULL);
1082
1083         /* not needed in cache any more */
1084         set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
1085
1086         RETURN(rc);
1087 }
1088
1089 static int osp_md_object_lock(const struct lu_env *env,
1090                               struct dt_object *dt,
1091                               struct lustre_handle *lh,
1092                               struct ldlm_enqueue_info *einfo,
1093                               void *policy)
1094 {
1095         struct osp_thread_info *info = osp_env_info(env);
1096         struct ldlm_res_id     *res_id = &info->osi_resid;
1097         struct dt_device       *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
1098         struct osp_device      *osp = dt2osp_dev(dt_dev);
1099         struct ptlrpc_request  *req = NULL;
1100         int                     rc = 0;
1101         __u64                   flags = 0;
1102         ldlm_mode_t             mode;
1103
1104         fid_build_reg_res_name(lu_object_fid(&dt->do_lu), res_id);
1105
1106         mode = ldlm_lock_match(osp->opd_obd->obd_namespace,
1107                                LDLM_FL_BLOCK_GRANTED, res_id,
1108                                einfo->ei_type,
1109                                (ldlm_policy_data_t *)policy,
1110                                einfo->ei_mode, lh, 0);
1111         if (mode > 0)
1112                 return ELDLM_OK;
1113
1114         req = ldlm_enqueue_pack(osp->opd_exp, 0);
1115         if (IS_ERR(req))
1116                 RETURN(PTR_ERR(req));
1117
1118         rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id,
1119                               (const ldlm_policy_data_t *)policy,
1120                               &flags, NULL, 0, LVB_T_NONE, lh, 0);
1121
1122         ptlrpc_req_finished(req);
1123
1124         return rc == ELDLM_OK ? 0 : -EIO;
1125 }
1126
1127 struct dt_object_operations osp_md_obj_ops = {
1128         .do_read_lock         = osp_md_object_read_lock,
1129         .do_write_lock        = osp_md_object_write_lock,
1130         .do_read_unlock       = osp_md_object_read_unlock,
1131         .do_write_unlock      = osp_md_object_write_unlock,
1132         .do_write_locked      = osp_md_object_write_locked,
1133         .do_declare_create    = osp_md_declare_object_create,
1134         .do_create            = osp_md_object_create,
1135         .do_declare_ref_add   = osp_md_declare_ref_add,
1136         .do_ref_add           = osp_md_object_ref_add,
1137         .do_declare_ref_del   = osp_md_declare_object_ref_del,
1138         .do_ref_del           = osp_md_object_ref_del,
1139         .do_declare_destroy   = osp_md_declare_object_destroy,
1140         .do_destroy           = osp_md_object_destroy,
1141         .do_ah_init           = osp_md_ah_init,
1142         .do_attr_get          = osp_md_attr_get,
1143         .do_declare_attr_set  = osp_md_declare_attr_set,
1144         .do_attr_set          = osp_md_attr_set,
1145         .do_declare_xattr_set = osp_md_declare_xattr_set,
1146         .do_xattr_set         = osp_md_xattr_set,
1147         .do_xattr_get         = osp_md_xattr_get,
1148         .do_index_try         = osp_md_index_try,
1149         .do_object_lock       = osp_md_object_lock,
1150 };