Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public interface Reconfigurable extends Configurable {
* If the property cannot be changed, throw a
* {@link ReconfigurationException}.
*/
public String reconfigureProperty(String property, String newVal)
void reconfigureProperty(String property, String newVal)
throws ReconfigurationException;

/**
Expand All @@ -46,12 +46,10 @@ public String reconfigureProperty(String property, String newVal)
* then changeConf should not throw an exception when changing
* this property.
*/
public boolean isPropertyReconfigurable(String property);
boolean isPropertyReconfigurable(String property);

/**
* Return all the properties that can be changed at run time.
*/
public Collection<String> getReconfigurableProperties();


Collection<String> getReconfigurableProperties();
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ private static class ReconfigurationThread extends Thread {
// See {@link ReconfigurationServlet#applyChanges}
public void run() {
LOG.info("Starting reconfiguration task.");
Configuration oldConf = this.parent.getConf();
Configuration newConf = this.parent.getNewConf();
Collection<PropertyChange> changes =
this.parent.getChangedProperties(newConf, oldConf);
final Configuration oldConf = parent.getConf();
final Configuration newConf = parent.getNewConf();
final Collection<PropertyChange> changes =
parent.getChangedProperties(newConf, oldConf);
Map<PropertyChange, Optional<String>> results = Maps.newHashMap();
for (PropertyChange change : changes) {
String errorMessage = null;
if (!this.parent.isPropertyReconfigurable(change.prop)) {
if (!parent.isPropertyReconfigurable(change.prop)) {
LOG.info(String.format(
"Property %s is not configurable: old value: %s, new value: %s",
change.prop, change.oldVal, change.newVal));
Expand All @@ -130,17 +130,23 @@ public void run() {
+ "\" to \"" + ((change.newVal == null) ? "<default>" : change.newVal)
+ "\".");
try {
this.parent.reconfigurePropertyImpl(change.prop, change.newVal);
String effectiveValue =
parent.reconfigurePropertyImpl(change.prop, change.newVal);
if (change.newVal != null) {
oldConf.set(change.prop, effectiveValue);
} else {
oldConf.unset(change.prop);
}
} catch (ReconfigurationException e) {
errorMessage = e.getCause().getMessage();
}
results.put(change, Optional.fromNullable(errorMessage));
}

synchronized (this.parent.reconfigLock) {
this.parent.endTime = Time.now();
this.parent.status = Collections.unmodifiableMap(results);
this.parent.reconfigThread = null;
synchronized (parent.reconfigLock) {
parent.endTime = Time.now();
parent.status = Collections.unmodifiableMap(results);
parent.reconfigThread = null;
}
}
}
Expand Down Expand Up @@ -203,21 +209,19 @@ public void shutdownReconfigurationTask() {
* reconfigureProperty.
*/
@Override
public final String reconfigureProperty(String property, String newVal)
public final void reconfigureProperty(String property, String newVal)
throws ReconfigurationException {
if (isPropertyReconfigurable(property)) {
LOG.info("changing property " + property + " to " + newVal);
String oldVal;
synchronized(getConf()) {
oldVal = getConf().get(property);
reconfigurePropertyImpl(property, newVal);
getConf().get(property);
String effectiveValue = reconfigurePropertyImpl(property, newVal);
if (newVal != null) {
getConf().set(property, newVal);
getConf().set(property, effectiveValue);
} else {
getConf().unset(property);
}
}
return oldVal;
} else {
throw new ReconfigurationException(property, newVal,
getConf().get(property));
Expand Down Expand Up @@ -251,8 +255,15 @@ public boolean isPropertyReconfigurable(String property) {
* that is being changed. If this object owns other Reconfigurable objects
* reconfigureProperty should be called recursively to make sure that
* to make sure that the configuration of these objects is updated.
*
* @param property Name of the property that is being reconfigured.
* @param newVal Proposed new value of the property.
* @return Effective new value of the property. This may be different from
* newVal.
*
* @throws ReconfigurationException if there was an error applying newVal.
*/
protected abstract void reconfigurePropertyImpl(String property, String newVal)
throws ReconfigurationException;
protected abstract String reconfigurePropertyImpl(
String property, String newVal) throws ReconfigurationException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.conf;

import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
Expand All @@ -27,13 +28,13 @@
import org.junit.Before;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
Expand All @@ -44,6 +45,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;

public class TestReconfiguration {
private Configuration conf1;
Expand Down Expand Up @@ -129,9 +131,10 @@ public Collection<String> getReconfigurableProperties() {
}

@Override
public synchronized void reconfigurePropertyImpl(
public synchronized String reconfigurePropertyImpl(
String property, String newVal) throws ReconfigurationException {
// do nothing
return newVal;
}

/**
Expand Down Expand Up @@ -354,13 +357,14 @@ public Collection<String> getReconfigurableProperties() {
}

@Override
public synchronized void reconfigurePropertyImpl(String property,
public synchronized String reconfigurePropertyImpl(String property,
String newVal) throws ReconfigurationException {
try {
latch.await();
} catch (InterruptedException e) {
// Ignore
}
return newVal;
}
}

Expand Down Expand Up @@ -395,9 +399,9 @@ public void testAsyncReconfigure()
doReturn(false).when(dummy).isPropertyReconfigurable(eq("name2"));
doReturn(true).when(dummy).isPropertyReconfigurable(eq("name3"));

doNothing().when(dummy)
doReturn("dummy").when(dummy)
.reconfigurePropertyImpl(eq("name1"), anyString());
doNothing().when(dummy)
doReturn("dummy").when(dummy)
.reconfigurePropertyImpl(eq("name2"), anyString());
doThrow(new ReconfigurationException("NAME3", "NEW3", "OLD3",
new IOException("io exception")))
Expand Down Expand Up @@ -474,4 +478,132 @@ public void testStartReconfigurationFailureDueToExistingRunningTask()
GenericTestUtils.assertExceptionContains("The server is stopped", e);
}
}
}

/**
* Ensure that {@link ReconfigurableBase#reconfigureProperty} updates the
* parent's cached configuration on success.
* @throws IOException
*/
@Test (timeout=300000)
public void testConfIsUpdatedOnSuccess() throws ReconfigurationException {
final String property = "FOO";
final String value1 = "value1";
final String value2 = "value2";

final Configuration conf = new Configuration();
conf.set(property, value1);
final Configuration newConf = new Configuration();
newConf.set(property, value2);

final ReconfigurableBase reconfigurable = makeReconfigurable(
conf, newConf, Arrays.asList(property));

reconfigurable.reconfigureProperty(property, value2);
assertThat(reconfigurable.getConf().get(property), is(value2));
}

/**
* Ensure that {@link ReconfigurableBase#startReconfigurationTask} updates
* its parent's cached configuration on success.
* @throws IOException
*/
@Test (timeout=300000)
public void testConfIsUpdatedOnSuccessAsync() throws ReconfigurationException,
TimeoutException, InterruptedException, IOException {
final String property = "FOO";
final String value1 = "value1";
final String value2 = "value2";

final Configuration conf = new Configuration();
conf.set(property, value1);
final Configuration newConf = new Configuration();
newConf.set(property, value2);

final ReconfigurableBase reconfigurable = makeReconfigurable(
conf, newConf, Arrays.asList(property));

// Kick off a reconfiguration task and wait until it completes.
reconfigurable.startReconfigurationTask();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return reconfigurable.getReconfigurationTaskStatus().stopped();
}
}, 100, 60000);
assertThat(reconfigurable.getConf().get(property), is(value2));
}

/**
* Ensure that {@link ReconfigurableBase#reconfigureProperty} unsets the
* property in its parent's configuration when the new value is null.
* @throws IOException
*/
@Test (timeout=300000)
public void testConfIsUnset() throws ReconfigurationException {
final String property = "FOO";
final String value1 = "value1";

final Configuration conf = new Configuration();
conf.set(property, value1);
final Configuration newConf = new Configuration();

final ReconfigurableBase reconfigurable = makeReconfigurable(
conf, newConf, Arrays.asList(property));

reconfigurable.reconfigureProperty(property, null);
assertNull(reconfigurable.getConf().get(property));
}

/**
* Ensure that {@link ReconfigurableBase#startReconfigurationTask} unsets the
* property in its parent's configuration when the new value is null.
* @throws IOException
*/
@Test (timeout=300000)
public void testConfIsUnsetAsync() throws ReconfigurationException,
IOException, TimeoutException, InterruptedException {
final String property = "FOO";
final String value1 = "value1";

final Configuration conf = new Configuration();
conf.set(property, value1);
final Configuration newConf = new Configuration();

final ReconfigurableBase reconfigurable = makeReconfigurable(
conf, newConf, Arrays.asList(property));

// Kick off a reconfiguration task and wait until it completes.
reconfigurable.startReconfigurationTask();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return reconfigurable.getReconfigurationTaskStatus().stopped();
}
}, 100, 60000);
assertNull(reconfigurable.getConf().get(property));
}

private ReconfigurableBase makeReconfigurable(
final Configuration oldConf, final Configuration newConf,
final Collection<String> reconfigurableProperties) {

return new ReconfigurableBase(oldConf) {
@Override
protected Configuration getNewConf() {
return newConf;
}

@Override
public Collection<String> getReconfigurableProperties() {
return reconfigurableProperties;
}

@Override
protected String reconfigurePropertyImpl(
String property, String newVal) throws ReconfigurationException {
return newVal;
}
};
}
}

Loading