Whamcloud - gitweb
LU-8079 llog: Remove llog_cat_init_and_process
[fs/lustre-release.git] / lustre / osp / osp_sync.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osp/osp_sync.c
37  *
38  * Lustre OST Proxy Device
39  *
40  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
41  * Author: Mikhail Pershin <mike.pershin@intel.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_MDS
45
46 #include <linux/kthread.h>
47 #include <lustre_log.h>
48 #include <lustre_update.h>
49 #include "osp_internal.h"
50
51 static int osp_sync_id_traction_init(struct osp_device *d);
52 static void osp_sync_id_traction_fini(struct osp_device *d);
53 static __u32 osp_sync_id_get(struct osp_device *d, __u32 id);
54 static void osp_sync_remove_from_tracker(struct osp_device *d);
55
56 /*
 * this is a component of OSP implementing synchronization between MDS and OST:
 * it llogs all interesting changes (currently uid/gid change and object
 * destroy) atomically, then makes sure changes hit OST storage
60  *
61  * we have 4 queues of work:
62  *
63  * the first queue is llog itself, once read a change is stored in 2nd queue
64  * in form of RPC (but RPC isn't fired yet).
65  *
66  * the second queue (opd_syn_waiting_for_commit) holds changes awaiting local
67  * commit. once change is committed locally it migrates onto 3rd queue.
68  *
69  * the third queue (opd_syn_committed_here) holds changes committed locally,
70  * but not sent to OST (as the pipe can be full). once pipe becomes non-full
71  * we take a change from the queue and fire corresponded RPC.
72  *
73  * once RPC is reported committed by OST (using regular last_committed mech.)
74  * the change jumps into 4th queue (opd_syn_committed_there), now we can
75  * cancel corresponded llog record and release RPC
76  *
77  * opd_syn_changes is a number of unread llog records (to be processed).
78  * notice this number doesn't include llog records from previous boots.
79  * with OSP_SYN_THRESHOLD we try to batch processing a bit (TO BE IMPLEMENTED)
80  *
81  * opd_syn_rpc_in_progress is a number of requests in 2-4 queues.
82  * we control this with OSP_MAX_IN_PROGRESS so that OSP don't consume
83  * too much memory -- how to deal with 1000th OSTs ? batching could help?
84  *
85  * opd_syn_rpc_in_flight is a number of RPC in flight.
86  * we control this with OSP_MAX_IN_FLIGHT
87  */
88
89 /* XXX: do math to learn reasonable threshold
90  * should it be ~ number of changes fitting bulk? */
91
92 #define OSP_SYN_THRESHOLD       10
93 #define OSP_MAX_IN_FLIGHT       8
94 #define OSP_MAX_IN_PROGRESS     4096
95
96 #define OSP_JOB_MAGIC           0x26112005
97
/* per-RPC state embedded into ptlrpc rq_async_args */
struct osp_job_req_args {
        /** bytes reserved for ptlrpc_replay_req() */
        struct ptlrpc_replay_async_args jra_raa;
        /* linkage on opd_syn_committed_there once target commits the RPC */
        struct list_head                jra_committed_link;
        /* linkage on opd_syn_inflight_list while the RPC is in flight */
        struct list_head                jra_inflight_link;
        /* always OSP_JOB_MAGIC; sanity-checked by the callbacks */
        __u32                           jra_magic;
};
105
106 static inline int osp_sync_running(struct osp_device *d)
107 {
108         return !!(d->opd_syn_thread.t_flags & SVC_RUNNING);
109 }
110
111 /**
112  * Check status: whether OSP thread has stopped
113  *
114  * \param[in] d         OSP device
115  *
116  * \retval 0            still running
117  * \retval 1            stopped
118  */
119 static inline int osp_sync_stopped(struct osp_device *d)
120 {
121         return !!(d->opd_syn_thread.t_flags & SVC_STOPPED);
122 }
123
124 /*
125  ** Check for new changes to sync
126  *
127  * \param[in] d         OSP device
128  *
129  * \retval 1            there are changes
130  * \retval 0            there are no changes
131  */
132 static inline int osp_sync_has_new_job(struct osp_device *d)
133 {
134         return ((d->opd_syn_last_processed_id < d->opd_syn_last_used_id) &&
135                 (d->opd_syn_last_processed_id < d->opd_syn_last_committed_id))
136                 || (d->opd_syn_prev_done == 0);
137 }
138
/**
 * Check whether an in-flight RPC already targets the object described
 * by llog record \a h.
 *
 * Extracts an ost_id from the record (per record type) and compares it,
 * under opd_syn_lock, with the ost_id stored in the body of every request
 * on opd_syn_inflight_list.  Used to avoid reordering changes applied to
 * the same object.
 *
 * \param[in] d         OSP device
 * \param[in] h         llog record to check, may be NULL
 *
 * \retval 1            a conflicting RPC is in flight
 * \retval 0            no conflict
 */
static inline int osp_sync_inflight_conflict(struct osp_device *d,
                                             struct llog_rec_hdr *h)
{
        struct osp_job_req_args *jra;
        struct ost_id            ostid;
        int                      conflict = 0;

        /* LLOG_GEN_REC carries no object, so nothing can conflict */
        if (h == NULL || h->lrh_type == LLOG_GEN_REC ||
            list_empty(&d->opd_syn_inflight_list))
                return conflict;

        memset(&ostid, 0, sizeof(ostid));
        switch (h->lrh_type) {
        case MDS_UNLINK_REC:
                ostid_set_seq(&ostid, ((struct llog_unlink_rec *)h)->lur_oseq);
                ostid_set_id(&ostid, ((struct llog_unlink_rec *)h)->lur_oid);
                break;
        case MDS_UNLINK64_REC:
                fid_to_ostid(&((struct llog_unlink64_rec *)h)->lur_fid, &ostid);
                break;
        case MDS_SETATTR64_REC:
                ostid = ((struct llog_setattr64_rec *)h)->lsr_oi;
                break;
        default:
                LBUG();
        }

        spin_lock(&d->opd_syn_lock);
        list_for_each_entry(jra, &d->opd_syn_inflight_list, jra_inflight_link) {
                struct ptlrpc_request   *req;
                struct ost_body         *body;

                LASSERT(jra->jra_magic == OSP_JOB_MAGIC);

                /* jra is embedded in rq_async_args of the request */
                req = container_of((void *)jra, struct ptlrpc_request,
                                   rq_async_args);
                body = req_capsule_client_get(&req->rq_pill,
                                              &RMF_OST_BODY);
                LASSERT(body);

                if (memcmp(&ostid, &body->oa.o_oi, sizeof(ostid)) == 0) {
                        conflict = 1;
                        break;
                }
        }
        spin_unlock(&d->opd_syn_lock);

        return conflict;
}
188
189 static inline int osp_sync_low_in_progress(struct osp_device *d)
190 {
191         return d->opd_syn_rpc_in_progress < d->opd_syn_max_rpc_in_progress;
192 }
193
194 /**
195  * Check for room in the network pipe to OST
196  *
197  * \param[in] d         OSP device
198  *
199  * \retval 1            there is room
200  * \retval 0            no room, the pipe is full
201  */
202 static inline int osp_sync_low_in_flight(struct osp_device *d)
203 {
204         return d->opd_syn_rpc_in_flight < d->opd_syn_max_rpc_in_flight;
205 }
206
207 /**
208  * Wake up check for the main sync thread
209  *
210  * \param[in] d         OSP device
211  *
212  * \retval 1            time to wake up
213  * \retval 0            no need to wake up
214  */
215 static inline int osp_sync_has_work(struct osp_device *d)
216 {
217         /* has new/old changes and low in-progress? */
218         if (osp_sync_has_new_job(d) && osp_sync_low_in_progress(d) &&
219             osp_sync_low_in_flight(d) && d->opd_imp_connected)
220                 return 1;
221
222         /* has remotely committed? */
223         if (!list_empty(&d->opd_syn_committed_there))
224                 return 1;
225
226         return 0;
227 }
228
/*
 * Wake the sync thread if there is work to do.
 * Wrapped in do { } while (0) so the macro expands to a single statement
 * and stays safe inside unbraced if/else bodies (a bare { } block here
 * would break "if (x) osp_sync_check_for_work(d); else ...").
 */
