Whamcloud - gitweb
5dcec3c9ef40a8384ee4c1ed03a1d0fb6b9f2851
[fs/lustre-release.git] / lustre / osp / osp_md_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_md_object.c
27  *
28  * Lustre MDT Proxy Device
29  *
30  * Author: Di Wang <di.wang@intel.com>
31  */
32
33 #ifndef EXPORT_SYMTAB
34 # define EXPORT_SYMTAB
35 #endif
36 #define DEBUG_SUBSYSTEM S_MDS
37
38 #include <lustre_log.h>
39 #include <lustre_update.h>
40 #include "osp_internal.h"
41 static const char dot[] = ".";
42 static const char dotdot[] = "..";
43
44 static int osp_prep_update_req(const struct lu_env *env,
45                                struct osp_device *osp,
46                                struct update_buf *ubuf, int ubuf_len,
47                                struct ptlrpc_request **reqp)
48 {
49         struct obd_import      *imp;
50         struct ptlrpc_request  *req;
51         struct update_buf      *tmp;
52         int                     rc;
53         ENTRY;
54
55         imp = osp->opd_obd->u.cli.cl_import;
56         LASSERT(imp);
57
58         req = ptlrpc_request_alloc(imp, &RQF_UPDATE_OBJ);
59         if (req == NULL)
60                 RETURN(-ENOMEM);
61
62         req_capsule_set_size(&req->rq_pill, &RMF_UPDATE, RCL_CLIENT,
63                              UPDATE_BUFFER_SIZE);
64
65         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, UPDATE_OBJ);
66         if (rc != 0) {
67                 ptlrpc_req_finished(req);
68                 RETURN(rc);
69         }
70
71         req_capsule_set_size(&req->rq_pill, &RMF_UPDATE_REPLY, RCL_SERVER,
72                              UPDATE_BUFFER_SIZE);
73
74         tmp = req_capsule_client_get(&req->rq_pill, &RMF_UPDATE);
75         memcpy(tmp, ubuf, ubuf_len);
76
77         ptlrpc_request_set_replen(req);
78
79         *reqp = req;
80
81         RETURN(rc);
82 }
83
84 static int osp_remote_sync(const struct lu_env *env, struct dt_device *dt,
85                            struct update_request *update,
86                            struct ptlrpc_request **reqp)
87 {
88         struct osp_device       *osp = dt2osp_dev(dt);
89         struct ptlrpc_request   *req = NULL;
90         int                     rc;
91         ENTRY;
92
93         rc = osp_prep_update_req(env, osp, update->ur_buf, UPDATE_BUFFER_SIZE,
94                                  &req);
95         if (rc)
96                 RETURN(rc);
97
98         /* Note: some dt index api might return non-zero result here, like
99          * osd_index_ea_lookup, so we should only check rc < 0 here */
100         rc = ptlrpc_queue_wait(req);
101         if (rc < 0) {
102                 ptlrpc_req_finished(req);
103                 update->ur_rc = rc;
104                 RETURN(rc);
105         }
106
107         if (reqp != NULL) {
108                 *reqp = req;
109                 RETURN(rc);
110         }
111
112         update->ur_rc = rc;
113
114         ptlrpc_req_finished(req);
115
116         RETURN(rc);
117 }
118
119 /**
120  * Create a new update request for the device.
121  */
122 static struct update_request
123 *osp_create_update_req(struct dt_device *dt)
124 {
125         struct update_request *update;
126
127         OBD_ALLOC_PTR(update);
128         if (!update)
129                 return ERR_PTR(-ENOMEM);
130
131         OBD_ALLOC_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
132         if (update->ur_buf == NULL) {
133                 OBD_FREE_PTR(update);
134                 return ERR_PTR(-ENOMEM);
135         }
136
137         CFS_INIT_LIST_HEAD(&update->ur_list);
138         update->ur_dt = dt;
139         update->ur_batchid = 0;
140         update->ur_buf->ub_magic = UPDATE_BUFFER_MAGIC;
141         update->ur_buf->ub_count = 0;
142
143         return update;
144 }
145
146 static void osp_destroy_update_req(struct update_request *update)
147 {
148         if (update == NULL)
149                 return;
150
151         cfs_list_del(&update->ur_list);
152         if (update->ur_buf != NULL)
153                 OBD_FREE_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
154
155         OBD_FREE_PTR(update);
156         return;
157 }
158
159 int osp_trans_stop(const struct lu_env *env, struct thandle *th)
160 {
161         int rc = 0;
162
163         osp_destroy_update_req(th->th_current_request);
164         rc = th->th_current_request->ur_rc;
165         th->th_current_request = NULL;
166
167         return rc;
168 }
169
170 /**
171  * In DNE phase I, all remote updates will be packed into RPC (the format
172  * description is in lustre_idl.h) during declare phase, all of updates
173  * are attached to the transaction, one entry per OSP. Then in trans start,
174  * LOD will walk through these entries and send these UPDATEs to the remote
175  * MDT to be executed synchronously.
176  */
177 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
178                     struct thandle *th)
179 {
180         struct update_request *update;
181         int rc = 0;
182
183         /* In phase I, if the transaction includes remote updates, the local
184          * update should be synchronized, so it will set th_sync = 1 */
185         update = th->th_current_request;
186         LASSERT(update != NULL && update->ur_dt == dt);
187         if (update->ur_buf->ub_count > 0) {
188                 rc = osp_remote_sync(env, dt, update, NULL);
189                 th->th_sync = 1;
190         }
191
192         RETURN(rc);
193 }
194
195 /**
196  * Insert the update into the th_bufs for the device.
197  */
198 static int osp_insert_update(const struct lu_env *env,
199                              struct update_request *update, int op,
200                              struct lu_fid *fid, int count,
201                              int *lens, char **bufs)
202 {
203         struct update_buf    *ubuf = update->ur_buf;
204         struct update        *obj_update;
205         char                 *ptr;
206         int                   i;
207         int                   update_length;
208         int                   rc = 0;
209         ENTRY;
210
211         obj_update = (struct update *)((char *)ubuf +
212                       cfs_size_round(update_buf_size(ubuf)));
213
214         /* Check update size to make sure it can fit into the buffer */
215         update_length = cfs_size_round(offsetof(struct update,
216                                        u_bufs[0]));
217         for (i = 0; i < count; i++)
218                 update_length += cfs_size_round(lens[i]);
219
220         if (cfs_size_round(update_buf_size(ubuf)) + update_length >
221             UPDATE_BUFFER_SIZE || ubuf->ub_count >= UPDATE_MAX_OPS) {
222                 CERROR("%s: insert up %p, idx %d cnt %d len %lu: rc = %d\n",
223                         update->ur_dt->dd_lu_dev.ld_obd->obd_name, ubuf,
224                         update_length, ubuf->ub_count, update_buf_size(ubuf),
225                         -E2BIG);
226                 RETURN(-E2BIG);
227         }
228
229         if (count > UPDATE_BUF_COUNT) {
230                 CERROR("%s: Insert too much params %d "DFID" op %d: rc = %d\n",
231                         update->ur_dt->dd_lu_dev.ld_obd->obd_name, count,
232                         PFID(fid), op, -E2BIG);
233                 RETURN(-E2BIG);
234         }
235
236         /* fill the update into the update buffer */
237         fid_cpu_to_le(&obj_update->u_fid, fid);
238         obj_update->u_type = cpu_to_le32(op);
239         obj_update->u_batchid = update->ur_batchid;
240         for (i = 0; i < count; i++)
241                 obj_update->u_lens[i] = cpu_to_le32(lens[i]);
242
243         ptr = (char *)obj_update +
244                         cfs_size_round(offsetof(struct update, u_bufs[0]));
245         for (i = 0; i < count; i++)
246                 LOGL(bufs[i], lens[i], ptr);
247
248         ubuf->ub_count++;
249
250         CDEBUG(D_INFO, "%s: %p "DFID" idx %d: op %d params %d:%lu\n",
251                update->ur_dt->dd_lu_dev.ld_obd->obd_name, ubuf, PFID(fid),
252                ubuf->ub_count, op, count, update_buf_size(ubuf));
253
254         RETURN(rc);
255 }
256
257 static struct update_request
258 *osp_find_update(struct thandle *th, struct dt_device *dt_dev)
259 {
260         struct update_request   *update;
261
262         /* Because transaction api does not proivde the interface
263          * to transfer the update from LOD to OSP,  we need walk
264          * remote update list to find the update, this probably
265          * should move to LOD layer, when update can be part of
266          * the trancation api parameter. XXX */
267         cfs_list_for_each_entry(update, &th->th_remote_update_list, ur_list) {
268                 if (update->ur_dt == dt_dev)
269                         return update;
270         }
271         return NULL;
272 }
273
274 static inline void osp_md_add_update_batchid(struct update_request *update)
275 {
276         update->ur_batchid++;
277 }
278
279 /**
280  * Find one loc in th_dev/dev_obj_update for the update,
281  * Because only one thread can access this thandle, no need
282  * lock now.
283  */
284 static struct update_request
285 *osp_find_create_update_loc(struct thandle *th, struct dt_object *dt)
286 {
287         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
288         struct update_request   *update;
289         ENTRY;
290
291         update = osp_find_update(th, dt_dev);
292         if (update != NULL)
293                 RETURN(update);
294
295         update = osp_create_update_req(dt_dev);
296         if (IS_ERR(update))
297                 RETURN(update);
298
299         cfs_list_add_tail(&update->ur_list, &th->th_remote_update_list);
300
301         RETURN(update);
302 }
303
304 static int osp_get_attr_from_req(const struct lu_env *env,
305                                  struct ptlrpc_request *req,
306                                  struct lu_attr *attr, int index)
307 {
308         struct update_reply     *reply;
309         struct obdo             *lobdo = &osp_env_info(env)->osi_obdo;
310         struct obdo             *wobdo;
311         int                     size;
312
313         LASSERT(attr != NULL);
314
315         reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
316                                              UPDATE_BUFFER_SIZE);
317         if (reply->ur_version != UPDATE_REPLY_V1)
318                 return -EPROTO;
319
320         size = update_get_reply_buf(reply, (void **)&wobdo, index);
321         if (size != sizeof(struct obdo))
322                 return -EPROTO;
323
324         obdo_le_to_cpu(wobdo, wobdo);
325         lustre_get_wire_obdo(lobdo, wobdo);
326         la_from_obdo(attr, lobdo, lobdo->o_valid);
327
328         return 0;
329 }
330
331 static int osp_md_declare_object_create(const struct lu_env *env,
332                                         struct dt_object *dt,
333                                         struct lu_attr *attr,
334                                         struct dt_allocation_hint *hint,
335                                         struct dt_object_format *dof,
336                                         struct thandle *th)
337 {
338         struct osp_thread_info  *osi = osp_env_info(env);
339         struct update_request   *update;
340         struct lu_fid           *fid1;
341         int                     sizes[2] = {sizeof(struct obdo), 0};
342         char                    *bufs[2] = {NULL, NULL};
343         int                     buf_count;
344         int                     rc;
345
346
347         update = osp_find_create_update_loc(th, dt);
348         if (IS_ERR(update)) {
349                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
350                        dt->do_lu.lo_dev->ld_obd->obd_name,
351                        (int)PTR_ERR(update));
352                 return PTR_ERR(update);
353         }
354
355         osi->osi_obdo.o_valid = 0;
356         LASSERT(S_ISDIR(attr->la_mode));
357         obdo_from_la(&osi->osi_obdo, attr, attr->la_valid);
358         lustre_set_wire_obdo(&osi->osi_obdo, &osi->osi_obdo);
359         obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
360
361         bufs[0] = (char *)&osi->osi_obdo;
362         buf_count = 1;
363         fid1 = (struct lu_fid *)lu_object_fid(&dt->do_lu);
364         if (hint->dah_parent) {
365                 struct lu_fid *fid2;
366                 struct lu_fid *tmp_fid = &osi->osi_fid;
367
368                 fid2 = (struct lu_fid *)lu_object_fid(&hint->dah_parent->do_lu);
369                 fid_cpu_to_le(tmp_fid, fid2);
370                 sizes[1] = sizeof(*tmp_fid);
371                 bufs[1] = (char *)tmp_fid;
372                 buf_count++;
373         }
374
375         if (lu_object_exists(&dt->do_lu)) {
376                 /* If the object already exists, we needs to destroy
377                  * this orphan object first.
378                  *
379                  * The scenario might happen in this case
380                  *
381                  * 1. client send remote create to MDT0.
382                  * 2. MDT0 send create update to MDT1.
383                  * 3. MDT1 finished create synchronously.
384                  * 4. MDT0 failed and reboot.
385                  * 5. client resend remote create to MDT0.
386                  * 6. MDT0 tries to resend create update to MDT1,
387                  *    but find the object already exists
388                  */
389                 CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
390                        dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1));
391
392                 rc = osp_insert_update(env, update, OBJ_REF_DEL, fid1, 0,
393                                        NULL, NULL);
394                 if (rc != 0)
395                         GOTO(out, rc);
396
397                 if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
398                         /* decrease for ".." */
399                         rc = osp_insert_update(env, update, OBJ_REF_DEL, fid1,
400                                                0, NULL, NULL);
401                         if (rc != 0)
402                                 GOTO(out, rc);
403                 }
404
405                 rc = osp_insert_update(env, update, OBJ_DESTROY, fid1, 0, NULL,
406                                        NULL);
407                 if (rc != 0)
408                         GOTO(out, rc);
409
410                 dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
411                 /* Increase batchid to add this orphan object deletion
412                  * to separate transaction */
413                 osp_md_add_update_batchid(update);
414         }
415
416         rc = osp_insert_update(env, update, OBJ_CREATE, fid1, buf_count, sizes,
417                                bufs);
418 out:
419         if (rc)
420                 CERROR("%s: Insert update error: rc = %d\n",
421                        dt->do_lu.lo_dev->ld_obd->obd_name, rc);
422
423         return rc;
424 }
425
426 static int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
427                                 struct lu_attr *attr,
428                                 struct dt_allocation_hint *hint,
429                                 struct dt_object_format *dof,
430                                 struct thandle *th)
431 {
432         struct osp_object  *obj = dt2osp_obj(dt);
433
434         CDEBUG(D_INFO, "create object "DFID"\n",
435                PFID(&dt->do_lu.lo_header->loh_fid));
436
437         /* Because the create update RPC will be sent during declare phase,
438          * if creation reaches here, it means the object has been created
439          * successfully */
440         dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
441         obj->opo_empty = 1;
442
443         return 0;
444 }
445
446 static int osp_md_declare_object_ref_del(const struct lu_env *env,
447                                          struct dt_object *dt,
448                                          struct thandle *th)
449 {
450         struct update_request   *update;
451         struct lu_fid           *fid;
452         int                     rc;
453
454         update = osp_find_create_update_loc(th, dt);
455         if (IS_ERR(update)) {
456                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
457                        dt->do_lu.lo_dev->ld_obd->obd_name,
458                       (int)PTR_ERR(update));
459                 return PTR_ERR(update);
460         }
461
462         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
463
464         rc = osp_insert_update(env, update, OBJ_REF_DEL, fid, 0, NULL, NULL);
465
466         return rc;
467 }
468
469 static int osp_md_object_ref_del(const struct lu_env *env,
470                                  struct dt_object *dt,
471                                  struct thandle *th)
472 {
473         CDEBUG(D_INFO, "ref del object "DFID"\n",
474                PFID(&dt->do_lu.lo_header->loh_fid));
475
476         return 0;
477 }
478
479 static int osp_md_declare_ref_add(const struct lu_env *env,
480                                   struct dt_object *dt, struct thandle *th)
481 {
482         struct update_request   *update;
483         struct lu_fid           *fid;
484         int                     rc;
485
486         update = osp_find_create_update_loc(th, dt);
487         if (IS_ERR(update)) {
488                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
489                        dt->do_lu.lo_dev->ld_obd->obd_name,
490                        (int)PTR_ERR(update));
491                 return PTR_ERR(update);
492         }
493
494         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
495
496         rc = osp_insert_update(env, update, OBJ_REF_ADD, fid, 0, NULL, NULL);
497
498         return rc;
499 }
500
501 static int osp_md_object_ref_add(const struct lu_env *env,
502                                  struct dt_object *dt,
503                                  struct thandle *th)
504 {
505         CDEBUG(D_INFO, "ref add object "DFID"\n",
506                PFID(&dt->do_lu.lo_header->loh_fid));
507
508         return 0;
509 }
510
511 static void osp_md_ah_init(const struct lu_env *env,
512                            struct dt_allocation_hint *ah,
513                            struct dt_object *parent,
514                            struct dt_object *child,
515                            cfs_umode_t child_mode)
516 {
517         LASSERT(ah);
518
519         memset(ah, 0, sizeof(*ah));
520         ah->dah_parent = parent;
521         ah->dah_mode = child_mode;
522 }
523
524 static int osp_md_declare_attr_set(const struct lu_env *env,
525                                    struct dt_object *dt,
526                                    const struct lu_attr *attr,
527                                    struct thandle *th)
528 {
529         struct osp_thread_info *osi = osp_env_info(env);
530         struct update_request  *update;
531         struct lu_fid          *fid;
532         int                     size = sizeof(struct obdo);
533         char                   *buf;
534         int                     rc;
535
536         update = osp_find_create_update_loc(th, dt);
537         if (IS_ERR(update)) {
538                 CERROR("%s: Get OSP update buf failed: %d\n",
539                        dt->do_lu.lo_dev->ld_obd->obd_name,
540                        (int)PTR_ERR(update));
541                 return PTR_ERR(update);
542         }
543
544         osi->osi_obdo.o_valid = 0;
545         LASSERT(!(attr->la_valid & (LA_MODE | LA_TYPE)));
546         obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr,
547                      attr->la_valid);
548         lustre_set_wire_obdo(&osi->osi_obdo, &osi->osi_obdo);
549         obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
550
551         buf = (char *)&osi->osi_obdo;
552         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
553
554         rc = osp_insert_update(env, update, OBJ_ATTR_SET, fid, 1, &size, &buf);
555
556         return rc;
557 }
558
559 static int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
560                            const struct lu_attr *attr, struct thandle *th,
561                            struct lustre_capa *capa)
562 {
563         CDEBUG(D_INFO, "attr set object "DFID"\n",
564                PFID(&dt->do_lu.lo_header->loh_fid));
565
566         RETURN(0);
567 }
568
569 static int osp_md_declare_xattr_set(const struct lu_env *env,
570                                     struct dt_object *dt,
571                                     const struct lu_buf *buf,
572                                     const char *name, int flag,
573                                     struct thandle *th)
574 {
575         struct update_request   *update;
576         struct lu_fid           *fid;
577         int                     sizes[3] = {strlen(name), buf->lb_len,
578                                             sizeof(int)};
579         char                    *bufs[3] = {(char *)name, (char *)buf->lb_buf };
580         int                     rc;
581
582         LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL);
583         update = osp_find_create_update_loc(th, dt);
584         if (IS_ERR(update)) {
585                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
586                        dt->do_lu.lo_dev->ld_obd->obd_name,
587                        (int)PTR_ERR(update));
588                 return PTR_ERR(update);
589         }
590
591         flag = cpu_to_le32(flag);
592         bufs[2] = (char *)&flag;
593
594         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
595         rc = osp_insert_update(env, update, OBJ_XATTR_SET, fid,
596                                ARRAY_SIZE(sizes), sizes, bufs);
597
598         return rc;
599 }
600
601 static int osp_md_xattr_set(const struct lu_env *env, struct dt_object *dt,
602                             const struct lu_buf *buf, const char *name, int fl,
603                             struct thandle *th, struct lustre_capa *capa)
604 {
605         CDEBUG(D_INFO, "xattr %s set object "DFID"\n", name,
606                PFID(&dt->do_lu.lo_header->loh_fid));
607
608         return 0;
609 }
610
611 static int osp_md_xattr_get(const struct lu_env *env, struct dt_object *dt,
612                             struct lu_buf *buf, const char *name,
613                             struct lustre_capa *capa)
614 {
615         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
616         struct update_request   *update = NULL;
617         struct ptlrpc_request   *req = NULL;
618         int                     rc;
619         int                     buf_len;
620         int                     size;
621         struct update_reply     *reply;
622         void                    *ea_buf;
623         ENTRY;
624
625         /* Because it needs send the update buffer right away,
626          * just create an update buffer, instead of attaching the
627          * update_remote list of the thandle.
628          */
629         update = osp_create_update_req(dt_dev);
630         if (IS_ERR(update))
631                 RETURN(PTR_ERR(update));
632
633         LASSERT(name != NULL);
634         buf_len = strlen(name);
635         rc = osp_insert_update(env, update, OBJ_XATTR_GET,
636                                (struct lu_fid *)lu_object_fid(&dt->do_lu),
637                                1, &buf_len, (char **)&name);
638         if (rc != 0) {
639                 CERROR("%s: Insert update error: rc = %d\n",
640                        dt->do_lu.lo_dev->ld_obd->obd_name, rc);
641                 GOTO(out, rc);
642         }
643         dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
644
645         rc = osp_remote_sync(env, dt_dev, update, &req);
646         if (rc != 0)
647                 GOTO(out, rc);
648
649         reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
650                                             UPDATE_BUFFER_SIZE);
651         if (reply->ur_version != UPDATE_REPLY_V1) {
652                 CERROR("%s: Wrong version %x expected %x: rc = %d\n",
653                        dt_dev->dd_lu_dev.ld_obd->obd_name,
654                        reply->ur_version, UPDATE_REPLY_V1, -EPROTO);
655                 GOTO(out, rc = -EPROTO);
656         }
657
658         size = update_get_reply_buf(reply, &ea_buf, 0);
659         if (size < 0)
660                 GOTO(out, rc = size);
661
662         LASSERT(size > 0 && size < CFS_PAGE_SIZE);
663         LASSERT(ea_buf != NULL);
664
665         rc = size;
666         if (buf->lb_buf != NULL)
667                 memcpy(buf->lb_buf, ea_buf, size);
668 out:
669         if (req != NULL)
670                 ptlrpc_req_finished(req);
671
672         if (update != NULL)
673                 osp_destroy_update_req(update);
674
675         RETURN(rc);
676 }
677
678 static void osp_md_object_read_lock(const struct lu_env *env,
679                                     struct dt_object *dt, unsigned role)
680 {
681         struct osp_object  *obj = dt2osp_obj(dt);
682
683         LASSERT(obj->opo_owner != env);
684         down_read_nested(&obj->opo_sem, role);
685
686         LASSERT(obj->opo_owner == NULL);
687 }
688
689 static void osp_md_object_write_lock(const struct lu_env *env,
690                                      struct dt_object *dt, unsigned role)
691 {
692         struct osp_object *obj = dt2osp_obj(dt);
693
694         down_write_nested(&obj->opo_sem, role);
695
696         LASSERT(obj->opo_owner == NULL);
697         obj->opo_owner = env;
698 }
699
700 static void osp_md_object_read_unlock(const struct lu_env *env,
701                                       struct dt_object *dt)
702 {
703         struct osp_object *obj = dt2osp_obj(dt);
704
705         up_read(&obj->opo_sem);
706 }
707
708 static void osp_md_object_write_unlock(const struct lu_env *env,
709                                        struct dt_object *dt)
710 {
711         struct osp_object *obj = dt2osp_obj(dt);
712
713         LASSERT(obj->opo_owner == env);
714         obj->opo_owner = NULL;
715         up_write(&obj->opo_sem);
716 }
717
718 static int osp_md_object_write_locked(const struct lu_env *env,
719                                       struct dt_object *dt)
720 {
721         struct osp_object *obj = dt2osp_obj(dt);
722
723         return obj->opo_owner == env;
724 }
725
726 static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
727                                struct dt_rec *rec, const struct dt_key *key,
728                                struct lustre_capa *capa)
729 {
730         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
731         struct update_request   *update;
732         struct ptlrpc_request   *req = NULL;
733         int                     size = strlen((char *)key) + 1;
734         char                    *name = (char *)key;
735         int                     rc;
736         struct update_reply     *reply;
737         struct lu_fid           *fid;
738
739         ENTRY;
740
741         /* Because it needs send the update buffer right away,
742          * just create an update buffer, instead of attaching the
743          * update_remote list of the thandle.
744          */
745         update = osp_create_update_req(dt_dev);
746         if (IS_ERR(update))
747                 RETURN(PTR_ERR(update));
748
749         rc = osp_insert_update(env, update, OBJ_INDEX_LOOKUP,
750                                (struct lu_fid *)lu_object_fid(&dt->do_lu),
751                                1, &size, (char **)&name);
752         if (rc) {
753                 CERROR("%s: Insert update error: rc = %d\n",
754                        dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
755                 GOTO(out, rc);
756         }
757
758         rc = osp_remote_sync(env, dt_dev, update, &req);
759         if (rc < 0) {
760                 CERROR("%s: lookup "DFID" %s failed: rc = %d\n",
761                        dt_dev->dd_lu_dev.ld_obd->obd_name,
762                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, rc);
763                 GOTO(out, rc);
764         }
765
766         reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
767                                              UPDATE_BUFFER_SIZE);
768         if (reply->ur_version != UPDATE_REPLY_V1) {
769                 CERROR("%s: Wrong version %x expected %x: rc = %d\n",
770                        dt_dev->dd_lu_dev.ld_obd->obd_name,
771                        reply->ur_version, UPDATE_REPLY_V1, -EPROTO);
772                 GOTO(out, rc = -EPROTO);
773         }
774
775         rc = update_get_reply_result(reply, NULL, 0);
776         if (rc < 0) {
777                 CERROR("%s: wrong version lookup "DFID" %s: rc = %d\n",
778                        dt_dev->dd_lu_dev.ld_obd->obd_name,
779                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, rc);
780                 GOTO(out, rc);
781         }
782
783         size = update_get_reply_buf(reply, (void **)&fid, 0);
784         if (size < 0)
785                 GOTO(out, rc = size);
786
787         if (size != sizeof(struct lu_fid)) {
788                 CERROR("%s: lookup "DFID" %s wrong size %d: rc = %d\n",
789                        dt_dev->dd_lu_dev.ld_obd->obd_name,
790                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, size, rc);
791                 GOTO(out, rc = -EINVAL);
792         }
793
794         fid_le_to_cpu(fid, fid);
795         if (!fid_is_sane(fid)) {
796                 CERROR("%s: lookup "DFID" %s invalid fid "DFID": rc = %d\n",
797                        dt_dev->dd_lu_dev.ld_obd->obd_name,
798                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid),
799                        rc);
800                 GOTO(out, rc = -EINVAL);
801         }
802         memcpy(rec, fid, sizeof(*fid));
803 out:
804         if (req != NULL)
805                 ptlrpc_req_finished(req);
806
807         if (update != NULL)
808                 osp_destroy_update_req(update);
809
810         RETURN(rc);
811 }
812
813 static int osp_md_declare_insert(const struct lu_env *env,
814                                  struct dt_object *dt,
815                                  const struct dt_rec *rec,
816                                  const struct dt_key *key,
817                                  struct thandle *th)
818 {
819         struct update_request   *update;
820         struct lu_fid           *fid;
821         struct lu_fid           *rec_fid = (struct lu_fid *)rec;
822         int                     size[2] = {strlen((char *)key) + 1,
823                                                   sizeof(*rec_fid)};
824         char                    *bufs[2] = {(char *)key, (char *)rec_fid};
825         int                     rc;
826
827         update = osp_find_create_update_loc(th, dt);
828         if (IS_ERR(update)) {
829                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
830                        dt->do_lu.lo_dev->ld_obd->obd_name,
831                        (int)PTR_ERR(update));
832                 return PTR_ERR(update);
833         }
834
835         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
836
837         CDEBUG(D_INFO, "%s: insert index of "DFID" %s: "DFID"\n",
838                dt->do_lu.lo_dev->ld_obd->obd_name,
839                PFID(fid), (char *)key, PFID(rec_fid));
840
841         fid_cpu_to_le(rec_fid, rec_fid);
842
843         rc = osp_insert_update(env, update, OBJ_INDEX_INSERT, fid,
844                                ARRAY_SIZE(size), size, bufs);
845         return rc;
846 }
847
848 static int osp_md_index_insert(const struct lu_env *env,
849                                struct dt_object *dt,
850                                const struct dt_rec *rec,
851                                const struct dt_key *key,
852                                struct thandle *th,
853                                struct lustre_capa *capa,
854                                int ignore_quota)
855 {
856         return 0;
857 }
858
859 static int osp_md_declare_delete(const struct lu_env *env,
860                                  struct dt_object *dt,
861                                  const struct dt_key *key,
862                                  struct thandle *th)
863 {
864         struct update_request *update;
865         struct lu_fid *fid;
866         int size = strlen((char *)key) + 1;
867         char *buf = (char *)key;
868         int rc;
869
870         update = osp_find_create_update_loc(th, dt);
871         if (IS_ERR(update)) {
872                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
873                        dt->do_lu.lo_dev->ld_obd->obd_name,
874                        (int)PTR_ERR(update));
875                 return PTR_ERR(update);
876         }
877
878         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
879
880         rc = osp_insert_update(env, update, OBJ_INDEX_DELETE, fid, 1, &size,
881                                &buf);
882
883         return rc;
884 }
885
886 static int osp_md_index_delete(const struct lu_env *env,
887                                struct dt_object *dt,
888                                const struct dt_key *key,
889                                struct thandle *th,
890                                struct lustre_capa *capa)
891 {
892         CDEBUG(D_INFO, "index delete "DFID" %s\n",
893                PFID(&dt->do_lu.lo_header->loh_fid), (char *)key);
894
895         return 0;
896 }
897
898 /**
899  * Creates or initializes iterator context.
900  *
901  * Note: for OSP, these index iterate api is only used to check
902  * whether the directory is empty now (see mdd_dir_is_empty).
903  * Since dir_empty will be return by OBJ_ATTR_GET(see osp_md_attr_get/
904  * out_attr_get). So the implementation of these iterator is simplied
905  * to make mdd_dir_is_empty happy. The real iterator should be
906  * implemented, if we need it one day.
907  */
908 static struct dt_it *osp_it_init(const struct lu_env *env,
909                                  struct dt_object *dt,
910                                  __u32 attr,
911                                 struct lustre_capa *capa)
912 {
913         lu_object_get(&dt->do_lu);
914         return (struct dt_it *)dt;
915 }
916
917 static void osp_it_fini(const struct lu_env *env, struct dt_it *di)
918 {
919         struct dt_object *dt = (struct dt_object *)di;
920         lu_object_put(env, &dt->do_lu);
921 }
922
923 static int osp_it_get(const struct lu_env *env,
924                       struct dt_it *di, const struct dt_key *key)
925 {
926         return 1;
927 }
928
929 static void osp_it_put(const struct lu_env *env, struct dt_it *di)
930 {
931         return;
932 }
933
934 static int osp_it_next(const struct lu_env *env, struct dt_it *di)
935 {
936         struct dt_object *dt = (struct dt_object *)di;
937         struct osp_object *osp_obj = dt2osp_obj(dt);
938         if (osp_obj->opo_empty)
939                 return 1;
940         return 0;
941 }
942
943 static struct dt_key *osp_it_key(const struct lu_env *env,
944                                  const struct dt_it *di)
945 {
946         LBUG();
947         return NULL;
948 }
949
950 static int osp_it_key_size(const struct lu_env *env, const struct dt_it *di)
951 {
952         LBUG();
953         return 0;
954 }
955
956 static int osp_it_rec(const struct lu_env *env, const struct dt_it *di,
957                       struct dt_rec *lde, __u32 attr)
958 {
959         LBUG();
960         return 0;
961 }
962
963 static __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di)
964 {
965         LBUG();
966         return 0;
967 }
968
969 static int osp_it_load(const struct lu_env *env, const struct dt_it *di,
970                        __u64 hash)
971 {
972         LBUG();
973         return 0;
974 }
975
976 static int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di,
977                           void *key_rec)
978 {
979         LBUG();
980         return 0;
981 }
982
983 static const struct dt_index_operations osp_md_index_ops = {
984         .dio_lookup         = osp_md_index_lookup,
985         .dio_declare_insert = osp_md_declare_insert,
986         .dio_insert         = osp_md_index_insert,
987         .dio_declare_delete = osp_md_declare_delete,
988         .dio_delete         = osp_md_index_delete,
989         .dio_it     = {
990                 .init     = osp_it_init,
991                 .fini     = osp_it_fini,
992                 .get      = osp_it_get,
993                 .put      = osp_it_put,
994                 .next     = osp_it_next,
995                 .key      = osp_it_key,
996                 .key_size = osp_it_key_size,
997                 .rec      = osp_it_rec,
998                 .store    = osp_it_store,
999                 .load     = osp_it_load,
1000                 .key_rec  = osp_it_key_rec,
1001         }
1002 };
1003
1004 static int osp_md_index_try(const struct lu_env *env,
1005                             struct dt_object *dt,
1006                             const struct dt_index_features *feat)
1007 {
1008         dt->do_index_ops = &osp_md_index_ops;
1009         return 0;
1010 }
1011
1012 static int osp_md_attr_get(const struct lu_env *env,
1013                            struct dt_object *dt, struct lu_attr *attr,
1014                            struct lustre_capa *capa)
1015 {
1016         struct osp_object     *obj = dt2osp_obj(dt);
1017         struct dt_device      *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
1018         struct update_request *update = NULL;
1019         struct ptlrpc_request *req = NULL;
1020         int rc;
1021         ENTRY;
1022
1023         /* Because it needs send the update buffer right away,
1024          * just create an update buffer, instead of attaching the
1025          * update_remote list of the thandle.
1026          */
1027         update = osp_create_update_req(dt_dev);
1028         if (IS_ERR(update))
1029                 RETURN(PTR_ERR(update));
1030
1031         rc = osp_insert_update(env, update, OBJ_ATTR_GET,
1032                                (struct lu_fid *)lu_object_fid(&dt->do_lu),
1033                                0, NULL, NULL);
1034         if (rc) {
1035                 CERROR("%s: Insert update error: rc = %d\n",
1036                        dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
1037                 GOTO(out, rc);
1038         }
1039         dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
1040
1041         rc = osp_remote_sync(env, dt_dev, update, &req);
1042         if (rc < 0)
1043                 GOTO(out, rc);
1044
1045         rc = osp_get_attr_from_req(env, req, attr, 0);
1046         if (rc)
1047                 GOTO(out, rc);
1048
1049         if (attr->la_flags == 1)
1050                 obj->opo_empty = 0;
1051         else
1052                 obj->opo_empty = 1;
1053 out:
1054         if (req != NULL)
1055                 ptlrpc_req_finished(req);
1056
1057         if (update != NULL)
1058                 osp_destroy_update_req(update);
1059
1060         RETURN(rc);
1061 }
1062
1063 static int osp_md_declare_object_destroy(const struct lu_env *env,
1064                                          struct dt_object *dt,
1065                                          struct thandle *th)
1066 {
1067         struct osp_object  *o = dt2osp_obj(dt);
1068         int                 rc = 0;
1069         ENTRY;
1070
1071         /*
1072          * track objects to be destroyed via llog
1073          */
1074         rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th);
1075
1076         RETURN(rc);
1077 }
1078
1079 static int osp_md_object_destroy(const struct lu_env *env,
1080                                  struct dt_object *dt, struct thandle *th)
1081 {
1082         struct osp_object  *o = dt2osp_obj(dt);
1083         int                 rc = 0;
1084         ENTRY;
1085
1086         /*
1087          * once transaction is committed put proper command on
1088          * the queue going to our OST
1089          */
1090         rc = osp_sync_add(env, o, MDS_UNLINK64_REC, th, NULL);
1091
1092         /* not needed in cache any more */
1093         set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
1094
1095         RETURN(rc);
1096 }
1097
1098 struct dt_object_operations osp_md_obj_ops = {
1099         .do_read_lock         = osp_md_object_read_lock,
1100         .do_write_lock        = osp_md_object_write_lock,
1101         .do_read_unlock       = osp_md_object_read_unlock,
1102         .do_write_unlock      = osp_md_object_write_unlock,
1103         .do_write_locked      = osp_md_object_write_locked,
1104         .do_declare_create    = osp_md_declare_object_create,
1105         .do_create            = osp_md_object_create,
1106         .do_declare_ref_add   = osp_md_declare_ref_add,
1107         .do_ref_add           = osp_md_object_ref_add,
1108         .do_declare_ref_del   = osp_md_declare_object_ref_del,
1109         .do_ref_del           = osp_md_object_ref_del,
1110         .do_declare_destroy   = osp_md_declare_object_destroy,
1111         .do_destroy           = osp_md_object_destroy,
1112         .do_ah_init           = osp_md_ah_init,
1113         .do_attr_get          = osp_md_attr_get,
1114         .do_declare_attr_set  = osp_md_declare_attr_set,
1115         .do_attr_set          = osp_md_attr_set,
1116         .do_declare_xattr_set = osp_md_declare_xattr_set,
1117         .do_xattr_set         = osp_md_xattr_set,
1118         .do_xattr_get         = osp_md_xattr_get,
1119         .do_index_try         = osp_md_index_try,
1120 };
1121
1122 static int osp_md_object_lock(const struct lu_env *env,
1123                               struct dt_object *dt,
1124                               struct lustre_handle *lh,
1125                               struct ldlm_enqueue_info *einfo,
1126                               void *policy)
1127 {
1128         struct osp_thread_info *info = osp_env_info(env);
1129         struct ldlm_res_id     *res_id = &info->osi_resid;
1130         struct dt_device       *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
1131         struct osp_device      *osp = dt2osp_dev(dt_dev);
1132         struct ptlrpc_request  *req = NULL;
1133         int                     rc = 0;
1134         __u64                   flags = 0;
1135         ldlm_mode_t             mode;
1136
1137         fid_build_reg_res_name(lu_object_fid(&dt->do_lu), res_id);
1138
1139         mode = ldlm_lock_match(osp->opd_obd->obd_namespace,
1140                                LDLM_FL_BLOCK_GRANTED, res_id,
1141                                einfo->ei_type,
1142                                (ldlm_policy_data_t *)policy,
1143                                einfo->ei_mode, lh, 0);
1144         if (mode > 0)
1145                 return ELDLM_OK;
1146
1147         req = ldlm_enqueue_pack(osp->opd_exp, 0);
1148         if (IS_ERR(req))
1149                 RETURN(PTR_ERR(req));
1150
1151         rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id,
1152                               (const ldlm_policy_data_t *)policy,
1153                               &flags, NULL, 0, LVB_T_NONE, lh, 0);
1154
1155         ptlrpc_req_finished(req);
1156
1157         return rc == ELDLM_OK ? 0 : -EIO;
1158 }
1159
1160 struct dt_lock_operations osp_md_lock_ops = {
1161         .do_object_lock       = osp_md_object_lock,
1162 };
1163