Whamcloud - gitweb
LU-4976 osp: add comments for osp_md_object functions
[fs/lustre-release.git] / lustre / osp / osp_md_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_md_object.c
27  *
28  * OST/MDT proxy device (OSP) Metadata methods
29  *
30  * This file implements methods for remote MD object, which include
31  * dt_object_operations, dt_index_operations and dt_body_operations.
32  *
33  * If there are multiple MDTs in one filesystem, one operation might
34  * include modifications in several MDTs. In such cases, clients
35  * send the RPC to the master MDT, then the operation is decomposed into
36  * object updates which will be dispatched to OSD or OSP. The local updates
37  * go to local OSD and the remote updates go to OSP. In OSP, these remote
38  * object updates will be packed into an update RPC, sent to the remote MDT
39  * and handled by Object Update Target (OUT).
40  *
41  * In DNE phase I, because of missing complete recovery solution, updates
42  * will be executed in order and synchronously.
43  *     1. The transaction is created.
44  *     2. In transaction declare, it collects and packs remote
45  *        updates (in osp_md_declare_xxx()).
46  *     3. In transaction start, it sends these remote updates
47  *        to remote MDTs, which will execute these updates synchronously.
48  *     4. In transaction execute phase, the local updates will be executed
49  *        synchronously.
50  *
51  * Author: Di Wang <di.wang@intel.com>
52  */
53
54 #define DEBUG_SUBSYSTEM S_MDS
55
56 #include <lustre_log.h>
57 #include "osp_internal.h"
58
59 static const char dot[] = ".";
60 static const char dotdot[] = "..";
61
62 /**
63  * Implementation of dt_object_operations::do_declare_create
64  *
65  * Insert object create update into the RPC, which will be sent during
66  * transaction start. Note: if the object has already been created,
67  * we must add object destroy updates ahead of create updates, so it will
68  * destroy then recreate the object.
69  *
70  * \param[in] env       execution environment
71  * \param[in] dt        remote object to be created
72  * \param[in] attr      attribute of the created object
73  * \param[in] hint      creation hint
74  * \param[in] dof       creation format information
75  * \param[in] th        the transaction handle
76  *
77  * \retval              0 if the insertion succeeds.
78  * \retval              negative errno if the insertion fails.
79  */
80 int osp_md_declare_object_create(const struct lu_env *env,
81                                  struct dt_object *dt,
82                                  struct lu_attr *attr,
83                                  struct dt_allocation_hint *hint,
84                                  struct dt_object_format *dof,
85                                  struct thandle *th)
86 {
87         struct osp_thread_info          *osi = osp_env_info(env);
88         struct dt_update_request        *update;
89         struct lu_fid                   *fid1;
90         int                             sizes[2] = {sizeof(struct obdo), 0};
91         char                            *bufs[2] = {NULL, NULL};
92         int                             buf_count;
93         int                             rc;
94
95         update = out_find_create_update_loc(th, dt);
96         if (IS_ERR(update)) {
97                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
98                        dt->do_lu.lo_dev->ld_obd->obd_name,
99                        (int)PTR_ERR(update));
100                 return PTR_ERR(update);
101         }
102
103         osi->osi_obdo.o_valid = 0;
104         obdo_from_la(&osi->osi_obdo, attr, attr->la_valid);
105         lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
106
107         bufs[0] = (char *)&osi->osi_obdo;
108         buf_count = 1;
109         fid1 = (struct lu_fid *)lu_object_fid(&dt->do_lu);
110         if (hint != NULL && hint->dah_parent) {
111                 struct lu_fid *fid2;
112
113                 fid2 = (struct lu_fid *)lu_object_fid(&hint->dah_parent->do_lu);
114                 sizes[1] = sizeof(*fid2);
115                 bufs[1] = (char *)fid2;
116                 buf_count++;
117         }
118
119         if (lu_object_exists(&dt->do_lu)) {
120                 /* If the object already exists, we needs to destroy
121                  * this orphan object first.
122                  *
123                  * The scenario might happen in this case
124                  *
125                  * 1. client send remote create to MDT0.
126                  * 2. MDT0 send create update to MDT1.
127                  * 3. MDT1 finished create synchronously.
128                  * 4. MDT0 failed and reboot.
129                  * 5. client resend remote create to MDT0.
130                  * 6. MDT0 tries to resend create update to MDT1,
131                  *    but find the object already exists
132                  */
133                 CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
134                        dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1));
135
136                 rc = out_insert_update(env, update, OUT_REF_DEL, fid1, 0,
137                                        NULL, NULL);
138                 if (rc != 0)
139                         GOTO(out, rc);
140
141                 if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
142                         /* decrease for ".." */
143                         rc = out_insert_update(env, update, OUT_REF_DEL, fid1,
144                                                0, NULL, NULL);
145                         if (rc != 0)
146                                 GOTO(out, rc);
147                 }
148
149                 rc = out_insert_update(env, update, OUT_DESTROY, fid1, 0, NULL,
150                                        NULL);
151                 if (rc != 0)
152                         GOTO(out, rc);
153
154                 dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
155                 /* Increase batchid to add this orphan object deletion
156                  * to separate transaction */
157                 update_inc_batchid(update);
158         }
159
160         rc = out_insert_update(env, update, OUT_CREATE, fid1, buf_count, sizes,
161                                (const char **)bufs);
162 out:
163         if (rc)
164                 CERROR("%s: Insert update error: rc = %d\n",
165                        dt->do_lu.lo_dev->ld_obd->obd_name, rc);
166
167         return rc;
168 }
169
170 /**
171  * Implementation of dt_object_operations::do_create
172  *
173  * It sets necessary flags for created object. In DNE phase I,
174  * remote updates are actually executed during transaction start,
175  * i.e. the object has already been created when calling this method.
176  *
177  * \param[in] env       execution environment
178  * \param[in] dt        object to be created
179  * \param[in] attr      attribute of the created object
180  * \param[in] hint      creation hint
181  * \param[in] dof       creation format information
182  * \param[in] th        the transaction handle
183  *
184  * \retval              only return 0 for now
185  */
186 int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
187                          struct lu_attr *attr, struct dt_allocation_hint *hint,
188                          struct dt_object_format *dof, struct thandle *th)
189 {
190         CDEBUG(D_INFO, "create object "DFID"\n",
191                PFID(&dt->do_lu.lo_header->loh_fid));
192
193         /* Because the create update RPC will be sent during declare phase,
194          * if creation reaches here, it means the object has been created
195          * successfully */
196         dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
197
198         return 0;
199 }
200
201 /**
202  * Implementation of dt_object_operations::do_declare_ref_del
203  *
204  * Declare decreasing the reference count of the remote object, i.e. insert
205  * decreasing object reference count update into the RPC, which will be sent
206  * during transaction start.
207  *
208  * \param[in] env       execution environment
209  * \param[in] dt        object to decrease the reference count.
210  * \param[in] th        the transaction handle of refcount decrease.
211  *
212  * \retval              0 if the insertion succeeds.
213  * \retval              negative errno if the insertion fails.
214  */
215 static int osp_md_declare_object_ref_del(const struct lu_env *env,
216                                          struct dt_object *dt,
217                                          struct thandle *th)
218 {
219         struct dt_update_request        *update;
220         struct lu_fid                   *fid;
221         int                             rc;
222
223         update = out_find_create_update_loc(th, dt);
224         if (IS_ERR(update)) {
225                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
226                        dt->do_lu.lo_dev->ld_obd->obd_name,
227                       (int)PTR_ERR(update));
228                 return PTR_ERR(update);
229         }
230
231         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
232
233         rc = out_insert_update(env, update, OUT_REF_DEL, fid, 0, NULL, NULL);
234
235         return rc;
236 }
237
238 /**
239  * Implementation of dt_object_operations::do_ref_del
240  *
241  * Do nothing in this method for now. In DNE phase I, remote updates are
242  * actually executed during transaction start, i.e. the object reference
243  * count has already been decreased when calling this method.
244  *
245  * \param[in] env       execution environment
246  * \param[in] dt        object to decrease the reference count
247  * \param[in] th        the transaction handle
248  *
249  * \retval              only return 0 for now
250  */
251 static int osp_md_object_ref_del(const struct lu_env *env,
252                                  struct dt_object *dt,
253                                  struct thandle *th)
254 {
255         CDEBUG(D_INFO, "ref del object "DFID"\n",
256                PFID(&dt->do_lu.lo_header->loh_fid));
257
258         return 0;
259 }
260
261 /**
262  * Implementation of dt_object_operations::do_declare_ref_del
263  *
264  * Declare increasing the reference count of the remote object,
265  * i.e. insert increasing object reference count update into RPC.
266  *
267  * \param[in] env       execution environment
268  * \param[in] dt        object on which to increase the reference count.
269  * \param[in] th        the transaction handle.
270  *
271  * \retval              0 if the insertion succeeds.
272  * \retval              negative errno if the insertion fails.
273  */
274 static int osp_md_declare_ref_add(const struct lu_env *env,
275                                   struct dt_object *dt, struct thandle *th)
276 {
277         struct dt_update_request        *update;
278         struct lu_fid                   *fid;
279         int                             rc;
280
281         update = out_find_create_update_loc(th, dt);
282         if (IS_ERR(update)) {
283                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
284                        dt->do_lu.lo_dev->ld_obd->obd_name,
285                        (int)PTR_ERR(update));
286                 return PTR_ERR(update);
287         }
288
289         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
290
291         rc = out_insert_update(env, update, OUT_REF_ADD, fid, 0, NULL, NULL);
292
293         return rc;
294 }
295
296 /**
297  * Implementation of dt_object_operations::do_ref_add
298  *
299  * Do nothing in this method for now. In DNE phase I, remote updates are
300  * actually executed during transaction start, i.e. the object reference
301  * count has already been increased when calling this method.
302  *
303  * \param[in] env       execution environment
304  * \param[in] dt        object on which to increase the reference count
305  * \param[in] th        the transaction handle
306  *
307  * \retval              only return 0 for now
308  */
309 static int osp_md_object_ref_add(const struct lu_env *env, struct dt_object *dt,
310                                  struct thandle *th)
311 {
312         CDEBUG(D_INFO, "ref add object "DFID"\n",
313                PFID(&dt->do_lu.lo_header->loh_fid));
314
315         return 0;
316 }
317
318 /**
319  * Implementation of dt_object_operations::do_ah_init
320  *
321  * Initialize the allocation hint for object creation, which is usually called
322  * before the creation, and these hints (parent and child mode) will be sent to
323  * the remote Object Update Target (OUT) and used in the object create process,
324  * same as OSD object creation.
325  *
326  * \param[in] env       execution environment
327  * \param[in] ah        the hint to be initialized
328  * \param[in] parent    the parent of the object
329  * \param[in] child     the object to be created
330  * \param[in] child_mode the mode of the created object
331  */
332 static void osp_md_ah_init(const struct lu_env *env,
333                            struct dt_allocation_hint *ah,
334                            struct dt_object *parent,
335                            struct dt_object *child,
336                            umode_t child_mode)
337 {
338         LASSERT(ah);
339
340         ah->dah_parent = parent;
341         ah->dah_mode = child_mode;
342 }
343
344 /**
345  * Implementation of dt_object_operations::do_declare_attr_get
346  *
347  * Declare setting attributes of the remote object, i.e. insert remote
348  * object attr_set update into RPC.
349  *
350  * \param[in] env       execution environment
351  * \param[in] dt        object on which to set attributes
352  * \param[in] attr      attributes to be set
353  * \param[in] th        the transaction handle
354  *
355  * \retval              0 if the insertion succeeds.
356  * \retval              negative errno if the insertion fails.
357  */
358 int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
359                             const struct lu_attr *attr, struct thandle *th)
360 {
361         struct osp_thread_info          *osi = osp_env_info(env);
362         struct dt_update_request        *update;
363         struct lu_fid                   *fid;
364         int                             size = sizeof(struct obdo);
365         char                            *buf;
366         int                             rc;
367
368         update = out_find_create_update_loc(th, dt);
369         if (IS_ERR(update)) {
370                 CERROR("%s: Get OSP update buf failed: %d\n",
371                        dt->do_lu.lo_dev->ld_obd->obd_name,
372                        (int)PTR_ERR(update));
373                 return PTR_ERR(update);
374         }
375
376         osi->osi_obdo.o_valid = 0;
377         obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr,
378                      attr->la_valid);
379         lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
380
381         buf = (char *)&osi->osi_obdo;
382         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
383
384         rc = out_insert_update(env, update, OUT_ATTR_SET, fid, 1, &size,
385                                (const char **)&buf);
386
387         return rc;
388 }
389
390 /**
391  * Implementation of dt_object_operations::do_attr_set
392  *
393  * Do nothing in this method for now. In DNE phase I, remote updates
394  * are actually executed during transaction start, i.e. object attributes
395  * have already been set when calling this method.
396  *
397  * \param[in] env       execution environment
398  * \param[in] dt        object to set attributes
399  * \param[in] attr      attributes to be set
400  * \param[in] th        the transaction handle
401  * \param[in] capa      capability of setting attributes (not yet implemented).
402  *
403  * \retval              only return 0 for now
404  */
405 int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
406                     const struct lu_attr *attr, struct thandle *th,
407                     struct lustre_capa *capa)
408 {
409         CDEBUG(D_INFO, "attr set object "DFID"\n",
410                PFID(&dt->do_lu.lo_header->loh_fid));
411
412         RETURN(0);
413 }
414
415 /**
416  * Implementation of dt_object_operations::do_read_lock
417  *
418  * osp_md_object_{read,write}_lock() will only lock the remote object in the
419  * local cache, which uses the semaphore (opo_sem) inside the osp_object to
420  * lock the object. Note: it will not lock the object in the whole cluster,
421  * which relies on the LDLM lock.
422  *
423  * \param[in] env       execution environment
424  * \param[in] dt        object to be locked
425  * \param[in] role      lock role from MDD layer, see mdd_object_role().
426  */
427 static void osp_md_object_read_lock(const struct lu_env *env,
428                                     struct dt_object *dt, unsigned role)
429 {
430         struct osp_object  *obj = dt2osp_obj(dt);
431
432         LASSERT(obj->opo_owner != env);
433         down_read_nested(&obj->opo_sem, role);
434
435         LASSERT(obj->opo_owner == NULL);
436 }
437
438 /**
439  * Implementation of dt_object_operations::do_write_lock
440  *
441  * Lock the remote object in write mode.
442  *
443  * \param[in] env       execution environment
444  * \param[in] dt        object to be locked
445  * \param[in] role      lock role from MDD layer, see mdd_object_role().
446  */
447 static void osp_md_object_write_lock(const struct lu_env *env,
448                                      struct dt_object *dt, unsigned role)
449 {
450         struct osp_object *obj = dt2osp_obj(dt);
451
452         down_write_nested(&obj->opo_sem, role);
453
454         LASSERT(obj->opo_owner == NULL);
455         obj->opo_owner = env;
456 }
457
458 /**
459  * Implementation of dt_object_operations::do_read_unlock
460  *
461  * Unlock the read lock of remote object.
462  *
463  * \param[in] env       execution environment
464  * \param[in] dt        object to be unlocked
465  */
466 static void osp_md_object_read_unlock(const struct lu_env *env,
467                                       struct dt_object *dt)
468 {
469         struct osp_object *obj = dt2osp_obj(dt);
470
471         up_read(&obj->opo_sem);
472 }
473
474 /**
475  * Implementation of dt_object_operations::do_write_unlock
476  *
477  * Unlock the write lock of remote object.
478  *
479  * \param[in] env       execution environment
480  * \param[in] dt        object to be unlocked
481  */
482 static void osp_md_object_write_unlock(const struct lu_env *env,
483                                        struct dt_object *dt)
484 {
485         struct osp_object *obj = dt2osp_obj(dt);
486
487         LASSERT(obj->opo_owner == env);
488         obj->opo_owner = NULL;
489         up_write(&obj->opo_sem);
490 }
491
492 /**
493  * Implementation of dt_object_operations::do_write_locked
494  *
495  * Test if the object is locked in write mode.
496  *
497  * \param[in] env       execution environment
498  * \param[in] dt        object to be tested
499  */
500 static int osp_md_object_write_locked(const struct lu_env *env,
501                                       struct dt_object *dt)
502 {
503         struct osp_object *obj = dt2osp_obj(dt);
504
505         return obj->opo_owner == env;
506 }
507
508 /**
509  * Implementation of dt_index_operations::dio_lookup
510  *
511  * Look up record by key under a remote index object. It packs lookup update
512  * into RPC, sends to the remote OUT and waits for the lookup result.
513  *
514  * \param[in] env       execution environment
515  * \param[in] dt        index object to lookup
516  * \param[out] rec      record in which to return lookup result
517  * \param[in] key       key of index which will be looked up
518  * \param[in] capa      capability of lookup (not yet implemented)
519  *
520  * \retval              1 if the lookup succeeds.
521  * \retval              negative errno if the lookup fails.
522  */
523 static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
524                                struct dt_rec *rec, const struct dt_key *key,
525                                struct lustre_capa *capa)
526 {
527         struct lu_buf           *lbuf   = &osp_env_info(env)->osi_lb2;
528         struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
529         struct dt_device        *dt_dev = &osp->opd_dt_dev;
530         struct dt_update_request   *update;
531         struct object_update_reply *reply;
532         struct ptlrpc_request      *req = NULL;
533         int                        size = strlen((char *)key) + 1;
534         struct lu_fid              *fid;
535         int                        rc;
536         ENTRY;
537
538         /* Because it needs send the update buffer right away,
539          * just create an update buffer, instead of attaching the
540          * update_remote list of the thandle.
541          */
542         update = out_create_update_req(dt_dev);
543         if (IS_ERR(update))
544                 RETURN(PTR_ERR(update));
545
546         rc = out_insert_update(env, update, OUT_INDEX_LOOKUP,
547                                lu_object_fid(&dt->do_lu),
548                                1, &size, (const char **)&key);
549         if (rc) {
550                 CERROR("%s: Insert update error: rc = %d\n",
551                        dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
552                 GOTO(out, rc);
553         }
554
555         rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req);
556         if (rc < 0)
557                 GOTO(out, rc);
558
559         reply = req_capsule_server_sized_get(&req->rq_pill,
560                                              &RMF_OUT_UPDATE_REPLY,
561                                              OUT_UPDATE_REPLY_SIZE);
562         if (reply->ourp_magic != UPDATE_REPLY_MAGIC) {
563                 CERROR("%s: Wrong version %x expected %x: rc = %d\n",
564                        dt_dev->dd_lu_dev.ld_obd->obd_name,
565                        reply->ourp_magic, UPDATE_REPLY_MAGIC, -EPROTO);
566                 GOTO(out, rc = -EPROTO);
567         }
568
569         rc = object_update_result_data_get(reply, lbuf, 0);
570         if (rc < 0)
571                 GOTO(out, rc);
572
573         if (lbuf->lb_len != sizeof(*fid)) {
574                 CERROR("%s: lookup "DFID" %s wrong size %d\n",
575                        dt_dev->dd_lu_dev.ld_obd->obd_name,
576                        PFID(lu_object_fid(&dt->do_lu)), (char *)key,
577                        (int)lbuf->lb_len);
578                 GOTO(out, rc = -EINVAL);
579         }
580
581         fid = lbuf->lb_buf;
582         if (ptlrpc_rep_need_swab(req))
583                 lustre_swab_lu_fid(fid);
584         if (!fid_is_sane(fid)) {
585                 CERROR("%s: lookup "DFID" %s invalid fid "DFID"\n",
586                        dt_dev->dd_lu_dev.ld_obd->obd_name,
587                        PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid));
588                 GOTO(out, rc = -EINVAL);
589         }
590
591         memcpy(rec, fid, sizeof(*fid));
592
593         GOTO(out, rc = 1);
594
595 out:
596         if (req != NULL)
597                 ptlrpc_req_finished(req);
598
599         out_destroy_update_req(update);
600
601         return rc;
602 }
603
604 /**
605  * Implementation of dt_index_operations::dio_declare_insert
606  *
607  * Declare the index insert of the remote object, i.e. pack index insert update
608  * into the RPC, which will be sent during transaction start.
609  *
610  * \param[in] env       execution environment
611  * \param[in] dt        object for which to insert index
612  * \param[in] rec       record of the index which will be inserted
613  * \param[in] key       key of the index which will be inserted
614  * \param[in] th        the transaction handle
615  *
616  * \retval              0 if the insertion succeeds.
617  * \retval              negative errno if the insertion fails.
618  */
619 static int osp_md_declare_insert(const struct lu_env *env,
620                                  struct dt_object *dt,
621                                  const struct dt_rec *rec,
622                                  const struct dt_key *key,
623                                  struct thandle *th)
624 {
625         struct osp_thread_info     *info = osp_env_info(env);
626         struct dt_update_request   *update;
627         struct dt_insert_rec       *rec1 = (struct dt_insert_rec *)rec;
628         struct lu_fid              *fid =
629                                 (struct lu_fid *)lu_object_fid(&dt->do_lu);
630         struct lu_fid              *rec_fid = &info->osi_fid;
631         __u32                       type = cpu_to_le32(rec1->rec_type);
632         int                         size[3] = { strlen((char *)key) + 1,
633                                                 sizeof(*rec_fid),
634                                                 sizeof(type) };
635         const char                 *bufs[3] = { (char *)key,
636                                                 (char *)rec_fid,
637                                                 (char *)&type };
638         int                         rc;
639
640         update = out_find_create_update_loc(th, dt);
641         if (IS_ERR(update)) {
642                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
643                        dt->do_lu.lo_dev->ld_obd->obd_name,
644                        (int)PTR_ERR(update));
645                 return PTR_ERR(update);
646         }
647
648         CDEBUG(D_INFO, "%s: insert index of "DFID" %s: "DFID", %u\n",
649                dt->do_lu.lo_dev->ld_obd->obd_name,
650                PFID(fid), (char *)key, PFID(rec1->rec_fid), rec1->rec_type);
651
652         fid_cpu_to_le(rec_fid, rec1->rec_fid);
653         rc = out_insert_update(env, update, OUT_INDEX_INSERT, fid,
654                                ARRAY_SIZE(size), size, bufs);
655         return rc;
656 }
657
658 /**
659  * Implementation of dt_index_operations::dio_insert
660  *
661  * Do nothing in this method for now. In DNE phase I, remote updates
662  * are actually executed during transaction start, i.e. the index has
663  * already been inserted when calling this method.
664  *
665  * \param[in] env       execution environment
666  * \param[in] dt        object for which to insert index
667  * \param[in] rec       record of the index to be inserted
668  * \param[in] key       key of the index to be inserted
669  * \param[in] th        the transaction handle
670  * \param[in] capa      capability of insert (not yet implemented)
671  * \param[in] ignore_quota quota enforcement for insert
672  *
673  * \retval              only return 0 for now
674  */
675 static int osp_md_index_insert(const struct lu_env *env,
676                                struct dt_object *dt,
677                                const struct dt_rec *rec,
678                                const struct dt_key *key,
679                                struct thandle *th,
680                                struct lustre_capa *capa,
681                                int ignore_quota)
682 {
683         return 0;
684 }
685
686 /**
687  * Implementation of dt_index_operations::dio_declare_delete
688  *
689  * Declare the index delete of the remote object, i.e. insert index delete
690  * update into the RPC, which will be sent during transaction start.
691  *
692  * \param[in] env       execution environment
693  * \param[in] dt        object for which to delete index
694  * \param[in] key       key of the index
695  * \param[in] th        the transaction handle
696  *
697  * \retval              0 if the insertion succeeds.
698  * \retval              negative errno if the insertion fails.
699  */
700 static int osp_md_declare_delete(const struct lu_env *env,
701                                  struct dt_object *dt,
702                                  const struct dt_key *key,
703                                  struct thandle *th)
704 {
705         struct dt_update_request *update;
706         struct lu_fid *fid;
707         int size = strlen((char *)key) + 1;
708         int rc;
709
710         update = out_find_create_update_loc(th, dt);
711         if (IS_ERR(update)) {
712                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
713                        dt->do_lu.lo_dev->ld_obd->obd_name,
714                        (int)PTR_ERR(update));
715                 return PTR_ERR(update);
716         }
717
718         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
719
720         rc = out_insert_update(env, update, OUT_INDEX_DELETE, fid, 1, &size,
721                                (const char **)&key);
722
723         return rc;
724 }
725
726 /**
727  * Implementation of dt_index_operations::dio_delete
728  *
729  * Do nothing in this method for now. Because in DNE phase I, remote updates
730  * are actually executed during transaction start, i.e. the index has already
731  * been deleted when calling this method.
732  *
733  * \param[in] env       execution environment
734  * \param[in] dt        object for which to delete index
735  * \param[in] key       key of the index which will be deleted
736  * \param[in] th        the transaction handle
737  * \param[in] capa      capability of delete (not yet implemented)
738  *
739  * \retval              only return 0 for now
740  */
741 static int osp_md_index_delete(const struct lu_env *env,
742                                struct dt_object *dt,
743                                const struct dt_key *key,
744                                struct thandle *th,
745                                struct lustre_capa *capa)
746 {
747         CDEBUG(D_INFO, "index delete "DFID" %s\n",
748                PFID(&dt->do_lu.lo_header->loh_fid), (char *)key);
749
750         return 0;
751 }
752
753 /**
754  * Implementation of dt_index_operations::dio_it.next
755  *
756  * Advance the pointer of the iterator to the next entry. It shares a similar
757  * internal implementation with osp_orphan_it_next(), which is being used for
758  * remote orphan index object. This method will be used for remote directory.
759  *
760  * \param[in] env       execution environment
761  * \param[in] di        iterator of this iteration
762  *
763  * \retval              0 if the pointer is advanced successfuly.
764  * \retval              1 if it reaches to the end of the index object.
765  * \retval              negative errno if the pointer cannot be advanced.
766  */
767 int osp_md_index_it_next(const struct lu_env *env, struct dt_it *di)
768 {
769         struct osp_it           *it = (struct osp_it *)di;
770         struct lu_idxpage       *idxpage;
771         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
772         int                     rc;
773         ENTRY;
774
775 again:
776         idxpage = it->ooi_cur_idxpage;
777         if (idxpage != NULL) {
778                 if (idxpage->lip_nr == 0)
779                         RETURN(1);
780
781                 it->ooi_pos_ent++;
782                 if (ent == NULL) {
783                         it->ooi_ent =
784                               (struct lu_dirent *)idxpage->lip_entries;
785                         RETURN(0);
786                 } else if (le16_to_cpu(ent->lde_reclen) != 0 &&
787                            it->ooi_pos_ent < idxpage->lip_nr) {
788                         ent = (struct lu_dirent *)(((char *)ent) +
789                                         le16_to_cpu(ent->lde_reclen));
790                         it->ooi_ent = ent;
791                         RETURN(0);
792                 } else {
793                         it->ooi_ent = NULL;
794                 }
795         }
796
797         rc = osp_it_next_page(env, di);
798         if (rc == 0)
799                 goto again;
800
801         RETURN(rc);
802 }
803
804 /**
805  * Implementation of dt_index_operations::dio_it.key
806  *
807  * Get the key at current iterator poisiton. These iteration methods
808  * (dio_it) will only be used for iterating the remote directory, so
809  * the key is the name of the directory entry.
810  *
811  * \param[in] env       execution environment
812  * \param[in] di        iterator of this iteration
813  *
814  * \retval              name of the current entry
815  */
816 static struct dt_key *osp_it_key(const struct lu_env *env,
817                                  const struct dt_it *di)
818 {
819         struct osp_it           *it = (struct osp_it *)di;
820         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
821
822         return (struct dt_key *)ent->lde_name;
823 }
824
825 /**
826  * Implementation of dt_index_operations::dio_it.key_size
827  *
828  * Get the key size at current iterator poisiton. These iteration methods
829  * (dio_it) will only be used for iterating the remote directory, so the key
830  * size is the name size of the directory entry.
831  *
832  * \param[in] env       execution environment
833  * \param[in] di        iterator of this iteration
834  *
835  * \retval              name size of the current entry
836  */
837
838 static int osp_it_key_size(const struct lu_env *env, const struct dt_it *di)
839 {
840         struct osp_it           *it = (struct osp_it *)di;
841         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
842
843         return (int)le16_to_cpu(ent->lde_namelen);
844 }
845
846 /**
847  * Implementation of dt_index_operations::dio_it.rec
848  *
849  * Get the record at current iterator position. These iteration methods
850  * (dio_it) will only be used for iterating the remote directory, so it
851  * uses lu_dirent_calc_size() to calculate the record size.
852  *
853  * \param[in] env       execution environment
854  * \param[in] di        iterator of this iteration
855  * \param[out] rec      the record to be returned
856  * \param[in] attr      attributes of the index object, so it knows
857  *                      how to pack the entry.
858  *
859  * \retval              only return 0 for now
860  */
861 static int osp_md_index_it_rec(const struct lu_env *env, const struct dt_it *di,
862                                struct dt_rec *rec, __u32 attr)
863 {
864         struct osp_it           *it = (struct osp_it *)di;
865         struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
866         int                     reclen;
867
868         reclen = lu_dirent_calc_size(le16_to_cpu(ent->lde_namelen), attr);
869         memcpy(rec, ent, reclen);
870         return 0;
871 }
872
873 /**
874  * Implementation of dt_index_operations::dio_it.load
875  *
876  * Locate the iteration cursor to the specified position (cookie).
877  *
878  * \param[in] env       pointer to the thread context
879  * \param[in] di        pointer to the iteration structure
880  * \param[in] hash      the specified position
881  *
882  * \retval              positive number for locating to the exactly position
883  *                      or the next
884  * \retval              0 for arriving at the end of the iteration
885  * \retval              negative error number on failure
886  */
887 static int osp_it_load(const struct lu_env *env, const struct dt_it *di,
888                        __u64 hash)
889 {
890         struct osp_it   *it     = (struct osp_it *)di;
891         int              rc;
892
893         it->ooi_next = hash;
894         rc = osp_md_index_it_next(env, (struct dt_it *)di);
895         if (rc == 1)
896                 return 0;
897
898         if (rc == 0)
899                 return 1;
900
901         return rc;
902 }
903
904 const struct dt_index_operations osp_md_index_ops = {
905         .dio_lookup         = osp_md_index_lookup,
906         .dio_declare_insert = osp_md_declare_insert,
907         .dio_insert         = osp_md_index_insert,
908         .dio_declare_delete = osp_md_declare_delete,
909         .dio_delete         = osp_md_index_delete,
910         .dio_it     = {
911                 .init     = osp_it_init,
912                 .fini     = osp_it_fini,
913                 .get      = osp_it_get,
914                 .put      = osp_it_put,
915                 .next     = osp_md_index_it_next,
916                 .key      = osp_it_key,
917                 .key_size = osp_it_key_size,
918                 .rec      = osp_md_index_it_rec,
919                 .store    = osp_it_store,
920                 .load     = osp_it_load,
921                 .key_rec  = osp_it_key_rec,
922         }
923 };
924
925 /**
926  * Implementation of dt_object_operations::do_index_try
927  *
928  * Try to initialize the index API pointer for the given object. This
929  * is the entry point of the index API, i.e. we must call this method
930  * to initialize the index object before calling other index methods.
931  *
932  * \param[in] env       execution environment
933  * \param[in] dt        index object to be initialized
934  * \param[in] feat      the index feature of the object
935  *
936  * \retval              0 if the initialization succeeds.
937  * \retval              negative errno if the initialization fails.
938  */
939 static int osp_md_index_try(const struct lu_env *env,
940                             struct dt_object *dt,
941                             const struct dt_index_features *feat)
942 {
943         dt->do_index_ops = &osp_md_index_ops;
944         return 0;
945 }
946
947 /**
948  * Implementation of dt_object_operations::do_object_lock
949  *
950  * Enqueue a lock (by ldlm_cli_enqueue()) of remote object on the remote MDT,
951  * which will lock the object in the global namespace.
952  *
953  * \param[in] env       execution environment
954  * \param[in] dt        object to be locked
955  * \param[out] lh       lock handle
956  * \param[in] einfo     enqueue information
957  * \param[in] policy    lock policy
958  *
959  * \retval              ELDLM_OK if locking the object succeeds.
960  * \retval              negative errno if locking fails.
961  */
962 static int osp_md_object_lock(const struct lu_env *env,
963                               struct dt_object *dt,
964                               struct lustre_handle *lh,
965                               struct ldlm_enqueue_info *einfo,
966                               ldlm_policy_data_t *policy)
967 {
968         struct ldlm_res_id      *res_id;
969         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
970         struct osp_device       *osp = dt2osp_dev(dt_dev);
971         struct ptlrpc_request   *req;
972         int                     rc = 0;
973         __u64                   flags = 0;
974         ldlm_mode_t             mode;
975
976         res_id = einfo->ei_res_id;
977         LASSERT(res_id != NULL);
978
979         mode = ldlm_lock_match(osp->opd_obd->obd_namespace,
980                                LDLM_FL_BLOCK_GRANTED, res_id,
981                                einfo->ei_type, policy,
982                                einfo->ei_mode, lh, 0);
983         if (mode > 0)
984                 return ELDLM_OK;
985
986         req = ldlm_enqueue_pack(osp->opd_exp, 0);
987         if (IS_ERR(req))
988                 RETURN(PTR_ERR(req));
989
990         rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id,
991                               (const ldlm_policy_data_t *)policy,
992                               &flags, NULL, 0, LVB_T_NONE, lh, 0);
993
994         ptlrpc_req_finished(req);
995
996         return rc == ELDLM_OK ? 0 : -EIO;
997 }
998
999 /**
1000  * Implementation of dt_object_operations::do_object_unlock
1001  *
1002  * Cancel a lock of a remote object.
1003  *
1004  * \param[in] env       execution environment
1005  * \param[in] dt        object to be unlocked
1006  * \param[in] einfo     lock enqueue information
1007  * \param[in] policy    lock policy
1008  *
1009  * \retval              Only return 0 for now.
1010  */
1011 static int osp_md_object_unlock(const struct lu_env *env,
1012                                 struct dt_object *dt,
1013                                 struct ldlm_enqueue_info *einfo,
1014                                 ldlm_policy_data_t *policy)
1015 {
1016         struct lustre_handle    *lockh = einfo->ei_cbdata;
1017
1018         /* unlock finally */
1019         ldlm_lock_decref(lockh, einfo->ei_mode);
1020
1021         return 0;
1022 }
1023
1024 struct dt_object_operations osp_md_obj_ops = {
1025         .do_read_lock         = osp_md_object_read_lock,
1026         .do_write_lock        = osp_md_object_write_lock,
1027         .do_read_unlock       = osp_md_object_read_unlock,
1028         .do_write_unlock      = osp_md_object_write_unlock,
1029         .do_write_locked      = osp_md_object_write_locked,
1030         .do_declare_create    = osp_md_declare_object_create,
1031         .do_create            = osp_md_object_create,
1032         .do_declare_ref_add   = osp_md_declare_ref_add,
1033         .do_ref_add           = osp_md_object_ref_add,
1034         .do_declare_ref_del   = osp_md_declare_object_ref_del,
1035         .do_ref_del           = osp_md_object_ref_del,
1036         .do_declare_destroy   = osp_declare_object_destroy,
1037         .do_destroy           = osp_object_destroy,
1038         .do_ah_init           = osp_md_ah_init,
1039         .do_attr_get          = osp_attr_get,
1040         .do_declare_attr_set  = osp_md_declare_attr_set,
1041         .do_attr_set          = osp_md_attr_set,
1042         .do_xattr_get         = osp_xattr_get,
1043         .do_declare_xattr_set = osp_declare_xattr_set,
1044         .do_xattr_set         = osp_xattr_set,
1045         .do_declare_xattr_del = osp_declare_xattr_del,
1046         .do_xattr_del         = osp_xattr_del,
1047         .do_index_try         = osp_md_index_try,
1048         .do_object_lock       = osp_md_object_lock,
1049         .do_object_unlock     = osp_md_object_unlock,
1050 };
1051
1052 /**
1053  * Implementation of dt_body_operations::dbo_declare_write
1054  *
1055  * Declare an object write. In DNE phase I, it will pack the write
1056  * object update into the RPC.
1057  *
1058  * \param[in] env       execution environment
1059  * \param[in] dt        object to be written
1060  * \param[in] buf       buffer to write which includes an embedded size field
1061  * \param[in] pos       offet in the object to start writing at
1062  * \param[in] th        transaction handle
1063  *
1064  * \retval              0 if the insertion succeeds.
1065  * \retval              negative errno if the insertion fails.
1066  */
1067 static ssize_t osp_md_declare_write(const struct lu_env *env,
1068                                     struct dt_object *dt,
1069                                     const struct lu_buf *buf,
1070                                     loff_t pos, struct thandle *th)
1071 {
1072         struct dt_update_request  *update;
1073         struct lu_fid             *fid;
1074         int                       sizes[2] = {buf->lb_len, sizeof(pos)};
1075         const char                *bufs[2] = {(char *)buf->lb_buf,
1076                                               (char *)&pos};
1077         ssize_t                   rc;
1078
1079         update = out_find_create_update_loc(th, dt);
1080         if (IS_ERR(update)) {
1081                 CERROR("%s: Get OSP update buf failed: rc = %d\n",
1082                        dt->do_lu.lo_dev->ld_obd->obd_name,
1083                        (int)PTR_ERR(update));
1084                 return PTR_ERR(update);
1085         }
1086
1087         pos = cpu_to_le64(pos);
1088         bufs[1] = (char *)&pos;
1089         fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
1090         rc = out_insert_update(env, update, OUT_WRITE, fid,
1091                                ARRAY_SIZE(sizes), sizes, bufs);
1092
1093         return rc;
1094
1095 }
1096
1097 /**
1098  * Implementation of dt_body_operations::dbo_write
1099  *
1100  * Return the buffer size. In DNE phase I, remote updates
1101  * are actually executed during transaction start, the buffer has
1102  * already been written when this method is being called.
1103  *
1104  * \param[in] env       execution environment
1105  * \param[in] dt        object to be written
1106  * \param[in] buf       buffer to write which includes an embedded size field
1107  * \param[in] pos       offet in the object to start writing at
1108  * \param[in] th        transaction handle
1109  * \param[in] capa      capability of the write (not yet implemented)
1110  * \param[in] ignore_quota quota enforcement for this write
1111  *
1112  * \retval              the buffer size in bytes.
1113  */
1114 static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
1115                             const struct lu_buf *buf, loff_t *pos,
1116                             struct thandle *handle,
1117                             struct lustre_capa *capa, int ignore_quota)
1118 {
1119         return buf->lb_len;
1120 }
1121
1122 /* These body operation will be used to write symlinks during migration etc */
1123 struct dt_body_operations osp_md_body_ops = {
1124         .dbo_declare_write      = osp_md_declare_write,
1125         .dbo_write              = osp_md_write,
1126 };