#define osp_sync_check_for_work(d)                      \
do {                                                    \
        if (osp_sync_has_work(d))                       \
                wake_up(&d->opd_syn_waitq);             \
} while (0)
235
/* out-of-line wrapper so code outside this file can trigger the check */
void __osp_sync_check_for_work(struct osp_device *d)
{
        osp_sync_check_for_work(d);
}
240
241 /**
242  * Check and return ready-for-new status.
243  *
244  * The thread processing llog record uses this function to check whether
245  * it's time to take another record and process it. The number of conditions
246  * must be met: the connection should be ready, RPCs in flight not exceeding
247  * the limit, the record is committed locally, etc (see the lines below).
248  *
249  * \param[in] d         OSP device
250  * \param[in] rec       next llog record to process
251  *
252  * \retval 0            not ready
253  * \retval 1            ready
254  */
static inline int osp_sync_can_process_new(struct osp_device *d,
                                           struct llog_rec_hdr *rec)
{
        LASSERT(d);

        /* a sync barrier is in effect: hold off any new work */
        if (unlikely(atomic_read(&d->opd_syn_barrier) > 0))
                return 0;
        /* don't reorder changes applied to the same object */
        if (unlikely(osp_sync_inflight_conflict(d, rec)))
                return 0;
        if (!osp_sync_low_in_progress(d))
                return 0;
        if (!osp_sync_low_in_flight(d))
                return 0;
        if (!d->opd_imp_connected)
                return 0;
        /* still replaying records from a previous boot: those are
         * committed by definition, process unconditionally */
        if (d->opd_syn_prev_done == 0)
                return 1;
        if (d->opd_syn_changes == 0)
                return 0;
        /* no record, or the record is already committed locally */
        if (rec == NULL || rec->lrh_id <= d->opd_syn_last_committed_id)
                return 1;
        return 0;
}
278
279 /**
280  * Declare intention to add a new change.
281  *
282  * With regard to OSD API, we have to declare any changes ahead. In this
283  * case we declare an intention to add a llog record representing the
284  * change on the local storage.
285  *
286  * \param[in] env       LU environment provided by the caller
287  * \param[in] o         OSP object
288  * \param[in] type      type of change: MDS_UNLINK64_REC or MDS_SETATTR64_REC
289  * \param[in] th        transaction handle (local)
290  *
291  * \retval 0            on success
292  * \retval negative     negated errno on error
293  */
int osp_sync_declare_add(const struct lu_env *env, struct osp_object *o,
                         llog_op_type type, struct thandle *th)
{
        struct osp_thread_info  *osi = osp_env_info(env);
        struct osp_device       *d = lu2osp_dev(o->opo_obj.do_lu.lo_dev);
        struct llog_ctxt        *ctxt;
        struct thandle          *storage_th;
        int                      rc;

        ENTRY;

        /* it's a layering violation, to access internals of th,
         * but we can do this as a sanity check, for a while */
        LASSERT(th->th_top != NULL);
        storage_th = thandle_get_sub_by_dt(env, th->th_top, d->opd_storage);
        if (IS_ERR(storage_th))
                RETURN(PTR_ERR(storage_th));

        /* only the record length depends on the change type here;
         * the record body is filled in later by osp_sync_add_rec() */
        switch (type) {
        case MDS_UNLINK64_REC:
                osi->osi_hdr.lrh_len = sizeof(struct llog_unlink64_rec);
                break;
        case MDS_SETATTR64_REC:
                osi->osi_hdr.lrh_len = sizeof(struct llog_setattr64_rec);
                break;
        default:
                LBUG();
        }

        /* we want ->dt_trans_start() to allocate per-thandle structure */
        storage_th->th_tags |= LCT_OSP_THREAD;

        ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
        LASSERT(ctxt);

        rc = llog_declare_add(env, ctxt->loc_handle, &osi->osi_hdr,
                              storage_th);
        llog_ctxt_put(ctxt);

        RETURN(rc);
}
335
336 /**
337  * Generate a llog record for a given change.
338  *
339  * Generates a llog record for the change passed. The change can be of two
340  * types: unlink and setattr. The record gets an ID which later will be
341  * used to track commit status of the change. For unlink changes, the caller
342  * can supply a starting FID and the count of the objects to destroy. For
343  * setattr the caller should apply attributes to apply.
344  *
345  *
346  * \param[in] env       LU environment provided by the caller
347  * \param[in] d         OSP device
348  * \param[in] fid       fid of the object the change should be applied to
349  * \param[in] type      type of change: MDS_UNLINK64_REC or MDS_SETATTR64_REC
350  * \param[in] count     count of objects to destroy
351  * \param[in] th        transaction handle (local)
352  * \param[in] attr      attributes for setattr
353  *
354  * \retval 0            on success
355  * \retval negative     negated errno on error
356  */
static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d,
                            const struct lu_fid *fid, llog_op_type type,
                            int count, struct thandle *th,
                            const struct lu_attr *attr)
{
        struct osp_thread_info  *osi = osp_env_info(env);
        struct llog_ctxt        *ctxt;
        struct osp_txn_info     *txn;
        struct thandle          *storage_th;
        int                      rc;

        ENTRY;

        /* it's a layering violation, to access internals of th,
         * but we can do this as a sanity check, for a while */
        LASSERT(th->th_top != NULL);
        storage_th = thandle_get_sub_by_dt(env, th->th_top, d->opd_storage);
        if (IS_ERR(storage_th))
                RETURN(PTR_ERR(storage_th));

        /* fill in the type-specific record body kept in osp_thread_info */
        switch (type) {
        case MDS_UNLINK64_REC:
                osi->osi_hdr.lrh_len = sizeof(osi->osi_unlink);
                osi->osi_hdr.lrh_type = MDS_UNLINK64_REC;
                osi->osi_unlink.lur_fid  = *fid;
                osi->osi_unlink.lur_count = count;
                break;
        case MDS_SETATTR64_REC:
                rc = fid_to_ostid(fid, &osi->osi_oi);
                LASSERT(rc == 0);
                osi->osi_hdr.lrh_len = sizeof(osi->osi_setattr);
                osi->osi_hdr.lrh_type = MDS_SETATTR64_REC;
                osi->osi_setattr.lsr_oi  = osi->osi_oi;
                LASSERT(attr);
                osi->osi_setattr.lsr_uid = attr->la_uid;
                osi->osi_setattr.lsr_gid = attr->la_gid;
                /* record which of uid/gid are valid; 0 is interpreted by
                 * the reader as "both valid" for pre-2.6.0 compatibility
                 * (see osp_sync_new_setattr_job()) */
                osi->osi_setattr.lsr_valid =
                        ((attr->la_valid & LA_UID) ? OBD_MD_FLUID : 0) |
                        ((attr->la_valid & LA_GID) ? OBD_MD_FLGID : 0);
                break;
        default:
                LBUG();
        }

        txn = osp_txn_info(&storage_th->th_ctx);
        LASSERT(txn);

        /* assign an id used later to track commit status of the change */
        txn->oti_current_id = osp_sync_id_get(d, txn->oti_current_id);
        osi->osi_hdr.lrh_id = txn->oti_current_id;

        ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
        if (ctxt == NULL)
                RETURN(-ENOMEM);

        rc = llog_add(env, ctxt->loc_handle, &osi->osi_hdr, &osi->osi_cookie,
                      storage_th);
        llog_ctxt_put(ctxt);

        if (likely(rc >= 0)) {
                CDEBUG(D_OTHER, "%s: new record "DOSTID":%lu/%lu: %d\n",
                       d->opd_obd->obd_name,
                       POSTID(&osi->osi_cookie.lgc_lgl.lgl_oi),
                       (unsigned long)osi->osi_cookie.lgc_lgl.lgl_ogen,
                       (unsigned long)osi->osi_cookie.lgc_index, rc);
                /* one more unread llog record to process */
                spin_lock(&d->opd_syn_lock);
                d->opd_syn_changes++;
                spin_unlock(&d->opd_syn_lock);
        }
        /* return 0 always here, error case just cause no llog record */
        RETURN(0);
}
428
/**
 * Add a new change record for the given object.
 *
 * Thin wrapper over osp_sync_add_rec(): extracts the OSP device and the
 * fid from the object and logs a single-object change (count = 1).
 *
 * \param[in] env       LU environment provided by the caller
 * \param[in] o         OSP object the change applies to
 * \param[in] type      type of change: MDS_UNLINK64_REC or MDS_SETATTR64_REC
 * \param[in] th        transaction handle (local)
 * \param[in] attr      attributes for setattr, NULL otherwise
 *
 * \retval 0            on success
 * \retval negative     negated errno on error
 */
