Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
// must be stored in the same memory page.
// (8 byte key length) (key) (value) (8 byte pointer to next value)
int uaoSize = UnsafeAlignedOffset.getUaoSize();
final long recordLength = (2 * uaoSize) + klen + vlen + 8;
final long recordLength = (2L * uaoSize) + klen + vlen + 8;
if (currentPage == null || currentPage.size() - pageCursor < recordLength) {
if (!acquireNewPage(recordLength + uaoSize)) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ private[deploy] class DriverRunner(
// check if attempting another run
keepTrying = supervise && exitCode != 0 && !killed
if (keepTrying) {
if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000L) {
waitSeconds = 1
}
logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
// the left side of max is >=1 whenever partsScanned >= 2
numPartsToTry = Math.max(1,
(1.5 * num * partsScanned / results.size).toInt - partsScanned)
numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
numPartsToTry = Math.min(numPartsToTry, partsScanned * 4L)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ private[spark] class BlockManager(
case e: Exception if i < MAX_ATTEMPTS =>
logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}"
+ s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
Thread.sleep(SLEEP_TIME_SECS * 1000)
Thread.sleep(SLEEP_TIME_SECS * 1000L)
case NonFatal(e) =>
throw new SparkException("Unable to register with external shuffle server due to : " +
e.getMessage, e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public static long calculateSizeOfUnderlyingByteArray(long numFields, int elemen
private long elementOffset;

private long getElementOffset(int ordinal, int elementSize) {
return elementOffset + ordinal * elementSize;
return elementOffset + ordinal * (long)elementSize;
}

public Object getBaseObject() { return baseObject; }
Expand Down Expand Up @@ -414,46 +414,46 @@ public byte[] toByteArray() {
public short[] toShortArray() {
short[] values = new short[numElements];
Platform.copyMemory(
baseObject, elementOffset, values, Platform.SHORT_ARRAY_OFFSET, numElements * 2);
baseObject, elementOffset, values, Platform.SHORT_ARRAY_OFFSET, numElements * 2L);
return values;
}

@Override
public int[] toIntArray() {
int[] values = new int[numElements];
Platform.copyMemory(
baseObject, elementOffset, values, Platform.INT_ARRAY_OFFSET, numElements * 4);
baseObject, elementOffset, values, Platform.INT_ARRAY_OFFSET, numElements * 4L);
return values;
}

@Override
public long[] toLongArray() {
long[] values = new long[numElements];
Platform.copyMemory(
baseObject, elementOffset, values, Platform.LONG_ARRAY_OFFSET, numElements * 8);
baseObject, elementOffset, values, Platform.LONG_ARRAY_OFFSET, numElements * 8L);
return values;
}

@Override
public float[] toFloatArray() {
float[] values = new float[numElements];
Platform.copyMemory(
baseObject, elementOffset, values, Platform.FLOAT_ARRAY_OFFSET, numElements * 4);
baseObject, elementOffset, values, Platform.FLOAT_ARRAY_OFFSET, numElements * 4L);
return values;
}

@Override
public double[] toDoubleArray() {
double[] values = new double[numElements];
Platform.copyMemory(
baseObject, elementOffset, values, Platform.DOUBLE_ARRAY_OFFSET, numElements * 8);
baseObject, elementOffset, values, Platform.DOUBLE_ARRAY_OFFSET, numElements * 8L);
return values;
}

private static UnsafeArrayData fromPrimitiveArray(
Object arr, int offset, int length, int elementSize) {
final long headerInBytes = calculateHeaderPortionInBytes(length);
final long valueRegionInBytes = elementSize * length;
final long valueRegionInBytes = (long)elementSize * length;
final long totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8;
if (totalSizeInLongs > Integer.MAX_VALUE / 8) {
throw new UnsupportedOperationException("Cannot convert this array to unsafe format as " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public final class VariableLengthRowBasedKeyValueBatch extends RowBasedKeyValueB
@Override
public UnsafeRow appendRow(Object kbase, long koff, int klen,
Object vbase, long voff, int vlen) {
final long recordLength = 8 + klen + vlen + 8;
final long recordLength = 8L + klen + vlen + 8;
// if run out of max supported rows or page size, return null
if (numRows >= capacity || page == null || page.size() - pageCursor < recordLength) {
return null;
Expand Down
Loading