From 699345161c26cbd0a53ebb73c272aa964647de75 Mon Sep 17 00:00:00 2001 From: Ondrej Kozina Date: Fri, 24 Jul 2015 12:47:02 +0200 Subject: [PATCH 2/2] - get files by pipe --- Makefile.am | 2 +- client/Makefile.am | 4 +- client/commands.cc | 28 ++++++-- client/types.cc | 14 ++++ client/types.h | 24 +++++++ configure.ac | 1 + pipe/AsyncWriteStream.h | 162 ++++++++++++++++++++++++++++++++++++++++++++ pipe/DataStream.h | 68 +++++++++++++++++++ pipe/Makefile.am | 10 +++ pipe/Pipe.cc | 59 ++++++++++++++++ pipe/Pipe.h | 72 ++++++++++++++++++++ pipe/SyncReadStream.h | 107 +++++++++++++++++++++++++++++ server/Client.cc | 155 ++++++++++++++++++++++++++++++++++++++---- server/Client.h | 19 ++++-- server/FileSerialization.h | 48 +++++++++++++ server/FilesTransferTask.cc | 66 ++++++++++++++++++ server/FilesTransferTask.h | 60 ++++++++++++++++ server/Makefile.am | 6 +- server/RefComparison.h | 35 ++++++++++ server/Types.cc | 10 +++ server/Types.h | 4 ++ server/snapperd.cc | 4 ++ 22 files changed, 933 insertions(+), 25 deletions(-) create mode 100644 pipe/AsyncWriteStream.h create mode 100644 pipe/DataStream.h create mode 100644 pipe/Makefile.am create mode 100644 pipe/Pipe.cc create mode 100644 pipe/Pipe.h create mode 100644 pipe/SyncReadStream.h create mode 100644 server/FileSerialization.h create mode 100644 server/FilesTransferTask.cc create mode 100644 server/FilesTransferTask.h create mode 100644 server/RefComparison.h diff --git a/Makefile.am b/Makefile.am index 00a799a..83e482a 100644 --- a/Makefile.am +++ b/Makefile.am @@ -2,7 +2,7 @@ # Makefile.am for snapper # -SUBDIRS = snapper examples dbus server client scripts pam data doc po \ +SUBDIRS = snapper examples dbus pipe server client scripts pam data doc po \ testsuite testsuite-real testsuite-cmp AUTOMAKE_OPTIONS = foreign dist-bzip2 no-dist-gzip diff --git a/client/Makefile.am b/client/Makefile.am index aecddbd..f2e2590 100644 --- a/client/Makefile.am +++ b/client/Makefile.am @@ -16,7 +16,7 @@ snapper_SOURCES = \ misc.cc misc.h \ errors.cc errors.h -snapper_LDADD = ../snapper/libsnapper.la utils/libutils.la ../dbus/libdbus.la +snapper_LDADD = ../snapper/libsnapper.la utils/libutils.la ../dbus/libdbus.la ../pipe/libpipe.la -lboost_serialization libexecdir = /usr/lib/snapper @@ -36,5 +36,5 @@ systemd_helper_SOURCES = \ misc.cc misc.h \ errors.cc errors.h -systemd_helper_LDADD = ../snapper/libsnapper.la utils/libutils.la ../dbus/libdbus.la +systemd_helper_LDADD = ../snapper/libsnapper.la utils/libutils.la ../dbus/libdbus.la ../pipe/libpipe.la -lboost_serialization diff --git a/client/commands.cc b/client/commands.cc index 908e44f..2680e42 100644 --- a/client/commands.cc +++ b/client/commands.cc @@ -19,12 +19,15 @@ * find current contact information at www.novell.com. */ +#include +#include #include #include "commands.h" #include "utils/text.h" #include "snapper/AppUtil.h" +#include "pipe/SyncReadStream.h" using namespace std; @@ -382,17 +385,34 @@ list command_get_xfiles(DBus::Connection& conn, const string& config_name, unsigned int number1, unsigned int number2) { - DBus::MessageMethodCall call(SERVICE, OBJECT, INTERFACE, "GetFiles"); + DBus::MessageMethodCall call(SERVICE, OBJECT, INTERFACE, "GetFilesByPipe"); DBus::Hoho hoho(call); - hoho << config_name << number1 << number2; + hoho << config_name << "/" << number1 << number2; DBus::Message reply = conn.send_with_reply_and_block(call); + pipe_stream::FileDescriptor fd; + DBus::Hihi hihi(reply); + hihi >> fd; + + pipe_stream::SyncReadStream> rs(fd); + list files; - DBus::Hihi hihi(reply); - hihi >> files; + struct Ffunct { + Ffunct(list& fs) : files(fs) {} + void operator()(XFile* f) { files.push_back(*f); delete f; } + list& files; + }; + + Ffunct ff(files); + + while (rs.incoming()) + { + vector tmp = rs.receive(); + std::for_each(tmp.begin(), tmp.end(), ff); + } files.sort(); // snapperd can have different locale than client // so sorting is required here diff --git a/client/types.cc b/client/types.cc index bbd75f1..c0d10cd 100644 --- a/client/types.cc +++ b/client/types.cc @@ -125,4 +125,18 @@ namespace DBus return hoho; } + + Hihi& + operator>>(Hihi& hihi, FileDescriptor& data) + { + if (hihi.get_type() != DBUS_TYPE_UNIX_FD) + throw MarshallingException(); + + int fd; + dbus_message_iter_get_basic(hihi.top(), &fd); + dbus_message_iter_next(hihi.top()); + data.set_fd(fd); + + return hihi; + } } diff --git a/client/types.h b/client/types.h index d06325e..93a4ed5 100644 --- a/client/types.h +++ b/client/types.h @@ -24,6 +24,8 @@ #include #include +#include + using std::string; using std::list; using std::map; @@ -32,8 +34,10 @@ using std::map; #include "dbus/DBusConnection.h" #include "snapper/Snapshot.h" #include "snapper/File.h" +#include "pipe/Pipe.h" using namespace snapper; +using namespace pipe_stream; struct XConfigInfo @@ -114,4 +118,24 @@ namespace DBus Hihi& operator>>(Hihi& hihi, XFile& data); + Hihi& operator>>(Hihi& hihi, FileDescriptor& data); + +} + +namespace boost +{ + namespace serialization + { + template + void load(Archive& ar, XFile& f, const unsigned int v) + { + ar >> f.name >> f.status; + } + + template + void serialize(Archive& ar, XFile& f, const unsigned int v) + { + boost::serialization::split_free(ar, f, v); + } + } } diff --git a/configure.ac b/configure.ac index c8b4470..6652a90 100644 --- a/configure.ac +++ b/configure.ac @@ -154,6 +154,7 @@ AC_OUTPUT( examples/c/Makefile examples/c++-lib/Makefile dbus/Makefile + pipe/Makefile server/Makefile client/Makefile client/utils/Makefile diff --git a/pipe/AsyncWriteStream.h b/pipe/AsyncWriteStream.h new file mode 100644 index 0000000..96d009b --- /dev/null +++ b/pipe/AsyncWriteStream.h @@ -0,0 +1,162 @@ +/* + * Copyright (c) [2015] Red Hat, Inc. + * + * All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as published + * by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + */ + +#ifndef SNAPPER_ASYNC_WRITE_STREAM_H +#define SNAPPER_ASYNC_WRITE_STREAM_H + +#include +#include + +#include +#include + +#include + +#include "pipe/Pipe.h" +#include "pipe/DataStream.h" + +namespace pipe_stream +{ + using boost::asio::posix::stream_descriptor; + + typedef boost::function callback; + + template + class AsyncWriteStream : public BaseStream + { + public: + + AsyncWriteStream(const FileDescriptor& fd, callback cb); + + void async_write(); + void append(const T& data); + void run(); + + private: + + void handle_write(const boost::system::error_code& ec); + void terminate(); + + callback _cb; + stream_descriptor _pipe; + + }; + + + template + AsyncWriteStream::AsyncWriteStream(const FileDescriptor& fd, callback cb) + : BaseStream(), _cb(cb), _pipe(this->_io_service, fd.get_fd()) + { + assert(cb != NULL); + } + + + template + void AsyncWriteStream::append(const T& data) + { + std::ostream oss(&this->_data_buf); + + try + { + boost::archive::text_oarchive oar(oss); + oar << data; + } + catch (const boost::archive::archive_exception& e) + { + throw StreamSerializationException(); + } + } + + + template + void AsyncWriteStream::terminate() + { + const size_t terminator = 0; + + try + { + boost::asio::write(this->_pipe, boost::asio::buffer(&terminator, sizeof(terminator))); + } + catch (const boost::system::system_error& e) + { + throw StreamException(e.what()); + } + } + + + template + void AsyncWriteStream::async_write() + { + size_t header = this->_data_buf.size(); + + if (header > 0) + { + std::vector buffers; + + buffers.push_back(boost::asio::buffer(&header, sizeof(header))); + buffers.push_back(this->_data_buf.data()); + + boost::asio::async_write(this->_pipe, + buffers, + boost::bind( + &AsyncWriteStream::handle_write, + this, + boost::asio::placeholders::error + )); + } + else + { + terminate(); + } + } + + + template + void AsyncWriteStream::handle_write(const boost::system::error_code& ec) + { + if (!ec) + { + this->_data_buf.consume(this->_data_buf.size()); + _cb(); // callback for next batch to send + } + else + { + throw StreamException(boost::system::system_error(ec).what()); + } + } + + + template + void AsyncWriteStream::run() + { + try + { + this->_io_service.run(); + } + catch (const boost::system::system_error& e) + { + throw StreamException(e.what()); + } + } + +} + +#endif + diff --git a/pipe/DataStream.h b/pipe/DataStream.h new file mode 100644 index 0000000..8d2d4ec --- /dev/null +++ b/pipe/DataStream.h @@ -0,0 +1,68 @@ +/* + * Copyright (c) [2015] Red Hat, Inc. + * + * All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as published + * by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + */ + +#ifndef SNAPPER_DATASTREAM_H +#define SNAPPER_DATASTREAM_H + +#include +#include + +#include + +namespace pipe_stream +{ + using std::string; + using boost::asio::io_service; + + struct StreamException : public std::exception + { + explicit StreamException() throw() : msg("Stream exception") {} + explicit StreamException(const char* msg) throw() : msg(msg) {} + virtual const char* what() const throw() { return msg.c_str(); } + const string msg; + }; + + struct StreamSerializationException : public StreamException + { + explicit StreamSerializationException() throw() {} + virtual const char* what() const throw() { return "Serialization exception"; } + }; + + + template + class BaseStream + { + public: + + virtual ~BaseStream() {} + + protected: + + BaseStream(): _io_service(), _data_buf() {} + + io_service _io_service; + boost::asio::streambuf _data_buf; + + }; + +} + + +#endif diff --git a/pipe/Makefile.am b/pipe/Makefile.am new file mode 100644 index 0000000..e8c1869 --- /dev/null +++ b/pipe/Makefile.am @@ -0,0 +1,10 @@ +# +# Makefile.am for snapper/pipe +# + +AM_CPPFLAGS = -I$(top_srcdir) + +noinst_LTLIBRARIES = libpipe.la + +libpipe_la_SOURCES = \ + Pipe.cc Pipe.h diff --git a/pipe/Pipe.cc b/pipe/Pipe.cc new file mode 100644 index 0000000..0530825 --- /dev/null +++ b/pipe/Pipe.cc @@ -0,0 +1,59 @@ +/* + * Copyright (c) [2015] Red Hat, Inc. + * + * All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as published + * by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + */ + +#include +#include + +#include "pipe/Pipe.h" + +namespace pipe_stream +{ + void + FileDescriptor::close() + { + if (_fd != -1) + { + ::close(_fd); + _fd = -1; + } + } + + + Pipe::Pipe() + : rs(), ws() + { + int pipefd[2]; + + if (pipe(pipefd)) + { + throw PipeException(); + } + + // FIXME: of a race is found to be real issue, use pipe2 instead + if (fcntl(pipefd[0], F_SETFD, FD_CLOEXEC) < 0 || fcntl(pipefd[1], F_SETFD, FD_CLOEXEC) < 0) + { + throw PipeException(); + } + + rs.set_fd(pipefd[0]); + ws.set_fd(pipefd[1]); + } + +} diff --git a/pipe/Pipe.h b/pipe/Pipe.h new file mode 100644 index 0000000..b523796 --- /dev/null +++ b/pipe/Pipe.h @@ -0,0 +1,72 @@ +/* + * Copyright (c) [2015] Red Hat, Inc. + * + * All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as published + * by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + */ + +#ifndef SNAPPER_PIPE_H +#define SNAPPER_PIPE_H + +#include + +#include + +namespace pipe_stream +{ + struct PipeException : public std::exception + { + explicit PipeException() throw() {} + virtual const char* what() const throw() { return "Socket pair exception"; } + }; + + + class FileDescriptor : boost::noncopyable + { + public: + + FileDescriptor() : _fd(-1) {} + FileDescriptor(int fd) : _fd(fd) {} + ~FileDescriptor() { close(); } + + int get_fd() const { return _fd; } + void set_fd(int fd) { _fd = fd; } + void close(); + + private: + + int _fd; + + }; + + + class Pipe + { + public: + Pipe(); + + FileDescriptor& read_end() { return rs; } + FileDescriptor& write_end() { return ws; } + + private: + + FileDescriptor rs; + FileDescriptor ws; + + }; +} + +#endif diff --git a/pipe/SyncReadStream.h b/pipe/SyncReadStream.h new file mode 100644 index 0000000..52615f2 --- /dev/null +++ b/pipe/SyncReadStream.h @@ -0,0 +1,107 @@ +/* + * Copyright (c) [2015] Red Hat, Inc. + * + * All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as published + * by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + */ + +#ifndef SNAPPER_SYNC_READ_STREAM_H +#define SNAPPER_SYNC_READ_STREAM_H + +#include +#include + +#include + +#include "pipe/Pipe.h" +#include "pipe/DataStream.h" + +namespace pipe_stream +{ + using boost::asio::posix::stream_descriptor; + + template + class SyncReadStream : public BaseStream + { + public: + + SyncReadStream(const FileDescriptor& fd); + + bool incoming(); + T receive(); + + private: + + stream_descriptor _pipe; + size_t header; + + }; + + + template + SyncReadStream::SyncReadStream(const FileDescriptor& fd) + : BaseStream(), _pipe(this->_io_service, fd.get_fd()), header(0) + { + } + + + template + bool SyncReadStream::incoming() + { + try + { + boost::asio::read(this->_pipe, boost::asio::buffer(&header, sizeof(header))); + + if (header > 0) + { + boost::asio::read(this->_pipe, this->_data_buf.prepare(header)); + this->_data_buf.commit(header); + } + } + catch (const boost::system::system_error& e) + { + throw StreamException(boost::system::system_error(e).what()); + } + + return header > 0; + } + + + template + T SyncReadStream::receive() + { + T t; + std::istream is(&this->_data_buf); + + try + { + boost::archive::text_iarchive iar(is); + iar >> t; + } + catch (const boost::archive::archive_exception& e) + { + throw StreamSerializationException(); + } + + this->_data_buf.consume(header); + + return t; + } + +} + +#endif + diff --git a/server/Client.cc b/server/Client.cc index 570a4a7..a0eb01c 100644 --- a/server/Client.cc +++ b/server/Client.cc @@ -48,10 +48,19 @@ Client::Client(const string& name) Client::~Client() { thread.interrupt(); + ft_thread.interrupt(); + if (thread.joinable()) thread.join(); - for (list::iterator it = comparisons.begin(); it != comparisons.end(); ++it) + if (ft_thread.joinable()) + ft_thread.join(); + + // must empty FileTransferTask queue first + while (!ft_tasks.empty()) + ft_tasks.pop(); + + for (list::iterator it = comparisons.begin(); it != comparisons.end(); ++it) { delete_comparison(it); } @@ -78,14 +87,14 @@ Client::~Client() } -list::iterator +list::iterator Client::find_comparison(Snapper* snapper, Snapshots::const_iterator snapshot1, Snapshots::const_iterator snapshot2) { - for (list::iterator it = comparisons.begin(); it != comparisons.end(); ++it) + for (list::iterator it = comparisons.begin(); it != comparisons.end(); ++it) { - if ((*it)->getSnapper() == snapper && (*it)->getSnapshot1() == snapshot1 && - (*it)->getSnapshot2() == snapshot2) + if ((*it)->cmp->getSnapper() == snapper && (*it)->cmp->getSnapshot1() == snapshot1 && + (*it)->cmp->getSnapshot2() == snapshot2) return it; } @@ -93,7 +102,7 @@ Client::find_comparison(Snapper* snapper, Snapshots::const_iterator snapshot1, } -list::iterator +list::iterator Client::find_comparison(Snapper* snapper, unsigned int number1, unsigned int number2) { Snapshots& snapshots = snapper->getSnapshots(); @@ -105,9 +114,9 @@ Client::find_comparison(Snapper* snapper, unsigned int number1, unsigned int num void -Client::delete_comparison(list::iterator it) +Client::delete_comparison(list::iterator it) { - const Snapper* s = (*it)->getSnapper(); + const Snapper* s = (*it)->cmp->getSnapper(); for (MetaSnappers::iterator it2 = meta_snappers.begin(); it2 != meta_snappers.end(); ++it2) { @@ -347,6 +356,14 @@ Client::introspect(DBus::Connection& conn, DBus::Message& msg) " \n" " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" " \n" " \n" @@ -1223,7 +1240,7 @@ Client::create_comparison(DBus::Connection& conn, DBus::Message& msg) lock.lock(); - comparisons.push_back(comparison); + comparisons.push_back(new XComparison(comparison)); it->inc_use_count(); @@ -1233,6 +1250,13 @@ Client::create_comparison(DBus::Connection& conn, DBus::Message& msg) } +struct ComparisonInUse : public std::exception +{ + explicit ComparisonInUse() throw() {} + virtual const char* what() const throw() { return "comparison in use"; } +}; + + void Client::delete_comparison(DBus::Connection& conn, DBus::Message& msg) { @@ -1250,7 +1274,10 @@ Client::delete_comparison(DBus::Connection& conn, DBus::Message& msg) check_permission(conn, msg, *it); - list::iterator it2 = find_comparison(it->getSnapper(), num1, num2); + list::iterator it2 = find_comparison(it->getSnapper(), num1, num2); + + if ((*it2)->use_count() > 0) + throw ComparisonInUse(); delete_comparison(it2); comparisons.erase(it2); @@ -1278,9 +1305,9 @@ Client::get_files(DBus::Connection& conn, DBus::Message& msg) check_permission(conn, msg, *it); - list::iterator it2 = find_comparison(it->getSnapper(), num1, num2); + list::iterator it2 = find_comparison(it->getSnapper(), num1, num2); - const Files& files = (*it2)->getFiles(); + const Files& files = (*it2)->cmp->getFiles(); DBus::MessageMethodReturn reply(msg); @@ -1291,6 +1318,52 @@ Client::get_files(DBus::Connection& conn, DBus::Message& msg) } +struct PathException : public std::exception +{ + explicit PathException() throw() {} + virtual const char* what() const throw() { return "Invalid path"; } +}; + + +void +Client::get_files_pipe(DBus::Connection& conn, DBus::Message& msg) +{ + string config_name; + string path; // relative to volume root + dbus_uint32_t num1, num2; + + DBus::Hihi hihi(msg); + hihi >> config_name >> path >> num1 >> num2; + + y2deb("GetFilesByPipe config_name:" << config_name << " subpath: " << path << " num1:" << num1 << " num2:" << num2); + + boost::unique_lock lock(big_mutex); + + MetaSnappers::iterator it = meta_snappers.find(config_name); + + check_permission(conn, msg, *it); + + // We don't support paths yet + if (path != "/") + throw PathException(); + + list::iterator it2 = find_comparison(it->getSnapper(), num1, num2); + + DBus::MessageMethodReturn reply(msg); + DBus::Hoho hoho(reply); + + boost::shared_ptr ftask(new FilesTransferTask(**it2)); + + hoho << ftask->get_read_end(); + conn.send(reply); + + ftask->get_read_end().close(); + + ftask->init(); + add_transfer_task(ftask); +} + + void Client::sync(DBus::Connection& conn, DBus::Message& msg) { @@ -1431,6 +1504,8 @@ Client::dispatch(DBus::Connection& conn, DBus::Message& msg) delete_comparison(conn, msg); else if (msg.is_method_call(INTERFACE, "GetFiles")) get_files(conn, msg); + else if (msg.is_method_call(INTERFACE, "GetFilesByPipe")) + get_files_pipe(conn, msg); else if (msg.is_method_call(INTERFACE, "Sync")) sync(conn, msg); else if (msg.is_method_call(INTERFACE, "Debug")) @@ -1555,6 +1630,16 @@ Client::dispatch(DBus::Connection& conn, DBus::Message& msg) DBus::MessageError reply(msg, "error.invalid_group", DBUS_ERROR_FAILED); conn.send(reply); } + catch (const StreamException& e) + { + DBus::MessageError reply(msg, "error.pipe_stream", DBUS_ERROR_FAILED); + conn.send(reply); + } + catch (const ComparisonInUse& e) + { + DBus::MessageError reply(msg, "error.comparison_in_use", DBUS_ERROR_FAILED); + conn.send(reply); + } catch (...) { y2err("caught unknown exception"); @@ -1579,6 +1664,20 @@ Client::add_task(DBus::Connection& conn, DBus::Message& msg) void +Client::add_transfer_task(boost::shared_ptr f_pt) +{ + if (ft_thread.get_id() == boost::thread::id()) + ft_thread = boost::thread(boost::bind(&Client::files_transfer_worker, this)); + + boost::unique_lock lock(ft_mutex); + ft_tasks.push(f_pt); + lock.unlock(); + + ft_condition.notify_one(); +} + + +void Client::worker() { try @@ -1646,3 +1745,35 @@ Clients::has_zombies() const return false; } + + +void +Client::files_transfer_worker() +{ + try + { + while (true) + { + boost::unique_lock lock(ft_mutex); + while (ft_tasks.empty()) + ft_condition.wait(lock); + + boost::shared_ptr ptr(ft_tasks.front()); + ft_tasks.pop(); + lock.unlock(); + + try + { + ptr->start(); + } + catch (const StreamException& e) + { + y2err("Error occured during files transfer: " << e.what()); + } + } + } + catch (const boost::thread_interrupted&) + { + y2deb("files transfer worker interrupted"); + } +} diff --git a/server/Client.h b/server/Client.h index b24e66f..7240778 100644 --- a/server/Client.h +++ b/server/Client.h @@ -36,6 +36,8 @@ #include #include +#include "RefComparison.h" +#include "FilesTransferTask.h" #include "MetaSnapper.h" @@ -104,6 +106,7 @@ public: void create_comparison(DBus::Connection& conn, DBus::Message& msg); void delete_comparison(DBus::Connection& conn, DBus::Message& msg); void get_files(DBus::Connection& conn, DBus::Message& msg); + void get_files_pipe(DBus::Connection& conn, DBus::Message& msg); void sync(DBus::Connection& conn, DBus::Message& msg); void debug(DBus::Connection& conn, DBus::Message& msg) const; @@ -112,14 +115,14 @@ public: Client(const string& name); ~Client(); - list::iterator find_comparison(Snapper* snapper, unsigned int number1, + list::iterator find_comparison(Snapper* snapper, unsigned int number1, unsigned int number2); - list::iterator find_comparison(Snapper* snapper, + list::iterator find_comparison(Snapper* snapper, Snapshots::const_iterator snapshot1, Snapshots::const_iterator snapshot2); - void delete_comparison(list::iterator); + void delete_comparison(list::iterator); void add_lock(const string& config_name); void remove_lock(const string& config_name); @@ -130,7 +133,7 @@ public: const string name; - list comparisons; + list comparisons; set locks; @@ -147,15 +150,23 @@ public: boost::condition_variable condition; boost::mutex mutex; boost::thread thread; + + boost::condition_variable ft_condition; + boost::mutex ft_mutex; + boost::thread ft_thread; + queue tasks; + queue> ft_tasks; bool zombie; void add_task(DBus::Connection& conn, DBus::Message& msg); + void add_transfer_task(boost::shared_ptr p_ft); private: void worker(); + void files_transfer_worker(); }; diff --git a/server/FileSerialization.h b/server/FileSerialization.h new file mode 100644 index 0000000..eb0a9a0 --- /dev/null +++ b/server/FileSerialization.h @@ -0,0 +1,48 @@ +/* + * Copyright (c) [2015] Red Hat, Inc. + * + * All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as published + * by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + */ + +#ifndef SNAPPER_FILE_SERIALIZATION_H +#define SNAPPER_FILE_SERIALIZATION_H + +#include + +#include "snapper/File.h" + +namespace boost +{ + namespace serialization + { + template + void save(Archive& ar, const snapper::File& f, const unsigned int version) + { + const unsigned int tmp = f.getPreToPostStatus(); + ar << f.getName() << tmp; + } + + + template + void serialize(Archive& ar, snapper::File& f, const unsigned int version) + { + boost::serialization::split_free(ar, f, version); + } + } +} + +#endif diff --git a/server/FilesTransferTask.cc b/server/FilesTransferTask.cc new file mode 100644 index 0000000..01d5a69 --- /dev/null +++ b/server/FilesTransferTask.cc @@ -0,0 +1,66 @@ +/* + * Copyright (c) [2015] Red Hat, Inc. + * + * All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as published + * by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + */ + +#include + +#include "FilesTransferTask.h" +#include "FileSerialization.h" + +#define BUF_SIZE 65536 + +FilesTransferTask::FilesTransferTask(XComparison& xcmp) + : RefHolder(xcmp), xcmp(xcmp), _pipe(), ws(_pipe.write_end(), boost::bind(&FilesTransferTask::append_next, this)) +{ +} + + +void +FilesTransferTask::append_next() +{ + if (cit != xcmp.cmp->getFiles().end()) + { + vector vec; + + if(std::distance(cit, xcmp.cmp->getFiles().end()) > BUF_SIZE) + { + std::transform(cit, cit + BUF_SIZE, std::back_inserter(vec), FilesTransferTask::addressor()); + cit += BUF_SIZE; + } + else + { + std::transform(cit, xcmp.cmp->getFiles().end(), std::back_inserter(vec), FilesTransferTask::addressor()); + cit = xcmp.cmp->getFiles().end(); + } + + ws.append(vec); + } + + ws.async_write(); +} + + +void +FilesTransferTask::init() +{ + cit = xcmp.cmp->getFiles().begin(); + + append_next(); +} + diff --git a/server/FilesTransferTask.h b/server/FilesTransferTask.h new file mode 100644 index 0000000..cb89249 --- /dev/null +++ b/server/FilesTransferTask.h @@ -0,0 +1,60 @@ +/* + * Copyright (c) [2015] Red Hat, Inc. + * + * All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as published + * by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + */ + +#ifndef SNAPPER_FILES_TRANSFER_H +#define SNAPPER_FILES_TRANSFER_H + +#include + +#include "pipe/AsyncWriteStream.h" +#include "RefCounter.h" +#include "RefComparison.h" + +using namespace pipe_stream; +using namespace snapper; + + +class FilesTransferTask : public RefHolder +{ +public: + + FilesTransferTask(XComparison& xcmp); + + FileDescriptor& get_read_end() { return _pipe.read_end(); } + + void init(); + void start() { ws.run(); } + +private: + + struct addressor { + const File* operator()(const File & f) const { return &f; } + }; + + void append_next(); + + XComparison& xcmp; + Files::const_iterator cit; + + Pipe _pipe; + AsyncWriteStream> ws; +}; + +#endif diff --git a/server/Makefile.am b/server/Makefile.am index c268349..055639f 100644 --- a/server/Makefile.am +++ b/server/Makefile.am @@ -12,7 +12,9 @@ snapperd_SOURCES = \ MetaSnapper.cc MetaSnapper.h \ Background.cc Background.h \ Types.cc Types.h \ - RefCounter.cc RefCounter.h + RefCounter.cc RefCounter.h \ + FilesTransferTask.cc FilesTransferTask.h \ + FileSerialization.h -snapperd_LDADD = ../snapper/libsnapper.la ../dbus/libdbus.la -lrt +snapperd_LDADD = ../snapper/libsnapper.la ../dbus/libdbus.la ../pipe/libpipe.la -lrt -lboost_serialization diff --git a/server/RefComparison.h b/server/RefComparison.h new file mode 100644 index 0000000..46c7043 --- /dev/null +++ b/server/RefComparison.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) [2015] Red Hat, Inc. + * + * All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as published + * by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + */ + +#ifndef SNAPPER_REF_COMPARISON_H +#define SNAPPER_REF_COMPARISON_H + +#include "RefCounter.h" +#include "snapper/Comparison.h" + +struct XComparison : public RefCounter +{ + XComparison(snapper::Comparison* cmp) : RefCounter(), cmp(cmp) {} + ~XComparison() { assert(use_count() == 0); delete cmp; } + const snapper::Comparison* cmp; +}; + +#endif + diff --git a/server/Types.cc b/server/Types.cc index 1aa23d1..aa8dc8e 100644 --- a/server/Types.cc +++ b/server/Types.cc @@ -101,4 +101,14 @@ namespace DBus return hoho; } + Hoho& + operator<<(Hoho& hoho, const FileDescriptor& data) + { + const int fd = data.get_fd(); + if (!dbus_message_iter_append_basic(hoho.top(), DBUS_TYPE_UNIX_FD, &fd)) + throw FatalException(); + + return hoho; + } + } diff --git a/server/Types.h b/server/Types.h index bf4441e..cc3caa1 100644 --- a/server/Types.h +++ b/server/Types.h @@ -32,12 +32,14 @@ #include #include #include +#include using std::string; using std::list; using namespace snapper; +using pipe_stream::FileDescriptor; namespace DBus @@ -59,4 +61,6 @@ namespace DBus Hoho& operator<<(Hoho& hoho, const Files& data); + Hoho& operator<<(Hoho& hoho, const FileDescriptor& data); + } diff --git a/server/snapperd.cc b/server/snapperd.cc index 7e0c340..cf54528 100644 --- a/server/snapperd.cc +++ b/server/snapperd.cc @@ -22,6 +22,7 @@ #include #include +#include #include #include @@ -123,6 +124,7 @@ MyMainLoop::client_disconnected(const string& name) { client->zombie = true; client->thread.interrupt(); + client->ft_thread.interrupt(); } reset_idle_count(); } @@ -261,6 +263,8 @@ main(int argc, char** argv) setLogQuery(&log_query); } + signal(SIGPIPE, SIG_IGN); + dbus_threads_init_default(); MyMainLoop mainloop(DBUS_BUS_SYSTEM); -- 2.5.5