int osp_sync_add(const struct lu_env *env, struct osp_object *o,
                 llog_op_type type, struct thandle *th,
                 const struct lu_attr *attr)
{
        return osp_sync_add_rec(env, lu2osp_dev(o->opo_obj.do_lu.lo_dev),
                                lu_object_fid(&o->opo_obj.do_lu), type, 1,
                                th, attr);
}
437
/**
 * Log destroy of \a lost objects starting at \a fid.
 *
 * Records a single MDS_UNLINK64_REC change covering \a lost objects,
 * so the whole range is destroyed with one llog record.
 *
 * \param[in] env       LU environment provided by the caller
 * \param[in] d         OSP device
 * \param[in] fid       fid of the first object to destroy
 * \param[in] lost      number of objects to destroy
 * \param[in] th        transaction handle (local)
 *
 * \retval 0            on success
 * \retval negative     negated errno on error
 */
int osp_sync_gap(const struct lu_env *env, struct osp_device *d,
                        struct lu_fid *fid, int lost, struct thandle *th)
{
        return osp_sync_add_rec(env, d, fid, MDS_UNLINK64_REC, lost, th, NULL);
}
443
444 /*
445  * it's quite obvious we can't maintain all the structures in the memory:
446  * while OST is down, MDS can be processing thousands and thousands of unlinks
 * filling persistent llogs and in-core representation
448  *
449  * this doesn't scale at all. so we need basically the following:
450  * a) destroy/setattr append llog records
451  * b) once llog has grown to X records, we process first Y committed records
452  *
453  *  once record R is found via llog_process(), it becomes committed after any
454  *  subsequent commit callback (at the most)
455  */
456
457 /**
458  * ptlrpc commit callback.
459  *
 * The callback is called by PTLRPC when an RPC is reported committed by the
 * target (OST). We register the callback for every RPC applying a change
 * from the llog. This way we know when the llog records can be cancelled.
 * Notice the callback can be called when OSP is finishing. We can detect this
 * by checking that the actual transno in the request is less than or equal to
 * the known committed transno (see osp_sync_process_committed() for details).
466  * XXX: this is pretty expensive and can be improved later using batching.
467  *
468  * \param[in] req       request
469  */
static void osp_sync_request_commit_cb(struct ptlrpc_request *req)
{
        struct osp_device *d = req->rq_cb_data;
        struct osp_job_req_args *jra;

        CDEBUG(D_HA, "commit req %p, transno "LPU64"\n", req, req->rq_transno);

        /* transno 0: nothing was committed for this request, skip it */
        if (unlikely(req->rq_transno == 0))
                return;

        /* do not do any opd_dyn_rpc_* accounting here
         * it's done in osp_sync_interpret sooner or later */
        LASSERT(d);

        jra = ptlrpc_req_async_args(req);
        LASSERT(jra->jra_magic == OSP_JOB_MAGIC);
        LASSERT(list_empty(&jra->jra_committed_link));

        /* hold a reference while the request sits on the committed list */
        ptlrpc_request_addref(req);

        spin_lock(&d->opd_syn_lock);
        list_add(&jra->jra_committed_link, &d->opd_syn_committed_there);
        spin_unlock(&d->opd_syn_lock);

        /* XXX: some batching wouldn't hurt */
        wake_up(&d->opd_syn_waitq);
}
497
498 /**
499  * RPC interpretation callback.
500  *
501  * The callback is called by ptlrpc when RPC is replied. Now we have to decide
502  * whether we should:
503  *  - put request on a special list to wait until it's committed by the target,
504  *    if the request is successful
505  *  - schedule llog record cancel if no target object is found
506  *  - try later (essentially after reboot) in case of unexpected error
507  *
508  * \param[in] env       LU environment provided by the caller
509  * \param[in] req       request replied
510  * \param[in] aa        callback data
511  * \param[in] rc        result of RPC
512  *
513  * \retval 0            always
514  */
static int osp_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *aa, int rc)
{
        struct osp_device *d = req->rq_cb_data;
        struct osp_job_req_args *jra = aa;

        if (jra->jra_magic != OSP_JOB_MAGIC) {
                DEBUG_REQ(D_ERROR, req, "bad magic %u\n", jra->jra_magic);
                LBUG();
        }
        LASSERT(d);

        CDEBUG(D_HA, "reply req %p/%d, rc %d, transno %u\n", req,
               atomic_read(&req->rq_refcount),
               rc, (unsigned) req->rq_transno);
        LASSERT(rc || req->rq_transno);

        if (rc == -ENOENT) {
                /*
                 * we tried to destroy object or update attributes,
                 * but object doesn't exist anymore - cancel llog record
                 */
                LASSERT(req->rq_transno == 0);
                LASSERT(list_empty(&jra->jra_committed_link));

                /* hold a reference while the request is on the list */
                ptlrpc_request_addref(req);

                spin_lock(&d->opd_syn_lock);
                list_add(&jra->jra_committed_link, &d->opd_syn_committed_there);
                spin_unlock(&d->opd_syn_lock);

                wake_up(&d->opd_syn_waitq);
        } else if (rc) {
                struct obd_import *imp = req->rq_import;
                /*
                 * error happened, we'll try to repeat on next boot ?
                 */
                LASSERTF(req->rq_transno == 0 ||
                         req->rq_import_generation < imp->imp_generation,
                         "transno "LPU64", rc %d, gen: req %d, imp %d\n",
                         req->rq_transno, rc, req->rq_import_generation,
                         imp->imp_generation);
                if (req->rq_transno == 0) {
                        /* this is the last time we see the request
                         * if transno is not zero, then commit cb
                         * will be called at some point */
                        LASSERT(d->opd_syn_rpc_in_progress > 0);
                        spin_lock(&d->opd_syn_lock);
                        d->opd_syn_rpc_in_progress--;
                        spin_unlock(&d->opd_syn_lock);
                }

                wake_up(&d->opd_syn_waitq);
        } else if (d->opd_pre != NULL &&
                   unlikely(d->opd_pre_status == -ENOSPC)) {
                /*
                 * if current status is -ENOSPC (lack of free space on OST)
                 * then we should poll OST immediately once object destroy
                 * is replied
                 */
                osp_statfs_need_now(d);
        }

        /* in every case the RPC is no longer in flight */
        LASSERT(d->opd_syn_rpc_in_flight > 0);
        spin_lock(&d->opd_syn_lock);
        d->opd_syn_rpc_in_flight--;
        list_del_init(&jra->jra_inflight_link);
        spin_unlock(&d->opd_syn_lock);
        if (unlikely(atomic_read(&d->opd_syn_barrier) > 0))
                wake_up(&d->opd_syn_barrier_waitq);
        CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
               d->opd_obd->obd_name, d->opd_syn_rpc_in_flight,
               d->opd_syn_rpc_in_progress);

        osp_sync_check_for_work(d);

        return 0;
}
593
594 /*
595  ** Add request to ptlrpc queue.
596  *
597  * This is just a tiny helper function to put the request on the sending list
598  *
599  * \param[in] d         OSP device
600  * \param[in] req       request
601  */
602 static void osp_sync_send_new_rpc(struct osp_device *d,
603                                   struct ptlrpc_request *req)
604 {
605         struct osp_job_req_args *jra;
606
607         LASSERT(d->opd_syn_rpc_in_flight <= d->opd_syn_max_rpc_in_flight);
608
609         jra = ptlrpc_req_async_args(req);
610         jra->jra_magic = OSP_JOB_MAGIC;
611         INIT_LIST_HEAD(&jra->jra_committed_link);
612         spin_lock(&d->opd_syn_lock);
613         list_add_tail(&jra->jra_inflight_link, &d->opd_syn_inflight_list);
614         spin_unlock(&d->opd_syn_lock);
615
616         ptlrpcd_add_req(req);
617 }
618
619
620 /**
621  * Allocate and prepare RPC for a new change.
622  *
623  * The function allocates and initializes an RPC which will be sent soon to
624  * apply the change to the target OST. The request is initialized from the
625  * llog record passed. Notice only the fields common to all type of changes
626  * are initialized.
627  *
628  * \param[in] d         OSP device
629  * \param[in] llh       llog handle where the record is stored
630  * \param[in] h         llog record
631  * \param[in] op        type of the change
632  * \param[in] format    request format to be used
633  *
634  * \retval pointer              new request on success
635  * \retval ERR_PTR(errno)       on error
636  */
637 static struct ptlrpc_request *osp_sync_new_job(struct osp_device *d,
638                                                struct llog_handle *llh,
639                                                struct llog_rec_hdr *h,
640                                                ost_cmd_t op,
641                                                const struct req_format *format)
642 {
643         struct ptlrpc_request   *req;
644         struct ost_body         *body;
645         struct obd_import       *imp;
646         int                      rc;
647
648         /* Prepare the request */
649         imp = d->opd_obd->u.cli.cl_import;
650         LASSERT(imp);
651
652         if (OBD_FAIL_CHECK(OBD_FAIL_OSP_CHECK_ENOMEM))
653                 RETURN(ERR_PTR(-ENOMEM));
654
655         req = ptlrpc_request_alloc(imp, format);
656         if (req == NULL)
657                 RETURN(ERR_PTR(-ENOMEM));
658
659         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, op);
660         if (rc) {
661                 ptlrpc_req_finished(req);
662                 return ERR_PTR(rc);
663         }
664
665         /*
666          * this is a trick: to save on memory allocations we put cookie
667          * into the request, but don't set corresponded flag in o_valid
668          * so that OST doesn't interpret this cookie. once the request
669          * is committed on OST we take cookie from the request and cancel
670          */
671         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
672         LASSERT(body);
673         body->oa.o_lcookie.lgc_lgl = llh->lgh_id;
674         body->oa.o_lcookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
675         body->oa.o_lcookie.lgc_index = h->lrh_index;
676
677         req->rq_interpret_reply = osp_sync_interpret;
678         req->rq_commit_cb = osp_sync_request_commit_cb;
679         req->rq_cb_data = d;
680
681         ptlrpc_request_set_replen(req);
682
683         return req;
684 }
685
686 /**
687  * Generate a request for setattr change.
688  *
689  * The function prepares a new RPC, initializes it with setattr specific
690  * bits and send the RPC.
691  *
692  * \param[in] d         OSP device
693  * \param[in] llh       llog handle where the record is stored
694  * \param[in] h         llog record
695  *
696  * \retval 0            on success
697  * \retval 1            on invalid record
698  * \retval negative     negated errno on error
699  */
700 static int osp_sync_new_setattr_job(struct osp_device *d,
701                                     struct llog_handle *llh,
702                                     struct llog_rec_hdr *h)
703 {
704         struct llog_setattr64_rec       *rec = (struct llog_setattr64_rec *)h;
705         struct ptlrpc_request           *req;
706         struct ost_body                 *body;
707
708         ENTRY;
709         LASSERT(h->lrh_type == MDS_SETATTR64_REC);
710
711         if (OBD_FAIL_CHECK(OBD_FAIL_OSP_CHECK_INVALID_REC))
712                 RETURN(1);
713         /* lsr_valid can only be 0 or have OBD_MD_{FLUID,FLGID} set,
714          * so no bits other than these should be set. */
715         if ((rec->lsr_valid & ~(OBD_MD_FLUID | OBD_MD_FLGID)) != 0) {
716                 CERROR("%s: invalid setattr record, lsr_valid:"LPU64"\n",
717                        d->opd_obd->obd_name, rec->lsr_valid);
718                 /* return 1 on invalid record */
719                 RETURN(1);
720         }
721
722         req = osp_sync_new_job(d, llh, h, OST_SETATTR, &RQF_OST_SETATTR);
723         if (IS_ERR(req))
724                 RETURN(PTR_ERR(req));
725
726         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
727         LASSERT(body);
728         body->oa.o_oi = rec->lsr_oi;
729         body->oa.o_uid = rec->lsr_uid;
730         body->oa.o_gid = rec->lsr_gid;
731         body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID;
732         /* old setattr record (prior 2.6.0) doesn't have 'valid' stored,
733          * we assume that both UID and GID are valid in that case. */
734         if (rec->lsr_valid == 0)
735                 body->oa.o_valid |= (OBD_MD_FLUID | OBD_MD_FLGID);
736         else
737                 body->oa.o_valid |= rec->lsr_valid;
738
739         osp_sync_send_new_rpc(d, req);
740         RETURN(0);
741 }
742
743 /**
744  * Generate a request for unlink change.
745  *
746  * The function prepares a new RPC, initializes it with unlink(destroy)
747  * specific bits and sends the RPC. The function is used to handle
748  * llog_unlink_rec which were used in the older versions of Lustre.
749  * Current version uses llog_unlink_rec64.
750  *
751  * \param[in] d         OSP device
752  * \param[in] llh       llog handle where the record is stored
753  * \param[in] h         llog record
754  *
755  * \retval 0            on success
756  * \retval negative     negated errno on error
757  */
758 static int osp_sync_new_unlink_job(struct osp_device *d,
759                                    struct llog_handle *llh,
760                                    struct llog_rec_hdr *h)
761 {
762         struct llog_unlink_rec  *rec = (struct llog_unlink_rec *)h;
763         struct ptlrpc_request   *req;
764         struct ost_body         *body;
765
766         ENTRY;
767         LASSERT(h->lrh_type == MDS_UNLINK_REC);
768
769         req = osp_sync_new_job(d, llh, h, OST_DESTROY, &RQF_OST_DESTROY);
770         if (IS_ERR(req))
771                 RETURN(PTR_ERR(req));
772
773         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
774         LASSERT(body);
775         ostid_set_seq(&body->oa.o_oi, rec->lur_oseq);
776         ostid_set_id(&body->oa.o_oi, rec->lur_oid);
777         body->oa.o_misc = rec->lur_count;
778         body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID;
779         if (rec->lur_count)
780                 body->oa.o_valid |= OBD_MD_FLOBJCOUNT;
781
782         osp_sync_send_new_rpc(d, req);
783         RETURN(0);
784 }
785
786 /**
787  * Generate a request for unlink change.
788  *
789  * The function prepares a new RPC, initializes it with unlink(destroy)
790  * specific bits and sends the RPC. Depending on the target (MDT or OST)
791  * two different protocols are used. For MDT we use OUT (basically OSD API
792  * updates transferred via a network). For OST we still use the old
793  * protocol (OBD?), originally for compatibility. Later we can start to
794  * use OUT for OST as well, this will allow batching and better code
795  * unification.
796  *
797  * \param[in] d         OSP device
798  * \param[in] llh       llog handle where the record is stored
799  * \param[in] h         llog record
800  *
801  * \retval 0            on success
802  * \retval negative     negated errno on error
803  */
804 static int osp_sync_new_unlink64_job(struct osp_device *d,
805                                      struct llog_handle *llh,
806                                      struct llog_rec_hdr *h)
807 {
808         struct llog_unlink64_rec        *rec = (struct llog_unlink64_rec *)h;
809         struct ptlrpc_request           *req = NULL;
810         struct ost_body                 *body;
811         int                              rc;
812
813         ENTRY;
814         LASSERT(h->lrh_type == MDS_UNLINK64_REC);
815         req = osp_sync_new_job(d, llh, h, OST_DESTROY,
816                                &RQF_OST_DESTROY);
817         if (IS_ERR(req))
818                 RETURN(PTR_ERR(req));
819
820         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
821         if (body == NULL)
822                 RETURN(-EFAULT);
823         rc = fid_to_ostid(&rec->lur_fid, &body->oa.o_oi);
824         if (rc < 0)
825                 RETURN(rc);
826         body->oa.o_misc = rec->lur_count;
827         body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID |
828                            OBD_MD_FLOBJCOUNT;
829         osp_sync_send_new_rpc(d, req);
830         RETURN(0);
831 }
832
833 /**
834  * Process llog records.
835  *
836  * This function is called to process the llog records committed locally.
837  * In the recovery model used by OSP we can apply a change to a remote
838  * target once corresponding transaction (like posix unlink) is committed
839  * locally so can't revert.
840  * Depending on the llog record type, a given handler is called that is
841  * responsible for preparing and sending the RPC to apply the change.
842  * Special record type LLOG_GEN_REC marking a reboot is cancelled right away.
843  *
844  * \param[in] env       LU environment provided by the caller
845  * \param[in] d         OSP device
846  * \param[in] llh       llog handle where the record is stored
847  * \param[in] rec       llog record
848  */
static void osp_sync_process_record(const struct lu_env *env,
                                    struct osp_device *d,
                                    struct llog_handle *llh,
                                    struct llog_rec_hdr *rec)
{
        /* record cancellation goes through the catalog handle */
        struct llog_handle      *cathandle = llh->u.phd.phd_cat_handle;
        struct llog_cookie       cookie;
        int                      rc = 0;

