Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ matrix:
- sudo apt-get update -qq
- sudo apt-get install -qq valgrind
script:
- cd src/plasma
- make valgrind
- cd ../..

- python src/plasma/test/test.py valgrind

- python src/photon/test/test.py valgrind

install:
Expand Down
17 changes: 14 additions & 3 deletions src/plasma/Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
CC = gcc
CFLAGS = -g -Wall --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -I. -I../common -I../common/thirdparty
TEST_CFLAGS = -DPLASMA_TEST=1 -I.
BUILD = build

all: $(BUILD)/plasma_store $(BUILD)/plasma_manager $(BUILD)/plasma_client.so $(BUILD)/example $(BUILD)/libplasma_client.a
Expand All @@ -12,6 +13,9 @@ clean:
cd ../common; make clean
rm -r $(BUILD)/*

$(BUILD)/manager_tests: test/manager_tests.c plasma.h plasma_client.h plasma_client.c plasma_manager.h plasma_manager.c fling.h fling.c common
$(CC) $(CFLAGS) $(TEST_CFLAGS) -o $@ test/manager_tests.c plasma_manager.c plasma_client.c fling.c ../common/build/libcommon.a ../common/thirdparty/hiredis/libhiredis.a

$(BUILD)/plasma_store: plasma_store.c plasma.h fling.h fling.c malloc.c malloc.h thirdparty/dlmalloc.c common
$(CC) $(CFLAGS) plasma_store.c fling.c malloc.c ../common/build/libcommon.a -o $(BUILD)/plasma_store

Expand All @@ -28,12 +32,19 @@ $(BUILD)/example: plasma_client.c plasma.h example.c fling.h fling.c common
$(CC) $(CFLAGS) plasma_client.c example.c fling.c ../common/build/libcommon.a -o $(BUILD)/example

common: FORCE
cd ../common; make
git submodule update --init --recursive
cd ../common; make

# Set the request timeout low for testing purposes.
test: CFLAGS += -DRAY_TIMEOUT=50
test: FORCE
cd ../common; make redis
# First, build and run all the unit tests.
test: $(BUILD)/manager_tests FORCE
./build/manager_tests
cd ../common; make redis
# Next, build all the executables for Python testing.
test: all

valgrind: test
valgrind --leak-check=full --error-exitcode=1 ./build/manager_tests

FORCE:
41 changes: 34 additions & 7 deletions src/plasma/plasma_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
#include "fling.h"
#include "uthash.h"

/* Number of times we try connecting to a socket. */
#define NUM_CONNECT_ATTEMPTS 50

typedef struct {
/** Key that uniquely identifies the memory mapped file. In practice, we
* take the numerical value of the file descriptor in the object store. */
Expand Down Expand Up @@ -311,7 +314,8 @@ plasma_connection *plasma_connect(const char *store_socket_name,
*/
int fd = -1;
int connected_successfully = 0;
for (int num_attempts = 0; num_attempts < 50; ++num_attempts) {
for (int num_attempts = 0; num_attempts < NUM_CONNECT_ATTEMPTS;
++num_attempts) {
fd = connect_ipc_sock(store_socket_name);
if (fd >= 0) {
connected_successfully = 1;
Expand All @@ -330,6 +334,10 @@ plasma_connection *plasma_connect(const char *store_socket_name,
result->store_conn = fd;
if (manager_addr != NULL) {
result->manager_conn = plasma_manager_connect(manager_addr, manager_port);
if (result->manager_conn < 0) {
LOG_ERR("Could not connect to Plasma manager %s:%d", manager_addr,
manager_port);
}
} else {
result->manager_conn = -1;
}
Expand All @@ -348,18 +356,17 @@ void plasma_disconnect(plasma_connection *conn) {

#define h_addr h_addr_list[0]

/* TODO(swang): Return the error to the caller. */
int plasma_manager_connect(const char *ip_addr, int port) {
int plasma_manager_try_connect(const char *ip_addr, int port) {
int fd = socket(PF_INET, SOCK_STREAM, 0);
if (fd < 0) {
LOG_ERR("could not create socket");
exit(-1);
return -1;
}

struct hostent *manager = gethostbyname(ip_addr); /* TODO(pcm): cache this */
if (!manager) {
LOG_ERR("plasma manager %s not found", ip_addr);
exit(-1);
return -1;
}

struct sockaddr_in addr;
Expand All @@ -370,10 +377,26 @@ int plasma_manager_connect(const char *ip_addr, int port) {
int r = connect(fd, (struct sockaddr *) &addr, sizeof(addr));
if (r < 0) {
LOG_ERR(
"could not establish connection to manager with id %s:%d (probably ran "
"could not establish connection to manager with id %s:%d (may have run "
"out of ports)",
&ip_addr[0], port);
exit(-1);
return -1;
}
return fd;
}

int plasma_manager_connect(const char *ip_addr, int port) {
/* Try to connect to the Plasma manager. If unsuccessful, retry several times.
*/
int fd = -1;
for (int num_attempts = 0; num_attempts < NUM_CONNECT_ATTEMPTS;
++num_attempts) {
fd = plasma_manager_try_connect(ip_addr, port);
if (fd >= 0) {
break;
}
/* Sleep for 100 milliseconds. */
usleep(100000);
}
return fd;
}
Expand Down Expand Up @@ -432,3 +455,7 @@ void plasma_fetch(plasma_connection *conn,
"Received unexpected object ID from manager during fetch.");
}
}

int get_manager_fd(plasma_connection *conn) {
return conn->manager_conn;
}
15 changes: 13 additions & 2 deletions src/plasma/plasma_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ plasma_connection *plasma_connect(const char *store_socket_name,
void plasma_disconnect(plasma_connection *conn);

/**
* Connect to a possibly remote Plasma Manager.
* Try to connect to a possibly remote Plasma Manager.
*
* @param addr The IP address of the Plasma Manager to connect to.
* @param port The port of the Plasma Manager to connect to.
* @return The file descriptor to use to send messages to the Plasma Manager.
* @return The file descriptor to use to send messages to the
* Plasma Manager. If connection was unsuccessful, this
* value is -1.
*/
int plasma_manager_connect(const char *addr, int port);

Expand Down Expand Up @@ -195,4 +197,13 @@ void plasma_fetch(plasma_connection *conn,
*/
int plasma_subscribe(plasma_connection *conn);

/**
* Get the file descriptor for the socket connection to the plasma manager.
*
* @param conn The plasma connection.
* @return The file descriptor for the manager connection. If there is no
* connection to the manager, this is -1.
*/
int get_manager_fd(plasma_connection *conn);

#endif
Loading