summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Ohly <patrick.ohly@intel.com>2014-01-14 14:44:00 +0100
committerPatrick Ohly <patrick.ohly@intel.com>2014-01-14 14:52:57 +0100
commit5be68241800bd6f7ab60fdc05856f1c2844e06f1 (patch)
treecbb77e120d65a36703f1e3e4c3282fa49092b121
parent57f0027e98f6157316eaa827190dc88e232fdf87 (diff)
PBAP: transfer data via pipe (part of FDO #72112)pbap-pipe
The main advantage is that processed data can be discarded immediately. When using a plain file, the entire address book must be stored in it. It also enables suspending a transfer by stopping to read from the pipe, either via some internal API or simply freezing the syncevo-local-sync process with SIGSTOP. The drawback is that obexd does not react well to a full pipe. It simply gets stuck in a blocking write(); in other words, all obexd operations get frozen and obexd stops responding on D-Bus.
-rw-r--r--src/backends/pbap/PbapSyncSource.cpp192
-rw-r--r--src/syncevo/TmpFile.cpp37
-rw-r--r--src/syncevo/TmpFile.h41
3 files changed, 216 insertions, 54 deletions
diff --git a/src/backends/pbap/PbapSyncSource.cpp b/src/backends/pbap/PbapSyncSource.cpp
index 3fa6280b..3c52af55 100644
--- a/src/backends/pbap/PbapSyncSource.cpp
+++ b/src/backends/pbap/PbapSyncSource.cpp
@@ -64,12 +64,28 @@ SE_BEGIN_CXX
#define OBC_TRANSFER_INTERFACE_NEW5 "org.bluez.obex.Transfer1"
typedef std::map<int, pcrecpp::StringPiece> Content;
+typedef std::list<std::string> ContactQueue;
class PullAll
{
std::string m_buffer; // vCards kept in memory when using old obexd.
TmpFile m_tmpFile; // Stored in temporary file and mmapped with more recent obexd.
- Content m_content; // Refers to chunks of m_buffer or m_tmpFile without copying them.
+
+ // When using memory-mapped files:
+ // refers to chunks of m_buffer or m_tmpFile without copying them via the contact number.
+ Content m_content;
+
+ // When using pipe:
+ // - split into queue of std::strings, read from start to finish
+ // - discard contact strings that are no longer needed
+ int m_firstContactInQueue;
+ ContactQueue m_queue;
+ // - buffer for reading from pipe
+ char *m_pipeBuffer;
+ size_t m_pipeBufferSize;
+ size_t m_pipeBufferUsed;
+ size_t m_pipeBufferTotal;
+
int m_numContacts; // Number of existing contacts, according to GetSize() or after downloading.
int m_currentContact; // Numbered starting with zero according to discovery in addVCards.
boost::shared_ptr<PbapSession> m_session; // Only set when there is a transfer ongoing.
@@ -77,11 +93,27 @@ class PullAll
friend class PbapSession;
public:
+ PullAll();
+ ~PullAll();
+
std::string getNextID();
bool getContact(int contactNumber, pcrecpp::StringPiece &vcard);
const char *addVCards(int startIndex, const pcrecpp::StringPiece &content);
};
+PullAll::PullAll() :
+ m_firstContactInQueue(0),
+ m_pipeBuffer(NULL),
+ m_pipeBufferSize(0),
+ m_pipeBufferUsed(0),
+ m_pipeBufferTotal(0)
+{}
+
+PullAll::~PullAll()
+{
+ free(m_pipeBuffer);
+}
+
enum PullData
{
PULL_AS_CONFIGURED,
@@ -541,7 +573,8 @@ boost::shared_ptr<PullAll> PbapSession::startPullAll(PullData pullData)
state->m_numContacts = GDBusCXX::DBusClientCall1<uint16_t>(*m_session, "GetSize")();
SE_LOG_DEBUG(NULL, "Expecting %d contacts.", state->m_numContacts);
- state->m_tmpFile.create();
+ TmpFile::Type type = getenv("SYNCEVOLUTION_PBAP_PIPE") ? TmpFile::PIPE : TmpFile::FILE;
+ state->m_tmpFile.create(type);
SE_LOG_DEBUG(NULL, "Created temporary file for PullAll %s", state->m_tmpFile.filename().c_str());
GDBusCXX::DBusClientCall1<std::pair<GDBusCXX::DBusObject_t, Params> > pullall(*m_session, "PullAll");
std::pair<GDBusCXX::DBusObject_t, Params> tuple =
@@ -590,7 +623,14 @@ const char *PullAll::addVCards(int startIndex, const pcrecpp::StringPiece &vcard
pcrecpp::RE re("[\\r\\n]*(^BEGIN:VCARD.*?^END:VCARD)",
pcrecpp::RE_Options().set_dotall(true).set_multiline(true));
while (re.Consume(&tmp, &vcarddata)) {
- m_content[count] = vcarddata;
+ if (m_tmpFile.getType() == TmpFile::PIPE) {
+ // Must copy into queue.
+ m_queue.push_back(std::string());
+ m_queue.back().assign(vcarddata.data(), vcarddata.size());
+ } else {
+ // Can continue using the memory-mapped file.
+ m_content[count] = vcarddata;
+ }
++count;
}
@@ -644,58 +684,122 @@ bool PullAll::getContact(int contactNumber, pcrecpp::StringPiece &vcard)
return false;
}
- Content::iterator it;
- while ((it = m_content.find(contactNumber)) == m_content.end() &&
- m_session &&
- (!m_session->transferComplete() ||
- m_tmpFile.moreData())) {
- // Wait? We rely on regular propgress signals to wake us up.
- // obex 0.47 sends them every 64KB, at least in combination
- // with a Samsung Galaxy SIII. This may depend on both obexd
- // and the phone, so better check ourselves and perhaps do it
- // less often - unmap/map can be expensive and invalidates
- // some of the unread data (at least how it is implemented
- // now).
- while (!m_session->transferComplete() && m_tmpFile.moreData() < 128 * 1024) {
- g_main_context_iteration(NULL, true);
+ if (m_tmpFile.getType() == TmpFile::PIPE) {
+ // Delete old contacts.
+ ContactQueue::iterator it = m_queue.begin();
+ while (m_firstContactInQueue < contactNumber) {
+ ++it;
+ ++m_firstContactInQueue;
}
- m_session->checkForError();
- if (m_tmpFile.moreData()) {
- // Remap. This shifts all addresses already stored in
- // m_content, so beware and update those.
- pcrecpp::StringPiece oldMem = m_tmpFile.stringPiece();
- m_tmpFile.unmap();
- m_tmpFile.map();
- pcrecpp::StringPiece newMem = m_tmpFile.stringPiece();
- ssize_t delta = newMem.data() - oldMem.data();
- BOOST_FOREACH (Content::value_type &entry, m_content) {
- pcrecpp::StringPiece &vcard = entry.second;
- vcard.set(vcard.data() + delta, vcard.size());
+ m_queue.erase(m_queue.begin(), it);
+
+ bool eof = false;
+ while (m_queue.empty() &&
+ (!m_session->transferComplete() || !eof)) {
+ // Read at least 64KB, increase buffer if too
+ // small. Happens at least once (initial read) and may
+ // happen again when a contact is larger than the current
+ // buffer size.
+ static const size_t chunkSize = 64 * 1024;
+ if (m_pipeBufferSize - m_pipeBufferUsed < chunkSize) {
+ size_t newSize = m_pipeBufferSize + chunkSize;
+ char *newBuffer = (char *)realloc(m_pipeBuffer, newSize);
+ if (!newBuffer) {
+ // Nothing changed, but we can't proceed.
+ SE_THROW("getContact(): out of memory");
+ }
+ m_pipeBuffer = newBuffer;
+ m_pipeBufferSize = newSize;
+ }
+
+ // Try reading. Blocks until at least one byte becomes available.
+ ssize_t newData = read(m_tmpFile.getFD(), m_pipeBuffer + m_pipeBufferUsed, m_pipeBufferSize - m_pipeBufferUsed);
+ SE_LOG_DEBUG(NULL, "PBAP content: next chunk %ld, total %ld, %s",
+ (long)newData, (long)m_pipeBufferTotal,
+ newData < 0 ? strerror(errno) : "<<okay>>");
+ if (newData == 0) {
+ eof = true;
+ } else if (newData < 0) {
+ SE_THROW(StringPrintf("reading PBAP data from pipe: %s", strerror(errno)));
+ } else {
+ m_pipeBufferUsed += newData;
+ m_pipeBufferTotal += newData;
}
// File exists and obexd has written into it, so now we
// can unlink it to avoid leaking it if we crash.
m_tmpFile.remove();
- // Continue parsing where we stopped before.
- pcrecpp::StringPiece next(newMem.data() + m_tmpFileOffset,
- newMem.size() - m_tmpFileOffset);
+ // Parse next chunk, shift remaining data that couldn't
+ // be parsed yet to beginning of buffer and continue;
+ pcrecpp::StringPiece next(m_pipeBuffer, m_pipeBufferUsed);
const char *end = addVCards(m_content.size(), next);
- int newTmpFileOffset = end - newMem.data();
- SE_LOG_DEBUG(NULL, "PBAP content parsed: %d out of %d (total), %d out of %d (last update)",
- newTmpFileOffset,
- newMem.size(),
- (int)(end - next.data()),
- next.size());
- m_tmpFileOffset = newTmpFileOffset;
+ size_t remaining = m_pipeBuffer + m_pipeBufferUsed - end;
+ memmove(m_pipeBuffer, end, remaining);
+ m_pipeBufferUsed = remaining;
}
- }
- if (it == m_content.end()) {
- SE_LOG_DEBUG(NULL, "did not get the expected contact #%d, perhaps some contacts were deleted?", contactNumber);
- return false;
+ if (m_queue.empty()) {
+ SE_LOG_DEBUG(NULL, "did not get the expected contact #%d, perhaps some contacts were deleted?", contactNumber);
+ return false;
+ }
+ const std::string &next = m_queue.front();
+ vcard.set(next.c_str(), next.size());
+ } else {
+ Content::iterator it;
+ while ((it = m_content.find(contactNumber)) == m_content.end() &&
+ m_session &&
+ (!m_session->transferComplete() ||
+ m_tmpFile.moreData())) {
+ // Wait? We rely on regular propgress signals to wake us up.
+ // obex 0.47 sends them every 64KB, at least in combination
+ // with a Samsung Galaxy SIII. This may depend on both obexd
+ // and the phone, so better check ourselves and perhaps do it
+ // less often - unmap/map can be expensive and invalidates
+ // some of the unread data (at least how it is implemented
+ // now).
+ while (!m_session->transferComplete() && m_tmpFile.moreData() < 128 * 1024) {
+ g_main_context_iteration(NULL, true);
+ }
+ m_session->checkForError();
+ if (m_tmpFile.moreData()) {
+ // Remap. This shifts all addresses already stored in
+ // m_content, so beware and update those.
+ pcrecpp::StringPiece oldMem = m_tmpFile.stringPiece();
+ m_tmpFile.unmap();
+ m_tmpFile.map();
+ pcrecpp::StringPiece newMem = m_tmpFile.stringPiece();
+ ssize_t delta = newMem.data() - oldMem.data();
+ BOOST_FOREACH (Content::value_type &entry, m_content) {
+ pcrecpp::StringPiece &vcard = entry.second;
+ vcard.set(vcard.data() + delta, vcard.size());
+ }
+
+ // File exists and obexd has written into it, so now we
+ // can unlink it to avoid leaking it if we crash.
+ m_tmpFile.remove();
+
+ // Continue parsing where we stopped before.
+ pcrecpp::StringPiece next(newMem.data() + m_tmpFileOffset,
+ newMem.size() - m_tmpFileOffset);
+ const char *end = addVCards(m_content.size(), next);
+ int newTmpFileOffset = end - newMem.data();
+ SE_LOG_DEBUG(NULL, "PBAP content parsed: %d out of %d (total), %d out of %d (last update)",
+ newTmpFileOffset,
+ newMem.size(),
+ (int)(end - next.data()),
+ next.size());
+ m_tmpFileOffset = newTmpFileOffset;
+ }
+ }
+
+ if (it == m_content.end()) {
+ SE_LOG_DEBUG(NULL, "did not get the expected contact #%d, perhaps some contacts were deleted?", contactNumber);
+ return false;
+ }
+ vcard = it->second;
}
- vcard = it->second;
+
return true;
}
diff --git a/src/syncevo/TmpFile.cpp b/src/syncevo/TmpFile.cpp
index 3e988968..d1a59811 100644
--- a/src/syncevo/TmpFile.cpp
+++ b/src/syncevo/TmpFile.cpp
@@ -19,7 +19,9 @@
#include <cstdio>
+#include <errno.h>
#include <unistd.h>
+#include <fcntl.h>
#include <sys/stat.h>
#include <sys/mman.h>
@@ -27,9 +29,11 @@
#include <glib/gstdio.h>
#include "TmpFile.h"
+#include "util.h"
TmpFile::TmpFile() :
+ m_type(FILE),
m_fd(-1),
m_mapptr(0),
m_mapsize(0)
@@ -42,6 +46,10 @@ TmpFile::~TmpFile()
try {
unmap();
close();
+ if (m_type == PIPE &&
+ !m_filename.empty()) {
+ unlink(m_filename.c_str());
+ }
} catch (std::exception &x) {
fprintf(stderr, "TmpFile::~TmpFile(): %s\n", x.what());
} catch (...) {
@@ -50,7 +58,7 @@ TmpFile::~TmpFile()
}
-void TmpFile::create()
+void TmpFile::create(Type type)
{
gchar *filename = NULL;
GError *error = NULL;
@@ -66,6 +74,33 @@ void TmpFile::create()
}
m_filename = filename;
g_free(filename);
+ m_type = type;
+ if (type == PIPE) {
+ // We merely use the normal file to get a temporary file name which
+ // is guaranteed to be unique. There's a slight chance for a denial-of-service
+ // attack when someone creates a link or normal file directly after we remove
+ // the file, but because mknod neither overwrites an existing entry nor follows
+ // symlinks, the effect is smaller compared to opening a file.
+ unlink(m_filename.c_str());
+ if (mknod(m_filename.c_str(), S_IFIFO|S_IRWXU, 0)) {
+ m_filename = "";
+ throw TmpFileException(SyncEvo::StringPrintf("mknod(%s): %s",
+ m_filename.c_str(),
+ strerror(errno)));
+ }
+ // Open without blocking. Necessary because otherwise we end up
+ // waiting here. Opening later also does not work, because then
+ // obexd gets stuck in its open() call while we wait for it to
+ // acknowledge the start of the transfer.
+ m_fd = open(m_filename.c_str(), O_RDONLY|O_NONBLOCK, 0);
+ if (m_fd < 0) {
+ throw TmpFileException(SyncEvo::StringPrintf("open(%s): %s",
+ m_filename.c_str(),
+ strerror(errno)));
+ }
+ // From now on, block on the pipe.
+ fcntl(m_fd, F_SETFL, fcntl(m_fd, F_GETFL) & ~O_NONBLOCK);
+ }
}
diff --git a/src/syncevo/TmpFile.h b/src/syncevo/TmpFile.h
index 9b4f676b..45261e99 100644
--- a/src/syncevo/TmpFile.h
+++ b/src/syncevo/TmpFile.h
@@ -40,27 +40,30 @@ class TmpFileException : public std::runtime_error
/**
* Class for handling temporary files, either read/write access
- * or memory mapped.
+ * or memory mapped. Optionally creates a pipe instead of a plain file.
+ *
+ * Reading is done mapping the plain file into memory (file) or simply
+ * reading from the file descriptor (file or pipe).
*
* Closing and removing a mapped file is supported by calling close()
* after map().
*/
class TmpFile
{
- protected:
- int m_fd;
- void *m_mapptr;
- size_t m_mapsize;
- std::string m_filename;
-
public:
+ enum Type {
+ FILE,
+ PIPE
+ };
+
TmpFile();
virtual ~TmpFile();
/**
- * Create a temporary file.
+ * Create a temporary file or pipe.
*/
- void create();
+ void create(Type type = FILE);
+
/**
* Map a view of file and optionally return pointer and/or size.
*
@@ -70,12 +73,24 @@ class TmpFile
* @param mapsize Pointer to variable for mapped size. (can be NULL)
*/
void map(void **mapptr = 0, size_t *mapsize = 0);
+
/**
* Unmap a view of file.
*/
void unmap();
/**
+ * File descriptor, ready for reading from start of file or pipe
+ * after create().
+ */
+ int getFD() const { return m_fd; }
+
+ /**
+ * FILE by default, otherwise the value given to create().
+ */
+ Type getType() const { return m_type; }
+
+ /**
* Returns amount of bytes not mapped into memory yet, zero if none.
*/
size_t moreData() const;
@@ -134,6 +149,14 @@ class TmpFile
* @return pcrecpp::StringPiece of the mapped view
*/
pcrecpp::StringPiece stringPiece();
+
+ protected:
+ Type m_type;
+ int m_fd;
+ void *m_mapptr;
+ size_t m_mapsize;
+ std::string m_filename;
+
};
#endif // INCL_SYNCEVOLUTION_TMPFILE