        ENTRY;

        /* build a cookie identifying this record so it can be cancelled
         * in the catalog once it has been handled */
        cookie.lgc_lgl = llh->lgh_id;
        cookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
        cookie.lgc_index = rec->lrh_index;

        if (unlikely(rec->lrh_type == LLOG_GEN_REC)) {
                struct llog_gen_rec *gen = (struct llog_gen_rec *)rec;

                /* we're waiting for the record generated by this instance */
                LASSERT(d->opd_syn_prev_done == 0);
                if (!memcmp(&d->opd_syn_generation, &gen->lgr_gen,
                            sizeof(gen->lgr_gen))) {
                        CDEBUG(D_HA, "processed all old entries\n");
                        d->opd_syn_prev_done = 1;
                }

                /* cancel any generation record */
                rc = llog_cat_cancel_records(env, cathandle, 1, &cookie);

                RETURN_EXIT;
        }

        /*
         * now we prepare and fill requests to OST, put them on the queue
         * and fire after next commit callback
         */

        /* notice we increment counters before sending RPC, to be consistent
         * in RPC interpret callback which may happen very quickly */
        spin_lock(&d->opd_syn_lock);
        d->opd_syn_rpc_in_flight++;
        d->opd_syn_rpc_in_progress++;
        spin_unlock(&d->opd_syn_lock);

