Whamcloud - gitweb
LU-1187 out: add resend check for update.
[fs/lustre-release.git] / lustre / osp / osp_md_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_md_object.c
27  *
28  * Lustre MDT Proxy Device
29  *
30  * Author: Di Wang <di.wang@intel.com>
31  */
32
33 #ifndef EXPORT_SYMTAB
34 # define EXPORT_SYMTAB
35 #endif
36 #define DEBUG_SUBSYSTEM S_MDS
37
38 #include <lustre_log.h>
39 #include <lustre_update.h>
40 #include "osp_internal.h"
41 static const char dot[] = ".";
42 static const char dotdot[] = "..";
43
44 static int osp_prep_update_req(const struct lu_env *env,
45                                struct osp_device *osp,
46                                struct update_buf *ubuf, int ubuf_len,
47                                struct ptlrpc_request **reqp)
48 {
49         struct obd_import      *imp;
50         struct ptlrpc_request  *req;
51         struct update_buf      *tmp;
52         int                     rc;
53         ENTRY;
54
55         imp = osp->opd_obd->u.cli.cl_import;
56         LASSERT(imp);
57
58         req = ptlrpc_request_alloc(imp, &RQF_UPDATE_OBJ);
59         if (req == NULL)
60                 RETURN(-ENOMEM);
61
62         req_capsule_set_size(&req->rq_pill, &RMF_UPDATE, RCL_CLIENT,
63                              UPDATE_BUFFER_SIZE);
64
65         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, UPDATE_OBJ);
66         if (rc != 0) {
67                 ptlrpc_req_finished(req);
68                 RETURN(rc);
69         }
70
71         req_capsule_set_size(&req->rq_pill, &RMF_UPDATE_REPLY, RCL_SERVER,
72                              UPDATE_BUFFER_SIZE);
73
74         tmp = req_capsule_client_get(&req->rq_pill, &RMF_UPDATE);
75         memcpy(tmp, ubuf, ubuf_len);
76
77         ptlrpc_request_set_replen(req);
78
79         *reqp = req;
80
81         RETURN(rc);
82 }
83
84 static int osp_remote_sync(const struct lu_env *env, struct dt_device *dt,
85                            struct update_request *update,
86                            struct ptlrpc_request **reqp)
87 {
88         struct osp_device       *osp = dt2osp_dev(dt);
89         struct ptlrpc_request   *req = NULL;
90         int                     rc;
91         ENTRY;
92
93         rc = osp_prep_update_req(env, osp, update->ur_buf, UPDATE_BUFFER_SIZE,
94                                  &req);
95         if (rc)
96                 RETURN(rc);
97
98         /* Note: some dt index api might return non-zero result here, like
99          * osd_index_ea_lookup, so we should only check rc < 0 here */
100         rc = ptlrpc_queue_wait(req);
101         if (rc < 0) {
102                 ptlrpc_req_finished(req);
103                 update->ur_rc = rc;
104                 RETURN(rc);
105         }
106
107         if (reqp != NULL) {
108                 *reqp = req;
109                 RETURN(rc);
110         }
111
112         update->ur_rc = rc;
113
114         ptlrpc_req_finished(req);
115
116         RETURN(rc);
117 }
118
119 /**
120  * Create a new update request for the device.
121  */
122 static struct update_request
123 *osp_create_update_req(struct dt_device *dt)
124 {
125         struct update_request *update;
126
127         OBD_ALLOC_PTR(update);
128         if (!update)
129                 return ERR_PTR(-ENOMEM);
130
131         OBD_ALLOC_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
132         if (update->ur_buf == NULL) {
133                 OBD_FREE_PTR(update);
134                 return ERR_PTR(-ENOMEM);
135         }
136
137         CFS_INIT_LIST_HEAD(&update->ur_list);
138         update->ur_dt = dt;
139         update->ur_batchid = 0;
140         update->ur_buf->ub_magic = UPDATE_BUFFER_MAGIC;
141         update->ur_buf->ub_count = 0;
142
143         return update;
144 }
145
146 static void osp_destroy_update_req(struct update_request *update)
147 {
148         if (update == NULL)
149                 return;
150
151         cfs_list_del(&update->ur_list);
152         if (update->ur_buf != NULL)
153                 OBD_FREE_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
154
155         OBD_FREE_PTR(update);
156         return;
157 }
158
159 int osp_trans_stop(const struct lu_env *env, struct thandle *th)
160 {
161         int rc = 0;
162
163         osp_destroy_update_req(th->th_current_request);
164         rc = th->th_current_request->ur_rc;
165         th->th_current_request = NULL;
166
167         return rc;
168 }
169
170 /**
171  * In DNE phase I, all remote updates will be packed into RPC (the format
172  * description is in lustre_idl.h) during declare phase, all of updates
173  * are attached to the transaction, one entry per OSP. Then in trans start,
174  * LOD will walk through these entries and send these UPDATEs to the remote
175  * MDT to be executed synchronously.
176  */
177 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
178                     struct thandle *th)
179 {
180         struct update_request *update;
181         int rc = 0;
182
183         /* In phase I, if the transaction includes remote updates, the local
184          * update should be synchronized, so it will set th_sync = 1 */
185         update = th->th_current_request;
186         LASSERT(update != NULL && update->ur_dt == dt);
187         if (update->ur_buf->ub_count > 0) {
188                 rc = osp_remote_sync(env, dt, update, NULL);
189                 th->th_sync = 1;
190         }
191
192         RETURN(rc);
193 }
194
195 /**
196  * Insert the update into the th_bufs for the device.
197  */
198 static int osp_insert_update(const struct lu_env *env,
199                              struct update_request *update, int op,
200                              struct lu_fid *fid, int count,
201                              int *lens, char **bufs)
202 {
203         struct update_buf    *ubuf = update->ur_buf;
204         struct update        *obj_update;
205         char                 *ptr;
206         int                   i;
207         int                   update_length;
208         int                   rc = 0;
209         ENTRY;
210
211         obj_update = (struct update *)((char *)ubuf +
212                       cfs_size_round(update_buf_size(ubuf)));
213
214         /* Check update size to make sure it can fit into the buffer */
215         update_length = cfs_size_round(offsetof(struct update,
216                                        u_bufs[0]));
217         for (i = 0; i < count; i++)
218                 update_length += cfs_size_round(lens[i]);
219
220         if (cfs_size_round(update_buf_size(ubuf)) + update_length >
221             UPDATE_BUFFER_SIZE || ubuf->ub_count >= UPDATE_MAX_OPS) {
222                 CERROR("%s: insert up %p, idx %d cnt %d len %lu: rc = %d\n",
223                         update->ur_dt->dd_lu_dev.ld_obd->obd_name, ubuf,
224                         update_length, ubuf->ub_count, update_buf_size(ubuf),
225                         -E2BIG);
226                 RETURN(-E2BIG);
227         }
228
229         if (count > UPDATE_BUF_COUNT) {
230                 CERROR("%s: Insert too much params %d "DFID" op %d: rc = %d\n",
231                         update->ur_dt->dd_lu_dev.ld_obd->obd_name, count,
232                         PFID(fid), op, -E2BIG);
233                 RETURN(-E2BIG);
234         }
235
236         /* fill the update into the update buffer */
237         fid_cpu_to_le(&obj_update->u_fid, fid);
238         obj_update->u_type = cpu_to_le32(op);
239         obj_update->u_batchid = update->ur_batchid;
240         for (i = 0; i < count; i++)
241                 obj_update->u_lens[i] = cpu_to_le32(lens[i]);
242
243         ptr = (char *)obj_update +
244                         cfs_size_round(offsetof(struct update, u_bufs[0]));
245         for (i = 0; i < count; i++)
246                 LOGL(bufs[i], lens[i], ptr);
247
248         ubuf->ub_count++;
249
250         CDEBUG(D_INFO, "%s: %p "DFID" idx %d: op %d params %d:%lu\n",
251                update->ur_dt->dd_lu_dev.ld_obd->obd_name, ubuf, PFID(fid),
252                ubuf->ub_count, op, count, update_buf_size(ubuf));
253
254         RETURN(rc);
255 }
256
257 static struct update_request
258 *osp_find_update(struct thandle *th, struct dt_device *dt_dev)
259 {
260         struct update_request   *update;
261
262         /* Because transaction api does not proivde the interface
263          * to transfer the update from LOD to OSP,  we need walk
264          * remote update list to find the update, this probably
265          * should move to LOD layer, when update can be part of
266          * the trancation api parameter. XXX */
267         cfs_list_for_each_entry(update, &th->th_remote_update_list, ur_list) {
268                 if (update->ur_dt == dt_dev)
269                         return update;
270         }
271         return NULL;
272 }
273
274 static inline void osp_md_add_update_batchid(struct update_request *update)
275 {
276         update->ur_batchid++;
277 }
278
279 /**
280  * Find one loc in th_dev/dev_obj_update for the update,
281  * Because only one thread can access this thandle, no need
282  * lock now.
283  */
284 static struct update_request
285 *osp_find_create_update_loc(struct thandle *th, struct dt_object *dt)
286 {
287         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
288         struct update_request   *update;
289         ENTRY;
290
291         update = osp_find_update(th, dt_dev);
292         if (update != NULL)
293                 RETURN(update);
294
295         update = osp_create_update_req(dt_dev);
296         if (IS_ERR(update))
297                 RETURN(update);
298
299         cfs_list_add_tail(&update->ur_list, &th->th_remote_update_list);
300
301         RETURN(update);
302 }
303
304 static int osp_get_attr_from_req(const struct lu_env *env,
305                                  struct ptlrpc_request *req,
306                                  struct lu_attr *attr, int index)
307 {
308         struct update_reply     *reply;
309         struct obdo             *lobdo = &osp_env_info(env)->osi_obdo;
310         struct obdo             *wobdo;
311         int                     size;
312
313         LASSERT(attr != NULL);
314
315         reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
316                                              UPDATE_BUFFER_SIZE);
317         if (reply->ur_version != UPDATE_REPLY_V1)
318                 return -EPROTO;
319
320         size = update_get_reply_buf(reply, (void **)&wobdo, index);
321         if (size != sizeof(struct obdo))
322                 return -EPROTO;
323
324         obdo_le_to_cpu(wobdo, wobdo);
325         lustre_get_wire_obdo(lobdo, wobdo);
326         la_from_obdo(attr, lobdo, lobdo->o_valid);
327
328         return 0;
329 }
330
331 static int osp_md_declare_object_create(const struct lu_env *env,
332                                         struct dt_object *dt,
333                                         struct lu_attr *attr,
334                                         struct dt_allocation_hint *hint,
335                                         struct dt_object_format *dof,
336                                         struct thandle *th)
337 {
338         struct osp_thread_info  *osi = osp_env_info(env);
339         struct update_request   *update;
340         struct lu_fid           *fid1;
341         int                     sizes[2] = {sizeof(struct obdo), 0};
342         char                    *bufs[2] = {NULL, NULL};
343         int                     buf_count;
344         int                     rc;
345
346
347         update = osp_find_create_update_loc(th, dt);
348         if (IS_ERR(update)) {
349                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
350                        dt->do_lu.lo_dev->ld_obd->obd_name,
351                        (int)PTR_ERR(update));
352                 return PTR_ERR(update);
353         }
354
355         osi->osi_obdo.o_valid = 0;
356         LASSERT(S_ISDIR(attr->la_mode));
357         obdo_from_la(&osi->osi_obdo, attr, attr->la_valid);
358         lustre_set_wire_obdo(&osi->osi_obdo, &osi->osi_obdo);
359         obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
360
361         bufs[0] = (char *)&osi->osi_obdo;
362         buf_count = 1;
363         fid1 = (struct lu_fid *)lu_object_fid(&dt->do_lu);
364         if (hint->dah_parent) {
365                 struct lu_fid *fid2;
366                 struct lu_fid *tmp_fid = &osi->osi_fid;
367
368                 fid2 = (struct lu_fid *)lu_object_fid(&hint->dah_parent->do_lu);
369                 fid_cpu_to_le(tmp_fid, fid2);
370                 sizes[1] = sizeof(*tmp_fid);
371                 bufs[1] = (char *)tmp_fid;
372                 buf_count++;
373         }
374
375         if (lu_object_exists(&dt->do_lu)) {
376                 /* If the object already exists, we needs to destroy
377                  * this orphan object first.
378                  *
379                  * The scenario might happen in this case
380                  *
381                  * 1. client send remote create to MDT0.
382                  * 2. MDT0 send create update to MDT1.
383                  * 3. MDT1 finished create synchronously.
384                  * 4. MDT0 failed and reboot.
385                  * 5. client resend remote create to MDT0.
386                  * 6. MDT0 tries to resend create update to MDT1,
387                  *    but find the object already exists
388                  */
389                 CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
390                        dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1));
391
392                 rc = osp_insert_update(env, update, OBJ_REF_DEL, fid1, 0,
393                                        NULL, NULL);
394                 if (rc != 0)
395                         GOTO(out, rc);
396
397                 if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
398                         /* decrease for ".." */
399                         rc = osp_insert_update(env, update, OBJ_REF_DEL, fid1,
400                                                0, NULL, NULL);
401                         if (rc != 0)
402                                 GOTO(out, rc);
403                 }
404
405                 rc = osp_insert_update(env, update, OBJ_DESTROY, fid1, 0, NULL,
406                                        NULL);
407                 if (rc != 0)
408                         GOTO(out, rc);
409
410                 dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
411                 /* Increase batchid to add this orphan object deletion
412                  * to separate transaction */
413                 osp_md_add_update_batchid(update);
414         }
415
416         rc = osp_insert_update(env, update, OBJ_CREATE, fid1, buf_count, sizes,
417                                bufs);
418 out:
419         if (rc)
420                 CERROR("%s: Insert update error: rc = %d\n",
421                        dt->do_lu.lo_dev->ld_obd->obd_name, rc);
422
423         return rc;
424 }
425
426 static int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
427                                 struct lu_attr *attr,
428                                 struct dt_allocation_hint *hint,
429                                 struct dt_object_format *dof,
430                                 struct thandle *th)
431 {
432         struct osp_object  *obj = dt2osp_obj(dt);
433
434         CDEBUG(D_INFO, "create object "DFID"\n",
435                PFID(&dt->do_lu.lo_header->loh_fid));
436
437         /* Because the create update RPC will be sent during declare phase,
438          * if creation reaches here, it means the object has been created
439          * successfully */
440         dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
441         obj->opo_empty = 1;
442
443         return 0;
444 }
445
446 static int osp_md_declare_object_ref_del(const struct lu_env *env,
447                                          struct dt_object *dt,
448                                          struct thandle *th)
449 {
450         struct update_request   *update;
451         struct lu_fid           *fid;
452         int                     rc;
453
454         update = osp_find_create_update_loc(th, dt);
455         if (IS_ERR(update)) {
456                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
457                        dt->do_lu.lo_dev->ld_obd->obd_name,
458                       (int)PTR_ERR(update));
459                 return PTR_ERR(update);
460         }
461
462         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
463
464         rc = osp_insert_update(env, update, OBJ_REF_DEL, fid, 0, NULL, NULL);
465
466         return rc;
467 }
468
469 static int osp_md_object_ref_del(const struct lu_env *env,
470                                  struct dt_object *dt,
471                                  struct thandle *th)
472 {
473         CDEBUG(D_INFO, "ref del object "DFID"\n",
474                PFID(&dt->do_lu.lo_header->loh_fid));
475
476         return 0;
477 }
478
479 static int osp_md_declare_ref_add(const struct lu_env *env,
480                                   struct dt_object *dt, struct thandle *th)
481 {
482         struct update_request   *update;
483         struct lu_fid           *fid;
484         int                     rc;
485
486         update = osp_find_create_update_loc(th, dt);
487         if (IS_ERR(update)) {
488                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
489                        dt->do_lu.lo_dev->ld_obd->obd_name,
490                        (int)PTR_ERR(update));
491                 return PTR_ERR(update);
492         }
493
494         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
495
496         rc = osp_insert_update(env, update, OBJ_REF_ADD, fid, 0, NULL, NULL);
497
498         return rc;
499 }
500
501 static int osp_md_object_ref_add(const struct lu_env *env,
502                                  struct dt_object *dt,
503                                  struct thandle *th)
504 {
505         CDEBUG(D_INFO, "ref add object "DFID"\n",
506                PFID(&dt->do_lu.lo_header->loh_fid));
507
508         return 0;
509 }
510
511 static void osp_md_ah_init(const struct lu_env *env,
512                            struct dt_allocation_hint *ah,
513                            struct dt_object *parent,
514                            struct dt_object *child,
515                            cfs_umode_t child_mode)
516 {
517         LASSERT(ah);
518
519         memset(ah, 0, sizeof(*ah));
520         ah->dah_parent = parent;
521         ah->dah_mode = child_mode;
522 }
523
524 static int osp_md_declare_attr_set(const struct lu_env *env,
525                                    struct dt_object *dt,
526                                    const struct lu_attr *attr,
527                                    struct thandle *th)
528 {
529         struct osp_thread_info *osi = osp_env_info(env);
530         struct update_request  *update;
531         struct lu_fid          *fid;
532         int                     size = sizeof(struct obdo);
533         char                   *buf;
534         int                     rc;
535
536         update = osp_find_create_update_loc(th, dt);
537         if (IS_ERR(update)) {
538                 CERROR("%s: Get OSP update buf failed: %d\n",
539                        dt->do_lu.lo_dev->ld_obd->obd_name,
540                        (int)PTR_ERR(update));
541                 return PTR_ERR(update);
542         }
543
544         osi->osi_obdo.o_valid = 0;
545         LASSERT(!(attr->la_valid & (LA_MODE | LA_TYPE)));
546         obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr,
547                      attr->la_valid);
548         lustre_set_wire_obdo(&osi->osi_obdo, &osi->osi_obdo);
549         obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo);
550
551         buf = (char *)&osi->osi_obdo;
552         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
553
554         rc = osp_insert_update(env, update, OBJ_ATTR_SET, fid, 1, &size, &buf);
555
556         return rc;
557 }
558
559 static int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
560                            const struct lu_attr *attr, struct thandle *th,
561                            struct lustre_capa *capa)
562 {
563         CDEBUG(D_INFO, "attr set object "DFID"\n",
564                PFID(&dt->do_lu.lo_header->loh_fid));
565
566         RETURN(0);
567 }
568
569 static int osp_md_declare_xattr_set(const struct lu_env *env,
570                                     struct dt_object *dt,
571                                     const struct lu_buf *buf,
572                                     const char *name, int flag,
573                                     struct thandle *th)
574 {
575         struct update_request   *update;
576         struct lu_fid           *fid;
577         int                     sizes[3] = {strlen(name), buf->lb_len,
578                                             sizeof(int)};
579         char                    *bufs[3] = {(char *)name, (char *)buf->lb_buf };
580         int                     rc;
581
582         update = osp_find_create_update_loc(th, dt);
583         if (IS_ERR(update)) {
584                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
585                        dt->do_lu.lo_dev->ld_obd->obd_name,
586                        (int)PTR_ERR(update));
587                 return PTR_ERR(update);
588         }
589
590         flag = cpu_to_le32(flag);
591         bufs[2] = (char *)&flag;
592
593         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
594         rc = osp_insert_update(env, update, OBJ_XATTR_SET, fid,
595                                ARRAY_SIZE(sizes), sizes, bufs);
596
597         return rc;
598 }
599
600 static int osp_md_xattr_set(const struct lu_env *env, struct dt_object *dt,
601                             const struct lu_buf *buf, const char *name, int fl,
602                             struct thandle *th, struct lustre_capa *capa)
603 {
604         CDEBUG(D_INFO, "xattr %s set object "DFID"\n", name,
605                PFID(&dt->do_lu.lo_header->loh_fid));
606
607         return 0;
608 }
609
610 static int osp_md_xattr_get(const struct lu_env *env, struct dt_object *dt,
611                             struct lu_buf *buf, const char *name,
612                             struct lustre_capa *capa)
613 {
614         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
615         struct update_request   *update = NULL;
616         struct ptlrpc_request   *req = NULL;
617         int                     rc;
618         int                     buf_len;
619         int                     size;
620         struct update_reply     *reply;
621         void                    *ea_buf;
622         ENTRY;
623
624         /* Because it needs send the update buffer right away,
625          * just create an update buffer, instead of attaching the
626          * update_remote list of the thandle.
627          */
628         update = osp_create_update_req(dt_dev);
629         if (IS_ERR(update))
630                 RETURN(PTR_ERR(update));
631
632         LASSERT(name != NULL);
633         buf_len = strlen(name);
634         rc = osp_insert_update(env, update, OBJ_XATTR_GET,
635                                (struct lu_fid *)lu_object_fid(&dt->do_lu),
636                                1, &buf_len, (char **)&name);
637         if (rc != 0) {
638                 CERROR("%s: Insert update error: rc = %d\n",
639                        dt->do_lu.lo_dev->ld_obd->obd_name, rc);
640                 GOTO(out, rc);
641         }
642         dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
643
644         rc = osp_remote_sync(env, dt_dev, update, &req);
645         if (rc != 0)
646                 GOTO(out, rc);
647
648         reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
649                                             UPDATE_BUFFER_SIZE);
650         if (reply->ur_version != UPDATE_REPLY_V1) {
651                 CERROR("%s: Wrong version %x expected %x: rc = %d\n",
652                        dt_dev->dd_lu_dev.ld_obd->obd_name,
653                        reply->ur_version, UPDATE_REPLY_V1, -EPROTO);
654                 GOTO(out, rc = -EPROTO);
655         }
656
657         size = update_get_reply_buf(reply, &ea_buf, 0);
658         if (size < 0)
659                 GOTO(out, rc = size);
660
661         LASSERT(size > 0 && size < CFS_PAGE_SIZE);
662         LASSERT(ea_buf != NULL);
663
664         buf->lb_len = size;
665         memcpy(buf->lb_buf, ea_buf, size);
666 out:
667         if (req != NULL)
668                 ptlrpc_req_finished(req);
669
670         if (update != NULL)
671                 osp_destroy_update_req(update);
672
673         RETURN(rc);
674 }
675
676 static void osp_md_object_read_lock(const struct lu_env *env,
677                                     struct dt_object *dt, unsigned role)
678 {
679         struct osp_object  *obj = dt2osp_obj(dt);
680
681         LASSERT(obj->opo_owner != env);
682         down_read_nested(&obj->opo_sem, role);
683
684         LASSERT(obj->opo_owner == NULL);
685 }
686
687 static void osp_md_object_write_lock(const struct lu_env *env,
688                                      struct dt_object *dt, unsigned role)
689 {
690         struct osp_object *obj = dt2osp_obj(dt);
691
692         down_write_nested(&obj->opo_sem, role);
693
694         LASSERT(obj->opo_owner == NULL);
695         obj->opo_owner = env;
696 }
697
698 static void osp_md_object_read_unlock(const struct lu_env *env,
699                                       struct dt_object *dt)
700 {
701         struct osp_object *obj = dt2osp_obj(dt);
702
703         up_read(&obj->opo_sem);
704 }
705
706 static void osp_md_object_write_unlock(const struct lu_env *env,
707                                        struct dt_object *dt)
708 {
709         struct osp_object *obj = dt2osp_obj(dt);
710
711         LASSERT(obj->opo_owner == env);
712         obj->opo_owner = NULL;
713         up_write(&obj->opo_sem);
714 }
715
716 static int osp_md_object_write_locked(const struct lu_env *env,
717                                       struct dt_object *dt)
718 {
719         struct osp_object *obj = dt2osp_obj(dt);
720
721         return obj->opo_owner == env;
722 }
723
724 static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
725                                struct dt_rec *rec, const struct dt_key *key,
726                                struct lustre_capa *capa)
727 {
728         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
729         struct update_request   *update;
730         struct ptlrpc_request   *req = NULL;
731         int                     size = strlen((char *)key) + 1;
732         char                    *name = (char *)key;
733         int                     rc;
734         struct update_reply     *reply;
735         struct lu_fid           *fid;
736
737         ENTRY;
738
739         /* Because it needs send the update buffer right away,
740          * just create an update buffer, instead of attaching the
741          * update_remote list of the thandle.
742          */
743         update = osp_create_update_req(dt_dev);
744         if (IS_ERR(update))
745                 RETURN(PTR_ERR(update));
746
747         rc = osp_insert_update(env, update, OBJ_INDEX_LOOKUP,
748                                (struct lu_fid *)lu_object_fid(&dt->do_lu),
749                                1, &size, (char **)&name);
750         if (rc) {
751                 CERROR("%s: Insert update error: rc = %d\n",
752                        dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
753                 GOTO(out, rc);
754         }
755
756         rc = osp_remote_sync(env, dt_dev, update, &req);
757         if (rc < 0) {
758                 CERROR("%s: lookup "DFID" %s failed: rc = %d\n",
759                        dt_dev->dd_lu_dev.ld_obd->obd_name,
760                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, rc);
761                 GOTO(out, rc);
762         }
763
764         reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
765                                              UPDATE_BUFFER_SIZE);
766         if (reply->ur_version != UPDATE_REPLY_V1) {
767                 CERROR("%s: Wrong version %x expected %x: rc = %d\n",
768                        dt_dev->dd_lu_dev.ld_obd->obd_name,
769                        reply->ur_version, UPDATE_REPLY_V1, -EPROTO);
770                 GOTO(out, rc = -EPROTO);
771         }
772
773         rc = update_get_reply_result(reply, NULL, 0);
774         if (rc < 0) {
775                 CERROR("%s: wrong version lookup "DFID" %s: rc = %d\n",
776                        dt_dev->dd_lu_dev.ld_obd->obd_name,
777                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, rc);
778                 GOTO(out, rc);
779         }
780
781         size = update_get_reply_buf(reply, (void **)&fid, 0);
782         if (size < 0)
783                 GOTO(out, rc = size);
784
785         if (size != sizeof(struct lu_fid)) {
786                 CERROR("%s: lookup "DFID" %s wrong size %d: rc = %d\n",
787                        dt_dev->dd_lu_dev.ld_obd->obd_name,
788                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, size, rc);
789                 GOTO(out, rc = -EINVAL);
790         }
791
792         fid_le_to_cpu(fid, fid);
793         if (!fid_is_sane(fid)) {
794                 CERROR("%s: lookup "DFID" %s invalid fid "DFID": rc = %d\n",
795                        dt_dev->dd_lu_dev.ld_obd->obd_name,
796                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid),
797                        rc);
798                 GOTO(out, rc = -EINVAL);
799         }
800         memcpy(rec, fid, sizeof(*fid));
801 out:
802         if (req != NULL)
803                 ptlrpc_req_finished(req);
804
805         if (update != NULL)
806                 osp_destroy_update_req(update);
807
808         RETURN(rc);
809 }
810
811 static int osp_md_declare_insert(const struct lu_env *env,
812                                  struct dt_object *dt,
813                                  const struct dt_rec *rec,
814                                  const struct dt_key *key,
815                                  struct thandle *th)
816 {
817         struct update_request   *update;
818         struct lu_fid           *fid;
819         struct lu_fid           *rec_fid = (struct lu_fid *)rec;
820         int                     size[2] = {strlen((char *)key) + 1,
821                                                   sizeof(*rec_fid)};
822         char                    *bufs[2] = {(char *)key, (char *)rec_fid};
823         int                     rc;
824
825         update = osp_find_create_update_loc(th, dt);
826         if (IS_ERR(update)) {
827                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
828                        dt->do_lu.lo_dev->ld_obd->obd_name,
829                        (int)PTR_ERR(update));
830                 return PTR_ERR(update);
831         }
832
833         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
834
835         CDEBUG(D_INFO, "%s: insert index of "DFID" %s: "DFID"\n",
836                dt->do_lu.lo_dev->ld_obd->obd_name,
837                PFID(fid), (char *)key, PFID(rec_fid));
838
839         fid_cpu_to_le(rec_fid, rec_fid);
840
841         rc = osp_insert_update(env, update, OBJ_INDEX_INSERT, fid,
842                                ARRAY_SIZE(size), size, bufs);
843         return rc;
844 }
845
846 static int osp_md_index_insert(const struct lu_env *env,
847                                struct dt_object *dt,
848                                const struct dt_rec *rec,
849                                const struct dt_key *key,
850                                struct thandle *th,
851                                struct lustre_capa *capa,
852                                int ignore_quota)
853 {
854         return 0;
855 }
856
857 static int osp_md_declare_delete(const struct lu_env *env,
858                                  struct dt_object *dt,
859                                  const struct dt_key *key,
860                                  struct thandle *th)
861 {
862         struct update_request *update;
863         struct lu_fid *fid;
864         int size = strlen((char *)key) + 1;
865         char *buf = (char *)key;
866         int rc;
867
868         update = osp_find_create_update_loc(th, dt);
869         if (IS_ERR(update)) {
870                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
871                        dt->do_lu.lo_dev->ld_obd->obd_name,
872                        (int)PTR_ERR(update));
873                 return PTR_ERR(update);
874         }
875
876         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
877
878         rc = osp_insert_update(env, update, OBJ_INDEX_DELETE, fid, 1, &size,
879                                &buf);
880
881         return rc;
882 }
883
884 static int osp_md_index_delete(const struct lu_env *env,
885                                struct dt_object *dt,
886                                const struct dt_key *key,
887                                struct thandle *th,
888                                struct lustre_capa *capa)
889 {
890         CDEBUG(D_INFO, "index delete "DFID" %s\n",
891                PFID(&dt->do_lu.lo_header->loh_fid), (char *)key);
892
893         return 0;
894 }
895
896 /**
897  * Creates or initializes iterator context.
898  *
899  * Note: for OSP, these index iterate api is only used to check
900  * whether the directory is empty now (see mdd_dir_is_empty).
901  * Since dir_empty will be return by OBJ_ATTR_GET(see osp_md_attr_get/
902  * out_attr_get). So the implementation of these iterator is simplied
903  * to make mdd_dir_is_empty happy. The real iterator should be
904  * implemented, if we need it one day.
905  */
906 static struct dt_it *osp_it_init(const struct lu_env *env,
907                                  struct dt_object *dt,
908                                  __u32 attr,
909                                 struct lustre_capa *capa)
910 {
911         lu_object_get(&dt->do_lu);
912         return (struct dt_it *)dt;
913 }
914
915 static void osp_it_fini(const struct lu_env *env, struct dt_it *di)
916 {
917         struct dt_object *dt = (struct dt_object *)di;
918         lu_object_put(env, &dt->do_lu);
919 }
920
921 static int osp_it_get(const struct lu_env *env,
922                       struct dt_it *di, const struct dt_key *key)
923 {
924         return 1;
925 }
926
927 static void osp_it_put(const struct lu_env *env, struct dt_it *di)
928 {
929         return;
930 }
931
932 static int osp_it_next(const struct lu_env *env, struct dt_it *di)
933 {
934         struct dt_object *dt = (struct dt_object *)di;
935         struct osp_object *osp_obj = dt2osp_obj(dt);
936         if (osp_obj->opo_empty)
937                 return 1;
938         return 0;
939 }
940
941 static struct dt_key *osp_it_key(const struct lu_env *env,
942                                  const struct dt_it *di)
943 {
944         LBUG();
945         return NULL;
946 }
947
948 static int osp_it_key_size(const struct lu_env *env, const struct dt_it *di)
949 {
950         LBUG();
951         return 0;
952 }
953
954 static int osp_it_rec(const struct lu_env *env, const struct dt_it *di,
955                       struct dt_rec *lde, __u32 attr)
956 {
957         LBUG();
958         return 0;
959 }
960
961 static __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di)
962 {
963         LBUG();
964         return 0;
965 }
966
967 static int osp_it_load(const struct lu_env *env, const struct dt_it *di,
968                        __u64 hash)
969 {
970         LBUG();
971         return 0;
972 }
973
974 static int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di,
975                           void *key_rec)
976 {
977         LBUG();
978         return 0;
979 }
980
981 static const struct dt_index_operations osp_md_index_ops = {
982         .dio_lookup         = osp_md_index_lookup,
983         .dio_declare_insert = osp_md_declare_insert,
984         .dio_insert         = osp_md_index_insert,
985         .dio_declare_delete = osp_md_declare_delete,
986         .dio_delete         = osp_md_index_delete,
987         .dio_it     = {
988                 .init     = osp_it_init,
989                 .fini     = osp_it_fini,
990                 .get      = osp_it_get,
991                 .put      = osp_it_put,
992                 .next     = osp_it_next,
993                 .key      = osp_it_key,
994                 .key_size = osp_it_key_size,
995                 .rec      = osp_it_rec,
996                 .store    = osp_it_store,
997                 .load     = osp_it_load,
998                 .key_rec  = osp_it_key_rec,
999         }
1000 };
1001
1002 static int osp_md_index_try(const struct lu_env *env,
1003                             struct dt_object *dt,
1004                             const struct dt_index_features *feat)
1005 {
1006         dt->do_index_ops = &osp_md_index_ops;
1007         return 0;
1008 }
1009
1010 static int osp_md_attr_get(const struct lu_env *env,
1011                            struct dt_object *dt, struct lu_attr *attr,
1012                            struct lustre_capa *capa)
1013 {
1014         struct osp_object     *obj = dt2osp_obj(dt);
1015         struct dt_device      *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
1016         struct update_request *update = NULL;
1017         struct ptlrpc_request *req = NULL;
1018         int rc;
1019         ENTRY;
1020
1021         /* Because it needs send the update buffer right away,
1022          * just create an update buffer, instead of attaching the
1023          * update_remote list of the thandle.
1024          */
1025         update = osp_create_update_req(dt_dev);
1026         if (IS_ERR(update))
1027                 RETURN(PTR_ERR(update));
1028
1029         rc = osp_insert_update(env, update, OBJ_ATTR_GET,
1030                                (struct lu_fid *)lu_object_fid(&dt->do_lu),
1031                                0, NULL, NULL);
1032         if (rc) {
1033                 CERROR("%s: Insert update error: rc = %d\n",
1034                        dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
1035                 GOTO(out, rc);
1036         }
1037         dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
1038
1039         rc = osp_remote_sync(env, dt_dev, update, &req);
1040         if (rc < 0)
1041                 GOTO(out, rc);
1042
1043         rc = osp_get_attr_from_req(env, req, attr, 0);
1044         if (rc)
1045                 GOTO(out, rc);
1046
1047         if (attr->la_flags == 1)
1048                 obj->opo_empty = 0;
1049         else
1050                 obj->opo_empty = 1;
1051 out:
1052         if (req != NULL)
1053                 ptlrpc_req_finished(req);
1054
1055         if (update != NULL)
1056                 osp_destroy_update_req(update);
1057
1058         RETURN(rc);
1059 }
1060
1061 static int osp_md_declare_object_destroy(const struct lu_env *env,
1062                                          struct dt_object *dt,
1063                                          struct thandle *th)
1064 {
1065         struct osp_object  *o = dt2osp_obj(dt);
1066         int                 rc = 0;
1067         ENTRY;
1068
1069         /*
1070          * track objects to be destroyed via llog
1071          */
1072         rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th);
1073
1074         RETURN(rc);
1075 }
1076
1077 static int osp_md_object_destroy(const struct lu_env *env,
1078                                  struct dt_object *dt, struct thandle *th)
1079 {
1080         struct osp_object  *o = dt2osp_obj(dt);
1081         int                 rc = 0;
1082         ENTRY;
1083
1084         /*
1085          * once transaction is committed put proper command on
1086          * the queue going to our OST
1087          */
1088         rc = osp_sync_add(env, o, MDS_UNLINK64_REC, th, NULL);
1089
1090         /* not needed in cache any more */
1091         set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
1092
1093         RETURN(rc);
1094 }
1095
1096 struct dt_object_operations osp_md_obj_ops = {
1097         .do_read_lock         = osp_md_object_read_lock,
1098         .do_write_lock        = osp_md_object_write_lock,
1099         .do_read_unlock       = osp_md_object_read_unlock,
1100         .do_write_unlock      = osp_md_object_write_unlock,
1101         .do_write_locked      = osp_md_object_write_locked,
1102         .do_declare_create    = osp_md_declare_object_create,
1103         .do_create            = osp_md_object_create,
1104         .do_declare_ref_add   = osp_md_declare_ref_add,
1105         .do_ref_add           = osp_md_object_ref_add,
1106         .do_declare_ref_del   = osp_md_declare_object_ref_del,
1107         .do_ref_del           = osp_md_object_ref_del,
1108         .do_declare_destroy   = osp_md_declare_object_destroy,
1109         .do_destroy           = osp_md_object_destroy,
1110         .do_ah_init           = osp_md_ah_init,
1111         .do_attr_get          = osp_md_attr_get,
1112         .do_declare_attr_set  = osp_md_declare_attr_set,
1113         .do_attr_set          = osp_md_attr_set,
1114         .do_declare_xattr_set = osp_md_declare_xattr_set,
1115         .do_xattr_set         = osp_md_xattr_set,
1116         .do_xattr_get         = osp_md_xattr_get,
1117         .do_index_try         = osp_md_index_try,
1118 };
1119
1120 static int osp_md_object_lock(const struct lu_env *env,
1121                               struct dt_object *dt,
1122                               struct lustre_handle *lh,
1123                               struct ldlm_enqueue_info *einfo,
1124                               void *policy)
1125 {
1126         struct osp_thread_info *info = osp_env_info(env);
1127         struct ldlm_res_id     *res_id = &info->osi_resid;
1128         struct dt_device       *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
1129         struct osp_device      *osp = dt2osp_dev(dt_dev);
1130         struct ptlrpc_request  *req = NULL;
1131         int                     rc = 0;
1132         __u64                   flags = 0;
1133         ldlm_mode_t             mode;
1134
1135         fid_build_reg_res_name(lu_object_fid(&dt->do_lu), res_id);
1136
1137         mode = ldlm_lock_match(osp->opd_obd->obd_namespace,
1138                                LDLM_FL_BLOCK_GRANTED, res_id,
1139                                einfo->ei_type,
1140                                (ldlm_policy_data_t *)policy,
1141                                einfo->ei_mode, lh, 0);
1142         if (mode > 0)
1143                 return ELDLM_OK;
1144
1145         req = ldlm_enqueue_pack(osp->opd_exp, 0);
1146         if (IS_ERR(req))
1147                 RETURN(PTR_ERR(req));
1148
1149         rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id,
1150                               (const ldlm_policy_data_t *)policy,
1151                               &flags, NULL, 0, LVB_T_NONE, lh, 0);
1152
1153         ptlrpc_req_finished(req);
1154
1155         return rc == ELDLM_OK ? 0 : -EIO;
1156 }
1157
1158 struct dt_lock_operations osp_md_lock_ops = {
1159         .do_object_lock       = osp_md_object_lock,
1160 };
1161