        switch (rec->lrh_type) {
        /* case MDS_UNLINK_REC is kept for compatibility */
        case MDS_UNLINK_REC:
                rc = osp_sync_new_unlink_job(d, llh, rec);
                break;
        case MDS_UNLINK64_REC:
                rc = osp_sync_new_unlink64_job(d, llh, rec);
                break;
        case MDS_SETATTR64_REC:
                rc = osp_sync_new_setattr_job(d, llh, rec);
                break;
        default:
                CERROR("%s: unknown record type: %x\n", d->opd_obd->obd_name,
                       rec->lrh_type);
                /* treat "unknown record type" as "invalid" */
                rc = 1;
                break;
        }

        spin_lock(&d->opd_syn_lock);

        /* For all kinds of records, not matter successful or not,
         * we should decrease changes and bump last_processed_id.
         */
        if (d->opd_syn_prev_done) {
                LASSERT(d->opd_syn_changes > 0);
                LASSERT(rec->lrh_id <= d->opd_syn_last_committed_id);
                /* NOTE: it's possible to meet same id if
                 * OST stores few stripes of same file
                 */
                if (rec->lrh_id > d->opd_syn_last_processed_id) {
                        d->opd_syn_last_processed_id = rec->lrh_id;
                        wake_up(&d->opd_syn_barrier_waitq);
                }
                d->opd_syn_changes--;
        }
        /* on failure no RPC was queued, so undo the counter bumps done
         * before the switch above */
        if (rc != 0) {
                d->opd_syn_rpc_in_flight--;
                d->opd_syn_rpc_in_progress--;
        }
        CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
               d->opd_obd->obd_name, d->opd_syn_rpc_in_flight,
               d->opd_syn_rpc_in_progress);

        spin_unlock(&d->opd_syn_lock);

        /* Delete the invalid record */
        if (rc == 1) {
                rc = llog_cat_cancel_records(env, cathandle, 1, &cookie);
                if (rc != 0)
                        CERROR("%s: can't delete invalid record: "
                               "fid = "DFID", rec_id = %u, rc = %d\n",
                               d->opd_obd->obd_name,
                               PFID(lu_object_fid(&cathandle->lgh_obj->do_lu)),
                               rec->lrh_id, rc);
        }

        CDEBUG(D_HA, "found record %x, %d, idx %u, id %u\n",
               rec->lrh_type, rec->lrh_len, rec->lrh_index, rec->lrh_id);

        RETURN_EXIT;
}
955
956 /**
957  * Cancel llog records for the committed changes.
958  *
959  * The function walks through the list of the committed RPCs and cancels
960  * corresponding llog records. see osp_sync_request_commit_cb() for the
961  * details.
962  *
963  * \param[in] env       LU environment provided by the caller
964  * \param[in] d         OSP device
965  */
static void osp_sync_process_committed(const struct lu_env *env,
                                       struct osp_device *d)
{
        struct obd_device       *obd = d->opd_obd;
        struct obd_import       *imp = obd->u.cli.cl_import;
        struct ost_body         *body;
        struct ptlrpc_request   *req;
        struct llog_ctxt        *ctxt;
        struct llog_handle      *llh;
        struct list_head         list;
        int                      rc, done = 0;

        ENTRY;

        /* lockless peek; a race here only delays processing until the
         * next invocation */
        if (list_empty(&d->opd_syn_committed_there))
                return;

        /*
         * if current status is -ENOSPC (lack of free space on OST)
         * then we should poll OST immediately once object destroy
         * is committed.
         * notice: we do this upon commit as well because some backends
         * (like DMU) do not release space right away.
         */
        if (d->opd_pre != NULL && unlikely(d->opd_pre_status == -ENOSPC))
                osp_statfs_need_now(d);

        /*
         * now cancel them all
         * XXX: can we improve this using some batching?
         *      with batch RPC that'll happen automatically?
         * XXX: can we store ctxt in lod_device and save few cycles ?
         */
        ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
        LASSERT(ctxt);

        llh = ctxt->loc_handle;
        LASSERT(llh);

        /* grab the whole committed list in one shot so the lock is held
         * briefly and new completions can keep arriving meanwhile */
        INIT_LIST_HEAD(&list);
        spin_lock(&d->opd_syn_lock);
        list_splice(&d->opd_syn_committed_there, &list);
        INIT_LIST_HEAD(&d->opd_syn_committed_there);
        spin_unlock(&d->opd_syn_lock);

        while (!list_empty(&list)) {
                struct osp_job_req_args *jra;

                jra = list_entry(list.next, struct osp_job_req_args,
                                 jra_committed_link);
                LASSERT(jra->jra_magic == OSP_JOB_MAGIC);
                list_del_init(&jra->jra_committed_link);

                /* jra lives inside the request's rq_async_args, so the
                 * enclosing request can be recovered from it */
                req = container_of((void *)jra, struct ptlrpc_request,
                                   rq_async_args);
                body = req_capsule_client_get(&req->rq_pill,
                                              &RMF_OST_BODY);
                LASSERT(body);
                /* import can be closing, thus all commit cb's are
                 * called we can check committness directly */
                if (req->rq_import_generation == imp->imp_generation) {
                        rc = llog_cat_cancel_records(env, llh, 1,
                                                     &body->oa.o_lcookie);
                        if (rc)
                                CERROR("%s: can't cancel record: %d\n",
                                       obd->obd_name, rc);
                } else {
                        DEBUG_REQ(D_OTHER, req, "imp_committed = "LPU64,
                                  imp->imp_peer_committed_transno);
                }
                ptlrpc_req_finished(req);
                done++;
        }

        llog_ctxt_put(ctxt);

        LASSERT(d->opd_syn_rpc_in_progress >= done);
        spin_lock(&d->opd_syn_lock);
        d->opd_syn_rpc_in_progress -= done;
        spin_unlock(&d->opd_syn_lock);
        CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
               d->opd_obd->obd_name, d->opd_syn_rpc_in_flight,
               d->opd_syn_rpc_in_progress);

        osp_sync_check_for_work(d);

        /* wake up the thread if requested to stop:
         * it might be waiting for in-progress to complete */
        if (unlikely(osp_sync_running(d) == 0))
                wake_up(&d->opd_syn_waitq);

        EXIT;
}
1059
1060 /**
1061  * The core of the syncing mechanism.
1062  *
1063  * This is a callback called by the llog processing function. Essentially it
1064  * suspends llog processing until there is a record to process (it's supposed
1065  * to be committed locally). The function handles RPCs committed by the target
1066  * and cancels corresponding llog records.
1067  *
1068  * \param[in] env       LU environment provided by the caller
1069  * \param[in] llh       llog handle we're processing
1070  * \param[in] rec       current llog record
1071  * \param[in] data      callback data containing a pointer to the device
1072  *
1073  * \retval 0                    to ask the caller (llog_process()) to continue
1074  * \retval LLOG_PROC_BREAK      to ask the caller to break
1075  */
static int osp_sync_process_queues(const struct lu_env *env,
                                   struct llog_handle *llh,
                                   struct llog_rec_hdr *rec,
                                   void *data)
{
        struct osp_device       *d = data;

        do {
                struct l_wait_info lwi = { 0 };

                /* device is shutting down: stop llog_cat_process() */
                if (!osp_sync_running(d)) {
                        CDEBUG(D_HA, "stop llog processing\n");
                        return LLOG_PROC_BREAK;
                }

                /* process requests committed by OST */
                osp_sync_process_committed(env, d);

                /* if there are changes to be processed and we have
                 * resources for this ... do now */
                if (osp_sync_can_process_new(d, rec)) {
                        if (llh == NULL) {
                                /* ask llog for another record */
                                CDEBUG(D_HA, "%lu changes, %u in progress,"
                                       " %u in flight\n",
                                       d->opd_syn_changes,
                                       d->opd_syn_rpc_in_progress,
                                       d->opd_syn_rpc_in_flight);
                                return 0;
                        }
                        /* consume the current record exactly once; NULLing
                         * llh/rec keeps us looping here instead of
                         * reprocessing it on the next iteration */
                        osp_sync_process_record(env, d, llh, rec);
                        llh = NULL;
                        rec = NULL;
                }

                /* nothing left pending for this OSP - unsubscribe from
                 * the commit tracker wakeups */
                if (d->opd_syn_last_processed_id == d->opd_syn_last_used_id)
                        osp_sync_remove_from_tracker(d);

                /* sleep until there is something to do: a stop request,
                 * room for a new RPC, or freshly committed requests */
                l_wait_event(d->opd_syn_waitq,
                             !osp_sync_running(d) ||
                             osp_sync_can_process_new(d, rec) ||
                             !list_empty(&d->opd_syn_committed_there),
                             &lwi);
        } while (1);
}
1121
1122 /**
1123  * OSP sync thread.
1124  *
1125  * This thread runs llog_cat_process() scanner calling our callback
1126  * to process llog records. in the callback we implement tricky
1127  * state machine as we don't want to start scanning of the llog again
1128  * and again, also we don't want to process too many records and send
1129  * too many RPCs a time. so, depending on current load (num of changes
1130  * being synced to OST) the callback can suspend awaiting for some
1131  * new conditions, like syncs completed.
1132  *
1133  * In order to process llog records left by previous boots and to allow
1134  * llog_process_thread() to find something (otherwise it'd just exit
 * immediately) we add a special GENERATION record on each boot.
1136  *
1137  * \param[in] _arg      a pointer to thread's arguments
1138  *
1139  * \retval 0            on success
1140  * \retval negative     negated errno on error
1141  */
static int osp_sync_thread(void *_arg)
{
        struct osp_device       *d = _arg;
        struct ptlrpc_thread    *thread = &d->opd_syn_thread;
        struct l_wait_info       lwi = { 0 };
        struct llog_ctxt        *ctxt;
        struct obd_device       *obd = d->opd_obd;
        struct llog_handle      *llh;
        struct lu_env            env;
        int                      rc, count;

        ENTRY;

        rc = lu_env_init(&env, LCT_LOCAL);
        if (rc) {
                CERROR("%s: can't initialize env: rc = %d\n",
                       obd->obd_name, rc);
                RETURN(rc);
        }

        /* announce we are up so osp_sync_init() can stop waiting */
        spin_lock(&d->opd_syn_lock);
        thread->t_flags = SVC_RUNNING;
        spin_unlock(&d->opd_syn_lock);
        wake_up(&thread->t_ctl_waitq);

        ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
        if (ctxt == NULL) {
                CERROR("can't get appropriate context\n");
                GOTO(out, rc = -EINVAL);
        }

        llh = ctxt->loc_handle;
        if (llh == NULL) {
                CERROR("can't get llh\n");
                llog_ctxt_put(ctxt);
                GOTO(out, rc = -EINVAL);
        }

        /* osp_sync_process_queues() blocks internally, so this call only
         * returns on shutdown (LLOG_PROC_BREAK) or after a full scan */
        rc = llog_cat_process(&env, llh, osp_sync_process_queues, d, 0, 0);
        LASSERTF(rc == 0 || rc == LLOG_PROC_BREAK,
                 "%lu changes, %u in progress, %u in flight: %d\n",
                 d->opd_syn_changes, d->opd_syn_rpc_in_progress,
                 d->opd_syn_rpc_in_flight, rc);

        /* we don't expect llog_process_thread() to exit till umount */
        LASSERTF(thread->t_flags != SVC_RUNNING,
                 "%lu changes, %u in progress, %u in flight\n",
                 d->opd_syn_changes, d->opd_syn_rpc_in_progress,
                 d->opd_syn_rpc_in_flight);

        /* wait till all the requests are completed */
        count = 0;
        while (d->opd_syn_rpc_in_progress > 0) {
                osp_sync_process_committed(&env, d);

                lwi = LWI_TIMEOUT(cfs_time_seconds(5), NULL, NULL);
                rc = l_wait_event(d->opd_syn_waitq,
                                  d->opd_syn_rpc_in_progress == 0,
                                  &lwi);
                if (rc == -ETIMEDOUT)
                        count++;
                /* ~50 seconds of waiting means something is stuck */
                LASSERTF(count < 10, "%s: %d %d %sempty\n",
                         d->opd_obd->obd_name, d->opd_syn_rpc_in_progress,
                         d->opd_syn_rpc_in_flight,
                         list_empty(&d->opd_syn_committed_there) ? "" : "!");

        }

        llog_cat_close(&env, llh);
        rc = llog_cleanup(&env, ctxt);
        if (rc)
                CERROR("can't cleanup llog: %d\n", rc);
out:
        LASSERTF(d->opd_syn_rpc_in_progress == 0,
                 "%s: %d %d %sempty\n",
                 d->opd_obd->obd_name, d->opd_syn_rpc_in_progress,
                 d->opd_syn_rpc_in_flight,
                 list_empty(&d->opd_syn_committed_there) ? "" : "!");

        /* let osp_sync_fini() know the thread has terminated */
        thread->t_flags = SVC_STOPPED;

        wake_up(&thread->t_ctl_waitq);

        lu_env_fini(&env);

        RETURN(0);
}
1229
1230 /**
1231  * Initialize llog.
1232  *
1233  * Initializes the llog. Specific llog to be used depends on the type of the
 * target OSP represents (OST or MDT). The function appends a new llog
1235  * record to mark the place where the records associated with this boot
1236  * start.
1237  *
1238  * \param[in] env       LU environment provided by the caller
1239  * \param[in] d         OSP device
1240  *
1241  * \retval 0            on success
1242  * \retval negative     negated errno on error
1243  */
static int osp_sync_llog_init(const struct lu_env *env, struct osp_device *d)
{
        struct osp_thread_info  *osi = osp_env_info(env);
        struct lu_fid           *fid = &osi->osi_fid;
        struct llog_handle      *lgh = NULL;
        struct obd_device       *obd = d->opd_obd;
        struct llog_ctxt        *ctxt;
        int                     rc;

        ENTRY;

        LASSERT(obd);

        /*
         * open llog corresponding to our OST
         */
        OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
        obd->obd_lvfs_ctxt.dt = d->opd_storage;

        lu_local_obj_fid(fid, LLOG_CATALOGS_OID);

        /* look up the catalog id previously recorded for this index */
        rc = llog_osd_get_cat_list(env, d->opd_storage, d->opd_index, 1,
                                   &osi->osi_cid, fid);
        if (rc < 0) {
                if (rc != -EFAULT) {
                        CERROR("%s: can't get id from catalogs: rc = %d\n",
                               obd->obd_name, rc);
                        RETURN(rc);
                }

                /* After sparse OST indices is supported, the CATALOG file
                 * may become a sparse file that results in failure on
                 * reading. Skip this error as the llog will be created
                 * later */
                memset(&osi->osi_cid, 0, sizeof(osi->osi_cid));
                rc = 0;
        }

        CDEBUG(D_INFO, "%s: Init llog for %d - catid "DOSTID":%x\n",
               obd->obd_name, d->opd_index,
               POSTID(&osi->osi_cid.lci_logid.lgl_oi),
               osi->osi_cid.lci_logid.lgl_ogen);

        rc = llog_setup(env, obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT,
                        d->opd_storage->dd_lu_dev.ld_obd,
                        &osp_mds_ost_orig_logops);
        if (rc)
                RETURN(rc);

        ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
        LASSERT(ctxt);

        /* a non-zero id means a catalog already exists on disk; try to
         * open it, falling back to creation if it has gone missing */
        if (likely(logid_id(&osi->osi_cid.lci_logid) != 0)) {
                rc = llog_open(env, ctxt, &lgh, &osi->osi_cid.lci_logid, NULL,
                               LLOG_OPEN_EXISTS);
                /* re-create llog if it is missing */
                if (rc == -ENOENT)
                        logid_set_id(&osi->osi_cid.lci_logid, 0);
                else if (rc < 0)
                        GOTO(out_cleanup, rc);
        }

        if (unlikely(logid_id(&osi->osi_cid.lci_logid) == 0)) {
                rc = llog_open_create(env, ctxt, &lgh, NULL, NULL);
                if (rc < 0)
                        GOTO(out_cleanup, rc);
                osi->osi_cid.lci_logid = lgh->lgh_id;
        }

        LASSERT(lgh != NULL);
        ctxt->loc_handle = lgh;

        rc = llog_init_handle(env, lgh, LLOG_F_IS_CAT, NULL);
        if (rc)
                GOTO(out_close, rc);

        /* persist the (possibly new) catalog id for the next boot */
        rc = llog_osd_put_cat_list(env, d->opd_storage, d->opd_index, 1,
                                   &osi->osi_cid, fid);
        if (rc)
                GOTO(out_close, rc);

        /*
         * put a mark in the llog till which we'll be processing
         * old records restless
         */
        d->opd_syn_generation.mnt_cnt = cfs_time_current();
        d->opd_syn_generation.conn_cnt = cfs_time_current();

        osi->osi_hdr.lrh_type = LLOG_GEN_REC;
        osi->osi_hdr.lrh_len = sizeof(osi->osi_gen);

        memcpy(&osi->osi_gen.lgr_gen, &d->opd_syn_generation,
               sizeof(osi->osi_gen.lgr_gen));

        rc = llog_cat_add(env, lgh, &osi->osi_gen.lgr_hdr, &osi->osi_cookie);
        if (rc < 0)
                GOTO(out_close, rc);
        llog_ctxt_put(ctxt);
        RETURN(0);
out_close:
        llog_cat_close(env, lgh);
out_cleanup:
        llog_cleanup(env, ctxt);
        RETURN(rc);
}
1349
1350 /**
1351  * Cleanup llog used for syncing.
1352  *
1353  * Closes and cleanups the llog. The function is called when the device is
1354  * shutting down.
1355  *
1356  * \param[in] env       LU environment provided by the caller
1357  * \param[in] d         OSP device
1358  */
1359 static void osp_sync_llog_fini(const struct lu_env *env, struct osp_device *d)
1360 {
1361         struct llog_ctxt *ctxt;
1362
1363         ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
1364         if (ctxt != NULL)
1365                 llog_cat_close(env, ctxt->loc_handle);
1366         llog_cleanup(env, ctxt);
1367 }
1368
1369 /**
1370  * Initialization of the sync component of OSP.
1371  *
1372  * Initializes the llog and starts a new thread to handle the changes to
1373  * the remote target (OST or MDT).
1374  *
1375  * \param[in] env       LU environment provided by the caller
1376  * \param[in] d         OSP device
1377  *
1378  * \retval 0            on success
1379  * \retval negative     negated errno on error
1380  */
int osp_sync_init(const struct lu_env *env, struct osp_device *d)
{
        struct l_wait_info       lwi = { 0 };
        struct task_struct      *task;
        int                      rc;

        ENTRY;

        /* subscribe to the shared per-OSD commit id tracker first, as
         * llog records need ids from it */
        rc = osp_sync_id_traction_init(d);
        if (rc)
                RETURN(rc);

        /*
         * initialize llog storing changes
         */
        rc = osp_sync_llog_init(env, d);
        if (rc) {
                CERROR("%s: can't initialize llog: rc = %d\n",
                       d->opd_obd->obd_name, rc);
                GOTO(err_id, rc);
        }

        /*
         * Start synchronization thread
         */
        d->opd_syn_max_rpc_in_flight = OSP_MAX_IN_FLIGHT;
        d->opd_syn_max_rpc_in_progress = OSP_MAX_IN_PROGRESS;
        spin_lock_init(&d->opd_syn_lock);
        init_waitqueue_head(&d->opd_syn_waitq);
        init_waitqueue_head(&d->opd_syn_barrier_waitq);
        init_waitqueue_head(&d->opd_syn_thread.t_ctl_waitq);
        INIT_LIST_HEAD(&d->opd_syn_inflight_list);
        INIT_LIST_HEAD(&d->opd_syn_committed_there);

        task = kthread_run(osp_sync_thread, d, "osp-syn-%u-%u",
                           d->opd_index, d->opd_group);
        if (IS_ERR(task)) {
                rc = PTR_ERR(task);
                CERROR("%s: cannot start sync thread: rc = %d\n",
                       d->opd_obd->obd_name, rc);
                GOTO(err_llog, rc);
        }

        /* don't return until the thread has reported its state, so the
         * caller can rely on the machinery being up (or down) */
        l_wait_event(d->opd_syn_thread.t_ctl_waitq,
                     osp_sync_running(d) || osp_sync_stopped(d), &lwi);

        RETURN(0);
err_llog:
        osp_sync_llog_fini(env, d);
err_id:
        osp_sync_id_traction_fini(d);
        return rc;
}
1434
1435 /**
1436  * Stop the syncing thread.
1437  *
1438  * Asks the syncing thread to stop and wait until it's stopped.
1439  *
1440  * \param[in] d         OSP device
1441  *
1442  * \retval              0
1443  */
1444 int osp_sync_fini(struct osp_device *d)
1445 {
1446         struct ptlrpc_thread *thread = &d->opd_syn_thread;
1447
1448         ENTRY;
1449
1450         thread->t_flags = SVC_STOPPING;
1451         wake_up(&d->opd_syn_waitq);
1452         wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED);
1453
1454         /*
1455          * unregister transaction callbacks only when sync thread
1456          * has finished operations with llog
1457          */
1458         osp_sync_id_traction_fini(d);
1459
1460         RETURN(0);
1461 }
1462
/* Protects creation/teardown of id trackers and guards the global list
 * of per-OSD trackers below (one tracker is shared by all OSPs sitting
 * on the same local OSD, see osp_sync_id_traction_init()). */
static DEFINE_MUTEX(osp_id_tracker_sem);
static struct list_head osp_id_tracker_list =
                LIST_HEAD_INIT(osp_id_tracker_list);
1466
1467 /**
1468  * OSD commit callback.
1469  *
1470  * The function is used as a local OSD commit callback to track the highest
1471  * committed llog record id. see osp_sync_id_traction_init() for the details.
1472  *
1473  * \param[in] th        local transaction handle committed
1474  * \param[in] cookie    commit callback data (our private structure)
1475  */
static void osp_sync_tracker_commit_cb(struct thandle *th, void *cookie)
{
        struct osp_id_tracker   *tr = cookie;
        struct osp_device       *d;
        struct osp_txn_info     *txn;

        LASSERT(tr);

        /* cheap lockless check: nothing to do unless this transaction
         * carried an id beyond what is already known committed */
        txn = osp_txn_info(&th->th_ctx);
        if (txn == NULL || txn->oti_current_id < tr->otr_committed_id)
                return;

        spin_lock(&tr->otr_lock);
        if (likely(txn->oti_current_id > tr->otr_committed_id)) {
                CDEBUG(D_OTHER, "committed: %u -> %u\n",
                       tr->otr_committed_id, txn->oti_current_id);
                tr->otr_committed_id = txn->oti_current_id;

                /* propagate the new committed id to every subscribed OSP
                 * and wake their sync threads */
                list_for_each_entry(d, &tr->otr_wakeup_list,
                                    opd_syn_ontrack) {
                        d->opd_syn_last_committed_id = tr->otr_committed_id;
                        wake_up(&d->opd_syn_waitq);
                }
        }
        spin_unlock(&tr->otr_lock);
}
1502
1503 /**
1504  * Initialize commit tracking mechanism.
1505  *
1506  * Some setups may have thousands of OSTs and each will be represented by OSP.
 * Meaning orders of magnitude many more changes to apply every second. In order
1508  * to keep the number of commit callbacks low this mechanism was introduced.
 * The mechanism is very similar to transno used by MDT service: it's a single
1510  * ID stream which can be assigned by any OSP to its llog records. The tricky
1511  * part is that ID is stored in per-transaction data and re-used by all the OSPs
1512  * involved in that transaction. Then all these OSPs are woken up utilizing a single OSD commit callback.
1513  *
1514  * The function initializes the data used by the tracker described above.
 * A single tracker per OSD device is created.
1516  *
1517  * \param[in] d         OSP device
1518  *
1519  * \retval 0            on success
1520  * \retval negative     negated errno on error
1521  */
static int osp_sync_id_traction_init(struct osp_device *d)
{
        struct osp_id_tracker   *tr, *found = NULL;
        int                      rc = 0;

        LASSERT(d);
        LASSERT(d->opd_storage);
        LASSERT(d->opd_syn_tracker == NULL);
        INIT_LIST_HEAD(&d->opd_syn_ontrack);

        mutex_lock(&osp_id_tracker_sem);
        /* reuse an existing tracker for this OSD if another OSP on the
         * same storage created one already */
        list_for_each_entry(tr, &osp_id_tracker_list, otr_list) {
                if (tr->otr_dev == d->opd_storage) {
                        LASSERT(atomic_read(&tr->otr_refcount));
                        atomic_inc(&tr->otr_refcount);
                        d->opd_syn_tracker = tr;
                        found = tr;
                        break;
                }
        }

        /* first OSP on this OSD: allocate a fresh tracker and register
         * the commit callback with the local storage */
        if (found == NULL) {
                rc = -ENOMEM;
                OBD_ALLOC_PTR(tr);
                if (tr) {
                        d->opd_syn_tracker = tr;
                        spin_lock_init(&tr->otr_lock);
                        tr->otr_dev = d->opd_storage;
                        tr->otr_next_id = 1;
                        tr->otr_committed_id = 0;
                        atomic_set(&tr->otr_refcount, 1);
                        INIT_LIST_HEAD(&tr->otr_wakeup_list);
                        list_add(&tr->otr_list, &osp_id_tracker_list);
                        tr->otr_tx_cb.dtc_txn_commit =
                                                osp_sync_tracker_commit_cb;
                        tr->otr_tx_cb.dtc_cookie = tr;
                        tr->otr_tx_cb.dtc_tag = LCT_MD_THREAD;
                        dt_txn_callback_add(d->opd_storage, &tr->otr_tx_cb);
                        rc = 0;
                }
        }
        mutex_unlock(&osp_id_tracker_sem);

        return rc;
}
1567
1568 /**
1569  * Release commit tracker.
1570  *
1571  * Decrease a refcounter on the tracker used by the given OSP device \a d.
1572  * If no more users left, then the tracker is released.
1573  *
1574  * \param[in] d         OSP device
1575  */
static void osp_sync_id_traction_fini(struct osp_device *d)
{
        struct osp_id_tracker *tr;

        ENTRY;

        LASSERT(d);
        tr = d->opd_syn_tracker;
        /* nothing to do if this OSP never attached to a tracker */
        if (tr == NULL) {
                EXIT;
                return;
        }

        /* make sure this device is off the tracker's wakeup list before
         * the tracker can be freed */
        osp_sync_remove_from_tracker(d);

        mutex_lock(&osp_id_tracker_sem);
        /* last user gone: unregister the commit callback and free */
        if (atomic_dec_and_test(&tr->otr_refcount)) {
                dt_txn_callback_del(d->opd_storage, &tr->otr_tx_cb);
                LASSERT(list_empty(&tr->otr_wakeup_list));
                list_del(&tr->otr_list);
                OBD_FREE_PTR(tr);
                d->opd_syn_tracker = NULL;
        }
        mutex_unlock(&osp_id_tracker_sem);

        EXIT;
}
1603
1604 /**
1605  * Generate a new ID on a tracker.
1606  *
1607  * Generates a new ID using the tracker associated with the given OSP device
1608  * \a d, if the given ID \a id is non-zero. Unconditially adds OSP device to
1609  * the wakeup list, so OSP won't miss when a transaction using the ID is
1610  * committed. Notice ID is 32bit, but llog doesn't support >2^32 records anyway.
1611  *
1612  * \param[in] d         OSP device
1613  * \param[in] id        0 or ID generated previously
1614  *
1615  * \retval              ID the caller should use
1616  */
1617 static __u32 osp_sync_id_get(struct osp_device *d, __u32 id)
1618 {
1619         struct osp_id_tracker *tr;
1620
1621         tr = d->opd_syn_tracker;
1622         LASSERT(tr);
1623
1624         /* XXX: we can improve this introducing per-cpu preallocated ids? */
1625         spin_lock(&tr->otr_lock);
1626         if (unlikely(tr->otr_next_id <= d->opd_syn_last_used_id)) {
1627                 spin_unlock(&tr->otr_lock);
1628                 CERROR("%s: next %u, last synced %lu\n",
1629                        d->opd_obd->obd_name, tr->otr_next_id,
1630                        d->opd_syn_last_used_id);
1631                 LBUG();
1632         }
1633
1634         if (id == 0)
1635                 id = tr->otr_next_id++;
1636         if (id > d->opd_syn_last_used_id)
1637                 d->opd_syn_last_used_id = id;
1638         if (list_empty(&d->opd_syn_ontrack))
1639                 list_add(&d->opd_syn_ontrack, &tr->otr_wakeup_list);
1640         spin_unlock(&tr->otr_lock);
1641         CDEBUG(D_OTHER, "new id %u\n", (unsigned) id);
1642
1643         return id;
1644 }
1645
1646 /**
1647  * Stop to propagate commit status to OSP.
1648  *
1649  * If the OSP does not have any llog records she's waiting to commit, then
1650  * it is possible to unsubscribe from wakeups from the tracking using this
1651  * method.
1652  *
1653  * \param[in] d         OSP device not willing to wakeup
1654  */
1655 static void osp_sync_remove_from_tracker(struct osp_device *d)
1656 {
1657         struct osp_id_tracker *tr;
1658
1659         tr = d->opd_syn_tracker;
1660         LASSERT(tr);
1661
1662         if (list_empty(&d->opd_syn_ontrack))
1663                 return;
1664
1665         spin_lock(&tr->otr_lock);
1666         list_del_init(&d->opd_syn_ontrack);
1667         spin_unlock(&tr->otr_lock);
1668 }
1669